from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger _flask_app = None scheduler = BackgroundScheduler( job_defaults={"coalesce": True, "max_instances": 1}, timezone="UTC", ) def init_scheduler(flask_app): global _flask_app _flask_app = flask_app if not scheduler.running: scheduler.start() def _execute_job(job_id): with _flask_app.app_context(): from jobs.ynh_backup import execute_job execute_job(job_id) def schedule_job(job): job_key = f"job_{job.id}" try: trigger = CronTrigger.from_crontab(job.cron_expr) except Exception: return if scheduler.get_job(job_key): scheduler.reschedule_job(job_key, trigger=trigger) else: scheduler.add_job( func=_execute_job, trigger=trigger, id=job_key, kwargs={"job_id": job.id}, replace_existing=True, ) def remove_job(job_id): job_key = f"job_{job_id}" if scheduler.get_job(job_key): scheduler.remove_job(job_key) def get_next_run(job_id): job_key = f"job_{job_id}" apsjob = scheduler.get_job(job_key) if apsjob and apsjob.next_run_time: return apsjob.next_run_time return None