An error occurred while loading the file. Please try again.
-
Gezer authoredfa5008bb
import os
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import WriteOptions
class InfluxDBHelper:
def __init__(self, url: str, token: str, org: str, bucket: str):
self.url = url
self.token = token
self.client = InfluxDBClient(url=url, token=token, org=org)
self.bucket = bucket
self.org = org
self.query_api = self.client.query_api()
# self.write_api = self.client.write_api(write_options=SYNCHRONOUS) good for debug
self.write_api = self.client.write_api(
write_options=WriteOptions(batch_size=1000, flush_interval=10000)
)
def write_point(
self, measurement: str, tags: dict, fields: dict, timestamp=None
):
""" """
point = Point(measurement)
for k, v in tags.items():
point.tag(k, v)
for k, v in fields.items():
point.field(k, v)
if timestamp:
point.time(timestamp, WritePrecision.NS)
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
def get_token(self):
return self.token
def get_url(self):
return self.url
def ping(self):
return self.client.ping()
def get_all_data(self):
""" """
query = f'''
from(bucket: "{self.bucket}")
|> range(start: -20d)
|> filter(fn: (r) => r["_measurement"] == "sensor_data")
'''
return self.query_api.query(org=self.org, query=query)
def get_latest_room_data(self, room_id: str):
""" """
query = f'''
from(bucket: "{self.bucket}")
|> range(start: -5m)
|> filter(fn: (r) => r["_measurement"] == "co2")
|> filter(fn: (r) => r["room"] == "{room_id}")
|> last()
'''
return self.query_api.query(org=self.org, query=query)