Serveur FastAPI reçoit le flux JSONL (sim ou ROV réel) sur /ws/ingest, SLAM incrémental, rediffuse carte+poses sur /ws/live, GUI live et export PLY. Déployé Docker sur caddy-net, exposé /moulin-live/. Client PC stream_client.py. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
229 lines
7.4 KiB
Python
229 lines
7.4 KiB
Python
"""
app.py — moulin-mapper FastAPI server.

Run with: uvicorn app:app --host 0.0.0.0 --port 8211
Deployed behind the /moulin-live/ prefix through a Caddy reverse proxy.
"""
|
|
|
|
import asyncio
import json
import math
import os
import secrets
import time
from typing import Set

import numpy as np
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Form, Request
from fastapi.responses import HTMLResponse, PlainTextResponse, JSONResponse
from fastapi.staticfiles import StaticFiles

from slam_incremental import IncrementalSLAM
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Config
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Shared secret checked by /ws/ingest and /session/reset. Taken from the
# MOULIN_TOKEN environment variable, with a built-in fallback value.
TOKEN = os.getenv("MOULIN_TOKEN", "moulin-2026")
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# État global de session
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Module-level SLAM engine used by all the handlers below.
slam = IncrementalSLAM()

# Every raw record received on /ws/ingest, in arrival order.
session_records: list = []

# Wall-clock time captured when this module is loaded.
session_start_time: float = time.time()

# Arrival timestamps of recent records, used to compute the throughput figure.
debit_window: list = []

# WebSocket connections currently subscribed to /ws/live.
live_clients: Set[WebSocket] = set()
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# App FastAPI (root_path vide — le préfixe est géré par Caddy côté client)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# No root_path is configured here: the public path prefix is handled by the
# Caddy reverse proxy on the client-facing side.
app = FastAPI(
    title="moulin-mapper live",
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Broadcast aux clients /ws/live
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def broadcast(payload: dict) -> None:
    """Send ``payload`` as JSON text to every client in ``live_clients``.

    Returns immediately when no client is connected. Any client whose send
    raises is removed from ``live_clients`` after all clients were attempted.
    """
    if not live_clients:
        return

    encoded = json.dumps(payload)
    failed = set()
    # Walk a snapshot: the shared set can change while a send is awaited.
    for client in tuple(live_clients):
        try:
            await client.send_text(encoded)
        except Exception:
            failed.add(client)

    live_clients.difference_update(failed)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# WS /ws/ingest — reçoit le flux du client ROV/sim
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.websocket("/ws/ingest")
async def ws_ingest(websocket: WebSocket, token: str = ""):
    """Receive the JSON record stream from the ROV/simulator client.

    The ``token`` query parameter must match ``TOKEN``; otherwise the socket
    is closed with code 4001 before being accepted. Each text message is
    parsed as JSON and may hold a single record or a list of records.
    Messages that are not valid JSON are ignored. Records are stored in
    ``session_records``, timestamped in ``debit_window`` and fed to the SLAM
    engine. When at least one record of a message completes a sweep, the
    sweep delta (plus the current throughput under ``"debit"``) is broadcast
    to the live clients.
    """
    # Constant-time comparison so the check does not leak how many leading
    # characters matched. Bytes are compared because compare_digest rejects
    # non-ASCII str arguments.
    if not secrets.compare_digest(token.encode("utf-8"), TOKEN.encode("utf-8")):
        await websocket.close(code=4001)
        return

    await websocket.accept()
    try:
        while True:
            raw = await websocket.receive_text()
            try:
                data = json.loads(raw)
            except json.JSONDecodeError:
                continue

            # Accept either a single record or a list of records.
            records = data if isinstance(data, list) else [data]

            sweep_completed = False
            for rec in records:
                # The PLY export reads records with ``.get``; anything that
                # is not a JSON object is dropped instead of being stored.
                if not isinstance(rec, dict):
                    continue
                session_records.append(rec)
                debit_window.append(time.time())
                if slam.add_record(rec):
                    sweep_completed = True

            # Trim the 5-second throughput window once per message: count the
            # stale leading timestamps and remove them in a single slice.
            cutoff = time.time() - 5.0
            stale = 0
            for stamp in debit_window:
                if stamp >= cutoff:
                    break
                stale += 1
            if stale:
                del debit_window[:stale]

            if sweep_completed:
                delta = slam.get_sweep_delta()
                delta["debit"] = len(debit_window) / 5.0
                await broadcast(delta)

    except WebSocketDisconnect:
        pass
    except Exception as e:
        print(f"[ingest] erreur: {e}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# WS /ws/live — navigateur s'abonne
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.websocket("/ws/live")
async def ws_live(websocket: WebSocket):
    """Subscribe a browser to live map/pose updates.

    The socket is accepted and registered in ``live_clients``, then the
    current SLAM state snapshot is sent with the throughput under
    ``"debit"``. The connection is then held open: incoming messages are read
    and discarded, and a ``{"type": "ping"}`` message is sent whenever
    nothing was received for 30 seconds. The socket is always unregistered on
    exit.
    """
    await websocket.accept()
    live_clients.add(websocket)

    try:
        # Send the full current state as soon as the client connects.
        snapshot = slam.get_state_snapshot()
        # debit_window is only trimmed while records are being ingested, so
        # count just the timestamps inside the last 5 seconds here. Otherwise
        # an idle stream would keep reporting its last rate.
        cutoff = time.time() - 5.0
        recent = sum(1 for stamp in debit_window if stamp >= cutoff)
        snapshot["debit"] = recent / 5.0
        await websocket.send_text(json.dumps(snapshot))

        # Keep the connection open until the client disconnects.
        while True:
            try:
                await asyncio.wait_for(websocket.receive_text(), timeout=30.0)
            except asyncio.TimeoutError:
                # Nothing received within the timeout: send a keepalive.
                await websocket.send_text(json.dumps({"type": "ping"}))
    except WebSocketDisconnect:
        pass
    except Exception as e:
        # Report unexpected failures the same way the ingest handler does
        # instead of dropping them silently.
        print(f"[live] erreur: {e}")
    finally:
        live_clients.discard(websocket)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /session/reset
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.post("/session/reset")
async def session_reset(token: str = Form(...)):
    """Clear the current session and notify the live clients.

    Args:
        token: Form field that must match ``TOKEN``.

    Returns:
        ``{"status": "ok"}`` once the SLAM engine, the stored records and the
        throughput window have been cleared and a ``{"type": "reset"}``
        message has been broadcast.

    Raises:
        HTTPException: 403 when the token does not match.
    """
    # Constant-time comparison on bytes (compare_digest rejects non-ASCII str).
    if not secrets.compare_digest(token.encode("utf-8"), TOKEN.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Token invalide")

    slam.reset()
    session_records.clear()
    debit_window.clear()
    # NOTE(review): session_start_time is not refreshed by a reset — confirm
    # whether it is meant to track the process start or the session start.
    await broadcast({"type": "reset"})
    return {"status": "ok"}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /healthz
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/healthz")
async def healthz():
    """Return a JSON status report with SLAM counters and live client count."""
    report = {"status": "ok"}
    report["records"] = slam.records_count
    report["sweeps"] = slam.sweeps_done
    report["map_points"] = len(slam.imap)
    report["loop_closures"] = slam.loop_closures
    report["live_clients"] = len(live_clients)
    return report
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /cloud.ply — export nuage 3D (contour extrudé altitude/depth)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _last_known_value(records, key, default):
    """Return the most recent finite numeric ``key`` found in ``records``.

    Records are scanned from newest to oldest. Entries that are not dicts, or
    whose value is missing, non-numeric or not finite, are skipped.
    ``default`` is returned when no usable value is found.
    """
    for rec in reversed(records):
        if not isinstance(rec, dict):
            continue
        value = rec.get(key)
        # bool is a subclass of int; a JSON true/false is not a measurement.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            continue
        try:
            number = float(value)
        except OverflowError:
            # An integer too large to be represented as a float.
            continue
        if math.isfinite(number):
            return number
    return default


@app.get("/cloud.ply")
async def cloud_ply():
    """Export the current map as an ASCII PLY point cloud download.

    Each 2D map point is emitted twice, at ``z = -depth + altitude`` and at
    ``z = -depth``. ``depth`` and ``altitude`` are the most recent usable
    values found in the received records (defaults 1.5 and 0.5).

    Raises:
        HTTPException: 204 when the map holds no point.
    """
    map_arr = slam.imap.get_array()
    if len(map_arr) == 0:
        raise HTTPException(status_code=204, detail="Carte vide")

    # Use the last known values: the newest record does not necessarily carry
    # both fields, so walk back through the history for each of them.
    depth = _last_known_value(session_records, "depth", 1.5)
    altitude = _last_known_value(session_records, "altitude", 0.5)

    # Two ends of the extruded vertical segment.
    # NOTE(review): earlier comments labelled these as seabed and water
    # surface, which does not match the arithmetic — confirm the intended
    # vertical convention.
    z_top = -depth + altitude
    z_bot = -depth

    lines = [
        "ply",
        "format ascii 1.0",
        f"element vertex {2 * len(map_arr)}",
        "property float x",
        "property float y",
        "property float z",
        "end_header",
    ]
    for pt in map_arr:
        x, y = pt[0], pt[1]
        lines.append(f"{x:.4f} {y:.4f} {z_top:.4f}")
        lines.append(f"{x:.4f} {y:.4f} {z_bot:.4f}")

    content = "\n".join(lines) + "\n"
    from fastapi.responses import Response

    return Response(
        content=content,
        media_type="application/octet-stream",
        headers={"Content-Disposition": "attachment; filename=moulin_cloud.ply"},
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fichiers statiques + page principale
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def index():
    """Return the contents of static/index.html located next to this module."""
    module_dir = os.path.dirname(__file__)
    page_path = os.path.join(module_dir, "static", "index.html")
    with open(page_path, "r", encoding="utf-8") as page:
        html = page.read()
    return html
|
|
|
|
# Serve the static/ directory that sits next to this module under /static.
app.mount(
    "/static",
    StaticFiles(directory=os.path.join(os.path.dirname(__file__), "static")),
    name="static",
)
|