소스 검색

test: suite pytest — rétention (count/daily/GFS), utils, export/import config

35 tests unitaires et d'intégration (Flask test client SQLite in-memory).
Lancer avec : cd sources && python -m pytest tests/ -v

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Cedric Hansen 1 개월 전
부모
커밋
bc50504283
6개의 변경된 파일534개의 추가작업 그리고 0개의 파일을 삭제
  1. 1 0
      sources/requirements-dev.txt
  2. 0 0
      sources/tests/__init__.py
  3. 74 0
      sources/tests/conftest.py
  4. 242 0
      sources/tests/test_config_io.py
  5. 170 0
      sources/tests/test_retention.py
  6. 47 0
      sources/tests/test_utils.py

+ 1 - 0
sources/requirements-dev.txt

@@ -0,0 +1 @@
+pytest>=8.0

+ 0 - 0
sources/tests/__init__.py


+ 74 - 0
sources/tests/conftest.py

@@ -0,0 +1,74 @@
+import json
+import os
+import sys
+import pytest
+
+# sources/ doit être dans le path pour les imports
+sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
+
+
+@pytest.fixture(scope="session")
+def app():
+    from flask import Flask
+    from db import db as _db
+
+    flask_app = Flask(
+        __name__,
+        template_folder=os.path.join(os.path.dirname(os.path.dirname(__file__)), "templates"),
+    )
+    flask_app.config.update({
+        "TESTING": True,
+        "SQLALCHEMY_DATABASE_URI": "sqlite:///:memory:",
+        "SQLALCHEMY_TRACK_MODIFICATIONS": False,
+        "SECRET_KEY": "test-secret",
+        "INSTANCE_NAME": "test",
+        "YUNOHOST_BACKUP_DIR": "/tmp/bm_test_backups",
+        "DATA_DIR": "/tmp/bm_test_data",
+        "API_TOKEN": "test-token",
+        "INSTANCE_URL": "http://localhost",
+        "LOG_PATH": "/tmp/bm_test.log",
+    })
+    flask_app.jinja_env.filters["fromjson"] = json.loads
+    _db.init_app(flask_app)
+
+    from blueprints.settings import bp as bp_cfg
+    from blueprints.jobs import bp as bp_jobs
+    from blueprints.destinations import bp as bp_dest
+    from blueprints.network import bp as bp_net
+    from blueprints.api import bp as bp_api
+    flask_app.register_blueprint(bp_cfg)
+    flask_app.register_blueprint(bp_jobs)
+    flask_app.register_blueprint(bp_dest)
+    flask_app.register_blueprint(bp_net)
+    flask_app.register_blueprint(bp_api)
+
+    @flask_app.context_processor
+    def _inject():
+        from datetime import datetime as _dt
+        return {
+            "instance_name": flask_app.config.get("INSTANCE_NAME", ""),
+            "instance_url": flask_app.config.get("INSTANCE_URL", ""),
+            "api_token": flask_app.config.get("API_TOKEN", ""),
+            "now": _dt.utcnow(),
+        }
+
+    with flask_app.app_context():
+        _db.create_all()
+
+    return flask_app
+
+
+@pytest.fixture
+def client(app):
+    return app.test_client()
+
+
+@pytest.fixture(autouse=True)
+def clean_db(app):
+    with app.app_context():
+        yield
+        from db import db as _db
+        _db.session.remove()
+        for table in reversed(_db.metadata.sorted_tables):
+            _db.session.execute(table.delete())
+        _db.session.commit()

+ 242 - 0
sources/tests/test_config_io.py

@@ -0,0 +1,242 @@
+"""Tests d'intégration pour l'export et l'import de configuration."""
+import io
+import json
+
+
+def _payload(**overrides):
+    """Payload d'import minimal valide."""
+    base = {
+        "version": 1,
+        "jobs": [],
+        "destinations": [],
+        "remote_instances": [],
+        "settings": {},
+    }
+    base.update(overrides)
+    return base
+
+
+def _job_data(**overrides):
+    """Données d'un job valide pour l'import."""
+    base = {
+        "name": "Mon job",
+        "type": "ynh_system",
+        "config_json": "{}",
+        "cron_expr": "",
+        "retention_mode": "count",
+        "retention_value": 7,
+        "retention_gfs_config": None,
+        "enabled": True,
+        "core_only": False,
+        "destination_name": None,
+        "remote_instance_name": None,
+    }
+    base.update(overrides)
+    return base
+
+
+def _dest_data(**overrides):
+    base = {
+        "name": "VPS-OVH",
+        "host": "vps.example.com",
+        "port": 22,
+        "user": "backup",
+        "remote_path": "/backups",
+        "key_name": None,
+        "enabled": True,
+    }
+    base.update(overrides)
+    return base
+
+
+def _do_import(client, payload):
+    raw = json.dumps(payload).encode()
+    return client.post(
+        "/settings/import-config",
+        data={"config_file": (io.BytesIO(raw), "config.json")},
+        content_type="multipart/form-data",
+    )
+
+
+# ---------------------------------------------------------------------------
+# Export
+# ---------------------------------------------------------------------------
+
+class TestExportConfig:
+    def test_db_vide_retourne_json_valide(self, client):
+        resp = client.get("/settings/export-config")
+        assert resp.status_code == 200
+        assert "application/json" in resp.content_type
+        data = resp.get_json()
+        assert data["version"] == 1
+        assert data["instance_name"] == "test"
+        assert data["jobs"] == []
+        assert data["destinations"] == []
+        assert data["remote_instances"] == []
+
+    def test_exporte_jobs(self, client, app):
+        with app.app_context():
+            from db import db, Job
+            job = Job(
+                name="Sys backup", type="ynh_system", config_json="{}",
+                cron_expr="0 3 * * *", retention_mode="count",
+                retention_value=5, enabled=True, core_only=False,
+            )
+            db.session.add(job)
+            db.session.commit()
+
+        resp = client.get("/settings/export-config")
+        data = resp.get_json()
+        assert len(data["jobs"]) == 1
+        j = data["jobs"][0]
+        assert j["name"] == "Sys backup"
+        assert j["type"] == "ynh_system"
+        assert j["retention_value"] == 5
+        assert j["destination_name"] is None
+
+    def test_exporte_job_avec_destination(self, client, app):
+        with app.app_context():
+            from db import db, Job, Destination
+            dest = Destination(
+                name="VPS-OVH", host="vps.example.com", port=22,
+                user="backup", remote_path="/backups", enabled=True,
+            )
+            db.session.add(dest)
+            db.session.flush()
+            job = Job(
+                name="Job avec dest", type="ynh_system", config_json="{}",
+                cron_expr="", retention_mode="count", retention_value=7,
+                enabled=True, core_only=False, destination_id=dest.id,
+            )
+            db.session.add(job)
+            db.session.commit()
+
+        resp = client.get("/settings/export-config")
+        data = resp.get_json()
+        j = data["jobs"][0]
+        assert j["destination_name"] == "VPS-OVH"
+
+    def test_nom_fichier_dans_header(self, client):
+        resp = client.get("/settings/export-config")
+        cd = resp.headers.get("Content-Disposition", "")
+        assert "backupmanager_config_" in cd
+        assert ".json" in cd
+
+
+# ---------------------------------------------------------------------------
+# Import
+# ---------------------------------------------------------------------------
+
+class TestImportConfig:
+    def test_cree_job(self, client, app):
+        resp = _do_import(client, _payload(jobs=[_job_data(name="Nouveau job")]))
+        assert resp.status_code == 302
+
+        with app.app_context():
+            from db import Job
+            jobs = Job.query.all()
+            assert len(jobs) == 1
+            assert jobs[0].name == "Nouveau job"
+
+    def test_met_a_jour_job_existant(self, client, app):
+        with app.app_context():
+            from db import db, Job
+            job = Job(
+                name="Mon job", type="ynh_system", config_json="{}",
+                cron_expr="", retention_mode="count", retention_value=3,
+                enabled=True, core_only=False,
+            )
+            db.session.add(job)
+            db.session.commit()
+
+        resp = _do_import(client, _payload(jobs=[_job_data(retention_value=10)]))
+        assert resp.status_code == 302
+
+        with app.app_context():
+            from db import Job
+            jobs = Job.query.all()
+            assert len(jobs) == 1
+            assert jobs[0].retention_value == 10
+
+    def test_cree_destination(self, client, app):
+        resp = _do_import(client, _payload(destinations=[_dest_data()]))
+        assert resp.status_code == 302
+
+        with app.app_context():
+            from db import Destination
+            dests = Destination.query.all()
+            assert len(dests) == 1
+            assert dests[0].name == "VPS-OVH"
+            assert dests[0].host == "vps.example.com"
+
+    def test_lie_job_a_destination(self, client, app):
+        payload = _payload(
+            jobs=[_job_data(destination_name="VPS-OVH")],
+            destinations=[_dest_data()],
+        )
+        _do_import(client, payload)
+
+        with app.app_context():
+            from db import Job, Destination
+            dest = Destination.query.filter_by(name="VPS-OVH").first()
+            job = Job.query.filter_by(name="Mon job").first()
+            assert job.destination_id == dest.id
+
+    def test_cree_instance_distante(self, client, app):
+        payload = _payload(remote_instances=[{
+            "name": "Tom", "url": "https://tom.example.com", "api_key": "secret123",
+        }])
+        _do_import(client, payload)
+
+        with app.app_context():
+            from db import RemoteInstance
+            instances = RemoteInstance.query.all()
+            assert len(instances) == 1
+            assert instances[0].name == "Tom"
+            assert instances[0].url == "https://tom.example.com"
+
+    def test_importe_retention_gfs(self, client, app):
+        gfs = {"daily": 7, "weekly": 4, "monthly": 12}
+        payload = _payload(jobs=[_job_data(
+            retention_mode="gfs",
+            retention_value=0,
+            retention_gfs_config=json.dumps(gfs),
+        )])
+        _do_import(client, payload)
+
+        with app.app_context():
+            from db import Job
+            job = Job.query.first()
+            assert job.retention_mode == "gfs"
+            cfg = json.loads(job.retention_gfs_config)
+            assert cfg == gfs
+
+    def test_json_invalide_redirige_avec_erreur(self, client):
+        resp = client.post(
+            "/settings/import-config",
+            data={"config_file": (io.BytesIO(b"pas du json"), "config.json")},
+            content_type="multipart/form-data",
+        )
+        assert resp.status_code == 302
+
+    def test_version_incorrecte_rejete(self, client, app):
+        payload = _payload()
+        payload["version"] = 99
+        _do_import(client, payload)
+
+        with app.app_context():
+            from db import Job
+            assert Job.query.count() == 0
+
+    def test_import_idempotent(self, client, app):
+        payload = _payload(jobs=[_job_data()])
+        _do_import(client, payload)
+        _do_import(client, payload)
+
+        with app.app_context():
+            from db import Job
+            assert Job.query.count() == 1
+
+    def test_sans_fichier_redirige(self, client):
+        resp = client.post("/settings/import-config", data={})
+        assert resp.status_code == 302

+ 170 - 0
sources/tests/test_retention.py

@@ -0,0 +1,170 @@
+"""Tests unitaires pour les algorithmes de rétention (fonctions pures, sans Flask)."""
+from datetime import datetime, timedelta
+
+
+def days_ago(n):
+    return datetime.utcnow() - timedelta(days=n)
+
+
+# ---------------------------------------------------------------------------
+# _retention_count
+# ---------------------------------------------------------------------------
+
+class TestRetentionCount:
+    def test_supprime_les_plus_anciens(self):
+        from retention import _retention_count
+        archives = [
+            "a_20260101.tar", "a_20260102.tar", "a_20260103.tar",
+            "a_20260104.tar", "a_20260105.tar",
+        ]
+        assert _retention_count(archives, 3) == ["a_20260101.tar", "a_20260102.tar"]
+
+    def test_moins_que_n_garde_tout(self):
+        from retention import _retention_count
+        archives = ["a_20260101.tar", "a_20260102.tar"]
+        assert _retention_count(archives, 5) == []
+
+    def test_exactement_n_garde_tout(self):
+        from retention import _retention_count
+        archives = ["a_20260101.tar", "a_20260102.tar", "a_20260103.tar"]
+        assert _retention_count(archives, 3) == []
+
+    def test_liste_vide(self):
+        from retention import _retention_count
+        assert _retention_count([], 5) == []
+
+    def test_keep_1_supprime_tous_sauf_dernier(self):
+        from retention import _retention_count
+        archives = ["a_20260101.tar", "a_20260102.tar", "a_20260103.tar"]
+        assert _retention_count(archives, 1) == ["a_20260101.tar", "a_20260102.tar"]
+
+
+# ---------------------------------------------------------------------------
+# _retention_daily
+# ---------------------------------------------------------------------------
+
+class TestRetentionDaily:
+    def test_supprime_archives_trop_anciennes(self):
+        from retention import _retention_daily
+        archives = [
+            f"a_{days_ago(35).strftime('%Y%m%d')}.tar",
+            f"a_{days_ago(31).strftime('%Y%m%d')}.tar",
+            f"a_{days_ago(10).strftime('%Y%m%d')}.tar",
+            f"a_{days_ago(1).strftime('%Y%m%d')}.tar",
+        ]
+        result = _retention_daily(archives, 30)
+        assert archives[0] in result
+        assert archives[1] in result
+        assert archives[2] not in result
+        assert archives[3] not in result
+
+    def test_deduplique_meme_jour(self):
+        from retention import _retention_daily
+        today = days_ago(0).strftime("%Y%m%d")
+        a1 = f"a_{today}.tar"
+        a2 = f"a_{today}_2.tar"
+        archives = sorted([a1, a2])
+        result = _retention_daily(archives, 30)
+        assert len(result) == 1
+
+    def test_garde_une_par_jour_dans_fenetre(self):
+        from retention import _retention_daily
+        archives = [f"a_{days_ago(i).strftime('%Y%m%d')}.tar" for i in range(9, -1, -1)]
+        assert _retention_daily(archives, 30) == []
+
+    def test_liste_vide(self):
+        from retention import _retention_daily
+        assert _retention_daily([], 30) == []
+
+    def test_toutes_hors_fenetre(self):
+        from retention import _retention_daily
+        archives = [
+            f"a_{days_ago(60).strftime('%Y%m%d')}.tar",
+            f"a_{days_ago(45).strftime('%Y%m%d')}.tar",
+        ]
+        result = _retention_daily(archives, 30)
+        assert set(result) == set(archives)
+
+
+# ---------------------------------------------------------------------------
+# _retention_gfs
+# ---------------------------------------------------------------------------
+
+class TestRetentionGFS:
+    def test_liste_vide(self):
+        from retention import _retention_gfs
+        assert _retention_gfs([], {"daily": 7, "weekly": 4, "monthly": 12}) == []
+
+    def test_tous_dans_fenetre_daily_gardes(self):
+        from retention import _retention_gfs
+        archives = [f"a_2026050{i}.tar" for i in range(1, 8)]  # 7 archives
+        assert _retention_gfs(archives, {"daily": 7, "weekly": 2, "monthly": 2}) == []
+
+    def test_daily_supprime_au_dela_de_n(self):
+        from retention import _retention_gfs
+        # 15 archives consécutives (mai 01-15), daily=7
+        # Le mensuel et hebdo tombent dans la fenêtre daily pour cet exemple
+        archives = [f"a_202605{i:02d}.tar" for i in range(1, 16)]
+        to_delete = _retention_gfs(archives, {"daily": 7, "weekly": 1, "monthly": 1})
+        # Les 7 plus récentes (mai 9-15) sont gardées par daily
+        for i in range(9, 16):
+            assert f"a_202605{i:02d}.tar" not in to_delete
+
+    def test_monthly_sauve_archive_ancienne(self):
+        from retention import _retention_gfs
+        # Archives sur 5 mois distincts
+        archives = [
+            "a_20260115.tar",  # janvier (plus vieux)
+            "a_20260215.tar",  # février
+            "a_20260315.tar",  # mars
+            "a_20260415.tar",  # avril
+            "a_20260510.tar",  # mai
+            "a_20260511.tar",  # mai (plus récent)
+        ]
+        # daily=2 garde mai11+mai10, weekly=1 garde semaine de mai11,
+        # monthly=3 garde mai→mai11, avr→avr15, mars→mars15
+        # Supprimés : jan15, fév15
+        to_delete = _retention_gfs(archives, {"daily": 2, "weekly": 1, "monthly": 3})
+        assert "a_20260115.tar" in to_delete
+        assert "a_20260215.tar" in to_delete
+        assert "a_20260315.tar" not in to_delete
+        assert "a_20260415.tar" not in to_delete
+        assert "a_20260510.tar" not in to_delete
+        assert "a_20260511.tar" not in to_delete
+        assert len(to_delete) == 2
+
+    def test_weekly_un_seul_representant_par_semaine(self):
+        from retention import _retention_gfs
+        # Semaine 19 (mai 04-10) + semaine 20 (mai 11) — vérifiées ISO
+        # 2026-05-04 est un lundi (début de semaine 19)
+        # 2026-05-11 est un lundi (début de semaine 20)
+        archives = [
+            "a_20260504.tar", "a_20260505.tar", "a_20260506.tar",
+            "a_20260507.tar", "a_20260508.tar", "a_20260509.tar",
+            "a_20260510.tar", "a_20260511.tar",
+        ]
+        # daily=3 : mai11, mai10, mai09
+        # weekly=2 : sem20→mai11, sem19→mai10 (déjà en daily)
+        # monthly=1 : mai→mai11
+        # Keepers : {mai09, mai10, mai11}
+        # Supprimés : {mai04, mai05, mai06, mai07, mai08}
+        to_delete = _retention_gfs(archives, {"daily": 3, "weekly": 2, "monthly": 1})
+        assert "a_20260511.tar" not in to_delete
+        assert "a_20260510.tar" not in to_delete
+        assert "a_20260509.tar" not in to_delete
+        assert "a_20260504.tar" in to_delete
+        assert "a_20260508.tar" in to_delete
+        assert len(to_delete) == 5
+
+    def test_une_archive_peut_couvrir_plusieurs_niveaux(self):
+        from retention import _retention_gfs
+        # Une seule archive : elle doit être conservée même avec daily=1, weekly=1, monthly=1
+        archives = ["a_20260511.tar"]
+        assert _retention_gfs(archives, {"daily": 1, "weekly": 1, "monthly": 1}) == []
+
+    def test_defaults_utilises_si_config_vide(self):
+        from retention import _retention_gfs
+        # Config vide → defaults daily=7, weekly=4, monthly=12
+        # Avec 3 archives, tout doit être gardé
+        archives = ["a_20260509.tar", "a_20260510.tar", "a_20260511.tar"]
+        assert _retention_gfs(archives, {}) == []

+ 47 - 0
sources/tests/test_utils.py

@@ -0,0 +1,47 @@
+"""Tests unitaires pour jobs/utils.py (fonctions sans sudo)."""
+from unittest.mock import patch
+
+
+class TestUniqueArchiveName:
+    def test_sans_collision(self):
+        from jobs.utils import unique_archive_name
+        with patch("jobs.utils.sudo_exists", return_value=False):
+            result = unique_archive_name("instance_app_20260511", "/backups")
+        assert result == "instance_app_20260511"
+
+    def test_une_collision(self):
+        from jobs.utils import unique_archive_name
+        calls = {"n": 0}
+
+        def exists(path):
+            calls["n"] += 1
+            return calls["n"] == 1  # première vérification → collision, ensuite libre
+
+        with patch("jobs.utils.sudo_exists", side_effect=exists):
+            result = unique_archive_name("instance_app_20260511", "/backups")
+        assert result == "instance_app_20260511_2"
+
+    def test_trois_collisions(self):
+        from jobs.utils import unique_archive_name
+        calls = {"n": 0}
+
+        def exists(path):
+            calls["n"] += 1
+            return calls["n"] <= 3  # base, _2, _3 occupés ; _4 libre
+
+        with patch("jobs.utils.sudo_exists", side_effect=exists):
+            result = unique_archive_name("instance_app_20260511", "/backups")
+        assert result == "instance_app_20260511_4"
+
+    def test_chemin_correct_transmis(self):
+        from jobs.utils import unique_archive_name
+        seen_paths = []
+
+        def exists(path):
+            seen_paths.append(path)
+            return False
+
+        with patch("jobs.utils.sudo_exists", side_effect=exists):
+            unique_archive_name("myarchive", "/var/backups")
+
+        assert seen_paths == ["/var/backups/myarchive.tar"]