Files

222 lines
7.2 KiB
Python

import asyncio
import csv
import gzip
import json
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import AsyncGenerator
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from .config import OUTPUT_DIR
from .runner import run_pipeline, scan_sorties, scan_sorties_local
app = FastAPI(title="COSMA Pipeline Runner")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# Active pipeline jobs: sortie_id → asyncio.Queue
_jobs: dict[str, asyncio.Queue] = {}
# Cache sorties avec TTL 10min
_sorties_cache: list | None = None
_sorties_cache_ts: float = 0.0
_SORTIES_TTL = 600.0 # 10 minutes
_sorties_refresh_lock: asyncio.Lock | None = None
def _get_lock() -> asyncio.Lock:
global _sorties_refresh_lock
if _sorties_refresh_lock is None:
_sorties_refresh_lock = asyncio.Lock()
return _sorties_refresh_lock
async def _refresh_sorties_cache() -> None:
"""Refresh cache in background (holds lock to avoid parallel rclone calls)."""
global _sorties_cache, _sorties_cache_ts
lock = _get_lock()
async with lock:
# Double-check after acquiring lock
if time.monotonic() - _sorties_cache_ts < _SORTIES_TTL:
return
result = await asyncio.to_thread(scan_sorties)
_sorties_cache = result
_sorties_cache_ts = time.monotonic()
@app.get("/sorties")
async def list_sorties():
global _sorties_cache, _sorties_cache_ts
now = time.monotonic()
if _sorties_cache is None:
# Premier appel: bloquant (cache vide)
await _refresh_sorties_cache()
elif now - _sorties_cache_ts >= _SORTIES_TTL:
# Cache périmé: retourne le cache, refresh en arrière-plan
asyncio.create_task(_refresh_sorties_cache())
return _sorties_cache or []
@app.get("/sorties/local")
async def list_sorties_local():
"""Scan /data/sorties local (NAS, instantané) sans rclone."""
sorties = await asyncio.to_thread(scan_sorties_local)
return sorties
@app.post("/run/{sortie_id:path}")
async def run_sortie(sortie_id: str):
if sortie_id in _jobs:
return {"status": "already_running"}
queue: asyncio.Queue = asyncio.Queue()
_jobs[sortie_id] = queue
asyncio.create_task(_run_and_cleanup(sortie_id, queue))
return {"status": "started"}
async def _run_and_cleanup(sortie_id: str, queue: asyncio.Queue):
try:
await run_pipeline(sortie_id, queue)
finally:
await asyncio.sleep(30)
_jobs.pop(sortie_id, None)
@app.get("/events/{sortie_id:path}")
async def sse_events(sortie_id: str):
if sortie_id not in _jobs:
raise HTTPException(404, "No active job for this sortie")
async def generate() -> AsyncGenerator[str, None]:
queue = _jobs[sortie_id]
while True:
try:
event = await asyncio.wait_for(queue.get(), timeout=60)
yield f"data: {json.dumps(event)}\n\n"
if event.get("step") in ("error", "write") and event.get("pct") in (0, 100):
break
except asyncio.TimeoutError:
yield "data: {\"step\":\"ping\"}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream")
def _read_gz(path: Path) -> dict:
with gzip.open(path) as f:
return json.loads(f.read())
@app.get("/sorties/{sortie_id:path}/usv")
async def get_usv(sortie_id: str):
p = OUTPUT_DIR / sortie_id / "processed" / "usv.json.gz"
if not p.exists():
raise HTTPException(404, "USV data not found — run pipeline first")
return JSONResponse(_read_gz(p))
@app.get("/sorties/{sortie_id:path}/auvs")
async def list_auvs(sortie_id: str):
proc = OUTPUT_DIR / sortie_id / "processed"
auvs = [p.name.removesuffix(".json.gz").removeprefix("auv_")
for p in proc.glob("auv_*.json.gz")]
return sorted(auvs)
@app.get("/sorties/{sortie_id:path}/auv/{auv_id}")
async def get_auv(sortie_id: str, auv_id: str):
p = OUTPUT_DIR / sortie_id / "processed" / f"auv_{auv_id}.json.gz"
if not p.exists():
raise HTTPException(404, f"AUV {auv_id} data not found")
return JSONResponse(_read_gz(p))
@app.get("/sorties/{sortie_id:path}/tracks")
async def get_tracks(sortie_id: str):
p = OUTPUT_DIR / sortie_id / "processed" / "tracks.geojson"
if not p.exists():
raise HTTPException(404, "tracks.geojson not found")
with open(p) as f:
return JSONResponse(json.load(f))
def _ts_nav(ts_str: str) -> float:
for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
try:
return datetime.strptime(ts_str.strip(), fmt).replace(tzinfo=timezone.utc).timestamp()
except ValueError:
continue
return 0.0
def _read_usv_track(nav_log_path: Path, max_pts: int = 2000) -> list[dict]:
"""Read navigation_log.csv → [{t_ms, lat, lon, heading, source}] downsampled."""
pts: list[dict] = []
lat_map: dict[float, float] = {}
lon_map: dict[float, float] = {}
heading_map: dict[float, float] = {}
with open(nav_log_path, newline="", encoding="utf-8") as f:
reader = csv.reader(f)
next(reader, None) # skip header
for row in reader:
if len(row) < 3:
continue
ts_str, field, val = row[0], row[1], row[2]
if field not in ("Lat", "Lon", "Heading"):
continue
t = _ts_nav(ts_str)
try:
v = float(val)
except ValueError:
continue
if field == "Lat":
lat_map[t] = v
elif field == "Lon":
lon_map[t] = v
else:
heading_map[t] = v
# Join on lat timestamps (master)
source = nav_log_path.parent.name
for t, lat in sorted(lat_map.items()):
lon = lon_map.get(t)
if lon is None:
# nearest lon within 1s
near = min(lon_map.keys(), key=lambda x: abs(x - t), default=None)
if near is None or abs(near - t) > 1.0:
continue
lon = lon_map[near]
pts.append({
"t_ms": int(t * 1000),
"lat": lat,
"lon": lon,
"heading": heading_map.get(t),
"source": source,
})
# Simple stride downsampling
if len(pts) > max_pts:
step = len(pts) // max_pts
pts = pts[::step]
return pts
_track_cache: dict[str, list[dict]] = {}
@app.get("/sorties/{sortie_id:path}/usv_track")
async def get_usv_track(sortie_id: str):
"""Return USV GPS track [{t_ms, lat, lon, heading, source}] for map polylines."""
if sortie_id in _track_cache:
return JSONResponse(_track_cache[sortie_id])
raw_dir = OUTPUT_DIR / sortie_id / "raw"
nav_logs = list(raw_dir.rglob("*_navigation_log.csv")) if raw_dir.exists() else []
if not nav_logs:
raise HTTPException(404, "No navigation_log.csv found — run pipeline first")
pts: list[dict] = []
for nav_log in nav_logs:
pts.extend(await asyncio.to_thread(_read_usv_track, nav_log))
pts.sort(key=lambda p: p["t_ms"])
_track_cache[sortie_id] = pts
return JSONResponse(pts)