"""Flask-SQLAlchemy models for the destinations, jobs, settings and runs tables."""
from datetime import datetime

from flask_sqlalchemy import SQLAlchemy

# Shared Flask-SQLAlchemy extension object; the model classes in this module
# derive from ``db.Model`` and declare their columns through ``db.Column``.
db = SQLAlchemy()
class Destination(db.Model):
    """Row of the ``destinations`` table: a remote user/host/path that jobs can reference."""

    __tablename__ = "destinations"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.Text, nullable=False)
    host = db.Column(db.Text, nullable=False)
    port = db.Column(db.Integer, default=22)
    user = db.Column(db.Text, nullable=False, default="root")
    remote_path = db.Column(db.Text, nullable=False)
    # File name of the key inside data_dir/keys/ (the column is nullable).
    key_name = db.Column(db.Text)
    enabled = db.Column(db.Boolean, default=True)
    # Defaults to the naive UTC time at which the row is inserted.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Jobs whose ``destination_id`` points at this row; the backref also
    # exposes the reverse attribute ``Job.destination``.
    jobs = db.relationship("Job", backref="destination", lazy=True)

    @property
    def remote_str(self):
        """Return the target as ``user@host:remote_path`` (the port is not part of it)."""
        return f"{self.user}@{self.host}:{self.remote_path}"
class Job(db.Model):
    """Row of the ``jobs`` table: a named job with a cron expression and a retention policy."""

    __tablename__ = "jobs"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.Text, nullable=False)
    # One of: ynh_app | ynh_system | custom_dir | mysql | postgresql
    type = db.Column(db.Text, nullable=False)
    # Free-form text column; presumably holds type-specific settings as JSON — confirm with callers.
    config_json = db.Column(db.Text)
    cron_expr = db.Column(db.Text, nullable=False)
    # One of: count | daily | gfs
    retention_mode = db.Column(db.Text, nullable=False)
    retention_value = db.Column(db.Integer, nullable=False)
    enabled = db.Column(db.Boolean, default=True)
    core_only = db.Column(db.Boolean, default=False)
    # Optional link to a Destination row (nullable foreign key).
    destination_id = db.Column(
        db.Integer,
        db.ForeignKey("destinations.id"),
        nullable=True,
    )
    # Both timestamps default to the naive UTC insert time; ``updated_at`` is
    # additionally refreshed whenever the row is updated.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(
        db.DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
    )

    # Runs of this job; the cascade removes them together with the job and
    # deletes any run detached from the collection.
    runs = db.relationship(
        "Run",
        backref="job",
        lazy=True,
        cascade="all, delete-orphan",
    )

    def next_run(self):
        """Return the result of ``scheduler.get_next_run`` for this job's id."""
        # Imported at call time rather than at module load — presumably to
        # avoid a circular import with the scheduler module; confirm.
        from scheduler import get_next_run

        return get_next_run(self.id)
class Setting(db.Model):
    """Row of the ``settings`` table: one text value stored under a unique text key."""

    __tablename__ = "settings"

    # The key itself is the primary key, so each key appears at most once.
    key = db.Column(db.Text, primary_key=True)
    # Never NULL; defaults to the empty string when no value is supplied.
    value = db.Column(db.Text, nullable=False, default="")
class Run(db.Model):
    """Row of the ``runs`` table: one execution record attached to a job."""

    __tablename__ = "runs"

    id = db.Column(db.Integer, primary_key=True)
    job_id = db.Column(db.Integer, db.ForeignKey("jobs.id"), nullable=False)
    started_at = db.Column(db.DateTime)
    finished_at = db.Column(db.DateTime)
    # One of: running | success | error
    status = db.Column(db.Text)
    log_text = db.Column(db.Text)
    archive_name = db.Column(db.Text)
    # NOTE(review): db.Integer is a 32-bit INTEGER on some backends (e.g.
    # PostgreSQL/MySQL); sizes of 2 GiB and above would need BigInteger there
    # — confirm which database this application targets.
    size_bytes = db.Column(db.Integer)

    @property
    def duration_seconds(self):
        """Return the whole seconds between start and finish, or ``None`` if either is unset."""
        if not (self.started_at and self.finished_at):
            return None
        elapsed = self.finished_at - self.started_at
        return int(elapsed.total_seconds())

    @property
    def size_human(self):
        """Return ``size_bytes`` as a short label using 1024-based o/Ko/Mo/Go/To units.

        A missing or zero size yields the placeholder "—". Units up to "Go"
        are printed without decimals; "To" is printed with one decimal.
        """
        if not self.size_bytes:
            return "—"
        value = float(self.size_bytes)
        for unit in ("o", "Ko", "Mo", "Go"):
            # Test the value as it will be *displayed*: ``.0f`` rounds, so an
            # unrounded comparison lets e.g. 1023.999 print as "1024 Ko"
            # instead of moving on to "1 Mo". ``round`` and ``.0f`` both use
            # round-half-to-even, so they always agree.
            if round(value) < 1024:
                return f"{value:.0f} {unit}"
            value /= 1024
        return f"{value:.1f} To"