feat: extract_mcap_signals — pitch/roll/yaw, altitude, obstacle, battery
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""Extract AUV signals from MCAP files: depth, PWM, state."""
|
"""Extract AUV signals from MCAP files: depth, PWM, state."""
|
||||||
import argparse, glob, json, os, sys
|
import argparse, glob, json, math, os, sys
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
@@ -26,7 +26,16 @@ def main():
|
|||||||
depth_raw = []
|
depth_raw = []
|
||||||
pwm_raw = []
|
pwm_raw = []
|
||||||
state_raw = []
|
state_raw = []
|
||||||
TOPICS = ['/mavros/imu/static_pressure', '/mavros/rc/out', '/mavros/state']
|
signals = {}
|
||||||
|
TOPICS = [
|
||||||
|
'/mavros/imu/static_pressure',
|
||||||
|
'/mavros/rc/out',
|
||||||
|
'/mavros/state',
|
||||||
|
'/mavros/imu/data',
|
||||||
|
'/mavros/altitude',
|
||||||
|
'/mavros/battery',
|
||||||
|
'/mavros/distance_sensor/hrlv_ez4_pub',
|
||||||
|
]
|
||||||
|
|
||||||
for mcap_file in mcap_files:
|
for mcap_file in mcap_files:
|
||||||
try:
|
try:
|
||||||
@@ -53,6 +62,40 @@ def main():
|
|||||||
state_raw.append({'t': t_ms, 'mode': str(ros_msg.mode), 'armed': bool(ros_msg.armed)})
|
state_raw.append({'t': t_ms, 'mode': str(ros_msg.mode), 'armed': bool(ros_msg.armed)})
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
elif topic == '/mavros/imu/data':
|
||||||
|
try:
|
||||||
|
q = ros_msg.orientation
|
||||||
|
sinr = 2*(q.w*q.x + q.y*q.z)
|
||||||
|
cosr = 1 - 2*(q.x*q.x + q.y*q.y)
|
||||||
|
roll = math.degrees(math.atan2(sinr, cosr))
|
||||||
|
sinp = 2*(q.w*q.y - q.z*q.x)
|
||||||
|
pitch = math.degrees(math.asin(max(-1, min(1, sinp))))
|
||||||
|
siny = 2*(q.w*q.z + q.x*q.y)
|
||||||
|
cosy = 1 - 2*(q.y*q.y + q.z*q.z)
|
||||||
|
yaw = math.degrees(math.atan2(siny, cosy))
|
||||||
|
signals.setdefault('pitch', []).append({'t_ms': t_ms, 'v': pitch})
|
||||||
|
signals.setdefault('roll', []).append({'t_ms': t_ms, 'v': roll})
|
||||||
|
signals.setdefault('yaw', []).append({'t_ms': t_ms, 'v': yaw})
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
elif topic == '/mavros/altitude':
|
||||||
|
try:
|
||||||
|
signals.setdefault('altitude', []).append(
|
||||||
|
{'t_ms': t_ms, 'v': ros_msg.relative})
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
elif topic == '/mavros/battery':
|
||||||
|
try:
|
||||||
|
signals.setdefault('battery_v', []).append(
|
||||||
|
{'t_ms': t_ms, 'v': ros_msg.voltage})
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
elif topic == '/mavros/distance_sensor/hrlv_ez4_pub':
|
||||||
|
try:
|
||||||
|
signals.setdefault('obstacle_dist', []).append(
|
||||||
|
{'t_ms': t_ms, 'v': ros_msg.range})
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f" Skip {os.path.basename(mcap_file)}: {e}")
|
print(f" Skip {os.path.basename(mcap_file)}: {e}")
|
||||||
|
|
||||||
@@ -69,7 +112,8 @@ def main():
|
|||||||
pwm_samples = sample(pwm_raw, args.max_pts)
|
pwm_samples = sample(pwm_raw, args.max_pts)
|
||||||
state = state_raw # events, keep all
|
state = state_raw # events, keep all
|
||||||
|
|
||||||
all_t = [p['t'] for p in depth_raw + pwm_raw + state_raw]
|
signals_flat = [pt for pts in signals.values() for pt in pts]
|
||||||
|
all_t = [p['t'] for p in depth_raw + pwm_raw + state_raw] + [p['t_ms'] for p in signals_flat]
|
||||||
t_min = min(all_t) if all_t else 0
|
t_min = min(all_t) if all_t else 0
|
||||||
t_max = max(all_t) if all_t else 0
|
t_max = max(all_t) if all_t else 0
|
||||||
|
|
||||||
@@ -94,6 +138,7 @@ def main():
|
|||||||
'depth': depth,
|
'depth': depth,
|
||||||
'pwm_auv': {'channels': channels, 'samples': pwm_samples},
|
'pwm_auv': {'channels': channels, 'samples': pwm_samples},
|
||||||
'state': state,
|
'state': state,
|
||||||
|
'signals': {k: sample(v, args.max_pts) for k, v in signals.items()},
|
||||||
}
|
}
|
||||||
|
|
||||||
outdir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'output')
|
outdir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'output')
|
||||||
|
|||||||
Reference in New Issue
Block a user