Skip to content

Commit

Permalink
RCORE-2190 Sync client can crash if a session is resumed before UNBIN…
Browse files Browse the repository at this point in the history
…D message finishes sending (#7874)

* Pulled 'make_client_reset_handler()' changes from role change feature branch
* Added test to reproduce and fix for the 'recognize_sync_version()' assert failure
* Moved new test under REALM_ENABLE_AUTH_TESTS define
* Reverted request_download_completion_notification() and updates from review
  • Loading branch information
Michael Wilkerson-Barker authored Jul 16, 2024
1 parent ad8ba53 commit e019c79
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
### Fixed
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
* FLX download progress was only updated when bootstraps completed, making it always be 0 before the first completion and then forever 1. ([PR #7869](https://github.com/realm/realm-core/issues/7869), since v14.10.2)
* Sync client can crash if a session is resumed while the session is being suspended. ([#7860](https://github.com/realm/realm-core/issues/7860), since v12.0.0)

### Breaking changes
* None.
Expand Down
9 changes: 4 additions & 5 deletions src/realm/sync/noinst/client_impl_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1318,11 +1318,10 @@ inline void ClientImpl::Session::recognize_sync_version(version_type version)

bool resume_upload = do_recognize_sync_version(version);
if (REALM_LIKELY(resume_upload)) {
// Since the deactivation process has not been initiated, the UNBIND
// message cannot have been sent unless the session was suspended due to
// an error.
REALM_ASSERT_3(m_suspended, ||, !m_unbind_message_sent);
if (m_ident_message_sent && !m_suspended)
// Don't attempt to send any updates before the IDENT message has been
// sent or after the UNBIND message has been sent or an error message
// was received.
if (m_ident_message_sent && !m_error_message_received && !m_unbind_message_sent)
ensure_enlisted_to_send(); // Throws
}
}
Expand Down
128 changes: 109 additions & 19 deletions test/object-store/sync/client_reset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,27 +232,40 @@ TEST_CASE("sync: pending client resets are cleared when downloads are complete",

FAIL(util::format("got error from server: %1", err.status));
};

auto realm = Realm::get_shared_realm(realm_config);
auto obj_id = ObjectId::gen();
{
realm->begin_transaction();
CppContext c(realm);
Object::create(
c, realm, "object",
std::any(AnyDict{{"_id", obj_id}, {"value", int64_t(5)}, {partition.property_name, partition.value}}));
realm->commit_transaction();
wait_for_upload(*realm);
auto realm = Realm::get_shared_realm(realm_config);
auto obj_id = ObjectId::gen();
{
realm->begin_transaction();
CppContext c(realm);
Object::create(c, realm, "object",
std::any(AnyDict{
{"_id", obj_id}, {"value", int64_t(5)}, {partition.property_name, partition.value}}));
realm->commit_transaction();
wait_for_upload(*realm);
}
wait_for_download(*realm, std::chrono::minutes(10));
reset_utils::trigger_client_reset(test_app_session.app_session(), realm);
}
{
// Reconnect and wait for the client reset to complete
auto [future, after_reset] = reset_utils::make_client_reset_handler();
realm_config.sync_config->notify_after_client_reset = after_reset;
auto realm = Realm::get_shared_realm(realm_config);
wait_for_download(*realm, std::chrono::minutes(10));
REQUIRE(future.is_ready());
REQUIRE(future.get_no_throw().is_ok());
reset_utils::trigger_client_reset(test_app_session.app_session(), realm);
}
{
// Reconnect and wait for the client reset to complete
auto [future, after_reset] = reset_utils::make_client_reset_handler();
realm_config.sync_config->notify_after_client_reset = after_reset;
auto realm = Realm::get_shared_realm(realm_config);
wait_for_download(*realm, std::chrono::minutes(10));
REQUIRE(future.is_ready());
REQUIRE(future.get_no_throw().is_ok());
}
wait_for_download(*realm, std::chrono::minutes(10));

reset_utils::trigger_client_reset(test_app_session.app_session(), realm);

wait_for_download(*realm, std::chrono::minutes(10));

reset_utils::trigger_client_reset(test_app_session.app_session(), realm);

wait_for_download(*realm, std::chrono::minutes(10));
}

TEST_CASE("sync: client reset", "[sync][pbs][client reset][baas]") {
Expand Down Expand Up @@ -1976,6 +1989,83 @@ TEST_CASE("sync: Client reset during async open", "[sync][pbs][client reset][baa
after_callback_called.future.get();
}

TEST_CASE("sync: fast reconnect during client reset session suspend", "[sync][pbs][baas][client reset]") {
// This test is for validating a fix where the flx migration tests were failing due to
// 'handle_reconnect()' being called while the current session was being suspended as
// a result of receiving an error to perform a client reset.
const reset_utils::Partition partition{"realm_id", random_string(20)};
Property partition_prop = {partition.property_name, PropertyType::String | PropertyType::Nullable};
Schema schema{
{"object",
{
{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
{"value", PropertyType::Int},
partition_prop,
}},
};

auto server_app_config = minimal_app_config("client_reset_suspend", schema);
server_app_config.partition_key = partition_prop;
TestAppSession test_app_session(create_app(server_app_config));
auto app = test_app_session.app();

create_user_and_log_in(app);
SyncTestFile realm_config(app->current_user(), partition.value, schema);
realm_config.sync_config->client_resync_mode = ClientResyncMode::Recover;
realm_config.sync_config->error_handler = [&](std::shared_ptr<SyncSession>, SyncError err) {
if (err.server_requests_action == sync::ProtocolErrorInfo::Action::Warning ||
err.server_requests_action == sync::ProtocolErrorInfo::Action::Transient) {
return;
}

FAIL(util::format("got error from server: %1", err.status));
};
auto create_obj = [&](SharedRealm& realm, int64_t value) {
auto obj_id = ObjectId::gen();
realm->begin_transaction();
CppContext c(realm);
Object::create(
c, realm, "object",
std::any(AnyDict{{"_id", obj_id}, {"value", value}, {partition.property_name, partition.value}}));
realm->commit_transaction();
return obj_id;
};

enum TestState { not_started, client_reset, suspend, complete };
TestingStateMachine<TestState> test_machine(TestState::not_started);
realm_config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession> session_ptr,
const SyncClientHookData& data) {
if (data.event == SyncClientHookEvent::SessionSuspended) {
test_machine.transition_with([&](const TestState cur_state) -> std::optional<TestState> {
if (cur_state == TestState::client_reset) {
if (auto session = session_ptr.lock()) {
session->handle_reconnect();
}
return TestState::suspend;
}
return std::nullopt;
});
}
return SyncClientHookAction::NoAction;
};

auto realm = Realm::get_shared_realm(realm_config);
{
create_obj(realm, 5);
wait_for_upload(*realm);
}
wait_for_download(*realm);
realm->sync_session()->shutdown_and_wait();

reset_utils::trigger_client_reset(test_app_session.app_session(), realm);
test_machine.transition_to(TestState::client_reset);

realm->sync_session()->resume();
test_machine.wait_for(TestState::suspend);
create_obj(realm, 64);
wait_for_download(*realm);
}

#endif // REALM_ENABLE_AUTH_TESTS

namespace cf = realm::collection_fixtures;
Expand Down
39 changes: 15 additions & 24 deletions test/object-store/sync/flx_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ TEST_CASE("flx: test commands work", "[sync][flx][test command][baas]") {
});
}


static auto make_error_handler()
{
auto [error_promise, error_future] = util::make_promise_future<SyncError>();
Expand All @@ -268,17 +267,6 @@ static auto make_error_handler()
return std::make_pair(std::move(error_future), std::move(fn));
}

static auto make_client_reset_handler()
{
auto [reset_promise, reset_future] = util::make_promise_future<ClientResyncMode>();
auto shared_promise = std::make_shared<decltype(reset_promise)>(std::move(reset_promise));
auto fn = [reset_promise = std::move(shared_promise)](SharedRealm, ThreadSafeReference, bool did_recover) {
reset_promise->emplace_value(did_recover ? ClientResyncMode::Recover : ClientResyncMode::DiscardLocal);
};
return std::make_pair(std::move(reset_future), std::move(fn));
}


TEST_CASE("app: error handling integration test", "[sync][flx][baas]") {
static std::optional<FLXSyncTestHarness> harness{"error_handling"};
create_user_and_log_in(harness->app());
Expand Down Expand Up @@ -616,7 +604,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") {

SECTION("Recover: offline writes and subscription (single subscription)") {
config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
auto&& [reset_future, reset_handler] = make_client_reset_handler();
auto&& [reset_future, reset_handler] = reset_utils::make_client_reset_handler();
config_local.sync_config->notify_after_client_reset = reset_handler;
auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
test_reset
Expand Down Expand Up @@ -711,7 +699,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") {
RealmConfig config_copy = config_local;
config_copy.sync_config = std::make_shared<SyncConfig>(*config_copy.sync_config);
config_copy.sync_config->error_handler = nullptr;
auto&& [reset_future, reset_handler] = make_client_reset_handler();
auto&& [reset_future, reset_handler] = reset_utils::make_client_reset_handler();
config_copy.sync_config->notify_after_client_reset = reset_handler;

// Attempt to open the realm again.
Expand All @@ -728,7 +716,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") {

SECTION("Recover: offline writes and subscriptions (multiple subscriptions)") {
config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
auto&& [reset_future, reset_handler] = make_client_reset_handler();
auto&& [reset_future, reset_handler] = reset_utils::make_client_reset_handler();
config_local.sync_config->notify_after_client_reset = reset_handler;
auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
test_reset
Expand Down Expand Up @@ -769,7 +757,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") {

SECTION("Recover: offline writes interleaved with subscriptions and empty writes") {
config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
auto&& [reset_future, reset_handler] = make_client_reset_handler();
auto&& [reset_future, reset_handler] = reset_utils::make_client_reset_handler();
config_local.sync_config->notify_after_client_reset = reset_handler;
auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
test_reset
Expand Down Expand Up @@ -834,7 +822,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") {

SECTION("Recover: offline writes with associated subscriptions in the correct order") {
config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
auto&& [reset_future, reset_handler] = make_client_reset_handler();
auto&& [reset_future, reset_handler] = reset_utils::make_client_reset_handler();
config_local.sync_config->notify_after_client_reset = reset_handler;
auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
constexpr size_t num_objects_added = 20;
Expand Down Expand Up @@ -1025,7 +1013,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") {
->run();

RealmConfig config_copy = config_local;
auto&& [client_reset_future, reset_handler] = make_client_reset_handler();
auto&& [client_reset_future, reset_handler] = reset_utils::make_client_reset_handler();
config_copy.sync_config->error_handler = [](std::shared_ptr<SyncSession>, SyncError err) {
REALM_ASSERT_EX(!err.is_fatal, err.status);
CHECK(err.server_requests_action == sync::ProtocolErrorInfo::Action::Transient);
Expand All @@ -1044,7 +1032,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") {

SECTION("DiscardLocal: offline writes and subscriptions are lost") {
config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
auto&& [reset_future, reset_handler] = make_client_reset_handler();
auto&& [reset_future, reset_handler] = reset_utils::make_client_reset_handler();
config_local.sync_config->notify_after_client_reset = reset_handler;
auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
test_reset
Expand Down Expand Up @@ -1091,7 +1079,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") {

SECTION("DiscardLocal: an invalid subscription made while offline becomes superseded") {
config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
auto&& [reset_future, reset_handler] = make_client_reset_handler();
auto&& [reset_future, reset_handler] = reset_utils::make_client_reset_handler();
config_local.sync_config->notify_after_client_reset = reset_handler;
auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
std::unique_ptr<sync::SubscriptionSet> invalid_sub;
Expand Down Expand Up @@ -1162,7 +1150,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") {

SECTION("DiscardLocal: completion callbacks fire after client reset even when there is no data to download") {
config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
auto&& [reset_future, reset_handler] = make_client_reset_handler();
auto&& [reset_future, reset_handler] = reset_utils::make_client_reset_handler();
config_local.sync_config->notify_after_client_reset = reset_handler;
auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
test_reset
Expand Down Expand Up @@ -1494,7 +1482,7 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") {

SECTION("Recover: inserts in collections in mixed - collections cleared remotely") {
config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
auto&& [reset_future, reset_handler] = make_client_reset_handler();
auto&& [reset_future, reset_handler] = reset_utils::make_client_reset_handler();
config_local.sync_config->notify_after_client_reset = reset_handler;
auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
test_reset
Expand Down Expand Up @@ -2329,8 +2317,11 @@ TEST_CASE("flx: interrupted bootstrap restarts/recovers on reconnect", "[sync][f
realm->sync_session()->shutdown_and_wait();
}

// Verify that the file was fully closed
REQUIRE(DB::call_with_lock(interrupted_realm_config.path, [](auto&) {}));
{
// Verify that the file was fully closed
auto empty = [](auto&) {};
REQUIRE(DB::call_with_lock(interrupted_realm_config.path, empty));
}

{
DBOptions options;
Expand Down
11 changes: 11 additions & 0 deletions test/object-store/util/sync/sync_test_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,17 @@ void wait_for_num_objects_in_atlas(std::shared_ptr<app::User> user, const AppSes
std::chrono::minutes(15), std::chrono::milliseconds(500));
}

std::pair<util::Future<ClientResyncMode>, std::function<void(SharedRealm, ThreadSafeReference, bool)>>
make_client_reset_handler()
{
auto [reset_promise, reset_future] = util::make_promise_future<ClientResyncMode>();
auto shared_promise = std::make_shared<decltype(reset_promise)>(std::move(reset_promise));
auto fn = [reset_promise = std::move(shared_promise)](SharedRealm, ThreadSafeReference, bool did_recover) {
reset_promise->emplace_value(did_recover ? ClientResyncMode::Recover : ClientResyncMode::DiscardLocal);
};
return std::make_pair(std::move(reset_future), std::move(fn));
}

void trigger_client_reset(const AppSession& app_session, const SyncSession& sync_session)
{
auto file_ident = sync_session.get_file_ident();
Expand Down
4 changes: 4 additions & 0 deletions test/object-store/util/sync/sync_test_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,12 @@ void wait_for_object_to_persist_to_atlas(std::shared_ptr<app::User> user, const
void wait_for_num_objects_in_atlas(std::shared_ptr<app::User> user, const AppSession& app_session,
const std::string& schema_name, size_t expected_size);

std::pair<util::Future<ClientResyncMode>, std::function<void(SharedRealm, ThreadSafeReference, bool)>>
make_client_reset_handler();

void trigger_client_reset(const AppSession& app_session, const SyncSession& sync_session);
void trigger_client_reset(const AppSession& app_session, const SharedRealm& realm);

#endif // REALM_ENABLE_AUTH_TESTS

#endif // REALM_ENABLE_SYNC
Expand Down

0 comments on commit e019c79

Please sign in to comment.