"""Tests d'intégration pour l'export et l'import de configuration.""" import io import json def _payload(**overrides): """Payload d'import minimal valide.""" base = { "version": 1, "jobs": [], "destinations": [], "remote_instances": [], "settings": {}, } base.update(overrides) return base def _job_data(**overrides): """Données d'un job valide pour l'import.""" base = { "name": "Mon job", "type": "ynh_system", "config_json": "{}", "cron_expr": "", "retention_mode": "count", "retention_value": 7, "retention_gfs_config": None, "enabled": True, "core_only": False, "destination_name": None, "remote_instance_name": None, } base.update(overrides) return base def _dest_data(**overrides): base = { "name": "VPS-OVH", "host": "vps.example.com", "port": 22, "user": "backup", "remote_path": "/backups", "key_name": None, "enabled": True, } base.update(overrides) return base def _do_import(client, payload): raw = json.dumps(payload).encode() return client.post( "/settings/import-config", data={"config_file": (io.BytesIO(raw), "config.json")}, content_type="multipart/form-data", ) # --------------------------------------------------------------------------- # Export # --------------------------------------------------------------------------- class TestExportConfig: def test_db_vide_retourne_json_valide(self, client): resp = client.get("/settings/export-config") assert resp.status_code == 200 assert "application/json" in resp.content_type data = resp.get_json() assert data["version"] == 1 assert data["instance_name"] == "test" assert data["jobs"] == [] assert data["destinations"] == [] assert data["remote_instances"] == [] def test_exporte_jobs(self, client, app): with app.app_context(): from db import db, Job job = Job( name="Sys backup", type="ynh_system", config_json="{}", cron_expr="0 3 * * *", retention_mode="count", retention_value=5, enabled=True, core_only=False, ) db.session.add(job) db.session.commit() resp = client.get("/settings/export-config") data = resp.get_json() assert len(data["jobs"]) == 1 j = data["jobs"][0] assert j["name"] == "Sys backup" assert j["type"] == "ynh_system" assert j["retention_value"] == 5 assert j["destination_name"] is None def test_exporte_job_avec_destination(self, client, app): with app.app_context(): from db import db, Job, Destination dest = Destination( name="VPS-OVH", host="vps.example.com", port=22, user="backup", remote_path="/backups", enabled=True, ) db.session.add(dest) db.session.flush() job = Job( name="Job avec dest", type="ynh_system", config_json="{}", cron_expr="", retention_mode="count", retention_value=7, enabled=True, core_only=False, destination_id=dest.id, ) db.session.add(job) db.session.commit() resp = client.get("/settings/export-config") data = resp.get_json() j = data["jobs"][0] assert j["destination_name"] == "VPS-OVH" def test_nom_fichier_dans_header(self, client): resp = client.get("/settings/export-config") cd = resp.headers.get("Content-Disposition", "") assert "backupmanager_config_" in cd assert ".json" in cd # --------------------------------------------------------------------------- # Import # --------------------------------------------------------------------------- class TestImportConfig: def test_cree_job(self, client, app): resp = _do_import(client, _payload(jobs=[_job_data(name="Nouveau job")])) assert resp.status_code == 302 with app.app_context(): from db import Job jobs = Job.query.all() assert len(jobs) == 1 assert jobs[0].name == "Nouveau job" def test_met_a_jour_job_existant(self, client, app): with app.app_context(): from db import db, Job job = Job( name="Mon job", type="ynh_system", config_json="{}", cron_expr="", retention_mode="count", retention_value=3, enabled=True, core_only=False, ) db.session.add(job) db.session.commit() resp = _do_import(client, _payload(jobs=[_job_data(retention_value=10)])) assert resp.status_code == 302 with app.app_context(): from db import Job jobs = Job.query.all() assert len(jobs) == 1 assert jobs[0].retention_value == 10 def test_cree_destination(self, client, app): resp = _do_import(client, _payload(destinations=[_dest_data()])) assert resp.status_code == 302 with app.app_context(): from db import Destination dests = Destination.query.all() assert len(dests) == 1 assert dests[0].name == "VPS-OVH" assert dests[0].host == "vps.example.com" def test_lie_job_a_destination(self, client, app): payload = _payload( jobs=[_job_data(destination_name="VPS-OVH")], destinations=[_dest_data()], ) _do_import(client, payload) with app.app_context(): from db import Job, Destination dest = Destination.query.filter_by(name="VPS-OVH").first() job = Job.query.filter_by(name="Mon job").first() assert job.destination_id == dest.id def test_cree_instance_distante(self, client, app): payload = _payload(remote_instances=[{ "name": "Tom", "url": "https://tom.example.com", "api_key": "secret123", }]) _do_import(client, payload) with app.app_context(): from db import RemoteInstance instances = RemoteInstance.query.all() assert len(instances) == 1 assert instances[0].name == "Tom" assert instances[0].url == "https://tom.example.com" def test_importe_retention_gfs(self, client, app): gfs = {"daily": 7, "weekly": 4, "monthly": 12} payload = _payload(jobs=[_job_data( retention_mode="gfs", retention_value=0, retention_gfs_config=json.dumps(gfs), )]) _do_import(client, payload) with app.app_context(): from db import Job job = Job.query.first() assert job.retention_mode == "gfs" cfg = json.loads(job.retention_gfs_config) assert cfg == gfs def test_json_invalide_redirige_avec_erreur(self, client): resp = client.post( "/settings/import-config", data={"config_file": (io.BytesIO(b"pas du json"), "config.json")}, content_type="multipart/form-data", ) assert resp.status_code == 302 def test_version_incorrecte_rejete(self, client, app): payload = _payload() payload["version"] = 99 _do_import(client, payload) with app.app_context(): from db import Job assert Job.query.count() == 0 def test_import_idempotent(self, client, app): payload = _payload(jobs=[_job_data()]) _do_import(client, payload) _do_import(client, payload) with app.app_context(): from db import Job assert Job.query.count() == 1 def test_sans_fichier_redirige(self, client): resp = client.post("/settings/import-config", data={}) assert resp.status_code == 302