client.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. import math
  2. from datetime import datetime
  3. import requests
  4. class FederationClient:
  5. """Client HTTP vers une instance BackupManager distante."""
  6. def __init__(self, instance):
  7. self.base = instance.url.rstrip("/")
  8. self.headers = {"X-BackupManager-Key": instance.api_key}
  9. self.timeout = 15
  10. def _get(self, path):
  11. r = requests.get(f"{self.base}{path}", headers=self.headers,
  12. timeout=self.timeout, verify=True)
  13. r.raise_for_status()
  14. return r.json()
  15. def _post(self, path, json=None, data=None, extra_headers=None):
  16. h = dict(self.headers)
  17. if extra_headers:
  18. h.update(extra_headers)
  19. r = requests.post(f"{self.base}{path}", headers=h,
  20. json=json, data=data, timeout=self.timeout, verify=True)
  21. r.raise_for_status()
  22. return r.json()
  23. def health(self):
  24. return self._get("/api/v1/health")
  25. def summary(self):
  26. return self._get("/api/v1/summary")
  27. def get_jobs(self):
  28. return self._get("/api/v1/jobs")
  29. def get_job_runs(self, job_id):
  30. return self._get(f"/api/v1/jobs/{job_id}/runs")
  31. def get_archives(self):
  32. return self._get("/api/v1/archives")
  33. def run_job(self, job_id):
  34. return self._post(f"/api/v1/jobs/{job_id}/run")
  35. def restore_archive(self, archive_name):
  36. return self._post(f"/api/v1/archives/{archive_name}/restore")
  37. def restore_status(self, archive_name):
  38. return self._get(f"/api/v1/archives/{archive_name}/restore/status")
  39. def upload_start(self, filename, total_size, checksum, chunk_size=50 * 1024 * 1024):
  40. chunks_total = math.ceil(total_size / chunk_size) if chunk_size else 1
  41. return self._post("/api/v1/archives/upload/start", json={
  42. "filename": filename,
  43. "total_size": total_size,
  44. "chunk_size": chunk_size,
  45. "chunks_total": chunks_total,
  46. "checksum": checksum,
  47. })
  48. def upload_chunk(self, upload_id, n, data):
  49. r = requests.post(
  50. f"{self.base}/api/v1/archives/upload/{upload_id}/chunk/{n}",
  51. headers=self.headers,
  52. data=data,
  53. timeout=300,
  54. verify=True,
  55. )
  56. r.raise_for_status()
  57. return r.json()
  58. def upload_finish(self, upload_id):
  59. return self._post(f"/api/v1/archives/upload/{upload_id}/finish")
  60. def upload_finish_with_info(self, upload_id, info_json_str=None):
  61. payload = {}
  62. if info_json_str:
  63. payload["info_json"] = info_json_str
  64. r = requests.post(
  65. f"{self.base}/api/v1/archives/upload/{upload_id}/finish",
  66. headers=self.headers,
  67. json=payload if payload else None,
  68. timeout=self.timeout,
  69. verify=True,
  70. )
  71. r.raise_for_status()
  72. return r.json()
  73. def download_archive(self, archive_name):
  74. """Télécharge une archive distante en mémoire (archives de taille raisonnable)."""
  75. r = requests.get(
  76. f"{self.base}/api/v1/archives/{archive_name}/download",
  77. headers=self.headers,
  78. timeout=3600,
  79. stream=True,
  80. verify=True,
  81. )
  82. r.raise_for_status()
  83. return r.content
  84. def upload_cancel(self, upload_id):
  85. r = requests.delete(
  86. f"{self.base}/api/v1/archives/upload/{upload_id}",
  87. headers=self.headers,
  88. timeout=self.timeout,
  89. verify=True,
  90. )
  91. r.raise_for_status()
  92. return r.json()
  93. def sync_instance(instance):
  94. """Synchronise l'état d'une instance distante dans remote_runs."""
  95. from db import db, RemoteRun
  96. client = FederationClient(instance)
  97. try:
  98. summary = client.summary()
  99. instance.last_seen = datetime.utcnow()
  100. instance.status = "online"
  101. RemoteRun.query.filter_by(instance_id=instance.id).delete()
  102. for job_data in summary.get("jobs", []):
  103. last_run = job_data.get("last_run")
  104. last_run_at = None
  105. if last_run and last_run.get("started_at"):
  106. try:
  107. last_run_at = datetime.fromisoformat(last_run["started_at"])
  108. except ValueError:
  109. pass
  110. rr = RemoteRun(
  111. instance_id=instance.id,
  112. job_id=job_data["id"],
  113. job_name=job_data["name"],
  114. job_type=job_data["type"],
  115. last_run_at=last_run_at,
  116. last_status=last_run["status"] if last_run else None,
  117. last_archive_name=last_run.get("archive_name") if last_run else None,
  118. last_size_bytes=last_run.get("size_bytes") if last_run else None,
  119. )
  120. db.session.add(rr)
  121. db.session.commit()
  122. except Exception as exc:
  123. instance.status = "error"
  124. instance.last_seen = datetime.utcnow()
  125. db.session.commit()
  126. raise exc