diff --git a/pipeline_runner/main.py b/pipeline_runner/main.py index 68a3de5..a78905d 100644 --- a/pipeline_runner/main.py +++ b/pipeline_runner/main.py @@ -1,6 +1,7 @@ import asyncio import gzip import json +import time from pathlib import Path from typing import AsyncGenerator @@ -9,7 +10,7 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, StreamingResponse from .config import OUTPUT_DIR -from .runner import run_pipeline, scan_sorties +from .runner import run_pipeline, scan_sorties, scan_sorties_local app = FastAPI(title="COSMA Pipeline Runner") app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) @@ -17,10 +18,51 @@ app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], all # Active pipeline jobs: sortie_id → asyncio.Queue _jobs: dict[str, asyncio.Queue] = {} +# Cache sorties avec TTL 10min +_sorties_cache: list | None = None +_sorties_cache_ts: float = 0.0 +_SORTIES_TTL = 600.0 # 10 minutes +_sorties_refresh_lock: asyncio.Lock | None = None + + +def _get_lock() -> asyncio.Lock: + global _sorties_refresh_lock + if _sorties_refresh_lock is None: + _sorties_refresh_lock = asyncio.Lock() + return _sorties_refresh_lock + + +async def _refresh_sorties_cache() -> None: + """Refresh cache in background (holds lock to avoid parallel rclone calls).""" + global _sorties_cache, _sorties_cache_ts + lock = _get_lock() + async with lock: + # Double-check after acquiring lock + if time.monotonic() - _sorties_cache_ts < _SORTIES_TTL: + return + result = await asyncio.to_thread(scan_sorties) + _sorties_cache = result + _sorties_cache_ts = time.monotonic() + @app.get("/sorties") async def list_sorties(): - return await scan_sorties() + global _sorties_cache, _sorties_cache_ts + now = time.monotonic() + if _sorties_cache is None: + # Premier appel: bloquant (cache vide) + await _refresh_sorties_cache() + elif now - _sorties_cache_ts >= _SORTIES_TTL: + # Cache périmé: retourne le cache, refresh en arrière-plan + asyncio.create_task(_refresh_sorties_cache()) + return _sorties_cache or [] + + +@app.get("/sorties/local") +async def list_sorties_local(): + """Scan /data/sorties local (NAS, instantané) sans rclone.""" + sorties = await asyncio.to_thread(scan_sorties_local) + return sorties @app.post("/run/{sortie_id:path}") diff --git a/pipeline_runner/runner.py b/pipeline_runner/runner.py index db70dc3..e9686f3 100644 --- a/pipeline_runner/runner.py +++ b/pipeline_runner/runner.py @@ -122,7 +122,7 @@ async def run_pipeline(sortie_id: str, queue: asyncio.Queue) -> None: async def scan_sorties() -> list[dict]: - """List available sorties on GDrive via rclone lsd.""" + """List available sorties on GDrive via rclone lsd (lent ~30s).""" result = subprocess.run( ["rclone", "lsd", GDRIVE_REMOTE], capture_output=True, text=True @@ -135,3 +135,15 @@ async def scan_sorties() -> list[dict]: processed = (OUTPUT_DIR / name / "processed" / "usv.json.gz").exists() sorties.append({"id": name, "processed": processed}) return sorties + + +def scan_sorties_local() -> list[dict]: + """List sorties already synced locally in OUTPUT_DIR (instantané, pas rclone).""" + if not OUTPUT_DIR.exists(): + return [] + sorties = [] + for d in sorted(OUTPUT_DIR.iterdir()): + if d.is_dir(): + processed = (d / "processed" / "usv.json.gz").exists() + sorties.append({"id": d.name, "processed": processed}) + return sorties diff --git a/viewer/index.html b/viewer/index.html index c6b4e0c..ee236f6 100644 --- a/viewer/index.html +++ b/viewer/index.html @@ -1121,29 +1121,60 @@ async function loadSortieData(sortieId) { } } -async function loadSorties() { - try { - const resp = await fetch(`${API2}/sorties`); - if (!resp.ok) return; - const sorties = await resp.json(); - const sel = document.getElementById('sortie-select'); - sorties.forEach(s => { - const opt = document.createElement('option'); - opt.value = s.id; - opt.textContent = s.id + (s.processed ? ' ✓' : ''); - sel.appendChild(opt); - }); - sel.addEventListener('change', () => { - const btn = document.getElementById('btn-sync'); - btn.disabled = !sel.value; - if (sel.value) { - const opt = sel.options[sel.selectedIndex]; - if (opt.textContent.includes('✓')) { - loadSortieData(sel.value); - } +function _wireSortieSelect() { + const sel = document.getElementById('sortie-select'); + // Evite double-binding si appelé plusieurs fois + if (sel._wired) return; + sel._wired = true; + sel.addEventListener('change', () => { + const btn = document.getElementById('btn-sync'); + btn.disabled = !sel.value; + if (sel.value) { + const opt = sel.options[sel.selectedIndex]; + if (opt.textContent.includes('✓')) { + loadSortieData(sel.value); } + } + }); +} + +function _populateSortieSelect(sorties) { + const sel = document.getElementById('sortie-select'); + // Vider sauf première option placeholder + while (sel.options.length > 1) sel.remove(1); + if (!sorties || !sorties.length) { + sel.options[0].textContent = '— Aucune sortie —'; + return; + } + sel.options[0].textContent = '— Sortie —'; + sorties.forEach(s => { + const opt = document.createElement('option'); + opt.value = s.id; + opt.textContent = s.id + (s.processed ? ' ✓' : ''); + sel.appendChild(opt); + }); + _wireSortieSelect(); +} + +function loadSorties() { + const sel = document.getElementById('sortie-select'); + sel.options[0].textContent = 'Sorties: chargement…'; + + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), 5000); + + fetch(`${API2}/sorties`, { signal: controller.signal }) + .then(resp => { + clearTimeout(timeoutId); + if (!resp.ok) throw new Error('HTTP ' + resp.status); + return resp.json(); + }) + .then(sorties => { _populateSortieSelect(sorties); }) + .catch(e => { + clearTimeout(timeoutId); + sel.options[0].textContent = '— Pipeline indisponible —'; + console.warn('loadSorties:', e.message); }); - } catch(e) { console.warn('pipeline-runner unavailable', e); } } document.getElementById('btn-sync').addEventListener('click', async () => {