jobs.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. import json
  2. import subprocess
  3. import threading
  4. from datetime import datetime
  5. from flask import (
  6. Blueprint,
  7. current_app,
  8. flash,
  9. redirect,
  10. render_template,
  11. request,
  12. url_for,
  13. )
  14. from db import db, Job, Run, Destination
  15. from helpers import read_archive_info, get_ynh_apps
  16. bp = Blueprint("jobs", __name__)
  17. # --- Dashboard local ----------------------------------------------------------
  18. @bp.route("/")
  19. def index():
  20. from db import RemoteInstance
  21. jobs = Job.query.order_by(Job.name).all()
  22. last_runs = {
  23. j.id: Run.query.filter_by(job_id=j.id).order_by(Run.started_at.desc()).first()
  24. for j in jobs
  25. }
  26. instances = RemoteInstance.query.order_by(RemoteInstance.name).all()
  27. return render_template("dashboard_local.html", jobs=jobs, last_runs=last_runs,
  28. instances=instances)
  29. # --- CRUD Jobs ----------------------------------------------------------------
  30. @bp.route("/jobs/new", methods=["GET", "POST"])
  31. def job_new():
  32. if request.method == "POST":
  33. return _save_job(None)
  34. return render_template("job_form.html", job=None, ynh_apps=get_ynh_apps(),
  35. destinations=Destination.query.filter_by(enabled=True).all())
  36. @bp.route("/jobs/<int:job_id>/edit", methods=["GET", "POST"])
  37. def job_edit(job_id):
  38. job = db.get_or_404(Job, job_id)
  39. if request.method == "POST":
  40. return _save_job(job)
  41. return render_template("job_form.html", job=job, ynh_apps=get_ynh_apps(),
  42. destinations=Destination.query.filter_by(enabled=True).all())
  43. @bp.route("/jobs/<int:job_id>/delete", methods=["POST"])
  44. def job_delete(job_id):
  45. job = db.get_or_404(Job, job_id)
  46. from scheduler import remove_job
  47. remove_job(job.id)
  48. db.session.delete(job)
  49. db.session.commit()
  50. flash(f"Job « {job.name} » supprimé.", "success")
  51. return redirect(url_for("jobs.index"))
  52. @bp.route("/jobs/<int:job_id>/run", methods=["POST"])
  53. def job_run_now(job_id):
  54. job = db.get_or_404(Job, job_id)
  55. from scheduler import _execute_job
  56. app = current_app._get_current_object()
  57. threading.Thread(target=_execute_job, args=(job.id,), daemon=True).start()
  58. flash(f"Job « {job.name} » lancé manuellement.", "success")
  59. return redirect(url_for("jobs.index"))
  60. @bp.route("/jobs/<int:job_id>/toggle", methods=["POST"])
  61. def job_toggle(job_id):
  62. job = db.get_or_404(Job, job_id)
  63. from scheduler import schedule_job, remove_job
  64. job.enabled = not job.enabled
  65. job.updated_at = datetime.utcnow()
  66. db.session.commit()
  67. if job.enabled:
  68. schedule_job(job)
  69. flash(f"Job « {job.name} » activé.", "success")
  70. else:
  71. remove_job(job.id)
  72. flash(f"Job « {job.name} » désactivé.", "info")
  73. return redirect(url_for("jobs.index"))
  74. @bp.route("/jobs/<int:job_id>/history")
  75. def job_history(job_id):
  76. job = db.get_or_404(Job, job_id)
  77. runs = Run.query.filter_by(job_id=job_id).order_by(Run.started_at.desc()).limit(100).all()
  78. return render_template("job_history.html", job=job, runs=runs)
  79. # --- Navigateur d'archives ----------------------------------------------------
  80. @bp.route("/archives")
  81. def archives():
  82. import os
  83. from jobs.utils import sudo_listdir, sudo_getsize, sudo_getmtime
  84. from db import _size_human, RemoteInstance
  85. backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
  86. all_files = sudo_listdir(backup_dir)
  87. tar_names = [f[:-4] for f in all_files if f.endswith(".tar")]
  88. # Tri par mtime décroissant
  89. tar_names.sort(
  90. key=lambda n: sudo_getmtime(os.path.join(backup_dir, n + ".tar")),
  91. reverse=True,
  92. )
  93. items = []
  94. for name in tar_names:
  95. tar_path = os.path.join(backup_dir, name + ".tar")
  96. size_bytes = sudo_getsize(tar_path) or None
  97. run = Run.query.filter_by(archive_name=name).order_by(Run.started_at.desc()).first()
  98. job = db.session.get(Job, run.job_id) if run else None
  99. info = read_archive_info(name, backup_dir)
  100. arch_type = info.get("type") or (job.type if job else "")
  101. items.append({
  102. "name": name,
  103. "type": arch_type,
  104. "job_name": job.name if job else "—",
  105. "job_id": job.id if job else None,
  106. "last_status": run.status if run else None,
  107. "run_at": run.started_at if run else None,
  108. "size_bytes": size_bytes,
  109. "size_human": _size_human(size_bytes) if size_bytes else "—",
  110. })
  111. instances = RemoteInstance.query.order_by(RemoteInstance.name).all()
  112. return render_template("archives.html", items=items, instances=instances)
  113. @bp.route("/archives/<path:archive_name>/download")
  114. def archive_download(archive_name):
  115. import os, subprocess
  116. from flask import Response, stream_with_context
  117. backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
  118. archive_path = os.path.join(backup_dir, archive_name + ".tar")
  119. tmp_path = f"/tmp/backupmanager_webdl_{archive_name}.tar"
  120. try:
  121. r = subprocess.run(["sudo", "rsync", archive_path, tmp_path],
  122. capture_output=True, text=True, timeout=3600)
  123. if r.returncode != 0:
  124. flash(f"Téléchargement impossible : {r.stderr.strip()}", "error")
  125. return redirect(url_for("jobs.archives"))
  126. def _stream():
  127. try:
  128. with open(tmp_path, "rb") as f:
  129. while True:
  130. chunk = f.read(1024 * 1024)
  131. if not chunk:
  132. break
  133. yield chunk
  134. finally:
  135. subprocess.run(["sudo", "rm", "-f", tmp_path], capture_output=True)
  136. return Response(
  137. stream_with_context(_stream()),
  138. mimetype="application/octet-stream",
  139. headers={"Content-Disposition": f'attachment; filename="{archive_name}.tar"'},
  140. )
  141. except Exception as exc:
  142. subprocess.run(["sudo", "rm", "-f", tmp_path], capture_output=True)
  143. flash(f"Erreur : {exc}", "error")
  144. return redirect(url_for("jobs.archives"))
  145. @bp.route("/archives/<path:archive_name>/delete", methods=["POST"])
  146. def archive_delete(archive_name):
  147. import os, subprocess
  148. backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
  149. tar_path = os.path.join(backup_dir, archive_name + ".tar")
  150. info_path = os.path.join(backup_dir, archive_name + ".info.json")
  151. subprocess.run(["sudo", "rm", "-f", tar_path, info_path], capture_output=True)
  152. flash(f"Archive « {archive_name} » supprimée.", "success")
  153. return redirect(url_for("jobs.archives"))
  154. # --- Restauration -------------------------------------------------------------
  155. @bp.route("/archives/<path:archive_name>/restore", methods=["GET", "POST"])
  156. def archive_restore(archive_name):
  157. backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
  158. info = read_archive_info(archive_name, backup_dir)
  159. if request.method == "GET":
  160. return render_template("restore_confirm.html", archive_name=archive_name, info=info)
  161. _start_restore(archive_name)
  162. flash(f"Restauration de « {archive_name} » démarrée en arrière-plan.", "success")
  163. return redirect(url_for("jobs.index"))
  164. def _start_restore(archive_name):
  165. """Crée un Run de restauration et lance le thread. Retourne (restore_run_id, archive_type)."""
  166. backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
  167. info = read_archive_info(archive_name, backup_dir)
  168. archive_type = info.get("type", "")
  169. original_run = Run.query.filter_by(archive_name=archive_name).first()
  170. restore_run_id = None
  171. if original_run:
  172. restore_run = Run(
  173. job_id=original_run.job_id,
  174. started_at=datetime.utcnow(),
  175. status="running",
  176. archive_name=archive_name,
  177. log_text="[RESTAURATION en cours…]",
  178. )
  179. db.session.add(restore_run)
  180. db.session.commit()
  181. restore_run_id = restore_run.id
  182. app = current_app._get_current_object()
  183. threading.Thread(
  184. target=_do_restore_job,
  185. args=(app, archive_name, archive_type, restore_run_id),
  186. daemon=True,
  187. ).start()
  188. return restore_run_id, archive_type
  189. def _do_restore_job(app, archive_name, archive_type, restore_run_id):
  190. with app.app_context():
  191. run = db.session.get(Run, restore_run_id) if restore_run_id else None
  192. try:
  193. backup_dir = app.config["YUNOHOST_BACKUP_DIR"]
  194. if archive_type == "custom_dir":
  195. from jobs.custom_dir import restore_custom_dir
  196. log = restore_custom_dir(archive_name, backup_dir)
  197. elif archive_type in ("mysql", "postgresql"):
  198. from jobs.db_dump import restore_db_dump
  199. log = restore_db_dump(archive_name, backup_dir)
  200. elif archive_type == "ynh_app":
  201. result = subprocess.run(
  202. ["sudo", "yunohost", "backup", "restore", archive_name,
  203. "--apps", "--force"],
  204. capture_output=True, text=True, timeout=3600,
  205. )
  206. log = (result.stdout + result.stderr).strip()
  207. if result.returncode != 0:
  208. raise RuntimeError(f"yunohost backup restore a échoué :\n{log}")
  209. elif archive_type == "ynh_system":
  210. result = subprocess.run(
  211. ["sudo", "yunohost", "backup", "restore", archive_name,
  212. "--system", "--force"],
  213. capture_output=True, text=True, timeout=3600,
  214. )
  215. log = (result.stdout + result.stderr).strip()
  216. if result.returncode != 0:
  217. raise RuntimeError(f"yunohost backup restore a échoué :\n{log}")
  218. else:
  219. raise NotImplementedError(
  220. f"Restauration non supportée pour le type '{archive_type}'."
  221. )
  222. if run:
  223. run.status = "success"
  224. run.finished_at = datetime.utcnow()
  225. run.log_text = f"[RESTAURATION]\n{log or 'OK'}"
  226. db.session.commit()
  227. except Exception as exc:
  228. app.logger.error(f"Restauration {archive_name} échouée : {exc}")
  229. if run:
  230. run.status = "error"
  231. run.finished_at = datetime.utcnow()
  232. run.log_text = f"[RESTAURATION]\n{exc}"
  233. db.session.commit()
  234. # --- Helper save job ----------------------------------------------------------
  235. def _save_job(job):
  236. f = request.form
  237. job_type = f.get("type", "")
  238. name = f.get("name", "").strip()
  239. if not name:
  240. flash("Le nom est requis.", "error")
  241. return render_template("job_form.html", job=job, ynh_apps=get_ynh_apps(),
  242. destinations=Destination.query.filter_by(enabled=True).all())
  243. cfg = {}
  244. if job_type == "ynh_app":
  245. cfg = {"app_id": f.get("app_id", ""), "core_only": f.get("core_only") == "1"}
  246. elif job_type == "ynh_system":
  247. cfg = {}
  248. elif job_type in ("mysql", "postgresql"):
  249. dbname = f.get("db_database", "").strip()
  250. if not dbname:
  251. flash("Le nom de la base de données est requis.", "error")
  252. return render_template("job_form.html", job=job, ynh_apps=get_ynh_apps(),
  253. destinations=Destination.query.filter_by(enabled=True).all())
  254. cfg = {"database": dbname}
  255. elif job_type == "custom_dir":
  256. source_path = f.get("source_path", "").strip().rstrip("/")
  257. if not source_path or not source_path.startswith("/"):
  258. flash("Le chemin source doit être un chemin absolu (ex: /opt/monapp).", "error")
  259. return render_template("job_form.html", job=job, ynh_apps=get_ynh_apps(),
  260. destinations=Destination.query.filter_by(enabled=True).all())
  261. excludes = [e.strip() for e in f.get("excludes", "").splitlines() if e.strip()]
  262. restore_cfg = {}
  263. user_name = f.get("restore_user_name", "").strip()
  264. if user_name:
  265. restore_cfg["system_user"] = {
  266. "name": user_name,
  267. "home": f.get("restore_user_home", source_path).strip() or source_path,
  268. "shell": f.get("restore_user_shell", "/bin/false").strip() or "/bin/false",
  269. }
  270. service_name = f.get("restore_service_name", "").strip()
  271. if service_name:
  272. restore_cfg["systemd_service"] = {
  273. "name": service_name,
  274. "service_file": f.get("restore_service_file", "").strip(),
  275. }
  276. owner = f.get("restore_perm_owner", "").strip()
  277. mode = f.get("restore_perm_mode", "").strip()
  278. if owner or mode:
  279. restore_cfg["permissions"] = {}
  280. if owner:
  281. restore_cfg["permissions"]["owner"] = owner
  282. if mode:
  283. restore_cfg["permissions"]["mode"] = mode
  284. post_cmds = [c.strip() for c in f.get("restore_post_cmds", "").splitlines() if c.strip()]
  285. if post_cmds:
  286. restore_cfg["post_restore_commands"] = post_cmds
  287. cfg = {"source_path": source_path, "excludes": excludes, "restore": restore_cfg}
  288. if job is None:
  289. job = Job()
  290. db.session.add(job)
  291. from scheduler import schedule_job, remove_job
  292. dest_id = f.get("destination_id", "").strip()
  293. job.name = name
  294. job.type = job_type
  295. job.config_json = json.dumps(cfg)
  296. job.cron_expr = f.get("cron_expr", "0 3 * * *").strip()
  297. job.retention_mode = f.get("retention_mode", "count")
  298. job.retention_value = int(f.get("retention_value", 7))
  299. job.enabled = f.get("enabled") == "1"
  300. job.core_only = cfg.get("core_only", False)
  301. job.destination_id = int(dest_id) if dest_id else None
  302. job.updated_at = datetime.utcnow()
  303. db.session.commit()
  304. if job.enabled:
  305. schedule_job(job)
  306. else:
  307. remove_job(job.id)
  308. flash(f"Job « {job.name} » enregistré.", "success")
  309. return redirect(url_for("jobs.index"))