# viz/server.py import sys import json from pathlib import Path import numpy as np import h5py from flask import Flask, jsonify, send_from_directory app = Flask(__name__, static_folder="static", static_url_path="/nav/static") # Configured at startup _TRAJ_H5 = "data/trajectory_world.h5" _FIXES_H5 = "data/sparse_fixes.h5" _PLY_PATH = "" def _load_data() -> dict: out: dict = {} # USV GPS track (always try first — most reliable) try: with h5py.File(_FIXES_H5, "r") as f: if "usv_gps" in f: e = f["usv_gps/easting"][:] n = f["usv_gps/northing"][:] t = f["usv_gps/t_ns"][:].tolist() rtk = f["usv_gps/rtk_status"][:].tolist() # Center on centroid for Three.js ce, cn = float(e.mean()), float(n.mean()) out["origin"] = {"easting": ce, "northing": cn} out["usv_gps"] = { "x": (e - ce).tolist(), "y": (n - cn).tolist(), "z": [0.0] * len(e), "t": t, "rtk": rtk, } else: ce, cn = 0.0, 0.0 out["origin"] = {"easting": 0.0, "northing": 0.0} if "auv_mcap" in f: dep = f["auv_mcap/depth_m"][:] t_ns_auv = f["auv_mcap/t_ns"][:] alt = f["auv_mcap/altitude_m"][:] if "auv_mcap/altitude_m" in f \ else np.full(len(dep), np.nan) step = max(1, len(dep) // 600) t_s = ((t_ns_auv[::step].astype(float) - float(t_ns_auv[0])) / 1e9).tolist() out["auv_profile"] = { "t_s": t_s, "depth_m": dep[::step].tolist(), "altitude_m": alt[::step].tolist(), } if "usbl_fixes" in f and "usv_gps" in f: bearing = f["usbl_fixes/bearing_deg"][:] range_m = f["usbl_fixes/range_m"][:] usbl_dep = f["usbl_fixes/depth_m"][:] t_usbl = f["usbl_fixes/t_ns"][:] usv_e_raw = f["usv_gps/easting"][:] usv_n_raw = f["usv_gps/northing"][:] usv_t_raw = f["usv_gps/t_ns"][:] hdg_raw = f["usv_gps/heading_deg"][:] \ if "usv_gps/heading_deg" in f \ else np.full(len(usv_t_raw), np.nan) valid = ~(np.isnan(bearing) | np.isnan(range_m) | np.isnan(usbl_dep)) if valid.any(): t_f = t_usbl[valid].astype(float) t_usv = usv_t_raw.astype(float) usv_ei = np.interp(t_f, t_usv, usv_e_raw) usv_ni = np.interp(t_f, t_usv, usv_n_raw) hdg_i = np.interp(t_f, t_usv, hdg_raw) \ if not np.all(np.isnan(hdg_raw)) \ else np.zeros(valid.sum()) range_h = np.sqrt(np.maximum(0.0, range_m[valid]**2 - usbl_dep[valid]**2)) angle = np.deg2rad(hdg_i + bearing[valid]) auv_e = usv_ei + range_h * np.sin(angle) auv_n = usv_ni + range_h * np.cos(angle) ce = out.get("origin", {}).get("easting", 0.0) cn = out.get("origin", {}).get("northing", 0.0) out["auv_usbl"] = { "x": (auv_e - ce).tolist(), "y": (auv_n - cn).tolist(), "z": (-usbl_dep[valid]).tolist(), } except Exception as e: out["error_fixes"] = str(e) # Fused trajectory (lingbot-aligned) try: with h5py.File(_TRAJ_H5, "r") as f: status = f.attrs.get("status", "unknown") out["traj_status"] = status if status == "aligned" and "poses_world" in f: pw = f["poses_world"] ce = out.get("origin", {}).get("easting", 0.0) cn = out.get("origin", {}).get("northing", 0.0) out["fused"] = { "x": (pw["x_m"][:] - ce).tolist(), "y": (pw["y_m"][:] - cn).tolist(), "z": pw["z_m"][:].tolist(), "t": pw["t_ns"][:].tolist(), "T_4x4": pw["T_4x4"][:].tolist(), } elif "poses_world" in f: # local only pw = f["poses_world"] out["lingbot_local"] = { "x": pw["x_m"][:].tolist(), "y": pw["y_m"][:].tolist(), "z": pw["z_m"][:].tolist(), "t": pw["t_ns"][:].tolist(), "T_4x4": pw["T_4x4"][:].tolist() if "T_4x4" in pw else [], } except Exception as e: out["error_traj"] = str(e) # Decimated PLY if _PLY_PATH and Path(_PLY_PATH).exists(): try: import open3d as o3d pcd = o3d.io.read_point_cloud(_PLY_PATH) n_pts = len(pcd.points) MAX_PTS = 300_000 if n_pts > MAX_PTS: vol = float(np.prod(pcd.get_max_bound() - pcd.get_min_bound())) vox = max((vol / MAX_PTS) ** (1/3), 0.02) pcd = pcd.voxel_down_sample(vox) pts = np.asarray(pcd.points) ce = out.get("origin", {}).get("easting", 0.0) cn = out.get("origin", {}).get("northing", 0.0) out["ply"] = { "x": (pts[:,0] - ce).tolist(), "y": (pts[:,1] - cn).tolist(), "z": pts[:,2].tolist(), } except Exception as e: out["error_ply"] = str(e) return out @app.route("/api/trajectory") def api_trajectory(): return jsonify(_load_data()) @app.route("/trajectory") def viewer(): return send_from_directory("static", "trajectory.html") @app.route("/") def root(): return '' def _main(): global _TRAJ_H5, _FIXES_H5, _PLY_PATH import argparse p = argparse.ArgumentParser() p.add_argument("--trajectory", default="data/trajectory_world.h5") p.add_argument("--fixes", default="data/sparse_fixes.h5") p.add_argument("--ply", default="") p.add_argument("--port", type=int, default=5051) args = p.parse_args() _TRAJ_H5 = args.trajectory _FIXES_H5 = args.fixes _PLY_PATH = args.ply app.run(host="0.0.0.0", port=args.port, debug=False) # ── COSMA QC Platform additions ────────────────────────── import os from pathlib import Path _DATA_DIR = Path(os.environ.get("COSMA_DATA_DIR", "/data/cosma")) def _load_nav_data(job_id: int) -> dict: out = {"job_id": job_id} poses_path = _DATA_DIR / f"job_{job_id}_poses.npz" ply_path = _DATA_DIR / f"job_{job_id}_decimated.ply" if poses_path.exists(): d = np.load(str(poses_path)) poses = d["poses"] # (N, 3, 4) xyz = poses[:, :3, 3] out["track"] = { "x": xyz[:, 0].tolist(), "y": xyz[:, 1].tolist(), "z": xyz[:, 2].tolist(), } out["n_poses"] = int(len(poses)) out["ply_ready"] = ply_path.exists() out["ply_path"] = str(ply_path) if ply_path.exists() else None return out @app.route("/map") def map_view(): from flask import render_template return render_template("map.html") @app.route("/nav") def nav_view(): from flask import render_template return render_template("nav.html") @app.route("/api/jobs") def api_jobs(): jobs = [] for p in sorted(_DATA_DIR.glob("job_*_decimated.ply")): parts = p.stem.split("_") jid = int(parts[1]) jobs.append({ "id": jid, "ply": str(p), "poses": str(_DATA_DIR / f"job_{jid}_poses.npz"), }) return jsonify(jobs) @app.route("/api/job//nav") def api_job_nav(job_id: int): return jsonify(_load_nav_data(job_id)) @app.route("/api/job//ply") def api_job_ply(job_id: int): ply_path = _DATA_DIR / f"job_{job_id}_decimated.ply" if not ply_path.exists(): return jsonify({"error": "PLY non disponible"}), 404 try: import open3d as o3d pcd = o3d.io.read_point_cloud(str(ply_path)) pts = np.asarray(pcd.points) return jsonify({"x": pts[:, 0].tolist(), "y": pts[:, 1].tolist(), "z": pts[:, 2].tolist(), "n": int(len(pts))}) except ImportError: return jsonify({"error": "open3d not installed"}), 503 if __name__ == "__main__": _main()