Files
cosma-qc/scripts/ingest.py
Poulpe 192550b60b ingest+dispatcher — support acquisition depuis remote host via SSH
- ingest.py : --remote-host <alias> pour scanner/exiftool via SSH, stocke
  les chemins avec préfixe "alias:" pour que le worker sache puller direct
- dispatcher.py : scp_to_worker détecte "host:path" et fait pull remote
  (worker → source host) au lieu du double hop via dispatcher
- _path_basename gère les paths préfixés pour ffmpeg

Permet d'ingester les vidéos depuis n'importe quelle machine accessible
en SSH sans passer 145GB par le conteneur FastAPI.
2026-04-21 13:31:40 +00:00

198 lines
6.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Scan an acquisition directory, group GoPro MP4s into continuous segments,
and insert jobs into the cosma-qc DB.
Usage:
python3 ingest.py /mnt/portablessd/COSMA-<date>/ --name "La Ciotat 8 avril" [--gap-min 5]
Directory layout expected (we saw this from the real SSD):
<root>/media/gopro{1,2}/GP{1,2}_AUV{209,210}/GX*.MP4
The AUV tag and GoPro id come from folder names. The serial is read via
exiftool (falls back to folder name if unavailable). Continuous segments are
derived from EXIF CreateDate timestamps with a configurable gap threshold.
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sqlite3
import subprocess
from datetime import datetime, timedelta
from pathlib import Path
# SQLite DB holding the job queue; override location via $COSMA_QC_DB.
DB_PATH = Path(os.environ.get("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db"))
# Matches folder names like "GP1_AUV209" (case-insensitive) to extract
# the GoPro id and AUV tag — see the expected layout in the module docstring.
FOLDER_RE = re.compile(r"GP(?P<gopro>\d+)_AUV(?P<auv>\d+)", re.I)
REMOTE_HOST: str | None = None # set via --remote-host
def _run_cmd(args: list[str], timeout: int = 10) -> str:
"""Run a command locally or on REMOTE_HOST via SSH."""
if REMOTE_HOST:
import shlex as _shlex
remote_cmd = " ".join(_shlex.quote(a) for a in args)
args = ["ssh", "-o", "BatchMode=yes", REMOTE_HOST, remote_cmd]
return subprocess.check_output(args, stderr=subprocess.DEVNULL, text=True, timeout=timeout).strip()
def exif_create_date(path: Path) -> datetime | None:
try:
out = _run_cmd(
["exiftool", "-s3", "-CreateDate", "-api", "QuickTimeUTC=1", str(path)],
)
if not out:
return None
# Strip timezone suffix (+HH:MM or -HH:MM) if present
import re as _re
out = _re.sub(r'[+-]\d{2}:\d{2}$', '', out).strip()
return datetime.strptime(out, "%Y:%m:%d %H:%M:%S")
except Exception:
return None
def exif_duration_s(path: Path) -> float | None:
try:
out = _run_cmd(["exiftool", "-s3", "-Duration#", str(path)])
return float(out) if out else None
except Exception:
return None
def exif_serial(path: Path) -> str | None:
try:
out = _run_cmd(
["exiftool", "-s3", "-SerialNumber", "-CameraSerialNumber", str(path)],
).splitlines()
for line in out:
line = line.strip()
if line:
return line
except Exception:
pass
return None
def group_segments(videos: list[dict], gap_min: int) -> list[dict]:
    """Merge time-ordered videos into continuous segments.

    A new segment starts whenever the pause between one clip's end
    (start + duration) and the next clip's start exceeds `gap_min` minutes.
    Each returned segment dict carries start/end datetimes, an HH:MM label,
    and the video paths (prefixed "host:" when REMOTE_HOST is set, so the
    worker knows to pull them over SSH).
    """
    ordered = sorted(videos, key=lambda v: v["start"])
    max_gap = timedelta(minutes=gap_min)

    runs: list[list[dict]] = []
    for vid in ordered:
        if runs:
            prev = runs[-1][-1]
            prev_end = prev["start"] + timedelta(seconds=prev["duration"] or 0)
            if vid["start"] - prev_end <= max_gap:
                runs[-1].append(vid)
                continue
        runs.append([vid])

    result: list[dict] = []
    for run in runs:
        seg_start = run[0]["start"]
        seg_end = run[-1]["start"] + timedelta(seconds=run[-1]["duration"] or 0)
        prefix = f"{REMOTE_HOST}:" if REMOTE_HOST else ""
        result.append({
            "start": seg_start,
            "end": seg_end,
            "label": f"{seg_start.strftime('%H:%M')}{seg_end.strftime('%H:%M')}",
            "videos": [prefix + str(v["path"]) for v in run],
        })
    return result
def _list_mp4s(root: Path) -> list[Path]:
    """Recursively list MP4 files under *root*, locally or on REMOTE_HOST.

    The remote branch uses `find -iname '*.MP4'`, which matches any case of
    the extension; the local branch now compares suffixes case-insensitively
    to behave identically (the original `rglob("*.MP4")` was case-sensitive
    and would have missed lowercase `.mp4` files only on local scans).
    """
    if REMOTE_HOST:
        out = subprocess.check_output(
            ["ssh", "-o", "BatchMode=yes", REMOTE_HOST,
             f"find {shlex.quote(str(root))} -type f -iname '*.MP4'"],
            text=True, timeout=60,
        )
        return [Path(line.strip()) for line in out.splitlines() if line.strip()]
    # Mirror the remote -iname semantics for the local filesystem walk.
    return [p for p in root.rglob("*") if p.is_file() and p.suffix.lower() == ".mp4"]
def scan(root: Path) -> dict:
    """Walk *root* and bucket MP4s by (AUV tag, GoPro tag).

    Returns {(auv, gopro_tag): {"serial": str | None, "videos": [dict]}},
    where each video dict holds its path, EXIF start datetime and duration
    in seconds. Files whose parent folder doesn't match GPx_AUVyyy or that
    lack a CreateDate are skipped (the latter with a console note).
    """
    buckets: dict[tuple[str, str], dict] = {}
    for mp4 in _list_mp4s(root):
        match = FOLDER_RE.search(str(mp4.parent))
        if match is None:
            continue
        key = (f"AUV{match.group('auv')}", f"GP{match.group('gopro')}")
        start = exif_create_date(mp4)
        dur = exif_duration_s(mp4)
        if not start:
            print(f" [skip] no CreateDate: {mp4}")
            continue
        serial = exif_serial(mp4)
        entry = buckets.setdefault(key, {"serial": serial, "videos": []})
        # Backfill the serial if an earlier file for this camera lacked one.
        if serial and not entry["serial"]:
            entry["serial"] = serial
        entry["videos"].append({"path": mp4, "start": start, "duration": dur or 0})
    return buckets
def main() -> None:
    """CLI entry point: scan an acquisition tree and enqueue jobs in SQLite.

    Fix over the original: --dry-run no longer touches the filesystem — it
    used to mkdir DB_PATH's parent and sqlite3.connect(), creating an empty
    DB file even when only previewing. The connection is now also closed.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("root", type=Path)
    ap.add_argument("--name", required=True, help="Acquisition name")
    ap.add_argument("--gap-min", type=int, default=5,
                    help="Max gap between videos in one segment")
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--remote-host", default=None,
                    help="SSH alias to read videos/exiftool from (stored paths get 'alias:' prefix)")
    args = ap.parse_args()

    global REMOTE_HOST
    REMOTE_HOST = args.remote_host
    # With a remote host, the path exists on that machine, not here.
    if not REMOTE_HOST and not args.root.exists():
        raise SystemExit(f"root not found: {args.root}")

    print(f"Scanning {args.root}{' @ ' + REMOTE_HOST if REMOTE_HOST else ''}...")
    grouped = scan(args.root)
    if not grouped:
        print("No (auv, gopro) folders found — expected GPx_AUVyyy layout.")
        return

    conn: sqlite3.Connection | None = None
    if args.dry_run:
        acq_id = -1  # sentinel; never written anywhere in dry-run mode
    else:
        DB_PATH.parent.mkdir(parents=True, exist_ok=True)
        # isolation_level=None → autocommit; every execute() is durable.
        conn = sqlite3.connect(DB_PATH, isolation_level=None)
        conn.execute("PRAGMA foreign_keys=ON")
        conn.row_factory = sqlite3.Row
        cur = conn.execute(
            "INSERT INTO acquisitions (name, source_path) VALUES (?, ?)",
            (args.name, str(args.root)),
        )
        acq_id = cur.lastrowid
        print(f"Created acquisition id={acq_id}")

    total_jobs = 0
    try:
        for (auv, gopro_tag), info in sorted(grouped.items()):
            serial = info["serial"] or gopro_tag
            segs = group_segments(info["videos"], args.gap_min)
            print(f"\n{auv} / {gopro_tag} (serial={serial}) — {len(info['videos'])} videos → {len(segs)} segments")
            for seg in segs:
                dur_min = (seg["end"] - seg["start"]).total_seconds() / 60
                print(f" · {seg['label']} ({dur_min:.1f} min, {len(seg['videos'])} files)")
                if args.dry_run:
                    continue
                conn.execute("""
                    INSERT INTO jobs (acquisition_id, auv, gopro_serial, segment_label,
                                      video_paths, status)
                    VALUES (?, ?, ?, ?, ?, 'queued')
                """, (acq_id, auv, serial, seg["label"], json.dumps(seg["videos"])))
                total_jobs += 1
    finally:
        if conn is not None:
            conn.close()
    print(f"\nInserted {total_jobs} jobs.")


if __name__ == "__main__":
    main()