| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452 |
- import json
- import subprocess
- import threading
- from datetime import datetime
- from flask import (
- Blueprint,
- current_app,
- flash,
- redirect,
- render_template,
- request,
- url_for,
- )
- from db import db, Job, Run, Destination, RemoteInstance
- from helpers import read_archive_info, get_ynh_apps
- bp = Blueprint("jobs", __name__)
- # --- Dashboard local ----------------------------------------------------------
- @bp.route("/")
- def index():
- from db import RemoteInstance
- jobs = Job.query.order_by(Job.name).all()
- last_runs = {
- j.id: Run.query.filter_by(job_id=j.id).order_by(Run.started_at.desc()).first()
- for j in jobs
- }
- instances = RemoteInstance.query.order_by(RemoteInstance.name).all()
- return render_template("dashboard_local.html", jobs=jobs, last_runs=last_runs,
- instances=instances)
- # --- CRUD Jobs ----------------------------------------------------------------
- def _used_app_ids(exclude_job_id=None):
- """Retourne les app_id déjà couverts par un job ynh_app existant."""
- q = Job.query.filter_by(type="ynh_app")
- if exclude_job_id:
- q = q.filter(Job.id != exclude_job_id)
- return {json.loads(j.config_json).get("app_id") for j in q.all() if j.config_json}
- @bp.route("/jobs/new", methods=["GET", "POST"])
- def job_new():
- if request.method == "POST":
- return _save_job(None)
- return render_template("job_form.html", job=None,
- ynh_apps=get_ynh_apps(exclude_app_ids=_used_app_ids()),
- destinations=Destination.query.filter_by(enabled=True).all(),
- remote_instances=RemoteInstance.query.order_by(RemoteInstance.name).all())
- @bp.route("/jobs/<int:job_id>/edit", methods=["GET", "POST"])
- def job_edit(job_id):
- job = db.get_or_404(Job, job_id)
- if request.method == "POST":
- return _save_job(job)
- return render_template("job_form.html", job=job,
- ynh_apps=get_ynh_apps(exclude_app_ids=_used_app_ids(exclude_job_id=job_id)),
- destinations=Destination.query.filter_by(enabled=True).all(),
- remote_instances=RemoteInstance.query.order_by(RemoteInstance.name).all())
- @bp.route("/jobs/<int:job_id>/delete", methods=["POST"])
- def job_delete(job_id):
- job = db.get_or_404(Job, job_id)
- from scheduler import remove_job
- remove_job(job.id)
- db.session.delete(job)
- db.session.commit()
- flash(f"Job « {job.name} » supprimé.", "success")
- return redirect(url_for("jobs.index"))
- @bp.route("/jobs/<int:job_id>/run", methods=["POST"])
- def job_run_now(job_id):
- job = db.get_or_404(Job, job_id)
- from scheduler import _execute_job
- app = current_app._get_current_object()
- threading.Thread(target=_execute_job, args=(job.id,), daemon=True).start()
- flash(f"Job « {job.name} » lancé manuellement.", "success")
- return redirect(url_for("jobs.index"))
- @bp.route("/jobs/bulk", methods=["POST"])
- def jobs_bulk():
- action = request.form.get("action")
- job_ids = [int(jid) for jid in request.form.getlist("job_ids") if jid.isdigit()]
- if not job_ids:
- return redirect(url_for("jobs.index"))
- from scheduler import schedule_job, remove_job, _execute_job
- if action == "run":
- for jid in job_ids:
- job = db.session.get(Job, jid)
- if job:
- threading.Thread(target=_execute_job, args=(jid,), daemon=True).start()
- flash(f"{len(job_ids)} job(s) lancé(s) en arrière-plan.", "info")
- elif action == "enable":
- for jid in job_ids:
- job = db.session.get(Job, jid)
- if job:
- job.enabled = True
- job.updated_at = datetime.utcnow()
- schedule_job(job)
- db.session.commit()
- flash(f"{len(job_ids)} job(s) activé(s).", "success")
- elif action == "disable":
- for jid in job_ids:
- job = db.session.get(Job, jid)
- if job:
- job.enabled = False
- job.updated_at = datetime.utcnow()
- remove_job(jid)
- db.session.commit()
- flash(f"{len(job_ids)} job(s) désactivé(s).", "info")
- elif action == "delete":
- names = []
- for jid in job_ids:
- job = db.session.get(Job, jid)
- if job:
- names.append(job.name)
- remove_job(jid)
- db.session.delete(job)
- db.session.commit()
- flash(f"{len(names)} job(s) supprimé(s).", "success")
- return redirect(url_for("jobs.index"))
- @bp.route("/jobs/<int:job_id>/toggle", methods=["POST"])
- def job_toggle(job_id):
- job = db.get_or_404(Job, job_id)
- from scheduler import schedule_job, remove_job
- job.enabled = not job.enabled
- job.updated_at = datetime.utcnow()
- db.session.commit()
- if job.enabled:
- schedule_job(job)
- flash(f"Job « {job.name} » activé.", "success")
- else:
- remove_job(job.id)
- flash(f"Job « {job.name} » désactivé.", "info")
- return redirect(url_for("jobs.index"))
- @bp.route("/jobs/<int:job_id>/history")
- def job_history(job_id):
- job = db.get_or_404(Job, job_id)
- runs = Run.query.filter_by(job_id=job_id).order_by(Run.started_at.desc()).limit(100).all()
- return render_template("job_history.html", job=job, runs=runs)
- # --- Navigateur d'archives ----------------------------------------------------
- @bp.route("/archives")
- def archives():
- from jobs.utils import batch_list_archives
- from db import _size_human, RemoteInstance
- backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
- # UN seul appel sudo find pour toutes les tailles + mtimes
- file_stats = batch_list_archives(backup_dir)
- sorted_names = sorted(file_stats, key=lambda n: file_stats[n]["mtime"], reverse=True)
- # Pré-charger runs et jobs en une passe DB (pas de subprocess)
- runs_by_archive = {}
- for run in Run.query.order_by(Run.started_at.desc()).all():
- if run.archive_name and run.archive_name not in runs_by_archive:
- runs_by_archive[run.archive_name] = run
- jobs_by_id = {j.id: j for j in Job.query.all()}
- items = []
- for name in sorted_names:
- size_bytes = file_stats[name]["size_bytes"] or None
- run = runs_by_archive.get(name)
- job = jobs_by_id.get(run.job_id) if run else None
- items.append({
- "name": name,
- "type": job.type if job else "",
- "job_name": job.name if job else "—",
- "job_id": job.id if job else None,
- "last_status": run.status if run else None,
- "run_at": run.started_at if run else None,
- "size_bytes": size_bytes,
- "size_human": _size_human(size_bytes) if size_bytes else "—",
- })
- instances = RemoteInstance.query.order_by(RemoteInstance.name).all()
- return render_template("archives.html", items=items, instances=instances)
- @bp.route("/archives/<path:archive_name>/download")
- def archive_download(archive_name):
- import os, subprocess
- from flask import Response, stream_with_context
- backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
- archive_path = os.path.join(backup_dir, archive_name + ".tar")
- tmp_path = f"/tmp/backupmanager_webdl_{archive_name}.tar"
- try:
- r = subprocess.run(["sudo", "rsync", archive_path, tmp_path],
- capture_output=True, text=True, timeout=3600)
- if r.returncode != 0:
- flash(f"Téléchargement impossible : {r.stderr.strip()}", "error")
- return redirect(url_for("jobs.archives"))
- def _stream():
- try:
- with open(tmp_path, "rb") as f:
- while True:
- chunk = f.read(1024 * 1024)
- if not chunk:
- break
- yield chunk
- finally:
- subprocess.run(["sudo", "rm", "-rf", tmp_path], capture_output=True)
- return Response(
- stream_with_context(_stream()),
- mimetype="application/octet-stream",
- headers={"Content-Disposition": f'attachment; filename="{archive_name}.tar"'},
- )
- except Exception as exc:
- subprocess.run(["sudo", "rm", "-f", tmp_path], capture_output=True)
- flash(f"Erreur : {exc}", "error")
- return redirect(url_for("jobs.archives"))
- @bp.route("/archives/<path:archive_name>/delete", methods=["POST"])
- def archive_delete(archive_name):
- from jobs.utils import sudo_rm_archive
- backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
- sudo_rm_archive(archive_name, backup_dir)
- flash(f"Archive « {archive_name} » supprimée.", "success")
- return redirect(url_for("jobs.archives"))
- # --- Restauration -------------------------------------------------------------
- @bp.route("/archives/<path:archive_name>/restore", methods=["GET", "POST"])
- def archive_restore(archive_name):
- backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
- info = read_archive_info(archive_name, backup_dir)
- if request.method == "GET":
- return render_template("restore_confirm.html", archive_name=archive_name, info=info)
- _start_restore(archive_name)
- flash(f"Restauration de « {archive_name} » démarrée en arrière-plan.", "success")
- return redirect(url_for("jobs.index"))
- def _start_restore(archive_name):
- """Crée un Run de restauration et lance le thread. Retourne (restore_run_id, archive_type)."""
- backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
- info = read_archive_info(archive_name, backup_dir)
- archive_type = info.get("type", "")
- original_run = Run.query.filter_by(archive_name=archive_name).first()
- restore_run_id = None
- if original_run:
- restore_run = Run(
- job_id=original_run.job_id,
- started_at=datetime.utcnow(),
- status="running",
- archive_name=archive_name,
- log_text="[RESTAURATION en cours…]",
- )
- db.session.add(restore_run)
- db.session.commit()
- restore_run_id = restore_run.id
- app = current_app._get_current_object()
- threading.Thread(
- target=_do_restore_job,
- args=(app, archive_name, archive_type, restore_run_id),
- daemon=True,
- ).start()
- return restore_run_id, archive_type
- def _do_restore_job(app, archive_name, archive_type, restore_run_id):
- with app.app_context():
- run = db.session.get(Run, restore_run_id) if restore_run_id else None
- try:
- backup_dir = app.config["YUNOHOST_BACKUP_DIR"]
- if archive_type == "custom_dir":
- from jobs.custom_dir import restore_custom_dir
- log = restore_custom_dir(archive_name, backup_dir)
- elif archive_type in ("mysql", "postgresql"):
- from jobs.db_dump import restore_db_dump
- log = restore_db_dump(archive_name, backup_dir)
- elif archive_type == "ynh_app":
- result = subprocess.run(
- ["sudo", "yunohost", "backup", "restore", archive_name,
- "--apps", "--force"],
- capture_output=True, text=True, timeout=3600,
- )
- log = (result.stdout + result.stderr).strip()
- if result.returncode != 0:
- raise RuntimeError(f"yunohost backup restore a échoué :\n{log}")
- elif archive_type == "ynh_system":
- result = subprocess.run(
- ["sudo", "yunohost", "backup", "restore", archive_name,
- "--system", "--force"],
- capture_output=True, text=True, timeout=3600,
- )
- log = (result.stdout + result.stderr).strip()
- if result.returncode != 0:
- raise RuntimeError(f"yunohost backup restore a échoué :\n{log}")
- else:
- raise NotImplementedError(
- f"Restauration non supportée pour le type '{archive_type}'."
- )
- if run:
- run.status = "success"
- run.finished_at = datetime.utcnow()
- run.log_text = f"[RESTAURATION]\n{log or 'OK'}"
- db.session.commit()
- except Exception as exc:
- app.logger.error(f"Restauration {archive_name} échouée : {exc}")
- if run:
- run.status = "error"
- run.finished_at = datetime.utcnow()
- run.log_text = f"[RESTAURATION]\n{exc}"
- db.session.commit()
- # --- Helper save job ----------------------------------------------------------
- def _save_job(job):
- f = request.form
- job_type = f.get("type", "")
- name = f.get("name", "").strip()
- if not name:
- flash("Le nom est requis.", "error")
- return render_template("job_form.html", job=job, ynh_apps=get_ynh_apps(exclude_app_ids=_used_app_ids(exclude_job_id=job.id if job else None)),
- destinations=Destination.query.filter_by(enabled=True).all(),
- remote_instances=RemoteInstance.query.order_by(RemoteInstance.name).all())
- cfg = {}
- if job_type == "ynh_app":
- cfg = {"app_id": f.get("app_id", ""), "core_only": f.get("core_only") == "1"}
- elif job_type == "ynh_system":
- selected = f.getlist("system_hooks")
- all_hooks = {"conf_ynh_settings", "conf_ynh_firewall", "conf_ssowat", "conf_nginx",
- "conf_ynh_certs", "conf_ynh_domain", "conf_ynh_user", "data_home", "data_mail"}
- if not selected:
- flash("Sélectionnez au moins un hook système.", "error")
- return render_template("job_form.html", job=job,
- ynh_apps=get_ynh_apps(exclude_app_ids=_used_app_ids(exclude_job_id=job.id if job else None)),
- destinations=Destination.query.filter_by(enabled=True).all(),
- remote_instances=RemoteInstance.query.order_by(RemoteInstance.name).all())
- cfg = {"hooks": [] if set(selected) >= all_hooks else sorted(selected)}
- elif job_type in ("mysql", "postgresql"):
- dbname = f.get("db_database", "").strip()
- if not dbname:
- flash("Le nom de la base de données est requis.", "error")
- return render_template("job_form.html", job=job, ynh_apps=get_ynh_apps(exclude_app_ids=_used_app_ids(exclude_job_id=job.id if job else None)),
- destinations=Destination.query.filter_by(enabled=True).all(),
- remote_instances=RemoteInstance.query.order_by(RemoteInstance.name).all())
- cfg = {"database": dbname}
- elif job_type == "custom_dir":
- source_path = f.get("source_path", "").strip().rstrip("/")
- if not source_path or not source_path.startswith("/"):
- flash("Le chemin source doit être un chemin absolu (ex: /opt/monapp).", "error")
- return render_template("job_form.html", job=job, ynh_apps=get_ynh_apps(exclude_app_ids=_used_app_ids(exclude_job_id=job.id if job else None)),
- destinations=Destination.query.filter_by(enabled=True).all(),
- remote_instances=RemoteInstance.query.order_by(RemoteInstance.name).all())
- excludes = [e.strip() for e in f.get("excludes", "").splitlines() if e.strip()]
- restore_cfg = {}
- user_name = f.get("restore_user_name", "").strip()
- if user_name:
- restore_cfg["system_user"] = {
- "name": user_name,
- "home": f.get("restore_user_home", source_path).strip() or source_path,
- "shell": f.get("restore_user_shell", "/bin/false").strip() or "/bin/false",
- }
- service_name = f.get("restore_service_name", "").strip()
- if service_name:
- restore_cfg["systemd_service"] = {
- "name": service_name,
- "service_file": f.get("restore_service_file", "").strip(),
- }
- owner = f.get("restore_perm_owner", "").strip()
- mode = f.get("restore_perm_mode", "").strip()
- if owner or mode:
- restore_cfg["permissions"] = {}
- if owner:
- restore_cfg["permissions"]["owner"] = owner
- if mode:
- restore_cfg["permissions"]["mode"] = mode
- post_cmds = [c.strip() for c in f.get("restore_post_cmds", "").splitlines() if c.strip()]
- if post_cmds:
- restore_cfg["post_restore_commands"] = post_cmds
- cfg = {"source_path": source_path, "excludes": excludes, "restore": restore_cfg}
- if job is None:
- job = Job()
- db.session.add(job)
- from scheduler import schedule_job, remove_job
- transfer_target = f.get("transfer_target", "").strip()
- retention_mode = f.get("retention_mode", "count")
- if retention_mode == "gfs":
- gfs_cfg = {
- "daily": max(1, int(f.get("gfs_daily", 7))),
- "weekly": max(1, int(f.get("gfs_weekly", 4))),
- "monthly": max(1, int(f.get("gfs_monthly", 12))),
- }
- retention_value = 0
- gfs_config_json = json.dumps(gfs_cfg)
- else:
- gfs_config_json = None
- retention_value = int(f.get("retention_value", 2))
- job.name = name
- job.type = job_type
- job.config_json = json.dumps(cfg)
- cron_raw = (f.get("cron_expr") or "").strip()
- job.cron_expr = cron_raw # "" = manuel (NOT NULL compatible avec le schéma existant)
- job.retention_mode = retention_mode
- job.retention_value = retention_value
- job.retention_gfs_config = gfs_config_json
- job.enabled = f.get("enabled") == "1"
- job.core_only = cfg.get("core_only", False)
- if transfer_target.startswith("dest:"):
- job.destination_id = int(transfer_target[5:])
- job.remote_instance_id = None
- elif transfer_target.startswith("inst:"):
- job.destination_id = None
- job.remote_instance_id = int(transfer_target[5:])
- else:
- job.destination_id = None
- job.remote_instance_id = None
- job.updated_at = datetime.utcnow()
- db.session.commit()
- if job.enabled:
- schedule_job(job)
- else:
- remove_job(job.id)
- flash(f"Job « {job.name} » enregistré.", "success")
- return redirect(url_for("jobs.index"))
|