scheduler.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. from apscheduler.schedulers.background import BackgroundScheduler
  2. from apscheduler.triggers.cron import CronTrigger
  3. _flask_app = None
  4. scheduler = BackgroundScheduler(
  5. job_defaults={"coalesce": True, "max_instances": 1},
  6. timezone="UTC",
  7. )
  8. def init_scheduler(flask_app):
  9. global _flask_app
  10. _flask_app = flask_app
  11. if not scheduler.running:
  12. scheduler.start()
  13. # Nettoyage des runs bloqués à "running" (app redémarrée pendant un backup)
  14. scheduler.add_job(
  15. func=_cleanup_stuck_runs,
  16. trigger="interval",
  17. hours=1,
  18. id="cleanup_stuck_runs",
  19. replace_existing=True,
  20. )
  21. def _cleanup_stuck_runs():
  22. from datetime import timedelta
  23. with _flask_app.app_context():
  24. from db import db, Run
  25. cutoff = __import__("datetime").datetime.utcnow() - timedelta(hours=6)
  26. stuck = Run.query.filter(
  27. Run.status == "running",
  28. Run.started_at < cutoff,
  29. ).all()
  30. for run in stuck:
  31. run.status = "error"
  32. run.log_text = (run.log_text or "") + "\n[timeout] Run marqué en erreur par le nettoyage automatique."
  33. run.finished_at = __import__("datetime").datetime.utcnow()
  34. if stuck:
  35. db.session.commit()
  36. def _execute_job(job_id):
  37. with _flask_app.app_context():
  38. from jobs.ynh_backup import execute_job
  39. execute_job(job_id)
  40. def schedule_job(job):
  41. import logging
  42. job_key = f"job_{job.id}"
  43. try:
  44. trigger = CronTrigger.from_crontab(job.cron_expr)
  45. except Exception:
  46. logging.warning(f"Job #{job.id} « {job.name} » : expression cron invalide « {job.cron_expr} » — job non planifié.")
  47. return
  48. if scheduler.get_job(job_key):
  49. scheduler.reschedule_job(job_key, trigger=trigger)
  50. else:
  51. scheduler.add_job(
  52. func=_execute_job,
  53. trigger=trigger,
  54. id=job_key,
  55. kwargs={"job_id": job.id},
  56. replace_existing=True,
  57. )
  58. def remove_job(job_id):
  59. job_key = f"job_{job_id}"
  60. if scheduler.get_job(job_key):
  61. scheduler.remove_job(job_key)
  62. def get_next_run(job_id):
  63. job_key = f"job_{job_id}"
  64. apsjob = scheduler.get_job(job_key)
  65. if apsjob and apsjob.next_run_time:
  66. return apsjob.next_run_time
  67. return None