- NAV viewer v5: grid 2x2 (map + charts), trail slider, play, layer toggles - Datebar: date picker + datalist, fetches /api/data-dates from :8766 - Mission label shows #NN-folder (X sessions) in green or grey - Tools: parse_usv_nav, extract_mcap_signals, extract_usv_pwm, merge_nav_usbl, usbl_to_json, check_sync, parse_kogger_usbl - Vendor: Kogger-Protocol docs
90 lines
3.1 KiB
Python
90 lines
3.1 KiB
Python
#!/usr/bin/env python3
|
|
"""Extract USV PWM signals from navigation log CSVs."""
|
|
import argparse, csv, glob, json, os, re, sys
|
|
from datetime import datetime, timezone
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('--nav-dir', required=True)
|
|
args = parser.parse_args()
|
|
|
|
pattern = os.path.join(args.nav_dir, '*_navigation_log.csv')
|
|
csv_files = sorted(glob.glob(pattern))
|
|
if not csv_files:
|
|
print(f"No navigation_log.csv in {args.nav_dir}", file=sys.stderr)
|
|
sys.exit(1)
|
|
print(f"Found {len(csv_files)} nav CSV files")
|
|
|
|
M_data = {}
|
|
RC_data = {}
|
|
|
|
for csv_file in csv_files:
|
|
print(f" Parsing {os.path.basename(csv_file)}")
|
|
try:
|
|
with open(csv_file, 'r') as f:
|
|
reader = csv.DictReader(f)
|
|
for row in reader:
|
|
ts_str = row.get('timestamp', '').strip()
|
|
data = row.get('data', '').strip()
|
|
val_str = row.get('value', '').strip()
|
|
if not ts_str or not data or not val_str:
|
|
continue
|
|
is_M = re.match(r'^M\d+$', data)
|
|
is_RC = re.match(r'^RC\d+$', data)
|
|
if not is_M and not is_RC:
|
|
continue
|
|
try:
|
|
val = float(val_str)
|
|
except ValueError:
|
|
continue
|
|
try:
|
|
dt = datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S.%f')
|
|
except ValueError:
|
|
try:
|
|
dt = datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S')
|
|
except ValueError:
|
|
continue
|
|
# CET -> UTC: subtract 3600s
|
|
t_ms = int(dt.timestamp() * 1000) - 3600 * 1000
|
|
pt = {'t': t_ms, 'v': val}
|
|
if is_M:
|
|
M_data.setdefault(data, []).append(pt)
|
|
else:
|
|
RC_data.setdefault(data, []).append(pt)
|
|
except Exception as e:
|
|
print(f" Error {csv_file}: {e}")
|
|
|
|
all_t = []
|
|
for pts in list(M_data.values()) + list(RC_data.values()):
|
|
all_t.extend(p['t'] for p in pts)
|
|
t_min = min(all_t) if all_t else 0
|
|
t_max = max(all_t) if all_t else 0
|
|
|
|
for k in sorted(M_data):
|
|
print(f" {k}: {len(M_data[k])} pts")
|
|
for k in sorted(RC_data):
|
|
print(f" {k}: {len(RC_data[k])} pts")
|
|
|
|
fmt = lambda ms: datetime.fromtimestamp(ms/1000, tz=timezone.utc).isoformat()
|
|
print(f"t_min UTC: {fmt(t_min)}")
|
|
print(f"t_max UTC: {fmt(t_max)}")
|
|
|
|
out = {
|
|
'tz_assumed': 'CET (UTC+1)',
|
|
'tz_converted_to': 'UTC',
|
|
't_min_utc_ms': t_min,
|
|
't_max_utc_ms': t_max,
|
|
'M': M_data,
|
|
'RC': RC_data,
|
|
}
|
|
|
|
outdir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'output')
|
|
os.makedirs(outdir, exist_ok=True)
|
|
outpath = os.path.join(outdir, 'usv_pwm.json')
|
|
with open(outpath, 'w') as f:
|
|
json.dump(out, f)
|
|
print(f"Written: {outpath}")
|
|
|
|
if __name__ == '__main__':
|
|
main()
|