diff --git a/scripts/dispatcher.py b/scripts/dispatcher.py index 2540d5c..d9ced53 100644 --- a/scripts/dispatcher.py +++ b/scripts/dispatcher.py @@ -121,6 +121,24 @@ def count_frames(worker: dict, frames_dir: str) -> int: def scp_to_worker(local_path: str, worker: dict, remote_path: str): + """Copy a file to the worker. + + `local_path` may be either: + - a path on the dispatcher host (standard scp from here) + - "host:abs_path" — pulled by the worker directly from `host` + (avoids routing bytes through the dispatcher). + """ + if ":" in local_path and not local_path.startswith("/"): + src_host, src_path = local_path.split(":", 1) + # Pull from source host directly on the worker + pull_cmd = ( + f"scp -o BatchMode=yes {shlex.quote(src_host)}:{shlex.quote(src_path)} " + f"{shlex.quote(remote_path)}" + ) + rc, _, err = ssh(worker["ssh_alias"], pull_cmd, timeout=7200) + if rc != 0: + raise RuntimeError(f"remote scp ({src_host}→{worker['host']}) failed: {err[:200]}") + return r = subprocess.run( ["scp", "-o", "BatchMode=yes", local_path, f"{worker['ssh_alias']}:{remote_path}"], capture_output=True, timeout=1800, @@ -129,6 +147,12 @@ def scp_to_worker(local_path: str, worker: dict, remote_path: str): raise RuntimeError(f"scp failed: {r.stderr.decode()[:200]}") +def _path_basename(p: str) -> str: + if ":" in p and not p.startswith("/"): + return Path(p.split(":", 1)[1]).name + return Path(p).name + + def do_extract(job: sqlite3.Row, worker: dict) -> str: videos = json.loads(job["video_paths"]) frames_dir = f"{worker['frames_dir']}/job_{job['id']}" @@ -138,10 +162,10 @@ def do_extract(job: sqlite3.Row, worker: dict) -> str: vf = f"fps={FPS},scale={IMG_W}:{IMG_H}" pattern = f"{frames_dir}/frame_%06d.jpg" # Copy video to worker if it doesn't exist there - worker_src = f"{frames_dir}/src_{Path(v).name}" + worker_src = f"{frames_dir}/src_{_path_basename(v)}" rc_check = ssh(worker["ssh_alias"], f"test -f {shlex.quote(worker_src)}")[0] if rc_check != 0: - print(f" scp {Path(v).name} → {worker['host']}...") + print(f" scp {_path_basename(v)} → {worker['host']}...") scp_to_worker(v, worker, worker_src) cmd = ( f"ffmpeg -hide_banner -loglevel error -i {shlex.quote(worker_src)} " diff --git a/scripts/ingest.py b/scripts/ingest.py index fe696e5..558640b 100644 --- a/scripts/ingest.py +++ b/scripts/ingest.py @@ -27,13 +27,23 @@ from pathlib import Path DB_PATH = Path(os.environ.get("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db")) FOLDER_RE = re.compile(r"GP(?P\d+)_AUV(?P\d+)", re.I) +REMOTE_HOST: str | None = None # set via --remote-host + + +def _run_cmd(args: list[str], timeout: int = 10) -> str: + """Run a command locally or on REMOTE_HOST via SSH.""" + if REMOTE_HOST: + import shlex as _shlex + remote_cmd = " ".join(_shlex.quote(a) for a in args) + args = ["ssh", "-o", "BatchMode=yes", REMOTE_HOST, remote_cmd] + return subprocess.check_output(args, stderr=subprocess.DEVNULL, text=True, timeout=timeout).strip() + def exif_create_date(path: Path) -> datetime | None: try: - out = subprocess.check_output( + out = _run_cmd( ["exiftool", "-s3", "-CreateDate", "-api", "QuickTimeUTC=1", str(path)], - stderr=subprocess.DEVNULL, text=True, timeout=10, - ).strip() + ) if not out: return None # Strip timezone suffix (+HH:MM or -HH:MM) if present @@ -46,10 +56,7 @@ def exif_create_date(path: Path) -> datetime | None: def exif_duration_s(path: Path) -> float | None: try: - out = subprocess.check_output( - ["exiftool", "-s3", "-Duration#", str(path)], - stderr=subprocess.DEVNULL, text=True, timeout=10, - ).strip() + out = _run_cmd(["exiftool", "-s3", "-Duration#", str(path)]) return float(out) if out else None except Exception: return None @@ -57,10 +64,9 @@ def exif_duration_s(path: Path) -> float | None: def exif_serial(path: Path) -> str | None: try: - out = subprocess.check_output( + out = _run_cmd( ["exiftool", "-s3", "-SerialNumber", "-CameraSerialNumber", str(path)], - stderr=subprocess.DEVNULL, text=True, timeout=10, - ).strip().splitlines() + ).splitlines() for line in out: line = line.strip() if line: @@ -88,18 +94,31 @@ def group_segments(videos: list[dict], gap_min: int) -> list[dict]: for seg in segments: start = seg[0]["start"] end = seg[-1]["start"] + timedelta(seconds=seg[-1]["duration"] or 0) + prefix = f"{REMOTE_HOST}:" if REMOTE_HOST else "" out.append({ "start": start, "end": end, "label": f"{start.strftime('%H:%M')}–{end.strftime('%H:%M')}", - "videos": [str(v["path"]) for v in seg], + "videos": [prefix + str(v["path"]) for v in seg], }) return out +def _list_mp4s(root: Path) -> list[Path]: + if REMOTE_HOST: + import shlex as _shlex + out = subprocess.check_output( + ["ssh", "-o", "BatchMode=yes", REMOTE_HOST, + f"find {_shlex.quote(str(root))} -type f -iname '*.MP4'"], + text=True, timeout=60, + ) + return [Path(l.strip()) for l in out.splitlines() if l.strip()] + return list(root.rglob("*.MP4")) + + def scan(root: Path) -> dict: """Return {(auv, gopro_tag): {serial, videos[]}}""" grouped: dict[tuple[str, str], dict] = {} - for mp4 in root.rglob("*.MP4"): + for mp4 in _list_mp4s(root): m = FOLDER_RE.search(str(mp4.parent)) if not m: continue @@ -124,12 +143,17 @@ def main(): ap.add_argument("--name", required=True, help="Acquisition name") ap.add_argument("--gap-min", type=int, default=5, help="Max gap between videos in one segment") ap.add_argument("--dry-run", action="store_true") + ap.add_argument("--remote-host", default=None, + help="SSH alias to read videos/exiftool from (stored paths get 'alias:' prefix)") args = ap.parse_args() - if not args.root.exists(): + global REMOTE_HOST + REMOTE_HOST = args.remote_host + + if not REMOTE_HOST and not args.root.exists(): raise SystemExit(f"root not found: {args.root}") - print(f"Scanning {args.root}...") + print(f"Scanning {args.root}{' @ ' + REMOTE_HOST if REMOTE_HOST else ''}...") grouped = scan(args.root) if not grouped: print("No (auv, gopro) folders found — expected GPx_AUVyyy layout."); return