| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 |
- from flask_sqlalchemy import SQLAlchemy
- from datetime import datetime
- db = SQLAlchemy()
- class Job(db.Model):
- __tablename__ = "jobs"
- id = db.Column(db.Integer, primary_key=True)
- name = db.Column(db.Text, nullable=False)
- type = db.Column(db.Text, nullable=False) # ynh_app|ynh_system|custom_dir|mysql|postgresql
- config_json = db.Column(db.Text)
- cron_expr = db.Column(db.Text, nullable=False)
- retention_mode = db.Column(db.Text, nullable=False) # count|daily|gfs
- retention_value = db.Column(db.Integer, nullable=False)
- enabled = db.Column(db.Boolean, default=True)
- core_only = db.Column(db.Boolean, default=False)
- created_at = db.Column(db.DateTime, default=datetime.utcnow)
- updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
- runs = db.relationship("Run", backref="job", lazy=True, cascade="all, delete-orphan")
- def next_run(self):
- from scheduler import get_next_run
- return get_next_run(self.id)
- class Run(db.Model):
- __tablename__ = "runs"
- id = db.Column(db.Integer, primary_key=True)
- job_id = db.Column(db.Integer, db.ForeignKey("jobs.id"), nullable=False)
- started_at = db.Column(db.DateTime)
- finished_at = db.Column(db.DateTime)
- status = db.Column(db.Text) # running|success|error
- log_text = db.Column(db.Text)
- archive_name = db.Column(db.Text)
- size_bytes = db.Column(db.Integer)
- @property
- def duration_seconds(self):
- if self.started_at and self.finished_at:
- return int((self.finished_at - self.started_at).total_seconds())
- return None
- @property
- def size_human(self):
- if not self.size_bytes:
- return "—"
- n = float(self.size_bytes)
- for unit in ("o", "Ko", "Mo", "Go"):
- if n < 1024:
- return f"{n:.0f} {unit}"
- n /= 1024
- return f"{n:.1f} To"
|