| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- import smtplib
- import ssl
- from email.mime.text import MIMEText
- from flask import current_app
- def _get(key, default=""):
- from db import Setting
- s = Setting.query.filter_by(key=key).first()
- return s.value if s else default
- def send_job_notification(run, job):
- """Envoie une notification email après un job. Silencieux si non configuré."""
- if run.status == "success" and _get("notify_on_success", "0") != "1":
- return
- if run.status == "error" and _get("notify_on_error", "1") != "1":
- return
- smtp_host = _get("smtp_host")
- smtp_to = _get("smtp_to")
- if not smtp_host or not smtp_to:
- return
- instance = current_app.config.get("INSTANCE_NAME", "backupmanager")
- if run.status == "success":
- subject = f"[{instance}] ✓ {run.archive_name or job.name} — sauvegarde réussie"
- d = run.duration_seconds or 0
- duration = f"{d // 60}min {d % 60}s" if d >= 60 else f"{d}s"
- body = (
- f"Sauvegarde réussie\n\n"
- f"Job : {job.name}\n"
- f"Type : {job.type}\n"
- f"Archive : {run.archive_name}\n"
- f"Taille : {run.size_human}\n"
- f"Durée : {duration}\n"
- f"Instance : {instance}\n"
- )
- else:
- subject = f"[{instance}] ✗ {job.name} — ERREUR de sauvegarde"
- body = (
- f"Erreur lors de la sauvegarde\n\n"
- f"Job : {job.name}\n"
- f"Type : {job.type}\n"
- f"Instance : {instance}\n\n"
- f"Détail :\n{run.log_text or '(aucun log)'}\n"
- )
- try:
- _send(
- host=smtp_host,
- port=int(_get("smtp_port", "587")),
- user=_get("smtp_user"),
- password=_get("smtp_password"),
- from_addr=_get("smtp_from") or _get("smtp_user"),
- to_addr=smtp_to,
- subject=subject,
- body=body,
- use_ssl=_get("smtp_ssl", "0") == "1",
- use_tls=_get("smtp_tls", "1") == "1",
- )
- except Exception as exc:
- current_app.logger.warning(f"Notification email échouée : {exc}")
- def send_test_email(host, port, user, password, from_addr, to_addr, use_ssl, use_tls):
- """Envoie un email de test. Lève une exception si ça échoue."""
- _send(
- host=host,
- port=int(port),
- user=user,
- password=password,
- from_addr=from_addr or user,
- to_addr=to_addr,
- subject="Test SMTP — Backup Manager",
- body="Si vous recevez cet email, la configuration SMTP est correcte.",
- use_ssl=use_ssl,
- use_tls=use_tls,
- )
- def _send(host, port, user, password, from_addr, to_addr, subject, body, use_ssl, use_tls):
- msg = MIMEText(body, "plain", "utf-8")
- msg["Subject"] = subject
- msg["From"] = from_addr
- msg["To"] = to_addr
- ctx = ssl.create_default_context()
- if use_ssl:
- with smtplib.SMTP_SSL(host, port, context=ctx) as smtp:
- if user:
- smtp.login(user, password)
- smtp.sendmail(from_addr, [to_addr], msg.as_string())
- else:
- with smtplib.SMTP(host, port, timeout=15) as smtp:
- if use_tls:
- smtp.starttls(context=ctx)
- if user:
- smtp.login(user, password)
- smtp.sendmail(from_addr, [to_addr], msg.as_string())
|