import hmac
import json
import logging
import os
import subprocess
import threading
from datetime import datetime

from flask import (
    Flask,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    url_for,
)
from werkzeug.middleware.proxy_fix import ProxyFix

# --- Configuration -----------------------------------------------------------

_config_path = os.environ.get(
    "BACKUPMANAGER_CONFIG",
    os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.py"),
)

app = Flask(__name__)
app.config.from_pyfile(_config_path)
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///" + app.config["DB_PATH"]
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False

# Trust the proxy headers set by Nginx (sub-path prefix + HTTPS).
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)

# Jinja2 filter used to deserialize JSON inside templates.
app.jinja_env.filters["fromjson"] = json.loads

# Logging. A bare file name has an empty dirname, and os.makedirs("") raises,
# so the directory is only created when LOG_PATH actually names one.
_log_dir = os.path.dirname(app.config["LOG_PATH"])
if _log_dir:
    os.makedirs(_log_dir, exist_ok=True)
logging.basicConfig(
    filename=app.config["LOG_PATH"],
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)

# --- Extensions --------------------------------------------------------------

# These imports intentionally stay below the app configuration, in the same
# order as before, so the statement order of the module is unchanged.
from db import db, Job, Run  # noqa: E402

db.init_app(app)

from scheduler import init_scheduler, schedule_job, remove_job  # noqa: E402

# --- Startup -----------------------------------------------------------------

with app.app_context():
    db.create_all()
    init_scheduler(app)
    for _job in Job.query.filter_by(enabled=True).all():
        schedule_job(_job)


# --- API auth ----------------------------------------------------------------

@app.before_request
def _check_api_auth():
    """Reject unauthenticated requests to ``/api/`` endpoints.

    Non-API paths and ``/api/v1/health`` pass through. Every other API
    request must carry the configured ``API_TOKEN`` in the
    ``X-BackupManager-Key`` header.

    Returns:
        ``None`` to let the request proceed, or a ``(response, 401)`` tuple.
    """
    if not request.path.startswith("/api/"):
        return None
    if request.path == "/api/v1/health":
        return None

    expected = app.config.get("API_TOKEN")
    supplied = request.headers.get("X-BackupManager-Key", "")

    # Fail closed: a missing or empty configured token must never match the
    # empty default of an absent header.
    if not isinstance(expected, str) or not expected:
        return jsonify({"error": "Unauthorized"}), 401

    # Constant-time comparison; operands are encoded because compare_digest
    # rejects non-ASCII str arguments.
    if not hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        return jsonify({"error": "Unauthorized"}), 401
    return None


# --- Context processors ------------------------------------------------------

@app.context_processor
def _inject_globals():
    """Expose ``instance_name`` and ``now`` (naive UTC) to every template."""
    return {
        "instance_name": app.config.get("INSTANCE_NAME", ""),
        "now": datetime.utcnow(),
    }


# --- Helpers -----------------------------------------------------------------

def _get_ynh_apps():
    """Return the ``apps`` list reported by ``yunohost app list``.

    Best-effort: any failure (command missing, timeout, non-zero exit,
    unparsable output) is logged and an empty list is returned so the job
    form can still be rendered.
    """
    try:
        result = subprocess.run(
            ["sudo", "yunohost", "app", "list", "--output-as", "json"],
            capture_output=True,
            text=True,
            timeout=15,
        )
        if result.returncode == 0:
            return json.loads(result.stdout).get("apps", [])
        app.logger.warning(
            "yunohost app list exited with code %s: %s",
            result.returncode,
            result.stderr.strip(),
        )
    except Exception:
        # Deliberately broad: this helper must never break page rendering,
        # but the failure is recorded instead of being silently dropped.
        app.logger.exception("Could not list YunoHost apps")
    return []


# --- Dashboard routes --------------------------------------------------------

@app.route("/")
def index():
    """Render the dashboard with every job and its most recent run."""
    jobs = Job.query.order_by(Job.name).all()
    # One query per job; maps job id -> latest Run (or None when never run).
    last_runs = {
        j.id: Run.query.filter_by(job_id=j.id).order_by(Run.started_at.desc()).first()
        for j in jobs
    }
    return render_template("dashboard_local.html", jobs=jobs, last_runs=last_runs)


@app.route("/jobs/new", methods=["GET", "POST"])
def job_new():
    """Show the empty job form (GET) or create a job from it (POST)."""
    if request.method == "POST":
        return _save_job(None)
    return render_template("job_form.html", job=None, ynh_apps=_get_ynh_apps())


@app.route("/jobs/<int:job_id>/edit", methods=["GET", "POST"])
def job_edit(job_id):
    """Show the form for an existing job (GET) or save its changes (POST)."""
    job = db.get_or_404(Job, job_id)
    if request.method == "POST":
        return _save_job(job)
    return render_template("job_form.html", job=job, ynh_apps=_get_ynh_apps())


@app.route("/jobs/<int:job_id>/delete", methods=["POST"])
def job_delete(job_id):
    """Unschedule and delete a job, then redirect to the dashboard."""
    job = db.get_or_404(Job, job_id)
    # Read the name before the row is deleted and the session committed.
    job_name = job.name
    remove_job(job.id)
    db.session.delete(job)
    db.session.commit()
    flash(f"Job « {job_name} » supprimé.", "success")
    return redirect(url_for("index"))


@app.route("/jobs/<int:job_id>/run", methods=["POST"])
def job_run_now(job_id):
    """Start a job immediately in a background daemon thread."""
    job = db.get_or_404(Job, job_id)
    from scheduler import _execute_job

    worker = threading.Thread(target=_execute_job, args=(job.id,), daemon=True)
    worker.start()
    flash(f"Job « {job.name} » lancé manuellement.", "success")
    return redirect(url_for("index"))


@app.route("/jobs/<int:job_id>/history")
def job_history(job_id):
    """Render the 100 most recent runs of a job, newest first."""
    job = db.get_or_404(Job, job_id)
    runs = (
        Run.query.filter_by(job_id=job_id)
        .order_by(Run.started_at.desc())
        .limit(100)
        .all()
    )
    return render_template("job_history.html", job=job, runs=runs)
@app.route("/jobs/<int:job_id>/toggle", methods=["POST"])
def job_toggle(job_id):
    """Flip a job's ``enabled`` flag and (un)schedule it accordingly."""
    job = db.get_or_404(Job, job_id)
    job.enabled = not job.enabled
    job.updated_at = datetime.utcnow()
    db.session.commit()
    if job.enabled:
        schedule_job(job)
        flash(f"Job « {job.name} » activé.", "success")
    else:
        remove_job(job.id)
        flash(f"Job « {job.name} » désactivé.", "info")
    return redirect(url_for("index"))


def _save_job(job):
    """Create or update a job from the submitted form.

    Args:
        job: The ``Job`` being edited, or ``None`` to create a new one.

    Returns:
        A redirect to the dashboard on success, or the re-rendered form with
        a flashed error when validation fails. All validation happens before
        any attribute of ``job`` is modified.
    """
    f = request.form

    def _form_error(message):
        # Flash the problem and show the form again without touching the job.
        flash(message, "error")
        return render_template("job_form.html", job=job, ynh_apps=_get_ynh_apps())

    job_type = f.get("type", "")
    name = f.get("name", "").strip()
    if not name:
        return _form_error("Le nom est requis.")

    cfg = {}
    if job_type == "ynh_app":
        cfg = {"app_id": f.get("app_id", ""), "core_only": f.get("core_only") == "1"}
    elif job_type in ("mysql", "postgresql"):
        dbname = f.get("db_database", "").strip()
        if not dbname:
            return _form_error("Le nom de la base de données est requis.")
        cfg = {"database": dbname}

    # A non-numeric value used to raise ValueError and surface as a 500.
    try:
        retention_value = int(f.get("retention_value", 7))
    except (TypeError, ValueError):
        return _form_error("La valeur de rétention doit être un nombre entier.")

    # A blank field falls back to the same default as a missing field.
    cron_expr = f.get("cron_expr", "").strip() or "0 3 * * *"

    if job is None:
        job = Job()
        db.session.add(job)

    job.name = name
    job.type = job_type
    job.config_json = json.dumps(cfg)
    job.cron_expr = cron_expr
    job.retention_mode = f.get("retention_mode", "count")
    job.retention_value = retention_value
    job.enabled = f.get("enabled") == "1"
    job.core_only = cfg.get("core_only", False)
    job.updated_at = datetime.utcnow()
    db.session.commit()

    if job.enabled:
        schedule_job(job)
    else:
        remove_job(job.id)

    flash(f"Job « {job.name} » enregistré.", "success")
    return redirect(url_for("index"))


# --- API v1 ------------------------------------------------------------------

@app.route("/api/v1/health")
def api_health():
    """Liveness endpoint; the API auth hook lets it through without a token."""
    return jsonify({"status": "ok", "instance": app.config.get("INSTANCE_NAME")})


@app.route("/api/v1/jobs")
def api_jobs():
    """Return every job as a JSON list of plain dictionaries."""
    jobs = Job.query.all()
    return jsonify([
        {
            "id": j.id,
            "name": j.name,
            "type": j.type,
            "cron_expr": j.cron_expr,
            "enabled": j.enabled,
            "retention_mode": j.retention_mode,
            "retention_value": j.retention_value,
        }
        for j in jobs
    ])


@app.route("/api/v1/jobs/<int:job_id>/runs")
def api_job_runs(job_id):
    """Return the 50 most recent runs of a job, newest first, as JSON."""
    runs = (
        Run.query.filter_by(job_id=job_id)
        .order_by(Run.started_at.desc())
        .limit(50)
        .all()
    )
    return jsonify([
        {
            "id": r.id,
            "started_at": r.started_at.isoformat() if r.started_at else None,
            "finished_at": r.finished_at.isoformat() if r.finished_at else None,
            "status": r.status,
            "archive_name": r.archive_name,
            "size_bytes": r.size_bytes,
        }
        for r in runs
    ])


@app.route("/api/v1/jobs/<int:job_id>/run", methods=["POST"])
def api_job_run(job_id):
    """Start a job immediately in a background daemon thread."""
    job = db.get_or_404(Job, job_id)
    from scheduler import _execute_job
    import threading

    threading.Thread(target=_execute_job, args=(job.id,), daemon=True).start()
    return jsonify({"status": "triggered", "job_id": job_id})


@app.route("/api/v1/archives")
def api_archives():
    """List the ``.tar`` archives in ``YUNOHOST_BACKUP_DIR`` as JSON.

    An unreadable directory yields an empty list. A file that disappears or
    cannot be stat'ed during the scan is skipped instead of cutting the
    listing short.
    """
    backup_dir = app.config["YUNOHOST_BACKUP_DIR"]
    try:
        filenames = sorted(os.listdir(backup_dir))
    except OSError:
        return jsonify([])

    archives = []
    for fname in filenames:
        if not fname.endswith(".tar"):
            continue
        try:
            info = os.stat(os.path.join(backup_dir, fname))
        except OSError:
            continue
        archives.append({
            "name": fname[:-4],
            "size_bytes": info.st_size,
            "modified_at": datetime.utcfromtimestamp(info.st_mtime).isoformat(),
        })
    return jsonify(archives)


@app.route("/api/v1/archives/<name>", methods=["DELETE"])
def api_archive_delete(name):
    """Delete an archive's ``.tar`` and ``.info.json`` files.

    The call is idempotent: files that do not exist are ignored and the
    response is the same either way. Names that are not a plain file name
    are rejected with 400 so the request cannot reach outside the backup
    directory.
    """
    if name in (".", "..") or os.path.basename(name) != name:
        return jsonify({"error": "Invalid archive name"}), 400

    backup_dir = app.config["YUNOHOST_BACKUP_DIR"]
    for ext in (".tar", ".info.json"):
        try:
            os.remove(os.path.join(backup_dir, name + ext))
        except FileNotFoundError:
            # Already gone; removing directly avoids an exists/remove race.
            pass
    return jsonify({"status": "deleted", "name": name})