Files
cosma-qc/pipeline/orchestrator/db.py
Ubuntu e597407ee5 feat(pipeline): jalon 1-3 — ingest, USBL parse, filter
Stages 01-03 opérationnels sur 20260505-Lepradet:
- 01_ingest: manifest auto, 3 AUVs vidéo, 3 AUVs bags, mapping AUV2xx↔AUV0xx
- 02_usbl_parse: MCAP (format incompatible firmware) → fallback serial CSV, 213 pts bruts
- 03_usbl_filter: MAD-3σ + moving-avg + Kalman optionnel, dégradé gracieux si null lat/lon
- orchestrator/db.py: SQLite schema missions/jobs/metrics idempotent
- config/: thresholds.yaml + default_params.yaml versionnés
- qa/checks.py: vérifications pass/fail/degraded par étape

Note: MCAP bags corrompus ou format non-standard firmware — lat/lon absent.
Statut degraded (pas crash). Nécessite investigation format MCAP spécifique.
2026-05-11 10:25:27 +00:00

160 lines
5.5 KiB
Python

#!/usr/bin/env python3
"""SQLite schema for cosma-pipeline orchestrator.
Tables:
missions — one row per mission folder on SSD
jobs — one row per (mission, auv, segment, stage)
metrics — one row per (job, metric_name) for QA + cron iteration
"""
from __future__ import annotations
import os
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
# Location of the SQLite state file. The COSMA_PIPELINE_DB environment variable
# overrides the default path; it is read once, at import time.
DB_PATH = Path(os.environ.get("COSMA_PIPELINE_DB", "/home/cosma/cosma-pipeline/state.db"))
# DDL applied by init_db(). Every statement uses IF NOT EXISTS, so the script
# can be re-run against an existing database without dropping data.
# All timestamp columns are TEXT; the helpers in this module fill
# created_at / updated_at / recorded_at with now_iso().
# NOTE(review): jobs has no UNIQUE constraint on
# (mission_id, auv_id, segment_label, stage); uniqueness of that key relies
# solely on the lookup performed in upsert_job().
SCHEMA = """
CREATE TABLE IF NOT EXISTS missions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
ssd_path TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
-- pending | ingesting | running | done | degraded | error
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
manifest TEXT, -- JSON blob from 01_ingest
notes TEXT
);
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mission_id INTEGER NOT NULL REFERENCES missions(id),
auv_id TEXT NOT NULL, -- e.g. AUV010
segment_label TEXT NOT NULL, -- e.g. 2026-05-05_08-16-00
stage TEXT NOT NULL, -- 01_ingest .. 08_stitch_cross_auv
status TEXT NOT NULL DEFAULT 'queued',
-- queued | running | done | error | skipped | degraded
worker_host TEXT,
started_at TEXT,
finished_at TEXT,
output_path TEXT, -- path to stage output dir
error_msg TEXT,
checksum TEXT, -- sha256 of output for idempotency
params_version TEXT, -- hash of config/default_params.yaml at run time
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id INTEGER NOT NULL REFERENCES jobs(id),
name TEXT NOT NULL, -- e.g. usbl_points_before, usbl_points_after
value REAL,
text_value TEXT,
pass_fail TEXT, -- pass | fail | degraded | skip
recorded_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_jobs_mission ON jobs(mission_id);
CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);
CREATE INDEX IF NOT EXISTS idx_metrics_job ON metrics(job_id);
CREATE INDEX IF NOT EXISTS idx_metrics_name ON metrics(name);
"""
def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision.

    The value is built from a timezone-aware ``datetime``, so it carries an
    explicit ``+00:00`` offset.
    """
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat(timespec="seconds")
def init_db(path: Path | None = None) -> Path:
    """Create the database file if needed and apply ``SCHEMA`` to it.

    Args:
        path: Location of the SQLite file. Falls back to ``DB_PATH`` when
            omitted.

    Returns:
        The path of the database that was initialised.

    Every statement in ``SCHEMA`` uses ``IF NOT EXISTS``, so calling this on an
    already-initialised database keeps existing tables and indexes.
    """
    p = path or DB_PATH
    # SQLite creates the file itself but not its parent directories.
    p.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(p)
    try:
        conn.executescript(SCHEMA)
        conn.commit()
    finally:
        # A sqlite3 connection used as a context manager only commits or
        # rolls back; it never closes the handle, so close it explicitly.
        conn.close()
    return p
@contextmanager
def get_conn(path: Path | None = None):
    """Yield a configured SQLite connection wrapped in a single transaction.

    Args:
        path: Location of the SQLite file. Falls back to ``DB_PATH`` when
            omitted.

    Yields:
        A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects, with
        WAL journaling requested and foreign-key enforcement switched on.

    On a clean exit from the ``with`` body the transaction is committed. If
    the body raises an ``Exception`` the transaction is rolled back and the
    exception is re-raised. The connection is closed in every case, including
    when the initial PRAGMA setup itself fails.
    """
    conn = sqlite3.connect(path or DB_PATH)
    try:
        # Setup lives inside the try so a failing PRAGMA (e.g. the file is not
        # a database) still reaches the ``finally`` and releases the handle.
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA foreign_keys=ON")
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
def upsert_mission(conn: sqlite3.Connection, name: str, ssd_path: str,
                   status: str = "pending", manifest: str | None = None) -> int:
    """Insert the mission called *name*, or refresh it if it already exists.

    Args:
        conn: Open connection. The existing row is read with ``row["id"]``,
            so rows must be addressable by column name (``get_conn`` sets
            ``sqlite3.Row`` for this).
        name: Mission name used as the lookup key.
        ssd_path: Value stored in ``missions.ssd_path``.
        status: Value stored in ``missions.status``.
        manifest: Value stored in ``missions.manifest``; may be ``None``.

    Returns:
        The ``id`` of the inserted or updated mission row.

    When the mission already exists, ``ssd_path``, ``status`` and ``manifest``
    are all overwritten with the supplied values (defaults included) and
    ``updated_at`` is refreshed. Nothing is committed here.
    """
    stamp = now_iso()
    existing = conn.execute(
        "SELECT id FROM missions WHERE name = ?", (name,)
    ).fetchone()

    if not existing:
        # First sighting of this mission: created_at and updated_at coincide.
        inserted = conn.execute(
            "INSERT INTO missions (name, ssd_path, status, manifest, created_at, updated_at) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (name, ssd_path, status, manifest, stamp, stamp),
        )
        return inserted.lastrowid

    mission_id = existing["id"]
    conn.execute(
        "UPDATE missions SET ssd_path=?, status=?, manifest=?, updated_at=? WHERE id=?",
        (ssd_path, status, manifest, stamp, mission_id),
    )
    return mission_id
def upsert_job(conn: sqlite3.Connection, mission_id: int, auv_id: str,
               segment_label: str, stage: str, **kwargs) -> int:
    """Insert or update the job identified by (mission, AUV, segment, stage).

    Args:
        conn: Open connection. The existing row is read with ``row["id"]``,
            so rows must be addressable by column name.
        mission_id: ``missions.id`` the job belongs to.
        auv_id: AUV identifier stored in ``jobs.auv_id``.
        segment_label: Segment label stored in ``jobs.segment_label``.
        stage: Pipeline stage stored in ``jobs.stage``.
        **kwargs: Optional column values. Only ``status``, ``worker_host``,
            ``started_at``, ``finished_at``, ``output_path``, ``error_msg``,
            ``checksum`` and ``params_version`` are written; any other key is
            silently ignored.

    Returns:
        The ``id`` of the inserted or updated job row.

    ``updated_at`` is always refreshed. Nothing is committed here.
    """
    stamp = now_iso()
    existing = conn.execute(
        "SELECT id FROM jobs WHERE mission_id=? AND auv_id=? AND segment_label=? AND stage=?",
        (mission_id, auv_id, segment_label, stage),
    ).fetchone()

    # Column names are interpolated into the SQL text below, so only this
    # fixed whitelist may ever come from caller-supplied keyword names.
    writable = ("status", "worker_host", "started_at", "finished_at",
                "output_path", "error_msg", "checksum", "params_version")
    columns = {}
    for key, val in kwargs.items():
        if key in writable:
            columns[key] = val
    columns["updated_at"] = stamp

    if existing:
        job_id = existing["id"]
        assignments = ", ".join(f"{col}=?" for col in columns)
        conn.execute(f"UPDATE jobs SET {assignments} WHERE id=?",
                     [*columns.values(), job_id])
        return job_id

    # New job: add the identifying key columns and the creation timestamp.
    columns["mission_id"] = mission_id
    columns["auv_id"] = auv_id
    columns["segment_label"] = segment_label
    columns["stage"] = stage
    columns["created_at"] = stamp
    col_list = ", ".join(columns)
    marks = ", ".join("?" * len(columns))
    inserted = conn.execute(f"INSERT INTO jobs ({col_list}) VALUES ({marks})",
                            tuple(columns.values()))
    return inserted.lastrowid
def record_metric(conn: sqlite3.Connection, job_id: int, name: str,
                  value: float | None = None, text_value: str | None = None,
                  pass_fail: str = "pass") -> None:
    """Append one row to ``metrics`` for the given job.

    Args:
        conn: Open connection used for the insert.
        job_id: ``jobs.id`` the metric belongs to.
        name: Metric name.
        value: Numeric value, if any.
        text_value: Textual value, if any.
        pass_fail: Verdict stored in ``metrics.pass_fail``.

    Each call inserts a new row stamped with the current time; existing rows
    for the same job and name are left untouched. Nothing is committed here.
    """
    statement = (
        "INSERT INTO metrics (job_id, name, value, text_value, pass_fail, recorded_at) "
        "VALUES (?, ?, ?, ?, ?, ?)"
    )
    row_values = (job_id, name, value, text_value, pass_fail, now_iso())
    conn.execute(statement, row_values)
if __name__ == "__main__":
    # Run as a script: create/upgrade the database at the default location
    # and report where it lives.
    db_file = init_db()
    print(f"DB initialized: {db_file}")