"""Settings blueprint: SMTP settings form plus configuration export/import."""

import json
import subprocess
from datetime import datetime

from flask import (
    Blueprint,
    Response,
    current_app,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    url_for,
)

from db import db, Setting

bp = Blueprint("cfg", __name__)

# Keys stored in the Setting table; also the only keys accepted on import.
_SETTING_KEYS = [
    "smtp_host",
    "smtp_port",
    "smtp_user",
    "smtp_password",
    "smtp_from",
    "smtp_to",
    "smtp_tls",
    "smtp_ssl",
    "notify_on_success",
    "notify_on_error",
]

# Settings submitted as checkboxes and persisted as "1" / "0".
_BOOLEAN_SETTING_KEYS = frozenset(
    {"smtp_tls", "smtp_ssl", "notify_on_success", "notify_on_error"}
)


def _get_setting(key, default=""):
    """Return the stored value for *key*, or *default* when no row exists."""
    row = Setting.query.filter_by(key=key).first()
    return row.value if row else default


def _set_setting(key, value):
    """Create or update the Setting row for *key* in the session (no commit)."""
    row = Setting.query.filter_by(key=key).first()
    if row is None:
        db.session.add(Setting(key=key, value=value))
    else:
        row.value = value


def _redirect_to_config_tab():
    """Redirect to the settings page with the ``tab=config`` query argument."""
    return redirect(url_for("cfg.settings", tab="config"))


def _coerce_int(value, default):
    """Return *value* as an int, or *default* when it is missing or not numeric."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


def _dict_entries(payload, section):
    """Return the dict items of ``payload[section]``, skipping malformed data."""
    items = payload.get(section)
    if not isinstance(items, list):
        return []
    return [item for item in items if isinstance(item, dict)]


def _entry_name(entry):
    """Return the stripped ``name`` of an import entry, or "" when unusable."""
    name = entry.get("name")
    return name.strip() if isinstance(name, str) else ""


def _linked_names(entry, list_key, legacy_key):
    """Return the target names of a job entry.

    Reads the list field *list_key*; when it is empty or absent, falls back to
    the older single-name field *legacy_key*.
    """
    names = entry.get(list_key)
    if not names:
        legacy = entry.get(legacy_key)
        names = [legacy] if legacy else []
    if isinstance(names, str):
        # A bare string would otherwise be iterated character by character.
        names = [names]
    if not isinstance(names, list):
        return []
    return [name for name in names if isinstance(name, str) and name]


def _send_test_email_from_form():
    """Send a test email using the submitted (unsaved) form values."""
    from notifications import send_test_email

    form = request.form
    try:
        send_test_email(
            host=form.get("smtp_host", "").strip(),
            # An invalid port raises ValueError, reported like any SMTP failure.
            port=int(form.get("smtp_port", 587) or 587),
            user=form.get("smtp_user", "").strip(),
            password=form.get("smtp_password", ""),
            from_addr=form.get("smtp_from", "").strip(),
            to_addr=form.get("smtp_to", "").strip(),
            use_ssl=form.get("smtp_ssl") == "1",
            use_tls=form.get("smtp_tls") == "1",
        )
        flash("Email de test envoyé avec succès.", "success")
    except Exception as exc:
        flash(f"Échec du test SMTP : {exc}", "error")


def _save_settings_from_form():
    """Persist every known setting from the submitted form and commit."""
    for key in _SETTING_KEYS:
        if key in _BOOLEAN_SETTING_KEYS:
            # An unchecked checkbox is simply absent from the form.
            value = "1" if request.form.get(key) == "1" else "0"
        else:
            # NOTE(review): this also strips smtp_password, whereas the SMTP
            # test sends it unstripped — confirm which behavior is intended.
            value = request.form.get(key, "").strip()
        _set_setting(key, value)
    db.session.commit()
    flash("Paramètres enregistrés.", "success")


@bp.route("/settings", methods=["GET", "POST"])
def settings():
    """Render the settings page (GET) or handle a form submission (POST).

    A POST with ``action=test_smtp`` sends a test email from the submitted
    values without saving them; any other POST saves the settings. Both
    redirect back to this page.
    """
    if request.method == "POST":
        if request.form.get("action") == "test_smtp":
            _send_test_email_from_form()
        else:
            _save_settings_from_form()
        return redirect(url_for("cfg.settings"))

    from db import Destination, RemoteInstance

    cfg = {key: _get_setting(key) for key in _SETTING_KEYS}
    # Every key is already present (possibly as ""), so defaults must replace
    # empty values; dict.setdefault would never apply here.
    cfg["smtp_port"] = cfg.get("smtp_port") or "587"
    cfg["smtp_tls"] = cfg.get("smtp_tls") or "1"
    cfg["smtp_ssl"] = cfg.get("smtp_ssl") or "0"
    cfg["notify_on_error"] = cfg.get("notify_on_error") or "1"

    return render_template(
        "settings.html",
        cfg=cfg,
        api_token=current_app.config.get("API_TOKEN", ""),
        instance_url=current_app.config.get("INSTANCE_URL", ""),
        destinations=Destination.query.order_by(Destination.name).all(),
        instances=RemoteInstance.query.order_by(RemoteInstance.name).all(),
    )


@bp.route("/settings/export-config")
def export_config():
    """Return jobs, destinations, remote instances and settings as a JSON file.

    NOTE: the export contains ``smtp_password`` and remote-instance
    ``api_key`` values in clear text.
    """
    from db import Job, Destination, RemoteInstance

    jobs_data = []
    for job in Job.query.order_by(Job.name).all():
        links = job.job_destinations
        jobs_data.append({
            "name": job.name,
            "type": job.type,
            "config_json": job.config_json,
            "cron_expr": job.cron_expr,
            "retention_mode": job.retention_mode,
            "retention_value": job.retention_value,
            "retention_gfs_config": job.retention_gfs_config,
            "enabled": job.enabled,
            "core_only": job.core_only,
            "destination_names": [
                link.label for link in links if link.dest_type == "ssh"
            ],
            "remote_instance_names": [
                link.label for link in links if link.dest_type == "instance"
            ],
        })

    dest_data = [
        {
            "name": dest.name,
            "host": dest.host,
            "port": dest.port,
            "user": dest.user,
            "remote_path": dest.remote_path,
            "key_name": dest.key_name,
            "enabled": dest.enabled,
        }
        for dest in Destination.query.order_by(Destination.name).all()
    ]

    inst_data = [
        {"name": inst.name, "url": inst.url, "api_key": inst.api_key}
        for inst in RemoteInstance.query.order_by(RemoteInstance.name).all()
    ]

    # One timestamp for both the payload and the filename so they always agree.
    exported_at = datetime.utcnow()
    payload = {
        "version": 1,
        "exported_at": exported_at.isoformat(),
        "instance_name": current_app.config.get("INSTANCE_NAME", ""),
        "jobs": jobs_data,
        "destinations": dest_data,
        "remote_instances": inst_data,
        "settings": {key: _get_setting(key) for key in _SETTING_KEYS},
    }
    filename = f"backupmanager_config_{exported_at.strftime('%Y%m%d_%H%M%S')}.json"
    return Response(
        json.dumps(payload, ensure_ascii=False, indent=2),
        mimetype="application/json",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )


def _import_destinations(entries):
    """Create or update SSH destinations by name; return the number processed."""
    from db import Destination

    count = 0
    for entry in entries:
        name = _entry_name(entry)
        if not name:
            continue
        dest = Destination.query.filter_by(name=name).first()
        if dest is None:
            dest = Destination()
            db.session.add(dest)
        dest.name = name
        dest.host = entry.get("host", "")
        dest.port = _coerce_int(entry.get("port"), 22)
        dest.user = entry.get("user", "root")
        dest.remote_path = entry.get("remote_path", "")
        dest.key_name = entry.get("key_name") or None
        dest.enabled = bool(entry.get("enabled", True))
        count += 1
    return count


def _import_remote_instances(entries):
    """Create or update remote instances by name; return the number processed."""
    from db import RemoteInstance

    count = 0
    for entry in entries:
        name = _entry_name(entry)
        if not name:
            continue
        inst = RemoteInstance.query.filter_by(name=name).first()
        if inst is None:
            inst = RemoteInstance()
            db.session.add(inst)
        inst.name = name
        inst.url = str(entry.get("url") or "").rstrip("/")
        inst.api_key = entry.get("api_key", "")
        count += 1
    return count


def _import_jobs(entries):
    """Create or update jobs by name.

    Returns ``(count, replaced_ids)`` where *replaced_ids* are the ids of jobs
    that already existed and must be unscheduled once the import is committed.
    """
    from db import Destination, Job, JobDestination, RemoteInstance

    dest_by_name = {dest.name: dest for dest in Destination.query.all()}
    inst_by_name = {inst.name: inst for inst in RemoteInstance.query.all()}

    count = 0
    replaced_ids = []
    for entry in entries:
        name = _entry_name(entry)
        if not name:
            continue
        job = Job.query.filter_by(name=name).first()
        if job is None:
            job = Job()
            db.session.add(job)
        else:
            replaced_ids.append(job.id)
        job.name = name
        job.type = entry.get("type", "ynh_app")
        job.config_json = entry.get("config_json") or "{}"
        job.cron_expr = entry.get("cron_expr") or ""
        job.retention_mode = entry.get("retention_mode", "count")
        job.retention_value = _coerce_int(entry.get("retention_value"), 2)
        job.retention_gfs_config = entry.get("retention_gfs_config")
        job.enabled = bool(entry.get("enabled", True))
        job.core_only = bool(entry.get("core_only", False))
        job.updated_at = datetime.utcnow()

        # Supports both the old format (destination_name /
        # remote_instance_name) and the new one (lists of names).
        # Names that match no known destination/instance are ignored.
        links = [
            JobDestination(dest_type="ssh", dest_id=dest_by_name[dest_name].id)
            for dest_name in _linked_names(
                entry, "destination_names", "destination_name"
            )
            if dest_name in dest_by_name
        ]
        links.extend(
            JobDestination(dest_type="instance", dest_id=inst_by_name[inst_name].id)
            for inst_name in _linked_names(
                entry, "remote_instance_names", "remote_instance_name"
            )
            if inst_name in inst_by_name
        )
        job.job_destinations = links
        count += 1
    return count, replaced_ids


def _import_settings(values):
    """Store the known settings found in *values*; return the number stored."""
    if not isinstance(values, dict):
        return 0
    count = 0
    for key, value in values.items():
        if key not in _SETTING_KEYS:
            continue
        _set_setting(key, value)
        count += 1
    return count


@bp.route("/settings/import-config", methods=["POST"])
def import_config():
    """Import a configuration file previously produced by ``export_config``.

    Destinations, remote instances and jobs are matched by name and created or
    updated; only keys listed in ``_SETTING_KEYS`` are imported as settings.
    Everything is committed in one transaction. The scheduler is only touched
    after a successful commit, so a failed import changes nothing.
    Always redirects to the settings page with ``tab=config``.
    """
    from db import Job
    from scheduler import schedule_job, remove_job

    upload = request.files.get("config_file")
    if not upload or not upload.filename:
        flash("Aucun fichier sélectionné.", "error")
        return _redirect_to_config_tab()

    try:
        payload = json.loads(upload.read().decode("utf-8"))
    except (ValueError, RecursionError):
        # ValueError covers both UnicodeDecodeError and json.JSONDecodeError.
        flash("Fichier invalide — JSON attendu.", "error")
        return _redirect_to_config_tab()

    # Valid JSON that is not an object (e.g. a list) has no "version" either.
    if not isinstance(payload, dict) or payload.get("version") != 1:
        flash("Format de fichier non reconnu (version != 1).", "error")
        return _redirect_to_config_tab()

    counts = {}
    try:
        counts["destinations"] = _import_destinations(
            _dict_entries(payload, "destinations")
        )
        # Flush so new rows have ids before jobs reference them.
        db.session.flush()
        counts["instances"] = _import_remote_instances(
            _dict_entries(payload, "remote_instances")
        )
        db.session.flush()
        counts["jobs"], replaced_job_ids = _import_jobs(
            _dict_entries(payload, "jobs")
        )
        db.session.flush()
        counts["settings"] = _import_settings(payload.get("settings"))
        db.session.commit()
    except Exception as exc:
        db.session.rollback()
        current_app.logger.exception("Configuration import failed")
        flash(f"Échec de l'import : {exc}", "error")
        return _redirect_to_config_tab()

    # Unschedule the jobs that were overwritten, then reschedule every
    # enabled job from its freshly committed state.
    for job_id in replaced_job_ids:
        remove_job(job_id)
    for job in Job.query.filter_by(enabled=True).all():
        schedule_job(job)

    flash(
        f"Import réussi — {counts['jobs']} job(s), "
        f"{counts['destinations']} destination(s), "
        f"{counts['instances']} instance(s), "
        f"{counts['settings']} paramètre(s).",
        "success",
    )
    return _redirect_to_config_tab()
@bp.route("/internal/databases/<db_type>")
def internal_databases(db_type):
    """List the databases available for the job form.

    *db_type* comes from the URL and selects the command to run: ``"mysql"``
    or ``"postgresql"``. Any other value, a failing command, or an error while
    running it yields an empty list (best effort); failures are logged.

    Returns a JSON array of database names.
    """
    if db_type == "mysql":
        command = ["sudo", "mysql", "--skip-column-names", "-e", "SHOW DATABASES;"]
        # System schemas are excluded from the list.
        excluded = {"information_schema", "performance_schema", "mysql", "sys"}
    elif db_type == "postgresql":
        command = [
            "sudo", "-u", "postgres", "psql", "-Atc",
            "SELECT datname FROM pg_database WHERE datistemplate = false;",
        ]
        excluded = set()
    else:
        return jsonify([])

    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=10)
    except Exception:
        # Best effort: a missing binary, a timeout or undecodable output must
        # not break the form, but it should leave a trace in the logs.
        current_app.logger.exception("Unable to list %s databases", db_type)
        return jsonify([])

    if result.returncode != 0:
        current_app.logger.warning(
            "Listing %s databases exited with code %s: %s",
            db_type, result.returncode, result.stderr.strip(),
        )
        return jsonify([])

    names = (line.strip() for line in result.stdout.splitlines())
    return jsonify([name for name in names if name and name not in excluded])