import glob import hashlib import json import logging import math import os import shutil import subprocess import threading import uuid from datetime import datetime from flask import ( Flask, flash, jsonify, redirect, render_template, request, url_for, ) from werkzeug.middleware.proxy_fix import ProxyFix # --- Configuration ----------------------------------------------------------- _config_path = os.environ.get( "BACKUPMANAGER_CONFIG", os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.py"), ) app = Flask(__name__) app.config.from_pyfile(_config_path) app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///" + app.config["DB_PATH"] app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False # Proxy headers Nginx → Flask (sous-chemin + HTTPS) app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1) # Filtre Jinja2 pour désérialiser du JSON dans les templates app.jinja_env.filters["fromjson"] = json.loads # Logging os.makedirs(os.path.dirname(app.config["LOG_PATH"]), exist_ok=True) logging.basicConfig( filename=app.config["LOG_PATH"], level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", ) # --- Extensions -------------------------------------------------------------- from db import db, Job, Run, Destination, Setting, RemoteInstance, RemoteRun, Upload db.init_app(app) from scheduler import init_scheduler, schedule_job, remove_job # --- Démarrage --------------------------------------------------------------- with app.app_context(): db.create_all() init_scheduler(app) for _job in Job.query.filter_by(enabled=True).all(): schedule_job(_job) # --- Auth API ---------------------------------------------------------------- @app.before_request def _check_api_auth(): if not request.path.startswith("/api/"): return if request.path == "/api/v1/health": return token = request.headers.get("X-BackupManager-Key", "") if token != app.config["API_TOKEN"]: return jsonify({"error": "Unauthorized"}), 401 # --- Context processors ------------------------------------------------------ @app.context_processor def _inject_globals(): return { "instance_name": app.config.get("INSTANCE_NAME", ""), "now": datetime.utcnow(), } # --- Helpers ----------------------------------------------------------------- def _read_archive_info(archive_name): backup_dir = app.config["YUNOHOST_BACKUP_DIR"] archive_path = os.path.join(backup_dir, archive_name + ".tar") from jobs.utils import sudo_read_backup_info info = sudo_read_backup_info(archive_path) if not info.get("type"): # Archives YunoHost natives : déterminer le type depuis la table Run run = Run.query.filter_by(archive_name=archive_name).first() if run: job = db.session.get(Job, run.job_id) if job: info["type"] = job.type info["_from_run"] = True return info def _get_ynh_apps(): try: result = subprocess.run( ["sudo", "yunohost", "app", "list", "--output-as", "json"], capture_output=True, text=True, timeout=15, ) if result.returncode == 0: return json.loads(result.stdout).get("apps", []) except Exception: pass return [] # --- Routes dashboard -------------------------------------------------------- @app.route("/") def index(): jobs = Job.query.order_by(Job.name).all() last_runs = { j.id: Run.query.filter_by(job_id=j.id).order_by(Run.started_at.desc()).first() for j in jobs } return render_template("dashboard_local.html", jobs=jobs, last_runs=last_runs) @app.route("/jobs/new", methods=["GET", "POST"]) def job_new(): if request.method == "POST": return _save_job(None) return render_template("job_form.html", job=None, ynh_apps=_get_ynh_apps(), destinations=Destination.query.filter_by(enabled=True).all()) @app.route("/jobs//edit", methods=["GET", "POST"]) def job_edit(job_id): job = db.get_or_404(Job, job_id) if request.method == "POST": return _save_job(job) return render_template("job_form.html", job=job, ynh_apps=_get_ynh_apps(), destinations=Destination.query.filter_by(enabled=True).all()) @app.route("/jobs//delete", methods=["POST"]) def job_delete(job_id): job = db.get_or_404(Job, job_id) remove_job(job.id) db.session.delete(job) db.session.commit() flash(f"Job « {job.name} » supprimé.", "success") return redirect(url_for("index")) @app.route("/jobs//run", methods=["POST"]) def job_run_now(job_id): job = db.get_or_404(Job, job_id) from scheduler import _execute_job import threading t = threading.Thread(target=_execute_job, args=(job.id,), daemon=True) t.start() flash(f"Job « {job.name} » lancé manuellement.", "success") return redirect(url_for("index")) @app.route("/jobs//history") def job_history(job_id): job = db.get_or_404(Job, job_id) runs = Run.query.filter_by(job_id=job_id).order_by(Run.started_at.desc()).limit(100).all() return render_template("job_history.html", job=job, runs=runs) def _do_restore_job(archive_name, archive_type, restore_run_id): """Exécute la restauration en arrière-plan et met à jour le Run.""" with app.app_context(): run = db.session.get(Run, restore_run_id) if restore_run_id else None try: backup_dir = app.config["YUNOHOST_BACKUP_DIR"] if archive_type == "custom_dir": from jobs.custom_dir import restore_custom_dir log = restore_custom_dir(archive_name, backup_dir) elif archive_type in ("mysql", "postgresql"): from jobs.db_dump import restore_db_dump log = restore_db_dump(archive_name, backup_dir) elif archive_type == "ynh_app": result = subprocess.run( ["sudo", "yunohost", "backup", "restore", archive_name, "--apps", "--force"], capture_output=True, text=True, timeout=3600, ) log = (result.stdout + result.stderr).strip() if result.returncode != 0: raise RuntimeError(f"yunohost backup restore a échoué :\n{log}") elif archive_type == "ynh_system": result = subprocess.run( ["sudo", "yunohost", "backup", "restore", archive_name, "--system", "--force"], capture_output=True, text=True, timeout=3600, ) log = (result.stdout + result.stderr).strip() if result.returncode != 0: raise RuntimeError(f"yunohost backup restore a échoué :\n{log}") else: raise NotImplementedError( f"Restauration non supportée pour le type '{archive_type}'." ) if run: run.status = "success" run.finished_at = datetime.utcnow() run.log_text = f"[RESTAURATION]\n{log or 'OK'}" db.session.commit() except Exception as exc: app.logger.error(f"Restauration {archive_name} échouée : {exc}") if run: run.status = "error" run.finished_at = datetime.utcnow() run.log_text = f"[RESTAURATION]\n{exc}" db.session.commit() def _start_restore(archive_name): """Crée un Run de restauration et lance le thread. Retourne (restore_run_id, archive_type).""" info = _read_archive_info(archive_name) archive_type = info.get("type", "") original_run = Run.query.filter_by(archive_name=archive_name).first() restore_run_id = None if original_run: restore_run = Run( job_id=original_run.job_id, started_at=datetime.utcnow(), status="running", archive_name=archive_name, log_text="[RESTAURATION en cours…]", ) db.session.add(restore_run) db.session.commit() restore_run_id = restore_run.id threading.Thread( target=_do_restore_job, args=(archive_name, archive_type, restore_run_id), daemon=True, ).start() return restore_run_id, archive_type @app.route("/archives//restore", methods=["GET", "POST"]) def archive_restore(archive_name): info = _read_archive_info(archive_name) if request.method == "GET": return render_template("restore_confirm.html", archive_name=archive_name, info=info) _start_restore(archive_name) flash(f"Restauration de « {archive_name} » démarrée en arrière-plan.", "success") return redirect(url_for("index")) @app.route("/jobs//toggle", methods=["POST"]) def job_toggle(job_id): job = db.get_or_404(Job, job_id) job.enabled = not job.enabled job.updated_at = datetime.utcnow() db.session.commit() if job.enabled: schedule_job(job) flash(f"Job « {job.name} » activé.", "success") else: remove_job(job.id) flash(f"Job « {job.name} » désactivé.", "info") return redirect(url_for("index")) def _save_job(job): f = request.form job_type = f.get("type", "") name = f.get("name", "").strip() if not name: flash("Le nom est requis.", "error") return render_template("job_form.html", job=job, ynh_apps=_get_ynh_apps(), destinations=Destination.query.filter_by(enabled=True).all()) cfg = {} if job_type == "ynh_app": cfg = {"app_id": f.get("app_id", ""), "core_only": f.get("core_only") == "1"} elif job_type == "ynh_system": cfg = {} elif job_type in ("mysql", "postgresql"): dbname = f.get("db_database", "").strip() if not dbname: flash("Le nom de la base de données est requis.", "error") return render_template("job_form.html", job=job, ynh_apps=_get_ynh_apps(), destinations=Destination.query.filter_by(enabled=True).all()) cfg = {"database": dbname} elif job_type == "custom_dir": source_path = f.get("source_path", "").strip().rstrip("/") if not source_path or not source_path.startswith("/"): flash("Le chemin source doit être un chemin absolu (ex: /opt/monapp).", "error") return render_template("job_form.html", job=job, ynh_apps=_get_ynh_apps(), destinations=Destination.query.filter_by(enabled=True).all()) excludes = [e.strip() for e in f.get("excludes", "").splitlines() if e.strip()] restore_cfg = {} user_name = f.get("restore_user_name", "").strip() if user_name: restore_cfg["system_user"] = { "name": user_name, "home": f.get("restore_user_home", source_path).strip() or source_path, "shell": f.get("restore_user_shell", "/bin/false").strip() or "/bin/false", } service_name = f.get("restore_service_name", "").strip() if service_name: restore_cfg["systemd_service"] = { "name": service_name, "service_file": f.get("restore_service_file", "").strip(), } owner = f.get("restore_perm_owner", "").strip() mode = f.get("restore_perm_mode", "").strip() if owner or mode: restore_cfg["permissions"] = {} if owner: restore_cfg["permissions"]["owner"] = owner if mode: restore_cfg["permissions"]["mode"] = mode post_cmds = [c.strip() for c in f.get("restore_post_cmds", "").splitlines() if c.strip()] if post_cmds: restore_cfg["post_restore_commands"] = post_cmds cfg = {"source_path": source_path, "excludes": excludes, "restore": restore_cfg} if job is None: job = Job() db.session.add(job) dest_id = f.get("destination_id", "").strip() job.name = name job.type = job_type job.config_json = json.dumps(cfg) job.cron_expr = f.get("cron_expr", "0 3 * * *").strip() job.retention_mode = f.get("retention_mode", "count") job.retention_value = int(f.get("retention_value", 7)) job.enabled = f.get("enabled") == "1" job.core_only = cfg.get("core_only", False) job.destination_id = int(dest_id) if dest_id else None job.updated_at = datetime.utcnow() db.session.commit() if job.enabled: schedule_job(job) else: remove_job(job.id) flash(f"Job « {job.name} » enregistré.", "success") return redirect(url_for("index")) # --- Destinations ------------------------------------------------------------ @app.route("/destinations") def destinations_list(): destinations = Destination.query.order_by(Destination.name).all() return render_template("destinations.html", destinations=destinations) @app.route("/destinations/new", methods=["GET", "POST"]) def destination_new(): if request.method == "POST": return _save_destination(None) return render_template("destination_form.html", dest=None) @app.route("/destinations//edit", methods=["GET", "POST"]) def destination_edit(dest_id): dest = db.get_or_404(Destination, dest_id) if request.method == "POST": return _save_destination(dest) pub_key = _get_pub_key(dest) return render_template("destination_form.html", dest=dest, pub_key=pub_key) @app.route("/destinations//delete", methods=["POST"]) def destination_delete(dest_id): dest = db.get_or_404(Destination, dest_id) db.session.delete(dest) db.session.commit() flash(f"Destination « {dest.name} » supprimée.", "success") return redirect(url_for("destinations_list")) @app.route("/destinations//test", methods=["POST"]) def destination_test(dest_id): dest = db.get_or_404(Destination, dest_id) from jobs.transfer import test_connection ok, msg = test_connection(dest, app.config["DATA_DIR"]) flash(msg, "success" if ok else "error") return redirect(url_for("destinations_list")) @app.route("/archives//transfer", methods=["POST"]) def archive_transfer(archive_name): dest_id = request.form.get("destination_id", type=int) dest = db.get_or_404(Destination, dest_id) def _do_transfer(): with app.app_context(): try: from jobs.transfer import transfer_archive transfer_archive(archive_name, dest, app.config["YUNOHOST_BACKUP_DIR"], app.config["DATA_DIR"]) app.logger.info(f"Transfert {archive_name} → {dest.remote_str} OK") except Exception as exc: app.logger.error(f"Transfert {archive_name} échoué : {exc}") import threading threading.Thread(target=_do_transfer, daemon=True).start() flash(f"Transfert de « {archive_name} » vers {dest.remote_str} démarré.", "success") return redirect(request.referrer or url_for("index")) def _save_destination(dest): f = request.form name = f.get("name", "").strip() host = f.get("host", "").strip() if not name or not host: flash("Nom et hôte sont requis.", "error") return render_template("destination_form.html", dest=dest) is_new = dest is None if is_new: dest = Destination() db.session.add(dest) dest.name = name dest.host = host dest.port = int(f.get("port", 22) or 22) dest.user = f.get("user", "root").strip() or "root" dest.remote_path = f.get("remote_path", "/home/yunohost.backup/archives").strip() dest.enabled = f.get("enabled") == "1" db.session.flush() # obtenir l'id si nouveau # Génération de la clé SSH si absente if not dest.key_name: from jobs.transfer import generate_key dest.key_name = generate_key(dest.name, app.config["DATA_DIR"]) db.session.commit() flash(f"Destination « {dest.name} » enregistrée.", "success") return redirect(url_for("destination_edit", dest_id=dest.id)) def _get_pub_key(dest): if not dest.key_name: return None from jobs.transfer import get_public_key return get_public_key(dest.key_name, app.config["DATA_DIR"]) # --- Paramètres -------------------------------------------------------------- _SETTING_KEYS = [ "smtp_host", "smtp_port", "smtp_user", "smtp_password", "smtp_from", "smtp_to", "smtp_tls", "smtp_ssl", "notify_on_success", "notify_on_error", ] def _get_setting(key, default=""): s = Setting.query.filter_by(key=key).first() return s.value if s else default @app.route("/settings", methods=["GET", "POST"]) def settings(): if request.method == "POST": action = request.form.get("action") if action == "test_smtp": from notifications import send_test_email try: send_test_email( host=request.form.get("smtp_host", "").strip(), port=int(request.form.get("smtp_port", 587) or 587), user=request.form.get("smtp_user", "").strip(), password=request.form.get("smtp_password", ""), from_addr=request.form.get("smtp_from", "").strip(), to_addr=request.form.get("smtp_to", "").strip(), use_ssl=request.form.get("smtp_ssl") == "1", use_tls=request.form.get("smtp_tls") == "1", ) flash("Email de test envoyé avec succès.", "success") except Exception as exc: flash(f"Échec du test SMTP : {exc}", "error") else: for key in _SETTING_KEYS: if key in ("smtp_tls", "smtp_ssl", "notify_on_success", "notify_on_error"): value = "1" if request.form.get(key) == "1" else "0" else: value = request.form.get(key, "").strip() s = Setting.query.filter_by(key=key).first() if s is None: s = Setting(key=key, value=value) db.session.add(s) else: s.value = value db.session.commit() flash("Paramètres enregistrés.", "success") return redirect(url_for("settings")) cfg = {k: _get_setting(k) for k in _SETTING_KEYS} cfg.setdefault("smtp_port", "587") cfg["smtp_tls"] = cfg.get("smtp_tls") or "1" cfg["smtp_ssl"] = cfg.get("smtp_ssl") or "0" cfg["notify_on_error"] = cfg.get("notify_on_error") or "1" api_token = app.config.get("API_TOKEN", "") instance_url = app.config.get("INSTANCE_URL", "") return render_template("settings.html", cfg=cfg, api_token=api_token, instance_url=instance_url) # --- Routes internes (usage formulaires) ------------------------------------- @app.route("/internal/databases/") def internal_databases(db_type): """Liste les bases de données disponibles pour le formulaire job.""" databases = [] try: if db_type == "mysql": result = subprocess.run( ["sudo", "mysql", "--skip-column-names", "-e", "SHOW DATABASES;"], capture_output=True, text=True, timeout=10, ) if result.returncode == 0: exclude = {"information_schema", "performance_schema", "mysql", "sys"} databases = [d.strip() for d in result.stdout.splitlines() if d.strip() and d.strip() not in exclude] elif db_type == "postgresql": result = subprocess.run( ["sudo", "-u", "postgres", "psql", "-Atc", "SELECT datname FROM pg_database WHERE datistemplate = false;"], capture_output=True, text=True, timeout=10, ) if result.returncode == 0: databases = [d.strip() for d in result.stdout.splitlines() if d.strip()] except Exception: pass return jsonify(databases) # --- API v1 ------------------------------------------------------------------ @app.route("/api/v1/health") def api_health(): return jsonify({"status": "ok", "instance": app.config.get("INSTANCE_NAME")}) @app.route("/api/v1/jobs") def api_jobs(): jobs = Job.query.all() return jsonify([ { "id": j.id, "name": j.name, "type": j.type, "cron_expr": j.cron_expr, "enabled": j.enabled, "retention_mode": j.retention_mode, "retention_value": j.retention_value, } for j in jobs ]) @app.route("/api/v1/jobs//runs") def api_job_runs(job_id): runs = Run.query.filter_by(job_id=job_id).order_by(Run.started_at.desc()).limit(50).all() return jsonify([ { "id": r.id, "started_at": r.started_at.isoformat() if r.started_at else None, "finished_at": r.finished_at.isoformat() if r.finished_at else None, "status": r.status, "archive_name": r.archive_name, "size_bytes": r.size_bytes, } for r in runs ]) @app.route("/api/v1/jobs//run", methods=["POST"]) def api_job_run(job_id): job = db.get_or_404(Job, job_id) from scheduler import _execute_job import threading threading.Thread(target=_execute_job, args=(job.id,), daemon=True).start() return jsonify({"status": "triggered", "job_id": job_id}) @app.route("/api/v1/archives") def api_archives(): backup_dir = app.config["YUNOHOST_BACKUP_DIR"] archives = [] try: from jobs.utils import sudo_listdir, sudo_getsize, sudo_getmtime for fname in sorted(sudo_listdir(backup_dir)): if fname.endswith(".tar"): path = os.path.join(backup_dir, fname) archives.append({ "name": fname[:-4], "size_bytes": sudo_getsize(path), "modified_at": datetime.utcfromtimestamp(sudo_getmtime(path)).isoformat(), }) except OSError: pass return jsonify(archives) @app.route("/api/v1/archives/", methods=["DELETE"]) def api_archive_delete(name): backup_dir = app.config["YUNOHOST_BACKUP_DIR"] from jobs.utils import sudo_exists for ext in (".tar", ".info.json"): path = os.path.join(backup_dir, name + ext) if sudo_exists(path): subprocess.run(["sudo", "rm", "-f", path], capture_output=True) return jsonify({"status": "deleted", "name": name}) @app.route("/api/v1/archives//info") def api_archive_info(name): return jsonify(_read_archive_info(name)) @app.route("/api/v1/archives//restore", methods=["POST"]) def api_archive_restore(name): restore_run_id, _ = _start_restore(name) return jsonify({"status": "started", "run_id": restore_run_id}) @app.route("/api/v1/archives//restore/status") def api_archive_restore_status(name): run = (Run.query .filter(Run.archive_name == name, Run.log_text.like("[RESTAURATION%")) .order_by(Run.started_at.desc()) .first()) if not run: return jsonify({"error": "Aucune restauration trouvée pour cette archive."}), 404 return jsonify({ "status": run.status, "log": run.log_text, "started_at": run.started_at.isoformat() if run.started_at else None, "finished_at": run.finished_at.isoformat() if run.finished_at else None, }) @app.route("/api/v1/summary") def api_summary(): jobs = Job.query.all() result = [] for job in jobs: last_run = (Run.query.filter_by(job_id=job.id) .order_by(Run.started_at.desc()).first()) result.append({ "id": job.id, "name": job.name, "type": job.type, "cron_expr": job.cron_expr, "enabled": job.enabled, "last_run": { "id": last_run.id, "started_at": last_run.started_at.isoformat() if last_run.started_at else None, "status": last_run.status, "archive_name": last_run.archive_name, "size_bytes": last_run.size_bytes, } if last_run else None, }) return jsonify({"instance": app.config.get("INSTANCE_NAME"), "jobs": result}) # --- Upload chunked ----------------------------------------------------------- @app.route("/api/v1/archives/upload/start", methods=["POST"]) def api_upload_start(): data = request.get_json(force=True) or {} filename = data.get("filename", "") total_size = int(data.get("total_size", 0)) chunk_size = int(data.get("chunk_size", 50 * 1024 * 1024)) chunks_total = int(data.get("chunks_total", math.ceil(total_size / chunk_size) if chunk_size else 1)) checksum = data.get("checksum", "") if not filename: return jsonify({"error": "filename requis"}), 400 upload_id = str(uuid.uuid4()) upload = Upload( upload_id=upload_id, filename=filename, total_size=total_size, chunk_size=chunk_size, chunks_total=chunks_total, chunks_received=0, checksum=checksum, status="pending", ) db.session.add(upload) db.session.commit() return jsonify({"upload_id": upload_id, "chunks_total": chunks_total}) @app.route("/api/v1/archives/upload//chunk/", methods=["POST"]) def api_upload_chunk(upload_id, n): upload = db.get_or_404(Upload, upload_id) if upload.status == "complete": return jsonify({"error": "upload déjà terminé"}), 400 tmp_dir = os.path.join(app.config["DATA_DIR"], "uploads", upload_id) os.makedirs(tmp_dir, exist_ok=True) chunk_path = os.path.join(tmp_dir, f"chunk_{n:06d}") with open(chunk_path, "wb") as f: f.write(request.data) upload.chunks_received = (upload.chunks_received or 0) + 1 upload.status = "in_progress" db.session.commit() return jsonify({"chunk": n, "received": upload.chunks_received}) @app.route("/api/v1/archives/upload//finish", methods=["POST"]) def api_upload_finish(upload_id): upload = db.get_or_404(Upload, upload_id) tmp_dir = os.path.join(app.config["DATA_DIR"], "uploads", upload_id) backup_dir = app.config["YUNOHOST_BACKUP_DIR"] chunk_files = sorted(glob.glob(os.path.join(tmp_dir, "chunk_*"))) if not chunk_files: return jsonify({"error": "aucun chunk reçu"}), 400 tmp_archive = os.path.join(tmp_dir, upload.filename) sha256 = hashlib.sha256() with open(tmp_archive, "wb") as out: for chunk_file in chunk_files: with open(chunk_file, "rb") as f: data = f.read() out.write(data) sha256.update(data) if upload.checksum and sha256.hexdigest() != upload.checksum: upload.status = "error" db.session.commit() shutil.rmtree(tmp_dir, ignore_errors=True) return jsonify({"error": "checksum invalide"}), 400 dest_path = os.path.join(backup_dir, upload.filename) result = subprocess.run( ["sudo", "rsync", tmp_archive, dest_path], capture_output=True, text=True, ) if result.returncode != 0: upload.status = "error" db.session.commit() shutil.rmtree(tmp_dir, ignore_errors=True) return jsonify({"error": result.stderr.strip()}), 500 # .info.json optionnel transmis dans le body JSON data = request.get_json(silent=True) or {} info_json_str = data.get("info_json") if info_json_str: archive_base = upload.filename[:-4] if upload.filename.endswith(".tar") else upload.filename tmp_info = os.path.join(tmp_dir, archive_base + ".info.json") with open(tmp_info, "w") as f: f.write(info_json_str) subprocess.run( ["sudo", "rsync", tmp_info, os.path.join(backup_dir, archive_base + ".info.json")], capture_output=True, ) shutil.rmtree(tmp_dir, ignore_errors=True) upload.status = "complete" db.session.commit() return jsonify({"status": "complete", "filename": upload.filename}) @app.route("/api/v1/archives//info-json-download") def api_archive_info_json_download(name): """Téléchargement du .info.json via sudo rsync (pour pull inter-instances).""" from jobs.utils import sudo_exists backup_dir = app.config["YUNOHOST_BACKUP_DIR"] info_path = os.path.join(backup_dir, name + ".info.json") if not sudo_exists(info_path): return jsonify({"error": "info.json introuvable"}), 404 tmp_path = f"/tmp/backupmanager_dl_{name}.info.json" content = None try: result = subprocess.run(["sudo", "rsync", info_path, tmp_path], capture_output=True, text=True) if result.returncode != 0: return jsonify({"error": result.stderr.strip()}), 500 with open(tmp_path, "rb") as f: content = f.read() except Exception as exc: return jsonify({"error": str(exc)}), 500 finally: subprocess.run(["sudo", "rm", "-rf", tmp_path], capture_output=True) from flask import Response as _R return _R(content, mimetype="application/json") @app.route("/api/v1/archives//download") def api_archive_download(name): """Téléchargement d'une archive via sudo rsync vers /tmp (pour pull inter-instances).""" from flask import Response, stream_with_context from jobs.utils import sudo_exists backup_dir = app.config["YUNOHOST_BACKUP_DIR"] archive_path = os.path.join(backup_dir, name + ".tar") if not sudo_exists(archive_path): return jsonify({"error": "archive introuvable"}), 404 tmp_path = f"/tmp/backupmanager_dl_{name}.tar" try: result = subprocess.run( ["sudo", "rsync", archive_path, tmp_path], capture_output=True, text=True, timeout=3600, ) if result.returncode != 0: return jsonify({"error": result.stderr.strip()}), 500 def stream_and_cleanup(): try: with open(tmp_path, "rb") as f: while True: chunk = f.read(1024 * 1024) if not chunk: break yield chunk finally: if os.path.exists(tmp_path): os.unlink(tmp_path) return Response( stream_with_context(stream_and_cleanup()), mimetype="application/octet-stream", headers={"Content-Disposition": f'attachment; filename="{name}.tar"'}, ) except Exception as exc: if os.path.exists(tmp_path): os.unlink(tmp_path) return jsonify({"error": str(exc)}), 500 @app.route("/api/v1/archives/upload/", methods=["DELETE"]) def api_upload_cancel(upload_id): upload = db.get_or_404(Upload, upload_id) tmp_dir = os.path.join(app.config["DATA_DIR"], "uploads", upload_id) shutil.rmtree(tmp_dir, ignore_errors=True) db.session.delete(upload) db.session.commit() return jsonify({"status": "cancelled"}) # --- Instances distantes (3B) ------------------------------------------------- @app.route("/remote-instances") def remote_instances_list(): instances = RemoteInstance.query.order_by(RemoteInstance.name).all() return render_template("remote_instances.html", instances=instances) @app.route("/remote-instances/new", methods=["GET", "POST"]) def remote_instance_new(): if request.method == "POST": return _save_remote_instance(None) return render_template("remote_instance_form.html", inst=None) @app.route("/remote-instances//edit", methods=["GET", "POST"]) def remote_instance_edit(inst_id): inst = db.get_or_404(RemoteInstance, inst_id) if request.method == "POST": return _save_remote_instance(inst) return render_template("remote_instance_form.html", inst=inst) @app.route("/remote-instances//delete", methods=["POST"]) def remote_instance_delete(inst_id): inst = db.get_or_404(RemoteInstance, inst_id) db.session.delete(inst) db.session.commit() flash(f"Instance « {inst.name} » supprimée.", "success") return redirect(url_for("remote_instances_list")) @app.route("/remote-instances//test", methods=["POST"]) def remote_instance_test(inst_id): inst = db.get_or_404(RemoteInstance, inst_id) from federation.client import FederationClient try: data = FederationClient(inst).health() inst.status = "online" inst.last_seen = datetime.utcnow() db.session.commit() flash(f"Instance « {inst.name} » en ligne — {data.get('instance', '?')}.", "success") except Exception as exc: inst.status = "error" db.session.commit() flash(f"Connexion échouée vers « {inst.name} » : {exc}", "error") return redirect(url_for("remote_instances_list")) @app.route("/remote-instances//sync", methods=["POST"]) def remote_instance_sync(inst_id): inst = db.get_or_404(RemoteInstance, inst_id) from federation.client import sync_instance try: sync_instance(inst) flash(f"Instance « {inst.name} » synchronisée.", "success") except Exception as exc: flash(f"Synchronisation échouée pour « {inst.name} » : {exc}", "error") return redirect(url_for("remote_instances_list")) @app.route("/network") def dashboard_network(): local_jobs = Job.query.order_by(Job.name).all() local_jobs_data = [] for job in local_jobs: run = Run.query.filter_by(job_id=job.id).order_by(Run.started_at.desc()).first() local_jobs_data.append(_JobRow( job_id=job.id, name=job.name, type=job.type, last_run_at=run.started_at if run else None, last_status=run.status if run else None, last_archive_name=run.archive_name if run else None, last_size_bytes=run.size_bytes if run else None, )) instances = RemoteInstance.query.order_by(RemoteInstance.name).all() return render_template("dashboard_network.html", local_jobs_data=local_jobs_data, instances=instances, instances_for_push=instances) @app.route("/network/sync-all", methods=["POST"]) def network_sync_all(): from federation.client import sync_instance instances = RemoteInstance.query.all() errors = [] for inst in instances: try: sync_instance(inst) except Exception as exc: errors.append(f"{inst.name}: {exc}") if errors: flash("Synchronisation partielle — " + " | ".join(errors), "error") else: flash(f"{len(instances)} instance(s) synchronisée(s).", "success") return redirect(url_for("dashboard_network")) @app.route("/remote-instances//run-job/", methods=["POST"]) def remote_job_run(inst_id, job_id): inst = db.get_or_404(RemoteInstance, inst_id) from federation.client import FederationClient try: FederationClient(inst).run_job(job_id) flash(f"Job déclenché sur « {inst.name} ».", "success") except Exception as exc: flash(f"Impossible de lancer le job sur « {inst.name} » : {exc}", "error") return redirect(url_for("dashboard_network")) @app.route("/archives//push/", methods=["POST"]) def archive_push(archive_name, inst_id): inst = db.get_or_404(RemoteInstance, inst_id) threading.Thread(target=_do_push_archive, args=(archive_name, inst.id), daemon=True).start() flash(f"Envoi de « {archive_name} » vers « {inst.name} » démarré en arrière-plan.", "success") return redirect(request.referrer or url_for("index")) @app.route("/remote-instances//pull-latest/", methods=["POST"]) def archive_pull_latest(inst_id, remote_job_id): inst = db.get_or_404(RemoteInstance, inst_id) threading.Thread(target=_do_pull_latest, args=(inst.id, remote_job_id), daemon=True).start() flash(f"Rapatriement depuis « {inst.name} » démarré en arrière-plan.", "success") return redirect(url_for("dashboard_network")) def _do_push_archive(archive_name, inst_id): """Pousse une archive locale vers une instance distante via HTTP chunked.""" import hashlib as _hashlib from federation.client import FederationClient from jobs.utils import sudo_exists with app.app_context(): inst = db.session.get(RemoteInstance, inst_id) backup_dir = app.config["YUNOHOST_BACKUP_DIR"] archive_path = os.path.join(backup_dir, archive_name + ".tar") tmp_path = None try: # Copie vers /tmp accessible par l'app tmp_path = f"/tmp/backupmanager_push_{archive_name}.tar" result = subprocess.run( ["sudo", "rsync", archive_path, tmp_path], capture_output=True, text=True, ) if result.returncode != 0: raise RuntimeError(f"Copie locale échouée : {result.stderr.strip()}") total_size = os.path.getsize(tmp_path) sha256 = _hashlib.sha256() chunk_size = 50 * 1024 * 1024 with open(tmp_path, "rb") as f: while True: data = f.read(65536) if not data: break sha256.update(data) checksum = sha256.hexdigest() client = FederationClient(inst) upload_info = client.upload_start(archive_name + ".tar", total_size, checksum, chunk_size) upload_id = upload_info["upload_id"] with open(tmp_path, "rb") as f: n = 0 while True: data = f.read(chunk_size) if not data: break client.upload_chunk(upload_id, n, data) n += 1 # Finish + transmettre le .info.json si présent info_json_content = None info_path = os.path.join(backup_dir, archive_name + ".info.json") if sudo_exists(info_path): r = subprocess.run(["sudo", "cat", info_path], capture_output=True) if r.returncode == 0: info_json_content = r.stdout.decode("utf-8", errors="replace") client.upload_finish_with_info(upload_id, info_json_content) app.logger.info(f"Push {archive_name} → {inst.name} OK") except Exception as exc: app.logger.error(f"Push {archive_name} → {inst.name} échoué : {exc}") finally: if tmp_path and os.path.exists(tmp_path): os.unlink(tmp_path) def _do_pull_latest(inst_id, remote_job_id): """Rapatrie la dernière archive d'un job distant (.tar + .info.json).""" from federation.client import FederationClient, sync_instance with app.app_context(): inst = db.session.get(RemoteInstance, inst_id) backup_dir = app.config["YUNOHOST_BACKUP_DIR"] try: client = FederationClient(inst) # Sync pour obtenir la dernière archive sync_instance(inst) db.session.refresh(inst) # Récupère le dernier run de ce job distant runs = client.get_job_runs(remote_job_id) if not runs: raise RuntimeError(f"Aucun run distant pour le job {remote_job_id}") archive_name = runs[0].get("archive_name") if not archive_name: raise RuntimeError("Le dernier run distant n'a pas d'archive.") # Télécharge le .tar archive_bytes = client.download_archive(archive_name) tmp_tar = f"/tmp/backupmanager_pull_{archive_name}.tar" with open(tmp_tar, "wb") as f: f.write(archive_bytes) subprocess.run(["sudo", "rsync", tmp_tar, os.path.join(backup_dir, archive_name + ".tar")], check=True) os.unlink(tmp_tar) # Télécharge le .info.json si disponible info_bytes = client.download_info_json(archive_name) if info_bytes: tmp_info = f"/tmp/backupmanager_pull_{archive_name}.info.json" with open(tmp_info, "wb") as f: f.write(info_bytes) subprocess.run(["sudo", "rsync", tmp_info, os.path.join(backup_dir, archive_name + ".info.json")], check=True) os.unlink(tmp_info) else: app.logger.warning(f"Pull {archive_name}: .info.json absent ou inaccessible sur {inst.name}") app.logger.info(f"Pull {archive_name} ← {inst.name} OK") except Exception as exc: app.logger.error(f"Pull ← {inst.name} échoué : {exc}") class _JobRow: """DTO pour le dashboard réseau (local et distant).""" def __init__(self, job_id, name, type, last_run_at, last_status, last_archive_name, last_size_bytes): self.job_id = job_id self.name = name self.type = type self.last_run_at = last_run_at self.last_status = last_status self.last_archive_name = last_archive_name self.last_size_bytes = last_size_bytes @property def size_human(self): from db import _size_human return _size_human(self.last_size_bytes) def _save_remote_instance(inst): f = request.form name = f.get("name", "").strip() url = f.get("url", "").strip().rstrip("/") api_key = f.get("api_key", "").strip() if not name or not url or not api_key: flash("Nom, URL et token API sont requis.", "error") return render_template("remote_instance_form.html", inst=inst) if inst is None: inst = RemoteInstance() db.session.add(inst) inst.name = name inst.url = url inst.api_key = api_key db.session.commit() flash(f"Instance « {inst.name} » enregistrée.", "success") return redirect(url_for("remote_instances_list"))