An error occurred while loading the file. Please try again.
mQTTClientHandler.py 3.13 KiB
import json
from src.mqtt_influx_backend.loggingFactory import LoggerFactory
from datetime import datetime
import paho.mqtt.client as mqtt
from src.mqtt_influx_backend import jsonhandler
from src.mqtt_influx_backend import influxDBWriter
class MQTTClientHandler:
    MAPPING_FILE_NAME = "src/mqtt_influx_backend/mac_to_room.json"
    MEASUREMENT_NAME = "sensor_data"
    TAG_ROOM = "room"
    TAG_MAC = "mac"
    FIELD_CO2 = "co2"
    FIELD_TEMP = "temperature"
    FIELD_HUMIDITY = "humidity"
    # Konstruktor
    def __init__(self, broker_url: str, topic: str, influx_writer: influxDBWriter):
        self.logger = LoggerFactory.get_logger(__name__)
        # key: mac : value : room
        self.mac_to_room = jsonhandler.load_json(self.MAPPING_FILE_NAME)
        self.broker_url = broker_url
        self.topic = topic
        self.influx_writer = influx_writer
        self.client = mqtt.Client()
        # Methoden werden hier Events zugeteilt 
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
    def on_connect(self, client, userdata, flags, rc):
        self.logger.info("Connected with result code " + str(rc))
        client.subscribe(self.topic)
        self.logger.info("Subscribed to " + self.topic )
    # eventuell refactorn und die Aufgaben in Methoden aufteilen
    def on_message(self, client, userdata, msg):
        """
        Wenn das Topic eine Nachricht bekommt wird diese Methode ausgeführt
        self: ist die MQTTClientHandler instanz, die wird gebraucht um die Einträge in 
        die InfluxDB zu schreiben
        """
        msg = json.loads(msg.payload)
        metadate = msg["metadata"]
        # hier prüfen, ob die Mac-Adresse einen Raum hat,
        # wenn nicht trage es in mac_to_room leer ein
        # "aa:bb:cc:dd:ee:ff" : ""
        mac = metadate["mac-address"]
        if mac not in self.mac_to_room:
            self.logger.warning(f"Neue MAC-Adresse gefunden: {mac}. Mapping wird ergänzt.")
            self.mac_to_room[mac] = ""  # leerer Platzhalter
            jsonhandler.write_json(self.mac_to_room, self.MAPPING_FILE_NAME)
            self.mac_to_room = jsonhandler.load_json(self.MAPPING_FILE_NAME)
            return
        self.write_to_influxDB(msg,metadate)
    def write_to_influxDB(self, msg : dict, metadate: dict):
        try:
            self.influx_writer.write_point(
                measurement=self.MEASUREMENT_NAME,
                tags={
                    self.TAG_ROOM : self.mac_to_room[metadate["mac-address"]],
                    self.TAG_MAC: metadate["mac-address"]
                fields={
self.FIELD_CO2: msg["co2"], self.FIELD_TEMP: msg["temp"], self.FIELD_HUMIDITY: msg["rh"], }, timestamp=metadate["time"], #fix ) print("Wrote to InfluxDB:", msg) # muss später rausgeschmiessen werden except Exception as e: self.logger.error(f"Failed writing to InfluxDb: {e}") def start(self): self.client.connect(self.broker_url) self.client.loop_forever()