An error occurred while loading the file. Please try again.
-
Wolfgang Knopki authored5e1b787e
import json
from utils.loggingFactory import LoggerFactory
from datetime import datetime
import paho.mqtt.client as mqtt
import jsonhandler
from utils.influx import InfluxDBHelper
import os
class MQTTClientHandler:
MAPPING_FILE_NAME = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"mac_to_room.json"
)
MEASUREMENT_NAME = "sensor_data"
TAG_ROOM = "room"
TAG_MAC = "mac"
FIELD_CO2 = "co2"
FIELD_TEMP = "temperature"
FIELD_HUMIDITY = "humidity"
# Konstruktor
def __init__(
self, broker_url: str, topic: str, influx_writer: InfluxDBHelper
):
print("DAS IST EIN TEST")
self.logger = LoggerFactory.get_logger(__name__)
# key: mac : value : room
self.mac_to_room = jsonhandler.load_json(self.MAPPING_FILE_NAME)
self.broker_url = broker_url
self.topic = topic
self.influx_writer = influx_writer
self.client = mqtt.Client()
# Methoden werden hier Events zugeteilt
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
def on_connect(self, client, userdata, flags, rc):
self.logger.info("Connected with result code " + str(rc))
print("Connected")
client.subscribe(self.topic)
self.logger.info("Subscribed to " + self.topic)
# eventuell refactorn und die Aufgaben in Methoden aufteilen
def on_message(self, client, userdata, msg):
"""
Wenn das Topic eine Nachricht bekommt wird diese Methode ausgeführt
self: ist die MQTTClientHandler instanz, die wird gebraucht um die Einträge in
die InfluxDB zu schreiben
"""
print("Message")
msg = json.loads(msg.payload)
metadate = msg["metadata"]
# hier prüfen, ob die Mac-Adresse einen Raum hat,
# wenn nicht trage es in mac_to_room leer ein
# "aa:bb:cc:dd:ee:ff" : ""
mac = metadate["mac-address"]
if mac not in self.mac_to_room:
self.logger.warning(
f"Neue MAC-Adresse gefunden: {mac}. Mapping wird ergänzt."
)
print("MAC war nicht in File")
self.mac_to_room[mac] = "" # leerer Platzhalter
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
jsonhandler.write_json(self.mac_to_room, self.MAPPING_FILE_NAME)
self.mac_to_room = jsonhandler.load_json(self.MAPPING_FILE_NAME)
print("keine Ahnung")
return
self.write_to_influxDB(msg, metadate)
def write_to_influxDB(self, msg: dict, metadate: dict):
try:
print(msg)
self.influx_writer.write_point(
measurement=self.MEASUREMENT_NAME,
tags={
self.TAG_ROOM: self.mac_to_room[metadate["mac-address"]],
self.TAG_MAC: metadate["mac-address"],
},
fields={
self.FIELD_CO2: msg["co2"],
self.FIELD_TEMP: msg["temp"],
self.FIELD_HUMIDITY: msg["rh"],
},
timestamp=metadate["time"], # fix
)
self.logger.info(
f"Wrote to InfluxDB: {msg}"
)
except Exception as e:
self.logger.error(f"Failed writing to InfluxDb: {e}")
def start(self):
self.client.connect(self.broker_url)
self.client.loop_forever()