# MQTTClientHandler.py
import json
from datetime import datetime
import paho.mqtt.client as mqtt
from mqtt_influx_backend import InfluxDBWriter
class MQTTClientHandler:
    """Forward CO2 readings received over MQTT to an InfluxDB writer.

    The handler owns a paho-mqtt client, subscribes to ``topic`` once the
    client reports a connection, and passes every usable message to
    ``influx_writer.write_point``.
    """

    def __init__(self, broker_url: str, topic: str, influx_writer: InfluxDBWriter):
        """Store the configuration and register the MQTT callbacks.

        Args:
            broker_url: Value passed to the client's ``connect`` in ``start``.
            topic: Topic subscribed to in ``on_connect``.
            influx_writer: Object whose ``write_point`` receives each reading.
        """
        self.broker_url = broker_url
        self.topic = topic
        self.influx_writer = influx_writer
        # NOTE(review): paho-mqtt 2.x expects a callback API version argument
        # to Client(); confirm the installed version accepts this bare call.
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

    def on_connect(self, client, userdata, flags, rc):
        """Report the connection result code and subscribe to the topic."""
        print(f"Connected with result code {rc}")
        # Subscribing from this callback means the subscription is issued
        # again every time the callback fires.
        client.subscribe(self.topic)

    def on_message(self, client, userdata, msg):
        """Decode one MQTT message and write its CO2 value to InfluxDB.

        ``msg.payload`` is parsed as JSON and must provide ``co2`` plus
        ``metadata.mac`` and ``metadata.timestamp``. Any message that cannot
        be decoded or written is reported on stdout and dropped; nothing is
        raised to the caller.
        """
        # Decoding happens inside the try so that one malformed payload is
        # reported and skipped instead of raising out of the callback.
        try:
            reading = json.loads(msg.payload)
            print(reading)
            metadata = reading["metadata"]
            mac = metadata["mac"]
            print(mac)
            timestamp = metadata["timestamp"]
            # Index rather than .get(): a missing key is then reported by
            # name instead of as a float(None) TypeError.
            value = float(reading["co2"])
        except (ValueError, TypeError, KeyError) as e:
            print("Error processing message:", e)
            return

        try:
            self.influx_writer.write_point(
                measurement="sensor_data",
                tags={"mac": mac},
                fields={"value": value},
                timestamp=timestamp,
            )
        except Exception as e:
            # Callback boundary: the writer's failure modes are not visible
            # here, so report the error and keep handling later messages.
            print("Error processing message:", e)
            return
        print("Wrote to InfluxDB:", value)

    def start(self):
        """Connect to the broker and run the client's ``loop_forever``."""
        self.client.connect(self.broker_url)
        # NOTE(review): loop_forever() is expected to block the calling
        # thread until the client disconnects; confirm against paho docs.
        self.client.loop_forever()