from __future__ import annotations import asyncio from contextlib import suppress from pathlib import Path from fastapi import FastAPI, HTTPException from .cloud_client import CloudClient from .compose_manager import ComposeManager from .config import AgentConfig from .manifest_store import ManifestStore from .models import ( AgentStatus, ConfirmUpgradeRequest, HeartbeatPayload, LocalStatusResponse, OperationResult, PostponeUpgradeRequest, ReportPayload, UpdateCheckRequest, utc_now, ) from .mysql_backup import MysqlBackupManager from .registry_login import RegistryLoginManager from .state_store import StateStore class AgentService: def __init__(self, config: AgentConfig) -> None: self.config = config self.compose = ComposeManager(config.compose) self.cloud = CloudClient(config.cloud) self.state_store = StateStore(config.storage.state_file) self.manifest_store = ManifestStore(config.storage.manifest_dir) self.mysql_backup = MysqlBackupManager( enabled=config.mysql_backup.enabled, backup_dir=config.mysql_backup.backup_dir, dump_command=config.mysql_backup.dump_command, timeout_seconds=config.mysql_backup.timeout_seconds, compose_manager=self.compose, ) self.registry_login = RegistryLoginManager( enabled=config.registry.enabled, server=config.registry.server, username=config.registry.username, password=config.registry.password, timeout_seconds=config.registry.timeout_seconds, compose_manager=self.compose, ) self.state = self.state_store.load( vehicle_id=config.vehicle.vehicle_id, vin=config.vehicle.vin, current_release=config.vehicle.current_release, ) self.lock = asyncio.Lock() self.upgrade_task: asyncio.Task[None] | None = None self.background_tasks: list[asyncio.Task[None]] = [] self.last_backup_file: str | None = None self.last_target_release: str | None = None self.last_images: dict[str, str] = {} async def startup(self) -> None: self.background_tasks = [ asyncio.create_task(self._heartbeat_loop(), name="heartbeat-loop"), asyncio.create_task(self._update_loop(), name="update-loop"), ] async def shutdown(self) -> None: for task in self.background_tasks: task.cancel() for task in self.background_tasks: with suppress(asyncio.CancelledError): await task def get_local_status(self) -> LocalStatusResponse: return LocalStatusResponse( vehicle_id=self.state.vehicle_id, vin=self.state.vin, current_release=self.state.current_release, status=self.state.status.value, available_update=self.state.available_update, last_result=self.state.last_result, updated_at=self.state.updated_at, ) async def postpone_upgrade(self, request: PostponeUpgradeRequest) -> OperationResult: async with self.lock: if not self.state.available_update: return OperationResult(success=False, detail="当前没有可延期的升级任务") self.state.status = AgentStatus.WAIT_USER_CONFIRM self.state.last_result = f"用户稍后提醒: {request.reason}" self._touch_and_save() await self._safe_report(AgentStatus.WAIT_USER_CONFIRM, self.state.last_result) return OperationResult(success=True, detail="已记录稍后提醒") async def confirm_upgrade(self, request: ConfirmUpgradeRequest) -> OperationResult: async with self.lock: if not self.state.available_update: raise HTTPException(status_code=400, detail="当前没有可升级版本") if self.upgrade_task and not self.upgrade_task.done(): return OperationResult(success=False, detail="升级任务正在执行中") self.state.last_result = f"用户已确认升级: {request.confirmed_by}" self._touch_and_save() self.upgrade_task = asyncio.create_task(self._execute_upgrade(), name="upgrade-task") return OperationResult(success=True, detail="升级任务已启动") async def check_update_once(self) -> None: async with self.lock: payload = UpdateCheckRequest( vehicle_id=self.state.vehicle_id, vin=self.state.vin, current_release=self.state.current_release, ) try: response = await self.cloud.check_update(payload) except Exception as exc: async with self.lock: self.state.last_check_at = utc_now() self.state.last_result = f"检查更新失败: {exc}" self._touch_and_save() return async with self.lock: self.state.last_check_at = utc_now() if response.has_update and response.manifest: self.manifest_store.save(response.manifest) self.state.available_update = response.manifest self.state.status = AgentStatus.WAIT_USER_CONFIRM self.state.last_result = f"发现新版本 {response.manifest.release_version},等待人工确认" else: if self.state.status in {AgentStatus.IDLE, AgentStatus.SUCCESS, AgentStatus.WAIT_USER_CONFIRM}: self.state.status = AgentStatus.IDLE self.state.available_update = None self.state.last_result = response.message or "当前无可用更新" self._touch_and_save() async def heartbeat_once(self) -> None: async with self.lock: if self.state.status == AgentStatus.SUCCESS: self.state.status = AgentStatus.IDLE self.state.last_result = self.state.last_result or "升级成功,恢复空闲状态" self._touch_and_save() payload = HeartbeatPayload( vehicle_id=self.state.vehicle_id, vin=self.state.vin, current_release=self.state.current_release, agent_status=self.state.status.value, target_release=self.last_target_release, last_result=self.state.last_result, images=self.last_images, backup_file=self.last_backup_file, updated_at=self.state.updated_at, ) try: await self.cloud.heartbeat(payload) async with self.lock: self.state.last_heartbeat_at = utc_now() self._touch_and_save() except Exception as exc: async with self.lock: self.state.last_result = f"心跳上报失败: {exc}" self._touch_and_save() async def _heartbeat_loop(self) -> None: while True: await self.heartbeat_once() await asyncio.sleep(self.config.polling.heartbeat_interval_seconds) async def _update_loop(self) -> None: while True: await self.check_update_once() await asyncio.sleep(self.config.polling.update_interval_seconds) async def _execute_upgrade(self) -> None: async with self.lock: manifest = self.state.available_update if not manifest: return target_release = manifest.release_version env_mapping = manifest.components.to_env_mapping() self.last_target_release = target_release self.last_images = dict(env_mapping) self.last_backup_file = None self.state.status = AgentStatus.BACKING_UP_DATABASE self.state.last_result = f"开始升级到 {target_release},先执行数据库备份" self._touch_and_save() await self._safe_report(AgentStatus.BACKING_UP_DATABASE, f"开始升级到 {target_release},先执行数据库备份", target_release) try: backup_result = await self.mysql_backup.backup_before_upgrade(target_release) if not backup_result.success: raise RuntimeError(f"数据库备份失败: {backup_result.stderr or backup_result.stdout}") backup_file_path = (backup_result.stdout or "").strip() self.last_backup_file = backup_file_path or None backup_file_name = Path(backup_file_path).name if backup_file_path else "" backup_message = f"数据库备份完成,开始拉取镜像: {target_release}" if backup_file_name: backup_message = f"数据库备份完成({backup_file_name}),开始拉取镜像: {target_release}" async with self.lock: self.state.status = AgentStatus.PULLING_IMAGE self.state.last_result = backup_message self._touch_and_save() await self._safe_report(AgentStatus.PULLING_IMAGE, backup_message, target_release) login_result = await self.registry_login.login_if_needed() if not login_result.success: raise RuntimeError(f"私有仓库登录失败: {login_result.stderr or login_result.stdout}") await self.compose.apply_manifest(env_mapping) pull_result = await self.compose.pull() if not pull_result.success: raise RuntimeError(f"拉取镜像失败: {pull_result.stderr or pull_result.stdout}") async with self.lock: self.state.status = AgentStatus.RESTARTING_SERVICE self.state.last_result = "镜像拉取完成,开始重启服务" self._touch_and_save() await self._safe_report(AgentStatus.RESTARTING_SERVICE, "镜像拉取完成,开始重启服务", target_release) up_result = await self.compose.up() if not up_result.success: raise RuntimeError(f"服务启动失败: {up_result.stderr or up_result.stdout}") async with self.lock: self.state.status = AgentStatus.HEALTH_CHECKING self.state.last_result = "服务已启动,开始健康检查" self._touch_and_save() await self._safe_report(AgentStatus.HEALTH_CHECKING, "服务已启动,开始健康检查", target_release) health = await self.compose.health_check() if not health.success: raise RuntimeError(health.detail) async with self.lock: self.state.status = AgentStatus.SUCCESS self.state.current_release = target_release self.state.available_update = None self.state.last_result = f"升级成功: {target_release}" self._touch_and_save() await self._safe_report(AgentStatus.SUCCESS, f"升级成功: {target_release}", target_release) self.last_target_release = None except Exception as exc: await self.compose.rollback() async with self.lock: self.state.status = AgentStatus.ROLLED_BACK self.state.last_result = f"升级失败并已回滚: {exc}" self._touch_and_save() await self._safe_report(AgentStatus.ROLLED_BACK, f"升级失败并已回滚: {exc}", target_release) async def _safe_report(self, status: AgentStatus, detail: str, release_version: str | None = None) -> None: payload = ReportPayload( vehicle_id=self.state.vehicle_id, vin=self.state.vin, current_release=self.state.current_release, target_release=release_version or self.last_target_release, agent_status=status.value, success=status == AgentStatus.SUCCESS, message=detail, images=self.last_images, backup_file=self.last_backup_file, ) with suppress(Exception): await self.cloud.report(payload) def _touch_and_save(self) -> None: self.state.updated_at = utc_now() self.state_store.save(self.state) def create_app(config: AgentConfig) -> FastAPI: service = AgentService(config) app = FastAPI(title="Vehicle OTA Agent", version="0.1.0") @app.on_event("startup") async def on_startup() -> None: await service.startup() @app.on_event("shutdown") async def on_shutdown() -> None: await service.shutdown() @app.get("/health") async def health() -> dict[str, str]: return {"status": "ok"} @app.get("/ota/status", response_model=LocalStatusResponse) async def local_status() -> LocalStatusResponse: return service.get_local_status() @app.post("/ota/check-update") async def check_update() -> OperationResult: await service.check_update_once() return OperationResult(success=True, detail="已执行一次检查更新") @app.post("/ota/confirm", response_model=OperationResult) async def confirm_upgrade(request: ConfirmUpgradeRequest) -> OperationResult: return await service.confirm_upgrade(request) @app.post("/ota/postpone", response_model=OperationResult) async def postpone_upgrade(request: PostponeUpgradeRequest) -> OperationResult: return await service.postpone_upgrade(request) return app