ROS 2 adapter
Reads rosbag2 directories (sqlite3 + mcap backends) and turns them
into RoboTrace episodes. No rclpy install required - the adapter is
backed by the pure-Python rosbags
library, so it runs on macOS dev boxes and in CI without a ROS 2
distro.
from robotrace.adapters import ros2
ros2.upload_bag(
"./run_2026-05-08/",
name="warmup pick-and-place",
policy_version="pap-v3.2.1",
env_version="halcyon-cell-rev4",
git_sha="abc1234",
)That's the whole 95% case. Read on for the three explicit verbs, topic auto-classification, multi-camera handling, and the install matrix.
Install
# Sensor- / action-only bags. No image topics.
pip install 'robotrace-dev[ros2]==0.1.0a6'
# With image-topic → MP4 encoding (most cases).
pip install 'robotrace-dev[ros2,video]==0.1.0a6'The pin is the most reliable install during alpha and drops once
we cut 1.0.
[ros2] pulls in rosbags and numpy. [video] adds
opencv-python for encoding sensor_msgs/Image (and
CompressedImage) topics into a single video.mp4. The split is on
purpose - a sensor-only bag shouldn't pay opencv's install cost. If
you call upload_bag on a bag that has image topics without [video]
installed, the adapter raises ConfigurationError pointing at the
right pip install line.
ROS 1 (rosbag1) is out of scope - RoboTrace is ROS 2 only.
The three verbs
| Verb | What it does |
|---|---|
ros2.scan_bag(path) | Read-only introspection. Returns a BagSummary with topic catalog, classifier decisions, and bag duration. No files written, no network. |
ros2.encode_bag(path, out) | Walks the bag once, writes video.mp4, sensors.npz, actions.npz into out. Returns an EncodedBag with the file paths and discovered duration_s / fps. No network. |
ros2.upload_bag(path, ...) | One-shot: scan → encode to a tempdir → start_episode + upload_* + finalize. Cleans the tempdir on return. Returns the finalized Episode. |
scan_bag is the dry-run - most users start there to confirm the
classifier picked the right topics before paying for the full encode.
summary = ros2.scan_bag("./run_2026-05-08/")
print(summary.report())
# /Users/.../run_2026-05-08 47.20s, 1842 messages
# video:
# /camera/image_raw (sensor_msgs/msg/Image, 472 msg, via msgtype)
# sensors:
# /joint_states (sensor_msgs/msg/JointState, 944 msg, via default)
# actions:
# /cmd_vel (geometry_msgs/msg/Twist, 426 msg, via msgtype)If everything looks right, swap scan_bag for upload_bag and you're
done.
Topic auto-classification
Every connection in the bag is sorted into one of three slots based on its message type and (as a fallback) its topic name. Rules apply in this order - first match wins:
| Rule | → Slot |
|---|---|
sensor_msgs/Image, sensor_msgs/CompressedImage | video |
geometry_msgs/Twist, TwistStamped, Wrench, WrenchStamped, trajectory_msgs/JointTrajectory, MultiDOFJointTrajectory, control_msgs/JointJog | actions |
Topic name ends in /cmd_* or /command (catches custom command messages) | actions |
| Anything else | sensors |
Override per-slot if the heuristic is wrong for your bag:
ros2.upload_bag(
"./run_2026-05-08/",
video_topics=["/cameras/wrist/image_color"],
sensor_topics=["/joint_states", "/wrench"],
action_topics=["/teleop/joy_cmd"],
policy_version="pap-v3.2.1",
)Pass an empty list (video_topics=[]) to deliberately exclude a
slot - useful for bags where you only care about sensor traces.
Multi-camera bags
When more than one image topic is present, the adapter tiles the
frames horizontally into a single video.mp4. Heights are
black-padded so cameras with different resolutions still align. Topic
order is alphabetical so the same bag always produces the same
mosaic.
If you only want one camera, pass canonical_video_topic:
ros2.upload_bag(
"./bimanual_run/",
canonical_video_topic="/cameras/overhead/image_color",
policy_version="pap-v3.2.1",
)The mosaic's frame rate is computed from the median inter-frame delta of whichever topic has the most frames; if it can't be computed (single-frame topic, all timestamps equal) the encoder falls back to 10 fps so the resulting MP4 is obviously a placeholder rather than silently wrong.
How sensors / actions get packed
Each non-image topic contributes a set of arrays into a single NPZ file per slot. Layout uses the topic name as a namespace so a single NPZ can hold many heterogeneous streams without clobbering keys:
sensors.npz
/joint_states/_t_ns int64[N] # nanosecond timestamps
/joint_states/position float32[N, 6]
/joint_states/velocity float32[N, 6]
/joint_states/effort float32[N, 6]
/imu/_t_ns int64[M]
/imu/orientation float32[M, 4]
/imu/angular_velocity float32[M, 3]
/imu/linear_acceleration float32[M, 3]Well-known message types get clean field names from a built-in flattener registry:
| Message type | Fields packed |
|---|---|
sensor_msgs/JointState | position, velocity, effort |
sensor_msgs/Imu | orientation, angular_velocity, linear_acceleration |
geometry_msgs/Twist[Stamped] | linear, angular |
geometry_msgs/Wrench[Stamped] | force, torque |
geometry_msgs/PoseStamped | position, orientation |
nav_msgs/Odometry | position, orientation, linear_velocity, angular_velocity |
Anything not on that list gets the generic flattener: it walks
the dataclass and packs every numeric scalar / fixed-length numeric
field. Strings, variable-length nested arrays, and opaque blobs
(PointCloud2, image-shaped buffers) are dropped silently. If a
field's shape changes mid-bag - e.g. a joint count flip - the
encoder records the topic in
metadata.skipped_topics so you can spot it in the portal instead of
shipping silently corrupt data.
Episode metadata
The adapter merges its own metadata with anything you pass:
ros2.upload_bag(
"./run/",
policy_version="pap-v3.2.1",
metadata={"task": "pick_and_place", "operator": "alice"},
)Lands on the episode as:
{
"adapter": "ros2",
"bag": "/Users/.../run",
"skipped_topics": [],
"task": "pick_and_place",
"operator": "alice"
}Your keys win on collision.
Encode-then-handle-it-yourself
encode_bag exposes the artifacts as files so you can inspect or
post-process before uploading. Useful when you want to splice frames,
re-bucket sensor topics, or stage a long-running upload:
encoded = ros2.encode_bag("./run/", "/tmp/encoded/")
print(encoded.duration_s, encoded.fps)
# 47.2 30.0
print([a.path for a in encoded.artifacts()])
# [PosixPath('/tmp/encoded/video.mp4'),
# PosixPath('/tmp/encoded/sensors.npz'),
# PosixPath('/tmp/encoded/actions.npz')]Then drive start_episode / upload_* directly - same plumbing
upload_bag uses internally. The adapter goes through the lower-level
start_episode path (not log_episode)
because log_episode rejects .npz files in the actions= slot
(its extension check guesses sensors); the adapter knows what it
wrote and bypasses that validation.
Why not the live record(topics=[...]) shape?
That's coming next in robotrace 0.2. The targeted shape:
from robotrace.adapters import ros2
with ros2.record(
topics=["/camera/image_raw", "/joint_states", "/cmd_vel"],
policy_version="pap-v3.2.1",
env_version="halcyon-cell-rev4",
git_sha="abc1234",
) as run:
run.wait_for_done() # or your existing rclpy spin loop
# auto-uploads on exitLive recording needs rclpy, which means the install stops being
distro-agnostic and starts depending on which ROS 2 distro
(humble, jazzy, …) you run. The plan is for ros2.record(...) to
write to a tempdir using rosbag2_py.SequentialWriter and then call
this very encode_bag / upload_bag plumbing on close - same
encoder, different driver. Until it ships, the
pip install 'robotrace-dev[ros2]==0.1.0a6' path stays
zero-rclpy.
Errors
| Exception | When |
|---|---|
ConfigurationError | path doesn't exist, isn't a directory, has no metadata.yaml, or rosbags / cv2 aren't installed |
AuthError | API key bad / revoked (raised by the underlying start_episode) |
ValidationError | Server rejected the create payload |
TransportError | Network / DNS / timeout during the create or upload |
If an upload fails partway through, the adapter (via
Client.start_episode's standard handling) flips the run to
status="failed" with the failure reason in
metadata.failure_reason before re-raising - so you don't end up
with ghostly "recording" runs in the portal.