Skip to content

Commit

Permalink
Added support for clients and services (#146)
Browse files Browse the repository at this point in the history
* Added support for clients and services
  • Loading branch information
esteve authored Jul 8, 2022
1 parent 81cb0f7 commit 58dd04a
Show file tree
Hide file tree
Showing 29 changed files with 1,241 additions and 298 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ jobs:
run: |
cd ${{ steps.build.outputs.ros-workspace-directory-name }}
. /opt/ros/${{ matrix.ros_distribution }}/setup.sh
for path in $(colcon list | awk '$3 == "(ament_cargo)" && $1 != "examples_rclrs_minimal_pub_sub" { print $2 }'); do
for path in $(colcon list | awk '$3 == "(ament_cargo)" && $1 != "examples_rclrs_minimal_pub_sub" && $1 != "examples_rclrs_minimal_client_service" { print $2 }'); do
cd $path
echo "Running cargo test in $path"
cargo test
Expand All @@ -110,7 +110,7 @@ jobs:
run: |
cd ${{ steps.build.outputs.ros-workspace-directory-name }}
. /opt/ros/${{ matrix.ros_distribution }}/setup.sh
for path in $(colcon list | awk '$3 == "(ament_cargo)" && $1 != "examples_rclrs_minimal_pub_sub" { print $2 }'); do
for path in $(colcon list | awk '$3 == "(ament_cargo)" && $1 != "examples_rclrs_minimal_pub_sub" && $1 != "examples_rclrs_minimal_client_service" { print $2 }'); do
cd $path
echo "Running rustdoc check in $path"
cargo rustdoc -- -D warnings
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The current set of features include:
- Message generation
- Support for publishers and subscriptions
- Tunable QoS settings
- Clients and services

Lots of things are still missing however, see the [issue list](https://github.com/ros2-rust/ros2_rust/issues) for an overview. You are very welcome to [contribute](docs/CONTRIBUTING.md)!

Expand Down
30 changes: 30 additions & 0 deletions examples/minimal_client_service/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[package]
name = "examples_rclrs_minimal_client_service"
version = "0.2.0"
authors = ["Esteve Fernandez <[email protected]>"]
edition = "2021"

[[bin]]
name = "minimal_client"
path = "src/minimal_client.rs"

[[bin]]
name = "minimal_client_async"
path = "src/minimal_client_async.rs"

[[bin]]
name = "minimal_service"
path = "src/minimal_service.rs"

[dependencies]
anyhow = {version = "1", features = ["backtrace"]}
tokio = { version = "1", features = ["macros", "rt", "rt-multi-thread", "time"] }

[dependencies.rclrs]
version = "*"

[dependencies.rosidl_runtime_rs]
version = "*"

[dependencies.example_interfaces]
version = "*"
23 changes: 23 additions & 0 deletions examples/minimal_client_service/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?xml version="1.0"?>
<?xml-model
href="http://download.ros.org/schema/package_format3.xsd"
schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>examples_rclrs_minimal_client_service</name>
<version>0.2.0</version>
<description>Package containing an example of the client-service mechanism in rclrs.</description>
<maintainer email="[email protected]">Esteve Fernandez</maintainer>
<license>Apache License 2.0</license>

<build_depend>example_interfaces</build_depend>
<build_depend>rclrs</build_depend>
<build_depend>rosidl_runtime_rs</build_depend>

<exec_depend>example_interfaces</exec_depend>
<exec_depend>rclrs</exec_depend>
<exec_depend>rosidl_runtime_rs</exec_depend>

<export>
<build_type>ament_cargo</build_type>
</export>
</package>
31 changes: 31 additions & 0 deletions examples/minimal_client_service/src/minimal_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use anyhow::{Error, Result};
use std::env;

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;

let mut node = context.create_node("minimal_client")?;

let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;

let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };

println!("Starting client");

std::thread::sleep(std::time::Duration::from_millis(500));

client.async_send_request_with_callback(
&request,
move |response: example_interfaces::srv::AddTwoInts_Response| {
println!(
"Result of {} + {} is: {}",
request.a, request.b, response.sum
);
},
)?;

std::thread::sleep(std::time::Duration::from_millis(500));

println!("Waiting for response");
rclrs::spin(&node).map_err(|err| err.into())
}
32 changes: 32 additions & 0 deletions examples/minimal_client_service/src/minimal_client_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use anyhow::{Error, Result};
use std::env;

#[tokio::main]
async fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;

let mut node = context.create_node("minimal_client")?;

let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;

println!("Starting client");

std::thread::sleep(std::time::Duration::from_millis(500));

let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };

let future = client.call_async(&request);

println!("Waiting for response");

let rclrs_spin = tokio::task::spawn_blocking(move || rclrs::spin(&node));

let response = future.await?;
println!(
"Result of {} + {} is: {}",
request.a, request.b, response.sum
);

rclrs_spin.await.ok();
Ok(())
}
24 changes: 24 additions & 0 deletions examples/minimal_client_service/src/minimal_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use anyhow::{Error, Result};
use std::env;

fn handle_service(
_request_header: &rclrs::rmw_request_id_t,
request: example_interfaces::srv::AddTwoInts_Request,
) -> example_interfaces::srv::AddTwoInts_Response {
println!("request: {} + {}", request.a, request.b);
example_interfaces::srv::AddTwoInts_Response {
sum: request.a + request.b,
}
}

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;

let mut node = context.create_node("minimal_service")?;

let _server = node
.create_service::<example_interfaces::srv::AddTwoInts, _>("add_two_ints", handle_service)?;

println!("Starting server");
rclrs::spin(&node).map_err(|err| err.into())
}
3 changes: 1 addition & 2 deletions examples/minimal_pub_sub/src/minimal_subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::env;

use anyhow::{Error, Result};
use std::env;

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
Expand Down
2 changes: 2 additions & 0 deletions rclrs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ libc = "0.2.43"
parking_lot = "0.11.2"
# Needed for the Message trait, among others
rosidl_runtime_rs = "*"
# Needed for clients
futures = "0.3"

[dev-dependencies]
# Needed for e.g. writing yaml files in tests
Expand Down
35 changes: 32 additions & 3 deletions rclrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,61 @@ pub use wait::*;
use rcl_bindings::rcl_context_is_valid;
use std::time::Duration;

pub use rcl_bindings::rmw_request_id_t;

/// Polls the node for new messages and executes the corresponding callbacks.
///
/// See [`WaitSet::wait`] for the meaning of the `timeout` parameter.
///
/// This may under some circumstances return
/// [`SubscriptionTakeFailed`][1] when the wait set spuriously wakes up.
/// [`SubscriptionTakeFailed`][1], [`ClientTakeFailed`][1], [`ServiceTakeFailed`][1] when the wait
/// set spuriously wakes up.
/// This can usually be ignored.
///
/// [1]: crate::RclReturnCode
pub fn spin_once(node: &Node, timeout: Option<Duration>) -> Result<(), RclrsError> {
let live_subscriptions = node.live_subscriptions();
let live_clients = node.live_clients();
let live_services = node.live_services();
let ctx = Context {
rcl_context_mtx: node.rcl_context_mtx.clone(),
};
let mut wait_set = WaitSet::new(live_subscriptions.len(), &ctx)?;
let mut wait_set = WaitSet::new(
live_subscriptions.len(),
0,
0,
live_clients.len(),
live_services.len(),
0,
&ctx,
)?;

for live_subscription in &live_subscriptions {
wait_set.add_subscription(live_subscription.clone())?;
}

for live_client in &live_clients {
wait_set.add_client(live_client.clone())?;
}

for live_service in &live_services {
wait_set.add_service(live_service.clone())?;
}

let ready_entities = wait_set.wait(timeout)?;

for ready_subscription in ready_entities.subscriptions {
ready_subscription.execute()?;
}

for ready_client in ready_entities.clients {
ready_client.execute()?;
}

for ready_service in ready_entities.services {
ready_service.execute()?;
}

Ok(())
}

Expand All @@ -74,7 +104,6 @@ pub fn spin(node: &Node) -> Result<(), RclrsError> {
error => return error,
}
}

Ok(())
}

Expand Down
52 changes: 52 additions & 0 deletions rclrs/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
mod builder;
mod client;
mod publisher;
mod service;
mod subscription;
pub use self::builder::*;
pub use self::client::*;
pub use self::publisher::*;
pub use self::service::*;
pub use self::subscription::*;

use crate::rcl_bindings::*;
Expand Down Expand Up @@ -68,6 +72,8 @@ unsafe impl Send for rcl_node_t {}
pub struct Node {
rcl_node_mtx: Arc<Mutex<rcl_node_t>>,
pub(crate) rcl_context_mtx: Arc<Mutex<rcl_context_t>>,
pub(crate) clients: Vec<Weak<dyn ClientBase>>,
pub(crate) services: Vec<Weak<dyn ServiceBase>>,
pub(crate) subscriptions: Vec<Weak<dyn SubscriptionBase>>,
_parameter_map: ParameterOverrideMap,
}
Expand Down Expand Up @@ -174,6 +180,23 @@ impl Node {
unsafe { call_string_getter_with_handle(&*self.rcl_node_mtx.lock(), getter) }
}

/// Creates a [`Client`][1].
///
/// [1]: crate::Client
// TODO: make client's lifetime depend on node's lifetime
pub fn create_client<T>(
&mut self,
topic: &str,
) -> Result<Arc<crate::node::client::Client<T>>, RclrsError>
where
T: rosidl_runtime_rs::Service,
{
let client = Arc::new(crate::node::client::Client::<T>::new(self, topic)?);
self.clients
.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
Ok(client)
}

/// Creates a [`Publisher`][1].
///
/// [1]: crate::Publisher
Expand All @@ -189,6 +212,27 @@ impl Node {
Publisher::<T>::new(self, topic, qos)
}

/// Creates a [`Service`][1].
///
/// [1]: crate::Service
// TODO: make service's lifetime depend on node's lifetime
pub fn create_service<T, F>(
&mut self,
topic: &str,
callback: F,
) -> Result<Arc<crate::node::service::Service<T>>, RclrsError>
where
T: rosidl_runtime_rs::Service,
F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send,
{
let service = Arc::new(crate::node::service::Service::<T>::new(
self, topic, callback,
)?);
self.services
.push(Arc::downgrade(&service) as Weak<dyn ServiceBase>);
Ok(service)
}

/// Creates a [`Subscription`][1].
///
/// [1]: crate::Subscription
Expand Down Expand Up @@ -217,6 +261,14 @@ impl Node {
.collect()
}

pub(crate) fn live_clients(&self) -> Vec<Arc<dyn ClientBase>> {
self.clients.iter().filter_map(Weak::upgrade).collect()
}

pub(crate) fn live_services(&self) -> Vec<Arc<dyn ServiceBase>> {
self.services.iter().filter_map(Weak::upgrade).collect()
}

/// Returns the ROS domain ID that the node is using.
///
/// The domain ID controls which nodes can send messages to each other, see the [ROS 2 concept article][1].
Expand Down
4 changes: 3 additions & 1 deletion rclrs/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ impl NodeBuilder {
Ok(Node {
rcl_node_mtx,
rcl_context_mtx: self.context.clone(),
subscriptions: std::vec![],
clients: vec![],
services: vec![],
subscriptions: vec![],
_parameter_map,
})
}
Expand Down
Loading

0 comments on commit 58dd04a

Please sign in to comment.