Commit 2569b81c authored by Gezer's avatar Gezer
Browse files

Push last stand

parent 5402c072
Showing with 124 additions and 149 deletions
+124 -149
......@@ -11,4 +11,15 @@ docker compose up // Startet des docker-compose.yaml in dem aktuellen Verzeichni
## Eventuelle ToDos
dieses projekt anschauen:
https://github.com/influxdata/iot-api-python
\ No newline at end of file
https://github.com/influxdata/iot-api-python
Json Format für die Nachrichten
{
"metadata":{
"mac-address":"test",
"time":"2025-04-10 11:33:21+01"
},
"co2":528,
"temp":26.0, 
  "rh":24.2
}
\ No newline at end of file
# InfluxDB config
INFLUXDB_URL=http://influxdb2:8086
INFLUXDB_ORG=docs
INFLUXDB_BUCKET=co2-test
INFLUXDB_TOKEN=35aKI6fq8SRli6cTmSGgvrqn8t4jYKp-ABgL7HGjwez9rh6YXqEt2F4ZGf_jJ_yATjcE8d4aMlqsmu_VaybTWA==
# MQTT config
MQTT_BROKER_URL=mosquitto
MQTT_TOPIC="co2/#"
\ No newline at end of file
......@@ -5,26 +5,50 @@ from utils.influx import InfluxDBHelper
load_dotenv()
_bucket=os.getenv("INFLUXDB_BUCKET")
_org=os.getenv("INFLUXDB_ORG")
load_dotenv()
# Create the InfluxDB client
client = InfluxDBHelper(
url=os.getenv("INFLUXDB_URL"),
token=os.getenv("INFLUXDB_TOKEN"),
org=_org,
bucket=_bucket,
org=os.getenv("INFLUXDB_ORG"),
bucket=os.getenv("INFLUXDB_BUCKET"),
)
# Build a Flux query
builder = (
FluxQueryBuilder()
.bucket(_bucket)
.bucket(os.getenv("INFLUXDB_BUCKET"))
.time_range("-30d", "now()")
.filter_measurement("sensor_data")
.filter_fields("co2", "rh", "temp") # with "or" we should implement "and" too
.filter_fields("co2", "humidity", "temperature")
.pivot()
.mean()
)
# Get the query string
flux_query = builder.build()
print("Generated Flux Query:\n", flux_query)
# Run the query
tables = client.query_api.query(org=os.getenv("INFLUXDB_ORG"), query=flux_query)
# Output the results
for table in tables:
for record in table.records:
print(record.values)
"""
flux_query = builder.build()
print(flux_query)
tables = client.query_api.query(org= _org, query= flux_query)
for table in tables:
for record in table.records:
print(record)
Some query examples
from(bucket: "co2-test")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
......@@ -52,12 +76,3 @@ from(bucket: "co2-dev")
"""
flux_query = builder.build()
print(flux_query)
tables = client.query_api.query(org= _org, query= flux_query)
for table in tables:
for record in table.records:
print(record)
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
class InfluxDBWriter:
def __init__(self, url: str, token: str, org: str, bucket: str):
self.client = InfluxDBClient(url=url, token=token, org=org)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.bucket = bucket
self.org = org
def write_point(
self, measurement: str, tags: dict, fields: dict, timestamp=None
):
point = Point(measurement)
for k, v in tags.items():
point.tag(k, v)
for k, v in fields.items():
point.field(k, v)
if timestamp:
point.time(timestamp, WritePrecision.NS)
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
......@@ -35,6 +35,7 @@ class MQTTClientHandler:
self.logger.info("Connected with result code " + str(rc))
client.subscribe(self.topic)
self.logger.info("Subscribed to " + self.topic)
print("Connected with result code " + str(rc) + "\n" + "Subscribed to " + self.topic)
# eventuell refactorn und die Aufgaben in Methoden aufteilen
def on_message(self, client, userdata, msg):
......@@ -55,6 +56,7 @@ class MQTTClientHandler:
self.logger.warning(
f"Neue MAC-Adresse gefunden: {mac}. Mapping wird ergänzt."
)
print(f"Neue MAC-Adresse gefunden: {mac}. Mapping wird ergänzt.")
self.mac_to_room[mac] = "" # leerer Platzhalter
jsonhandler.write_json(self.mac_to_room, self.MAPPING_FILE_NAME)
self.mac_to_room = jsonhandler.load_json(self.MAPPING_FILE_NAME)
......@@ -77,10 +79,14 @@ class MQTTClientHandler:
},
timestamp=metadate["time"], # fix
)
print(
"Wrote to InfluxDB:", msg
) # muss später rausgeschmiessen werden
self.logger.info(
f"Wrote to InfluxDB: {msg}"
)
print(f"Token: {self.influx_writer.get_token()}")
print(f"Wrote to InfluxDB: {msg}")
print(f"Ping: {self.influx_writer.ping()}")
except Exception as e:
print(f"Failed writing to InfluxDb: {e}")
self.logger.error(f"Failed writing to InfluxDb: {e}")
def start(self):
......
......@@ -13,5 +13,7 @@
"T\u00dc:AD:BE:EF:12:34": "",
"08:3A:8D:E3:DF:AF": "",
"TE:ST:TE:ST:TE:ST": "",
"AB:CD:EF:12:34:56": ""
"AB:CD:EF:12:34:56": "",
"a1:b2:c3:d4:e5:f6": "",
"77:88:99:aa:bb:cc": ""
}
\ No newline at end of file
from typing import List, Optional
class FluxQueryBuilder:
"""
A builder for constructing Flux queries in a fluent, step-by-step manner.
A builder class for constructing Flux queries (InfluxDB 2.x).
Supports fluent method chaining for readable query construction.
Example:
builder = (
query = (
FluxQueryBuilder()
.bucket("co2-test")
.time_range("v.timeRangeStart", "v.timeRangeStop")
.bucket("sensor_data")
.time_range("-1h", "now()")
.filter_measurement("sensor_data")
.filter_fields("co2", "humidity", "temperature") # Optional
.mean() # Optional
.filter_fields("co2", "temperature", "humidity")
.filter_field("room", "1/210")
.pivot()
.mean()
.build()
)
query = builder.build()
"""
def __init__(self) -> None:
# internal state for each part of the query
self._bucket: Optional[str] = None
self._start: Optional[str] = None
self._stop: Optional[str] = None
self._measurement: Optional[str] = None
self._fields: List[str] = []
self._mean : Optional[bool] = None
self._dict : Optional[dict] = None
self._field_filters: dict = {} # e.g. {"room": "1/210"}
self._use_mean: bool = False
self._use_pivot: bool = False
def bucket(self, name: str) -> "FluxQueryBuilder":
"""
Set the bucket name for the query.
:param name: Name of the InfluxDB bucket
:return: self (for method chaining)
"""
"""Set the InfluxDB bucket name."""
self._bucket = name
return self
def time_range(self, start: str, stop: str) -> "FluxQueryBuilder":
"""
Define the time window for the query.
:param start: Flux expression for the start time
:param stop: Flux expression for the stop time
:return: self (for method chaining)
"""
def time_range(self, start: str, stop: str) -> "FluxQueryBuilder":
"""Set time range for the query."""
self._start = start
self._stop = stop
return self
def filter_measurement(self, measurement: str) -> "FluxQueryBuilder":
"""
Add a filter for the _measurement tag.
:param measurement: Name of the measurement to filter
:return: self (for method chaining)
"""
"""Filter for a specific _measurement value."""
self._measurement = measurement
return self
def filter_field(self, field: str, value:str) -> "FluxQueryBuilder":
"""
Add filters for one or more _field tags (OR-combined).
:param
Example: field = "room", value = "1/210"
Result: |> filter(fn: (r) => r["room"] == "1/210")
:return: self (for method chaining)
"""
# extend the list of fields to filter
self._dict[field] = value
def filter_field(self, field: str, value: str) -> "FluxQueryBuilder":
"""Add a tag filter like r["room"] == "1/210"."""
self._field_filters[field] = value
return self
def filter_fields(self, *fields: str) -> "FluxQueryBuilder":
"""
Add filters for one or more _field tags (OR-combined).
:param fields: List of field names to include
:return: self (for method chaining)
"""
# extend the list of fields to filter
"""Filter for multiple _field values using OR."""
self._fields.extend(fields)
return self
def mean(self) -> "FluxQueryBuilder":
"""
Activates average function for the _values of each table.
"""Apply mean() aggregation."""
self._use_mean = True
return self
:return: self (for method chaining)
"""
self._mean = True
def pivot(self) -> "FluxQueryBuilder":
"""Apply pivot() to restructure results with multiple fields per timestamp."""
self._use_pivot = True
return self
def build(self) -> str:
"""
Assemble and return the final Flux query string.
:raises ValueError: if mandatory parts are missing
:return: Flux query as a multiline string
"""
# Validate required parts
"""Construct and return the final Flux query string."""
if not self._bucket:
raise ValueError("Bucket name is required.")
if not (self._start and self._stop):
raise ValueError("Both start and stop times are required.")
raise ValueError("Start and stop times are required.")
if not self._measurement:
raise ValueError("Measurement filter is required.")
raise ValueError("Measurement is required.")
# Build the query lines
lines: List[str] = []
# Bucket
lines.append(f'from(bucket: "{self._bucket}")')
# Range
lines.append(f'|> range(start: {self._start}, stop: {self._stop})')
# Measurement
lines.append(
f'|> filter(fn: (r) => r["_measurement"] == "{self._measurement}")'
)
lines.append(f' |> range(start: {self._start}, stop: {self._stop})')
lines.append(f' |> filter(fn: (r) => r["_measurement"] == "{self._measurement}")')
# Add specified filter if any
#if self._dict:
#for key, value in self._dict.items():
#lines.append(f'|> filter(fn: (r) => r["_field"] == "{f}")')
#lines.append(f'r["_field"] == "{f}"')
# Optional tag filters (e.g., room or mac)
for key, value in self._field_filters.items():
lines.append(f' |> filter(fn: (r) => r["{key}"] == "{value}")')
# Add field filters if any
# Optional field filters (_field == ...)
if self._fields:
# create OR expression for fields
or_expr = " or ".join(f'r["_field"] == "{f}"' for f in self._fields)
lines.append(f'|> filter(fn: (r) => {or_expr})')
lines.append(f' |> filter(fn: (r) => {or_expr})')
# Add mean() function if called
if self.mean:
lines.append(f'|> mean()')
if self._use_mean:
lines.append(' |> mean()')
# Join lines with proper indentation
return "\n ".join(lines)
if self._use_pivot:
lines.append(' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")')
def reset(self) -> "FluxQueryBuilder":
"""
Reset the builder state to start fresh.
return "\n".join(lines)
:return: self (for method chaining)
"""
def reset(self) -> "FluxQueryBuilder":
"""Reset the builder so a new query can be built from scratch."""
self.__init__()
return self
......@@ -5,6 +5,7 @@ from influxdb_client.client.write_api import WriteOptions
class InfluxDBHelper:
def __init__(self, url: str, token: str, org: str, bucket: str):
self.token = token
self.client = InfluxDBClient(url=url, token=token, org=org)
self.bucket = bucket
self.org = org
......@@ -26,6 +27,12 @@ class InfluxDBHelper:
if timestamp:
point.time(timestamp, WritePrecision.NS)
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
def get_token(self):
return self.token
def ping(self):
return self.client.ping()
def get_all_data(self):
""" """
......
......@@ -64,7 +64,7 @@ services:
DOCKER_INFLUXDB_INIT_PASSWORD_FILE: /run/secrets/influxdb2-admin-password
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN_FILE: /run/secrets/influxdb2-admin-token
DOCKER_INFLUXDB_INIT_ORG: docs
DOCKER_INFLUXDB_INIT_BUCKET: home
DOCKER_INFLUXDB_INIT_BUCKET: co2-test
secrets:
- influxdb2-admin-username
- influxdb2-admin-password
......@@ -76,7 +76,6 @@ services:
- type: volume
source: influxdb2-config
target: /etc/influxdb2
secrets:
influxdb2-admin-username:
file: ./services/influxdb/influxdb2-admin-username
......@@ -84,8 +83,6 @@ secrets:
file: ./services/influxdb/influxdb2-admin-password
influxdb2-admin-token:
file: ./services/influxdb/influxdb2-admin-token
volumes:
influxdb2-data:
influxdb2-config:
influxdb2-config:
\ No newline at end of file
......@@ -67,8 +67,6 @@
updateChart: () => chartRef.value?.chart?.update()
})
// Beispielzustand
const count = ref(0)
</script>
<style scoped>
......
......@@ -10,10 +10,10 @@ services:
DOCKER_INFLUXDB_INIT_PASSWORD_FILE: /run/secrets/influxdb2-admin-password
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN_FILE: /run/secrets/influxdb2-admin-token
DOCKER_INFLUXDB_INIT_ORG: docs
DOCKER_INFLUXDB_INIT_BUCKET: home
DOCKER_INFLUXDB_INIT_BUCKET: co2-test
secrets:
- influxdb2-admin-username
- influxdb2-admin-password
- influxdb2-admin-password
- influxdb2-admin-token
volumes:
- type: volume
......@@ -24,11 +24,11 @@ services:
target: /etc/influxdb2
secrets:
influxdb2-admin-username:
file: /opt/stacks/influxdb/influxdb2-admin-username
file: ./influxdb2-admin-username
influxdb2-admin-password:
file: /opt/stacks/influxdb/influxdb2-admin-password
file: ./influxdb2-admin-password
influxdb2-admin-token:
file: /opt/stacks/influxdb/influxdb2-admin-token
file: ./influxdb2-admin-token
volumes:
influxdb2-data:
influxdb2-config:
......
MyInitialAdminToken0==
w-Isk1D35T90Srj_auFTxsbksn1zRB5MiNZf6h6RuNdb9-2s9ie5c1488JqoYILKrceVm0LaE5KCN2dXdDM-jA==
\ No newline at end of file
No preview for this file type
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment