jobs.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. import json
  2. import subprocess
  3. import threading
  4. from datetime import datetime
  5. from flask import (
  6. Blueprint,
  7. current_app,
  8. flash,
  9. redirect,
  10. render_template,
  11. request,
  12. url_for,
  13. )
  14. from db import db, Job, Run, Destination, RemoteInstance
  15. from helpers import read_archive_info, get_ynh_apps
  16. bp = Blueprint("jobs", __name__)
  17. # --- Dashboard local ----------------------------------------------------------
  18. @bp.route("/")
  19. def index():
  20. from db import RemoteInstance
  21. jobs = Job.query.order_by(Job.name).all()
  22. last_runs = {
  23. j.id: Run.query.filter_by(job_id=j.id).order_by(Run.started_at.desc()).first()
  24. for j in jobs
  25. }
  26. instances = RemoteInstance.query.order_by(RemoteInstance.name).all()
  27. return render_template("dashboard_local.html", jobs=jobs, last_runs=last_runs,
  28. instances=instances)
  29. # --- CRUD Jobs ----------------------------------------------------------------
  30. def _used_app_ids(exclude_job_id=None):
  31. """Retourne les app_id déjà couverts par un job ynh_app existant."""
  32. q = Job.query.filter_by(type="ynh_app")
  33. if exclude_job_id:
  34. q = q.filter(Job.id != exclude_job_id)
  35. return {json.loads(j.config_json).get("app_id") for j in q.all() if j.config_json}
  36. @bp.route("/jobs/new", methods=["GET", "POST"])
  37. def job_new():
  38. if request.method == "POST":
  39. return _save_job(None)
  40. return render_template("job_form.html", job=None,
  41. ynh_apps=get_ynh_apps(exclude_app_ids=_used_app_ids()),
  42. destinations=Destination.query.filter_by(enabled=True).all(),
  43. remote_instances=RemoteInstance.query.order_by(RemoteInstance.name).all())
  44. @bp.route("/jobs/<int:job_id>/edit", methods=["GET", "POST"])
  45. def job_edit(job_id):
  46. job = db.get_or_404(Job, job_id)
  47. if request.method == "POST":
  48. return _save_job(job)
  49. return render_template("job_form.html", job=job,
  50. ynh_apps=get_ynh_apps(exclude_app_ids=_used_app_ids(exclude_job_id=job_id)),
  51. destinations=Destination.query.filter_by(enabled=True).all(),
  52. remote_instances=RemoteInstance.query.order_by(RemoteInstance.name).all())
  53. @bp.route("/jobs/<int:job_id>/delete", methods=["POST"])
  54. def job_delete(job_id):
  55. job = db.get_or_404(Job, job_id)
  56. from scheduler import remove_job
  57. remove_job(job.id)
  58. db.session.delete(job)
  59. db.session.commit()
  60. flash(f"Job « {job.name} » supprimé.", "success")
  61. return redirect(url_for("jobs.index"))
  62. @bp.route("/jobs/<int:job_id>/run", methods=["POST"])
  63. def job_run_now(job_id):
  64. job = db.get_or_404(Job, job_id)
  65. from scheduler import _execute_job
  66. app = current_app._get_current_object()
  67. threading.Thread(target=_execute_job, args=(job.id,), daemon=True).start()
  68. flash(f"Job « {job.name} » lancé manuellement.", "success")
  69. return redirect(url_for("jobs.index"))
  70. @bp.route("/jobs/bulk", methods=["POST"])
  71. def jobs_bulk():
  72. action = request.form.get("action")
  73. job_ids = [int(jid) for jid in request.form.getlist("job_ids") if jid.isdigit()]
  74. if not job_ids:
  75. return redirect(url_for("jobs.index"))
  76. from scheduler import schedule_job, remove_job, _execute_job
  77. if action == "run":
  78. for jid in job_ids:
  79. job = db.session.get(Job, jid)
  80. if job:
  81. threading.Thread(target=_execute_job, args=(jid,), daemon=True).start()
  82. flash(f"{len(job_ids)} job(s) lancé(s) en arrière-plan.", "info")
  83. elif action == "enable":
  84. for jid in job_ids:
  85. job = db.session.get(Job, jid)
  86. if job:
  87. job.enabled = True
  88. job.updated_at = datetime.utcnow()
  89. schedule_job(job)
  90. db.session.commit()
  91. flash(f"{len(job_ids)} job(s) activé(s).", "success")
  92. elif action == "disable":
  93. for jid in job_ids:
  94. job = db.session.get(Job, jid)
  95. if job:
  96. job.enabled = False
  97. job.updated_at = datetime.utcnow()
  98. remove_job(jid)
  99. db.session.commit()
  100. flash(f"{len(job_ids)} job(s) désactivé(s).", "info")
  101. elif action == "delete":
  102. names = []
  103. for jid in job_ids:
  104. job = db.session.get(Job, jid)
  105. if job:
  106. names.append(job.name)
  107. remove_job(jid)
  108. db.session.delete(job)
  109. db.session.commit()
  110. flash(f"{len(names)} job(s) supprimé(s).", "success")
  111. return redirect(url_for("jobs.index"))
  112. @bp.route("/jobs/<int:job_id>/toggle", methods=["POST"])
  113. def job_toggle(job_id):
  114. job = db.get_or_404(Job, job_id)
  115. from scheduler import schedule_job, remove_job
  116. job.enabled = not job.enabled
  117. job.updated_at = datetime.utcnow()
  118. db.session.commit()
  119. if job.enabled:
  120. schedule_job(job)
  121. flash(f"Job « {job.name} » activé.", "success")
  122. else:
  123. remove_job(job.id)
  124. flash(f"Job « {job.name} » désactivé.", "info")
  125. return redirect(url_for("jobs.index"))
  126. @bp.route("/jobs/<int:job_id>/history")
  127. def job_history(job_id):
  128. job = db.get_or_404(Job, job_id)
  129. runs = Run.query.filter_by(job_id=job_id).order_by(Run.started_at.desc()).limit(100).all()
  130. return render_template("job_history.html", job=job, runs=runs)
  131. # --- Navigateur d'archives ----------------------------------------------------
  132. @bp.route("/archives")
  133. def archives():
  134. from jobs.utils import batch_list_archives
  135. from db import _size_human, RemoteInstance
  136. backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
  137. # UN seul appel sudo find pour toutes les tailles + mtimes
  138. file_stats = batch_list_archives(backup_dir)
  139. sorted_names = sorted(file_stats, key=lambda n: file_stats[n]["mtime"], reverse=True)
  140. # Pré-charger runs et jobs en une passe DB (pas de subprocess)
  141. runs_by_archive = {}
  142. for run in Run.query.order_by(Run.started_at.desc()).all():
  143. if run.archive_name and run.archive_name not in runs_by_archive:
  144. runs_by_archive[run.archive_name] = run
  145. jobs_by_id = {j.id: j for j in Job.query.all()}
  146. items = []
  147. for name in sorted_names:
  148. size_bytes = file_stats[name]["size_bytes"] or None
  149. run = runs_by_archive.get(name)
  150. job = jobs_by_id.get(run.job_id) if run else None
  151. items.append({
  152. "name": name,
  153. "type": job.type if job else "",
  154. "job_name": job.name if job else "—",
  155. "job_id": job.id if job else None,
  156. "last_status": run.status if run else None,
  157. "run_at": run.started_at if run else None,
  158. "size_bytes": size_bytes,
  159. "size_human": _size_human(size_bytes) if size_bytes else "—",
  160. })
  161. instances = RemoteInstance.query.order_by(RemoteInstance.name).all()
  162. return render_template("archives.html", items=items, instances=instances)
  163. @bp.route("/archives/<path:archive_name>/download")
  164. def archive_download(archive_name):
  165. import os, subprocess
  166. from flask import Response, stream_with_context
  167. backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
  168. archive_path = os.path.join(backup_dir, archive_name + ".tar")
  169. tmp_path = f"/tmp/backupmanager_webdl_{archive_name}.tar"
  170. try:
  171. r = subprocess.run(["sudo", "rsync", archive_path, tmp_path],
  172. capture_output=True, text=True, timeout=3600)
  173. if r.returncode != 0:
  174. flash(f"Téléchargement impossible : {r.stderr.strip()}", "error")
  175. return redirect(url_for("jobs.archives"))
  176. def _stream():
  177. try:
  178. with open(tmp_path, "rb") as f:
  179. while True:
  180. chunk = f.read(1024 * 1024)
  181. if not chunk:
  182. break
  183. yield chunk
  184. finally:
  185. subprocess.run(["sudo", "rm", "-f", tmp_path], capture_output=True)
  186. return Response(
  187. stream_with_context(_stream()),
  188. mimetype="application/octet-stream",
  189. headers={"Content-Disposition": f'attachment; filename="{archive_name}.tar"'},
  190. )
  191. except Exception as exc:
  192. subprocess.run(["sudo", "rm", "-f", tmp_path], capture_output=True)
  193. flash(f"Erreur : {exc}", "error")
  194. return redirect(url_for("jobs.archives"))
  195. @bp.route("/archives/<path:archive_name>/delete", methods=["POST"])
  196. def archive_delete(archive_name):
  197. import os, subprocess
  198. backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
  199. tar_path = os.path.join(backup_dir, archive_name + ".tar")
  200. info_path = os.path.join(backup_dir, archive_name + ".info.json")
  201. subprocess.run(["sudo", "rm", "-f", tar_path, info_path], capture_output=True)
  202. flash(f"Archive « {archive_name} » supprimée.", "success")
  203. return redirect(url_for("jobs.archives"))
  204. # --- Restauration -------------------------------------------------------------
  205. @bp.route("/archives/<path:archive_name>/restore", methods=["GET", "POST"])
  206. def archive_restore(archive_name):
  207. backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
  208. info = read_archive_info(archive_name, backup_dir)
  209. if request.method == "GET":
  210. return render_template("restore_confirm.html", archive_name=archive_name, info=info)
  211. _start_restore(archive_name)
  212. flash(f"Restauration de « {archive_name} » démarrée en arrière-plan.", "success")
  213. return redirect(url_for("jobs.index"))
  214. def _start_restore(archive_name):
  215. """Crée un Run de restauration et lance le thread. Retourne (restore_run_id, archive_type)."""
  216. backup_dir = current_app.config["YUNOHOST_BACKUP_DIR"]
  217. info = read_archive_info(archive_name, backup_dir)
  218. archive_type = info.get("type", "")
  219. original_run = Run.query.filter_by(archive_name=archive_name).first()
  220. restore_run_id = None
  221. if original_run:
  222. restore_run = Run(
  223. job_id=original_run.job_id,
  224. started_at=datetime.utcnow(),
  225. status="running",
  226. archive_name=archive_name,
  227. log_text="[RESTAURATION en cours…]",
  228. )
  229. db.session.add(restore_run)
  230. db.session.commit()
  231. restore_run_id = restore_run.id
  232. app = current_app._get_current_object()
  233. threading.Thread(
  234. target=_do_restore_job,
  235. args=(app, archive_name, archive_type, restore_run_id),
  236. daemon=True,
  237. ).start()
  238. return restore_run_id, archive_type
  239. def _do_restore_job(app, archive_name, archive_type, restore_run_id):
  240. with app.app_context():
  241. run = db.session.get(Run, restore_run_id) if restore_run_id else None
  242. try:
  243. backup_dir = app.config["YUNOHOST_BACKUP_DIR"]
  244. if archive_type == "custom_dir":
  245. from jobs.custom_dir import restore_custom_dir
  246. log = restore_custom_dir(archive_name, backup_dir)
  247. elif archive_type in ("mysql", "postgresql"):
  248. from jobs.db_dump import restore_db_dump
  249. log = restore_db_dump(archive_name, backup_dir)
  250. elif archive_type == "ynh_app":
  251. result = subprocess.run(
  252. ["sudo", "yunohost", "backup", "restore", archive_name,
  253. "--apps", "--force"],
  254. capture_output=True, text=True, timeout=3600,
  255. )
  256. log = (result.stdout + result.stderr).strip()
  257. if result.returncode != 0:
  258. raise RuntimeError(f"yunohost backup restore a échoué :\n{log}")
  259. elif archive_type == "ynh_system":
  260. result = subprocess.run(
  261. ["sudo", "yunohost", "backup", "restore", archive_name,
  262. "--system", "--force"],
  263. capture_output=True, text=True, timeout=3600,
  264. )
  265. log = (result.stdout + result.stderr).strip()
  266. if result.returncode != 0:
  267. raise RuntimeError(f"yunohost backup restore a échoué :\n{log}")
  268. else:
  269. raise NotImplementedError(
  270. f"Restauration non supportée pour le type '{archive_type}'."
  271. )
  272. if run:
  273. run.status = "success"
  274. run.finished_at = datetime.utcnow()
  275. run.log_text = f"[RESTAURATION]\n{log or 'OK'}"
  276. db.session.commit()
  277. except Exception as exc:
  278. app.logger.error(f"Restauration {archive_name} échouée : {exc}")
  279. if run:
  280. run.status = "error"
  281. run.finished_at = datetime.utcnow()
  282. run.log_text = f"[RESTAURATION]\n{exc}"
  283. db.session.commit()
  284. # --- Helper save job ----------------------------------------------------------
  285. def _save_job(job):
  286. f = request.form
  287. job_type = f.get("type", "")
  288. name = f.get("name", "").strip()
  289. if not name:
  290. flash("Le nom est requis.", "error")
  291. return render_template("job_form.html", job=job, ynh_apps=get_ynh_apps(exclude_app_ids=_used_app_ids(exclude_job_id=job.id if job else None)),
  292. destinations=Destination.query.filter_by(enabled=True).all(),
  293. remote_instances=RemoteInstance.query.order_by(RemoteInstance.name).all())
  294. cfg = {}
  295. if job_type == "ynh_app":
  296. cfg = {"app_id": f.get("app_id", ""), "core_only": f.get("core_only") == "1"}
  297. elif job_type == "ynh_system":
  298. cfg = {}
  299. elif job_type in ("mysql", "postgresql"):
  300. dbname = f.get("db_database", "").strip()
  301. if not dbname:
  302. flash("Le nom de la base de données est requis.", "error")
  303. return render_template("job_form.html", job=job, ynh_apps=get_ynh_apps(exclude_app_ids=_used_app_ids(exclude_job_id=job.id if job else None)),
  304. destinations=Destination.query.filter_by(enabled=True).all(),
  305. remote_instances=RemoteInstance.query.order_by(RemoteInstance.name).all())
  306. cfg = {"database": dbname}
  307. elif job_type == "custom_dir":
  308. source_path = f.get("source_path", "").strip().rstrip("/")
  309. if not source_path or not source_path.startswith("/"):
  310. flash("Le chemin source doit être un chemin absolu (ex: /opt/monapp).", "error")
  311. return render_template("job_form.html", job=job, ynh_apps=get_ynh_apps(exclude_app_ids=_used_app_ids(exclude_job_id=job.id if job else None)),
  312. destinations=Destination.query.filter_by(enabled=True).all(),
  313. remote_instances=RemoteInstance.query.order_by(RemoteInstance.name).all())
  314. excludes = [e.strip() for e in f.get("excludes", "").splitlines() if e.strip()]
  315. restore_cfg = {}
  316. user_name = f.get("restore_user_name", "").strip()
  317. if user_name:
  318. restore_cfg["system_user"] = {
  319. "name": user_name,
  320. "home": f.get("restore_user_home", source_path).strip() or source_path,
  321. "shell": f.get("restore_user_shell", "/bin/false").strip() or "/bin/false",
  322. }
  323. service_name = f.get("restore_service_name", "").strip()
  324. if service_name:
  325. restore_cfg["systemd_service"] = {
  326. "name": service_name,
  327. "service_file": f.get("restore_service_file", "").strip(),
  328. }
  329. owner = f.get("restore_perm_owner", "").strip()
  330. mode = f.get("restore_perm_mode", "").strip()
  331. if owner or mode:
  332. restore_cfg["permissions"] = {}
  333. if owner:
  334. restore_cfg["permissions"]["owner"] = owner
  335. if mode:
  336. restore_cfg["permissions"]["mode"] = mode
  337. post_cmds = [c.strip() for c in f.get("restore_post_cmds", "").splitlines() if c.strip()]
  338. if post_cmds:
  339. restore_cfg["post_restore_commands"] = post_cmds
  340. cfg = {"source_path": source_path, "excludes": excludes, "restore": restore_cfg}
  341. if job is None:
  342. job = Job()
  343. db.session.add(job)
  344. from scheduler import schedule_job, remove_job
  345. transfer_target = f.get("transfer_target", "").strip()
  346. job.name = name
  347. job.type = job_type
  348. job.config_json = json.dumps(cfg)
  349. cron_raw = (f.get("cron_expr") or "").strip()
  350. job.cron_expr = cron_raw # "" = manuel (NOT NULL compatible avec le schéma existant)
  351. job.retention_mode = f.get("retention_mode", "count")
  352. job.retention_value = int(f.get("retention_value", 7))
  353. job.enabled = f.get("enabled") == "1"
  354. job.core_only = cfg.get("core_only", False)
  355. if transfer_target.startswith("dest:"):
  356. job.destination_id = int(transfer_target[5:])
  357. job.remote_instance_id = None
  358. elif transfer_target.startswith("inst:"):
  359. job.destination_id = None
  360. job.remote_instance_id = int(transfer_target[5:])
  361. else:
  362. job.destination_id = None
  363. job.remote_instance_id = None
  364. job.updated_at = datetime.utcnow()
  365. db.session.commit()
  366. if job.enabled:
  367. schedule_job(job)
  368. else:
  369. remove_job(job.id)
  370. flash(f"Job « {job.name} » enregistré.", "success")
  371. return redirect(url_for("jobs.index"))