Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload data and migrate schema #6944

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/realm/error_codes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ ErrorCategory ErrorCodes::error_categories(Error code)
case SyncServerPermissionsChanged:
case SyncUserMismatch:
case SyncWriteNotAllowed:
case SyncSchemaMigrationError:
return ErrorCategory().set(ErrorCategory::runtime_error).set(ErrorCategory::sync_error);

case SyncConnectFailed:
Expand Down
3 changes: 3 additions & 0 deletions src/realm/error_codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ typedef enum realm_errno {
RLM_ERR_TLS_HANDSHAKE_FAILED = 1041,
RLM_ERR_WRONG_SYNC_TYPE = 1042,
RLM_ERR_SYNC_WRITE_NOT_ALLOWED = 1043,
RLM_ERR_SYNC_SCHEMA_MIGRATION_ERROR = 1044,

RLM_ERR_SYSTEM_ERROR = 1999,

Expand Down Expand Up @@ -261,6 +262,8 @@ typedef enum realm_sync_errno_session {
RLM_SYNC_ERR_SESSION_MIGRATE_TO_FLX = 232,
RLM_SYNC_ERR_SESSION_BAD_PROGRESS = 233,
RLM_SYNC_ERR_SESSION_REVERT_TO_PBS = 234,
RLM_SYNC_ERR_SESSION_BAD_SCHEMA_VERSION = 235,
RLM_SYNC_ERR_SESSION_SCHEMA_VERSION_CHANGED = 236,
// Error code 299 is reserved as an "unknown session error" in tests
} realm_sync_errno_session_e;

Expand Down
1 change: 1 addition & 0 deletions src/realm/error_codes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ class ErrorCodes {
TlsHandshakeFailed = RLM_ERR_TLS_HANDSHAKE_FAILED,
WrongSyncType = RLM_ERR_WRONG_SYNC_TYPE,
SyncWriteNotAllowed = RLM_ERR_SYNC_WRITE_NOT_ALLOWED,
SyncSchemaMigrationError = RLM_ERR_SYNC_SCHEMA_MIGRATION_ERROR,

SystemError = RLM_ERR_SYSTEM_ERROR,

Expand Down
7 changes: 7 additions & 0 deletions src/realm/object-store/impl/realm_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,13 @@ void RealmCoordinator::delete_and_reopen()
util::CheckedLockGuard lock(m_realm_mutex);
close();
util::File::remove(m_config.path);
#if REALM_ENABLE_SYNC
// Close the sync session.
if (m_sync_session) {
m_sync_session->force_close();
m_sync_session = nullptr;
}
#endif
open_db();
}

Expand Down
16 changes: 12 additions & 4 deletions src/realm/object-store/shared_realm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,7 @@ bool Realm::schema_change_needs_write_transaction(Schema& schema, std::vector<Sc

switch (m_config.schema_mode) {
case SchemaMode::Automatic:
if (version < m_schema_version && m_schema_version != ObjectStore::NotVersioned)
throw InvalidSchemaVersionException(m_schema_version, version, false);
verify_schema_version_not_decreasing(version);
return true;

case SchemaMode::Immutable:
Expand Down Expand Up @@ -321,8 +320,7 @@ bool Realm::schema_change_needs_write_transaction(Schema& schema, std::vector<Sc
}

case SchemaMode::Manual:
if (version < m_schema_version && m_schema_version != ObjectStore::NotVersioned)
throw InvalidSchemaVersionException(m_schema_version, version, false);
verify_schema_version_not_decreasing(version);
if (version == m_schema_version) {
ObjectStore::verify_no_changes_required(changes);
REALM_UNREACHABLE(); // changes is non-empty so above line always throws
Expand All @@ -332,6 +330,16 @@ bool Realm::schema_change_needs_write_transaction(Schema& schema, std::vector<Sc
REALM_COMPILER_HINT_UNREACHABLE();
}

void Realm::verify_schema_version_not_decreasing(uint64_t version)
{
#if REALM_ENABLE_SYNC
if (m_config.sync_config)
return;
#endif
Comment on lines +336 to +339
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this check for? Is it to make sure it is not a sync'ed realm?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah. didn't want to break the behaviour for local realms.

if (version < m_schema_version && m_schema_version != ObjectStore::NotVersioned)
throw InvalidSchemaVersionException(m_schema_version, version, false);
}

Schema Realm::get_full_schema()
{
if (!m_config.immutable())
Expand Down
3 changes: 2 additions & 1 deletion src/realm/object-store/shared_realm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ class Realm : public std::enable_shared_from_this<Realm> {
* will be thrown instead.
*
* If the destination file does not exist, the action performed depends on
* the type of the source and destimation files. If the destination
* the type of the source and destination files. If the destination
* configuration is a non-sync local Realm configuration, a compacted copy
* of the current Transaction's data (which includes uncommitted changes if
* applicable!) is written in streaming form, with no history.
Expand Down Expand Up @@ -546,6 +546,7 @@ class Realm : public std::enable_shared_from_this<Realm> {
void set_schema(Schema const& reference, Schema schema);
bool reset_file(Schema& schema, std::vector<SchemaChange>& changes_required);
bool schema_change_needs_write_transaction(Schema& schema, std::vector<SchemaChange>& changes, uint64_t version);
void verify_schema_version_not_decreasing(uint64_t version);
Schema get_full_schema();

// Ensure that m_schema and m_schema_version match that of the current
Expand Down
107 changes: 91 additions & 16 deletions src/realm/object-store/sync/async_open_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ AsyncOpenTask::AsyncOpenTask(std::shared_ptr<_impl::RealmCoordinator> coordinato
{
}

void AsyncOpenTask::start(AsyncOpenCallback async_open_complete)
void AsyncOpenTask::start(AsyncOpenCallback callback)
{
util::CheckedUniqueLock lock(m_mutex);
if (!m_session)
Expand All @@ -43,8 +43,7 @@ void AsyncOpenTask::start(AsyncOpenCallback async_open_complete)
lock.unlock();

std::shared_ptr<AsyncOpenTask> self(shared_from_this());
session->wait_for_download_completion([async_open_complete = std::move(async_open_complete), self,
this](Status status) mutable {
session->wait_for_download_completion([callback = std::move(callback), self, this](Status status) mutable {
std::shared_ptr<_impl::RealmCoordinator> coordinator;
{
util::CheckedLockGuard lock(m_mutex);
Expand All @@ -56,18 +55,19 @@ void AsyncOpenTask::start(AsyncOpenCallback async_open_complete)
}

if (!status.is_ok()) {
self->async_open_complete(std::move(async_open_complete), coordinator, status);
self->async_open_complete(std::move(callback), coordinator, status);
return;
}

auto config = coordinator->get_config();
if (config.sync_config && config.sync_config->flx_sync_requested &&
config.sync_config->subscription_initializer) {
const bool rerun_on_launch = config.sync_config->rerun_init_subscription_on_open;
self->attach_to_subscription_initializer(std::move(async_open_complete), coordinator, rerun_on_launch);
}
else {
self->async_open_complete(std::move(async_open_complete), coordinator, status);
self->migrate_schema_or_complete(std::move(callback), coordinator, status);
});
// The callback does not extend the lifetime of the task if it's never invoked.
SyncSession::Internal::set_sync_schema_migration_callback(*session, [weak_self = weak_from_this(), this]() {
if (auto self = weak_self.lock()) {
util::CheckedLockGuard lock(m_mutex);
if (!m_session)
return;
m_sync_schema_migration_required = true;
Comment on lines +66 to +70
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there ever a case where weak_self could be locked but the this pointer is not valid? I can't think of any.

Copy link
Collaborator Author

@danieltabacaru danieltabacaru Sep 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that's really possible either

}
});
session->revive_if_needed();
Expand Down Expand Up @@ -118,7 +118,7 @@ void AsyncOpenTask::unregister_download_progress_notifier(uint64_t token)
m_session->unregister_progress_notifier(token);
}

void AsyncOpenTask::attach_to_subscription_initializer(AsyncOpenCallback&& async_open_callback,
void AsyncOpenTask::attach_to_subscription_initializer(AsyncOpenCallback&& callback,
std::shared_ptr<_impl::RealmCoordinator> coordinator,
bool rerun_on_launch)
{
Expand All @@ -137,13 +137,13 @@ void AsyncOpenTask::attach_to_subscription_initializer(AsyncOpenCallback&& async
// We need to wait until subscription initializer completes
std::shared_ptr<AsyncOpenTask> self(shared_from_this());
init_subscription.get_state_change_notification(sync::SubscriptionSet::State::Complete)
.get_async([self, coordinator, async_open_callback = std::move(async_open_callback)](
.get_async([self, coordinator, callback = std::move(callback)](
StatusWith<realm::sync::SubscriptionSet::State> state) mutable {
self->async_open_complete(std::move(async_open_callback), coordinator, state.get_status());
self->async_open_complete(std::move(callback), coordinator, state.get_status());
});
}
else {
async_open_complete(std::move(async_open_callback), coordinator, Status::OK());
async_open_complete(std::move(callback), coordinator, Status::OK());
}
}

Expand All @@ -152,6 +152,10 @@ void AsyncOpenTask::async_open_complete(AsyncOpenCallback&& callback,
{
{
util::CheckedLockGuard lock(m_mutex);
// 'Cancel' may have been called just before 'async_open_complete' is invoked.
if (!m_session)
return;

for (auto token : m_registered_callbacks) {
m_session->unregister_progress_notifier(token);
}
Expand All @@ -171,4 +175,75 @@ void AsyncOpenTask::async_open_complete(AsyncOpenCallback&& callback,
return callback({}, std::make_exception_ptr(Exception(status)));
}

void AsyncOpenTask::migrate_schema_or_complete(AsyncOpenCallback&& callback,
std::shared_ptr<_impl::RealmCoordinator> coordinator, Status status)
{
util::CheckedUniqueLock lock(m_mutex);
if (!m_session)
return;
auto session = m_session;
auto migrate_schema = m_sync_schema_migration_required;
lock.unlock();

if (!migrate_schema) {
wait_for_bootstrap_or_complete(std::move(callback), coordinator, status);
return;
}

std::shared_ptr<AsyncOpenTask> self(shared_from_this());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment that you are extending the lifetime of this object until the bootstrap is complete?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a comment. we've already been doing that with the initial subscriptions callback.

session->wait_for_upload_completion([callback = std::move(callback), coordinator, session, self,
this](Status status) mutable {
{
util::CheckedLockGuard lock(m_mutex);
if (!m_session)
return; // Swallow all events if the task has been cancelled.
}

if (!status.is_ok()) {
self->async_open_complete(std::move(callback), coordinator, status);
return;
}

auto future = SyncSession::Internal::pause_async(*session);
// Wait until the SessionWrapper is done using the DBRef.
std::move(future).get_async([callback = std::move(callback), coordinator, self, this](Status status) mutable {
{
util::CheckedLockGuard lock(m_mutex);
if (!m_session)
return; // Swallow all events if the task has been cancelled.
}

if (!status.is_ok()) {
self->async_open_complete(std::move(callback), coordinator, status);
return;
}

// Delete the realm file and reopen it.
{
util::CheckedLockGuard lock(m_mutex);
m_session = nullptr;
coordinator->delete_and_reopen();
m_session = coordinator->sync_session();
}

self->wait_for_bootstrap_or_complete(std::move(callback), coordinator, status);
});
});
}

void AsyncOpenTask::wait_for_bootstrap_or_complete(AsyncOpenCallback&& callback,
std::shared_ptr<_impl::RealmCoordinator> coordinator,
Status status)
{
auto config = coordinator->get_config();
if (config.sync_config && config.sync_config->flx_sync_requested &&
config.sync_config->subscription_initializer) {
const bool rerun_on_launch = config.sync_config->rerun_init_subscription_on_open;
attach_to_subscription_initializer(std::move(callback), coordinator, rerun_on_launch);
}
else {
async_open_complete(std::move(callback), coordinator, status);
}
}

} // namespace realm
9 changes: 7 additions & 2 deletions src/realm/object-store/sync/async_open_task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class AsyncOpenTask : public std::enable_shared_from_this<AsyncOpenTask> {
//
// If multiple AsyncOpenTasks all attempt to download the same Realm and one of them is canceled,
// the other tasks will receive a "Cancelled" exception.
void start(AsyncOpenCallback async_open_complete) REQUIRES(!m_mutex);
void start(AsyncOpenCallback callback) REQUIRES(!m_mutex);

// Cancels the download and stops the session. No further functions should be called on this class.
void cancel() REQUIRES(!m_mutex);
Expand All @@ -62,12 +62,17 @@ class AsyncOpenTask : public std::enable_shared_from_this<AsyncOpenTask> {
REQUIRES(!m_mutex);
void attach_to_subscription_initializer(AsyncOpenCallback&&, std::shared_ptr<_impl::RealmCoordinator>, bool)
REQUIRES(!m_mutex);
void migrate_schema_or_complete(AsyncOpenCallback&&, std::shared_ptr<_impl::RealmCoordinator>, Status)
REQUIRES(!m_mutex);
void wait_for_bootstrap_or_complete(AsyncOpenCallback&&, std::shared_ptr<_impl::RealmCoordinator>, Status)
REQUIRES(!m_mutex);

std::shared_ptr<_impl::RealmCoordinator> m_coordinator GUARDED_BY(m_mutex);
std::shared_ptr<SyncSession> m_session GUARDED_BY(m_mutex);
std::vector<uint64_t> m_registered_callbacks GUARDED_BY(m_mutex);
mutable util::CheckedMutex m_mutex;
bool m_db_first_open{true};
const bool m_db_first_open;
bool m_sync_schema_migration_required GUARDED_BY(m_mutex) = false;
};

} // namespace realm
Expand Down
17 changes: 17 additions & 0 deletions src/realm/object-store/sync/sync_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,16 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
next_state = NextStateAfterError::inactive;
log_out_user = true;
break;
case sync::ProtocolErrorInfo::Action::MigrateSchema:
std::function<void()> callback;
{
util::CheckedLockGuard l(m_state_mutex);
callback = m_sync_schema_migration_callback;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you "move" or clear the callback when you grab it to call it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, let's do that.

}
if (callback) {
callback();
}
return; // do not propgate the error to the user at this point
}
}
else {
Expand Down Expand Up @@ -893,6 +903,7 @@ void SyncSession::create_sync_session()
session_config.proxy_config = sync_config.proxy_config;
session_config.simulate_integration_error = sync_config.simulate_integration_error;
session_config.flx_bootstrap_batch_size_bytes = sync_config.flx_bootstrap_batch_size_bytes;
session_config.schema_version = m_config.schema_version;
session_config.session_reason = ClientResetOperation::is_fresh_path(m_config.path)
? sync::SessionReason::ClientReset
: sync::SessionReason::Sync;
Expand Down Expand Up @@ -1003,6 +1014,12 @@ void SyncSession::set_sync_transact_callback(std::function<sync::Session::SyncTr
m_sync_transact_callback = std::move(callback);
}

void SyncSession::set_sync_schema_migration_callback(std::function<void()>&& callback)
{
util::CheckedLockGuard l(m_state_mutex);
m_sync_schema_migration_callback = std::move(callback);
}

void SyncSession::nonsync_transact_notify(sync::version_type version)
{
m_progress_notifier.set_local_version(version);
Expand Down
10 changes: 10 additions & 0 deletions src/realm/object-store/sync/sync_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
class Internal {
friend class _impl::RealmCoordinator;
friend struct OnlyForTesting;
friend class AsyncOpenTask;

static void set_sync_transact_callback(SyncSession& session, std::function<TransactionCallback>&& callback)
{
Expand All @@ -290,6 +291,11 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
return session.m_db;
}

static void set_sync_schema_migration_callback(SyncSession& session, std::function<void()>&& callback)
{
session.set_sync_schema_migration_callback(std::move(callback));
}

static util::Future<void> pause_async(SyncSession& session);
};

Expand Down Expand Up @@ -404,6 +410,8 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
void set_sync_transact_callback(std::function<TransactionCallback>&&) REQUIRES(!m_state_mutex);
void nonsync_transact_notify(VersionID::version_type) REQUIRES(!m_state_mutex);

void set_sync_schema_migration_callback(std::function<void()>&&) REQUIRES(!m_state_mutex);

void create_sync_session() REQUIRES(m_state_mutex, !m_config_mutex);
void did_drop_external_reference()
REQUIRES(!m_state_mutex, !m_config_mutex, !m_external_reference_mutex, !m_connection_state_mutex);
Expand Down Expand Up @@ -438,6 +446,8 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {

std::function<TransactionCallback> m_sync_transact_callback GUARDED_BY(m_state_mutex);

std::function<void()> m_sync_schema_migration_callback GUARDED_BY(m_state_mutex);

template <typename Field>
auto config(Field f) REQUIRES(!m_config_mutex)
{
Expand Down
11 changes: 11 additions & 0 deletions src/realm/sync/noinst/client_impl_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2548,6 +2548,17 @@ Status Session::receive_error_message(const ProtocolErrorInfo& info)
return Status::OK();
}

if (protocol_error == ProtocolError::schema_version_changed) {
// Enable upload immediately if the session is still active.
if (m_state == Active) {
m_allow_upload = true;
// Notify SyncSession a schema migration is required.
on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info});
}
// Keep the session active to upload any unsynced changes.
return Status::OK();
}

m_error_message_received = true;
suspend(SessionErrorInfo{info, std::move(status)});
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion src/realm/sync/noinst/client_reset_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ bool ClientResetOperation::finalize(sync::SaltedFileIdent salted_file_ident, syn
std::move(on_flx_version_complete)); // throws

if (m_notify_after) {
m_notify_after(previous_state->get_version_of_current_transaction(), did_recover_out);
m_notify_after(frozen_before_state_version, did_recover_out);
}

m_client_reset_old_version = local_version_ids.old_version;
Expand Down
Loading