An error occurred while loading the file. Please try again.
mqtt_to_influx.py 1.98 KiB
'''
import json
from datetime import datetime
import influxdb_client, os, time
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from dotenv import load_dotenv
import os
# Load the local .env file so its entries are available as environment variables.
load_dotenv()

# InfluxDB settings; each value is None when its variable is not set.
INFLUXDB_URL = os.getenv("INFLUXDB_URL")
INFLUXDB_TOKEN = os.getenv("INFLUXDB_TOKEN")
INFLUXDB_ORG = os.getenv("INFLUXDB_ORG")
INFLUXDB_BUCKET = os.getenv("INFLUXDB_BUCKET")
print(INFLUXDB_URL)

# One shared client and a synchronous write API, used by every write in this file.
write_client = InfluxDBClient(
    url=INFLUXDB_URL,
    token=INFLUXDB_TOKEN,
    org=INFLUXDB_ORG,
)
write_api = write_client.write_api(write_options=SYNCHRONOUS)

# MQTT settings; note the topic is read from the MQTT_TOPIC variable.
MQTT_BROKER_URL = os.getenv("MQTT_BROKER_URL")
MQTT_PUBLISH_TOPIC = os.getenv("MQTT_TOPIC")
# Write five sample points (ppm = 0..4) to the "measurement1" measurement,
# one per second, before the MQTT client is started.
# NOTE(review): this looks like leftover quickstart sample data — confirm it
# should really run on every start.
for value in range(5):
    # The original expression was missing its closing parenthesis, which made
    # the whole file a syntax error.
    point = (
        Point("measurement1")
        # Tag key spelling ("mac-adress") is kept as-is: it is part of the
        # stored schema, so renaming it would change the written data.
        .tag("mac-adress", "22:de:aa:21::a2")
        .field("ppm", value)
    )
    write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=point)
    time.sleep(1)  # separate points by 1 second
def on_connect(client, userdata, flags, rc):
    """Print the connection result code and subscribe to the configured topic.

    Args:
        client: MQTT client object; ``subscribe`` is called on it.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; printed via ``str()``.
    """
    status_line = f"Connected with result code {rc!s}"
    print(status_line)
    client.subscribe(MQTT_PUBLISH_TOPIC)
def on_message(client, userdata, msg):
    """Write the numeric reading carried by an MQTT message to InfluxDB.

    The payload is decoded and parsed as JSON, e.g.
    ``{"value": 22.5, "unit": "C"}``; its ``"value"`` entry is converted to
    ``float`` and written as field ``value`` of the ``temperature``
    measurement, tagged with the message topic and stamped with the current
    UTC time.

    Any failure (undecodable payload, invalid JSON, missing or non-numeric
    ``"value"``, write error) is printed and the message is skipped; nothing
    is raised to the caller.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; its ``topic`` and ``payload`` are read.
    """
    # Imported locally so this handler does not rely on the file-level
    # ``from datetime import datetime`` line being extended.
    from datetime import timezone

    print(msg.topic + " " + str(msg.payload))
    try:
        # Decode JSON payload (e.g. {"value": 22.5, "unit": "C"})
        data = json.loads(msg.payload.decode())
        value = float(data.get("value"))
        # Create data point
        point = (
            Point("temperature")
            .tag("topic", msg.topic)
            .field("value", value)
            # Timezone-aware UTC timestamp: datetime.utcnow() is deprecated
            # and returns a naive datetime.
            .time(datetime.now(timezone.utc))
        )
        # Write to InfluxDB
        write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=point)
        print("Wrote to InfluxDB:", point)
    except Exception as e:
        # Deliberately broad: one bad message or failed write is reported and
        # skipped instead of propagating out of the callback.
        print("Error processing message:", e)
# Start MQTT client
mqttc = mqtt.Client() mqttc.on_connect = on_connect mqttc.on_message = on_message mqttc.connect(MQTT_BROKER_URL) mqttc.loop_forever() '''