Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,335 changes: 825 additions & 510 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ members = [
"crates/ros-z-msgs",
"crates/ros-z-tests",
"crates/ros-z-console",
"crates/ros-z-cli",
"crates/ros-z-bridge",
"crates/ros-z/examples/protobuf_demo",
]
Expand Down
73 changes: 39 additions & 34 deletions book/src/chapters/custom_messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

ros-z supports two approaches for defining custom message types:

| Approach | Definition | Best For |
|----------|------------|----------|
| **Rust-Native** | Write Rust structs directly | Prototyping, ros-z-only systems |
| **Schema-Generated** | Write `.msg`/`.srv` files, generate Rust | Production, ROS 2 interop |
| Approach | Definition | Best For |
| -------------------- | ---------------------------------------- | ------------------------------- |
| **Rust-Native** | Write Rust structs directly | Prototyping, ros-z-only systems |
| **Schema-Generated** | Write `.msg`/`.srv` files, generate Rust | Production, ROS 2 interop |

```mermaid
flowchart TD
Expand All @@ -21,58 +21,60 @@ flowchart TD

## Rust-Native Messages

**Define messages directly in Rust by implementing required traits.** This approach is fast for prototyping but only works between ros-z nodes.
**Define messages directly in Rust and derive their schema metadata.** This approach is fast for prototyping, and plain named structs can still participate in the standard ROS 2 type description service.

```admonish warning
Rust-Native messages use `TypeHash::zero()` and won't interoperate with ROS 2 C++/Python nodes.
Rust-native messages defined with `#[derive(MessageTypeInfo)]` are limited to ROS 2 schema-compatible shapes. If you need `Option<T>`, enums, or other ros-z-only schema extensions, use `#[derive(ExtendedMessageTypeInfo)]` plus the parallel extended type description service instead of the standard ROS 2 type description service.
```

### Workflow of Rust-Native Messages

```mermaid
graph LR
A[Define Struct] --> B[Impl MessageTypeInfo]
A[Define Struct] --> B[Derive MessageTypeInfo]
B --> C[Add Serde Traits]
C --> D[Impl WithTypeInfo]
C --> D[Impl ZMessage]
D --> E[Use in Pub/Sub]
```

### Required Traits

| Trait | Purpose | Key Method |
|-------|---------|------------|
| **MessageTypeInfo** | Type identification | `type_name()`, `type_hash()` |
| **WithTypeInfo** | ros-z integration | `type_info()` |
| **Serialize/Deserialize** | Data encoding | From `serde` |
| Trait | Purpose | Key Method |
| ------------------------- | ------------------------------------ | ------------------------------------------------ |
| **MessageTypeInfo** | Type identification + runtime schema | `type_name()`, `type_hash()`, `message_schema()` |
| **Serialize/Deserialize** | Data encoding | From `serde` |
| **ZMessage** | ros-z serialization path | `type Serdes = SerdeCdrSerdes<Self>` |

### Message Example

```rust,ignore
use ros_z::{MessageTypeInfo, entity::TypeHash};
use ros_z::ros_msg::WithTypeInfo;
use ros_z::MessageTypeInfo;
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, MessageTypeInfo)]
#[ros_msg(type_name = "my_msgs/msg/RobotStatus")]
struct RobotStatus {
battery_level: f32,
position_x: f32,
position_y: f32,
is_moving: bool,
}

impl MessageTypeInfo for RobotStatus {
fn type_name() -> &'static str {
"my_msgs::msg::dds_::RobotStatus_"
}

fn type_hash() -> TypeHash {
TypeHash::zero() // ros-z-to-ros-z only
}
impl ros_z::msg::ZMessage for RobotStatus {
type Serdes = ros_z::msg::SerdeCdrSerdes<Self>;
}

impl WithTypeInfo for RobotStatus {}
```

`MessageTypeInfo` derive in core `ros-z` intentionally supports only ROS 2 schema-compatible named
structs: primitive numeric/bool types, `String`, `Vec<T>`, fixed arrays `[T; N]`, and nested
message types. Tuple structs, unit structs, enums, `Option<T>`, maps, `usize`, `isize`, and other
Rust-only shapes are rejected at compile time.

For richer serde shapes such as `Option<T>` or enums, use `ros_z::ExtendedMessageTypeInfo`.
It keeps normal `message_schema()` support for types that are still ROS 2 compatible, and uses a
separate `~get_extended_type_description` service for extended-only schemas when the publisher node
is created with `.with_extended_type_description_service()`.

### Service Example

```rust,ignore
Expand Down Expand Up @@ -105,6 +107,9 @@ impl ZService for NavigateTo {

See the `z_custom_message` example:

When a publisher node enables the type description service, derived custom message types
automatically register their runtime schema so dynamic subscribers can discover them.

```bash
# Terminal 1: Router
cargo run --example zenoh_router
Expand Down Expand Up @@ -272,14 +277,14 @@ ROS_Z_MSG_PATH="./my_robot_msgs" cargo build

## Comparison

| Feature | Rust-Native | Schema-Generated |
|---------|-------------|------------------|
| **Definition** | Rust structs | `.msg`/`.srv` files |
| **Type Hashes** | `TypeHash::zero()` | Proper RIHS01 hashes |
| **Standard Type Refs** | Manual | Automatic (`geometry_msgs`, etc.) |
| **ROS 2 Interop** | No | Partial (messages yes, services limited) |
| **Setup Complexity** | Low | Medium (build.rs required) |
| **Best For** | Prototyping | Production |
| Feature | Rust-Native | Schema-Generated |
| ---------------------- | ------------------ | ---------------------------------------- |
| **Definition** | Rust structs | `.msg`/`.srv` files |
| **Type Hashes** | `TypeHash::zero()` | Proper RIHS01 hashes |
| **Standard Type Refs** | Manual | Automatic (`geometry_msgs`, etc.) |
| **ROS 2 Interop** | No | Partial (messages yes, services limited) |
| **Setup Complexity** | Low | Medium (build.rs required) |
| **Best For** | Prototyping | Production |

---

Expand Down
6 changes: 6 additions & 0 deletions book/src/chapters/keyexpr_formats.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ robot/sensors/camera/** # Topic /robot/sensors/camera
- Using `zenoh-bridge-ros2dds`
- Integrating with CycloneDDS or FastDDS nodes via Zenoh

**Discovery consequences:**

- `Ros2Dds` graph discovery can identify publishers/subscribers/services by topic or service name, but its liveliness data does not provide publisher node identity.
- Topic-based graph helpers such as publisher/subscriber matching work with `Ros2Dds`.
- Node-based discovery and automatic schema discovery via `create_dyn_sub_auto()` are not supported with `Ros2Dds`, because the publishing node cannot be identified from discovery data.

## Key Expression Behavior (IMPORTANT)

Understanding how topic names are converted to key expressions is critical for debugging:
Expand Down
1 change: 1 addition & 0 deletions crates/rmw-zenoh-rs/src/rmw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,7 @@ pub extern "C" fn rmw_create_service(

let service_impl = crate::service::ServiceImpl {
inner: zserver,
pending: std::collections::HashMap::new(),
service_name: service_name_cstr,
request_ts: service_type_support,
response_ts: service_type_support,
Expand Down
127 changes: 38 additions & 89 deletions crates/rmw-zenoh-rs/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::ffi::CString;
use std::sync::Mutex;
use std::sync::atomic::{AtomicI64, Ordering};
Expand Down Expand Up @@ -89,15 +90,11 @@ impl ClientImpl {

tracing::debug!(
"[ClientImpl::take_response] Attempting to take response, rx has {} items",
if self.inner.rx().is_empty() {
0
} else {
self.inner.rx().len()
}
if self.inner.rmw_has_responses() { 1 } else { 0 }
);

// Try to receive a response
if let Ok(sample) = self.inner.rx().try_recv() {
if let Some(sample) = self.inner.rmw_try_take_response_sample()? {
tracing::debug!("[ClientImpl::take_response] Got response sample");

let payload = sample.payload();
Expand Down Expand Up @@ -172,6 +169,8 @@ impl ClientImpl {
/// Service implementation for RMW
pub struct ServiceImpl {
pub inner: ros_z::service::ZServer<crate::msg::RosService>,
pub pending:
HashMap<ros_z::service::RequestId, ros_z::service::ServiceReply<crate::msg::RosService>>,
pub service_name: CString,
pub request_ts: crate::type_support::ServiceTypeSupport,
pub response_ts: crate::type_support::ServiceTypeSupport,
Expand All @@ -195,69 +194,23 @@ impl ServiceImpl {
*taken = false;
}

// Try to receive a request from the raw receiver
if let Some(query) = self.inner.try_queue().and_then(|q| q.try_recv()) {
// Get the payload bytes
let bytes = if let Some(payload) = query.payload() {
payload.to_bytes().to_vec()
} else {
return Ok(());
};

// Extract attachment from query to get GID, sequence number, and timestamp
let key = if let Some(attachment_bytes) = query.attachment() {
match ros_z::attachment::Attachment::try_from(attachment_bytes) {
Ok(attachment) => {
let key: ros_z::service::QueryKey = attachment.into();
tracing::debug!(
"[ServiceImpl::take_request] Got request with sn: {}, gid: {:?}",
key.sn,
key.gid
);
key
}
Err(e) => {
tracing::warn!("Failed to extract attachment from query: {}", e);
// Fallback to placeholder
ros_z::service::QueryKey {
gid: [0u8; 16],
sn: 0i64,
}
}
}
} else {
tracing::warn!("No attachment in query, using placeholder QueryKey");
ros_z::service::QueryKey {
gid: [0u8; 16],
sn: 0i64,
}
};

// Extract timestamp from attachment
let source_timestamp = if let Some(attachment_bytes) = query.attachment() {
match ros_z::attachment::Attachment::try_from(attachment_bytes) {
Ok(attachment) => attachment.source_timestamp,
Err(_) => 0,
}
} else {
0
};
if let Some(request_ctx) = self.inner.try_take_request()? {
let request_id = request_ctx.id().clone();
let source_timestamp = 0;
let (request_msg, reply) = request_ctx.into_parts();
let bytes = request_msg.0;

// Set received_timestamp to current time
let received_timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_or(0, |v| v.as_nanos() as i64);

// Store the query for later response
// Store the reply context for later response
tracing::debug!(
"[ServiceImpl::take_request] Storing query with key sn:{}, inserting into map",
key.sn
);
self.inner.map_insert(key.clone(), query);
tracing::debug!(
"[ServiceImpl::take_request] Map now has {} entries",
self.inner.map_len()
"[ServiceImpl::take_request] Storing reply context with sn:{}",
request_id.sequence_number
);
self.pending.insert(request_id.clone(), reply);

// Deserialize into the provided request buffer using request MessageTypeSupport
unsafe {
Expand All @@ -269,9 +222,8 @@ impl ServiceImpl {
// Fill request_header with sequence info and timestamps
if !request_header.is_null() {
unsafe {
(*request_header).request_id.sequence_number = key.sn;
// Copy GID from key
for (i, &byte) in key.gid.iter().enumerate() {
(*request_header).request_id.sequence_number = request_id.sequence_number;
for (i, &byte) in request_id.writer_guid.iter().enumerate() {
if i < 16 {
(*request_header).request_id.writer_guid[i] = byte;
}
Expand All @@ -293,42 +245,39 @@ impl ServiceImpl {
request_header: *const rmw_request_id_t,
response: *const c_void,
) -> Result<()> {
// Extract QueryKey from request_header
let key = unsafe {
let request_id = unsafe {
let mut gid = [0u8; 16];
gid.copy_from_slice(&(*request_header).writer_guid);
ros_z::service::QueryKey {
gid,
sn: (*request_header).sequence_number,
ros_z::service::RequestId {
writer_guid: gid,
sequence_number: (*request_header).sequence_number,
}
};

tracing::debug!(
"[ServiceImpl::send_response] Sending response for key sn:{}, gid:{:?}",
key.sn,
key.gid
);
tracing::debug!(
"[ServiceImpl::send_response] Map has {} entries before send_response",
self.inner.map_len()
request_id.sequence_number,
request_id.writer_guid
);

// Create RosMessage Response from the raw pointer using response MessageTypeSupport
let resp = crate::msg::RosMessage::new(response, self.response_ts.response);

// Send response
match self.inner.send_response(&resp, &key) {
Ok(_) => {
tracing::debug!("[ServiceImpl::send_response] Response sent successfully");
Ok(())
}
Err(e) => {
tracing::error!(
"[ServiceImpl::send_response] Failed to send response: {}",
e
);
Err(e)
}
match self.pending.remove(&request_id) {
Some(reply) => match reply.reply_blocking(&resp) {
Ok(_) => {
tracing::debug!("[ServiceImpl::send_response] Response sent successfully");
Ok(())
}
Err(e) => {
tracing::error!(
"[ServiceImpl::send_response] Failed to send response: {}",
e
);
Err(e)
}
},
None => Err(zenoh::Error::from("Pending request not found")),
}
}
}
Expand All @@ -337,7 +286,7 @@ impl Waitable for ClientImpl {
fn is_ready(&self) -> bool {
// Acquire fence to ensure we see the latest channel state from other threads
std::sync::atomic::fence(std::sync::atomic::Ordering::Acquire);
!self.inner.rx().is_empty()
self.inner.rmw_has_responses()
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/ros-z-bridge/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl Bridge {
let key = BridgeKey {
topic: ep.topic.clone(),
type_name: type_info.name.clone(),
kind: ep.kind,
kind: ep.kind.into(),
};

if event.appeared {
Expand Down
Loading
Loading