High-level Python wrapper around the upstream cosma-tech/kogger_acousticAntenna
driver. Configures a Kogger acoustic antenna as a permanent slave transponder
in a single start() call: address filter, echo filter, optional TDMA sync slot,
permanent response window, and Python callbacks for each ping received.
No modification to the upstream driver — only composes existing public methods
in the right order. Snapshot of upstream driver included read-only under driver/
for reference.
Includes:
- transponder_continu.py (302 lines): the wrapper class + CLI
- examples/auv_slave.py (79 lines): usage example with logging
- README.md: design rationale, usage, multi-AUV TDMA, watchdog, hardware wiring
- driver/: snapshot of cosma-tech/kogger_acousticAntenna at commit 1b539f9
('Add index slot for multi pinger', 2025-03-11)
Built for Cosma context (USV master + N AUVs slaves) following the design
conversation in Discord #ping-pong-ping (2026-04-27). See poulpe/ping-pong-ping
on Gitea for the interactive demo of the protocol.
140 lines
5.1 KiB
Python
Executable File
140 lines
5.1 KiB
Python
Executable File
#! /usr/bin/env python
|
|
|
|
import pandas as pd
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
from mpl_toolkits.mplot3d import Axes3D
|
|
from matplotlib.animation import FuncAnimation
|
|
from matplotlib.widgets import Slider
|
|
import sys
|
|
|
|
|
|
# --- Thresholds for filtering ---
|
|
DIST_DIFF_THRESHOLD = 20000000000.0
|
|
AZIMUTH_DIFF_THRESHOLD = 20000000000.0
|
|
ELEV_DIFF_THRESHOLD = 20000000000.0
|
|
SNR_DIFF_THRESHOLD = 50000000000.0
|
|
|
|
|
|
def create_animated_plot(filepath):
|
|
"""
|
|
Creates an animated 3D plot of AUV movement with play/pause and speed control.
|
|
"""
|
|
# --- Data Loading and Processing ---
|
|
df = pd.read_csv(filepath, header=None, names=['Timestamp', 'Measurement', 'Value'])
|
|
df_pivot = df.pivot_table(index='Timestamp', columns='Measurement', values='Value', aggfunc='first').reset_index()
|
|
|
|
# --- Filtering based on thresholds ---
|
|
for col in ['Dist_diff', 'Azimuth_diff', 'Elev_diff', 'SNR_diff']:
|
|
if col in df_pivot.columns:
|
|
df_pivot[col] = pd.to_numeric(df_pivot[col])
|
|
df_pivot[col] = df_pivot[col].bfill()
|
|
|
|
if 'Dist_diff' in df_pivot.columns:
|
|
df_pivot = df_pivot[np.abs(df_pivot['Dist_diff']) <= DIST_DIFF_THRESHOLD]
|
|
if 'Azimuth_diff' in df_pivot.columns:
|
|
df_pivot = df_pivot[np.abs(df_pivot['Azimuth_diff']) <= AZIMUTH_DIFF_THRESHOLD]
|
|
if 'Elev_diff' in df_pivot.columns:
|
|
df_pivot = df_pivot[np.abs(df_pivot['Elev_diff']) <= ELEV_DIFF_THRESHOLD]
|
|
if 'SNR_diff' in df_pivot.columns:
|
|
df_pivot = df_pivot[np.abs(df_pivot['SNR_diff']) <= SNR_DIFF_THRESHOLD]
|
|
|
|
try:
|
|
df_pivot = df_pivot.dropna(subset=['Dist', 'Azimuth', 'Elev']).copy()
|
|
except:
|
|
df_pivot = df_pivot.dropna(subset=['Azimuth', 'Elev']).copy()
|
|
|
|
df_pivot = df_pivot.reset_index(drop=True)
|
|
|
|
try:
|
|
df_pivot['Dist'] = pd.to_numeric(df_pivot['Dist'])
|
|
except:
|
|
df_pivot['Dist'] = pd.to_numeric(10000)
|
|
pass
|
|
df_pivot['Azimuth'] = pd.to_numeric(df_pivot['Azimuth'])
|
|
df_pivot['Elev'] = pd.to_numeric(df_pivot['Elev'])
|
|
df_pivot['Azimuth_rad'] = np.deg2rad(df_pivot['Azimuth'])
|
|
df_pivot['Elev_rad'] = np.deg2rad(df_pivot['Elev'])
|
|
df_pivot['x'] = df_pivot['Dist'] * np.cos(df_pivot['Elev_rad']) * np.cos(df_pivot['Azimuth_rad'])
|
|
df_pivot['y'] = df_pivot['Dist'] * np.cos(df_pivot['Elev_rad']) * np.sin(df_pivot['Azimuth_rad'])
|
|
df_pivot['z'] = df_pivot['Dist'] * np.sin(df_pivot['Elev_rad'])
|
|
|
|
# --- Plot Setup ---
|
|
fig = plt.figure(figsize=(12, 10))
|
|
ax = fig.add_subplot(111, projection='3d')
|
|
plt.subplots_adjust(bottom=0.25) # Adjust subplot to make room for slider
|
|
|
|
# Set plot limits
|
|
ax.set_xlim(df_pivot['x'].min() - 1, df_pivot['x'].max() + 1)
|
|
ax.set_ylim(df_pivot['y'].min() - 1, df_pivot['y'].max() + 1)
|
|
ax.set_zlim(df_pivot['z'].min() - 1, df_pivot['z'].max() + 1)
|
|
|
|
# Initialize plot elements
|
|
ax.plot(df_pivot['x'], df_pivot['y'], df_pivot['z'], c='gray', linestyle='--', label='Full Trajectory')
|
|
auv_position, = ax.plot([], [], [], 'ro', markersize=10, label='AUV Position')
|
|
current_trajectory, = ax.plot([], [], [], c='b', label='Current Trajectory')
|
|
usv_position = ax.scatter(0, 0, 0, c='g', marker='^', s=100, label='USV')
|
|
|
|
ax.set_xlabel('X coordinate (meters)')
|
|
ax.set_ylabel('Y coordinate (meters)')
|
|
ax.set_zlabel('Z coordinate (meters)')
|
|
ax.legend()
|
|
|
|
# --- Animation ---
|
|
def update(frame):
|
|
auv_position.set_data([df_pivot['x'][frame]], [df_pivot['y'][frame]])
|
|
auv_position.set_3d_properties([df_pivot['z'][frame]])
|
|
current_trajectory.set_data(df_pivot['x'][:frame+1], df_pivot['y'][:frame+1])
|
|
current_trajectory.set_3d_properties(df_pivot['z'][:frame+1])
|
|
time_label = pd.to_datetime(df_pivot['Timestamp'][frame]).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
|
|
ax.set_title(f'AUV Movement at: {time_label}')
|
|
return auv_position, current_trajectory
|
|
|
|
fig.ani = FuncAnimation(fig, update, frames=len(df_pivot), blit=False, interval=50)
|
|
|
|
# --- Slider for Interval Control ---
|
|
axcolor = 'lightgoldenrodyellow'
|
|
ax_interval = plt.axes([0.25, 0.1, 0.65, 0.03], facecolor=axcolor)
|
|
s_interval = Slider(
|
|
ax=ax_interval,
|
|
label='Interval (ms)',
|
|
valmin=1,
|
|
valmax=1000,
|
|
valinit=50,
|
|
valstep=10
|
|
)
|
|
|
|
def update_interval(val):
|
|
if fig.ani.event_source:
|
|
fig.ani.event_source.stop()
|
|
|
|
new_interval = int(val)
|
|
fig.ani = FuncAnimation(fig, update, frames=len(df_pivot), blit=False, interval=new_interval)
|
|
fig.canvas.draw_idle()
|
|
|
|
s_interval.on_changed(update_interval)
|
|
fig.s_interval = s_interval
|
|
|
|
# --- Play/Pause Functionality ---
|
|
paused = False
|
|
def toggle_pause(event):
|
|
nonlocal paused
|
|
if event.key == ' ':
|
|
if paused:
|
|
fig.ani.resume()
|
|
else:
|
|
fig.ani.pause()
|
|
paused = not paused
|
|
|
|
fig.canvas.mpl_connect('key_press_event', toggle_pause)
|
|
|
|
plt.show()
|
|
|
|
if __name__ == '__main__':
|
|
if len(sys.argv) != 2:
|
|
print("Usage: python animated_auv_tracker_fixed.py <path_to_data_file>")
|
|
sys.exit(1)
|
|
|
|
filepath = sys.argv[1]
|
|
create_animated_plot(filepath)
|