network.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. import os
  2. import subprocess
  3. import threading
  4. from flask import (
  5. Blueprint,
  6. current_app,
  7. flash,
  8. redirect,
  9. render_template,
  10. request,
  11. url_for,
  12. )
  13. from db import db, Job, Run, RemoteInstance, RemoteRun
  14. from db import _size_human
  15. bp = Blueprint("network", __name__)
  16. # --- Instances distantes ------------------------------------------------------
  17. @bp.route("/remote-instances")
  18. def remote_instances_list():
  19. instances = RemoteInstance.query.order_by(RemoteInstance.name).all()
  20. return render_template("remote_instances.html", instances=instances)
  21. @bp.route("/remote-instances/new", methods=["GET", "POST"])
  22. def remote_instance_new():
  23. if request.method == "POST":
  24. return _save_remote_instance(None)
  25. return render_template("remote_instance_form.html", inst=None)
  26. @bp.route("/remote-instances/<int:inst_id>/edit", methods=["GET", "POST"])
  27. def remote_instance_edit(inst_id):
  28. inst = db.get_or_404(RemoteInstance, inst_id)
  29. if request.method == "POST":
  30. return _save_remote_instance(inst)
  31. return render_template("remote_instance_form.html", inst=inst)
  32. @bp.route("/remote-instances/<int:inst_id>/delete", methods=["POST"])
  33. def remote_instance_delete(inst_id):
  34. inst = db.get_or_404(RemoteInstance, inst_id)
  35. db.session.delete(inst)
  36. db.session.commit()
  37. flash(f"Instance « {inst.name} » supprimée.", "success")
  38. return redirect(url_for("network.remote_instances_list"))
  39. @bp.route("/remote-instances/<int:inst_id>/test", methods=["POST"])
  40. def remote_instance_test(inst_id):
  41. inst = db.get_or_404(RemoteInstance, inst_id)
  42. from federation.client import FederationClient
  43. from datetime import datetime
  44. try:
  45. data = FederationClient(inst).health()
  46. inst.status = "online"
  47. inst.last_seen = datetime.utcnow()
  48. db.session.commit()
  49. flash(f"Instance « {inst.name} » en ligne — {data.get('instance', '?')}.", "success")
  50. except Exception as exc:
  51. inst.status = "error"
  52. db.session.commit()
  53. flash(f"Connexion échouée vers « {inst.name} » : {exc}", "error")
  54. return redirect(url_for("network.remote_instances_list"))
  55. @bp.route("/remote-instances/<int:inst_id>/sync", methods=["POST"])
  56. def remote_instance_sync(inst_id):
  57. inst = db.get_or_404(RemoteInstance, inst_id)
  58. from federation.client import sync_instance
  59. try:
  60. sync_instance(inst)
  61. flash(f"Instance « {inst.name} » synchronisée.", "success")
  62. except Exception as exc:
  63. flash(f"Synchronisation échouée pour « {inst.name} » : {exc}", "error")
  64. return redirect(url_for("network.remote_instances_list"))
  65. # --- Dashboard réseau ---------------------------------------------------------
  66. @bp.route("/network")
  67. def dashboard_network():
  68. local_jobs = Job.query.order_by(Job.name).all()
  69. local_jobs_data = []
  70. for job in local_jobs:
  71. run = Run.query.filter_by(job_id=job.id).order_by(Run.started_at.desc()).first()
  72. local_jobs_data.append(_JobRow(
  73. job_id=job.id, name=job.name, type=job.type,
  74. last_run_at=run.started_at if run else None,
  75. last_status=run.status if run else None,
  76. last_archive_name=run.archive_name if run else None,
  77. last_size_bytes=run.size_bytes if run else None,
  78. ))
  79. instances = RemoteInstance.query.order_by(RemoteInstance.name).all()
  80. return render_template("dashboard_network.html",
  81. local_jobs_data=local_jobs_data,
  82. instances=instances,
  83. instances_for_push=instances)
  84. @bp.route("/network/sync-all", methods=["POST"])
  85. def network_sync_all():
  86. from federation.client import sync_instance
  87. instances = RemoteInstance.query.all()
  88. errors = []
  89. for inst in instances:
  90. try:
  91. sync_instance(inst)
  92. except Exception as exc:
  93. errors.append(f"{inst.name}: {exc}")
  94. if errors:
  95. flash("Synchronisation partielle — " + " | ".join(errors), "error")
  96. else:
  97. flash(f"{len(instances)} instance(s) synchronisée(s).", "success")
  98. return redirect(url_for("network.dashboard_network"))
  99. # --- Contrôle distant ---------------------------------------------------------
  100. @bp.route("/remote-instances/<int:inst_id>/run-job/<int:job_id>", methods=["POST"])
  101. def remote_job_run(inst_id, job_id):
  102. inst = db.get_or_404(RemoteInstance, inst_id)
  103. from federation.client import FederationClient
  104. try:
  105. FederationClient(inst).run_job(job_id)
  106. flash(f"Job déclenché sur « {inst.name} ».", "success")
  107. except Exception as exc:
  108. flash(f"Impossible de lancer le job sur « {inst.name} » : {exc}", "error")
  109. return redirect(url_for("network.dashboard_network"))
  110. # --- Push / Pull archives -----------------------------------------------------
  111. @bp.route("/archives/<path:archive_name>/push/<int:inst_id>", methods=["POST"])
  112. def archive_push(archive_name, inst_id):
  113. inst = db.get_or_404(RemoteInstance, inst_id)
  114. app = current_app._get_current_object()
  115. threading.Thread(target=_do_push_archive, args=(app, archive_name, inst.id), daemon=True).start()
  116. flash(f"Envoi de « {archive_name} » vers « {inst.name} » démarré en arrière-plan.", "success")
  117. return redirect(request.referrer or url_for("jobs.index"))
  118. @bp.route("/remote-instances/<int:inst_id>/pull-latest/<int:remote_job_id>", methods=["POST"])
  119. def archive_pull_latest(inst_id, remote_job_id):
  120. inst = db.get_or_404(RemoteInstance, inst_id)
  121. app = current_app._get_current_object()
  122. threading.Thread(target=_do_pull_latest, args=(app, inst.id, remote_job_id), daemon=True).start()
  123. flash(f"Rapatriement depuis « {inst.name} » démarré en arrière-plan.", "success")
  124. return redirect(url_for("network.dashboard_network"))
  125. def _do_push_archive(app, archive_name, inst_id):
  126. """Pousse une archive locale vers une instance distante via HTTP chunked."""
  127. import hashlib as _hashlib
  128. from federation.client import FederationClient
  129. from jobs.utils import sudo_exists
  130. with app.app_context():
  131. inst = db.session.get(RemoteInstance, inst_id)
  132. backup_dir = app.config["YUNOHOST_BACKUP_DIR"]
  133. archive_path = os.path.join(backup_dir, archive_name + ".tar")
  134. tmp_path = None
  135. try:
  136. tmp_path = f"/tmp/backupmanager_push_{archive_name}.tar"
  137. result = subprocess.run(
  138. ["sudo", "rsync", archive_path, tmp_path],
  139. capture_output=True, text=True,
  140. )
  141. if result.returncode != 0:
  142. raise RuntimeError(f"Copie locale échouée : {result.stderr.strip()}")
  143. total_size = os.path.getsize(tmp_path)
  144. sha256 = _hashlib.sha256()
  145. chunk_size = 50 * 1024 * 1024
  146. with open(tmp_path, "rb") as f:
  147. while True:
  148. data = f.read(65536)
  149. if not data:
  150. break
  151. sha256.update(data)
  152. checksum = sha256.hexdigest()
  153. client = FederationClient(inst)
  154. upload_info = client.upload_start(archive_name + ".tar", total_size, checksum, chunk_size)
  155. upload_id = upload_info["upload_id"]
  156. with open(tmp_path, "rb") as f:
  157. n = 0
  158. while True:
  159. data = f.read(chunk_size)
  160. if not data:
  161. break
  162. client.upload_chunk(upload_id, n, data)
  163. n += 1
  164. # Transmettre le .info.json si présent (via sudo rsync vers /tmp)
  165. info_json_content = None
  166. info_path = os.path.join(backup_dir, archive_name + ".info.json")
  167. if sudo_exists(info_path):
  168. tmp_info_src = f"/tmp/backupmanager_push_{archive_name}.info.json"
  169. r = subprocess.run(["sudo", "rsync", info_path, tmp_info_src],
  170. capture_output=True)
  171. if r.returncode == 0:
  172. try:
  173. with open(tmp_info_src, "r", encoding="utf-8", errors="replace") as fh:
  174. info_json_content = fh.read()
  175. finally:
  176. subprocess.run(["sudo", "rm", "-rf", tmp_info_src],
  177. capture_output=True)
  178. client.upload_finish_with_info(upload_id, info_json_content)
  179. app.logger.info(f"Push {archive_name} → {inst.name} OK")
  180. except Exception as exc:
  181. app.logger.error(f"Push {archive_name} → {inst.name} échoué : {exc}")
  182. finally:
  183. if tmp_path and os.path.exists(tmp_path):
  184. os.unlink(tmp_path)
  185. def _do_pull_latest(app, inst_id, remote_job_id):
  186. """Rapatrie la dernière archive d'un job distant (.tar + .info.json)."""
  187. from federation.client import FederationClient, sync_instance
  188. with app.app_context():
  189. inst = db.session.get(RemoteInstance, inst_id)
  190. backup_dir = app.config["YUNOHOST_BACKUP_DIR"]
  191. try:
  192. client = FederationClient(inst)
  193. sync_instance(inst)
  194. db.session.refresh(inst)
  195. runs = client.get_job_runs(remote_job_id)
  196. if not runs:
  197. raise RuntimeError(f"Aucun run distant pour le job {remote_job_id}")
  198. archive_name = runs[0].get("archive_name")
  199. if not archive_name:
  200. raise RuntimeError("Le dernier run distant n'a pas d'archive.")
  201. archive_bytes = client.download_archive(archive_name)
  202. tmp_tar = f"/tmp/backupmanager_pull_{archive_name}.tar"
  203. with open(tmp_tar, "wb") as f:
  204. f.write(archive_bytes)
  205. subprocess.run(["sudo", "rsync", tmp_tar,
  206. os.path.join(backup_dir, archive_name + ".tar")], check=True)
  207. os.unlink(tmp_tar)
  208. info_bytes = client.download_info_json(archive_name)
  209. if info_bytes:
  210. tmp_info = f"/tmp/backupmanager_pull_{archive_name}.info.json"
  211. with open(tmp_info, "wb") as f:
  212. f.write(info_bytes)
  213. subprocess.run(["sudo", "rsync", tmp_info,
  214. os.path.join(backup_dir, archive_name + ".info.json")],
  215. check=True)
  216. os.unlink(tmp_info)
  217. else:
  218. app.logger.warning(
  219. f"Pull {archive_name}: .info.json absent ou inaccessible sur {inst.name}"
  220. )
  221. app.logger.info(f"Pull {archive_name} ← {inst.name} OK")
  222. except Exception as exc:
  223. app.logger.error(f"Pull ← {inst.name} échoué : {exc}")
  224. # --- Helper save instance -----------------------------------------------------
  225. def _save_remote_instance(inst):
  226. f = request.form
  227. name = f.get("name", "").strip()
  228. url = f.get("url", "").strip().rstrip("/")
  229. api_key = f.get("api_key", "").strip()
  230. if not name or not url or not api_key:
  231. flash("Nom, URL et token API sont requis.", "error")
  232. return render_template("remote_instance_form.html", inst=inst)
  233. if inst is None:
  234. inst = RemoteInstance()
  235. db.session.add(inst)
  236. inst.name = name
  237. inst.url = url
  238. inst.api_key = api_key
  239. db.session.commit()
  240. flash(f"Instance « {inst.name} » enregistrée.", "success")
  241. return redirect(url_for("network.remote_instances_list"))
  242. # --- DTO dashboard réseau -----------------------------------------------------
  243. class _JobRow:
  244. def __init__(self, job_id, name, type, last_run_at, last_status,
  245. last_archive_name, last_size_bytes):
  246. self.job_id = job_id
  247. self.name = name
  248. self.type = type
  249. self.last_run_at = last_run_at
  250. self.last_status = last_status
  251. self.last_archive_name = last_archive_name
  252. self.last_size_bytes = last_size_bytes
  253. @property
  254. def size_human(self):
  255. return _size_human(self.last_size_bytes)