"""cosma-qc dashboard: a FastAPI app over a SQLite job queue.

Serves HTML pages/partials and small JSON endpoints for acquisitions, their
jobs and stitches, plus a monitor partial that polls every configured worker
over ssh for GPU memory/utilisation and root-disk usage.
"""
from __future__ import annotations

import asyncio
import json
import os
import sqlite3
from contextlib import asynccontextmanager, closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from fastapi import FastAPI, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

DB_PATH = Path(os.environ.get("COSMA_QC_DB", "/var/lib/cosma-qc/jobs.db"))

# Worker list; overridable with a JSON array in the COSMA_QC_WORKERS env var.
WORKERS = json.loads(os.environ.get("COSMA_QC_WORKERS", json.dumps([
    {"host": "192.168.0.87", "ssh_alias": "gpu", "gpu": "RTX 3060 12GB"},
    {"host": "192.168.0.84", "ssh_alias": "cosma-vm", "gpu": "RTX 3090 24GB"},
])))

STATUSES = ("queued", "extracting", "running", "done", "error")

# Command executed on each worker by the monitor: one CSV row per GPU from
# nvidia-smi, followed by the last line of `df -h /`.
_REMOTE_STATS_CMD = (
    "nvidia-smi --query-gpu=memory.used,memory.total,utilization.gpu "
    "--format=csv,noheader,nounits && df -h / | tail -1"
)

# Overall budget (seconds) for one ssh round-trip in the monitor.
_SSH_TIMEOUT_S = 4


def _fmt_dur(seconds: float | None) -> str:
    """Render a duration in seconds as a compact string.

    Examples: ``42s``, ``3m``, ``3m07s``, ``2h``, ``2h05m``, ``1d03h``.
    Returns an empty string for ``None`` or a negative value.
    """
    if seconds is None or seconds < 0:
        return ""
    total = int(seconds)
    if total < 60:
        return f"{total}s"
    minutes, secs = divmod(total, 60)
    if minutes < 60:
        return f"{minutes}m{secs:02d}s" if secs else f"{minutes}m"
    hours, minutes = divmod(minutes, 60)
    if hours < 24:
        return f"{hours}h{minutes:02d}m" if minutes else f"{hours}h"
    days, hours = divmod(hours, 24)
    return f"{days}d{hours:02d}h"


def _parse_ts(s: str | None) -> datetime | None:
    """Parse an ISO-8601 timestamp, returning ``None`` if empty or unparsable.

    A ``Z`` suffix is rewritten to ``+00:00`` before parsing because
    ``datetime.fromisoformat`` only accepts ``Z`` on newer Python versions.
    """
    if not s:
        return None
    try:
        return datetime.fromisoformat(s.replace("Z", "+00:00"))
    except (ValueError, TypeError, AttributeError):
        # Malformed text or a non-string value: treat as "no timestamp".
        return None


def _as_utc(dt: datetime) -> datetime:
    """Return *dt* unchanged if timezone-aware, else tagged as UTC."""
    return dt.replace(tzinfo=timezone.utc) if dt.tzinfo is None else dt


def _job_duration_s(job: sqlite3.Row) -> int:
    """Return whole seconds between a job's ``started_at`` and ``finished_at``.

    A job without a parsable ``finished_at`` is measured up to the current
    time; a job without a parsable ``started_at`` yields 0. Naive timestamps
    are interpreted as UTC.
    """
    start = _parse_ts(job["started_at"])
    if not start:
        return 0
    end = _parse_ts(job["finished_at"]) or datetime.now(timezone.utc)
    return int((_as_utc(end) - _as_utc(start)).total_seconds())


def db() -> sqlite3.Connection:
    """Open a new autocommit connection to the jobs database.

    Creates the parent directory if needed, switches the journal to WAL,
    enables foreign keys, and returns rows as ``sqlite3.Row``. The caller is
    responsible for closing the connection.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH, isolation_level=None)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    conn.row_factory = sqlite3.Row
    return conn


def init_schema() -> None:
    """Create the acquisitions/jobs/stitches tables and indexes if missing."""
    with closing(db()) as conn:
        conn.executescript("""
        CREATE TABLE IF NOT EXISTS acquisitions (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            source_path TEXT NOT NULL,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS jobs (
            id INTEGER PRIMARY KEY,
            acquisition_id INTEGER NOT NULL REFERENCES acquisitions(id) ON DELETE CASCADE,
            auv TEXT NOT NULL,
            gopro_serial TEXT NOT NULL,
            segment_label TEXT NOT NULL,
            video_paths TEXT NOT NULL,
            frame_count INTEGER,
            frames_dir TEXT,
            status TEXT NOT NULL DEFAULT 'queued',
            worker_host TEXT,
            viser_url TEXT,
            ply_path TEXT,
            progress INTEGER NOT NULL DEFAULT 0,
            log_tail TEXT,
            error TEXT,
            started_at TEXT,
            finished_at TEXT,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );
        CREATE INDEX IF NOT EXISTS jobs_status_idx ON jobs(status);
        CREATE INDEX IF NOT EXISTS jobs_acq_idx ON jobs(acquisition_id);
        CREATE TABLE IF NOT EXISTS stitches (
            id INTEGER PRIMARY KEY,
            acquisition_id INTEGER NOT NULL REFERENCES acquisitions(id) ON DELETE CASCADE,
            level TEXT NOT NULL DEFAULT 'per_auv',
            auv TEXT,
            input_job_ids TEXT NOT NULL DEFAULT '[]',
            input_stitch_ids TEXT NOT NULL DEFAULT '[]',
            output_ply TEXT,
            status TEXT NOT NULL DEFAULT 'queued',
            worker_host TEXT,
            started_at TEXT,
            finished_at TEXT,
            error TEXT,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );
        CREATE INDEX IF NOT EXISTS stitches_acq_idx ON stitches(acquisition_id);
        """)


@asynccontextmanager
async def lifespan(_: FastAPI):
    """Application lifespan hook: ensure the schema exists before serving."""
    init_schema()
    yield


app = FastAPI(title="cosma-qc", lifespan=lifespan)
templates = Jinja2Templates(directory=Path(__file__).parent / "templates")
app.mount("/static", StaticFiles(directory=Path(__file__).parent / "static"), name="static")


def _build_acquisitions():
    """Load every acquisition with its jobs and stitches for the templates.

    Returns a list of dicts (newest acquisition first) with the keys ``id``,
    ``name``, ``source_path``, ``jobs``, ``stitches`` and ``total_duration``.
    Each job/stitch dict is the database row plus a formatted ``_duration``;
    ``total_duration`` is the formatted sum of the acquisition's job durations.
    """
    with closing(db()) as conn:
        acqs = conn.execute(
            "SELECT * FROM acquisitions ORDER BY created_at DESC"
        ).fetchall()
        jobs = conn.execute(
            "SELECT * FROM jobs ORDER BY auv, gopro_serial, segment_label"
        ).fetchall()
        stitches = conn.execute(
            "SELECT * FROM stitches ORDER BY level DESC, auv"
        ).fetchall()

    jobs_by_acq: dict[int, list[dict]] = {}
    total_s_by_acq: dict[int, int] = {}
    for job in jobs:
        acq_id = job["acquisition_id"]
        dur_s = _job_duration_s(job)
        entry = dict(job)
        entry["_duration"] = _fmt_dur(dur_s)
        jobs_by_acq.setdefault(acq_id, []).append(entry)
        total_s_by_acq[acq_id] = total_s_by_acq.get(acq_id, 0) + dur_s

    stitches_by_acq: dict[int, list[dict]] = {}
    for stitch in stitches:
        entry = dict(stitch)
        start = _parse_ts(stitch["started_at"])
        end = _parse_ts(stitch["finished_at"])
        if end is None and stitch["status"] == "running":
            # Only a running stitch is measured against the clock; any other
            # stitch without a finish time has no meaningful duration.
            end = datetime.now(timezone.utc)
        if start and end:
            elapsed = (_as_utc(end) - _as_utc(start)).total_seconds()
            entry["_duration"] = _fmt_dur(int(elapsed))
        else:
            entry["_duration"] = ""
        stitches_by_acq.setdefault(stitch["acquisition_id"], []).append(entry)

    return [
        {
            "id": acq["id"],
            "name": acq["name"],
            "source_path": acq["source_path"],
            "jobs": jobs_by_acq.get(acq["id"], []),
            "stitches": stitches_by_acq.get(acq["id"], []),
            "total_duration": _fmt_dur(total_s_by_acq.get(acq["id"], 0)),
        }
        for acq in acqs
    ]


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the main page with all acquisitions and the worker list."""
    acquisitions = _build_acquisitions()
    return templates.TemplateResponse("index.html", {
        "request": request,
        "acquisitions": acquisitions,
        "workers": WORKERS,
    })


@app.get("/api/jobs")
async def list_jobs():
    """Return up to 500 most recently created jobs as plain dicts."""
    with closing(db()) as conn:
        rows = conn.execute("SELECT * FROM jobs ORDER BY created_at DESC LIMIT 500").fetchall()
    return [dict(r) for r in rows]


@app.get("/partials/jobs", response_class=HTMLResponse)
async def partial_jobs(request: Request):
    """Render only the jobs table partial."""
    return templates.TemplateResponse(
        "_jobs_table.html",
        {"request": request, "acquisitions": _build_acquisitions()},
    )


@app.get("/partials/monitor", response_class=HTMLResponse)
async def partial_monitor(request: Request):
    """Render the worker monitor partial; all workers are polled concurrently."""
    stats = await asyncio.gather(*[_worker_stats(w) for w in WORKERS])
    return templates.TemplateResponse("_monitor.html", {"request": request, "workers": stats})


def _int_or_none(field: str) -> int | None:
    """Return *field* as an int when it is all digits, otherwise ``None``."""
    field = field.strip()
    return int(field) if field.isdigit() else None


async def _worker_stats(worker: dict) -> dict:
    """Poll one worker over ssh for GPU and root-disk usage.

    Args:
        worker: Worker config dict; must contain ``ssh_alias``.

    Returns:
        A copy of *worker* extended with ``online``. When online it also has
        ``vram_used_mib``, ``vram_total_mib``, ``gpu_util_pct`` (ints, or
        ``None`` when not numeric) and ``disk_used_pct`` (string, ``"?"`` when
        unavailable). When offline it has ``error``, truncated to 80 chars.

    Raises:
        KeyError: If *worker* has no ``ssh_alias`` entry.
    """
    alias = worker["ssh_alias"]
    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            "ssh", "-o", "ConnectTimeout=3", "-o", "BatchMode=yes", alias,
            _REMOTE_STATS_CMD,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        out, err = await asyncio.wait_for(proc.communicate(), timeout=_SSH_TIMEOUT_S)
        lines = out.decode(errors="replace").strip().splitlines()

        if proc.returncode != 0:
            # ssh itself or nvidia-smi failed: report the worker as offline
            # with the most specific message available, instead of "online"
            # with empty stats.
            err_lines = err.decode(errors="replace").strip().splitlines()
            reason = (
                (err_lines[-1] if err_lines else "")
                or (lines[0] if lines else "")
                or f"remote command exited with status {proc.returncode}"
            )
            return {**worker, "online": False, "error": reason[:80]}

        # First line: CSV row of the first GPU. Pad so a short row yields
        # None values rather than an IndexError.
        gpu = lines[0].split(",") if lines else []
        gpu += ["?"] * (3 - len(gpu))
        # The df row is always the last line (`tail -1`), even when
        # nvidia-smi printed one row per GPU on a multi-GPU host.
        disk = lines[-1].split() if len(lines) > 1 else []
        return {
            **worker,
            "online": True,
            "vram_used_mib": _int_or_none(gpu[0]),
            "vram_total_mib": _int_or_none(gpu[1]),
            "gpu_util_pct": _int_or_none(gpu[2]),
            "disk_used_pct": disk[4] if len(disk) > 4 else "?",
        }
    except asyncio.TimeoutError:
        # str(TimeoutError()) is empty, so give the UI an explicit message.
        return {**worker, "online": False, "error": f"timed out after {_SSH_TIMEOUT_S}s"}
    except Exception as e:
        # Best-effort monitor: any failure just marks this worker offline.
        return {**worker, "online": False, "error": (str(e) or type(e).__name__)[:80]}
    finally:
        # On timeout or cancellation the ssh child is still running; kill it
        # so stalled workers do not accumulate orphaned ssh processes.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # already exited between the check and the kill


@app.post("/jobs/{job_id}/cancel")
async def cancel_job(job_id: int):
    """Mark a queued/extracting/running job as cancelled (status ``error``).

    Always responds ``{"ok": True}``, including when no row matched.
    """
    with closing(db()) as conn:
        conn.execute(
            "UPDATE jobs SET status='error', error='cancelled by user', finished_at=datetime('now') "
            "WHERE id=? AND status IN ('queued','extracting','running')",
            (job_id,),
        )
    return {"ok": True}


@app.post("/jobs/{job_id}/retry")
async def retry_job(job_id: int):
    """Re-queue a job that is in status ``error``, clearing its run state.

    Always responds ``{"ok": True}``, including when no row matched.
    """
    with closing(db()) as conn:
        conn.execute(
            "UPDATE jobs SET status='queued', error=NULL, progress=0, started_at=NULL, "
            "finished_at=NULL, worker_host=NULL WHERE id=? AND status='error'",
            (job_id,),
        )
    return {"ok": True}


@app.post("/stitches/{stitch_id}/cancel")
async def cancel_stitch(stitch_id: int):
    """Mark a queued/running stitch as cancelled (status ``error``).

    Always responds ``{"ok": True}``, including when no row matched.
    """
    with closing(db()) as conn:
        conn.execute(
            "UPDATE stitches SET status='error', error='cancelled by user', finished_at=datetime('now') "
            "WHERE id=? AND status IN ('queued','running')",
            (stitch_id,),
        )
    return {"ok": True}


@app.post("/stitches/{stitch_id}/retry")
async def retry_stitch(stitch_id: int):
    """Re-queue a stitch that is in status ``error``, clearing its run state.

    Always responds ``{"ok": True}``, including when no row matched.
    """
    with closing(db()) as conn:
        conn.execute(
            "UPDATE stitches SET status='queued', error=NULL, output_ply=NULL, "
            "started_at=NULL, finished_at=NULL, worker_host=NULL WHERE id=? AND status='error'",
            (stitch_id,),
        )
    return {"ok": True}