- ingest.py : --remote-host <alias> pour lancer scanner/exiftool via SSH ; stocke les chemins avec le préfixe "alias:" pour que le worker sache puller directement
- dispatcher.py : scp_to_worker détecte "host:path" et fait un pull distant (worker → machine source) au lieu du double hop via le dispatcher
- _path_basename gère les chemins préfixés pour ffmpeg

Permet d'ingérer les vidéos depuis n'importe quelle machine accessible en SSH sans faire transiter 145 GB par le conteneur FastAPI.
198 lines · 6.8 KiB · Python
#!/usr/bin/env python3
"""Scan an acquisition directory, group GoPro MP4s into continuous segments,
and insert jobs into the cosma-qc DB.

Usage:
    python3 ingest.py /mnt/portablessd/COSMA-<date>/ --name "La Ciotat 8 avril" [--gap-min 5]

Directory layout expected (we saw this from the real SSD):

    <root>/media/gopro{1,2}/GP{1,2}_AUV{209,210}/GX*.MP4

The AUV tag and GoPro id come from folder names. The serial is read via
exiftool (falls back to folder name if unavailable). Continuous segments are
derived from EXIF CreateDate timestamps with a configurable gap threshold.
"""
from __future__ import annotations

import argparse
import json
import os
import re
import shlex
import sqlite3
import subprocess
from datetime import datetime, timedelta
from pathlib import Path
# SQLite job DB; location overridable via the COSMA_QC_DB env var.
DB_PATH = Path(os.environ.get("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db"))
# Parent folders look like "GP1_AUV209": capture the GoPro id and AUV number.
FOLDER_RE = re.compile(r"GP(?P<gopro>\d+)_AUV(?P<auv>\d+)", re.I)

# SSH alias to read videos/exiftool from; None means everything is local.
REMOTE_HOST: str | None = None  # set via --remote-host
def _run_cmd(args: list[str], timeout: int = 10) -> str:
    """Run *args* locally, or on REMOTE_HOST via SSH when that global is set.

    Returns the command's stripped stdout (stderr is discarded).
    Raises subprocess.CalledProcessError on a non-zero exit status and
    subprocess.TimeoutExpired when *timeout* seconds elapse.
    """
    if REMOTE_HOST:
        # Quote each argument so the remote shell receives the exact argv.
        remote_cmd = " ".join(shlex.quote(a) for a in args)
        # BatchMode=yes fails fast instead of hanging on a password prompt.
        args = ["ssh", "-o", "BatchMode=yes", REMOTE_HOST, remote_cmd]
    return subprocess.check_output(
        args, stderr=subprocess.DEVNULL, text=True, timeout=timeout
    ).strip()
def exif_create_date(path: Path) -> datetime | None:
    """Return the EXIF CreateDate of *path* as a naive datetime, or None.

    Runs exiftool with -api QuickTimeUTC=1 so QuickTime timestamps are
    read as UTC.  Any failure (exiftool missing, tag absent, value not
    parseable) yields None instead of raising — callers treat None as
    "skip this file".
    """
    try:
        out = _run_cmd(
            ["exiftool", "-s3", "-CreateDate", "-api", "QuickTimeUTC=1", str(path)],
        )
        if not out:
            return None
        # Strip a trailing timezone offset (+HH:MM or -HH:MM) if present;
        # we work with naive datetimes throughout.  Uses the module-level
        # `re` import instead of re-importing it on every call.
        out = re.sub(r'[+-]\d{2}:\d{2}$', '', out).strip()
        return datetime.strptime(out, "%Y:%m:%d %H:%M:%S")
    except Exception:
        return None
def exif_duration_s(path: Path) -> float | None:
    """Return the clip duration of *path* in seconds (via exiftool), or None."""
    try:
        raw = _run_cmd(["exiftool", "-s3", "-Duration#", str(path)])
        if not raw:
            return None
        return float(raw)
    except Exception:
        # Best-effort: missing exiftool / tag / bad value all map to None.
        return None
def exif_serial(path: Path) -> str | None:
    """Return the first serial number exiftool reports for *path*, or None."""
    try:
        lines = _run_cmd(
            ["exiftool", "-s3", "-SerialNumber", "-CameraSerialNumber", str(path)],
        ).splitlines()
        # exiftool prints one value per matching tag; first non-blank wins.
        return next((ln.strip() for ln in lines if ln.strip()), None)
    except Exception:
        return None
def group_segments(videos: list[dict], gap_min: int) -> list[dict]:
    """Group consecutive videos into continuous segments.

    Two videos land in the same segment when the gap between the end of
    the earlier one (start + duration) and the start of the later one is
    at most *gap_min* minutes.  Returns one dict per segment carrying
    start/end datetimes, an HH:MM–HH:MM label, and the stringified video
    paths (prefixed with "REMOTE_HOST:" when scanning remotely).
    """
    ordered = sorted(videos, key=lambda v: v["start"])
    max_gap = timedelta(minutes=gap_min)

    runs: list[list[dict]] = []
    for video in ordered:
        if runs:
            prev = runs[-1][-1]
            prev_end = prev["start"] + timedelta(seconds=prev["duration"] or 0)
            if video["start"] - prev_end <= max_gap:
                runs[-1].append(video)
                continue
        runs.append([video])

    # The prefix depends only on the module-level REMOTE_HOST, so compute
    # it once instead of per segment.
    prefix = f"{REMOTE_HOST}:" if REMOTE_HOST else ""
    result = []
    for run in runs:
        seg_start = run[0]["start"]
        seg_end = run[-1]["start"] + timedelta(seconds=run[-1]["duration"] or 0)
        result.append({
            "start": seg_start,
            "end": seg_end,
            "label": f"{seg_start.strftime('%H:%M')}–{seg_end.strftime('%H:%M')}",
            "videos": [prefix + str(v["path"]) for v in run],
        })
    return result
def _list_mp4s(root: Path) -> list[Path]:
    """Recursively list every MP4 file under *root*.

    When REMOTE_HOST is set, the listing runs over SSH with
    `find -iname '*.MP4'` (case-insensitive).  The local branch now
    matches that behaviour: the previous `rglob("*.MP4")` was
    case-sensitive on Linux and silently missed lowercase `.mp4` files.
    """
    if REMOTE_HOST:
        out = subprocess.check_output(
            ["ssh", "-o", "BatchMode=yes", REMOTE_HOST,
             f"find {shlex.quote(str(root))} -type f -iname '*.MP4'"],
            text=True, timeout=60,
        )
        return [Path(line.strip()) for line in out.splitlines() if line.strip()]
    # Case-insensitive suffix check mirrors the remote `find -iname`.
    return [p for p in root.rglob("*") if p.is_file() and p.suffix.lower() == ".mp4"]
def scan(root: Path) -> dict:
    """Walk *root* and bucket MP4s by camera.

    Returns ``{(auv, gopro_tag): {"serial": str | None, "videos": [...]}}``
    where each video dict carries path/start/duration.  Files whose parent
    folder does not match GPx_AUVyyy, or that have no CreateDate, are
    skipped.
    """
    grouped: dict[tuple[str, str], dict] = {}
    for mp4 in _list_mp4s(root):
        match = FOLDER_RE.search(str(mp4.parent))
        if match is None:
            continue
        key = (f"AUV{match.group('auv')}", f"GP{match.group('gopro')}")
        start = exif_create_date(mp4)
        dur = exif_duration_s(mp4)
        if not start:
            print(f" [skip] no CreateDate: {mp4}")
            continue
        serial = exif_serial(mp4)
        # First file creates the bucket; later files may fill in a serial
        # the first one lacked.
        bucket = grouped.setdefault(key, {"serial": serial, "videos": []})
        if serial and not bucket["serial"]:
            bucket["serial"] = serial
        bucket["videos"].append({"path": mp4, "start": start, "duration": dur or 0})
    return grouped
def main():
    """CLI entry point: scan the acquisition, group segments, queue jobs.

    Fix over the previous version: the sqlite3 connection is now closed
    in a ``finally`` block instead of being leaked on every run.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("root", type=Path)
    ap.add_argument("--name", required=True, help="Acquisition name")
    ap.add_argument("--gap-min", type=int, default=5, help="Max gap between videos in one segment")
    ap.add_argument("--dry-run", action="store_true")
    ap.add_argument("--remote-host", default=None,
                    help="SSH alias to read videos/exiftool from (stored paths get 'alias:' prefix)")
    args = ap.parse_args()

    # All scanning/exiftool helpers consult this module-level switch.
    global REMOTE_HOST
    REMOTE_HOST = args.remote_host

    # Only validate local roots; a remote root is checked by the remote find.
    if not REMOTE_HOST and not args.root.exists():
        raise SystemExit(f"root not found: {args.root}")

    print(f"Scanning {args.root}{' @ ' + REMOTE_HOST if REMOTE_HOST else ''}...")
    grouped = scan(args.root)
    if not grouped:
        print("No (auv, gopro) folders found — expected GPx_AUVyyy layout.")
        return

    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH, isolation_level=None)  # autocommit mode
    try:
        conn.execute("PRAGMA foreign_keys=ON")
        conn.row_factory = sqlite3.Row

        if args.dry_run:
            acq_id = -1  # sentinel: nothing is written in dry-run mode
        else:
            cur = conn.execute(
                "INSERT INTO acquisitions (name, source_path) VALUES (?, ?)",
                (args.name, str(args.root)),
            )
            acq_id = cur.lastrowid
            print(f"Created acquisition id={acq_id}")

        total_jobs = 0
        for (auv, gopro_tag), info in sorted(grouped.items()):
            # Fall back to the folder tag when exiftool gave no serial.
            serial = info["serial"] or gopro_tag
            segs = group_segments(info["videos"], args.gap_min)
            print(f"\n{auv} / {gopro_tag} (serial={serial}) — {len(info['videos'])} videos → {len(segs)} segments")
            for seg in segs:
                dur_min = (seg["end"] - seg["start"]).total_seconds() / 60
                print(f" · {seg['label']} ({dur_min:.1f} min, {len(seg['videos'])} files)")
                if args.dry_run:
                    continue
                conn.execute("""
                INSERT INTO jobs (acquisition_id, auv, gopro_serial, segment_label,
                                  video_paths, status)
                VALUES (?, ?, ?, ?, ?, 'queued')
                """, (acq_id, auv, serial, seg["label"], json.dumps(seg["videos"])))
                total_jobs += 1
    finally:
        # Connection was previously never closed — leaked on every run.
        conn.close()

    print(f"\nInserted {total_jobs} jobs.")
# Guard lets the module be imported (e.g. by tests) without running a scan.
if __name__ == "__main__":
    main()