From 1a4fffd2c1f9bdb29f2f474b274f582b4cadcfaa Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 11 May 2026 10:55:44 +0000 Subject: [PATCH] feat: pipeline monitor + orchestrator stats dashboard --- app/main.py | 78 +++++++++++++++++++++++++++++++++++- app/static/style.css | 22 ++++++++++ app/templates/_monitor.html | 27 +++++++++++++ app/templates/_pipeline.html | 47 ++++++++++++++++++++++ app/templates/index.html | 7 ++++ docker-compose.yml | 2 + 6 files changed, 182 insertions(+), 1 deletion(-) create mode 100644 app/templates/_pipeline.html diff --git a/app/main.py b/app/main.py index eb448ef..ecc94a8 100644 --- a/app/main.py +++ b/app/main.py @@ -52,6 +52,7 @@ from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates DB_PATH = Path(os.environ.get("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db")) +PIPELINE_DB = Path("/cosma-pipeline/state.db") WORKERS = json.loads(os.environ.get("COSMA_QC_WORKERS", json.dumps([ {"host": "192.168.0.87", "ssh_alias": "gpu", "gpu": "RTX 3060 12GB"}, {"host": "192.168.0.84", "ssh_alias": "cosma-vm","gpu": "RTX 3090 24GB"}, @@ -295,12 +296,58 @@ async def partial_jobs(request: Request): ) +@app.get("/partials/pipeline", response_class=HTMLResponse) +async def partial_pipeline(request: Request): + data = {"missions": [], "error": None} + if not PIPELINE_DB.exists(): + data["error"] = f"{PIPELINE_DB} introuvable" + else: + try: + import shutil, tempfile + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp: + tmp_path = tmp.name + shutil.copy2(str(PIPELINE_DB), tmp_path) + with sqlite3.connect(tmp_path) as conn: + conn.row_factory = sqlite3.Row + missions = conn.execute( + "SELECT * FROM missions ORDER BY created_at DESC LIMIT 20" + ).fetchall() + for m in missions: + jobs = conn.execute( + "SELECT * FROM jobs WHERE mission_id=? ORDER BY stage, auv_id", + (m["id"],) + ).fetchall() + counts = {} + for j in jobs: + counts[j["status"]] = counts.get(j["status"], 0) + 1 + data["missions"].append({ + "id": m["id"], + "name": m["name"], + "status": m["status"], + "jobs": [dict(j) for j in jobs], + "counts": counts, + }) + except Exception as e: + data["error"] = str(e)[:200] + finally: + try: + import os + os.unlink(tmp_path) + except Exception: + pass + return templates.TemplateResponse("_pipeline.html", {"request": request, **data}) + + @app.get("/partials/monitor", response_class=HTMLResponse) async def partial_monitor(request: Request): - stats = await asyncio.gather(*[_worker_stats(w) for w in WORKERS]) + stats, orch = await asyncio.gather( + asyncio.gather(*[_worker_stats(w) for w in WORKERS]), + _orchestrator_stats(), + ) return templates.TemplateResponse("_monitor.html", { "request": request, "workers": stats, + "orchestrator": orch, "dispatcher": _dispatcher_status(), }) @@ -353,6 +400,35 @@ async def _worker_stats(worker: dict) -> dict: return {**worker, "online": False, "error": str(e)[:80]} +async def _orchestrator_stats() -> dict: + base = {"host": "192.168.0.83", "role": "orchestrateur (.83)", "cpu": None, "ram_used_pct": None, + "ram_total_mib": None, "ssd_used_pct": None, "ssd_avail": None, "online": False} + try: + cmd = ( + r"uptime | grep -oP 'load average: \K[\d., ]+' ; " + "free -m | awk '/^Mem:/{print $2,$3}' ; " + "df -h /mnt/ssd 2>/dev/null | tail -1 || echo '- - - - - -'" + ) + proc = await asyncio.create_subprocess_exec( + "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes", "cosma-self", cmd, + stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, + ) + out, _ = await asyncio.wait_for(proc.communicate(), timeout=5) + lines = out.decode().strip().splitlines() + load = lines[0].strip() if lines else "?" + ram = lines[1].split() if len(lines) > 1 else ["?", "?"] + disk = lines[2].split() if len(lines) > 2 else ["?"] * 6 + total_mib = int(ram[0]) if ram[0].isdigit() else None + used_mib = int(ram[1]) if len(ram) > 1 and ram[1].isdigit() else None + ram_pct = int(used_mib * 100 / total_mib) if total_mib and used_mib else None + return {**base, "online": True, "cpu_load": load, + "ram_used_pct": ram_pct, "ram_total_mib": total_mib, "ram_used_mib": used_mib, + "ssd_used_pct": disk[4] if len(disk) > 4 else "?", + "ssd_avail": disk[3] if len(disk) > 3 else "?"} + except Exception as e: + return {**base, "error": str(e)[:80]} + + @app.post("/jobs/{job_id}/cancel") async def cancel_job(job_id: int): with closing(db()) as conn: diff --git a/app/static/style.css b/app/static/style.css index 92c5260..cd86bdc 100644 --- a/app/static/style.css +++ b/app/static/style.css @@ -198,3 +198,25 @@ code { background: rgba(255,255,255,0.05); padding: 0 0.25rem; border-radius: 3p .viewer-btn { background: #1a3a2a; color: #4ade80; border: 1px solid #4ade80; border-radius: 3px; padding: 2px 8px; cursor: pointer; font-size: 0.8rem; } .viewer-btn:hover { background: #4ade80; color: #0a1a10; } .viewer-btn:disabled { opacity: 0.5; cursor: wait; } + +/* ==== Pipeline section ==== */ +.pipeline-mission { margin-bottom: 1rem; } +.pm-header { display: flex; align-items: center; gap: 0.75rem; margin-bottom: 0.4rem; flex-wrap: wrap; } +.pm-name { font-weight: 600; color: var(--accent); } +.pm-status { font-size: 0.75rem; padding: 0.1rem 0.4rem; border-radius: 4px; text-transform: uppercase; font-weight: 600; } +.pm-counts { display: flex; gap: 0.4rem; flex-wrap: wrap; } +.cnt { font-size: 0.72rem; padding: 0.1rem 0.35rem; border-radius: 3px; background: rgba(255,255,255,0.05); } +.cnt.ok { color: var(--ok); } .cnt.busy { color: var(--accent); } .cnt.warn { color: var(--warn); } .cnt.err { color: var(--err); } + +.pipeline-jobs-table { width: 100%; border-collapse: collapse; font-size: 0.82rem; } +.pipeline-jobs-table th { text-align: left; padding: 3px 8px; color: var(--muted); font-size: 0.70rem; text-transform: uppercase; border-bottom: 1px solid var(--border); } +.pipeline-jobs-table td { padding: 4px 8px; border-bottom: 1px solid rgba(255,255,255,0.03); } +.pipeline-jobs-table tr.pj-err-row td { padding: 0 8px 4px; } + +.pj-badge { font-size: 0.70rem; padding: 1px 5px; border-radius: 3px; text-transform: uppercase; font-weight: 600; } +.status-done, .pj-badge.status-done { color: var(--ok); background: rgba(61,220,132,0.1); } +.status-running, .pj-badge.status-running { color: var(--accent); background: rgba(95,208,255,0.1); } +.status-queued, .pj-badge.status-queued { color: var(--muted); } +.status-degraded, .pj-badge.status-degraded { color: var(--warn); background: rgba(245,197,24,0.1); } +.status-error, .pj-badge.status-error { color: var(--err); background: rgba(255,92,122,0.1); } +.status-ingested, .pm-status.status-ingested { color: var(--accent); background: rgba(95,208,255,0.12); } diff --git a/app/templates/_monitor.html b/app/templates/_monitor.html index 8208e6a..20b2d17 100644 --- a/app/templates/_monitor.html +++ b/app/templates/_monitor.html @@ -12,6 +12,33 @@ +{% if orchestrator %} +
+
+ {{ orchestrator.role }} + orchestrateur + {% if orchestrator.online %}online{% else %}offline{% endif %} +
+ {% if orchestrator.online %} +
+ CPU + {{ orchestrator.cpu_load or '?' }} +
+
+ RAM + + {{ orchestrator.ram_used_mib or '?' }} / {{ orchestrator.ram_total_mib or '?' }} MiB +
+
+ SSD {{ orchestrator.ssd_avail }} dispo + {{ orchestrator.ssd_used_pct }} utilise +
+ {% else %} +
{{ orchestrator.error or "unreachable" }}
+ {% endif %} +
+{% endif %} +
{% for w in workers %}
diff --git a/app/templates/_pipeline.html b/app/templates/_pipeline.html new file mode 100644 index 0000000..c3d5d20 --- /dev/null +++ b/app/templates/_pipeline.html @@ -0,0 +1,47 @@ +{% if error %} +

{{ error }}

+{% elif not missions %} +

Aucune mission dans state.db.

+{% else %} +{% for m in missions %} +
+
+ {{ m.name }} + {{ m.status }} + + {% if m.counts.get('done') %}{{ m.counts.done }} done{% endif %} + {% if m.counts.get('running') %}{{ m.counts.running }} running{% endif %} + {% if m.counts.get('queued') %}{{ m.counts.queued }} queued{% endif %} + {% if m.counts.get('degraded') %}{{ m.counts.degraded }} degraded{% endif %} + {% if m.counts.get('error') %}{{ m.counts.error }} error{% endif %} + +
+ + + + + + {% for j in m.jobs %} + + + + + + + + + {% if j.error_msg %} + + {% endif %} + {% endfor %} + +
AUVSegmentStageStatusWorkerDuree
{{ j.auv_id }}{{ j.segment_label or '-' }}{{ j.stage }}{{ j.status }}{{ j.worker_host or '-' }} + {% if j.started_at and j.finished_at %} + {{ j.finished_at[11:16] if j.finished_at else '' }} + {% elif j.started_at %} + {{ j.started_at[11:16] }} → + {% else %}-{% endif %} +
{{ j.error_msg[:120] }}
+
+{% endfor %} +{% endif %} diff --git a/app/templates/index.html b/app/templates/index.html index c51659c..4da8543 100644 --- a/app/templates/index.html +++ b/app/templates/index.html @@ -18,6 +18,13 @@

Chargement des workers…

+
+

Pipeline reconstruction

+
+

Chargement pipeline...

+
+
+

Jobs

diff --git a/docker-compose.yml b/docker-compose.yml index e37e9d5..afbee90 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,6 +8,8 @@ services: volumes: - /home/cosma/cosma-qc-data:/var/lib/cosma-qc - /home/cosma/.ssh:/ssh-in:ro + - /home/cosma/cosma-pipeline:/cosma-pipeline:ro + - /mnt/ssd:/mnt/ssd:ro environment: COSMA_QC_WORKERS: | [