feat: frame QC scoring + viser per-AUV button
Stage 04 frame extract:
- New lib_frame_qc.py: per-frame Laplacian/contrast/blue-dominance scoring
- Classes: bottom_visible / water_no_bottom / turbid_water / out_of_water
- Sample 1/5 frames after extraction, write qc.json per segment
- Record metrics (frames_total, frames_bottom_visible, bottom_visible_pct)
- Mark job degraded when bottom_visible_pct < 50%
Per-AUV viser view:
- scripts/viser_auv.py loads all PLYs of an AUV, color per file
- POST /pipeline/missions/{id}/auvs/{auv}/view rsyncs ply -> worker
- launches viser on hashed port 9300+, returns URL
- _pipeline.html exposes AUV list, JS handler opens viser tab
This commit is contained in:
85
app/main.py
85
app/main.py
@@ -320,12 +320,17 @@ async def partial_pipeline(request: Request):
|
||||
counts = {}
|
||||
for j in jobs:
|
||||
counts[j["status"]] = counts.get(j["status"], 0) + 1
|
||||
auvs: list[str] = []
|
||||
for j in jobs:
|
||||
if j["auv_id"] and j["auv_id"] not in auvs:
|
||||
auvs.append(j["auv_id"])
|
||||
data["missions"].append({
|
||||
"id": m["id"],
|
||||
"name": m["name"],
|
||||
"status": m["status"],
|
||||
"jobs": [dict(j) for j in jobs],
|
||||
"counts": counts,
|
||||
"auvs": auvs,
|
||||
})
|
||||
except Exception as e:
|
||||
data["error"] = str(e)[:200]
|
||||
@@ -571,6 +576,86 @@ async def live_job(job_id: int):
|
||||
return {"url": row["viser_url"]}
|
||||
|
||||
|
||||
VISER_AUV_BASE = 9300
|
||||
PIPELINE_DATA_BASE = Path(os.environ.get("COSMA_PIPELINE_DATA", "/cosma-pipeline/data"))
|
||||
|
||||
|
||||
@app.post("/pipeline/missions/{mission_id}/auvs/{auv_id}/view")
|
||||
async def view_auv(mission_id: int, auv_id: str):
|
||||
"""Launch viser showing all PLYs for one AUV from a mission."""
|
||||
if not PIPELINE_DB.exists():
|
||||
raise HTTPException(404, "state.db introuvable")
|
||||
import hashlib
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
|
||||
tmp_path = tmp.name
|
||||
shutil.copy2(str(PIPELINE_DB), tmp_path)
|
||||
try:
|
||||
with sqlite3.connect(tmp_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
m = conn.execute(
|
||||
"SELECT name FROM missions WHERE id=?", (mission_id,)
|
||||
).fetchone()
|
||||
finally:
|
||||
try:
|
||||
os.unlink(tmp_path)
|
||||
except Exception:
|
||||
pass
|
||||
if not m:
|
||||
raise HTTPException(404, "mission introuvable")
|
||||
mission_name = m["name"]
|
||||
ply_dir_local = PIPELINE_DATA_BASE / mission_name / "ply" / auv_id
|
||||
if not ply_dir_local.exists():
|
||||
return JSONResponse(
|
||||
{"ok": False, "error": f"PLY dir {ply_dir_local} pas encore (stitch pas done)"},
|
||||
status_code=409,
|
||||
)
|
||||
|
||||
h = int(hashlib.md5(f"{mission_id}-{auv_id}".encode()).hexdigest()[:6], 16)
|
||||
port = VISER_AUV_BASE + (h % 100)
|
||||
worker = WORKERS[1] if len(WORKERS) > 1 else WORKERS[0]
|
||||
alias = worker["ssh_alias"]
|
||||
host = worker["host"]
|
||||
worker_dir = f"/tmp/cosma-viser-auv/{mission_name}/{auv_id}"
|
||||
|
||||
rsync = await asyncio.create_subprocess_exec(
|
||||
"rsync", "-az", "--delete",
|
||||
"-e", "ssh -o BatchMode=yes -o StrictHostKeyChecking=no",
|
||||
f"{ply_dir_local}/", f"{alias}:{worker_dir}/",
|
||||
stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
_, err = await rsync.communicate()
|
||||
if rsync.returncode != 0:
|
||||
raise HTTPException(500, f"rsync failed: {err.decode()[:200]}")
|
||||
|
||||
local_script = Path(__file__).parent.parent / "scripts" / "viser_auv.py"
|
||||
scp = await asyncio.create_subprocess_exec(
|
||||
"scp", "-o", "BatchMode=yes", "-o", "StrictHostKeyChecking=no",
|
||||
str(local_script), f"{alias}:/tmp/viser_auv.py",
|
||||
stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
_, err = await scp.communicate()
|
||||
if scp.returncode != 0:
|
||||
raise HTTPException(500, f"scp failed: {err.decode()[:200]}")
|
||||
|
||||
venv_py = f"{worker.get('lingbot_path', '/root/ai-video/lingbot-map')}/.venv/bin/python"
|
||||
launch_cmd = (
|
||||
f"pkill -f 'viser_auv.py.*--port {port}' 2>/dev/null ; sleep 1 ; "
|
||||
f"setsid nohup {venv_py} /tmp/viser_auv.py --ply-dir {worker_dir} --port {port} "
|
||||
f"</dev/null >/tmp/viser_auv_{port}.log 2>&1 & disown ; sleep 0.3"
|
||||
)
|
||||
launch = await asyncio.create_subprocess_exec(
|
||||
"ssh", "-o", "BatchMode=yes", "-o", "StrictHostKeyChecking=no", alias, launch_cmd,
|
||||
stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
await launch.communicate()
|
||||
await asyncio.sleep(4)
|
||||
|
||||
return {"ok": True, "url": f"http://{host}:{port}/", "auv_id": auv_id, "port": port}
|
||||
|
||||
|
||||
@app.post("/stitches/{stitch_id}/view")
|
||||
async def view_stitch(stitch_id: int):
|
||||
with closing(db()) as conn:
|
||||
|
||||
@@ -220,3 +220,12 @@ code { background: rgba(255,255,255,0.05); padding: 0 0.25rem; border-radius: 3p
|
||||
.status-degraded, .pj-badge.status-degraded { color: var(--warn); background: rgba(245,197,24,0.1); }
|
||||
.status-error, .pj-badge.status-error { color: var(--err); background: rgba(255,92,122,0.1); }
|
||||
.status-ingested, .pm-status.status-ingested { color: var(--accent); background: rgba(95,208,255,0.12); }
|
||||
|
||||
/* AUV viser buttons (per-mission) */
|
||||
.pm-auvs { display: flex; gap: 0.4rem; flex-wrap: wrap; margin: 0.3rem 0 0.5rem; align-items: center; }
|
||||
.pm-auvs-label { color: var(--muted, #888); font-size: 0.72rem; text-transform: uppercase; letter-spacing: 0.05em; }
|
||||
.btn-viser-auv { font-size: 0.72rem; padding: 2px 8px; background: transparent;
|
||||
border: 1px solid var(--accent, #4af); color: var(--accent, #4af); border-radius: 3px;
|
||||
cursor: pointer; font-family: inherit; }
|
||||
.btn-viser-auv:hover { background: var(--accent, #4af); color: #062036; }
|
||||
.btn-viser-auv:disabled { opacity: 0.5; cursor: wait; }
|
||||
|
||||
@@ -16,6 +16,16 @@
|
||||
{% if m.counts.get('error') %}<span class="cnt err">{{ m.counts.error }} error</span>{% endif %}
|
||||
</span>
|
||||
</div>
|
||||
{% if m.auvs %}
|
||||
<div class="pm-auvs">
|
||||
<span class="pm-auvs-label">Viser AUV:</span>
|
||||
{% for auv_id in m.auvs %}
|
||||
<button class="btn-viser-auv"
|
||||
data-url="/pipeline/missions/{{ m.id }}/auvs/{{ auv_id }}/view"
|
||||
type="button">{{ auv_id }} ↗</button>
|
||||
{% endfor %}
|
||||
</div>
|
||||
{% endif %}
|
||||
<table class="pipeline-jobs-table">
|
||||
<thead>
|
||||
<tr><th>AUV</th><th>Segment</th><th>Stage</th><th>Status</th><th>Worker</th><th>Duree</th></tr>
|
||||
|
||||
@@ -104,6 +104,31 @@ document.addEventListener('click', async (e) => {
|
||||
btn.textContent = 'viser ↗';
|
||||
btn.disabled = false;
|
||||
});
|
||||
|
||||
document.addEventListener('click', async (e) => {
|
||||
const btn = e.target.closest('.btn-viser-auv');
|
||||
if (!btn) return;
|
||||
e.preventDefault();
|
||||
const url = btn.dataset.url;
|
||||
if (!url) return;
|
||||
const original = btn.textContent;
|
||||
btn.textContent = 'launch…';
|
||||
btn.disabled = true;
|
||||
try {
|
||||
const res = await fetch(url, { method: 'POST' });
|
||||
const d = await res.json().catch(() => ({}));
|
||||
if (res.ok && d.url) {
|
||||
window.open(d.url, '_blank');
|
||||
} else {
|
||||
alert(d.error || d.detail || ('HTTP ' + res.status));
|
||||
}
|
||||
} catch (err) {
|
||||
alert('Erreur réseau: ' + err);
|
||||
} finally {
|
||||
btn.textContent = original;
|
||||
btn.disabled = false;
|
||||
}
|
||||
});
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
|
||||
341
pipeline/stages/04_frame_extract.py
Normal file
341
pipeline/stages/04_frame_extract.py
Normal file
@@ -0,0 +1,341 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Stage 04 — Extract frames from GoPro videos.
|
||||
|
||||
For each MP4 in /mnt/ssd/<mission>/raw_data/medias/videos/GP*-AUV*/
|
||||
- Skip files < 2MB (placeholders)
|
||||
- Auto-trim hors-eau: sample frames at start/end, detect non-blue/green pixels
|
||||
- ffmpeg fps=1, scale=518:294, q:v=3
|
||||
- Output: ~/cosma-pipeline/data/<mission>/frames/<AUV>/<segment>/frame_XXXXX.jpg
|
||||
- Skip if output dir exists and has >= expected frames
|
||||
- Log to SQLite state.db
|
||||
|
||||
Usage:
|
||||
python3 04_frame_extract.py --mission 20260505-Lepradet
|
||||
python3 04_frame_extract.py --video /mnt/ssd/.../GP1-AUV210/GX019837.MP4
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from orchestrator.db import init_db, get_conn, upsert_job, record_metric, now_iso
|
||||
from lib_frame_qc import score_image_file, aggregate as qc_aggregate
|
||||
|
||||
QC_SAMPLE_RATE = int(os.environ.get("COSMA_QC_SAMPLE_RATE", "5"))
|
||||
QC_BOTTOM_OK_PCT = float(os.environ.get("COSMA_QC_BOTTOM_OK_PCT", "50"))
|
||||
|
||||
PIPELINE_BASE = Path(os.environ.get("COSMA_PIPELINE_BASE", "/home/cosma/cosma-pipeline"))
|
||||
SSD_BASE = Path(os.environ.get("COSMA_SSD_BASE", "/mnt/ssd"))
|
||||
MIN_VIDEO_SIZE_MB = 2.0
|
||||
|
||||
|
||||
def is_underwater_frame(frame_bgr: np.ndarray, threshold: float = 0.6) -> bool:
|
||||
"""Return True if frame looks like underwater footage (dominant blue/green).
|
||||
Hors-eau: R > G-5 AND R > B-5 (dry/air dominant).
|
||||
Underwater: blue or green channel dominant.
|
||||
"""
|
||||
b, g, r = cv2.split(frame_bgr.astype(np.float32))
|
||||
mean_r = float(np.mean(r))
|
||||
mean_g = float(np.mean(g))
|
||||
mean_b = float(np.mean(b))
|
||||
# Not underwater: red dominates
|
||||
if mean_r > mean_g - 5 and mean_r > mean_b - 5:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def detect_water_range(video_path: Path, sample_count: int = 10) -> tuple[float, float]:
|
||||
"""Detect start/end times of underwater portion by sampling frames.
|
||||
Returns (start_s, end_s). Returns (0, duration) if uncertain.
|
||||
"""
|
||||
cap = cv2.VideoCapture(str(video_path))
|
||||
if not cap.isOpened():
|
||||
return 0.0, -1.0
|
||||
|
||||
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
|
||||
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
||||
duration_s = total_frames / fps if fps > 0 else 0
|
||||
|
||||
# Sample frames: first 20% and last 20%
|
||||
probe_times_start = [duration_s * i / (sample_count * 5) for i in range(sample_count)]
|
||||
probe_times_end = [duration_s * (1 - i / (sample_count * 5)) for i in range(sample_count)]
|
||||
|
||||
# Find first underwater frame from start
|
||||
start_s = 0.0
|
||||
for t in probe_times_start:
|
||||
cap.set(cv2.CAP_PROP_POS_MSEC, t * 1000)
|
||||
ret, frame = cap.read()
|
||||
if ret and is_underwater_frame(frame):
|
||||
start_s = max(0.0, t - 2.0)
|
||||
break
|
||||
|
||||
# Find last underwater frame from end
|
||||
end_s = duration_s
|
||||
for t in sorted(probe_times_end, reverse=True):
|
||||
cap.set(cv2.CAP_PROP_POS_MSEC, t * 1000)
|
||||
ret, frame = cap.read()
|
||||
if ret and is_underwater_frame(frame):
|
||||
end_s = min(duration_s, t + 2.0)
|
||||
break
|
||||
|
||||
cap.release()
|
||||
return start_s, end_s
|
||||
|
||||
|
||||
def get_video_duration(video_path: Path) -> float:
|
||||
"""Get video duration in seconds via ffprobe."""
|
||||
cmd = [
|
||||
"ffprobe", "-v", "quiet", "-print_format", "json",
|
||||
"-show_streams", str(video_path)
|
||||
]
|
||||
try:
|
||||
r = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
data = json.loads(r.stdout)
|
||||
for stream in data.get("streams", []):
|
||||
dur = float(stream.get("duration", 0))
|
||||
if dur > 0:
|
||||
return dur
|
||||
except Exception:
|
||||
pass
|
||||
return 0.0
|
||||
|
||||
|
||||
def extract_frames(video_path: Path, out_dir: Path, fps: int = 1,
|
||||
scale: str = "518:294", quality: int = 3,
|
||||
start_s: float = 0.0, end_s: float = -1.0) -> dict:
|
||||
"""Run ffmpeg to extract frames. Returns metrics dict."""
|
||||
out_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Build ffmpeg args
|
||||
cmd = ["ffmpeg", "-y", "-loglevel", "error"]
|
||||
cmd += ["-ss", str(start_s), "-i", str(video_path)]
|
||||
if end_s > 0 and end_s > start_s:
|
||||
cmd += ["-t", str(end_s - start_s)]
|
||||
cmd += [
|
||||
"-vf", f"fps={fps},scale={scale}",
|
||||
"-q:v", str(quality),
|
||||
str(out_dir / "frame_%05d.jpg"),
|
||||
]
|
||||
|
||||
t0 = time.time()
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=3600)
|
||||
elapsed = time.time() - t0
|
||||
|
||||
frames = sorted(out_dir.glob("frame_*.jpg"))
|
||||
n_frames = len(frames)
|
||||
|
||||
metrics = {
|
||||
"video": str(video_path),
|
||||
"out_dir": str(out_dir),
|
||||
"n_frames": n_frames,
|
||||
"elapsed_s": round(elapsed, 1),
|
||||
"returncode": result.returncode,
|
||||
"start_s": start_s,
|
||||
"end_s": end_s,
|
||||
}
|
||||
if result.returncode != 0:
|
||||
metrics["error"] = result.stderr[-500:]
|
||||
print(f" [04] ffmpeg error for {video_path.name}: {result.stderr[-200:]}")
|
||||
else:
|
||||
print(f" [04] {video_path.name}: {n_frames} frames in {elapsed:.1f}s")
|
||||
# Score a subsample for QC
|
||||
qc = qc_segment(out_dir, sample_rate=QC_SAMPLE_RATE)
|
||||
if qc:
|
||||
metrics.update(qc)
|
||||
|
||||
return metrics
|
||||
|
||||
|
||||
def qc_segment(frames_dir: Path, sample_rate: int = 5) -> dict | None:
|
||||
"""Sample 1/sample_rate frames, score each, write qc.json, return aggregate."""
|
||||
frames = sorted(frames_dir.glob("frame_*.jpg"))
|
||||
if not frames:
|
||||
return None
|
||||
sampled = frames[::max(1, sample_rate)]
|
||||
per_frame = []
|
||||
for f in sampled:
|
||||
s = score_image_file(f)
|
||||
if s is not None:
|
||||
per_frame.append(s)
|
||||
if not per_frame:
|
||||
return None
|
||||
agg = qc_aggregate(per_frame)
|
||||
qc_payload = {
|
||||
"frames_in_dir": len(frames),
|
||||
"frames_sampled": len(per_frame),
|
||||
"sample_rate": sample_rate,
|
||||
**agg,
|
||||
"per_frame": per_frame,
|
||||
}
|
||||
try:
|
||||
(frames_dir / "qc.json").write_text(json.dumps(qc_payload, indent=2))
|
||||
except Exception as e:
|
||||
print(f" [04] qc.json write failed: {e}")
|
||||
print(
|
||||
f" [04] QC: bottom_visible={agg['bottom_visible_pct']}% "
|
||||
f"(b={agg['frames_bottom_visible']} ooo={agg['frames_out_of_water']} "
|
||||
f"turb={agg['frames_turbid']} nob={agg['frames_water_no_bottom']})"
|
||||
)
|
||||
return agg
|
||||
|
||||
|
||||
def process_video(video_path: Path, auv_id: str, mission_name: str) -> dict:
|
||||
"""Process one video file. Returns metrics."""
|
||||
size_mb = video_path.stat().st_size / (1024 * 1024)
|
||||
if size_mb < MIN_VIDEO_SIZE_MB:
|
||||
print(f" [04] Skip {video_path.name} ({size_mb:.1f}MB < {MIN_VIDEO_SIZE_MB}MB)")
|
||||
return {"video": str(video_path), "skipped": True, "reason": "placeholder"}
|
||||
|
||||
segment = video_path.stem
|
||||
out_dir = PIPELINE_BASE / "data" / mission_name / "frames" / auv_id / segment
|
||||
|
||||
# Check if already done
|
||||
if out_dir.exists():
|
||||
existing = list(out_dir.glob("frame_*.jpg"))
|
||||
duration_s = get_video_duration(video_path)
|
||||
expected = max(1, int(duration_s) - 10) if duration_s > 0 else 1
|
||||
if len(existing) >= expected:
|
||||
print(f" [04] {video_path.name}: already done ({len(existing)} frames), skip")
|
||||
cached_m: dict = {"video": str(video_path), "n_frames": len(existing), "cached": True,
|
||||
"out_dir": str(out_dir)}
|
||||
# Re-run QC if qc.json is missing (idempotent enrichment)
|
||||
if not (out_dir / "qc.json").exists():
|
||||
qc = qc_segment(out_dir, sample_rate=QC_SAMPLE_RATE)
|
||||
if qc:
|
||||
cached_m.update(qc)
|
||||
else:
|
||||
try:
|
||||
cached_qc = json.loads((out_dir / "qc.json").read_text())
|
||||
for k in (
|
||||
"frames_total", "frames_bottom_visible", "frames_out_of_water",
|
||||
"frames_turbid", "frames_water_no_bottom", "bottom_visible_pct",
|
||||
):
|
||||
if k in cached_qc:
|
||||
cached_m[k] = cached_qc[k]
|
||||
except Exception:
|
||||
pass
|
||||
return cached_m
|
||||
|
||||
print(f" [04] {video_path.name} ({size_mb:.0f}MB): detecting water range...")
|
||||
start_s, end_s = detect_water_range(video_path)
|
||||
print(f" [04] water range: {start_s:.1f}s → {end_s:.1f}s")
|
||||
|
||||
return extract_frames(video_path, out_dir, start_s=start_s, end_s=end_s)
|
||||
|
||||
|
||||
def find_auv_videos(mission_path: Path) -> dict[str, list[Path]]:
|
||||
"""Find all MP4 files per AUV in medias/videos/GP*-AUV*/."""
|
||||
videos_root = mission_path / "raw_data/medias/videos"
|
||||
result: dict[str, list[Path]] = {}
|
||||
|
||||
for gopro_dir in sorted(videos_root.glob("GP*-AUV*")):
|
||||
# Extract AUV ID from dir name: GP1-AUV210 -> AUV210
|
||||
parts = gopro_dir.name.split("-")
|
||||
if len(parts) >= 2:
|
||||
auv_id = parts[1]
|
||||
mp4_files = [f for f in sorted(gopro_dir.glob("GX*.MP4"))
|
||||
if f.stat().st_size / (1024 * 1024) >= MIN_VIDEO_SIZE_MB]
|
||||
if mp4_files:
|
||||
if auv_id not in result:
|
||||
result[auv_id] = []
|
||||
result[auv_id].extend(mp4_files)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def process_mission(mission_name: str) -> list[dict]:
|
||||
mission_path = SSD_BASE / mission_name
|
||||
auv_videos = find_auv_videos(mission_path)
|
||||
print(f"[04] Found AUVs: {list(auv_videos.keys())}")
|
||||
|
||||
all_metrics = []
|
||||
init_db()
|
||||
|
||||
for auv_id, videos in sorted(auv_videos.items()):
|
||||
print(f"[04] === {auv_id}: {len(videos)} videos ===")
|
||||
for video_path in videos:
|
||||
t0 = time.time()
|
||||
m = process_video(video_path, auv_id, mission_name)
|
||||
m["auv_id"] = auv_id
|
||||
all_metrics.append(m)
|
||||
|
||||
if not m.get("skipped"):
|
||||
with get_conn() as conn:
|
||||
mission_row = conn.execute(
|
||||
"SELECT id FROM missions WHERE name=?", (mission_name,)
|
||||
).fetchone()
|
||||
if mission_row:
|
||||
bottom_pct = m.get("bottom_visible_pct")
|
||||
if m.get("returncode", 0) != 0:
|
||||
job_status = "error"
|
||||
elif bottom_pct is not None and bottom_pct < QC_BOTTOM_OK_PCT:
|
||||
job_status = "degraded"
|
||||
else:
|
||||
job_status = "done"
|
||||
job_id = upsert_job(
|
||||
conn, mission_row["id"], auv_id,
|
||||
video_path.stem, "04_frame_extract",
|
||||
status=job_status,
|
||||
output_path=m.get("out_dir", ""),
|
||||
error_msg=(
|
||||
f"bottom_visible_pct={bottom_pct}% <{QC_BOTTOM_OK_PCT}%"
|
||||
if job_status == "degraded" else None
|
||||
),
|
||||
)
|
||||
if not m.get("cached"):
|
||||
record_metric(conn, job_id, "frames_extracted",
|
||||
value=m.get("n_frames", 0),
|
||||
pass_fail="pass" if m.get("n_frames", 0) > 0 else "fail")
|
||||
record_metric(conn, job_id, "extract_time_s",
|
||||
value=m.get("elapsed_s", 0))
|
||||
# Always record QC metrics (so cached frames also get scored history)
|
||||
for k in (
|
||||
"frames_total", "frames_bottom_visible", "frames_out_of_water",
|
||||
"frames_turbid", "frames_water_no_bottom",
|
||||
):
|
||||
if k in m:
|
||||
record_metric(conn, job_id, k, value=float(m[k]))
|
||||
if bottom_pct is not None:
|
||||
record_metric(
|
||||
conn, job_id, "bottom_visible_pct",
|
||||
value=float(bottom_pct),
|
||||
pass_fail="pass" if bottom_pct >= QC_BOTTOM_OK_PCT else "degraded",
|
||||
)
|
||||
|
||||
return all_metrics
|
||||
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser(description="Stage 04 — Extract frames from GoPro videos")
|
||||
ap.add_argument("--mission", type=str, help="Mission name (e.g. 20260505-Lepradet)")
|
||||
ap.add_argument("--video", type=Path, help="Single video path")
|
||||
ap.add_argument("--auv", type=str, default="UNKNOWN", help="AUV ID for single video mode")
|
||||
args = ap.parse_args()
|
||||
|
||||
if args.video:
|
||||
mission_name = args.mission or "unknown"
|
||||
m = process_video(args.video, args.auv, mission_name)
|
||||
print(f"\nResult: {m}")
|
||||
elif args.mission:
|
||||
metrics = process_mission(args.mission)
|
||||
print("\n=== Stage 04 summary ===")
|
||||
total_frames = sum(m.get("n_frames", 0) for m in metrics if not m.get("skipped"))
|
||||
skipped = sum(1 for m in metrics if m.get("skipped"))
|
||||
print(f"Total frames: {total_frames}, skipped: {skipped}")
|
||||
else:
|
||||
ap.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
83
pipeline/stages/lib_frame_qc.py
Normal file
83
pipeline/stages/lib_frame_qc.py
Normal file
@@ -0,0 +1,83 @@
|
||||
"""Frame quality scoring for underwater footage.
|
||||
|
||||
For each frame, compute:
|
||||
- laplacian_var: focus/sharpness (cv2.Laplacian variance)
|
||||
- contrast: stddev of grayscale
|
||||
- blue_dominance: mean(B - R), positive = water dominant
|
||||
- mean_r/g/b: per-channel means
|
||||
|
||||
Classification (priority order):
|
||||
- mean_r > mean_g + 5 AND mean_r > mean_b + 5 → 'out_of_water'
|
||||
- laplacian_var < 50 AND contrast < 25 → 'turbid_water'
|
||||
- laplacian_var >= 80 AND contrast >= 35
|
||||
AND blue_dominance > -10 → 'bottom_visible'
|
||||
- else → 'water_no_bottom'
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import Counter
|
||||
from typing import Iterable
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
|
||||
def score_frame(frame_bgr: np.ndarray) -> dict:
|
||||
"""Return per-frame QC metrics + class label."""
|
||||
gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
|
||||
lap_var = float(cv2.Laplacian(gray, cv2.CV_64F).var())
|
||||
contrast = float(gray.std())
|
||||
b, g, r = cv2.split(frame_bgr)
|
||||
mean_r = float(r.mean())
|
||||
mean_g = float(g.mean())
|
||||
mean_b = float(b.mean())
|
||||
blue_dom = float(mean_b - mean_r)
|
||||
|
||||
if mean_r > mean_g + 5 and mean_r > mean_b + 5:
|
||||
klass = "out_of_water"
|
||||
elif lap_var < 50 and contrast < 25:
|
||||
klass = "turbid_water"
|
||||
elif lap_var >= 80 and contrast >= 35 and blue_dom > -10:
|
||||
klass = "bottom_visible"
|
||||
else:
|
||||
klass = "water_no_bottom"
|
||||
|
||||
return {
|
||||
"laplacian_var": round(lap_var, 2),
|
||||
"contrast": round(contrast, 2),
|
||||
"blue_dominance": round(blue_dom, 2),
|
||||
"mean_r": round(mean_r, 1),
|
||||
"mean_g": round(mean_g, 1),
|
||||
"mean_b": round(mean_b, 1),
|
||||
"class": klass,
|
||||
"score_ok": klass == "bottom_visible",
|
||||
}
|
||||
|
||||
|
||||
def score_image_file(path) -> dict | None:
|
||||
"""Load image with OpenCV and score it. Returns None on failure."""
|
||||
img = cv2.imread(str(path))
|
||||
if img is None:
|
||||
return None
|
||||
res = score_frame(img)
|
||||
res["file"] = str(path)
|
||||
return res
|
||||
|
||||
|
||||
def aggregate(scores: Iterable[dict]) -> dict:
|
||||
"""Aggregate a sequence of score_frame() dicts."""
|
||||
scores = list(scores)
|
||||
total = len(scores)
|
||||
counts = Counter(s["class"] for s in scores)
|
||||
bottom = counts.get("bottom_visible", 0)
|
||||
return {
|
||||
"frames_total": total,
|
||||
"frames_bottom_visible": bottom,
|
||||
"frames_out_of_water": counts.get("out_of_water", 0),
|
||||
"frames_turbid": counts.get("turbid_water", 0),
|
||||
"frames_water_no_bottom": counts.get("water_no_bottom", 0),
|
||||
"bottom_visible_pct": round(100.0 * bottom / total, 1) if total else 0.0,
|
||||
}
|
||||
|
||||
|
||||
CLASS_ORDER = ("bottom_visible", "water_no_bottom", "turbid_water", "out_of_water")
|
||||
72
scripts/viser_auv.py
Normal file
72
scripts/viser_auv.py
Normal file
@@ -0,0 +1,72 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Open viser viewer with all PLYs from one AUV.
|
||||
|
||||
Usage:
|
||||
viser_auv.py --ply-dir /path/to/auv/ply --port 9210
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
def main() -> None:
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--ply-dir", required=True)
|
||||
ap.add_argument("--port", type=int, default=9210)
|
||||
ap.add_argument("--point-size", type=float, default=0.01)
|
||||
ap.add_argument("--max-points-per-ply", type=int, default=1_500_000)
|
||||
args = ap.parse_args()
|
||||
|
||||
try:
|
||||
import open3d as o3d
|
||||
import viser
|
||||
except ImportError as e:
|
||||
sys.exit(f"missing dep: {e}")
|
||||
|
||||
ply_dir = Path(args.ply_dir)
|
||||
plys = sorted(ply_dir.glob("**/*.ply"))
|
||||
print(f"Found {len(plys)} PLY files in {ply_dir}", flush=True)
|
||||
if not plys:
|
||||
sys.exit("no PLY found")
|
||||
|
||||
server = viser.ViserServer(host="0.0.0.0", port=args.port)
|
||||
palette = [
|
||||
(1.0, 0.30, 0.30), (0.30, 1.0, 0.30), (0.30, 0.55, 1.0),
|
||||
(1.0, 0.85, 0.20), (1.0, 0.30, 1.0), (0.30, 1.0, 1.0),
|
||||
(1.0, 0.55, 0.20), (0.55, 0.30, 1.0),
|
||||
]
|
||||
|
||||
for i, p in enumerate(plys):
|
||||
pcd = o3d.io.read_point_cloud(str(p))
|
||||
pts = np.asarray(pcd.points, dtype=np.float32)
|
||||
if len(pts) == 0:
|
||||
print(f" ! {p.name}: empty", flush=True)
|
||||
continue
|
||||
if pcd.has_colors():
|
||||
cols = np.asarray(pcd.colors, dtype=np.float32)
|
||||
else:
|
||||
cols = np.tile(palette[i % len(palette)], (len(pts), 1)).astype(np.float32)
|
||||
if len(pts) > args.max_points_per_ply:
|
||||
idx = np.random.choice(len(pts), args.max_points_per_ply, replace=False)
|
||||
pts = pts[idx]
|
||||
cols = cols[idx]
|
||||
# viser wants uint8 colors
|
||||
cols_u8 = (cols * 255).clip(0, 255).astype(np.uint8)
|
||||
name = f"/{p.parent.name}_{p.stem}"
|
||||
server.scene.add_point_cloud(
|
||||
name=name, points=pts, colors=cols_u8, point_size=args.point_size
|
||||
)
|
||||
print(f" + {p.name}: {len(pts):,} pts", flush=True)
|
||||
|
||||
print(f"Viser ready on port {args.port}", flush=True)
|
||||
while True:
|
||||
time.sleep(60)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user