diff --git a/.evergreen/compile-unix.sh b/.evergreen/compile-unix.sh index 10101632577..84b73c153b8 100755 --- a/.evergreen/compile-unix.sh +++ b/.evergreen/compile-unix.sh @@ -262,10 +262,10 @@ mkfifo pipe || true if [ -e pipe ]; then set +o xtrace tee error.log < pipe & - run_valgrind ./src/libmongoc/test-libmongoc -d -F test-results.json 2>pipe + run_valgrind ./src/libmongoc/test-libmongoc --no-fork -d -F test-results.json 2>pipe rm pipe else - run_valgrind ./src/libmongoc/test-libmongoc -d -F test-results.json + run_valgrind ./src/libmongoc/test-libmongoc --no-fork -d -F test-results.json fi # Check if the error.log exists, and is more than 0 byte diff --git a/.evergreen/run-tests.sh b/.evergreen/run-tests.sh index 6f1925e21b4..e9485b6556f 100755 --- a/.evergreen/run-tests.sh +++ b/.evergreen/run-tests.sh @@ -93,7 +93,7 @@ case "$OS" in check_mongocryptd chmod +x src/libmongoc/Debug/test-libmongoc.exe - ./src/libmongoc/Debug/test-libmongoc.exe $TEST_ARGS + ./src/libmongoc/Debug/test-libmongoc.exe $TEST_ARGS -d ;; *) @@ -104,9 +104,9 @@ case "$OS" in if [ "$VALGRIND" = "on" ]; then . 
$DIR/valgrind.sh - run_valgrind ./src/libmongoc/test-libmongoc --no-fork $TEST_ARGS + run_valgrind ./src/libmongoc/test-libmongoc --no-fork $TEST_ARGS -d else - ./src/libmongoc/test-libmongoc --no-fork $TEST_ARGS + ./src/libmongoc/test-libmongoc --no-fork $TEST_ARGS -d fi ;; diff --git a/src/libmongoc/CMakeLists.txt b/src/libmongoc/CMakeLists.txt index e9de3388df7..ec7b48e049f 100644 --- a/src/libmongoc/CMakeLists.txt +++ b/src/libmongoc/CMakeLists.txt @@ -541,6 +541,7 @@ set (SOURCES ${SOURCES} ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-stream-gridfs-upload.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-stream-socket.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-topology.c + ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-topology-background-monitoring.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-topology-description.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-topology-description-apm.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-topology-scanner.c @@ -866,6 +867,7 @@ set (test-libmongoc-sources ${PROJECT_SOURCE_DIR}/tests/test-mongoc-array.c ${PROJECT_SOURCE_DIR}/tests/test-mongoc-async.c ${PROJECT_SOURCE_DIR}/tests/test-mongoc-aws.c + ${PROJECT_SOURCE_DIR}/tests/test-mongoc-background-monitoring.c ${PROJECT_SOURCE_DIR}/tests/test-mongoc-buffer.c ${PROJECT_SOURCE_DIR}/tests/test-mongoc-bulk.c ${PROJECT_SOURCE_DIR}/tests/test-mongoc-change-stream.c diff --git a/src/libmongoc/src/mongoc/CMakeLists.txt b/src/libmongoc/src/mongoc/CMakeLists.txt index 2d3bd4451f8..0acf4d28166 100644 --- a/src/libmongoc/src/mongoc/CMakeLists.txt +++ b/src/libmongoc/src/mongoc/CMakeLists.txt @@ -155,6 +155,7 @@ set (src_libmongoc_src_mongoc_DIST_noinst_hs mongoc-topology-description-apm-private.h mongoc-topology-description-private.h mongoc-topology-private.h + mongoc-topology-background-monitoring-private.h mongoc-topology-scanner-private.h mongoc-trace-private.h mongoc-uri-private.h @@ -170,6 +171,7 @@ set (src_libmongoc_src_mongoc_DIST_cs mongoc-array.c mongoc-async.c mongoc-async-cmd.c + 
mongoc-topology-background-monitoring.c mongoc-buffer.c mongoc-bulk-operation.c mongoc-change-stream.c diff --git a/src/libmongoc/src/mongoc/mongoc-client-pool.c b/src/libmongoc/src/mongoc/mongoc-client-pool.c index c7473fc5d6c..9f365aa9406 100644 --- a/src/libmongoc/src/mongoc/mongoc-client-pool.c +++ b/src/libmongoc/src/mongoc/mongoc-client-pool.c @@ -25,6 +25,7 @@ #include "mongoc-queue-private.h" #include "mongoc-thread-private.h" #include "mongoc-topology-private.h" +#include "mongoc-topology-background-monitoring-private.h" #include "mongoc-trace-private.h" #ifdef MONGOC_ENABLE_SSL @@ -218,9 +219,10 @@ mongoc_client_pool_destroy (mongoc_client_pool_t *pool) static void _start_scanner_if_needed (mongoc_client_pool_t *pool) { - if (!_mongoc_topology_start_background_scanner (pool->topology)) { - MONGOC_ERROR ("Background scanner did not start!"); - abort (); + if (!pool->topology->single_threaded) { + bson_mutex_lock (&pool->topology->mutex); + _mongoc_topology_background_monitoring_start (pool->topology); + bson_mutex_unlock (&pool->topology->mutex); } } diff --git a/src/libmongoc/src/mongoc/mongoc-client-private.h b/src/libmongoc/src/mongoc/mongoc-client-private.h index 5ab10f8b8af..b77ca610f48 100644 --- a/src/libmongoc/src/mongoc/mongoc-client-private.h +++ b/src/libmongoc/src/mongoc/mongoc-client-private.h @@ -233,6 +233,14 @@ mongoc_stream_t * mongoc_client_connect_tcp (int32_t connecttimeoutms, const mongoc_host_list_t *host, bson_error_t *error); + +mongoc_stream_t * +mongoc_client_connect (bool buffered, + bool use_ssl, + void *ssl_opts_void, + const mongoc_uri_t *uri, + const mongoc_host_list_t *host, + bson_error_t *error); BSON_END_DECLS #endif /* MONGOC_CLIENT_PRIVATE_H */ diff --git a/src/libmongoc/src/mongoc/mongoc-client.c b/src/libmongoc/src/mongoc/mongoc-client.c index 999fc53aa2c..02da5ddd45c 100644 --- a/src/libmongoc/src/mongoc/mongoc-client.c +++ b/src/libmongoc/src/mongoc/mongoc-client.c @@ -560,7 +560,7 @@ _mongoc_get_rr_search (const 
char *service, * * Fetch an SRV or TXT resource record and update put results in * @rr_data. - * + * * See RFCs 1464 and 2782, MongoDB's "Initial DNS Seedlist Discovery" * spec, and MongoDB's "Polling SRV Records for Mongos Discovery" * spec. @@ -644,10 +644,12 @@ mongoc_client_connect_tcp (int32_t connecttimeoutms, hints.ai_flags = 0; hints.ai_protocol = 0; + TRACE ("DNS lookup for %s", host->host); s = getaddrinfo (host->host, portstr, &hints, &result); if (s != 0) { mongoc_counter_dns_failure_inc (); + TRACE ("Failed to resolve %s", host->host); bson_set_error (error, MONGOC_ERROR_STREAM, MONGOC_ERROR_STREAM_NAME_RESOLUTION, @@ -764,49 +766,26 @@ mongoc_client_connect_unix (const mongoc_host_list_t *host, bson_error_t *error) #endif } - -/* - *-------------------------------------------------------------------------- - * - * mongoc_client_default_stream_initiator -- - * - * A mongoc_stream_initiator_t that will handle the various type - * of supported sockets by MongoDB including TCP and UNIX. - * - * Language binding authors may want to implement an alternate - * version of this method to use their native stream format. - * - * Returns: - * A mongoc_stream_t if successful; otherwise NULL and @error is set. - * - * Side effects: - * @error is set if return value is NULL. 
- * - *-------------------------------------------------------------------------- - */ - mongoc_stream_t * -mongoc_client_default_stream_initiator (const mongoc_uri_t *uri, - const mongoc_host_list_t *host, - void *user_data, - bson_error_t *error) +mongoc_client_connect (bool buffered, + bool use_ssl, + void *ssl_opts_void, + const mongoc_uri_t *uri, + const mongoc_host_list_t *host, + bson_error_t *error) { mongoc_stream_t *base_stream = NULL; int32_t connecttimeoutms; -#ifdef MONGOC_ENABLE_SSL - mongoc_client_t *client = (mongoc_client_t *) user_data; - const char *mechanism; -#endif BSON_ASSERT (uri); BSON_ASSERT (host); #ifndef MONGOC_ENABLE_SSL - if (mongoc_uri_get_tls (uri)) { + if (ssl_opts_void || mongoc_uri_get_tls (uri)) { bson_set_error (error, MONGOC_ERROR_CLIENT, MONGOC_ERROR_CLIENT_NO_ACCEPTABLE_PEER, - "SSL is not enabled in this build of mongo-c-driver."); + "TLS is not enabled in this build of mongo-c-driver."); return NULL; } #endif @@ -836,14 +815,17 @@ mongoc_client_default_stream_initiator (const mongoc_uri_t *uri, #ifdef MONGOC_ENABLE_SSL if (base_stream) { + mongoc_ssl_opt_t *ssl_opts; + const char *mechanism; + + ssl_opts = (mongoc_ssl_opt_t *) ssl_opts_void; mechanism = mongoc_uri_get_auth_mechanism (uri); - if (client->use_ssl || - (mechanism && (0 == strcmp (mechanism, "MONGODB-X509")))) { + if (use_ssl || (mechanism && (0 == strcmp (mechanism, "MONGODB-X509")))) { mongoc_stream_t *original = base_stream; base_stream = mongoc_stream_tls_new_with_hostname ( - base_stream, host->host, &client->ssl_opts, true); + base_stream, host->host, ssl_opts, true); if (!base_stream) { mongoc_stream_destroy (original); @@ -863,9 +845,54 @@ mongoc_client_default_stream_initiator (const mongoc_uri_t *uri, } #endif - return base_stream ? 
mongoc_stream_buffered_new (base_stream, 1024) : NULL; + if (!base_stream) { + return NULL; + } + if (buffered) { + return mongoc_stream_buffered_new (base_stream, 1024); + } + return base_stream; } +/* + *-------------------------------------------------------------------------- + * + * mongoc_client_default_stream_initiator -- + * + * A mongoc_stream_initiator_t that will handle the various type + * of supported sockets by MongoDB including TCP and UNIX. + * + * Language binding authors may want to implement an alternate + * version of this method to use their native stream format. + * + * Returns: + * A mongoc_stream_t if successful; otherwise NULL and @error is set. + * + * Side effects: + * @error is set if return value is NULL. + * + *-------------------------------------------------------------------------- + */ + +mongoc_stream_t * +mongoc_client_default_stream_initiator (const mongoc_uri_t *uri, + const mongoc_host_list_t *host, + void *user_data, + bson_error_t *error) +{ + void *ssl_opts_void = NULL; + bool use_ssl = false; +#ifdef MONGOC_ENABLE_SSL + mongoc_client_t *client = (mongoc_client_t *) user_data; + + use_ssl = client->use_ssl; + ssl_opts_void = (void *) &client->ssl_opts; + +#endif + + return mongoc_client_connect ( + true, use_ssl, ssl_opts_void, uri, host, error); +} /* *-------------------------------------------------------------------------- diff --git a/src/libmongoc/src/mongoc/mongoc-topology-background-monitoring-private.h b/src/libmongoc/src/mongoc/mongoc-topology-background-monitoring-private.h new file mode 100644 index 00000000000..86b612cb009 --- /dev/null +++ b/src/libmongoc/src/mongoc/mongoc-topology-background-monitoring-private.h @@ -0,0 +1,43 @@ + +/* + * Copyright 2020-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "mongoc-prelude.h" + +#ifndef MONGOC_TOPOLOGY_BACKGROUND_MONITORING_PRIVATE_H +#define MONGOC_TOPOLOGY_BACKGROUND_MONITORING_PRIVATE_H + +/* Methods of mongoc_topology_t for managing background monitoring. */ + +struct _mongoc_topology_t; + +void +_mongoc_topology_background_monitoring_start ( + struct _mongoc_topology_t *topology); + +void +_mongoc_topology_background_monitoring_reconcile ( + struct _mongoc_topology_t *topology); + +void +_mongoc_topology_background_monitoring_request_scan ( + struct _mongoc_topology_t *topology); + +void +_mongoc_topology_background_monitoring_stop ( + struct _mongoc_topology_t *topology); + +#endif /* MONGOC_TOPOLOGY_BACKGROUND_MONITORING_PRIVATE_H */ diff --git a/src/libmongoc/src/mongoc/mongoc-topology-background-monitoring.c b/src/libmongoc/src/mongoc/mongoc-topology-background-monitoring.c new file mode 100644 index 00000000000..605ed764312 --- /dev/null +++ b/src/libmongoc/src/mongoc/mongoc-topology-background-monitoring.c @@ -0,0 +1,845 @@ +/* + * Copyright 2020-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "mongoc-topology-background-monitoring-private.h" + +#include "mongoc-client-private.h" +#include "mongoc-log-private.h" +#ifdef MONGOC_ENABLE_SSL +#include "mongoc-ssl-private.h" +#endif +#include "mongoc-stream-private.h" +#include "mongoc-topology-description-apm-private.h" +#include "mongoc-topology-private.h" +#include "mongoc-trace-private.h" +#include "mongoc-util-private.h" + +#undef MONGOC_LOG_DOMAIN +#define MONGOC_LOG_DOMAIN "bg_monitor" + +static BSON_THREAD_FUN (srv_polling_run, topology_void) +{ + mongoc_topology_t *topology; + + topology = topology_void; + bson_mutex_lock (&topology->mutex); + while (true) { + int64_t now_ms; + int64_t scan_due_ms; + int64_t sleep_duration_ms; + + if (topology->scanner_state != MONGOC_TOPOLOGY_SCANNER_BG_RUNNING) { + bson_mutex_unlock (&topology->mutex); + break; + } + + /* This will check if a scan is due. */ + mongoc_topology_rescan_srv (topology); + + /* Unlock and sleep until next scan is due, or until shutdown signalled. + */ + now_ms = bson_get_monotonic_time () / 1000; + scan_due_ms = topology->srv_polling_last_scan_ms + + topology->srv_polling_rescan_interval_ms; + sleep_duration_ms = scan_due_ms - now_ms; + + if (sleep_duration_ms > 0) { + TRACE ("srv polling thread sleeping for %" PRId64 "ms", + sleep_duration_ms); + } + + /* If shutting down, stop. */ + mongoc_cond_timedwait ( + &topology->srv_polling_cond, &topology->mutex, sleep_duration_ms); + } + BSON_THREAD_RETURN; +} + +typedef struct { + mongoc_topology_t *topology; + bson_thread_t thread; + + /* State accessed from multiple threads. */ + struct { + bson_mutex_t mutex; + mongoc_cond_t cond; + bool shutting_down; + bool is_shutdown; + bool scan_requested; + } shared; + + /* Time of last scan in milliseconds. */ + uint64_t last_scan_ms; + /* The time of the next scheduled scan. 
*/ + uint64_t scan_due_ms; + /* The server id matching the server description. */ + uint32_t server_id; + /* Default time to sleep between ismaster checks (reduced when a scan is + * requested) */ + uint64_t heartbeat_frequency_ms; + /* The minimum time to sleep between ismaster checks. */ + uint64_t min_heartbeat_frequency_ms; + int64_t connect_timeout_ms; + bool use_tls; +#ifdef MONGOC_ENABLE_SSL + mongoc_ssl_opt_t *ssl_opts; +#endif + mongoc_uri_t *uri; + mongoc_host_list_t host; + /* A custom initiator may be set if a user provides overrides to create a + * stream. */ + mongoc_stream_initiator_t initiator; + void *initiator_context; + mongoc_stream_t *stream; + int64_t request_id; + mongoc_apm_callbacks_t apm_callbacks; + void *apm_context; +} mongoc_server_monitor_t; + +/* Called only from server monitor thread. + * Caller must hold no locks (user's callback may lock topology mutex). + * Locks APM mutex. + */ +static void +_server_monitor_heartbeat_started (mongoc_server_monitor_t *server_monitor) +{ + mongoc_apm_server_heartbeat_started_t event; + if (!server_monitor->apm_callbacks.server_heartbeat_started) { + return; + } + + event.host = &server_monitor->host; + event.context = server_monitor->apm_context; + bson_mutex_lock (&server_monitor->topology->apm_mutex); + server_monitor->apm_callbacks.server_heartbeat_started (&event); + bson_mutex_unlock (&server_monitor->topology->apm_mutex); +} + +/* Called only from server monitor thread. + * Caller must hold no locks (user's callback may lock topology mutex). + * Locks APM mutex. 
+ */ +static void +_server_monitor_heartbeat_succeeded (mongoc_server_monitor_t *server_monitor, + const bson_t *reply, + int64_t duration_usec) +{ + mongoc_apm_server_heartbeat_succeeded_t event; + + if (!server_monitor->apm_callbacks.server_heartbeat_succeeded) { + return; + } + event.host = &server_monitor->host; + event.context = server_monitor->apm_context; + event.reply = reply; + event.duration_usec = duration_usec; + bson_mutex_lock (&server_monitor->topology->apm_mutex); + server_monitor->apm_callbacks.server_heartbeat_succeeded (&event); + bson_mutex_unlock (&server_monitor->topology->apm_mutex); +} + +/* Called only from server monitor thread. + * Caller must hold no locks (user's callback may lock topology mutex). + * Locks APM mutex. + */ +static void +_server_monitor_heartbeat_failed (mongoc_server_monitor_t *server_monitor, + const bson_error_t *error, + int64_t duration_usec) +{ + mongoc_apm_server_heartbeat_failed_t event; + + if (!server_monitor->apm_callbacks.server_heartbeat_failed) { + return; + } + + event.host = &server_monitor->host; + event.context = server_monitor->apm_context; + event.error = error; + event.duration_usec = duration_usec; + bson_mutex_lock (&server_monitor->topology->apm_mutex); + server_monitor->apm_callbacks.server_heartbeat_failed (&event); + bson_mutex_unlock (&server_monitor->topology->apm_mutex); +} + +static bool +_server_monitor_cmd_send (mongoc_server_monitor_t *server_monitor, + bson_t *cmd, + bson_t *reply, + bson_error_t *error) +{ + mongoc_rpc_t rpc; + mongoc_array_t array_to_write; + mongoc_iovec_t *iovec; + int niovec; + mongoc_buffer_t buffer; + uint32_t reply_len; + bson_t temp_reply; + + rpc.header.msg_len = 0; + rpc.header.request_id = server_monitor->request_id++; + rpc.header.response_to = 0; + rpc.header.opcode = MONGOC_OPCODE_QUERY; + rpc.query.flags = MONGOC_QUERY_SLAVE_OK; + rpc.query.collection = "admin.$cmd"; + rpc.query.skip = 0; + rpc.query.n_return = -1; + rpc.query.query = bson_get_data 
(cmd); + rpc.query.fields = NULL; + + _mongoc_array_init (&array_to_write, sizeof (mongoc_iovec_t)); + _mongoc_rpc_gather (&rpc, &array_to_write); + iovec = (mongoc_iovec_t *) array_to_write.data; + niovec = array_to_write.len; + _mongoc_rpc_swab_to_le (&rpc); + + if (!_mongoc_stream_writev_full (server_monitor->stream, + iovec, + niovec, + server_monitor->connect_timeout_ms, + error)) { + _mongoc_array_destroy (&array_to_write); + bson_init (reply); + return false; + } + _mongoc_array_destroy (&array_to_write); + + _mongoc_buffer_init (&buffer, NULL, 0, NULL, NULL); + if (!_mongoc_buffer_append_from_stream (&buffer, + server_monitor->stream, + 4, + server_monitor->connect_timeout_ms, + error)) { + _mongoc_buffer_destroy (&buffer); + bson_init (reply); + return false; + } + + memcpy (&reply_len, buffer.data, 4); + reply_len = BSON_UINT32_FROM_LE (reply_len); + + if (!_mongoc_buffer_append_from_stream (&buffer, + server_monitor->stream, + reply_len - buffer.len, + server_monitor->connect_timeout_ms, + error)) { + _mongoc_buffer_destroy (&buffer); + bson_init (reply); + return false; + } + + if (!_mongoc_rpc_scatter (&rpc, buffer.data, buffer.len)) { + bson_set_error (error, + MONGOC_ERROR_PROTOCOL, + MONGOC_ERROR_PROTOCOL_INVALID_REPLY, + "Invalid reply from server."); + + _mongoc_buffer_destroy (&buffer); + bson_init (reply); + return false; + } + + if (BSON_UINT32_FROM_LE (rpc.header.opcode) == MONGOC_OPCODE_COMPRESSED) { + uint8_t *buf = NULL; + size_t len = BSON_UINT32_FROM_LE (rpc.compressed.uncompressed_size) + + sizeof (mongoc_rpc_header_t); + + buf = bson_malloc0 (len); + if (!_mongoc_rpc_decompress (&rpc, buf, len)) { + bson_free (buf); + _mongoc_buffer_destroy (&buffer); + bson_init (reply); + bson_set_error (error, + MONGOC_ERROR_PROTOCOL, + MONGOC_ERROR_PROTOCOL_INVALID_REPLY, + "Could not decompress server reply"); + return MONGOC_ASYNC_CMD_ERROR; + } + + _mongoc_buffer_destroy (&buffer); + _mongoc_buffer_init (&buffer, buf, len, NULL, NULL); + } + + 
_mongoc_rpc_swab_from_le (&rpc); + + if (!_mongoc_rpc_get_first_document (&rpc, &temp_reply)) { + bson_set_error (error, + MONGOC_ERROR_PROTOCOL, + MONGOC_ERROR_PROTOCOL_INVALID_REPLY, + "Invalid reply from server"); + _mongoc_buffer_destroy (&buffer); + bson_init (reply); + return false; + } + bson_copy_to (&temp_reply, reply); + _mongoc_buffer_destroy (&buffer); + return true; +} + +/* Update the topology description with a reply or an error. + * + * Called only from server monitor thread. + * Caller must hold no locks. + * Locks topology mutex. + */ +static void +_server_monitor_update_topology_description ( + mongoc_server_monitor_t *server_monitor, + bson_t *reply, + uint64_t rtt_us, + bson_error_t *error) +{ + mongoc_topology_t *topology; + + topology = server_monitor->topology; + bson_mutex_lock (&topology->mutex); + if (topology->scanner_state != MONGOC_TOPOLOGY_SCANNER_SHUTTING_DOWN) { + mongoc_topology_description_handle_ismaster ( + &server_monitor->topology->description, + server_monitor->server_id, + reply, + rtt_us / 1000, + error); + /* reconcile server monitors. */ + _mongoc_topology_background_monitoring_reconcile (topology); + } + /* Wake threads performing server selection. */ + mongoc_cond_broadcast (&server_monitor->topology->cond_client); + bson_mutex_unlock (&server_monitor->topology->mutex); +} + +/* Send an ismaster command to a server. + * + * Called only from server monitor thread. + * Caller must hold no locks. + * Locks server_monitor->mutex to reset scan_requested. + * Locks topology mutex when updating topology description with new ismaster + * reply (or error). 
+ */ +static void +_server_monitor_regular_ismaster (mongoc_server_monitor_t *server_monitor) +{ + bson_t cmd; + bson_t reply; + bool ret; + bson_error_t error = {0}; + int64_t rtt_us; + int64_t start_us; + int attempt; + bool stream_established = false; + mongoc_topology_t *topology; + + topology = server_monitor->topology; + bson_init (&cmd); + bson_init (&reply); + rtt_us = 0; + attempt = 0; + while (true) { + /* If the first attempt to send an ismaster failed, see if we can attempt + * again. + * Server monitoring spec: Once the server is connected, the client MUST + * change its type to Unknown only after it has retried the server once. + */ + if (attempt > 0) { + mongoc_server_description_t *existing_sd; + bool should_retry = false; + + if (attempt > 1 || !stream_established) { + /* We've already retried, or we were not able to establish a + * connection at all. */ + should_retry = false; + } else { + bson_mutex_lock (&topology->mutex); + /* If the server description is already Unknown, don't retry. */ + if (topology->scanner_state != + MONGOC_TOPOLOGY_SCANNER_SHUTTING_DOWN) { + existing_sd = mongoc_topology_description_server_by_id ( + &topology->description, server_monitor->server_id, NULL); + if (existing_sd != NULL && + existing_sd->type != MONGOC_SERVER_UNKNOWN) { + should_retry = true; + } + } + + bson_mutex_unlock (&server_monitor->topology->mutex); + } + + if (!should_retry) { + TRACE ("sm (%d): cannot retry", server_monitor->server_id); + /* error was previously set. */ + _server_monitor_update_topology_description ( + server_monitor, NULL, -1, &error); + break; + } else { + TRACE ("sm (%d): going to retry", server_monitor->server_id); + } + } + attempt++; + + bson_reinit (&cmd); + BCON_APPEND (&cmd, "isMaster", BCON_INT32 (1)); + + if (!server_monitor->stream) { + /* Using an initiator isn't really necessary. Users can't set them on + * pools. But it is used for tests. 
*/ + if (server_monitor->initiator) { + server_monitor->stream = + server_monitor->initiator (server_monitor->uri, + &server_monitor->host, + server_monitor->initiator_context, + &error); + } else { + void *ssl_opts_void = NULL; + +#ifdef MONGOC_ENABLE_SSL + ssl_opts_void = server_monitor->ssl_opts; +#endif + server_monitor->stream = + mongoc_client_connect (false, + ssl_opts_void != NULL, + ssl_opts_void, + server_monitor->uri, + &server_monitor->host, + &error); + } + if (!server_monitor->stream) { + TRACE ("sm (%d) failed to connect", server_monitor->server_id); + _server_monitor_heartbeat_failed (server_monitor, &error, rtt_us); + continue; + } + + bson_destroy (&cmd); + bson_copy_to (_mongoc_topology_get_ismaster (server_monitor->topology), + &cmd); + } + + /* A stream was already established, or we created one successfully. (This + * permits a retry). */ + stream_established = true; + + /* Cluster time is updated on every reply. Don't wait for notifications, + * just poll it. */ + bson_mutex_lock (&server_monitor->topology->mutex); + if (!bson_empty (&server_monitor->topology->description.cluster_time)) { + bson_append_document ( + &cmd, + "$clusterTime", + 12, + &server_monitor->topology->description.cluster_time); + } + bson_mutex_unlock (&server_monitor->topology->mutex); + + start_us = bson_get_monotonic_time (); + bson_destroy (&reply); + _server_monitor_heartbeat_started (server_monitor); + ret = _server_monitor_cmd_send (server_monitor, &cmd, &reply, &error); + if (!ret) { + TRACE ("sm (%d) error = %s", server_monitor->server_id, error.message); + } + /* Must mark scan_requested as "delivered" before updating the topology + * description, not after. Otherwise, we could miss a scan request in + * server selection. We need to uphold the invariant: if a scan is + * requested, cond_client will be signalled. 
+ */ + bson_mutex_lock (&server_monitor->shared.mutex); + server_monitor->shared.scan_requested = false; + bson_mutex_unlock (&server_monitor->shared.mutex); + rtt_us = (bson_get_monotonic_time () - start_us); + + if (ret) { + _server_monitor_update_topology_description ( + server_monitor, &reply, rtt_us, &error); + _server_monitor_heartbeat_succeeded (server_monitor, &reply, rtt_us); + break; + } else { + mongoc_stream_destroy (server_monitor->stream); + server_monitor->stream = NULL; + _server_monitor_heartbeat_failed (server_monitor, &error, rtt_us); + continue; + } + } + + bson_destroy (&cmd); + bson_destroy (&reply); +} + +/* The server monitor thread. + * + * Runs continuously and sends ismaster commands. Sleeps until it is time to + * scan or woken by a change in shared state: + * - a request for immediate scan + * - a request for shutdown + * Locks the server mutex to check shared state. + * Locks topology mutex to update description. + */ +static BSON_THREAD_FUN (_server_monitor_run, server_monitor_void) +{ + mongoc_server_monitor_t *server_monitor; + + server_monitor = (mongoc_server_monitor_t *) server_monitor_void; + + while (true) { + int64_t now_ms; + int64_t sleep_duration_ms; + + now_ms = bson_get_monotonic_time () / 1000; + if (now_ms >= server_monitor->scan_due_ms) { + TRACE ("sm (%d) sending ismaster", server_monitor->server_id); + _server_monitor_regular_ismaster (server_monitor); + server_monitor->last_scan_ms = bson_get_monotonic_time () / 1000; + server_monitor->scan_due_ms = server_monitor->last_scan_ms + + server_monitor->heartbeat_frequency_ms; + } + + bson_mutex_lock (&server_monitor->shared.mutex); + if (server_monitor->shared.shutting_down) { + server_monitor->shared.is_shutdown = true; + bson_mutex_unlock (&server_monitor->shared.mutex); + break; + } + + if (server_monitor->shared.scan_requested) { + server_monitor->scan_due_ms = + server_monitor->last_scan_ms + + server_monitor->min_heartbeat_frequency_ms; + } + + sleep_duration_ms = 
server_monitor->scan_due_ms - now_ms; + + if (sleep_duration_ms > 0) { + TRACE ("sm (%d) sleeping for %" PRId64, + server_monitor->server_id, + sleep_duration_ms); + mongoc_cond_timedwait (&server_monitor->shared.cond, + &server_monitor->shared.mutex, + sleep_duration_ms); + } + bson_mutex_unlock (&server_monitor->shared.mutex); + } + BSON_THREAD_RETURN; +} + +/* Free data for a server monitor. + * + * Called from any thread during reconcile and during the shutdown procedure. + * Caller must have topology mutex locked, but not the server monitor mutex. + */ +static void +_server_monitor_destroy (mongoc_server_monitor_t *server_monitor) +{ + mongoc_stream_destroy (server_monitor->stream); + mongoc_uri_destroy (server_monitor->uri); + mongoc_cond_destroy (&server_monitor->shared.cond); + bson_mutex_destroy (&server_monitor->shared.mutex); +#ifdef MONGOC_ENABLE_SSL + if (server_monitor->ssl_opts) { + _mongoc_ssl_opts_cleanup (server_monitor->ssl_opts, true); + bson_free (server_monitor->ssl_opts); + } +#endif + bson_free (server_monitor); +} + +/* Signal a server monitor to shutdown. If shutdown has already completed, frees + * server monitor and returns true. + * + * Called only during topology description reconcile (from any thread). + * Caller must hold topology lock. + */ +static bool +_server_monitor_try_shutdown_and_destroy ( + mongoc_server_monitor_t *server_monitor) +{ + bool is_shutdown; + + bson_mutex_lock (&server_monitor->shared.mutex); + is_shutdown = server_monitor->shared.is_shutdown; + server_monitor->shared.shutting_down = true; + mongoc_cond_signal (&server_monitor->shared.cond); + bson_mutex_unlock (&server_monitor->shared.mutex); + + /* If the server monitor thread has exited, join so the internal thread state + * can be freed. Joining can only be done once the server monitor has + * completed shutdown. Otherwise, the server monitor may be in the middle of + * scanning (and therefore may need to take the topology mutex again). 
+ * + * Since the topology mutex is locked, we're guaranteed that only one thread + * will join. + */ + if (is_shutdown) { + TRACE ("sm (%d) try join start", server_monitor->server_id); + COMMON_PREFIX (thread_join) (server_monitor->thread); + TRACE ("sm (%d) try join end", server_monitor->server_id); + _server_monitor_destroy (server_monitor); + return true; + } + return false; /* Still waiting for shutdown. */ +} + +/* Request scan of a single server. + * + * Caller does not need to have topology mutex locked. + * Locks server_monitor mutex to deliver scan_requested. + */ +static void +_server_monitor_request_scan (mongoc_server_monitor_t *server_monitor) +{ + bson_mutex_lock (&server_monitor->shared.mutex); + server_monitor->shared.scan_requested = true; + mongoc_cond_signal (&server_monitor->shared.cond); + bson_mutex_unlock (&server_monitor->shared.mutex); +} + +/* Create a server monitor if necessary. + * + * Called by monitor threads and application threads when reconciling the + * topology description. Caller must have topology mutex locked. + */ +static void +_background_monitor_reconcile_server_monitor (mongoc_topology_t *topology, + mongoc_server_description_t *sd) +{ + mongoc_set_t *server_monitors; + mongoc_server_monitor_t *server_monitor; + + server_monitors = topology->server_monitors; + server_monitor = mongoc_set_get (server_monitors, sd->id); + + if (server_monitor) { + return; + } + + /* Add a new server monitor. 
*/ + server_monitor = bson_malloc0 (sizeof (*server_monitor)); + server_monitor->server_id = sd->id; + memcpy (&server_monitor->host, &sd->host, sizeof (mongoc_host_list_t)); + server_monitor->topology = topology; + server_monitor->heartbeat_frequency_ms = + topology->description.heartbeat_msec; + server_monitor->min_heartbeat_frequency_ms = + topology->min_heartbeat_frequency_msec; + server_monitor->connect_timeout_ms = topology->connect_timeout_msec; + server_monitor->uri = mongoc_uri_copy (topology->uri); +/* TODO: CDRIVER-3682 Do not retrieve ssl opts from topology scanner. They + * should be stored somewhere else. */ +#ifdef MONGOC_ENABLE_SSL + if (topology->scanner->ssl_opts) { + server_monitor->ssl_opts = bson_malloc0 (sizeof (mongoc_ssl_opt_t)); + + _mongoc_ssl_opts_copy_to ( + topology->scanner->ssl_opts, server_monitor->ssl_opts, true); + } +#endif + memcpy (&server_monitor->apm_callbacks, + &topology->description.apm_callbacks, + sizeof (mongoc_apm_callbacks_t)); + server_monitor->apm_context = topology->description.apm_context; + server_monitor->initiator = topology->scanner->initiator; + server_monitor->initiator_context = topology->scanner->initiator_context; + mongoc_cond_init (&server_monitor->shared.cond); + bson_mutex_init (&server_monitor->shared.mutex); + COMMON_PREFIX (thread_create) + (&server_monitor->thread, _server_monitor_run, server_monitor); + mongoc_set_add (server_monitors, server_monitor->server_id, server_monitor); +} + +/* Start background monitoring. + * + * Called by an application thread popping a client from a pool. Safe to + * call repeatedly. + * Caller must have topology mutex locked. 
+ */ +void +_mongoc_topology_background_monitoring_start (mongoc_topology_t *topology) +{ + BSON_ASSERT (!topology->single_threaded); + + if (topology->scanner_state == MONGOC_TOPOLOGY_SCANNER_BG_RUNNING) { + return; + } + + BSON_ASSERT (topology->scanner_state == MONGOC_TOPOLOGY_SCANNER_OFF); + + topology->scanner_state = MONGOC_TOPOLOGY_SCANNER_BG_RUNNING; + + _mongoc_handshake_freeze (); + _mongoc_topology_description_monitor_opening (&topology->description); + + /* Reconcile to create the first server monitors. */ + _mongoc_topology_background_monitoring_reconcile (topology); + /* Start SRV polling thread. */ + if (mongoc_uri_get_service (topology->uri)) { + COMMON_PREFIX (thread_create) + (&topology->srv_polling_thread, srv_polling_run, topology); + } +} + +/* Reconcile the topology description with the set of server monitors. + * + * Called when the topology description is updated (via handshake, monitoring, + * or invalidation). May be called by server monitor thread or an application + * thread. + * Caller must have topology mutex locked. + * Locks server monitor mutexes. May join / remove server monitors that have + * completed shutdown. + */ +void +_mongoc_topology_background_monitoring_reconcile (mongoc_topology_t *topology) +{ + mongoc_topology_description_t *td; + mongoc_set_t *server_descriptions; + mongoc_set_t *server_monitors; + uint32_t *server_monitor_ids_to_remove; + uint32_t n_server_monitor_ids_to_remove = 0; + int i; + + td = &topology->description; + server_descriptions = td->servers; + server_monitors = topology->server_monitors; + + BSON_ASSERT (!topology->single_threaded); + + if (topology->scanner_state != MONGOC_TOPOLOGY_SCANNER_BG_RUNNING) { + return; + } + + /* Add newly discovered server monitors, and update existing ones. 
*/ + for (i = 0; i < server_descriptions->items_len; i++) { + mongoc_server_description_t *sd; + + sd = mongoc_set_get_item (server_descriptions, i); + _background_monitor_reconcile_server_monitor (topology, sd); + } + + /* Signal shutdown to server monitors no longer in the topology description. + */ + server_monitor_ids_to_remove = + bson_malloc0 (sizeof (uint32_t) * server_monitors->items_len); + for (i = 0; i < server_monitors->items_len; i++) { + mongoc_server_monitor_t *server_monitor; + uint32_t id; + + server_monitor = mongoc_set_get_item_and_id (server_monitors, i, &id); + if (!mongoc_set_get (server_descriptions, id)) { + if (_server_monitor_try_shutdown_and_destroy (server_monitor)) { + server_monitor_ids_to_remove[n_server_monitor_ids_to_remove] = id; + n_server_monitor_ids_to_remove++; + } + } + } + + /* Remove freed server monitors that have completed shutdown. */ + for (i = 0; i < n_server_monitor_ids_to_remove; i++) { + mongoc_set_rm (server_monitors, server_monitor_ids_to_remove[i]); + } + bson_free (server_monitor_ids_to_remove); +} + +/* Request all server monitors to scan. + * + * Called from application threads (during server selection or "not master" + * errors). Caller must have topology mutex locked. Locks server monitor mutexes + * to deliver scan_requested. + */ +void +_mongoc_topology_background_monitoring_request_scan ( + mongoc_topology_t *topology) +{ + mongoc_set_t *server_monitors; + int i; + + BSON_ASSERT (!topology->single_threaded); + + if (topology->scanner_state == MONGOC_TOPOLOGY_SCANNER_SHUTTING_DOWN) { + return; + } + + server_monitors = topology->server_monitors; + + for (i = 0; i < server_monitors->items_len; i++) { + mongoc_server_monitor_t *server_monitor; + uint32_t id; + + server_monitor = mongoc_set_get_item_and_id (server_monitors, i, &id); + _server_monitor_request_scan (server_monitor); + } +} + +/* Stop, join, and destroy all server monitors. + * + * Called by application threads when destroying a client pool. 
+ * Caller must have topology mutex locked. + * Locks server monitor mutexes to deliver shutdown. Releases topology mutex to + * join server monitor threads. Leaves topology mutex locked on exit. This + * function is thread-safe. But in practice, it is only ever called by one + * application thread (because mongoc_client_pool_destroy is not thread-safe). + */ +void +_mongoc_topology_background_monitoring_stop (mongoc_topology_t *topology) +{ + mongoc_server_monitor_t *server_monitor; + int i; + bool is_srv_polling; + + BSON_ASSERT (!topology->single_threaded); + + if (topology->scanner_state != MONGOC_TOPOLOGY_SCANNER_BG_RUNNING) { + return; + } + + topology->scanner_state = MONGOC_TOPOLOGY_SCANNER_SHUTTING_DOWN; + + is_srv_polling = NULL != mongoc_uri_get_service (topology->uri); + /* Signal SRV polling to shut down (if it is started). */ + if (is_srv_polling) { + mongoc_cond_signal (&topology->srv_polling_cond); + } + + /* Signal all server monitors to shut down. */ + for (i = 0; i < topology->server_monitors->items_len; i++) { + server_monitor = mongoc_set_get_item (topology->server_monitors, i); + bson_mutex_lock (&server_monitor->shared.mutex); + server_monitor->shared.shutting_down = true; + mongoc_cond_signal (&server_monitor->shared.cond); + bson_mutex_unlock (&server_monitor->shared.mutex); + } + + /* Some mongoc_server_monitor_t may be waiting for topology mutex. Unlock so + * they can proceed to terminate. It is safe to unlock topology mutex. Since + * scanner_state has transitioned to shutting down, no thread can modify + * server_monitors. */ + bson_mutex_unlock (&topology->mutex); + + for (i = 0; i < topology->server_monitors->items_len; i++) { + /* Wait for the thread to shutdown. 
*/ + server_monitor = mongoc_set_get_item (topology->server_monitors, i); + TRACE ("sm (%d) joining thread", server_monitor->server_id); + COMMON_PREFIX (thread_join) (server_monitor->thread); + TRACE ("sm (%d) thread joined", server_monitor->server_id); + _server_monitor_destroy (server_monitor); + } + + /* Wait for SRV polling thread. */ + if (is_srv_polling) { + COMMON_PREFIX (thread_join) (topology->srv_polling_thread); + } + + bson_mutex_lock (&topology->mutex); + mongoc_set_destroy (topology->server_monitors); + topology->server_monitors = mongoc_set_new (1, NULL, NULL); + topology->scanner_state = MONGOC_TOPOLOGY_SCANNER_OFF; + mongoc_cond_broadcast (&topology->cond_client); +} diff --git a/src/libmongoc/src/mongoc/mongoc-topology-private.h b/src/libmongoc/src/mongoc/mongoc-topology-private.h index eb7d809d215..49130dc6a93 100644 --- a/src/libmongoc/src/mongoc/mongoc-topology-private.h +++ b/src/libmongoc/src/mongoc/mongoc-topology-private.h @@ -40,10 +40,10 @@ typedef enum { MONGOC_TOPOLOGY_SCANNER_OFF, MONGOC_TOPOLOGY_SCANNER_BG_RUNNING, - MONGOC_TOPOLOGY_SCANNER_SHUTTING_DOWN, - MONGOC_TOPOLOGY_SCANNER_SINGLE_THREADED, + MONGOC_TOPOLOGY_SCANNER_SHUTTING_DOWN } mongoc_topology_scanner_state_t; +struct _mongoc_background_monitor_t; struct _mongoc_client_pool_t; typedef struct _mongoc_topology_t { @@ -67,16 +67,16 @@ typedef struct _mongoc_topology_t { /* Minimum of SRV record TTLs, but no lower than 60 seconds. * May be zero for non-SRV/non-MongoS topology. */ - int64_t rescanSRVIntervalMS; - int64_t last_srv_scan; + int64_t srv_polling_rescan_interval_ms; + int64_t srv_polling_last_scan_ms; + /* For multi-threaded, srv polling occurs in a separate thread. 
*/ + bson_thread_t srv_polling_thread; + mongoc_cond_t srv_polling_cond; bson_mutex_t mutex; mongoc_cond_t cond_client; - mongoc_cond_t cond_server; - bson_thread_t thread; - mongoc_topology_scanner_state_t scanner_state; - bool scan_requested; + bool single_threaded; bool stale; @@ -98,6 +98,11 @@ typedef struct _mongoc_topology_t { char *mongocryptd_spawn_path; bson_t *mongocryptd_spawn_args; #endif + + /* For background monitoring. */ + mongoc_set_t *server_monitors; + bson_mutex_t apm_mutex; + } mongoc_topology_t; mongoc_topology_t * @@ -160,12 +165,6 @@ mongoc_topology_server_timestamp (mongoc_topology_t *topology, uint32_t id); mongoc_topology_description_type_t _mongoc_topology_get_type (mongoc_topology_t *topology); -bool -_mongoc_topology_start_background_scanner (mongoc_topology_t *topology); - -void -_mongoc_topology_background_thread_stop (mongoc_topology_t *topology); - bool _mongoc_topology_set_appname (mongoc_topology_t *topology, const char *appname); @@ -197,4 +196,7 @@ _mongoc_topology_request_scan (mongoc_topology_t *topology); void _mongoc_topology_bypass_cooldown (mongoc_topology_t *topology); + +void +mongoc_topology_rescan_srv (mongoc_topology_t *topology); #endif diff --git a/src/libmongoc/src/mongoc/mongoc-topology.c b/src/libmongoc/src/mongoc/mongoc-topology.c index e6410cf61bb..f92be7fb086 100644 --- a/src/libmongoc/src/mongoc/mongoc-topology.c +++ b/src/libmongoc/src/mongoc/mongoc-topology.c @@ -29,9 +29,13 @@ #include "mongoc-uri-private.h" #include "mongoc-util-private.h" #include "mongoc-trace-private.h" +#include "mongoc-topology-background-monitoring-private.h" #include "utlist.h" +static void +_topology_collect_errors (mongoc_topology_t *topology, bson_error_t *error_out); + static bool _mongoc_topology_reconcile_add_nodes (mongoc_server_description_t *sd, mongoc_topology_t *topology) @@ -49,6 +53,11 @@ _mongoc_topology_reconcile_add_nodes (mongoc_server_description_t *sd, return true; } +/* Called from: + * - the topology scanner 
callback (when an ismaster was just received) + * - at the start of a single-threaded scan (mongoc_topology_scan_once) + * Not called for multi threaded monitoring. + */ void mongoc_topology_reconcile (mongoc_topology_t *topology) { @@ -134,6 +143,7 @@ _mongoc_topology_scanner_setup_err_cb (uint32_t id, * command objects. * * NOTE: This method locks the given topology's mutex. + * Only called for single-threaded monitoring. * *------------------------------------------------------------------------- */ @@ -292,7 +302,6 @@ mongoc_topology_new (const mongoc_uri_t *uri, bool single_threaded) bson_mutex_init (&topology->mutex); mongoc_cond_init (&topology->cond_client); - mongoc_cond_init (&topology->cond_server); if (single_threaded) { /* single threaded clients negotiate sasl supported mechanisms during @@ -337,14 +346,15 @@ mongoc_topology_new (const mongoc_uri_t *uri, bool single_threaded) GOTO (srv_fail); } - topology->last_srv_scan = bson_get_monotonic_time (); - topology->rescanSRVIntervalMS = BSON_MAX ( + topology->srv_polling_last_scan_ms = bson_get_monotonic_time () / 1000; + topology->srv_polling_rescan_interval_ms = BSON_MAX ( rr_data.min_ttl * 1000, MONGOC_TOPOLOGY_MIN_RESCAN_SRV_INTERVAL_MS); topology_valid = true; srv_fail: bson_free (rr_data.txt_record_opts); bson_free (prefixed_service); + _mongoc_host_list_destroy_all (rr_data.hosts); } else { topology_valid = true; } @@ -392,6 +402,12 @@ mongoc_topology_new (const mongoc_uri_t *uri, bool single_threaded) topology->description.type = init_type; + if (!topology->single_threaded) { + topology->server_monitors = mongoc_set_new (1, NULL, NULL); + bson_mutex_init (&topology->apm_mutex); + mongoc_cond_init (&topology->srv_polling_cond); + } + if (!topology_valid) { /* add no nodes */ return topology; @@ -414,6 +430,8 @@ mongoc_topology_new (const mongoc_uri_t *uri, bool single_threaded) * * Set Application Performance Monitoring callbacks. * + * Caller must hold topology->mutex. 
+ * *------------------------------------------------------------------------- */ void @@ -472,7 +490,15 @@ mongoc_topology_destroy (mongoc_topology_t *topology) bson_free (topology->mongocryptd_spawn_path); #endif - _mongoc_topology_background_thread_stop (topology); + if (!topology->single_threaded) { + bson_mutex_lock (&topology->mutex); + _mongoc_topology_background_monitoring_stop (topology); + bson_mutex_unlock (&topology->mutex); + BSON_ASSERT (topology->scanner_state == MONGOC_TOPOLOGY_SCANNER_OFF); + mongoc_set_destroy (topology->server_monitors); + bson_mutex_destroy (&topology->apm_mutex); + mongoc_cond_destroy (&topology->srv_polling_cond); + } _mongoc_topology_description_monitor_closed (&topology->description); mongoc_uri_destroy (topology->uri); @@ -486,7 +512,6 @@ mongoc_topology_destroy (mongoc_topology_t *topology) _mongoc_topology_clear_session_pool (topology); mongoc_cond_destroy (&topology->cond_client); - mongoc_cond_destroy (&topology->cond_server); bson_mutex_destroy (&topology->mutex); bson_free (topology); @@ -535,7 +560,8 @@ mongoc_topology_apply_scanned_srv_hosts (mongoc_uri_t *uri, * If validation fails, log it. * If no valid hosts remain, do not update the topology description. 
*/ - LL_FOREACH (hosts, host) { + LL_FOREACH (hosts, host) + { if (mongoc_uri_validate_srv_result (uri, host->host, error)) { _mongoc_host_list_upsert (&valid_hosts, host); } else { @@ -571,13 +597,14 @@ mongoc_topology_apply_scanned_srv_hosts (mongoc_uri_t *uri, * * -------------------------------------------------------------------------- */ -static void +void mongoc_topology_rescan_srv (mongoc_topology_t *topology) { mongoc_rr_data_t rr_data = {0}; const char *service; char *prefixed_service = NULL; - int64_t scan_time; + int64_t scan_time_ms; + bool ret; if ((topology->description.type != MONGOC_TOPOLOGY_SHARDED) && (topology->description.type != MONGOC_TOPOLOGY_UNKNOWN)) { @@ -591,26 +618,35 @@ mongoc_topology_rescan_srv (mongoc_topology_t *topology) return; } - scan_time = topology->last_srv_scan + (topology->rescanSRVIntervalMS * 1000); - if (bson_get_monotonic_time () < scan_time) { - /* Query SRV no more frequently than rescanSRVIntervalMS. */ + scan_time_ms = topology->srv_polling_last_scan_ms + + topology->srv_polling_rescan_interval_ms; + if (bson_get_monotonic_time () / 1000 < scan_time_ms) { + /* Query SRV no more frequently than srv_polling_rescan_interval_ms. */ return; } + TRACE ("Polling for SRV records", NULL); + /* Go forth and query... */ prefixed_service = bson_strdup_printf ("_mongodb._tcp.%s", service); - if (!_mongoc_client_get_rr (prefixed_service, - MONGOC_RR_SRV, - &rr_data, - &topology->scanner->error)) { + + /* Unlock topology mutex during scan so it does not hold up other operations. + */ + bson_mutex_unlock (&topology->mutex); + ret = _mongoc_client_get_rr ( + prefixed_service, MONGOC_RR_SRV, &rr_data, &topology->scanner->error); + bson_mutex_lock (&topology->mutex); + + topology->srv_polling_last_scan_ms = bson_get_monotonic_time () / 1000; + if (!ret) { /* Failed querying, soldier on and try again next time. 
*/ - topology->rescanSRVIntervalMS = topology->description.heartbeat_msec; + topology->srv_polling_rescan_interval_ms = + topology->description.heartbeat_msec; MONGOC_ERROR ("SRV polling error: %s", topology->scanner->error.message); GOTO (done); } - topology->last_srv_scan = bson_get_monotonic_time (); - topology->rescanSRVIntervalMS = BSON_MAX ( + topology->srv_polling_rescan_interval_ms = BSON_MAX ( rr_data.min_ttl * 1000, MONGOC_TOPOLOGY_MIN_RESCAN_SRV_INTERVAL_MS); if (!mongoc_topology_apply_scanned_srv_hosts (topology->uri, @@ -622,11 +658,13 @@ mongoc_topology_rescan_srv (mongoc_topology_t *topology) * hosts exist. * Leave the toplogy alone and perform another scan at the next interval * rather than removing all records and having nothing to connect to. - * For no verified hosts drivers "MUST temporarily set rescanSRVIntervalMS + * For no verified hosts drivers "MUST temporarily set + * srv_polling_rescan_interval_ms * to heartbeatFrequencyMS until at least one verified SRV record is * obtained." */ - topology->rescanSRVIntervalMS = topology->description.heartbeat_msec; + topology->srv_polling_rescan_interval_ms = + topology->description.heartbeat_msec; GOTO (done); } @@ -647,6 +685,9 @@ mongoc_topology_rescan_srv (mongoc_topology_t *topology) * * NOTE: this method unlocks and re-locks @topology's mutex. * + * Only runs for single threaded monitoring. (obey_cooldown is always + * true). 
+ * *-------------------------------------------------------------------------- */ static void @@ -689,8 +730,6 @@ void _mongoc_topology_do_blocking_scan (mongoc_topology_t *topology, bson_error_t *error) { - topology->scanner_state = MONGOC_TOPOLOGY_SCANNER_SINGLE_THREADED; - _mongoc_handshake_freeze (); bson_mutex_lock (&topology->mutex); @@ -990,7 +1029,8 @@ mongoc_topology_select_server_id (mongoc_topology_t *topology, &topology->mutex, (expire_at - loop_start) / 1000); - mongoc_topology_scanner_get_error (ts, &scanner_error); + _topology_collect_errors (topology, &scanner_error); + bson_mutex_unlock (&topology->mutex); #ifdef _WIN32 @@ -1116,21 +1156,15 @@ _mongoc_topology_host_by_id (mongoc_topology_t *topology, } /* - *-------------------------------------------------------------------------- - * - * _mongoc_topology_request_scan -- - * - * Non-locking variant + + * Caller must have topology->mutex locked. * - *-------------------------------------------------------------------------- */ void _mongoc_topology_request_scan (mongoc_topology_t *topology) { - topology->scan_requested = true; - - mongoc_cond_signal (&topology->cond_server); + _mongoc_topology_background_monitoring_request_scan (topology); } /* @@ -1159,20 +1193,14 @@ mongoc_topology_invalidate_server (mongoc_topology_t *topology, } /* - *-------------------------------------------------------------------------- - * - * _mongoc_topology_update_from_handshake -- - * - * A client opens a new connection and calls ismaster on it when it - * detects a closed connection in _mongoc_cluster_check_interval, or if - * mongoc_client_pool_pop creates a new client. Update the topology - * description from the ismaster response. - * - * NOTE: this method uses @topology's mutex. 
- * - * Returns: - * false if the server was removed from the topology - *-------------------------------------------------------------------------- + * Update the topology from the response to a handshake on a new application + * connection. + * Only applicable to a client pool (single-threaded clients reuse monitoring + * connections). + * Caller must not have the topology->mutex locked. + * Locks topology->mutex. + * Called only from app threads (not server monitor threads). + * Returns false if the server was removed from the topology */ bool _mongoc_topology_update_from_handshake (mongoc_topology_t *topology, @@ -1182,6 +1210,7 @@ _mongoc_topology_update_from_handshake (mongoc_topology_t *topology, BSON_ASSERT (topology); BSON_ASSERT (sd); + BSON_ASSERT (!topology->single_threaded); bson_mutex_lock (&topology->mutex); @@ -1191,6 +1220,8 @@ _mongoc_topology_update_from_handshake (mongoc_topology_t *topology, /* if pooled, wake threads waiting in mongoc_topology_server_by_id */ mongoc_cond_broadcast (&topology->cond_client); + /* Update background monitoring. */ + _mongoc_topology_background_monitoring_reconcile (topology); bson_mutex_unlock (&topology->mutex); return has_server; @@ -1285,211 +1316,6 @@ _mongoc_topology_get_type (mongoc_topology_t *topology) return td_type; } -/* - *-------------------------------------------------------------------------- - * - * _mongoc_topology_run_background -- - * - * The background topology monitoring thread runs in this loop. - * - * NOTE: this method uses @topology's mutex. 
- * - *-------------------------------------------------------------------------- - */ -static BSON_THREAD_FUN (_mongoc_topology_run_background, data) -{ - mongoc_topology_t *topology; - int64_t now; - int64_t last_scan; - int64_t timeout; - int64_t force_timeout; - int64_t heartbeat_msec; - int r; - - BSON_ASSERT (data); - - last_scan = 0; - topology = (mongoc_topology_t *) data; - heartbeat_msec = topology->description.heartbeat_msec; - - /* we exit this loop when shutting down, or on error */ - for (;;) { - /* unlocked after starting a scan or after breaking out of the loop */ - bson_mutex_lock (&topology->mutex); - if (!mongoc_topology_scanner_valid (topology->scanner)) { - bson_mutex_unlock (&topology->mutex); - goto DONE; - } - - /* we exit this loop on error, or when we should scan immediately */ - for (;;) { - if (topology->scanner_state == MONGOC_TOPOLOGY_SCANNER_SHUTTING_DOWN) { - bson_mutex_unlock (&topology->mutex); - goto DONE; - } - - now = bson_get_monotonic_time (); - - if (last_scan == 0) { - /* set up the "last scan" as exactly long enough to force an - * immediate scan on the first pass */ - last_scan = now - (heartbeat_msec * 1000); - } - - timeout = heartbeat_msec - ((now - last_scan) / 1000); - - /* if someone's specifically asked for a scan, use a shorter interval - */ - if (topology->scan_requested) { - force_timeout = topology->min_heartbeat_frequency_msec - - ((now - last_scan) / 1000); - - timeout = BSON_MIN (timeout, force_timeout); - } - - /* if we can start scanning, do so immediately */ - if (timeout <= 0) { - break; - } else { - /* otherwise wait until someone: - * o requests a scan - * o we time out - * o requests a shutdown - */ - r = mongoc_cond_timedwait ( - &topology->cond_server, &topology->mutex, timeout); - -#ifdef _WIN32 - if (!(r == 0 || r == WSAETIMEDOUT)) { -#else - if (!(r == 0 || r == ETIMEDOUT)) { -#endif - bson_mutex_unlock (&topology->mutex); - /* handle errors */ - goto DONE; - } - - /* if we timed out, or were 
woken up, check if it's time to scan - * again, or bail out */ - } - } - - topology->scan_requested = false; - mongoc_topology_scan_once (topology, false /* obey cooldown */); - bson_mutex_unlock (&topology->mutex); - - last_scan = bson_get_monotonic_time (); - } - -DONE: - BSON_THREAD_RETURN; -} - -/* - *-------------------------------------------------------------------------- - * - * mongoc_topology_start_background_scanner - * - * Start the topology background thread running. This should only be - * called once per pool. If clients are created separately (not - * through a pool) the SDAM logic will not be run in a background - * thread. Returns whether or not the scanner is running on termination - * of the function. - * - * NOTE: this method uses @topology's mutex. - * - *-------------------------------------------------------------------------- - */ - -bool -_mongoc_topology_start_background_scanner (mongoc_topology_t *topology) -{ - int r; - - if (topology->single_threaded) { - return false; - } - - bson_mutex_lock (&topology->mutex); - - if (topology->scanner_state == MONGOC_TOPOLOGY_SCANNER_BG_RUNNING) { - bson_mutex_unlock (&topology->mutex); - return true; - } - - BSON_ASSERT (topology->scanner_state == MONGOC_TOPOLOGY_SCANNER_OFF); - - topology->scanner_state = MONGOC_TOPOLOGY_SCANNER_BG_RUNNING; - - _mongoc_handshake_freeze (); - _mongoc_topology_description_monitor_opening (&topology->description); - - r = COMMON_PREFIX (thread_create) ( - &topology->thread, _mongoc_topology_run_background, topology); - - if (r != 0) { - MONGOC_ERROR ("could not start topology scanner thread: %s", - strerror (r)); - abort (); - } - - bson_mutex_unlock (&topology->mutex); - - return true; -} - -/* - *-------------------------------------------------------------------------- - * - * mongoc_topology_background_thread_stop -- - * - * Stop the topology background thread. Called by the owning pool at - * its destruction. - * - * NOTE: this method uses @topology's mutex. 
- * - *-------------------------------------------------------------------------- - */ - -void -_mongoc_topology_background_thread_stop (mongoc_topology_t *topology) -{ - bool join_thread = false; - - if (topology->single_threaded) { - return; - } - - bson_mutex_lock (&topology->mutex); - - BSON_ASSERT (topology->scanner_state != - MONGOC_TOPOLOGY_SCANNER_SHUTTING_DOWN); - - if (topology->scanner_state == MONGOC_TOPOLOGY_SCANNER_BG_RUNNING) { - /* if the background thread is running, request a shutdown and signal the - * thread */ - topology->scanner_state = MONGOC_TOPOLOGY_SCANNER_SHUTTING_DOWN; - mongoc_cond_signal (&topology->cond_server); - join_thread = true; - } else { - /* nothing to do if it's already off */ - } - - bson_mutex_unlock (&topology->mutex); - - if (join_thread) { - /* if we're joining the thread, wait for it to come back and broadcast - * all listeners */ - COMMON_PREFIX (thread_join) (topology->thread); - - bson_mutex_lock (&topology->mutex); - topology->scanner_state = MONGOC_TOPOLOGY_SCANNER_OFF; - bson_mutex_unlock (&topology->mutex); - - mongoc_cond_broadcast (&topology->cond_client); - } -} - bool _mongoc_topology_set_appname (mongoc_topology_t *topology, const char *appname) { @@ -1732,3 +1558,44 @@ _mongoc_topology_bypass_cooldown (mongoc_topology_t *topology) BSON_ASSERT (topology->single_threaded); topology->scanner->bypass_cooldown = true; } + +/* Called from application threads + * Caller must hold topology lock. + * Locks topology description mutex to copy out server description errors. + * For single-threaded monitoring, the topology scanner may include errors for + * servers that were removed from the topology. 
+ */ +static void +_topology_collect_errors (mongoc_topology_t *topology, bson_error_t *error_out) +{ + mongoc_topology_description_t *topology_description; + mongoc_server_description_t *server_description; + bson_string_t *error_message; + int i; + + topology_description = &topology->description; + memset (error_out, 0, sizeof (bson_error_t)); + error_message = bson_string_new (""); + + for (i = 0; i < topology_description->servers->items_len; i++) { + bson_error_t *error; + + server_description = topology_description->servers->items[i].item; + error = &server_description->error; + if (error->code) { + if (error_message->len > 0) { + bson_string_append_c (error_message, ' '); + } + bson_string_append_printf ( + error_message, "[%s]", server_description->error.message); + /* The last error's code and domain wins. */ + error_out->code = error->code; + error_out->domain = error->domain; + } + } + + bson_strncpy ((char *) &error_out->message, + error_message->str, + sizeof (error_out->message)); + bson_string_free (error_message, true); +} diff --git a/src/libmongoc/src/mongoc/mongoc-trace-private.h b/src/libmongoc/src/mongoc/mongoc-trace-private.h index fbe1905a583..6774b93412e 100644 --- a/src/libmongoc/src/mongoc/mongoc-trace-private.h +++ b/src/libmongoc/src/mongoc/mongoc-trace-private.h @@ -114,7 +114,7 @@ BSON_BEGIN_DECLS BSON_FUNC, \ __LINE__, \ #_n, \ - _iov, \ + (void *) _iov, \ (int) _iovcnt); \ mongoc_log_trace_iovec (MONGOC_LOG_DOMAIN, _iov, _iovcnt); \ } while (0) diff --git a/src/libmongoc/tests/mock_server/mock-server.c b/src/libmongoc/tests/mock_server/mock-server.c index 3a909c2efad..0725ad66c45 100644 --- a/src/libmongoc/tests/mock_server/mock-server.c +++ b/src/libmongoc/tests/mock_server/mock-server.c @@ -915,8 +915,8 @@ _mock_server_receives_msg (mock_server_t *server, uint32_t flags, ...) 
* * mock_server_receives_ismaster -- * - * Pop a client ismaster call if one is enqueued, or wait up to - * request_timeout_ms for the client to send a request. + * Pop a client non-streaming ismaster call if one is enqueued, + * or wait up to request_timeout_ms for the client to send a request. * * Returns: * A request you must request_destroy, or NULL if the current @@ -932,7 +932,10 @@ request_t * mock_server_receives_ismaster (mock_server_t *server) { return mock_server_receives_command ( - server, "admin", MONGOC_QUERY_SLAVE_OK, "{'isMaster': 1}"); + server, + "admin", + MONGOC_QUERY_SLAVE_OK, + "{'isMaster': 1, 'maxAwaitTimeMS': { '$exists': false }}"); } diff --git a/src/libmongoc/tests/mock_server/mock-server.h b/src/libmongoc/tests/mock_server/mock-server.h index fc8a25d91cd..546720d155a 100644 --- a/src/libmongoc/tests/mock_server/mock-server.h +++ b/src/libmongoc/tests/mock_server/mock-server.h @@ -216,7 +216,9 @@ rs_response_to_ismaster (mock_server_t *server, int has_tags, ...); -#define RS_RESPONSE_TO_ISMASTER(server, primary, has_tags, ...) \ - rs_response_to_ismaster (server, primary, has_tags, __VA_ARGS__, NULL) +#define RS_RESPONSE_TO_ISMASTER( \ + server, max_wire_version, primary, has_tags, ...) 
\ + rs_response_to_ismaster ( \ + server, max_wire_version, primary, has_tags, __VA_ARGS__, NULL) #endif /* MOCK_SERVER_H */ diff --git a/src/libmongoc/tests/test-libmongoc.c b/src/libmongoc/tests/test-libmongoc.c index dc987769b72..5f48306039b 100644 --- a/src/libmongoc/tests/test-libmongoc.c +++ b/src/libmongoc/tests/test-libmongoc.c @@ -242,6 +242,8 @@ extern void test_aws_install (TestSuite *suite); extern void test_streamable_ismaster_install (TestSuite *suite); +extern void +test_monitoring_install (TestSuite *suite); typedef struct { mongoc_log_level_t level; @@ -412,6 +414,9 @@ log_handler (mongoc_log_level_t log_level, if (!suite->silent) { mongoc_log_default_handler (log_level, log_domain, message, NULL); } + } else if (log_level == MONGOC_LOG_LEVEL_DEBUG && + test_suite_debug_output ()) { + mongoc_log_default_handler (log_level, log_domain, message, NULL); } } @@ -2403,16 +2408,17 @@ windows_exception_handler (EXCEPTION_POINTERS *pExceptionInfo) /* Initialize stack walking. */ char exception_string[128]; bson_snprintf (exception_string, - sizeof(exception_string), + sizeof (exception_string), (exception_code == EXCEPTION_ACCESS_VIOLATION) - ? "(access violation)" - : "0x%08X", exception_code); + ? 
"(access violation)" + : "0x%08X", + exception_code); char address_string[32]; - bson_snprintf(address_string, - sizeof(address_string), - "0x%p", - pExceptionInfo->ExceptionRecord->ExceptionAddress); + bson_snprintf (address_string, + sizeof (address_string), + "0x%p", + pExceptionInfo->ExceptionRecord->ExceptionAddress); fprintf (stderr, "exception '%s' at '%s', terminating\n", @@ -2619,6 +2625,7 @@ main (int argc, char *argv[]) test_server_description_install (&suite); test_aws_install (&suite); test_streamable_ismaster_install (&suite); + test_monitoring_install (&suite); ret = TestSuite_Run (&suite); diff --git a/src/libmongoc/tests/test-mongoc-background-monitoring.c b/src/libmongoc/tests/test-mongoc-background-monitoring.c new file mode 100644 index 00000000000..b3fd46db16e --- /dev/null +++ b/src/libmongoc/tests/test-mongoc-background-monitoring.c @@ -0,0 +1,651 @@ +/* + * Copyright 2020-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include "mock_server/mock-server.h"
+#include "mongoc/mongoc.h"
+#include "mongoc/mongoc-client-private.h"
+#include "mongoc/mongoc-topology-private.h"
+#include "mongoc/mongoc-topology-background-monitoring-private.h"
+#include "mongoc/mongoc-topology-description-private.h"
+#include "mongoc/mongoc-server-description-private.h"
+#include "mongoc/mongoc-client-pool-private.h"
+#include "test-libmongoc.h"
+#include "TestSuite.h"
+
+#define LOG_DOMAIN "test_monitoring"
+
+/* TODO: Make these test patterns more reusable for testing libmongoc. */
+typedef struct {
+   uint32_t n_heartbeat_started;
+   uint32_t n_heartbeat_succeeded;
+   uint32_t n_heartbeat_failed;
+   uint32_t n_server_changed;
+   mongoc_topology_description_type_t td_type;
+   mongoc_server_description_type_t sd_type;
+} tf_observations_t;
+
+typedef enum {
+   TF_FAST_HEARTBEAT = 1 << 0,
+   TF_FAST_MIN_HEARTBEAT = 1 << 1
+} tf_flags_t;
+
+typedef struct {
+   tf_flags_t flags;
+   mock_server_t *server;
+   mongoc_client_pool_t *pool;
+   mongoc_client_t *client;
+   tf_observations_t *observations;
+   bson_mutex_t mutex;
+   mongoc_cond_t cond;
+   bson_string_t *logs;
+} test_fixture_t;
+
+/* Print the current observation counters and the accumulated fixture log to
+ * stdout. Does not lock tf->mutex itself.
+ */
+void
+tf_dump (test_fixture_t *tf)
+{
+   printf ("== Begin dump ==\n");
+   printf ("-- Current observations --\n");
+   /* The counters are uint32_t, so print them as unsigned. */
+   printf ("n_heartbeat_started=%u\n",
+           (unsigned int) tf->observations->n_heartbeat_started);
+   printf ("n_heartbeat_succeeded=%u\n",
+           (unsigned int) tf->observations->n_heartbeat_succeeded);
+   printf ("n_heartbeat_failed=%u\n",
+           (unsigned int) tf->observations->n_heartbeat_failed);
+   printf ("n_server_changed=%u\n",
+           (unsigned int) tf->observations->n_server_changed);
+   printf ("sd_type=%d\n", (int) tf->observations->sd_type);
+
+   printf ("-- Test fixture logs --\n");
+   printf ("%s", tf->logs->str);
+   printf ("== End dump ==\n");
+}
+
+void
+tf_log (test_fixture_t *tf, const char *format, ...) BSON_GNUC_PRINTF (2, 3);
+
+void
+tf_log (test_fixture_t *tf, const char *format, ...)
+{ + va_list ap; + char *str; + char nowstr[32]; + struct timeval tv; + struct tm tt; + time_t t; + + bson_gettimeofday (&tv); + t = tv.tv_sec; + +#ifdef _WIN32 +#ifdef _MSC_VER + localtime_s (&tt, &t); +#else + tt = *(localtime (&t)); +#endif +#else + localtime_r (&t, &tt); +#endif + + strftime (nowstr, sizeof nowstr, "%Y/%m/%d %H:%M:%S ", &tt); + + va_start (ap, format); + str = bson_strdupv_printf (format, ap); + va_end (ap); + bson_string_append (tf->logs, nowstr); + bson_string_append (tf->logs, str); + bson_string_append_c (tf->logs, '\n'); + bson_free (str); +} + +#define TF_LOG(_tf, ...) tf_log (_tf, __VA_ARGS__) + +static void +_heartbeat_started (const mongoc_apm_server_heartbeat_started_t *event) +{ + test_fixture_t *tf; + + tf = (test_fixture_t *) mongoc_apm_server_heartbeat_started_get_context ( + event); + bson_mutex_lock (&tf->mutex); + tf->observations->n_heartbeat_started++; + TF_LOG (tf, "heartbeat started"); + mongoc_cond_broadcast (&tf->cond); + bson_mutex_unlock (&tf->mutex); +} + +static void +_heartbeat_succeeded (const mongoc_apm_server_heartbeat_succeeded_t *event) +{ + test_fixture_t *tf; + + tf = (test_fixture_t *) mongoc_apm_server_heartbeat_succeeded_get_context ( + event); + bson_mutex_lock (&tf->mutex); + tf->observations->n_heartbeat_succeeded++; + TF_LOG (tf, "heartbeat succeeded"); + mongoc_cond_broadcast (&tf->cond); + bson_mutex_unlock (&tf->mutex); +} + +static void +_heartbeat_failed (const mongoc_apm_server_heartbeat_failed_t *event) +{ + test_fixture_t *tf; + + tf = + (test_fixture_t *) mongoc_apm_server_heartbeat_failed_get_context (event); + bson_mutex_lock (&tf->mutex); + TF_LOG (tf, "heartbeat failed"); + tf->observations->n_heartbeat_failed++; + mongoc_cond_broadcast (&tf->cond); + bson_mutex_unlock (&tf->mutex); +} + +static void +_server_changed (const mongoc_apm_server_changed_t *event) +{ + test_fixture_t *tf; + const mongoc_server_description_t *new_sd; + + tf = (test_fixture_t *) 
mongoc_apm_server_changed_get_context (event); + new_sd = mongoc_apm_server_changed_get_new_description (event); + bson_mutex_lock (&tf->mutex); + TF_LOG (tf, "server changed"); + tf->observations->sd_type = new_sd->type; + tf->observations->n_server_changed++; + mongoc_cond_broadcast (&tf->cond); + bson_mutex_unlock (&tf->mutex); +} + +test_fixture_t * +tf_new (tf_flags_t flags) +{ + mongoc_apm_callbacks_t *callbacks; + test_fixture_t *tf; + + tf = bson_malloc0 (sizeof (test_fixture_t)); + tf->observations = bson_malloc0 (sizeof (tf_observations_t)); + bson_mutex_init (&tf->mutex); + mongoc_cond_init (&tf->cond); + + callbacks = mongoc_apm_callbacks_new (); + tf->server = mock_server_new (); + mock_server_run (tf->server); + + mongoc_apm_set_server_heartbeat_started_cb (callbacks, _heartbeat_started); + mongoc_apm_set_server_changed_cb (callbacks, _server_changed); + mongoc_apm_set_server_heartbeat_succeeded_cb (callbacks, + _heartbeat_succeeded); + mongoc_apm_set_server_heartbeat_failed_cb (callbacks, _heartbeat_failed); + tf->pool = mongoc_client_pool_new (mock_server_get_uri (tf->server)); + mongoc_client_pool_set_apm_callbacks (tf->pool, callbacks, tf); + mongoc_apm_callbacks_destroy (callbacks); + + if (flags & TF_FAST_HEARTBEAT) { + _mongoc_client_pool_get_topology (tf->pool)->description.heartbeat_msec = + 10; + /* A fast heartbeat implies a fast min heartbeat. 
*/ + flags |= TF_FAST_MIN_HEARTBEAT; + } + if (flags & TF_FAST_MIN_HEARTBEAT) { + _mongoc_client_pool_get_topology (tf->pool) + ->min_heartbeat_frequency_msec = 10; + } + tf->flags = flags; + tf->logs = bson_string_new (""); + tf->client = mongoc_client_pool_pop (tf->pool); + return tf; +} + +void +tf_destroy (test_fixture_t *tf) +{ + mock_server_destroy (tf->server); + mongoc_client_pool_push (tf->pool, tf->client); + mongoc_client_pool_destroy (tf->pool); + bson_string_free (tf->logs, true); + bson_mutex_destroy (&tf->mutex); + mongoc_cond_destroy (&tf->cond); + bson_free (tf->observations); + bson_free (tf); +} + +/* Wait for _predicate to become true over the next five seconds. + * _predicate is only tested when observations change. + * Upon failure, dumps logs and observations. + */ +#define OBSERVE_SOON(_tf, _predicate) \ + do { \ + int64_t _start_ms = bson_get_monotonic_time () / 1000; \ + int64_t _expires_ms = _start_ms + 5000; \ + bson_mutex_lock (&_tf->mutex); \ + while (!(_predicate)) { \ + if (bson_get_monotonic_time () / 1000 > _expires_ms) { \ + bson_mutex_unlock (&_tf->mutex); \ + tf_dump (_tf); \ + test_error ("Predicate expired: %s", #_predicate); \ + } \ + mongoc_cond_timedwait ( \ + &_tf->cond, &_tf->mutex, _expires_ms - _start_ms); \ + } \ + bson_mutex_unlock (&_tf->mutex); \ + } while (0); + +/* Check that _predicate is true immediately. Upon failure, + * dumps logs and observations. */ +#define OBSERVE(_tf, _predicate) \ + do { \ + bson_mutex_lock (&_tf->mutex); \ + if (!(_predicate)) { \ + tf_dump (_tf); \ + bson_mutex_unlock (&_tf->mutex); \ + test_error ("Predicate failed: %s", #_predicate); \ + } \ + bson_mutex_unlock (&_tf->mutex); \ + } while (0); + +#define WAIT _mongoc_usleep (10 * 1000); + +static void +_signal_shutdown (test_fixture_t *tf) +{ + bson_mutex_lock (&tf->client->topology->mutex); + /* Ignore the "Last server removed from topology" warning. 
*/ + capture_logs (true); + /* remove the server description from the topology description. */ + mongoc_topology_description_reconcile (&tf->client->topology->description, + NULL); + capture_logs (false); + /* remove the server monitor from the set of server monitors. */ + _mongoc_topology_background_monitoring_reconcile (tf->client->topology); + bson_mutex_unlock (&tf->client->topology->mutex); +} + +static void +_add_server_monitor (test_fixture_t *tf) +{ + uint32_t id; + const mongoc_uri_t *uri; + + uri = mock_server_get_uri (tf->server); + bson_mutex_lock (&tf->client->topology->mutex); + /* add the server description to the topology description. */ + mongoc_topology_description_add_server ( + &tf->client->topology->description, + mongoc_uri_get_hosts (uri)->host_and_port, + &id); + /* add the server monitor to the set of server monitors. */ + _mongoc_topology_background_monitoring_reconcile (tf->client->topology); + bson_mutex_unlock (&tf->client->topology->mutex); +} + +static void +_request_scan (test_fixture_t *tf) +{ + bson_mutex_lock (&tf->client->topology->mutex); + _mongoc_topology_request_scan (tf->client->topology); + bson_mutex_unlock (&tf->client->topology->mutex); +} + +void +test_connect_succeeds (void) +{ + test_fixture_t *tf; + request_t *request; + + tf = tf_new (0); + request = mock_server_receives_ismaster (tf->server); + mock_server_replies_ok_and_destroys (request); + + OBSERVE_SOON (tf, tf->observations->n_heartbeat_started == 1); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_succeeded == 1); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_failed == 0); + OBSERVE_SOON (tf, tf->observations->n_server_changed == 1); + OBSERVE_SOON (tf, tf->observations->sd_type == MONGOC_SERVER_STANDALONE); + + tf_destroy (tf); +} + +void +test_connect_hangup (void) +{ + test_fixture_t *tf; + request_t *request; + + tf = tf_new (0); + request = mock_server_receives_ismaster (tf->server); + mock_server_hangs_up (request); + request_destroy
(request); + + OBSERVE_SOON (tf, tf->observations->n_heartbeat_started == 1); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_succeeded == 0); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_failed == 1); + OBSERVE_SOON (tf, tf->observations->n_server_changed == 0); + OBSERVE_SOON (tf, tf->observations->sd_type == MONGOC_SERVER_UNKNOWN); + + /* No retry occurs since the server was never discovered. */ + WAIT; + OBSERVE (tf, tf->observations->n_heartbeat_started == 1); + tf_destroy (tf); +} + +void +test_connect_badreply (void) +{ + test_fixture_t *tf; + request_t *request; + + tf = tf_new (0); + request = mock_server_receives_ismaster (tf->server); + OBSERVE (tf, request); + mock_server_replies_simple (request, "{'ok': 0}"); + request_destroy (request); + + OBSERVE_SOON (tf, tf->observations->n_heartbeat_started == 1); + /* Still considered a successful heartbeat. */ + OBSERVE_SOON (tf, tf->observations->n_heartbeat_succeeded == 1); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_failed == 0); + OBSERVE_SOON (tf, tf->observations->n_server_changed == 0); + OBSERVE_SOON (tf, tf->observations->sd_type == MONGOC_SERVER_UNKNOWN); + + /* No retry occurs since the server was never discovered. */ + WAIT; + OBSERVE (tf, tf->observations->n_heartbeat_started == 1); + tf_destroy (tf); +} + +void +test_connect_shutdown (void) +{ + test_fixture_t *tf; + request_t *request; + + tf = tf_new (0); + request = mock_server_receives_ismaster (tf->server); + OBSERVE (tf, request); + /* Before the server replies, signal the server monitor to shutdown. */ + _signal_shutdown (tf); + + /* Reply (or hang up) so the request does not wait for connectTimeoutMS to + * time out. */ + mock_server_replies_ok_and_destroys (request); + + /* Heartbeat succeeds, but server description is not updated. 
*/ + OBSERVE_SOON (tf, tf->observations->n_heartbeat_started == 1); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_succeeded == 1); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_failed == 0); + OBSERVE_SOON (tf, tf->observations->n_server_changed == 0); + OBSERVE_SOON (tf, tf->observations->sd_type == MONGOC_SERVER_UNKNOWN); + + tf_destroy (tf); +} + +void +test_connect_requestscan (void) +{ + test_fixture_t *tf; + request_t *request; + + tf = tf_new (0); + request = mock_server_receives_ismaster (tf->server); + OBSERVE (tf, request); + /* Before the mock server replies, request a scan. */ + _request_scan (tf); + mock_server_replies_ok_and_destroys (request); + + /* Because the request occurred during the scan, no subsequent scan occurs. + */ + WAIT; + OBSERVE (tf, tf->observations->n_heartbeat_started == 1); + OBSERVE (tf, tf->observations->n_heartbeat_succeeded == 1); + OBSERVE (tf, tf->observations->n_heartbeat_failed == 0); + OBSERVE (tf, tf->observations->n_server_changed == 1); + OBSERVE (tf, tf->observations->sd_type == MONGOC_SERVER_STANDALONE); + + tf_destroy (tf); +} + +void +test_retry_succeeds (void) +{ + test_fixture_t *tf; + request_t *request; + + tf = tf_new (TF_FAST_HEARTBEAT); + + /* Initial discovery occurs. */ + request = mock_server_receives_ismaster (tf->server); + OBSERVE (tf, request); + mock_server_replies_ok_and_destroys (request); + + /* Heartbeat succeeds and the server is discovered as a standalone. */ + OBSERVE_SOON (tf, tf->observations->n_heartbeat_succeeded == 1); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_failed == 0); + OBSERVE_SOON (tf, tf->observations->sd_type == MONGOC_SERVER_STANDALONE); + + /* The next ismaster occurs (due to fast heartbeat). */ + request = mock_server_receives_ismaster (tf->server); + OBSERVE (tf, request); + mock_server_hangs_up (request); + request_destroy (request); + + /* Server is still standalone.
*/ + OBSERVE_SOON (tf, tf->observations->n_heartbeat_succeeded == 1); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_failed == 1); + OBSERVE_SOON (tf, tf->observations->sd_type == MONGOC_SERVER_STANDALONE); + + /* Retry occurs. */ + request = mock_server_receives_ismaster (tf->server); + OBSERVE (tf, request); + mock_server_replies_ok_and_destroys (request); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_succeeded == 2); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_failed == 1); + OBSERVE_SOON (tf, tf->observations->sd_type == MONGOC_SERVER_STANDALONE); + + tf_destroy (tf); +} + +void +test_retry_hangup (void) +{ + test_fixture_t *tf; + request_t *request; + + tf = tf_new (TF_FAST_HEARTBEAT); + + /* Initial discovery occurs. */ + request = mock_server_receives_ismaster (tf->server); + OBSERVE (tf, request); + mock_server_replies_ok_and_destroys (request); + + /* Heartbeat succeeds and the server is discovered as a standalone. */ + OBSERVE_SOON (tf, tf->observations->n_heartbeat_succeeded == 1); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_failed == 0); + OBSERVE_SOON (tf, tf->observations->sd_type == MONGOC_SERVER_STANDALONE); + + /* The next ismaster occurs (due to fast heartbeat). */ + request = mock_server_receives_ismaster (tf->server); + OBSERVE (tf, request); + mock_server_hangs_up (request); + request_destroy (request); + + /* Server is still standalone. */ + OBSERVE_SOON (tf, tf->observations->n_heartbeat_succeeded == 1); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_failed == 1); + OBSERVE_SOON (tf, tf->observations->sd_type == MONGOC_SERVER_STANDALONE); + + /* Retry occurs.
*/ + request = mock_server_receives_ismaster (tf->server); + OBSERVE (tf, request); + mock_server_hangs_up (request); + request_destroy (request); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_succeeded == 1); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_failed == 2); + OBSERVE_SOON (tf, tf->observations->sd_type == MONGOC_SERVER_UNKNOWN); + + tf_destroy (tf); +} + +void +test_retry_badreply (void) +{ + test_fixture_t *tf; + request_t *request; + + tf = tf_new (TF_FAST_HEARTBEAT); + + /* Initial discovery occurs. */ + request = mock_server_receives_ismaster (tf->server); + OBSERVE (tf, request); + mock_server_replies_ok_and_destroys (request); + + /* Heartbeat succeeds and the server is discovered as a standalone. */ + OBSERVE_SOON (tf, tf->observations->n_heartbeat_succeeded == 1); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_failed == 0); + OBSERVE_SOON (tf, tf->observations->sd_type == MONGOC_SERVER_STANDALONE); + + /* The next ismaster occurs (due to fast heartbeat). */ + request = mock_server_receives_ismaster (tf->server); + OBSERVE (tf, request); + mock_server_hangs_up (request); + request_destroy (request); + + /* Server is still standalone. */ + OBSERVE_SOON (tf, tf->observations->n_heartbeat_succeeded == 1); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_failed == 1); + OBSERVE_SOON (tf, tf->observations->sd_type == MONGOC_SERVER_STANDALONE); + + /* Retry occurs. */ + request = mock_server_receives_ismaster (tf->server); + OBSERVE (tf, request); + mock_server_replies_simple (request, "{'ok': 0}"); + request_destroy (request); + /* Heartbeat succeeds, but server description is unknown.
*/ + OBSERVE_SOON (tf, tf->observations->n_heartbeat_succeeded == 2); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_failed == 1); + OBSERVE_SOON (tf, tf->observations->sd_type == MONGOC_SERVER_UNKNOWN); + + tf_destroy (tf); +} + +void +test_retry_shutdown (void) +{ + test_fixture_t *tf; + request_t *request; + + tf = tf_new (TF_FAST_HEARTBEAT); + + /* Initial discovery occurs. */ + request = mock_server_receives_ismaster (tf->server); + OBSERVE (tf, request); + mock_server_replies_ok_and_destroys (request); + + /* Heartbeat succeeds and the server is discovered as a standalone. */ + OBSERVE_SOON (tf, tf->observations->n_heartbeat_succeeded == 1); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_failed == 0); + OBSERVE_SOON (tf, tf->observations->sd_type == MONGOC_SERVER_STANDALONE); + + /* The next ismaster occurs (due to fast heartbeat). */ + request = mock_server_receives_ismaster (tf->server); + OBSERVE (tf, request); + _signal_shutdown (tf); + mock_server_replies_ok_and_destroys (request); + + /* No retry occurs.
*/ + WAIT; + OBSERVE (tf, tf->observations->n_heartbeat_started == 2); + OBSERVE (tf, tf->observations->n_heartbeat_succeeded == 2); + OBSERVE (tf, tf->observations->n_heartbeat_failed == 0); + OBSERVE (tf, tf->observations->sd_type == MONGOC_SERVER_STANDALONE); + + tf_destroy (tf); +} + +static void +test_flip_flop (void) +{ + test_fixture_t *tf; + request_t *request; + int i; + + tf = tf_new (0); + + for (i = 1; i < 100; i++) { + MONGOC_DEBUG ("i=%d", i); + request = mock_server_receives_ismaster (tf->server); + OBSERVE (tf, request); + mock_server_replies_ok_and_destroys (request); + _signal_shutdown (tf); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_started == i); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_succeeded == i); + _add_server_monitor (tf); + } + + tf_destroy (tf); +} + +static void +test_repeated_requestscan (void) +{ + test_fixture_t *tf; + request_t *request; + int i; + + tf = tf_new (TF_FAST_MIN_HEARTBEAT); + + for (i = 1; i < 100; i++) { + request = mock_server_receives_ismaster (tf->server); + OBSERVE (tf, request); + mock_server_replies_ok_and_destroys (request); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_started == i); + OBSERVE_SOON (tf, tf->observations->n_heartbeat_succeeded == i); + _request_scan (tf); + } + + tf_destroy (tf); +} + +void +test_monitoring_install (TestSuite *suite) +{ + /* Tests for initial connection. */ + TestSuite_AddMockServerTest ( + suite, "/server_monitor/connect/succeeds", test_connect_succeeds); + TestSuite_AddMockServerTest ( + suite, "/server_monitor/connect/hangup", test_connect_hangup); + TestSuite_AddMockServerTest ( + suite, "/server_monitor/connect/badreply", test_connect_badreply); + TestSuite_AddMockServerTest ( + suite, "/server_monitor/connect/shutdown", test_connect_shutdown); + TestSuite_AddMockServerTest ( + suite, "/server_monitor/connect/requestscan", test_connect_requestscan); + + /* Tests for retry. 
*/ + TestSuite_AddMockServerTest ( + suite, "/server_monitor/retry/succeeds", test_retry_succeeds); + TestSuite_AddMockServerTest ( + suite, "/server_monitor/retry/hangup", test_retry_hangup); + TestSuite_AddMockServerTest ( + suite, "/server_monitor/retry/badreply", test_retry_badreply); + TestSuite_AddMockServerTest ( + suite, "/server_monitor/retry/shutdown", test_retry_shutdown); + + /* Test flip flopping. */ + TestSuite_AddMockServerTest ( + suite, "/server_monitor/flip_flop", test_flip_flop); + + /* Test repeated scan requests. */ + TestSuite_AddMockServerTest ( + suite, "/server_monitor/repeated_requestscan", test_repeated_requestscan); +} diff --git a/src/libmongoc/tests/test-mongoc-client.c b/src/libmongoc/tests/test-mongoc-client.c index 2ceeca8a43e..7f584d0fc86 100644 --- a/src/libmongoc/tests/test-mongoc-client.c +++ b/src/libmongoc/tests/test-mongoc-client.c @@ -2456,7 +2456,10 @@ test_mongoc_client_descriptions (void) start = bson_get_monotonic_time (); do { _mongoc_usleep (1000); - if (bson_get_monotonic_time () - start > 1000 * 1000) { + /* Windows IPv4 tasks may take longer to connect since connection to the + * first address returned by getaddrinfo may be IPv6, and failure to + * connect may take a couple seconds. See CDRIVER-3639. 
*/ + if (bson_get_monotonic_time () - start > 3 * 1000 * 1000) { test_error ("still have %d descriptions, not expected %d, after 1 sec", (int) n, (int) expected_n); @@ -2565,7 +2568,7 @@ _test_mongoc_client_select_server_error (bool pooled) if (pooled) { uri = test_framework_get_uri (); - mongoc_uri_set_option_as_int32 (uri, "serverSelectionTimeoutMS", 1000); + mongoc_uri_set_option_as_int32 (uri, "serverSelectionTimeoutMS", 3000); pool = mongoc_client_pool_new (uri); test_framework_set_pool_ssl_opts (pool); client = mongoc_client_pool_pop (pool); diff --git a/src/libmongoc/tests/test-mongoc-sdam-monitoring.c b/src/libmongoc/tests/test-mongoc-sdam-monitoring.c index 85001311e0d..e1db06b80d7 100644 --- a/src/libmongoc/tests/test-mongoc-sdam-monitoring.c +++ b/src/libmongoc/tests/test-mongoc-sdam-monitoring.c @@ -816,7 +816,7 @@ _test_heartbeat_fails_dns (bool pooled) context_init (&context); uri = mongoc_uri_new ( - "mongodb://doesntexist.foobar/?serverSelectionTimeoutMS=1000"); + "mongodb://doesntexist.foobar/?serverSelectionTimeoutMS=3000"); if (pooled) { pool = mongoc_client_pool_new (uri); pool_set_heartbeat_event_callbacks (pool, &context); diff --git a/src/libmongoc/tests/test-mongoc-topology-reconcile.c b/src/libmongoc/tests/test-mongoc-topology-reconcile.c index 77e3a0eba7b..99d91aa66e4 100644 --- a/src/libmongoc/tests/test-mongoc-topology-reconcile.c +++ b/src/libmongoc/tests/test-mongoc-topology-reconcile.c @@ -22,8 +22,7 @@ get_node (mongoc_topology_t *topology, const char *host_and_port) mongoc_topology_scanner_t *ts; mongoc_topology_scanner_node_t *node; mongoc_topology_scanner_node_t *sought = NULL; - - bson_mutex_lock (&topology->mutex); + BSON_ASSERT (topology->single_threaded); ts = topology->scanner; @@ -35,8 +34,6 @@ get_node (mongoc_topology_t *topology, const char *host_and_port) } } - bson_mutex_unlock (&topology->mutex); - return sought; } @@ -140,8 +137,10 @@ _test_topology_reconcile_rs (bool pooled) * server0 is selected, server1 is 
discovered and added to scanner. */ BSON_ASSERT (selects_server (client, secondary_read_prefs, server0)); - BSON_ASSERT ( - get_node (client->topology, mock_server_get_host_and_port (server1))); + if (!pooled) { + BSON_ASSERT ( + get_node (client->topology, mock_server_get_host_and_port (server1))); + } /* * select again with mode "primary": server1 is selected. @@ -283,15 +282,7 @@ _test_topology_reconcile_sharded (bool pooled) ASSERT_CMPSTR (sd->host.host_and_port, mock_server_get_host_and_port (mongos)); - if (pooled) { - /* wait a second for scanner thread to remove secondary */ - int64_t start = bson_get_monotonic_time (); - while (get_node (client->topology, - mock_server_get_host_and_port (secondary))) { - ASSERT_CMPTIME ((int) (bson_get_monotonic_time () - start), - (int) 1000000); - } - } else { + if (!pooled) { BSON_ASSERT (!get_node (client->topology, mock_server_get_host_and_port (secondary))); } @@ -456,13 +447,12 @@ test_topology_reconcile_from_handshake (void *ctx) * mode for good measure. */ static void -_test_topology_reconcile_retire (bool pooled) +test_topology_reconcile_retire_single (void) { mock_server_t *secondary; mock_server_t *primary; char *uri_str; mongoc_uri_t *uri; - mongoc_client_pool_t *pool = NULL; mongoc_client_t *client; mongoc_topology_t *topology; mongoc_read_prefs_t *primary_read_prefs; @@ -491,14 +481,9 @@ _test_topology_reconcile_retire (bool pooled) uri = mongoc_uri_new (uri_str); - if (pooled) { - pool = mongoc_client_pool_new (uri); - topology = _mongoc_client_pool_get_topology (pool); - client = mongoc_client_pool_pop (pool); - } else { - client = mongoc_client_new (uri_str); - topology = client->topology; - } + client = mongoc_client_new (uri_str); + topology = client->topology; + /* step 1: discover both replica set members */ primary_read_prefs = mongoc_read_prefs_new (MONGOC_READ_PRIMARY); @@ -536,13 +521,11 @@ _test_topology_reconcile_retire (bool pooled) /* server removed from topology description. 
in pooled mode, the scanner node * is untouched, in single mode mongoc_cluster_fetch_stream_single scans and * updates topology */ - node = get_node (topology, mock_server_get_host_and_port (secondary)); - if (pooled) { - BSON_ASSERT (node); - BSON_ASSERT (!node->retired); - } else { - BSON_ASSERT (!node); - } + + + BSON_ASSERT ( + !get_node (topology, mock_server_get_host_and_port (secondary))); + /* step 7: trigger a scan by selecting with an unsatisfiable read preference. * should not crash with BSON_ASSERT. */ @@ -551,15 +534,12 @@ _test_topology_reconcile_retire (bool pooled) BSON_ASSERT ( !mongoc_client_select_server (client, false, tag_read_prefs, NULL)); + BSON_ASSERT ( !get_node (topology, mock_server_get_host_and_port (secondary))); - if (pooled) { - mongoc_client_pool_push (pool, client); - mongoc_client_pool_destroy (pool); - } else { - mongoc_client_destroy (client); - } + mongoc_client_destroy (client); + future_destroy (future); mock_server_destroy (primary); @@ -572,20 +552,6 @@ _test_topology_reconcile_retire (bool pooled) } -static void -test_topology_reconcile_retire_single (void) -{ - _test_topology_reconcile_retire (false); -} - - -static void -test_topology_reconcile_retire_pooled (void) -{ - _test_topology_reconcile_retire (true); -} - - /* CDRIVER-2552 in mongoc_topology_scanner_start, assert (!node->cmd) * failed after this sequence in libmongoc 1.6.0: * @@ -602,13 +568,12 @@ test_topology_reconcile_retire_pooled (void) * test that in step 5 the new node has no new async_cmd_t */ static void -_test_topology_reconcile_add (bool pooled) +test_topology_reconcile_add_single (void) { mock_server_t *secondary; mock_server_t *primary; char *uri_str; mongoc_uri_t *uri; - mongoc_client_pool_t *pool = NULL; mongoc_client_t *client; mongoc_topology_t *topology; mongoc_read_prefs_t *primary_read_prefs; @@ -636,14 +601,8 @@ _test_topology_reconcile_add (bool pooled) uri = mongoc_uri_new (uri_str); - if (pooled) { - pool = mongoc_client_pool_new (uri); - 
topology = _mongoc_client_pool_get_topology (pool); - client = mongoc_client_pool_pop (pool); - } else { - client = mongoc_client_new (uri_str); - topology = client->topology; - } + client = mongoc_client_new (uri_str); + topology = client->topology; /* step 1: discover primary */ primary_read_prefs = mongoc_read_prefs_new (MONGOC_READ_PRIMARY); @@ -674,22 +633,14 @@ _test_topology_reconcile_add (bool pooled) topology, mock_server_get_host_and_port (secondary))); node = get_node (topology, mock_server_get_host_and_port (secondary)); - if (pooled) { - /* no asymc_cmd_t is created, since we're not in the scanner loop */ - BSON_ASSERT (!topology->scanner->async->cmds); - BSON_ASSERT (!node); - } else { - /* in single mode the client completes a scan inline and frees all cmds */ - BSON_ASSERT (!topology->scanner->async->cmds); - BSON_ASSERT (node); - } - if (pooled) { - mongoc_client_pool_push (pool, client); - mongoc_client_pool_destroy (pool); - } else { - mongoc_client_destroy (client); - } + /* in single mode the client completes a scan inline and frees all cmds */ + BSON_ASSERT (!topology->scanner->async->cmds); + BSON_ASSERT (node); + + + mongoc_client_destroy (client); + future_destroy (future); mock_server_destroy (primary); @@ -700,20 +651,6 @@ _test_topology_reconcile_add (bool pooled) } -static void -test_topology_reconcile_add_single (void) -{ - _test_topology_reconcile_add (false); -} - - -static void -test_topology_reconcile_add_pooled (void) -{ - _test_topology_reconcile_add (true); -} - - void test_topology_reconcile_install (TestSuite *suite) { @@ -737,18 +674,10 @@ test_topology_reconcile_install (TestSuite *suite) NULL, NULL, test_framework_skip_if_not_replset); - TestSuite_AddMockServerTest (suite, - "/TOPOLOGY/reconcile/retire/pooled", - test_topology_reconcile_retire_pooled, - test_framework_skip_if_slow); TestSuite_AddMockServerTest (suite, "/TOPOLOGY/reconcile/retire/single", test_topology_reconcile_retire_single, test_framework_skip_if_slow); - 
TestSuite_AddMockServerTest (suite, - "/TOPOLOGY/reconcile/add/pooled", - test_topology_reconcile_add_pooled, - test_framework_skip_if_slow); TestSuite_AddMockServerTest (suite, "/TOPOLOGY/reconcile/add/single", test_topology_reconcile_add_single, diff --git a/src/libmongoc/tests/test-mongoc-topology.c b/src/libmongoc/tests/test-mongoc-topology.c index a2ec898c69d..9d61664fe7e 100644 --- a/src/libmongoc/tests/test-mongoc-topology.c +++ b/src/libmongoc/tests/test-mongoc-topology.c @@ -4,6 +4,7 @@ #include "mongoc/mongoc-client-private.h" #include "mongoc/mongoc-util-private.h" +#include "mongoc/mongoc-topology-background-monitoring-private.h" #include "TestSuite.h" #include "test-libmongoc.h" @@ -15,6 +16,123 @@ #undef MONGOC_LOG_DOMAIN #define MONGOC_LOG_DOMAIN "topology-test" +typedef struct { + int n_started; + int n_succeeded; + int n_failed; + int n_unknowns; + bson_mutex_t mutex; +} checks_t; + +static void +checks_init (checks_t *checks) +{ + memset (checks, 0, sizeof (*checks)); + bson_mutex_init (&checks->mutex); +} + +static void +checks_cleanup (checks_t *checks) +{ + bson_mutex_destroy (&checks->mutex); +} + +static bool +checks_cmp (checks_t *checks, const char *metric, char cmp, int expected) +{ + int actual; + + bson_mutex_lock (&checks->mutex); + if (0 == strcmp (metric, "n_started")) { + actual = checks->n_started; + } else if (0 == strcmp (metric, "n_succeeded")) { + actual = checks->n_succeeded; + } else if (0 == strcmp (metric, "n_failed")) { + actual = checks->n_failed; + } else if (0 == strcmp (metric, "n_unknowns")) { + actual = checks->n_unknowns; + } else { + test_error ("unknown metric: %s", metric); + } + + bson_mutex_unlock (&checks->mutex); + + if (cmp == '=') { + return expected == actual; + } else if (cmp == '>') { + return expected > actual; + } else if (cmp == '<') { + return expected < actual; + } else { + test_error ("unknown comparison: %c", cmp); + } + return false; +} + +static void +check_started (const 
mongoc_apm_server_heartbeat_started_t *event) +{ + checks_t *c; + + c = (checks_t *) mongoc_apm_server_heartbeat_started_get_context (event); + bson_mutex_lock (&c->mutex); + c->n_started++; + bson_mutex_unlock (&c->mutex); +} + + +static void +check_succeeded (const mongoc_apm_server_heartbeat_succeeded_t *event) +{ + checks_t *c; + + c = (checks_t *) mongoc_apm_server_heartbeat_succeeded_get_context (event); + bson_mutex_lock (&c->mutex); + c->n_succeeded++; + bson_mutex_unlock (&c->mutex); +} + + +static void +check_failed (const mongoc_apm_server_heartbeat_failed_t *event) +{ + checks_t *c; + + c = (checks_t *) mongoc_apm_server_heartbeat_failed_get_context (event); + bson_mutex_lock (&c->mutex); + c->n_failed++; + bson_mutex_unlock (&c->mutex); +} + +static void +server_changed_callback (const mongoc_apm_server_changed_t *event) +{ + checks_t *c; + const mongoc_server_description_t *sd; + + c = (checks_t *) mongoc_apm_server_changed_get_context (event); + bson_mutex_lock (&c->mutex); + sd = mongoc_apm_server_changed_get_new_description (event); + if (sd->type == MONGOC_SERVER_UNKNOWN) { + c->n_unknowns++; + } + bson_mutex_unlock (&c->mutex); +} + + +static mongoc_apm_callbacks_t * +heartbeat_callbacks (void) +{ + mongoc_apm_callbacks_t *callbacks; + + callbacks = mongoc_apm_callbacks_new (); + mongoc_apm_set_server_heartbeat_started_cb (callbacks, check_started); + mongoc_apm_set_server_heartbeat_succeeded_cb (callbacks, check_succeeded); + mongoc_apm_set_server_heartbeat_failed_cb (callbacks, check_failed); + mongoc_apm_set_server_changed_cb (callbacks, server_changed_callback); + + return callbacks; +} static void test_topology_client_creation (void) @@ -99,25 +217,35 @@ test_topology_thread_start_stop (void) topology = _mongoc_client_pool_get_topology (pool); /* Test starting up the scanner */ - ASSERT (_mongoc_topology_start_background_scanner (topology)); + bson_mutex_lock (&topology->mutex); + _mongoc_topology_background_monitoring_start (topology); + 
bson_mutex_unlock (&topology->mutex); assert_topology_state (topology, MONGOC_TOPOLOGY_SCANNER_BG_RUNNING); /* Test that starting the topology while it is already running is ok to do. */ - ASSERT (_mongoc_topology_start_background_scanner (topology)); + bson_mutex_lock (&topology->mutex); + _mongoc_topology_background_monitoring_start (topology); + bson_mutex_unlock (&topology->mutex); assert_topology_state (topology, MONGOC_TOPOLOGY_SCANNER_BG_RUNNING); /* Test that we can stop the topology */ - _mongoc_topology_background_thread_stop (topology); + bson_mutex_lock (&topology->mutex); + _mongoc_topology_background_monitoring_stop (topology); + bson_mutex_unlock (&topology->mutex); assert_topology_state (topology, MONGOC_TOPOLOGY_SCANNER_OFF); /* Test that stopping the topology when it is already stopped is ok to do. */ - _mongoc_topology_background_thread_stop (topology); + bson_mutex_lock (&topology->mutex); + _mongoc_topology_background_monitoring_stop (topology); + bson_mutex_unlock (&topology->mutex); assert_topology_state (topology, MONGOC_TOPOLOGY_SCANNER_OFF); /* Test that we can start the topology again after stopping it */ - ASSERT (_mongoc_topology_start_background_scanner (topology)); + bson_mutex_lock (&topology->mutex); + _mongoc_topology_background_monitoring_start (topology); + bson_mutex_unlock (&topology->mutex); assert_topology_state (topology, MONGOC_TOPOLOGY_SCANNER_BG_RUNNING); mongoc_client_pool_destroy (pool); @@ -339,17 +467,27 @@ _test_topology_invalidate_server (bool pooled) uint32_t fake_id = 42; uint32_t id; mongoc_server_stream_t *server_stream; + checks_t checks; + int server_count; + mongoc_apm_callbacks_t *callbacks; + checks_init (&checks); uri = test_framework_get_uri (); /* no auto heartbeat */ mongoc_uri_set_option_as_int32 (uri, "heartbeatFrequencyMS", INT32_MAX); - mongoc_uri_set_option_as_int32 (uri, "connectTimeoutMS", 2000); + mongoc_uri_set_option_as_int32 (uri, "connectTimeoutMS", 3000); + server_count = 
test_framework_server_count (); + callbacks = heartbeat_callbacks (); if (pooled) { pool = mongoc_client_pool_new (uri); + mongoc_client_pool_set_apm_callbacks (pool, callbacks, &checks); test_framework_set_pool_ssl_opts (pool); client = mongoc_client_pool_pop (pool); + /* wait for all nodes to be scanned. */ + WAIT_UNTIL (checks_cmp (&checks, "n_succeeded", '=', server_count)); + /* background scanner complains about failed connection */ capture_logs (true); } else { @@ -395,6 +533,10 @@ _test_topology_invalidate_server (bool pooled) &client->cluster, fake_id, true, NULL, NULL, &error)); bson_mutex_lock (&client->topology->mutex); sd = (mongoc_server_description_t *) mongoc_set_get (td->servers, fake_id); + /* A single threaded client, during reconnect, will scan ALL servers. + * When it receives a response from one of those nodes, showing that + * "fakeaddress" is not in the host list, it will remove the + * server description from the topology description. */ if (!pooled && test_framework_is_replset ()) { BSON_ASSERT (!sd); } else { @@ -419,6 +561,8 @@ _test_topology_invalidate_server (bool pooled) } else { mongoc_client_destroy (client); } + mongoc_apm_callbacks_destroy (callbacks); + checks_cleanup (&checks); } static void @@ -1191,57 +1335,6 @@ test_add_and_scan_failure (void) } -typedef struct { - int n_started; - int n_succeeded; - int n_failed; -} checks_t; - - -static void -check_started (const mongoc_apm_server_heartbeat_started_t *event) -{ - checks_t *c; - - c = (checks_t *) mongoc_apm_server_heartbeat_started_get_context (event); - c->n_started++; -} - - -static void -check_succeeded (const mongoc_apm_server_heartbeat_succeeded_t *event) -{ - checks_t *c; - - c = (checks_t *) mongoc_apm_server_heartbeat_succeeded_get_context (event); - c->n_succeeded++; -} - - -static void -check_failed (const mongoc_apm_server_heartbeat_failed_t *event) -{ - checks_t *c; - - c = (checks_t *) mongoc_apm_server_heartbeat_failed_get_context (event); - c->n_failed++; -} 
- - -static mongoc_apm_callbacks_t * -heartbeat_callbacks (void) -{ - mongoc_apm_callbacks_t *callbacks; - - callbacks = mongoc_apm_callbacks_new (); - mongoc_apm_set_server_heartbeat_started_cb (callbacks, check_started); - mongoc_apm_set_server_heartbeat_succeeded_cb (callbacks, check_succeeded); - mongoc_apm_set_server_heartbeat_failed_cb (callbacks, check_failed); - - return callbacks; -} - - static future_t * future_command (mongoc_client_t *client, bson_error_t *error) { @@ -1281,7 +1374,7 @@ has_known_server (mongoc_client_t *client) static void _test_ismaster_retry_single (bool hangup, int n_failures) { - checks_t checks = {0}; + checks_t checks; mongoc_apm_callbacks_t *callbacks; mock_server_t *server; mongoc_uri_t *uri; @@ -1292,6 +1385,7 @@ _test_ismaster_retry_single (bool hangup, int n_failures) bson_error_t error; int64_t t; + checks_init (&checks); server = mock_server_new (); mock_server_run (server); uri = mongoc_uri_copy (mock_server_get_uri (server)); @@ -1367,13 +1461,14 @@ _test_ismaster_retry_single (bool hangup, int n_failures) mock_server_destroy (server); bson_free (ismaster); mongoc_apm_callbacks_destroy (callbacks); + checks_cleanup (&checks); } static void _test_ismaster_retry_pooled (bool hangup, int n_failures) { - checks_t checks = {0}; + checks_t checks; mongoc_apm_callbacks_t *callbacks; mock_server_t *server; mongoc_uri_t *uri; @@ -1386,6 +1481,7 @@ _test_ismaster_retry_pooled (bool hangup, int n_failures) int i; int64_t t; + checks_init (&checks); server = mock_server_new (); mock_server_run (server); uri = mongoc_uri_copy (mock_server_get_uri (server)); @@ -1433,11 +1529,14 @@ _test_ismaster_retry_pooled (bool hangup, int n_failures) /* retry immediately (for testing, "immediately" means less than 250ms */ request = mock_server_receives_ismaster (server); ASSERT_CMPINT64 (bson_get_monotonic_time () - t, <, (int64_t) 250 * 1000); + /* Since connection was established successfully, the server description is + * not marked as 
Unknown until after a failed retry attempt. */ + BSON_ASSERT (has_known_server (client)); if (n_failures == 2) { if (hangup) { mock_server_hangs_up (request); } - BSON_ASSERT (!has_known_server (client)); + WAIT_UNTIL (!has_known_server (client)); } else { mock_server_replies_simple (request, ismaster); WAIT_UNTIL (has_known_server (client)); @@ -1445,9 +1544,9 @@ _test_ismaster_retry_pooled (bool hangup, int n_failures) request_destroy (request); - WAIT_UNTIL (checks.n_succeeded == 3 - n_failures); - WAIT_UNTIL (checks.n_failed == n_failures); - ASSERT_CMPINT (checks.n_started, ==, 3); + WAIT_UNTIL (checks_cmp (&checks, "n_succeeded", '=', 3 - n_failures)); + WAIT_UNTIL (checks_cmp (&checks, "n_failed", '=', n_failures)); + BSON_ASSERT (checks_cmp (&checks, "n_started", '=', 3)); mongoc_client_pool_push (pool, client); mongoc_client_pool_destroy (pool); @@ -1455,6 +1554,7 @@ _test_ismaster_retry_pooled (bool hangup, int n_failures) mock_server_destroy (server); bson_free (ismaster); mongoc_apm_callbacks_destroy (callbacks); + checks_cleanup (&checks); } @@ -1684,38 +1784,6 @@ test_cluster_time_updated_during_handshake () mongoc_uri_destroy (uri); } -/* returns the last time the topology completed a full scan. 
*/ -static int64_t -_get_last_scan (mongoc_client_t *client) -{ - int64_t last_scan; - mongoc_topology_t *topology = client->topology; - bson_mutex_lock (&topology->mutex); - last_scan = topology->last_scan; - bson_mutex_unlock (&topology->mutex); - return last_scan; -} - -typedef struct { - int64_t when_transitioned_to_unknown; - int64_t server_id; -} request_scan_error_ctx_t; - -static void -_server_changed (const mongoc_apm_server_changed_t *event) -{ - const mongoc_server_description_t *sd; - request_scan_error_ctx_t *ctx; - - ctx = (request_scan_error_ctx_t *) mongoc_apm_server_changed_get_context ( - event); - sd = mongoc_apm_server_changed_get_new_description (event); - if (sd->type == MONGOC_SERVER_UNKNOWN) { - ctx->when_transitioned_to_unknown = bson_get_monotonic_time (); - ctx->server_id = sd->id; - } -} - /* test that when a command receives a "not master" or "node is recovering" * error that the client takes the appropriate action: * - a pooled client should mark the server as unknown and request a full scan @@ -1739,17 +1807,28 @@ _test_request_scan_on_error (bool pooled, bson_error_t error = {0}; future_t *future = NULL; request_t *request; - const int64_t minHBMS = 50; - int64_t last_scan = 0; - mongoc_read_prefs_t *read_prefs; + const int64_t minHBMS = 10; + int64_t ping_started_usec = 0; mongoc_apm_callbacks_t *callbacks; - request_scan_error_ctx_t ctx = {0}; + checks_t checks; + mongoc_server_description_t *sd; + uint32_t primary_id; + mongoc_read_prefs_t *read_prefs; + + MONGOC_DEBUG ("pooled? 
%d", (int) pooled); + MONGOC_DEBUG ("err_response %s", err_response); + MONGOC_DEBUG ("should_scan %d, should_mark_unknown: %d", + (int) should_scan, + (int) should_mark_unknown); + MONGOC_DEBUG ("server_error %s", server_err); + + checks_init (&checks); + read_prefs = mongoc_read_prefs_new (MONGOC_READ_PRIMARY_PREFERRED); primary = mock_server_new (); secondary = mock_server_new (); mock_server_run (primary); mock_server_run (secondary); - read_prefs = mongoc_read_prefs_new (MONGOC_READ_PRIMARY_PREFERRED); RS_RESPONSE_TO_ISMASTER (primary, 6, true, false, primary, secondary); RS_RESPONSE_TO_ISMASTER (secondary, 6, false, false, primary, secondary); @@ -1768,31 +1847,34 @@ _test_request_scan_on_error (bool pooled, topology = _mongoc_client_pool_get_topology (client_pool); /* set a small minHeartbeatFrequency, so scans don't block for 500ms. */ topology->min_heartbeat_frequency_msec = minHBMS; - client = mongoc_client_pool_pop (client_pool); - /* upon popping a client, the background monitoring thread is started. */ - /* wait for the initial server selection to finish. */ - WAIT_UNTIL (_get_last_scan (client) > last_scan); } else { - mongoc_server_description_t *sd; client = mongoc_client_new_from_uri (uri); /* set a small minHeartbeatFrequency, so scans don't block for 500ms. */ client->topology->min_heartbeat_frequency_msec = minHBMS; - sd = mongoc_client_select_server (client, false, NULL, &error); - ASSERT_OR_PRINT (sd, error); - mongoc_server_description_destroy (sd); } - mongoc_uri_destroy (uri); - /* now that the initial server selection is completed, record the time. 
*/ - last_scan = _get_last_scan (client); - /* listen for transition to UNKNOWN */ - callbacks = mongoc_apm_callbacks_new (); - mongoc_apm_set_server_changed_cb (callbacks, _server_changed); + + callbacks = heartbeat_callbacks (); if (pooled) { - mongoc_client_pool_set_apm_callbacks (client_pool, callbacks, &ctx); + mongoc_client_pool_set_apm_callbacks (client_pool, callbacks, &checks); } else { - mongoc_client_set_apm_callbacks (client, callbacks, &ctx); + mongoc_client_set_apm_callbacks (client, callbacks, &checks); } mongoc_apm_callbacks_destroy (callbacks); + + if (pooled) { + client = mongoc_client_pool_pop (client_pool); + /* Scanning starts, wait for the initial scan. */ + WAIT_UNTIL (checks_cmp (&checks, "n_succeeded", '=', 2)); + } + + sd = mongoc_client_select_server (client, true, NULL, &error); + ASSERT_OR_PRINT (sd, error); + primary_id = sd->id; + mongoc_server_description_destroy (sd); + BSON_ASSERT (checks_cmp (&checks, "n_succeeded", '=', 2)); + + mongoc_uri_destroy (uri); + ping_started_usec = bson_get_monotonic_time (); /* run a ping command on the primary. */ future = future_client_command_simple ( client, "db", tmp_bson ("{'ping': 1}"), read_prefs, &reply, &error); @@ -1810,32 +1892,42 @@ _test_request_scan_on_error (bool pooled, future_destroy (future); bson_destroy (&reply); + sd = mongoc_client_get_server_description (client, primary_id); if (should_mark_unknown) { - mongoc_server_description_t *sd; - /* between sending the 'ping' command and returning, the server should - * have been marked as unknown. */ - ASSERT_CMPINT64 (last_scan, <=, ctx.when_transitioned_to_unknown); - ASSERT_CMPINT64 ( - ctx.when_transitioned_to_unknown, <=, bson_get_monotonic_time ()); - sd = mongoc_client_get_server_description (client, - (uint32_t) ctx.server_id); - /* check that the error on the server description matches the error - * message in the response. 
*/ - if (server_err) { - ASSERT_CMPSTR (server_err, sd->error.message); + BSON_ASSERT (checks_cmp (&checks, "n_unknowns", '=', 1)); + /* background monitoring may have already overwritten the unknown server + * description if the scan was requested. */ + if (pooled) { + if (sd->type == MONGOC_SERVER_UNKNOWN) { + if (server_err) { + ASSERT_CMPSTR (server_err, sd->error.message); + } + } + } else { + /* after the 'ping' command and returning, the server should + * have been marked as unknown. */ + BSON_ASSERT (sd->type == MONGOC_SERVER_UNKNOWN); + ASSERT_CMPINT64 (sd->last_update_time_usec, >=, ping_started_usec); + ASSERT_CMPINT64 ( + sd->last_update_time_usec, <=, bson_get_monotonic_time ()); + /* check that the error on the server description matches the error + * message in the response. */ + if (server_err) { + ASSERT_CMPSTR (server_err, sd->error.message); + } } - mongoc_server_description_destroy (sd); } else { - ASSERT_CMPINT64 (ctx.when_transitioned_to_unknown, ==, (int64_t) 0); + BSON_ASSERT (sd->type != MONGOC_SERVER_UNKNOWN); } + mongoc_server_description_destroy (sd); if (pooled) { if (should_scan) { /* a scan is requested immediately. wait for the scan to finish. */ - WAIT_UNTIL (_get_last_scan (client) > last_scan); + WAIT_UNTIL (checks_cmp (&checks, "n_started", '=', 4)); } else { - /* wait a short while to make sure no scan occurs. */ - _mongoc_usleep (10 * 1000); + _mongoc_usleep (minHBMS * 2 * 1000); /* minHBMS is msec; sleep takes usec */ + BSON_ASSERT (checks_cmp (&checks, "n_started", '=', 2)); } } else { /* a single threaded client may mark the topology as stale. 
if a scan @@ -1856,15 +1948,13 @@ _test_request_scan_on_error (bool pooled, BSON_ASSERT (future_get_bool (future)); future_destroy (future); bson_destroy (&reply); + if (should_scan) { + BSON_ASSERT (checks_cmp (&checks, "n_started", '=', 4)); + } else { + BSON_ASSERT (checks_cmp (&checks, "n_started", '=', 2)); + } } - if (should_scan) { - ASSERT_CMPINT64 (last_scan, <, _get_last_scan (client)); - } else { - ASSERT_CMPINT64 (last_scan, ==, _get_last_scan (client)); - } - - mongoc_read_prefs_destroy (read_prefs); if (pooled) { mongoc_client_pool_push (client_pool, client); mongoc_client_pool_destroy (client_pool); @@ -1873,10 +1963,12 @@ _test_request_scan_on_error (bool pooled, } mock_server_destroy (primary); mock_server_destroy (secondary); + mongoc_read_prefs_destroy (read_prefs); + checks_cleanup (&checks); } static void -test_last_server_removed_warning () +test_last_server_removed_warning (void) { mock_server_t *server; mongoc_client_t *client; @@ -1988,7 +2080,113 @@ test_request_scan_on_error () #undef TEST_SINGLE } +/* Test that the issue described in CDRIVER-3625 is fixed. + * A slow-to-respond server should not block the scan of other servers + * in background monitoring. 
+ */ +static void +test_slow_server_pooled (void) +{ + mock_server_t *primary; + mock_server_t *secondary; + char *ismaster_common; + char *ismaster_primary; + char *ismaster_secondary; + mongoc_read_prefs_t *prefs_secondary; + mongoc_client_pool_t *pool; + mongoc_client_t *client; + mongoc_uri_t *uri; + mongoc_apm_callbacks_t *callbacks; + request_t *request; + checks_t checks; + bool ret; + bson_error_t error; + + checks_init (&checks); + primary = mock_server_new (); + secondary = mock_server_new (); + + mock_server_run (primary); + mock_server_run (secondary); + + mock_server_autoresponds (primary, auto_ping, NULL, NULL); + mock_server_autoresponds (secondary, auto_ping, NULL, NULL); + + ismaster_common = bson_strdup_printf ( + "{'ok': 1, 'setName': 'rs', 'hosts': ['%s', '%s'], 'maxWireVersion': %d", + mock_server_get_host_and_port (primary), + mock_server_get_host_and_port (secondary), + WIRE_VERSION_MAX); + ismaster_primary = bson_strdup_printf ( + "%s, 'ismaster': true, 'secondary': false }", ismaster_common); + ismaster_secondary = bson_strdup_printf ( + "%s, 'ismaster': false, 'secondary': true }", ismaster_common); + + /* Primary responds immediately, but secondary does not. */ + mock_server_auto_ismaster (primary, ismaster_primary); + + uri = mongoc_uri_copy (mock_server_get_uri (primary)); + /* Do not connect as topology type Single, so the client pool discovers the + * secondary. */ + mongoc_uri_set_option_as_bool (uri, MONGOC_URI_DIRECTCONNECTION, false); + mongoc_uri_set_option_as_int32 ( + uri, MONGOC_URI_SERVERSELECTIONTIMEOUTMS, 500); + + pool = mongoc_client_pool_new (uri); + callbacks = heartbeat_callbacks (); + mongoc_client_pool_set_apm_callbacks (pool, callbacks, &checks); + + /* Set shorter heartbeat frequencies for faster responses. 
*/ + _mongoc_client_pool_get_topology (pool)->description.heartbeat_msec = 10; + _mongoc_client_pool_get_topology (pool)->min_heartbeat_frequency_msec = 10; + + client = mongoc_client_pool_pop (pool); + /* As soon as a client is popped, background scanning starts. + * Wait for two scans of the primary. */ + WAIT_UNTIL (checks_cmp (&checks, "n_started", '>', 1)); + + request = mock_server_receives_ismaster (secondary); + + /* A command to the primary succeeds. */ + ret = mongoc_client_command_simple ( + client, "admin", tmp_bson ("{'ping': 1}"), NULL, NULL, &error); + ASSERT_OR_PRINT (ret, error); + /* A command to the secondary fails: its ismaster is still unanswered. */ + prefs_secondary = mongoc_read_prefs_new (MONGOC_READ_SECONDARY); + ret = mongoc_client_command_simple ( + client, "admin", tmp_bson ("{'ping': 1}"), prefs_secondary, NULL, &error); + ASSERT_ERROR_CONTAINS (error, + MONGOC_ERROR_SERVER_SELECTION, + MONGOC_ERROR_SERVER_SELECTION_FAILURE, + "expired"); + BSON_ASSERT (!ret); + + /* Set up an auto responder so future ismasters on the secondary do not + * block until connectTimeoutMS. Otherwise, the shutdown sequence will be + * blocked for connectTimeoutMS. */ + mock_server_auto_ismaster (secondary, ismaster_secondary); + /* Respond to the first ismaster. */ + mock_server_replies_simple (request, ismaster_secondary); + request_destroy (request); + + /* Now a command to the secondary succeeds. 
*/ + ret = mongoc_client_command_simple ( + client, "admin", tmp_bson ("{'ping': 1}"), prefs_secondary, NULL, &error); + ASSERT_OR_PRINT (ret, error); + + mongoc_read_prefs_destroy (prefs_secondary); + mongoc_client_pool_push (pool, client); + mongoc_apm_callbacks_destroy (callbacks); + mongoc_client_pool_destroy (pool); + mongoc_uri_destroy (uri); + bson_free (ismaster_secondary); + bson_free (ismaster_primary); + bson_free (ismaster_common); + mock_server_destroy (secondary); + mock_server_destroy (primary); + checks_cleanup (&checks); +} void test_topology_install (TestSuite *suite) { @@ -2141,4 +2339,6 @@ test_topology_install (TestSuite *suite) TestSuite_AddMockServerTest (suite, "/Topology/last_server_removed_warning", test_last_server_removed_warning); + TestSuite_AddMockServerTest ( + suite, "/Topology/slow_server/pooled", test_slow_server_pooled); }