Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rclrs/src/dynamic_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub struct DynamicMessage {
}

/// This is an analogue of rclcpp::get_typesupport_library.
fn get_type_support_library(
pub(crate) fn get_type_support_library(
package_name: &str,
type_support_identifier: &str,
) -> Result<Arc<libloading::Library>, DynamicMessageError> {
Expand Down Expand Up @@ -162,7 +162,7 @@ fn get_type_support_library(
///
/// It is unsafe because it would be theoretically possible to pass in a library that has
/// the expected symbol defined, but with an unexpected type.
unsafe fn get_type_support_handle(
pub(crate) unsafe fn get_type_support_handle(
type_support_library: &libloading::Library,
type_support_identifier: &str,
message_type: &MessageTypeName,
Expand Down
2 changes: 2 additions & 0 deletions rclrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ mod node;
mod parameter;
mod publisher;
mod qos;
mod serialized_subscription;
mod service;
mod subscription;
mod time;
Expand Down Expand Up @@ -221,6 +222,7 @@ pub use parameter::*;
pub use publisher::*;
pub use qos::*;
pub use rcl_bindings::rmw_request_id_t;
pub use serialized_subscription::*;
pub use service::*;
pub use subscription::*;
pub use time::*;
Expand Down
30 changes: 26 additions & 4 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ use crate::{
IntoAsyncSubscriptionCallback, IntoNodeServiceCallback, IntoNodeSubscriptionCallback,
IntoNodeTimerOneshotCallback, IntoNodeTimerRepeatingCallback, IntoTimerOptions, LogParams,
Logger, MessageInfo, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters,
Promise, Publisher, PublisherOptions, PublisherState, RclrsError, RequestedGoal, Service,
ServiceOptions, ServiceState, Subscription, SubscriptionOptions, SubscriptionState,
TerminatedGoal, TimeSource, Timer, TimerState, ToLogParams, Worker, WorkerOptions, WorkerState,
ENTITY_LIFECYCLE_MUTEX,
Promise, Publisher, PublisherOptions, PublisherState, RclrsError, RequestedGoal,
SerializedSubscription, SerializedSubscriptionState, Service, ServiceOptions, ServiceState,
Subscription, SubscriptionOptions, SubscriptionState, TerminatedGoal, TimeSource, Timer,
TimerState, ToLogParams, Worker, WorkerOptions, WorkerState, ENTITY_LIFECYCLE_MUTEX,
};

/// A processing unit that can communicate with other nodes. See the API of
Expand Down Expand Up @@ -944,6 +944,28 @@ impl NodeState {
)
}

/// Creates a [`SerializedSubscription`] with an ordinary callback.
///
/// The message type is determined at runtime, and the callback receives the serialized
/// ROS 2 payload bytes plus [`crate::MessageInfo`].
pub fn create_serialized_subscription<'a, F>(
&self,
topic_type: MessageTypeName,
options: impl Into<SubscriptionOptions<'a>>,
callback: F,
) -> Result<SerializedSubscription, RclrsError>
where
F: Fn(Vec<u8>, MessageInfo) + Send + Sync + 'static,
{
SerializedSubscriptionState::create(
topic_type,
options,
crate::serialized_subscription::NodeSerializedSubscriptionCallback::new(callback),
&self.handle,
self.commands.async_worker_commands(),
)
}

/// Creates a [`DynamicSubscription`] with an async callback.
///
/// For the behavior and API refer to [`Node::create_async_subscription`][1], except two key
Expand Down
191 changes: 191 additions & 0 deletions rclrs/src/serialized_subscription.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
use std::{
any::Any,
ffi::CString,
sync::{Arc, Mutex},
};

use crate::{
dynamic_message::{get_type_support_handle, get_type_support_library, MessageTypeName},
rcl_bindings::*,
MessageInfo, NodeHandle, RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclrsError,
RclrsErrorFilter, ReadyKind, SubscriptionHandle, SubscriptionOptions, ToResult, Waitable,
WaitableLifecycle, WorkerCommands, ENTITY_LIFECYCLE_MUTEX,
};

/// Struct for receiving serialized ROS 2 messages as raw bytes.
///
/// Create a serialized subscription using [`NodeState::create_serialized_subscription`].
pub type SerializedSubscription = Arc<SerializedSubscriptionState>;

struct SerializedSubscriptionExecutable {
handle: Arc<SubscriptionHandle>,
callback: Arc<Mutex<NodeSerializedSubscriptionCallback>>,
}

struct SerializedMessageBuffer {
inner: rcl_serialized_message_t,
}

pub(crate) struct NodeSerializedSubscriptionCallback(
Box<dyn Fn(Vec<u8>, MessageInfo) + Send + Sync>,
);

impl NodeSerializedSubscriptionCallback {
pub(crate) fn new(f: impl Fn(Vec<u8>, MessageInfo) + Send + Sync + 'static) -> Self {
Self(Box::new(f))
}
}

impl SerializedMessageBuffer {
fn new() -> Result<Self, RclrsError> {
let mut inner = unsafe { rcutils_get_zero_initialized_uint8_array() };
let allocator = unsafe { rcutils_get_default_allocator() };
unsafe {
rcutils_uint8_array_init(&mut inner, 0, &allocator).ok()?;
}
Ok(Self { inner })
}

fn take_bytes(&self) -> Vec<u8> {
unsafe { std::slice::from_raw_parts(self.inner.buffer, self.inner.buffer_length) }.to_vec()
}
}

impl Drop for SerializedMessageBuffer {
fn drop(&mut self) {
if self.inner.allocator.allocate.is_some() {
unsafe {
rcutils_uint8_array_fini(&mut self.inner);
}
}
}
}

impl SerializedSubscriptionExecutable {
fn take(&self) -> Result<(Vec<u8>, MessageInfo), RclrsError> {
let mut serialized_message = SerializedMessageBuffer::new()?;
let mut message_info = unsafe { rmw_get_zero_initialized_message_info() };
let rcl_subscription = &mut *self.handle.lock();

unsafe {
rcl_take_serialized_message(
rcl_subscription,
&mut serialized_message.inner,
&mut message_info,
std::ptr::null_mut(),
)
.ok()?;
};

Ok((
serialized_message.take_bytes(),
MessageInfo::from_rmw_message_info(&message_info),
))
}
}

impl RclPrimitive for SerializedSubscriptionExecutable {
unsafe fn execute(
&mut self,
ready: ReadyKind,
_payload: &mut dyn Any,
) -> Result<(), RclrsError> {
ready.for_basic()?;
let evaluate = || {
let (msg, msg_info) = self.take()?;
(self.callback.lock().unwrap().0)(msg, msg_info);
Ok(())
};

evaluate().take_failed_ok()
}

fn kind(&self) -> RclPrimitiveKind {
RclPrimitiveKind::Subscription
}

fn handle(&self) -> RclPrimitiveHandle<'_> {
RclPrimitiveHandle::Subscription(self.handle.lock())
}
}

/// Inner state of a [`SerializedSubscription`].
pub struct SerializedSubscriptionState {
handle: Arc<SubscriptionHandle>,
#[allow(unused)]
callback: Arc<Mutex<NodeSerializedSubscriptionCallback>>,
#[allow(unused)]
lifecycle: WaitableLifecycle,
#[allow(dead_code)]
type_support_library: Arc<libloading::Library>,
}

impl SerializedSubscriptionState {
pub(crate) fn create<'a>(
topic_type: MessageTypeName,
options: impl Into<SubscriptionOptions<'a>>,
callback: NodeSerializedSubscriptionCallback,
node_handle: &Arc<NodeHandle>,
commands: &Arc<WorkerCommands>,
) -> Result<Arc<Self>, RclrsError> {
let SubscriptionOptions { topic, qos } = options.into();
let type_support_library =
get_type_support_library(&topic_type.package_name, "rosidl_typesupport_c")?;
let type_support_ptr = unsafe {
get_type_support_handle(
type_support_library.as_ref(),
"rosidl_typesupport_c",
&topic_type,
)?
};

let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul {
err,
s: topic.into(),
})?;

let mut rcl_subscription_options = unsafe { rcl_subscription_get_default_options() };
rcl_subscription_options.qos = qos.into();
let mut rcl_subscription = unsafe { rcl_get_zero_initialized_subscription() };
{
let rcl_node = node_handle.rcl_node.lock()?;
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock()?;
unsafe {
rcl_subscription_init(
&mut rcl_subscription,
&*rcl_node,
type_support_ptr,
topic_c_string.as_ptr(),
&rcl_subscription_options,
)
.ok()?;
}
}

let handle = Arc::new(SubscriptionHandle {
rcl_subscription: Mutex::new(rcl_subscription),
node_handle: Arc::clone(node_handle),
});
let callback = Arc::new(Mutex::new(callback));
let (waitable, lifecycle) = Waitable::new(
Box::new(SerializedSubscriptionExecutable {
handle: Arc::clone(&handle),
callback: Arc::clone(&callback),
}),
Some(Arc::clone(commands.get_guard_condition())),
);
commands.add_to_wait_set(waitable);

Ok(Arc::new(Self {
handle,
callback,
lifecycle,
type_support_library,
}))
}

/// Returns the topic name of the subscription.
pub fn topic_name(&self) -> String {
self.handle.topic_name()
}
}
Loading