Skip to content

Commit

Permalink
Use tokio runtime for examples. Use futures::Future as future impleme…
Browse files Browse the repository at this point in the history
…ntation for clients
  • Loading branch information
esteve committed Jun 8, 2022
1 parent 8d85f70 commit a988c45
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 34 deletions.
3 changes: 3 additions & 0 deletions examples/minimal_client_service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ path = "src/minimal_service.rs"

[dependencies]
anyhow = {version = "1", features = ["backtrace"]}
tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "time"] }
# Needed for clients
futures = "0.3"

[dependencies.rclrs]
version = "*"
Expand Down
27 changes: 19 additions & 8 deletions examples/minimal_client_service/src/minimal_client_async.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,38 @@
use anyhow::{Error, Result};
use futures::join;
use futures::Future;
use rclrs::Node;
use std::env;
use std::thread;
use tokio;

fn main() -> Result<(), Error> {
#[tokio::main]
async fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args()).unwrap();

let mut node = context.create_node("minimal_client")?;

let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;

let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };

println!("Starting client");

std::thread::sleep(std::time::Duration::from_millis(500));

let future = client.call_async(&request)?;
let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };

let future = client.call_async(&request);

println!("Waiting for response");
let response = rclrs::spin_until_future_complete(&node, future)?;

let spin_thread = std::thread::spawn(move || {
rclrs::spin(&node);
});

let response = future.await;
println!(
"Result of {} + {} is: {}",
request.a, request.b, response.sum
);
"Result of {} + {} is: {}",
request.a, request.b, response.unwrap().sum
);
spin_thread.join();
Ok(())
}
2 changes: 2 additions & 0 deletions rclrs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ libc = "0.2.43"
parking_lot = "0.11.2"
# Needed for the Message trait, among others
rosidl_runtime_rs = "*"
# Needed for clients
futures = "0.3"

[build-dependencies]
# Needed for FFI
Expand Down
23 changes: 23 additions & 0 deletions rclrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,29 @@ pub fn spin(node: &Node) -> Result<(), RclrsError> {
Ok(())
}

/// Convenience function for calling [`rclrs::spin_once`] in a loop.
///
/// This function additionally checks that the context is still valid.
pub fn spin_some(node: &Node) -> Result<(), RclrsError> {
// The context_is_valid functions exists only to abstract away ROS distro differences
#[cfg(ros_distro = "foxy")]
// SAFETY: No preconditions for this function.
let context_is_valid = || unsafe { rcl_context_is_valid(&mut *node.context.lock()) };
#[cfg(not(ros_distro = "foxy"))]
// SAFETY: No preconditions for this function.
let context_is_valid = || unsafe { rcl_context_is_valid(&*node.context.lock()) };

if context_is_valid() {
if let Some(error) = spin_once(node, Some(std::time::Duration::from_millis(500))).err() {
match error.code {
RclReturnCode::Timeout => (),
_ => return Err(error),
}
}
}
Ok(())
}

pub fn spin_until_future_complete<T: Unpin + Clone>(
node: &node::Node,
future: Arc<Mutex<Box<crate::future::RclFuture<T>>>>,
Expand Down
2 changes: 1 addition & 1 deletion rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl Node {
) -> Result<Arc<crate::node::service::Service<T>>, RclrsError>
where
T: rosidl_runtime_rs::Service + 'static,
F: FnMut(&rmw_request_id_t, &T::Request, &mut T::Response) + Sized + 'static,
F: FnMut(&rmw_request_id_t, &T::Request, &mut T::Response) + 'static + Send,
{
let service = Arc::new(crate::node::service::Service::<T>::new(
self, topic, callback,
Expand Down
59 changes: 37 additions & 22 deletions rclrs/src/node/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#![warn(missing_docs)]
use crate::node::client::oneshot::Canceled;
use futures::channel::{mpsc, oneshot};
use std::borrow::Borrow;
use std::boxed::Box;
use std::collections::HashMap;
Expand All @@ -15,6 +17,10 @@ use crate::{rcl_bindings::*, RclrsError};
use parking_lot::{Mutex, MutexGuard};
use rosidl_runtime_rs::Message;

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
// they are running in. Therefore, this type can be safely sent to another thread.
unsafe impl Send for rcl_client_t {}

pub struct ClientHandle {
handle: Mutex<rcl_client_t>,
node_handle: Arc<Mutex<rcl_node_t>>,
Expand All @@ -36,9 +42,18 @@ impl Drop for ClientHandle {
}
}

impl From<Canceled> for RclrsError {
fn from(_: Canceled) -> Self {
RclrsError {
code: RclReturnCode::Error,
msg: None,
}
}
}

/// Trait to be implemented by concrete Client structs
/// See [`Client<T>`] for an example
pub trait ClientBase {
pub trait ClientBase: Send + Sync {
fn handle(&self) -> &ClientHandle;
fn execute(&self) -> Result<(), RclrsError>;
}
Expand All @@ -49,8 +64,8 @@ where
T: rosidl_runtime_rs::Service,
{
pub(crate) handle: Arc<ClientHandle>,
requests: Mutex<HashMap<i64, Mutex<Box<dyn FnMut(&T::Response) + 'static>>>>,
futures: Mutex<HashMap<i64, Arc<Mutex<Box<RclFuture<T::Response>>>>>>,
requests: Mutex<HashMap<i64, Mutex<Box<dyn FnMut(&T::Response) + 'static + Send>>>>,
futures: Arc<Mutex<HashMap<i64, oneshot::Sender<T::Response>>>>,
sequence_number: AtomicI64,
}

Expand Down Expand Up @@ -89,7 +104,9 @@ where
Ok(Self {
handle,
requests: Mutex::new(HashMap::new()),
futures: Mutex::new(HashMap::new()),
futures: Arc::new(Mutex::new(
HashMap::<i64, oneshot::Sender<T::Response>>::new(),
)),
sequence_number: AtomicI64::new(0),
})
}
Expand All @@ -112,14 +129,13 @@ where
callback: F,
) -> Result<(), RclrsError>
where
F: FnMut(&T::Response) + Sized + 'static,
F: FnMut(&T::Response) + 'static + Send,
{
let rmw_message = T::Request::into_rmw_message(message.into_cow());
let handle = &mut *self.handle.lock();
let mut sequence_number = self.sequence_number.load(Ordering::SeqCst);
let ret = unsafe {
rcl_send_request(
handle as *mut _,
&*self.handle.lock() as *const _,
rmw_message.as_ref() as *const <T::Request as Message>::RmwMsg as *mut _,
&mut sequence_number,
)
Expand All @@ -130,7 +146,7 @@ where
ret.ok()
}

/// Send a requests with a callback as a parameter.
/// Send a request with a callback as a parameter.
///
/// The [`MessageCow`] trait is implemented by any
/// [`Message`] as well as any reference to a `Message`.
Expand All @@ -142,31 +158,25 @@ where
///
/// Hence, when a message will not be needed anymore after publishing, pass it by value.
/// When a message will be needed again after publishing, pass it by reference, instead of cloning and passing by value.
pub fn call_async<'a, R: MessageCow<'a, T::Request>>(
pub async fn call_async<'a, R: MessageCow<'a, T::Request>>(
&self,
request: R,
) -> Result<Arc<Mutex<Box<RclFuture<T::Response>>>>, RclrsError>
) -> Result<T::Response, RclrsError>
where
T: rosidl_runtime_rs::Service + 'static,
{
let rmw_message = T::Request::into_rmw_message(request.into_cow());
let handle = &mut *self.handle.lock();
let mut sequence_number = self.sequence_number.load(Ordering::SeqCst);
let ret = unsafe {
rcl_send_request(
handle as *mut _,
&*self.handle.lock() as *const _,
rmw_message.as_ref() as *const <T::Request as Message>::RmwMsg as *mut _,
&mut sequence_number,
)
};
let response = Arc::new(Mutex::new(Box::new(RclFuture::<T::Response>::new())));
{
let futures = &mut *self.futures.lock();
futures.insert(sequence_number, response.clone());
}
self.sequence_number.swap(sequence_number, Ordering::SeqCst);
ret.ok()?;
Ok(response)
let (tx, rx) = oneshot::channel::<T::Response>();
self.futures.lock().insert(sequence_number, tx);
Ok(rx.await?)
}

/// Ask RMW for the data
Expand Down Expand Up @@ -232,8 +242,13 @@ where
let callback = requests.remove(&req_id.sequence_number).unwrap();
(*callback.lock())(&res);
} else if futures.contains_key(&req_id.sequence_number) {
let future = futures.remove(&req_id.sequence_number).unwrap();
(&mut *future.lock()).set_value(res);
futures
.remove(&req_id.sequence_number)
.unwrap_or_else(|| panic!("fail to find key in Client::process_requests"))
.send(res)
.unwrap_or_else(|_| {
panic!("fail to send response via channel in Client::process_requests")
});
}
Ok(())
}
Expand Down
11 changes: 8 additions & 3 deletions rclrs/src/node/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ use crate::node::publisher::MessageCow;

use parking_lot::{Mutex, MutexGuard};

// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
// they are running in. Therefore, this type can be safely sent to another thread.
unsafe impl Send for rcl_service_t {}

pub struct ServiceHandle {
handle: Mutex<rcl_service_t>,
node_handle: Arc<Mutex<rcl_node_t>>,
Expand All @@ -36,7 +40,7 @@ impl Drop for ServiceHandle {

/// Trait to be implemented by concrete Service structs
/// See [`Service<T>`] for an example
pub trait ServiceBase {
pub trait ServiceBase: Send + Sync {
fn handle(&self) -> &ServiceHandle;
fn execute(&self) -> Result<(), RclrsError>;
}
Expand All @@ -48,7 +52,8 @@ where
{
pub handle: Arc<ServiceHandle>,
// The callback's lifetime should last as long as we need it to
pub callback: Mutex<Box<dyn FnMut(&rmw_request_id_t, &T::Request, &mut T::Response) + 'static>>,
pub callback:
Mutex<Box<dyn FnMut(&rmw_request_id_t, &T::Request, &mut T::Response) + 'static + Send>>,
}

impl<T> Service<T>
Expand All @@ -58,7 +63,7 @@ where
pub fn new<F>(node: &Node, topic: &str, callback: F) -> Result<Self, RclrsError>
where
T: rosidl_runtime_rs::Service,
F: FnMut(&rmw_request_id_t, &T::Request, &mut T::Response) + Sized + 'static,
F: FnMut(&rmw_request_id_t, &T::Request, &mut T::Response) + 'static + Send,
{
let mut service_handle = unsafe { rcl_get_zero_initialized_service() };
let type_support = <T as rosidl_runtime_rs::Service>::get_type_support()
Expand Down

0 comments on commit a988c45

Please sign in to comment.