scaffold — FastAPI + SQLite + HTMX dashboard, ingest + dispatcher

- app/main.py : dashboard /, partials /partials/{jobs,monitor} (htmx polling)
- app/templates/ : index, jobs table, monitor card par worker
- app/static/style.css : thème sombre cohérent
- scripts/ingest.py : scan SSD d'acquisition, EXIF CreateDate → segments
  continus par (AUV, GoPro serial) avec seuil configurable
- scripts/dispatcher.py : polling queue, pick worker selon VRAM free,
  extraction ffmpeg + lingbot-map windowed --offload_to_cpu, progression DB
- DB : SQLite (acquisitions + jobs), lifecycle queued→extracting→running→done
- Workers par défaut : .87 (3060 12GB) + .84 (3090 24GB)

Contexte : QC terrain le jour-même (avant photogrammétrie à 30 jours),
plusieurs heures × 2 GoPros × 2-3 AUVs d'enregistrement à traiter en parallèle.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-21 09:52:41 +00:00
parent 17edbcbd8b
commit b7d957c806
9 changed files with 766 additions and 1 deletions

View File

@@ -1,3 +1,49 @@
# cosma-qc
COSMA post-acquisition QC pipeline: per-GoPro lingbot-map reconstruction, job queue, web dashboard
**COSMA post-acquisition QC pipeline** — reconstruction photogrammétrique
par GoPro (lingbot-map), queue de jobs distribués, dashboard web pour suivi
terrain le jour même.
## Objectif
Après une acquisition AUV (2 GoPros × 2-3 AUVs × heures d'enregistrement),
savoir rapidement si la couverture est complète avant de replier la mission —
sans attendre les 30 jours du traitement photogrammétrique complet.
## Pipeline
```
SSD plugged ─┐
├─▶ Ingestion ─▶ Frame extraction (per GoPro × segment)
│ │
│ ▼
│ Job queue (SQLite)
│ │
│ ┌──────────────┼──────────────┐
▼ ▼ ▼ ▼
Dashboard Worker .87 Worker .84 (scalable)
(FastAPI) (3060) (3090)
│ │ │
│ └─▶ PLY ◀──────┘
│ │
│ ▼
└──────── ICP stitch (Open3D) ─▶ viser viewer
```
## Stack
- **Backend** : FastAPI + SQLite
- **Frontend** : HTMX (UI réactive sans build JS)
- **Queue** : table SQLite + workers SSH-triggered
- **Monitoring** : polling `nvidia-smi` sur .87 / .84, `df` pour disque
- **Reconstruction** : lingbot-map (GCT-Stream windowed)
- **Stitch** : Open3D ICP
## Déploiement
- Service sur .82 (stable, Caddy pour URL propre)
- Workers : SSH vers .87 (3060 12 GB) et .84 (3090 24 GB)
## État
Scaffold en cours.

168
app/main.py Normal file
View File

@@ -0,0 +1,168 @@
from __future__ import annotations
import asyncio
import json
import os
import sqlite3
from contextlib import asynccontextmanager, closing
from datetime import datetime
from pathlib import Path
from typing import Any
from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
# SQLite DB shared with scripts/ingest.py and scripts/dispatcher.py;
# override the path via COSMA_QC_DB.
DB_PATH = Path(os.environ.get("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db"))
# Worker inventory shown on the dashboard; override with a JSON list via
# COSMA_QC_WORKERS (same shape as the default entries below).
WORKERS = json.loads(os.environ.get("COSMA_QC_WORKERS", json.dumps([
    {"host": "192.168.0.87", "ssh_alias": "gpu", "gpu": "RTX 3060 12GB"},
    {"host": "192.168.0.84", "ssh_alias": "cosma-vm", "gpu": "RTX 3090 24GB"},
])))
# Job lifecycle states stored in jobs.status (queued→extracting→running→done, or error).
STATUSES = ("queued", "extracting", "running", "done", "error")
def db() -> sqlite3.Connection:
    """Open an autocommit SQLite connection in WAL mode with named-row access.

    Creates the parent directory of the DB file on first use so a fresh
    deployment works without manual setup.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(DB_PATH, isolation_level=None)
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    connection.row_factory = sqlite3.Row
    return connection
def init_schema() -> None:
    """Create the acquisitions/jobs tables and indexes if they do not exist.

    Idempotent; called once at app startup via the lifespan hook.
    jobs.video_paths holds a JSON-encoded list of source MP4 paths
    (written by scripts/ingest.py).
    """
    with closing(db()) as conn:
        conn.executescript("""
        CREATE TABLE IF NOT EXISTS acquisitions (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            source_path TEXT NOT NULL,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS jobs (
            id INTEGER PRIMARY KEY,
            acquisition_id INTEGER NOT NULL REFERENCES acquisitions(id) ON DELETE CASCADE,
            auv TEXT NOT NULL,
            gopro_serial TEXT NOT NULL,
            segment_label TEXT NOT NULL,
            video_paths TEXT NOT NULL,
            frame_count INTEGER,
            frames_dir TEXT,
            status TEXT NOT NULL DEFAULT 'queued',
            worker_host TEXT,
            viser_url TEXT,
            ply_path TEXT,
            progress INTEGER NOT NULL DEFAULT 0,
            log_tail TEXT,
            error TEXT,
            started_at TEXT,
            finished_at TEXT,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );
        CREATE INDEX IF NOT EXISTS jobs_status_idx ON jobs(status);
        CREATE INDEX IF NOT EXISTS jobs_acq_idx ON jobs(acquisition_id);
        """)
@asynccontextmanager
async def lifespan(_: FastAPI):
    # FastAPI lifespan hook: ensure the schema exists before serving requests.
    init_schema()
    yield
# Application singleton plus template/static wiring (paths relative to this file).
app = FastAPI(title="cosma-qc", lifespan=lifespan)
templates = Jinja2Templates(directory=Path(__file__).parent / "templates")
app.mount("/static", StaticFiles(directory=Path(__file__).parent / "static"), name="static")
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Full dashboard page: the 200 most recent jobs plus the worker list."""
    conn = db()
    try:
        recent = conn.execute("""
            SELECT j.*, a.name AS acquisition_name
            FROM jobs j
            LEFT JOIN acquisitions a ON a.id = j.acquisition_id
            ORDER BY j.created_at DESC
            LIMIT 200
        """).fetchall()
    finally:
        conn.close()
    context = {"request": request, "jobs": recent, "workers": WORKERS}
    return templates.TemplateResponse("index.html", context)
@app.get("/api/jobs")
async def list_jobs():
    """JSON list of the 500 most recent jobs (all columns, newest first)."""
    conn = db()
    try:
        rows = conn.execute("SELECT * FROM jobs ORDER BY created_at DESC LIMIT 500").fetchall()
    finally:
        conn.close()
    return list(map(dict, rows))
@app.get("/partials/jobs", response_class=HTMLResponse)
async def partial_jobs(request: Request):
    """HTMX partial: just the jobs table, polled by the dashboard every 3 s."""
    conn = db()
    try:
        recent = conn.execute("""
            SELECT j.*, a.name AS acquisition_name
            FROM jobs j
            LEFT JOIN acquisitions a ON a.id = j.acquisition_id
            ORDER BY j.created_at DESC
            LIMIT 200
        """).fetchall()
    finally:
        conn.close()
    return templates.TemplateResponse("_jobs_table.html", {"request": request, "jobs": recent})
@app.get("/partials/monitor", response_class=HTMLResponse)
async def partial_monitor(request: Request):
    """HTMX partial: one card per worker, with stats fetched concurrently."""
    stats = await asyncio.gather(*(_worker_stats(w) for w in WORKERS))
    context = {"request": request, "workers": list(stats)}
    return templates.TemplateResponse("_monitor.html", context)
async def _worker_stats(worker: dict) -> dict:
    """Poll one worker over SSH for GPU memory/utilisation and root-disk usage.

    Returns the worker dict augmented with live stats when reachable, or
    with online=False and a truncated error string otherwise. Never raises.
    """
    remote_cmd = (
        "nvidia-smi --query-gpu=memory.used,memory.total,utilization.gpu "
        "--format=csv,noheader,nounits && df -h / | tail -1"
    )
    try:
        proc = await asyncio.create_subprocess_exec(
            "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes",
            worker["ssh_alias"], remote_cmd,
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
        )
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=4)
        lines = stdout.decode().strip().splitlines()
        gpu = lines[0].split(",") if lines else ["?", "?", "?"]
        disk = lines[1].split() if len(lines) > 1 else ["?"] * 6

        def as_int(raw: str) -> int | None:
            # nvidia-smi may emit "[N/A]" or garbage — keep None in that case.
            raw = raw.strip()
            return int(raw) if raw.isdigit() else None

        stats = dict(worker)
        stats.update(
            online=True,
            vram_used_mib=as_int(gpu[0]),
            vram_total_mib=as_int(gpu[1]),
            gpu_util_pct=as_int(gpu[2]),
            disk_used_pct=disk[4] if len(disk) > 4 else "?",
        )
        return stats
    except Exception as e:
        return {**worker, "online": False, "error": str(e)[:80]}
@app.post("/jobs/{job_id}/cancel")
async def cancel_job(job_id: int):
    """Mark a not-yet-finished job as cancelled (recorded as status='error')."""
    sql = (
        "UPDATE jobs SET status='error', error='cancelled by user', finished_at=datetime('now') "
        "WHERE id=? AND status IN ('queued','extracting','running')"
    )
    with closing(db()) as conn:
        conn.execute(sql, (job_id,))
    return {"ok": True}
@app.post("/jobs/{job_id}/retry")
async def retry_job(job_id: int):
    """Reset an errored job back to the queue, clearing its run metadata."""
    sql = (
        "UPDATE jobs SET status='queued', error=NULL, progress=0, started_at=NULL, "
        "finished_at=NULL, worker_host=NULL WHERE id=? AND status='error'"
    )
    with closing(db()) as conn:
        conn.execute(sql, (job_id,))
    return {"ok": True}

69
app/static/style.css Normal file
View File

@@ -0,0 +1,69 @@
/* cosma-qc dashboard — dark monospace theme shared by all pages. */

/* Colour tokens: background/panel greys, one accent, three state colours. */
:root {
  --bg: #0b1220;
  --panel: #121a2b;
  --border: #1f2a44;
  --text: #e6edf7;
  --muted: #7b8aa8;
  --accent: #5fd0ff;
  --ok: #3ddc84;
  --warn: #f5c518;
  --err: #ff5c7a;
}

* { box-sizing: border-box; }

/* Base layout and typography. */
body {
  font-family: ui-monospace, "JetBrains Mono", Menlo, monospace;
  background: var(--bg); color: var(--text);
  margin: 0; padding: 1.5rem;
}
header { display: flex; align-items: baseline; gap: 1rem; margin-bottom: 1.25rem; }
header h1 { margin: 0; font-size: 1.25rem; color: var(--accent); }
.sub { color: var(--muted); font-size: 0.85rem; }
h2 { font-size: 1rem; color: var(--accent); margin: 1.5rem 0 0.5rem; }
.muted { color: var(--muted); }
.err { color: var(--err); }
section { background: var(--panel); border: 1px solid var(--border);
  border-radius: 10px; padding: 1rem; margin-bottom: 1rem; }

/* Worker monitor cards (one per GPU host, see _monitor.html). */
.worker-grid { display: grid; gap: 1rem;
  grid-template-columns: repeat(auto-fill, minmax(240px, 1fr)); }
.worker { background: rgba(255,255,255,0.02); border: 1px solid var(--border);
  border-radius: 8px; padding: 0.75rem; }
.worker.offline { opacity: 0.55; border-color: var(--err); }
.worker .hdr { display: flex; justify-content: space-between;
  gap: 0.5rem; align-items: center; margin-bottom: 0.5rem; flex-wrap: wrap; }
.worker .gpu { color: var(--muted); font-size: 0.8rem; }
.worker .state { color: var(--ok); font-size: 0.75rem; text-transform: uppercase; }
.worker.offline .state { color: var(--err); }

/* VRAM / GPU-util / disk rows inside a worker card. */
.bar { display: grid; grid-template-columns: 60px 1fr auto; gap: 0.5rem;
  align-items: center; margin-bottom: 0.25rem; font-size: 0.8rem; }
.bar small { color: var(--muted); }
progress { appearance: none; height: 10px; width: 100%;
  border-radius: 6px; overflow: hidden; border: 1px solid var(--border); background: #0a1020; }
progress::-webkit-progress-bar { background: #0a1020; }
progress::-webkit-progress-value { background: var(--accent); }
progress::-moz-progress-bar { background: var(--accent); }

/* Jobs table (see _jobs_table.html). */
table.jobs { width: 100%; border-collapse: collapse; font-size: 0.85rem; }
table.jobs th, table.jobs td { text-align: left; padding: 0.45rem 0.55rem;
  border-bottom: 1px solid var(--border); }
table.jobs th { color: var(--muted); font-weight: normal; text-transform: uppercase;
  font-size: 0.72rem; letter-spacing: 0.04em; }
tr.status-done td { color: var(--ok); }
tr.status-error td { color: var(--err); }
tr.err-row td { color: var(--err); padding-top: 0; border-top: none; }

/* Status pills: queued / extracting / running / done / error. */
.pill { padding: 0.15rem 0.5rem; border-radius: 999px; font-size: 0.7rem;
  background: rgba(255,255,255,0.05); border: 1px solid var(--border); }
.pill.queued { color: var(--muted); }
.pill.extracting, .pill.running { color: var(--warn); border-color: var(--warn); }
.pill.done { color: var(--ok); border-color: var(--ok); }
.pill.error { color: var(--err); border-color: var(--err); }

/* Buttons and inline elements. */
button { background: transparent; color: var(--accent); border: 1px solid var(--border);
  padding: 0.2rem 0.6rem; border-radius: 6px; cursor: pointer; font-family: inherit; font-size: 0.75rem; }
button:hover { border-color: var(--accent); }
a { color: var(--accent); }
code { background: rgba(255,255,255,0.05); padding: 0 0.25rem; border-radius: 3px; }

View File

@@ -0,0 +1,44 @@
{# HTMX partial: the jobs table (or an empty-state hint), swapped into
   #jobs-table by index.html every 3 s. #}
{% if not jobs %}
<p class="muted">Aucun job. Ingeste un dossier d'acquisition via <code>scripts/ingest.py</code>.</p>
{% else %}
<table class="jobs">
  <thead>
    <tr>
      <th>#</th><th>Acquisition</th><th>AUV</th><th>GoPro</th><th>Segment</th>
      <th>Frames</th><th>Status</th><th>Worker</th><th>Progress</th><th>Actions</th>
    </tr>
  </thead>
  <tbody>
    {% for j in jobs %}
    <tr class="status-{{ j.status }}">
      <td>{{ j.id }}</td>
      <td>{{ j.acquisition_name }}</td>
      <td>{{ j.auv }}</td>
      <td><code>{{ j.gopro_serial }}</code></td>
      <td>{{ j.segment_label }}</td>
      <td>{{ j.frame_count or "—" }}</td>
      <td><span class="pill {{ j.status }}">{{ j.status }}</span></td>
      <td>{{ j.worker_host or "—" }}</td>
      <td>
        {# Progress column: live bar while working, result links once done. #}
        {% if j.status == 'running' or j.status == 'extracting' %}
        <progress value="{{ j.progress }}" max="100"></progress> {{ j.progress }}%
        {% elif j.status == 'done' and j.viser_url %}
        <a href="{{ j.viser_url }}" target="_blank">viser</a>
        {% if j.ply_path %} · <a href="/api/jobs/{{ j.id }}/ply">PLY</a>{% endif %}
        {% else %}—{% endif %}
      </td>
      <td>
        {# Actions: cancel while active, retry after an error. #}
        {% if j.status in ['queued','extracting','running'] %}
        <button hx-post="/jobs/{{ j.id }}/cancel" hx-target="#jobs-table" hx-swap="outerHTML">Stop</button>
        {% elif j.status == 'error' %}
        <button hx-post="/jobs/{{ j.id }}/retry" hx-target="#jobs-table" hx-swap="outerHTML">Retry</button>
        {% endif %}
      </td>
    </tr>
    {# Extra full-width row with the error message, when present. #}
    {% if j.error %}
    <tr class="err-row"><td colspan="10"><small>{{ j.error }}</small></td></tr>
    {% endif %}
    {% endfor %}
  </tbody>
</table>
{% endif %}

View File

@@ -0,0 +1,26 @@
{# HTMX partial: one card per worker with live GPU/disk stats.
   Entries come from _worker_stats() and always carry an `online` flag. #}
<div class="worker-grid">
  {% for w in workers %}
  <div class="worker {% if not w.online %}offline{% endif %}">
    <div class="hdr">
      <b>{{ w.host }}</b>
      <span class="gpu">{{ w.gpu }}</span>
      <span class="state">{% if w.online %}online{% else %}offline{% endif %}</span>
    </div>
    {% if w.online %}
    {# VRAM used/total in MiB, as reported by nvidia-smi on the worker. #}
    <div class="bar">
      <span>VRAM</span>
      <progress value="{{ w.vram_used_mib or 0 }}" max="{{ w.vram_total_mib or 1 }}"></progress>
      <small>{{ w.vram_used_mib }} / {{ w.vram_total_mib }} MiB</small>
    </div>
    <div class="bar">
      <span>GPU</span>
      <progress value="{{ w.gpu_util_pct or 0 }}" max="100"></progress>
      <small>{{ w.gpu_util_pct }}%</small>
    </div>
    <div class="bar"><span>Disk /</span><small>{{ w.disk_used_pct }}</small></div>
    {% else %}
    <div class="err">{{ w.error or "unreachable" }}</div>
    {% endif %}
  </div>
  {% endfor %}
</div>

27
app/templates/index.html Normal file
View File

@@ -0,0 +1,27 @@
<!doctype html>
<!-- cosma-qc dashboard shell: static page that pulls live fragments via HTMX. -->
<html lang="fr">
<head>
  <meta charset="utf-8">
  <title>cosma-qc — dashboard</title>
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <!-- HTMX from CDN; drives the periodic partial refreshes below. -->
  <script src="https://unpkg.com/htmx.org@2.0.4"></script>
  <link rel="stylesheet" href="/static/style.css">
</head>
<body>
  <header>
    <h1>cosma-qc</h1>
    <span class="sub">post-acquisition QC · lingbot-map pipeline</span>
  </header>
  <!-- Worker cards, refreshed every 5 s from /partials/monitor. -->
  <section id="monitor" hx-get="/partials/monitor" hx-trigger="load, every 5s" hx-swap="innerHTML">
    <p class="muted">Chargement des workers…</p>
  </section>
  <!-- Jobs table, refreshed every 3 s from /partials/jobs. -->
  <section id="jobs">
    <h2>Jobs</h2>
    <div id="jobs-table" hx-get="/partials/jobs" hx-trigger="load, every 3s" hx-swap="innerHTML">
      <p class="muted">Chargement…</p>
    </div>
  </section>
</body>
</html>

14
pyproject.toml Normal file
View File

@@ -0,0 +1,14 @@
[project]
name = "cosma-qc"
version = "0.1.0"
description = "COSMA post-acquisition QC pipeline"
requires-python = ">=3.11"
dependencies = [
    # Dashboard stack: web framework, templates, ASGI server.
    "fastapi>=0.115",
    "jinja2>=3.1",
    "uvicorn[standard]>=0.30",
    # Needed by FastAPI's Form(...) parameters (app/main.py imports Form).
    "python-multipart>=0.0.9",
]

[tool.uv]
# App + scripts run in place; nothing to build or install as a package.
package = false

203
scripts/dispatcher.py Normal file
View File

@@ -0,0 +1,203 @@
#!/usr/bin/env python3
"""Dispatcher daemon: picks queued jobs and runs them on available workers.
One-shot worker loop. Run as a systemd service (or manually). Handles both
extraction (ffmpeg on the worker) and reconstruction (lingbot-map on the
worker). Progress is written back to the DB.
Env:
COSMA_QC_DB : SQLite path (default /var/lib/cosma-qc/jobs.db)
COSMA_QC_WORKERS : JSON list of workers [{host, ssh_alias, gpu, vram_mib,
frames_dir, lingbot_path}]
COSMA_QC_FPS : extraction fps (default 3)
COSMA_QC_IMG_H : image height (default 294)
COSMA_QC_IMG_W : image width (default 518)
Jobs lifecycle:
queued → extracting → running → done
↘ error
"""
from __future__ import annotations
import json
import os
import re
import shlex
import sqlite3
import subprocess
import sys
import time
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat(timespec="seconds")
# --- Configuration (all overridable via environment variables) ---------------
DB_PATH = Path(os.environ.get("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db"))
FPS = int(os.environ.get("COSMA_QC_FPS", "3"))       # extracted frames per second
IMG_H = int(os.environ.get("COSMA_QC_IMG_H", "294"))  # ffmpeg scale target height
IMG_W = int(os.environ.get("COSMA_QC_IMG_W", "518"))  # ffmpeg scale target width
POLL_S = int(os.environ.get("COSMA_QC_POLL_S", "4"))  # queue poll interval, seconds

# Default worker fleet; override with a JSON list via COSMA_QC_WORKERS.
# vram_mib is informational here — scheduling uses live `nvidia-smi` free VRAM.
DEFAULT_WORKERS = [
    {
        "host": "192.168.0.87", "ssh_alias": "gpu", "gpu": "RTX 3060 12GB",
        "vram_mib": 11913,
        "frames_dir": "/home/floppyrj45/cosma-qc-frames",
        "lingbot_path": "/home/floppyrj45/ai-video/lingbot-map",
        "viser_port_base": 8100,
    },
    {
        "host": "192.168.0.84", "ssh_alias": "cosma-vm", "gpu": "RTX 3090 24GB",
        "vram_mib": 24576,
        "frames_dir": "/home/floppyrj45/cosma-qc-frames",
        "lingbot_path": "/home/floppyrj45/ai-video/lingbot-map",
        "viser_port_base": 8100,
    },
]
WORKERS = json.loads(os.environ.get("COSMA_QC_WORKERS", json.dumps(DEFAULT_WORKERS)))
def db() -> sqlite3.Connection:
    """Connect to the jobs DB: autocommit, WAL journal, rows addressable by name."""
    connection = sqlite3.connect(DB_PATH, isolation_level=None)
    connection.execute("PRAGMA journal_mode=WAL")
    connection.row_factory = sqlite3.Row
    return connection
def ssh(alias: str, cmd: str, timeout: int = 30) -> tuple[int, str, str]:
    """Run `cmd` on a worker via SSH; return (returncode, stdout, stderr).

    BatchMode forbids password prompts so a missing key fails fast instead
    of hanging the dispatcher.
    """
    argv = ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5", alias, cmd]
    result = subprocess.run(argv, capture_output=True, text=True, timeout=timeout)
    return result.returncode, result.stdout, result.stderr
def worker_free_vram_mib(worker: dict) -> int:
    """Free VRAM on the worker's GPU in MiB; 0 when unreachable or unparseable."""
    rc, out, _ = ssh(worker["ssh_alias"],
                     "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits")
    if rc != 0:
        return 0
    try:
        first_line = out.strip().splitlines()[0]
        return int(first_line)
    except Exception:
        return 0
def pick_worker(estimated_vram_mib: int) -> dict | None:
    """Pick the worker with the most free VRAM that fits the estimate.

    Ties keep the first qualifying worker; returns None when no worker
    currently has enough headroom.
    """
    best_free, best_worker = -1, None
    for candidate in WORKERS:
        free = worker_free_vram_mib(candidate)
        if free >= estimated_vram_mib and free > best_free:
            best_free, best_worker = free, candidate
    return best_worker
def estimate_vram_mib(frame_count: int) -> int:
    """Rough peak-VRAM estimate (MiB) for reconstructing `frame_count` frames.

    Empirical anchors: ~300 frames peaked near 9.4 GiB and 600 frames OOM'd
    around 11 GiB, so extrapolate linearly with some headroom.
    """
    base_mib, per_frame_mib = 3500, 13
    return int(base_mib + per_frame_mib * frame_count)
def set_status(job_id: int, **fields):
    """Update arbitrary columns of one job row.

    Column names are interpolated into the SQL and must come from trusted
    internal callers only; values are bound as parameters.
    """
    assignments = ", ".join(f"{column}=?" for column in fields)
    sql = f"UPDATE jobs SET {assignments} WHERE id=?"
    with closing(db()) as conn:
        conn.execute(sql, (*fields.values(), job_id))
def count_frames(worker: dict, frames_dir: str) -> int:
    """Number of files in `frames_dir` on the worker; 0 on any failure."""
    rc, out, _ = ssh(worker["ssh_alias"], f"ls {shlex.quote(frames_dir)} 2>/dev/null | wc -l")
    if rc != 0:
        return 0
    try:
        return int(out.strip())
    except Exception:
        return 0
def do_extract(job: sqlite3.Row, worker: dict) -> str:
    """Extract frames on the worker with ffmpeg, one pass per source video.

    Videos are processed in order; -start_number continues the numbering
    from the frames already written, so the jpg sequence stays contiguous
    across source files. Returns the remote frames directory. Raises
    RuntimeError when any ffmpeg invocation fails.
    """
    frames_dir = f"{worker['frames_dir']}/job_{job['id']}"
    ssh(worker["ssh_alias"], f"mkdir -p {shlex.quote(frames_dir)}")
    output_pattern = f"{frames_dir}/frame_%06d.jpg"
    filters = f"fps={FPS},scale={IMG_W}:{IMG_H}"
    next_index = 0
    for video in json.loads(job["video_paths"]):
        cmd = (
            f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(video)} "
            f"-vf {shlex.quote(filters)} -start_number {next_index} -q:v 4 "
            f"{shlex.quote(output_pattern)}"
        )
        rc, _, err = ssh(worker["ssh_alias"], cmd, timeout=3600)
        if rc != 0:
            raise RuntimeError(f"ffmpeg failed on {video}: {err[:200]}")
        # Re-count remote frames so the next video continues the sequence,
        # and surface interim progress in the DB.
        next_index = count_frames(worker, frames_dir)
        set_status(job["id"], frame_count=next_index)
    return frames_dir
def do_reconstruct(job: sqlite3.Row, worker: dict, frames_dir: str) -> tuple[str, str]:
    """Run lingbot-map reconstruction (demo.py, windowed mode) on the worker.

    Returns (viser_url, remote_log_path). Raises RuntimeError carrying
    stderr plus the log tail when demo.py exits non-zero.
    """
    # Fix: job ids grow without bound, so `base + id` would eventually leave
    # the valid TCP port range (> 65535). Wrap modulo 1000 to stay within
    # [base, base+999]; identical to the previous scheme for ids < 1000.
    port = worker["viser_port_base"] + job["id"] % 1000
    log = f"/tmp/cosma-qc-job-{job['id']}.log"
    ckpt = f"{worker['lingbot_path']}/checkpoints/lingbot-map/lingbot-map-long.pt"
    cmd = (
        f"cd {shlex.quote(worker['lingbot_path'])} && source .venv/bin/activate && "
        f"python3 demo.py --model_path {shlex.quote(ckpt)} "
        f"--image_folder {shlex.quote(frames_dir)} --port {port} "
        f"--use_sdpa --mode windowed --window_size 16 --overlap_size 2 --offload_to_cpu "
        f"> {log} 2>&1"
    )
    rc, _, err = ssh(worker["ssh_alias"], cmd, timeout=3 * 3600)
    if rc != 0:
        # Pull the end of the remote log for a useful error message.
        tail = ssh(worker["ssh_alias"], f"tail -30 {log}")[1]
        raise RuntimeError(f"demo.py failed: {err[:200]}\n---\n{tail[:800]}")
    viser_url = f"http://{worker['host']}:{port}"
    return viser_url, log
def run_one(job: sqlite3.Row):
    """Drive one job through extract → reconstruct, recording state in the DB.

    Leaves the job queued when no worker has enough free VRAM; records any
    failure as status='error' with a truncated message.
    """
    job_id = job["id"]
    # 400 is the fallback size estimate when the frame count is not known yet.
    needed_mib = estimate_vram_mib(job["frame_count"] or 400)
    worker = pick_worker(needed_mib)
    if worker is None:
        return  # no capacity right now — the main loop will retry later
    set_status(job_id, status="extracting", worker_host=worker["host"],
               started_at=_now_iso())
    try:
        frames_dir = do_extract(job, worker)
        set_status(job_id,
                   frames_dir=frames_dir,
                   frame_count=count_frames(worker, frames_dir),
                   status="running",
                   progress=0)
        viser_url, log = do_reconstruct(job, worker, frames_dir)
        set_status(job_id, status="done", viser_url=viser_url, progress=100,
                   log_tail=log, finished_at=_now_iso())
    except Exception as exc:
        set_status(job_id, status="error", error=str(exc)[:2000],
                   finished_at=_now_iso())
def pop_queued() -> sqlite3.Row | None:
    """Oldest queued job, or None when the queue is empty."""
    sql = "SELECT * FROM jobs WHERE status='queued' ORDER BY created_at LIMIT 1"
    with closing(db()) as conn:
        return conn.execute(sql).fetchone()
def main():
    """Poll the queue forever, running at most one job at a time."""
    hosts = [w["host"] for w in WORKERS]
    print(f"cosma-qc dispatcher · DB={DB_PATH} · workers={hosts}")
    while True:
        job = pop_queued()
        if job is None:
            time.sleep(POLL_S)
            continue
        print(f"→ picking up job #{job['id']} ({job['auv']}/{job['gopro_serial']}/{job['segment_label']})")
        run_one(job)
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Quiet exit on Ctrl-C for interactive runs.
        sys.exit(0)

168
scripts/ingest.py Normal file
View File

@@ -0,0 +1,168 @@
#!/usr/bin/env python3
"""Scan an acquisition directory, group GoPro MP4s into continuous segments,
and insert jobs into the cosma-qc DB.
Usage:
python3 ingest.py /mnt/portablessd/COSMA-<date>/ --name "La Ciotat 8 avril" [--gap-min 5]
Directory layout expected (we saw this from the real SSD):
<root>/media/gopro{1,2}/GP{1,2}_AUV{209,210}/GX*.MP4
The AUV tag and GoPro id come from folder names. The serial is read via
exiftool (falls back to folder name if unavailable). Continuous segments are
derived from EXIF CreateDate timestamps with a configurable gap threshold.
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sqlite3
import subprocess
from datetime import datetime, timedelta
from pathlib import Path
# SQLite DB shared with the dashboard and dispatcher (override via COSMA_QC_DB).
DB_PATH = Path(os.environ.get("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db"))
# Matches folder names like "GP1_AUV209": GoPro index + AUV number, any case.
FOLDER_RE = re.compile(r"GP(?P<gopro>\d+)_AUV(?P<auv>\d+)", re.I)
def exif_create_date(path: Path) -> datetime | None:
try:
out = subprocess.check_output(
["exiftool", "-s3", "-CreateDate", "-api", "QuickTimeUTC=1", str(path)],
stderr=subprocess.DEVNULL, text=True, timeout=10,
).strip()
return datetime.strptime(out, "%Y:%m:%d %H:%M:%S") if out else None
except Exception:
return None
def exif_duration_s(path: Path) -> float | None:
try:
out = subprocess.check_output(
["exiftool", "-s3", "-Duration#", str(path)],
stderr=subprocess.DEVNULL, text=True, timeout=10,
).strip()
return float(out) if out else None
except Exception:
return None
def exif_serial(path: Path) -> str | None:
try:
out = subprocess.check_output(
["exiftool", "-s3", "-SerialNumber", "-CameraSerialNumber", str(path)],
stderr=subprocess.DEVNULL, text=True, timeout=10,
).strip().splitlines()
for line in out:
line = line.strip()
if line:
return line
except Exception:
pass
return None
def group_segments(videos: list[dict], gap_min: int) -> list[dict]:
    """Cluster time-sorted videos into continuous recording segments.

    A new segment starts whenever the gap between the end of the previous
    video (start + duration) and the start of the next exceeds `gap_min`
    minutes. Each returned segment carries its start/end datetimes, an
    HH:MM label, and the ordered list of video path strings.
    """
    ordered = sorted(videos, key=lambda item: item["start"])
    max_gap = timedelta(minutes=gap_min)
    clusters: list[list[dict]] = []
    for video in ordered:
        if clusters:
            previous = clusters[-1][-1]
            previous_end = previous["start"] + timedelta(seconds=previous["duration"] or 0)
            if video["start"] - previous_end <= max_gap:
                clusters[-1].append(video)
                continue
        clusters.append([video])
    result = []
    for cluster in clusters:
        start = cluster[0]["start"]
        end = cluster[-1]["start"] + timedelta(seconds=cluster[-1]["duration"] or 0)
        result.append({
            "start": start, "end": end,
            "label": f"{start.strftime('%H:%M')}{end.strftime('%H:%M')}",
            "videos": [str(video["path"]) for video in cluster],
        })
    return result
def scan(root: Path) -> dict:
    """Index every GoPro video under `root` by its (AUV, GoPro) folder tag.

    Returns {(auv, gopro_tag): {"serial": str | None, "videos": [
        {"path": Path, "start": datetime, "duration": float}, ...]}}.

    Videos without an EXIF CreateDate are skipped (they cannot be placed on
    the timeline). The camera serial is taken from the first video that
    reports one; the caller falls back to the folder tag when it stays None.
    """
    grouped: dict[tuple[str, str], dict] = {}
    # Fix: match the extension case-insensitively. GoPro writes .MP4, but
    # copies made through some tools/filesystems end up lowercase .mp4 and
    # the old rglob("*.MP4") silently dropped them.
    candidates = (p for p in root.rglob("*") if p.suffix.lower() == ".mp4")
    for mp4 in candidates:
        m = FOLDER_RE.search(str(mp4.parent))
        if not m:
            continue
        auv = f"AUV{m.group('auv')}"
        gopro_tag = f"GP{m.group('gopro')}"
        key = (auv, gopro_tag)
        start = exif_create_date(mp4)
        dur = exif_duration_s(mp4)
        if not start:
            print(f" [skip] no CreateDate: {mp4}")
            continue
        serial = exif_serial(mp4)
        slot = grouped.setdefault(key, {"serial": serial, "videos": []})
        if serial and not slot["serial"]:
            slot["serial"] = serial
        slot["videos"].append({"path": mp4, "start": start, "duration": dur or 0})
    return grouped
def main():
    """CLI entry point: scan an acquisition tree and enqueue one job per segment.

    With --dry-run, prints the planned segments without touching the DB
    contents (acq_id is a -1 placeholder and no rows are inserted).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("root", type=Path)
    ap.add_argument("--name", required=True, help="Acquisition name")
    ap.add_argument("--gap-min", type=int, default=5, help="Max gap between videos in one segment")
    ap.add_argument("--dry-run", action="store_true")
    args = ap.parse_args()
    if not args.root.exists():
        raise SystemExit(f"root not found: {args.root}")
    print(f"Scanning {args.root}...")
    grouped = scan(args.root)
    if not grouped:
        print("No (auv, gopro) folders found — expected GPx_AUVyyy layout.")
        return
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH, isolation_level=None)
    # Fix: the connection was previously never closed; guarantee cleanup
    # even if an INSERT raises part-way through.
    try:
        conn.execute("PRAGMA foreign_keys=ON")
        conn.row_factory = sqlite3.Row
        if args.dry_run:
            acq_id = -1
        else:
            cur = conn.execute(
                "INSERT INTO acquisitions (name, source_path) VALUES (?, ?)",
                (args.name, str(args.root)),
            )
            acq_id = cur.lastrowid
            print(f"Created acquisition id={acq_id}")
        total_jobs = 0
        for (auv, gopro_tag), info in sorted(grouped.items()):
            serial = info["serial"] or gopro_tag
            segs = group_segments(info["videos"], args.gap_min)
            print(f"\n{auv} / {gopro_tag} (serial={serial}) — {len(info['videos'])} videos → {len(segs)} segments")
            for seg in segs:
                dur_min = (seg["end"] - seg["start"]).total_seconds() / 60
                print(f" · {seg['label']} ({dur_min:.1f} min, {len(seg['videos'])} files)")
                if args.dry_run:
                    continue
                conn.execute("""
                    INSERT INTO jobs (acquisition_id, auv, gopro_serial, segment_label,
                                      video_paths, status)
                    VALUES (?, ?, ?, ?, ?, 'queued')
                """, (acq_id, auv, serial, seg["label"], json.dumps(seg["videos"])))
                total_jobs += 1
        print(f"\nInserted {total_jobs} jobs.")
    finally:
        conn.close()
if __name__ == "__main__":
    # Run the CLI only when executed directly, not on import.
    main()