# An error occurred while loading the file. Please try again.
# -
# Gezer authored 20ba60a7
import os
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import WriteOptions
class InfluxDBHelper:
    """Convenience wrapper around ``InfluxDBClient`` for the ``sensor_data`` measurement.

    The helper owns one client, one query API and one *batching* write API.
    Because writes are buffered, call :meth:`close` (or use the instance as a
    context manager) before shutdown so pending points are flushed.

    Query methods build Flux source text. String values (bucket, room id) are
    escaped before being embedded; ``start``/``stop``/``window`` arguments are
    Flux *expressions* (e.g. ``-30d``, ``now()``) and are inserted verbatim,
    so they must come from trusted code, not from end users.
    """

    # Measurement and fields every room query is restricted to.
    _MEASUREMENT = "sensor_data"
    _ROOM_FIELDS = ("co2", "humidity", "temperature")

    def __init__(self, url: str, token: str, org: str, bucket: str):
        """Connect to InfluxDB and prepare the query and write APIs.

        Args:
            url: Base URL of the InfluxDB server.
            token: API token used for authentication.
            org: Organization used for both queries and writes.
            bucket: Bucket that all reads and writes target.
        """
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.bucket = bucket
        self.org = org
        self.query_api = self.client.query_api()
        # NOTE: for debugging, a synchronous writer
        # (``write_options=SYNCHRONOUS``) makes write errors surface
        # immediately instead of inside the background batch.
        self.write_api = self.client.write_api(
            write_options=WriteOptions(batch_size=1000, flush_interval=10000)
        )

    # -- lifecycle ---------------------------------------------------------

    def close(self) -> None:
        """Flush buffered points and release the underlying connections."""
        try:
            # Closing the batching writer flushes whatever is still queued.
            self.write_api.close()
        finally:
            self.client.close()

    def __enter__(self):
        """Return the helper itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the helper on leaving the ``with`` block; never swallows errors."""
        self.close()

    # -- private query builders -------------------------------------------

    @staticmethod
    def _flux_string(value) -> str:
        """Return *value* as a double-quoted, escaped Flux string literal.

        Backslashes, double quotes and the ``${`` interpolation opener are
        escaped so the value cannot terminate the literal or inject Flux.
        """
        text = str(value)
        # Backslash must be handled first so later escapes are not doubled.
        for raw, escaped in (("\\", "\\\\"), ('"', '\\"'), ("${", "\\${")):
            text = text.replace(raw, escaped)
        return f'"{text}"'

    @staticmethod
    def _render(stages) -> str:
        """Join Flux lines into one query, one stage per line."""
        return "\n" + "\n".join(stages) + "\n"

    def _room_query(self, room_id: str, range_args: str, extra_stages=()) -> str:
        """Build the shared per-room query: bucket, range, measurement, room, fields."""
        field_filter = " or ".join(
            f'r["_field"] == "{name}"' for name in self._ROOM_FIELDS
        )
        return self._render(
            [
                f"from(bucket: {self._flux_string(self.bucket)})",
                f"|> range({range_args})",
                f'|> filter(fn: (r) => r["_measurement"] == "{self._MEASUREMENT}")',
                f'|> filter(fn: (r) => r["room"] == {self._flux_string(room_id)})',
                f"|> filter(fn: (r) => {field_filter})",
                *extra_stages,
            ]
        )

    # -- writes ------------------------------------------------------------

    def write_point(
        self, measurement: str, tags: dict, fields: dict, timestamp=None
    ):
        """Queue one point for writing to the configured bucket.

        Args:
            measurement: Measurement name of the point.
            tags: Tag key/value pairs; ``None`` is treated as no tags.
            fields: Field key/value pairs; ``None`` is treated as no fields.
            timestamp: Optional point time, passed to ``Point.time`` with
                nanosecond precision. When ``None`` no time is set on the point.
        """
        point = Point(measurement)
        for key, value in (tags or {}).items():
            point.tag(key, value)
        for key, value in (fields or {}).items():
            point.field(key, value)
        # ``is not None`` rather than truthiness: a timestamp of 0 (the epoch)
        # is valid and must not be dropped.
        if timestamp is not None:
            point.time(timestamp, WritePrecision.NS)
        self.write_api.write(bucket=self.bucket, org=self.org, record=point)

    # -- reads -------------------------------------------------------------

    def ping(self) -> bool:
        """Return the result of the client's ``ping()`` health check."""
        return self.client.ping()

    def get_all_data(self):
        """Query every ``sensor_data`` record from the last 20 days."""
        query = self._render(
            [
                f"from(bucket: {self._flux_string(self.bucket)})",
                "|> range(start: -20d)",
                f'|> filter(fn: (r) => r["_measurement"] == "{self._MEASUREMENT}")',
            ]
        )
        return self.query_api.query(org=self.org, query=query)

    def get_latest_room_data(self, room_id: str, start: str = "-5m", stop: str = "now()"):
        """Query recent co2/humidity/temperature records for one room.

        Args:
            room_id: Value of the ``room`` tag to match.
            start: Flux range start expression; defaults to the last 5 minutes
                (Flux duration units are case-sensitive: ``m`` is minutes).
            stop: Flux range stop expression.
        """
        query = self._room_query(room_id, f"start: {start}, stop: {stop}")
        return self.query_api.query(org=self.org, query=query)

    def get_room_data_in_range(self, room_id: str, start: str = "-30d", stop: str = "now()"):
        """Query co2/humidity/temperature records for one room in a time range.

        Args:
            room_id: Value of the ``room`` tag to match.
            start: Flux range start expression.
            stop: Flux range stop expression.
        """
        query = self._room_query(room_id, f"start: {start}, stop: {stop}")
        return self.query_api.query(org=self.org, query=query)

    def list_rooms(self):
        """Query the distinct values of the ``room`` tag in the bucket."""
        query = self._render(
            [
                'import "influxdata/influxdb/schema"',
                "schema.tagValues(",
                f"bucket: {self._flux_string(self.bucket)},",
                'tag: "room"',
                ")",
            ]
        )
        return self.query_api.query(org=self.org, query=query)

    def get_room_data_csv(self, room_id: str, start: str = "-30d", stop: str = "now()"):
        """Same query as :meth:`get_room_data_in_range`, run through ``query_csv``.

        Args:
            room_id: Value of the ``room`` tag to match.
            start: Flux range start expression.
            stop: Flux range stop expression.
        """
        query = self._room_query(room_id, f"start: {start}, stop: {stop}")
        return self.query_api.query_csv(query=query, org=self.org)

    def get_aggregated_room_data(self, room_id: str, window: str = "1h"):
        """Query per-window means of one room's fields over the last 30 days.

        Args:
            room_id: Value of the ``room`` tag to match.
            window: Flux duration for ``aggregateWindow(every: ...)``.
        """
        query = self._room_query(
            room_id,
            "start: -30d",
            extra_stages=(
                f"|> aggregateWindow(every: {window}, fn: mean, createEmpty: false)",
                '|> yield(name: "mean")',
            ),
        )
        return self.query_api.query(org=self.org, query=query)