Wait for sessions to drain during sync client shutdown #6293

Merged (12 commits) on Feb 17, 2023
2 changes: 1 addition & 1 deletion src/realm/object-store/sync/impl/sync_client.hpp
@@ -95,7 +95,7 @@ struct SyncClient {

void stop()
{
m_client.stop();
m_client.shutdown();
}

std::unique_ptr<sync::Session> make_session(std::shared_ptr<DB> db,
87 changes: 61 additions & 26 deletions src/realm/sync/client.cpp
@@ -192,6 +192,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac
void initiate(ProtocolEnvelope, std::string server_address, port_type server_port, std::string virt_path,
std::string signed_access_token);

void force_close();

void nonsync_transact_notify(version_type new_version);
void cancel_reconnect_delay();

@@ -279,6 +281,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, public SyncTransac
// finalize_before_actualization() has already been called.
bool m_actualized = false;

bool m_force_closed = false;

bool m_suspended = false;

// Set to true when the first DOWNLOAD message is received to indicate that
@@ -396,7 +400,7 @@ ClientImpl::~ClientImpl()
// Since no other thread is allowed to be accessing this client or any of
// its subobjects at this time, no mutex locking is necessary.

drain();
shutdown_and_wait();
// Session wrappers are removed from m_unactualized_session_wrappers as they
// are abandoned.
REALM_ASSERT(m_stopped);
@@ -491,24 +495,17 @@ void ClientImpl::drain_connections_on_loop()
{
post([this](Status status) mutable {
REALM_ASSERT(status.is_ok());
actualize_and_finalize_session_wrappers();
drain_connections();
});
}

void ClientImpl::drain()
void ClientImpl::shutdown_and_wait()
{
stop();
{
std::lock_guard lock{m_drain_mutex};
if (m_drained) {
return;
}
}

drain_connections_on_loop();

shutdown();
std::unique_lock lock{m_drain_mutex};
if (m_drained) {
return;
}

logger.debug("Waiting for %1 connections to drain", m_num_connections);
m_drain_cv.wait(lock, [&] {
@@ -518,13 +515,17 @@
m_drained = true;
}

void ClientImpl::stop() noexcept
void ClientImpl::shutdown() noexcept
{
std::lock_guard lock{m_mutex};
if (m_stopped)
return;
m_stopped = true;
m_wait_or_client_stopped_cond.notify_all();
{
std::lock_guard lock{m_mutex};
if (m_stopped)
return;
m_stopped = true;
m_wait_or_client_stopped_cond.notify_all();
}

drain_connections_on_loop();
}


@@ -584,18 +585,27 @@ void ClientImpl::actualize_and_finalize_session_wrappers()
{
std::map<SessionWrapper*, ServerEndpoint> unactualized_session_wrappers;
SessionWrapperStack abandoned_session_wrappers;
bool stopped;
{
std::lock_guard lock{m_mutex};
m_actualize_and_finalize_needed = false;
swap(m_unactualized_session_wrappers, unactualized_session_wrappers);
swap(m_abandoned_session_wrappers, abandoned_session_wrappers);
stopped = m_stopped;

Review comment (Contributor):
Do you need this since stopped is now an atomic? You can read m_stopped directly on line 602 and avoid a tiny window where m_stopped might be changed between here and there.

Reply (Contributor Author):
I decided not to make m_stopped atomic after all. Since we're threading force_closed states down into sessions now, we can just use that to efficiently check whether it's safe to call callbacks, rather than introducing an atomic bool.
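For illustration only (not part of the diff, and using simplified stand-in names rather than the real Realm types): the pattern this reply refers to keeps m_stopped a plain bool and snapshots it under the same mutex that guards the session-wrapper maps, instead of turning it into an atomic read at the point of use.

```cpp
// Sketch of the snapshot-under-mutex pattern; hypothetical names, not Realm code.
#include <mutex>

class ClientLike {
    std::mutex m_mutex;
    bool m_stopped = false; // guarded by m_mutex; deliberately not std::atomic<bool>

public:
    void shutdown()
    {
        std::lock_guard<std::mutex> lock{m_mutex};
        m_stopped = true;
    }

    void actualize_and_finalize()
    {
        bool stopped;
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            // Snapshot under the same lock that protects the wrapper maps, so
            // the branch below stays consistent with whatever was swapped out
            // of those maps in this critical section.
            stopped = m_stopped;
        }
        if (stopped) {
            // finalize new wrappers without actualizing them
        }
        else {
            // actualize new wrappers as usual
        }
    }
};
```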

}
// Note, we need to finalize old session wrappers before we actualize new
// ones. This ensures that deactivation of old sessions is initiated before
// new sessions are activated. This, in turn, ensures that the server does
// not see two overlapping sessions for the same local Realm file.
while (util::bind_ptr<SessionWrapper> wrapper = abandoned_session_wrappers.pop())
wrapper->finalize(); // Throws
if (stopped) {
for (auto& p : unactualized_session_wrappers) {
SessionWrapper& wrapper = *p.first;
wrapper.finalize_before_actualization();
}
return;
}
for (auto& p : unactualized_session_wrappers) {
SessionWrapper& wrapper = *p.first;
ServerEndpoint server_endpoint = std::move(p.second);
@@ -668,6 +678,7 @@ void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept

{
std::lock_guard lk(m_drain_mutex);
REALM_ASSERT(m_num_connections);
--m_num_connections;
m_drain_cv.notify_all();
}
@@ -676,6 +687,10 @@ void ClientImpl::remove_connection(ClientImpl::Connection& conn) noexcept

// ################ SessionImpl ################

void SessionImpl::force_close()
{
m_wrapper.force_close();
}

void SessionImpl::on_connection_state_changed(ConnectionState state,
const util::Optional<SessionErrorInfo>& error_info)
@@ -1500,19 +1515,35 @@ void SessionWrapper::actualize(ServerEndpoint endpoint)
report_progress(); // Throws
}


// Must be called from event loop thread
void SessionWrapper::finalize()
void SessionWrapper::force_close()
{
REALM_ASSERT(m_actualized);
REALM_ASSERT(m_sess);
m_force_closed = true;

ClientImpl::Connection& conn = m_sess->get_connection();
conn.initiate_session_deactivation(m_sess); // Throws

// Delete the pending bootstrap store since it uses a reference to the logger in m_sess
m_flx_pending_bootstrap_store.reset();
m_sess = nullptr;
m_connection_state_change_listener = {};
}

// Must be called from event loop thread
void SessionWrapper::finalize()
{
REALM_ASSERT(m_actualized);

if (!m_force_closed) {
REALM_ASSERT(m_sess);
ClientImpl::Connection& conn = m_sess->get_connection();
conn.initiate_session_deactivation(m_sess); // Throws

// Delete the pending bootstrap store since it uses a reference to the logger in m_sess
m_flx_pending_bootstrap_store.reset();
m_sess = nullptr;
}

// The Realm file can be closed now, as no access to the Realm file is
// supposed to happen on behalf of a session after initiation of
@@ -1548,6 +1579,7 @@ void SessionWrapper::finalize()
inline void SessionWrapper::finalize_before_actualization() noexcept
{
m_actualized = true;
m_force_closed = true;
}


@@ -1848,6 +1880,9 @@ std::string ClientImpl::Connection::make_logger_prefix(connection_ident_type ide
void ClientImpl::Connection::report_connection_state_change(ConnectionState state,
util::Optional<SessionErrorInfo> error_info)
{
if (m_force_closed) {
return;
}
auto handler = [=](ClientImpl::Session& sess) {
SessionImpl& sess_2 = static_cast<SessionImpl&>(sess);
sess_2.on_connection_state_changed(state, error_info); // Throws
@@ -1871,14 +1906,14 @@ Client::Client(Client&& client) noexcept
Client::~Client() noexcept {}


void Client::stop() noexcept
void Client::shutdown() noexcept
{
m_impl->stop();
m_impl->shutdown();
}

void Client::drain()
void Client::shutdown_and_wait()
{
m_impl->drain();
m_impl->shutdown_and_wait();
}

void Client::cancel_reconnect_delay()
6 changes: 3 additions & 3 deletions src/realm/sync/client.hpp
@@ -46,11 +46,11 @@ class Client {
/// See run().
///
/// Thread-safe.
void stop() noexcept;
void shutdown() noexcept;

/// Forces all connections to close and waits for any pending work on the event
/// loop to complete. All sessions must be destroyed before calling drain.
void drain();
/// loop to complete. All sessions must be destroyed before calling shutdown_and_wait.
void shutdown_and_wait();

/// \brief Cancel current or next reconnect delay for all servers.
///
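A minimal usage sketch of the renamed entry points, assuming an already-configured realm::sync::Client whose sessions have all been destroyed (the helper functions below are hypothetical, not part of this PR):

```cpp
#include <realm/sync/client.hpp>

// Hypothetical helpers illustrating the two renamed calls; client setup and
// session teardown are assumed to have happened elsewhere.
void stop_without_waiting(realm::sync::Client& client)
{
    // Non-blocking: marks the client as stopped and posts the connection
    // drain onto the event loop (previously Client::stop()).
    client.shutdown();
}

void stop_and_drain(realm::sync::Client& client)
{
    // Blocking: also waits until every connection has drained and pending
    // event-loop work has completed (previously Client::drain()).
    client.shutdown_and_wait();
}
```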