Files
cosma-nav/extract/inspect_topics.py

48 lines
1.4 KiB
Python

import sys
import glob
from mcap.reader import make_reader
class LargeRecordWrapper:
    """Thin pass-through around an already-open file-like object.

    ``read``, ``seek`` and ``tell`` are forwarded to the wrapped object
    unchanged; ``seekable`` always answers ``True``.
    """

    def __init__(self, fp):
        """Keep a reference to *fp*; the wrapper never closes it."""
        self.fp = fp

    def read(self, size=-1):
        """Forward the read request, passing *size* through untouched."""
        data = self.fp.read(size)
        return data

    def seek(self, pos, whence=0):
        """Forward the seek request and hand back whatever the wrapped object returns."""
        new_position = self.fp.seek(pos, whence)
        return new_position

    def tell(self):
        """Report the wrapped object's current position."""
        position = self.fp.tell()
        return position

    def seekable(self):
        """Report the stream as seekable without consulting the wrapped object."""
        # NOTE(review): presumably this steers mcap's make_reader towards its
        # seeking code path -- confirm against the mcap version in use.
        return True
def inspect(bag_dir: str):
    """Print a per-topic summary of every ``*.mcap`` file directly inside *bag_dir*.

    Files are visited in sorted path order. For each topic the table shows
    the schema name recorded the first time the topic is seen (``"?"`` when
    the message carries no schema) and the total message count across all
    files. A file that raises while being read is reported on stderr and the
    scan moves on; messages counted before the failure stay in the totals.

    Args:
        bag_dir: Directory to scan for ``.mcap`` files (non-recursive).

    Returns:
        None. The table goes to stdout, read warnings to stderr.
    """
    mcap_paths = glob.glob(f"{bag_dir}/*.mcap")
    mcap_paths.sort()
    if len(mcap_paths) == 0:
        print(f"No MCAP files in {bag_dir}")
        return

    summary: dict[str, dict] = {}
    for mcap_path in mcap_paths:
        try:
            with open(mcap_path, "rb") as stream:
                source = LargeRecordWrapper(stream)
                reader = make_reader(source)
                for schema, channel, _message in reader.iter_messages():
                    topic = channel.topic
                    entry = summary.get(topic)
                    if entry is None:
                        # First sighting of this topic: remember its type and encoding.
                        entry = {
                            "type": schema.name if schema else "?",
                            "count": 0,
                            "encoding": channel.message_encoding,
                        }
                        summary[topic] = entry
                    entry["count"] += 1
        except Exception as err:
            # Best effort: one unreadable file must not abort the whole scan.
            print(f"Warning: error reading {mcap_path}: {err}", file=sys.stderr)

    print(f"\n{'Topic':<50} {'Type':<45} {'Msgs':>6}")
    print("-" * 105)
    for topic in sorted(summary):
        info = summary[topic]
        print(f"{topic:<50} {info['type']:<45} {info['count']:>6}")
if __name__ == "__main__":
    # The bag directory is the first command-line argument when one is
    # given; otherwise fall back to the default location.
    cli_args = sys.argv[1:]
    inspect(cli_args[0] if cli_args else "data/bags")