An error occurred while loading the file. Please try again.
mQTTClientHandler.py 3.62 KiB
import json
from utils.loggingFactory import LoggerFactory
from datetime import datetime
import paho.mqtt.client as mqtt
from stream_processing import jsonhandler
from utils.influx import InfluxDBHelper
import os
class MQTTClientHandler:
    MAPPING_FILE_NAME = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "mac_to_room.json"
    MEASUREMENT_NAME = "sensor_data"
    TAG_ROOM = "room"
    TAG_MAC = "mac"
    FIELD_CO2 = "co2"
    FIELD_TEMP = "temperature"
    FIELD_HUMIDITY = "humidity"
    # Konstruktor
    def __init__(
        self, broker_url: str, topic: str, influx_writer: InfluxDBHelper
        self.logger = LoggerFactory.get_logger(__name__)
        # key: mac : value : room
        self.mac_to_room = jsonhandler.load_json(self.MAPPING_FILE_NAME)
        self.broker_url = broker_url
        self.topic = topic
        self.influx_writer = influx_writer
        self.client = mqtt.Client()
        # Methoden werden hier Events zugeteilt
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
    def on_connect(self, client, userdata, flags, rc):
        self.logger.info("Connected with result code " + str(rc))
        client.subscribe(self.topic)
        self.logger.info("Subscribed to " + self.topic)
        print("Connected with result code " + str(rc) + "\n" + "Subscribed to " + self.topic)
    # eventuell refactorn und die Aufgaben in Methoden aufteilen
    def on_message(self, client, userdata, msg):
        """
        Wenn das Topic eine Nachricht bekommt wird diese Methode ausgeführt
        self: ist die MQTTClientHandler instanz, die wird gebraucht um die Einträge in
        die InfluxDB zu schreiben
        """
        msg = json.loads(msg.payload)
        metadate = msg["metadata"]
        # hier prüfen, ob die Mac-Adresse einen Raum hat,
        # wenn nicht trage es in mac_to_room leer ein
        # "aa:bb:cc:dd:ee:ff" : ""
        mac = metadate["mac-address"]
        if mac not in self.mac_to_room:
            self.logger.warning(
                f"Neue MAC-Adresse gefunden: {mac}. Mapping wird ergänzt."
            print(f"Neue MAC-Adresse gefunden: {mac}. Mapping wird ergänzt.")
            self.mac_to_room[mac] = ""  # leerer Platzhalter
            jsonhandler.write_json(self.mac_to_room, self.MAPPING_FILE_NAME)
            self.mac_to_room = jsonhandler.load_json(self.MAPPING_FILE_NAME)
            return
self.write_to_influxDB(msg, metadate) def write_to_influxDB(self, msg: dict, metadate: dict): try: self.influx_writer.write_point( measurement=self.MEASUREMENT_NAME, tags={ self.TAG_ROOM: self.mac_to_room[metadate["mac-address"]], self.TAG_MAC: metadate["mac-address"], }, fields={ self.FIELD_CO2: msg["co2"], self.FIELD_TEMP: msg["temp"], self.FIELD_HUMIDITY: msg["rh"], }, timestamp=metadate["time"], # fix ) self.logger.info( f"Wrote to InfluxDB: {msg}" ) print(f"Token: {self.influx_writer.get_token()}") print(f"Url: {self.influx_writer.get_url()}") print(f"Wrote to InfluxDB: {msg}") print(f"Ping: {self.influx_writer.ping()}") except Exception as e: print(f"Failed writing to InfluxDb: {e}") self.logger.error(f"Failed writing to InfluxDb: {e}") def start(self): self.client.connect(self.broker_url) self.client.loop_forever()