ingest+dispatcher — support acquisition depuis remote host via SSH
- ingest.py : --remote-host <alias> pour scanner/exiftool via SSH, stocke les chemins avec le préfixe "alias:" pour que le worker sache les récupérer directement
- dispatcher.py : scp_to_worker détecte "host:path" et fait un pull remote (worker → source host) au lieu du double hop via le dispatcher
- _path_basename gère les chemins préfixés pour ffmpeg

Permet d'ingérer les vidéos depuis n'importe quelle machine accessible en SSH sans faire passer 145 GB par le conteneur FastAPI.
This commit is contained in:
@@ -121,6 +121,24 @@ def count_frames(worker: dict, frames_dir: str) -> int:
|
|||||||
|
|
||||||
|
|
||||||
def scp_to_worker(local_path: str, worker: dict, remote_path: str):
|
def scp_to_worker(local_path: str, worker: dict, remote_path: str):
|
||||||
|
"""Copy a file to the worker.
|
||||||
|
|
||||||
|
`local_path` may be either:
|
||||||
|
- a path on the dispatcher host (standard scp from here)
|
||||||
|
- "host:abs_path" — pulled by the worker directly from `host`
|
||||||
|
(avoids routing bytes through the dispatcher).
|
||||||
|
"""
|
||||||
|
if ":" in local_path and not local_path.startswith("/"):
|
||||||
|
src_host, src_path = local_path.split(":", 1)
|
||||||
|
# Pull from source host directly on the worker
|
||||||
|
pull_cmd = (
|
||||||
|
f"scp -o BatchMode=yes {shlex.quote(src_host)}:{shlex.quote(src_path)} "
|
||||||
|
f"{shlex.quote(remote_path)}"
|
||||||
|
)
|
||||||
|
rc, _, err = ssh(worker["ssh_alias"], pull_cmd, timeout=7200)
|
||||||
|
if rc != 0:
|
||||||
|
raise RuntimeError(f"remote scp ({src_host}→{worker['host']}) failed: {err[:200]}")
|
||||||
|
return
|
||||||
r = subprocess.run(
|
r = subprocess.run(
|
||||||
["scp", "-o", "BatchMode=yes", local_path, f"{worker['ssh_alias']}:{remote_path}"],
|
["scp", "-o", "BatchMode=yes", local_path, f"{worker['ssh_alias']}:{remote_path}"],
|
||||||
capture_output=True, timeout=1800,
|
capture_output=True, timeout=1800,
|
||||||
@@ -129,6 +147,12 @@ def scp_to_worker(local_path: str, worker: dict, remote_path: str):
|
|||||||
raise RuntimeError(f"scp failed: {r.stderr.decode()[:200]}")
|
raise RuntimeError(f"scp failed: {r.stderr.decode()[:200]}")
|
||||||
|
|
||||||
|
|
||||||
|
def _path_basename(p: str) -> str:
    """Return the final path component of *p*, ignoring any ``host:`` prefix.

    *p* is either a plain filesystem path or a remote spec of the form
    ``host:path``. A colon marks a remote spec only when no ``/`` precedes
    it, which is the rule scp itself applies. Local paths such as
    ``./dive/GX01:02.MP4`` or ``/data/dive:2/x.MP4`` therefore keep their
    complete file name instead of being cut at the colon.

    A bare file name containing a colon and no slash (``a:b.MP4``) is still
    read as ``host:path``, as scp would read it.
    """
    # NOTE(review): scp_to_worker tests for the prefix with a looser rule
    # (any ":" in a string not starting with "/"); confirm both should agree.
    host, sep, remote_path = p.partition(":")
    if sep and "/" not in host:
        return Path(remote_path).name
    return Path(p).name
|
||||||
|
|
||||||
|
|
||||||
def do_extract(job: sqlite3.Row, worker: dict) -> str:
|
def do_extract(job: sqlite3.Row, worker: dict) -> str:
|
||||||
videos = json.loads(job["video_paths"])
|
videos = json.loads(job["video_paths"])
|
||||||
frames_dir = f"{worker['frames_dir']}/job_{job['id']}"
|
frames_dir = f"{worker['frames_dir']}/job_{job['id']}"
|
||||||
@@ -138,10 +162,10 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str:
|
|||||||
vf = f"fps={FPS},scale={IMG_W}:{IMG_H}"
|
vf = f"fps={FPS},scale={IMG_W}:{IMG_H}"
|
||||||
pattern = f"{frames_dir}/frame_%06d.jpg"
|
pattern = f"{frames_dir}/frame_%06d.jpg"
|
||||||
# Copy video to worker if it doesn't exist there
|
# Copy video to worker if it doesn't exist there
|
||||||
worker_src = f"{frames_dir}/src_{Path(v).name}"
|
worker_src = f"{frames_dir}/src_{_path_basename(v)}"
|
||||||
rc_check = ssh(worker["ssh_alias"], f"test -f {shlex.quote(worker_src)}")[0]
|
rc_check = ssh(worker["ssh_alias"], f"test -f {shlex.quote(worker_src)}")[0]
|
||||||
if rc_check != 0:
|
if rc_check != 0:
|
||||||
print(f" scp {Path(v).name} → {worker['host']}...")
|
print(f" scp {_path_basename(v)} → {worker['host']}...")
|
||||||
scp_to_worker(v, worker, worker_src)
|
scp_to_worker(v, worker, worker_src)
|
||||||
cmd = (
|
cmd = (
|
||||||
f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(worker_src)} "
|
f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(worker_src)} "
|
||||||
|
|||||||
@@ -27,13 +27,23 @@ from pathlib import Path
|
|||||||
DB_PATH = Path(os.environ.get("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db"))
|
DB_PATH = Path(os.environ.get("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db"))
|
||||||
FOLDER_RE = re.compile(r"GP(?P<gopro>\d+)_AUV(?P<auv>\d+)", re.I)
|
FOLDER_RE = re.compile(r"GP(?P<gopro>\d+)_AUV(?P<auv>\d+)", re.I)
|
||||||
|
|
||||||
|
REMOTE_HOST: str | None = None # set via --remote-host
|
||||||
|
|
||||||
|
|
||||||
|
def _run_cmd(args: list[str], timeout: int = 10) -> str:
    """Execute *args* and return its standard output with outer whitespace removed.

    When the module-level ``REMOTE_HOST`` is set, the arguments are
    shell-quoted, joined into one command string and executed on that host
    through ``ssh`` in batch mode. Otherwise the command runs as a local
    subprocess. Standard error is discarded in both cases.

    Raises whatever ``subprocess.check_output`` raises, e.g. on a non-zero
    exit status or when *timeout* seconds elapse.
    """
    command = args
    if REMOTE_HOST:
        import shlex as _shlex

        # The remote command travels as a single string, so each argument
        # is quoted before joining.
        command = ["ssh", "-o", "BatchMode=yes", REMOTE_HOST, _shlex.join(args)]
    raw = subprocess.check_output(
        command, stderr=subprocess.DEVNULL, text=True, timeout=timeout
    )
    return raw.strip()
|
||||||
|
|
||||||
|
|
||||||
def exif_create_date(path: Path) -> datetime | None:
|
def exif_create_date(path: Path) -> datetime | None:
|
||||||
try:
|
try:
|
||||||
out = subprocess.check_output(
|
out = _run_cmd(
|
||||||
["exiftool", "-s3", "-CreateDate", "-api", "QuickTimeUTC=1", str(path)],
|
["exiftool", "-s3", "-CreateDate", "-api", "QuickTimeUTC=1", str(path)],
|
||||||
stderr=subprocess.DEVNULL, text=True, timeout=10,
|
)
|
||||||
).strip()
|
|
||||||
if not out:
|
if not out:
|
||||||
return None
|
return None
|
||||||
# Strip timezone suffix (+HH:MM or -HH:MM) if present
|
# Strip timezone suffix (+HH:MM or -HH:MM) if present
|
||||||
@@ -46,10 +56,7 @@ def exif_create_date(path: Path) -> datetime | None:
|
|||||||
|
|
||||||
def exif_duration_s(path: Path) -> float | None:
|
def exif_duration_s(path: Path) -> float | None:
|
||||||
try:
|
try:
|
||||||
out = subprocess.check_output(
|
out = _run_cmd(["exiftool", "-s3", "-Duration#", str(path)])
|
||||||
["exiftool", "-s3", "-Duration#", str(path)],
|
|
||||||
stderr=subprocess.DEVNULL, text=True, timeout=10,
|
|
||||||
).strip()
|
|
||||||
return float(out) if out else None
|
return float(out) if out else None
|
||||||
except Exception:
|
except Exception:
|
||||||
return None
|
return None
|
||||||
@@ -57,10 +64,9 @@ def exif_duration_s(path: Path) -> float | None:
|
|||||||
|
|
||||||
def exif_serial(path: Path) -> str | None:
|
def exif_serial(path: Path) -> str | None:
|
||||||
try:
|
try:
|
||||||
out = subprocess.check_output(
|
out = _run_cmd(
|
||||||
["exiftool", "-s3", "-SerialNumber", "-CameraSerialNumber", str(path)],
|
["exiftool", "-s3", "-SerialNumber", "-CameraSerialNumber", str(path)],
|
||||||
stderr=subprocess.DEVNULL, text=True, timeout=10,
|
).splitlines()
|
||||||
).strip().splitlines()
|
|
||||||
for line in out:
|
for line in out:
|
||||||
line = line.strip()
|
line = line.strip()
|
||||||
if line:
|
if line:
|
||||||
@@ -88,18 +94,31 @@ def group_segments(videos: list[dict], gap_min: int) -> list[dict]:
|
|||||||
for seg in segments:
|
for seg in segments:
|
||||||
start = seg[0]["start"]
|
start = seg[0]["start"]
|
||||||
end = seg[-1]["start"] + timedelta(seconds=seg[-1]["duration"] or 0)
|
end = seg[-1]["start"] + timedelta(seconds=seg[-1]["duration"] or 0)
|
||||||
|
prefix = f"{REMOTE_HOST}:" if REMOTE_HOST else ""
|
||||||
out.append({
|
out.append({
|
||||||
"start": start, "end": end,
|
"start": start, "end": end,
|
||||||
"label": f"{start.strftime('%H:%M')}–{end.strftime('%H:%M')}",
|
"label": f"{start.strftime('%H:%M')}–{end.strftime('%H:%M')}",
|
||||||
"videos": [str(v["path"]) for v in seg],
|
"videos": [prefix + str(v["path"]) for v in seg],
|
||||||
})
|
})
|
||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _list_mp4s(root: Path) -> list[Path]:
    """List the MP4 files found anywhere under *root*.

    With ``REMOTE_HOST`` set, the tree is walked on that host by ``find``
    over ssh; otherwise the local filesystem is walked. Both branches keep
    only files whose name ends in ``.mp4`` in any letter case, so an
    acquisition yields the same video set whether it is ingested locally
    or remotely. Directories that happen to be named ``*.MP4`` are skipped.

    Raises whatever ``subprocess.check_output`` raises when the remote
    ``find`` exits non-zero or runs longer than 60 seconds.
    """
    if REMOTE_HOST:
        import shlex as _shlex

        # -print0 separates entries with NUL, so names containing spaces or
        # newlines come back intact.
        listing = subprocess.check_output(
            ["ssh", "-o", "BatchMode=yes", REMOTE_HOST,
             f"find {_shlex.quote(str(root))} -type f -iname '*.MP4' -print0"],
            text=True, timeout=60,
        )
        return [Path(entry) for entry in listing.split("\0") if entry]
    # Local counterpart of `-type f -iname '*.MP4'`. Unlike find's -type f,
    # is_file() also accepts symlinks that point at a file.
    return [
        p for p in root.rglob("*")
        if p.name.lower().endswith(".mp4") and p.is_file()
    ]
|
||||||
|
|
||||||
|
|
||||||
def scan(root: Path) -> dict:
|
def scan(root: Path) -> dict:
|
||||||
"""Return {(auv, gopro_tag): {serial, videos[]}}"""
|
"""Return {(auv, gopro_tag): {serial, videos[]}}"""
|
||||||
grouped: dict[tuple[str, str], dict] = {}
|
grouped: dict[tuple[str, str], dict] = {}
|
||||||
for mp4 in root.rglob("*.MP4"):
|
for mp4 in _list_mp4s(root):
|
||||||
m = FOLDER_RE.search(str(mp4.parent))
|
m = FOLDER_RE.search(str(mp4.parent))
|
||||||
if not m:
|
if not m:
|
||||||
continue
|
continue
|
||||||
@@ -124,12 +143,17 @@ def main():
|
|||||||
ap.add_argument("--name", required=True, help="Acquisition name")
|
ap.add_argument("--name", required=True, help="Acquisition name")
|
||||||
ap.add_argument("--gap-min", type=int, default=5, help="Max gap between videos in one segment")
|
ap.add_argument("--gap-min", type=int, default=5, help="Max gap between videos in one segment")
|
||||||
ap.add_argument("--dry-run", action="store_true")
|
ap.add_argument("--dry-run", action="store_true")
|
||||||
|
ap.add_argument("--remote-host", default=None,
|
||||||
|
help="SSH alias to read videos/exiftool from (stored paths get 'alias:' prefix)")
|
||||||
args = ap.parse_args()
|
args = ap.parse_args()
|
||||||
|
|
||||||
if not args.root.exists():
|
global REMOTE_HOST
|
||||||
|
REMOTE_HOST = args.remote_host
|
||||||
|
|
||||||
|
if not REMOTE_HOST and not args.root.exists():
|
||||||
raise SystemExit(f"root not found: {args.root}")
|
raise SystemExit(f"root not found: {args.root}")
|
||||||
|
|
||||||
print(f"Scanning {args.root}...")
|
print(f"Scanning {args.root}{' @ ' + REMOTE_HOST if REMOTE_HOST else ''}...")
|
||||||
grouped = scan(args.root)
|
grouped = scan(args.root)
|
||||||
if not grouped:
|
if not grouped:
|
||||||
print("No (auv, gopro) folders found — expected GPx_AUVyyy layout."); return
|
print("No (auv, gopro) folders found — expected GPx_AUVyyy layout."); return
|
||||||
|
|||||||
Reference in New Issue
Block a user