import math
from datetime import datetime

import requests
class FederationClient:
    """HTTP client for a remote BackupManager instance."""

    def __init__(self, instance):
        self.base = instance.url.rstrip("/")
        self.headers = {"X-BackupManager-Key": instance.api_key}
        self.timeout = 15

    def _get(self, path):
        r = requests.get(f"{self.base}{path}", headers=self.headers,
                         timeout=self.timeout, verify=True)
        r.raise_for_status()
        return r.json()

    def _post(self, path, json=None, data=None, extra_headers=None):
        h = dict(self.headers)
        if extra_headers:
            h.update(extra_headers)
        r = requests.post(f"{self.base}{path}", headers=h,
                          json=json, data=data, timeout=self.timeout, verify=True)
        r.raise_for_status()
        return r.json()
    def health(self):
        return self._get("/api/v1/health")

    def summary(self):
        return self._get("/api/v1/summary")

    def get_jobs(self):
        return self._get("/api/v1/jobs")

    def get_job_runs(self, job_id):
        return self._get(f"/api/v1/jobs/{job_id}/runs")

    def get_archives(self):
        return self._get("/api/v1/archives")

    def run_job(self, job_id):
        return self._post(f"/api/v1/jobs/{job_id}/run")

    def restore_archive(self, archive_name):
        return self._post(f"/api/v1/archives/{archive_name}/restore")

    def restore_status(self, archive_name):
        return self._get(f"/api/v1/archives/{archive_name}/restore/status")
    def upload_start(self, filename, total_size, checksum, chunk_size=50 * 1024 * 1024):
        chunks_total = math.ceil(total_size / chunk_size) if chunk_size else 1
        return self._post("/api/v1/archives/upload/start", json={
            "filename": filename,
            "total_size": total_size,
            "chunk_size": chunk_size,
            "chunks_total": chunks_total,
            "checksum": checksum,
        })

    def upload_chunk(self, upload_id, n, data):
        # Chunk uploads bypass _post so they can use a much longer timeout.
        r = requests.post(
            f"{self.base}/api/v1/archives/upload/{upload_id}/chunk/{n}",
            headers=self.headers,
            data=data,
            timeout=300,
            verify=True,
        )
        r.raise_for_status()
        return r.json()

    def upload_finish(self, upload_id):
        return self._post(f"/api/v1/archives/upload/{upload_id}/finish")
    def upload_finish_with_info(self, upload_id, info_json_str=None):
        payload = {}
        if info_json_str:
            payload["info_json"] = info_json_str
        r = requests.post(
            f"{self.base}/api/v1/archives/upload/{upload_id}/finish",
            headers=self.headers,
            json=payload if payload else None,
            timeout=self.timeout,
            verify=True,
        )
        r.raise_for_status()
        return r.json()
    def download_info_json(self, archive_name):
        """Download the .info.json of a remote archive. Returns None if absent."""
        try:
            r = requests.get(
                f"{self.base}/api/v1/archives/{archive_name}/info-json-download",
                headers=self.headers,
                timeout=30,
                verify=True,
            )
            if r.status_code == 404:
                return None
            r.raise_for_status()
            return r.content
        except Exception:
            return None

    def download_archive(self, archive_name):
        """Download a remote archive into memory (for archives of reasonable size)."""
        r = requests.get(
            f"{self.base}/api/v1/archives/{archive_name}/download",
            headers=self.headers,
            timeout=3600,
            stream=True,
            verify=True,
        )
        r.raise_for_status()
        return r.content
    def upload_cancel(self, upload_id):
        r = requests.delete(
            f"{self.base}/api/v1/archives/upload/{upload_id}",
            headers=self.headers,
            timeout=self.timeout,
            verify=True,
        )
        r.raise_for_status()
        return r.json()
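
# Example: how the chunked-upload methods above are meant to fit together.
# This is a minimal sketch, not part of the client: it assumes chunk numbers
# start at 0, that the checksum is a SHA-256 hex digest of the whole file,
# and that upload_start returns an "upload_id" field; none of these details
# are confirmed by this module, and the helper name is hypothetical.
def _example_upload_archive(client, path):
    import hashlib
    import os

    chunk_size = 50 * 1024 * 1024
    total_size = os.path.getsize(path)

    # Compute the whole-file checksum first (assumed algorithm: SHA-256).
    sha = hashlib.sha256()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(1024 * 1024), b""):
            sha.update(block)

    session = client.upload_start(os.path.basename(path), total_size,
                                  sha.hexdigest(), chunk_size=chunk_size)
    upload_id = session["upload_id"]  # assumed response field

    try:
        with open(path, "rb") as f:
            n = 0
            while True:
                data = f.read(chunk_size)
                if not data:
                    break
                client.upload_chunk(upload_id, n, data)
                n += 1
        return client.upload_finish(upload_id)
    except Exception:
        # Best effort: free the partial upload on the remote side.
        client.upload_cancel(upload_id)
        raise
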
def sync_instance(instance):
    """Synchronize the state of a remote instance into remote_runs."""
    from db import db, RemoteRun

    client = FederationClient(instance)
    try:
        summary = client.summary()
        instance.last_seen = datetime.utcnow()
        instance.status = "online"
        # Replace the cached runs for this instance with the fresh summary.
        RemoteRun.query.filter_by(instance_id=instance.id).delete()
        for job_data in summary.get("jobs", []):
            last_run = job_data.get("last_run")
            last_run_at = None
            if last_run and last_run.get("started_at"):
                try:
                    last_run_at = datetime.fromisoformat(last_run["started_at"])
                except ValueError:
                    pass
            rr = RemoteRun(
                instance_id=instance.id,
                job_id=job_data["id"],
                job_name=job_data["name"],
                job_type=job_data["type"],
                last_run_at=last_run_at,
                last_status=last_run["status"] if last_run else None,
                last_archive_name=last_run.get("archive_name") if last_run else None,
                last_size_bytes=last_run.get("size_bytes") if last_run else None,
            )
            db.session.add(rr)
        db.session.commit()
    except Exception as exc:
        instance.status = "error"
        instance.last_seen = datetime.utcnow()
        db.session.commit()
        raise exc
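
# Example: refreshing several remote instances in one pass (a minimal sketch,
# not part of this module). The caller supplies the instance objects, loaded
# from whatever model backs the federation settings; no model name is assumed.
def _example_sync_all(instances):
    results = {}
    for instance in instances:
        try:
            sync_instance(instance)
            results[instance.url] = "online"
        except Exception as exc:
            # sync_instance has already marked the instance as "error" and committed.
            results[instance.url] = f"error: {exc}"
    return results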