import asyncio import gzip import json import time from pathlib import Path from typing import AsyncGenerator from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, StreamingResponse from .config import OUTPUT_DIR from .runner import run_pipeline, scan_sorties, scan_sorties_local app = FastAPI(title="COSMA Pipeline Runner") app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) # Active pipeline jobs: sortie_id → asyncio.Queue _jobs: dict[str, asyncio.Queue] = {} # Cache sorties avec TTL 10min _sorties_cache: list | None = None _sorties_cache_ts: float = 0.0 _SORTIES_TTL = 600.0 # 10 minutes _sorties_refresh_lock: asyncio.Lock | None = None def _get_lock() -> asyncio.Lock: global _sorties_refresh_lock if _sorties_refresh_lock is None: _sorties_refresh_lock = asyncio.Lock() return _sorties_refresh_lock async def _refresh_sorties_cache() -> None: """Refresh cache in background (holds lock to avoid parallel rclone calls).""" global _sorties_cache, _sorties_cache_ts lock = _get_lock() async with lock: # Double-check after acquiring lock if time.monotonic() - _sorties_cache_ts < _SORTIES_TTL: return result = await asyncio.to_thread(scan_sorties) _sorties_cache = result _sorties_cache_ts = time.monotonic() @app.get("/sorties") async def list_sorties(): global _sorties_cache, _sorties_cache_ts now = time.monotonic() if _sorties_cache is None: # Premier appel: bloquant (cache vide) await _refresh_sorties_cache() elif now - _sorties_cache_ts >= _SORTIES_TTL: # Cache périmé: retourne le cache, refresh en arrière-plan asyncio.create_task(_refresh_sorties_cache()) return _sorties_cache or [] @app.get("/sorties/local") async def list_sorties_local(): """Scan /data/sorties local (NAS, instantané) sans rclone.""" sorties = await asyncio.to_thread(scan_sorties_local) return sorties @app.post("/run/{sortie_id:path}") async def run_sortie(sortie_id: str): if sortie_id in _jobs: return {"status": "already_running"} queue: asyncio.Queue = asyncio.Queue() _jobs[sortie_id] = queue asyncio.create_task(_run_and_cleanup(sortie_id, queue)) return {"status": "started"} async def _run_and_cleanup(sortie_id: str, queue: asyncio.Queue): try: await run_pipeline(sortie_id, queue) finally: await asyncio.sleep(30) _jobs.pop(sortie_id, None) @app.get("/events/{sortie_id:path}") async def sse_events(sortie_id: str): if sortie_id not in _jobs: raise HTTPException(404, "No active job for this sortie") async def generate() -> AsyncGenerator[str, None]: queue = _jobs[sortie_id] while True: try: event = await asyncio.wait_for(queue.get(), timeout=60) yield f"data: {json.dumps(event)}\n\n" if event.get("step") in ("error", "write") and event.get("pct") in (0, 100): break except asyncio.TimeoutError: yield "data: {\"step\":\"ping\"}\n\n" return StreamingResponse(generate(), media_type="text/event-stream") def _read_gz(path: Path) -> dict: with gzip.open(path) as f: return json.loads(f.read()) @app.get("/sorties/{sortie_id:path}/usv") async def get_usv(sortie_id: str): p = OUTPUT_DIR / sortie_id / "processed" / "usv.json.gz" if not p.exists(): raise HTTPException(404, "USV data not found — run pipeline first") return JSONResponse(_read_gz(p)) @app.get("/sorties/{sortie_id:path}/auvs") async def list_auvs(sortie_id: str): proc = OUTPUT_DIR / sortie_id / "processed" auvs = [p.name.removesuffix(".json.gz").removeprefix("auv_") for p in proc.glob("auv_*.json.gz")] return sorted(auvs) @app.get("/sorties/{sortie_id:path}/auv/{auv_id}") async def get_auv(sortie_id: str, auv_id: str): p = OUTPUT_DIR / sortie_id / "processed" / f"auv_{auv_id}.json.gz" if not p.exists(): raise HTTPException(404, f"AUV {auv_id} data not found") return JSONResponse(_read_gz(p)) @app.get("/sorties/{sortie_id:path}/tracks") async def get_tracks(sortie_id: str): p = OUTPUT_DIR / sortie_id / "processed" / "tracks.geojson" if not p.exists(): raise HTTPException(404, "tracks.geojson not found") with open(p) as f: return JSONResponse(json.load(f))