-
Gezer authored
and added new file in backend "influxdb_service" where we can get data from the database but still need to be implemented right and fixed
d00e180d
import os
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import WriteOptions
class InfluxDBHelper:
def __init__(self, url: str, token: str, org: str, bucket: str):
self.client = InfluxDBClient(url=url, token=token, org=org)
self.bucket = bucket
self.org = org
self.query_api = self.client.query_api()
# self.write_api = self.client.write_api(write_options=SYNCHRONOUS) good for debug
self.write_api = self.client.write_api(write_options=WriteOptions(batch_size=1000, flush_interval=10000))
def write_point(self, measurement: str, tags: dict, fields: dict, timestamp=None):
'''
'''
point = Point(measurement)
for k, v in tags.items():
point.tag(k, v)
for k, v in fields.items():
point.field(k, v)
if timestamp:
point.time(timestamp, WritePrecision.NS)
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
def get_all_data(self):
'''
'''
query = f'''
from(bucket: "{self.bucket}")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "sensor_data")
'''
return self.query_api.query(org=self.org, query=query)
def get_latest_room_data(self, room_id: str):
'''
'''
query = f'''
from(bucket: "{self.bucket}")
|> range(start: -5m)
|> filter(fn: (r) => r["_measurement"] == "co2")
|> filter(fn: (r) => r["room"] == "{room_id}")
|> last()
'''
return self.query_api.query(org=self.org, query=query)