feat: pipeline monitor + orchestrator stats dashboard

This commit is contained in:
Ubuntu
2026-05-11 10:55:44 +00:00
parent e597407ee5
commit 1a4fffd2c1
6 changed files with 182 additions and 1 deletions

View File

@@ -52,6 +52,7 @@ from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
DB_PATH = Path(os.environ.get("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db"))
PIPELINE_DB = Path("/cosma-pipeline/state.db")
WORKERS = json.loads(os.environ.get("COSMA_QC_WORKERS", json.dumps([
{"host": "192.168.0.87", "ssh_alias": "gpu", "gpu": "RTX 3060 12GB"},
{"host": "192.168.0.84", "ssh_alias": "cosma-vm","gpu": "RTX 3090 24GB"},
@@ -295,12 +296,58 @@ async def partial_jobs(request: Request):
)
@app.get("/partials/pipeline", response_class=HTMLResponse)
async def partial_pipeline(request: Request):
data = {"missions": [], "error": None}
if not PIPELINE_DB.exists():
data["error"] = f"{PIPELINE_DB} introuvable"
else:
try:
import shutil, tempfile
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp:
tmp_path = tmp.name
shutil.copy2(str(PIPELINE_DB), tmp_path)
with sqlite3.connect(tmp_path) as conn:
conn.row_factory = sqlite3.Row
missions = conn.execute(
"SELECT * FROM missions ORDER BY created_at DESC LIMIT 20"
).fetchall()
for m in missions:
jobs = conn.execute(
"SELECT * FROM jobs WHERE mission_id=? ORDER BY stage, auv_id",
(m["id"],)
).fetchall()
counts = {}
for j in jobs:
counts[j["status"]] = counts.get(j["status"], 0) + 1
data["missions"].append({
"id": m["id"],
"name": m["name"],
"status": m["status"],
"jobs": [dict(j) for j in jobs],
"counts": counts,
})
except Exception as e:
data["error"] = str(e)[:200]
finally:
try:
import os
os.unlink(tmp_path)
except Exception:
pass
return templates.TemplateResponse("_pipeline.html", {"request": request, **data})
@app.get("/partials/monitor", response_class=HTMLResponse)
async def partial_monitor(request: Request):
stats = await asyncio.gather(*[_worker_stats(w) for w in WORKERS])
stats, orch = await asyncio.gather(
asyncio.gather(*[_worker_stats(w) for w in WORKERS]),
_orchestrator_stats(),
)
return templates.TemplateResponse("_monitor.html", {
"request": request,
"workers": stats,
"orchestrator": orch,
"dispatcher": _dispatcher_status(),
})
@@ -353,6 +400,35 @@ async def _worker_stats(worker: dict) -> dict:
return {**worker, "online": False, "error": str(e)[:80]}
async def _orchestrator_stats() -> dict:
base = {"host": "192.168.0.83", "role": "orchestrateur (.83)", "cpu": None, "ram_used_pct": None,
"ram_total_mib": None, "ssd_used_pct": None, "ssd_avail": None, "online": False}
try:
cmd = (
r"uptime | grep -oP 'load average: \K[\d., ]+' ; "
"free -m | awk '/^Mem:/{print $2,$3}' ; "
"df -h /mnt/ssd 2>/dev/null | tail -1 || echo '- - - - - -'"
)
proc = await asyncio.create_subprocess_exec(
"ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes", "cosma-self", cmd,
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
)
out, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
lines = out.decode().strip().splitlines()
load = lines[0].strip() if lines else "?"
ram = lines[1].split() if len(lines) > 1 else ["?", "?"]
disk = lines[2].split() if len(lines) > 2 else ["?"] * 6
total_mib = int(ram[0]) if ram[0].isdigit() else None
used_mib = int(ram[1]) if len(ram) > 1 and ram[1].isdigit() else None
ram_pct = int(used_mib * 100 / total_mib) if total_mib and used_mib else None
return {**base, "online": True, "cpu_load": load,
"ram_used_pct": ram_pct, "ram_total_mib": total_mib, "ram_used_mib": used_mib,
"ssd_used_pct": disk[4] if len(disk) > 4 else "?",
"ssd_avail": disk[3] if len(disk) > 3 else "?"}
except Exception as e:
return {**base, "error": str(e)[:80]}
@app.post("/jobs/{job_id}/cancel")
async def cancel_job(job_id: int):
with closing(db()) as conn: