| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- from apscheduler.schedulers.background import BackgroundScheduler
- from apscheduler.triggers.cron import CronTrigger
- _flask_app = None
- scheduler = BackgroundScheduler(
- job_defaults={"coalesce": True, "max_instances": 1},
- timezone="UTC",
- )
- def init_scheduler(flask_app):
- global _flask_app
- _flask_app = flask_app
- # Nettoyage immédiat au démarrage : tout run "running" en DB est forcément
- # un vestige d'un process précédent (app redémarrée pendant un backup).
- with flask_app.app_context():
- from db import db, Run
- import datetime as _dt
- stale = Run.query.filter_by(status="running").all()
- now = _dt.datetime.utcnow()
- for run in stale:
- run.status = "error"
- run.log_text = (run.log_text or "") + "\n[interrompu] Run interrompu par un redémarrage de l'application."
- run.finished_at = now
- if stale:
- db.session.commit()
- if not scheduler.running:
- scheduler.start()
- # Filet de sécurité : marque en erreur tout run resté bloqué > 6h
- scheduler.add_job(
- func=_cleanup_stuck_runs,
- trigger="interval",
- hours=1,
- id="cleanup_stuck_runs",
- replace_existing=True,
- )
- def _cleanup_stuck_runs():
- import datetime as _dt
- with _flask_app.app_context():
- from db import db, Run
- cutoff = _dt.datetime.utcnow() - _dt.timedelta(hours=6)
- stuck = Run.query.filter(
- Run.status == "running",
- Run.started_at < cutoff,
- ).all()
- now = _dt.datetime.utcnow()
- for run in stuck:
- duration = now - run.started_at
- hours = int(duration.total_seconds() // 3600)
- minutes = int((duration.total_seconds() % 3600) // 60)
- run.status = "error"
- run.log_text = (run.log_text or "") + (
- f"\n[timeout] Run bloqué depuis {hours}h{minutes:02d} — marqué en erreur par le nettoyage automatique."
- )
- run.finished_at = now
- if stuck:
- db.session.commit()
- def _execute_job(job_id):
- with _flask_app.app_context():
- from jobs.ynh_backup import execute_job
- execute_job(job_id)
- def schedule_job(job):
- import logging
- job_key = f"job_{job.id}"
- if not job.cron_expr:
- return # job manuel uniquement, pas de planification APScheduler
- try:
- trigger = CronTrigger.from_crontab(job.cron_expr)
- except Exception:
- logging.warning(f"Job #{job.id} « {job.name} » : expression cron invalide « {job.cron_expr} » — job non planifié.")
- return
- if scheduler.get_job(job_key):
- scheduler.reschedule_job(job_key, trigger=trigger)
- else:
- scheduler.add_job(
- func=_execute_job,
- trigger=trigger,
- id=job_key,
- kwargs={"job_id": job.id},
- replace_existing=True,
- )
- def remove_job(job_id):
- job_key = f"job_{job_id}"
- if scheduler.get_job(job_key):
- scheduler.remove_job(job_key)
- def get_next_run(job_id):
- job_key = f"job_{job_id}"
- apsjob = scheduler.get_job(job_key)
- if apsjob and apsjob.next_run_time:
- return apsjob.next_run_time
- return None
|