An error occurred while loading the file. Please try again.
InfluxDBWriter.py 813 bytes
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
class InfluxDBWriter:
    """Thin wrapper around ``InfluxDBClient`` for writing single points.

    The wrapper builds one client and one synchronous write API at
    construction time and reuses them for every ``write_point`` call.
    It can be used as a context manager so the underlying client is
    released when the caller is done::

        with InfluxDBWriter(url, token, org, bucket) as writer:
            writer.write_point("cpu", {"host": "a"}, {"load": 0.5})
    """

    def __init__(self, url: str, token: str, org: str, bucket: str):
        """Create the client and a synchronous write API.

        Args:
            url: Passed to ``InfluxDBClient`` as ``url``.
            token: Passed to ``InfluxDBClient`` as ``token``.
            org: Passed to ``InfluxDBClient`` and reused on every write.
            bucket: Destination bucket used on every write.
        """
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.bucket = bucket
        self.org = org

    def __enter__(self) -> "InfluxDBWriter":
        """Return the writer itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the writer on leaving the ``with`` block.

        Returns ``None`` so any in-flight exception keeps propagating.
        """
        self.close()

    def close(self) -> None:
        """Release the write API and the underlying client."""
        # Close the write API first; the client is closed even if that fails.
        try:
            self.write_api.close()
        finally:
            self.client.close()

    def write_point(self, measurement: str, tags: dict, fields: dict, timestamp=None):
        """Build a ``Point`` and write it to the configured bucket.

        Args:
            measurement: Measurement name given to ``Point``.
            tags: Mapping of tag key to tag value; each pair is added with
                ``Point.tag``.
            fields: Mapping of field key to field value; each pair is added
                with ``Point.field``.
            timestamp: Optional time for the point. When it is not ``None``
                it is handed to ``Point.time`` with ``WritePrecision.NS``.
                When it is ``None`` no time is set on the point.
        """
        point = Point(measurement)
        for tag_key, tag_value in tags.items():
            point.tag(tag_key, tag_value)
        for field_key, field_value in fields.items():
            point.field(field_key, field_value)
        # Compare against None rather than relying on truthiness: a falsy
        # but valid timestamp (e.g. 0 or timedelta(0)) must not be dropped.
        if timestamp is not None:
            point.time(timestamp, WritePrecision.NS)
        self.write_api.write(bucket=self.bucket, org=self.org, record=point)