| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- """Tests d'intégration pour l'export et l'import de configuration."""
- import io
- import json
def _payload(**overrides):
    """Build a minimal valid import payload.

    The defaults are version 1 with empty ``jobs``, ``destinations``,
    ``remote_instances`` and ``settings``.  Keyword arguments replace a
    default of the same name or are added as extra top-level keys.
    """
    defaults = {
        "version": 1,
        "jobs": [],
        "destinations": [],
        "remote_instances": [],
        "settings": {},
    }
    return {**defaults, **overrides}
def _job_data(**overrides):
    """Build the dict describing one valid job entry of an import file.

    Keyword arguments replace a default of the same name or are added as
    extra keys on the returned dict.
    """
    defaults = {
        "name": "Mon job",
        "type": "ynh_system",
        "config_json": "{}",
        "cron_expr": "",
        "retention_mode": "count",
        "retention_value": 7,
        "retention_gfs_config": None,
        "enabled": True,
        "core_only": False,
        "destination_names": [],
        "remote_instance_names": [],
    }
    return dict(defaults, **overrides)
def _dest_data(**overrides):
    """Build the dict describing one destination entry of an import file.

    Keyword arguments replace a default of the same name or are added as
    extra keys on the returned dict.
    """
    defaults = {
        "name": "VPS-OVH",
        "host": "vps.example.com",
        "port": 22,
        "user": "backup",
        "remote_path": "/backups",
        "key_name": None,
        "enabled": True,
    }
    return {**defaults, **overrides}
def _do_import(client, payload):
    """POST *payload* to the import endpoint as an uploaded JSON file.

    The payload is serialised with ``json.dumps`` and sent as the
    ``config_file`` field of a multipart form, under the file name
    ``config.json``.  Returns whatever ``client.post`` returns.
    """
    upload = io.BytesIO(json.dumps(payload).encode())
    form = {"config_file": (upload, "config.json")}
    return client.post(
        "/settings/import-config",
        data=form,
        content_type="multipart/form-data",
    )
- # ---------------------------------------------------------------------------
- # Export
- # ---------------------------------------------------------------------------
class TestExportConfig:
    """Integration tests for ``GET /settings/export-config``."""

    def test_db_vide_retourne_json_valide(self, client):
        """With nothing inserted by the test, the export is valid JSON with empty lists."""
        response = client.get("/settings/export-config")
        assert response.status_code == 200
        assert "application/json" in response.content_type

        exported = response.get_json()
        assert exported["version"] == 1
        assert exported["instance_name"] == "test"
        for section in ("jobs", "destinations", "remote_instances"):
            assert exported[section] == []

    def test_exporte_jobs(self, client, app):
        """A job stored in the database shows up in the exported ``jobs`` list."""
        with app.app_context():
            from db import db, Job

            job_fields = {
                "name": "Sys backup",
                "type": "ynh_system",
                "config_json": "{}",
                "cron_expr": "0 3 * * *",
                "retention_mode": "count",
                "retention_value": 5,
                "enabled": True,
                "core_only": False,
            }
            db.session.add(Job(**job_fields))
            db.session.commit()

        exported = client.get("/settings/export-config").get_json()
        assert len(exported["jobs"]) == 1

        exported_job = exported["jobs"][0]
        assert exported_job["name"] == "Sys backup"
        assert exported_job["type"] == "ynh_system"
        assert exported_job["retention_value"] == 5
        assert exported_job["destination_names"] == []

    def test_exporte_job_avec_destination(self, client, app):
        """A job linked to a destination exports that destination's name."""
        with app.app_context():
            from db import db, Job, Destination, JobDestination

            destination_fields = {
                "name": "VPS-OVH",
                "host": "vps.example.com",
                "port": 22,
                "user": "backup",
                "remote_path": "/backups",
                "enabled": True,
            }
            target = Destination(**destination_fields)
            db.session.add(target)
            # Flush before building the link: target.id is read just below.
            db.session.flush()

            link = JobDestination(dest_type="ssh", dest_id=target.id)
            job_fields = {
                "name": "Job avec dest",
                "type": "ynh_system",
                "config_json": "{}",
                "cron_expr": "",
                "retention_mode": "count",
                "retention_value": 7,
                "enabled": True,
                "core_only": False,
                "job_destinations": [link],
            }
            db.session.add(Job(**job_fields))
            db.session.commit()

        exported = client.get("/settings/export-config").get_json()
        first_job = exported["jobs"][0]
        assert first_job["destination_names"] == ["VPS-OVH"]

    def test_nom_fichier_dans_header(self, client):
        """The ``Content-Disposition`` header carries the expected file name parts."""
        response = client.get("/settings/export-config")
        disposition = response.headers.get("Content-Disposition", "")
        for fragment in ("backupmanager_config_", ".json"):
            assert fragment in disposition
- # ---------------------------------------------------------------------------
- # Import
- # ---------------------------------------------------------------------------
class TestImportConfig:
    """Integration tests for ``POST /settings/import-config``."""

    @staticmethod
    def _assert_single_link(job, dest_type, target):
        """Assert that *job* has exactly one destination link of *dest_type* to *target*."""
        # Fail with a readable message instead of an AttributeError on None.
        assert job is not None
        assert target is not None
        links = job.job_destinations
        assert len(links) == 1
        assert links[0].dest_type == dest_type
        assert links[0].dest_id == target.id

    def test_cree_job(self, client, app):
        """Importing a job into a database without jobs creates it."""
        resp = _do_import(client, _payload(jobs=[_job_data(name="Nouveau job")]))
        assert resp.status_code == 302
        with app.app_context():
            from db import Job

            jobs = Job.query.all()
            assert len(jobs) == 1
            assert jobs[0].name == "Nouveau job"

    def test_met_a_jour_job_existant(self, client, app):
        """Importing a job whose name already exists updates it in place."""
        with app.app_context():
            from db import db, Job

            job = Job(
                name="Mon job", type="ynh_system", config_json="{}",
                cron_expr="", retention_mode="count", retention_value=3,
                enabled=True, core_only=False,
            )
            db.session.add(job)
            db.session.commit()

        resp = _do_import(client, _payload(jobs=[_job_data(retention_value=10)]))
        assert resp.status_code == 302
        with app.app_context():
            from db import Job

            jobs = Job.query.all()
            assert len(jobs) == 1
            assert jobs[0].retention_value == 10

    def test_cree_destination(self, client, app):
        """Importing a destination creates the matching row."""
        resp = _do_import(client, _payload(destinations=[_dest_data()]))
        assert resp.status_code == 302
        with app.app_context():
            from db import Destination

            dests = Destination.query.all()
            assert len(dests) == 1
            assert dests[0].name == "VPS-OVH"
            assert dests[0].host == "vps.example.com"

    def test_lie_job_a_destination(self, client, app):
        """A job listing a destination name is linked to that destination."""
        payload = _payload(
            jobs=[_job_data(destination_names=["VPS-OVH"])],
            destinations=[_dest_data()],
        )
        _do_import(client, payload)
        with app.app_context():
            from db import Job, Destination

            dest = Destination.query.filter_by(name="VPS-OVH").first()
            job = Job.query.filter_by(name="Mon job").first()
            self._assert_single_link(job, "ssh", dest)

    def test_cree_instance_distante(self, client, app):
        """Importing a remote instance creates the matching row."""
        payload = _payload(remote_instances=[{
            "name": "Tom", "url": "https://tom.example.com", "api_key": "secret123",
        }])
        _do_import(client, payload)
        with app.app_context():
            from db import RemoteInstance

            instances = RemoteInstance.query.all()
            assert len(instances) == 1
            assert instances[0].name == "Tom"
            assert instances[0].url == "https://tom.example.com"

    def test_importe_retention_gfs(self, client, app):
        """The GFS retention configuration is stored as the JSON that was imported."""
        gfs = {"daily": 7, "weekly": 4, "monthly": 12}
        payload = _payload(jobs=[_job_data(
            retention_mode="gfs",
            retention_value=0,
            retention_gfs_config=json.dumps(gfs),
        )])
        _do_import(client, payload)
        with app.app_context():
            from db import Job

            job = Job.query.first()
            assert job is not None
            assert job.retention_mode == "gfs"
            assert json.loads(job.retention_gfs_config) == gfs

    def test_json_invalide_redirige_avec_erreur(self, client):
        """Uploading a file that is not JSON still answers with a redirect."""
        # Built by hand: _do_import always serialises valid JSON.
        resp = client.post(
            "/settings/import-config",
            data={"config_file": (io.BytesIO(b"pas du json"), "config.json")},
            content_type="multipart/form-data",
        )
        assert resp.status_code == 302

    def test_version_incorrecte_rejete(self, client, app):
        """A file with an unsupported ``version`` must not import anything."""
        # The payload must carry a job: with an empty ``jobs`` list the
        # count below would be 0 even if the version were never checked.
        payload = _payload(version=99, jobs=[_job_data()])
        _do_import(client, payload)
        with app.app_context():
            from db import Job

            assert Job.query.count() == 0

    def test_import_idempotent(self, client, app):
        """Importing the same file twice does not duplicate the job."""
        payload = _payload(jobs=[_job_data()])
        _do_import(client, payload)
        _do_import(client, payload)
        with app.app_context():
            from db import Job

            assert Job.query.count() == 1

    def test_sans_fichier_redirige(self, client):
        """Posting without a ``config_file`` part answers with a redirect."""
        resp = client.post("/settings/import-config", data={})
        assert resp.status_code == 302

    def test_compat_ancien_format_destination_name(self, client, app):
        """The legacy singular ``destination_name`` key must be honoured on import."""
        payload = _payload(
            jobs=[_job_data(destination_name="VPS-OVH")],
            destinations=[_dest_data()],
        )
        # Drop the new-format keys to simulate a file from an older version.
        payload["jobs"][0].pop("destination_names", None)
        payload["jobs"][0].pop("remote_instance_names", None)
        _do_import(client, payload)
        with app.app_context():
            from db import Job, Destination

            dest = Destination.query.filter_by(name="VPS-OVH").first()
            job = Job.query.filter_by(name="Mon job").first()
            self._assert_single_link(job, "ssh", dest)

    def test_compat_ancien_format_remote_instance_name(self, client, app):
        """The legacy singular ``remote_instance_name`` key must be honoured on import."""
        payload = _payload(
            jobs=[_job_data(remote_instance_name="Tom")],
            remote_instances=[{"name": "Tom", "url": "https://tom.example.com", "api_key": "k"}],
        )
        # Drop the new-format keys to simulate a file from an older version.
        payload["jobs"][0].pop("destination_names", None)
        payload["jobs"][0].pop("remote_instance_names", None)
        _do_import(client, payload)
        with app.app_context():
            from db import Job, RemoteInstance

            inst = RemoteInstance.query.filter_by(name="Tom").first()
            job = Job.query.filter_by(name="Mon job").first()
            self._assert_single_link(job, "instance", inst)
|