Poulpe 9a158f5c5f Initial: ContinuousTransponder wrapper for Kogger USBL
High-level Python wrapper around the upstream cosma-tech/kogger_acousticAntenna
driver. Configures a Kogger acoustic antenna as a permanent slave transponder
in a single start() call: address filter, echo filter, optional TDMA sync slot,
permanent response window, and Python callbacks for each ping received.

No modification to the upstream driver — only composes existing public methods
in the right order. Snapshot of upstream driver included read-only under driver/
for reference.

Includes:
- transponder_continu.py (302 lines): the wrapper class + CLI
- examples/auv_slave.py (79 lines): usage example with logging
- README.md: design rationale, usage, multi-AUV TDMA, watchdog, hardware wiring
- driver/: snapshot of cosma-tech/kogger_acousticAntenna at commit 1b539f9
  ('Add index slot for multi pinger', 2025-03-11)

Built for the Cosma setup (one USV master plus N AUV slaves), following the
design conversation in Discord #ping-pong-ping (2026-04-27). See
poulpe/ping-pong-ping on Gitea for an interactive demo of the protocol.
2026-04-27 22:08:44 +00:00
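
For the multi-AUV TDMA case mentioned in the commit message, the slot convention used by examples/auv_slave.py (slot index = address - 1) can be sketched as below. This is an illustration under stated assumptions only: `SlotPlan` and `plan_for_address` are hypothetical names that exist neither in transponder_continu.py nor in the upstream driver, and the model of equal, back-to-back slots counted from the start of a frame is assumed rather than taken from the driver.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class SlotPlan:
    """Hypothetical stand-in carrying the same three fields as SyncSlot."""
    slot_total: int        # number of slots in one TDMA frame
    slot_index: int        # 0-based slot owned by this vehicle
    slot_duration: float   # seconds per slot

    @property
    def frame_duration(self) -> float:
        # Assumption: a frame is slot_total equal slots, back to back.
        return self.slot_total * self.slot_duration

    def window(self, frame_start: float) -> Tuple[float, float]:
        """Return (start, end) of this vehicle's slot within one frame."""
        start = frame_start + self.slot_index * self.slot_duration
        return start, start + self.slot_duration


def plan_for_address(address: int, slot_total: int,
                     slot_duration: float) -> SlotPlan:
    """Apply the example's convention: slot index = address - 1."""
    if not 1 <= address <= slot_total:
        raise ValueError(
            f"address {address} has no slot in a frame of {slot_total} slots"
        )
    return SlotPlan(slot_total, address - 1, slot_duration)
```

With three AUVs and 2-second slots, `plan_for_address(2, 3, 2.0).window(0.0)` places address 2 in the second slot of a 6-second frame. The range check makes explicit that the convention only works when every address is between 1 and `slot_total`.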

80 lines
2.3 KiB
Python

#!/usr/bin/env python3
"""
Example: a Kogger AUV slave in continuous transponder mode.

The USV master sends addressed pings. This AUV answers automatically, logs
each ping received (timestamp, SNR, hardware distance), and prints it to
stdout in parallel. No per-ping arming is required.

Usage:
    python3 auv_slave.py --port /dev/ttyUSB0 --address 2 --vehicle AUV-2

To integrate into a ROS2 / NATS / MQTT node, replace `on_ping` below
with a publisher.
"""
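import argparse
import sys
import time
from pathlib import Path

# Make transponder_continu.py (one directory up) importable from examples/,
# regardless of the current working directory.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from transponder_continu import ContinuousTransponder, SyncSlot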


def main():
    p = argparse.ArgumentParser()
    p.add_argument("--port", required=True)
    p.add_argument("--address", type=int, required=True)
    p.add_argument("--vehicle", default="AUV")
    p.add_argument("--slot-total", type=int, default=0,
                   help="If >0: enroll the AUV in a shared TDMA slot.")
    p.add_argument("--slot-duration", type=float, default=2.0)
    args = p.parse_args()

    sync = None
    if args.slot_total > 0:
        sync = SyncSlot(
            slot_total=args.slot_total,
            slot_index=args.address - 1,  # convention: slot index = address - 1
            slot_duration=args.slot_duration,
        )

    t = ContinuousTransponder(
        port=args.port,
        my_address=args.address,
        vehicle_name=args.vehicle,
        sync=sync,
        watchdog_timeout_s=15.0,  # re-arm after > 15 s of silence (USV down?)
    )

    pings_log = []

    def on_ping(msg):
        rec = {
            "t_local": time.time(),
            "id": msg.get("id"),
            "snr": msg.get("snr"),
            "distance_m": t.state.last_distance_m,
            "raw": msg,
        }
        pings_log.append(rec)
        # Build the distance field on its own so that a missing distance
        # replaces only that field, not the whole log line.
        dist = f"d={rec['distance_m']:.2f}m" if rec["distance_m"] else "d=n/a"
        print(
            f"[{rec['t_local']:.3f}] ping #{t.state.pings_received} "
            f"id={rec['id']} snr={rec['snr']} {dist}"
        )

    t.on_ping_received(on_ping)
    print(f"=== {args.vehicle} in continuous transponder mode, "
          f"address={args.address} ===")
    t.start()
    try:
        t.run_forever()
    finally:
        t.stop()
        print(f"\nSession ended. {len(pings_log)} pings received in total.")


if __name__ == "__main__":
    main()
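
The docstring suggests replacing `on_ping` with a publisher when integrating into a ROS2 / NATS / MQTT node. One possible shape for that replacement is sketched below. It is a sketch under assumptions, not part of the repository: `make_publishing_callback` and its `publish` parameter are hypothetical, `publish` stands for whatever one-argument send function the chosen middleware client offers, and the message is assumed to be dict-like with `id` and `snr` keys, as in the example above.

```python
import json
import time
from typing import Callable


def make_publishing_callback(
    publish: Callable[[str], None],
    vehicle: str,
    clock: Callable[[], float] = time.time,
):
    """Return an on_ping(msg) callback that forwards each ping as JSON.

    `publish` is a hypothetical hook: any callable taking one string,
    e.g. a thin wrapper around a middleware client's send method.
    """
    def on_ping(msg: dict) -> None:
        record = {
            "vehicle": vehicle,
            "t_local": clock(),      # local receive time, in seconds
            "id": msg.get("id"),
            "snr": msg.get("snr"),
        }
        # One compact JSON document per ping keeps the transport agnostic.
        publish(json.dumps(record, sort_keys=True))

    return on_ping
```

In the example script this would be registered with `t.on_ping_received(make_publishing_callback(print, args.vehicle))`; passing `print` as the hook reproduces stdout logging, and swapping in a middleware wrapper changes the destination without touching the callback.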