AdaxController/adaxconnector.py

134 lines
5.0 KiB
Python

import requests
import time
import json
from datetime import datetime, timedelta
class AdaxConnector:
def __init__(self, id:str, secret:str) -> None:
self.__client_id = id
self.__client_secret = secret
self.__access_token = None
self.__token_expiration = None
self.__refresh_token = None
self._base_url = "https://api-1.adax.no/client-api"
self._endpoints = {
"content":"rest/v1/content/",
"control":"rest/v1/control/",
"energy_log":"rest/v1/energy_log/"
}
self._request_types = self._endpoints.keys()
self.__get_access_token()
def __send_auth_request(self, data:dict):
req = requests.Request()
req.url = f"{self._base_url}/auth/token"
req.method = 'POST'
req.headers = {'Accept': 'application/json', 'Content-Type': 'application/x-www-form-urlencoded'}
req.data = data
prepped = req.prepare()
with requests.Session() as s:
response = s.send(prepped)
return response.json()
def send_request(self, request_type:str, data:dict={}):
# Check if request_type exists
if not request_type in self._request_types:
return f"Request type [{request_type}] unknown"
else:
# Check if access token is valid
if self.__token_is_expired():
self.__refresh_access_token()
req = requests.Request()
req.url = f"{self._base_url}/{self._endpoints[request_type]}"
req.headers = {"Authorization": f"Bearer {self.__access_token}", "Content-Type": "application/json"}
req.method = 'POST' if request_type == 'control' else 'GET'
if request_type == 'energy_log':
req.url += data['room_id']
if req.method == 'POST':
req.json = data
prepped = req.prepare()
with requests.Session() as s:
response = s.send(prepped)
return response.json()
def __get_access_token(self) -> None:
data = {
"grant_type": "password",
"username": self.__client_id,
"password": self.__client_secret
}
response = self.__send_auth_request(data=data)
self.__access_token = response['access_token']
self.__refresh_token = response['refresh_token']
self.__token_expiration = datetime.now()+timedelta(seconds=86400)
# print(response)
def __refresh_access_token(self) -> None:
data = {
"grant_type": "refresh_token",
"refresh_token": self.__refresh_token,
"username": self.__client_id,
"password": self.__client_secret
}
response = self.__send_auth_request(data=data)
self.__access_token = response['access_token']
self.__refresh_token = response['refresh_token']
self.__token_expiration = datetime.now()+timedelta(seconds=86400)
# print(response)
def __token_is_expired(self) -> bool:
now = datetime.now()
return self.__token_expiration < now
def __time_until_access_token_expires(self) -> dict:
"""Converts a timedelta object to a dictionary of days, hours, minutes, and seconds."""
td = self.__token_expiration-datetime.now()
days = td.days
seconds = td.seconds
microseconds = td.microseconds
hours, remainder = divmod(seconds, 3600)
minutes, seconds = divmod(remainder, 60)
# Handle microseconds by adding them to the seconds
seconds += microseconds / 1000000.0
return {
"days": days,
"hours": hours,
"minutes": minutes,
"seconds": seconds,
}
def get_content(self)-> dict:
response = self.send_request(request_type="content")
# r = json.dumps(response, indent=2)
# print(r)
return response
def disable_heating_for_room(self, room_id:int):
data = {'rooms': [{ 'id': room_id, 'heatingEnabled': False, 'targetTemperature': 1800}] }
response = self.send_request(request_type="control", data=data)
return response
def _enable_heating_for_room(self, room_id:int):
data = {'rooms': [{ 'id': room_id, 'heatingEnabled': True}] }
response = self.send_request(request_type="control", data=data)
return response['rooms'][0]['status']
def _set_room_temperature(self, room_id:int, target_temp:int):
data = {'rooms': [{ 'id': room_id, 'targetTemperature': target_temp}] }
response = self.send_request(request_type="control", data=data)
return response['rooms'][0]['status']
def _get_energy_info(self, room_id:int):
data = {'room_id': str(room_id)}
response = self.send_request(request_type='energy_log', data=data)
return response