diff --git a/CHANGELOG.md b/CHANGELOG.md index 48890fcd3cc..adf42ea5d3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,7 @@ * None. ### Fixed -* ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?) -* None. +* Sync client may [have failed an assertion](https://github.com/realm/realm-core/blob/006660c8d20c4941d3838f74aec6f3561ebf6784/src/realm/sync/noinst/client_impl_base.cpp#L388) during shutdown if all sessions hadn't been ready to finalize by the time the Client destructor ran. (PR [#6293](https://github.com/realm/realm-core/pull/6293), since v13.4.1) ### Breaking changes * None. diff --git a/src/realm/object-store/sync/impl/sync_client.hpp b/src/realm/object-store/sync/impl/sync_client.hpp index 4e3be582242..3fe5e994635 100644 --- a/src/realm/object-store/sync/impl/sync_client.hpp +++ b/src/realm/object-store/sync/impl/sync_client.hpp @@ -95,7 +95,7 @@ struct SyncClient { void stop() { - m_client.stop(); + m_client.shutdown(); } std::unique_ptr make_session(std::shared_ptr db, diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index eb43ef82b34..24b6b57ec24 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -192,6 +192,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac void initiate(ProtocolEnvelope, std::string server_address, port_type server_port, std::string virt_path, std::string signed_access_token); + void force_close(); + void nonsync_transact_notify(version_type new_version); void cancel_reconnect_delay(); @@ -279,6 +281,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac // finalize_before_actualization() has already been called. bool m_actualized = false; + bool m_force_closed = false; + bool m_suspended = false; // Set to true when the first DOWNLOAD message is received to indicate that @@ -396,7 +400,7 @@ ClientImpl::~ClientImpl() // Since no other thread is allowed to be accessing this client or any of // its subobjects at this time, no mutex locking is necessary. - drain(); + shutdown_and_wait(); // Session wrappers are removed from m_unactualized_session_wrappers as they // are abandoned. REALM_ASSERT(m_stopped); @@ -491,24 +495,17 @@ void ClientImpl::drain_connections_on_loop() { post([this](Status status) mutable { REALM_ASSERT(status.is_ok()); - actualize_and_finalize_session_wrappers(); drain_connections(); }); } -void ClientImpl::drain() +void ClientImpl::shutdown_and_wait() { - stop(); - { - std::lock_guard lock{m_drain_mutex}; - if (m_drained) { - return; - } - } - - drain_connections_on_loop(); - + shutdown(); std::unique_lock lock{m_drain_mutex}; + if (m_drained) { + return; + } logger.debug("Waiting for %1 connections to drain", m_num_connections); m_drain_cv.wait(lock, [&] { @@ -518,13 +515,17 @@ void ClientImpl::drain() m_drained = true; } -void ClientImpl::stop() noexcept +void ClientImpl::shutdown() noexcept { - std::lock_guard lock{m_mutex}; - if (m_stopped) - return; - m_stopped = true; - m_wait_or_client_stopped_cond.notify_all(); + { + std::lock_guard lock{m_mutex}; + if (m_stopped) + return; + m_stopped = true; + m_wait_or_client_stopped_cond.notify_all(); + } + + drain_connections_on_loop(); } @@ -584,11 +585,13 @@ void ClientImpl::actualize_and_finalize_session_wrappers() { std::map unactualized_session_wrappers; SessionWrapperStack abandoned_session_wrappers; + bool stopped; { std::lock_guard lock{m_mutex}; m_actualize_and_finalize_needed = false; swap(m_unactualized_session_wrappers, unactualized_session_wrappers); swap(m_abandoned_session_wrappers, abandoned_session_wrappers); + stopped = m_stopped; } // Note, we need to finalize old session wrappers before we actualize new // ones. This ensures that deactivation of old sessions is initiated before @@ -596,6 +599,13 @@ void ClientImpl::actualize_and_finalize_session_wrappers() // not see two overlapping sessions for the same local Realm file. while (util::bind_ptr wrapper = abandoned_session_wrappers.pop()) wrapper->finalize(); // Throws + if (stopped) { + for (auto& p : unactualized_session_wrappers) { + SessionWrapper& wrapper = *p.first; + wrapper.finalize_before_actualization(); + } + return; + } for (auto& p : unactualized_session_wrappers) { SessionWrapper& wrapper = *p.first; ServerEndpoint server_endpoint = std::move(p.second); @@ -668,6 +678,7 @@ void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept { std::lock_guard lk(m_drain_mutex); + REALM_ASSERT(m_num_connections); --m_num_connections; m_drain_cv.notify_all(); } @@ -676,6 +687,10 @@ void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept // ################ SessionImpl ################ +void SessionImpl::force_close() +{ + m_wrapper.force_close(); +} void SessionImpl::on_connection_state_changed(ConnectionState state, const util::Optional& error_info) @@ -1500,12 +1515,11 @@ void SessionWrapper::actualize(ServerEndpoint endpoint) report_progress(); // Throws } - -// Must be called from event loop thread -void SessionWrapper::finalize() +void SessionWrapper::force_close() { REALM_ASSERT(m_actualized); REALM_ASSERT(m_sess); + m_force_closed = true; ClientImpl::Connection& conn = m_sess->get_connection(); conn.initiate_session_deactivation(m_sess); // Throws @@ -1513,6 +1527,23 @@ void SessionWrapper::finalize() // Delete the pending bootstrap store since it uses a reference to the logger in m_sess m_flx_pending_bootstrap_store.reset(); m_sess = nullptr; + m_connection_state_change_listener = {}; +} + +// Must be called from event loop thread +void SessionWrapper::finalize() +{ + REALM_ASSERT(m_actualized); + + if (!m_force_closed) { + REALM_ASSERT(m_sess); + ClientImpl::Connection& conn = m_sess->get_connection(); + conn.initiate_session_deactivation(m_sess); // Throws + + // Delete the pending bootstrap store since it uses a reference to the logger in m_sess + m_flx_pending_bootstrap_store.reset(); + m_sess = nullptr; + } // The Realm file can be closed now, as no access to the Realm file is // supposed to happen on behalf of a session after initiation of @@ -1548,6 +1579,7 @@ void SessionWrapper::finalize() inline void SessionWrapper::finalize_before_actualization() noexcept { m_actualized = true; + m_force_closed = true; } @@ -1848,6 +1880,9 @@ std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ide void ClientImpl::Connection::report_connection_state_change(ConnectionState state, util::Optional error_info) { + if (m_force_closed) { + return; + } auto handler = [=](ClientImpl::Session& sess) { SessionImpl& sess_2 = static_cast(sess); sess_2.on_connection_state_changed(state, error_info); // Throws @@ -1871,14 +1906,14 @@ Client::Client(Client&& client) noexcept Client::~Client() noexcept {} -void Client::stop() noexcept +void Client::shutdown() noexcept { - m_impl->stop(); + m_impl->shutdown(); } -void Client::drain() +void Client::shutdown_and_wait() { - m_impl->drain(); + m_impl->shutdown_and_wait(); } void Client::cancel_reconnect_delay() diff --git a/src/realm/sync/client.hpp b/src/realm/sync/client.hpp index b743ccf36d9..2cf532e8956 100644 --- a/src/realm/sync/client.hpp +++ b/src/realm/sync/client.hpp @@ -46,11 +46,11 @@ class Client { /// See run(). /// /// Thread-safe. - void stop() noexcept; + void shutdown() noexcept; /// Forces all connections to close and waits for any pending work on the event - /// loop to complete. All sessions must be destroyed before calling drain. - void drain(); + /// loop to complete. All sessions must be destroyed before calling shutdown_and_wait. + void shutdown_and_wait(); /// \brief Cancel current or next reconnect delay for all servers. /// diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index 29e7805e3c9..44770477454 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -228,6 +228,7 @@ void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler) handler(status); std::lock_guard lock(m_drain_mutex); + REALM_ASSERT(m_outstanding_posts); --m_outstanding_posts; m_drain_cv.notify_all(); }); @@ -266,6 +267,7 @@ SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds handler(status); std::lock_guard lock(m_drain_mutex); + REALM_ASSERT(m_outstanding_posts); --m_outstanding_posts; m_drain_cv.notify_all(); }); @@ -301,6 +303,7 @@ void Connection::activate() void Connection::activate_session(std::unique_ptr sess) { REALM_ASSERT(&sess->m_conn == this); + REALM_ASSERT(!m_force_closed); Session& sess_2 = *sess; session_ident_type ident = sess->m_ident; auto p = m_sessions.emplace(ident, std::move(sess)); // Throws @@ -318,15 +321,14 @@ void Connection::activate_session(std::unique_ptr sess) void Connection::initiate_session_deactivation(Session* sess) { REALM_ASSERT(&sess->m_conn == this); + REALM_ASSERT(m_num_active_sessions); if (REALM_UNLIKELY(--m_num_active_sessions == 0)) { if (m_activated && m_state == ConnectionState::disconnected) m_on_idle->trigger(); } sess->initiate_deactivation(); // Throws if (sess->m_state == Session::Deactivated) { - // Session is now deactivated, so remove and destroy it. - session_ident_type ident = sess->m_ident; - m_sessions.erase(ident); + finish_session_deactivation(sess); } } @@ -376,23 +378,47 @@ void Connection::cancel_reconnect_delay() // soon as there are any sessions that are both active and unsuspended. } +void ClientImpl::Connection::finish_session_deactivation(Session* sess) +{ + REALM_ASSERT(sess->m_state == Session::Deactivated); + auto ident = sess->m_ident; + m_sessions.erase(ident); +} void Connection::force_close() { - if (m_disconnect_delay_in_progress || m_reconnect_delay_in_progress) { + if (m_force_closed) { + return; + } + + m_force_closed = true; + + if (m_state != ConnectionState::disconnected) { + voluntary_disconnect(); + } + + REALM_ASSERT(m_state == ConnectionState::disconnected); + if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) { m_reconnect_disconnect_timer.reset(); - m_disconnect_delay_in_progress = false; m_reconnect_delay_in_progress = false; + m_disconnect_delay_in_progress = false; } - REALM_ASSERT(m_num_active_unsuspended_sessions == 0); - REALM_ASSERT(m_num_active_sessions == 0); - if (m_state == ConnectionState::disconnected) { - return; + // We must copy any session pointers we want to close to a vector because force_closing + // the session may remove it from m_sessions and invalidate the iterator uses to loop + // through the map. By copying to a separate vector we ensure our iterators remain valid. + std::vector to_close; + for (auto& session_pair : m_sessions) { + if (session_pair.second->m_state == Session::State::Active) { + to_close.push_back(session_pair.second.get()); + } + } + + for (auto& sess : to_close) { + sess->force_close(); } - voluntary_disconnect(); - logger.info("Force disconnected"); + logger.debug("Force closed idle connection"); } @@ -436,6 +462,10 @@ void Connection::websocket_connected_handler(const std::string& protocol) bool Connection::websocket_binary_message_received(util::Span data) { + if (m_force_closed) { + logger.debug("Received binary message after connection was force closed"); + return false; + } std::error_code ec; using sf = SimulatedFailure; if (sf::trigger(sf::sync_client__read_head, ec)) { @@ -456,6 +486,10 @@ void Connection::websocket_error_handler() bool Connection::websocket_closed_handler(bool was_clean, Status status) { + if (m_force_closed) { + logger.debug("Received websocket close message after connection was force closed"); + return false; + } logger.info("Closing the websocket with status='%1', was_clean='%2'", status, was_clean); // Return early. if (status.is_ok()) { @@ -544,6 +578,11 @@ void Connection::initiate_reconnect_wait() REALM_ASSERT(!m_reconnect_delay_in_progress); REALM_ASSERT(!m_disconnect_delay_in_progress); + // If we've been force closed then we don't need/want to reconnect. Just return early here. + if (m_force_closed) { + return; + } + using milliseconds_lim = ReconnectInfo::milliseconds_lim; constexpr milliseconds_type min_delay = 1000; // 1 second (barring deductions) @@ -1014,9 +1053,7 @@ void Connection::handle_write_message() { m_sending_session->message_sent(); // Throws if (m_sending_session->m_state == Session::Deactivated) { - // Session is now deactivated, so remove and destroy it. - session_ident_type ident = m_sending_session->m_ident; - m_sessions.erase(ident); + finish_session_deactivation(m_sending_session); } m_sending_session = nullptr; m_sending = false; @@ -1044,9 +1081,7 @@ void Connection::send_next_message() sess.send_message(); // Throws if (sess.m_state == Session::Deactivated) { - // Session is now deactivated, so remove and destroy it. - session_ident_type ident = sess.m_ident; - m_sessions.erase(ident); + finish_session_deactivation(&sess); } // An enlisted session may choose to not send a message. In that case, @@ -1328,9 +1363,7 @@ void Connection::receive_error_message(const ProtocolErrorInfo& info, session_id } if (sess->m_state == Session::Deactivated) { - // Session is now deactivated, so remove and destroy it. - session_ident_type ident = sess->m_ident; - m_sessions.erase(ident); + finish_session_deactivation(sess); } return; } @@ -1444,9 +1477,7 @@ void Connection::receive_unbound_message(session_ident_type session_ident) } if (sess->m_state == Session::Deactivated) { - // Session is now deactivated, so remove and destroy it. - session_ident_type ident = sess->m_ident; - m_sessions.erase(ident); + finish_session_deactivation(sess); } } @@ -1733,8 +1764,9 @@ void Session::activate() } catch (const IntegrationException& error) { logger.error("Error integrating bootstrap changesets: %1", error.what()); - on_suspended(SessionErrorInfo{error.code(), false}); + m_suspended = true; m_conn.one_less_active_unsuspended_session(); // Throws + on_suspended(SessionErrorInfo{error.code(), false}); } if (has_pending_client_reset) { diff --git a/src/realm/sync/noinst/client_impl_base.hpp b/src/realm/sync/noinst/client_impl_base.hpp index ba709a6e6ac..b2b59f2d566 100644 --- a/src/realm/sync/noinst/client_impl_base.hpp +++ b/src/realm/sync/noinst/client_impl_base.hpp @@ -136,10 +136,9 @@ class ClientImpl { static constexpr int get_oldest_supported_protocol_version() noexcept; - /// This calls stop() on the socket provider respectively. - void stop() noexcept; + void shutdown() noexcept; - void drain(); + void shutdown_and_wait(); const std::string& get_user_agent_string() const noexcept; ReconnectMode get_reconnect_mode() const noexcept; @@ -502,6 +501,7 @@ class ClientImpl::Connection { void enlist_to_send(Session*); void one_more_active_unsuspended_session(); void one_less_active_unsuspended_session(); + void finish_session_deactivation(Session* sess); OutputBuffer& get_output_buffer() noexcept; Session* get_session(session_ident_type) const noexcept; @@ -569,6 +569,8 @@ class ClientImpl::Connection { bool m_websocket_error_received = false; + bool m_force_closed = false; + // The timer will be constructed on demand, and will only be destroyed when // canceling a reconnect or disconnect delay. // @@ -790,6 +792,8 @@ class ClientImpl::Session { Session(SessionWrapper&, ClientImpl::Connection&); ~Session(); + void force_close(); + util::Future send_test_command(std::string body); private: @@ -1262,8 +1266,10 @@ inline void ClientImpl::Connection::one_more_active_unsuspended_session() inline void ClientImpl::Connection::one_less_active_unsuspended_session() { + REALM_ASSERT(m_num_active_unsuspended_sessions); if (--m_num_active_unsuspended_sessions != 0) return; + // Dropped from one to zero if (m_state != ConnectionState::disconnected) initiate_disconnect_wait(); // Throws diff --git a/test/sync_fixtures.hpp b/test/sync_fixtures.hpp index ca1f656439f..4a255e87320 100644 --- a/test/sync_fixtures.hpp +++ b/test/sync_fixtures.hpp @@ -566,7 +566,9 @@ class MultiClientServerFixture { { unit_test::TestContext& test_context = m_test_context; stop(); - m_clients.clear(); + for (int i = 0; i < m_num_clients; ++i) { + m_clients[i]->shutdown_and_wait(); + } m_client_socket_providers.clear(); for (int i = 0; i < m_num_servers; ++i) { if (m_server_threads[i].joinable()) @@ -668,13 +670,13 @@ class MultiClientServerFixture { }); } // We can't wait for clearing the simulated failure since some tests stop the client early - client.drain(); + client.shutdown_and_wait(); } void stop() { for (int i = 0; i < m_num_clients; ++i) - m_clients[i]->stop(); + m_clients[i]->shutdown(); for (int i = 0; i < m_num_servers; ++i) m_servers[i]->stop(); } diff --git a/test/test_sync.cpp b/test/test_sync.cpp index fa2149fdfc3..4de93182365 100644 --- a/test/test_sync.cpp +++ b/test/test_sync.cpp @@ -2273,7 +2273,7 @@ TEST_IF(Sync_ReadOnlyClient, false) fixture.set_client_side_error_handler(1, [&](std::error_code ec, bool, const std::string&) { CHECK_EQUAL(ProtocolError::permission_denied, ec); did_get_permission_denied = true; - fixture.get_client(1).stop(); + fixture.get_client(1).shutdown(); }); fixture.start(); @@ -2980,7 +2980,7 @@ TEST_IF(Sync_SSL_Certificate_Verify_Callback_External, false) " preverify_ok = %4, depth = %5", server_address, server_port, pem, preverify_ok, depth); if (depth == 0) - client.stop(); + client.shutdown(); return true; }; @@ -2996,7 +2996,7 @@ TEST_IF(Sync_SSL_Certificate_Verify_Callback_External, false) session.bind(); session.wait_for_download_complete_or_client_stopped(); - client.drain(); + client.shutdown_and_wait(); } #endif // REALM_HAVE_OPENSSL @@ -3154,7 +3154,7 @@ TEST(Sync_UploadDownloadProgress_1) return cond_var_signaled; }); - client.stop(); + client.shutdown(); CHECK_EQUAL(number_of_handler_calls, 1); } } @@ -3698,7 +3698,7 @@ TEST(Sync_UploadDownloadProgress_6) session->bind(); } - client.drain(); + client.shutdown_and_wait(); server.stop(); server_thread.join(); @@ -5035,7 +5035,7 @@ TEST_IF(Sync_SSL_Certificates, false) error_info->error_code, error_info->is_fatal(), error_info->message); // We expect to get through the SSL handshake but will hit an error due to the wrong token. CHECK_NOT_EQUAL(error_info->error_code, Client::Error::ssl_server_cert_rejected); - client.stop(); + client.shutdown(); } };