| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- import os
- import subprocess
- import threading
- from flask import (
- Blueprint,
- current_app,
- flash,
- redirect,
- render_template,
- request,
- url_for,
- )
- from db import db, Job, Run, RemoteInstance, RemoteRun
- from db import _size_human
- bp = Blueprint("network", __name__)
- # --- Instances distantes ------------------------------------------------------
@bp.route("/remote-instances")
def remote_instances_list():
    """Render the page listing every registered remote instance, sorted by name."""
    by_name = RemoteInstance.query.order_by(RemoteInstance.name)
    return render_template("remote_instances.html", instances=by_name.all())
@bp.route("/remote-instances/new", methods=["GET", "POST"])
def remote_instance_new():
    """Show the blank creation form on GET; save a new instance on POST."""
    if request.method != "POST":
        return render_template("remote_instance_form.html", inst=None)
    # Passing ``None`` makes the save helper create a fresh record.
    return _save_remote_instance(None)
@bp.route("/remote-instances/<int:inst_id>/edit", methods=["GET", "POST"])
def remote_instance_edit(inst_id):
    """Show the edit form for an existing instance on GET; save it on POST.

    Responds with 404 when no instance has the given id.
    """
    inst = db.get_or_404(RemoteInstance, inst_id)
    if request.method != "POST":
        return render_template("remote_instance_form.html", inst=inst)
    return _save_remote_instance(inst)
@bp.route("/remote-instances/<int:inst_id>/delete", methods=["POST"])
def remote_instance_delete(inst_id):
    """Delete a remote instance and redirect back to the instance list.

    Responds with 404 when no instance has the given id.
    """
    inst = db.get_or_404(RemoteInstance, inst_id)
    session = db.session
    session.delete(inst)
    session.commit()
    confirmation = f"Instance « {inst.name} » supprimée."
    flash(confirmation, "success")
    list_url = url_for("network.remote_instances_list")
    return redirect(list_url)
@bp.route("/remote-instances/<int:inst_id>/test", methods=["POST"])
def remote_instance_test(inst_id):
    """Probe a remote instance's health endpoint and record the outcome.

    On success the instance is marked ``online`` and ``last_seen`` is set to
    the current UTC time; on any failure it is marked ``error``. Either way
    the result is flashed and the user is sent back to the instance list.
    """
    inst = db.get_or_404(RemoteInstance, inst_id)
    # Imported lazily, as in the rest of this module.
    from federation.client import FederationClient
    from datetime import datetime

    try:
        health = FederationClient(inst).health()
        inst.status, inst.last_seen = "online", datetime.utcnow()
        db.session.commit()
        remote_label = health.get('instance', '?')
        flash(f"Instance « {inst.name} » en ligne — {remote_label}.", "success")
    except Exception as exc:
        # Any failure (network, auth, malformed reply) flags the instance.
        inst.status = "error"
        db.session.commit()
        flash(f"Connexion échouée vers « {inst.name} » : {exc}", "error")

    return redirect(url_for("network.remote_instances_list"))
@bp.route("/remote-instances/<int:inst_id>/sync", methods=["POST"])
def remote_instance_sync(inst_id):
    """Synchronise one remote instance and flash the outcome.

    Responds with 404 when no instance has the given id; any error raised by
    the synchronisation is reported to the user instead of propagating.
    """
    inst = db.get_or_404(RemoteInstance, inst_id)
    from federation.client import sync_instance

    try:
        sync_instance(inst)
        outcome = (f"Instance « {inst.name} » synchronisée.", "success")
    except Exception as exc:
        outcome = (f"Synchronisation échouée pour « {inst.name} » : {exc}", "error")

    flash(*outcome)
    return redirect(url_for("network.remote_instances_list"))
- # --- Dashboard réseau ---------------------------------------------------------
@bp.route("/network")
def dashboard_network():
    """Render the network dashboard.

    Builds one ``_JobRow`` per local job (sorted by name) carrying the data
    of its most recently started run, if any, and passes it to the template
    together with the list of remote instances (sorted by name).
    """
    rows = []
    for job in Job.query.order_by(Job.name).all():
        latest = (
            Run.query
            .filter_by(job_id=job.id)
            .order_by(Run.started_at.desc())
            .first()
        )
        if latest:
            started, status = latest.started_at, latest.status
            archive, size = latest.archive_name, latest.size_bytes
        else:
            # The job has never run: every "last run" field stays empty.
            started = status = archive = size = None
        rows.append(_JobRow(
            job_id=job.id,
            name=job.name,
            type=job.type,
            last_run_at=started,
            last_status=status,
            last_archive_name=archive,
            last_size_bytes=size,
        ))

    remotes = RemoteInstance.query.order_by(RemoteInstance.name).all()
    return render_template(
        "dashboard_network.html",
        local_jobs_data=rows,
        instances=remotes,
        instances_for_push=remotes,
    )
@bp.route("/network/sync-all", methods=["POST"])
def network_sync_all():
    """Synchronise every remote instance and flash a summary.

    A failure on one instance does not stop the others; all failures are
    collected and reported together in a single error message.
    """
    from federation.client import sync_instance

    all_remotes = RemoteInstance.query.all()
    failures = []
    for remote in all_remotes:
        try:
            sync_instance(remote)
        except Exception as exc:
            failures.append(f"{remote.name}: {exc}")

    if not failures:
        flash(f"{len(all_remotes)} instance(s) synchronisée(s).", "success")
    else:
        flash("Synchronisation partielle — " + " | ".join(failures), "error")
    return redirect(url_for("network.dashboard_network"))
- # --- Contrôle distant ---------------------------------------------------------
@bp.route("/remote-instances/<int:inst_id>/run-job/<int:job_id>", methods=["POST"])
def remote_job_run(inst_id, job_id):
    """Ask a remote instance to run one of its jobs and flash the outcome.

    Responds with 404 when no instance has the given id; any error raised by
    the federation client is reported to the user instead of propagating.
    """
    inst = db.get_or_404(RemoteInstance, inst_id)
    from federation.client import FederationClient

    try:
        FederationClient(inst).run_job(job_id)
        outcome = (f"Job déclenché sur « {inst.name} ».", "success")
    except Exception as exc:
        outcome = (f"Impossible de lancer le job sur « {inst.name} » : {exc}", "error")

    flash(*outcome)
    return redirect(url_for("network.dashboard_network"))
- # --- Push / Pull archives -----------------------------------------------------
@bp.route("/archives/<path:archive_name>/push/<int:inst_id>", methods=["POST"])
def archive_push(archive_name, inst_id):
    """Start a background push of a local archive to a remote instance.

    The transfer runs in a daemon thread; this view only confirms that it
    was started and redirects to the referring page (or to the job index
    when no referrer is available).
    """
    inst = db.get_or_404(RemoteInstance, inst_id)
    # The worker thread needs the real app object, not the context-local proxy.
    app = current_app._get_current_object()
    worker = threading.Thread(
        target=_do_push_archive,
        args=(app, archive_name, inst.id),
        daemon=True,
    )
    worker.start()
    flash(f"Envoi de « {archive_name} » vers « {inst.name} » démarré en arrière-plan.", "success")
    return redirect(request.referrer or url_for("jobs.index"))
@bp.route("/remote-instances/<int:inst_id>/pull-latest/<int:remote_job_id>", methods=["POST"])
def archive_pull_latest(inst_id, remote_job_id):
    """Start a background pull of the latest archive of a remote job.

    The transfer runs in a daemon thread; this view only confirms that it
    was started and redirects to the network dashboard.
    """
    inst = db.get_or_404(RemoteInstance, inst_id)
    # The worker thread needs the real app object, not the context-local proxy.
    app = current_app._get_current_object()
    worker = threading.Thread(
        target=_do_pull_latest,
        args=(app, inst.id, remote_job_id),
        daemon=True,
    )
    worker.start()
    flash(f"Rapatriement depuis « {inst.name} » démarré en arrière-plan.", "success")
    return redirect(url_for("network.dashboard_network"))
def _do_push_archive(app, archive_name, inst_id):
    """Push a local archive to a remote instance through a chunked HTTP upload.

    Meant to run in a background thread: it opens its own application
    context and never raises; the outcome is only written to ``app.logger``.

    Steps:
      1. copy ``<backup_dir>/<archive_name>.tar`` into a private temporary
         directory with ``sudo rsync``;
      2. compute its SHA-256 and size, then upload it in 50 MiB chunks;
      3. send the matching ``.info.json`` (when present) with the final call.

    Args:
        app: The Flask application object (not the ``current_app`` proxy).
        archive_name: Archive name without the ``.tar`` extension. It comes
            from a URL path segment, so it is checked to resolve inside the
            backup directory before being handed to ``sudo rsync``.
        inst_id: Primary key of the target ``RemoteInstance``.
    """
    import hashlib as _hashlib
    import shutil
    import tempfile
    from federation.client import FederationClient
    from jobs.utils import sudo_exists

    with app.app_context():
        inst = db.session.get(RemoteInstance, inst_id)
        if inst is None:
            # The instance may have been deleted after the request returned.
            app.logger.error(
                f"Push {archive_name} échoué : instance {inst_id} introuvable"
            )
            return

        tmp_dir = None
        try:
            backup_dir = os.path.abspath(app.config["YUNOHOST_BACKUP_DIR"])
            archive_path = os.path.abspath(
                os.path.join(backup_dir, archive_name + ".tar"))
            info_path = os.path.abspath(
                os.path.join(backup_dir, archive_name + ".info.json"))
            # Refuse names such as "../../etc/x" that would make the
            # privileged rsync read outside the backup directory.
            for candidate in (archive_path, info_path):
                if os.path.commonpath([backup_dir, candidate]) != backup_dir:
                    raise RuntimeError(f"Nom d'archive invalide : {archive_name!r}")

            # A private, unpredictable directory owned by this process: the
            # files rsync creates in it via sudo can be removed without sudo,
            # and the location cannot be pre-created by another local user.
            tmp_dir = tempfile.mkdtemp(prefix="backupmanager_push_", dir="/tmp")
            tmp_path = os.path.join(tmp_dir, "archive.tar")

            result = subprocess.run(
                ["sudo", "rsync", archive_path, tmp_path],
                capture_output=True, text=True,
            )
            if result.returncode != 0:
                raise RuntimeError(f"Copie locale échouée : {result.stderr.strip()}")

            total_size = os.path.getsize(tmp_path)
            chunk_size = 50 * 1024 * 1024

            # Hash in small blocks so the whole archive is never held in memory.
            sha256 = _hashlib.sha256()
            with open(tmp_path, "rb") as f:
                for block in iter(lambda: f.read(65536), b""):
                    sha256.update(block)
            checksum = sha256.hexdigest()

            client = FederationClient(inst)
            upload_info = client.upload_start(
                archive_name + ".tar", total_size, checksum, chunk_size)
            upload_id = upload_info["upload_id"]

            with open(tmp_path, "rb") as f:
                chunks = iter(lambda: f.read(chunk_size), b"")
                for n, data in enumerate(chunks):
                    client.upload_chunk(upload_id, n, data)

            # Forward the .info.json when there is one; a failed copy is
            # tolerated and simply results in no metadata being sent.
            info_json_content = None
            if sudo_exists(info_path):
                tmp_info_src = os.path.join(tmp_dir, "archive.info.json")
                r = subprocess.run(["sudo", "rsync", info_path, tmp_info_src],
                                   capture_output=True)
                if r.returncode == 0:
                    with open(tmp_info_src, "r", encoding="utf-8",
                              errors="replace") as fh:
                        info_json_content = fh.read()

            client.upload_finish_with_info(upload_id, info_json_content)
            app.logger.info(f"Push {archive_name} → {inst.name} OK")
        except Exception as exc:
            # Top-level boundary of the worker thread: log, never propagate.
            app.logger.error(f"Push {archive_name} → {inst.name} échoué : {exc}")
        finally:
            if tmp_dir is not None:
                shutil.rmtree(tmp_dir, ignore_errors=True)
def _do_pull_latest(app, inst_id, remote_job_id):
    """Fetch the latest archive of a remote job (``.tar`` and ``.info.json``).

    Meant to run in a background thread: it opens its own application
    context and never raises; the outcome is only written to ``app.logger``.

    The archive name is taken from the first run returned by the remote
    instance. Because that value is remote-controlled and ends up as the
    destination of a ``sudo rsync``, it is checked to resolve inside the
    local backup directory before anything is downloaded.

    Args:
        app: The Flask application object (not the ``current_app`` proxy).
        inst_id: Primary key of the source ``RemoteInstance``.
        remote_job_id: Identifier of the job on the remote instance.
    """
    import shutil
    import tempfile
    from federation.client import FederationClient, sync_instance

    with app.app_context():
        inst = db.session.get(RemoteInstance, inst_id)
        if inst is None:
            # The instance may have been deleted after the request returned.
            app.logger.error(f"Pull échoué : instance {inst_id} introuvable")
            return

        tmp_dir = None
        try:
            backup_dir = os.path.abspath(app.config["YUNOHOST_BACKUP_DIR"])
            client = FederationClient(inst)
            sync_instance(inst)
            db.session.refresh(inst)

            runs = client.get_job_runs(remote_job_id)
            if not runs:
                raise RuntimeError(f"Aucun run distant pour le job {remote_job_id}")
            archive_name = runs[0].get("archive_name")
            if not archive_name:
                raise RuntimeError("Le dernier run distant n'a pas d'archive.")

            dest_tar = os.path.abspath(
                os.path.join(backup_dir, archive_name + ".tar"))
            dest_info = os.path.abspath(
                os.path.join(backup_dir, archive_name + ".info.json"))
            # Refuse names that would make the privileged rsync write outside
            # the backup directory.
            for candidate in (dest_tar, dest_info):
                if os.path.commonpath([backup_dir, candidate]) != backup_dir:
                    raise RuntimeError(
                        f"Nom d'archive distant invalide : {archive_name!r}")

            # Private, unpredictable staging directory, always removed in the
            # ``finally`` below, including when a download or copy fails.
            tmp_dir = tempfile.mkdtemp(prefix="backupmanager_pull_", dir="/tmp")

            # NOTE(review): the client hands back the whole archive as one
            # bytes object; streaming would need a different client API.
            archive_bytes = client.download_archive(archive_name)
            tmp_tar = os.path.join(tmp_dir, "archive.tar")
            with open(tmp_tar, "wb") as f:
                f.write(archive_bytes)
            subprocess.run(["sudo", "rsync", tmp_tar, dest_tar], check=True)
            # Free the staging copy before fetching the metadata.
            os.unlink(tmp_tar)

            info_bytes = client.download_info_json(archive_name)
            if info_bytes:
                tmp_info = os.path.join(tmp_dir, "archive.info.json")
                with open(tmp_info, "wb") as f:
                    f.write(info_bytes)
                subprocess.run(["sudo", "rsync", tmp_info, dest_info],
                               check=True)
            else:
                app.logger.warning(
                    f"Pull {archive_name}: .info.json absent ou inaccessible sur {inst.name}"
                )
            app.logger.info(f"Pull {archive_name} ← {inst.name} OK")
        except Exception as exc:
            # Top-level boundary of the worker thread: log, never propagate.
            app.logger.error(f"Pull ← {inst.name} échoué : {exc}")
        finally:
            if tmp_dir is not None:
                shutil.rmtree(tmp_dir, ignore_errors=True)
- # --- Helper save instance -----------------------------------------------------
def _save_remote_instance(inst):
    """Create or update a remote instance from the submitted form.

    Reads ``name``, ``url`` and ``api_key`` from ``request.form`` (trimmed;
    trailing slashes are removed from the URL). When any of them is empty the
    form is rendered again with an error message. Otherwise the values are
    stored on ``inst`` (a new ``RemoteInstance`` is created when ``inst`` is
    ``None``), committed, and the user is redirected to the instance list.
    """
    form = request.form
    name = form.get("name", "").strip()
    url = form.get("url", "").strip().rstrip("/")
    api_key = form.get("api_key", "").strip()

    if not (name and url and api_key):
        flash("Nom, URL et token API sont requis.", "error")
        return render_template("remote_instance_form.html", inst=inst)

    if inst is None:
        inst = RemoteInstance()
        db.session.add(inst)

    inst.name, inst.url, inst.api_key = name, url, api_key
    db.session.commit()

    flash(f"Instance « {inst.name} » enregistrée.", "success")
    return redirect(url_for("network.remote_instances_list"))
- # --- DTO dashboard réseau -----------------------------------------------------
- class _JobRow:
- def __init__(self, job_id, name, type, last_run_at, last_status,
- last_archive_name, last_size_bytes):
- self.job_id = job_id
- self.name = name
- self.type = type
- self.last_run_at = last_run_at
- self.last_status = last_status
- self.last_archive_name = last_archive_name
- self.last_size_bytes = last_size_bytes
- @property
- def size_human(self):
- return _size_human(self.last_size_bytes)
|