from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS


class InfluxDBWriter:
    """Thin wrapper that writes single points to one InfluxDB bucket.

    The writer owns an ``InfluxDBClient`` and a write API created with the
    ``SYNCHRONOUS`` write options. Call :meth:`close` (or use the instance
    as a context manager) to release them when finished.
    """

    def __init__(self, url: str, token: str, org: str, bucket: str) -> None:
        """Create the client and its write API.

        Args:
            url: Passed to ``InfluxDBClient`` as ``url``.
            token: Passed to ``InfluxDBClient`` as ``token``.
            org: Organization, used for the client and for every write.
            bucket: Bucket that every point is written to.
        """
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.bucket = bucket
        self.org = org

    def __enter__(self) -> "InfluxDBWriter":
        """Return the writer itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the writer on leaving the ``with`` block.

        Returns ``None``, so an exception raised inside the block propagates.
        """
        self.close()

    def close(self) -> None:
        """Close the write API and then the underlying client."""
        try:
            self.write_api.close()
        finally:
            # Close the client even if closing the write API raised.
            self.client.close()

    def write_point(
        self,
        measurement: str,
        tags: dict,
        fields: dict,
        timestamp=None,
    ) -> None:
        """Build a ``Point`` and write it to the configured bucket.

        Args:
            measurement: Measurement name given to ``Point``.
            tags: Mapping of tag key to tag value; each pair is added with
                ``Point.tag``.
            fields: Mapping of field key to field value; each pair is added
                with ``Point.field``.
            timestamp: Optional time for the point, handed to ``Point.time``
                with ``WritePrecision.NS``. ``None`` (the default) leaves the
                point without an explicit time.
        """
        point = Point(measurement)
        for key, value in tags.items():
            point.tag(key, value)
        for key, value in fields.items():
            point.field(key, value)
        # Compare against None rather than relying on truthiness: a falsy
        # timestamp such as integer 0 (the Unix epoch) is still a real
        # timestamp and must not be silently discarded.
        if timestamp is not None:
            point.time(timestamp, WritePrecision.NS)
        self.write_api.write(bucket=self.bucket, org=self.org, record=point)