Skip to content

Brokerless P2P [EXPERIMENTAL] #76

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: foxy
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ jobs:
- name: Download dependencies
run: |
git clone -b foxy https://github.com/eProsima/Micro-CDR src/Micro-CDR
git clone -b foxy https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client
git clone -b feature/brokerless_p2p https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client
git clone -b foxy https://github.com/micro-ROS/rosidl_typesupport_microxrcedds src/rosidl_typesupport_microxrcedds
touch src/rosidl_typesupport_microxrcedds/test/COLCON_IGNORE

- name: Build
run: . /opt/ros/foxy/setup.sh && colcon build
run: . /opt/ros/foxy/setup.sh && colcon build --cmake-args -DUCLIENT_MAX_SESSION_CONNECTION_ATTEMPTS=1

- name: Test
run: |
Expand Down
6 changes: 3 additions & 3 deletions rmw_microxrcedds_c/src/callbacks.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void on_topic(
if ((custom_subscription->datareader_id.id == object_id.id) &&
(custom_subscription->datareader_id.type == object_id.type))
{
custom_subscription->micro_buffer_lenght[custom_subscription->history_write_index] = length;
custom_subscription->micro_buffer_length[custom_subscription->history_write_index] = length;
ucdr_deserialize_array_uint8_t(
ub,
custom_subscription->micro_buffer[custom_subscription->history_write_index], length);
Expand Down Expand Up @@ -89,7 +89,7 @@ void on_request(
while (service_item != NULL) {
rmw_uxrce_service_t * custom_service = (rmw_uxrce_service_t *)service_item->data;
if (custom_service->request_id == request_id) {
custom_service->micro_buffer_lenght[custom_service->history_write_index] = length;
custom_service->micro_buffer_length[custom_service->history_write_index] = length;
ucdr_deserialize_array_uint8_t(
ub,
custom_service->micro_buffer[custom_service->history_write_index], length);
Expand Down Expand Up @@ -132,7 +132,7 @@ void on_reply(
while (client_item != NULL) {
rmw_uxrce_client_t * custom_client = (rmw_uxrce_client_t *)client_item->data;
if (custom_client->request_id == request_id) {
custom_client->micro_buffer_lenght[custom_client->history_write_index] = length;
custom_client->micro_buffer_length[custom_client->history_write_index] = length;
ucdr_deserialize_array_uint8_t(
ub,
custom_client->micro_buffer[custom_client->history_write_index], length);
Expand Down
12 changes: 12 additions & 0 deletions rmw_microxrcedds_c/src/memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@ struct rmw_uxrce_mempool_item_t * get_memory(struct rmw_uxrce_mempool_t * mem)
return item;
}

bool check_already_freed(struct rmw_uxrce_mempool_t * mem, struct rmw_uxrce_mempool_item_t * item){

struct rmw_uxrce_mempool_item_t * i = mem->freeitems;
while (i != NULL) {
if (i == item){
return true;
}
i = i->next;
}
return false;
}

void put_memory(struct rmw_uxrce_mempool_t * mem, struct rmw_uxrce_mempool_item_t * item)
{
// Gets item from allocated pool
Expand Down
1 change: 1 addition & 0 deletions rmw_microxrcedds_c/src/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void set_mem_pool(struct rmw_uxrce_mempool_t * mem, struct rmw_uxrce_mempool_ite
bool has_memory(struct rmw_uxrce_mempool_t * mem);
struct rmw_uxrce_mempool_item_t * get_memory(struct rmw_uxrce_mempool_t * mem);
void put_memory(struct rmw_uxrce_mempool_t * mem, struct rmw_uxrce_mempool_item_t * item);
bool check_already_freed(struct rmw_uxrce_mempool_t * mem, struct rmw_uxrce_mempool_item_t * item);
void free_mem_pool(struct rmw_uxrce_mempool_t * mem);

#endif // MEMORY_H_
36 changes: 25 additions & 11 deletions rmw_microxrcedds_c/src/rmw_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,24 @@ rmw_create_client(
}
client_req = uxr_buffer_create_requester_xml(
&custom_node->context->session,
custom_node->context->reliable_output, custom_client->client_id,
custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
*custom_node->context->entity_creation_output,
custom_client->client_id,
custom_node->participant_id,
rmw_uxrce_xml_buffer,
UXR_REPLACE);
#elif defined(MICRO_XRCEDDS_USE_REFS)
// TODO(pablogs9): Is possible to instantiate a replier by ref?
// client_req = uxr_buffer_create_replier_ref(&custom_node->context->session,
// custom_node->context->reliable_output, custom_service->subscriber_id,
// custom_node->participant_id, "", UXR_REPLACE);
if (!build_requester_profile(service_name, rmw_uxrce_profile_name, sizeof(rmw_uxrce_profile_name))) {
RMW_SET_ERROR_MSG("failed to generate ref request for client creation");
goto fail;
}

client_req = uxr_buffer_create_requester_ref(
&custom_node->context->session,
*custom_node->context->entity_creation_output,
custom_client->client_id,
custom_node->participant_id,
rmw_uxrce_profile_name,
UXR_REPLACE);
#endif

rmw_client->data = custom_client;
Expand All @@ -144,7 +155,7 @@ rmw_create_client(
&custom_node->context->session, 1000, requests,
status, 1))
{
RMW_SET_ERROR_MSG("Issues creating Micro XRCE-DDS entities");
RMW_SET_ERROR_MSG("Issues creating Requester Micro XRCE-DDS entities");
put_memory(&client_memory, &custom_client->mem);
goto fail;
}
Expand All @@ -162,8 +173,10 @@ rmw_create_client(

custom_client->request_id = uxr_buffer_request_data(
&custom_node->context->session,
custom_node->context->reliable_output, custom_client->client_id,
custom_client->stream_id, &delivery_control);
*custom_node->context->entity_creation_output,
custom_client->client_id,
custom_client->stream_id,
&delivery_control);
}
return rmw_client;

Expand Down Expand Up @@ -203,7 +216,8 @@ rmw_destroy_client(
rmw_uxrce_client_t * custom_client = (rmw_uxrce_client_t *)client->data;
uint16_t delete_client =
uxr_buffer_delete_entity(
&custom_node->context->session, custom_node->context->reliable_output,
&custom_node->context->session,
*custom_node->context->entity_creation_output,
custom_client->client_id);

uint16_t requests[] = {delete_client};
Expand All @@ -212,7 +226,7 @@ rmw_destroy_client(
&custom_node->context->session, 1000, requests, status,
sizeof(status)))
{
RMW_SET_ERROR_MSG("unable to remove client from the server");
RMW_SET_ERROR_MSG("Unable to remove client from the server");
result_ret = RMW_RET_ERROR;
} else {
rmw_uxrce_fini_client_memory(client);
Expand Down
29 changes: 20 additions & 9 deletions rmw_microxrcedds_c/src/rmw_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@
#include <termios.h>
#endif

#ifdef UCLIENT_PROFILE_BROKERLESS
#include <uxr/client/brokerless/brokerless.h>
#endif

#if defined(MICRO_XRCEDDS_SERIAL) || defined(MICRO_XRCEDDS_CUSTOM_SERIAL)
#define CLOSE_TRANSPORT(transport) uxr_close_serial_transport(transport)
#elif defined(MICRO_XRCEDDS_UDP)
#elif defined(MICRO_XRCEDDS_UDP) && !defined(UCLIENT_PROFILE_BROKERLESS)
#define CLOSE_TRANSPORT(transport) uxr_close_udp_transport(transport)
#else
#define CLOSE_TRANSPORT(transport)
Expand Down Expand Up @@ -245,8 +249,7 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
}
printf("Serial mode => dev: %s\n", context_impl->connection_params.serial_device);

#elif defined(MICRO_XRCEDDS_UDP)
// TODO(Borja) Think how we are going to select transport to use
#elif defined(MICRO_XRCEDDS_UDP) && !defined(UCLIENT_PROFILE_BROKERLESS)
#ifdef MICRO_XRCEDDS_IPV4
uxrIpProtocol ip_protocol = UXR_IPv4;
#elif defined(MICRO_XRCEDDS_IPV6)
Expand All @@ -263,6 +266,11 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
printf(
"UDP mode => ip: %s - port: %s\n", context_impl->connection_params.agent_address,
context_impl->connection_params.agent_port);
#elif defined(MICRO_XRCEDDS_UDP) && defined(UCLIENT_PROFILE_BROKERLESS)

context_impl->transport.comm = brokerless_comm_stub;

printf("UDP mode => Brokerless P2P\n");
#elif defined(MICRO_XRCEDDS_CUSTOM_SERIAL)
int pseudo_fd = 0;
if (strlen(options->impl->connection_params.serial_device) > 0) {
Expand All @@ -289,23 +297,26 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context)

context_impl->reliable_input = uxr_create_input_reliable_stream(
&context_impl->session, context_impl->input_reliable_stream_buffer,
context_impl->transport.comm.mtu * RMW_UXRCE_STREAM_HISTORY, RMW_UXRCE_STREAM_HISTORY);
context_impl->reliable_output =
uxr_create_output_reliable_stream(
RMW_UXRCE_MAX_BUFFER_SIZE, RMW_UXRCE_STREAM_HISTORY);
context_impl->reliable_output = uxr_create_output_reliable_stream(
&context_impl->session, context_impl->output_reliable_stream_buffer,
context_impl->transport.comm.mtu * RMW_UXRCE_STREAM_HISTORY, RMW_UXRCE_STREAM_HISTORY);
RMW_UXRCE_MAX_BUFFER_SIZE, RMW_UXRCE_STREAM_HISTORY);

context_impl->best_effort_input = uxr_create_input_best_effort_stream(&context_impl->session);
context_impl->best_effort_output = uxr_create_output_best_effort_stream(
&context_impl->session,
context_impl->output_best_effort_stream_buffer, context_impl->transport.comm.mtu);

context_impl->output_best_effort_stream_buffer, RMW_UXRCE_MAX_TRANSPORT_MTU);

#ifndef UCLIENT_PROFILE_BROKERLESS
context_impl->entity_creation_output = &context_impl->reliable_output;
if (!uxr_create_session(&context_impl->session)) {
CLOSE_TRANSPORT(&context_impl->transport);
RMW_SET_ERROR_MSG("failed to create node session on Micro ROS Agent.");
return RMW_RET_ERROR;
}
#else
context_impl->entity_creation_output = &context_impl->best_effort_output;
#endif

return RMW_RET_OK;
}
Expand Down
48 changes: 33 additions & 15 deletions rmw_microxrcedds_c/src/rmw_microxrcedds_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

#include "./utils.h"


rmw_uxrce_topic_t *
create_topic(
struct rmw_uxrce_node_t * custom_node,
Expand All @@ -34,7 +33,6 @@ create_topic(
RMW_SET_ERROR_MSG("Not available memory node");
goto fail;
}

rmw_uxrce_topic_t * custom_topic = (rmw_uxrce_topic_t *)memory_node->data;


Expand Down Expand Up @@ -63,8 +61,11 @@ create_topic(

topic_req = uxr_buffer_create_topic_xml(
&custom_node->context->session,
custom_node->context->reliable_output, custom_topic->topic_id,
custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
*custom_node->context->entity_creation_output,
custom_topic->topic_id,
custom_node->participant_id,
rmw_uxrce_xml_buffer,
UXR_REPLACE);
#elif defined(MICRO_XRCEDDS_USE_REFS)
(void)qos_policies;
if (!build_topic_profile(topic_name, rmw_uxrce_profile_name, sizeof(rmw_uxrce_profile_name))) {
Expand All @@ -76,18 +77,23 @@ create_topic(

topic_req = uxr_buffer_create_topic_ref(
&custom_node->context->session,
custom_node->context->reliable_output, custom_topic->topic_id,
custom_node->participant_id, rmw_uxrce_profile_name, UXR_REPLACE);
*custom_node->context->entity_creation_output,
custom_topic->topic_id,
custom_node->participant_id,
rmw_uxrce_profile_name,
UXR_REPLACE);
#endif

// Send the request and wait for response
uint8_t status;
custom_topic->sync_with_agent =
uxr_run_session_until_all_status(
&custom_node->context->session, 1000, &topic_req,
&status, 1);
if (!custom_topic->sync_with_agent) {
RMW_SET_ERROR_MSG("Issues creating micro XRCE-DDS entities");

if (UXR_BEST_EFFORT_STREAM == custom_node->context->entity_creation_output->type)
{
uxr_flash_output_streams(&custom_node->context->session);
}
else if(!(custom_topic->sync_with_agent = uxr_run_session_until_all_status(&custom_node->context->session, 1000, &topic_req, &status, 1)))
{
RMW_SET_ERROR_MSG("Issues creating Topic Micro XRCE-DDS entities");
rmw_uxrce_fini_topic_memory(custom_topic);
custom_topic = NULL;
goto fail;
Expand All @@ -101,18 +107,30 @@ rmw_ret_t destroy_topic(rmw_uxrce_topic_t * topic)
{
rmw_ret_t result_ret = RMW_RET_OK;

// TODO (pablogs9): Implement this in all destroy functions;
if (check_already_freed(&topics_memory, &topic->mem))
{
return RMW_RET_ERROR;
}

uint16_t delete_topic = uxr_buffer_delete_entity(
&topic->owner_node->context->session,
topic->owner_node->context->reliable_output,
*topic->owner_node->context->entity_creation_output,
topic->topic_id);

uint16_t requests[] = {delete_topic};
uint8_t status[1];
if (!uxr_run_session_until_all_status(

if (UXR_BEST_EFFORT_STREAM == topic->owner_node->context->entity_creation_output->type)
{
uxr_flash_output_streams(&topic->owner_node->context->session);
rmw_uxrce_fini_topic_memory(topic);
}
else if (!uxr_run_session_until_all_status(
&topic->owner_node->context->session, 1000, requests,
status, 1))
{
RMW_SET_ERROR_MSG("unable to remove topic from the server");
RMW_SET_ERROR_MSG("Unable to remove topic from the server");
result_ret = RMW_RET_ERROR;
} else {
rmw_uxrce_fini_topic_memory(topic);
Expand Down
34 changes: 24 additions & 10 deletions rmw_microxrcedds_c/src/rmw_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,11 @@ rmw_node_t * create_node(
participant_req =
uxr_buffer_create_participant_xml(
&node_info->context->session,
node_info->context->reliable_output,
node_info->participant_id, (uint16_t)domain_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
*node_info->context->entity_creation_output,
node_info->participant_id,
(uint16_t)domain_id,
rmw_uxrce_xml_buffer,
UXR_REPLACE);
#elif defined(MICRO_XRCEDDS_USE_REFS)
if (!build_participant_profile(rmw_uxrce_profile_name, sizeof(rmw_uxrce_profile_name))) {
RMW_SET_ERROR_MSG("failed to generate xml request for node creation");
Expand All @@ -95,15 +98,24 @@ rmw_node_t * create_node(
participant_req =
uxr_buffer_create_participant_ref(
&node_info->context->session,
node_info->context->reliable_output,
node_info->participant_id, (uint16_t)domain_id, rmw_uxrce_profile_name, UXR_REPLACE);
*custom_node->context->entity_creation_output,
node_info->participant_id,
(uint16_t)domain_id,
rmw_uxrce_profile_name,
UXR_REPLACE);
#endif

uint8_t status[1];
uint16_t requests[] = {participant_req};

if (!uxr_run_session_until_all_status(&node_info->context->session, 1000, requests, status, 1)) {
if (UXR_BEST_EFFORT_STREAM == node_info->context->entity_creation_output->type)
{
uxr_flash_output_streams(&node_info->context->session);
}
else if(!uxr_run_session_until_all_status(&node_info->context->session, 1000, requests, status, 1))
{
rmw_uxrce_fini_node_memory(node_handle);
RMW_SET_ERROR_MSG("Issues creating micro XRCE-DDS entities");
RMW_SET_ERROR_MSG("Issues creating Participant Micro XRCE-DDS entities");
return NULL;
}

Expand Down Expand Up @@ -201,14 +213,16 @@ rmw_ret_t rmw_destroy_node(rmw_node_t * node)

uint16_t participant_req = uxr_buffer_delete_entity(
&custom_node->context->session,
custom_node->context->reliable_output,
*custom_node->context->entity_creation_output,
custom_node->participant_id);
uint8_t status[1];
uint16_t requests[] = {participant_req};

if (!uxr_run_session_until_all_status(
&custom_node->context->session, 1000, requests, status,
1))
if (UXR_BEST_EFFORT_STREAM == custom_node->context->entity_creation_output->type)
{
uxr_flash_output_streams(&custom_node->context->session);
}
else if(!uxr_run_session_until_all_status(&custom_node->context->session, 1000, requests, status,1))
{
ret = RMW_RET_ERROR;
}
Expand Down
2 changes: 1 addition & 1 deletion rmw_microxrcedds_c/src/rmw_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ rmw_publish(
{
written = functions->cdr_serialize(ros_message, &mb);

if (UXR_BEST_EFFORT_STREAM == custom_publisher->stream_id.type) {
if (written && UXR_BEST_EFFORT_STREAM == custom_publisher->stream_id.type) {
uxr_flash_output_streams(&custom_publisher->owner_node->context->session);
} else {
written &= uxr_run_session_until_confirm_delivery(
Expand Down
Loading