diff --git a/docs/manual/config/config_file_reference.rst b/docs/manual/config/config_file_reference.rst
index 45bcc605e3..cc2b0bb5da 100644
--- a/docs/manual/config/config_file_reference.rst
+++ b/docs/manual/config/config_file_reference.rst
@@ -2795,8 +2795,8 @@ The default value is: ``none``
..
generated from ddsi_config.h[6b6cd6f2af797765bc3753758bdd5f166f64b418]
- generated from ddsi_config.c[a48ec3d818f2ad852949a7832951709c3488dc10]
- generated from ddsi__cfgelems.h[32ba43b085108475f0fc5041d4593a9926c0f6f9]
+ generated from ddsi_config.c[98a66b82efe4476934c17d3df879e70add3b65a2]
+ generated from ddsi__cfgelems.h[7fe8f980955d7eab1f9515a474d5a51042698343]
generated from cfgunits.h[05f093223fce107d24dd157ebaafa351dc9df752]
generated from _confgen.h[bb9a0fc6ef1f7f7c46790ee00132e340e5fff36d]
generated from _confgen.c[0d833a6f2c98902f1249e63aed03a6164f0791d6]
diff --git a/docs/manual/options.md b/docs/manual/options.md
index b8d17a65f4..5c56af3750 100644
--- a/docs/manual/options.md
+++ b/docs/manual/options.md
@@ -1961,8 +1961,8 @@ The categorisation of tracing output is incomplete and hence most of the verbosi
The default value is: `none`
-
-
+
+
diff --git a/etc/cyclonedds.rnc b/etc/cyclonedds.rnc
index 9921c61e11..8a5d9a78b6 100644
--- a/etc/cyclonedds.rnc
+++ b/etc/cyclonedds.rnc
@@ -1360,8 +1360,8 @@ MIIEpAIBAAKCAQEA3HIh...AOBaaqSV37XBUJg== Deprecated (use Transport instead) This element allows selecting the transport to be used (udp, udp6, "
"tcp, tcp6, raweth) This element allows selecting the transport to be used (udp, udp6, "
+ "raweth)
maybe_memsize = xsd:token { pattern = "default|0|(\d+(\.\d*)?([Ee][\-+]?\d+)?|\.\d+([Ee][\-+]?\d+)?) *([kMG]i?)?B" }
}
# generated from ddsi_config.h[6b6cd6f2af797765bc3753758bdd5f166f64b418]
-# generated from ddsi_config.c[a48ec3d818f2ad852949a7832951709c3488dc10]
-# generated from ddsi__cfgelems.h[32ba43b085108475f0fc5041d4593a9926c0f6f9]
+# generated from ddsi_config.c[98a66b82efe4476934c17d3df879e70add3b65a2]
+# generated from ddsi__cfgelems.h[7fe8f980955d7eab1f9515a474d5a51042698343]
# generated from cfgunits.h[05f093223fce107d24dd157ebaafa351dc9df752]
# generated from _confgen.h[bb9a0fc6ef1f7f7c46790ee00132e340e5fff36d]
# generated from _confgen.c[0d833a6f2c98902f1249e63aed03a6164f0791d6]
diff --git a/etc/cyclonedds.xsd b/etc/cyclonedds.xsd
index 375ae42076..51f7bc0c56 100644
--- a/etc/cyclonedds.xsd
+++ b/etc/cyclonedds.xsd
@@ -2047,8 +2047,8 @@ MIIEpAIBAAKCAQEA3HIh...AOBaaqSV37XBUJg==<br>
-
-
+
+
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 54af236fba..a4f58b0374 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -49,6 +49,7 @@ option(ENABLE_TYPELIB "Enable Type Library support" ON)
option(ENABLE_TYPE_DISCOVERY "Enable Type Discovery support" ON)
option(ENABLE_TOPIC_DISCOVERY "Enable Topic Discovery support" ON)
option(ENABLE_QOS_PROVIDER "Enable Qos Provider support" ON)
+option(ENABLE_TCP "Enable PoC support for DDS-over-TCP" OFF)
if(ENABLE_TYPE_DISCOVERY)
if(NOT ENABLE_TYPELIB)
message(FATAL_ERROR "ENABLE_TYPE_DISCOVERY requires ENABLE_TYPELIB to be enabled")
diff --git a/src/core/ddsc/tests/CMakeLists.txt b/src/core/ddsc/tests/CMakeLists.txt
index 4f5b4ab873..24e5aae0dc 100644
--- a/src/core/ddsc/tests/CMakeLists.txt
+++ b/src/core/ddsc/tests/CMakeLists.txt
@@ -153,6 +153,11 @@ if(ENABLE_QOS_PROVIDER)
"qos_provider.c")
endif()
+if(ENABLE_TCP)
+ list(APPEND ddsc_test_sources
+ "tcp.c")
+endif()
+
add_cunit_executable(cunit_ddsc ${ddsc_test_sources})
if(MSVC)
diff --git a/src/core/ddsc/tests/tcp.c b/src/core/ddsc/tests/tcp.c
new file mode 100644
index 0000000000..4fc40be5fd
--- /dev/null
+++ b/src/core/ddsc/tests/tcp.c
@@ -0,0 +1,246 @@
+// Copyright(c) 2025 ZettaScale Technology and others
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
+// v. 1.0 which is available at
+// http://www.eclipse.org/org/documents/edl-v10.php.
+//
+// SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
+
+#include
The TCP element allows you to specify various parameters related to " "running DDSI over TCP.
" )), +#endif #ifdef DDS_HAS_TCP_TLS GROUP("SSL", ssl_cfgelems, NULL, 1, NOMEMBER, @@ -2294,7 +2308,9 @@ static struct cfgelem root_cfgelems[] = { MOVED("Discovery", "CycloneDDS/Domain/Discovery"), MOVED("Tracing", "CycloneDDS/Domain/Tracing"), MOVED("Internal|Unsupported", "CycloneDDS/Domain/Internal"), +#if DDS_HAS_TCP MOVED("TCP", "CycloneDDS/Domain/TCP"), +#endif #if DDS_HAS_SECURITY MOVED("DDSSecurity", "CycloneDDS/Domain/Security"), #endif diff --git a/src/core/ddsi/src/ddsi__receive.h b/src/core/ddsi/src/ddsi__receive.h index dba2c01fb9..2512bffaae 100644 --- a/src/core/ddsi/src/ddsi__receive.h +++ b/src/core/ddsi/src/ddsi__receive.h @@ -57,7 +57,7 @@ int ddsi_user_dqueue_handler (const struct ddsi_rsample_info *sampleinfo, const int ddsi_add_gap (struct ddsi_xmsg *msg, struct ddsi_writer *wr, struct ddsi_proxy_reader *prd, ddsi_seqno_t start, ddsi_seqno_t base, uint32_t numbits, const uint32_t *bits); /** @component incoming_rtps */ -void ddsi_handle_rtps_message (struct ddsi_thread_state * const thrst, struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, const ddsi_guid_prefix_t *guidprefix, struct ddsi_rbufpool *rbpool, struct ddsi_rmsg *rmsg, size_t sz, unsigned char *msg, const struct ddsi_network_packet_info *pktinfo); +void ddsi_handle_rtps_message (struct ddsi_thread_state * const thrst, struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, const ddsi_guid_prefix_t *guidprefix, struct ddsi_rbufpool *rbpool, struct ddsi_rmsg *rmsg, size_t sz, const struct ddsi_network_packet_info *pktinfo); #if defined (__cplusplus) } diff --git a/src/core/ddsi/src/ddsi_config.c b/src/core/ddsi/src/ddsi_config.c index b344d17124..8ca0d36d14 100644 --- a/src/core/ddsi/src/ddsi_config.c +++ b/src/core/ddsi/src/ddsi_config.c @@ -166,7 +166,9 @@ DU(natint); DU(natint_255); DU(pos_uint); DUPF(participantIndex); +#ifdef DDS_HAS_TCP DU(dyn_port); +#endif DUPF(memsize); DUPF(memsize16); DU(duration_inf); @@ -992,8 +994,20 @@ static const char *en_sched_class_vs[] = { "realtime", "timeshare", "default", N static const ddsrt_sched_t en_sched_class_ms[] = { DDSRT_SCHED_REALTIME, DDSRT_SCHED_TIMESHARE, DDSRT_SCHED_DEFAULT, 0 }; GENERIC_ENUM_CTYPE (sched_class, ddsrt_sched_t) -static const char *en_transport_selector_vs[] = { "default", "udp", "udp6", "tcp", "tcp6", "raweth", "none", NULL }; -static const enum ddsi_transport_selector en_transport_selector_ms[] = { DDSI_TRANS_DEFAULT, DDSI_TRANS_UDP, DDSI_TRANS_UDP6, DDSI_TRANS_TCP, DDSI_TRANS_TCP6, DDSI_TRANS_RAWETH, DDSI_TRANS_NONE, 0 }; +static const char *en_transport_selector_vs[] = { + "default", "udp", "udp6", +#ifdef DDS_HAS_TCP + "tcp", "tcp6", +#endif + "raweth", "none", NULL +}; +static const enum ddsi_transport_selector en_transport_selector_ms[] = { + DDSI_TRANS_DEFAULT, DDSI_TRANS_UDP, DDSI_TRANS_UDP6, +#ifdef DDS_HAS_TCP + DDSI_TRANS_TCP, DDSI_TRANS_TCP6, +#endif + DDSI_TRANS_RAWETH, DDSI_TRANS_NONE, 0 +}; GENERIC_ENUM_CTYPE (transport_selector, enum ddsi_transport_selector) /* by putting the "true" and "false" aliases at the end, they won't come out of the @@ -1472,10 +1486,12 @@ static void pf_int (struct ddsi_cfgst *cfgst, void *parent, struct cfgelem const cfg_logelem (cfgst, sources, "%d", *p); } +#ifdef DDS_HAS_TCP static enum update_result uf_dyn_port(struct ddsi_cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, int first, const char *value) { return uf_int_min_max(cfgst, parent, cfgelem, first, value, -1, 65535); } +#endif static enum update_result uf_natint(struct ddsi_cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, int first, const char *value) { diff --git a/src/core/ddsi/src/ddsi_discovery_spdp.c b/src/core/ddsi/src/ddsi_discovery_spdp.c index 0f042262d9..8d3204cfc1 100644 --- a/src/core/ddsi/src/ddsi_discovery_spdp.c +++ b/src/core/ddsi/src/ddsi_discovery_spdp.c @@ -100,17 +100,15 @@ void ddsi_get_participant_builtin_topic_data (const struct ddsi_participant *pp, } // Construct unicast locator parameters + if (gv->config.publish_uc_locators) { struct locators_builder def_uni = locators_builder_init (&dst->default_unicast_locators, locs->def_uni, MAX_XMIT_CONNS); struct locators_builder meta_uni = locators_builder_init (&dst->metatraffic_unicast_locators, locs->meta_uni, MAX_XMIT_CONNS); for (int i = 0; i < gv->n_interfaces; i++) { if (!gv->xmit_conns[i]->m_factory->m_enable_spdp) - { - // skip any interfaces where the address kind doesn't match the selected transport - // as a reasonablish way of not advertising PSMX locators here continue; - } + #ifndef NDEBUG int32_t kind; #endif @@ -135,9 +133,6 @@ void ddsi_get_participant_builtin_topic_data (const struct ddsi_participant *pp, assert (kind == gv->interfaces[i].extloc.kind); locators_add_one (&def_uni, &gv->interfaces[i].extloc, data_port); locators_add_one (&meta_uni, &gv->interfaces[i].extloc, meta_port); - } - if (gv->config.publish_uc_locators) - { dst->present |= PP_DEFAULT_UNICAST_LOCATOR | PP_METATRAFFIC_UNICAST_LOCATOR; dst->aliased |= PP_DEFAULT_UNICAST_LOCATOR | PP_METATRAFFIC_UNICAST_LOCATOR; } diff --git a/src/core/ddsi/src/ddsi_init.c b/src/core/ddsi/src/ddsi_init.c index e866838179..c3c86c7682 100644 --- a/src/core/ddsi/src/ddsi_init.c +++ b/src/core/ddsi/src/ddsi_init.c @@ -1086,7 +1086,8 @@ static void decide_participant_index_and_add_localhost_to_peers (struct ddsi_dom { GVTRACE ("all interfaces allow spdp multicast, no peers defined: defaulting participant index to \"none\"\n"); gv->config.participantIndex = DDSI_PARTICIPANT_INDEX_NONE; - } else if (all_allow_spdp_mc) + } + else if (all_allow_spdp_mc) { GVTRACE ("all interfaces allow spdp multicast, but peers defined: defaulting participant index to \"auto\"\n"); gv->config.participantIndex = DDSI_PARTICIPANT_INDEX_AUTO; @@ -1098,7 +1099,7 @@ static void decide_participant_index_and_add_localhost_to_peers (struct ddsi_dom } } if (gv->config.add_localhost_to_peers == DDSI_BOOLDEF_TRUE || - (none_allow_spdp_mc && gv->config.add_localhost_to_peers != DDSI_BOOLDEF_FALSE)) + (none_allow_spdp_mc && gv->m_factory->m_connless && gv->config.add_localhost_to_peers != DDSI_BOOLDEF_FALSE)) { // add self to as_disc, but only once we have everything set up to actually do that if (gv->config.add_localhost_to_peers == DDSI_BOOLDEF_DEFAULT) @@ -1515,11 +1516,10 @@ int ddsi_init (struct ddsi_domaingv *gv, struct ddsi_psmx_instance_locators *psm { if (gv->config.tcp_port < 0) ; /* no TCP listener */ - else if (gv->config.tcp_port == DDSI_TRAN_RANDOM_PORT_NUMBER) - ; /* kernel-allocated random port */ - else if (!ddsi_is_valid_port (gv->m_factory, (uint32_t) gv->config.tcp_port)) + else if (gv->config.tcp_port != DDSI_TRAN_RANDOM_PORT_NUMBER && !ddsi_is_valid_port (gv->m_factory, (uint32_t) gv->config.tcp_port)) { GVERROR ("Listener port %d is out of range for transport %s\n", gv->config.tcp_port, gv->m_factory->m_typename); + goto err_mc_conn; } else { diff --git a/src/core/ddsi/src/ddsi_receive.c b/src/core/ddsi/src/ddsi_receive.c index f9a30ec202..7bbfc9425c 100644 --- a/src/core/ddsi/src/ddsi_receive.c +++ b/src/core/ddsi/src/ddsi_receive.c @@ -2983,6 +2983,16 @@ static struct ddsi_receiver_state *rst_cow_if_needed (int *rst_live, struct ddsi } } +static bool smhdr_bswap_needed (ddsi_rtps_submessage_header_t const * const smhdr) +{ + DDSRT_WARNING_MSVC_OFF(6326); + if (smhdr->flags & DDSI_RTPS_SUBMESSAGE_FLAG_ENDIANNESS) + return !(DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN); + else + return (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN); + DDSRT_WARNING_MSVC_ON(6326); +} + static int handle_submsg_sequence ( struct ddsi_thread_state * const thrst, @@ -3045,14 +3055,7 @@ static int handle_submsg_sequence while (vr != VR_MALFORMED && submsg <= (end - sizeof (ddsi_rtps_submessage_header_t))) { ddsi_rtps_submessage_t * const sm = (ddsi_rtps_submessage_t *) submsg; - bool byteswap; - - DDSRT_WARNING_MSVC_OFF(6326) - if (sm->smhdr.flags & DDSI_RTPS_SUBMESSAGE_FLAG_ENDIANNESS) - byteswap = !(DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN); - else - byteswap = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN); - DDSRT_WARNING_MSVC_ON(6326) + const bool byteswap = smhdr_bswap_needed (&sm->smhdr); if (byteswap) sm->smhdr.octetsToNextHeader = ddsrt_bswap2u (sm->smhdr.octetsToNextHeader); @@ -3258,8 +3261,9 @@ static int handle_submsg_sequence } } -static void handle_rtps_message (struct ddsi_thread_state * const thrst, struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, const ddsi_guid_prefix_t *guidprefix, struct ddsi_rbufpool *rbpool, struct ddsi_rmsg *rmsg, size_t sz, unsigned char *msg, const struct ddsi_network_packet_info *pktinfo) +static void handle_rtps_message (struct ddsi_thread_state * const thrst, struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, const ddsi_guid_prefix_t *guidprefix, struct ddsi_rbufpool *rbpool, struct ddsi_rmsg *rmsg, size_t sz, const struct ddsi_network_packet_info *pktinfo) { + unsigned char *msg = DDSI_RMSG_PAYLOAD (rmsg); ddsi_rtps_header_t *hdr = (ddsi_rtps_header_t *) msg; assert (gv->config.protocol_version.major == DDSI_RTPS_MAJOR); assert (ddsi_thread_is_asleep ()); @@ -3298,100 +3302,80 @@ static void handle_rtps_message (struct ddsi_thread_state * const thrst, struct } } -void ddsi_handle_rtps_message (struct ddsi_thread_state * const thrst, struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, const ddsi_guid_prefix_t *guidprefix, struct ddsi_rbufpool *rbpool, struct ddsi_rmsg *rmsg, size_t sz, unsigned char *msg, const struct ddsi_network_packet_info *pktinfo) +void ddsi_handle_rtps_message (struct ddsi_thread_state * const thrst, struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, const ddsi_guid_prefix_t *guidprefix, struct ddsi_rbufpool *rbpool, struct ddsi_rmsg *rmsg, size_t sz, const struct ddsi_network_packet_info *pktinfo) { - handle_rtps_message (thrst, gv, conn, guidprefix, rbpool, rmsg, sz, msg, pktinfo); + handle_rtps_message (thrst, gv, conn, guidprefix, rbpool, rmsg, sz, pktinfo); } -static bool do_packet (struct ddsi_thread_state * const thrst, struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, const ddsi_guid_prefix_t *guidprefix, struct ddsi_rbufpool *rbpool) +ddsrt_nonnull_all ddsrt_attribute_warn_unused_result +static ssize_t read_packet_from_stream (struct ddsi_domaingv *gv, struct ddsi_tran_conn *conn, struct ddsi_rmsg *rmsg, size_t maxsz, struct ddsi_network_packet_info *pktinfo) { - /* UDP max packet size is 64kB */ + // Streams are sequences of RTPS messages, where the first submessage of each message + // must be a ADLINK_MSG_LEN with the "length" field the total RTPS message size. + // + // FIXME: We ought to keep enough state for reading messages in multiple rounds + // + // Note that we can't fix that until we stop using a ddsi_rmsg to buffer incoming data. + + assert (conn->m_stream); - const size_t maxsz = gv->config.rmsg_chunk_size < 65536 ? gv->config.rmsg_chunk_size : 65536; const size_t ddsi_msg_len_size = 8; const size_t stream_hdr_size = DDSI_RTPS_MESSAGE_HEADER_SIZE + ddsi_msg_len_size; + unsigned char * const buff = (unsigned char *) DDSI_RMSG_PAYLOAD (rmsg); ssize_t sz; - struct ddsi_rmsg * rmsg = ddsi_rmsg_new (rbpool); - unsigned char * buff; - size_t buff_len = maxsz; - ddsi_rtps_header_t * hdr; - struct ddsi_network_packet_info pktinfo; - if (rmsg == NULL) - { - return false; - } + sz = ddsi_conn_read (conn, buff, stream_hdr_size, true, pktinfo); + if (sz == 0) // Spurious read can happen with SSL, at this point we're still good + return 0; - DDSRT_STATIC_ASSERT (sizeof (struct ddsi_rmsg) == offsetof (struct ddsi_rmsg, chunk) + sizeof (struct ddsi_rmsg_chunk)); - buff = (unsigned char *) DDSI_RMSG_PAYLOAD (rmsg); - hdr = (ddsi_rtps_header_t*) buff; + // Errors or incorrect size: signal failure so connection will be dropped + // don't report: this is typically the path taken if the connection was dropped + if (sz < 0 || (size_t) sz != stream_hdr_size) + return -1; - if (conn->m_stream) - { - ddsi_rtps_msg_len_t * ml = (ddsi_rtps_msg_len_t*) (hdr + 1); + // First submessage following header should be ADLINK_MSG_LEN. The RTPS message + // header and this submessage's octets-to-next-header will checked by handle_rtps_message() + ddsi_rtps_header_t * const hdr = (ddsi_rtps_header_t *) buff; + ddsi_rtps_msg_len_t * const ml = (ddsi_rtps_msg_len_t *) (hdr + 1); + if (ml->smhdr.submessageId != DDSI_RTPS_SMID_ADLINK_MSG_LEN) + goto framing_error; - /* - Read in packet header to get size of packet in ddsi_rtps_msg_len_t, then read in - remainder of packet. - */ + if (smhdr_bswap_needed (&ml->smhdr)) + ml->length = ddsrt_bswap4u (ml->length); + if (ml->length < stream_hdr_size || ml->length > maxsz) + goto framing_error; - /* Read in DDSI header plus MSG_LEN sub message that follows it */ + sz = ddsi_conn_read (conn, buff + stream_hdr_size, ml->length - stream_hdr_size, false, NULL); + if (sz != (ssize_t) (ml->length - stream_hdr_size)) + return -1; - sz = ddsi_conn_read (conn, buff, stream_hdr_size, true, &pktinfo); - if (sz == 0) - { - /* Spurious read -- which at this point is still ok */ - ddsi_rmsg_commit (rmsg); - return true; - } + return (ssize_t) ml->length; - /* Read in remainder of packet */ +framing_error: + GVTRACE ("framing error, dropping connection\n"); + return -1; +} - if (sz > 0) - { - int swap; +static bool do_packet (struct ddsi_thread_state * const thrst, struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, const ddsi_guid_prefix_t *guidprefix, struct ddsi_rbufpool *rbpool) +{ + /* UDP max packet size is 64kB, we always limit RTPS messages always to 64kB */ + const size_t maxsz = gv->config.rmsg_chunk_size < 65536 ? gv->config.rmsg_chunk_size : 65536; + struct ddsi_network_packet_info pktinfo; + ssize_t sz; - DDSRT_WARNING_MSVC_OFF(6326) - if (ml->smhdr.flags & DDSI_RTPS_SUBMESSAGE_FLAG_ENDIANNESS) - { - swap = !(DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN); - } - else - { - swap = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN); - } - DDSRT_WARNING_MSVC_ON(6326) - if (swap) - { - ml->length = ddsrt_bswap4u (ml->length); - } + struct ddsi_rmsg * const rmsg = ddsi_rmsg_new (rbpool); + if (rmsg == NULL) + return false; - if (ml->smhdr.submessageId != DDSI_RTPS_SMID_ADLINK_MSG_LEN) - { - malformed_packet_received (gv, buff, NULL, (size_t) sz, hdr->vendorid); - sz = -1; - } - else - { - sz = ddsi_conn_read (conn, buff + stream_hdr_size, ml->length - stream_hdr_size, false, NULL); - if (sz > 0) - { - sz = (ssize_t) ml->length; - } - } - } - } + if (!conn->m_stream) + sz = ddsi_conn_read (conn, DDSI_RMSG_PAYLOAD (rmsg), maxsz, true, &pktinfo); else - { - /* Get next packet */ - - sz = ddsi_conn_read (conn, buff, buff_len, true, &pktinfo); - } + sz = read_packet_from_stream (gv, conn, rmsg, maxsz, &pktinfo); if (sz > 0 && !gv->deaf) { ddsi_rmsg_setsize (rmsg, (uint32_t) sz); - handle_rtps_message(thrst, gv, conn, guidprefix, rbpool, rmsg, (size_t) sz, buff, &pktinfo); + handle_rtps_message (thrst, gv, conn, guidprefix, rbpool, rmsg, (size_t) sz, &pktinfo); } ddsi_rmsg_commit (rmsg); return (sz > 0); diff --git a/src/core/ddsi/src/ddsi_tcp.c b/src/core/ddsi/src/ddsi_tcp.c index fc501e8d53..bcc989a85a 100644 --- a/src/core/ddsi/src/ddsi_tcp.c +++ b/src/core/ddsi/src/ddsi_tcp.c @@ -251,9 +251,8 @@ static dds_return_t ddsi_tcp_sock_new (struct ddsi_tran_factory_tcp * const fact return rc; } -static void ddsi_tcp_node_free (void * ptr) +static void ddsi_tcp_node_free (ddsi_tcp_node_t node) { - ddsi_tcp_node_t node = (ddsi_tcp_node_t) ptr; ddsi_conn_free ((struct ddsi_tran_conn *) node->m_conn); ddsrt_free (node); } @@ -345,7 +344,7 @@ static void ddsi_tcp_cache_add (struct ddsi_tran_factory_tcp *fact, ddsi_tcp_con } sockaddr_to_string_with_port(buff, sizeof(buff), &conn->m_peer_addr.a); - GVLOG (DDS_LC_TCP, "tcp cache %s %s socket %"PRIdSOCK" to %s\n", action, conn->m_base.m_server ? "server" : "client", conn->m_sock, buff); + GVLOG (DDS_LC_TCP, "tcp cache %s %s socket %"PRIdSOCK" to %s (%p)\n", action, conn->m_base.m_server ? "server" : "client", conn->m_sock, buff, (void *) conn); } static void ddsi_tcp_cache_remove (ddsi_tcp_conn_t conn) @@ -361,7 +360,7 @@ static void ddsi_tcp_cache_remove (ddsi_tcp_conn_t conn) if (node) { sockaddr_to_string_with_port(buff, sizeof(buff), &conn->m_peer_addr.a); - GVLOG (DDS_LC_TCP, "tcp cache removed socket %"PRIdSOCK" to %s\n", conn->m_sock, buff); + GVLOG (DDS_LC_TCP, "tcp cache removed socket %"PRIdSOCK" to %s (%p)\n", conn->m_sock, buff, (void *) conn); ddsrt_avl_delete_dpath (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, node, &path); ddsi_tcp_node_free (node); } @@ -388,25 +387,23 @@ static ddsi_tcp_conn_t ddsi_tcp_cache_find (struct ddsi_tran_factory_tcp *fact, ddsrt_mutex_lock (&fact->ddsi_tcp_cache_lock_g); node = ddsrt_avl_lookup_ipath (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, &key, &path); - if (node) + if (node && !node->m_conn->m_base.m_closed) + ret = node->m_conn; + else { - if (node->m_conn->m_base.m_closed) + if (node) { + struct ddsi_domaingv * const gv = node->m_conn->m_base.m_factory->gv; + GVLOG (DDS_LC_TCP, "tcp drop in cache find (%p)\n", (void *) node->m_conn); ddsrt_avl_delete (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, node); ddsi_tcp_node_free (node); + // path is no longer valid because we deleted the node, so compute it anew + (void) ddsrt_avl_lookup_ipath (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, &key, &path); } - else - { - ret = node->m_conn; - } - } - if (ret == NULL) - { ret = ddsi_tcp_new_conn (fact, NULL, DDSRT_INVALID_SOCKET, false, &key.m_peer_addr.a); ddsi_tcp_cache_add (fact, ret, &path); } ddsrt_mutex_unlock (&fact->ddsi_tcp_cache_lock_g); - return ret; } @@ -965,6 +962,10 @@ static ddsi_tcp_conn_t ddsi_tcp_new_conn (struct ddsi_tran_factory_tcp *fact, co conn->m_base.m_base.m_port = INVALID_PORT; ddsi_tcp_conn_set_socket (conn, sock); + char buff[DDSI_LOCSTRLEN]; + struct ddsi_domaingv * const gv = fact->fact.gv; + sockaddr_to_string_with_port(buff, sizeof(buff), &conn->m_peer_addr.a); + GVLOG (DDS_LC_TCP, "tcp new %s connection on socket %"PRIdSOCK" to %s (%p)\n", conn->m_base.m_server ? "server" : "client", conn->m_sock, buff, (void *) conn); return conn; } @@ -1015,7 +1016,7 @@ static void ddsi_tcp_conn_delete (ddsi_tcp_conn_t conn) struct ddsi_domaingv const * const gv = fact->fact.gv; char buff[DDSI_LOCSTRLEN]; sockaddr_to_string_with_port(buff, sizeof(buff), &conn->m_peer_addr.a); - GVLOG (DDS_LC_TCP, "tcp free %s connection on socket %"PRIdSOCK" to %s\n", conn->m_base.m_server ? "server" : "client", conn->m_sock, buff); + GVLOG (DDS_LC_TCP, "tcp free %s connection on socket %"PRIdSOCK" to %s (%p)\n", conn->m_base.m_server ? "server" : "client", conn->m_sock, buff, (void *) conn); #ifdef DDS_HAS_TCP_TLS if (fact->ddsi_tcp_ssl_plugin.ssl_free) @@ -1123,11 +1124,23 @@ static void ddsi_tcp_release_listener (struct ddsi_tran_listener * listener) ddsrt_free (tl); } +static void ddsi_tcp_release_factory_free_cache_node (void *vnode) +{ + ddsi_tcp_node_t node = vnode; + struct ddsi_domaingv * const gv = node->m_conn->m_base.m_factory->gv; + GVLOG (DDS_LC_TCP, "tcp cache free on shutdown (%p)\n", (void *) node->m_conn); + // TCP support code really is broken ... it doesn't ordinarily leak connections, + // but it does sometimes when a connection is created by a message sent to a + // peer that was removed just before. The TCP code should be rewritten entirely. + ddsrt_atomic_st32 (&node->m_conn->m_base.m_count, 1); + ddsi_tcp_node_free (node); +} + static void ddsi_tcp_release_factory (struct ddsi_tran_factory *fact_cmn) { struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) fact_cmn; struct ddsi_domaingv const * const gv = fact->fact.gv; - ddsrt_avl_free (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, ddsi_tcp_node_free); + ddsrt_avl_free (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, ddsi_tcp_release_factory_free_cache_node); ddsrt_mutex_destroy (&fact->ddsi_tcp_cache_lock_g); #ifdef DDS_HAS_TCP_TLS if (fact->ddsi_tcp_ssl_plugin.fini) diff --git a/src/core/ddsi/tests/plist_leasedur.c b/src/core/ddsi/tests/plist_leasedur.c index d17ccd0df1..9667729f03 100644 --- a/src/core/ddsi/tests/plist_leasedur.c +++ b/src/core/ddsi/tests/plist_leasedur.c @@ -345,7 +345,7 @@ static void ddsi_plist_leasedur_new_proxypp_impl (bool include_lease_duration, b memcpy (buf + size, pkt_trailer, sizeof (pkt_trailer)); size += sizeof (pkt_trailer); ddsi_rmsg_setsize (rmsg, (uint32_t) size); - ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, size, buf, &pktinfo); + ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, size, &pktinfo); ddsi_rmsg_commit (rmsg); // Discovery data processing is done by the dq.builtin thread, so we can't be @@ -494,7 +494,7 @@ static void ddsi_plist_leasedur_new_proxyrd_impl (bool include_lease_duration, b memcpy (buf + size, pkt_p4, sizeof (pkt_p4)); size += sizeof (pkt_p4); ddsi_rmsg_setsize (rmsg, (uint32_t) size); - ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, size, buf, &pktinfo); + ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, size, &pktinfo); ddsi_rmsg_commit (rmsg); // Discovery data processing is done by the dq.builtin thread, so we can't be diff --git a/src/core/ddsi/tests/pmd_message.c b/src/core/ddsi/tests/pmd_message.c index b6ad3e3e7f..9643c95ad8 100644 --- a/src/core/ddsi/tests/pmd_message.c +++ b/src/core/ddsi/tests/pmd_message.c @@ -212,7 +212,7 @@ static void create_fake_proxy_participant (void) memcpy (buf, spdp_pkt, sizeof (spdp_pkt)); size += sizeof (spdp_pkt); ddsi_rmsg_setsize (rmsg, (uint32_t) size); - ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, size, buf, &pktinfo); + ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, size, &pktinfo); ddsi_rmsg_commit (rmsg); // wait until SPDP message has been processed wait_for_dqueue (); @@ -302,7 +302,7 @@ static void send_pmd_message (uint32_t seqlo, uint16_t encoding, uint16_t option memcpy (buf, pmd_pkt, sizeof (pmd_pkt)); size += sizeof (pmd_pkt) - 24 + act_payload_size; ddsi_rmsg_setsize (rmsg, (uint32_t) size); - ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, size, buf, &pktinfo); + ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, size, &pktinfo); ddsi_rmsg_commit (rmsg); // wait until PMD message has been processed wait_for_dqueue (); diff --git a/src/core/ddsi/tests/receive_packet.c b/src/core/ddsi/tests/receive_packet.c index a2ad431abb..62de19651d 100644 --- a/src/core/ddsi/tests/receive_packet.c +++ b/src/core/ddsi/tests/receive_packet.c @@ -418,7 +418,7 @@ CU_Test (ddsi_receive_packet, rti_dispose_with_key) ddsi_rmsg_setsize (rmsg, (uint32_t) sizeof (rtps_message)); struct ddsi_thread_state * const thrst = ddsi_lookup_thread_state (); - ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, (uint32_t) sizeof (rtps_message), buf, &pktinfo); + ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, (uint32_t) sizeof (rtps_message), &pktinfo); ddsi_rmsg_commit (rmsg); receive_packet_fini (); diff --git a/src/ddsrt/CMakeLists.txt b/src/ddsrt/CMakeLists.txt index a78ed8a84e..f37eb6588e 100644 --- a/src/ddsrt/CMakeLists.txt +++ b/src/ddsrt/CMakeLists.txt @@ -369,7 +369,7 @@ set(DDSRT_WITH_LWIP ${WITH_LWIP}) set(DDSRT_WITH_FREERTOS ${WITH_FREERTOS}) foreach(feature TCP_TLS SECURITY LIFESPAN DEADLINE_MISSED NETWORK_PARTITIONS - TYPELIB TYPE_DISCOVERY TOPIC_DISCOVERY QOS_PROVIDER) + TYPELIB TYPE_DISCOVERY TOPIC_DISCOVERY QOS_PROVIDER TCP) set(DDS_HAS_${feature} ${ENABLE_${feature}}) endforeach() diff --git a/src/ddsrt/include/dds/features.h.in b/src/ddsrt/include/dds/features.h.in index a294df19bf..e3ecf09328 100644 --- a/src/ddsrt/include/dds/features.h.in +++ b/src/ddsrt/include/dds/features.h.in @@ -44,5 +44,8 @@ /* Whether or not support for qos provider is included */ #cmakedefine DDS_HAS_QOS_PROVIDER 1 +/* Whether or not support for DDS-over-TCP is included */ +#cmakedefine DDS_HAS_TCP 1 + #endif