Skip to content

Commit

Permalink
Added support for clients and services
Browse files Browse the repository at this point in the history
  • Loading branch information
esteve committed May 2, 2022
1 parent cb87f98 commit 7c6944a
Show file tree
Hide file tree
Showing 21 changed files with 851 additions and 17 deletions.
35 changes: 35 additions & 0 deletions rclrs/src/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/// Based on https://www.viget.com/articles/understanding-futures-in-rust-part-1/
use core::marker::PhantomData;
use parking_lot::Mutex;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

#[derive(Default)]
pub struct RclFuture<T> {
value: Option<T>,
}

impl<T: Default + Clone> RclFuture<T> {
pub fn new() -> RclFuture<T> {
Self { value: None }
}

pub fn set_value(&mut self, msg: T) {
self.value = Some(msg);
}
}

impl<T: Clone> Future for RclFuture<T> {
type Output = T;

fn poll(self: Pin<&mut Self>, _ctx: &mut Context) -> Poll<Self::Output> {
if let Some(value) = &self.value {
Poll::Ready(value.clone())
} else {
Poll::Pending
}
}
}
93 changes: 89 additions & 4 deletions rclrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,69 @@ pub use wait::*;
use rcl_bindings::rcl_context_is_valid;
use std::time::Duration;

use std::pin::Pin;

pub use rcl_bindings::rmw_request_id_t;

/// Polls the node for new messages and executes the corresponding callbacks.
///
/// See [`WaitSet::wait`] for the meaning of the `timeout` parameter.
///
/// This may under some circumstances return
/// [`SubscriptionTakeFailed`][1] when the wait set spuriously wakes up.
/// [`SubscriptionTakeFailed`][1], [`ClientTakeFailed`][2], [`ServiceTakeFailed`][3] when the wait
/// set spuriously wakes up.
/// This can usually be ignored.
///
/// [1]: crate::SubscriberErrorCode
/// [2]: crate::ClientErrorCode
/// [3]: crate::ServiceErrorCode
pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclReturnCode> {
let live_subscriptions = node.live_subscriptions();
let live_clients = node.live_clients();
let live_services = node.live_services();
let ctx = Context {
handle: node.context.clone(),
};
let mut wait_set = WaitSet::new(live_subscriptions.len(), &ctx)?;
let mut wait_set = WaitSet::new(
live_subscriptions.len(),
0,
0,
live_clients.len(),
live_services.len(),
0,
&ctx,
)?;

for live_subscription in &live_subscriptions {
wait_set.add_subscription(live_subscription.clone())?;
}

for live_client in &live_clients {
wait_set.add_client(live_client.clone())?;
}

for live_service in &live_services {
wait_set.add_service(live_service.clone())?;
}

let ready_entities = wait_set.wait(timeout)?;

for ready_subscription in ready_entities.subscriptions {
ready_subscription.execute()?;
}

for ready_client in ready_entities.clients {
ready_client.execute()?;
}

for ready_service in ready_entities.services {
ready_service.execute()?;
}

Ok(())
}

/// Convenience function for calling [`spin_once`] in a loop.
/// Convenience function for calling [`rclrs::spin_once`] in a loop.
///
/// This function additionally checks that the context is still valid.
pub fn spin(node: &Node) -> Result<(), RclReturnCode> {
Expand All @@ -70,6 +104,57 @@ pub fn spin(node: &Node) -> Result<(), RclReturnCode> {
};
}
}

Ok(())
}

#[derive(Clone)]
struct RclWaker {}

fn rclwaker_wake(_s: &RclWaker) {}

fn rclwaker_wake_by_ref(_s: &RclWaker) {}

fn rclwaker_clone(s: &RclWaker) -> RawWaker {
let arc = unsafe { Arc::from_raw(s) };
std::mem::forget(arc.clone());
RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
}

const VTABLE: RawWakerVTable = unsafe {
RawWakerVTable::new(
|s| rclwaker_clone(&*(s as *const RclWaker)),
|s| rclwaker_wake(&*(s as *const RclWaker)),
|s| rclwaker_wake_by_ref(&*(s as *const RclWaker)),
|s| drop(Arc::from_raw(s as *const RclWaker)),
)
};

fn rclwaker_into_waker(s: *const RclWaker) -> Waker {
let raw_waker = RawWaker::new(s as *const (), &VTABLE);
unsafe { Waker::from_raw(raw_waker) }
}

pub fn spin_until_future_complete<T: Unpin + Clone>(
node: &node::Node,
mut future: Arc<Mutex<Box<RclFuture<T>>>>,
) -> Result<<future::RclFuture<T> as Future>::Output, RclReturnCode> {
let rclwaker = Arc::new(RclWaker {});
let waker = rclwaker_into_waker(Arc::into_raw(rclwaker));
let mut cx = std::task::Context::from_waker(&waker);

loop {
let context_valid = unsafe { rcl_context_is_valid(&mut *node.context.lock() as *mut _) };
if context_valid {
if let Some(error) = spin_once(node, None).err() {
match error {
RclReturnCode::Timeout => continue,
error => return Err(error),
};
};
match Future::poll(Pin::new(&mut *future.lock()), &mut cx) {
Poll::Ready(val) => break Ok(val),
Poll::Pending => continue,
};
}
}
}
Loading

0 comments on commit 7c6944a

Please sign in to comment.