import json
import os
import subprocess
import tarfile
import tempfile
import time
from datetime import datetime

from db import db, Job, Run


def run_db_dump(job, instance, backup_dir):
    """Common entry point for MySQL and PostgreSQL dump jobs.

    Dispatches on ``job.type`` to the engine-specific implementation.

    Args:
        job: Job object; ``job.type`` selects the engine and
            ``job.config_json`` holds the JSON configuration.
        instance: Instance name, used as the archive name prefix.
        backup_dir: Directory in which the archive is created.

    Returns:
        A ``(archive_name, log)`` tuple as returned by the engine-specific
        function.

    Raises:
        ValueError: If ``job.type`` is neither ``"mysql"`` nor
            ``"postgresql"``.
    """
    if job.type == "mysql":
        return _run_mysql(job, instance, backup_dir)
    if job.type == "postgresql":
        return _run_postgresql(job, instance, backup_dir)
    raise ValueError(f"Type inconnu pour db_dump : {job.type}")


# ---------------------------------------------------------------------------
# MySQL
# ---------------------------------------------------------------------------

def _run_mysql(job, instance, backup_dir):
    """Dump a MySQL database with ``mysqldump`` and archive the result.

    The dump is written into a temporary directory and then packed into
    ``backup_dir`` by ``_write_tar``.

    Args:
        job: Job object; ``job.config_json`` must contain a ``"database"`` key.
        instance: Instance name, used as the archive name prefix.
        backup_dir: Directory in which the archive is created.

    Returns:
        A ``(archive_name, log)`` tuple: the archive name without the
        ``.tar`` extension and the combined stdout/stderr of ``mysqldump``
        (or a placeholder message when it printed nothing).

    Raises:
        ValueError: If no database name is configured.
        RuntimeError: If the archive already exists or ``mysqldump`` exits
            with a non-zero status.
        subprocess.TimeoutExpired: If ``mysqldump`` runs longer than 2 hours.
    """
    from flask import current_app

    cfg = json.loads(job.config_json or "{}")
    dbname = cfg.get("database", "")
    if not dbname:
        raise ValueError("Nom de base de données manquant dans la configuration du job.")

    archive_name = _archive_name(instance, "mysql", dbname)
    _abort_if_exists(archive_name, backup_dir)

    with tempfile.TemporaryDirectory() as tmpdir:
        dump_path = os.path.join(tmpdir, f"{dbname}.sql")
        result = subprocess.run(
            [
                "sudo", "mysqldump",
                "--single-transaction",
                "--routines",
                "--triggers",
                "--result-file", dump_path,
                dbname,
            ],
            capture_output=True,
            text=True,
            # Decode like the PostgreSQL branch does: an undecodable byte in
            # the diagnostics must not turn a successful dump into a crash.
            encoding="utf-8",
            errors="replace",
            timeout=7200,
        )
        log = (result.stdout + result.stderr).strip()
        if result.returncode != 0:
            raise RuntimeError(f"mysqldump a échoué (code {result.returncode}) :\n{log}")

        _write_tar(
            tmpdir, dump_path, dbname, archive_name, backup_dir,
            job, instance, current_app.config.get("INSTANCE_URL", ""),
        )

    return archive_name, log or "mysqldump terminé sans sortie."
# ---------------------------------------------------------------------------
# PostgreSQL
# ---------------------------------------------------------------------------

def _run_postgresql(job, instance, backup_dir):
    """Dump a PostgreSQL database with ``pg_dump`` and archive the result.

    The dump is streamed into a file inside a temporary directory and then
    packed into ``backup_dir`` by ``_write_tar``.

    Args:
        job: Job object; ``job.config_json`` must contain a ``"database"`` key.
        instance: Instance name, used as the archive name prefix.
        backup_dir: Directory in which the archive is created.

    Returns:
        A ``(archive_name, log)`` tuple: the archive name without the
        ``.tar`` extension and the stderr output of ``pg_dump`` (or a
        placeholder message when it printed nothing).

    Raises:
        ValueError: If no database name is configured.
        RuntimeError: If the archive already exists or ``pg_dump`` exits
            with a non-zero status.
        subprocess.TimeoutExpired: If ``pg_dump`` runs longer than 2 hours.
    """
    from flask import current_app

    cfg = json.loads(job.config_json or "{}")
    dbname = cfg.get("database", "")
    if not dbname:
        raise ValueError("Nom de base de données manquant dans la configuration du job.")

    archive_name = _archive_name(instance, "postgresql", dbname)
    _abort_if_exists(archive_name, backup_dir)

    with tempfile.TemporaryDirectory() as tmpdir:
        dump_path = os.path.join(tmpdir, f"{dbname}.sql")

        # pg_dump must run as the postgres user. Its stdout is streamed
        # straight into the dump file (opened by this process, so the
        # postgres user needs no access to tmpdir) instead of being buffered
        # in memory, which would hold the whole database dump in RAM.
        with open(dump_path, "wb") as dump_file:
            result = subprocess.run(
                ["sudo", "-u", "postgres", "pg_dump", "--format=plain", dbname],
                stdout=dump_file,
                stderr=subprocess.PIPE,
                timeout=7200,
            )

        log = result.stderr.decode("utf-8", errors="replace").strip()
        if result.returncode != 0:
            raise RuntimeError(f"pg_dump a échoué (code {result.returncode}) :\n{log}")

        _write_tar(
            tmpdir, dump_path, dbname, archive_name, backup_dir,
            job, instance, current_app.config.get("INSTANCE_URL", ""),
        )

    return archive_name, log or "pg_dump terminé sans sortie."


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

def _utc_now():
    """Return the current UTC time as a naive ``datetime``.

    Drop-in replacement for ``datetime.utcnow()``, which is deprecated since
    Python 3.12; the tzinfo is stripped so formatted output is unchanged.
    """
    from datetime import timezone

    return datetime.now(timezone.utc).replace(tzinfo=None)


def _archive_name(instance, db_type, dbname):
    """Build the archive base name ``<instance>_<db_type>_<dbname>_<YYYYMMDD>``.

    The date is the current UTC date; no file extension is included.
    """
    date_str = _utc_now().strftime("%Y%m%d")
    return f"{instance}_{db_type}_{dbname}_{date_str}"


def _abort_if_exists(archive_name, backup_dir):
    """Refuse to continue when ``<archive_name>.tar`` already exists.

    Raises:
        RuntimeError: If the archive file is already present in
            ``backup_dir``.
    """
    path = os.path.join(backup_dir, archive_name + ".tar")
    if os.path.exists(path):
        raise RuntimeError(
            f"L'archive {archive_name}.tar existe déjà. "
            "Supprimez-la ou attendez le prochain cycle."
        )


def _write_tar(tmpdir, dump_path, dbname, archive_name, backup_dir,
               job, instance, instance_url):
    """Create the .tar in ``backup_dir`` and the YunoHost .info.json next to it.

    Both files are first written under a ``.part`` suffix and only renamed to
    their final names once complete, so a failure (disk full, missing dump,
    interruption) never leaves a truncated archive that would look valid and
    make ``_abort_if_exists`` block the next attempt.

    Args:
        tmpdir: Scratch directory in which ``backup_info.json`` is staged.
        dump_path: Path of the SQL dump to archive.
        dbname: Database name; the dump is stored as ``db/<dbname>.sql``.
        archive_name: Archive base name (without extension).
        backup_dir: Destination directory.
        job: Job object; only ``job.type`` is read.
        instance: Instance name recorded in the embedded metadata.
        instance_url: Instance URL recorded in the embedded metadata.

    Raises:
        OSError: If any file cannot be read or written; partial output is
            removed before the error propagates.
    """
    archive_path = os.path.join(backup_dir, archive_name + ".tar")
    ynh_info_path = os.path.join(backup_dir, archive_name + ".info.json")
    partial_archive = archive_path + ".part"
    partial_info = ynh_info_path + ".part"

    # backup_info.json embedded in the tar (BackupManager metadata)
    info = {
        "instance_name": instance,
        "instance_url": instance_url,
        "type": job.type,
        "database": dbname,
        "created_at": _utc_now().isoformat(),
        "backupmanager_version": "1.0.0",
    }
    info_path = os.path.join(tmpdir, "backup_info.json")
    with open(info_path, "w", encoding="utf-8") as f:
        json.dump(info, f, indent=2)

    try:
        with tarfile.open(partial_archive, "w") as tar:
            tar.add(dump_path, arcname=f"db/{dbname}.sql")
            tar.add(info_path, arcname="backup_info.json")

        # YunoHost .info.json (outside the tar, required for the webadmin
        # listing)
        ynh_info = {
            "created_at": int(time.time()),
            "description": f"BackupManager: {job.type} {dbname}",
            "size": os.path.getsize(partial_archive),
            "from_before_upgrade": False,
            "apps": {},
            "system": {},
        }
        with open(partial_info, "w", encoding="utf-8") as f:
            json.dump(ynh_info, f, indent=2)

        # Publish the archive last: its presence marks a completed backup.
        os.replace(partial_info, ynh_info_path)
        os.replace(partial_archive, archive_path)
    except BaseException:
        # Best-effort cleanup; never mask the original error.
        for leftover in (partial_archive, partial_info):
            try:
                os.remove(leftover)
            except OSError:
                pass
        raise