| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- import json
- import subprocess
- from datetime import datetime
- from flask import (
- Blueprint,
- Response,
- current_app,
- flash,
- jsonify,
- redirect,
- render_template,
- request,
- url_for,
- )
- from db import db, Setting
# Blueprint carrying the settings, configuration import/export and
# internal database-listing routes declared in this module.
bp = Blueprint(name="cfg", import_name=__name__)
# Keys of the Setting rows this module reads, saves, exports and imports:
# SMTP connection parameters first, then the notification toggles.
_SETTING_KEYS = [
    "smtp_host",
    "smtp_port",
    "smtp_user",
    "smtp_password",
    "smtp_from",
    "smtp_to",
    "smtp_tls",
    "smtp_ssl",
    "notify_on_success",
    "notify_on_error",
]
def _get_setting(key, default=""):
    """Return the stored value for ``key``, or ``default`` if no row matches."""
    row = Setting.query.filter_by(key=key).first()
    if not row:
        return default
    return row.value
@bp.route("/settings", methods=["GET", "POST"])
def settings():
    """Render the settings page and handle its form submissions.

    POST with ``action=test_smtp`` sends a test email using the values
    present in the submitted form; nothing is persisted. Any other POST
    stores every key of ``_SETTING_KEYS`` from the form and commits. Both
    POST paths flash a message and redirect back to this view.

    GET renders ``settings.html`` with the stored settings (with display
    defaults applied), the API token and instance URL from the app config,
    and the destinations and remote instances ordered by name.
    """
    if request.method == "POST":
        action = request.form.get("action")
        if action == "test_smtp":
            from notifications import send_test_email
            try:
                send_test_email(
                    host=request.form.get("smtp_host", "").strip(),
                    # An empty port field falls back to 587; a non-numeric
                    # one raises ValueError, reported by the handler below.
                    port=int(request.form.get("smtp_port", 587) or 587),
                    user=request.form.get("smtp_user", "").strip(),
                    password=request.form.get("smtp_password", ""),
                    from_addr=request.form.get("smtp_from", "").strip(),
                    to_addr=request.form.get("smtp_to", "").strip(),
                    use_ssl=request.form.get("smtp_ssl") == "1",
                    use_tls=request.form.get("smtp_tls") == "1",
                )
                flash("Email de test envoyé avec succès.", "success")
            except Exception as exc:
                # Route boundary: any failure of the test is shown to the user.
                flash(f"Échec du test SMTP : {exc}", "error")
        else:
            for key in _SETTING_KEYS:
                if key in ("smtp_tls", "smtp_ssl", "notify_on_success", "notify_on_error"):
                    # Flag-style fields: anything other than "1" (including
                    # an absent field) is stored as "0".
                    value = "1" if request.form.get(key) == "1" else "0"
                else:
                    value = request.form.get(key, "").strip()
                s = Setting.query.filter_by(key=key).first()
                if s is None:
                    s = Setting(key=key, value=value)
                    db.session.add(s)
                else:
                    s.value = value
            db.session.commit()
            flash("Paramètres enregistrés.", "success")
        return redirect(url_for("cfg.settings"))

    from db import Destination, RemoteInstance

    cfg = {k: _get_setting(k) for k in _SETTING_KEYS}
    # Every key already exists in cfg (as "" when unset), so setdefault()
    # would never apply; fall back on empty values instead.
    cfg["smtp_port"] = cfg.get("smtp_port") or "587"
    cfg["smtp_tls"] = cfg.get("smtp_tls") or "1"
    cfg["smtp_ssl"] = cfg.get("smtp_ssl") or "0"
    cfg["notify_on_error"] = cfg.get("notify_on_error") or "1"
    api_token = current_app.config.get("API_TOKEN", "")
    instance_url = current_app.config.get("INSTANCE_URL", "")
    destinations = Destination.query.order_by(Destination.name).all()
    instances = RemoteInstance.query.order_by(RemoteInstance.name).all()
    return render_template("settings.html", cfg=cfg, api_token=api_token,
                           instance_url=instance_url, destinations=destinations,
                           instances=instances)
@bp.route("/settings/export-config")
def export_config():
    """Serve the current configuration as a downloadable JSON attachment.

    The document contains every job, destination and remote instance
    (each ordered by name), the values of all ``_SETTING_KEYS``, a format
    version, a UTC export timestamp and the ``INSTANCE_NAME`` app setting.
    Jobs reference their destination and remote instance by name.
    """
    from db import Job, Destination, RemoteInstance

    jobs_data = [
        {
            "name": job.name,
            "type": job.type,
            "config_json": job.config_json,
            "cron_expr": job.cron_expr,
            "retention_mode": job.retention_mode,
            "retention_value": job.retention_value,
            "retention_gfs_config": job.retention_gfs_config,
            "enabled": job.enabled,
            "core_only": job.core_only,
            # Only follow the relationship when the foreign key is set.
            "destination_name": (
                job.destination.name
                if job.destination_id and job.destination
                else None
            ),
            "remote_instance_name": (
                job.remote_instance.name
                if job.remote_instance_id and job.remote_instance
                else None
            ),
        }
        for job in Job.query.order_by(Job.name).all()
    ]

    dest_data = [
        {
            "name": dest.name,
            "host": dest.host,
            "port": dest.port,
            "user": dest.user,
            "remote_path": dest.remote_path,
            "key_name": dest.key_name,
            "enabled": dest.enabled,
        }
        for dest in Destination.query.order_by(Destination.name).all()
    ]

    inst_data = [
        {"name": inst.name, "url": inst.url, "api_key": inst.api_key}
        for inst in RemoteInstance.query.order_by(RemoteInstance.name).all()
    ]

    settings_data = {}
    for key in _SETTING_KEYS:
        settings_data[key] = _get_setting(key)

    payload = {
        "version": 1,
        "exported_at": datetime.utcnow().isoformat(),
        "instance_name": current_app.config.get("INSTANCE_NAME", ""),
        "jobs": jobs_data,
        "destinations": dest_data,
        "remote_instances": inst_data,
        "settings": settings_data,
    }

    stamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
    filename = f"backupmanager_config_{stamp}.json"
    body = json.dumps(payload, ensure_ascii=False, indent=2)
    disposition = f'attachment; filename="{filename}"'
    return Response(
        body,
        mimetype="application/json",
        headers={"Content-Disposition": disposition},
    )
@bp.route("/settings/import-config", methods=["POST"])
def import_config():
    """Import a configuration file in the format written by ``export_config``.

    Expects a JSON upload in the ``config_file`` form field whose top-level
    value is an object with ``"version": 1``. Destinations, remote instances
    and jobs are matched by name: existing rows are updated, missing ones
    are created. Settings are imported only for keys in ``_SETTING_KEYS``.

    All rows are written in one transaction. If an entry has the wrong
    shape or type (``AttributeError``, ``TypeError``, ``ValueError``), the
    transaction is rolled back, the scheduler is left untouched and an
    error is flashed. After a successful commit, overwritten jobs are
    removed from the scheduler and every enabled job is scheduled again.

    Returns:
        A redirect to the settings page with ``?tab=config`` in every case.
    """
    from db import Job, Destination, RemoteInstance
    from scheduler import schedule_job, remove_job

    back_to_config = url_for("cfg.settings") + "?tab=config"

    f = request.files.get("config_file")
    if not f or not f.filename:
        flash("Aucun fichier sélectionné.", "error")
        return redirect(back_to_config)
    try:
        payload = json.loads(f.read().decode("utf-8"))
    except Exception:
        flash("Fichier invalide — JSON attendu.", "error")
        return redirect(back_to_config)
    # Valid JSON can also be a list, string or number; only an object can
    # carry the "version" marker.
    if not isinstance(payload, dict) or payload.get("version") != 1:
        flash("Format de fichier non reconnu (version != 1).", "error")
        return redirect(back_to_config)

    counts = {"destinations": 0, "instances": 0, "jobs": 0, "settings": 0}
    # Ids of existing jobs being overwritten. They are unscheduled only
    # after the commit, so a rejected import does not disturb the scheduler.
    overwritten_job_ids = []

    try:
        # --- Destinations ---
        for d_data in payload.get("destinations") or []:
            name = (d_data.get("name") or "").strip()
            if not name:
                continue
            dest = Destination.query.filter_by(name=name).first()
            if dest is None:
                dest = Destination()
                db.session.add(dest)
            dest.name = name
            dest.host = d_data.get("host", "")
            dest.port = int(d_data.get("port", 22))
            dest.user = d_data.get("user", "root")
            dest.remote_path = d_data.get("remote_path", "")
            dest.key_name = d_data.get("key_name") or None
            dest.enabled = bool(d_data.get("enabled", True))
            counts["destinations"] += 1
        db.session.flush()

        # --- Remote instances ---
        for i_data in payload.get("remote_instances") or []:
            name = (i_data.get("name") or "").strip()
            if not name:
                continue
            inst = RemoteInstance.query.filter_by(name=name).first()
            if inst is None:
                inst = RemoteInstance()
                db.session.add(inst)
            inst.name = name
            inst.url = (i_data.get("url") or "").rstrip("/")
            inst.api_key = i_data.get("api_key", "")
            counts["instances"] += 1
        db.session.flush()

        # --- Jobs ---
        # Built after the flushes above so that rows created by this import
        # already have an id to link to.
        dest_by_name = {d.name: d for d in Destination.query.all()}
        inst_by_name = {i.name: i for i in RemoteInstance.query.all()}
        for j_data in payload.get("jobs") or []:
            name = (j_data.get("name") or "").strip()
            if not name:
                continue
            job = Job.query.filter_by(name=name).first()
            if job is None:
                job = Job()
                db.session.add(job)
            else:
                overwritten_job_ids.append(job.id)
            job.name = name
            job.type = j_data.get("type", "ynh_app")
            job.config_json = j_data.get("config_json") or "{}"
            job.cron_expr = j_data.get("cron_expr") or ""
            job.retention_mode = j_data.get("retention_mode", "count")
            job.retention_value = int(j_data.get("retention_value", 2))
            job.retention_gfs_config = j_data.get("retention_gfs_config")
            job.enabled = bool(j_data.get("enabled", True))
            job.core_only = bool(j_data.get("core_only", False))
            job.updated_at = datetime.utcnow()
            dest_name = j_data.get("destination_name")
            inst_name = j_data.get("remote_instance_name")
            # Names with no matching row leave the link empty.
            linked_dest = dest_by_name.get(dest_name) if dest_name else None
            linked_inst = inst_by_name.get(inst_name) if inst_name else None
            job.destination_id = linked_dest.id if linked_dest is not None else None
            job.remote_instance_id = linked_inst.id if linked_inst is not None else None
            counts["jobs"] += 1
        db.session.flush()

        # --- SMTP settings ---
        for key, value in (payload.get("settings") or {}).items():
            if key not in _SETTING_KEYS:
                continue
            s = Setting.query.filter_by(key=key).first()
            if s is None:
                s = Setting(key=key, value=value)
                db.session.add(s)
            else:
                s.value = value
            counts["settings"] += 1

        db.session.commit()
    except (AttributeError, TypeError, ValueError) as exc:
        # Malformed entry (wrong shape or type): discard the partial import.
        db.session.rollback()
        current_app.logger.warning("Configuration import rejected: %s", exc)
        flash(f"Échec de l'import — contenu du fichier invalide : {exc}", "error")
        return redirect(back_to_config)

    # The data is committed: drop the scheduler entries of overwritten jobs,
    # then schedule every enabled job again.
    for job_id in overwritten_job_ids:
        remove_job(job_id)
    for job in Job.query.filter_by(enabled=True).all():
        schedule_job(job)

    flash(
        f"Import réussi — {counts['jobs']} job(s), "
        f"{counts['destinations']} destination(s), "
        f"{counts['instances']} instance(s), "
        f"{counts['settings']} paramètre(s).",
        "success",
    )
    return redirect(back_to_config)
@bp.route("/internal/databases/<db_type>")
def internal_databases(db_type):
    """List the databases available for the job form.

    Args:
        db_type: ``"mysql"`` or ``"postgresql"``; any other value yields
            an empty list without running a command.

    Returns:
        A JSON array of database names. The listing is best-effort: when
        the client command cannot be run, times out or exits with a
        non-zero status, the failure is logged and an empty array is
        returned.
    """
    databases = []
    try:
        if db_type == "mysql":
            result = subprocess.run(
                ["sudo", "mysql", "--skip-column-names", "-e", "SHOW DATABASES;"],
                capture_output=True, text=True, timeout=10,
            )
            if result.returncode == 0:
                # Schemas filtered out of the list offered to the user.
                exclude = {"information_schema", "performance_schema", "mysql", "sys"}
                names = (line.strip() for line in result.stdout.splitlines())
                databases = [n for n in names if n and n not in exclude]
            else:
                current_app.logger.warning(
                    "mysql database listing exited with %s: %s",
                    result.returncode, result.stderr.strip(),
                )
        elif db_type == "postgresql":
            result = subprocess.run(
                ["sudo", "-u", "postgres", "psql", "-Atc",
                 "SELECT datname FROM pg_database WHERE datistemplate = false;"],
                capture_output=True, text=True, timeout=10,
            )
            if result.returncode == 0:
                names = (line.strip() for line in result.stdout.splitlines())
                databases = [n for n in names if n]
            else:
                current_app.logger.warning(
                    "postgresql database listing exited with %s: %s",
                    result.returncode, result.stderr.strip(),
                )
    except Exception:
        # Keep the endpoint best-effort, but leave a trace of the failure
        # (missing binary, timeout, ...) instead of hiding it.
        current_app.logger.exception("Could not list %s databases", db_type)
    return jsonify(databases)
|