from flask_sqlalchemy import SQLAlchemy from datetime import datetime db = SQLAlchemy() class Destination(db.Model): __tablename__ = "destinations" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.Text, nullable=False) host = db.Column(db.Text, nullable=False) port = db.Column(db.Integer, default=22) user = db.Column(db.Text, nullable=False, default="root") remote_path = db.Column(db.Text, nullable=False) key_name = db.Column(db.Text) # nom du fichier clé dans data_dir/keys/ enabled = db.Column(db.Boolean, default=True) created_at = db.Column(db.DateTime, default=datetime.utcnow) jobs = db.relationship("Job", backref="destination", lazy=True) @property def remote_str(self): return f"{self.user}@{self.host}:{self.remote_path}" class Job(db.Model): __tablename__ = "jobs" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.Text, nullable=False) type = db.Column(db.Text, nullable=False) # ynh_app|ynh_system|custom_dir|mysql|postgresql config_json = db.Column(db.Text) cron_expr = db.Column(db.Text, nullable=False) retention_mode = db.Column(db.Text, nullable=False) # count|daily|gfs retention_value = db.Column(db.Integer, nullable=False) enabled = db.Column(db.Boolean, default=True) core_only = db.Column(db.Boolean, default=False) destination_id = db.Column(db.Integer, db.ForeignKey("destinations.id"), nullable=True) created_at = db.Column(db.DateTime, default=datetime.utcnow) updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) runs = db.relationship("Run", backref="job", lazy=True, cascade="all, delete-orphan") def next_run(self): from scheduler import get_next_run return get_next_run(self.id) class Setting(db.Model): __tablename__ = "settings" key = db.Column(db.Text, primary_key=True) value = db.Column(db.Text, nullable=False, default="") class Run(db.Model): __tablename__ = "runs" id = db.Column(db.Integer, primary_key=True) job_id = db.Column(db.Integer, db.ForeignKey("jobs.id"), nullable=False) started_at = db.Column(db.DateTime) finished_at = db.Column(db.DateTime) status = db.Column(db.Text) # running|success|error log_text = db.Column(db.Text) archive_name = db.Column(db.Text) size_bytes = db.Column(db.Integer) @property def duration_seconds(self): if self.started_at and self.finished_at: return int((self.finished_at - self.started_at).total_seconds()) return None @property def size_human(self): if not self.size_bytes: return "—" n = float(self.size_bytes) for unit in ("o", "Ko", "Mo", "Go"): if n < 1024: return f"{n:.0f} {unit}" n /= 1024 return f"{n:.1f} To"