Commit 1ac40ac4 authored by Gezer's avatar Gezer
Browse files

added fluxQueryBuilder, modified influxdb_service

parent 9fc93f71
Showing with 185 additions and 5 deletions
+185 -5
import os
from dotenv import load_dotenv
from utils.fluxQueryBuilder import FluxQueryBuilder
from utils.influx import InfluxDBHelper
load_dotenv()
_bucket=os.getenv("INFLUXDB_BUCKET")
_org=os.getenv("INFLUXDB_ORG")
client = InfluxDBHelper(
url=os.getenv("INFLUXDB_URL"),
token=os.getenv("INFLUXDB_TOKEN"),
org=os.getenv("INFLUXDB_ORG"),
bucket=os.getenv("INFLUXDB_BUCKET"),
org=_org,
bucket=_bucket,
)
# 2) build a Flux query at runtime
builder = (
FluxQueryBuilder()
.bucket(_bucket)
.time_range("-30d", "now()")
.filter_measurement("sensor_data")
.filter_fields("co2", "rh", "temp")
.mean()
)
tables = client.get_all_data()
"""
Some query examples
from(bucket: "co2-test")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "sensor_data")
|> filter(fn: (r) => r["_field"] == "co2" or r["_field"] == "humidity" or r["_field"] == "temperature")
from(bucket: "co2-test")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "sensor_data")
|> filter(fn: (r) => r["_field"] == "co2" or r["_field"] == "humidity" or r["_field"] == "temperature")
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total")
|> window(every: 5m) //every five min
|> mean() //gives the average back
from(bucket: "co2-dev")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "sensor_data")
|> filter(fn: (r) => r["room"] == "1/210")
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
|> yield(name: "mean")
"""
flux_query = builder.build()
print(flux_query)
tables = client.query_api.query(org= _org, query= flux_query)
for table in tables:
for record in table.records:
......
......@@ -8,7 +8,7 @@ from utils.influx import InfluxDBHelper
class MQTTClientHandler:
MAPPING_FILE_NAME = "stream_processing/mac_to_room.json"
MAPPING_FILE_NAME = "backend/stream_processing/mac_to_room.json"
MEASUREMENT_NAME = "sensor_data"
TAG_ROOM = "room"
TAG_MAC = "mac"
......
......@@ -10,5 +10,8 @@
"AB:AD:BE:EF:12:34": "",
"EF:AD:BE:EF:12:34": "",
"Tx:AD:BE:EF:12:34": "",
"T\u00dc:AD:BE:EF:12:34": ""
"T\u00dc:AD:BE:EF:12:34": "",
"08:3A:8D:E3:DF:AF": "",
"TE:ST:TE:ST:TE:ST": "",
"AB:CD:EF:12:34:56": ""
}
\ No newline at end of file
from typing import List, Optional
class FluxQueryBuilder:
"""
A builder for constructing Flux queries in a fluent, step-by-step manner.
Example:
builder = (
FluxQueryBuilder()
.bucket("co2-test")
.time_range("v.timeRangeStart", "v.timeRangeStop")
.filter_measurement("sensor_data")
.filter_fields("co2", "humidity", "temperature") # Optional
.mean() # Optional
)
query = builder.build()
"""
def __init__(self) -> None:
# internal state for each part of the query
self._bucket: Optional[str] = None
self._start: Optional[str] = None
self._stop: Optional[str] = None
self._measurement: Optional[str] = None
self._fields: List[str] = []
self._mean : Optional[bool] = None
def bucket(self, name: str) -> "FluxQueryBuilder":
"""
Set the bucket name for the query.
:param name: Name of the InfluxDB bucket
:return: self (for method chaining)
"""
self._bucket = name
return self
def time_range(self, start: str, stop: str) -> "FluxQueryBuilder":
"""
Define the time window for the query.
:param start: Flux expression for the start time
:param stop: Flux expression for the stop time
:return: self (for method chaining)
"""
self._start = start
self._stop = stop
return self
def filter_measurement(self, measurement: str) -> "FluxQueryBuilder":
"""
Add a filter for the _measurement tag.
:param measurement: Name of the measurement to filter
:return: self (for method chaining)
"""
self._measurement = measurement
return self
def filter_fields(self, *fields: str) -> "FluxQueryBuilder":
"""
Add filters for one or more _field tags (OR-combined).
:param fields: List of field names to include
:return: self (for method chaining)
"""
# extend the list of fields to filter
self._fields.extend(fields)
return self
def mean(self) -> "FluxQueryBuilder":
"""
Activates average function for the _values of each table.
:return: self (for method chaining)
"""
self._mean = True
return self
def build(self) -> str:
"""
Assemble and return the final Flux query string.
:raises ValueError: if mandatory parts are missing
:return: Flux query as a multiline string
"""
# Validate required parts
if not self._bucket:
raise ValueError("Bucket name is required.")
if not (self._start and self._stop):
raise ValueError("Both start and stop times are required.")
if not self._measurement:
raise ValueError("Measurement filter is required.")
# Build the query lines
lines: List[str] = []
# Bucket
lines.append(f'from(bucket: "{self._bucket}")')
# Range
lines.append(f'|> range(start: {self._start}, stop: {self._stop})')
# Measurement
lines.append(
f'|> filter(fn: (r) => r["_measurement"] == "{self._measurement}")'
)
# Add field filters if any
if self._fields:
# create OR expression for fields
or_expr = " or ".join(f'r["_field"] == "{f}"' for f in self._fields)
lines.append(f'|> filter(fn: (r) => {or_expr})')
# Add mean() function if called
if self.mean:
lines.append(f'|> mean()')
# Join lines with proper indentation
return "\n ".join(lines)
def reset(self) -> "FluxQueryBuilder":
"""
Reset the builder state to start fresh.
:return: self (for method chaining)
"""
self.__init__()
return self
......@@ -46,3 +46,4 @@ class InfluxDBHelper:
|> last()
'''
return self.query_api.query(org=self.org, query=query)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment