scheduler.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. from apscheduler.schedulers.background import BackgroundScheduler
  2. from apscheduler.triggers.cron import CronTrigger
  3. _flask_app = None
  4. scheduler = BackgroundScheduler(
  5. job_defaults={"coalesce": True, "max_instances": 1},
  6. timezone="UTC",
  7. )
  8. def init_scheduler(flask_app):
  9. global _flask_app
  10. _flask_app = flask_app
  11. # Nettoyage immédiat au démarrage : tout run "running" en DB est forcément
  12. # un vestige d'un process précédent (app redémarrée pendant un backup).
  13. with flask_app.app_context():
  14. from db import db, Run
  15. import datetime as _dt
  16. stale = Run.query.filter_by(status="running").all()
  17. now = _dt.datetime.utcnow()
  18. for run in stale:
  19. run.status = "error"
  20. run.log_text = (run.log_text or "") + "\n[interrompu] Run interrompu par un redémarrage de l'application."
  21. run.finished_at = now
  22. if stale:
  23. db.session.commit()
  24. if not scheduler.running:
  25. scheduler.start()
  26. # Filet de sécurité : marque en erreur tout run resté bloqué > 6h
  27. scheduler.add_job(
  28. func=_cleanup_stuck_runs,
  29. trigger="interval",
  30. hours=1,
  31. id="cleanup_stuck_runs",
  32. replace_existing=True,
  33. )
  34. def _cleanup_stuck_runs():
  35. import datetime as _dt
  36. with _flask_app.app_context():
  37. from db import db, Run
  38. cutoff = _dt.datetime.utcnow() - _dt.timedelta(hours=6)
  39. stuck = Run.query.filter(
  40. Run.status == "running",
  41. Run.started_at < cutoff,
  42. ).all()
  43. now = _dt.datetime.utcnow()
  44. for run in stuck:
  45. duration = now - run.started_at
  46. hours = int(duration.total_seconds() // 3600)
  47. minutes = int((duration.total_seconds() % 3600) // 60)
  48. run.status = "error"
  49. run.finished_at = now
  50. if run.archive_name:
  51. # L'archive a été créée localement ; le timeout est survenu pendant le transfert
  52. detail = (
  53. f" Archive locale présente ({run.archive_name})."
  54. " Le processus de transfert était probablement encore en cours."
  55. )
  56. else:
  57. # Le backup lui-même n'a pas terminé dans les 6h
  58. detail = " Aucune archive locale détectée : la création du backup n'a pas abouti dans les délais."
  59. run.log_text = (run.log_text or "") + (
  60. f"\n\n[timeout] Run bloqué depuis {hours}h{minutes:02d} — "
  61. f"marqué en erreur par le nettoyage automatique.{detail}"
  62. )
  63. if stuck:
  64. db.session.commit()
  65. def _execute_job(job_id):
  66. with _flask_app.app_context():
  67. from jobs.ynh_backup import execute_job
  68. execute_job(job_id)
  69. def schedule_job(job):
  70. import logging
  71. job_key = f"job_{job.id}"
  72. if not job.cron_expr:
  73. return # job manuel uniquement, pas de planification APScheduler
  74. try:
  75. trigger = CronTrigger.from_crontab(job.cron_expr)
  76. except Exception:
  77. logging.warning(f"Job #{job.id} « {job.name} » : expression cron invalide « {job.cron_expr} » — job non planifié.")
  78. return
  79. if scheduler.get_job(job_key):
  80. scheduler.reschedule_job(job_key, trigger=trigger)
  81. else:
  82. scheduler.add_job(
  83. func=_execute_job,
  84. trigger=trigger,
  85. id=job_key,
  86. kwargs={"job_id": job.id},
  87. replace_existing=True,
  88. )
  89. def remove_job(job_id):
  90. job_key = f"job_{job_id}"
  91. if scheduler.get_job(job_key):
  92. scheduler.remove_job(job_key)
  93. def get_next_run(job_id):
  94. job_key = f"job_{job_id}"
  95. apsjob = scheduler.get_job(job_key)
  96. if apsjob and apsjob.next_run_time:
  97. return apsjob.next_run_time
  98. return None