Stages 01-03 opérationnels sur la mission 20260505-Lepradet: - 01_ingest: manifest automatique, 3 AUVs avec vidéo, 3 AUVs avec bags, mapping AUV2xx↔AUV0xx - 02_usbl_parse: MCAP (format firmware incompatible) → fallback sur le CSV série, 213 points bruts - 03_usbl_filter: MAD-3σ + moving-avg + Kalman optionnel, dégradation gracieuse si lat/lon nuls - orchestrator/db.py: schéma SQLite missions/jobs/metrics idempotent - config/: thresholds.yaml + default_params.yaml versionnés - qa/checks.py: vérifications pass/fail/degraded par étape. Note: bags MCAP corrompus ou au format firmware non standard — lat/lon absents. Statut degraded (pas de crash). Nécessite une investigation du format MCAP spécifique au firmware.
160 lines
5.5 KiB
Python
160 lines
5.5 KiB
Python
#!/usr/bin/env python3
|
|
"""SQLite schema for cosma-pipeline orchestrator.
|
|
|
|
Tables:
|
|
missions — one row per mission folder on SSD
|
|
jobs — one row per (mission, auv, segment, stage)
|
|
metrics — one row per (job, metric_name) for QA + cron iteration
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sqlite3
|
|
from contextlib import contextmanager
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# Location of the orchestrator's SQLite state file. It can be overridden with
# the COSMA_PIPELINE_DB environment variable. An empty value is treated as
# unset (Path("") would otherwise resolve to the current directory). A leading
# "~" is expanded, in case the variable was set somewhere that performs no
# shell expansion.
DB_PATH = Path(
    os.environ.get("COSMA_PIPELINE_DB") or "/home/cosma/cosma-pipeline/state.db"
).expanduser()
|
|
|
|
# DDL applied by init_db(). Every statement uses IF NOT EXISTS, so the script
# can be re-run against an existing database.
#
# idx_jobs_key enforces the "one row per (mission, auv, segment, stage)"
# invariant. Without it, two callers racing through a SELECT-then-INSERT
# upsert could both insert. If an existing database already holds duplicate
# job keys, creating this index fails with "UNIQUE constraint failed".
# Deduplicate those rows before re-running the schema.
SCHEMA = """
CREATE TABLE IF NOT EXISTS missions (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    name        TEXT NOT NULL UNIQUE,
    ssd_path    TEXT NOT NULL,
    status      TEXT NOT NULL DEFAULT 'pending',
    -- pending | ingesting | running | done | degraded | error
    created_at  TEXT NOT NULL,
    updated_at  TEXT NOT NULL,
    manifest    TEXT,             -- JSON blob from 01_ingest
    notes       TEXT
);

CREATE TABLE IF NOT EXISTS jobs (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    mission_id      INTEGER NOT NULL REFERENCES missions(id),
    auv_id          TEXT NOT NULL,    -- e.g. AUV010
    segment_label   TEXT NOT NULL,    -- e.g. 2026-05-05_08-16-00
    stage           TEXT NOT NULL,    -- 01_ingest .. 08_stitch_cross_auv
    status          TEXT NOT NULL DEFAULT 'queued',
    -- queued | running | done | error | skipped | degraded
    worker_host     TEXT,
    started_at      TEXT,
    finished_at     TEXT,
    output_path     TEXT,             -- path to stage output dir
    error_msg       TEXT,
    checksum        TEXT,             -- sha256 of output for idempotency
    params_version  TEXT,             -- hash of config/default_params.yaml at run time
    created_at      TEXT NOT NULL,
    updated_at      TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS metrics (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,
    job_id       INTEGER NOT NULL REFERENCES jobs(id),
    name         TEXT NOT NULL,       -- e.g. usbl_points_before, usbl_points_after
    value        REAL,
    text_value   TEXT,
    pass_fail    TEXT,                -- pass | fail | degraded | skip
    recorded_at  TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_jobs_mission ON jobs(mission_id);
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
CREATE INDEX IF NOT EXISTS idx_metrics_job ON metrics(job_id);
CREATE INDEX IF NOT EXISTS idx_metrics_name ON metrics(name);

-- One job row per (mission, auv, segment, stage).
CREATE UNIQUE INDEX IF NOT EXISTS idx_jobs_key
    ON jobs(mission_id, auv_id, segment_label, stage);
"""
|
|
|
|
|
|
def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string, truncated to seconds.

    The result looks like ``2026-05-05T08:16:00+00:00``.
    """
    # With microseconds zeroed, the default isoformat() output stops at seconds.
    stamp = datetime.now(tz=timezone.utc).replace(microsecond=0)
    return stamp.isoformat()
|
|
|
|
|
|
def init_db(path: Path | None = None) -> Path:
    """Create the database file (and its parent directory) and apply SCHEMA.

    Args:
        path: Database location, as a Path or str. When omitted (or an empty
            string), DB_PATH is used.

    Returns:
        The Path of the initialised database.
    """
    p = Path(path) if path else DB_PATH
    p.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(p)
    try:
        # The connection's own context manager only commits/rolls back; it
        # never closes the connection, so close it explicitly below.
        with conn:
            conn.executescript(SCHEMA)
    finally:
        conn.close()
    return p
|
|
|
|
|
|
@contextmanager
def get_conn(path: Path | None = None):
    """Yield a configured SQLite connection scoped to one transaction.

    The connection returns sqlite3.Row rows and enables WAL journaling and
    foreign-key enforcement. On normal exit from the ``with`` body the
    transaction is committed. If the body raises an Exception, the transaction
    is rolled back and the exception re-raised. The connection is always
    closed, including when connection setup itself fails.

    Args:
        path: Database location; DB_PATH is used when omitted.
    """
    p = path or DB_PATH
    conn = sqlite3.connect(p)
    try:
        conn.row_factory = sqlite3.Row
        # The PRAGMAs run inside the try so the connection is still closed if
        # one fails (e.g. the file exists but is not an SQLite database).
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
    finally:
        conn.close()
|
|
|
|
|
|
def upsert_mission(conn: sqlite3.Connection, name: str, ssd_path: str,
                   status: str = "pending", manifest: str | None = None) -> int:
    """Insert or update the mission row identified by *name* and return its id.

    This function does not commit; the caller owns the transaction.

    Args:
        conn: Open connection (any row factory).
        name: Mission name; missions.name is UNIQUE.
        ssd_path: Mission folder path; overwritten on update.
        status: Mission status; overwritten on update.
        manifest: Manifest text (stored in the ``manifest`` column). When None
            (the default), an existing row keeps its stored manifest, so a
            status-only update does not erase it.

    Returns:
        The id of the existing or newly inserted mission.
    """
    now = now_iso()
    row = conn.execute(
        "SELECT id FROM missions WHERE name = ?", (name,)
    ).fetchone()
    if row is not None:
        mission_id = row[0]  # positional access works with or without sqlite3.Row
        conn.execute(
            "UPDATE missions SET ssd_path=?, status=?, "
            "manifest=COALESCE(?, manifest), updated_at=? WHERE id=?",
            (ssd_path, status, manifest, now, mission_id),
        )
        return mission_id
    cur = conn.execute(
        "INSERT INTO missions (name, ssd_path, status, manifest, created_at, updated_at) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (name, ssd_path, status, manifest, now, now),
    )
    return cur.lastrowid
|
|
|
|
|
|
# Job columns a caller may set through upsert_job(**kwargs). Other keywords are
# ignored. Column names are interpolated into the SQL text, so this whitelist
# also keeps arbitrary keyword names out of the statement.
_JOB_UPDATABLE_FIELDS = frozenset({
    "status", "worker_host", "started_at", "finished_at",
    "output_path", "error_msg", "checksum", "params_version",
})


def upsert_job(conn: sqlite3.Connection, mission_id: int, auv_id: str,
               segment_label: str, stage: str, **kwargs) -> int:
    """Insert or update the job keyed by (mission_id, auv_id, segment_label, stage).

    Only keyword arguments named in _JOB_UPDATABLE_FIELDS are written. Any
    other keyword is silently ignored. updated_at is always refreshed, and
    created_at is set only when a new row is inserted. This function does not
    commit; the caller owns the transaction.

    NOTE(review): the SELECT-then-INSERT below is not atomic. Concurrent
    callers can both take the INSERT path unless the jobs table has a UNIQUE
    index on the four key columns.

    Returns:
        The id of the existing or newly inserted job.
    """
    now = now_iso()
    row = conn.execute(
        "SELECT id FROM jobs WHERE mission_id=? AND auv_id=? AND segment_label=? AND stage=?",
        (mission_id, auv_id, segment_label, stage),
    ).fetchone()
    fields = {k: v for k, v in kwargs.items() if k in _JOB_UPDATABLE_FIELDS}
    fields["updated_at"] = now
    if row is not None:
        job_id = row[0]  # positional access works with or without sqlite3.Row
        sets = ", ".join(f"{k}=?" for k in fields)
        conn.execute(f"UPDATE jobs SET {sets} WHERE id=?", [*fields.values(), job_id])
        return job_id
    fields.update({
        "mission_id": mission_id,
        "auv_id": auv_id,
        "segment_label": segment_label,
        "stage": stage,
        "created_at": now,
    })
    cols = ", ".join(fields)
    placeholders = ", ".join("?" for _ in fields)
    cur = conn.execute(
        f"INSERT INTO jobs ({cols}) VALUES ({placeholders})",
        list(fields.values()),
    )
    return cur.lastrowid
|
|
|
|
|
|
def record_metric(conn: sqlite3.Connection, job_id: int, name: str,
                  value: float | None = None, text_value: str | None = None,
                  pass_fail: str = "pass") -> None:
    """Append one metric row for *job_id*, timestamped with now_iso().

    Every call inserts a new row; earlier rows with the same (job_id, name)
    are kept. This function does not commit; the caller owns the transaction.

    NOTE(review): the module docstring describes metrics as "one row per
    (job, metric_name)", but re-recording a metric appends a second row.
    Confirm whether history is intended.

    Args:
        conn: Open connection.
        job_id: Id of the owning jobs row.
        name: Metric name, e.g. usbl_points_before.
        value: Numeric value, if any.
        text_value: Textual value, if any.
        pass_fail: QA verdict; defaults to "pass".
    """
    params = (job_id, name, value, text_value, pass_fail, now_iso())
    conn.execute(
        "INSERT INTO metrics (job_id, name, value, text_value, pass_fail, recorded_at) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        params,
    )
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: initialise the default database and report its path.
    print(f"DB initialized: {init_db()}")
|