An error occurred while loading the file. Please try again.
-
Gezer authored9fc93f71
import json
from utils.loggingFactory import LoggerFactory
from datetime import datetime
import paho.mqtt.client as mqtt
from stream_processing import jsonhandler
from utils.influx import InfluxDBHelper
import os
class MQTTClientHandler:
    """Subscribe to an MQTT topic and store incoming sensor readings in InfluxDB.

    Every message is expected to be a JSON object that carries the readings
    ``co2``, ``temp`` and ``rh`` plus a ``metadata`` object containing the
    sender's ``mac-address`` and a ``time`` value. The MAC address is mapped
    to a room through the JSON mapping file stored next to this module.
    """

    # MAC-to-room mapping file, located in the same directory as this module.
    MAPPING_FILE_NAME = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "mac_to_room.json",
    )

    # Names used for the InfluxDB measurement, its tags and its fields.
    MEASUREMENT_NAME = "sensor_data"
    TAG_ROOM = "room"
    TAG_MAC = "mac"
    FIELD_CO2 = "co2"
    FIELD_TEMP = "temperature"
    FIELD_HUMIDITY = "humidity"

    def __init__(
        self, broker_url: str, topic: str, influx_writer: "InfluxDBHelper"
    ):
        """Load the MAC-to-room mapping and prepare the MQTT client.

        Args:
            broker_url: Host passed to the MQTT client's ``connect`` call.
            topic: Topic subscribed to once the connection is established.
            influx_writer: Helper used to write the data points.
        """
        self.logger = LoggerFactory.get_logger(__name__)
        # Mapping of MAC address (key) to room name (value).
        self.mac_to_room = jsonhandler.load_json(self.MAPPING_FILE_NAME)
        self.broker_url = broker_url
        self.topic = topic
        self.influx_writer = influx_writer
        self.client = mqtt.Client()
        # Register the handler methods as the client's event callbacks.
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

    def on_connect(self, client, userdata, flags, rc) -> None:
        """Subscribe to the configured topic after a successful connect.

        A non-zero ``rc`` means the broker refused the connection; in that
        case the failure is reported and no subscription is attempted.
        """
        if rc != 0:
            failure = f"Connection failed with result code {rc}"
            self.logger.error(failure)
            print(failure)
            return

        connected = "Connected with result code " + str(rc)
        subscribed = "Subscribed to " + self.topic
        self.logger.info(connected)
        client.subscribe(self.topic)
        self.logger.info(subscribed)
        print(connected + "\n" + subscribed)

    # TODO: consider refactoring and splitting the tasks into methods.
    def on_message(self, client, userdata, msg) -> None:
        """Handle one message received on the subscribed topic.

        Messages from a known MAC address are written to InfluxDB. An unknown
        MAC address is added to the mapping file with an empty room name, and
        the message that introduced it is not written. Messages that cannot
        be parsed, or that lack the metadata / MAC address, are logged and
        dropped so that one bad payload cannot escape the callback.
        """
        try:
            payload = json.loads(msg.payload)
            metadata = payload["metadata"]
            mac = metadata["mac-address"]
        except (ValueError, KeyError, TypeError) as exc:
            # ValueError covers invalid JSON and undecodable bytes; KeyError
            # and TypeError cover payloads that do not have the expected shape.
            self.logger.warning(f"Ignoring malformed MQTT message: {exc!r}")
            return

        if not isinstance(mac, str):
            self.logger.warning(
                f"Ignoring MQTT message with non-string mac-address: {mac!r}"
            )
            return

        if mac not in self.mac_to_room:
            # Register the unknown MAC with an empty placeholder,
            # e.g. "aa:bb:cc:dd:ee:ff": "".
            self.logger.warning(
                f"Neue MAC-Adresse gefunden: {mac}. Mapping wird ergänzt."
            )
            print(f"Neue MAC-Adresse gefunden: {mac}. Mapping wird ergänzt.")
            self.mac_to_room[mac] = ""
            # NOTE(review): the file is rewritten from the in-memory mapping
            # and only re-read here, so manual edits to the file are presumably
            # picked up (or overwritten) only when an unknown MAC arrives -
            # confirm this is intended.
            jsonhandler.write_json(self.mac_to_room, self.MAPPING_FILE_NAME)
            self.mac_to_room = jsonhandler.load_json(self.MAPPING_FILE_NAME)
            return

        self.write_to_influxDB(payload, metadata)

    def write_to_influxDB(self, msg: dict, metadate: dict) -> None:
        """Write one sensor reading to InfluxDB.

        Args:
            msg: Decoded message containing ``co2``, ``temp`` and ``rh``.
            metadate: The message's metadata containing ``mac-address`` and
                ``time``.

        Any failure is reported via the logger and stdout instead of being
        raised, so a failed write does not propagate to the caller.
        """
        try:
            mac = metadate["mac-address"]
            self.influx_writer.write_point(
                measurement=self.MEASUREMENT_NAME,
                tags={
                    self.TAG_ROOM: self.mac_to_room[mac],
                    self.TAG_MAC: mac,
                },
                fields={
                    self.FIELD_CO2: msg["co2"],
                    self.FIELD_TEMP: msg["temp"],
                    self.FIELD_HUMIDITY: msg["rh"],
                },
                timestamp=metadate["time"],
            )
            self.logger.info(
                f"Wrote to InfluxDB: {msg}"
            )
            # The access token is deliberately not printed: it is a
            # credential and must not end up in stdout or captured logs.
            print(f"Url: {self.influx_writer.get_url()}")
            print(f"Wrote to InfluxDB: {msg}")
            print(f"Ping: {self.influx_writer.ping()}")
        except Exception as e:
            print(f"Failed writing to InfluxDb: {e}")
            self.logger.error(f"Failed writing to InfluxDb: {e}")

    def start(self) -> None:
        """Connect to the broker and process network events until stopped."""
        self.client.connect(self.broker_url)
        self.client.loop_forever()