An error occurred while loading the file. Please try again.
views.py 3.63 KiB
import os
from django.shortcuts import render
from django.http import JsonResponse
from django.views.decorators.csrf import ensure_csrf_cookie, csrf_protect
import json
from backend.utils.influx import InfluxDBHelper
from dotenv import load_dotenv
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from django.contrib.auth import authenticate, login, logout
from .forms import CreateUserForm
@ensure_csrf_cookie
@require_http_methods(["GET"])
def set_csrf_token(request):
    """
    We set the CSRF cookie on the frontend.
    """
    return JsonResponse({"message": "CSRF cookie set"})
@require_http_methods(["POST"])
def login_view(request):
    try:
        data = json.loads(request.body.decode("utf-8"))
        email = data["email"]
        password = data["password"]
    except json.JSONDecodeError:
        return JsonResponse(
            {"success": False, "message": "Invalid JSON"}, status=400
    user = authenticate(request, username=email, password=password)
    if user:
        login(request, user)  # also creates a session in the browser
        return JsonResponse({"success": True})
    return JsonResponse(
        {"success": False, "message": "Invalid credentials"}, status=401
def logout_view(request):
    logout(request)
    return JsonResponse({"message": "Logged out"})
@require_http_methods(["GET"])
def user(request):
    if request.user.is_authenticated:
        return JsonResponse(
            {"username": request.user.username, "email": request.user.email}
    return JsonResponse({"message": "Not logged in"}, status=401)
@require_http_methods(["POST"])
def register(request):
    data = json.loads(request.body.decode("utf-8"))
    form = CreateUserForm(data)
    if form.is_valid():
        form.save()
        return JsonResponse(
            {"success": "User registered successfully"}, status=201
    else:
        errors = form.errors.as_json()
        return JsonResponse({"error": errors}, status=400)
@require_http_methods(["GET"]) def room_data_range(request): try: data = json.loads(request.body.decode("utf-8")) room = data["room"] start = data["start"] stop = data["stop"] if not room: return JsonResponse({"error": "Missing 'room' parameter"}, status=400) load_dotenv() client = InfluxDBHelper( url=os.getenv("INFLUXDB_URL"), token=os.getenv("INFLUXDB_TOKEN"), org=os.getenv("INFLUXDB_ORG"), bucket=os.getenv("INFLUXDB_BUCKET"), ) tables = client.get_room_data_in_range(room, start, stop) results = [] for table in tables: for record in table.records: results.append({ "time": str(record.get_time()), "field": record.get_field(), "value": record.get_value(), }) return JsonResponse({"room": room, "data": results}, status = 200) except json.JSONDecodeError: return JsonResponse( {"success": False, "message": "Invalid JSON"}, status=400 ) @require_http_methods(["GET"]) def get_rooms(request): client = InfluxDBHelper( url=os.getenv("INFLUXDB_URL"), token=os.getenv("INFLUXDB_TOKEN"), org=os.getenv("INFLUXDB_ORG"), bucket=os.getenv("INFLUXDB_BUCKET"), ) tables = client.list_rooms() rooms = set() for table in tables: for record in table.records: rooms.add(record.get_value()) return JsonResponse({"rooms": sorted(rooms)})