# MQTTClientHandler.py
# MQTT subscriber that forwards received values to InfluxDB.
import json
from datetime import datetime
import paho.mqtt.client as mqtt
from mqtt_influx_backend import InfluxDBWriter
class MQTTClientHandler:
    """Forward CO2 readings received over MQTT to an InfluxDB writer.

    The handler subscribes to a single topic. Every message is expected to
    carry a UTF-8 encoded JSON object with a numeric ``"co2"`` field; that
    value is handed to ``influx_writer.write_point``. Messages that cannot be
    parsed, and write failures, are reported on stdout and otherwise ignored
    so that one bad message does not stop the client loop.
    """

    def __init__(self, broker_url: str, topic: str, influx_writer: "InfluxDBWriter"):
        """Create the MQTT client and register the callbacks.

        Args:
            broker_url: Host passed to ``client.connect`` in :meth:`start`.
            topic: Topic subscribed to once the connection is established.
            influx_writer: Object whose ``write_point`` receives each reading.
        """
        self.broker_url = broker_url
        self.topic = topic
        self.influx_writer = influx_writer
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

    def on_connect(self, client, userdata, flags, rc):
        """Report the connect result code and subscribe to ``self.topic``.

        Subscribing here (rather than once in ``start``) means the
        subscription is issued every time this callback fires.
        """
        print("Connected with result code " + str(rc))
        client.subscribe(self.topic)

    @staticmethod
    def _parse_co2(text: str) -> float:
        """Return the ``"co2"`` field of the JSON object in *text* as a float.

        Raises:
            ValueError: If *text* is not valid JSON, is not a JSON object,
                has no ``"co2"`` field, or the field is not numeric.
        """
        data = json.loads(text)  # json.JSONDecodeError is a ValueError
        if not isinstance(data, dict):
            raise ValueError(
                f"expected a JSON object, got {type(data).__name__}"
            )
        if "co2" not in data:
            raise ValueError('missing "co2" field')
        try:
            return float(data["co2"])
        except TypeError as exc:
            # float(None) / float([..]) raise TypeError; normalise so the
            # caller only has to deal with ValueError.
            raise ValueError(f'"co2" is not numeric: {data["co2"]!r}') from exc

    def on_message(self, client, userdata, msg):
        """Parse one incoming message and write its CO2 value to InfluxDB.

        Args:
            client: The MQTT client that received the message (unused).
            userdata: User data configured on the client (unused).
            msg: Received message; ``msg.payload`` (bytes) and ``msg.topic``
                are read.
        """
        try:
            # UnicodeDecodeError is a ValueError too, so a non-UTF-8 payload
            # is handled like any other malformed message.
            text = msg.payload.decode("utf-8")
            print(text)
            value = self._parse_co2(text)
        except ValueError as e:
            print("Error processing message:", e)
            return

        try:
            # NOTE(review): the point is stored under the "temperature"
            # measurement although the value comes from the "co2" field.
            # Kept unchanged so existing data/queries are not affected;
            # confirm the intended measurement name.
            # NOTE(review): datetime.utcnow() yields a naive datetime and is
            # deprecated since Python 3.12; kept because the writer's handling
            # of timezone-aware timestamps is not visible here.
            self.influx_writer.write_point(
                measurement="temperature",
                tags={"topic": msg.topic},
                fields={"value": value},
                timestamp=datetime.utcnow(),
            )
        except Exception as e:
            # Callback boundary: a failed write must not escape into the
            # MQTT client loop, so report it and carry on.
            print("Error processing message:", e)
            return

        print("Wrote to InfluxDB:", value)

    def start(self):
        """Connect to the broker and run the client's network loop.

        ``loop_forever`` is called on the calling thread, so this method is
        not expected to return while the loop is running.
        """
        self.client.connect(self.broker_url)
        self.client.loop_forever()