#!/usr/bin/env python3 """QA checks — each function returns {metric: value, pass_fail: str, details: str}.""" from __future__ import annotations import json from pathlib import Path def check_ingest(manifest_path: Path) -> dict: try: m = json.loads(manifest_path.read_text()) n_auv_video = len(m.get("auv_ids_video", [])) n_auv_bags = len(m.get("auv_ids_bags", [])) total_s = m.get("total_video_s", 0) segs = sum(len(v) for v in m.get("segments_per_auv", {}).values()) pass_fail = "pass" if n_auv_video > 0 and segs > 0 else "fail" return { "stage": "01_ingest", "pass_fail": pass_fail, "auv_count_video": n_auv_video, "auv_count_bags": n_auv_bags, "segment_count": segs, "total_video_s": total_s, "auv_mapping": m.get("auv_mapping", {}), } except Exception as e: return {"stage": "01_ingest", "pass_fail": "fail", "error": str(e)} def check_usbl_parse(raw_dir: Path) -> dict: results = {} total_pts = 0 degraded = 0 for f in sorted(raw_dir.glob("*_nav_raw.json")): try: d = json.loads(f.read_text()) pts = len(d.get("points", [])) status = d.get("metrics", {}).get("status", "?") total_pts += pts if status == "degraded": degraded += 1 results[d.get("auv_id", f.stem)] = {"points": pts, "status": status} except Exception as e: results[f.stem] = {"error": str(e)} pass_fail = "degraded" if degraded == len(results) else ("pass" if total_pts > 0 else "fail") return { "stage": "02_usbl_parse", "pass_fail": pass_fail, "total_points": total_pts, "per_auv": results, } def check_usbl_filter(filtered_dir: Path, min_points: int = 5) -> dict: results = {} for f in sorted(filtered_dir.glob("*_nav_filtered.json")): try: d = json.loads(f.read_text()) pts_after = len(d.get("points", [])) m = d.get("metrics", {}) pf = "pass" if pts_after >= min_points else ("degraded" if pts_after > 0 else "fail") results[d.get("auv_id", f.stem)] = { "before": m.get("points_before", 0), "after": pts_after, "removed_null": m.get("points_removed_null", 0), "removed_outlier": m.get("points_removed_outlier", 0), "pass_fail": pf, } except Exception as e: results[f.stem] = {"error": str(e)} overall = "pass" if all(v.get("pass_fail") == "fail" for v in results.values() if "error" not in v): overall = "fail" elif any(v.get("pass_fail") == "degraded" for v in results.values() if "error" not in v): overall = "degraded" return {"stage": "03_usbl_filter", "pass_fail": overall, "per_auv": results}