from __future__ import annotations import asyncio import logging from datetime import datetime from pathlib import Path from .compose_manager import ComposeManager from .models import CommandResult logger = logging.getLogger(__name__) class MysqlBackupManager: def __init__(self, *, enabled: bool, backup_dir: str, dump_command: str, timeout_seconds: int, compose_manager: ComposeManager) -> None: self.enabled = enabled self.backup_dir = Path(backup_dir) self.dump_command = dump_command self.timeout_seconds = timeout_seconds self.compose_manager = compose_manager if self.enabled: self.backup_dir.mkdir(parents=True, exist_ok=True) async def backup_before_upgrade(self, target_release: str) -> CommandResult: if not self.enabled: return CommandResult(success=True, stdout="mysql backup skipped", returncode=0) timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") backup_file = self.backup_dir / f"mysql-backup-{target_release}-{timestamp}.sql" command = self.dump_command.replace("{backup_file}", str(backup_file)) logger.info("开始执行MySQL备份,目标文件: %s", backup_file) result = await self._run_shell_command(command) if result.success: logger.info("MySQL备份完成: %s", backup_file) result.stdout = str(backup_file) else: logger.error("MySQL备份失败: %s", result.stderr or result.stdout) return result async def _run_shell_command(self, command: str) -> CommandResult: process = await asyncio.create_subprocess_shell( command, cwd=str(self.compose_manager.working_dir), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=self.timeout_seconds) except asyncio.TimeoutError: process.kill() await process.wait() return CommandResult(success=False, stderr=f"mysql backup timeout after {self.timeout_seconds}s", returncode=-1) return CommandResult( success=process.returncode == 0, stdout=stdout.decode("utf-8", errors="ignore"), stderr=stderr.decode("utf-8", errors="ignore"), returncode=process.returncode or 0, )