Energyclient -

def close(self): self.conn.close() | Problem | Strategy | |---------|----------| | Network failure | Retry with backoff (3–5 attempts) | | Rate limiting | Parse Retry-After header, queue commands | | Invalid token | Re‑authenticate automatically once | | Data gaps | Interpolate or flag missing samples | | Meter offline | Cache commands; apply when reconnected |

def control_load(self, state: bool): """Turn load on/off (demand response)""" resp = self.session.post( f"self.api_root/meters/self.meter_id/control", json="load_control": state ) return resp.status_code == 200 energyclient

def __post_init__(self): self._init_db() self.session = requests.Session() self.session.headers.update("Authorization": f"Bearer self.token") def close(self): self

from oauthlib.oauth2 import BackendApplicationClient from requests_oauthlib import OAuth2Session client = BackendApplicationClient(client_id=CLIENT_ID) oauth = OAuth2Session(client=client) token = oauth.fetch_token(token_url=TOKEN_URL, client_secret=CLIENT_SECRET) Typical core entities: apply when reconnected | def control_load(self

Shopping Cart