feat: inspect_usv_csv — inspecte colonnes navigation_log + header USBL Kogger
This commit is contained in:
56
extract/inspect_usv_csv.py
Normal file
56
extract/inspect_usv_csv.py
Normal file
@@ -0,0 +1,56 @@
|
||||
import sys
|
||||
import csv
|
||||
import ast
|
||||
import glob
|
||||
from collections import Counter
|
||||
|
||||
def inspect_nav(csv_path: str, n=500):
|
||||
data_vals = Counter()
|
||||
rows = 0
|
||||
with open(csv_path, newline="", encoding="utf-8", errors="replace") as f:
|
||||
reader = csv.DictReader(f)
|
||||
print(f"Columns: {reader.fieldnames}")
|
||||
for row in reader:
|
||||
rows += 1
|
||||
if rows <= n:
|
||||
data_vals[row.get("data", "?")] += 1
|
||||
print(f"\nTotal rows: {rows}")
|
||||
print("\nUnique 'data' field values (first 500 rows):")
|
||||
for k, v in data_vals.most_common(30):
|
||||
print(f" {k}: {v}")
|
||||
|
||||
def inspect_usbl(csv_path: str, n=10):
|
||||
with open(csv_path, newline="", encoding="utf-8", errors="replace") as f:
|
||||
reader = csv.reader(f)
|
||||
print(f"Columns: [timestamp, direction, message]")
|
||||
received = 0
|
||||
for row in reader:
|
||||
if len(row) < 3:
|
||||
continue
|
||||
direction = row[1].strip().upper()
|
||||
if direction != "RECEIVED":
|
||||
continue
|
||||
received += 1
|
||||
if received > n:
|
||||
break
|
||||
try:
|
||||
raw = ast.literal_eval(row[2].strip())
|
||||
if isinstance(raw, bytes):
|
||||
print(f" ts={row[0]} len={len(raw)} hex_start={raw[:16].hex()}")
|
||||
except Exception as e:
|
||||
print(f" ts={row[0]} parse_error={e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
ship_dir = sys.argv[1]
|
||||
nav_files = glob.glob(f"{ship_dir}/*_navigation_log.csv")
|
||||
usbl_files = glob.glob(f"{ship_dir}/*_usbl.csv")
|
||||
if nav_files:
|
||||
print(f"\n=== NAV LOG: {nav_files[0]} ===")
|
||||
inspect_nav(nav_files[0])
|
||||
if usbl_files:
|
||||
print(f"\n=== USBL (first 10 RECEIVED): {usbl_files[0]} ===")
|
||||
inspect_usbl(usbl_files[0])
|
||||
if not nav_files and not usbl_files:
|
||||
print(f"No CSV files found in {ship_dir}")
|
||||
import os
|
||||
print("Files found:", os.listdir(ship_dir))
|
||||
Reference in New Issue
Block a user