245 lines
8.6 KiB
Python
Executable File
245 lines
8.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Photomosaic overlay: place each frame at (east, north, heading) on 2D canvas.
|
|
|
|
KISS: cv2 only, running-mean compositing.
|
|
"""
|
|
import argparse
|
|
import csv
|
|
import glob
|
|
import math
|
|
import os
|
|
import sys
|
|
import time
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
|
|
def load_traj(path):
|
|
"""Return list of dicts with frame_idx, east, north, heading."""
|
|
rows = []
|
|
with open(path) as f:
|
|
rdr = csv.DictReader(f)
|
|
for r in rdr:
|
|
try:
|
|
fi = int(r["frame_idx"])
|
|
except (KeyError, ValueError):
|
|
continue
|
|
# Prefer corrected east/north, fallback to raw east_m/north_m
|
|
if "east_m_corr" in r and r["east_m_corr"] != "":
|
|
e = float(r["east_m_corr"])
|
|
n = float(r["north_m_corr"])
|
|
elif "east_m" in r:
|
|
e = float(r["east_m"])
|
|
n = float(r["north_m"])
|
|
else:
|
|
continue
|
|
h = float(r["heading_deg"]) if "heading_deg" in r and r["heading_deg"] != "" else None
|
|
rows.append({"frame_idx": fi, "east": e, "north": n, "heading": h})
|
|
return rows
|
|
|
|
|
|
def attach_headings(traj, heading_csv):
|
|
"""Join heading_deg from secondary CSV by frame_idx (loopclosed CSVs miss heading)."""
|
|
if all(r["heading"] is not None for r in traj):
|
|
return traj
|
|
by_idx = {}
|
|
with open(heading_csv) as f:
|
|
rdr = csv.DictReader(f)
|
|
for r in rdr:
|
|
try:
|
|
by_idx[int(r["frame_idx"])] = float(r["heading_deg"])
|
|
except (KeyError, ValueError):
|
|
pass
|
|
for r in traj:
|
|
if r["heading"] is None:
|
|
r["heading"] = by_idx.get(r["frame_idx"], 0.0)
|
|
return traj
|
|
|
|
|
|
def find_frame(frames_dir, frame_idx):
|
|
"""Try both naming conventions: frame_0001.jpg or frame_00001.jpg."""
|
|
for digits in (4, 5, 6):
|
|
p = os.path.join(frames_dir, f"frame_{frame_idx+1:0{digits}d}.jpg")
|
|
if os.path.exists(p):
|
|
return p
|
|
p = os.path.join(frames_dir, f"frame_{frame_idx:0{digits}d}.jpg")
|
|
if os.path.exists(p):
|
|
return p
|
|
return None
|
|
|
|
|
|
def main():
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument("--frames-dir", required=True)
|
|
ap.add_argument("--traj-csv", required=True)
|
|
ap.add_argument("--heading-csv", default=None, help="Fallback CSV for heading_deg if missing")
|
|
ap.add_argument("--altitude", type=float, default=1.5)
|
|
ap.add_argument("--fov-h", type=float, default=122.0)
|
|
ap.add_argument("--fov-v", type=float, default=80.0)
|
|
ap.add_argument("--alpha", type=float, default=0.3)
|
|
ap.add_argument("--sample-every", type=int, default=5)
|
|
ap.add_argument("--out", required=True)
|
|
ap.add_argument("--max-canvas-px", type=int, default=4000)
|
|
ap.add_argument("--heading-sign", type=int, default=-1, help="-1 for clockwise-from-north (default)")
|
|
args = ap.parse_args()
|
|
|
|
t0 = time.time()
|
|
|
|
# 1. Load trajectory
|
|
traj = load_traj(args.traj_csv)
|
|
if args.heading_csv:
|
|
traj = attach_headings(traj, args.heading_csv)
|
|
else:
|
|
# Auto-fallback: try dvl_full_*.csv next to traj_csv
|
|
if any(r["heading"] is None for r in traj):
|
|
base = os.path.basename(args.traj_csv)
|
|
# Extract video tag like GX039839 / GX019818
|
|
for token in base.replace(".", "_").split("_"):
|
|
if token.startswith("GX") and len(token) >= 6:
|
|
cand = f"/tmp/dvl_full_{token}.csv"
|
|
if os.path.exists(cand):
|
|
print(f"[heading] joining from {cand}")
|
|
traj = attach_headings(traj, cand)
|
|
break
|
|
|
|
if not traj:
|
|
print("ERROR: empty trajectory", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# Sample
|
|
traj = traj[:: args.sample_every]
|
|
print(f"[traj] {len(traj)} frames after sampling (every {args.sample_every})")
|
|
|
|
# 2. Footprint at altitude
|
|
fp_w = 2.0 * args.altitude * math.tan(math.radians(args.fov_h / 2.0))
|
|
fp_h = 2.0 * args.altitude * math.tan(math.radians(args.fov_v / 2.0))
|
|
print(f"[footprint] {fp_w:.2f}m wide x {fp_h:.2f}m tall (alt={args.altitude}m)")
|
|
|
|
# 3. World bbox + margin
|
|
es = [r["east"] for r in traj]
|
|
ns = [r["north"] for r in traj]
|
|
margin = 1.5 * max(fp_w, fp_h)
|
|
e_min, e_max = min(es) - margin, max(es) + margin
|
|
n_min, n_max = min(ns) - margin, max(ns) + margin
|
|
world_w = e_max - e_min
|
|
world_h = n_max - n_min
|
|
print(f"[bbox] east [{e_min:.1f},{e_max:.1f}] north [{n_min:.1f},{n_max:.1f}] = {world_w:.1f}m x {world_h:.1f}m")
|
|
|
|
# 4. Canvas pixel size
|
|
ppm = args.max_canvas_px / max(world_w, world_h)
|
|
canvas_w = int(world_w * ppm)
|
|
canvas_h = int(world_h * ppm)
|
|
print(f"[canvas] {canvas_w}x{canvas_h} px ({ppm:.1f} px/m)")
|
|
|
|
# Compositing buffers: sum (float32 BGR) + count (int)
|
|
acc = np.zeros((canvas_h, canvas_w, 3), dtype=np.float32)
|
|
cnt = np.zeros((canvas_h, canvas_w), dtype=np.int32)
|
|
|
|
fp_px_w = max(2, int(fp_w * ppm))
|
|
fp_px_h = max(2, int(fp_h * ppm))
|
|
print(f"[footprint-px] {fp_px_w}x{fp_px_h}")
|
|
|
|
placed = 0
|
|
skipped = 0
|
|
for i, r in enumerate(traj):
|
|
path = find_frame(args.frames_dir, r["frame_idx"])
|
|
if not path:
|
|
skipped += 1
|
|
continue
|
|
img = cv2.imread(path)
|
|
if img is None:
|
|
skipped += 1
|
|
continue
|
|
|
|
# Resize image to footprint pixel size first (keep aspect)
|
|
img_resized = cv2.resize(img, (fp_px_w, fp_px_h), interpolation=cv2.INTER_AREA)
|
|
|
|
# Rotate by heading. Convention: heading 0 = north, positive clockwise.
|
|
# We rotate image so image "up" aligns with north direction.
|
|
# cv2 rotation positive = counterclockwise → use -heading * sign
|
|
heading = r["heading"] if r["heading"] is not None else 0.0
|
|
angle = args.heading_sign * heading # default -1 = clockwise
|
|
|
|
# Build canvas the size of the rotated bounding box
|
|
diag = int(math.ceil(math.sqrt(fp_px_w ** 2 + fp_px_h ** 2)))
|
|
# Pad to diag x diag for safe rotation
|
|
pad_h = (diag - fp_px_h) // 2
|
|
pad_w = (diag - fp_px_w) // 2
|
|
padded = cv2.copyMakeBorder(
|
|
img_resized, pad_h, diag - fp_px_h - pad_h,
|
|
pad_w, diag - fp_px_w - pad_w,
|
|
cv2.BORDER_CONSTANT, value=0,
|
|
)
|
|
# Mask = 1 where valid pixels
|
|
mask = np.zeros((padded.shape[0], padded.shape[1]), dtype=np.uint8)
|
|
mask[pad_h:pad_h + fp_px_h, pad_w:pad_w + fp_px_w] = 255
|
|
|
|
M = cv2.getRotationMatrix2D((diag / 2, diag / 2), angle, 1.0)
|
|
rotated = cv2.warpAffine(padded, M, (diag, diag), flags=cv2.INTER_LINEAR, borderValue=0)
|
|
rotated_mask = cv2.warpAffine(mask, M, (diag, diag), flags=cv2.INTER_NEAREST, borderValue=0)
|
|
|
|
# Place at world (east, north).
|
|
# Canvas: x = east (left→right), y = -north (top→bottom, north up)
|
|
cx_world = r["east"]
|
|
cy_world = r["north"]
|
|
px = int((cx_world - e_min) * ppm)
|
|
py = int((n_max - cy_world) * ppm) # flip Y for image coords
|
|
|
|
# Top-left of paste
|
|
x0 = px - diag // 2
|
|
y0 = py - diag // 2
|
|
x1 = x0 + diag
|
|
y1 = y0 + diag
|
|
|
|
# Clip to canvas
|
|
cx0 = max(0, x0)
|
|
cy0 = max(0, y0)
|
|
cx1 = min(canvas_w, x1)
|
|
cy1 = min(canvas_h, y1)
|
|
if cx1 <= cx0 or cy1 <= cy0:
|
|
skipped += 1
|
|
continue
|
|
|
|
sx0 = cx0 - x0
|
|
sy0 = cy0 - y0
|
|
sx1 = sx0 + (cx1 - cx0)
|
|
sy1 = sy0 + (cy1 - cy0)
|
|
|
|
sub_img = rotated[sy0:sy1, sx0:sx1].astype(np.float32)
|
|
sub_mask = rotated_mask[sy0:sy1, sx0:sx1] > 0
|
|
|
|
# Running mean: add to acc + increment cnt only where mask
|
|
acc[cy0:cy1, cx0:cx1][sub_mask] += sub_img[sub_mask]
|
|
cnt[cy0:cy1, cx0:cx1][sub_mask] += 1
|
|
placed += 1
|
|
|
|
if (i + 1) % 100 == 0:
|
|
print(f" ... {i+1}/{len(traj)} placed={placed} skipped={skipped}")
|
|
|
|
# Finalize: divide by count
|
|
out = np.zeros_like(acc, dtype=np.uint8)
|
|
valid = cnt > 0
|
|
out[valid] = (acc[valid] / cnt[valid, None]).astype(np.uint8)
|
|
|
|
# Draw trajectory polyline (thin blue)
|
|
pts = []
|
|
for r in traj:
|
|
px = int((r["east"] - e_min) * ppm)
|
|
py = int((n_max - r["north"]) * ppm)
|
|
pts.append((px, py))
|
|
for i in range(1, len(pts)):
|
|
cv2.line(out, pts[i - 1], pts[i], (255, 200, 0), 1, cv2.LINE_AA)
|
|
# Mark start (green) and end (red)
|
|
if pts:
|
|
cv2.circle(out, pts[0], 8, (0, 255, 0), 2)
|
|
cv2.circle(out, pts[-1], 8, (0, 0, 255), 2)
|
|
|
|
cv2.imwrite(args.out, out)
|
|
dt = time.time() - t0
|
|
print(f"[done] placed={placed} skipped={skipped} canvas={canvas_w}x{canvas_h} time={dt:.1f}s out={args.out}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|