diff --git a/backend/app/influxdb_service.py b/backend/app/influxdb_service.py index 391c9595828581a294b6d1883c3d391a66f79736..a34ec541090a39ca21ea0ce42a6d9ae6a14fe3aa 100644 --- a/backend/app/influxdb_service.py +++ b/backend/app/influxdb_service.py @@ -1,17 +1,64 @@ import os from dotenv import load_dotenv +from utils.fluxQueryBuilder import FluxQueryBuilder from utils.influx import InfluxDBHelper load_dotenv() +_bucket=os.getenv("INFLUXDB_BUCKET") +_org=os.getenv("INFLUXDB_ORG") + client = InfluxDBHelper( url=os.getenv("INFLUXDB_URL"), token=os.getenv("INFLUXDB_TOKEN"), - org=os.getenv("INFLUXDB_ORG"), - bucket=os.getenv("INFLUXDB_BUCKET"), + org=_org, + bucket=_bucket, +) + + +# 2) build a Flux query at runtime +builder = ( + FluxQueryBuilder() + .bucket(_bucket) + .time_range("-30d", "now()") + .filter_measurement("sensor_data") + .filter_fields("co2", "rh", "temp") + .mean() ) -tables = client.get_all_data() +""" +Some query examples +from(bucket: "co2-test") + |> range(start: v.timeRangeStart, stop: v.timeRangeStop) + |> filter(fn: (r) => r["_measurement"] == "sensor_data") + |> filter(fn: (r) => r["_field"] == "co2" or r["_field"] == "humidity" or r["_field"] == "temperature") + + +from(bucket: "co2-test") + |> range(start: v.timeRangeStart, stop: v.timeRangeStop) + |> filter(fn: (r) => r["_measurement"] == "sensor_data") + |> filter(fn: (r) => r["_field"] == "co2" or r["_field"] == "humidity" or r["_field"] == "temperature") + +from(bucket: "example-bucket") + |> range(start: -1h) + |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total") + |> window(every: 5m) //every five min + |> mean() //gives the average back + +from(bucket: "co2-dev") + |> range(start: v.timeRangeStart, stop: v.timeRangeStop) + |> filter(fn: (r) => r["_measurement"] == "sensor_data") + |> filter(fn: (r) => r["room"] == "1/210") + |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false) + |> yield(name: "mean") + +""" + +flux_query = builder.build() + +print(flux_query) + +tables = client.query_api.query(org= _org, query= flux_query) for table in tables: for record in table.records: diff --git a/backend/stream_processing/mQTTClientHandler.py b/backend/stream_processing/mQTTClientHandler.py index 3338cf4107599d31065791454330a559b0749b28..23f1e42ed9f2180f815dd035046713627ac73870 100644 --- a/backend/stream_processing/mQTTClientHandler.py +++ b/backend/stream_processing/mQTTClientHandler.py @@ -8,7 +8,7 @@ from utils.influx import InfluxDBHelper class MQTTClientHandler: - MAPPING_FILE_NAME = "stream_processing/mac_to_room.json" + MAPPING_FILE_NAME = "backend/stream_processing/mac_to_room.json" MEASUREMENT_NAME = "sensor_data" TAG_ROOM = "room" TAG_MAC = "mac" diff --git a/backend/stream_processing/mac_to_room.json b/backend/stream_processing/mac_to_room.json index 65c29eccebfc21fd7d8c39a9e08f1d29d4c84844..48037997e7995844b8ab4462e64a508bb1e6c094 100644 --- a/backend/stream_processing/mac_to_room.json +++ b/backend/stream_processing/mac_to_room.json @@ -10,5 +10,8 @@ "AB:AD:BE:EF:12:34": "", "EF:AD:BE:EF:12:34": "", "Tx:AD:BE:EF:12:34": "", - "T\u00dc:AD:BE:EF:12:34": "" + "T\u00dc:AD:BE:EF:12:34": "", + "08:3A:8D:E3:DF:AF": "", + "TE:ST:TE:ST:TE:ST": "", + "AB:CD:EF:12:34:56": "" } \ No newline at end of file diff --git a/backend/utils/fluxQueryBuilder.py b/backend/utils/fluxQueryBuilder.py new file mode 100644 index 0000000000000000000000000000000000000000..40258a6449bdc336e4d083f40b0b566239df206b --- /dev/null +++ b/backend/utils/fluxQueryBuilder.py @@ -0,0 +1,129 @@ +from typing import List, Optional + +class FluxQueryBuilder: + """ + A builder for constructing Flux queries in a fluent, step-by-step manner. + + Example: + builder = ( + FluxQueryBuilder() + .bucket("co2-test") + .time_range("v.timeRangeStart", "v.timeRangeStop") + .filter_measurement("sensor_data") + .filter_fields("co2", "humidity", "temperature") # Optional + .mean() # Optional + ) + query = builder.build() + """ + + def __init__(self) -> None: + # internal state for each part of the query + self._bucket: Optional[str] = None + self._start: Optional[str] = None + self._stop: Optional[str] = None + self._measurement: Optional[str] = None + self._fields: List[str] = [] + self._mean : Optional[bool] = None + + def bucket(self, name: str) -> "FluxQueryBuilder": + """ + Set the bucket name for the query. + + :param name: Name of the InfluxDB bucket + :return: self (for method chaining) + """ + self._bucket = name + return self + + def time_range(self, start: str, stop: str) -> "FluxQueryBuilder": + """ + Define the time window for the query. + + :param start: Flux expression for the start time + :param stop: Flux expression for the stop time + :return: self (for method chaining) + """ + self._start = start + self._stop = stop + return self + + def filter_measurement(self, measurement: str) -> "FluxQueryBuilder": + """ + Add a filter for the _measurement tag. + + :param measurement: Name of the measurement to filter + :return: self (for method chaining) + """ + self._measurement = measurement + return self + + + + def filter_fields(self, *fields: str) -> "FluxQueryBuilder": + """ + Add filters for one or more _field tags (OR-combined). + + :param fields: List of field names to include + :return: self (for method chaining) + """ + # extend the list of fields to filter + self._fields.extend(fields) + return self + + def mean(self) -> "FluxQueryBuilder": + """ + Activates average function for the _values of each table. + + :return: self (for method chaining) + """ + self._mean = True + return self + + def build(self) -> str: + """ + Assemble and return the final Flux query string. + + :raises ValueError: if mandatory parts are missing + :return: Flux query as a multiline string + """ + # Validate required parts + if not self._bucket: + raise ValueError("Bucket name is required.") + if not (self._start and self._stop): + raise ValueError("Both start and stop times are required.") + if not self._measurement: + raise ValueError("Measurement filter is required.") + + # Build the query lines + lines: List[str] = [] + + # Bucket + lines.append(f'from(bucket: "{self._bucket}")') + # Range + lines.append(f'|> range(start: {self._start}, stop: {self._stop})') + # Measurement + lines.append( + f'|> filter(fn: (r) => r["_measurement"] == "{self._measurement}")' + ) + + # Add field filters if any + if self._fields: + # create OR expression for fields + or_expr = " or ".join(f'r["_field"] == "{f}"' for f in self._fields) + lines.append(f'|> filter(fn: (r) => {or_expr})') + + # Add mean() function if called + if self.mean: + lines.append(f'|> mean()') + + # Join lines with proper indentation + return "\n ".join(lines) + + def reset(self) -> "FluxQueryBuilder": + """ + Reset the builder state to start fresh. + + :return: self (for method chaining) + """ + self.__init__() + return self diff --git a/backend/utils/influx.py b/backend/utils/influx.py index 54fb86f423a85c994ca170e196cf9b5c9e63817b..ee4014cbc8e6564ede4daa2f986dc001c86bef96 100644 --- a/backend/utils/influx.py +++ b/backend/utils/influx.py @@ -46,3 +46,4 @@ class InfluxDBHelper: |> last() ''' return self.query_api.query(org=self.org, query=query) +