diff --git a/rclrs/src/dynamic_message.rs b/rclrs/src/dynamic_message.rs index aae749d0e..7197d7998 100644 --- a/rclrs/src/dynamic_message.rs +++ b/rclrs/src/dynamic_message.rs @@ -324,6 +324,10 @@ impl DynamicMessageMetadata { pub fn structure(&self) -> &MessageStructure { &self.structure } + + pub(crate) fn type_support_ptr(&self) -> *const rosidl_message_type_support_t { + self.type_support_ptr + } } // ========================= impl for DynamicMessage ========================= diff --git a/rclrs/src/dynamic_message/dynamic_publisher.rs b/rclrs/src/dynamic_message/dynamic_publisher.rs index 22cbf3268..ccf83bcee 100644 --- a/rclrs/src/dynamic_message/dynamic_publisher.rs +++ b/rclrs/src/dynamic_message/dynamic_publisher.rs @@ -1,13 +1,17 @@ -use std::ffi::CString; -use std::sync::{Arc, Mutex}; +use std::{ + ffi::CString, + sync::{Arc, Mutex}, +}; use super::{ get_type_support_handle, get_type_support_library, DynamicMessage, DynamicMessageError, DynamicMessageMetadata, MessageTypeName, }; -use crate::error::{RclrsError, ToResult}; -use crate::rcl_bindings::*; -use crate::{Node, PublisherHandle, PublisherOptions, ENTITY_LIFECYCLE_MUTEX}; +use crate::{ + error::{RclrsError, ToResult}, + rcl_bindings::*, + Node, PublisherHandle, PublisherOptions, ENTITY_LIFECYCLE_MUTEX, +}; /// Struct for sending dynamic messages. /// @@ -150,8 +154,7 @@ mod tests { #[test] fn test_dynamic_publishers() -> Result<(), RclrsError> { - use crate::vendor::test_msgs::msg; - use crate::TopicEndpointInfo; + use crate::{vendor::test_msgs::msg, TopicEndpointInfo}; let namespace = "/test_dynamic_publishers_graph"; let graph = construct_test_graph(namespace)?; diff --git a/rclrs/src/dynamic_message/dynamic_subscription.rs b/rclrs/src/dynamic_message/dynamic_subscription.rs index 350375ceb..ddc3de290 100644 --- a/rclrs/src/dynamic_message/dynamic_subscription.rs +++ b/rclrs/src/dynamic_message/dynamic_subscription.rs @@ -1,8 +1,10 @@ -use std::any::Any; -use std::boxed::Box; -use std::ffi::CString; -use std::ops::{Deref, DerefMut}; -use std::sync::{Arc, Mutex}; +use std::{ + any::Any, + boxed::Box, + ffi::CString, + ops::{Deref, DerefMut}, + sync::{Arc, Mutex}, +}; use futures::future::BoxFuture; @@ -10,11 +12,11 @@ use super::{ get_type_support_handle, get_type_support_library, DynamicMessage, DynamicMessageMetadata, MessageStructure, MessageTypeName, }; -use crate::rcl_bindings::*; use crate::{ - MessageInfo, Node, NodeHandle, RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclrsError, - RclrsErrorFilter, ReadyKind, SubscriptionHandle, SubscriptionOptions, ToResult, Waitable, - WaitableLifecycle, WorkScope, Worker, WorkerCommands, ENTITY_LIFECYCLE_MUTEX, + rcl_bindings::*, MessageInfo, Node, NodeHandle, RclPrimitive, RclPrimitiveHandle, + RclPrimitiveKind, RclrsError, RclrsErrorFilter, ReadyKind, SubscriptionHandle, + SubscriptionOptions, ToResult, Waitable, WaitableLifecycle, WorkScope, Worker, WorkerCommands, + ENTITY_LIFECYCLE_MUTEX, }; /// Struct for receiving messages whose type is not known at compile time. diff --git a/rclrs/src/dynamic_message/field_access.rs b/rclrs/src/dynamic_message/field_access.rs index 4ae2a5a17..bcddc0b1a 100644 --- a/rclrs/src/dynamic_message/field_access.rs +++ b/rclrs/src/dynamic_message/field_access.rs @@ -889,10 +889,7 @@ mod immutable { define_value_types!(select_immutable); } -pub use immutable::ArrayValue; -pub use immutable::BoundedSequenceValue; -pub use immutable::SequenceValue; -pub use immutable::SimpleValue; +pub use immutable::{ArrayValue, BoundedSequenceValue, SequenceValue, SimpleValue}; /// The value of a field in a [`DynamicMessage`][1]. /// @@ -944,10 +941,10 @@ mod mutable { define_value_types!(select_mutable); } -pub use mutable::ArrayValue as ArrayValueMut; -pub use mutable::BoundedSequenceValue as BoundedSequenceValueMut; -pub use mutable::SequenceValue as SequenceValueMut; -pub use mutable::SimpleValue as SimpleValueMut; +pub use mutable::{ + ArrayValue as ArrayValueMut, BoundedSequenceValue as BoundedSequenceValueMut, + SequenceValue as SequenceValueMut, SimpleValue as SimpleValueMut, +}; /// The value of a field in a [`DynamicMessage`][1]. /// diff --git a/rclrs/src/dynamic_message/field_access/dynamic_bounded_string.rs b/rclrs/src/dynamic_message/field_access/dynamic_bounded_string.rs index 431c8eb69..9e011168b 100644 --- a/rclrs/src/dynamic_message/field_access/dynamic_bounded_string.rs +++ b/rclrs/src/dynamic_message/field_access/dynamic_bounded_string.rs @@ -1,7 +1,9 @@ -use std::convert::AsMut; -use std::fmt::{self, Display}; -use std::num::NonZeroUsize; -use std::ops::{Deref, DerefMut}; +use std::{ + convert::AsMut, + fmt::{self, Display}, + num::NonZeroUsize, + ops::{Deref, DerefMut}, +}; use rosidl_runtime_rs::StringExceedsBoundsError; diff --git a/rclrs/src/dynamic_message/field_access/dynamic_message_view.rs b/rclrs/src/dynamic_message/field_access/dynamic_message_view.rs index eb59d1da9..2c1153756 100644 --- a/rclrs/src/dynamic_message/field_access/dynamic_message_view.rs +++ b/rclrs/src/dynamic_message/field_access/dynamic_message_view.rs @@ -1,8 +1,12 @@ -use std::fmt::{self, Debug}; -use std::ops::Deref; - -use super::super::MessageStructure; -use super::{DynamicSequenceElementMut, Proxy, ProxyMut, ProxySequence, Value, ValueMut}; +use std::{ + fmt::{self, Debug}, + ops::Deref, +}; + +use super::{ + super::MessageStructure, DynamicSequenceElementMut, Proxy, ProxyMut, ProxySequence, Value, + ValueMut, +}; /// A view of a single message. Used for nested messages. /// diff --git a/rclrs/src/dynamic_message/field_access/dynamic_sequence.rs b/rclrs/src/dynamic_message/field_access/dynamic_sequence.rs index 3124132a9..271617dea 100644 --- a/rclrs/src/dynamic_message/field_access/dynamic_sequence.rs +++ b/rclrs/src/dynamic_message/field_access/dynamic_sequence.rs @@ -1,6 +1,8 @@ -use std::fmt::{self, Debug}; -use std::marker::PhantomData; -use std::ops::{Deref, DerefMut}; +use std::{ + fmt::{self, Debug}, + marker::PhantomData, + ops::{Deref, DerefMut}, +}; use rosidl_runtime_rs::{Sequence, SequenceAlloc, SequenceExceedsBoundsError}; diff --git a/rclrs/src/dynamic_message/message_structure.rs b/rclrs/src/dynamic_message/message_structure.rs index a28b38ab9..b2ece52a3 100644 --- a/rclrs/src/dynamic_message/message_structure.rs +++ b/rclrs/src/dynamic_message/message_structure.rs @@ -1,10 +1,10 @@ -use std::ffi::CStr; -use std::num::NonZeroUsize; +use std::{ffi::CStr, num::NonZeroUsize}; use super::TypeErasedSequence; -use crate::rcl_bindings::rosidl_typesupport_introspection_c__MessageMember as rosidl_message_member_t; -use crate::rcl_bindings::rosidl_typesupport_introspection_c__MessageMembers as rosidl_message_members_t; -use crate::rcl_bindings::*; +use crate::rcl_bindings::{ + rosidl_typesupport_introspection_c__MessageMember as rosidl_message_member_t, + rosidl_typesupport_introspection_c__MessageMembers as rosidl_message_members_t, *, +}; /// Possible base types for fields in a message. // The field variants are self-explaining, no need to add redundant documentation. @@ -298,8 +298,7 @@ impl MessageStructure { #[cfg(test)] mod tests { use super::*; - use crate::dynamic_message::*; - use crate::vendor::test_msgs::msg; + use crate::{dynamic_message::*, vendor::test_msgs::msg}; use std::num::NonZeroUsize; diff --git a/rclrs/src/lib.rs b/rclrs/src/lib.rs index 9db4767f9..04ea8d76d 100644 --- a/rclrs/src/lib.rs +++ b/rclrs/src/lib.rs @@ -192,6 +192,9 @@ mod node; mod parameter; mod publisher; mod qos; +mod serialized_message; +mod serialized_publisher; +mod serialized_subscription; mod service; mod subscription; mod time; @@ -204,7 +207,7 @@ mod worker; #[cfg(test)] mod test_helpers; -mod rcl_bindings; +pub mod rcl_bindings; pub use action::*; pub use arguments::*; @@ -221,6 +224,9 @@ pub use parameter::*; pub use publisher::*; pub use qos::*; pub use rcl_bindings::rmw_request_id_t; +pub use serialized_message::*; +pub use serialized_publisher::*; +pub use serialized_subscription::*; pub use service::*; pub use subscription::*; pub use time::*; diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index ed436df25..7c0ddb4b7 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -43,10 +43,10 @@ use crate::{ IntoAsyncSubscriptionCallback, IntoNodeServiceCallback, IntoNodeSubscriptionCallback, IntoNodeTimerOneshotCallback, IntoNodeTimerRepeatingCallback, IntoTimerOptions, LogParams, Logger, MessageInfo, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters, - Promise, Publisher, PublisherOptions, PublisherState, RclrsError, RequestedGoal, Service, - ServiceOptions, ServiceState, Subscription, SubscriptionOptions, SubscriptionState, - TerminatedGoal, TimeSource, Timer, TimerState, ToLogParams, Worker, WorkerOptions, WorkerState, - ENTITY_LIFECYCLE_MUTEX, + Promise, Publisher, PublisherOptions, PublisherState, RclrsError, RequestedGoal, + SerializedPublisher, SerializedSubscription, Service, ServiceOptions, ServiceState, + Subscription, SubscriptionOptions, SubscriptionState, TerminatedGoal, TimeSource, Timer, + TimerState, ToLogParams, ToResult, Worker, WorkerOptions, WorkerState, ENTITY_LIFECYCLE_MUTEX, }; /// A processing unit that can communicate with other nodes. See the API of @@ -1511,6 +1511,82 @@ impl NodeState { pub(crate) fn handle(&self) -> &Arc { &self.handle } + + /// Creates a serialized subscription. + /// + /// This receives raw serialized (CDR) bytes, using `rcl_take_serialized_message`. + pub fn create_serialized_subscription<'a>( + &self, + topic_type: MessageTypeName, + options: impl Into>, + ) -> Result { + let SubscriptionOptions { topic, qos } = options.into(); + + // Use the same typesupport resolution as dynamic messages. + let metadata = crate::dynamic_message::DynamicMessageMetadata::new(topic_type)?; + + let mut sub = unsafe { rcl_get_zero_initialized_subscription() }; + let topic_c = std::ffi::CString::new(topic).unwrap(); + + let _context_lock = self.handle.context_handle.rcl_context.lock().unwrap(); + let node = self.handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + + unsafe { + let mut opts = rcl_subscription_get_default_options(); + opts.qos = qos.into(); + rcl_subscription_init( + &mut sub, + &*node, + metadata.type_support_ptr(), + topic_c.as_ptr(), + &opts, + ) + .ok()?; + } + + Ok(SerializedSubscription { + handle: Arc::clone(&self.handle), + sub, + }) + } + + /// Creates a serialized publisher. + /// + /// This publishes raw serialized (CDR) bytes, using `rcl_publish_serialized_message`. + pub fn create_serialized_publisher<'a>( + &self, + topic_type: MessageTypeName, + options: impl Into>, + ) -> Result { + let crate::PublisherOptions { topic, qos } = options.into(); + + let metadata = crate::dynamic_message::DynamicMessageMetadata::new(topic_type)?; + let mut pub_ = unsafe { rcl_get_zero_initialized_publisher() }; + let topic_c = std::ffi::CString::new(topic).unwrap(); + + let _context_lock = self.handle.context_handle.rcl_context.lock().unwrap(); + let node = self.handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + + unsafe { + let mut opts = rcl_publisher_get_default_options(); + opts.qos = qos.into(); + rcl_publisher_init( + &mut pub_, + &*node, + metadata.type_support_ptr(), + topic_c.as_ptr(), + &opts, + ) + .ok()?; + } + + Ok(SerializedPublisher { + handle: Arc::clone(&self.handle), + pub_, + }) + } } impl<'a> ToLogParams<'a> for &'a NodeState { diff --git a/rclrs/src/serialized_message.rs b/rclrs/src/serialized_message.rs new file mode 100644 index 000000000..64f34494f --- /dev/null +++ b/rclrs/src/serialized_message.rs @@ -0,0 +1,52 @@ +use crate::{rcl_bindings::*, RclrsError, ToResult}; + +/// A growable serialized message buffer. +/// +/// This wraps `rcl_serialized_message_t` (aka `rmw_serialized_message_t`). +pub struct SerializedMessage { + pub(crate) msg: rcl_serialized_message_t, +} + +unsafe impl Send for SerializedMessage {} + +impl SerializedMessage { + /// Create a new serialized message buffer with the given capacity in bytes. + pub fn new(capacity: usize) -> Result { + unsafe { + let mut msg = rcutils_get_zero_initialized_uint8_array(); + let allocator = rcutils_get_default_allocator(); + rcutils_uint8_array_init(&mut msg, capacity, &allocator).ok()?; + Ok(Self { msg }) + } + } + + /// Return the current serialized payload. + pub fn as_bytes(&self) -> &[u8] { + unsafe { std::slice::from_raw_parts(self.msg.buffer, self.msg.buffer_length) } + } + + /// Reset the length to 0 without changing capacity. + pub fn clear(&mut self) { + self.msg.buffer_length = 0; + } + + /// Replace the serialized payload with the given bytes. + pub fn set_bytes(&mut self, bytes: &[u8]) -> Result<(), RclrsError> { + unsafe { + if self.msg.buffer_capacity < bytes.len() { + rcutils_uint8_array_resize(&mut self.msg, bytes.len()).ok()?; + } + std::ptr::copy_nonoverlapping(bytes.as_ptr(), self.msg.buffer, bytes.len()); + self.msg.buffer_length = bytes.len(); + } + Ok(()) + } +} + +impl Drop for SerializedMessage { + fn drop(&mut self) { + unsafe { + let _ = rcutils_uint8_array_fini(&mut self.msg); + } + } +} diff --git a/rclrs/src/serialized_publisher.rs b/rclrs/src/serialized_publisher.rs new file mode 100644 index 000000000..ee2359543 --- /dev/null +++ b/rclrs/src/serialized_publisher.rs @@ -0,0 +1,34 @@ +use crate::{node::NodeHandle, rcl_bindings::*, RclrsError, ToResult, ENTITY_LIFECYCLE_MUTEX}; +use std::{ptr, sync::Arc}; + +use crate::serialized_message::SerializedMessage; + +/// A publisher which publishes serialized ROS messages. +pub struct SerializedPublisher { + pub(crate) handle: Arc, + pub(crate) pub_: rcl_publisher_t, +} + +unsafe impl Send for SerializedPublisher {} +unsafe impl Sync for SerializedPublisher {} + +impl Drop for SerializedPublisher { + fn drop(&mut self) { + let _context_lock = self.handle.context_handle.rcl_context.lock().unwrap(); + let mut node = self.handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + unsafe { + let _ = rcl_publisher_fini(&mut self.pub_, &mut *node); + } + } +} + +impl SerializedPublisher { + /// Publish a serialized (CDR) message. + pub fn publish(&self, msg: &SerializedMessage) -> Result<(), RclrsError> { + unsafe { + rcl_publish_serialized_message(&self.pub_, &msg.msg, ptr::null_mut()).ok()?; + } + Ok(()) + } +} diff --git a/rclrs/src/serialized_subscription.rs b/rclrs/src/serialized_subscription.rs new file mode 100644 index 000000000..f4e574f3c --- /dev/null +++ b/rclrs/src/serialized_subscription.rs @@ -0,0 +1,42 @@ +use crate::{node::NodeHandle, rcl_bindings::*, MessageInfo, RclrsError, ENTITY_LIFECYCLE_MUTEX}; +use std::{ptr, sync::Arc}; + +use crate::serialized_message::SerializedMessage; + +/// A subscription which receives serialized ROS messages. +pub struct SerializedSubscription { + pub(crate) handle: Arc, + pub(crate) sub: rcl_subscription_t, +} + +unsafe impl Send for SerializedSubscription {} +unsafe impl Sync for SerializedSubscription {} + +impl Drop for SerializedSubscription { + fn drop(&mut self) { + let _context_lock = self.handle.context_handle.rcl_context.lock().unwrap(); + let mut node = self.handle.rcl_node.lock().unwrap(); + let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); + unsafe { + let _ = rcl_subscription_fini(&mut self.sub, &mut *node); + } + } +} + +impl SerializedSubscription { + /// Take a serialized (CDR) message. + /// + /// Returns `Ok(None)` when no message is available. + pub fn take(&self, buf: &mut SerializedMessage) -> Result, RclrsError> { + unsafe { + let mut info: rmw_message_info_t = std::mem::zeroed(); + let rc = + rcl_take_serialized_message(&self.sub, &mut buf.msg, &mut info, ptr::null_mut()); + if rc != 0 { + // No message available or error. The rmw/rcl API uses negative codes for "take failed". + return Ok(None); + } + Ok(Some(MessageInfo::from_rmw_message_info(&info))) + } + } +} diff --git a/rclrs/src/subscription.rs b/rclrs/src/subscription.rs index be195b8f2..11d584fc9 100644 --- a/rclrs/src/subscription.rs +++ b/rclrs/src/subscription.rs @@ -620,8 +620,7 @@ mod tests { #[test] fn test_subscription_qos_settings() { - use crate::vendor::example_interfaces::msg::Empty; - use crate::*; + use crate::{vendor::example_interfaces::msg::Empty, *}; let executor = Context::default().create_basic_executor(); @@ -672,8 +671,7 @@ mod tests { #[test] fn test_setting_qos_from_parameters() { - use crate::vendor::example_interfaces::msg::Empty; - use crate::*; + use crate::{vendor::example_interfaces::msg::Empty, *}; let args = ["--ros-args", "-p", "qos_reliability:=best_effort"].map(ToString::to_string);