Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 104 additions & 76 deletions rclcpp/include/rclcpp/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ class ClientBase
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(ClientBase);

RCLCPP_PUBLIC
ClientBase(
std::shared_ptr<rcl_node_t> node_handle,
const std::string & service_name);
ClientBase(const std::string & service_name);

RCLCPP_PUBLIC
virtual ~ClientBase();
Expand All @@ -61,86 +59,44 @@ class ClientBase
const rcl_client_t *
get_client_handle() const;

virtual void handle_response(std::shared_ptr<rmw_request_id_t> request_header,
std::shared_ptr<void> response) = 0;
virtual std::shared_ptr<void> create_response() = 0;
virtual std::shared_ptr<rmw_request_id_t> create_request_header() = 0;
virtual void handle_response(
std::shared_ptr<rmw_request_id_t> request_header, std::shared_ptr<void> response) = 0;

protected:
RCLCPP_DISABLE_COPY(ClientBase);

std::shared_ptr<rcl_node_t> node_handle_;

rcl_client_t client_handle_ = rcl_get_zero_initialized_client();
std::string service_name_;
};

template<typename ServiceT>
class Client : public ClientBase
template<typename RequestT, typename ResponseT>
class ClientPattern
{
public:
using SharedRequest = typename ServiceT::Request::SharedPtr;
using SharedResponse = typename ServiceT::Response::SharedPtr;
RCLCPP_SMART_PTR_DEFINITIONS(ClientPattern);
using SharedRequest = std::shared_ptr<RequestT>;
using SharedResponse = std::shared_ptr<ResponseT>;

using Promise = std::promise<SharedResponse>;
using PromiseWithRequest = std::promise<std::pair<SharedRequest, SharedResponse>>;
using Promise = std::promise<ResponseT>;
using PromiseWithRequest = std::promise<std::pair<RequestT, ResponseT>>;

using SharedPromise = std::shared_ptr<Promise>;
using SharedPromiseWithRequest = std::shared_ptr<PromiseWithRequest>;

using SharedFuture = std::shared_future<SharedResponse>;
using SharedFutureWithRequest = std::shared_future<std::pair<SharedRequest, SharedResponse>>;
using SharedFuture = std::shared_future<ResponseT>;
using SharedFutureWithRequest = std::shared_future<std::pair<RequestT, ResponseT>>;

using CallbackType = std::function<void(SharedFuture)>;
using CallbackWithRequestType = std::function<void(SharedFutureWithRequest)>;

RCLCPP_SMART_PTR_DEFINITIONS(Client);
using SendRequestFunctionT = std::function<void(const RequestT &, int64_t &)>;

Client(
std::shared_ptr<rcl_node_t> node_handle,
const std::string & service_name,
rcl_client_options_t & client_options)
: ClientBase(node_handle, service_name)
{
using rosidl_generator_cpp::get_service_type_support_handle;
auto service_type_support_handle =
get_service_type_support_handle<ServiceT>();
if (rcl_client_init(&client_handle_, this->node_handle_.get(),
service_type_support_handle, service_name.c_str(), &client_options) != RCL_RET_OK)
{
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create client: ") +
rcl_get_error_string_safe());
// *INDENT-ON*
}
}

virtual ~Client()
{
if (rcl_client_fini(&client_handle_, node_handle_.get()) != RCL_RET_OK) {
fprintf(stderr,
"Error in destruction of rmw client handle: %s\n", rmw_get_error_string_safe());
}
}

std::shared_ptr<void> create_response()
{
return std::shared_ptr<void>(new typename ServiceT::Response());
}

std::shared_ptr<rmw_request_id_t> create_request_header()
{
// TODO(wjwwood): This should probably use rmw_request_id's allocator.
// (since it is a C type)
return std::shared_ptr<rmw_request_id_t>(new rmw_request_id_t);
}

void handle_response(std::shared_ptr<rmw_request_id_t> request_header,
std::shared_ptr<void> response)
virtual void handle_response(std::shared_ptr<rmw_request_id_t> request_header,
ResponseT & response)
{
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(response);
int64_t sequence_number = request_header->sequence_number;
// TODO(esteve) this should throw instead since it is not expected to happen in the first place
if (this->pending_requests_.count(sequence_number) == 0) {
Expand All @@ -152,15 +108,10 @@ class Client : public ClientBase
auto callback = std::get<1>(tuple);
auto future = std::get<2>(tuple);
this->pending_requests_.erase(sequence_number);
call_promise->set_value(typed_response);
call_promise->set_value(response);
callback(future);
}

SharedFuture async_send_request(SharedRequest request)
{
return async_send_request(request, [](SharedFuture) {});
}

template<
typename CallbackT,
typename std::enable_if<
Expand All @@ -170,17 +121,11 @@ class Client : public ClientBase
>::value
>::type * = nullptr
>
SharedFuture async_send_request(SharedRequest request, CallbackT && cb)
SharedFuture async_send_request(const RequestT & request, CallbackT && cb)
{
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
int64_t sequence_number;
if (RCL_RET_OK != rcl_send_request(get_client_handle(), request.get(), &sequence_number)) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to send request: ") + rcl_get_error_string_safe());
// *INDENT-ON*
}

send_request_function_(request, sequence_number);
SharedPromise call_promise = std::make_shared<Promise>();
SharedFuture f(call_promise->get_future());
pending_requests_[sequence_number] =
Expand All @@ -197,7 +142,7 @@ class Client : public ClientBase
>::value
>::type * = nullptr
>
SharedFutureWithRequest async_send_request(SharedRequest request, CallbackT && cb)
SharedFutureWithRequest async_send_request(const RequestT & request, CallbackT && cb)
{
SharedPromiseWithRequest promise = std::make_shared<PromiseWithRequest>();
SharedFutureWithRequest future_with_request(promise->get_future());
Expand All @@ -213,13 +158,96 @@ class Client : public ClientBase
return future_with_request;
}

private:
RCLCPP_DISABLE_COPY(Client);
SharedFuture async_send_request(const RequestT & request)
{
return async_send_request(request, [](SharedFuture) {});
}

virtual void set_send_request_function(SendRequestFunctionT && fn)
{
send_request_function_ = fn;
}
protected:
SendRequestFunctionT send_request_function_;

private:
std::map<int64_t, std::tuple<SharedPromise, CallbackType, SharedFuture>> pending_requests_;
std::mutex pending_requests_mutex_;
};

template<typename ServiceT>
class Client : public ClientPattern<typename ServiceT::Request::SharedPtr, typename ServiceT::Response::SharedPtr>,
public ClientBase
{
using ClientPatternT = ClientPattern<typename ServiceT::Request::SharedPtr, typename ServiceT::Response::SharedPtr>;
public:
RCLCPP_SMART_PTR_DEFINITIONS(Client);
using SharedRequest = std::shared_ptr<typename ServiceT::Request>;

Client(
std::shared_ptr<rcl_node_t> node_handle,
const std::string & service_name,
rcl_client_options_t & client_options)
: ClientBase(service_name), node_handle_(node_handle)
{
using rosidl_generator_cpp::get_service_type_support_handle;
auto service_type_support_handle =
get_service_type_support_handle<ServiceT>();
if (rcl_client_init(&client_handle_, this->node_handle_.get(),
service_type_support_handle, service_name.c_str(), &client_options) != RCL_RET_OK)
{
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create client: ") +
rcl_get_error_string_safe());
// *INDENT-ON*
}

this->set_send_request_function([this](SharedRequest request, int64_t & sequence_number)
{
if (RCL_RET_OK != rcl_send_request(get_client_handle(), request.get(), &sequence_number)) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to send request: ") + rcl_get_error_string_safe());
// *INDENT-ON*
}
});
}

virtual ~Client()
{
if (rcl_client_fini(&client_handle_, node_handle_.get()) != RCL_RET_OK) {
fprintf(stderr,
"Error in destruction of rmw client handle: %s\n", rmw_get_error_string_safe());
}
}

std::shared_ptr<void> create_response()
{
return std::shared_ptr<void>(new typename ServiceT::Response());
}

virtual void handle_response(std::shared_ptr<rmw_request_id_t> request_header,
std::shared_ptr<void> response)
{
auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(response);
ClientPatternT::handle_response(request_header, typed_response);
}

virtual std::shared_ptr<rmw_request_id_t> create_request_header()
{
// TODO(wjwwood): This should probably use rmw_request_id's allocator.
// (since it is a C type)
return std::shared_ptr<rmw_request_id_t>(new rmw_request_id_t);
}

private:
RCLCPP_DISABLE_COPY(Client);
std::shared_ptr<rcl_node_t> node_handle_;

};


} // namespace client
} // namespace rclcpp

Expand Down
3 changes: 3 additions & 0 deletions rclcpp/include/rclcpp/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ class Node
RCLCPP_PUBLIC
const rcl_guard_condition_t * get_notify_guard_condition() const;

RCLCPP_PUBLIC
const rcl_node_t * get_rcl_handle() const;

std::atomic_bool has_executor;

private:
Expand Down
Loading