test_config_io.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. """Tests d'intégration pour l'export et l'import de configuration."""
  2. import io
  3. import json
  4. def _payload(**overrides):
  5. """Payload d'import minimal valide."""
  6. base = {
  7. "version": 1,
  8. "jobs": [],
  9. "destinations": [],
  10. "remote_instances": [],
  11. "settings": {},
  12. }
  13. base.update(overrides)
  14. return base
  15. def _job_data(**overrides):
  16. """Données d'un job valide pour l'import."""
  17. base = {
  18. "name": "Mon job",
  19. "type": "ynh_system",
  20. "config_json": "{}",
  21. "cron_expr": "",
  22. "retention_mode": "count",
  23. "retention_value": 7,
  24. "retention_gfs_config": None,
  25. "enabled": True,
  26. "core_only": False,
  27. "destination_names": [],
  28. "remote_instance_names": [],
  29. }
  30. base.update(overrides)
  31. return base
  32. def _dest_data(**overrides):
  33. base = {
  34. "name": "VPS-OVH",
  35. "host": "vps.example.com",
  36. "port": 22,
  37. "user": "backup",
  38. "remote_path": "/backups",
  39. "key_name": None,
  40. "enabled": True,
  41. }
  42. base.update(overrides)
  43. return base
  44. def _do_import(client, payload):
  45. raw = json.dumps(payload).encode()
  46. return client.post(
  47. "/settings/import-config",
  48. data={"config_file": (io.BytesIO(raw), "config.json")},
  49. content_type="multipart/form-data",
  50. )
  51. # ---------------------------------------------------------------------------
  52. # Export
  53. # ---------------------------------------------------------------------------
  54. class TestExportConfig:
  55. def test_db_vide_retourne_json_valide(self, client):
  56. resp = client.get("/settings/export-config")
  57. assert resp.status_code == 200
  58. assert "application/json" in resp.content_type
  59. data = resp.get_json()
  60. assert data["version"] == 1
  61. assert data["instance_name"] == "test"
  62. assert data["jobs"] == []
  63. assert data["destinations"] == []
  64. assert data["remote_instances"] == []
  65. def test_exporte_jobs(self, client, app):
  66. with app.app_context():
  67. from db import db, Job
  68. job = Job(
  69. name="Sys backup", type="ynh_system", config_json="{}",
  70. cron_expr="0 3 * * *", retention_mode="count",
  71. retention_value=5, enabled=True, core_only=False,
  72. )
  73. db.session.add(job)
  74. db.session.commit()
  75. resp = client.get("/settings/export-config")
  76. data = resp.get_json()
  77. assert len(data["jobs"]) == 1
  78. j = data["jobs"][0]
  79. assert j["name"] == "Sys backup"
  80. assert j["type"] == "ynh_system"
  81. assert j["retention_value"] == 5
  82. assert j["destination_names"] == []
  83. def test_exporte_job_avec_destination(self, client, app):
  84. with app.app_context():
  85. from db import db, Job, Destination, JobDestination
  86. dest = Destination(
  87. name="VPS-OVH", host="vps.example.com", port=22,
  88. user="backup", remote_path="/backups", enabled=True,
  89. )
  90. db.session.add(dest)
  91. db.session.flush()
  92. job = Job(
  93. name="Job avec dest", type="ynh_system", config_json="{}",
  94. cron_expr="", retention_mode="count", retention_value=7,
  95. enabled=True, core_only=False,
  96. job_destinations=[JobDestination(dest_type="ssh", dest_id=dest.id)],
  97. )
  98. db.session.add(job)
  99. db.session.commit()
  100. resp = client.get("/settings/export-config")
  101. data = resp.get_json()
  102. j = data["jobs"][0]
  103. assert j["destination_names"] == ["VPS-OVH"]
  104. def test_nom_fichier_dans_header(self, client):
  105. resp = client.get("/settings/export-config")
  106. cd = resp.headers.get("Content-Disposition", "")
  107. assert "backupmanager_config_" in cd
  108. assert ".json" in cd
  109. # ---------------------------------------------------------------------------
  110. # Import
  111. # ---------------------------------------------------------------------------
  112. class TestImportConfig:
  113. def test_cree_job(self, client, app):
  114. resp = _do_import(client, _payload(jobs=[_job_data(name="Nouveau job")]))
  115. assert resp.status_code == 302
  116. with app.app_context():
  117. from db import Job
  118. jobs = Job.query.all()
  119. assert len(jobs) == 1
  120. assert jobs[0].name == "Nouveau job"
  121. def test_met_a_jour_job_existant(self, client, app):
  122. with app.app_context():
  123. from db import db, Job
  124. job = Job(
  125. name="Mon job", type="ynh_system", config_json="{}",
  126. cron_expr="", retention_mode="count", retention_value=3,
  127. enabled=True, core_only=False,
  128. )
  129. db.session.add(job)
  130. db.session.commit()
  131. resp = _do_import(client, _payload(jobs=[_job_data(retention_value=10)]))
  132. assert resp.status_code == 302
  133. with app.app_context():
  134. from db import Job
  135. jobs = Job.query.all()
  136. assert len(jobs) == 1
  137. assert jobs[0].retention_value == 10
  138. def test_cree_destination(self, client, app):
  139. resp = _do_import(client, _payload(destinations=[_dest_data()]))
  140. assert resp.status_code == 302
  141. with app.app_context():
  142. from db import Destination
  143. dests = Destination.query.all()
  144. assert len(dests) == 1
  145. assert dests[0].name == "VPS-OVH"
  146. assert dests[0].host == "vps.example.com"
  147. def test_lie_job_a_destination(self, client, app):
  148. payload = _payload(
  149. jobs=[_job_data(destination_names=["VPS-OVH"])],
  150. destinations=[_dest_data()],
  151. )
  152. _do_import(client, payload)
  153. with app.app_context():
  154. from db import Job, Destination
  155. dest = Destination.query.filter_by(name="VPS-OVH").first()
  156. job = Job.query.filter_by(name="Mon job").first()
  157. assert len(job.job_destinations) == 1
  158. assert job.job_destinations[0].dest_type == "ssh"
  159. assert job.job_destinations[0].dest_id == dest.id
  160. def test_cree_instance_distante(self, client, app):
  161. payload = _payload(remote_instances=[{
  162. "name": "Tom", "url": "https://tom.example.com", "api_key": "secret123",
  163. }])
  164. _do_import(client, payload)
  165. with app.app_context():
  166. from db import RemoteInstance
  167. instances = RemoteInstance.query.all()
  168. assert len(instances) == 1
  169. assert instances[0].name == "Tom"
  170. assert instances[0].url == "https://tom.example.com"
  171. def test_importe_retention_gfs(self, client, app):
  172. gfs = {"daily": 7, "weekly": 4, "monthly": 12}
  173. payload = _payload(jobs=[_job_data(
  174. retention_mode="gfs",
  175. retention_value=0,
  176. retention_gfs_config=json.dumps(gfs),
  177. )])
  178. _do_import(client, payload)
  179. with app.app_context():
  180. from db import Job
  181. job = Job.query.first()
  182. assert job.retention_mode == "gfs"
  183. cfg = json.loads(job.retention_gfs_config)
  184. assert cfg == gfs
  185. def test_json_invalide_redirige_avec_erreur(self, client):
  186. resp = client.post(
  187. "/settings/import-config",
  188. data={"config_file": (io.BytesIO(b"pas du json"), "config.json")},
  189. content_type="multipart/form-data",
  190. )
  191. assert resp.status_code == 302
  192. def test_version_incorrecte_rejete(self, client, app):
  193. payload = _payload()
  194. payload["version"] = 99
  195. _do_import(client, payload)
  196. with app.app_context():
  197. from db import Job
  198. assert Job.query.count() == 0
  199. def test_import_idempotent(self, client, app):
  200. payload = _payload(jobs=[_job_data()])
  201. _do_import(client, payload)
  202. _do_import(client, payload)
  203. with app.app_context():
  204. from db import Job
  205. assert Job.query.count() == 1
  206. def test_sans_fichier_redirige(self, client):
  207. resp = client.post("/settings/import-config", data={})
  208. assert resp.status_code == 302