"""Flask blueprint for the federation/network features.

Covers CRUD + health-check for remote instances, the network dashboard,
remote job triggering, and background push/pull of backup archives
(.tar + .info.json) between instances.
"""
import os
import subprocess
import threading

from flask import (
    Blueprint,
    current_app,
    flash,
    redirect,
    render_template,
    request,
    url_for,
)

from db import db, Job, Run, RemoteInstance, RemoteRun
from db import _size_human

bp = Blueprint("network", __name__)


# --- Remote instances ---------------------------------------------------------

@bp.route("/remote-instances")
def remote_instances_list():
    """List all registered remote instances, sorted by name."""
    instances = RemoteInstance.query.order_by(RemoteInstance.name).all()
    return render_template("remote_instances.html", instances=instances)


@bp.route("/remote-instances/new", methods=["GET", "POST"])
def remote_instance_new():
    """Show the creation form (GET) or persist a new instance (POST)."""
    if request.method == "POST":
        return _save_remote_instance(None)
    return render_template("remote_instance_form.html", inst=None)


# NOTE: the URL variable rules (<int:inst_id>, etc.) were missing from the
# decorators, which makes Flask fail at registration time since the view
# functions take these arguments; they are restored below.
@bp.route("/remote-instances/<int:inst_id>/edit", methods=["GET", "POST"])
def remote_instance_edit(inst_id):
    """Show the edit form (GET) or persist changes (POST) for one instance."""
    inst = db.get_or_404(RemoteInstance, inst_id)
    if request.method == "POST":
        return _save_remote_instance(inst)
    return render_template("remote_instance_form.html", inst=inst)


@bp.route("/remote-instances/<int:inst_id>/delete", methods=["POST"])
def remote_instance_delete(inst_id):
    """Delete a remote instance and redirect to the list."""
    inst = db.get_or_404(RemoteInstance, inst_id)
    db.session.delete(inst)
    db.session.commit()
    flash(f"Instance « {inst.name} » supprimée.", "success")
    return redirect(url_for("network.remote_instances_list"))


@bp.route("/remote-instances/<int:inst_id>/test", methods=["POST"])
def remote_instance_test(inst_id):
    """Ping the remote instance's health endpoint and record the outcome."""
    inst = db.get_or_404(RemoteInstance, inst_id)
    # Local imports: federation.client may be costly / have side effects.
    from federation.client import FederationClient
    from datetime import datetime

    try:
        data = FederationClient(inst).health()
        inst.status = "online"
        # NOTE(review): naive UTC timestamp; assumes the rest of the app
        # stores naive UTC datetimes — confirm before switching to aware.
        inst.last_seen = datetime.utcnow()
        db.session.commit()
        flash(f"Instance « {inst.name} » en ligne — {data.get('instance', '?')}.", "success")
    except Exception as exc:
        inst.status = "error"
        db.session.commit()
        flash(f"Connexion échouée vers « {inst.name} » : {exc}", "error")
    return redirect(url_for("network.remote_instances_list"))


@bp.route("/remote-instances/<int:inst_id>/sync", methods=["POST"])
def remote_instance_sync(inst_id):
    """Synchronize one remote instance's metadata, flashing the result."""
    inst = db.get_or_404(RemoteInstance, inst_id)
    from federation.client import sync_instance

    try:
        sync_instance(inst)
        flash(f"Instance « {inst.name} » synchronisée.", "success")
    except Exception as exc:
        flash(f"Synchronisation échouée pour « {inst.name} » : {exc}", "error")
    return redirect(url_for("network.remote_instances_list"))


# --- Network dashboard --------------------------------------------------------

@bp.route("/network")
def dashboard_network():
    """Render the network dashboard: local jobs (with last run) + instances."""
    local_jobs = Job.query.order_by(Job.name).all()
    local_jobs_data = []
    for job in local_jobs:
        # Most recent run of this job, if any.
        run = (
            Run.query.filter_by(job_id=job.id)
            .order_by(Run.started_at.desc())
            .first()
        )
        local_jobs_data.append(_JobRow(
            job_id=job.id,
            name=job.name,
            type=job.type,
            last_run_at=run.started_at if run else None,
            last_status=run.status if run else None,
            last_archive_name=run.archive_name if run else None,
            last_size_bytes=run.size_bytes if run else None,
        ))
    instances = RemoteInstance.query.order_by(RemoteInstance.name).all()
    return render_template(
        "dashboard_network.html",
        local_jobs_data=local_jobs_data,
        instances=instances,
        instances_for_push=instances,
    )


@bp.route("/network/sync-all", methods=["POST"])
def network_sync_all():
    """Synchronize every remote instance; report partial failures in one flash."""
    from federation.client import sync_instance

    instances = RemoteInstance.query.all()
    errors = []
    for inst in instances:
        try:
            sync_instance(inst)
        except Exception as exc:
            # Best-effort: keep going, collect per-instance failures.
            errors.append(f"{inst.name}: {exc}")
    if errors:
        flash("Synchronisation partielle — " + " | ".join(errors), "error")
    else:
        flash(f"{len(instances)} instance(s) synchronisée(s).", "success")
    return redirect(url_for("network.dashboard_network"))


# --- Remote control -----------------------------------------------------------

@bp.route("/remote-instances/<int:inst_id>/run-job/<int:job_id>", methods=["POST"])
def remote_job_run(inst_id, job_id):
    """Trigger a job on a remote instance via its federation API."""
    inst = db.get_or_404(RemoteInstance, inst_id)
    from federation.client import FederationClient

    try:
        FederationClient(inst).run_job(job_id)
        flash(f"Job déclenché sur « {inst.name} ».", "success")
    except Exception as exc:
        flash(f"Impossible de lancer le job sur « {inst.name} » : {exc}", "error")
    return redirect(url_for("network.dashboard_network"))


# --- Push / pull archives -----------------------------------------------------

@bp.route("/archives/<archive_name>/push/<int:inst_id>", methods=["POST"])
def archive_push(archive_name, inst_id):
    """Start a background thread pushing a local archive to a remote instance."""
    inst = db.get_or_404(RemoteInstance, inst_id)
    # Grab the real app object: current_app is a proxy bound to this request
    # and cannot be used from the worker thread.
    app = current_app._get_current_object()
    threading.Thread(
        target=_do_push_archive,
        args=(app, archive_name, inst.id),
        daemon=True,
    ).start()
    flash(f"Envoi de « {archive_name} » vers « {inst.name} » démarré en arrière-plan.", "success")
    return redirect(request.referrer or url_for("jobs.index"))


@bp.route("/remote-instances/<int:inst_id>/pull-latest/<int:remote_job_id>", methods=["POST"])
def archive_pull_latest(inst_id, remote_job_id):
    """Start a background thread pulling a remote job's latest archive."""
    inst = db.get_or_404(RemoteInstance, inst_id)
    app = current_app._get_current_object()
    threading.Thread(
        target=_do_pull_latest,
        args=(app, inst.id, remote_job_id),
        daemon=True,
    ).start()
    flash(f"Rapatriement depuis « {inst.name} » démarré en arrière-plan.", "success")
    return redirect(url_for("network.dashboard_network"))


def _do_push_archive(app, archive_name, inst_id):
    """Push a local archive to a remote instance via chunked HTTP upload.

    Runs in a daemon thread. Copies the (root-owned) archive to /tmp with
    ``sudo rsync``, computes its SHA-256, streams it in chunks through the
    federation client, then forwards the ``.info.json`` sidecar if present.
    Errors are logged, never raised (best-effort background task).
    """
    import hashlib as _hashlib
    from federation.client import FederationClient
    from jobs.utils import sudo_exists

    with app.app_context():
        inst = db.session.get(RemoteInstance, inst_id)
        backup_dir = app.config["YUNOHOST_BACKUP_DIR"]
        archive_path = os.path.join(backup_dir, archive_name + ".tar")
        tmp_path = None
        try:
            # NOTE(review): predictable /tmp path — fine on a single-admin
            # box, but consider tempfile.mkstemp if /tmp is shared.
            tmp_path = f"/tmp/backupmanager_push_{archive_name}.tar"
            result = subprocess.run(
                ["sudo", "rsync", archive_path, tmp_path],
                capture_output=True,
                text=True,
            )
            if result.returncode != 0:
                raise RuntimeError(f"Copie locale échouée : {result.stderr.strip()}")
            total_size = os.path.getsize(tmp_path)

            # First pass: checksum of the whole file (64 KiB reads).
            sha256 = _hashlib.sha256()
            chunk_size = 50 * 1024 * 1024  # upload chunk size: 50 MiB
            with open(tmp_path, "rb") as f:
                while True:
                    data = f.read(65536)
                    if not data:
                        break
                    sha256.update(data)
            checksum = sha256.hexdigest()

            # Second pass: chunked upload.
            client = FederationClient(inst)
            upload_info = client.upload_start(
                archive_name + ".tar", total_size, checksum, chunk_size
            )
            upload_id = upload_info["upload_id"]
            with open(tmp_path, "rb") as f:
                n = 0
                while True:
                    data = f.read(chunk_size)
                    if not data:
                        break
                    client.upload_chunk(upload_id, n, data)
                    n += 1

            # Forward the .info.json sidecar if present (via sudo rsync to /tmp).
            info_json_content = None
            info_path = os.path.join(backup_dir, archive_name + ".info.json")
            if sudo_exists(info_path):
                tmp_info_src = f"/tmp/backupmanager_push_{archive_name}.info.json"
                r = subprocess.run(
                    ["sudo", "rsync", info_path, tmp_info_src],
                    capture_output=True,
                )
                if r.returncode == 0:
                    try:
                        with open(tmp_info_src, "r", encoding="utf-8", errors="replace") as fh:
                            info_json_content = fh.read()
                    finally:
                        subprocess.run(["sudo", "rm", "-rf", tmp_info_src], capture_output=True)
            client.upload_finish_with_info(upload_id, info_json_content)
            app.logger.info(f"Push {archive_name} → {inst.name} OK")
        except Exception as exc:
            app.logger.error(f"Push {archive_name} → {inst.name} échoué : {exc}")
        finally:
            # Always remove the /tmp staging copy.
            if tmp_path and os.path.exists(tmp_path):
                os.unlink(tmp_path)


def _do_pull_latest(app, inst_id, remote_job_id):
    """Fetch the latest archive of a remote job (.tar + .info.json).

    Runs in a daemon thread. Syncs the instance metadata first, downloads
    the newest run's archive into /tmp, then moves it into the backup
    directory with ``sudo rsync``. Errors are logged, never raised.
    """
    from federation.client import FederationClient, sync_instance

    with app.app_context():
        inst = db.session.get(RemoteInstance, inst_id)
        backup_dir = app.config["YUNOHOST_BACKUP_DIR"]
        try:
            client = FederationClient(inst)
            # Refresh remote metadata so we see the latest runs.
            sync_instance(inst)
            db.session.refresh(inst)
            runs = client.get_job_runs(remote_job_id)
            if not runs:
                raise RuntimeError(f"Aucun run distant pour le job {remote_job_id}")
            # Assumes runs are ordered newest-first by the remote API.
            archive_name = runs[0].get("archive_name")
            if not archive_name:
                raise RuntimeError("Le dernier run distant n'a pas d'archive.")

            archive_bytes = client.download_archive(archive_name)
            tmp_tar = f"/tmp/backupmanager_pull_{archive_name}.tar"
            with open(tmp_tar, "wb") as f:
                f.write(archive_bytes)
            subprocess.run(
                ["sudo", "rsync", tmp_tar, os.path.join(backup_dir, archive_name + ".tar")],
                check=True,
            )
            os.unlink(tmp_tar)

            info_bytes = client.download_info_json(archive_name)
            if info_bytes:
                tmp_info = f"/tmp/backupmanager_pull_{archive_name}.info.json"
                with open(tmp_info, "wb") as f:
                    f.write(info_bytes)
                subprocess.run(
                    ["sudo", "rsync", tmp_info, os.path.join(backup_dir, archive_name + ".info.json")],
                    check=True,
                )
                os.unlink(tmp_info)
            else:
                app.logger.warning(
                    f"Pull {archive_name}: .info.json absent ou inaccessible sur {inst.name}"
                )
            app.logger.info(f"Pull {archive_name} ← {inst.name} OK")
        except Exception as exc:
            app.logger.error(f"Pull ← {inst.name} échoué : {exc}")


# --- Save-instance helper -----------------------------------------------------

def _save_remote_instance(inst):
    """Validate the submitted form and create/update *inst*.

    ``inst`` is an existing RemoteInstance or None (creation). On invalid
    input, re-renders the form; on success, commits and redirects to the list.
    """
    f = request.form
    name = f.get("name", "").strip()
    url = f.get("url", "").strip().rstrip("/")
    api_key = f.get("api_key", "").strip()
    if not name or not url or not api_key:
        flash("Nom, URL et token API sont requis.", "error")
        return render_template("remote_instance_form.html", inst=inst)
    if inst is None:
        inst = RemoteInstance()
        db.session.add(inst)
    inst.name = name
    inst.url = url
    inst.api_key = api_key
    db.session.commit()
    flash(f"Instance « {inst.name} » enregistrée.", "success")
    return redirect(url_for("network.remote_instances_list"))


# --- Network dashboard DTO ----------------------------------------------------

class _JobRow:
    """Plain row object for the network dashboard's local-jobs table."""

    def __init__(self, job_id, name, type, last_run_at, last_status,
                 last_archive_name, last_size_bytes):
        self.job_id = job_id
        self.name = name
        self.type = type  # job type label (shadows builtin; kept for callers)
        self.last_run_at = last_run_at
        self.last_status = last_status
        self.last_archive_name = last_archive_name
        self.last_size_bytes = last_size_bytes

    @property
    def size_human(self):
        """Human-readable size of the last archive (delegates to db helper)."""
        return _size_human(self.last_size_bytes)