"""Flask entry point of the BackupManager web UI and REST API.

Importing this module builds the Flask application, configures logging,
initialises the database and the scheduler, and registers every HTML and
``/api/v1`` route.
"""
import hmac
import json
import logging
import os
import subprocess
import tarfile
import threading
from contextlib import suppress
from datetime import datetime

from flask import (
    Flask,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    url_for,
)
from werkzeug.middleware.proxy_fix import ProxyFix

# --- Configuration -----------------------------------------------------------

_config_path = os.environ.get(
    "BACKUPMANAGER_CONFIG",
    os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.py"),
)

app = Flask(__name__)
app.config.from_pyfile(_config_path)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///" + app.config["DB_PATH"]
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False

# Trust the proxy headers set by Nginx in front of Flask (sub-path + HTTPS).
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)

# Jinja2 filter used to deserialize JSON inside templates.
app.jinja_env.filters["fromjson"] = json.loads

# Logging. A LOG_PATH without a directory component has nothing to create,
# and os.makedirs("") would raise.
_log_dir = os.path.dirname(app.config["LOG_PATH"])
if _log_dir:
    os.makedirs(_log_dir, exist_ok=True)
logging.basicConfig(
    filename=app.config["LOG_PATH"],
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)

# --- Extensions --------------------------------------------------------------

from db import db, Job, Run, Destination, Setting

db.init_app(app)

from scheduler import init_scheduler, schedule_job, remove_job

# --- Startup -----------------------------------------------------------------

with app.app_context():
    db.create_all()
    init_scheduler(app)
    for _job in Job.query.filter_by(enabled=True).all():
        schedule_job(_job)


# --- API auth ----------------------------------------------------------------

@app.before_request
def _check_api_auth():
    """Reject ``/api/`` requests that do not carry the configured token.

    Non-API paths and ``/api/v1/health`` are let through. The token is read
    from the ``X-BackupManager-Key`` header and compared in constant time.
    An unset or empty ``API_TOKEN`` never matches, so a misconfigured
    instance does not expose the API to requests without a key.

    Returns:
        ``None`` to continue the request, or a ``(response, 401)`` tuple.
    """
    if not request.path.startswith("/api/"):
        return None
    if request.path == "/api/v1/health":
        return None

    expected = str(app.config.get("API_TOKEN") or "")
    provided = request.headers.get("X-BackupManager-Key", "")
    authorized = bool(expected) and hmac.compare_digest(
        provided.encode("utf-8"), expected.encode("utf-8")
    )
    if not authorized:
        return jsonify({"error": "Unauthorized"}), 401
    return None


# --- Context processors ------------------------------------------------------

@app.context_processor
def _inject_globals():
    """Expose ``instance_name`` and ``now`` (naive UTC) to every template."""
    return {
        "instance_name": app.config.get("INSTANCE_NAME", ""),
        "now": datetime.utcnow(),
    }


# --- Helpers -----------------------------------------------------------------

def _read_archive_info(archive_name):
    """Return the parsed ``backup_info.json`` stored in an archive.

    The archive is looked up as ``<YUNOHOST_BACKUP_DIR>/<archive_name>.tar``.
    This is best-effort: any failure (missing archive, missing member,
    invalid JSON, ...) yields an empty dict. Unexpected failures are logged.
    """
    backup_dir = app.config["YUNOHOST_BACKUP_DIR"]
    archive_path = os.path.join(backup_dir, archive_name + ".tar")
    try:
        with tarfile.open(archive_path) as tar:
            member = tar.extractfile("backup_info.json")
            if member:
                return json.loads(member.read())
    except KeyError:
        # The archive has no backup_info.json member: nothing to report.
        pass
    except Exception as exc:
        app.logger.warning(
            "Could not read backup_info.json from %s: %s", archive_path, exc
        )
    return {}


def _get_ynh_apps():
    """Return the ``apps`` list reported by ``yunohost app list``.

    Best-effort: returns an empty list when the command fails, times out
    (15 s) or prints something that cannot be parsed. Failures are logged.
    """
    try:
        result = subprocess.run(
            ["sudo", "yunohost", "app", "list", "--output-as", "json"],
            capture_output=True,
            text=True,
            timeout=15,
        )
        if result.returncode == 0:
            return json.loads(result.stdout).get("apps", [])
        app.logger.warning(
            "yunohost app list exited with code %s", result.returncode
        )
    except Exception as exc:
        app.logger.warning("Could not list YunoHost apps: %s", exc)
    return []


def _render_job_form(job):
    """Render the job form for ``job`` (``None`` means a new job)."""
    return render_template(
        "job_form.html",
        job=job,
        ynh_apps=_get_ynh_apps(),
        destinations=Destination.query.filter_by(enabled=True).all(),
    )


def _start_job_thread(job_id):
    """Run the scheduler's ``_execute_job(job_id)`` in a daemon thread."""
    from scheduler import _execute_job

    threading.Thread(target=_execute_job, args=(job_id,), daemon=True).start()


# --- Dashboard routes --------------------------------------------------------

@app.route("/")
def index():
    """Render the dashboard: every job with its most recent run."""
    jobs = Job.query.order_by(Job.name).all()
    last_runs = {
        j.id: Run.query.filter_by(job_id=j.id).order_by(Run.started_at.desc()).first()
        for j in jobs
    }
    return render_template("dashboard_local.html", jobs=jobs, last_runs=last_runs)


@app.route("/jobs/new", methods=["GET", "POST"])
def job_new():
    """Show the empty job form (GET) or create a job from it (POST)."""
    if request.method == "POST":
        return _save_job(None)
    return _render_job_form(None)


@app.route("/jobs/<int:job_id>/edit", methods=["GET", "POST"])
def job_edit(job_id):
    """Show the form for an existing job (GET) or update it (POST)."""
    job = db.get_or_404(Job, job_id)
    if request.method == "POST":
        return _save_job(job)
    return _render_job_form(job)


@app.route("/jobs/<int:job_id>/delete", methods=["POST"])
def job_delete(job_id):
    """Unschedule and delete a job, then go back to the dashboard."""
    job = db.get_or_404(Job, job_id)
    # Read the name before the row is deleted so the message never depends
    # on the state of a deleted instance.
    job_name = job.name
    remove_job(job.id)
    db.session.delete(job)
    db.session.commit()
    flash(f"Job « {job_name} » supprimé.", "success")
    return redirect(url_for("index"))


@app.route("/jobs/<int:job_id>/run", methods=["POST"])
def job_run_now(job_id):
    """Trigger a job immediately in a background thread."""
    job = db.get_or_404(Job, job_id)
    _start_job_thread(job.id)
    flash(f"Job « {job.name} » lancé manuellement.", "success")
    return redirect(url_for("index"))


@app.route("/jobs/<int:job_id>/history")
def job_history(job_id):
    """Render the 100 most recent runs of a job."""
    job = db.get_or_404(Job, job_id)
    runs = (
        Run.query.filter_by(job_id=job_id)
        .order_by(Run.started_at.desc())
        .limit(100)
        .all()
    )
    return render_template("job_history.html", job=job, runs=runs)


@app.route("/archives/<archive_name>/restore", methods=["GET", "POST"])
def archive_restore(archive_name):
    """Confirm (GET) or start (POST) the restoration of an archive.

    The archive type is read from the archive's ``backup_info.json``. Only
    ``custom_dir``, ``mysql`` and ``postgresql`` archives can be restored;
    any other type is refused up front instead of failing silently in the
    background thread.
    """
    info = _read_archive_info(archive_name)
    archive_type = info.get("type", "")

    if request.method == "GET":
        return render_template(
            "restore_confirm.html", archive_name=archive_name, info=info
        )

    if archive_type != "custom_dir" and archive_type not in ("mysql", "postgresql"):
        flash(f"Restauration non supportée pour le type '{archive_type}'.", "error")
        return redirect(url_for("index"))

    def _do_restore():
        # Runs in a worker thread, which needs its own application context.
        with app.app_context():
            try:
                backup_dir = app.config["YUNOHOST_BACKUP_DIR"]
                if archive_type == "custom_dir":
                    from jobs.custom_dir import restore_custom_dir

                    restore_custom_dir(archive_name, backup_dir)
                else:
                    from jobs.db_dump import restore_db_dump

                    restore_db_dump(archive_name, backup_dir)
            except Exception as exc:
                app.logger.error(f"Restauration {archive_name} échouée : {exc}")

    threading.Thread(target=_do_restore, daemon=True).start()
    flash(f"Restauration de « {archive_name} » démarrée en arrière-plan.", "success")
    return redirect(url_for("index"))


@app.route("/jobs/<int:job_id>/toggle", methods=["POST"])
def job_toggle(job_id):
    """Flip a job's ``enabled`` flag and (un)schedule it accordingly."""
    job = db.get_or_404(Job, job_id)
    job.enabled = not job.enabled
    job.updated_at = datetime.utcnow()
    db.session.commit()
    if job.enabled:
        schedule_job(job)
        flash(f"Job « {job.name} » activé.", "success")
    else:
        remove_job(job.id)
        flash(f"Job « {job.name} » désactivé.", "info")
    return redirect(url_for("index"))


def _build_restore_config(form, source_path):
    """Build the ``restore`` section of a ``custom_dir`` job from the form.

    Only the sub-sections whose key field is filled in are included.
    """
    restore_cfg = {}

    user_name = form.get("restore_user_name", "").strip()
    if user_name:
        restore_cfg["system_user"] = {
            "name": user_name,
            "home": form.get("restore_user_home", source_path).strip() or source_path,
            "shell": form.get("restore_user_shell", "/bin/false").strip() or "/bin/false",
        }

    service_name = form.get("restore_service_name", "").strip()
    if service_name:
        restore_cfg["systemd_service"] = {
            "name": service_name,
            "service_file": form.get("restore_service_file", "").strip(),
        }

    owner = form.get("restore_perm_owner", "").strip()
    mode = form.get("restore_perm_mode", "").strip()
    if owner or mode:
        restore_cfg["permissions"] = {}
        if owner:
            restore_cfg["permissions"]["owner"] = owner
        if mode:
            restore_cfg["permissions"]["mode"] = mode

    post_cmds = [
        c.strip() for c in form.get("restore_post_cmds", "").splitlines() if c.strip()
    ]
    if post_cmds:
        restore_cfg["post_restore_commands"] = post_cmds

    return restore_cfg


def _build_job_config(job_type, form):
    """Build the type-specific job configuration from the submitted form.

    Returns:
        A ``(cfg, error)`` pair. ``error`` is ``None`` on success, otherwise
        the message to show to the user (``cfg`` is then ``None``). Job types
        without specific settings get an empty configuration.
    """
    if job_type == "ynh_app":
        return {
            "app_id": form.get("app_id", ""),
            "core_only": form.get("core_only") == "1",
        }, None

    if job_type in ("mysql", "postgresql"):
        dbname = form.get("db_database", "").strip()
        if not dbname:
            return None, "Le nom de la base de données est requis."
        return {"database": dbname}, None

    if job_type == "custom_dir":
        source_path = form.get("source_path", "").strip().rstrip("/")
        if not source_path or not source_path.startswith("/"):
            return None, "Le chemin source doit être un chemin absolu (ex: /opt/monapp)."
        excludes = [
            e.strip() for e in form.get("excludes", "").splitlines() if e.strip()
        ]
        return {
            "source_path": source_path,
            "excludes": excludes,
            "restore": _build_restore_config(form, source_path),
        }, None

    return {}, None


def _save_job(job):
    """Create (``job is None``) or update a job from the submitted form.

    All form values are validated before anything is added to the session,
    so an invalid submission re-renders the form with an error message
    instead of failing with a server error.
    """
    form = request.form
    job_type = form.get("type", "")
    name = form.get("name", "").strip()
    if not name:
        flash("Le nom est requis.", "error")
        return _render_job_form(job)

    cfg, error = _build_job_config(job_type, form)
    if error:
        flash(error, "error")
        return _render_job_form(job)

    try:
        retention_value = int(form.get("retention_value", "").strip() or 7)
    except ValueError:
        flash("La valeur de rétention doit être un entier.", "error")
        return _render_job_form(job)

    dest_id = form.get("destination_id", "").strip()
    try:
        destination_id = int(dest_id) if dest_id else None
    except ValueError:
        flash("Destination invalide.", "error")
        return _render_job_form(job)

    if job is None:
        job = Job()
        db.session.add(job)

    job.name = name
    job.type = job_type
    job.config_json = json.dumps(cfg)
    job.cron_expr = form.get("cron_expr", "0 3 * * *").strip()
    job.retention_mode = form.get("retention_mode", "count")
    job.retention_value = retention_value
    job.enabled = form.get("enabled") == "1"
    job.core_only = cfg.get("core_only", False)
    job.destination_id = destination_id
    job.updated_at = datetime.utcnow()
    db.session.commit()

    if job.enabled:
        schedule_job(job)
    else:
        remove_job(job.id)

    flash(f"Job « {job.name} » enregistré.", "success")
    return redirect(url_for("index"))


# --- Destinations ------------------------------------------------------------

@app.route("/destinations")
def destinations_list():
    """Render every destination, ordered by name."""
    destinations = Destination.query.order_by(Destination.name).all()
    return render_template("destinations.html", destinations=destinations)


@app.route("/destinations/new", methods=["GET", "POST"])
def destination_new():
    """Show the empty destination form (GET) or create one (POST)."""
    if request.method == "POST":
        return _save_destination(None)
    return render_template("destination_form.html", dest=None)


@app.route("/destinations/<int:dest_id>/edit", methods=["GET", "POST"])
def destination_edit(dest_id):
    """Show a destination's form with its public key (GET) or update it (POST)."""
    dest = db.get_or_404(Destination, dest_id)
    if request.method == "POST":
        return _save_destination(dest)
    pub_key = _get_pub_key(dest)
    return render_template("destination_form.html", dest=dest, pub_key=pub_key)


@app.route("/destinations/<int:dest_id>/delete", methods=["POST"])
def destination_delete(dest_id):
    """Delete a destination and go back to the destination list."""
    dest = db.get_or_404(Destination, dest_id)
    # Read the name before the row is deleted (see job_delete).
    dest_name = dest.name
    db.session.delete(dest)
    db.session.commit()
    flash(f"Destination « {dest_name} » supprimée.", "success")
    return redirect(url_for("destinations_list"))


@app.route("/destinations/<int:dest_id>/test", methods=["POST"])
def destination_test(dest_id):
    """Test the connection to a destination and flash the outcome."""
    dest = db.get_or_404(Destination, dest_id)
    from jobs.transfer import test_connection

    ok, msg = test_connection(dest, app.config["DATA_DIR"])
    flash(msg, "success" if ok else "error")
    return redirect(url_for("destinations_list"))


@app.route("/archives/<archive_name>/transfer", methods=["POST"])
def archive_transfer(archive_name):
    """Start the transfer of an archive to a destination in the background."""
    dest_id = request.form.get("destination_id", type=int)
    dest = db.get_or_404(Destination, dest_id)
    # Resolved once, while the instance is still bound to the request's
    # session, and reused by both the flash message and the worker's log line.
    remote = dest.remote_str

    def _do_transfer():
        # Runs in a worker thread, which needs its own application context.
        with app.app_context():
            try:
                from jobs.transfer import transfer_archive

                transfer_archive(
                    archive_name,
                    dest,
                    app.config["YUNOHOST_BACKUP_DIR"],
                    app.config["DATA_DIR"],
                )
                app.logger.info(f"Transfert {archive_name} → {remote} OK")
            except Exception as exc:
                app.logger.error(f"Transfert {archive_name} échoué : {exc}")

    threading.Thread(target=_do_transfer, daemon=True).start()
    flash(f"Transfert de « {archive_name} » vers {remote} démarré.", "success")
    return redirect(request.referrer or url_for("index"))


def _save_destination(dest):
    """Create (``dest is None``) or update a destination from the form.

    The port is validated before anything is added to the session. An SSH
    key is generated the first time a destination without one is saved.
    """
    form = request.form
    name = form.get("name", "").strip()
    host = form.get("host", "").strip()
    if not name or not host:
        flash("Nom et hôte sont requis.", "error")
        return render_template("destination_form.html", dest=dest)

    try:
        port = int(form.get("port", "").strip() or 22)
    except ValueError:
        port = None
    if port is None or not 1 <= port <= 65535:
        flash("Le port doit être un entier entre 1 et 65535.", "error")
        return render_template("destination_form.html", dest=dest)

    if dest is None:
        dest = Destination()
        db.session.add(dest)

    dest.name = name
    dest.host = host
    dest.port = port
    dest.user = form.get("user", "root").strip() or "root"
    dest.remote_path = form.get(
        "remote_path", "/home/yunohost.backup/archives"
    ).strip()
    dest.enabled = form.get("enabled") == "1"
    db.session.flush()  # get the id when the destination is new

    # Generate the SSH key if the destination does not have one yet.
    if not dest.key_name:
        from jobs.transfer import generate_key

        dest.key_name = generate_key(dest.name, app.config["DATA_DIR"])

    db.session.commit()
    flash(f"Destination « {dest.name} » enregistrée.", "success")
    return redirect(url_for("destination_edit", dest_id=dest.id))


def _get_pub_key(dest):
    """Return the destination's public key, or ``None`` if it has no key."""
    if not dest.key_name:
        return None
    from jobs.transfer import get_public_key

    return get_public_key(dest.key_name, app.config["DATA_DIR"])


# --- Settings ----------------------------------------------------------------

_SETTING_KEYS = [
    "smtp_host",
    "smtp_port",
    "smtp_user",
    "smtp_password",
    "smtp_from",
    "smtp_to",
    "smtp_tls",
    "smtp_ssl",
    "notify_on_success",
    "notify_on_error",
]

# Settings submitted as checkboxes and stored as "1" / "0".
_BOOLEAN_SETTING_KEYS = frozenset(
    ("smtp_tls", "smtp_ssl", "notify_on_success", "notify_on_error")
)


def _get_setting(key, default=""):
    """Return the stored value of a setting, or ``default`` if it is unset."""
    s = Setting.query.filter_by(key=key).first()
    return s.value if s else default


@app.route("/settings", methods=["GET", "POST"])
def settings():
    """Show the settings page, save the settings, or send a test email.

    A POST with ``action=test_smtp`` sends a test email using the submitted
    (unsaved) SMTP values; any other POST stores every key of
    ``_SETTING_KEYS``. Both redirect back to the settings page.
    """
    if request.method == "POST":
        action = request.form.get("action")
        if action == "test_smtp":
            from notifications import send_test_email

            try:
                send_test_email(
                    host=request.form.get("smtp_host", "").strip(),
                    port=int(request.form.get("smtp_port", 587) or 587),
                    user=request.form.get("smtp_user", "").strip(),
                    password=request.form.get("smtp_password", ""),
                    from_addr=request.form.get("smtp_from", "").strip(),
                    to_addr=request.form.get("smtp_to", "").strip(),
                    use_ssl=request.form.get("smtp_ssl") == "1",
                    use_tls=request.form.get("smtp_tls") == "1",
                )
                flash("Email de test envoyé avec succès.", "success")
            except Exception as exc:
                flash(f"Échec du test SMTP : {exc}", "error")
        else:
            for key in _SETTING_KEYS:
                if key in _BOOLEAN_SETTING_KEYS:
                    value = "1" if request.form.get(key) == "1" else "0"
                else:
                    value = request.form.get(key, "").strip()
                s = Setting.query.filter_by(key=key).first()
                if s is None:
                    s = Setting(key=key, value=value)
                    db.session.add(s)
                else:
                    s.value = value
            db.session.commit()
            flash("Paramètres enregistrés.", "success")
        return redirect(url_for("settings"))

    cfg = {k: _get_setting(k) for k in _SETTING_KEYS}
    # Display defaults. Every key is present (unset ones are ""), so the
    # fallback has to test the value rather than the key's presence.
    cfg["smtp_port"] = cfg.get("smtp_port") or "587"
    cfg["smtp_tls"] = cfg.get("smtp_tls") or "1"
    cfg["smtp_ssl"] = cfg.get("smtp_ssl") or "0"
    cfg["notify_on_error"] = cfg.get("notify_on_error") or "1"
    return render_template("settings.html", cfg=cfg)


# --- API v1 ------------------------------------------------------------------

@app.route("/api/v1/health")
def api_health():
    """Unauthenticated liveness probe returning the instance name."""
    return jsonify({"status": "ok", "instance": app.config.get("INSTANCE_NAME")})


@app.route("/api/v1/jobs")
def api_jobs():
    """Return every job as a JSON list."""
    jobs = Job.query.all()
    return jsonify([
        {
            "id": j.id,
            "name": j.name,
            "type": j.type,
            "cron_expr": j.cron_expr,
            "enabled": j.enabled,
            "retention_mode": j.retention_mode,
            "retention_value": j.retention_value,
        }
        for j in jobs
    ])


@app.route("/api/v1/jobs/<int:job_id>/runs")
def api_job_runs(job_id):
    """Return the 50 most recent runs of a job as a JSON list."""
    runs = (
        Run.query.filter_by(job_id=job_id)
        .order_by(Run.started_at.desc())
        .limit(50)
        .all()
    )
    return jsonify([
        {
            "id": r.id,
            "started_at": r.started_at.isoformat() if r.started_at else None,
            "finished_at": r.finished_at.isoformat() if r.finished_at else None,
            "status": r.status,
            "archive_name": r.archive_name,
            "size_bytes": r.size_bytes,
        }
        for r in runs
    ])


@app.route("/api/v1/jobs/<int:job_id>/run", methods=["POST"])
def api_job_run(job_id):
    """Trigger a job immediately in a background thread."""
    job = db.get_or_404(Job, job_id)
    _start_job_thread(job.id)
    return jsonify({"status": "triggered", "job_id": job_id})


@app.route("/api/v1/archives")
def api_archives():
    """List the ``.tar`` archives of the backup directory as JSON.

    An unreadable backup directory yields the entries collected so far
    (usually an empty list) rather than an error.
    """
    backup_dir = app.config["YUNOHOST_BACKUP_DIR"]
    archives = []
    try:
        for fname in sorted(os.listdir(backup_dir)):
            if fname.endswith(".tar"):
                path = os.path.join(backup_dir, fname)
                archives.append({
                    "name": fname[:-4],
                    "size_bytes": os.path.getsize(path),
                    "modified_at": datetime.utcfromtimestamp(
                        os.path.getmtime(path)
                    ).isoformat(),
                })
    except OSError:
        pass
    return jsonify(archives)


@app.route("/api/v1/archives/<name>", methods=["DELETE"])
def api_archive_delete(name):
    """Delete an archive's ``.tar`` and ``.info.json`` files if they exist.

    Files that are already gone are ignored, so the call is idempotent.
    """
    backup_dir = app.config["YUNOHOST_BACKUP_DIR"]
    for ext in (".tar", ".info.json"):
        path = os.path.join(backup_dir, name + ext)
        # Remove-and-ignore avoids the race between an existence check and
        # the removal itself.
        with suppress(FileNotFoundError):
            os.remove(path)
    return jsonify({"status": "deleted", "name": name})