diff --git a/README.md b/README.md
index 3dfdf01..4789647 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,49 @@
 # cosma-qc
-COSMA post-acquisition QC pipeline: per-GoPro lingbot-map reconstruction, job queue, web dashboard
\ No newline at end of file
+**COSMA post-acquisition QC pipeline**: photogrammetric reconstruction per
+GoPro (lingbot-map), distributed job queue, and a web dashboard for same-day
+monitoring in the field.
+
+## Objective
+
+After an AUV acquisition (2 GoPros × 2-3 AUVs × hours of footage), find out
+quickly whether coverage is complete before wrapping up the mission, without
+waiting the 30 days of the full photogrammetric processing.
+
+## Pipeline
+
+```
+SSD plugged ─┐
+             ├─▶ Ingestion ─▶ Frame extraction (per GoPro × segment)
+             │                         │
+             │                         ▼
+             │                  Job queue (SQLite)
+             │                         │
+             │          ┌──────────────┼──────────────┐
+             ▼          ▼              ▼              ▼
+         Dashboard  Worker .87     Worker .84     (scalable)
+         (FastAPI)    (3060)         (3090)
+             │          │              │
+             │          └─▶ PLY ◀──────┘
+             │                │
+             │                ▼
+             └──────── ICP stitch (Open3D) ─▶ viser viewer
+```
+
+## Stack
+
+- **Backend**: FastAPI + SQLite
+- **Frontend**: HTMX (reactive UI, no JS build step)
+- **Queue**: SQLite table + SSH-triggered workers
+- **Monitoring**: `nvidia-smi` polling on .87 / .84, `df` for disk
+- **Reconstruction**: lingbot-map (GCT-Stream windowed)
+- **Stitch**: Open3D ICP
+
+## Deployment
+
+- Service on .82 (stable, Caddy for a clean URL)
+- Workers: SSH to .87 (3060, 12 GB) and .84 (3090, 24 GB)
+
+## Status
+
+Scaffolding in progress.
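
For a quick status check from the field without opening the dashboard, the JSON endpoint `/api/jobs` (defined in `app/main.py` below) can be polled directly. A minimal stdlib sketch; the base URL is an assumption (service on .82, uvicorn's default port 8000), not a confirmed deployment value:

```python
#!/usr/bin/env python3
"""Summarize cosma-qc job statuses from the dashboard's JSON API.

The /api/jobs route is defined in app/main.py; the host and port
below are assumptions about the deployment, adjust as needed.
"""
import json
from collections import Counter
from urllib.request import urlopen

BASE = "http://192.168.0.82:8000"  # assumed service address

with urlopen(f"{BASE}/api/jobs", timeout=5) as resp:
    jobs = json.load(resp)

counts = Counter(j["status"] for j in jobs)
for status in ("queued", "extracting", "running", "done", "error"):
    print(f"{status:>10}: {counts.get(status, 0)}")
```
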
diff --git a/app/main.py b/app/main.py
new file mode 100644
index 0000000..87e2c52
--- /dev/null
+++ b/app/main.py
@@ -0,0 +1,168 @@
+from __future__ import annotations
+
+import asyncio
+import json
+import os
+import sqlite3
+from contextlib import asynccontextmanager, closing
+from pathlib import Path
+
+from fastapi import FastAPI, Request
+from fastapi.responses import HTMLResponse
+from fastapi.staticfiles import StaticFiles
+from fastapi.templating import Jinja2Templates
+
+DB_PATH = Path(os.environ.get("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db"))
+WORKERS = json.loads(os.environ.get("COSMA_QC_WORKERS", json.dumps([
+    {"host": "192.168.0.87", "ssh_alias": "gpu", "gpu": "RTX 3060 12GB"},
+    {"host": "192.168.0.84", "ssh_alias": "cosma-vm", "gpu": "RTX 3090 24GB"},
+])))
+
+STATUSES = ("queued", "extracting", "running", "done", "error")
+
+
+def db() -> sqlite3.Connection:
+    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
+    conn = sqlite3.connect(DB_PATH, isolation_level=None)
+    conn.execute("PRAGMA journal_mode=WAL")
+    conn.execute("PRAGMA foreign_keys=ON")
+    conn.row_factory = sqlite3.Row
+    return conn
+
+
+def init_schema() -> None:
+    with closing(db()) as conn:
+        conn.executescript("""
+            CREATE TABLE IF NOT EXISTS acquisitions (
+                id INTEGER PRIMARY KEY,
+                name TEXT NOT NULL,
+                source_path TEXT NOT NULL,
+                created_at TEXT NOT NULL DEFAULT (datetime('now'))
+            );
+
+            CREATE TABLE IF NOT EXISTS jobs (
+                id INTEGER PRIMARY KEY,
+                acquisition_id INTEGER NOT NULL REFERENCES acquisitions(id) ON DELETE CASCADE,
+                auv TEXT NOT NULL,
+                gopro_serial TEXT NOT NULL,
+                segment_label TEXT NOT NULL,
+                video_paths TEXT NOT NULL,
+                frame_count INTEGER,
+                frames_dir TEXT,
+                status TEXT NOT NULL DEFAULT 'queued',
+                worker_host TEXT,
+                viser_url TEXT,
+                ply_path TEXT,
+                progress INTEGER NOT NULL DEFAULT 0,
+                log_tail TEXT,
+                error TEXT,
+                started_at TEXT,
+                finished_at TEXT,
+                created_at TEXT NOT NULL DEFAULT (datetime('now'))
+            );
+
+            CREATE INDEX IF NOT EXISTS jobs_status_idx ON jobs(status);
+            CREATE INDEX IF NOT EXISTS jobs_acq_idx ON jobs(acquisition_id);
+        """)
+
+
+@asynccontextmanager
+async def lifespan(_: FastAPI):
+    init_schema()
+    yield
+
+
+app = FastAPI(title="cosma-qc", lifespan=lifespan)
+templates = Jinja2Templates(directory=Path(__file__).parent / "templates")
+app.mount("/static", StaticFiles(directory=Path(__file__).parent / "static"), name="static")
+
+
+@app.get("/", response_class=HTMLResponse)
+async def index(request: Request):
+    with closing(db()) as conn:
+        jobs = conn.execute("""
+            SELECT j.*, a.name AS acquisition_name
+            FROM jobs j
+            LEFT JOIN acquisitions a ON a.id = j.acquisition_id
+            ORDER BY j.created_at DESC
+            LIMIT 200
+        """).fetchall()
+    return templates.TemplateResponse("index.html", {
+        "request": request,
+        "jobs": jobs,
+        "workers": WORKERS,
+    })
+
+
+@app.get("/api/jobs")
+async def list_jobs():
+    with closing(db()) as conn:
+        rows = conn.execute("SELECT * FROM jobs ORDER BY created_at DESC LIMIT 500").fetchall()
+    return [dict(r) for r in rows]
+
+
+@app.get("/partials/jobs", response_class=HTMLResponse)
+async def partial_jobs(request: Request):
+    with closing(db()) as conn:
+        jobs = conn.execute("""
+            SELECT j.*, a.name AS acquisition_name
+            FROM jobs j
+            LEFT JOIN acquisitions a ON a.id = j.acquisition_id
+            ORDER BY j.created_at DESC
+            LIMIT 200
+        """).fetchall()
+    return templates.TemplateResponse("_jobs_table.html", {"request": request, "jobs": jobs})
+
+
+@app.get("/partials/monitor", response_class=HTMLResponse)
+async def partial_monitor(request: Request):
+    stats = await asyncio.gather(*[_worker_stats(w) for w in WORKERS])
+    return templates.TemplateResponse("_monitor.html", {"request": request, "workers": stats})
+
+
+async def _worker_stats(worker: dict) -> dict:
+    alias = worker["ssh_alias"]
+    try:
+        proc = await asyncio.create_subprocess_exec(
+            "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes", alias,
+            "nvidia-smi --query-gpu=memory.used,memory.total,utilization.gpu "
+            "--format=csv,noheader,nounits && df -h / | tail -1",
+            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
+        )
+        out, _ = await asyncio.wait_for(proc.communicate(), timeout=4)
+        text = out.decode().strip().splitlines()
+        gpu_line = text[0].split(",") if text else ["?", "?", "?"]
+        disk = text[1].split() if len(text) > 1 else ["?"] * 6
+        return {
+            **worker,
+            "online": True,
+            "vram_used_mib": int(gpu_line[0].strip()) if gpu_line[0].strip().isdigit() else None,
+            "vram_total_mib": int(gpu_line[1].strip()) if gpu_line[1].strip().isdigit() else None,
+            "gpu_util_pct": int(gpu_line[2].strip()) if gpu_line[2].strip().isdigit() else None,
+            "disk_used_pct": disk[4] if len(disk) > 4 else "?",
+        }
+    except Exception as e:
+        return {**worker, "online": False, "error": str(e)[:80]}
+
+
+@app.post("/jobs/{job_id}/cancel")
+async def cancel_job(job_id: int):
+    with closing(db()) as conn:
+        conn.execute(
+            "UPDATE jobs SET status='error', error='cancelled by user', finished_at=datetime('now') "
+            "WHERE id=? AND status IN ('queued','extracting','running')",
+            (job_id,),
+        )
+    return {"ok": True}
+
+
+@app.post("/jobs/{job_id}/retry")
+async def retry_job(job_id: int):
+    with closing(db()) as conn:
+        conn.execute(
+            "UPDATE jobs SET status='queued', error=NULL, progress=0, started_at=NULL, "
+            "finished_at=NULL, worker_host=NULL WHERE id=? AND status='error'",
+            (job_id,),
+        )
+    return {"ok": True}
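
Until the ingest script has real footage to point at, the dashboard can be exercised by seeding a row straight into the schema above. A throwaway sketch; the paths are placeholders, and it assumes the app has already run once (so `init_schema()` created the tables at the default `COSMA_QC_DB` location):

```python
#!/usr/bin/env python3
"""Seed one fake queued job into the cosma-qc DB for dashboard testing.

Columns match init_schema() in app/main.py. The video path is a
placeholder, not a real file; the schema must already exist.
"""
import json
import sqlite3

conn = sqlite3.connect("/var/lib/cosma-qc/jobs.db")
conn.execute("PRAGMA foreign_keys=ON")

acq = conn.execute(
    "INSERT INTO acquisitions (name, source_path) VALUES (?, ?)",
    ("smoke test", "/tmp/fake-acq"),
)
conn.execute(
    """INSERT INTO jobs (acquisition_id, auv, gopro_serial, segment_label,
                         video_paths, status)
       VALUES (?, ?, ?, ?, ?, 'queued')""",
    (acq.lastrowid, "AUV209", "GP1", "10:00–10:30",
     json.dumps(["/tmp/fake-acq/GX010001.MP4"])),
)
conn.commit()
conn.close()
print("seeded 1 queued job")
```
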
diff --git a/app/static/style.css b/app/static/style.css
new file mode 100644
index 0000000..9dd500a
--- /dev/null
+++ b/app/static/style.css
@@ -0,0 +1,69 @@
+:root {
+  --bg: #0b1220;
+  --panel: #121a2b;
+  --border: #1f2a44;
+  --text: #e6edf7;
+  --muted: #7b8aa8;
+  --accent: #5fd0ff;
+  --ok: #3ddc84;
+  --warn: #f5c518;
+  --err: #ff5c7a;
+}
+* { box-sizing: border-box; }
+body {
+  font-family: ui-monospace, "JetBrains Mono", Menlo, monospace;
+  background: var(--bg); color: var(--text);
+  margin: 0; padding: 1.5rem;
+}
+header { display: flex; align-items: baseline; gap: 1rem; margin-bottom: 1.25rem; }
+header h1 { margin: 0; font-size: 1.25rem; color: var(--accent); }
+.sub { color: var(--muted); font-size: 0.85rem; }
+h2 { font-size: 1rem; color: var(--accent); margin: 1.5rem 0 0.5rem; }
+
+.muted { color: var(--muted); }
+.err { color: var(--err); }
+
+section { background: var(--panel); border: 1px solid var(--border);
+  border-radius: 10px; padding: 1rem; margin-bottom: 1rem; }
+
+.worker-grid { display: grid; gap: 1rem;
+  grid-template-columns: repeat(auto-fill, minmax(240px, 1fr)); }
+.worker { background: rgba(255,255,255,0.02); border: 1px solid var(--border);
+  border-radius: 8px; padding: 0.75rem; }
+.worker.offline { opacity: 0.55; border-color: var(--err); }
+.worker .hdr { display: flex; justify-content: space-between;
+  gap: 0.5rem; align-items: center; margin-bottom: 0.5rem; flex-wrap: wrap; }
+.worker .gpu { color: var(--muted); font-size: 0.8rem; }
+.worker .state { color: var(--ok); font-size: 0.75rem; text-transform: uppercase; }
+.worker.offline .state { color: var(--err); }
+.bar { display: grid; grid-template-columns: 60px 1fr auto; gap: 0.5rem;
+  align-items: center; margin-bottom: 0.25rem; font-size: 0.8rem; }
+.bar small { color: var(--muted); }
+
+progress { appearance: none; height: 10px; width: 100%;
+  border-radius: 6px; overflow: hidden; border: 1px solid var(--border); background: #0a1020; }
+progress::-webkit-progress-bar { background: #0a1020; }
+progress::-webkit-progress-value { background: var(--accent); }
+progress::-moz-progress-bar { background: var(--accent); }
+
+table.jobs { width: 100%; border-collapse: collapse; font-size: 0.85rem; }
+table.jobs th, table.jobs td { text-align: left; padding: 0.45rem 0.55rem;
+  border-bottom: 1px solid var(--border); }
+table.jobs th { color: var(--muted); font-weight: normal; text-transform: uppercase;
+  font-size: 0.72rem; letter-spacing: 0.04em; }
+tr.status-done td { color: var(--ok); }
+tr.status-error td { color: var(--err); }
+tr.err-row td { color: var(--err); padding-top: 0; border-top: none; }
+
+.pill { padding: 0.15rem 0.5rem; border-radius: 999px; font-size: 0.7rem;
+  background: rgba(255,255,255,0.05); border: 1px solid var(--border); }
+.pill.queued { color: var(--muted); }
+.pill.extracting, .pill.running { color: var(--warn); border-color: var(--warn); }
+.pill.done { color: var(--ok); border-color: var(--ok); }
+.pill.error { color: var(--err); border-color: var(--err); }
+
+button { background: transparent; color: var(--accent); border: 1px solid var(--border);
+  padding: 0.2rem 0.6rem; border-radius: 6px; cursor: pointer; font-family: inherit; font-size: 0.75rem; }
+button:hover { border-color: var(--accent); }
+a { color: var(--accent); }
+code { background: rgba(255,255,255,0.05); padding: 0 0.25rem; border-radius: 3px; }
diff --git a/app/templates/_jobs_table.html b/app/templates/_jobs_table.html
new file mode 100644
index 0000000..0063b96
--- /dev/null
+++ b/app/templates/_jobs_table.html
@@ -0,0 +1,44 @@
+{% if not jobs %}
+<p class="muted">No jobs yet. Ingest an acquisition folder via <code>scripts/ingest.py</code>.</p>
+{% else %}
+<table class="jobs">
+  <thead>
+    <tr>
+      <th>#</th><th>Acquisition</th><th>AUV</th><th>GoPro</th><th>Segment</th>
+      <th>Frames</th><th>Status</th><th>Worker</th><th>Progress</th><th>Actions</th>
+    </tr>
+  </thead>
+  <tbody>
+    {% for j in jobs %}
+    <tr class="status-{{ j.status }}">
+      <td>{{ j.id }}</td>
+      <td>{{ j.acquisition_name }}</td>
+      <td>{{ j.auv }}</td>
+      <td>{{ j.gopro_serial }}</td>
+      <td>{{ j.segment_label }}</td>
+      <td>{{ j.frame_count or "—" }}</td>
+      <td><span class="pill {{ j.status }}">{{ j.status }}</span></td>
+      <td>{{ j.worker_host or "—" }}</td>
+      <td>
+        {% if j.status == 'running' or j.status == 'extracting' %}
+        <progress value="{{ j.progress }}" max="100"></progress> {{ j.progress }}%
+        {% elif j.status == 'done' and j.viser_url %}
+        <a href="{{ j.viser_url }}" target="_blank">viser</a>
+        {% if j.ply_path %} · <a href="{{ j.ply_path }}">PLY</a>{% endif %}
+        {% else %}—{% endif %}
+      </td>
+      <td>
+        {% if j.status in ['queued','extracting','running'] %}
+        <button hx-post="/jobs/{{ j.id }}/cancel" hx-swap="none">cancel</button>
+        {% elif j.status == 'error' %}
+        <button hx-post="/jobs/{{ j.id }}/retry" hx-swap="none">retry</button>
+        {% endif %}
+      </td>
+    </tr>
+    {% if j.error %}
+    <tr class="err-row"><td colspan="10">{{ j.error }}</td></tr>
+    {% endif %}
+    {% endfor %}
+  </tbody>
+</table>
+{% endif %}
diff --git a/app/templates/_monitor.html b/app/templates/_monitor.html
new file mode 100644
index 0000000..ba35ff1
--- /dev/null
+++ b/app/templates/_monitor.html
@@ -0,0 +1,26 @@
+<div class="worker-grid">
+  {% for w in workers %}
+  <div class="worker{% if not w.online %} offline{% endif %}">
+    <div class="hdr">
+      <strong>{{ w.host }}</strong>
+      <span class="gpu">{{ w.gpu }}</span>
+      <span class="state">{% if w.online %}online{% else %}offline{% endif %}</span>
+    </div>
+    {% if w.online %}
+    <div class="bar">
+      <small>VRAM</small>
+      <progress value="{{ w.vram_used_mib or 0 }}" max="{{ w.vram_total_mib or 1 }}"></progress>
+      <span>{{ w.vram_used_mib }} / {{ w.vram_total_mib }} MiB</span>
+    </div>
+    <div class="bar">
+      <small>GPU</small>
+      <progress value="{{ w.gpu_util_pct or 0 }}" max="100"></progress>
+      <span>{{ w.gpu_util_pct }}%</span>
+    </div>
+    <div class="muted">Disk / : {{ w.disk_used_pct }}</div>
+    {% else %}
+    <div class="err">{{ w.error or "unreachable" }}</div>
+    {% endif %}
+  </div>
+  {% endfor %}
+</div>
diff --git a/app/templates/index.html b/app/templates/index.html
new file mode 100644
index 0000000..e60b19f
--- /dev/null
+++ b/app/templates/index.html
@@ -0,0 +1,27 @@
+<!doctype html>
+<html lang="en">
+<head>
+  <meta charset="utf-8">
+  <title>cosma-qc — dashboard</title>
+  <link rel="stylesheet" href="/static/style.css">
+  <script src="https://unpkg.com/htmx.org@1.9.12"></script>
+</head>
+<body>
+  <header>
+    <h1>cosma-qc</h1>
+    <span class="sub">post-acquisition QC · lingbot-map pipeline</span>
+  </header>
+
+  <section>
+    <div hx-get="/partials/monitor" hx-trigger="load, every 5s">
+      <span class="muted">Loading workers…</span>
+    </div>
+  </section>
+
+  <section>
+    <h2>Jobs</h2>
+    <div hx-get="/partials/jobs" hx-trigger="load, every 5s">
+      <span class="muted">Loading…</span>
+    </div>
+  </section>
+</body>
+</html>
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..c337bf8
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,14 @@
+[project]
+name = "cosma-qc"
+version = "0.1.0"
+description = "COSMA post-acquisition QC pipeline"
+requires-python = ">=3.11"
+dependencies = [
+    "fastapi>=0.115",
+    "jinja2>=3.1",
+    "uvicorn[standard]>=0.30",
+    "python-multipart>=0.0.9",
+]
+
+[tool.uv]
+package = false
diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py
new file mode 100644
index 0000000..6176ec7
--- /dev/null
+++ b/scripts/dispatcher.py
@@ -0,0 +1,203 @@
+#!/usr/bin/env python3
+"""Dispatcher daemon: picks queued jobs and runs them on available workers.
+
+Single-job polling loop. Run as a systemd service (or manually). Handles both
+extraction (ffmpeg on the worker) and reconstruction (lingbot-map on the
+worker). Progress is written back to the DB.
+
+Env:
+    COSMA_QC_DB      : SQLite path (default /var/lib/cosma-qc/jobs.db)
+    COSMA_QC_WORKERS : JSON list of workers [{host, ssh_alias, gpu, vram_mib,
+                       frames_dir, lingbot_path}]
+    COSMA_QC_FPS     : extraction fps (default 3)
+    COSMA_QC_IMG_H   : image height (default 294)
+    COSMA_QC_IMG_W   : image width (default 518)
+    COSMA_QC_POLL_S  : poll interval in seconds (default 4)
+
+Job lifecycle:
+    queued → extracting → running → done
+                                 ↘ error
+"""
+from __future__ import annotations
+
+import json
+import os
+import shlex
+import sqlite3
+import subprocess
+import sys
+import time
+from contextlib import closing
+from datetime import datetime, timezone
+from pathlib import Path
+
+
+def _now_iso() -> str:
+    return datetime.now(timezone.utc).isoformat(timespec="seconds")
+
+
+DB_PATH = Path(os.environ.get("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db"))
+FPS = int(os.environ.get("COSMA_QC_FPS", "3"))
+IMG_H = int(os.environ.get("COSMA_QC_IMG_H", "294"))
+IMG_W = int(os.environ.get("COSMA_QC_IMG_W", "518"))
+POLL_S = int(os.environ.get("COSMA_QC_POLL_S", "4"))
+
+DEFAULT_WORKERS = [
+    {
+        "host": "192.168.0.87", "ssh_alias": "gpu", "gpu": "RTX 3060 12GB",
+        "vram_mib": 11913,
+        "frames_dir": "/home/floppyrj45/cosma-qc-frames",
+        "lingbot_path": "/home/floppyrj45/ai-video/lingbot-map",
+        "viser_port_base": 8100,
+    },
+    {
+        "host": "192.168.0.84", "ssh_alias": "cosma-vm", "gpu": "RTX 3090 24GB",
+        "vram_mib": 24576,
+        "frames_dir": "/home/floppyrj45/cosma-qc-frames",
+        "lingbot_path": "/home/floppyrj45/ai-video/lingbot-map",
+        "viser_port_base": 8100,
+    },
+]
+WORKERS = json.loads(os.environ.get("COSMA_QC_WORKERS", json.dumps(DEFAULT_WORKERS)))
+
+
+def db() -> sqlite3.Connection:
+    conn = sqlite3.connect(DB_PATH, isolation_level=None)
+    conn.execute("PRAGMA journal_mode=WAL")
+    conn.row_factory = sqlite3.Row
+    return conn
+
+
+def ssh(alias: str, cmd: str, timeout: int = 30) -> tuple[int, str, str]:
+    p = subprocess.run(
+        ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5", alias, cmd],
+        capture_output=True, text=True, timeout=timeout,
+    )
+    return p.returncode, p.stdout, p.stderr
+
+
+def worker_free_vram_mib(worker: dict) -> int:
+    rc, out, _ = ssh(worker["ssh_alias"],
+                     "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits")
+    try:
+        return int(out.strip().splitlines()[0]) if rc == 0 else 0
+    except Exception:
+        return 0
+
+
+def pick_worker(estimated_vram_mib: int) -> dict | None:
+    best = None
+    for w in WORKERS:
+        free = worker_free_vram_mib(w)
+        if free >= estimated_vram_mib and (best is None or free > best[0]):
+            best = (free, w)
+    return best[1] if best else None
+
+
+def estimate_vram_mib(frame_count: int) -> int:
+    # Empirical anchors: 300 frames peak ≈ 9.4 GiB; 600 frames OOM at ~11 GiB.
+    # A linear fit through those two points is ≈ 8000 MiB + 5.5 MiB/frame;
+    # the slope is rounded up for headroom, so 600-frame jobs avoid the 3060.
+    return int(8000 + 7 * frame_count)  # MiB
+
+
+def set_status(job_id: int, **fields):
+    keys = list(fields.keys())
+    vals = [fields[k] for k in keys]
+    q = "UPDATE jobs SET " + ", ".join(f"{k}=?" for k in keys) + " WHERE id=?"
+    with closing(db()) as conn:
+        conn.execute(q, (*vals, job_id))
+
+
+def count_frames(worker: dict, frames_dir: str) -> int:
+    rc, out, _ = ssh(worker["ssh_alias"], f"ls {shlex.quote(frames_dir)} 2>/dev/null | wc -l")
+    try:
+        return int(out.strip()) if rc == 0 else 0
+    except Exception:
+        return 0
+
+
+def do_extract(job: sqlite3.Row, worker: dict) -> str:
+    """Run ffmpeg on the worker for each video in job.video_paths."""
+    videos = json.loads(job["video_paths"])
+    frames_dir = f"{worker['frames_dir']}/job_{job['id']}"
+    ssh(worker["ssh_alias"], f"mkdir -p {shlex.quote(frames_dir)}")
+    idx = 0
+    for v in videos:
+        vf = f"fps={FPS},scale={IMG_W}:{IMG_H}"
+        pattern = f"{frames_dir}/frame_%06d.jpg"
+        # Start numbering at idx so frame order stays monotonic across videos.
+        cmd = (
+            f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(v)} "
+            f"-vf {shlex.quote(vf)} -start_number {idx} -q:v 4 "
+            f"{shlex.quote(pattern)}"
+        )
+        rc, _, err = ssh(worker["ssh_alias"], cmd, timeout=3600)
+        if rc != 0:
+            raise RuntimeError(f"ffmpeg failed on {v}: {err[:200]}")
+        # Count the frames now present to bump idx for the next video.
+        idx = count_frames(worker, frames_dir)
+        set_status(job["id"], frame_count=idx)
+    return frames_dir
+
+
+def do_reconstruct(job: sqlite3.Row, worker: dict, frames_dir: str) -> tuple[str, str]:
+    port = worker["viser_port_base"] + job["id"]
+    log = f"/tmp/cosma-qc-job-{job['id']}.log"
+    ckpt = f"{worker['lingbot_path']}/checkpoints/lingbot-map/lingbot-map-long.pt"
+    cmd = (
+        f"cd {shlex.quote(worker['lingbot_path'])} && source .venv/bin/activate && "
+        f"python3 demo.py --model_path {shlex.quote(ckpt)} "
+        f"--image_folder {shlex.quote(frames_dir)} --port {port} "
+        f"--use_sdpa --mode windowed --window_size 16 --overlap_size 2 --offload_to_cpu "
+        f"> {log} 2>&1"
+    )
+    rc, _, err = ssh(worker["ssh_alias"], cmd, timeout=3 * 3600)
+    if rc != 0:
+        tail = ssh(worker["ssh_alias"], f"tail -30 {log}")[1]
+        raise RuntimeError(f"demo.py failed: {err[:200]}\n---\n{tail[:800]}")
+    viser_url = f"http://{worker['host']}:{port}"
+    return viser_url, log
+
+
+def run_one(job: sqlite3.Row):
+    job_id = job["id"]
+    estimated = estimate_vram_mib(job["frame_count"] or 400)
+    worker = pick_worker(estimated)
+    if not worker:
+        return  # no worker has enough free VRAM; job stays queued
+    set_status(job_id, status="extracting", worker_host=worker["host"],
+               started_at=_now_iso())
+    try:
+        frames_dir = do_extract(job, worker)
+        frame_count = count_frames(worker, frames_dir)
+        set_status(job_id, frames_dir=frames_dir, frame_count=frame_count,
+                   status="running", progress=0)
+        viser_url, log = do_reconstruct(job, worker, frames_dir)
+        set_status(job_id, status="done", viser_url=viser_url, progress=100,
+                   log_tail=log, finished_at=_now_iso())
+    except Exception as e:
+        set_status(job_id, status="error", error=str(e)[:2000],
+                   finished_at=_now_iso())
+
+
+def pop_queued() -> sqlite3.Row | None:
+    with closing(db()) as conn:
+        return conn.execute(
+            "SELECT * FROM jobs WHERE status='queued' ORDER BY created_at LIMIT 1"
+        ).fetchone()
+
+
+def main():
+    print(f"cosma-qc dispatcher · DB={DB_PATH} · workers={[w['host'] for w in WORKERS]}")
+    while True:
+        job = pop_queued()
+        if job is None:
+            time.sleep(POLL_S)
+            continue
+        print(f"→ picking up job #{job['id']} ({job['auv']}/{job['gopro_serial']}/{job['segment_label']})")
+        run_one(job)
+
+
+if __name__ == "__main__":
+    try:
+        main()
+    except KeyboardInterrupt:
+        sys.exit(0)
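
The dispatcher claims work in two steps (`pop_queued`, then `set_status`), which is safe while exactly one dispatcher runs. If a second dispatcher were ever pointed at the same DB, the claim should become a single atomic statement. A sketch of one way to do that, assuming SQLite 3.35+ for `UPDATE ... RETURNING`; this is not part of the current design:

```python
import sqlite3

def claim_next_job(conn: sqlite3.Connection) -> sqlite3.Row | None:
    """Atomically move the oldest queued job to 'extracting' and return it.

    A single UPDATE ... RETURNING statement (SQLite >= 3.35) means two
    concurrent dispatchers can never claim the same row. It would replace
    the separate pop_queued() + first set_status() pair above.
    """
    return conn.execute(
        """
        UPDATE jobs
           SET status = 'extracting', started_at = datetime('now')
         WHERE id = (SELECT id FROM jobs
                      WHERE status = 'queued'
                      ORDER BY created_at
                      LIMIT 1)
        RETURNING *
        """
    ).fetchone()

# usage with the dispatcher's db() helper:
#   with closing(db()) as conn:
#       job = claim_next_job(conn)
```
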
diff --git a/scripts/ingest.py b/scripts/ingest.py
new file mode 100644
index 0000000..7f545c9
--- /dev/null
+++ b/scripts/ingest.py
@@ -0,0 +1,168 @@
+#!/usr/bin/env python3
+"""Scan an acquisition directory, group GoPro MP4s into continuous segments,
+and insert jobs into the cosma-qc DB.
+
+Usage:
+    python3 ingest.py /mnt/portablessd/COSMA-/ --name "La Ciotat 8 avril" [--gap-min 5]
+
+Expected directory layout (observed on the real SSD):
+
+    /media/gopro{1,2}/GP{1,2}_AUV{209,210}/GX*.MP4
+
+The AUV tag and GoPro id come from the folder names. The serial is read via
+exiftool (falling back to the folder name if unavailable). Continuous segments
+are derived from EXIF CreateDate timestamps with a configurable gap threshold.
+"""
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import re
+import sqlite3
+import subprocess
+from datetime import datetime, timedelta
+from pathlib import Path
+
+DB_PATH = Path(os.environ.get("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db"))
+FOLDER_RE = re.compile(r"GP(?P<gopro>\d+)_AUV(?P<auv>\d+)", re.I)
+
+
+def exif_create_date(path: Path) -> datetime | None:
+    try:
+        out = subprocess.check_output(
+            ["exiftool", "-s3", "-CreateDate", "-api", "QuickTimeUTC=1", str(path)],
+            stderr=subprocess.DEVNULL, text=True, timeout=10,
+        ).strip()
+        return datetime.strptime(out, "%Y:%m:%d %H:%M:%S") if out else None
+    except Exception:
+        return None
+
+
+def exif_duration_s(path: Path) -> float | None:
+    try:
+        out = subprocess.check_output(
+            ["exiftool", "-s3", "-Duration#", str(path)],
+            stderr=subprocess.DEVNULL, text=True, timeout=10,
+        ).strip()
+        return float(out) if out else None
+    except Exception:
+        return None
+
+
+def exif_serial(path: Path) -> str | None:
+    try:
+        out = subprocess.check_output(
+            ["exiftool", "-s3", "-SerialNumber", "-CameraSerialNumber", str(path)],
+            stderr=subprocess.DEVNULL, text=True, timeout=10,
+        ).strip().splitlines()
+        for line in out:
+            line = line.strip()
+            if line:
+                return line
+    except Exception:
+        pass
+    return None
+
+
+def group_segments(videos: list[dict], gap_min: int) -> list[dict]:
+    """Group consecutive videos into segments when the gap between end-of-A
+    and start-of-B is below `gap_min` minutes."""
+    videos = sorted(videos, key=lambda v: v["start"])
+    segments: list[list[dict]] = []
+    for v in videos:
+        if not segments:
+            segments.append([v])
+            continue
+        last = segments[-1][-1]
+        last_end = last["start"] + timedelta(seconds=last["duration"] or 0)
+        if (v["start"] - last_end) <= timedelta(minutes=gap_min):
+            segments[-1].append(v)
+        else:
+            segments.append([v])
+    out = []
+    for seg in segments:
+        start = seg[0]["start"]
+        end = seg[-1]["start"] + timedelta(seconds=seg[-1]["duration"] or 0)
+        out.append({
+            "start": start, "end": end,
+            "label": f"{start.strftime('%H:%M')}–{end.strftime('%H:%M')}",
+            "videos": [str(v["path"]) for v in seg],
+        })
+    return out
+
+
+def scan(root: Path) -> dict:
+    """Return {(auv, gopro_tag): {serial, videos[]}}."""
+    grouped: dict[tuple[str, str], dict] = {}
+    for mp4 in root.rglob("*.MP4"):
+        m = FOLDER_RE.search(str(mp4.parent))
+        if not m:
+            continue
+        auv = f"AUV{m.group('auv')}"
+        gopro_tag = f"GP{m.group('gopro')}"
+        key = (auv, gopro_tag)
+        start = exif_create_date(mp4)
+        dur = exif_duration_s(mp4)
+        if not start:
+            print(f"  [skip] no CreateDate: {mp4}")
+            continue
+        serial = exif_serial(mp4)
+        slot = grouped.setdefault(key, {"serial": serial, "videos": []})
+        if serial and not slot["serial"]:
+            slot["serial"] = serial
+        slot["videos"].append({"path": mp4, "start": start, "duration": dur or 0})
+    return grouped
+
+
+def main():
+    ap = argparse.ArgumentParser()
+    ap.add_argument("root", type=Path)
+    ap.add_argument("--name", required=True, help="Acquisition name")
+    ap.add_argument("--gap-min", type=int, default=5,
+                    help="Max gap (minutes) between videos in one segment")
+    ap.add_argument("--dry-run", action="store_true")
+    args = ap.parse_args()
+
+    if not args.root.exists():
+        raise SystemExit(f"root not found: {args.root}")
+
+    print(f"Scanning {args.root}...")
+    grouped = scan(args.root)
+    if not grouped:
+        print("No (auv, gopro) folders found — expected GPx_AUVyyy layout.")
+        return
+
+    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
+    conn = sqlite3.connect(DB_PATH, isolation_level=None)
+    conn.execute("PRAGMA foreign_keys=ON")
+    conn.row_factory = sqlite3.Row
+
+    if args.dry_run:
+        acq_id = -1
+    else:
+        cur = conn.execute(
+            "INSERT INTO acquisitions (name, source_path) VALUES (?, ?)",
+            (args.name, str(args.root)),
+        )
+        acq_id = cur.lastrowid
+        print(f"Created acquisition id={acq_id}")
+
+    total_jobs = 0
+    for (auv, gopro_tag), info in sorted(grouped.items()):
+        serial = info["serial"] or gopro_tag
+        segs = group_segments(info["videos"], args.gap_min)
+        print(f"\n{auv} / {gopro_tag} (serial={serial}) — {len(info['videos'])} videos → {len(segs)} segments")
+        for seg in segs:
+            dur_min = (seg["end"] - seg["start"]).total_seconds() / 60
+            print(f"  · {seg['label']} ({dur_min:.1f} min, {len(seg['videos'])} files)")
+            if args.dry_run:
+                continue
+            conn.execute("""
+                INSERT INTO jobs (acquisition_id, auv, gopro_serial, segment_label,
+                                  video_paths, status)
+                VALUES (?, ?, ?, ?, ?, 'queued')
+            """, (acq_id, auv, serial, seg["label"], json.dumps(seg["videos"])))
+            total_jobs += 1
+
+    print(f"\nInserted {total_jobs} jobs.")
+
+
+if __name__ == "__main__":
+    main()
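
To sanity-check the gap threshold, `group_segments` can be fed synthetic clips directly. A quick illustration with invented timestamps: the second clip starts 2 minutes after the first ends and joins its segment, while the third starts 20 minutes later and opens a new one.

```python
# Run from scripts/ so that `ingest` resolves as a module.
from datetime import datetime

from ingest import group_segments

clips = [  # synthetic: start + duration_s; chapters are ~8 min long here
    {"path": "GX010001.MP4", "start": datetime(2025, 4, 8, 10, 0), "duration": 480},
    {"path": "GX020001.MP4", "start": datetime(2025, 4, 8, 10, 10), "duration": 480},
    {"path": "GX010002.MP4", "start": datetime(2025, 4, 8, 10, 38), "duration": 480},
]

for seg in group_segments(clips, gap_min=5):
    print(seg["label"], len(seg["videos"]), "file(s)")
# 10:00–10:18 2 file(s)   (2-minute gap: same segment)
# 10:38–10:46 1 file(s)   (20-minute gap: new segment)
```
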