import requests
import time
import json
from datetime import datetime, timedelta, timezone
from typing import Optional


class AdaxConnector:
    """Small client for the Adax client API (https://api-1.adax.no/client-api).

    An access token is requested with the ``password`` grant when the
    instance is constructed.  Before every API call the token's expiry is
    checked and, if it has passed, the token is renewed with the
    ``refresh_token`` grant.
    """

    # Lifetime (seconds) assumed for a token when the auth response carries
    # no usable ``expires_in`` field.
    _DEFAULT_TOKEN_LIFETIME = 86400

    # Class-level default so the attribute exists even if a subclass skips
    # ``__init__``; overridden per instance by the constructor.
    _timeout = 30.0

    def __init__(self, id: str, secret: str, timeout: float = 30.0) -> None:
        """Store the credentials and fetch the first access token.

        Args:
            id: Client id, sent as ``username`` in the auth request.  (The
                name shadows the builtin but is kept so existing keyword
                callers keep working.)
            secret: Client secret, sent as ``password`` in the auth request.
            timeout: Seconds to wait for each HTTP request before
                ``requests`` raises a timeout error.  ``None`` waits forever.

        Raises:
            KeyError: If the auth response lacks ``access_token`` or
                ``refresh_token``.
            requests.RequestException: On connection problems or timeouts.
        """
        self.__client_id = id
        self.__client_secret = secret
        self.__access_token = None
        self.__token_expiration = None
        self.__refresh_token = None
        self._timeout = timeout
        self._base_url = "https://api-1.adax.no/client-api"
        self._endpoints = {
            "content": "rest/v1/content/",
            "control": "rest/v1/control/",
            "energy_log": "rest/v1/energy_log/",
        }
        self._request_types = self._endpoints.keys()
        self.__get_access_token()

    def __send_auth_request(self, data: dict):
        """POST form-encoded ``data`` to the token endpoint; return the parsed JSON."""
        req = requests.Request(
            method='POST',
            url=f"{self._base_url}/auth/token",
            headers={
                'Accept': 'application/json',
                'Content-Type': 'application/x-www-form-urlencoded',
            },
            data=data,
        )
        with requests.Session() as session:
            # A timeout keeps a stalled server from blocking the caller forever.
            response = session.send(req.prepare(), timeout=self._timeout)
            return response.json()

    def send_request(self, request_type: str, data: Optional[dict] = None):
        """Call one of the known API endpoints and return the parsed JSON body.

        Args:
            request_type: One of the keys of ``self._endpoints``
                (``"content"``, ``"control"``, ``"energy_log"``).
            data: For ``"control"`` the JSON payload that is POSTed; for
                ``"energy_log"`` a dict whose ``room_id`` entry is appended
                to the URL.  Ignored for ``"content"``.

        Returns:
            The decoded JSON response, or the string
            ``"Request type [<request_type>] unknown"`` when ``request_type``
            is not a known endpoint.

        Raises:
            KeyError: If ``request_type`` is ``"energy_log"`` and ``data``
                has no ``room_id``.
            requests.RequestException: On connection problems or timeouts.
        """
        if request_type not in self._request_types:
            return f"Request type [{request_type}] unknown"

        # A fresh dict per call; a ``{}`` default would be shared between calls.
        if data is None:
            data = {}

        if self.__token_is_expired():
            self.__refresh_access_token()

        method = 'POST' if request_type == 'control' else 'GET'
        url = f"{self._base_url}/{self._endpoints[request_type]}"
        if request_type == 'energy_log':
            # str() so that an integer room id works as well as a string.
            url += str(data['room_id'])

        req = requests.Request(
            method=method,
            url=url,
            headers={
                "Authorization": f"Bearer {self.__access_token}",
                "Content-Type": "application/json",
            },
            # Only POST requests carry a JSON body.
            json=data if method == 'POST' else None,
        )
        with requests.Session() as session:
            response = session.send(req.prepare(), timeout=self._timeout)
            return response.json()

    def __store_tokens(self, response: dict) -> None:
        """Remember the tokens from an auth response and compute the expiry.

        The lifetime comes from the response's ``expires_in`` (seconds) when
        it is present and numeric; otherwise ``_DEFAULT_TOKEN_LIFETIME`` is
        used.
        """
        self.__access_token = response['access_token']
        self.__refresh_token = response['refresh_token']
        fallback = self._DEFAULT_TOKEN_LIFETIME
        try:
            lifetime = timedelta(seconds=float(response.get('expires_in', fallback)))
        except (TypeError, ValueError, OverflowError):
            lifetime = timedelta(seconds=fallback)
        # UTC-aware timestamps are immune to local DST / clock-zone changes.
        self.__token_expiration = datetime.now(timezone.utc) + lifetime

    def __get_access_token(self) -> None:
        """Obtain a new token pair using the ``password`` grant."""
        data = {
            "grant_type": "password",
            "username": self.__client_id,
            "password": self.__client_secret,
        }
        self.__store_tokens(self.__send_auth_request(data=data))

    def __refresh_access_token(self) -> None:
        """Renew the token pair using the ``refresh_token`` grant."""
        data = {
            "grant_type": "refresh_token",
            "refresh_token": self.__refresh_token,
            "username": self.__client_id,
            "password": self.__client_secret,
        }
        self.__store_tokens(self.__send_auth_request(data=data))

    def __token_is_expired(self) -> bool:
        """Return True when the stored expiry time lies in the past."""
        return self.__token_expiration < datetime.now(timezone.utc)

    def __time_until_access_token_expires(self) -> dict:
        """Return the time left until token expiry, split into components.

        Returns:
            A dict with ``days``, ``hours``, ``minutes`` and ``seconds``
            (seconds include the fractional microsecond part).  Because the
            values come from a normalized ``timedelta``, an already expired
            token yields a negative ``days`` with non-negative other fields.
        """
        remaining = self.__token_expiration - datetime.now(timezone.utc)
        hours, leftover = divmod(remaining.seconds, 3600)
        minutes, seconds = divmod(leftover, 60)
        return {
            "days": remaining.days,
            "hours": hours,
            "minutes": minutes,
            # Fold the microseconds into the seconds as a fraction.
            "seconds": seconds + remaining.microseconds / 1000000.0,
        }

    def get_content(self) -> dict:
        """Return the parsed response of the ``content`` endpoint."""
        return self.send_request(request_type="content")

    def disable_heating_for_room(self, room_id: int):
        """Send a control request that turns heating off for ``room_id``.

        Returns the full parsed response (unlike the other control helpers,
        which return only the first room's ``status``).
        """
        data = {
            'rooms': [{
                'id': room_id,
                'heatingEnabled': False,
                # NOTE(review): 1800 is presumably hundredths of a degree
                # Celsius (18.00) - confirm against the API documentation.
                'targetTemperature': 1800,
            }]
        }
        return self.send_request(request_type="control", data=data)

    def _enable_heating_for_room(self, room_id: int):
        """Send a control request that turns heating on; return the room's ``status``."""
        data = {'rooms': [{'id': room_id, 'heatingEnabled': True}]}
        response = self.send_request(request_type="control", data=data)
        return response['rooms'][0]['status']

    def _set_room_temperature(self, room_id: int, target_temp: int):
        """Send a control request setting ``targetTemperature``; return the room's ``status``.

        NOTE(review): ``target_temp`` presumably uses the same unit as the
        1800 sent by ``disable_heating_for_room`` - confirm.
        """
        data = {'rooms': [{'id': room_id, 'targetTemperature': target_temp}]}
        response = self.send_request(request_type="control", data=data)
        return response['rooms'][0]['status']

    def _get_energy_info(self, room_id: int):
        """Return the parsed response of the ``energy_log`` endpoint for ``room_id``."""
        data = {'room_id': str(room_id)}
        return self.send_request(request_type='energy_log', data=data)