diff --git a/README.md b/README.md index 7dfbc0e93d1e0993a314da8146a3bf90d4dd9cb4..910a2e43d922daa8c14de41a371386626e55afb9 100644 --- a/README.md +++ b/README.md @@ -11,4 +11,15 @@ docker compose up // Startet des docker-compose.yaml in dem aktuellen Verzeichni ## Eventuelle ToDos dieses projekt anschauen: -https://github.com/influxdata/iot-api-python \ No newline at end of file +https://github.com/influxdata/iot-api-python + +Json Format für die Nachrichten + { + "metadata":{ + "mac-address":"test", + "time":"2025-04-10 11:33:21+01" + }, + "co2":528, + "temp":26.0, +  "rh":24.2 +} \ No newline at end of file diff --git a/backend/.env.docker b/backend/.env.docker new file mode 100644 index 0000000000000000000000000000000000000000..9b6ca8bf11c0c1a534f8f19730d191dfbf17a903 --- /dev/null +++ b/backend/.env.docker @@ -0,0 +1,8 @@ +# InfluxDB config +INFLUXDB_URL=http://influxdb2:8086 +INFLUXDB_ORG=docs +INFLUXDB_BUCKET=co2-test +INFLUXDB_TOKEN=35aKI6fq8SRli6cTmSGgvrqn8t4jYKp-ABgL7HGjwez9rh6YXqEt2F4ZGf_jJ_yATjcE8d4aMlqsmu_VaybTWA== +# MQTT config +MQTT_BROKER_URL=mosquitto +MQTT_TOPIC="co2/#" \ No newline at end of file diff --git a/backend/app/influxdb_service.py b/backend/app/influxdb_service.py index 36e04e68bc45b0a77c41396901db5287b3cab1cd..63dd82c93b26ec574a8970c48f45023b2e8b3124 100644 --- a/backend/app/influxdb_service.py +++ b/backend/app/influxdb_service.py @@ -5,26 +5,50 @@ from utils.influx import InfluxDBHelper load_dotenv() -_bucket=os.getenv("INFLUXDB_BUCKET") -_org=os.getenv("INFLUXDB_ORG") +load_dotenv() +# Create the InfluxDB client client = InfluxDBHelper( url=os.getenv("INFLUXDB_URL"), token=os.getenv("INFLUXDB_TOKEN"), - org=_org, - bucket=_bucket, + org=os.getenv("INFLUXDB_ORG"), + bucket=os.getenv("INFLUXDB_BUCKET"), ) +# Build a Flux query builder = ( FluxQueryBuilder() - .bucket(_bucket) + .bucket(os.getenv("INFLUXDB_BUCKET")) .time_range("-30d", "now()") .filter_measurement("sensor_data") - .filter_fields("co2", "rh", "temp") # with "or" we should implement "and" too + .filter_fields("co2", "humidity", "temperature") + .pivot() .mean() ) +# Get the query string +flux_query = builder.build() +print("Generated Flux Query:\n", flux_query) + +# Run the query +tables = client.query_api.query(org=os.getenv("INFLUXDB_ORG"), query=flux_query) + +# Output the results +for table in tables: + for record in table.records: + print(record.values) """ +flux_query = builder.build() + +print(flux_query) + +tables = client.query_api.query(org= _org, query= flux_query) + +for table in tables: + for record in table.records: + print(record) + + Some query examples from(bucket: "co2-test") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) @@ -52,12 +76,3 @@ from(bucket: "co2-dev") """ -flux_query = builder.build() - -print(flux_query) - -tables = client.query_api.query(org= _org, query= flux_query) - -for table in tables: - for record in table.records: - print(record) diff --git a/backend/stream_processing/influxDBWriter.py b/backend/stream_processing/influxDBWriter.py deleted file mode 100644 index 0213e5654b99aae06f1e9a7aea6e7e9aeab4cfa8..0000000000000000000000000000000000000000 --- a/backend/stream_processing/influxDBWriter.py +++ /dev/null @@ -1,23 +0,0 @@ -from influxdb_client import InfluxDBClient, Point, WritePrecision -from influxdb_client.client.write_api import SYNCHRONOUS - - -class InfluxDBWriter: - def __init__(self, url: str, token: str, org: str, bucket: str): - self.client = InfluxDBClient(url=url, token=token, org=org) - self.write_api = self.client.write_api(write_options=SYNCHRONOUS) - self.bucket = bucket - self.org = org - - def write_point( - self, measurement: str, tags: dict, fields: dict, timestamp=None - ): - point = Point(measurement) - for k, v in tags.items(): - point.tag(k, v) - for k, v in fields.items(): - point.field(k, v) - if timestamp: - point.time(timestamp, WritePrecision.NS) - - self.write_api.write(bucket=self.bucket, org=self.org, record=point) diff --git a/backend/stream_processing/mQTTClientHandler.py b/backend/stream_processing/mQTTClientHandler.py index 23f1e42ed9f2180f815dd035046713627ac73870..662d547cec23299051de8c227ff053402d4929f0 100644 --- a/backend/stream_processing/mQTTClientHandler.py +++ b/backend/stream_processing/mQTTClientHandler.py @@ -35,6 +35,7 @@ class MQTTClientHandler: self.logger.info("Connected with result code " + str(rc)) client.subscribe(self.topic) self.logger.info("Subscribed to " + self.topic) + print("Connected with result code " + str(rc) + "\n" + "Subscribed to " + self.topic) # eventuell refactorn und die Aufgaben in Methoden aufteilen def on_message(self, client, userdata, msg): @@ -55,6 +56,7 @@ class MQTTClientHandler: self.logger.warning( f"Neue MAC-Adresse gefunden: {mac}. Mapping wird ergänzt." ) + print(f"Neue MAC-Adresse gefunden: {mac}. Mapping wird ergänzt.") self.mac_to_room[mac] = "" # leerer Platzhalter jsonhandler.write_json(self.mac_to_room, self.MAPPING_FILE_NAME) self.mac_to_room = jsonhandler.load_json(self.MAPPING_FILE_NAME) @@ -77,10 +79,14 @@ class MQTTClientHandler: }, timestamp=metadate["time"], # fix ) - print( - "Wrote to InfluxDB:", msg - ) # muss später rausgeschmiessen werden + self.logger.info( + f"Wrote to InfluxDB: {msg}" + ) + print(f"Token: {self.influx_writer.get_token()}") + print(f"Wrote to InfluxDB: {msg}") + print(f"Ping: {self.influx_writer.ping()}") except Exception as e: + print(f"Failed writing to InfluxDb: {e}") self.logger.error(f"Failed writing to InfluxDb: {e}") def start(self): diff --git a/backend/stream_processing/mac_to_room.json b/backend/stream_processing/mac_to_room.json index 48037997e7995844b8ab4462e64a508bb1e6c094..df09e0a433af58d2d85c0ed29831b639a3a6f4c1 100644 --- a/backend/stream_processing/mac_to_room.json +++ b/backend/stream_processing/mac_to_room.json @@ -13,5 +13,7 @@ "T\u00dc:AD:BE:EF:12:34": "", "08:3A:8D:E3:DF:AF": "", "TE:ST:TE:ST:TE:ST": "", - "AB:CD:EF:12:34:56": "" + "AB:CD:EF:12:34:56": "", + "a1:b2:c3:d4:e5:f6": "", + "77:88:99:aa:bb:cc": "" } \ No newline at end of file diff --git a/backend/utils/__init__.py b/backend/utils/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/backend/utils/fluxQueryBuilder.py b/backend/utils/fluxQueryBuilder.py index bd83d95f3054e04706fc62404e1b7128e5b1ec24..16eb1cca317bdbc2583f340a313ff40d20f49fa0 100644 --- a/backend/utils/fluxQueryBuilder.py +++ b/backend/utils/fluxQueryBuilder.py @@ -1,149 +1,103 @@ from typing import List, Optional + class FluxQueryBuilder: """ - A builder for constructing Flux queries in a fluent, step-by-step manner. + A builder class for constructing Flux queries (InfluxDB 2.x). + Supports fluent method chaining for readable query construction. Example: - builder = ( + query = ( FluxQueryBuilder() - .bucket("co2-test") - .time_range("v.timeRangeStart", "v.timeRangeStop") + .bucket("sensor_data") + .time_range("-1h", "now()") .filter_measurement("sensor_data") - .filter_fields("co2", "humidity", "temperature") # Optional - .mean() # Optional + .filter_fields("co2", "temperature", "humidity") + .filter_field("room", "1/210") + .pivot() + .mean() + .build() ) - query = builder.build() """ def __init__(self) -> None: - # internal state for each part of the query self._bucket: Optional[str] = None self._start: Optional[str] = None self._stop: Optional[str] = None self._measurement: Optional[str] = None self._fields: List[str] = [] - self._mean : Optional[bool] = None - self._dict : Optional[dict] = None + self._field_filters: dict = {} # e.g. {"room": "1/210"} + self._use_mean: bool = False + self._use_pivot: bool = False def bucket(self, name: str) -> "FluxQueryBuilder": - """ - Set the bucket name for the query. - - :param name: Name of the InfluxDB bucket - :return: self (for method chaining) - """ + """Set the InfluxDB bucket name.""" self._bucket = name return self - - def time_range(self, start: str, stop: str) -> "FluxQueryBuilder": - """ - Define the time window for the query. - :param start: Flux expression for the start time - :param stop: Flux expression for the stop time - :return: self (for method chaining) - """ + def time_range(self, start: str, stop: str) -> "FluxQueryBuilder": + """Set time range for the query.""" self._start = start self._stop = stop return self def filter_measurement(self, measurement: str) -> "FluxQueryBuilder": - """ - Add a filter for the _measurement tag. - - :param measurement: Name of the measurement to filter - :return: self (for method chaining) - """ + """Filter for a specific _measurement value.""" self._measurement = measurement return self - - def filter_field(self, field: str, value:str) -> "FluxQueryBuilder": - """ - Add filters for one or more _field tags (OR-combined). - :param - - Example: field = "room", value = "1/210" - - Result: |> filter(fn: (r) => r["room"] == "1/210") - - :return: self (for method chaining) - """ - # extend the list of fields to filter - self._dict[field] = value + + def filter_field(self, field: str, value: str) -> "FluxQueryBuilder": + """Add a tag filter like r["room"] == "1/210".""" + self._field_filters[field] = value return self def filter_fields(self, *fields: str) -> "FluxQueryBuilder": - """ - Add filters for one or more _field tags (OR-combined). - - :param fields: List of field names to include - :return: self (for method chaining) - """ - # extend the list of fields to filter + """Filter for multiple _field values using OR.""" self._fields.extend(fields) return self - + def mean(self) -> "FluxQueryBuilder": - """ - Activates average function for the _values of each table. + """Apply mean() aggregation.""" + self._use_mean = True + return self - :return: self (for method chaining) - """ - self._mean = True + def pivot(self) -> "FluxQueryBuilder": + """Apply pivot() to restructure results with multiple fields per timestamp.""" + self._use_pivot = True return self def build(self) -> str: - """ - Assemble and return the final Flux query string. - - :raises ValueError: if mandatory parts are missing - :return: Flux query as a multiline string - """ - # Validate required parts + """Construct and return the final Flux query string.""" if not self._bucket: raise ValueError("Bucket name is required.") if not (self._start and self._stop): - raise ValueError("Both start and stop times are required.") + raise ValueError("Start and stop times are required.") if not self._measurement: - raise ValueError("Measurement filter is required.") + raise ValueError("Measurement is required.") - # Build the query lines lines: List[str] = [] - - # Bucket lines.append(f'from(bucket: "{self._bucket}")') - # Range - lines.append(f'|> range(start: {self._start}, stop: {self._stop})') - # Measurement - lines.append( - f'|> filter(fn: (r) => r["_measurement"] == "{self._measurement}")' - ) + lines.append(f' |> range(start: {self._start}, stop: {self._stop})') + lines.append(f' |> filter(fn: (r) => r["_measurement"] == "{self._measurement}")') - # Add specified filter if any - #if self._dict: - #for key, value in self._dict.items(): - #lines.append(f'|> filter(fn: (r) => r["_field"] == "{f}")') - #lines.append(f'r["_field"] == "{f}"') + # Optional tag filters (e.g., room or mac) + for key, value in self._field_filters.items(): + lines.append(f' |> filter(fn: (r) => r["{key}"] == "{value}")') - # Add field filters if any + # Optional field filters (_field == ...) if self._fields: - # create OR expression for fields or_expr = " or ".join(f'r["_field"] == "{f}"' for f in self._fields) - lines.append(f'|> filter(fn: (r) => {or_expr})') + lines.append(f' |> filter(fn: (r) => {or_expr})') - # Add mean() function if called - if self.mean: - lines.append(f'|> mean()') + if self._use_mean: + lines.append(' |> mean()') - # Join lines with proper indentation - return "\n ".join(lines) + if self._use_pivot: + lines.append(' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")') - def reset(self) -> "FluxQueryBuilder": - """ - Reset the builder state to start fresh. + return "\n".join(lines) - :return: self (for method chaining) - """ + def reset(self) -> "FluxQueryBuilder": + """Reset the builder so a new query can be built from scratch.""" self.__init__() return self diff --git a/backend/utils/influx.py b/backend/utils/influx.py index ee4014cbc8e6564ede4daa2f986dc001c86bef96..107252e47884b0886f4a5e3e4d3df391f11eec6a 100644 --- a/backend/utils/influx.py +++ b/backend/utils/influx.py @@ -5,6 +5,7 @@ from influxdb_client.client.write_api import WriteOptions class InfluxDBHelper: def __init__(self, url: str, token: str, org: str, bucket: str): + self.token = token self.client = InfluxDBClient(url=url, token=token, org=org) self.bucket = bucket self.org = org @@ -26,6 +27,12 @@ class InfluxDBHelper: if timestamp: point.time(timestamp, WritePrecision.NS) self.write_api.write(bucket=self.bucket, org=self.org, record=point) + + def get_token(self): + return self.token + + def ping(self): + return self.client.ping() def get_all_data(self): """ """ diff --git a/docker-compose.yaml b/docker-compose.yaml index 1f1a5c6db87880992f1044af561f8a8f84987e09..1590d69a283573c8f6c02ca33653282aade2f14c 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -64,7 +64,7 @@ services: DOCKER_INFLUXDB_INIT_PASSWORD_FILE: /run/secrets/influxdb2-admin-password DOCKER_INFLUXDB_INIT_ADMIN_TOKEN_FILE: /run/secrets/influxdb2-admin-token DOCKER_INFLUXDB_INIT_ORG: docs - DOCKER_INFLUXDB_INIT_BUCKET: home + DOCKER_INFLUXDB_INIT_BUCKET: co2-test secrets: - influxdb2-admin-username - influxdb2-admin-password @@ -76,7 +76,6 @@ services: - type: volume source: influxdb2-config target: /etc/influxdb2 - secrets: influxdb2-admin-username: file: ./services/influxdb/influxdb2-admin-username @@ -84,8 +83,6 @@ secrets: file: ./services/influxdb/influxdb2-admin-password influxdb2-admin-token: file: ./services/influxdb/influxdb2-admin-token - volumes: influxdb2-data: - influxdb2-config: - + influxdb2-config: \ No newline at end of file diff --git a/frontend/src/components/LiveChart.vue b/frontend/src/components/LiveChart.vue index fea96e51833573d980ea0a6f73c7a31f24245e72..17effade146c0d3758b65093780ee43d12f0a942 100644 --- a/frontend/src/components/LiveChart.vue +++ b/frontend/src/components/LiveChart.vue @@ -67,8 +67,6 @@ updateChart: () => chartRef.value?.chart?.update() }) - // Beispielzustand - const count = ref(0) </script> <style scoped> diff --git a/services/influxdb/compose.yaml b/services/influxdb/compose.yaml index cf6baeebf72bbaf8715975483d54e88ad35b3c16..212b029b5476e26b68fd1d4a970b5e6c9a49902d 100644 --- a/services/influxdb/compose.yaml +++ b/services/influxdb/compose.yaml @@ -10,10 +10,10 @@ services: DOCKER_INFLUXDB_INIT_PASSWORD_FILE: /run/secrets/influxdb2-admin-password DOCKER_INFLUXDB_INIT_ADMIN_TOKEN_FILE: /run/secrets/influxdb2-admin-token DOCKER_INFLUXDB_INIT_ORG: docs - DOCKER_INFLUXDB_INIT_BUCKET: home + DOCKER_INFLUXDB_INIT_BUCKET: co2-test secrets: - influxdb2-admin-username - - influxdb2-admin-password + - influxdb2-admin-password - influxdb2-admin-token volumes: - type: volume @@ -24,11 +24,11 @@ services: target: /etc/influxdb2 secrets: influxdb2-admin-username: - file: /opt/stacks/influxdb/influxdb2-admin-username + file: ./influxdb2-admin-username influxdb2-admin-password: - file: /opt/stacks/influxdb/influxdb2-admin-password + file: ./influxdb2-admin-password influxdb2-admin-token: - file: /opt/stacks/influxdb/influxdb2-admin-token + file: ./influxdb2-admin-token volumes: influxdb2-data: influxdb2-config: diff --git a/services/influxdb/influxdb2-admin-token b/services/influxdb/influxdb2-admin-token index dd9fb541a73a74caae14360280eef26790d9be64..6f3eee4c3f44dab3c69b3a9d3107e8e53fe6d4d0 100644 --- a/services/influxdb/influxdb2-admin-token +++ b/services/influxdb/influxdb2-admin-token @@ -1 +1 @@ -MyInitialAdminToken0== +w-Isk1D35T90Srj_auFTxsbksn1zRB5MiNZf6h6RuNdb9-2s9ie5c1488JqoYILKrceVm0LaE5KCN2dXdDM-jA== \ No newline at end of file diff --git a/services/mqtt/data/mosquitto.db b/services/mqtt/data/mosquitto.db index 762cc36a198e6e81bcc6265eb6cb1e53577a0122..9d20846b891a8075c71345a58fd85f7b86fa955e 100644 Binary files a/services/mqtt/data/mosquitto.db and b/services/mqtt/data/mosquitto.db differ