feat: Three.js trajectory viewer — Flask :5051 + trajectoires + frustums + PLY décimé
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
142
viz/server.py
Normal file
142
viz/server.py
Normal file
@@ -0,0 +1,142 @@
|
||||
# viz/server.py
|
||||
import sys
|
||||
import json
|
||||
from pathlib import Path
|
||||
import numpy as np
|
||||
import h5py
|
||||
from flask import Flask, jsonify, send_from_directory
|
||||
|
||||
app = Flask(__name__, static_folder="static")
|
||||
|
||||
# Configured at startup
|
||||
_TRAJ_H5 = "data/trajectory_world.h5"
|
||||
_FIXES_H5 = "data/sparse_fixes.h5"
|
||||
_PLY_PATH = ""
|
||||
|
||||
def _load_data() -> dict:
|
||||
out: dict = {}
|
||||
|
||||
# USV GPS track (always try first — most reliable)
|
||||
try:
|
||||
with h5py.File(_FIXES_H5, "r") as f:
|
||||
if "usv_gps" in f:
|
||||
e = f["usv_gps/easting"][:]
|
||||
n = f["usv_gps/northing"][:]
|
||||
t = f["usv_gps/t_ns"][:].tolist()
|
||||
rtk = f["usv_gps/rtk_status"][:].tolist()
|
||||
# Center on centroid for Three.js
|
||||
ce, cn = float(e.mean()), float(n.mean())
|
||||
out["origin"] = {"easting": ce, "northing": cn}
|
||||
out["usv_gps"] = {
|
||||
"x": (e - ce).tolist(), "y": (n - cn).tolist(),
|
||||
"z": [0.0] * len(e), "t": t, "rtk": rtk,
|
||||
}
|
||||
else:
|
||||
ce, cn = 0.0, 0.0
|
||||
out["origin"] = {"easting": 0.0, "northing": 0.0}
|
||||
|
||||
if "auv_mcap" in f:
|
||||
dep = f["auv_mcap/depth_m"][:]
|
||||
t_auv = f["auv_mcap/t_ns"][:].tolist()
|
||||
# AUV lat/lon zeros — plot depth over time as Z only
|
||||
n_pts = len(dep)
|
||||
out["auv_depth"] = {
|
||||
"x": list(range(n_pts)), # index as proxy for time
|
||||
"y": [0.0] * n_pts,
|
||||
"z": (-dep).tolist(), # positive = deeper
|
||||
"t": t_auv,
|
||||
}
|
||||
|
||||
if "usbl_fixes" in f:
|
||||
north = f["usbl_fixes/north_m"][:]
|
||||
east = f["usbl_fixes/east_m"][:]
|
||||
depth = f["usbl_fixes/depth_m"][:]
|
||||
valid = ~(np.isnan(north) | (north == 0) & (east == 0))
|
||||
if valid.any():
|
||||
out["usbl"] = {
|
||||
"x": east[valid].tolist(),
|
||||
"y": north[valid].tolist(),
|
||||
"z": (-depth[valid]).tolist(),
|
||||
}
|
||||
except Exception as e:
|
||||
out["error_fixes"] = str(e)
|
||||
|
||||
# Fused trajectory (lingbot-aligned)
|
||||
try:
|
||||
with h5py.File(_TRAJ_H5, "r") as f:
|
||||
status = f.attrs.get("status", "unknown")
|
||||
out["traj_status"] = status
|
||||
if status == "aligned" and "poses_world" in f:
|
||||
pw = f["poses_world"]
|
||||
ce = out.get("origin", {}).get("easting", 0.0)
|
||||
cn = out.get("origin", {}).get("northing", 0.0)
|
||||
out["fused"] = {
|
||||
"x": (pw["x_m"][:] - ce).tolist(),
|
||||
"y": (pw["y_m"][:] - cn).tolist(),
|
||||
"z": pw["z_m"][:].tolist(),
|
||||
"t": pw["t_ns"][:].tolist(),
|
||||
"T_4x4": pw["T_4x4"][:].tolist(),
|
||||
}
|
||||
elif "poses_world" in f:
|
||||
# local only
|
||||
pw = f["poses_world"]
|
||||
out["lingbot_local"] = {
|
||||
"x": pw["x_m"][:].tolist(),
|
||||
"y": pw["y_m"][:].tolist(),
|
||||
"z": pw["z_m"][:].tolist(),
|
||||
"t": pw["t_ns"][:].tolist(),
|
||||
"T_4x4": pw["T_4x4"][:].tolist() if "T_4x4" in pw else [],
|
||||
}
|
||||
except Exception as e:
|
||||
out["error_traj"] = str(e)
|
||||
|
||||
# Decimated PLY
|
||||
if _PLY_PATH and Path(_PLY_PATH).exists():
|
||||
try:
|
||||
import open3d as o3d
|
||||
pcd = o3d.io.read_point_cloud(_PLY_PATH)
|
||||
n_pts = len(pcd.points)
|
||||
if n_pts > 200_000:
|
||||
vox = float((pcd.get_max_bound() - pcd.get_min_bound()).max()) * (200_000 / n_pts) ** (1/3)
|
||||
pcd = pcd.voxel_down_sample(float(vox))
|
||||
pts = np.asarray(pcd.points)
|
||||
ce = out.get("origin", {}).get("easting", 0.0)
|
||||
cn = out.get("origin", {}).get("northing", 0.0)
|
||||
out["ply"] = {
|
||||
"x": (pts[:,0] - ce).tolist(),
|
||||
"y": (pts[:,1] - cn).tolist(),
|
||||
"z": pts[:,2].tolist(),
|
||||
}
|
||||
except Exception as e:
|
||||
out["error_ply"] = str(e)
|
||||
|
||||
return out
|
||||
|
||||
@app.route("/api/trajectory")
|
||||
def api_trajectory():
|
||||
return jsonify(_load_data())
|
||||
|
||||
@app.route("/trajectory")
|
||||
def viewer():
|
||||
return send_from_directory("static", "trajectory.html")
|
||||
|
||||
@app.route("/")
|
||||
def root():
|
||||
return '<meta http-equiv="refresh" content="0;url=/trajectory">'
|
||||
|
||||
def _main():
|
||||
global _TRAJ_H5, _FIXES_H5, _PLY_PATH
|
||||
import argparse
|
||||
p = argparse.ArgumentParser()
|
||||
p.add_argument("--trajectory", default="data/trajectory_world.h5")
|
||||
p.add_argument("--fixes", default="data/sparse_fixes.h5")
|
||||
p.add_argument("--ply", default="")
|
||||
p.add_argument("--port", type=int, default=5051)
|
||||
args = p.parse_args()
|
||||
_TRAJ_H5 = args.trajectory
|
||||
_FIXES_H5 = args.fixes
|
||||
_PLY_PATH = args.ply
|
||||
app.run(host="0.0.0.0", port=args.port, debug=False)
|
||||
|
||||
if __name__ == "__main__":
|
||||
_main()
|
||||
Reference in New Issue
Block a user