auto-iter 20260513-2231: GX019817 RoPE skip, 4 PLY done ready for stage06
This commit is contained in:
244
scripts/photomosaic_overlay.py
Executable file
244
scripts/photomosaic_overlay.py
Executable file
@@ -0,0 +1,244 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Photomosaic overlay: place each frame at (east, north, heading) on 2D canvas.
|
||||
|
||||
KISS: cv2 only, running-mean compositing.
|
||||
"""
|
||||
import argparse
|
||||
import csv
|
||||
import glob
|
||||
import math
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
|
||||
def load_traj(path):
|
||||
"""Return list of dicts with frame_idx, east, north, heading."""
|
||||
rows = []
|
||||
with open(path) as f:
|
||||
rdr = csv.DictReader(f)
|
||||
for r in rdr:
|
||||
try:
|
||||
fi = int(r["frame_idx"])
|
||||
except (KeyError, ValueError):
|
||||
continue
|
||||
# Prefer corrected east/north, fallback to raw east_m/north_m
|
||||
if "east_m_corr" in r and r["east_m_corr"] != "":
|
||||
e = float(r["east_m_corr"])
|
||||
n = float(r["north_m_corr"])
|
||||
elif "east_m" in r:
|
||||
e = float(r["east_m"])
|
||||
n = float(r["north_m"])
|
||||
else:
|
||||
continue
|
||||
h = float(r["heading_deg"]) if "heading_deg" in r and r["heading_deg"] != "" else None
|
||||
rows.append({"frame_idx": fi, "east": e, "north": n, "heading": h})
|
||||
return rows
|
||||
|
||||
|
||||
def attach_headings(traj, heading_csv):
|
||||
"""Join heading_deg from secondary CSV by frame_idx (loopclosed CSVs miss heading)."""
|
||||
if all(r["heading"] is not None for r in traj):
|
||||
return traj
|
||||
by_idx = {}
|
||||
with open(heading_csv) as f:
|
||||
rdr = csv.DictReader(f)
|
||||
for r in rdr:
|
||||
try:
|
||||
by_idx[int(r["frame_idx"])] = float(r["heading_deg"])
|
||||
except (KeyError, ValueError):
|
||||
pass
|
||||
for r in traj:
|
||||
if r["heading"] is None:
|
||||
r["heading"] = by_idx.get(r["frame_idx"], 0.0)
|
||||
return traj
|
||||
|
||||
|
||||
def find_frame(frames_dir, frame_idx):
|
||||
"""Try both naming conventions: frame_0001.jpg or frame_00001.jpg."""
|
||||
for digits in (4, 5, 6):
|
||||
p = os.path.join(frames_dir, f"frame_{frame_idx+1:0{digits}d}.jpg")
|
||||
if os.path.exists(p):
|
||||
return p
|
||||
p = os.path.join(frames_dir, f"frame_{frame_idx:0{digits}d}.jpg")
|
||||
if os.path.exists(p):
|
||||
return p
|
||||
return None
|
||||
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser()
|
||||
ap.add_argument("--frames-dir", required=True)
|
||||
ap.add_argument("--traj-csv", required=True)
|
||||
ap.add_argument("--heading-csv", default=None, help="Fallback CSV for heading_deg if missing")
|
||||
ap.add_argument("--altitude", type=float, default=1.5)
|
||||
ap.add_argument("--fov-h", type=float, default=122.0)
|
||||
ap.add_argument("--fov-v", type=float, default=80.0)
|
||||
ap.add_argument("--alpha", type=float, default=0.3)
|
||||
ap.add_argument("--sample-every", type=int, default=5)
|
||||
ap.add_argument("--out", required=True)
|
||||
ap.add_argument("--max-canvas-px", type=int, default=4000)
|
||||
ap.add_argument("--heading-sign", type=int, default=-1, help="-1 for clockwise-from-north (default)")
|
||||
args = ap.parse_args()
|
||||
|
||||
t0 = time.time()
|
||||
|
||||
# 1. Load trajectory
|
||||
traj = load_traj(args.traj_csv)
|
||||
if args.heading_csv:
|
||||
traj = attach_headings(traj, args.heading_csv)
|
||||
else:
|
||||
# Auto-fallback: try dvl_full_*.csv next to traj_csv
|
||||
if any(r["heading"] is None for r in traj):
|
||||
base = os.path.basename(args.traj_csv)
|
||||
# Extract video tag like GX039839 / GX019818
|
||||
for token in base.replace(".", "_").split("_"):
|
||||
if token.startswith("GX") and len(token) >= 6:
|
||||
cand = f"/tmp/dvl_full_{token}.csv"
|
||||
if os.path.exists(cand):
|
||||
print(f"[heading] joining from {cand}")
|
||||
traj = attach_headings(traj, cand)
|
||||
break
|
||||
|
||||
if not traj:
|
||||
print("ERROR: empty trajectory", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# Sample
|
||||
traj = traj[:: args.sample_every]
|
||||
print(f"[traj] {len(traj)} frames after sampling (every {args.sample_every})")
|
||||
|
||||
# 2. Footprint at altitude
|
||||
fp_w = 2.0 * args.altitude * math.tan(math.radians(args.fov_h / 2.0))
|
||||
fp_h = 2.0 * args.altitude * math.tan(math.radians(args.fov_v / 2.0))
|
||||
print(f"[footprint] {fp_w:.2f}m wide x {fp_h:.2f}m tall (alt={args.altitude}m)")
|
||||
|
||||
# 3. World bbox + margin
|
||||
es = [r["east"] for r in traj]
|
||||
ns = [r["north"] for r in traj]
|
||||
margin = 1.5 * max(fp_w, fp_h)
|
||||
e_min, e_max = min(es) - margin, max(es) + margin
|
||||
n_min, n_max = min(ns) - margin, max(ns) + margin
|
||||
world_w = e_max - e_min
|
||||
world_h = n_max - n_min
|
||||
print(f"[bbox] east [{e_min:.1f},{e_max:.1f}] north [{n_min:.1f},{n_max:.1f}] = {world_w:.1f}m x {world_h:.1f}m")
|
||||
|
||||
# 4. Canvas pixel size
|
||||
ppm = args.max_canvas_px / max(world_w, world_h)
|
||||
canvas_w = int(world_w * ppm)
|
||||
canvas_h = int(world_h * ppm)
|
||||
print(f"[canvas] {canvas_w}x{canvas_h} px ({ppm:.1f} px/m)")
|
||||
|
||||
# Compositing buffers: sum (float32 BGR) + count (int)
|
||||
acc = np.zeros((canvas_h, canvas_w, 3), dtype=np.float32)
|
||||
cnt = np.zeros((canvas_h, canvas_w), dtype=np.int32)
|
||||
|
||||
fp_px_w = max(2, int(fp_w * ppm))
|
||||
fp_px_h = max(2, int(fp_h * ppm))
|
||||
print(f"[footprint-px] {fp_px_w}x{fp_px_h}")
|
||||
|
||||
placed = 0
|
||||
skipped = 0
|
||||
for i, r in enumerate(traj):
|
||||
path = find_frame(args.frames_dir, r["frame_idx"])
|
||||
if not path:
|
||||
skipped += 1
|
||||
continue
|
||||
img = cv2.imread(path)
|
||||
if img is None:
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
# Resize image to footprint pixel size first (keep aspect)
|
||||
img_resized = cv2.resize(img, (fp_px_w, fp_px_h), interpolation=cv2.INTER_AREA)
|
||||
|
||||
# Rotate by heading. Convention: heading 0 = north, positive clockwise.
|
||||
# We rotate image so image "up" aligns with north direction.
|
||||
# cv2 rotation positive = counterclockwise → use -heading * sign
|
||||
heading = r["heading"] if r["heading"] is not None else 0.0
|
||||
angle = args.heading_sign * heading # default -1 = clockwise
|
||||
|
||||
# Build canvas the size of the rotated bounding box
|
||||
diag = int(math.ceil(math.sqrt(fp_px_w ** 2 + fp_px_h ** 2)))
|
||||
# Pad to diag x diag for safe rotation
|
||||
pad_h = (diag - fp_px_h) // 2
|
||||
pad_w = (diag - fp_px_w) // 2
|
||||
padded = cv2.copyMakeBorder(
|
||||
img_resized, pad_h, diag - fp_px_h - pad_h,
|
||||
pad_w, diag - fp_px_w - pad_w,
|
||||
cv2.BORDER_CONSTANT, value=0,
|
||||
)
|
||||
# Mask = 1 where valid pixels
|
||||
mask = np.zeros((padded.shape[0], padded.shape[1]), dtype=np.uint8)
|
||||
mask[pad_h:pad_h + fp_px_h, pad_w:pad_w + fp_px_w] = 255
|
||||
|
||||
M = cv2.getRotationMatrix2D((diag / 2, diag / 2), angle, 1.0)
|
||||
rotated = cv2.warpAffine(padded, M, (diag, diag), flags=cv2.INTER_LINEAR, borderValue=0)
|
||||
rotated_mask = cv2.warpAffine(mask, M, (diag, diag), flags=cv2.INTER_NEAREST, borderValue=0)
|
||||
|
||||
# Place at world (east, north).
|
||||
# Canvas: x = east (left→right), y = -north (top→bottom, north up)
|
||||
cx_world = r["east"]
|
||||
cy_world = r["north"]
|
||||
px = int((cx_world - e_min) * ppm)
|
||||
py = int((n_max - cy_world) * ppm) # flip Y for image coords
|
||||
|
||||
# Top-left of paste
|
||||
x0 = px - diag // 2
|
||||
y0 = py - diag // 2
|
||||
x1 = x0 + diag
|
||||
y1 = y0 + diag
|
||||
|
||||
# Clip to canvas
|
||||
cx0 = max(0, x0)
|
||||
cy0 = max(0, y0)
|
||||
cx1 = min(canvas_w, x1)
|
||||
cy1 = min(canvas_h, y1)
|
||||
if cx1 <= cx0 or cy1 <= cy0:
|
||||
skipped += 1
|
||||
continue
|
||||
|
||||
sx0 = cx0 - x0
|
||||
sy0 = cy0 - y0
|
||||
sx1 = sx0 + (cx1 - cx0)
|
||||
sy1 = sy0 + (cy1 - cy0)
|
||||
|
||||
sub_img = rotated[sy0:sy1, sx0:sx1].astype(np.float32)
|
||||
sub_mask = rotated_mask[sy0:sy1, sx0:sx1] > 0
|
||||
|
||||
# Running mean: add to acc + increment cnt only where mask
|
||||
acc[cy0:cy1, cx0:cx1][sub_mask] += sub_img[sub_mask]
|
||||
cnt[cy0:cy1, cx0:cx1][sub_mask] += 1
|
||||
placed += 1
|
||||
|
||||
if (i + 1) % 100 == 0:
|
||||
print(f" ... {i+1}/{len(traj)} placed={placed} skipped={skipped}")
|
||||
|
||||
# Finalize: divide by count
|
||||
out = np.zeros_like(acc, dtype=np.uint8)
|
||||
valid = cnt > 0
|
||||
out[valid] = (acc[valid] / cnt[valid, None]).astype(np.uint8)
|
||||
|
||||
# Draw trajectory polyline (thin blue)
|
||||
pts = []
|
||||
for r in traj:
|
||||
px = int((r["east"] - e_min) * ppm)
|
||||
py = int((n_max - r["north"]) * ppm)
|
||||
pts.append((px, py))
|
||||
for i in range(1, len(pts)):
|
||||
cv2.line(out, pts[i - 1], pts[i], (255, 200, 0), 1, cv2.LINE_AA)
|
||||
# Mark start (green) and end (red)
|
||||
if pts:
|
||||
cv2.circle(out, pts[0], 8, (0, 255, 0), 2)
|
||||
cv2.circle(out, pts[-1], 8, (0, 0, 255), 2)
|
||||
|
||||
cv2.imwrite(args.out, out)
|
||||
dt = time.time() - t0
|
||||
print(f"[done] placed={placed} skipped={skipped} canvas={canvas_w}x{canvas_h} time={dt:.1f}s out={args.out}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user