network.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. import os
  2. import subprocess
  3. import threading
  4. from flask import (
  5. Blueprint,
  6. current_app,
  7. flash,
  8. redirect,
  9. render_template,
  10. request,
  11. url_for,
  12. )
  13. from db import db, Job, Run, RemoteInstance, RemoteRun, JobDestination
  14. from db import _size_human
# Blueprint holding the remote-instance, federation and archive push/pull routes
# defined in this module; endpoints are addressed as "network.<function name>".
bp = Blueprint("network", __name__)
  16. # --- Instances distantes ------------------------------------------------------
  17. @bp.route("/remote-instances")
  18. def remote_instances_list():
  19. instances = RemoteInstance.query.order_by(RemoteInstance.name).all()
  20. return render_template("remote_instances.html", instances=instances)
  21. @bp.route("/remote-instances/new", methods=["GET", "POST"])
  22. def remote_instance_new():
  23. if request.method == "POST":
  24. return _save_remote_instance(None)
  25. return render_template("remote_instance_form.html", inst=None)
  26. @bp.route("/remote-instances/<int:inst_id>/edit", methods=["GET", "POST"])
  27. def remote_instance_edit(inst_id):
  28. inst = db.get_or_404(RemoteInstance, inst_id)
  29. if request.method == "POST":
  30. return _save_remote_instance(inst)
  31. return render_template("remote_instance_form.html", inst=inst)
  32. @bp.route("/remote-instances/<int:inst_id>/delete", methods=["POST"])
  33. def remote_instance_delete(inst_id):
  34. inst = db.get_or_404(RemoteInstance, inst_id)
  35. JobDestination.query.filter_by(dest_type="instance", dest_id=inst_id).delete()
  36. db.session.delete(inst)
  37. db.session.commit()
  38. flash(f"Instance « {inst.name} » supprimée.", "success")
  39. return redirect(url_for("cfg.settings") + "?tab=instances")
  40. @bp.route("/remote-instances/<int:inst_id>/test", methods=["POST"])
  41. def remote_instance_test(inst_id):
  42. inst = db.get_or_404(RemoteInstance, inst_id)
  43. from federation.client import FederationClient
  44. from datetime import datetime
  45. try:
  46. data = FederationClient(inst).health()
  47. inst.status = "online"
  48. inst.last_seen = datetime.utcnow()
  49. db.session.commit()
  50. flash(f"Instance « {inst.name} » en ligne — {data.get('instance', '?')}.", "success")
  51. except Exception as exc:
  52. inst.status = "error"
  53. db.session.commit()
  54. flash(f"Connexion échouée vers « {inst.name} » : {exc}", "error")
  55. return redirect(url_for("cfg.settings") + "?tab=instances")
  56. @bp.route("/remote-instances/<int:inst_id>/sync", methods=["POST"])
  57. def remote_instance_sync(inst_id):
  58. inst = db.get_or_404(RemoteInstance, inst_id)
  59. from federation.client import sync_instance
  60. try:
  61. sync_instance(inst)
  62. flash(f"Instance « {inst.name} » synchronisée.", "success")
  63. except Exception as exc:
  64. flash(f"Synchronisation échouée pour « {inst.name} » : {exc}", "error")
  65. return redirect(url_for("network.federation"))
  66. # --- Fédération (vue unifiée) ------------------------------------------------
  67. @bp.route("/federation")
  68. def federation():
  69. local_jobs = Job.query.order_by(Job.name).all()
  70. local_jobs_data = []
  71. for job in local_jobs:
  72. run = Run.query.filter_by(job_id=job.id).order_by(Run.started_at.desc()).first()
  73. local_jobs_data.append(_JobRow(
  74. job_id=job.id, name=job.name, type=job.type,
  75. last_run_at=run.started_at if run else None,
  76. last_status=run.status if run else None,
  77. last_archive_name=run.archive_name if run else None,
  78. last_size_bytes=run.size_bytes if run else None,
  79. ))
  80. instances = RemoteInstance.query.order_by(RemoteInstance.name).all()
  81. return render_template("federation.html",
  82. local_jobs_data=local_jobs_data,
  83. instances=instances)
  84. # --- Dashboard réseau (legacy redirect) --------------------------------------
  85. @bp.route("/network")
  86. def dashboard_network():
  87. local_jobs = Job.query.order_by(Job.name).all()
  88. local_jobs_data = []
  89. for job in local_jobs:
  90. run = Run.query.filter_by(job_id=job.id).order_by(Run.started_at.desc()).first()
  91. local_jobs_data.append(_JobRow(
  92. job_id=job.id, name=job.name, type=job.type,
  93. last_run_at=run.started_at if run else None,
  94. last_status=run.status if run else None,
  95. last_archive_name=run.archive_name if run else None,
  96. last_size_bytes=run.size_bytes if run else None,
  97. ))
  98. instances = RemoteInstance.query.order_by(RemoteInstance.name).all()
  99. return render_template("dashboard_network.html",
  100. local_jobs_data=local_jobs_data,
  101. instances=instances,
  102. instances_for_push=instances)
  103. @bp.route("/network/sync-all", methods=["POST"])
  104. def network_sync_all():
  105. from federation.client import sync_instance
  106. instances = RemoteInstance.query.all()
  107. errors = []
  108. for inst in instances:
  109. try:
  110. sync_instance(inst)
  111. except Exception as exc:
  112. errors.append(f"{inst.name}: {exc}")
  113. if errors:
  114. flash("Synchronisation partielle — " + " | ".join(errors), "error")
  115. else:
  116. flash(f"{len(instances)} instance(s) synchronisée(s).", "success")
  117. return redirect(url_for("network.federation"))
  118. # --- Contrôle distant ---------------------------------------------------------
  119. @bp.route("/remote-instances/<int:inst_id>/run-job/<int:job_id>", methods=["POST"])
  120. def remote_job_run(inst_id, job_id):
  121. inst = db.get_or_404(RemoteInstance, inst_id)
  122. from federation.client import FederationClient
  123. try:
  124. FederationClient(inst).run_job(job_id)
  125. flash(f"Job déclenché sur « {inst.name} ».", "success")
  126. except Exception as exc:
  127. flash(f"Impossible de lancer le job sur « {inst.name} » : {exc}", "error")
  128. return redirect(url_for("network.federation"))
  129. # --- Push / Pull archives -----------------------------------------------------
  130. @bp.route("/archives/<path:archive_name>/push/<int:inst_id>", methods=["POST"])
  131. def archive_push(archive_name, inst_id):
  132. inst = db.get_or_404(RemoteInstance, inst_id)
  133. app = current_app._get_current_object()
  134. threading.Thread(target=_do_push_archive, args=(app, archive_name, inst.id), daemon=True).start()
  135. flash(f"Envoi de « {archive_name} » vers « {inst.name} » démarré en arrière-plan.", "success")
  136. return redirect(request.referrer or url_for("jobs.index"))
  137. @bp.route("/remote-instances/<int:inst_id>/pull-latest/<int:remote_job_id>", methods=["POST"])
  138. def archive_pull_latest(inst_id, remote_job_id):
  139. inst = db.get_or_404(RemoteInstance, inst_id)
  140. app = current_app._get_current_object()
  141. threading.Thread(target=_do_pull_latest, args=(app, inst.id, remote_job_id), daemon=True).start()
  142. flash(f"Rapatriement depuis « {inst.name} » démarré en arrière-plan.", "success")
  143. return redirect(url_for("network.federation"))
  144. def _do_push_archive(app, archive_name, inst_id):
  145. """Pousse une archive locale vers une instance distante via HTTP chunked."""
  146. import hashlib as _hashlib
  147. from federation.client import FederationClient
  148. from jobs.utils import sudo_exists
  149. with app.app_context():
  150. inst = db.session.get(RemoteInstance, inst_id)
  151. backup_dir = app.config["YUNOHOST_BACKUP_DIR"]
  152. archive_path = os.path.join(backup_dir, archive_name + ".tar")
  153. tmp_path = None
  154. try:
  155. tmp_path = f"/tmp/backupmanager_push_{archive_name}.tar"
  156. result = subprocess.run(
  157. ["sudo", "rsync", archive_path, tmp_path],
  158. capture_output=True, text=True,
  159. )
  160. if result.returncode != 0:
  161. raise RuntimeError(f"Copie locale échouée : {result.stderr.strip()}")
  162. total_size = os.path.getsize(tmp_path)
  163. sha256 = _hashlib.sha256()
  164. chunk_size = 50 * 1024 * 1024
  165. with open(tmp_path, "rb") as f:
  166. while True:
  167. data = f.read(65536)
  168. if not data:
  169. break
  170. sha256.update(data)
  171. checksum = sha256.hexdigest()
  172. client = FederationClient(inst)
  173. upload_info = client.upload_start(archive_name + ".tar", total_size, checksum, chunk_size)
  174. upload_id = upload_info["upload_id"]
  175. with open(tmp_path, "rb") as f:
  176. n = 0
  177. while True:
  178. data = f.read(chunk_size)
  179. if not data:
  180. break
  181. client.upload_chunk(upload_id, n, data)
  182. n += 1
  183. # Transmettre le .info.json si présent (via sudo rsync vers /tmp)
  184. info_json_content = None
  185. info_path = os.path.join(backup_dir, archive_name + ".info.json")
  186. if sudo_exists(info_path):
  187. tmp_info_src = f"/tmp/backupmanager_push_{archive_name}.info.json"
  188. r = subprocess.run(["sudo", "rsync", info_path, tmp_info_src],
  189. capture_output=True)
  190. if r.returncode == 0:
  191. try:
  192. with open(tmp_info_src, "r", encoding="utf-8", errors="replace") as fh:
  193. info_json_content = fh.read()
  194. finally:
  195. subprocess.run(["sudo", "rm", "-rf", tmp_info_src],
  196. capture_output=True)
  197. client.upload_finish_with_info(upload_id, info_json_content)
  198. app.logger.info(f"Push {archive_name} → {inst.name} OK")
  199. except Exception as exc:
  200. app.logger.error(f"Push {archive_name} → {inst.name} échoué : {exc}")
  201. finally:
  202. if tmp_path and os.path.exists(tmp_path):
  203. os.unlink(tmp_path)
  204. def _do_pull_latest(app, inst_id, remote_job_id):
  205. """Rapatrie la dernière archive d'un job distant (.tar + .info.json)."""
  206. from federation.client import FederationClient, sync_instance
  207. with app.app_context():
  208. inst = db.session.get(RemoteInstance, inst_id)
  209. backup_dir = app.config["YUNOHOST_BACKUP_DIR"]
  210. try:
  211. client = FederationClient(inst)
  212. sync_instance(inst)
  213. db.session.refresh(inst)
  214. runs = client.get_job_runs(remote_job_id)
  215. if not runs:
  216. raise RuntimeError(f"Aucun run distant pour le job {remote_job_id}")
  217. archive_name = runs[0].get("archive_name")
  218. if not archive_name:
  219. raise RuntimeError("Le dernier run distant n'a pas d'archive.")
  220. archive_bytes = client.download_archive(archive_name)
  221. tmp_tar = f"/tmp/backupmanager_pull_{archive_name}.tar"
  222. with open(tmp_tar, "wb") as f:
  223. f.write(archive_bytes)
  224. subprocess.run(["sudo", "rsync", tmp_tar,
  225. os.path.join(backup_dir, archive_name + ".tar")], check=True)
  226. os.unlink(tmp_tar)
  227. info_bytes = client.download_info_json(archive_name)
  228. if info_bytes:
  229. tmp_info = f"/tmp/backupmanager_pull_{archive_name}.info.json"
  230. with open(tmp_info, "wb") as f:
  231. f.write(info_bytes)
  232. subprocess.run(["sudo", "rsync", tmp_info,
  233. os.path.join(backup_dir, archive_name + ".info.json")],
  234. check=True)
  235. os.unlink(tmp_info)
  236. else:
  237. app.logger.warning(
  238. f"Pull {archive_name}: .info.json absent ou inaccessible sur {inst.name}"
  239. )
  240. app.logger.info(f"Pull {archive_name} ← {inst.name} OK")
  241. except Exception as exc:
  242. app.logger.error(f"Pull ← {inst.name} échoué : {exc}")
  243. # --- Helper save instance -----------------------------------------------------
  244. def _save_remote_instance(inst):
  245. f = request.form
  246. name = f.get("name", "").strip()
  247. url = f.get("url", "").strip().rstrip("/")
  248. api_key = f.get("api_key", "").strip()
  249. if not name or not url or not api_key:
  250. flash("Nom, URL et token API sont requis.", "error")
  251. return render_template("remote_instance_form.html", inst=inst)
  252. if inst is None:
  253. inst = RemoteInstance()
  254. db.session.add(inst)
  255. inst.name = name
  256. inst.url = url
  257. inst.api_key = api_key
  258. db.session.commit()
  259. flash(f"Instance « {inst.name} » enregistrée.", "success")
  260. return redirect(url_for("cfg.settings") + "?tab=instances")
  261. # --- DTO dashboard réseau -----------------------------------------------------
  262. class _JobRow:
  263. def __init__(self, job_id, name, type, last_run_at, last_status,
  264. last_archive_name, last_size_bytes):
  265. self.job_id = job_id
  266. self.name = name
  267. self.type = type
  268. self.last_run_at = last_run_at
  269. self.last_status = last_status
  270. self.last_archive_name = last_archive_name
  271. self.last_size_bytes = last_size_bytes
  272. @property
  273. def size_human(self):
  274. return _size_human(self.last_size_bytes)