Files
cosma-qc/app/main.py
Poulpe 654bb47825 monitor : temp GPU, conso watts, espace disque, heartbeat dispatcher
- nvidia-smi : +temperature.gpu + power.draw
- UI : tags °C / W / espace disque libre
- Dispatcher heartbeat toutes les 4s → point vert/rouge en haut du monitor
- Fix Docker SSH : copie + chmod 600 au démarrage (Bad owner)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-21 10:42:44 +00:00

313 lines
10 KiB
Python

from __future__ import annotations
import asyncio
import json
import os
import sqlite3
from contextlib import asynccontextmanager, closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
def _fmt_dur(seconds: float) -> str:
if seconds is None or seconds < 0:
return ""
s = int(seconds)
if s < 60:
return f"{s}s"
m, s = divmod(s, 60)
if m < 60:
return f"{m}m{s:02d}s" if s else f"{m}m"
h, m = divmod(m, 60)
if h < 24:
return f"{h}h{m:02d}m" if m else f"{h}h"
d, h = divmod(h, 24)
return f"{d}d{h:02d}h"
def _parse_ts(s: str | None) -> datetime | None:
if not s:
return None
try:
return datetime.fromisoformat(s.replace("Z", "+00:00"))
except Exception:
return None
def _job_duration_s(job: sqlite3.Row) -> int:
    """Elapsed seconds of a job: started_at → finished_at (or now if running).

    Returns 0 when started_at is missing/unparseable. Naive timestamps are
    assumed to be UTC before subtracting.
    """
    started = _parse_ts(job["started_at"])
    finished = _parse_ts(job["finished_at"]) or datetime.now(timezone.utc)
    if not started:
        return 0
    # Normalize both ends to aware-UTC so the subtraction never raises.
    if started.tzinfo is None:
        started = started.replace(tzinfo=timezone.utc)
    if finished.tzinfo is None:
        finished = finished.replace(tzinfo=timezone.utc)
    return int((finished - started).total_seconds())
from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
# SQLite database location; overridable via COSMA_QC_DB (e.g. for tests).
DB_PATH = Path(os.environ.get("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db"))
# GPU worker inventory: JSON list of {"host", "ssh_alias", "gpu"} dicts.
# COSMA_QC_WORKERS (a JSON string) overrides the built-in default pair.
WORKERS = json.loads(os.environ.get("COSMA_QC_WORKERS", json.dumps([
    {"host": "192.168.0.87", "ssh_alias": "gpu", "gpu": "RTX 3060 12GB"},
    {"host": "192.168.0.84", "ssh_alias": "cosma-vm","gpu": "RTX 3090 24GB"},
])))
# Job lifecycle states stored in jobs.status / stitches.status.
STATUSES = ("queued", "extracting", "running", "done", "error")
def db() -> sqlite3.Connection:
    """Open a fresh SQLite connection to DB_PATH (parent dir auto-created).

    Autocommit mode (isolation_level=None), WAL journal for concurrent
    readers, foreign keys enforced, and sqlite3.Row rows for dict-like access.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(DB_PATH, isolation_level=None)  # autocommit
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    connection.row_factory = sqlite3.Row
    return connection
def init_schema() -> None:
    """Create the acquisitions / jobs / stitches schema if absent.

    Idempotent (every statement uses IF NOT EXISTS), so it is safe to run
    on every application start; it is invoked from the FastAPI lifespan.
    jobs and stitches both cascade-delete with their parent acquisition.
    """
    with closing(db()) as conn:
        # One DDL batch; executescript commits implicitly.
        conn.executescript("""
CREATE TABLE IF NOT EXISTS acquisitions (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
source_path TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY,
acquisition_id INTEGER NOT NULL REFERENCES acquisitions(id) ON DELETE CASCADE,
auv TEXT NOT NULL,
gopro_serial TEXT NOT NULL,
segment_label TEXT NOT NULL,
video_paths TEXT NOT NULL,
frame_count INTEGER,
frames_dir TEXT,
status TEXT NOT NULL DEFAULT 'queued',
worker_host TEXT,
viser_url TEXT,
ply_path TEXT,
progress INTEGER NOT NULL DEFAULT 0,
log_tail TEXT,
error TEXT,
started_at TEXT,
finished_at TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS jobs_status_idx ON jobs(status);
CREATE INDEX IF NOT EXISTS jobs_acq_idx ON jobs(acquisition_id);
CREATE TABLE IF NOT EXISTS stitches (
id INTEGER PRIMARY KEY,
acquisition_id INTEGER NOT NULL REFERENCES acquisitions(id) ON DELETE CASCADE,
level TEXT NOT NULL DEFAULT 'per_auv',
auv TEXT,
input_job_ids TEXT NOT NULL DEFAULT '[]',
input_stitch_ids TEXT NOT NULL DEFAULT '[]',
output_ply TEXT,
status TEXT NOT NULL DEFAULT 'queued',
worker_host TEXT,
started_at TEXT,
finished_at TEXT,
error TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS stitches_acq_idx ON stitches(acquisition_id);
""")
@asynccontextmanager
async def lifespan(_: FastAPI):
    """FastAPI lifespan hook: ensure the DB schema exists before serving.

    No shutdown work is needed, so nothing follows the yield.
    """
    init_schema()
    yield
# Application singleton; the lifespan hook creates the SQLite schema at startup.
app = FastAPI(title="cosma-qc", lifespan=lifespan)
# Jinja templates and static assets live alongside this module.
templates = Jinja2Templates(directory=Path(__file__).parent / "templates")
app.mount("/static", StaticFiles(directory=Path(__file__).parent / "static"), name="static")
def _stitch_duration(s: sqlite3.Row) -> str:
    """Formatted wall-clock duration of a stitch row, "" when not determinable.

    Mirrors _job_duration_s's tz handling (this logic used to be inlined in
    _build_acquisitions): finished_at wins; a still-'running' stitch counts
    up to now; otherwise no end time exists and "" is returned.
    """
    start = _parse_ts(s["started_at"])
    end = _parse_ts(s["finished_at"]) or (
        datetime.now(timezone.utc) if s["status"] == "running" else None
    )
    if not (start and end):
        return ""
    # Normalize to aware-UTC so subtraction never raises on naive values.
    if start.tzinfo is None:
        start = start.replace(tzinfo=timezone.utc)
    if end.tzinfo is None:
        end = end.replace(tzinfo=timezone.utc)
    return _fmt_dur(int((end - start).total_seconds()))


def _build_acquisitions():
    """Build the template view-model: one dict per acquisition (newest first).

    Each entry carries its jobs (ordered auv/serial/segment) and stitches,
    every row augmented with a formatted "_duration", plus the summed job
    duration as "total_duration".
    """
    with closing(db()) as conn:
        acqs = conn.execute(
            "SELECT * FROM acquisitions ORDER BY created_at DESC"
        ).fetchall()
        jobs = conn.execute(
            "SELECT * FROM jobs ORDER BY auv, gopro_serial, segment_label"
        ).fetchall()
        stitches = conn.execute(
            "SELECT * FROM stitches ORDER BY level DESC, auv"
        ).fetchall()
        # Group jobs per acquisition and accumulate total runtime.
        by_acq: dict[int, list[dict]] = {}
        by_acq_total: dict[int, int] = {}
        for j in jobs:
            d = dict(j)
            dur_s = _job_duration_s(j)
            d["_duration"] = _fmt_dur(dur_s)
            by_acq.setdefault(j["acquisition_id"], []).append(d)
            by_acq_total[j["acquisition_id"]] = by_acq_total.get(j["acquisition_id"], 0) + dur_s
        # Group stitches per acquisition with their own duration string.
        stitches_by_acq: dict[int, list[dict]] = {}
        for s in stitches:
            d = dict(s)
            d["_duration"] = _stitch_duration(s)
            stitches_by_acq.setdefault(s["acquisition_id"], []).append(d)
        return [
            {
                "id": acq["id"],
                "name": acq["name"],
                "source_path": acq["source_path"],
                "jobs": by_acq.get(acq["id"], []),
                "stitches": stitches_by_acq.get(acq["id"], []),
                "total_duration": _fmt_dur(by_acq_total.get(acq["id"], 0)),
            }
            for acq in acqs
        ]
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the dashboard page: all acquisitions plus the worker list."""
    context = {
        "request": request,
        "acquisitions": _build_acquisitions(),
        "workers": WORKERS,
    }
    return templates.TemplateResponse("index.html", context)
@app.get("/api/jobs")
async def list_jobs():
with closing(db()) as conn:
rows = conn.execute("SELECT * FROM jobs ORDER BY created_at DESC LIMIT 500").fetchall()
return [dict(r) for r in rows]
@app.get("/partials/jobs", response_class=HTMLResponse)
async def partial_jobs(request: Request):
    """HTML partial (for polling clients): the jobs table, freshly rebuilt."""
    context = {"request": request, "acquisitions": _build_acquisitions()}
    return templates.TemplateResponse("_jobs_table.html", context)
@app.get("/partials/monitor", response_class=HTMLResponse)
async def partial_monitor(request: Request):
    """HTML partial: per-worker GPU/disk stats (polled concurrently over SSH)
    plus the dispatcher heartbeat status."""
    stats = await asyncio.gather(*(_worker_stats(w) for w in WORKERS))
    context = {
        "request": request,
        "workers": stats,
        "dispatcher": _dispatcher_status(),
    }
    return templates.TemplateResponse("_monitor.html", context)
def _dispatcher_status(*, stale_after_s: int = 30) -> dict:
    """Read the dispatcher heartbeat file and report liveness.

    The dispatcher touches <DB dir>/dispatcher.heartbeat with an ISO
    timestamp; it is considered alive while the timestamp is younger than
    ``stale_after_s`` seconds (default 30 — generous vs the ~4 s beat so
    a single missed write does not flap the indicator).

    Returns {"alive": bool, "age_s": int | None}; age_s is None when the
    file is missing or unparseable.
    """
    hb = DB_PATH.parent / "dispatcher.heartbeat"
    try:
        ts = _parse_ts(hb.read_text().strip())
        if ts:
            if ts.tzinfo is None:
                ts = ts.replace(tzinfo=timezone.utc)
            age = int((datetime.now(timezone.utc) - ts).total_seconds())
            return {"alive": age < stale_after_s, "age_s": age}
    except Exception:
        # Best-effort: a missing/unreadable heartbeat file simply means
        # the dispatcher is (treated as) down.
        pass
    return {"alive": False, "age_s": None}
async def _worker_stats(worker: dict) -> dict:
    """Poll one GPU worker over SSH for nvidia-smi + root-disk stats.

    Returns the worker dict augmented with either online=True and the
    parsed metrics, or online=False and a truncated error string. Never
    raises: any SSH/parse failure is reported as offline.
    """
    alias = worker["ssh_alias"]
    try:
        proc = await asyncio.create_subprocess_exec(
            "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes", alias,
            "nvidia-smi --query-gpu=memory.used,memory.total,utilization.gpu,temperature.gpu,power.draw --format=csv,noheader,nounits && df -h / | tail -1",
            stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
        )
        try:
            out, _ = await asyncio.wait_for(proc.communicate(), timeout=5)
        except asyncio.TimeoutError:
            # Fix: wait_for cancels communicate() but leaves the ssh child
            # running — kill and reap it so hung hosts don't leak processes.
            proc.kill()
            await proc.wait()
            raise
        text = out.decode().strip().splitlines()
        # Line 1: CSV of the five GPU fields; line 2: `df -h /` summary row.
        g = [x.strip() for x in text[0].split(",")] if text else ["?"] * 5
        disk = text[1].split() if len(text) > 1 else ["?"] * 6
        def _int(v: str):
            # nvidia-smi may emit "N/A" or floats ("71.23" W) — None on failure.
            try:
                return int(float(v))
            except Exception:
                return None
        return {
            **worker,
            "online": True,
            "vram_used_mib": _int(g[0]) if len(g) > 0 else None,
            "vram_total_mib": _int(g[1]) if len(g) > 1 else None,
            "gpu_util_pct": _int(g[2]) if len(g) > 2 else None,
            "gpu_temp_c": _int(g[3]) if len(g) > 3 else None,
            "gpu_power_w": _int(g[4]) if len(g) > 4 else None,
            "disk_used_pct": disk[4] if len(disk) > 4 else "?",
            "disk_avail": disk[3] if len(disk) > 3 else "?",
        }
    except Exception as e:
        return {**worker, "online": False, "error": str(e)[:80]}
@app.post("/jobs/{job_id}/cancel")
async def cancel_job(job_id: int):
    """Mark an in-flight job as cancelled; finished jobs are left untouched."""
    sql = (
        "UPDATE jobs SET status='error', error='cancelled by user', finished_at=datetime('now') "
        "WHERE id=? AND status IN ('queued','extracting','running')"
    )
    with closing(db()) as conn:
        conn.execute(sql, (job_id,))
    return {"ok": True}
@app.post("/jobs/{job_id}/retry")
async def retry_job(job_id: int):
    """Re-queue a failed job by clearing its error/progress/timing fields."""
    sql = (
        "UPDATE jobs SET status='queued', error=NULL, progress=0, started_at=NULL, "
        "finished_at=NULL, worker_host=NULL WHERE id=? AND status='error'"
    )
    with closing(db()) as conn:
        conn.execute(sql, (job_id,))
    return {"ok": True}
@app.post("/stitches/{stitch_id}/cancel")
async def cancel_stitch(stitch_id: int):
    """Mark a queued/running stitch as cancelled; done/error rows untouched."""
    sql = (
        "UPDATE stitches SET status='error', error='cancelled by user', finished_at=datetime('now') "
        "WHERE id=? AND status IN ('queued','running')"
    )
    with closing(db()) as conn:
        conn.execute(sql, (stitch_id,))
    return {"ok": True}
@app.post("/stitches/{stitch_id}/retry")
async def retry_stitch(stitch_id: int):
    """Re-queue a failed stitch, clearing its output and timing fields."""
    sql = (
        "UPDATE stitches SET status='queued', error=NULL, output_ply=NULL, "
        "started_at=NULL, finished_at=NULL, worker_host=NULL WHERE id=? AND status='error'"
    )
    with closing(db()) as conn:
        conn.execute(sql, (stitch_id,))
    return {"ok": True}