Files
Frobot-OTA-Agent/ota_agent/cloud_client.py
2026-04-23 14:40:29 +08:00

48 lines
1.8 KiB
Python

from __future__ import annotations
import json
import httpx
from .config import CloudConfig
from .models import HeartbeatPayload, ReportPayload, UpdateCheckRequest, UpdateCheckResponse
class CloudClient:
    """Async HTTP client for the cloud endpoints described by a ``CloudConfig``.

    Every public call opens a short-lived ``httpx.AsyncClient``, POSTs the
    payload as JSON together with the configured token header, and lets
    ``Response.raise_for_status()`` raise if the status is rejected.
    """

    def __init__(self, config: CloudConfig) -> None:
        """Keep *config* and build the timeout from ``config.timeout_seconds``."""
        self.config = config
        self.timeout = httpx.Timeout(config.timeout_seconds)

    def _headers(self) -> dict[str, str]:
        """Return the headers sent with every request."""
        return {
            self.config.token_header: self.config.token,
            "Content-Type": "application/json",
        }

    def _url(self, path: str) -> str:
        """Join ``config.base_url`` and *path* into a request URL.

        Trailing slashes on the base URL are dropped. A path that is empty or
        already starts with ``/`` is appended unchanged; a path without a
        leading slash gets one inserted so a configuration such as
        ``"heartbeat"`` does not get glued onto the last base-URL segment.
        """
        base = self.config.base_url.rstrip("/")
        if not path or path.startswith("/"):
            return base + path
        return f"{base}/{path}"

    async def _post(
        self,
        path: str,
        payload: HeartbeatPayload | ReportPayload | UpdateCheckRequest,
    ) -> httpx.Response:
        """POST *payload* (dumped by alias) as JSON to *path* and return the response.

        Raises:
            httpx.HTTPStatusError: if ``raise_for_status()`` rejects the status.
        """
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.post(
                self._url(path),
                headers=self._headers(),
                json=payload.model_dump(by_alias=True),
            )
            response.raise_for_status()
            return response

    async def heartbeat(self, payload: HeartbeatPayload) -> None:
        """Send *payload* to ``config.heartbeat_path``; the response body is ignored."""
        await self._post(self.config.heartbeat_path, payload)

    async def check_update(self, payload: UpdateCheckRequest) -> UpdateCheckResponse:
        """Send *payload* to ``config.update_check_path`` and parse the JSON reply.

        The decoded JSON object is passed as keyword arguments to
        ``UpdateCheckResponse``.
        """
        response = await self._post(self.config.update_check_path, payload)
        return UpdateCheckResponse(**response.json())

    async def report(self, payload: ReportPayload) -> None:
        """Send *payload* to ``config.report_path``; the response body is ignored."""
        await self._post(self.config.report_path, payload)