An error occurred while loading the file. Please try again.
influx.py 1.94 KiB
import os
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import WriteOptions
class InfluxDBHelper:
    def __init__(self, url: str, token: str, org: str, bucket: str):
        self.url = url
        self.token = token
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.bucket = bucket
        self.org = org
        self.query_api = self.client.query_api()
        # self.write_api = self.client.write_api(write_options=SYNCHRONOUS) good for debug
        self.write_api = self.client.write_api(
            write_options=WriteOptions(batch_size=1000, flush_interval=10000)
    def write_point(
        self, measurement: str, tags: dict, fields: dict, timestamp=None
        """ """
        point = Point(measurement)
        for k, v in tags.items():
            point.tag(k, v)
        for k, v in fields.items():
            point.field(k, v)
        if timestamp:
            point.time(timestamp, WritePrecision.NS)
        self.write_api.write(bucket=self.bucket, org=self.org, record=point)
    def get_token(self):
        return self.token
    def get_url(self):
        return self.url
    def ping(self):
        return self.client.ping()
    def get_all_data(self):
        """ """
        query = f'''
            from(bucket: "{self.bucket}")
                |> range(start: -20d)
                |> filter(fn: (r) => r["_measurement"] == "sensor_data")
        '''
        return self.query_api.query(org=self.org, query=query)
    def get_latest_room_data(self, room_id: str):
        """ """
        query = f'''
            from(bucket: "{self.bucket}")
                |> range(start: -5m)
                |> filter(fn: (r) => r["_measurement"] == "co2")
                |> filter(fn: (r) => r["room"] == "{room_id}")
                |> last()
        '''
        return self.query_api.query(org=self.org, query=query)