Commit d22cb38d authored by Mele

small changes to everything ^^

parent 7b77e079
Showing with 362 additions and 75 deletions
@@ -2,7 +2,7 @@
 INFLUXDB_URL=http://influxdb2:8086
 INFLUXDB_ORG=docs
 INFLUXDB_BUCKET=co2-test
-INFLUXDB_TOKEN=w-Isk1D35T90Srj_auFTxsbksn1zRB5MiNZf6h6RuNdb9-2s9ie5c1488JqoYILKrceVm0LaE5KCN2dXdDM-jA==
+INFLUXDB_TOKEN=1xa5lLACRZDYsvinhABndZ8GGzBY7-gTQsAf309c0aTnPPtBxixPEEOPuXLmkTxUKy8golKae6fsrh1wD4SL0A==
 # MQTT config
 MQTT_BROKER_URL=mosquitto-broker
 MQTT_TOPIC="co2/#"
\ No newline at end of file
@@ -5,7 +5,7 @@ FROM python:latest
 WORKDIR /app
 # Copy project files for dependency management
-COPY pyproject.toml uv.lock ./
+COPY . .
 # Install pip and uv
 RUN pip install --upgrade pip && \
@@ -21,7 +21,7 @@ RUN uv sync
 # for the admin interface
 #RUN python manage.py createsuperuser
-COPY . .
+#COPY . .
 RUN .venv/bin/python manage.py migrate
...
This diff is collapsed.
 services:
   stream-processing:
     build:
-      context: ./backend
-      dockerfile: stream_processing/Dockerfile
+      context: ./stream_processing
+      dockerfile: Dockerfile
     image: stream-processing
     container_name: stream-processing
     volumes:
-      - ./backend/stream_processing:/app/backend/stream_processing
+      - ./stream_processing/mac_to_room.json:/app/mac_to_room.json
     env_file:
       - ./backend/.env.docker
     restart: unless-stopped
@@ -30,7 +30,9 @@ services:
       - ./backend:/app
   frontend:
-    build: ./frontend
+    build:
+      context: ./frontend
+      dockerfile: Dockerfile
     ports:
       - "5173:5173"
     volumes:
@@ -60,7 +62,7 @@ services:
     ports:
       - "8081:80" # map container port 80 to 8081 locally
     restart: unless-stopped
   influxdb2:
     image: influxdb:2
     ports:
...
Binary files /dev/null and b/ordnerstruktur.txt differ
 password
\ No newline at end of file
-w-Isk1D35T90Srj_auFTxsbksn1zRB5MiNZf6h6RuNdb9-2s9ie5c1488JqoYILKrceVm0LaE5KCN2dXdDM-jA==
\ No newline at end of file
+1xa5lLACRZDYsvinhABndZ8GGzBY7-gTQsAf309c0aTnPPtBxixPEEOPuXLmkTxUKy8golKae6fsrh1wD4SL0A==
\ No newline at end of file
 admin
\ No newline at end of file
-# Use the latest Python image
 FROM python:latest
 # Set the working directory in the container
 WORKDIR /app
 # Copy pyproject.toml and uv.lock from the backend root directory
-COPY ../pyproject.toml ../uv.lock ./
+COPY . .
 # Install pip and uv
 RUN pip install --upgrade pip && \
@@ -19,13 +18,14 @@ RUN uv venv .venv && \
 RUN uv sync
 # Copy ONLY the stream-processing code
-COPY . .
+#COPY . .
 # Optional: also copy other modules from the backend if you need to access them
-COPY ../utils ../utils
+#COPY utils utils
 # Set environment variables
 ENV PYTHONUNBUFFERED=1
 # Start command for the stream processing
-CMD ["uv", "run", "python", "-m", "backend.stream_processing.main"]
+CMD ["uv", "run", "python", "-m", "main"]
File moved
@@ -7,18 +7,21 @@ def load_json(file_name: str) -> dict:
     and returns it as a dictionary
     key : value
     """
+    print("JSONHANDLER")
     if not os.path.exists(file_name):
+        print("FILE IST OFFEN")
         return {}
     with open(file_name) as f:
         mac_room_mapping = json.load(f)
     return mac_room_mapping
 def write_json(mac_room_mapping: dict, file_name: str):
-    """
-    Takes a dictionary and writes its
-    contents to a JSON file
-    """
-    with open(file_name, "w") as f:
-        f.seek(0)
-        json.dump(mac_room_mapping, f, indent=4)
-        f.truncate()  # TODO Check if truncate is necessary?
+    print("ES SCHREIBT IN JASON")
+    try:
+        print(f"→ Schreibe Datei: {file_name}")
+        with open(file_name, "w") as f:
+            print("FILE IST OPEN")
+            json.dump(mac_room_mapping, f, indent=4)
+            print("✅ Datei erfolgreich geschrieben")
+    except Exception as e:
+        print("❌ Fehler beim Schreiben:", e)
@@ -3,7 +3,7 @@ import json
 from utils.loggingFactory import LoggerFactory
 from datetime import datetime
 import paho.mqtt.client as mqtt
-from stream_processing import jsonhandler
+import jsonhandler
 from utils.influx import InfluxDBHelper
 import os
@@ -24,7 +24,9 @@ class MQTTClientHandler:
     # Constructor
     def __init__(
         self, broker_url: str, topic: str, influx_writer: InfluxDBHelper
     ):
+        print("DAS IST EIN TEST")
         self.logger = LoggerFactory.get_logger(__name__)
         # key: mac, value: room
         self.mac_to_room = jsonhandler.load_json(self.MAPPING_FILE_NAME)
@@ -38,6 +40,7 @@ class MQTTClientHandler:
     def on_connect(self, client, userdata, flags, rc):
         self.logger.info("Connected with result code " + str(rc))
+        print("Connected")
         client.subscribe(self.topic)
         self.logger.info("Subscribed to " + self.topic)
@@ -48,6 +51,9 @@
         self: is the MQTTClientHandler instance, needed to write the entries
         to InfluxDB
         """
+        print("Message")
         msg = json.loads(msg.payload)
         metadate = msg["metadata"]
@@ -60,15 +66,18 @@
             self.logger.warning(
                 f"Neue MAC-Adresse gefunden: {mac}. Mapping wird ergänzt."
             )
+            print("MAC war nicht in File")
             self.mac_to_room[mac] = ""  # empty placeholder
             jsonhandler.write_json(self.mac_to_room, self.MAPPING_FILE_NAME)
             self.mac_to_room = jsonhandler.load_json(self.MAPPING_FILE_NAME)
+            print("keine Ahnung")
             return
         self.write_to_influxDB(msg, metadate)
     def write_to_influxDB(self, msg: dict, metadate: dict):
         try:
+            print(msg)
             self.influx_writer.write_point(
                 measurement=self.MEASUREMENT_NAME,
                 tags={
...
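The hunks only confirm that on_message parses a JSON payload with a metadata block; a hypothetical payload under that assumption (every key besides metadata is an illustrative guess, since the hunk is truncated):

# Hypothetical MQTT payload on the co2/# topic; only "metadata" is visible in the diff.
example_payload = {
    "metadata": {"mac": "aa:bb:cc:dd:ee:ff"},
    "co2": 612,
    "temperature": 21.4,
    "humidity": 40.2,
}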
@@ -17,5 +17,7 @@
     "a1:b2:c3:d4:e5:f6": "",
     "77:88:99:aa:bb:cc": "",
     "test": "",
-    "aaron": ""
+    "aaron": "",
+    "AAFF": "",
+    "AAFFEE": ""
 }
\ No newline at end of file
 from dotenv import load_dotenv
 import os
-from stream_processing.mQTTClientHandler import MQTTClientHandler
+from mQTTClientHandler import MQTTClientHandler
 from utils.influx import InfluxDBHelper
 load_dotenv()
 def main():
+    print("🟢 Stream-Processing gestartet", flush=True)
     influx_writer = InfluxDBHelper(
         url=os.getenv("INFLUXDB_URL"),
         token=os.getenv("INFLUXDB_TOKEN"),
         org=os.getenv("INFLUXDB_ORG"),
         bucket=os.getenv("INFLUXDB_BUCKET"),
     )
+    print("== ENV ==")
+    print("TOKEN:", os.getenv("INFLUXDB_TOKEN"))
+    print("URL:", os.getenv("INFLUXDB_URL"))
+    print("ORG:", os.getenv("INFLUXDB_ORG"))
+    print("BUCKET:", os.getenv("INFLUXDB_BUCKET"))
     mqtt_handler = MQTTClientHandler(
         broker_url=os.getenv("MQTT_BROKER_URL"),
         topic=os.getenv("MQTT_TOPIC"),
...
[project]
name = "backend"
version = "0.1.0"
description = "backend of co2 web service"
authors = [
    {name = "Patrick Ade", email = "21adpa1bif@hft-stuttgart.de"},
    {name = "Morten Stetter", email = "22stmo1bif@hft-stuttgart.de"},
    {name = "Aaron Mele", email = "21meaa1bif@hft-stuttgart.de"},
    {name = "Emre Gezer", email = "21geem1bif@hft-stuttgart.de"},
]
maintainers = [
    {name = "Max Mustermann", email = "mustermann@example.com"}
]
readme = "README.md"
keywords = ["co2", "sensor", "temperature", "humidity", "monitor", "monitoring", "air quality"]
requires-python = ">=3.12"
dependencies = [
    "influxdb-client>=1.40",
    "python-dotenv>=1.1",
    "paho-mqtt>=2.1",
]

[dependency-groups]
dev = [
    "ruff>=0.11.5"
]

[tool.ruff]
# Set the maximum line length to 80.
line-length = 80

[tool.ruff.lint]
# Add the `line-too-long` rule to the enforced rule set.
extend-select = ["E501"]

#[build-system]
#requires = ["setuptools"]  # "wheel"
#build-backend = "setuptools.build_meta"
#[tool.setuptools]
#package-dir = {"" = "."}
#[tool.setuptools.packages.find]
#where = ["backend"]
from typing import List, Optional


class FluxQueryBuilder:
    """
    A builder class for constructing Flux queries (InfluxDB 2.x).
    Supports fluent method chaining for readable query construction.

    Example:
        query = (
            FluxQueryBuilder()
            .bucket("sensor_data")
            .time_range("-1h", "now()")
            .filter_measurement("sensor_data")
            .filter_fields("co2", "temperature", "humidity")
            .filter_field("room", "1/210")
            .pivot()
            .mean()
            .build()
        )
    """

    def __init__(self) -> None:
        self._bucket: Optional[str] = None
        self._start: Optional[str] = None
        self._stop: Optional[str] = None
        self._measurement: Optional[str] = None
        self._fields: List[str] = []
        self._field_filters: dict = {}  # e.g. {"room": "1/210"}
        self._use_mean: bool = False
        self._use_pivot: bool = False

    def bucket(self, name: str) -> "FluxQueryBuilder":
        """Set the InfluxDB bucket name."""
        self._bucket = name
        return self

    def time_range(self, start: str, stop: str) -> "FluxQueryBuilder":
        """Set time range for the query."""
        self._start = start
        self._stop = stop
        return self

    def filter_measurement(self, measurement: str) -> "FluxQueryBuilder":
        """Filter for a specific _measurement value."""
        self._measurement = measurement
        return self

    def filter_field(self, field: str, value: str) -> "FluxQueryBuilder":
        """Add a tag filter like r["room"] == "1/210"."""
        self._field_filters[field] = value
        return self

    def filter_fields(self, *fields: str) -> "FluxQueryBuilder":
        """Filter for multiple _field values using OR."""
        self._fields.extend(fields)
        return self

    def mean(self) -> "FluxQueryBuilder":
        """Apply mean() aggregation."""
        self._use_mean = True
        return self

    def pivot(self) -> "FluxQueryBuilder":
        """Apply pivot() to restructure results with multiple fields per timestamp."""
        self._use_pivot = True
        return self

    def build(self) -> str:
        """Construct and return the final Flux query string."""
        if not self._bucket:
            raise ValueError("Bucket name is required.")
        if not (self._start and self._stop):
            raise ValueError("Start and stop times are required.")
        if not self._measurement:
            raise ValueError("Measurement is required.")

        lines: List[str] = []
        lines.append(f'from(bucket: "{self._bucket}")')
        lines.append(f'  |> range(start: {self._start}, stop: {self._stop})')
        lines.append(f'  |> filter(fn: (r) => r["_measurement"] == "{self._measurement}")')
        # Optional tag filters (e.g., room or mac)
        for key, value in self._field_filters.items():
            lines.append(f'  |> filter(fn: (r) => r["{key}"] == "{value}")')
        # Optional field filters (_field == ...)
        if self._fields:
            or_expr = " or ".join(f'r["_field"] == "{f}"' for f in self._fields)
            lines.append(f'  |> filter(fn: (r) => {or_expr})')
        if self._use_mean:
            lines.append('  |> mean()')
        if self._use_pivot:
            lines.append('  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")')
        return "\n".join(lines)

    def reset(self) -> "FluxQueryBuilder":
        """Reset the builder so a new query can be built from scratch."""
        self.__init__()
        return self
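A quick usage sketch for the builder above (not part of the commit; the bucket and room values are illustrative):

builder = FluxQueryBuilder()
query = (
    builder
    .bucket("co2-test")
    .time_range("-1h", "now()")
    .filter_measurement("sensor_data")
    .filter_fields("co2", "temperature", "humidity")
    .filter_field("room", "1/210")  # tag filter, despite the method name
    .pivot()
    .build()
)
print(query)
# from(bucket: "co2-test")
#   |> range(start: -1h, stop: now())
#   |> filter(fn: (r) => r["_measurement"] == "sensor_data")
#   |> filter(fn: (r) => r["room"] == "1/210")
#   |> filter(fn: (r) => r["_field"] == "co2" or r["_field"] == "temperature" or r["_field"] == "humidity")
#   |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
builder.reset()  # the same instance can then build a fresh query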
import os
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import WriteOptions


class InfluxDBHelper:
    def __init__(self, url: str, token: str, org: str, bucket: str):
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.bucket = bucket
        self.org = org
        self.query_api = self.client.query_api()
        # self.write_api = self.client.write_api(write_options=SYNCHRONOUS)  # synchronous writes are good for debugging
        self.write_api = self.client.write_api(
            write_options=WriteOptions(batch_size=1000, flush_interval=10000)
        )

    def write_point(
        self, measurement: str, tags: dict, fields: dict, timestamp=None
    ):
        """Build a Point from the given tags and fields and queue it for a batched write."""
        point = Point(measurement)
        for k, v in tags.items():
            point.tag(k, v)
        for k, v in fields.items():
            point.field(k, v)
        if timestamp:
            point.time(timestamp, WritePrecision.NS)
        self.write_api.write(bucket=self.bucket, org=self.org, record=point)

    def ping(self) -> bool:
        return self.client.ping()

    def get_all_data(self):
        """Return all sensor_data records from the last 20 days."""
        query = f'''
        from(bucket: "{self.bucket}")
        |> range(start: -20d)
        |> filter(fn: (r) => r["_measurement"] == "sensor_data")
        '''
        return self.query_api.query(org=self.org, query=query)

    def get_latest_room_data(self, room_id: str):
        """Return the most recent co2 record for the given room from the last 5 minutes."""
        query = f'''
        from(bucket: "{self.bucket}")
        |> range(start: -5m)
        |> filter(fn: (r) => r["_measurement"] == "co2")
        |> filter(fn: (r) => r["room"] == "{room_id}")
        |> last()
        '''
        return self.query_api.query(org=self.org, query=query)
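A minimal sketch of wiring the helper up (illustrative values; the env variable names are the ones from .env.docker):

import os
from utils.influx import InfluxDBHelper

helper = InfluxDBHelper(
    url=os.getenv("INFLUXDB_URL"),
    token=os.getenv("INFLUXDB_TOKEN"),
    org=os.getenv("INFLUXDB_ORG"),
    bucket=os.getenv("INFLUXDB_BUCKET"),
)
# Queue one point; WriteOptions batches writes, so it may flush up to 10 s later.
helper.write_point(
    measurement="sensor_data",
    tags={"room": "1/210", "mac": "aa:bb:cc:dd:ee:ff"},
    fields={"co2": 612, "temperature": 21.4},
)
# Query results come back as FluxTable objects with FluxRecord rows.
for table in helper.get_latest_room_data("1/210"):
    for record in table.records:
        print(record.get_field(), record.get_value())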
import logging
import os
from logging.handlers import RotatingFileHandler

LOG_DIR = "logs"
LOG_FILE = "app.log"
LOG_PATH = os.path.join(LOG_DIR, LOG_FILE)


class LoggerFactory:
    # logger.info("Connected with result code %s", str(rc))
    # logger.warning("Neue MAC-Adresse gefunden: %s", mac)
    # logger.error("Failed writing to InfluxDb: %s", e)

    @staticmethod
    def get_logger(name: str, level=logging.DEBUG) -> logging.Logger:
        if not os.path.exists(LOG_DIR):
            os.makedirs(LOG_DIR)

        logger = logging.getLogger(name)
        if logger.hasHandlers():
            return logger  # avoids duplicate handlers

        logger.setLevel(level)
        formatter = logging.Formatter(
            "[%(asctime)s] %(levelname)s in %(name)s: %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
        file_handler = RotatingFileHandler(
            LOG_PATH, maxBytes=5_000_000, backupCount=5
        )
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        return logger
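Usage mirrors the commented examples at the top of the class; log lines land in logs/app.log via the rotating handler:

logger = LoggerFactory.get_logger(__name__)
logger.info("Connected with result code %s", 0)
logger.warning("Neue MAC-Adresse gefunden: %s", "aa:bb:cc:dd:ee:ff")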