import csv
import json
import os
import pwd
import re
import subprocess
import tarfile
import tempfile
import time
from datetime import datetime, timezone


# ---------------------------------------------------------------------------
# Backup
# ---------------------------------------------------------------------------

def backup_custom_dir(job, instance, backup_dir):
    """Back up an arbitrary directory into a YunoHost-compatible archive.

    The job configuration (``job.config_json``) is a JSON object read for the
    keys ``source_path`` (absolute directory to save), ``excludes`` (rsync
    exclude patterns) and ``restore`` (copied verbatim into the archive
    metadata for use at restore time).

    The archive ``<backup_dir>/<archive_name>.tar`` contains ``backup.csv``,
    ``backup_info.json`` and the files under ``data/custom<source_path>``.
    A sibling ``<archive_name>.info.json`` is written next to it.

    Args:
        job: object exposing ``config_json`` (JSON string or None) and ``name``.
        instance: instance name, used in the archive name and metadata.
        backup_dir: directory receiving the archive and its ``.info.json``.

    Returns:
        A tuple ``(archive_name, log)`` where ``log`` is the rsync output (or
        a placeholder message when rsync printed nothing), followed by a
        warning line if the ``.info.json`` file could not be written.

    Raises:
        ValueError: ``source_path`` is missing, relative, or contains ``..``.
        RuntimeError: rsync or tar exited with a non-zero status.
        subprocess.CalledProcessError: the final ``chown`` failed.
    """
    cfg = json.loads(job.config_json or "{}")
    # `or` guards against explicit JSON nulls, which .get() defaults miss.
    source_path = (cfg.get("source_path") or "").rstrip("/")
    excludes = cfg.get("excludes") or []
    if isinstance(excludes, str):
        # A bare string would otherwise be iterated character by character.
        excludes = [excludes]

    if not source_path:
        raise ValueError("source_path manquant dans la configuration du job.")
    if not os.path.isabs(source_path):
        raise ValueError(f"source_path doit être un chemin absolu : {source_path}")
    if ".." in source_path.split("/"):
        # The path is appended to the staging directory below; ".." would let
        # the staged copy land outside data/custom (or outside tmpdir).
        raise ValueError(f"source_path ne doit pas contenir '..' : {source_path}")

    from jobs.utils import unique_archive_name

    label = _slugify(job.name)
    today = datetime.now(timezone.utc).strftime('%Y%m%d')
    base_name = f"{instance}_{label}_{today}"
    archive_name = unique_archive_name(base_name, backup_dir)
    archive_path = os.path.join(backup_dir, archive_name + ".tar")

    from flask import current_app
    instance_url = current_app.config.get("INSTANCE_URL", "")

    log = ""
    tmpdir = tempfile.mkdtemp(prefix="backupmanager_")
    try:
        # Destination inside the archive: data/custom{source_path}
        dest_in_archive = os.path.join(tmpdir, "data", "custom" + source_path)
        os.makedirs(dest_in_archive, exist_ok=True)

        # Copy with sudo rsync (root access so every file can be read)
        rsync_cmd = ["sudo", "rsync", "-az", "--delete"]
        for exc in excludes:
            rsync_cmd += ["--exclude", exc]
        rsync_cmd += [source_path + "/", dest_in_archive + "/"]

        result = subprocess.run(rsync_cmd, capture_output=True, text=True, timeout=7200)
        log = (result.stdout + result.stderr).strip()
        if result.returncode != 0:
            raise RuntimeError(f"rsync a échoué (code {result.returncode}) :\n{log}")

        # backup.csv (required by YunoHost)
        csv_path = os.path.join(tmpdir, "backup.csv")
        with open(csv_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f, delimiter=";")
            writer.writerow(["source", "dest"])
            writer.writerow([f"data/custom{source_path}", source_path])

        # backup_info.json (BackupManager metadata, read back at restore time)
        info = {
            "instance_name": instance,
            "instance_url": instance_url,
            "type": "custom_dir",
            "source_path": source_path,
            "job_name": job.name,
            # Aware clock instead of the deprecated utcnow(); tzinfo is
            # stripped so the stored string keeps its historical naive format.
            "created_at": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            "backupmanager_version": "1.0.0",
            "restore": cfg.get("restore", {}),
        }
        info_path = os.path.join(tmpdir, "backup_info.json")
        with open(info_path, "w", encoding="utf-8") as f:
            json.dump(info, f, indent=2)

        # Build the .tar through sudo (files staged in tmpdir may be root-owned)
        result = subprocess.run(
            ["sudo", "tar", "-cf", archive_path, "-C", tmpdir,
             "backup.csv", "backup_info.json", "data"],
            capture_output=True, text=True, timeout=600,
        )
        if result.returncode != 0:
            raise RuntimeError(f"tar a échoué : {result.stderr.strip()}")

        # Make the archive accessible to the application user
        subprocess.run(
            ["sudo", "chown", f"{_get_current_user()}:", archive_path],
            check=True,
        )

        # YunoHost .info.json: written in tmpdir, then copied with sudo rsync
        from jobs.utils import sudo_getsize
        size = sudo_getsize(archive_path)
        ynh_info = {
            "created_at": int(time.time()),
            "description": f"BackupManager: custom_dir {source_path}",
            "size": size,
            "from_before_upgrade": False,
            "apps": {},
            "system": {},
        }
        tmp_ynh_info = os.path.join(tmpdir, archive_name + ".info.json")
        with open(tmp_ynh_info, "w", encoding="utf-8") as f:
            json.dump(ynh_info, f, indent=2)
        copy_result = subprocess.run(
            ["sudo", "rsync", tmp_ynh_info,
             os.path.join(backup_dir, archive_name + ".info.json")],
            capture_output=True, text=True,
        )
        if copy_result.returncode != 0:
            # The archive itself is complete, so this stays non-fatal, but the
            # failure is surfaced in the log instead of being dropped.
            warning = (
                f"Avertissement : écriture de {archive_name}.info.json échouée : "
                f"{copy_result.stderr.strip()}"
            )
            log = f"{log}\n{warning}".strip()

    finally:
        # Staged files may be root-owned, hence sudo; cleanup is best-effort.
        subprocess.run(["sudo", "rm", "-rf", tmpdir], check=False)

    return archive_name, log or "rsync terminé sans sortie."
# ---------------------------------------------------------------------------
# Restore
# ---------------------------------------------------------------------------

def _validate_restore_target(source_path):
    """Reject a restore destination that is unsafe to rsync into as root.

    Raises:
        RuntimeError: the path is not a string, is empty, is relative, or
            contains a ``..`` component.
    """
    if (
        not isinstance(source_path, str)
        or not source_path
        or not os.path.isabs(source_path)
        or ".." in source_path.split("/")
    ):
        raise RuntimeError(
            f"source_path invalide ou absent dans backup_info.json : {source_path!r}"
        )


def restore_custom_dir(archive_name, backup_dir):
    """Fully restore a custom_dir archive: files, user, permissions, service.

    The destination and the optional restore steps are read from the
    ``backup_info.json`` stored inside ``<backup_dir>/<archive_name>.tar``.
    Everything in that file is acted upon with root privileges (rsync with
    ``--delete``, chown/chmod, systemctl, shell commands), so only archives
    from a trusted source should be restored.

    Args:
        archive_name: archive base name, without the ``.tar`` extension.
        backup_dir: directory containing the archive.

    Returns:
        A newline-joined log of the steps performed.

    Raises:
        FileNotFoundError: the archive does not exist.
        RuntimeError: the recorded ``source_path`` is invalid, extraction or
            rsync failed, or the expected data path is absent from the archive.
        subprocess.CalledProcessError: a command run with ``check=True``
            (mkdir, useradd, chown, chmod) failed.
    """
    archive_path = os.path.join(backup_dir, archive_name + ".tar")

    from jobs.utils import sudo_exists
    if not sudo_exists(archive_path):
        raise FileNotFoundError(f"Archive introuvable : {archive_path}")

    info = _read_backup_info(archive_path)
    raw_source = info.get("source_path") or ""
    source_path = raw_source.rstrip("/") if isinstance(raw_source, str) else raw_source
    # Validate before touching the filesystem: an empty path would make the
    # rsync below target "/" with --delete.
    _validate_restore_target(source_path)
    # `or {}` also covers explicit JSON nulls.
    restore_cfg = info.get("restore") or {}

    log_lines = []
    tmpdir = tempfile.mkdtemp(prefix="backupmanager_restore_")
    try:
        # Full extraction into tmpdir
        result = subprocess.run(
            ["sudo", "tar", "-xf", archive_path, "-C", tmpdir],
            capture_output=True, text=True, timeout=600,
        )
        if result.returncode != 0:
            raise RuntimeError(f"Extraction échouée : {result.stderr.strip()}")
        log_lines.append("Archive extraite.")

        extracted_data = os.path.join(tmpdir, "data", "custom" + source_path)
        if not os.path.isdir(extracted_data):
            raise RuntimeError(
                f"Chemin attendu absent dans l'archive : data/custom{source_path}"
            )

        # Create the destination directory if it does not exist
        subprocess.run(["sudo", "mkdir", "-p", source_path], check=True)

        # Restore the files
        result = subprocess.run(
            ["sudo", "rsync", "-az", "--delete",
             extracted_data + "/", source_path + "/"],
            capture_output=True, text=True, timeout=7200,
        )
        if result.returncode != 0:
            raise RuntimeError(f"rsync restore a échoué : {result.stderr.strip()}")
        log_lines.append(f"Fichiers restaurés vers {source_path}.")

        # System user
        user_cfg = restore_cfg.get("system_user") or {}
        if user_cfg.get("name"):
            _restore_system_user(user_cfg, log_lines)

        # Permissions
        perms_cfg = restore_cfg.get("permissions") or {}
        if perms_cfg:
            _restore_permissions(source_path, perms_cfg, log_lines)

        # systemd service
        service_cfg = restore_cfg.get("systemd_service") or {}
        if service_cfg.get("name"):
            _restore_systemd_service(service_cfg, log_lines)

        # Post-restore commands
        for cmd in restore_cfg.get("post_restore_commands") or []:
            _run_command(cmd, log_lines)

    finally:
        # Extracted files may be root-owned, hence sudo; cleanup is best-effort.
        subprocess.run(["sudo", "rm", "-rf", tmpdir], check=False)

    return "\n".join(log_lines)


def _restore_system_user(user_cfg, log_lines):
    """Create the system user described by ``user_cfg`` unless it exists.

    ``user_cfg`` must contain ``name``; ``home`` defaults to ``/opt/<name>``
    and ``shell`` to ``/bin/false``. A line describing the outcome is appended
    to ``log_lines``.

    Raises:
        subprocess.CalledProcessError: ``useradd`` failed.
    """
    name = user_cfg["name"]
    home = user_cfg.get("home", "/opt/" + name)
    shell = user_cfg.get("shell", "/bin/false")
    try:
        pwd.getpwnam(name)
    except KeyError:
        # getpwnam raises KeyError when the account does not exist.
        subprocess.run(
            ["sudo", "useradd", "--system", "--home-dir", home,
             "--no-create-home", "--shell", shell, name],
            check=True,
        )
        log_lines.append(f"Utilisateur système '{name}' créé.")
    else:
        log_lines.append(f"Utilisateur système '{name}' déjà existant.")


def _restore_permissions(source_path, perms_cfg, log_lines):
    """Recursively apply the ``owner`` and/or ``mode`` from ``perms_cfg``.

    Each key is optional; a line is appended to ``log_lines`` for every
    change applied.

    Raises:
        subprocess.CalledProcessError: ``chown`` or ``chmod`` failed.
    """
    owner = perms_cfg.get("owner")
    mode = perms_cfg.get("mode")
    if owner:
        subprocess.run(["sudo", "chown", "-R", owner, source_path], check=True)
        log_lines.append(f"Propriétaire défini : {owner}.")
    if mode:
        subprocess.run(["sudo", "chmod", "-R", mode, source_path], check=True)
        log_lines.append(f"Permissions définies : {mode}.")


def _restore_systemd_service(service_cfg, log_lines):
    """Enable and start the systemd unit named in ``service_cfg``.

    When ``service_file`` is set and exists on disk, ``systemctl
    daemon-reload`` is run first. Failures here never raise: a failed start
    is recorded in ``log_lines`` with systemctl's stderr.
    """
    name = service_cfg["name"]
    service_file = service_cfg.get("service_file", "")

    if service_file and os.path.exists(service_file):
        subprocess.run(["sudo", "systemctl", "daemon-reload"], check=False)
        log_lines.append("systemctl daemon-reload effectué.")

    subprocess.run(["sudo", "systemctl", "enable", name], check=False)
    result = subprocess.run(
        ["sudo", "systemctl", "start", name],
        capture_output=True, text=True,
    )
    if result.returncode == 0:
        log_lines.append(f"Service '{name}' activé et démarré.")
    else:
        log_lines.append(
            f"Service '{name}' : démarrage échoué — {result.stderr.strip()}"
        )


def _run_command(cmd, log_lines):
    """Run one post-restore shell command and log its status and output.

    A non-zero exit status is logged with a cross mark, not raised.

    Raises:
        subprocess.TimeoutExpired: the command ran longer than 120 seconds.
    """
    # SECURITY NOTE(review): `cmd` comes from the archive's backup_info.json
    # and is executed through the shell. This is only safe for archives from
    # a trusted source; confirm that assumption holds for every caller.
    result = subprocess.run(
        cmd, shell=True, capture_output=True, text=True, timeout=120
    )
    out = (result.stdout + result.stderr).strip()
    status = "✓" if result.returncode == 0 else "✗"
    log_lines.append(f"{status} {cmd}" + (f"\n {out}" if out else ""))


# ---------------------------------------------------------------------------
# Utilities
# ---------------------------------------------------------------------------

def _slugify(s):
    """Lower-case ``s`` and collapse every run of non ``[a-z0-9]`` to ``-``."""
    return re.sub(r'[^a-z0-9]+', '-', s.lower().strip()).strip('-')


def _get_current_user():
    """Return the login name of the user running the process."""
    import getpass
    return getpass.getuser()


def _read_backup_info(archive_path):
    """Read ``backup_info.json`` from a tar archive.

    Returns:
        The decoded JSON object, or an empty dict when the archive cannot be
        opened, has no ``backup_info.json``, holds invalid JSON, or the JSON
        is not an object.
    """
    try:
        with tarfile.open(archive_path) as tar:
            member = tar.extractfile("backup_info.json")
            if member is None:
                return {}
            info = json.loads(member.read())
    except (OSError, KeyError, ValueError, EOFError, tarfile.TarError):
        # Best-effort read: an unreadable/corrupt archive, a missing member
        # (KeyError) or invalid JSON (ValueError) all mean "no metadata".
        return {}
    # Callers use .get() on the result, so anything but an object is unusable.
    return info if isinstance(info, dict) else {}


def read_backup_info_from_dir(archive_name, backup_dir):
    """Return the restore metadata of an archive (used by app.py for display).

    Args:
        archive_name: archive base name, without the ``.tar`` extension.
        backup_dir: directory containing the archive.

    Returns:
        The ``backup_info.json`` content as a dict, or ``{}`` if unavailable.
    """
    archive_path = os.path.join(backup_dir, archive_name + ".tar")
    return _read_backup_info(archive_path)