diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..e7f1c73124997c9faea1847393cce5875cdb4126 --- /dev/null +++ b/.gitignore @@ -0,0 +1,18 @@ +# Ignoriere die virtuelle Umgebung +.venv/ +venv/ + +# Ignoriere die __pycache__-Ordner +__pycache__/ + +# Ignoriere Python Bytecode-Dateien +*.pyc +*.pyo + +# Ignoriere IDE-spezifische Dateien (falls du zum Beispiel VSCode oder PyCharm verwendest) +.vscode/ +.idea/ + +# Ignoriere andere temporäre Dateien +*.log +*.bak diff --git a/README.md b/README.md deleted file mode 100644 index 1dda1b8cbf315233cecfe25004cf0f06c7994d43..0000000000000000000000000000000000000000 --- a/README.md +++ /dev/null @@ -1,64 +0,0 @@ -# MQTT Influx Backend - -Ein MQTT-Subscriber, der Sensordaten (z. B. COâ‚‚-Werte) von einem MQTT-Broker empfängt und in eine InfluxDB schreibt. - -## 📠Projektstruktur - -ToDo - ---- - -## âš™ï¸ Setup - -### 1. Projekt klonen oder in das Verzeichnis wechseln -```bash -cd ~/dev/StreamProcessor - -2. Virtuelle Umgebung mit uv erstellen - -uv venv .venv -source .venv/bin/activate - -3. Abhängigkeiten installieren - -uv pip install . - -🧪 Testen der InfluxDB-Verbindung - -Zum Testen, ob das Schreiben in die Datenbank funktioniert, kannst du folgendes Skript ausführen: - -python Main.py - -Das Skript schreibt einige Testwerte in den co2-test Bucket in InfluxDB. -🔠.env Datei (Beispiel) - -Erstelle im Projektverzeichnis eine .env Datei mit folgendem Inhalt: - -INFLUXDB_URL=http://localhost:8086 -INFLUXDB_TOKEN=dein_token -INFLUXDB_ORG=dein_org -INFLUXDB_BUCKET=co2-test - -MQTT_BROKER_URL=mqtt.eclipseprojects.io -MQTT_TOPIC=co2/# - - 🛑 Wichtig: Diese Datei sollte nicht in Git eingecheckt werden – sensibel! - -🚀 Kommende Features - - Live-Daten über MQTT empfangen (co2/#) - - Optionales Logging oder Monitoring - - Datenvisualisierung per Dashboard - -👤 Autor - -Aaron Mele - -Morten Stetter - -Patrick Ade - -Emre Gezer -📧 21geem1bif@hft-stuttgart.de diff --git a/build/lib/mqtt_influx_backend/InfluxDBWriter.py b/build/lib/mqtt_influx_backend/InfluxDBWriter.py new file mode 100644 index 0000000000000000000000000000000000000000..398a7f4c23e3d1de08f10ba86cbd6579f2ebbf0a --- /dev/null +++ b/build/lib/mqtt_influx_backend/InfluxDBWriter.py @@ -0,0 +1,20 @@ +from influxdb_client import InfluxDBClient, Point, WritePrecision +from influxdb_client.client.write_api import SYNCHRONOUS + +class InfluxDBWriter: + def __init__(self, url: str, token: str, org: str, bucket: str): + self.client = InfluxDBClient(url=url, token=token, org=org) + self.write_api = self.client.write_api(write_options=SYNCHRONOUS) + self.bucket = bucket + self.org = org + + def write_point(self, measurement: str, tags: dict, fields: dict, timestamp=None): + point = Point(measurement) + for k, v in tags.items(): + point.tag(k, v) + for k, v in fields.items(): + point.field(k, v) + if timestamp: + point.time(timestamp, WritePrecision.NS) + + self.write_api.write(bucket=self.bucket, org=self.org, record=point) diff --git a/build/lib/mqtt_influx_backend/MQTTClientHandler.py b/build/lib/mqtt_influx_backend/MQTTClientHandler.py new file mode 100644 index 0000000000000000000000000000000000000000..d42bf56f0ff674799be6a9c57270d71775eee13d --- /dev/null +++ b/build/lib/mqtt_influx_backend/MQTTClientHandler.py @@ -0,0 +1,37 @@ +import json +from datetime import datetime +import paho.mqtt.client as mqtt +from mqtt_influx_backend import InfluxDBWriter + +class MQTTClientHandler: + def __init__(self, broker_url: str, topic: str, influx_writer: InfluxDBWriter): + self.broker_url = broker_url + self.topic = topic + self.influx_writer = influx_writer + self.client = mqtt.Client() + self.client.on_connect = self.on_connect + self.client.on_message = self.on_message + + def on_connect(self, client, userdata, flags, rc): + print("Connected with result code " + str(rc)) + client.subscribe(self.topic) + + def on_message(self, client, userdata, msg): + print("hallo bin in der methofe") + try: + data = json.loads(msg.payload.decode()) + value = float(data.get("value")) + + self.influx_writer.write_point( + measurement="temperature", + tags={"topic": msg.topic}, + fields={"value": value}, + timestamp=datetime.utcnow() + ) + print("Wrote to InfluxDB:", value) + except Exception as e: + print("Error processing message:", e) + + def start(self): + self.client.connect(self.broker_url) + self.client.loop_forever() diff --git a/build/lib/mqtt_influx_backend/Main.py b/build/lib/mqtt_influx_backend/Main.py new file mode 100644 index 0000000000000000000000000000000000000000..4202370acfd55ac2839940c61537793464e1eb6c --- /dev/null +++ b/build/lib/mqtt_influx_backend/Main.py @@ -0,0 +1,26 @@ +from dotenv import load_dotenv +import os + +from mqtt_influx_backend.MQTTClientHandler import MQTTClientHandler +from mqtt_influx_backend.InfluxDBWriter import InfluxDBWriter + +load_dotenv() + +def main(): + influx_writer = InfluxDBWriter( + url=os.getenv("INFLUXDB_URL"), + token=os.getenv("INFLUXDB_TOKEN"), + org=os.getenv("INFLUXDB_ORG"), + bucket=os.getenv("INFLUXDB_BUCKET") + ) + + mqtt_handler = MQTTClientHandler( + broker_url=os.getenv("MQTT_BROKER_URL"), + topic=os.getenv("MQTT_TOPIC"), + influx_writer=influx_writer + ) + + mqtt_handler.start() + +if __name__ == "__main__": + main() diff --git a/build/lib/mqtt_to_influx.py b/build/lib/mqtt_to_influx.py new file mode 100644 index 0000000000000000000000000000000000000000..504609084b08ccd398f03020b4a8db7845e2e347 --- /dev/null +++ b/build/lib/mqtt_to_influx.py @@ -0,0 +1,73 @@ +import json +from datetime import datetime + +import influxdb_client, os, time +from influxdb_client import InfluxDBClient, Point, WritePrecision +from influxdb_client.client.write_api import SYNCHRONOUS + +# InfluxDB config +INFLUXDB_URL = os.environ.get("INFLUXDB_URL") +INFLUXDB_TOKEN = os.environ.get("INFLUXDB_TOKEN") +INFLUXDB_ORG = os.environ.get("INFLUXDB_ORG") +INFLUXDB_BUCKET = os.environ.get("INFLUXDB_BUCKET") + +write_client = influxdb_client.InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) + +bucket="co2-test" + +write_api = write_client.write_api(write_options=SYNCHRONOUS) + +# MQTT config +MQTT_BROKER_URL = os.environ.get("MQTT_BROKER_URL") +MQTT_PUBLISH_TOPIC = os.environ.get("MQTT_TOPIC") + +for value in range(5): + point = ( + Point("measurement1") + .tag("mac-adress", "22:de:aa:21::a2") + .field("ppm", value) + ) + write_api.write(bucket=bucket, org=INFLUXDB_ORG, record=point) + time.sleep(1) # separate points by 1 second + + + + +''' +def on_connect(client, userdata, flags, rc): + print("Connected with result code " + str(rc)) + client.subscribe(MQTT_PUBLISH_TOPIC) + +def on_message(client, userdata, msg): + print(msg.topic + " " + str(msg.payload)) + + try: + # Decode JSON payload (z. B. {"value": 22.5, "unit": "C"}) + data = json.loads(msg.payload.decode()) + value = float(data.get("value")) + + # Create data point + point = Point("temperature") \ + .tag("topic", msg.topic) \ + .field("value", value) \ + .time(datetime.utcnow()) + + # Write to InfluxDB + write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=point) + print("Wrote to InfluxDB:", point) + + except Exception as e: + print("Error processing message:", e) +''' + + +# Start MQTT client +''' +mqttc = mqtt.Client() +mqttc.on_connect = on_connect +mqttc.on_message = on_message + +mqttc.connect(MQTT_BROKER_URL) +mqttc.loop_forever() +''' + diff --git a/mqtt_influx_backend.egg-info/PKG-INFO b/mqtt_influx_backend.egg-info/PKG-INFO new file mode 100644 index 0000000000000000000000000000000000000000..f6c854a48d2a898a5e3b83efec66037f60ca650e --- /dev/null +++ b/mqtt_influx_backend.egg-info/PKG-INFO @@ -0,0 +1,9 @@ +Metadata-Version: 2.4 +Name: mqtt-influx-backend +Version: 0.1.0 +Summary: MQTT subscriber that stores data in InfluxDB +Author-email: Dein Name <deine.email@example.com> +Requires-Python: >=3.8 +Requires-Dist: influxdb-client +Requires-Dist: paho-mqtt +Requires-Dist: python-dotenv diff --git a/mqtt_influx_backend.egg-info/SOURCES.txt b/mqtt_influx_backend.egg-info/SOURCES.txt new file mode 100644 index 0000000000000000000000000000000000000000..92ee43a393b7fdeb972c41a02127212c89954e81 --- /dev/null +++ b/mqtt_influx_backend.egg-info/SOURCES.txt @@ -0,0 +1,7 @@ +mqtt_to_influx.py +pyproject.toml +mqtt_influx_backend.egg-info/PKG-INFO +mqtt_influx_backend.egg-info/SOURCES.txt +mqtt_influx_backend.egg-info/dependency_links.txt +mqtt_influx_backend.egg-info/requires.txt +mqtt_influx_backend.egg-info/top_level.txt \ No newline at end of file diff --git a/mqtt_influx_backend.egg-info/dependency_links.txt b/mqtt_influx_backend.egg-info/dependency_links.txt new file mode 100644 index 0000000000000000000000000000000000000000..8b137891791fe96927ad78e64b0aad7bded08bdc --- /dev/null +++ b/mqtt_influx_backend.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/mqtt_influx_backend.egg-info/requires.txt b/mqtt_influx_backend.egg-info/requires.txt new file mode 100644 index 0000000000000000000000000000000000000000..71b4ba31498e200ce0adf0b5c4319668288ca4c2 --- /dev/null +++ b/mqtt_influx_backend.egg-info/requires.txt @@ -0,0 +1,3 @@ +influxdb-client +paho-mqtt +python-dotenv diff --git a/mqtt_influx_backend.egg-info/top_level.txt b/mqtt_influx_backend.egg-info/top_level.txt new file mode 100644 index 0000000000000000000000000000000000000000..e657adfad8f51a28d6c48f070b72310823b08fdf --- /dev/null +++ b/mqtt_influx_backend.egg-info/top_level.txt @@ -0,0 +1 @@ +mqtt_to_influx diff --git a/mqtt_to_influx.py b/mqtt_to_influx.py new file mode 100644 index 0000000000000000000000000000000000000000..8060d481e4c409521e768b2c02c181b32f2276a5 --- /dev/null +++ b/mqtt_to_influx.py @@ -0,0 +1,80 @@ + +''' +import json +from datetime import datetime + +import influxdb_client, os, time +from influxdb_client import InfluxDBClient, Point, WritePrecision +from influxdb_client.client.write_api import SYNCHRONOUS +from dotenv import load_dotenv +import os + +load_dotenv() # <-- lädt deine .env Datei + +# InfluxDB config +INFLUXDB_URL = os.environ.get("INFLUXDB_URL") +INFLUXDB_TOKEN = os.environ.get("INFLUXDB_TOKEN") +INFLUXDB_ORG = os.environ.get("INFLUXDB_ORG") +INFLUXDB_BUCKET = os.environ.get("INFLUXDB_BUCKET") + +print(INFLUXDB_URL) + +write_client = influxdb_client.InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) + +write_api = write_client.write_api(write_options=SYNCHRONOUS) + +# MQTT config +MQTT_BROKER_URL = os.environ.get("MQTT_BROKER_URL") +MQTT_PUBLISH_TOPIC = os.environ.get("MQTT_TOPIC") + +for value in range(5): + point = ( + Point("measurement1") + .tag("mac-adress", "22:de:aa:21::a2") + .field("ppm", value) + ) + write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=point) + time.sleep(1) # separate points by 1 second + + + + + +def on_connect(client, userdata, flags, rc): + print("Connected with result code " + str(rc)) + client.subscribe(MQTT_PUBLISH_TOPIC) + +def on_message(client, userdata, msg): + print(msg.topic + " " + str(msg.payload)) + + try: + # Decode JSON payload (z. B. {"value": 22.5, "unit": "C"}) + data = json.loads(msg.payload.decode()) + value = float(data.get("value")) + + # Create data point + point = Point("temperature") \ + .tag("topic", msg.topic) \ + .field("value", value) \ + .time(datetime.utcnow()) + + # Write to InfluxDB + write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=point) + print("Wrote to InfluxDB:", point) + + except Exception as e: + print("Error processing message:", e) + + + +# Start MQTT client +mqttc = mqtt.Client() +mqttc.on_connect = on_connect +mqttc.on_message = on_message + +mqttc.connect(MQTT_BROKER_URL) +mqttc.loop_forever() + + + +''' diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000000000000000000000000000000000000..f9e7db84f2ead29489fb3bd36e784ffb8b8c169c --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,20 @@ +[project] +name = "mqtt-influx-backend" +version = "0.1.0" +description = "MQTT subscriber that stores data in InfluxDB" +authors = [ + { name = "Emre Gezer", email = "21geem1bif@hft-stuttgart.de" } +] +dependencies = [ + "influxdb-client", + "paho-mqtt", + "python-dotenv" +] +requires-python = ">=3.8" + +[tool.uv] +# Optional: für zukünftige Konfigurationen + +[tool.setuptools] +package-dir = {"" = "src"} +packages = ["mqtt_influx_backend"] diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/mqtt_influx_backend.egg-info/PKG-INFO b/src/mqtt_influx_backend.egg-info/PKG-INFO new file mode 100644 index 0000000000000000000000000000000000000000..60300eb2332aa05970c424755e4a79ecb7b32d9e --- /dev/null +++ b/src/mqtt_influx_backend.egg-info/PKG-INFO @@ -0,0 +1,9 @@ +Metadata-Version: 2.4 +Name: mqtt-influx-backend +Version: 0.1.0 +Summary: MQTT subscriber that stores data in InfluxDB +Author-email: Emre Gezer <21geem1bif@hft-stuttgart.de> +Requires-Python: >=3.8 +Requires-Dist: influxdb-client +Requires-Dist: paho-mqtt +Requires-Dist: python-dotenv diff --git a/src/mqtt_influx_backend.egg-info/SOURCES.txt b/src/mqtt_influx_backend.egg-info/SOURCES.txt new file mode 100644 index 0000000000000000000000000000000000000000..cd04f13f60475dbc2ae8e9caefecb73ea3040c82 --- /dev/null +++ b/src/mqtt_influx_backend.egg-info/SOURCES.txt @@ -0,0 +1,10 @@ +README.md +pyproject.toml +src/mqtt_influx_backend/InfluxDBWriter.py +src/mqtt_influx_backend/MQTTClientHandler.py +src/mqtt_influx_backend/Main.py +src/mqtt_influx_backend.egg-info/PKG-INFO +src/mqtt_influx_backend.egg-info/SOURCES.txt +src/mqtt_influx_backend.egg-info/dependency_links.txt +src/mqtt_influx_backend.egg-info/requires.txt +src/mqtt_influx_backend.egg-info/top_level.txt \ No newline at end of file diff --git a/src/mqtt_influx_backend.egg-info/dependency_links.txt b/src/mqtt_influx_backend.egg-info/dependency_links.txt new file mode 100644 index 0000000000000000000000000000000000000000..8b137891791fe96927ad78e64b0aad7bded08bdc --- /dev/null +++ b/src/mqtt_influx_backend.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/src/mqtt_influx_backend.egg-info/requires.txt b/src/mqtt_influx_backend.egg-info/requires.txt new file mode 100644 index 0000000000000000000000000000000000000000..71b4ba31498e200ce0adf0b5c4319668288ca4c2 --- /dev/null +++ b/src/mqtt_influx_backend.egg-info/requires.txt @@ -0,0 +1,3 @@ +influxdb-client +paho-mqtt +python-dotenv diff --git a/src/mqtt_influx_backend.egg-info/top_level.txt b/src/mqtt_influx_backend.egg-info/top_level.txt new file mode 100644 index 0000000000000000000000000000000000000000..90227a0f27aa20585989302de846024dd63a5ab4 --- /dev/null +++ b/src/mqtt_influx_backend.egg-info/top_level.txt @@ -0,0 +1 @@ +mqtt_influx_backend diff --git a/src/mqtt_influx_backend/InfluxDBWriter.py b/src/mqtt_influx_backend/InfluxDBWriter.py new file mode 100644 index 0000000000000000000000000000000000000000..398a7f4c23e3d1de08f10ba86cbd6579f2ebbf0a --- /dev/null +++ b/src/mqtt_influx_backend/InfluxDBWriter.py @@ -0,0 +1,20 @@ +from influxdb_client import InfluxDBClient, Point, WritePrecision +from influxdb_client.client.write_api import SYNCHRONOUS + +class InfluxDBWriter: + def __init__(self, url: str, token: str, org: str, bucket: str): + self.client = InfluxDBClient(url=url, token=token, org=org) + self.write_api = self.client.write_api(write_options=SYNCHRONOUS) + self.bucket = bucket + self.org = org + + def write_point(self, measurement: str, tags: dict, fields: dict, timestamp=None): + point = Point(measurement) + for k, v in tags.items(): + point.tag(k, v) + for k, v in fields.items(): + point.field(k, v) + if timestamp: + point.time(timestamp, WritePrecision.NS) + + self.write_api.write(bucket=self.bucket, org=self.org, record=point) diff --git a/src/mqtt_influx_backend/MQTTClientHandler.py b/src/mqtt_influx_backend/MQTTClientHandler.py new file mode 100644 index 0000000000000000000000000000000000000000..d42bf56f0ff674799be6a9c57270d71775eee13d --- /dev/null +++ b/src/mqtt_influx_backend/MQTTClientHandler.py @@ -0,0 +1,37 @@ +import json +from datetime import datetime +import paho.mqtt.client as mqtt +from mqtt_influx_backend import InfluxDBWriter + +class MQTTClientHandler: + def __init__(self, broker_url: str, topic: str, influx_writer: InfluxDBWriter): + self.broker_url = broker_url + self.topic = topic + self.influx_writer = influx_writer + self.client = mqtt.Client() + self.client.on_connect = self.on_connect + self.client.on_message = self.on_message + + def on_connect(self, client, userdata, flags, rc): + print("Connected with result code " + str(rc)) + client.subscribe(self.topic) + + def on_message(self, client, userdata, msg): + print("hallo bin in der methofe") + try: + data = json.loads(msg.payload.decode()) + value = float(data.get("value")) + + self.influx_writer.write_point( + measurement="temperature", + tags={"topic": msg.topic}, + fields={"value": value}, + timestamp=datetime.utcnow() + ) + print("Wrote to InfluxDB:", value) + except Exception as e: + print("Error processing message:", e) + + def start(self): + self.client.connect(self.broker_url) + self.client.loop_forever() diff --git a/src/mqtt_influx_backend/Main.py b/src/mqtt_influx_backend/Main.py new file mode 100644 index 0000000000000000000000000000000000000000..4202370acfd55ac2839940c61537793464e1eb6c --- /dev/null +++ b/src/mqtt_influx_backend/Main.py @@ -0,0 +1,26 @@ +from dotenv import load_dotenv +import os + +from mqtt_influx_backend.MQTTClientHandler import MQTTClientHandler +from mqtt_influx_backend.InfluxDBWriter import InfluxDBWriter + +load_dotenv() + +def main(): + influx_writer = InfluxDBWriter( + url=os.getenv("INFLUXDB_URL"), + token=os.getenv("INFLUXDB_TOKEN"), + org=os.getenv("INFLUXDB_ORG"), + bucket=os.getenv("INFLUXDB_BUCKET") + ) + + mqtt_handler = MQTTClientHandler( + broker_url=os.getenv("MQTT_BROKER_URL"), + topic=os.getenv("MQTT_TOPIC"), + influx_writer=influx_writer + ) + + mqtt_handler.start() + +if __name__ == "__main__": + main()