Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 29 additions & 13 deletions src/libmongoc/tests/mock_server/mock-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,13 @@ struct _autoresponder_handle_t {
int id;
};

typedef enum {
REPLY, HANGUP, RESET
} reply_type_t;


typedef struct {
reply_type_t type;
mongoc_reply_flags_t flags;
bson_t *docs;
int n_docs;
Expand Down Expand Up @@ -1218,12 +1223,14 @@ mock_server_receives_kill_cursors (mock_server_t *server, int64_t cursor_id)
void
mock_server_hangs_up (request_t *request)
{
reply_t *reply;
test_suite_mock_server_log ("%5.2f %hu <- %hu \thang up!",
mock_server_get_uptime_sec (request->server),
request->client_port,
request_get_server_port (request));

mongoc_stream_close (request->client);
reply = bson_malloc0 (sizeof (reply_t));
reply->type = HANGUP;
q_put (request->replies, reply);
Copy link
Collaborator Author

@kevinAlbs kevinAlbs May 25, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behavior of mock_server_hangs_up was to close the server side stream from the main thread. TSan flags a data race because in the process of closing, it overwrites the stream's file descriptor with an invalid one, causing a write/read race.

The mock server maintains a worker thread for each server-side connection. And each worker thread maintains a thread-safe queue for sending replies back. So instead of closing the stream directly, this pushes a special reply onto the queue to signal the worker thread to close the stream.

}


Expand All @@ -1245,20 +1252,15 @@ mock_server_hangs_up (request_t *request)
void
mock_server_resets (request_t *request)
{
struct linger no_linger;
no_linger.l_onoff = 1;
no_linger.l_linger = 0;

reply_t *reply;
test_suite_mock_server_log ("%5.2f %hu <- %hu \treset!",
mock_server_get_uptime_sec (request->server),
request->client_port,
request_get_server_port (request));

/* send RST packet to client */
mongoc_stream_setsockopt (
request->client, SOL_SOCKET, SO_LINGER, &no_linger, sizeof no_linger);

mongoc_stream_close (request->client);
reply = bson_malloc0 (sizeof (reply_t));
reply->type = RESET;
q_put (request->replies, reply);
}


Expand Down Expand Up @@ -1331,7 +1333,6 @@ mock_server_replies_simple (request_t *request, const char *docs_json)
mock_server_replies (request, MONGOC_REPLY_NONE, 0, 0, 1, docs_json);
}


/*--------------------------------------------------------------------------
*
* mock_server_replies_ok_and_destroys --
Expand Down Expand Up @@ -1801,6 +1802,7 @@ mock_server_reply_multi (request_t *request,

reply = bson_malloc0 (sizeof (reply_t));

reply->type = REPLY;
reply->flags = flags;
reply->n_docs = n_docs;
reply->docs = bson_malloc0 (n_docs * sizeof (bson_t));
Expand Down Expand Up @@ -1837,12 +1839,26 @@ _mock_server_reply_with_stream (mock_server_t *server,
uint8_t *ptr;
size_t len;
bool is_op_msg;

mongoc_reply_flags_t flags = reply->flags;
const bson_t *docs = reply->docs;
int n_docs = reply->n_docs;
int64_t cursor_id = reply->cursor_id;

if (reply->type == HANGUP) {
mongoc_stream_close (client);
return;
} else if (reply->type == RESET) {
struct linger no_linger;
no_linger.l_onoff = 1;
no_linger.l_linger = 0;

/* send RST packet to client */
mongoc_stream_setsockopt (client, SOL_SOCKET, SO_LINGER, &no_linger, sizeof no_linger);

mongoc_stream_close (client);
return;
}

docs_json = bson_string_new ("");
for (i = 0; i < n_docs; i++) {
doc_json = bson_as_json (&docs[i], NULL);
Expand Down
3 changes: 2 additions & 1 deletion src/libmongoc/tests/test-libmongoc.c
Original file line number Diff line number Diff line change
Expand Up @@ -407,13 +407,14 @@ log_handler (mongoc_log_level_t log_level,
suite = (TestSuite *) user_data;

if (log_level < MONGOC_LOG_LEVEL_INFO) {
bson_mutex_lock (&captured_logs_mutex);
if (capturing_logs) {
log_entry = log_entry_create (log_level, message);
bson_mutex_lock (&captured_logs_mutex);
_mongoc_array_append_val (&captured_logs, log_entry);
bson_mutex_unlock (&captured_logs_mutex);
return;
}
bson_mutex_unlock (&captured_logs_mutex);

if (!suite->silent) {
mongoc_log_default_handler (log_level, log_domain, message, NULL);
Expand Down