Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[21768] DDS data publication implementation #2

Closed
wants to merge 21 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Uncrustify
Signed-off-by: Juan Lopez Fernandez <juanlopez@eprosima.com>
  • Loading branch information
juanlofer-eprosima committed Jan 14, 2025
commit 85fb5a555db48723c3fa4be38a5ed4f89b0d5d53
2 changes: 1 addition & 1 deletion ddsenabler/src/cpp/dds_enabler_runner.cpp
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@ std::unique_ptr<eprosima::utils::event::FileWatcherHandler> create_filewatcher(
// WARNING: it is needed to pass file_path, as FileWatcher only retrieves file_name
std::function<void(std::string)> filewatcher_callback =
[&enabler, &file_path]
(std::string file_name)
(std::string file_name)
{
EPROSIMA_LOG_INFO(DDSENABLER_EXECUTION,
"FileWatcher notified changes in file " << file_path << ". Reloading configuration");
14 changes: 10 additions & 4 deletions ddsenabler_participants/src/cpp/CBHandler.cpp
Original file line number Diff line number Diff line change
@@ -181,7 +181,9 @@ bool CBHandler::get_serialized_data(
dyn_type = it->second.second;

fastdds::dds::DynamicData::_ref_type dyn_data;
if ((fastdds::dds::RETCODE_OK != fastdds::dds::json_deserialize(json, dyn_type, fastdds::dds::DynamicDataJsonFormat::EPROSIMA, dyn_data)) || !dyn_data)
if ((fastdds::dds::RETCODE_OK !=
fastdds::dds::json_deserialize(json, dyn_type, fastdds::dds::DynamicDataJsonFormat::EPROSIMA,
dyn_data)) || !dyn_data)
{
EPROSIMA_LOG_ERROR(DDSENABLER_CB_HANDLER,
"Failed to deserialize data for type " << type_name << " : json deserialization failed.");
@@ -190,7 +192,8 @@ bool CBHandler::get_serialized_data(

// TODO: double chekc XCDR2 is the right choice -> only supposed to work against fastdds 3, and appendable only working with XCDR2
fastdds::dds::DynamicPubSubType pubsub_type(dyn_type);
uint32_t payload_size = pubsub_type.calculate_serialized_size(&dyn_data, fastdds::dds::DataRepresentationId::XCDR2_DATA_REPRESENTATION);
uint32_t payload_size = pubsub_type.calculate_serialized_size(&dyn_data,
fastdds::dds::DataRepresentationId::XCDR2_DATA_REPRESENTATION);

if (!payload_pool_->get_payload(payload_size, payload))
{
@@ -271,11 +274,14 @@ bool CBHandler::register_type_nts_(
if (_type_name != type_name)
{
EPROSIMA_LOG_ERROR(DDSENABLER_CB_HANDLER,
"Unexpected dynamic types collection format: " << type_name << " expected to be last item, found " << _type_name << " instead.");
"Unexpected dynamic types collection format: " << type_name << " expected to be last item, found " << _type_name <<
" instead.");
return false;
}

fastdds::dds::DynamicType::_ref_type dyn_type = fastdds::dds::DynamicTypeBuilderFactory::get_instance()->create_type_w_type_object(_type_object)->build();
fastdds::dds::DynamicType::_ref_type dyn_type =
fastdds::dds::DynamicTypeBuilderFactory::get_instance()->create_type_w_type_object(_type_object)
->build();
if (!dyn_type)
{
EPROSIMA_LOG_ERROR(DDSENABLER_CB_HANDLER,
6 changes: 4 additions & 2 deletions ddsenabler_participants/src/cpp/CBWriter.cpp
Original file line number Diff line number Diff line change
@@ -65,7 +65,8 @@ void CBWriter::write_schema(
return;
}

std::unique_ptr<fastdds::rtps::SerializedPayload_t> types_collection_payload = serialize_dynamic_types(types_collection);
std::unique_ptr<fastdds::rtps::SerializedPayload_t> types_collection_payload = serialize_dynamic_types(
types_collection);
if (nullptr == types_collection_payload)
{
EPROSIMA_LOG_ERROR(DDSENABLER_CB_WRITER,
@@ -76,7 +77,8 @@ void CBWriter::write_schema(
std::stringstream ss_data_holder;
ss_data_holder << std::setw(4);
if (fastdds::dds::RETCODE_OK !=
fastdds::dds::json_serialize(fastdds::dds::DynamicDataFactory::get_instance()->create_data(dyn_type), fastdds::dds::DynamicDataJsonFormat::EPROSIMA, ss_data_holder))
fastdds::dds::json_serialize(fastdds::dds::DynamicDataFactory::get_instance()->create_data(dyn_type),
fastdds::dds::DynamicDataJsonFormat::EPROSIMA, ss_data_holder))
{
EPROSIMA_LOG_ERROR(DDSENABLER_CB_WRITER,
"Not able to generate data placeholder for type " << type_name << ".");
14 changes: 10 additions & 4 deletions ddsenabler_participants/src/cpp/EnablerParticipant.cpp
Original file line number Diff line number Diff line change
@@ -40,7 +40,8 @@ EnablerParticipant::EnablerParticipant(
std::shared_ptr<PayloadPool> payload_pool,
std::shared_ptr<DiscoveryDatabase> discovery_database,
std::shared_ptr<ISchemaHandler> schema_handler)
: ddspipe::participants::SchemaParticipant(participant_configuration, payload_pool, discovery_database, schema_handler)
: ddspipe::participants::SchemaParticipant(participant_configuration, payload_pool, discovery_database,
schema_handler)
{
}

@@ -78,7 +79,8 @@ bool EnablerParticipant::publish(
if (!topic_req_callback_)
{
EPROSIMA_LOG_ERROR(DDSENABLER_ENABLER_PARTICIPANT,
"Failed to publish data in topic " << topic_name << " : topic is unknown and topic request callback not set.");
"Failed to publish data in topic " << topic_name <<
" : topic is unknown and topic request callback not set.");
return false;
}

@@ -106,11 +108,15 @@ bool EnablerParticipant::publish(
this->discovery_database_->add_endpoint(rtps::CommonParticipant::simulate_endpoint(topic, this->id()));

// Wait for reader to be created from discovery thread
cv_.wait(lck, [&] { return nullptr != (reader = lookup_reader_nts_(topic_name)); }); // TODO: handle case when stopped before processing queue item
cv_.wait(lck, [&]
{
return nullptr != (reader = lookup_reader_nts_(topic_name));
}); // TODO: handle case when stopped before processing queue item

// (Optionally) wait for writer created in DDS participant to match with external readers, to avoid losing this
// message when not using transient durability
std::this_thread::sleep_for(std::chrono::milliseconds(std::static_pointer_cast<EnablerParticipantConfiguration>(configuration_)->initial_publish_wait));
std::this_thread::sleep_for(std::chrono::milliseconds(std::static_pointer_cast<EnablerParticipantConfiguration>(
configuration_)->initial_publish_wait));
}

auto data = std::make_unique<RtpsPayloadData>();
10 changes: 6 additions & 4 deletions ddsenabler_participants/src/cpp/serialization.cpp
Original file line number Diff line number Diff line change
@@ -163,13 +163,13 @@ std::string serialize_type_identifier(
const TypeIdentifier& type_identifier);

TypeIdentifier deserialize_type_identifier(
const std::string& typeid_str);
const std::string& typeid_str);

std::string serialize_type_object(
const TypeObject& type_object);

TypeObject deserialize_type_object(
const std::string& typeobj_str);
const std::string& typeobj_str);

bool serialize_dynamic_type(
const std::string& type_name,
@@ -207,7 +207,8 @@ bool serialize_dynamic_type(
dependency_type_identifier,
dependency_type_object))
{
EPROSIMA_LOG_WARNING(DDSENABLER_SERIALIZATION, "Error getting TypeObject of dependency " << "for type " << type_name);
EPROSIMA_LOG_WARNING(DDSENABLER_SERIALIZATION,
"Error getting TypeObject of dependency " << "for type " << type_name);
return false;
}

@@ -511,7 +512,8 @@ bool deserialize_dynamic_types(
DynamicTypesCollection& dynamic_types)
{
fastdds::dds::TypeSupport type_support(new DynamicTypesCollectionPubSubType());
fastdds::rtps::SerializedPayload_t serialized_payload = fastdds::rtps::SerializedPayload_t(dynamic_types_payload_size);
fastdds::rtps::SerializedPayload_t serialized_payload = fastdds::rtps::SerializedPayload_t(
dynamic_types_payload_size);
serialized_payload.length = dynamic_types_payload_size;
std::memcpy(
serialized_payload.data,
Original file line number Diff line number Diff line change
@@ -53,7 +53,8 @@ class EnablerConfiguration

DDSENABLER_YAML_DllAPI
EnablerConfiguration(
const std::string& file_path,bool is_json = true);
const std::string& file_path,
bool is_json = true);

DDSENABLER_YAML_DllAPI
EnablerConfiguration(
20 changes: 11 additions & 9 deletions ddsenabler_yaml/src/cpp/EnablerConfiguration.cpp
Original file line number Diff line number Diff line change
@@ -106,10 +106,11 @@ EnablerConfiguration::EnablerConfiguration(
const std::string& file_path,
bool is_json)
{
if(is_json)
if (is_json)
{
load_ddsenabler_configuration_from_json_file(file_path);
}else
}
else
{
load_ddsenabler_configuration_from_yaml_file(file_path);
}
@@ -216,7 +217,8 @@ void EnablerConfiguration::load_enabler_configuration(
// Get initial publish wait
if (YamlReader::is_tag_present(yml, ENABLER_INITIAL_PUBLISH_WAIT_TAG))
{
enabler_configuration->initial_publish_wait = YamlReader::get_nonnegative_int(yml, ENABLER_INITIAL_PUBLISH_WAIT_TAG);
enabler_configuration->initial_publish_wait = YamlReader::get_nonnegative_int(yml,
ENABLER_INITIAL_PUBLISH_WAIT_TAG);
}
}

@@ -351,7 +353,7 @@ void EnablerConfiguration::load_ddsenabler_configuration_from_json_file(
if (!file.is_open())
{
throw eprosima::utils::ConfigurationException(
utils::Formatter() << "Could not open JSON file");
utils::Formatter() << "Could not open JSON file");
}

// Parse the JSON file
@@ -372,22 +374,22 @@ void EnablerConfiguration::load_ddsenabler_configuration_from_json_file(
else
{
throw eprosima::utils::ConfigurationException(
utils::Formatter() <<
"\"ddsmodule\" not found or is not an object within \"dds\" in the JSON file");
utils::Formatter() <<
"\"ddsmodule\" not found or is not an object within \"dds\" in the JSON file");
}
}
else
{
throw eprosima::utils::ConfigurationException(
utils::Formatter() << "\"dds\" not found or is not an object in the JSON file");
utils::Formatter() << "\"dds\" not found or is not an object in the JSON file");
}

}
catch (const std::exception& e)
{
throw eprosima::utils::ConfigurationException(
utils::Formatter() << "Error loading DDS Enabler configuration from file: <" << file_path <<
"> :\n " << e.what());
utils::Formatter() << "Error loading DDS Enabler configuration from file: <" << file_path <<
"> :\n " << e.what());
}
}
else