"""Flask blueprint for the local dashboard: job CRUD, archive browser, restores."""

import json
import os
import subprocess
import threading
from datetime import datetime

from flask import (
    Blueprint,
    Response,
    current_app,
    flash,
    redirect,
    render_template,
    request,
    stream_with_context,
    url_for,
)

from db import db, Job, Run, Destination, RemoteInstance
from helpers import read_archive_info, get_ynh_apps

bp = Blueprint("jobs", __name__)

# Maps a YunoHost archive type to the scope flag given to `yunohost backup restore`.
_YUNOHOST_RESTORE_FLAGS = {"ynh_app": "--apps", "ynh_system": "--system"}

# Size of the chunks streamed to the client when downloading an archive.
_DOWNLOAD_CHUNK_SIZE = 1024 * 1024


# --- Local dashboard ----------------------------------------------------------

@bp.route("/")
def index():
    """Render the local dashboard: all jobs, each with its most recent run."""
    jobs = Job.query.order_by(Job.name).all()
    # One query per job; the most recently started run wins.
    last_runs = {
        j.id: Run.query.filter_by(job_id=j.id).order_by(Run.started_at.desc()).first()
        for j in jobs
    }
    instances = RemoteInstance.query.order_by(RemoteInstance.name).all()
    return render_template(
        "dashboard_local.html", jobs=jobs, last_runs=last_runs, instances=instances
    )


# --- Jobs CRUD ----------------------------------------------------------------

def _used_app_ids(exclude_job_id=None):
    """Return the app_id values already covered by an existing ynh_app job.

    Args:
        exclude_job_id: id of a job to leave out of the scan (the job being
            edited), or None to scan every ynh_app job.
    """
    query = Job.query.filter_by(type="ynh_app")
    if exclude_job_id is not None:
        query = query.filter(Job.id != exclude_job_id)
    return {
        json.loads(j.config_json).get("app_id")
        for j in query.all()
        if j.config_json
    }


def _render_job_form(job):
    """Render the job form for *job* (None when creating a new job)."""
    exclude_id = job.id if job is not None else None
    return render_template(
        "job_form.html",
        job=job,
        ynh_apps=get_ynh_apps(
            exclude_app_ids=_used_app_ids(exclude_job_id=exclude_id)
        ),
        destinations=Destination.query.filter_by(enabled=True).all(),
        remote_instances=RemoteInstance.query.order_by(RemoteInstance.name).all(),
    )


@bp.route("/jobs/new", methods=["GET", "POST"])
def job_new():
    """Show the empty job form (GET) or create a job from it (POST)."""
    if request.method == "POST":
        return _save_job(None)
    return _render_job_form(None)


@bp.route("/jobs/<int:job_id>/edit", methods=["GET", "POST"])
def job_edit(job_id):
    """Show the form for an existing job (GET) or save its changes (POST)."""
    job = db.get_or_404(Job, job_id)
    if request.method == "POST":
        return _save_job(job)
    return _render_job_form(job)


@bp.route("/jobs/<int:job_id>/delete", methods=["POST"])
def job_delete(job_id):
    """Unschedule and delete a job, then go back to the dashboard."""
    job = db.get_or_404(Job, job_id)
    from scheduler import remove_job

    # Capture the name before the row is deleted; the instance should not be
    # relied upon once the delete has been committed.
    name = job.name
    remove_job(job.id)
    db.session.delete(job)
    db.session.commit()
    flash(f"Job « {name} » supprimé.", "success")
    return redirect(url_for("jobs.index"))


@bp.route("/jobs/<int:job_id>/run", methods=["POST"])
def job_run_now(job_id):
    """Start a job immediately in a background daemon thread."""
    job = db.get_or_404(Job, job_id)
    from scheduler import _execute_job

    # NOTE(review): _execute_job only receives the job id, so it presumably
    # sets up its own application context — confirm in scheduler.
    threading.Thread(target=_execute_job, args=(job.id,), daemon=True).start()
    flash(f"Job « {job.name} » lancé manuellement.", "success")
    return redirect(url_for("jobs.index"))


@bp.route("/jobs/bulk", methods=["POST"])
def jobs_bulk():
    """Apply the posted `action` (run/enable/disable/delete) to several jobs.

    Ids that are not decimal numbers or do not match an existing job are
    ignored; the flashed counts only include jobs that were actually found.
    An unknown action is a no-op.
    """
    action = request.form.get("action")
    # isdecimal() (unlike isdigit()) only accepts what int() can parse;
    # dict.fromkeys drops duplicate ids while keeping their order.
    job_ids = list(dict.fromkeys(
        int(jid) for jid in request.form.getlist("job_ids") if jid.isdecimal()
    ))
    if not job_ids:
        return redirect(url_for("jobs.index"))

    from scheduler import schedule_job, remove_job, _execute_job

    found = (db.session.get(Job, jid) for jid in job_ids)
    jobs = [job for job in found if job is not None]

    if action == "run":
        for job in jobs:
            threading.Thread(
                target=_execute_job, args=(job.id,), daemon=True
            ).start()
        flash(f"{len(jobs)} job(s) lancé(s) en arrière-plan.", "info")

    elif action == "enable":
        for job in jobs:
            job.enabled = True
            job.updated_at = datetime.utcnow()
            schedule_job(job)
        db.session.commit()
        flash(f"{len(jobs)} job(s) activé(s).", "success")

    elif action == "disable":
        for job in jobs:
            job.enabled = False
            job.updated_at = datetime.utcnow()
            remove_job(job.id)
        db.session.commit()
        flash(f"{len(jobs)} job(s) désactivé(s).", "info")

    elif action == "delete":
        for job in jobs:
            remove_job(job.id)
            db.session.delete(job)
        db.session.commit()
        flash(f"{len(jobs)} job(s) supprimé(s).", "success")

    return redirect(url_for("jobs.index"))


@bp.route("/jobs/<int:job_id>/toggle", methods=["POST"])
def job_toggle(job_id):
    """Flip a job's enabled flag and (un)schedule it accordingly."""
    job = db.get_or_404(Job, job_id)
    from scheduler import schedule_job, remove_job

    job.enabled = not job.enabled
    job.updated_at = datetime.utcnow()
    db.session.commit()
    if job.enabled:
        schedule_job(job)
        flash(f"Job « {job.name} » activé.", "success")
    else:
        remove_job(job.id)
        flash(f"Job « {job.name} » désactivé.", "info")
    return redirect(url_for("jobs.index"))


@bp.route("/jobs/<int:job_id>/history")
def job_history(job_id):
    """Show the 100 most recently started runs of a job."""
    job = db.get_or_404(Job, job_id)
    runs = (
        Run.query.filter_by(job_id=job_id)
        .order_by(Run.started_at.desc())
        .limit(100)
        .all()
    )
    return render_template("job_history.html", job=job, runs=runs)


# --- Archive browser ----------------------------------------------------------

def _is_safe_archive_name(archive_name):
    """Return True if *archive_name* is safe to embed in a path or command.

    The name comes from the URL and is passed to sudo commands, so reject
    anything that could leave the backup directory or be read as an option.
    """
    return (
        bool(archive_name)
        and archive_name not in (".", "..")
        and not archive_name.startswith("-")
        and "/" not in archive_name
        and "\x00" not in archive_name
    )


def _remove_tmp_file(tmp_path):
    """Best-effort removal of a temporary download copy (result is ignored)."""
    subprocess.run(["sudo", "rm", "-f", tmp_path], capture_output=True)


@bp.route("/archives")
def archives():
    """List the archives in the backup directory, newest first.

    Each archive is matched, by name, with the most recently started run that
    references it and with that run's job.
    """
    from jobs.utils import batch_list_archives
    from db import _size_human

    backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]

    # A single batched call yields sizes and mtimes for all archives.
    file_stats = batch_list_archives(backup_dir)
    sorted_names = sorted(
        file_stats, key=lambda n: file_stats[n]["mtime"], reverse=True
    )

    # Preload runs and jobs in one DB pass each. Runs are ordered newest
    # first, so the first run seen for an archive name is its latest one.
    runs_by_archive = {}
    for run in Run.query.order_by(Run.started_at.desc()).all():
        if run.archive_name and run.archive_name not in runs_by_archive:
            runs_by_archive[run.archive_name] = run
    jobs_by_id = {j.id: j for j in Job.query.all()}

    items = []
    for name in sorted_names:
        # A falsy size (0 / missing) is shown as unknown.
        size_bytes = file_stats[name]["size_bytes"] or None
        run = runs_by_archive.get(name)
        job = jobs_by_id.get(run.job_id) if run else None
        items.append({
            "name": name,
            "type": job.type if job else "",
            "job_name": job.name if job else "—",
            "job_id": job.id if job else None,
            "last_status": run.status if run else None,
            "run_at": run.started_at if run else None,
            "size_bytes": size_bytes,
            "size_human": _size_human(size_bytes) if size_bytes else "—",
        })

    instances = RemoteInstance.query.order_by(RemoteInstance.name).all()
    return render_template("archives.html", items=items, instances=instances)


@bp.route("/archives/<archive_name>/download")
def archive_download(archive_name):
    """Stream an archive's .tar file to the client.

    The archive is first copied with `sudo rsync` to a temporary file under
    /tmp, which is streamed in chunks and removed once streaming ends.
    """
    if not _is_safe_archive_name(archive_name):
        flash("Nom d'archive invalide.", "error")
        return redirect(url_for("jobs.archives"))

    backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
    archive_path = os.path.join(backup_dir, archive_name + ".tar")
    tmp_path = f"/tmp/backupmanager_webdl_{archive_name}.tar"

    try:
        r = subprocess.run(
            ["sudo", "rsync", archive_path, tmp_path],
            capture_output=True, text=True, timeout=3600,
        )
    except (OSError, subprocess.SubprocessError) as exc:
        # Covers a missing executable and the copy timing out.
        _remove_tmp_file(tmp_path)
        flash(f"Erreur : {exc}", "error")
        return redirect(url_for("jobs.archives"))

    if r.returncode != 0:
        _remove_tmp_file(tmp_path)
        flash(f"Téléchargement impossible : {r.stderr.strip()}", "error")
        return redirect(url_for("jobs.archives"))

    def _stream():
        # The finally clause removes the temporary copy whether the download
        # completes or the generator is closed early.
        try:
            with open(tmp_path, "rb") as f:
                for chunk in iter(lambda: f.read(_DOWNLOAD_CHUNK_SIZE), b""):
                    yield chunk
        finally:
            _remove_tmp_file(tmp_path)

    return Response(
        stream_with_context(_stream()),
        mimetype="application/octet-stream",
        headers={
            "Content-Disposition": f'attachment; filename="{archive_name}.tar"'
        },
    )


@bp.route("/archives/<archive_name>/delete", methods=["POST"])
def archive_delete(archive_name):
    """Delete an archive's .tar and .info.json files with `sudo rm -f`."""
    if not _is_safe_archive_name(archive_name):
        flash("Nom d'archive invalide.", "error")
        return redirect(url_for("jobs.archives"))

    backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
    tar_path = os.path.join(backup_dir, archive_name + ".tar")
    info_path = os.path.join(backup_dir, archive_name + ".info.json")
    result = subprocess.run(
        ["sudo", "rm", "-f", tar_path, info_path],
        capture_output=True, text=True,
    )
    # Only announce success when the command actually succeeded.
    if result.returncode != 0:
        flash(f"Suppression impossible : {result.stderr.strip()}", "error")
    else:
        flash(f"Archive « {archive_name} » supprimée.", "success")
    return redirect(url_for("jobs.archives"))


# --- Restore ------------------------------------------------------------------

@bp.route("/archives/<archive_name>/restore", methods=["GET", "POST"])
def archive_restore(archive_name):
    """Show the restore confirmation page (GET) or start the restore (POST)."""
    if not _is_safe_archive_name(archive_name):
        flash("Nom d'archive invalide.", "error")
        return redirect(url_for("jobs.archives"))

    if request.method == "GET":
        backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
        info = read_archive_info(archive_name, backup_dir)
        return render_template(
            "restore_confirm.html", archive_name=archive_name, info=info
        )

    _start_restore(archive_name)
    flash(
        f"Restauration de « {archive_name} » démarrée en arrière-plan.",
        "success",
    )
    return redirect(url_for("jobs.index"))


def _start_restore(archive_name):
    """Create a restore Run and start the restore thread.

    A Run row is only created when an earlier run references this archive
    (its job id is reused); otherwise the restore runs without a Run record.

    Returns:
        (restore_run_id, archive_type); restore_run_id is None when no Run
        was created.
    """
    backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
    info = read_archive_info(archive_name, backup_dir)
    # NOTE(review): guards against read_archive_info returning None for a
    # missing archive — confirm its contract in helpers.
    archive_type = (info or {}).get("type", "")

    original_run = Run.query.filter_by(archive_name=archive_name).first()
    restore_run_id = None
    if original_run:
        restore_run = Run(
            job_id=original_run.job_id,
            started_at=datetime.utcnow(),
            status="running",
            archive_name=archive_name,
            log_text="[RESTAURATION en cours…]",
        )
        db.session.add(restore_run)
        db.session.commit()
        restore_run_id = restore_run.id

    # Hand the real app object (not the context-local proxy) to the thread.
    app = current_app._get_current_object()
    threading.Thread(
        target=_do_restore_job,
        args=(app, archive_name, archive_type, restore_run_id),
        daemon=True,
    ).start()
    return restore_run_id, archive_type


def _run_yunohost_restore(archive_name, scope_flag):
    """Run `yunohost backup restore` and return its combined output.

    Raises:
        RuntimeError: if the command exits with a non-zero status.
        subprocess.TimeoutExpired: if it runs for more than one hour.
    """
    result = subprocess.run(
        ["sudo", "yunohost", "backup", "restore", archive_name,
         scope_flag, "--force"],
        capture_output=True, text=True, timeout=3600,
    )
    log = (result.stdout + result.stderr).strip()
    if result.returncode != 0:
        raise RuntimeError(f"yunohost backup restore a échoué :\n{log}")
    return log


def _do_restore_job(app, archive_name, archive_type, restore_run_id):
    """Thread target: restore an archive and record the outcome on its Run.

    Args:
        app: the Flask application, used to open an application context.
        archive_name: name of the archive to restore.
        archive_type: one of custom_dir, mysql, postgresql, ynh_app,
            ynh_system; any other value is recorded as an error.
        restore_run_id: id of the Run to update, or None to skip recording.
    """
    with app.app_context():
        run = db.session.get(Run, restore_run_id) if restore_run_id else None
        try:
            backup_dir = app.config["YUNOHOST_BACKUP_DIR"]
            if archive_type == "custom_dir":
                from jobs.custom_dir import restore_custom_dir
                log = restore_custom_dir(archive_name, backup_dir)
            elif archive_type in ("mysql", "postgresql"):
                from jobs.db_dump import restore_db_dump
                log = restore_db_dump(archive_name, backup_dir)
            elif archive_type in _YUNOHOST_RESTORE_FLAGS:
                log = _run_yunohost_restore(
                    archive_name, _YUNOHOST_RESTORE_FLAGS[archive_type]
                )
            else:
                raise NotImplementedError(
                    f"Restauration non supportée pour le type '{archive_type}'."
                )

            if run:
                run.status = "success"
                run.finished_at = datetime.utcnow()
                run.log_text = f"[RESTAURATION]\n{log or 'OK'}"
                db.session.commit()
        except Exception as exc:
            # Thread boundary: nothing upstream can handle the error, so log
            # it (with traceback) and record it on the Run.
            app.logger.exception(
                "Restauration %s échouée : %s", archive_name, exc
            )
            if run:
                # Leave any failed transaction behind so the status update
                # below can be committed.
                db.session.rollback()
                run.status = "error"
                run.finished_at = datetime.utcnow()
                run.log_text = f"[RESTAURATION]\n{exc}"
                db.session.commit()


# --- Save-job helpers ---------------------------------------------------------

def _build_restore_config(form, source_path):
    """Build the optional `restore` section of a custom_dir job config."""
    restore_cfg = {}

    user_name = form.get("restore_user_name", "").strip()
    if user_name:
        restore_cfg["system_user"] = {
            "name": user_name,
            "home": form.get("restore_user_home", source_path).strip()
            or source_path,
            "shell": form.get("restore_user_shell", "/bin/false").strip()
            or "/bin/false",
        }

    service_name = form.get("restore_service_name", "").strip()
    if service_name:
        restore_cfg["systemd_service"] = {
            "name": service_name,
            "service_file": form.get("restore_service_file", "").strip(),
        }

    owner = form.get("restore_perm_owner", "").strip()
    mode = form.get("restore_perm_mode", "").strip()
    if owner or mode:
        permissions = {}
        if owner:
            permissions["owner"] = owner
        if mode:
            permissions["mode"] = mode
        restore_cfg["permissions"] = permissions

    post_cmds = [
        c.strip()
        for c in form.get("restore_post_cmds", "").splitlines()
        if c.strip()
    ]
    if post_cmds:
        restore_cfg["post_restore_commands"] = post_cmds

    return restore_cfg


def _build_job_config(job_type, form):
    """Build the type-specific config dict from the submitted form.

    Returns:
        (cfg, error): cfg is the config dict and error is None on success;
        on a validation failure cfg is None and error is the message to flash.
        Types without specific settings (e.g. ynh_system) yield an empty dict.
    """
    if job_type == "ynh_app":
        cfg = {
            "app_id": form.get("app_id", ""),
            "core_only": form.get("core_only") == "1",
        }
        return cfg, None

    if job_type in ("mysql", "postgresql"):
        dbname = form.get("db_database", "").strip()
        if not dbname:
            return None, "Le nom de la base de données est requis."
        return {"database": dbname}, None

    if job_type == "custom_dir":
        source_path = form.get("source_path", "").strip().rstrip("/")
        # An empty path (including "/" once its slash is stripped) fails too.
        if not source_path.startswith("/"):
            return None, (
                "Le chemin source doit être un chemin absolu (ex: /opt/monapp)."
            )
        excludes = [
            e.strip()
            for e in form.get("excludes", "").splitlines()
            if e.strip()
        ]
        cfg = {
            "source_path": source_path,
            "excludes": excludes,
            "restore": _build_restore_config(form, source_path),
        }
        return cfg, None

    return {}, None


def _parse_transfer_target(raw):
    """Split a `dest:<id>` / `inst:<id>` value into its two ids.

    Returns:
        (destination_id, remote_instance_id); both are None when *raw* has
        neither prefix.

    Raises:
        ValueError: if the part after the prefix is not an integer.
    """
    if raw.startswith("dest:"):
        return int(raw[5:]), None
    if raw.startswith("inst:"):
        return None, int(raw[5:])
    return None, None


def _save_job(job):
    """Create (job is None) or update a job from the submitted form.

    All form values are validated before anything is written; on a validation
    failure an error is flashed and the form is rendered again. On success the
    job is committed, (un)scheduled according to its enabled flag, and the
    user is redirected to the dashboard.
    """
    f = request.form
    job_type = f.get("type", "")

    name = f.get("name", "").strip()
    if not name:
        flash("Le nom est requis.", "error")
        return _render_job_form(job)

    cfg, error = _build_job_config(job_type, f)
    if error:
        flash(error, "error")
        return _render_job_form(job)

    # Parse the numeric fields up front so bad input produces a form error
    # instead of an unhandled ValueError.
    try:
        retention_value = int(f.get("retention_value", 7))
    except ValueError:
        flash("La valeur de rétention doit être un nombre entier.", "error")
        return _render_job_form(job)

    try:
        destination_id, remote_instance_id = _parse_transfer_target(
            f.get("transfer_target", "").strip()
        )
    except ValueError:
        flash("La cible de transfert est invalide.", "error")
        return _render_job_form(job)

    if job is None:
        job = Job()
        db.session.add(job)

    from scheduler import schedule_job, remove_job

    job.name = name
    job.type = job_type
    job.config_json = json.dumps(cfg)
    # "" means manual-only; kept as an empty string because the existing
    # schema declares the column NOT NULL.
    job.cron_expr = (f.get("cron_expr") or "").strip()
    job.retention_mode = f.get("retention_mode", "count")
    job.retention_value = retention_value
    job.enabled = f.get("enabled") == "1"
    job.core_only = cfg.get("core_only", False)
    job.destination_id = destination_id
    job.remote_instance_id = remote_instance_id
    job.updated_at = datetime.utcnow()
    db.session.commit()

    if job.enabled:
        schedule_job(job)
    else:
        remove_job(job.id)

    flash(f"Job « {job.name} » enregistré.", "success")
    return redirect(url_for("jobs.index"))