diff --git a/docs/manual/config/config_file_reference.rst b/docs/manual/config/config_file_reference.rst index 45bcc605e3..cc2b0bb5da 100644 --- a/docs/manual/config/config_file_reference.rst +++ b/docs/manual/config/config_file_reference.rst @@ -2795,8 +2795,8 @@ The default value is: ``none`` .. generated from ddsi_config.h[6b6cd6f2af797765bc3753758bdd5f166f64b418] - generated from ddsi_config.c[a48ec3d818f2ad852949a7832951709c3488dc10] - generated from ddsi__cfgelems.h[32ba43b085108475f0fc5041d4593a9926c0f6f9] + generated from ddsi_config.c[98a66b82efe4476934c17d3df879e70add3b65a2] + generated from ddsi__cfgelems.h[7fe8f980955d7eab1f9515a474d5a51042698343] generated from cfgunits.h[05f093223fce107d24dd157ebaafa351dc9df752] generated from _confgen.h[bb9a0fc6ef1f7f7c46790ee00132e340e5fff36d] generated from _confgen.c[0d833a6f2c98902f1249e63aed03a6164f0791d6] diff --git a/docs/manual/options.md b/docs/manual/options.md index b8d17a65f4..5c56af3750 100644 --- a/docs/manual/options.md +++ b/docs/manual/options.md @@ -1961,8 +1961,8 @@ The categorisation of tracing output is incomplete and hence most of the verbosi The default value is: `none` - - + + diff --git a/etc/cyclonedds.rnc b/etc/cyclonedds.rnc index 9921c61e11..8a5d9a78b6 100644 --- a/etc/cyclonedds.rnc +++ b/etc/cyclonedds.rnc @@ -1360,8 +1360,8 @@ MIIEpAIBAAKCAQEA3HIh...AOBaaqSV37XBUJg==
maybe_memsize = xsd:token { pattern = "default|0|(\d+(\.\d*)?([Ee][\-+]?\d+)?|\.\d+([Ee][\-+]?\d+)?) *([kMG]i?)?B" } } # generated from ddsi_config.h[6b6cd6f2af797765bc3753758bdd5f166f64b418] -# generated from ddsi_config.c[a48ec3d818f2ad852949a7832951709c3488dc10] -# generated from ddsi__cfgelems.h[32ba43b085108475f0fc5041d4593a9926c0f6f9] +# generated from ddsi_config.c[98a66b82efe4476934c17d3df879e70add3b65a2] +# generated from ddsi__cfgelems.h[7fe8f980955d7eab1f9515a474d5a51042698343] # generated from cfgunits.h[05f093223fce107d24dd157ebaafa351dc9df752] # generated from _confgen.h[bb9a0fc6ef1f7f7c46790ee00132e340e5fff36d] # generated from _confgen.c[0d833a6f2c98902f1249e63aed03a6164f0791d6] diff --git a/etc/cyclonedds.xsd b/etc/cyclonedds.xsd index 375ae42076..51f7bc0c56 100644 --- a/etc/cyclonedds.xsd +++ b/etc/cyclonedds.xsd @@ -2047,8 +2047,8 @@ MIIEpAIBAAKCAQEA3HIh...AOBaaqSV37XBUJg==<br> - - + + diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 54af236fba..a4f58b0374 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -49,6 +49,7 @@ option(ENABLE_TYPELIB "Enable Type Library support" ON) option(ENABLE_TYPE_DISCOVERY "Enable Type Discovery support" ON) option(ENABLE_TOPIC_DISCOVERY "Enable Topic Discovery support" ON) option(ENABLE_QOS_PROVIDER "Enable Qos Provider support" ON) +option(ENABLE_TCP "Enable PoC support for DDS-over-TCP" OFF) if(ENABLE_TYPE_DISCOVERY) if(NOT ENABLE_TYPELIB) message(FATAL_ERROR "ENABLE_TYPE_DISCOVERY requires ENABLE_TYPELIB to be enabled") diff --git a/src/core/ddsc/tests/CMakeLists.txt b/src/core/ddsc/tests/CMakeLists.txt index 4f5b4ab873..24e5aae0dc 100644 --- a/src/core/ddsc/tests/CMakeLists.txt +++ b/src/core/ddsc/tests/CMakeLists.txt @@ -153,6 +153,11 @@ if(ENABLE_QOS_PROVIDER) "qos_provider.c") endif() +if(ENABLE_TCP) + list(APPEND ddsc_test_sources + "tcp.c") +endif() + add_cunit_executable(cunit_ddsc ${ddsc_test_sources}) if(MSVC) diff --git a/src/core/ddsc/tests/tcp.c b/src/core/ddsc/tests/tcp.c new file mode 100644 index 0000000000..e77d31dc8d --- /dev/null +++ b/src/core/ddsc/tests/tcp.c @@ -0,0 +1,239 @@ +// Copyright(c) 2025 ZettaScale Technology and others +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License +// v. 1.0 which is available at +// http://www.eclipse.org/org/documents/edl-v10.php. +// +// SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + +#include +#include + +#include "dds/ddsi/ddsi_protocol.h" +#include "dds/ddsi/ddsi_thread.h" +#include "dds/ddsrt/time.h" +#include "dds__guid.h" +#include "ddsi__tran.h" +#include "test_common.h" +#include "dds/ddsi/ddsi_entity_index.h" +#include "dds/ddsi/ddsi_proxy_participant.h" +#include "dds/ddsi/ddsi_addrset.h" + +#include "dds/dds.h" +#include "dds/ddsrt/io.h" +#include "dds/ddsrt/heap.h" +#include "ddsi__addrset.h" + +static ddsrt_atomic_uint32_t framing_errors; + +static void scan_for_framing_error (void *ptr, const dds_log_data_t *data) +{ + (void) ptr; + //tprintf ("[%"PRIu32"] %s", data->domid, data->message); + if (strstr (data->message, "framing error, dropping connection") != NULL) + ddsrt_atomic_or32 (&framing_errors, 1u << data->domid); +} + +static void do_tcp (void (*action) (dds_entity_t rd, dds_entity_t wr, void *arg), void *arg) +{ + const char *config_fmt = + "" + " " + " " + " " + " tcp" + "" + "" + " 0" + " ${CYCLONEDDS_PID}" + " " + " %s" + " " + "" + "" + " %d" + "" + "trace,tcpstdout"; + + ddsrt_atomic_st32 (&framing_errors, 0); + dds_set_log_sink (scan_for_framing_error, NULL); + dds_set_trace_sink (scan_for_framing_error, NULL); + + // Start up a new domain listening to an ephemeral TCP port + char *configs = NULL; + (void) ddsrt_asprintf (&configs, config_fmt, "", 0); + const dds_entity_t doms = dds_create_domain (0, configs); + CU_ASSERT_GT_FATAL (doms, 0); + ddsrt_free (configs); + struct ddsi_domaingv * const gvs = get_domaingv (doms); + CU_ASSERT_NEQ_FATAL (gvs, NULL); + + // Create a second domain instance without a server socket and with the + // first domain's ephemeral port as the peer + char peerlocbuf[DDSI_LOCSTRLEN]; + char *peerstr = NULL; + (void) ddsrt_asprintf (&peerstr, "", ddsi_locator_to_string (peerlocbuf, sizeof (peerlocbuf), &gvs->loc_meta_uc)); + char *configc = NULL; + (void) ddsrt_asprintf (&configc, config_fmt, peerstr, -1); + const dds_entity_t domc = dds_create_domain (1, configc); + CU_ASSERT_GT_FATAL (domc, 0); + ddsrt_free (configc); + ddsrt_free (peerstr); + + const dds_entity_t pp_pub = dds_create_participant (0, NULL, NULL); + CU_ASSERT_GT_FATAL (pp_pub, 0); + const dds_entity_t pp_sub = dds_create_participant (1, NULL, NULL); + CU_ASSERT_GT_FATAL (pp_sub, 0); + char topicname[100]; + create_unique_topic_name ("tcp_basic", topicname, sizeof (topicname)); + dds_qos_t *qos = dds_create_qos (); + dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY); + const dds_entity_t tp_pub = dds_create_topic (pp_pub, &Space_Type1_desc, topicname, qos, NULL); + const dds_entity_t tp_sub = dds_create_topic (pp_sub, &Space_Type1_desc, topicname, qos, NULL); + dds_delete_qos (qos); + + const dds_entity_t wr = dds_create_writer (pp_pub, tp_pub, NULL, NULL); + const dds_entity_t rd = dds_create_reader (pp_sub, tp_sub, NULL, NULL); + sync_reader_writer (pp_sub, rd, pp_pub, wr); + + action (rd, wr, arg); + + dds_return_t rc; + rc = dds_delete (domc); + CU_ASSERT_EQ_FATAL (rc, 0); + rc = dds_delete (doms); + CU_ASSERT_EQ_FATAL (rc, 0); + + dds_set_log_sink (scan_for_framing_error, NULL); + dds_set_trace_sink (scan_for_framing_error, NULL); +} + +static void do_basic_write (dds_entity_t rd, dds_entity_t wr, void *varg) +{ + (void) varg; + + dds_return_t rc; + rc = dds_write (wr, &(Space_Type1){345,678,101}); + CU_ASSERT_EQ_FATAL (rc, 0); + + const dds_entity_t ws = dds_create_waitset (dds_get_participant (rd)); + CU_ASSERT_GT_FATAL (ws, 0); + rc = dds_set_status_mask (rd, DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_EQ_FATAL (rc, 0); + rc = dds_waitset_attach (ws, rd, 0); + CU_ASSERT_EQ_FATAL (rc, 0); + rc = dds_waitset_wait (ws, NULL, 0, DDS_MSECS (5000)); + CU_ASSERT_EQ (rc, 1); + + Space_Type1 sample; + void *rawsample = &sample; + dds_sample_info_t si; + rc = dds_take (rd, &rawsample, &si, 1, 1); + CU_ASSERT_EQ_FATAL (rc, 1); + CU_ASSERT_FATAL(si.valid_data); + CU_ASSERT_EQ_FATAL (sample.long_1, 345); + CU_ASSERT_EQ_FATAL (sample.long_2, 678); + CU_ASSERT_EQ_FATAL (sample.long_3, 101); +} + +CU_Test (ddsc_tcp, basic) +{ + do_tcp (do_basic_write, NULL); +} + +struct inject_arg { + ddsrt_iovec_t data; +}; + +static void do_inject (dds_entity_t rd, dds_entity_t wr, void *varg) +{ + const struct inject_arg *arg = varg; + struct ddsi_domaingv * const gvwr = get_domaingv (wr); + + // Find reader's proxy participant in writer's domain + dds_guid_t rdguid; + dds_get_guid (rd, &rdguid); + ddsi_guid_t rdpp_guid = dds_guid_to_ddsi_guid (rdguid); + rdpp_guid.entityid.u = DDSI_ENTITYID_PARTICIPANT; + ddsi_thread_state_awake (ddsi_lookup_thread_state (), gvwr); + struct ddsi_proxy_participant *rdproxypp = ddsi_entidx_lookup_proxy_participant_guid (gvwr->entity_index, &rdpp_guid); + CU_ASSERT_NEQ_FATAL (rdproxypp, NULL); + ddsi_thread_state_asleep (ddsi_lookup_thread_state ()); + // ... because we need the address + ddsi_xlocator_t xloc; + ddsi_addrset_any_uc (rdproxypp->as_default, &xloc); + + // Make writer's domain mute so we can inject garbage in the TCP connection without + // interfering with RTPS messages being sent in the background + dds_domain_set_deafmute (wr, false, true, DDS_INFINITY); + + DDSI_DECL_TRAN_WRITE_MSGFRAGS_PTR(mf, 1); + mf->niov = 1; + mf->iov[0] = arg->data; + ddsi_conn_write (xloc.conn, &xloc.c, mf, 0); + + // logger should + dds_domainid_t rd_domain_id; + dds_return_t rc; + rc = dds_get_domainid (rd, &rd_domain_id); + CU_ASSERT_EQ_FATAL (rc, 0); + while (ddsrt_atomic_ld32 (&framing_errors) == 0) + dds_sleepfor (DDS_MSECS (10)); + CU_ASSERT_EQ_FATAL (ddsrt_atomic_ld32 (&framing_errors), (1u << rd_domain_id)); +} + +CU_Test (ddsc_tcp, inject_wrong_smid) +{ + unsigned char msg[] = { + 'R', 'T', 'P', 'S', /* version (don't care): */ 2,5, + /* vendor id (don't care, 1.16 = Cyclone */ 1,16, + /* GUID prefix (don't care) */ 1,16,3,4, 5,6,7,8, 9,10,11,12, + /* SMID_ADLINK_MSG_LEN, big-endian, 4 octets-to-next-header */ 130,0,0,4, + /* fake length */ 0x0,0x0,0x0,0x20 + }; + struct inject_arg arg = { + .data = { + .iov_len = 28, + .iov_base = msg + } + }; + do_tcp (do_inject, &arg); +} + +CU_Test (ddsc_tcp, inject_short_length) +{ + unsigned char msg[] = { + 'R', 'T', 'P', 'S', /* version (don't care): */ 2,5, + /* vendor id (don't care, 1.16 = Cyclone */ 1,16, + /* GUID prefix (don't care) */ 1,16,3,4, 5,6,7,8, 9,10,11,12, + /* SMID_ADLINK_MSG_LEN, big-endian, 4 octets-to-next-header */ 129,0,0,4, + /* fake length */ 0x0,0x0,0x0,0x3 + }; + struct inject_arg arg = { + .data = { + .iov_len = 28, + .iov_base = msg + } + }; + do_tcp (do_inject, &arg); +} + +CU_Test (ddsc_tcp, inject_oversize_length) +{ + unsigned char msg[] = { + 'R', 'T', 'P', 'S', /* version (don't care): */ 2,5, + /* vendor id (don't care, 1.16 = Cyclone */ 1,16, + /* GUID prefix (don't care) */ 1,16,3,4, 5,6,7,8, 9,10,11,12, + /* SMID_ADLINK_MSG_LEN, big-endian, 4 octets-to-next-header */ 129,0,0,4, + /* fake length */ 0x33,0x44,0x55,0x66 + }; + struct inject_arg arg = { + .data = { + .iov_len = 28, + .iov_base = msg + } + }; + do_tcp (do_inject, &arg); +} diff --git a/src/core/ddsi/defconfig.c b/src/core/ddsi/defconfig.c index c253bed8e4..ee5bf97149 100644 --- a/src/core/ddsi/defconfig.c +++ b/src/core/ddsi/defconfig.c @@ -104,8 +104,8 @@ void ddsi_config_init_default (struct ddsi_config *cfg) #endif /* DDS_HAS_TCP_TLS */ } /* generated from ddsi_config.h[6b6cd6f2af797765bc3753758bdd5f166f64b418] */ -/* generated from ddsi_config.c[a48ec3d818f2ad852949a7832951709c3488dc10] */ -/* generated from ddsi__cfgelems.h[32ba43b085108475f0fc5041d4593a9926c0f6f9] */ +/* generated from ddsi_config.c[98a66b82efe4476934c17d3df879e70add3b65a2] */ +/* generated from ddsi__cfgelems.h[7fe8f980955d7eab1f9515a474d5a51042698343] */ /* generated from cfgunits.h[05f093223fce107d24dd157ebaafa351dc9df752] */ /* generated from _confgen.h[bb9a0fc6ef1f7f7c46790ee00132e340e5fff36d] */ /* generated from _confgen.c[0d833a6f2c98902f1249e63aed03a6164f0791d6] */ diff --git a/src/core/ddsi/src/ddsi__addrset.h b/src/core/ddsi/src/ddsi__addrset.h index 8336b640d3..4f0754ba2c 100644 --- a/src/core/ddsi/src/ddsi__addrset.h +++ b/src/core/ddsi/src/ddsi__addrset.h @@ -32,8 +32,6 @@ struct ddsi_addrset { ddsrt_avl_ctree_t ucaddrs, mcaddrs; }; -typedef ssize_t (*ddsi_addrset_forone_fun_t) (const ddsi_xlocator_t *loc, void *arg); - /** @component locators */ void ddsi_add_xlocator_to_addrset (const struct ddsi_domaingv *gv, struct ddsi_addrset *as, const ddsi_xlocator_t *loc) ddsrt_nonnull_all; @@ -98,10 +96,6 @@ bool ddsi_addrset_contains_non_psmx_uc (const struct ddsi_addrset *as) /* Keeps AS locked */ -/** @component locators */ -int ddsi_addrset_forone (struct ddsi_addrset *as, ddsi_addrset_forone_fun_t f, void *arg) - ddsrt_nonnull ((1, 2)); - /** @component locators */ size_t ddsi_addrset_forall_count (struct ddsi_addrset *as, ddsi_addrset_forall_fun_t f, void *arg) ddsrt_nonnull ((1, 2)); diff --git a/src/core/ddsi/src/ddsi__cfgelems.h b/src/core/ddsi/src/ddsi__cfgelems.h index 0078d3064f..6740fbdbb6 100644 --- a/src/core/ddsi/src/ddsi__cfgelems.h +++ b/src/core/ddsi/src/ddsi__cfgelems.h @@ -306,6 +306,7 @@ static struct cfgelem general_cfgelems[] = { FUNCTIONS(0, uf_boolean_default, 0, pf_nop), DESCRIPTION("

Deprecated (use Transport instead)

"), VALUES("false","true","default")), +#ifdef DDS_HAS_TCP ENUM("Transport", NULL, 1, "default", MEMBER(transport_selector), FUNCTIONS(0, uf_transport_selector, 0, pf_transport_selector), @@ -313,6 +314,15 @@ static struct cfgelem general_cfgelems[] = { "

This element allows selecting the transport to be used (udp, udp6, " "tcp, tcp6, raweth)

"), VALUES("default","udp","udp6","tcp","tcp6","raweth")), +#else + ENUM("Transport", NULL, 1, "default", + MEMBER(transport_selector), + FUNCTIONS(0, uf_transport_selector, 0, pf_transport_selector), + DESCRIPTION( + "

This element allows selecting the transport to be used (udp, udp6, " + "raweth)

"), + VALUES("default","udp","udp6","raweth")), +#endif BOOL("EnableMulticastLoopback", NULL, 1, "true", MEMBER(enableMulticastLoopback), FUNCTIONS(0, uf_boolean, 0, pf_boolean), @@ -1712,6 +1722,7 @@ static struct cfgelem discovery_ports_cfgelems[] = { END_MARKER }; +#ifdef DDS_HAS_TCP static struct cfgelem tcp_cfgelems[] = { ENUM("Enable", NULL, 1, "default", MEMBER(compat_tcp_enable), @@ -1764,6 +1775,7 @@ static struct cfgelem tcp_cfgelems[] = { )), END_MARKER }; +#endif #ifdef DDS_HAS_TCP_TLS static struct cfgelem ssl_cfgelems[] = { @@ -2249,6 +2261,7 @@ static struct cfgelem domain_cfgelems[] = { "functionality is reserved. This includes renaming or moving " "options.

" )), +#ifdef DDS_HAS_TCP GROUP("TCP", tcp_cfgelems, NULL, 1, NOMEMBER, NOFUNCTIONS, @@ -2256,6 +2269,7 @@ static struct cfgelem domain_cfgelems[] = { "

The TCP element allows you to specify various parameters related to " "running DDSI over TCP.

" )), +#endif #ifdef DDS_HAS_TCP_TLS GROUP("SSL", ssl_cfgelems, NULL, 1, NOMEMBER, @@ -2294,7 +2308,9 @@ static struct cfgelem root_cfgelems[] = { MOVED("Discovery", "CycloneDDS/Domain/Discovery"), MOVED("Tracing", "CycloneDDS/Domain/Tracing"), MOVED("Internal|Unsupported", "CycloneDDS/Domain/Internal"), +#if DDS_HAS_TCP MOVED("TCP", "CycloneDDS/Domain/TCP"), +#endif #if DDS_HAS_SECURITY MOVED("DDSSecurity", "CycloneDDS/Domain/Security"), #endif diff --git a/src/core/ddsi/src/ddsi__receive.h b/src/core/ddsi/src/ddsi__receive.h index dba2c01fb9..2512bffaae 100644 --- a/src/core/ddsi/src/ddsi__receive.h +++ b/src/core/ddsi/src/ddsi__receive.h @@ -57,7 +57,7 @@ int ddsi_user_dqueue_handler (const struct ddsi_rsample_info *sampleinfo, const int ddsi_add_gap (struct ddsi_xmsg *msg, struct ddsi_writer *wr, struct ddsi_proxy_reader *prd, ddsi_seqno_t start, ddsi_seqno_t base, uint32_t numbits, const uint32_t *bits); /** @component incoming_rtps */ -void ddsi_handle_rtps_message (struct ddsi_thread_state * const thrst, struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, const ddsi_guid_prefix_t *guidprefix, struct ddsi_rbufpool *rbpool, struct ddsi_rmsg *rmsg, size_t sz, unsigned char *msg, const struct ddsi_network_packet_info *pktinfo); +void ddsi_handle_rtps_message (struct ddsi_thread_state * const thrst, struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, const ddsi_guid_prefix_t *guidprefix, struct ddsi_rbufpool *rbpool, struct ddsi_rmsg *rmsg, size_t sz, const struct ddsi_network_packet_info *pktinfo); #if defined (__cplusplus) } diff --git a/src/core/ddsi/src/ddsi__security_omg.h b/src/core/ddsi/src/ddsi__security_omg.h index 14f0d6ec42..86e78e85ba 100644 --- a/src/core/ddsi/src/ddsi__security_omg.h +++ b/src/core/ddsi/src/ddsi__security_omg.h @@ -1056,11 +1056,11 @@ ddsi_rtps_msg_state_t ddsi_security_decode_rtps_message (struct ddsi_thread_stat * @param[in] sec_info Security information for handles. * @param[in] conn_write_cb Function to call to do the actual writing. * - * @returns ssize_t + * @returns ddsrt_ssize_t * @retval negative/zero Something went wrong. * @retval positive Secure writing succeeded. */ -ssize_t +ddsrt_ssize_t ddsi_security_secure_conn_write( const struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, diff --git a/src/core/ddsi/src/ddsi__ssl.h b/src/core/ddsi/src/ddsi__ssl.h index 46c5783a39..22dab17790 100644 --- a/src/core/ddsi/src/ddsi__ssl.h +++ b/src/core/ddsi/src/ddsi__ssl.h @@ -31,8 +31,8 @@ struct ddsi_ssl_plugins void (*fini) (void); void (*ssl_free) (SSL *ssl); void (*bio_vfree) (BIO *bio); - ssize_t (*read) (SSL *ssl, void *buf, size_t len, dds_return_t *err); - ssize_t (*write) (SSL *ssl, const void *msg, size_t len, dds_return_t *err); + ddsrt_ssize_t (*read) (SSL *ssl, void *buf, size_t len, dds_return_t *err); + ddsrt_ssize_t (*write) (SSL *ssl, const void *msg, size_t len, dds_return_t *err); SSL * (*connect) (const struct ddsi_domaingv *gv, ddsrt_socket_t sock); BIO * (*listen) (ddsrt_socket_t sock); SSL * (*accept) (const struct ddsi_domaingv *gv, BIO *bio, ddsrt_socket_t *sock); diff --git a/src/core/ddsi/src/ddsi__tran.h b/src/core/ddsi/src/ddsi__tran.h index ea0890af74..85c2a6b865 100644 --- a/src/core/ddsi/src/ddsi__tran.h +++ b/src/core/ddsi/src/ddsi__tran.h @@ -102,8 +102,8 @@ struct ddsi_network_packet_info { }; /* Function pointer types */ -typedef ssize_t (*ddsi_tran_read_fn_t) (struct ddsi_tran_conn *, unsigned char *, size_t, bool, struct ddsi_network_packet_info *pktinfo); -typedef ssize_t (*ddsi_tran_write_fn_t) (struct ddsi_tran_conn *, const ddsi_locator_t *, const ddsi_tran_write_msgfrags_t *, uint32_t); +typedef ddsrt_ssize_t (*ddsi_tran_read_fn_t) (struct ddsi_tran_conn *, unsigned char *, size_t, bool, struct ddsi_network_packet_info *pktinfo); +typedef ddsrt_ssize_t (*ddsi_tran_write_fn_t) (struct ddsi_tran_conn *, const ddsi_locator_t *, const ddsi_tran_write_msgfrags_t *, uint32_t); typedef int (*ddsi_tran_locator_fn_t) (struct ddsi_tran_factory *, struct ddsi_tran_base *, ddsi_locator_t *); typedef bool (*ddsi_tran_supports_fn_t) (const struct ddsi_tran_factory *, int32_t); typedef ddsrt_socket_t (*ddsi_tran_handle_fn_t) (struct ddsi_tran_base *); @@ -388,12 +388,12 @@ inline int ddsi_conn_locator (struct ddsi_tran_conn * conn, ddsi_locator_t * loc } /** @component transport */ -inline ssize_t ddsi_conn_write (struct ddsi_tran_conn * conn, const ddsi_locator_t *dst, const ddsi_tran_write_msgfrags_t *msgfrags, uint32_t flags) { +inline ddsrt_ssize_t ddsi_conn_write (struct ddsi_tran_conn * conn, const ddsi_locator_t *dst, const ddsi_tran_write_msgfrags_t *msgfrags, uint32_t flags) { return conn->m_closed ? -1 : (conn->m_write_fn) (conn, dst, msgfrags, flags); } /** @component transport */ -inline ssize_t ddsi_conn_read (struct ddsi_tran_conn * conn, unsigned char * buf, size_t len, bool allow_spurious, struct ddsi_network_packet_info *pktinfo) { +inline ddsrt_ssize_t ddsi_conn_read (struct ddsi_tran_conn * conn, unsigned char * buf, size_t len, bool allow_spurious, struct ddsi_network_packet_info *pktinfo) { return conn->m_closed ? -1 : conn->m_read_fn (conn, buf, len, allow_spurious, pktinfo); } diff --git a/src/core/ddsi/src/ddsi_addrset.c b/src/core/ddsi/src/ddsi_addrset.c index a884cb23ae..c404c48aee 100644 --- a/src/core/ddsi/src/ddsi_addrset.c +++ b/src/core/ddsi/src/ddsi_addrset.c @@ -499,29 +499,6 @@ size_t ddsi_addrset_forall_uc_count (struct ddsi_addrset *as, ddsi_addrset_foral return count; } -int ddsi_addrset_forone (struct ddsi_addrset *as, ddsi_addrset_forone_fun_t f, void *arg) -{ - struct ddsi_addrset_node *n; - ddsrt_avl_ctree_t *trees[2]; - ddsrt_avl_citer_t iter; - - trees[0] = &as->mcaddrs; - trees[1] = &as->ucaddrs; - for (int i = 0; i < 2; i++) - { - n = (struct ddsi_addrset_node *) ddsrt_avl_citer_first (&addrset_treedef, trees[i], &iter); - while (n) - { - if ((f) (&n->loc, arg) > 0) - { - return 0; - } - n = (struct ddsi_addrset_node *) ddsrt_avl_citer_next (&iter); - } - } - return -1; -} - struct log_addrset_helper_arg { uint32_t tf; diff --git a/src/core/ddsi/src/ddsi_config.c b/src/core/ddsi/src/ddsi_config.c index b344d17124..8ca0d36d14 100644 --- a/src/core/ddsi/src/ddsi_config.c +++ b/src/core/ddsi/src/ddsi_config.c @@ -166,7 +166,9 @@ DU(natint); DU(natint_255); DU(pos_uint); DUPF(participantIndex); +#ifdef DDS_HAS_TCP DU(dyn_port); +#endif DUPF(memsize); DUPF(memsize16); DU(duration_inf); @@ -992,8 +994,20 @@ static const char *en_sched_class_vs[] = { "realtime", "timeshare", "default", N static const ddsrt_sched_t en_sched_class_ms[] = { DDSRT_SCHED_REALTIME, DDSRT_SCHED_TIMESHARE, DDSRT_SCHED_DEFAULT, 0 }; GENERIC_ENUM_CTYPE (sched_class, ddsrt_sched_t) -static const char *en_transport_selector_vs[] = { "default", "udp", "udp6", "tcp", "tcp6", "raweth", "none", NULL }; -static const enum ddsi_transport_selector en_transport_selector_ms[] = { DDSI_TRANS_DEFAULT, DDSI_TRANS_UDP, DDSI_TRANS_UDP6, DDSI_TRANS_TCP, DDSI_TRANS_TCP6, DDSI_TRANS_RAWETH, DDSI_TRANS_NONE, 0 }; +static const char *en_transport_selector_vs[] = { + "default", "udp", "udp6", +#ifdef DDS_HAS_TCP + "tcp", "tcp6", +#endif + "raweth", "none", NULL +}; +static const enum ddsi_transport_selector en_transport_selector_ms[] = { + DDSI_TRANS_DEFAULT, DDSI_TRANS_UDP, DDSI_TRANS_UDP6, +#ifdef DDS_HAS_TCP + DDSI_TRANS_TCP, DDSI_TRANS_TCP6, +#endif + DDSI_TRANS_RAWETH, DDSI_TRANS_NONE, 0 +}; GENERIC_ENUM_CTYPE (transport_selector, enum ddsi_transport_selector) /* by putting the "true" and "false" aliases at the end, they won't come out of the @@ -1472,10 +1486,12 @@ static void pf_int (struct ddsi_cfgst *cfgst, void *parent, struct cfgelem const cfg_logelem (cfgst, sources, "%d", *p); } +#ifdef DDS_HAS_TCP static enum update_result uf_dyn_port(struct ddsi_cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, int first, const char *value) { return uf_int_min_max(cfgst, parent, cfgelem, first, value, -1, 65535); } +#endif static enum update_result uf_natint(struct ddsi_cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, int first, const char *value) { diff --git a/src/core/ddsi/src/ddsi_debmon.c b/src/core/ddsi/src/ddsi_debmon.c index 7c5ebbd189..2f50684260 100644 --- a/src/core/ddsi/src/ddsi_debmon.c +++ b/src/core/ddsi/src/ddsi_debmon.c @@ -695,7 +695,7 @@ static uint32_t debmon_main (void *vdm) ddsrt_shutdown (sock, DDSRT_SHUTDOWN_WRITE); char buffer[100]; dds_return_t ret; - ssize_t bytes_read; + ddsrt_ssize_t bytes_read; do { fd_set fds; FD_ZERO (&fds); diff --git a/src/core/ddsi/src/ddsi_discovery_spdp.c b/src/core/ddsi/src/ddsi_discovery_spdp.c index 97675b8bc2..e5ec101159 100644 --- a/src/core/ddsi/src/ddsi_discovery_spdp.c +++ b/src/core/ddsi/src/ddsi_discovery_spdp.c @@ -100,17 +100,15 @@ void ddsi_get_participant_builtin_topic_data (const struct ddsi_participant *pp, } // Construct unicast locator parameters + if (gv->config.publish_uc_locators) { struct locators_builder def_uni = locators_builder_init (&dst->default_unicast_locators, locs->def_uni, MAX_XMIT_CONNS); struct locators_builder meta_uni = locators_builder_init (&dst->metatraffic_unicast_locators, locs->meta_uni, MAX_XMIT_CONNS); for (int i = 0; i < gv->n_interfaces; i++) { if (!gv->xmit_conns[i]->m_factory->m_enable_spdp) - { - // skip any interfaces where the address kind doesn't match the selected transport - // as a reasonablish way of not advertising PSMX locators here continue; - } + #ifndef NDEBUG int32_t kind; #endif @@ -135,9 +133,6 @@ void ddsi_get_participant_builtin_topic_data (const struct ddsi_participant *pp, assert (kind == gv->interfaces[i].extloc.kind); locators_add_one (&def_uni, &gv->interfaces[i].extloc, data_port); locators_add_one (&meta_uni, &gv->interfaces[i].extloc, meta_port); - } - if (gv->config.publish_uc_locators) - { dst->present |= PP_DEFAULT_UNICAST_LOCATOR | PP_METATRAFFIC_UNICAST_LOCATOR; dst->aliased |= PP_DEFAULT_UNICAST_LOCATOR | PP_METATRAFFIC_UNICAST_LOCATOR; } diff --git a/src/core/ddsi/src/ddsi_init.c b/src/core/ddsi/src/ddsi_init.c index e866838179..c3c86c7682 100644 --- a/src/core/ddsi/src/ddsi_init.c +++ b/src/core/ddsi/src/ddsi_init.c @@ -1086,7 +1086,8 @@ static void decide_participant_index_and_add_localhost_to_peers (struct ddsi_dom { GVTRACE ("all interfaces allow spdp multicast, no peers defined: defaulting participant index to \"none\"\n"); gv->config.participantIndex = DDSI_PARTICIPANT_INDEX_NONE; - } else if (all_allow_spdp_mc) + } + else if (all_allow_spdp_mc) { GVTRACE ("all interfaces allow spdp multicast, but peers defined: defaulting participant index to \"auto\"\n"); gv->config.participantIndex = DDSI_PARTICIPANT_INDEX_AUTO; @@ -1098,7 +1099,7 @@ static void decide_participant_index_and_add_localhost_to_peers (struct ddsi_dom } } if (gv->config.add_localhost_to_peers == DDSI_BOOLDEF_TRUE || - (none_allow_spdp_mc && gv->config.add_localhost_to_peers != DDSI_BOOLDEF_FALSE)) + (none_allow_spdp_mc && gv->m_factory->m_connless && gv->config.add_localhost_to_peers != DDSI_BOOLDEF_FALSE)) { // add self to as_disc, but only once we have everything set up to actually do that if (gv->config.add_localhost_to_peers == DDSI_BOOLDEF_DEFAULT) @@ -1515,11 +1516,10 @@ int ddsi_init (struct ddsi_domaingv *gv, struct ddsi_psmx_instance_locators *psm { if (gv->config.tcp_port < 0) ; /* no TCP listener */ - else if (gv->config.tcp_port == DDSI_TRAN_RANDOM_PORT_NUMBER) - ; /* kernel-allocated random port */ - else if (!ddsi_is_valid_port (gv->m_factory, (uint32_t) gv->config.tcp_port)) + else if (gv->config.tcp_port != DDSI_TRAN_RANDOM_PORT_NUMBER && !ddsi_is_valid_port (gv->m_factory, (uint32_t) gv->config.tcp_port)) { GVERROR ("Listener port %d is out of range for transport %s\n", gv->config.tcp_port, gv->m_factory->m_typename); + goto err_mc_conn; } else { diff --git a/src/core/ddsi/src/ddsi_raweth.c b/src/core/ddsi/src/ddsi_raweth.c index 5bb4f5ba66..8e277c07a4 100644 --- a/src/core/ddsi/src/ddsi_raweth.c +++ b/src/core/ddsi/src/ddsi_raweth.c @@ -78,7 +78,7 @@ typedef struct ddsi_raweth_conn { ddsrt_mutex_t lock; char *buffer; uint32_t buflen; - ssize_t avail; + ddsrt_ssize_t avail; char *bptr; #endif } *ddsi_raweth_conn_t; @@ -146,10 +146,10 @@ static size_t set_ethernet_header(struct ddsi_vlan_header *hdr, uint16_t proto, } #if defined(__linux) -static ssize_t ddsi_raweth_conn_read (struct ddsi_tran_conn * conn, unsigned char * buf, size_t len, bool allow_spurious, struct ddsi_network_packet_info *pktinfo) +static ddsrt_ssize_t ddsi_raweth_conn_read (struct ddsi_tran_conn * conn, unsigned char * buf, size_t len, bool allow_spurious, struct ddsi_network_packet_info *pktinfo) { dds_return_t rc; - ssize_t ret = 0; + ddsrt_ssize_t ret = 0; struct msghdr msghdr; struct sockaddr_ll src; struct ddsi_ethernet_header ehdr; @@ -179,9 +179,9 @@ static ssize_t ddsi_raweth_conn_read (struct ddsi_tran_conn * conn, unsigned cha rc = ddsrt_recvmsg(&((ddsi_raweth_conn_t) conn)->m_sockext, &msghdr, 0, &ret); } while (rc == DDS_RETCODE_INTERRUPTED); - if (ret > (ssize_t) sizeof (ehdr)) + if (ret > (ddsrt_ssize_t) sizeof (ehdr)) { - ret -= (ssize_t) sizeof (ehdr); + ret -= (ddsrt_ssize_t) sizeof (ehdr); for (cptr = CMSG_FIRSTHDR(&msghdr); cptr; cptr = CMSG_NXTHDR( &msghdr, cptr)) { @@ -218,11 +218,11 @@ static ssize_t ddsi_raweth_conn_read (struct ddsi_tran_conn * conn, unsigned cha return ret; } -static ssize_t ddsi_raweth_conn_write (struct ddsi_tran_conn * conn, const ddsi_locator_t *dst, const ddsi_tran_write_msgfrags_t *msgfrags, uint32_t flags) +static ddsrt_ssize_t ddsi_raweth_conn_write (struct ddsi_tran_conn * conn, const ddsi_locator_t *dst, const ddsi_tran_write_msgfrags_t *msgfrags, uint32_t flags) { ddsi_raweth_conn_t uc = (ddsi_raweth_conn_t) conn; dds_return_t rc; - ssize_t ret = -1; + ddsrt_ssize_t ret = -1; unsigned retry = 2; int sendflags = 0; struct msghdr msg; @@ -433,9 +433,9 @@ struct ddsi_vlan_tag { * the manipulations using the field to obtain the next packet in the buffer can be safely done. */ -static ssize_t ddsi_raweth_conn_read (struct ddsi_tran_conn * conn, unsigned char * buf, size_t len, bool allow_spurious, struct ddsi_network_packet_info *pktinfo) +static ddsrt_ssize_t ddsi_raweth_conn_read (struct ddsi_tran_conn * conn, unsigned char * buf, size_t len, bool allow_spurious, struct ddsi_network_packet_info *pktinfo) { - ssize_t ret = 0; + ddsrt_ssize_t ret = 0; dds_return_t rc = DDS_RETCODE_OK; ddsi_raweth_conn_t uc = (ddsi_raweth_conn_t) conn; struct bpf_hdr *bpf_hdr; @@ -469,12 +469,12 @@ static ssize_t ddsi_raweth_conn_read (struct ddsi_tran_conn * conn, unsigned cha if (bpf_hdr->bh_datalen == bpf_hdr->bh_caplen) { - ret = (ssize_t)(bpf_hdr->bh_datalen - sizeof(struct ddsi_ethernet_header)); + ret = (ddsrt_ssize_t)(bpf_hdr->bh_datalen - sizeof(struct ddsi_ethernet_header)); if (ntohs(eth_hdr->proto) == ETHERTYPE_VLAN) { vtag = (struct ddsi_vlan_tag *)ptr; ptr += sizeof(*vtag); - ret -= (ssize_t)sizeof(*vtag); + ret -= (ddsrt_ssize_t)sizeof(*vtag); } if ((size_t)ret <= len) { @@ -507,11 +507,11 @@ static ssize_t ddsi_raweth_conn_read (struct ddsi_tran_conn * conn, unsigned cha return (rc == DDS_RETCODE_OK ? ret : -1);; } -static ssize_t ddsi_raweth_conn_write (struct ddsi_tran_conn * conn, const ddsi_locator_t *dst, const ddsi_tran_write_msgfrags_t *msgfrags, uint32_t flags) +static ddsrt_ssize_t ddsi_raweth_conn_write (struct ddsi_tran_conn * conn, const ddsi_locator_t *dst, const ddsi_tran_write_msgfrags_t *msgfrags, uint32_t flags) { ddsi_raweth_conn_t uc = (ddsi_raweth_conn_t) conn; dds_return_t rc = DDS_RETCODE_OK; - ssize_t ret = -1; + ddsrt_ssize_t ret = -1; struct ddsi_vlan_header vhdr; size_t hdrlen; (void) flags; diff --git a/src/core/ddsi/src/ddsi_receive.c b/src/core/ddsi/src/ddsi_receive.c index f9a30ec202..8150942793 100644 --- a/src/core/ddsi/src/ddsi_receive.c +++ b/src/core/ddsi/src/ddsi_receive.c @@ -2983,6 +2983,16 @@ static struct ddsi_receiver_state *rst_cow_if_needed (int *rst_live, struct ddsi } } +static bool smhdr_bswap_needed (ddsi_rtps_submessage_header_t const * const smhdr) +{ + DDSRT_WARNING_MSVC_OFF(6326); + if (smhdr->flags & DDSI_RTPS_SUBMESSAGE_FLAG_ENDIANNESS) + return !(DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN); + else + return (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN); + DDSRT_WARNING_MSVC_ON(6326); +} + static int handle_submsg_sequence ( struct ddsi_thread_state * const thrst, @@ -3045,14 +3055,7 @@ static int handle_submsg_sequence while (vr != VR_MALFORMED && submsg <= (end - sizeof (ddsi_rtps_submessage_header_t))) { ddsi_rtps_submessage_t * const sm = (ddsi_rtps_submessage_t *) submsg; - bool byteswap; - - DDSRT_WARNING_MSVC_OFF(6326) - if (sm->smhdr.flags & DDSI_RTPS_SUBMESSAGE_FLAG_ENDIANNESS) - byteswap = !(DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN); - else - byteswap = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN); - DDSRT_WARNING_MSVC_ON(6326) + const bool byteswap = smhdr_bswap_needed (&sm->smhdr); if (byteswap) sm->smhdr.octetsToNextHeader = ddsrt_bswap2u (sm->smhdr.octetsToNextHeader); @@ -3258,8 +3261,9 @@ static int handle_submsg_sequence } } -static void handle_rtps_message (struct ddsi_thread_state * const thrst, struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, const ddsi_guid_prefix_t *guidprefix, struct ddsi_rbufpool *rbpool, struct ddsi_rmsg *rmsg, size_t sz, unsigned char *msg, const struct ddsi_network_packet_info *pktinfo) +static void handle_rtps_message (struct ddsi_thread_state * const thrst, struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, const ddsi_guid_prefix_t *guidprefix, struct ddsi_rbufpool *rbpool, struct ddsi_rmsg *rmsg, size_t sz, const struct ddsi_network_packet_info *pktinfo) { + unsigned char *msg = DDSI_RMSG_PAYLOAD (rmsg); ddsi_rtps_header_t *hdr = (ddsi_rtps_header_t *) msg; assert (gv->config.protocol_version.major == DDSI_RTPS_MAJOR); assert (ddsi_thread_is_asleep ()); @@ -3298,100 +3302,80 @@ static void handle_rtps_message (struct ddsi_thread_state * const thrst, struct } } -void ddsi_handle_rtps_message (struct ddsi_thread_state * const thrst, struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, const ddsi_guid_prefix_t *guidprefix, struct ddsi_rbufpool *rbpool, struct ddsi_rmsg *rmsg, size_t sz, unsigned char *msg, const struct ddsi_network_packet_info *pktinfo) +void ddsi_handle_rtps_message (struct ddsi_thread_state * const thrst, struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, const ddsi_guid_prefix_t *guidprefix, struct ddsi_rbufpool *rbpool, struct ddsi_rmsg *rmsg, size_t sz, const struct ddsi_network_packet_info *pktinfo) { - handle_rtps_message (thrst, gv, conn, guidprefix, rbpool, rmsg, sz, msg, pktinfo); + handle_rtps_message (thrst, gv, conn, guidprefix, rbpool, rmsg, sz, pktinfo); } -static bool do_packet (struct ddsi_thread_state * const thrst, struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, const ddsi_guid_prefix_t *guidprefix, struct ddsi_rbufpool *rbpool) +ddsrt_nonnull_all ddsrt_attribute_warn_unused_result +static ddsrt_ssize_t read_packet_from_stream (struct ddsi_domaingv *gv, struct ddsi_tran_conn *conn, struct ddsi_rmsg *rmsg, size_t maxsz, struct ddsi_network_packet_info *pktinfo) { - /* UDP max packet size is 64kB */ + // Streams are sequences of RTPS messages, where the first submessage of each message + // must be a ADLINK_MSG_LEN with the "length" field the total RTPS message size. + // + // FIXME: We ought to keep enough state for reading messages in multiple rounds + // + // Note that we can't fix that until we stop using a ddsi_rmsg to buffer incoming data. + + assert (conn->m_stream); - const size_t maxsz = gv->config.rmsg_chunk_size < 65536 ? gv->config.rmsg_chunk_size : 65536; const size_t ddsi_msg_len_size = 8; const size_t stream_hdr_size = DDSI_RTPS_MESSAGE_HEADER_SIZE + ddsi_msg_len_size; - ssize_t sz; - struct ddsi_rmsg * rmsg = ddsi_rmsg_new (rbpool); - unsigned char * buff; - size_t buff_len = maxsz; - ddsi_rtps_header_t * hdr; - struct ddsi_network_packet_info pktinfo; + unsigned char * const buff = (unsigned char *) DDSI_RMSG_PAYLOAD (rmsg); + ddsrt_ssize_t sz; - if (rmsg == NULL) - { - return false; - } + sz = ddsi_conn_read (conn, buff, stream_hdr_size, true, pktinfo); + if (sz == 0) // Spurious read can happen with SSL, at this point we're still good + return 0; - DDSRT_STATIC_ASSERT (sizeof (struct ddsi_rmsg) == offsetof (struct ddsi_rmsg, chunk) + sizeof (struct ddsi_rmsg_chunk)); - buff = (unsigned char *) DDSI_RMSG_PAYLOAD (rmsg); - hdr = (ddsi_rtps_header_t*) buff; + // Errors or incorrect size: signal failure so connection will be dropped + // don't report: this is typically the path taken if the connection was dropped + if (sz < 0 || (size_t) sz != stream_hdr_size) + return -1; - if (conn->m_stream) - { - ddsi_rtps_msg_len_t * ml = (ddsi_rtps_msg_len_t*) (hdr + 1); + // First submessage following header should be ADLINK_MSG_LEN. The RTPS message + // header and this submessage's octets-to-next-header will checked by handle_rtps_message() + ddsi_rtps_header_t * const hdr = (ddsi_rtps_header_t *) buff; + ddsi_rtps_msg_len_t * const ml = (ddsi_rtps_msg_len_t *) (hdr + 1); + if (ml->smhdr.submessageId != DDSI_RTPS_SMID_ADLINK_MSG_LEN) + goto framing_error; - /* - Read in packet header to get size of packet in ddsi_rtps_msg_len_t, then read in - remainder of packet. - */ + if (smhdr_bswap_needed (&ml->smhdr)) + ml->length = ddsrt_bswap4u (ml->length); + if (ml->length < stream_hdr_size || ml->length > maxsz) + goto framing_error; - /* Read in DDSI header plus MSG_LEN sub message that follows it */ + sz = ddsi_conn_read (conn, buff + stream_hdr_size, ml->length - stream_hdr_size, false, NULL); + if (sz != (ddsrt_ssize_t) (ml->length - stream_hdr_size)) + return -1; - sz = ddsi_conn_read (conn, buff, stream_hdr_size, true, &pktinfo); - if (sz == 0) - { - /* Spurious read -- which at this point is still ok */ - ddsi_rmsg_commit (rmsg); - return true; - } + return (ddsrt_ssize_t) ml->length; - /* Read in remainder of packet */ +framing_error: + GVTRACE ("framing error, dropping connection\n"); + return -1; +} - if (sz > 0) - { - int swap; +static bool do_packet (struct ddsi_thread_state * const thrst, struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, const ddsi_guid_prefix_t *guidprefix, struct ddsi_rbufpool *rbpool) +{ + /* UDP max packet size is 64kB, we always limit RTPS messages always to 64kB */ + const size_t maxsz = gv->config.rmsg_chunk_size < 65536 ? gv->config.rmsg_chunk_size : 65536; + struct ddsi_network_packet_info pktinfo; + ddsrt_ssize_t sz; - DDSRT_WARNING_MSVC_OFF(6326) - if (ml->smhdr.flags & DDSI_RTPS_SUBMESSAGE_FLAG_ENDIANNESS) - { - swap = !(DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN); - } - else - { - swap = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN); - } - DDSRT_WARNING_MSVC_ON(6326) - if (swap) - { - ml->length = ddsrt_bswap4u (ml->length); - } + struct ddsi_rmsg * const rmsg = ddsi_rmsg_new (rbpool); + if (rmsg == NULL) + return false; - if (ml->smhdr.submessageId != DDSI_RTPS_SMID_ADLINK_MSG_LEN) - { - malformed_packet_received (gv, buff, NULL, (size_t) sz, hdr->vendorid); - sz = -1; - } - else - { - sz = ddsi_conn_read (conn, buff + stream_hdr_size, ml->length - stream_hdr_size, false, NULL); - if (sz > 0) - { - sz = (ssize_t) ml->length; - } - } - } - } + if (!conn->m_stream) + sz = ddsi_conn_read (conn, DDSI_RMSG_PAYLOAD (rmsg), maxsz, true, &pktinfo); else - { - /* Get next packet */ - - sz = ddsi_conn_read (conn, buff, buff_len, true, &pktinfo); - } + sz = read_packet_from_stream (gv, conn, rmsg, maxsz, &pktinfo); if (sz > 0 && !gv->deaf) { ddsi_rmsg_setsize (rmsg, (uint32_t) sz); - handle_rtps_message(thrst, gv, conn, guidprefix, rbpool, rmsg, (size_t) sz, buff, &pktinfo); + handle_rtps_message (thrst, gv, conn, guidprefix, rbpool, rmsg, (size_t) sz, &pktinfo); } ddsi_rmsg_commit (rmsg); return (sz > 0); diff --git a/src/core/ddsi/src/ddsi_security_omg.c b/src/core/ddsi/src/ddsi_security_omg.c index 41b29e576a..0bf18acf58 100644 --- a/src/core/ddsi/src/ddsi_security_omg.c +++ b/src/core/ddsi/src/ddsi_security_omg.c @@ -3780,7 +3780,7 @@ ddsi_security_decode_rtps_message ( return ret; } -ssize_t +ddsrt_ssize_t ddsi_security_secure_conn_write( const struct ddsi_domaingv *gv, struct ddsi_tran_conn * conn, @@ -3845,7 +3845,7 @@ ddsi_security_secure_conn_write( } } - ssize_t ret = -1; + ddsrt_ssize_t ret = -1; if (!ddsi_omg_security_encode_rtps_message (gv, sec_info->src_pp_handle, &guid, srcbuf, srclen, &dstbuf, &dstlen, dst_handle)) ret = -1; else diff --git a/src/core/ddsi/src/ddsi_sockwaitset.c b/src/core/ddsi/src/ddsi_sockwaitset.c index aaa4a7f29e..955c6ce29a 100644 --- a/src/core/ddsi/src/ddsi_sockwaitset.c +++ b/src/core/ddsi/src/ddsi_sockwaitset.c @@ -522,7 +522,7 @@ int ddsi_sock_waitset_next_event (struct ddsi_sock_waitset_ctx * ctx, struct dds { /* trigger pipe, read & try again */ char dummy; - ssize_t ret = read (entry->fd, &dummy, 1); + ddsrt_ssize_t ret = read (entry->fd, &dummy, 1); if (ret < 0) abort (); } diff --git a/src/core/ddsi/src/ddsi_ssl.c b/src/core/ddsi/src/ddsi_ssl.c index 2b3928d345..c6ee5c93de 100644 --- a/src/core/ddsi/src/ddsi_ssl.c +++ b/src/core/ddsi/src/ddsi_ssl.c @@ -64,7 +64,7 @@ static int ddsi_ssl_verify (int ok, X509_STORE_CTX *store) return ok; } -static ssize_t ddsi_ssl_read (SSL *ssl, void *buf, size_t len, dds_return_t *rc) +static ddsrt_ssize_t ddsi_ssl_read (SSL *ssl, void *buf, size_t len, dds_return_t *rc) { assert (len <= INT32_MAX); if (SSL_get_shutdown (ssl) != 0) @@ -96,7 +96,7 @@ static ssize_t ddsi_ssl_read (SSL *ssl, void *buf, size_t len, dds_return_t *rc) return rcvd; } -static ssize_t ddsi_ssl_write (SSL *ssl, const void *buf, size_t len, dds_return_t *rc) +static ddsrt_ssize_t ddsi_ssl_write (SSL *ssl, const void *buf, size_t len, dds_return_t *rc) { assert(len <= INT32_MAX); diff --git a/src/core/ddsi/src/ddsi_tcp.c b/src/core/ddsi/src/ddsi_tcp.c index fc501e8d53..6adc18e01a 100644 --- a/src/core/ddsi/src/ddsi_tcp.c +++ b/src/core/ddsi/src/ddsi_tcp.c @@ -251,9 +251,8 @@ static dds_return_t ddsi_tcp_sock_new (struct ddsi_tran_factory_tcp * const fact return rc; } -static void ddsi_tcp_node_free (void * ptr) +static void ddsi_tcp_node_free (ddsi_tcp_node_t node) { - ddsi_tcp_node_t node = (ddsi_tcp_node_t) ptr; ddsi_conn_free ((struct ddsi_tran_conn *) node->m_conn); ddsrt_free (node); } @@ -345,7 +344,7 @@ static void ddsi_tcp_cache_add (struct ddsi_tran_factory_tcp *fact, ddsi_tcp_con } sockaddr_to_string_with_port(buff, sizeof(buff), &conn->m_peer_addr.a); - GVLOG (DDS_LC_TCP, "tcp cache %s %s socket %"PRIdSOCK" to %s\n", action, conn->m_base.m_server ? "server" : "client", conn->m_sock, buff); + GVLOG (DDS_LC_TCP, "tcp cache %s %s socket %"PRIdSOCK" to %s (%p)\n", action, conn->m_base.m_server ? "server" : "client", conn->m_sock, buff, (void *) conn); } static void ddsi_tcp_cache_remove (ddsi_tcp_conn_t conn) @@ -361,7 +360,7 @@ static void ddsi_tcp_cache_remove (ddsi_tcp_conn_t conn) if (node) { sockaddr_to_string_with_port(buff, sizeof(buff), &conn->m_peer_addr.a); - GVLOG (DDS_LC_TCP, "tcp cache removed socket %"PRIdSOCK" to %s\n", conn->m_sock, buff); + GVLOG (DDS_LC_TCP, "tcp cache removed socket %"PRIdSOCK" to %s (%p)\n", conn->m_sock, buff, (void *) conn); ddsrt_avl_delete_dpath (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, node, &path); ddsi_tcp_node_free (node); } @@ -388,31 +387,29 @@ static ddsi_tcp_conn_t ddsi_tcp_cache_find (struct ddsi_tran_factory_tcp *fact, ddsrt_mutex_lock (&fact->ddsi_tcp_cache_lock_g); node = ddsrt_avl_lookup_ipath (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, &key, &path); - if (node) + if (node && !node->m_conn->m_base.m_closed) + ret = node->m_conn; + else { - if (node->m_conn->m_base.m_closed) + if (node) { + struct ddsi_domaingv * const gv = node->m_conn->m_base.m_factory->gv; + GVLOG (DDS_LC_TCP, "tcp drop in cache find (%p)\n", (void *) node->m_conn); ddsrt_avl_delete (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, node); ddsi_tcp_node_free (node); + // path is no longer valid because we deleted the node, so compute it anew + (void) ddsrt_avl_lookup_ipath (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, &key, &path); } - else - { - ret = node->m_conn; - } - } - if (ret == NULL) - { ret = ddsi_tcp_new_conn (fact, NULL, DDSRT_INVALID_SOCKET, false, &key.m_peer_addr.a); ddsi_tcp_cache_add (fact, ret, &path); } ddsrt_mutex_unlock (&fact->ddsi_tcp_cache_lock_g); - return ret; } -static ssize_t ddsi_tcp_conn_read_plain (ddsi_tcp_conn_t tcp, void * buf, size_t len, dds_return_t *rc) +static ddsrt_ssize_t ddsi_tcp_conn_read_plain (ddsi_tcp_conn_t tcp, void * buf, size_t len, dds_return_t *rc) { - ssize_t rcvd = -1; + ddsrt_ssize_t rcvd = -1; assert(rc != NULL); *rc = ddsrt_recv(tcp->m_sock, buf, len, 0, &rcvd); @@ -421,7 +418,7 @@ static ssize_t ddsi_tcp_conn_read_plain (ddsi_tcp_conn_t tcp, void * buf, size_t } #ifdef DDS_HAS_TCP_TLS -static ssize_t ddsi_tcp_conn_read_ssl (ddsi_tcp_conn_t tcp, void * buf, size_t len, dds_return_t *rc) +static ddsrt_ssize_t ddsi_tcp_conn_read_ssl (ddsi_tcp_conn_t tcp, void * buf, size_t len, dds_return_t *rc) { struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) tcp->m_base.m_factory; return (fact->ddsi_tcp_ssl_plugin.read) (tcp->m_ssl, buf, len, rc); @@ -464,15 +461,15 @@ static int32_t addrfam_to_locator_kind (int af) return (af == AF_INET) ? DDSI_LOCATOR_KIND_TCPv4 : DDSI_LOCATOR_KIND_TCPv6; } -static ssize_t ddsi_tcp_conn_read (struct ddsi_tran_conn * conn, unsigned char *buf, size_t len, bool allow_spurious, struct ddsi_network_packet_info *pktinfo) +static ddsrt_ssize_t ddsi_tcp_conn_read (struct ddsi_tran_conn * conn, unsigned char *buf, size_t len, bool allow_spurious, struct ddsi_network_packet_info *pktinfo) { struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) conn->m_factory; struct ddsi_domaingv const * const gv = fact->fact.gv; dds_return_t rc; ddsi_tcp_conn_t tcp = (ddsi_tcp_conn_t) conn; - ssize_t (*rd) (ddsi_tcp_conn_t, void *, size_t, dds_return_t * err) = ddsi_tcp_conn_read_plain; + ddsrt_ssize_t (*rd) (ddsi_tcp_conn_t, void *, size_t, dds_return_t * err) = ddsi_tcp_conn_read_plain; size_t pos = 0; - ssize_t n; + ddsrt_ssize_t n; #ifdef DDS_HAS_TCP_TLS if (fact->ddsi_tcp_ssl_plugin.read) @@ -496,7 +493,7 @@ static ssize_t ddsi_tcp_conn_read (struct ddsi_tran_conn * conn, unsigned char * pktinfo->if_index = 0; pktinfo->dst.kind = DDSI_LOCATOR_KIND_INVALID; } - return (ssize_t) pos; + return (ddsrt_ssize_t) pos; } } else if (n == 0) @@ -529,9 +526,9 @@ static ssize_t ddsi_tcp_conn_read (struct ddsi_tran_conn * conn, unsigned char * return -1; } -static ssize_t ddsi_tcp_conn_write_plain (ddsi_tcp_conn_t conn, const void * buf, size_t len, dds_return_t *rc) +static ddsrt_ssize_t ddsi_tcp_conn_write_plain (ddsi_tcp_conn_t conn, const void * buf, size_t len, dds_return_t *rc) { - ssize_t sent = -1; + ddsrt_ssize_t sent = -1; int sendflags = 0; #ifdef MSG_NOSIGNAL @@ -543,21 +540,21 @@ static ssize_t ddsi_tcp_conn_write_plain (ddsi_tcp_conn_t conn, const void * buf } #ifdef DDS_HAS_TCP_TLS -static ssize_t ddsi_tcp_conn_write_ssl (ddsi_tcp_conn_t conn, const void * buf, size_t len, dds_return_t *rc) +static ddsrt_ssize_t ddsi_tcp_conn_write_ssl (ddsi_tcp_conn_t conn, const void * buf, size_t len, dds_return_t *rc) { struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) conn->m_base.m_factory; return (fact->ddsi_tcp_ssl_plugin.write) (conn->m_ssl, buf, len, rc); } #endif -static ssize_t ddsi_tcp_block_write (ssize_t (*wr) (ddsi_tcp_conn_t, const void *, size_t, dds_return_t *), ddsi_tcp_conn_t conn, const void * buf, size_t sz) +static ddsrt_ssize_t ddsi_tcp_block_write (ddsrt_ssize_t (*wr) (ddsi_tcp_conn_t, const void *, size_t, dds_return_t *), ddsi_tcp_conn_t conn, const void * buf, size_t sz) { /* Write all bytes of buf even in the presence of signals, partial writes and blocking (typically write buffer full) */ struct ddsi_domaingv const * const gv = conn->m_base.m_base.gv; dds_return_t rc; size_t pos = 0; - ssize_t n = -1; + ddsrt_ssize_t n = -1; while (pos != sz) { @@ -587,7 +584,7 @@ static ssize_t ddsi_tcp_block_write (ssize_t (*wr) (ddsi_tcp_conn_t, const void } } - return (pos == sz) ? (ssize_t) pos : -1; + return (pos == sz) ? (ddsrt_ssize_t) pos : -1; } static size_t iovlen_sum (size_t niov, const ddsrt_iovec_t *iov) @@ -604,7 +601,7 @@ static void set_msghdr_iov (ddsrt_msghdr_t *mhdr, ddsrt_iovec_t *iov, size_t iov mhdr->msg_iovlen = (ddsrt_msg_iovlen_t)iovlen; } -static ssize_t ddsi_tcp_conn_write (struct ddsi_tran_conn * base, const ddsi_locator_t *dst, const ddsi_tran_write_msgfrags_t *msgfrags, uint32_t flags) +static ddsrt_ssize_t ddsi_tcp_conn_write (struct ddsi_tran_conn * base, const ddsi_locator_t *dst, const ddsi_tran_write_msgfrags_t *msgfrags, uint32_t flags) { struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) base->m_factory; struct ddsi_domaingv const * const gv = fact->fact.gv; @@ -612,7 +609,7 @@ static ssize_t ddsi_tcp_conn_write (struct ddsi_tran_conn * base, const ddsi_loc char msgbuf[4096]; /* stack buffer for merging smallish writes without requiring allocations */ ddsrt_iovec_t iovec; /* iovec used for msgbuf */ #endif - ssize_t ret = -1; + ddsrt_ssize_t ret = -1; size_t len; ddsi_tcp_conn_t conn; int piecewise; @@ -662,7 +659,7 @@ static ssize_t ddsi_tcp_conn_write (struct ddsi_tran_conn * base, const ddsi_loc { GVLOG (DDS_LC_TCP, "tcp write: sock %"PRIdSOCK" message filtered\n", conn->m_sock); ddsrt_mutex_unlock (&conn->m_mutex); - return (ssize_t) len; + return (ddsrt_ssize_t) len; } #ifdef DDS_HAS_TCP_TLS @@ -740,7 +737,7 @@ static ssize_t ddsi_tcp_conn_write (struct ddsi_tran_conn * base, const ddsi_loc if (piecewise) { - ssize_t (*wr) (ddsi_tcp_conn_t, const void *, size_t, dds_return_t *) = ddsi_tcp_conn_write_plain; + ddsrt_ssize_t (*wr) (ddsi_tcp_conn_t, const void *, size_t, dds_return_t *) = ddsi_tcp_conn_write_plain; int i = 0; #ifdef DDS_HAS_TCP_TLS if (fact->ddsi_tcp_ssl_plugin.write) @@ -750,9 +747,9 @@ static ssize_t ddsi_tcp_conn_write (struct ddsi_tran_conn * base, const ddsi_loc #endif assert (msg.msg_iov[i].iov_len > 0); - while (ret >= (ssize_t) msg.msg_iov[i].iov_len) + while (ret >= (ddsrt_ssize_t) msg.msg_iov[i].iov_len) { - ret -= (ssize_t) msg.msg_iov[i++].iov_len; + ret -= (ddsrt_ssize_t) msg.msg_iov[i++].iov_len; } assert (i < (int) msg.msg_iovlen); ret = ddsi_tcp_block_write (wr, conn, (const char *) msg.msg_iov[i].iov_base + ret, msg.msg_iov[i].iov_len - (size_t) ret); @@ -965,6 +962,10 @@ static ddsi_tcp_conn_t ddsi_tcp_new_conn (struct ddsi_tran_factory_tcp *fact, co conn->m_base.m_base.m_port = INVALID_PORT; ddsi_tcp_conn_set_socket (conn, sock); + char buff[DDSI_LOCSTRLEN]; + struct ddsi_domaingv * const gv = fact->fact.gv; + sockaddr_to_string_with_port(buff, sizeof(buff), &conn->m_peer_addr.a); + GVLOG (DDS_LC_TCP, "tcp new %s connection on socket %"PRIdSOCK" to %s (%p)\n", conn->m_base.m_server ? "server" : "client", conn->m_sock, buff, (void *) conn); return conn; } @@ -1015,7 +1016,7 @@ static void ddsi_tcp_conn_delete (ddsi_tcp_conn_t conn) struct ddsi_domaingv const * const gv = fact->fact.gv; char buff[DDSI_LOCSTRLEN]; sockaddr_to_string_with_port(buff, sizeof(buff), &conn->m_peer_addr.a); - GVLOG (DDS_LC_TCP, "tcp free %s connection on socket %"PRIdSOCK" to %s\n", conn->m_base.m_server ? "server" : "client", conn->m_sock, buff); + GVLOG (DDS_LC_TCP, "tcp free %s connection on socket %"PRIdSOCK" to %s (%p)\n", conn->m_base.m_server ? "server" : "client", conn->m_sock, buff, (void *) conn); #ifdef DDS_HAS_TCP_TLS if (fact->ddsi_tcp_ssl_plugin.ssl_free) @@ -1123,11 +1124,23 @@ static void ddsi_tcp_release_listener (struct ddsi_tran_listener * listener) ddsrt_free (tl); } +static void ddsi_tcp_release_factory_free_cache_node (void *vnode) +{ + ddsi_tcp_node_t node = vnode; + struct ddsi_domaingv * const gv = node->m_conn->m_base.m_factory->gv; + GVLOG (DDS_LC_TCP, "tcp cache free on shutdown (%p)\n", (void *) node->m_conn); + // TCP support code really is broken ... it doesn't ordinarily leak connections, + // but it does sometimes when a connection is created by a message sent to a + // peer that was removed just before. The TCP code should be rewritten entirely. + ddsrt_atomic_st32 (&node->m_conn->m_base.m_count, 1); + ddsi_tcp_node_free (node); +} + static void ddsi_tcp_release_factory (struct ddsi_tran_factory *fact_cmn) { struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) fact_cmn; struct ddsi_domaingv const * const gv = fact->fact.gv; - ddsrt_avl_free (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, ddsi_tcp_node_free); + ddsrt_avl_free (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, ddsi_tcp_release_factory_free_cache_node); ddsrt_mutex_destroy (&fact->ddsi_tcp_cache_lock_g); #ifdef DDS_HAS_TCP_TLS if (fact->ddsi_tcp_ssl_plugin.fini) diff --git a/src/core/ddsi/src/ddsi_tran.c b/src/core/ddsi/src/ddsi_tran.c index 093b0e627b..938a4060be 100644 --- a/src/core/ddsi/src/ddsi_tran.c +++ b/src/core/ddsi/src/ddsi_tran.c @@ -35,8 +35,8 @@ extern inline dds_return_t ddsi_factory_create_conn (struct ddsi_tran_conn **con extern inline int ddsi_listener_locator (struct ddsi_tran_listener * listener, ddsi_locator_t * loc); extern inline int ddsi_listener_listen (struct ddsi_tran_listener * listener); extern inline struct ddsi_tran_conn * ddsi_listener_accept (struct ddsi_tran_listener * listener); -extern inline ssize_t ddsi_conn_read (struct ddsi_tran_conn * conn, unsigned char * buf, size_t len, bool allow_spurious, struct ddsi_network_packet_info *pktinfo); -extern inline ssize_t ddsi_conn_write (struct ddsi_tran_conn * conn, const ddsi_locator_t *dst, const ddsi_tran_write_msgfrags_t *msgfrags, uint32_t flags); +extern inline ddsrt_ssize_t ddsi_conn_read (struct ddsi_tran_conn * conn, unsigned char * buf, size_t len, bool allow_spurious, struct ddsi_network_packet_info *pktinfo); +extern inline ddsrt_ssize_t ddsi_conn_write (struct ddsi_tran_conn * conn, const ddsi_locator_t *dst, const ddsi_tran_write_msgfrags_t *msgfrags, uint32_t flags); extern inline uint32_t ddsi_tran_get_locator_port (const struct ddsi_tran_factory *factory, const ddsi_locator_t *loc); extern inline void ddsi_tran_set_locator_port (const struct ddsi_tran_factory *factory, ddsi_locator_t *loc, uint32_t port); extern inline uint32_t ddsi_tran_get_locator_aux (const struct ddsi_tran_factory *factory, const ddsi_locator_t *loc); diff --git a/src/core/ddsi/src/ddsi_udp.c b/src/core/ddsi/src/ddsi_udp.c index 6fb2aca17b..98c420bff1 100644 --- a/src/core/ddsi/src/ddsi_udp.c +++ b/src/core/ddsi/src/ddsi_udp.c @@ -151,7 +151,7 @@ static void translate_pktinfo (struct ddsi_network_packet_info *pktinfo, ddsrt_m pktinfo->if_index = 0; } -static ssize_t ddsi_udp_conn_read (struct ddsi_tran_conn * conn_cmn, unsigned char * buf, size_t len, bool allow_spurious, struct ddsi_network_packet_info *pktinfo) +static ddsrt_ssize_t ddsi_udp_conn_read (struct ddsi_tran_conn * conn_cmn, unsigned char * buf, size_t len, bool allow_spurious, struct ddsi_network_packet_info *pktinfo) { ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn; struct ddsi_domaingv * const gv = conn->m_base.m_base.gv; @@ -187,7 +187,7 @@ static ssize_t ddsi_udp_conn_read (struct ddsi_tran_conn * conn_cmn, unsigned ch (void) allow_spurious; dds_return_t rc; - ssize_t nrecv; + ddsrt_ssize_t nrecv; do { rc = ddsrt_recvmsg (&conn->m_sockext, &msghdr, 0, &nrecv); } while (rc == DDS_RETCODE_INTERRUPTED); @@ -234,12 +234,12 @@ static ssize_t ddsi_udp_conn_read (struct ddsi_tran_conn * conn_cmn, unsigned ch return nrecv; } -static ssize_t ddsi_udp_conn_write (struct ddsi_tran_conn * conn_cmn, const ddsi_locator_t *dst, const ddsi_tran_write_msgfrags_t *msgfrags, uint32_t flags) +static ddsrt_ssize_t ddsi_udp_conn_write (struct ddsi_tran_conn * conn_cmn, const ddsi_locator_t *dst, const ddsi_tran_write_msgfrags_t *msgfrags, uint32_t flags) { ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn; struct ddsi_domaingv * const gv = conn->m_base.m_base.gv; dds_return_t rc; - ssize_t nsent = -1; + ddsrt_ssize_t nsent = -1; unsigned retry = 2; int sendflags = 0; #if defined _WIN32 && !defined WINCE diff --git a/src/core/ddsi/src/ddsi_vnet.c b/src/core/ddsi/src/ddsi_vnet.c index 9e077c182f..f40ba21c1d 100644 --- a/src/core/ddsi/src/ddsi_vnet.c +++ b/src/core/ddsi/src/ddsi_vnet.c @@ -64,13 +64,13 @@ static int ddsi_vnet_conn_locator (struct ddsi_tran_factory * vfact, struct ddsi return 0; } -static ssize_t ddsi_vnet_conn_write (struct ddsi_tran_conn * conn_cmn, const ddsi_locator_t *dst, const ddsi_tran_write_msgfrags_t *msgfrags, uint32_t flags) +static ddsrt_ssize_t ddsi_vnet_conn_write (struct ddsi_tran_conn * conn_cmn, const ddsi_locator_t *dst, const ddsi_tran_write_msgfrags_t *msgfrags, uint32_t flags) { (void) conn_cmn; (void) dst; (void) flags; ddsrt_iov_len_t n = 0; for (size_t i = 0; i < msgfrags->niov; i++) n += msgfrags->iov[i].iov_len; - return (ssize_t) n; + return (ddsrt_ssize_t) n; } static dds_return_t ddsi_vnet_create_conn (struct ddsi_tran_conn **conn_out, struct ddsi_tran_factory * fact_cmn, uint32_t port, const struct ddsi_tran_qos *qos) diff --git a/src/core/ddsi/src/ddsi_xmsg.c b/src/core/ddsi/src/ddsi_xmsg.c index f1cd9a7a11..fe7c963032 100644 --- a/src/core/ddsi/src/ddsi_xmsg.c +++ b/src/core/ddsi/src/ddsi_xmsg.c @@ -1117,9 +1117,9 @@ void ddsi_xpack_free (struct ddsi_xpack *xp) ddsrt_free (xp); } -static ssize_t ddsi_xpack_send_rtps(struct ddsi_xpack * xp, const ddsi_xlocator_t *loc) +static ddsrt_ssize_t ddsi_xpack_send_rtps(struct ddsi_xpack * xp, const ddsi_xlocator_t *loc) { - ssize_t ret = -1; + ddsrt_ssize_t ret = -1; #ifdef DDS_HAS_SECURITY /* Only encode when needed. */ @@ -1145,11 +1145,11 @@ static ssize_t ddsi_xpack_send_rtps(struct ddsi_xpack * xp, const ddsi_xlocator_ return ret; } -static ssize_t ddsi_xpack_send1 (const ddsi_xlocator_t *loc, void * varg) +static ddsrt_ssize_t ddsi_xpack_send1 (const ddsi_xlocator_t *loc, void * varg) { struct ddsi_xpack *xp = varg; struct ddsi_domaingv const * const gv = xp->gv; - ssize_t nbytes = 0; + ddsrt_ssize_t nbytes = 0; if (gv->logconfig.c.mask & DDS_LC_TRACE) { @@ -1189,7 +1189,7 @@ static ssize_t ddsi_xpack_send1 (const ddsi_xlocator_t *loc, void * varg) else { GVTRACE ("(dropped)"); - nbytes = (ssize_t) xp->msg_len.length; + nbytes = (ddsrt_ssize_t) xp->msg_len.length; } /* Clear call flags, as used on a per call basis */ diff --git a/src/core/ddsi/tests/plist_leasedur.c b/src/core/ddsi/tests/plist_leasedur.c index cdd5a9452f..72bfcbc690 100644 --- a/src/core/ddsi/tests/plist_leasedur.c +++ b/src/core/ddsi/tests/plist_leasedur.c @@ -319,7 +319,7 @@ static void ddsi_plist_leasedur_new_proxypp_impl (bool include_lease_duration) memcpy (buf + size, pkt_trailer, sizeof (pkt_trailer)); size += sizeof (pkt_trailer); ddsi_rmsg_setsize (rmsg, (uint32_t) size); - ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, size, buf, &pktinfo); + ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, size, &pktinfo); ddsi_rmsg_commit (rmsg); // Discovery data processing is done by the dq.builtin thread, so we can't be @@ -453,7 +453,7 @@ static void ddsi_plist_leasedur_new_proxyrd_impl (bool include_lease_duration) memcpy (buf + size, pkt_p4, sizeof (pkt_p4)); size += sizeof (pkt_p4); ddsi_rmsg_setsize (rmsg, (uint32_t) size); - ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, size, buf, &pktinfo); + ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, size, &pktinfo); ddsi_rmsg_commit (rmsg); // Discovery data processing is done by the dq.builtin thread, so we can't be diff --git a/src/core/ddsi/tests/pmd_message.c b/src/core/ddsi/tests/pmd_message.c index de1f3b7ab9..a716b56bac 100644 --- a/src/core/ddsi/tests/pmd_message.c +++ b/src/core/ddsi/tests/pmd_message.c @@ -210,7 +210,7 @@ static void create_fake_proxy_participant (void) memcpy (buf, spdp_pkt, sizeof (spdp_pkt)); size += sizeof (spdp_pkt); ddsi_rmsg_setsize (rmsg, (uint32_t) size); - ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, size, buf, &pktinfo); + ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, size, &pktinfo); ddsi_rmsg_commit (rmsg); // wait until SPDP message has been processed wait_for_dqueue (); @@ -294,7 +294,7 @@ static void send_pmd_message (uint32_t seqlo, uint16_t encoding, uint16_t option memcpy (buf, pmd_pkt, sizeof (pmd_pkt)); size += sizeof (pmd_pkt) - 24 + act_payload_size; ddsi_rmsg_setsize (rmsg, (uint32_t) size); - ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, size, buf, &pktinfo); + ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, size, &pktinfo); ddsi_rmsg_commit (rmsg); // wait until PMD message has been processed wait_for_dqueue (); diff --git a/src/core/ddsi/tests/receive_packet.c b/src/core/ddsi/tests/receive_packet.c index a2ad431abb..62de19651d 100644 --- a/src/core/ddsi/tests/receive_packet.c +++ b/src/core/ddsi/tests/receive_packet.c @@ -418,7 +418,7 @@ CU_Test (ddsi_receive_packet, rti_dispose_with_key) ddsi_rmsg_setsize (rmsg, (uint32_t) sizeof (rtps_message)); struct ddsi_thread_state * const thrst = ddsi_lookup_thread_state (); - ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, (uint32_t) sizeof (rtps_message), buf, &pktinfo); + ddsi_handle_rtps_message (thrst, &gv, gv.data_conn_uc, NULL, rbufpool, rmsg, (uint32_t) sizeof (rtps_message), &pktinfo); ddsi_rmsg_commit (rmsg); receive_packet_fini (); diff --git a/src/ddsrt/CMakeLists.txt b/src/ddsrt/CMakeLists.txt index a78ed8a84e..f37eb6588e 100644 --- a/src/ddsrt/CMakeLists.txt +++ b/src/ddsrt/CMakeLists.txt @@ -369,7 +369,7 @@ set(DDSRT_WITH_LWIP ${WITH_LWIP}) set(DDSRT_WITH_FREERTOS ${WITH_FREERTOS}) foreach(feature TCP_TLS SECURITY LIFESPAN DEADLINE_MISSED NETWORK_PARTITIONS - TYPELIB TYPE_DISCOVERY TOPIC_DISCOVERY QOS_PROVIDER) + TYPELIB TYPE_DISCOVERY TOPIC_DISCOVERY QOS_PROVIDER TCP) set(DDS_HAS_${feature} ${ENABLE_${feature}}) endforeach() diff --git a/src/ddsrt/include/dds/ddsrt/sockets.h b/src/ddsrt/include/dds/ddsrt/sockets.h index b93ea9beb4..37c1f60f16 100644 --- a/src/ddsrt/include/dds/ddsrt/sockets.h +++ b/src/ddsrt/include/dds/ddsrt/sockets.h @@ -219,7 +219,7 @@ ddsrt_send( const void *buf, size_t len, int flags, - ssize_t *sent); + ddsrt_ssize_t *sent); /** * @brief Send a message @@ -240,7 +240,7 @@ ddsrt_sendmsg( ddsrt_socket_t sock, const ddsrt_msghdr_t *msg, int flags, - ssize_t *sent); + ddsrt_ssize_t *sent); /** * @brief Receive data into a buffer @@ -266,7 +266,7 @@ ddsrt_recv( void *buf, size_t len, int flags, - ssize_t *rcvd); + ddsrt_ssize_t *rcvd); /** * @brief Receive a message @@ -290,7 +290,7 @@ ddsrt_recvmsg( const ddsrt_socket_ext_t *sockext, ddsrt_msghdr_t *msg, int flags, - ssize_t *rcvd); + ddsrt_ssize_t *rcvd); /** * @brief Get options from the socket. diff --git a/src/ddsrt/include/dds/ddsrt/sync/freertos.h b/src/ddsrt/include/dds/ddsrt/sync/freertos.h index bc9efe8b90..87892d98a0 100644 --- a/src/ddsrt/include/dds/ddsrt/sync/freertos.h +++ b/src/ddsrt/include/dds/ddsrt/sync/freertos.h @@ -92,7 +92,7 @@ void ddsrt_tasklist_rtrim(ddsrt_tasklist_t *list); void ddsrt_tasklist_pack(ddsrt_tasklist_t *list); int ddsrt_tasklist_shrink(ddsrt_tasklist_t *list); int ddsrt_tasklist_grow(ddsrt_tasklist_t *list); -ssize_t ddsrt_tasklist_find(ddsrt_tasklist_t *list, TaskHandle_t task); +int ddsrt_tasklist_find(ddsrt_tasklist_t *list, TaskHandle_t task); TaskHandle_t ddsrt_tasklist_peek(ddsrt_tasklist_t *list, TaskHandle_t task); TaskHandle_t ddsrt_tasklist_pop(ddsrt_tasklist_t *list, TaskHandle_t task); int ddsrt_tasklist_push(ddsrt_tasklist_t *list, TaskHandle_t task); diff --git a/src/ddsrt/include/dds/ddsrt/types/posix.h b/src/ddsrt/include/dds/ddsrt/types/posix.h index 380c583818..f914413084 100644 --- a/src/ddsrt/include/dds/ddsrt/types/posix.h +++ b/src/ddsrt/include/dds/ddsrt/types/posix.h @@ -14,9 +14,10 @@ #include #include #if defined(__IAR_SYSTEMS_ICC__) -typedef long int ssize_t; +typedef long int ddsrt_ssize_t; #else #include +typedef ssize_t ddsrt_ssize_t; #endif #endif /* DDSRT_TYPES_POSIX_H */ diff --git a/src/ddsrt/include/dds/ddsrt/types/windows.h b/src/ddsrt/include/dds/ddsrt/types/windows.h index b8f4b465ba..c12342d4e3 100644 --- a/src/ddsrt/include/dds/ddsrt/types/windows.h +++ b/src/ddsrt/include/dds/ddsrt/types/windows.h @@ -26,7 +26,9 @@ #include #ifdef _MSC_VER -typedef SSIZE_T ssize_t; +typedef SSIZE_T ddsrt_ssize_t; +#else +typedef ssize_t ddsrt_ssize_t; #endif #endif /* DDSRT_TYPES_WINDOWS_H */ diff --git a/src/ddsrt/include/dds/features.h.in b/src/ddsrt/include/dds/features.h.in index a294df19bf..e3ecf09328 100644 --- a/src/ddsrt/include/dds/features.h.in +++ b/src/ddsrt/include/dds/features.h.in @@ -44,5 +44,8 @@ /* Whether or not support for qos provider is included */ #cmakedefine DDS_HAS_QOS_PROVIDER 1 +/* Whether or not support for DDS-over-TCP is included */ +#cmakedefine DDS_HAS_TCP 1 + #endif diff --git a/src/ddsrt/src/ifaddrs/posix/ifaddrs.c b/src/ddsrt/src/ifaddrs/posix/ifaddrs.c index e26a768b90..a7cbc3a18e 100644 --- a/src/ddsrt/src/ifaddrs/posix/ifaddrs.c +++ b/src/ddsrt/src/ifaddrs/posix/ifaddrs.c @@ -226,7 +226,7 @@ static bool is_the_kernel_likely_lying_about_multicast (const ddsrt_ifaddrs_t *i .msg_controllen = 0, .msg_flags = 0 }; - if (sendmsg (sock, &msg, 0) != (ssize_t) sizeof (contents)) + if (sendmsg (sock, &msg, 0) != (ddsrt_ssize_t) sizeof (contents)) goto out; #ifndef __APPLE__ // because we do a short timeout instead if (fcntl (sock, F_SETFL, O_NONBLOCK) == -1) @@ -234,7 +234,7 @@ static bool is_the_kernel_likely_lying_about_multicast (const ddsrt_ifaddrs_t *i #endif unsigned char recvbuf[sizeof (contents)]; msg.msg_iov = &(struct iovec) { .iov_len = sizeof (recvbuf), .iov_base = recvbuf }; - ssize_t nrecv; + ddsrt_ssize_t nrecv; while ((nrecv = recvmsg (sock, &msg, 0)) > 0) { if (nrecv == sizeof (recvbuf) && diff --git a/src/ddsrt/src/sockets/posix/socket.c b/src/ddsrt/src/sockets/posix/socket.c index c1dc85bc25..384e11b287 100644 --- a/src/ddsrt/src/sockets/posix/socket.c +++ b/src/ddsrt/src/sockets/posix/socket.c @@ -436,9 +436,9 @@ ddsrt_recv( void *buf, size_t len, int flags, - ssize_t *rcvd) + ddsrt_ssize_t *rcvd) { - ssize_t n; + ddsrt_ssize_t n; if ((n = recv(sock, buf, len, flags)) != -1) { assert(n >= 0); @@ -450,7 +450,7 @@ ddsrt_recv( } #if (LWIP_SOCKET && !defined(recvmsg)) || defined(__ZEPHYR__) -static ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags) +static ddsrt_ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags) { assert(msg->msg_iovlen == 1); assert(msg->msg_controllen == 0); @@ -472,9 +472,9 @@ ddsrt_recvmsg( const ddsrt_socket_ext_t *sockext, ddsrt_msghdr_t *msg, int flags, - ssize_t *rcvd) + ddsrt_ssize_t *rcvd) { - ssize_t n; + ddsrt_ssize_t n; if ((n = recvmsg(sockext->sock, msg, flags)) != -1) { assert(n >= 0); @@ -535,9 +535,9 @@ ddsrt_send( const void *buf, size_t len, int flags, - ssize_t *sent) + ddsrt_ssize_t *sent) { - ssize_t n; + ddsrt_ssize_t n; if ((n = send(sock, buf, len, flags)) != -1) { assert(n >= 0); @@ -553,9 +553,9 @@ ddsrt_sendmsg( ddsrt_socket_t sock, const ddsrt_msghdr_t *msg, int flags, - ssize_t *sent) + ddsrt_ssize_t *sent) { - ssize_t n; + ddsrt_ssize_t n; if ((n = sendmsg(sock, msg, flags)) != -1) { assert(n >= 0); diff --git a/src/ddsrt/src/sockets/windows/socket.c b/src/ddsrt/src/sockets/windows/socket.c index aeba8496fb..067807c2e2 100644 --- a/src/ddsrt/src/sockets/windows/socket.c +++ b/src/ddsrt/src/sockets/windows/socket.c @@ -522,9 +522,9 @@ ddsrt_recv( void *buf, size_t len, int flags, - ssize_t *rcvd) + ddsrt_ssize_t *rcvd) { - ssize_t n; + ddsrt_ssize_t n; assert(len < INT_MAX); @@ -550,7 +550,7 @@ ddsrt_recvmsg_wsarecvmsg( const ddsrt_socket_ext_t *sockext, ddsrt_msghdr_t *msg, int flags, - ssize_t *rcvd) + ddsrt_ssize_t *rcvd) { WSAMSG wsamsg = { .name = (LPSOCKADDR) msg->msg_name, @@ -570,7 +570,7 @@ ddsrt_recvmsg_wsarecvmsg( // WSAEMSGSIZE is not an error for us: we look at (msg_flags & MSG_TRUNC) msg->msg_flags = wsamsg.dwFlags; msg->msg_controllen = wsamsg.Control.len; - *rcvd = (ssize_t) n; + *rcvd = (ddsrt_ssize_t) n; return DDS_RETCODE_OK; } else @@ -584,7 +584,7 @@ ddsrt_recvmsg_recvfrom( const ddsrt_socket_ext_t *sockext, ddsrt_msghdr_t *msg, int flags, - ssize_t *rcvd) + ddsrt_ssize_t *rcvd) { assert(msg->msg_iovlen == 1); assert(msg->msg_iov[0].iov_len < INT_MAX); @@ -616,7 +616,7 @@ ddsrt_recvmsg( const ddsrt_socket_ext_t *sockext, ddsrt_msghdr_t *msg, int flags, - ssize_t *rcvd) + ddsrt_ssize_t *rcvd) { assert(msg != NULL); if (sockext->wsarecvmsg) @@ -677,7 +677,7 @@ ddsrt_send( const void *buf, size_t len, int flags, - ssize_t *sent) + ddsrt_ssize_t *sent) { int n; @@ -698,7 +698,7 @@ ddsrt_sendmsg( ddsrt_socket_t sock, const ddsrt_msghdr_t *msg, int flags, - ssize_t *sent) + ddsrt_ssize_t *sent) { int ret; DWORD n; @@ -719,7 +719,7 @@ ddsrt_sendmsg( NULL, NULL); if (ret != SOCKET_ERROR) { - *sent = (ssize_t)n; + *sent = (ddsrt_ssize_t)n; return DDS_RETCODE_OK; } diff --git a/src/ddsrt/src/sync/freertos/tasklist.c b/src/ddsrt/src/sync/freertos/tasklist.c index f3f15d2823..845980e103 100644 --- a/src/ddsrt/src/sync/freertos/tasklist.c +++ b/src/ddsrt/src/sync/freertos/tasklist.c @@ -280,7 +280,7 @@ int ddsrt_tasklist_grow(ddsrt_tasklist_t *list) return 0; } -ssize_t ddsrt_tasklist_find(ddsrt_tasklist_t *list, TaskHandle_t task) +int ddsrt_tasklist_find(ddsrt_tasklist_t *list, TaskHandle_t task) { size_t i, n; @@ -292,14 +292,14 @@ ssize_t ddsrt_tasklist_find(ddsrt_tasklist_t *list, TaskHandle_t task) n = list->off <= list->end ? list->end : list->len - 1; for (i = list->off; i <= n; i++) { if (list->tasks[i] == task) - return (ssize_t)i; + return (int)i; } if (list->off > list->end) { n = list->end; for (i = 0; i <= n; i++) { if (list->tasks[i] == task) - return (ssize_t)i; + return (int)i; } } } @@ -322,14 +322,14 @@ TaskHandle_t ddsrt_tasklist_peek(ddsrt_tasklist_t *list, TaskHandle_t task) TaskHandle_t ddsrt_tasklist_pop(ddsrt_tasklist_t *list, TaskHandle_t task) { - ssize_t i; + int i; tasklist_assert(list); if (list->cnt == 0) { return NULL; } else if (task == NULL) { - i = (ssize_t)list->off; + i = (int)list->off; } else if ((i = ddsrt_tasklist_find(list, task)) == -1) { return NULL; } @@ -342,10 +342,10 @@ TaskHandle_t ddsrt_tasklist_pop(ddsrt_tasklist_t *list, TaskHandle_t task) if (list->cnt == 0) { list->off = list->end = 0; - } else if (i == (ssize_t)list->end) { + } else if (i == (int)list->end) { /* Trim invalidated buckets from tail of window. */ ddsrt_tasklist_rtrim(list); - } else if (i == (ssize_t)list->off) { + } else if (i == (int)list->off) { /* Trim invalidated buckets from head of window. */ ddsrt_tasklist_ltrim(list); } else { diff --git a/src/ddsrt/tests/select.c b/src/ddsrt/tests/select.c index 57dfaf8cfb..ac05a51c1e 100644 --- a/src/ddsrt/tests/select.c +++ b/src/ddsrt/tests/select.c @@ -210,7 +210,7 @@ CU_Test(ddsrt_select, timeout) dds_sleepfor(arg.delay * 2); /* Send data to the read socket to avoid blocking indefinitely. */ fprintf (stderr, "write data\n"); - ssize_t sent = 0; + ddsrt_ssize_t sent = 0; rc = ddsrt_send(socks[1], mesg, sizeof(mesg), 0, &sent); CU_ASSERT_EQ_FATAL (rc, DDS_RETCODE_OK); fprintf (stderr, "join thread\n"); @@ -228,7 +228,7 @@ static uint32_t recv_routine(void *ptr) thread_arg_t *arg = (thread_arg_t*)ptr; fd_set rdset; - ssize_t rcvd = -1; + ddsrt_ssize_t rcvd = -1; char buf[sizeof(mesg)]; FD_ZERO(&rdset); @@ -268,7 +268,7 @@ CU_Test(ddsrt_select, send_recv) rc = ddsrt_thread_create(&thr, "recv", &attr, &recv_routine, &arg); CU_ASSERT_EQ_FATAL (rc, DDS_RETCODE_OK); - ssize_t sent = 0; + ddsrt_ssize_t sent = 0; rc = ddsrt_send(socks[1], mesg, sizeof(mesg), 0, &sent); CU_ASSERT_EQ (rc, DDS_RETCODE_OK); CU_ASSERT_EQ (sent, sizeof(mesg)); @@ -287,7 +287,7 @@ static uint32_t recvmsg_routine(void *ptr) thread_arg_t *arg = (thread_arg_t*)ptr; fd_set rdset; - ssize_t rcvd = -1; + ddsrt_ssize_t rcvd = -1; char buf[sizeof(mesg)]; ddsrt_msghdr_t msg; ddsrt_iovec_t iov; @@ -334,7 +334,7 @@ CU_Test(ddsrt_select, sendmsg_recvmsg) rc = ddsrt_thread_create(&thr, "recvmsg", &attr, &recvmsg_routine, &arg); CU_ASSERT_EQ_FATAL (rc, DDS_RETCODE_OK); - ssize_t sent = 0; + ddsrt_ssize_t sent = 0; ddsrt_msghdr_t msg; ddsrt_iovec_t iov; memset(&msg, 0, sizeof(msg));