Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 54 additions & 34 deletions src/core/ddsi/src/ddsi_discovery.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,49 +113,69 @@ bool ddsi_handle_sedp_checks (struct ddsi_domaingv * const gv, ddsi_sedp_kind_t
#undef E
}

static void ddsi_handle_sedp (const struct ddsi_receiver_state *rst, ddsi_seqno_t seq, struct ddsi_serdata *serdata, ddsi_sedp_kind_t sedp_kind)
static void ddsi_handle_sedp_endpoint (const struct ddsi_receiver_state *rst, ddsi_seqno_t seq, struct ddsi_serdata *serdata, ddsi_sedp_kind_t sedp_kind)
{
ddsi_plist_t decoded_data;
if (ddsi_serdata_to_sample (serdata, &decoded_data, NULL, NULL))
{
struct ddsi_domaingv * const gv = rst->gv;
GVLOGDISC ("SEDP ST%"PRIx32, serdata->statusinfo);
switch (serdata->statusinfo & (DDSI_STATUSINFO_DISPOSE | DDSI_STATUSINFO_UNREGISTER))

// GUID prefixes should match
if (memcmp (&decoded_data.endpoint_guid.prefix, &rst->src_guid_prefix, sizeof (rst->src_guid_prefix)) != 0)
{
case 0:
switch (sedp_kind)
{
case SEDP_KIND_TOPIC:
#ifdef DDS_HAS_TOPIC_DISCOVERY
ddsi_handle_sedp_alive_topic (rst, seq, &decoded_data, rst->vendor, serdata->timestamp);
#endif
break;
case SEDP_KIND_READER:
case SEDP_KIND_WRITER:
ddsi_handle_sedp_alive_endpoint (rst, seq, &decoded_data, sedp_kind, rst->vendor, serdata->timestamp);
break;
}
break;
case DDSI_STATUSINFO_DISPOSE:
case DDSI_STATUSINFO_UNREGISTER:
case (DDSI_STATUSINFO_DISPOSE | DDSI_STATUSINFO_UNREGISTER):
switch (sedp_kind)
{
case SEDP_KIND_TOPIC:
GVTRACE (" "PGUIDFMT": mismatch with RTPS source "PGUIDPREFIXFMT"\n", PGUID (decoded_data.endpoint_guid), PGUIDPREFIX(rst->src_guid_prefix));
}
else
{
switch (serdata->statusinfo & (DDSI_STATUSINFO_DISPOSE | DDSI_STATUSINFO_UNREGISTER))
{
case 0:
ddsi_handle_sedp_alive_endpoint (rst, seq, &decoded_data, sedp_kind, rst->vendor, serdata->timestamp);
break;
case DDSI_STATUSINFO_DISPOSE:
case DDSI_STATUSINFO_UNREGISTER:
case (DDSI_STATUSINFO_DISPOSE | DDSI_STATUSINFO_UNREGISTER):
ddsi_handle_sedp_dead_endpoint (rst, &decoded_data, sedp_kind, serdata->timestamp);
break;
}
}
ddsi_plist_fini (&decoded_data);
}
}

#ifdef DDS_HAS_TOPIC_DISCOVERY
ddsi_handle_sedp_dead_topic (rst, &decoded_data, serdata->timestamp);
#endif
break;
case SEDP_KIND_READER:
case SEDP_KIND_WRITER:
ddsi_handle_sedp_dead_endpoint (rst, &decoded_data, sedp_kind, serdata->timestamp);
break;
}
break;
static void ddsi_handle_sedp_topic (const struct ddsi_receiver_state *rst, ddsi_seqno_t seq, struct ddsi_serdata *serdata)
{
ddsi_plist_t decoded_data;
if (ddsi_serdata_to_sample (serdata, &decoded_data, NULL, NULL))
{
struct ddsi_domaingv * const gv = rst->gv;
GVLOGDISC ("SEDP ST%"PRIx32, serdata->statusinfo);

// GUID prefixes should match
if (memcmp (&decoded_data.topic_guid.prefix, &rst->src_guid_prefix, sizeof (rst->src_guid_prefix)) != 0)
{
GVTRACE (" "PGUIDFMT": mismatch with RTPS source "PGUIDPREFIXFMT"\n", PGUID (decoded_data.topic_guid), PGUIDPREFIX(rst->src_guid_prefix));
}
else
{
switch (serdata->statusinfo & (DDSI_STATUSINFO_DISPOSE | DDSI_STATUSINFO_UNREGISTER))
{
case 0:
ddsi_handle_sedp_alive_topic (rst, seq, &decoded_data, rst->vendor, serdata->timestamp);
break;
case DDSI_STATUSINFO_DISPOSE:
case DDSI_STATUSINFO_UNREGISTER:
case (DDSI_STATUSINFO_DISPOSE | DDSI_STATUSINFO_UNREGISTER):
ddsi_handle_sedp_dead_topic (rst, &decoded_data, serdata->timestamp);
break;
}
}
ddsi_plist_fini (&decoded_data);
}
}
#endif

#ifdef DDS_HAS_TYPE_DISCOVERY
static void handle_typelookup (const struct ddsi_receiver_state *rst, ddsi_entityid_t wr_entity_id, struct ddsi_serdata *serdata)
Expand Down Expand Up @@ -371,15 +391,15 @@ int ddsi_builtins_dqueue_handler (const struct ddsi_rsample_info *sampleinfo, co
break;
case DDSI_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER:
case DDSI_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER:
ddsi_handle_sedp (sampleinfo->rst, sampleinfo->seq, d, SEDP_KIND_WRITER);
ddsi_handle_sedp_endpoint (sampleinfo->rst, sampleinfo->seq, d, SEDP_KIND_WRITER);
break;
case DDSI_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER:
case DDSI_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER:
ddsi_handle_sedp (sampleinfo->rst, sampleinfo->seq, d, SEDP_KIND_READER);
ddsi_handle_sedp_endpoint (sampleinfo->rst, sampleinfo->seq, d, SEDP_KIND_READER);
break;
#ifdef DDS_HAS_TOPIC_DISCOVERY
case DDSI_ENTITYID_SEDP_BUILTIN_TOPIC_WRITER:
ddsi_handle_sedp (sampleinfo->rst, sampleinfo->seq, d, SEDP_KIND_TOPIC);
ddsi_handle_sedp_topic (sampleinfo->rst, sampleinfo->seq, d);
break;
#endif
case DDSI_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER:
Expand Down
29 changes: 18 additions & 11 deletions src/core/ddsi/src/ddsi_discovery_spdp.c
Original file line number Diff line number Diff line change
Expand Up @@ -664,18 +664,25 @@ void ddsi_handle_spdp (const struct ddsi_receiver_state *rst, ddsi_entityid_t pw
if (ddsi_serdata_to_sample (serdata, &decoded_data, NULL, NULL))
{
enum handle_spdp_result interesting = HSR_NOT_INTERESTING;
switch (serdata->statusinfo & (DDSI_STATUSINFO_DISPOSE | DDSI_STATUSINFO_UNREGISTER))
if (memcmp (&decoded_data.participant_guid.prefix, &rst->src_guid_prefix, sizeof (rst->src_guid_prefix)) != 0)
{
case 0:
interesting = handle_spdp_alive (rst, seq, serdata->timestamp, &decoded_data);
break;

case DDSI_STATUSINFO_DISPOSE:
case DDSI_STATUSINFO_UNREGISTER:
case (DDSI_STATUSINFO_DISPOSE | DDSI_STATUSINFO_UNREGISTER):
handle_spdp_dead (rst, pwr_entityid, serdata->timestamp, &decoded_data, serdata->statusinfo);
interesting = HSR_INTERESTING;
break;
GVTRACE ("SPDP ST%x "PGUIDFMT": mismatch with RTPS source "PGUIDPREFIXFMT, serdata->statusinfo, PGUID (decoded_data.participant_guid), PGUIDPREFIX(rst->src_guid_prefix));
}
else
{
switch (serdata->statusinfo & (DDSI_STATUSINFO_DISPOSE | DDSI_STATUSINFO_UNREGISTER))
{
case 0:
interesting = handle_spdp_alive (rst, seq, serdata->timestamp, &decoded_data);
break;

case DDSI_STATUSINFO_DISPOSE:
case DDSI_STATUSINFO_UNREGISTER:
case (DDSI_STATUSINFO_DISPOSE | DDSI_STATUSINFO_UNREGISTER):
handle_spdp_dead (rst, pwr_entityid, serdata->timestamp, &decoded_data, serdata->statusinfo);
interesting = HSR_INTERESTING;
break;
}
}

ddsi_plist_fini (&decoded_data);
Expand Down
45 changes: 23 additions & 22 deletions src/core/ddsi/src/ddsi_pmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,40 +87,41 @@ void ddsi_handle_pmd_message (const struct ddsi_receiver_state *rst, struct ddsi
{
/* use sample with knowledge of internal representation: there's a deserialized sample inside already */
const struct ddsi_serdata_pserop *sample = (const struct ddsi_serdata_pserop *) sample_common;
struct ddsi_proxy_participant *proxypp;
ddsi_guid_t ppguid;
struct ddsi_lease *l;
RSTTRACE (" PMD ST%"PRIx32, sample->c.statusinfo);
ddsi_participant_message_data_t const * const pmd = sample->sample;
RSTTRACE (" PMD ST%"PRIx32" pp %"PRIx32":%"PRIx32":%"PRIx32" kind %"PRIu32, sample->c.statusinfo, PGUIDPREFIX (pmd->participantGuidPrefix), pmd->kind);
if (memcmp (&pmd->participantGuidPrefix, &rst->src_guid_prefix, sizeof (rst->src_guid_prefix)) != 0)
{
RSTTRACE (" : mismatch with RTPS source "PGUIDPREFIXFMT"\n", PGUIDPREFIX(rst->src_guid_prefix));
return;
}
const ddsi_guid_t ppguid = { .prefix = pmd->participantGuidPrefix, .entityid = { .u = DDSI_ENTITYID_PARTICIPANT } };
struct ddsi_proxy_participant * const proxypp = ddsi_entidx_lookup_proxy_participant_guid (rst->gv->entity_index, &ppguid);
if (proxypp == NULL)
{
RSTTRACE (": unknown proxy participant\n");
return;
}
switch (sample->c.statusinfo & (DDSI_STATUSINFO_DISPOSE | DDSI_STATUSINFO_UNREGISTER))
{
case 0: {
const ddsi_participant_message_data_t *pmd = sample->sample;
RSTTRACE (" pp %"PRIx32":%"PRIx32":%"PRIx32" kind %"PRIu32" data %"PRIu32, PGUIDPREFIX (pmd->participantGuidPrefix), pmd->kind, pmd->value.length);
ppguid.prefix = pmd->participantGuidPrefix;
ppguid.entityid.u = DDSI_ENTITYID_PARTICIPANT;
if ((proxypp = ddsi_entidx_lookup_proxy_participant_guid (rst->gv->entity_index, &ppguid)) == NULL)
RSTTRACE (" PPunknown");
else if (pmd->kind == DDSI_PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE &&
(l = ddsrt_atomic_ldvoidp (&proxypp->minl_man)) != NULL)
case 0:
RSTTRACE (" data %"PRIu32, pmd->value.length);
if (pmd->kind == DDSI_PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE)
{
/* Renew lease for entity with shortest manual-by-participant lease */
ddsi_lease_renew (l, ddsrt_time_elapsed ());
struct ddsi_lease * const l = ddsrt_atomic_ldvoidp (&proxypp->minl_man);
if (l != NULL)
ddsi_lease_renew (l, ddsrt_time_elapsed ());
}
break;
}

case DDSI_STATUSINFO_DISPOSE:
case DDSI_STATUSINFO_UNREGISTER:
case DDSI_STATUSINFO_DISPOSE | DDSI_STATUSINFO_UNREGISTER: {
const ddsi_participant_message_data_t *pmd = sample->sample;
ppguid.prefix = pmd->participantGuidPrefix;
ppguid.entityid.u = DDSI_ENTITYID_PARTICIPANT;
case DDSI_STATUSINFO_DISPOSE | DDSI_STATUSINFO_UNREGISTER:
if (ddsi_delete_proxy_participant_by_guid (rst->gv, &ppguid, sample->c.timestamp, false) < 0)
RSTTRACE (" unknown");
RSTTRACE (": unknown");
else
RSTTRACE (" delete");
RSTTRACE (": delete");
break;
}
}
RSTTRACE ("\n");
}
Expand Down
Loading
Loading