An error occurred while loading the file. Please try again.
mQTTClientHandler.py 3.36 KiB
import json
from utils.loggingFactory import LoggerFactory
from datetime import datetime
import paho.mqtt.client as mqtt
import jsonhandler
from utils.influx import InfluxDBHelper
import os
class MQTTClientHandler:
    MAPPING_FILE_NAME = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "mac_to_room.json"
    MEASUREMENT_NAME = "sensor_data"
    TAG_ROOM = "room"
    TAG_MAC = "mac"
    FIELD_CO2 = "co2"
    FIELD_TEMP = "temperature"
    FIELD_HUMIDITY = "humidity"
    # Konstruktor
    def __init__(
        self, broker_url: str, topic: str, influx_writer: InfluxDBHelper
        print("DAS IST EIN TEST")
        self.logger = LoggerFactory.get_logger(__name__)
        # key: mac : value : room
        self.mac_to_room = jsonhandler.load_json(self.MAPPING_FILE_NAME)
        self.broker_url = broker_url
        self.topic = topic
        self.influx_writer = influx_writer
        self.client = mqtt.Client()
        # Methoden werden hier Events zugeteilt
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
    def on_connect(self, client, userdata, flags, rc):
        self.logger.info("Connected with result code " + str(rc))
        print("Connected")
        client.subscribe(self.topic)
        self.logger.info("Subscribed to " + self.topic)
    # eventuell refactorn und die Aufgaben in Methoden aufteilen
    def on_message(self, client, userdata, msg):
        """
        Wenn das Topic eine Nachricht bekommt wird diese Methode ausgeführt
        self: ist die MQTTClientHandler instanz, die wird gebraucht um die Einträge in
        die InfluxDB zu schreiben
        """
        print("Message")
        msg = json.loads(msg.payload)
        metadate = msg["metadata"]
        # hier prüfen, ob die Mac-Adresse einen Raum hat,
        # wenn nicht trage es in mac_to_room leer ein
        # "aa:bb:cc:dd:ee:ff" : ""
        mac = metadate["mac-address"]
        if mac not in self.mac_to_room:
            self.logger.warning(
                f"Neue MAC-Adresse gefunden: {mac}. Mapping wird ergänzt."
            print("MAC war nicht in File")
            self.mac_to_room[mac] = ""  # leerer Platzhalter
jsonhandler.write_json(self.mac_to_room, self.MAPPING_FILE_NAME) self.mac_to_room = jsonhandler.load_json(self.MAPPING_FILE_NAME) print("keine Ahnung") return self.write_to_influxDB(msg, metadate) def write_to_influxDB(self, msg: dict, metadate: dict): try: print(msg) self.influx_writer.write_point( measurement=self.MEASUREMENT_NAME, tags={ self.TAG_ROOM: self.mac_to_room[metadate["mac-address"]], self.TAG_MAC: metadate["mac-address"], }, fields={ self.FIELD_CO2: msg["co2"], self.FIELD_TEMP: msg["temp"], self.FIELD_HUMIDITY: msg["rh"], }, timestamp=metadate["time"], # fix ) self.logger.info( f"Wrote to InfluxDB: {msg}" ) except Exception as e: self.logger.error(f"Failed writing to InfluxDb: {e}") def start(self): self.client.connect(self.broker_url) self.client.loop_forever()