"""Database models (Flask-SQLAlchemy) for jobs, runs, destinations and federation."""

from datetime import datetime, timezone

from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

def _utcnow():
    """Return the current UTC time as a naive ``datetime``.

    Stand-in for ``datetime.utcnow()`` (deprecated since Python 3.12) that
    produces the same kind of value (naive, expressed in UTC), so the
    timestamps written by the column defaults below keep their format.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _size_human(size_bytes):
    """Format a byte count as a short human-readable string.

    Uses 1024-based steps with the unit labels "o", "Ko", "Mo", "Go"
    (no decimals) and "To" (one decimal).

    Args:
        size_bytes: Size in bytes; ``None`` or ``0`` means "unknown/empty".

    Returns:
        The formatted size, or "—" when ``size_bytes`` is falsy.
    """
    if not size_bytes:
        return "—"
    n = float(size_bytes)
    for unit in ("o", "Ko", "Mo", "Go"):
        # Compare the *rounded* value: the output is printed without
        # decimals, so e.g. 1023.999 Ko would otherwise show as "1024 Ko"
        # instead of rolling over to "1 Mo".
        if round(n) < 1024:
            return f"{n:.0f} {unit}"
        n /= 1024
    return f"{n:.1f} To"


class Destination(db.Model):
    """Remote destination addressed as ``user@host:remote_path``.

    Resolved by ``JobDestination`` when its ``dest_type`` is ``"ssh"``.
    """

    __tablename__ = "destinations"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.Text, nullable=False)
    host = db.Column(db.Text, nullable=False)
    port = db.Column(db.Integer, default=22)
    user = db.Column(db.Text, nullable=False, default="root")
    remote_path = db.Column(db.Text, nullable=False)
    key_name = db.Column(db.Text)  # key file name in data_dir/keys/
    enabled = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=_utcnow)

    @property
    def remote_str(self):
        """Return the destination as ``user@host:remote_path`` (port not included)."""
        return f"{self.user}@{self.host}:{self.remote_path}"


class Job(db.Model):
    """Job definition: type, schedule, retention policy and destinations.

    Deleting a job also deletes its ``runs`` and ``job_destinations``
    (``delete-orphan`` cascade on both relationships).
    """

    __tablename__ = "jobs"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.Text, nullable=False)
    type = db.Column(db.Text, nullable=False)  # ynh_app|ynh_system|custom_dir|mysql|postgresql
    config_json = db.Column(db.Text)
    cron_expr = db.Column(db.Text, nullable=False, default="")  # "" = manual trigger only
    retention_mode = db.Column(db.Text, nullable=False)  # count|daily|gfs
    retention_value = db.Column(db.Integer, nullable=False)
    retention_gfs_config = db.Column(db.Text, nullable=True)  # JSON {"daily":N,"weekly":M,"monthly":P}
    enabled = db.Column(db.Boolean, default=True)
    core_only = db.Column(db.Boolean, default=False)
    created_at = db.Column(db.DateTime, default=_utcnow)
    updated_at = db.Column(db.DateTime, default=_utcnow, onupdate=_utcnow)
    runs = db.relationship("Run", backref="job", lazy=True, cascade="all, delete-orphan")
    # Eagerly joined and always ordered by JobDestination.id.
    job_destinations = db.relationship(
        "JobDestination",
        backref="job",
        lazy="joined",
        cascade="all, delete-orphan",
        order_by="JobDestination.id",
    )

    def next_run(self):
        """Return whatever ``scheduler.get_next_run`` reports for this job's id."""
        # Imported at call time rather than at module load.
        # NOTE(review): presumably to avoid a circular import with the
        # scheduler module - confirm before moving it to the top of the file.
        from scheduler import get_next_run

        return get_next_run(self.id)


class Setting(db.Model):
    """Key/value setting stored as text."""

    __tablename__ = "settings"

    key = db.Column(db.Text, primary_key=True)
    value = db.Column(db.Text, nullable=False, default="")


class Run(db.Model):
    """One execution of a ``Job``, with its status, log and resulting archive."""

    __tablename__ = "runs"

    id = db.Column(db.Integer, primary_key=True)
    job_id = db.Column(db.Integer, db.ForeignKey("jobs.id"), nullable=False)
    started_at = db.Column(db.DateTime)
    finished_at = db.Column(db.DateTime)
    status = db.Column(db.Text)  # running|success|error
    log_text = db.Column(db.Text)
    archive_name = db.Column(db.Text)
    # NOTE(review): db.Integer is a 32-bit INTEGER on some backends (e.g.
    # PostgreSQL); sizes above 2 GiB would need BigInteger there - confirm
    # which database this application targets.
    size_bytes = db.Column(db.Integer)

    @property
    def duration_seconds(self):
        """Return the run duration in whole seconds, or ``None`` if either bound is missing."""
        if self.started_at and self.finished_at:
            return int((self.finished_at - self.started_at).total_seconds())
        return None

    @property
    def size_human(self):
        """Return ``size_bytes`` formatted by ``_size_human``."""
        return _size_human(self.size_bytes)


# ---------------------------------------------------------------------------
# Federation
# ---------------------------------------------------------------------------

class RemoteInstance(db.Model):
    """Remote instance, identified by its URL and API key.

    Deleting an instance also deletes its ``remote_runs``.
    """

    __tablename__ = "remote_instances"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.Text, nullable=False)
    url = db.Column(db.Text, nullable=False)  # https://tom.domaine.fr
    api_key = db.Column(db.Text, nullable=False)
    last_seen = db.Column(db.DateTime)
    status = db.Column(db.Text, default="unknown")  # online|offline|error|unknown
    created_at = db.Column(db.DateTime, default=_utcnow)
    remote_runs = db.relationship(
        "RemoteRun", backref="instance", lazy=True, cascade="all, delete-orphan"
    )

    @property
    def url_display(self):
        """Return ``url`` with any trailing slashes removed."""
        return self.url.rstrip("/")


class RemoteRun(db.Model):
    """Last-run information recorded for a job that lives on a remote instance."""

    __tablename__ = "remote_runs"

    id = db.Column(db.Integer, primary_key=True)
    instance_id = db.Column(db.Integer, db.ForeignKey("remote_instances.id"), nullable=False)
    job_id = db.Column(db.Integer)  # id on the remote instance
    job_name = db.Column(db.Text)
    job_type = db.Column(db.Text)
    last_run_at = db.Column(db.DateTime)
    last_status = db.Column(db.Text)
    last_archive_name = db.Column(db.Text)
    last_size_bytes = db.Column(db.Integer)

    # Aliases for compatibility with the dashboard_network template
    @property
    def name(self):
        """Alias of ``job_name``."""
        return self.job_name

    @property
    def type(self):
        """Alias of ``job_type``."""
        return self.job_type

    @property
    def size_human(self):
        """Alias of ``last_size_human``."""
        return self.last_size_human

    @property
    def last_size_human(self):
        """Return ``last_size_bytes`` formatted by ``_size_human``."""
        return _size_human(self.last_size_bytes)


class JobDestination(db.Model):
    """Link between a job and one of its destinations.

    ``dest_id`` is not a foreign key: it points to a ``Destination`` row
    when ``dest_type`` is ``"ssh"`` and to a ``RemoteInstance`` row when it
    is ``"instance"``.
    """

    __tablename__ = "job_destinations"

    id = db.Column(db.Integer, primary_key=True)
    job_id = db.Column(db.Integer, db.ForeignKey("jobs.id"), nullable=False)
    dest_type = db.Column(db.Text, nullable=False)  # 'ssh' | 'instance'
    dest_id = db.Column(db.Integer, nullable=False)

    @property
    def resolved(self):
        """Return the target object, or ``None`` if the type is unknown or the row is gone."""
        if self.dest_type == "ssh":
            return db.session.get(Destination, self.dest_id)
        if self.dest_type == "instance":
            return db.session.get(RemoteInstance, self.dest_id)
        return None

    @property
    def label(self):
        """Return the target's name, falling back to ``#<dest_id>`` when unresolved."""
        obj = self.resolved
        return obj.name if obj else f"#{self.dest_id}"

    @property
    def detail(self):
        """Return ``remote_str`` for an SSH target, ``url`` otherwise, or "" when unresolved."""
        obj = self.resolved
        if obj is None:
            return ""
        return obj.remote_str if self.dest_type == "ssh" else obj.url


class Upload(db.Model):
    """Progress record for an archive received in chunks."""

    __tablename__ = "uploads"

    upload_id = db.Column(db.Text, primary_key=True)  # uuid4
    filename = db.Column(db.Text)
    total_size = db.Column(db.Integer)
    chunk_size = db.Column(db.Integer)
    chunks_total = db.Column(db.Integer)
    chunks_received = db.Column(db.Integer, default=0)
    checksum = db.Column(db.Text)  # SHA256 of the complete archive
    started_at = db.Column(db.DateTime, default=_utcnow)
    status = db.Column(db.Text, default="pending")  # pending|in_progress|complete|error