diff --git a/build/generate-future-functions.py b/build/generate-future-functions.py index 9d0c4cd6c59..1f636d8e452 100644 --- a/build/generate-future-functions.py +++ b/build/generate-future-functions.py @@ -56,6 +56,7 @@ typedef("size_t", None), typedef("ssize_t", None), typedef("uint32_t", None), + typedef("void_ptr", "void *"), # Const fundamental. typedef("const_char_ptr", "const char *"), diff --git a/src/libmongoc/CMakeLists.txt b/src/libmongoc/CMakeLists.txt index 6c52b49554e..6740486e28c 100644 --- a/src/libmongoc/CMakeLists.txt +++ b/src/libmongoc/CMakeLists.txt @@ -514,6 +514,7 @@ set (SOURCES ${SOURCES} ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-host-list.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-index.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-init.c + ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-interrupt.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-list.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-linux-distro-scanner.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-log.c @@ -896,6 +897,7 @@ set (test-libmongoc-sources ${PROJECT_SOURCE_DIR}/tests/test-mongoc-gridfs.c ${PROJECT_SOURCE_DIR}/tests/test-mongoc-handshake.c ${PROJECT_SOURCE_DIR}/tests/test-mongoc-hedged-reads.c + ${PROJECT_SOURCE_DIR}/tests/test-mongoc-interrupt.c ${PROJECT_SOURCE_DIR}/tests/test-mongoc-linux-distro-scanner.c ${PROJECT_SOURCE_DIR}/tests/test-mongoc-list.c ${PROJECT_SOURCE_DIR}/tests/test-mongoc-log.c diff --git a/src/libmongoc/src/mongoc/CMakeLists.txt b/src/libmongoc/src/mongoc/CMakeLists.txt index e682e01e52c..3c17c714222 100644 --- a/src/libmongoc/src/mongoc/CMakeLists.txt +++ b/src/libmongoc/src/mongoc/CMakeLists.txt @@ -115,6 +115,7 @@ set (src_libmongoc_src_mongoc_DIST_noinst_hs mongoc-handshake-os-private.h mongoc-handshake-private.h mongoc-host-list-private.h + mongoc-interrupt-private.h mongoc-libressl-private.h mongoc-linux-distro-scanner-private.h mongoc-list-private.h @@ -197,6 +198,7 @@ set (src_libmongoc_src_mongoc_DIST_cs mongoc-find-and-modify.c mongoc-host-list.c mongoc-init.c + mongoc-interrupt.c mongoc-gridfs.c mongoc-gridfs-bucket.c mongoc-gridfs-bucket-file.c diff --git a/src/libmongoc/src/mongoc/mongoc-interrupt-private.h b/src/libmongoc/src/mongoc/mongoc-interrupt-private.h new file mode 100644 index 00000000000..e764fdd8c39 --- /dev/null +++ b/src/libmongoc/src/mongoc/mongoc-interrupt-private.h @@ -0,0 +1,57 @@ +/* + * Copyright 2020-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "mongoc-prelude.h" + +#ifndef MONGOC_STREAM_INTERRUPTIBLE_PRIVATE_H +#define MONGOC_STREAM_INTERRUPTIBLE_PRIVATE_H + +#include "mongoc-stream.h" + +/* Creates a stream to use to interrupt calls to mongoc_stream_poll. + * + * The expected use is to cancel in-progress ismaster commands (especially for + * awaitable ismaster). An ismaster command may not respond for a long time, so + * reading the reply may block on mongoc_stream_poll until data is readable. To + * interrupt mongoc_stream_poll, a stream retrieved by + * _mongoc_interrupt_get_stream can be added to the call of poll. Any other + * thread can call _mongoc_interrupt_interrupt to write to that stream. + */ +typedef struct _mongoc_interrupt_t mongoc_interrupt_t; + +mongoc_interrupt_t * +_mongoc_interrupt_new (uint32_t timeout_ms); + +/* Interrupt the stream. An in progress poll for POLLIN should return. */ +bool +_mongoc_interrupt_interrupt (mongoc_interrupt_t *interrupt); + +/* Returns a socket stream, that can be polled alongside other + * socket streams. */ +mongoc_stream_t * +_mongoc_interrupt_get_stream (mongoc_interrupt_t *interrupt); + +/* Flushes queued data on an interrupt. + * + * This is not guaranteed to flush all data, but it does not block. + */ +bool +_mongoc_interrupt_flush (mongoc_interrupt_t *interrupt); + +void +_mongoc_interrupt_destroy (mongoc_interrupt_t *interrupt); + +#endif /* MONGOC_STREAM_INTERRUPTIBLE_PRIVATE_H */ diff --git a/src/libmongoc/src/mongoc/mongoc-interrupt.c b/src/libmongoc/src/mongoc/mongoc-interrupt.c new file mode 100644 index 00000000000..577bcc2c41f --- /dev/null +++ b/src/libmongoc/src/mongoc/mongoc-interrupt.c @@ -0,0 +1,321 @@ +/* + * Copyright 2020-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "mongoc/mongoc-errno-private.h" +#include "mongoc/mongoc-interrupt-private.h" +#include "mongoc/mongoc-log.h" +#include "mongoc/mongoc-socket-private.h" +#include "mongoc/mongoc-stream-socket.h" +#include "mongoc/mongoc-trace-private.h" +#include "common-thread-private.h" + +/* The interrupt stream is implemented in two ways. + * On POSIX, this uses the self-pipe trick. + * On Windows, this uses a pair of TCP sockets. + */ +struct _mongoc_interrupt_t { + bson_mutex_t mutex; + + union { + /* For POSIX. pipe_fds[0] is the read end and pipe_fds[1] is the write + * end. */ + int pipe_fds[2]; + + /* For Windows */ + struct { + mongoc_socket_t *read; + mongoc_socket_t *write; + } socket_pair; + } impl; + + mongoc_stream_t *stream; +}; + +mongoc_stream_t * +_mongoc_interrupt_get_stream (mongoc_interrupt_t *interrupt) +{ + return interrupt->stream; +} + +static void +_log_errno (char *prefix, int _errno) +{ + char buf[128] = {0}; + + bson_strerror_r (_errno, buf, sizeof (buf)); + MONGOC_ERROR ("%s: (%d) %s", prefix, _errno, buf); +} + +#ifdef _WIN32 +/* TCP socket pair implementation. */ +mongoc_interrupt_t * +_mongoc_interrupt_new (uint32_t timeout_ms) +{ + mongoc_interrupt_t *interrupt; + mongoc_socket_t *listen_socket = NULL; + mongoc_socket_t *interrupt_socket = NULL; + struct sockaddr_storage server_addr; + mongoc_socklen_t sock_len; + int ret; + bool success = false; + struct sockaddr_in server_addr_in = {0}; + + ENTRY; + + interrupt = (mongoc_interrupt_t *) bson_malloc0 (sizeof *interrupt); + bson_mutex_init (&interrupt->mutex); + + /* Inspired by cpython's implementation of socketpair. */ + listen_socket = mongoc_socket_new (AF_INET, SOCK_STREAM, 0); + if (!listen_socket) { + MONGOC_ERROR ("socket creation failed"); + GOTO (fail); + } + + memset (&server_addr_in, 0, sizeof (server_addr_in)); + server_addr_in.sin_family = AF_INET; + server_addr_in.sin_addr.s_addr = htonl (INADDR_LOOPBACK); + ret = mongoc_socket_bind (listen_socket, + (struct sockaddr *) &server_addr_in, + sizeof (server_addr_in)); + if (ret == -1) { + _log_errno ("bind failed", mongoc_socket_errno (listen_socket)); + GOTO (fail); + } + + ret = mongoc_socket_listen (listen_socket, 1); + if (ret == -1) { + _log_errno ("listen failed", mongoc_socket_errno (listen_socket)); + GOTO (fail); + } + + sock_len = sizeof (server_addr); + ret = mongoc_socket_getsockname ( + listen_socket, (struct sockaddr *) &server_addr, &sock_len); + if (-1 == ret) { + _log_errno ("getsockname failed", mongoc_socket_errno (listen_socket)); + GOTO (fail); + } + + interrupt->impl.socket_pair.read = + mongoc_socket_new (server_addr.ss_family, SOCK_STREAM, 0); + if (!interrupt->impl.socket_pair.read) { + MONGOC_ERROR ("socket creation failed"); + GOTO (fail); + } + + /* Begin non-blocking connect. */ + ret = mongoc_socket_connect (interrupt->impl.socket_pair.read, + (struct sockaddr *) &server_addr, + sock_len, + 0); + if (ret == -1 && + !MONGOC_ERRNO_IS_AGAIN ( + mongoc_socket_errno (interrupt->impl.socket_pair.read))) { + _log_errno ("connect failed", + mongoc_socket_errno (interrupt->impl.socket_pair.read)); + GOTO (fail); + } + + interrupt->impl.socket_pair.write = mongoc_socket_accept ( + listen_socket, bson_get_monotonic_time () + timeout_ms * 1000); + if (!interrupt->impl.socket_pair.write) { + _log_errno ("accept failed", mongoc_socket_errno (listen_socket)); + GOTO (fail); + } + + /* Create an unowned socket. interrupt_socket has 0 for the pid, so it will + * be considered unowned. */ + interrupt_socket = bson_malloc0 (sizeof (mongoc_socket_t)); + interrupt_socket->sd = interrupt->impl.socket_pair.read->sd; + /* Creating the stream takes ownership of the mongoc_socket_t. */ + interrupt->stream = mongoc_stream_socket_new (interrupt_socket); + success = true; +fail: + mongoc_socket_destroy (listen_socket); + if (!success) { + _mongoc_interrupt_destroy (interrupt); + interrupt = NULL; + } + RETURN (interrupt); +} + +void +_mongoc_interrupt_destroy (mongoc_interrupt_t *interrupt) +{ + if (!interrupt) { + return; + } + + bson_mutex_destroy (&interrupt->mutex); + mongoc_socket_destroy (interrupt->impl.socket_pair.read); + mongoc_socket_destroy (interrupt->impl.socket_pair.write); + mongoc_stream_destroy (interrupt->stream); + bson_free (interrupt); +} + +bool +_mongoc_interrupt_flush (mongoc_interrupt_t *interrupt) +{ + uint8_t buf[1]; + while (true) { + if (-1 == mongoc_socket_recv ( + interrupt->impl.socket_pair.read, buf, sizeof (buf), 0, 0)) { + if (MONGOC_ERRNO_IS_AGAIN (errno)) { + /* Nothing left to read. */ + return true; + } else { + /* Unexpected error. */ + _log_errno ("interrupt recv failed", + mongoc_socket_errno (interrupt->impl.socket_pair.read)); + return false; + } + } + } + /* Should never be reached. */ + BSON_ASSERT (false); +} + +bool +_mongoc_interrupt_interrupt (mongoc_interrupt_t *interrupt) +{ + bson_mutex_lock (&interrupt->mutex); + if (mongoc_socket_send (interrupt->impl.socket_pair.write, "!", 1, 0) == + -1 && + !MONGOC_ERRNO_IS_AGAIN (errno)) { + _log_errno ("interrupt send failed", + mongoc_socket_errno (interrupt->impl.socket_pair.write)); + bson_mutex_unlock (&interrupt->mutex); + return false; + } + bson_mutex_unlock (&interrupt->mutex); + return true; +} + +#else +/* Pipe implementation. */ + +/* Set non-blocking and close on exec. */ +static bool +_set_pipe_flags (int pipe_fd) +{ + int flags; + + flags = fcntl (pipe_fd, F_GETFL); + + if (-1 == fcntl (pipe_fd, F_SETFL, (flags | O_NONBLOCK))) { + return false; + } + +#ifdef FD_CLOEXEC + flags = fcntl (pipe_fd, F_GETFD); + if (-1 == fcntl (pipe_fd, F_SETFD, (flags | FD_CLOEXEC))) { + return false; + } +#endif + return true; +} + +mongoc_interrupt_t * +_mongoc_interrupt_new (uint32_t timeout_ms) +{ + mongoc_interrupt_t *interrupt; + mongoc_socket_t *interrupt_socket = NULL; + bool success = false; + + ENTRY; + + interrupt = (mongoc_interrupt_t *) bson_malloc0 (sizeof *interrupt); + bson_mutex_init (&interrupt->mutex); + + if (0 != pipe (interrupt->impl.pipe_fds)) { + _log_errno ("pipe creation failed", errno); + GOTO (fail); + } + + /* Make the pipe non-blocking and close-on-exec. */ + if (!_set_pipe_flags (interrupt->impl.pipe_fds[0]) || + !_set_pipe_flags (interrupt->impl.pipe_fds[1])) { + _log_errno ("unable to configure pipes", errno); + } + + /* Create an unowned socket. interrupt_socket has 0 for the pid, so it will + * be considered unowned. */ + interrupt_socket = bson_malloc0 (sizeof (mongoc_socket_t)); + interrupt_socket->sd = interrupt->impl.pipe_fds[0]; + /* Creating the stream takes ownership of the mongoc_socket_t. */ + interrupt->stream = mongoc_stream_socket_new (interrupt_socket); + + success = true; +fail: + if (!success) { + _mongoc_interrupt_destroy (interrupt); + interrupt = NULL; + } + RETURN (interrupt); +} + +bool +_mongoc_interrupt_flush (mongoc_interrupt_t *interrupt) +{ + char c; + while (true) { + if (read (interrupt->impl.pipe_fds[0], &c, 1) == -1) { + if (MONGOC_ERRNO_IS_AGAIN (errno)) { + /* Nothing left to read. */ + return true; + } else { + /* Unexpected error. */ + MONGOC_ERROR ("failed to read from pipe: %d", errno); + return false; + } + } + } + /* Should never be reached. */ + BSON_ASSERT (false); +} + +bool +_mongoc_interrupt_interrupt (mongoc_interrupt_t *interrupt) +{ + bson_mutex_lock (&interrupt->mutex); + if (write (interrupt->impl.pipe_fds[1], "!", 1) == -1 && + !MONGOC_ERRNO_IS_AGAIN (errno)) { + MONGOC_ERROR ("failed to write to pipe: %d", errno); + bson_mutex_unlock (&interrupt->mutex); + return false; + } + bson_mutex_unlock (&interrupt->mutex); + return true; +} + +void +_mongoc_interrupt_destroy (mongoc_interrupt_t *interrupt) +{ + if (!interrupt) { + return; + } + bson_mutex_destroy (&interrupt->mutex); + if (interrupt->impl.pipe_fds[0]) { + close (interrupt->impl.pipe_fds[0]); + } + if (interrupt->impl.pipe_fds[1]) { + close (interrupt->impl.pipe_fds[1]); + } + mongoc_stream_destroy (interrupt->stream); + bson_free (interrupt); +} +#endif diff --git a/src/libmongoc/src/mongoc/mongoc-socket.c b/src/libmongoc/src/mongoc/mongoc-socket.c index b73948ea917..344461f30fd 100644 --- a/src/libmongoc/src/mongoc/mongoc-socket.c +++ b/src/libmongoc/src/mongoc/mongoc-socket.c @@ -100,14 +100,14 @@ _mongoc_socket_setflags (int sd) #else int flags; - flags = fcntl (sd, F_GETFL, sd); + flags = fcntl (sd, F_GETFL); if (-1 == fcntl (sd, F_SETFL, (flags | O_NONBLOCK))) { return false; } #ifdef FD_CLOEXEC - flags = fcntl (sd, F_GETFD, sd); + flags = fcntl (sd, F_GETFD); if (-1 == fcntl (sd, F_SETFD, (flags | FD_CLOEXEC))) { return false; } diff --git a/src/libmongoc/tests/mock_server/future-value.c b/src/libmongoc/tests/mock_server/future-value.c index e9a34797ad2..7a51f74ff30 100644 --- a/src/libmongoc/tests/mock_server/future-value.c +++ b/src/libmongoc/tests/mock_server/future-value.c @@ -140,6 +140,20 @@ future_value_get_uint32_t (future_value_t *future_value) return future_value->value.uint32_t_value; } +void +future_value_set_void_ptr (future_value_t *future_value, void_ptr value) +{ + future_value->type = future_value_void_ptr_type; + future_value->value.void_ptr_value = value; +} + +void_ptr +future_value_get_void_ptr (future_value_t *future_value) +{ + BSON_ASSERT (future_value->type == future_value_void_ptr_type); + return future_value->value.void_ptr_value; +} + void future_value_set_const_char_ptr (future_value_t *future_value, const_char_ptr value) { diff --git a/src/libmongoc/tests/mock_server/future-value.h b/src/libmongoc/tests/mock_server/future-value.h index 6b4fd93efae..39658303a51 100644 --- a/src/libmongoc/tests/mock_server/future-value.h +++ b/src/libmongoc/tests/mock_server/future-value.h @@ -20,6 +20,7 @@ typedef char * char_ptr; typedef char ** char_ptr_ptr; +typedef void * void_ptr; typedef const char * const_char_ptr; typedef bson_error_t * bson_error_ptr; typedef bson_t * bson_ptr; @@ -58,6 +59,7 @@ typedef enum { future_value_size_t_type, future_value_ssize_t_type, future_value_uint32_t_type, + future_value_void_ptr_type, future_value_const_char_ptr_type, future_value_bson_error_ptr_type, future_value_bson_ptr_type, @@ -105,6 +107,7 @@ typedef struct _future_value_t size_t size_t_value; ssize_t ssize_t_value; uint32_t uint32_t_value; + void_ptr void_ptr_value; const_char_ptr const_char_ptr_value; bson_error_ptr bson_error_ptr_value; bson_ptr bson_ptr_value; @@ -224,6 +227,15 @@ uint32_t future_value_get_uint32_t ( future_value_t *future_value); +void +future_value_set_void_ptr( + future_value_t *future_value, + void_ptr value); + +void_ptr +future_value_get_void_ptr ( + future_value_t *future_value); + void future_value_set_const_char_ptr( future_value_t *future_value, diff --git a/src/libmongoc/tests/mock_server/future.c b/src/libmongoc/tests/mock_server/future.c index 4483768853c..c6a263cc0c1 100644 --- a/src/libmongoc/tests/mock_server/future.c +++ b/src/libmongoc/tests/mock_server/future.c @@ -121,6 +121,18 @@ future_get_uint32_t (future_t *future) abort (); } +void_ptr +future_get_void_ptr (future_t *future) +{ + if (future_wait (future)) { + return future_value_get_void_ptr (&future->return_value); + } + + fprintf (stderr, "%s timed out\n", BSON_FUNC); + fflush (stderr); + abort (); +} + const_char_ptr future_get_const_char_ptr (future_t *future) { diff --git a/src/libmongoc/tests/mock_server/future.h b/src/libmongoc/tests/mock_server/future.h index 379b2f14e76..a3b249b513b 100644 --- a/src/libmongoc/tests/mock_server/future.h +++ b/src/libmongoc/tests/mock_server/future.h @@ -66,6 +66,9 @@ future_get_ssize_t (future_t *future); uint32_t future_get_uint32_t (future_t *future); +void_ptr +future_get_void_ptr (future_t *future); + const_char_ptr future_get_const_char_ptr (future_t *future); diff --git a/src/libmongoc/tests/test-libmongoc.c b/src/libmongoc/tests/test-libmongoc.c index 2ae65dfb887..7da69b56449 100644 --- a/src/libmongoc/tests/test-libmongoc.c +++ b/src/libmongoc/tests/test-libmongoc.c @@ -244,8 +244,10 @@ extern void test_streamable_ismaster_install (TestSuite *suite); #ifdef MONGOC_ENABLE_OCSP_OPENSSL extern void -test_ocsp_cache_install(TestSuite *suite); +test_ocsp_cache_install (TestSuite *suite); #endif +extern void +test_interrupt_install (TestSuite *suite); typedef struct { mongoc_log_level_t level; @@ -2407,16 +2409,17 @@ windows_exception_handler (EXCEPTION_POINTERS *pExceptionInfo) /* Initialize stack walking. */ char exception_string[128]; bson_snprintf (exception_string, - sizeof(exception_string), + sizeof (exception_string), (exception_code == EXCEPTION_ACCESS_VIOLATION) - ? "(access violation)" - : "0x%08X", exception_code); + ? "(access violation)" + : "0x%08X", + exception_code); char address_string[32]; - bson_snprintf(address_string, - sizeof(address_string), - "0x%p", - pExceptionInfo->ExceptionRecord->ExceptionAddress); + bson_snprintf (address_string, + sizeof (address_string), + "0x%p", + pExceptionInfo->ExceptionRecord->ExceptionAddress); fprintf (stderr, "exception '%s' at '%s', terminating\n", @@ -2626,7 +2629,7 @@ main (int argc, char *argv[]) #ifdef MONGOC_ENABLE_OCSP_OPENSSL test_ocsp_cache_install (&suite); #endif - + test_interrupt_install (&suite); ret = TestSuite_Run (&suite); TestSuite_Destroy (&suite); diff --git a/src/libmongoc/tests/test-mongoc-interrupt.c b/src/libmongoc/tests/test-mongoc-interrupt.c new file mode 100644 index 00000000000..4af8db9be70 --- /dev/null +++ b/src/libmongoc/tests/test-mongoc-interrupt.c @@ -0,0 +1,151 @@ +/* + * Copyright 2020-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "TestSuite.h" +#include "test-libmongoc.h" +#include "mock_server/mock-server.h" +#include "mock_server/future.h" +#include "mongoc/mongoc.h" +#include "mongoc/mongoc-interrupt-private.h" +#include "mongoc/mongoc-client-private.h" +#include "common-thread-private.h" + +static int64_t +_time_ms () +{ + return bson_get_monotonic_time () / 1000; +} + +BSON_THREAD_FUN (_interrupt, future_void) +{ + future_t *future; + future_value_t return_value; + mongoc_interrupt_t *interrupt; + + future = future_void; + interrupt = future_get_param (future, 0)->value.void_ptr_value; + _mongoc_usleep (10 * 1000); + _mongoc_interrupt_interrupt (interrupt); + return_value.type = future_value_void_type; + future_resolve (future, return_value); + BSON_THREAD_RETURN; +} + +/* Run an interrupt in a separate thread. */ +static future_t * +_future_interrupt (mongoc_interrupt_t *interrupt) +{ + future_t *future; + future_value_t *future_value; + + future = future_new (future_value_void_type, 1); + future_value = future_get_param (future, 0); + future_value_set_void_ptr (future_value, (void *) interrupt); + future_start (future, _interrupt); + return future; +} + +static void +test_interrupt (void) +{ + mock_server_t *server; + mongoc_interrupt_t *interrupt; + mongoc_stream_poll_t *poller; + uint64_t started_ms; + int i; + future_t *future; + const mongoc_uri_t *uri; + mongoc_stream_t *stream; + bson_error_t error; + + interrupt = _mongoc_interrupt_new (10000); + + /* Poll the interrupt for input. */ + poller = bson_malloc0 (sizeof (mongoc_stream_poll_t) * 1); + poller[0].stream = _mongoc_interrupt_get_stream (interrupt); + poller[0].events = POLLIN; + + /* Test that sending an interrupt before the poll executes quickly. */ + started_ms = _time_ms (); + poller[0].revents = 0; + _mongoc_interrupt_interrupt (interrupt); + mongoc_stream_poll (poller, 1, 10000); + _mongoc_interrupt_flush (interrupt); + ASSERT_CMPTIME (_time_ms () - started_ms, 10000); + + /* Test that an interrupt after polling executes quickly. */ + started_ms = _time_ms (); + poller[0].revents = 0; + future = _future_interrupt (interrupt); + mongoc_stream_poll (poller, 1, 10000); + _mongoc_interrupt_flush (interrupt); + ASSERT_CMPTIME (_time_ms () - started_ms, 10000); + future_wait (future); + future_destroy (future); + + /* Flushing with nothing queued up does not block. */ + started_ms = _time_ms (); + _mongoc_interrupt_flush (interrupt); + ASSERT_CMPTIME (_time_ms () - started_ms, 10000); + + /* Test interrupting while polling on another socket. */ + server = mock_server_new (); + mock_server_run (server); + uri = mock_server_get_uri (server); + stream = + mongoc_client_connect_tcp (10000, mongoc_uri_get_hosts (uri), &error); + ASSERT_OR_PRINT (stream, error); + + bson_free (poller); + poller = bson_malloc0 (sizeof (mongoc_stream_poll_t) * 2); + poller[0].stream = _mongoc_interrupt_get_stream (interrupt); + poller[0].events = POLLIN; + poller[1].stream = stream; + poller[1].events = POLLIN; + + for (i = 0; i < 10; i++) { + started_ms = _time_ms (); + _mongoc_interrupt_interrupt (interrupt); + mongoc_stream_poll (poller, 2, 10000); + ASSERT_CMPTIME (_time_ms () - started_ms, 10000); + } + + /* Swap the order of the streams polled. mongoc_stream_poll uses the poll + * function associated with the first stream. */ + poller[0].revents = 0; + poller[0].stream = stream; + poller[1].revents = 0; + poller[1].stream = _mongoc_interrupt_get_stream (interrupt); + + for (i = 0; i < 10; i++) { + started_ms = _time_ms (); + _mongoc_interrupt_interrupt (interrupt); + mongoc_stream_poll (poller, 2, 10000); + ASSERT_CMPTIME (_time_ms () - started_ms, 10000); + } + + mongoc_stream_destroy (stream); + + mock_server_destroy (server); + _mongoc_interrupt_destroy (interrupt); + bson_free (poller); +} + +void +test_interrupt_install (TestSuite *suite) +{ + TestSuite_AddMockServerTest (suite, "/interrupt", test_interrupt); +}