# (file-viewer residue) An error occurred while loading the file. Please try again.
# influx.py 3.79 KiB
import os
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import WriteOptions
class InfluxDBHelper:
    """Convenience wrapper around ``InfluxDBClient`` for the ``sensor_data`` measurement.

    The helper owns one client, one query API and one *batching* write API.
    Because writes are buffered, call :meth:`close` (or use the instance as a
    context manager) before the process exits so pending points are flushed.
    """

    # Measurement every query in this helper reads from.
    _MEASUREMENT = "sensor_data"
    # Fields selected by the per-room queries.
    _ROOM_FIELDS = ("co2", "humidity", "temperature")

    def __init__(self, url: str, token: str, org: str, bucket: str):
        """Create the client and its query/write APIs.

        Args:
            url: Base URL of the InfluxDB server.
            token: API token used for authentication.
            org: Organization used for both queries and writes.
            bucket: Bucket used for both queries and writes.
        """
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.bucket = bucket
        self.org = org
        self.query_api = self.client.query_api()
        # Batching keeps write overhead low. When debugging, a synchronous
        # write API (write_options=SYNCHRONOUS) surfaces errors immediately.
        self.write_api = self.client.write_api(
            write_options=WriteOptions(batch_size=1000, flush_interval=10000)
        )

    def close(self) -> None:
        """Flush buffered writes and shut the client down."""
        # Close the write API first so its pending batch is sent while the
        # underlying client is still usable.
        self.write_api.close()
        self.client.close()

    def __enter__(self):
        """Return the helper itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the helper when leaving the ``with`` block."""
        self.close()

    def write_point(
        self, measurement: str, tags: dict, fields: dict, timestamp=None
    ):
        """Queue a single point for writing to the configured bucket.

        Args:
            measurement: Measurement name of the point.
            tags: Tag key/value pairs to attach.
            fields: Field key/value pairs to attach.
            timestamp: Optional point time, passed to ``Point.time`` with
                nanosecond precision. When ``None`` no time is set on the
                point.
        """
        point = Point(measurement)
        for key, value in tags.items():
            point.tag(key, value)
        for key, value in fields.items():
            point.field(key, value)
        # Compare against None explicitly: a timestamp of 0 (the epoch) is
        # falsy but still a legitimate value.
        if timestamp is not None:
            point.time(timestamp, WritePrecision.NS)
        self.write_api.write(bucket=self.bucket, org=self.org, record=point)

    def ping(self) -> bool:
        """Return the result of the client's ``ping()`` health check."""
        return self.client.ping()

    @staticmethod
    def _flux_string(value) -> str:
        """Return *value* as a double-quoted Flux string literal.

        Backslashes, double quotes and the ``${`` interpolation opener are
        escaped so the value cannot terminate the literal or inject Flux.
        """
        escaped = (
            str(value)
            .replace("\\", "\\\\")
            .replace('"', '\\"')
            .replace("${", "\\${")
        )
        return f'"{escaped}"'

    def _room_query(self, room_id, start, stop=None) -> str:
        """Build the Flux pipeline selecting the sensor fields of one room.

        ``start`` and ``stop`` are inserted verbatim because they are Flux
        time expressions (e.g. ``-30d`` or ``now()``), not string literals;
        callers must not pass untrusted text for them. When ``stop`` is
        ``None`` the range is left open-ended.
        """
        if stop is None:
            time_range = f"start: {start}"
        else:
            time_range = f"start: {start}, stop: {stop}"
        field_filter = " or ".join(
            f'r["_field"] == "{name}"' for name in self._ROOM_FIELDS
        )
        return (
            f"from(bucket: {self._flux_string(self.bucket)})\n"
            f"    |> range({time_range})\n"
            f'    |> filter(fn: (r) => r["_measurement"] == "{self._MEASUREMENT}")\n'
            f'    |> filter(fn: (r) => r["room"] == {self._flux_string(room_id)})\n'
            f"    |> filter(fn: (r) => {field_filter})\n"
        )

    def get_all_data(self):
        """Query every ``sensor_data`` record of the last 20 days.

        Returns:
            The result of ``query_api.query`` for that range.
        """
        query = (
            f"from(bucket: {self._flux_string(self.bucket)})\n"
            "    |> range(start: -20d)\n"
            f'    |> filter(fn: (r) => r["_measurement"] == "{self._MEASUREMENT}")\n'
        )
        return self.query_api.query(org=self.org, query=query)

    # NOTE(review): Flux duration units are case-sensitive ("m" = minute,
    # "mo" = month); "-5M" does not look like a valid duration literal.
    # Confirm the intended default before changing it.
    def get_latest_room_data(self, room_id: str, start: str = "-5M", stop: str = "now()"):
        """Query co2/humidity/temperature for one room over a recent range.

        Args:
            room_id: Value of the ``room`` tag to match.
            start: Flux time expression for the range start.
            stop: Flux time expression for the range end.

        Returns:
            The result of ``query_api.query``.
        """
        query = self._room_query(room_id, start, stop)
        return self.query_api.query(org=self.org, query=query)

    def get_room_data_in_range(self, room_id: str, start: str = "-30d", stop: str = "now()"):
        """Query co2/humidity/temperature for one room between *start* and *stop*.

        Args:
            room_id: Value of the ``room`` tag to match.
            start: Flux time expression for the range start.
            stop: Flux time expression for the range end.

        Returns:
            The result of ``query_api.query``.
        """
        query = self._room_query(room_id, start, stop)
        return self.query_api.query(org=self.org, query=query)

    def list_rooms(self):
        """Query the distinct values of the ``room`` tag in the bucket.

        Returns:
            The result of ``query_api.query`` for ``schema.tagValues``.
        """
        query = (
            'import "influxdata/influxdb/schema"\n'
            f'schema.tagValues(bucket: {self._flux_string(self.bucket)}, tag: "room")\n'
        )
        return self.query_api.query(org=self.org, query=query)

    def get_room_data_csv(self, room_id: str, start: str = "-30d", stop: str = "now()"):
        """Same selection as :meth:`get_room_data_in_range`, fetched as CSV.

        Args:
            room_id: Value of the ``room`` tag to match.
            start: Flux time expression for the range start.
            stop: Flux time expression for the range end.

        Returns:
            The result of ``query_api.query_csv``.
        """
        query = self._room_query(room_id, start, stop)
        return self.query_api.query_csv(query=query, org=self.org)

    def get_aggregated_room_data(self, room_id: str, window: str = "1h"):
        """Query per-window means of one room's sensor fields over 30 days.

        Args:
            room_id: Value of the ``room`` tag to match.
            window: Flux duration used as the ``aggregateWindow`` interval;
                inserted verbatim, so it must be a trusted duration literal.

        Returns:
            The result of ``query_api.query``; empty windows are not created.
        """
        query = (
            self._room_query(room_id, "-30d")
            + f"    |> aggregateWindow(every: {window}, fn: mean, createEmpty: false)\n"
            + '    |> yield(name: "mean")\n'
        )
        return self.query_api.query(org=self.org, query=query)