Skip to content

Commit

Permalink
Merge pull request #4839 from realm/tg/sync-session-db
Browse files Browse the repository at this point in the history
Use a single DB instance rather than two for synchronized Realms
  • Loading branch information
tgoyne authored Aug 30, 2021
2 parents 62527a1 + 7b2d101 commit 779c0a5
Show file tree
Hide file tree
Showing 64 changed files with 2,529 additions and 5,079 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Enhancements
* ThreadSafeReference no longer pins the source transaction version for anything other than a Results backed by a Query.
* A ThreadSafeReference to a Results backed by a collection can now be created inside a write transaction as long as the collection was not created in the current write transaction.
* Synchronized Realms are no longer opened twice, cutting the address space and file descriptors used in half. ([#4839](https://github.com/realm/realm-core/pull/4839))

### Fixed
* <How to hit and notice issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
Expand All @@ -14,7 +15,7 @@
-----------

### Internals
* None.
* SyncConfig no longer has an encryption_key field, and the key from the parent Realm::Config is used instead.

----------------------------------------------

Expand Down
2 changes: 0 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ let notSyncServerSources: [String] = [
"realm/sync/instruction_replication.cpp",
"realm/sync/instructions.cpp",
"realm/sync/noinst/changeset_index.cpp",
"realm/sync/noinst/client_file_access_cache.cpp",
"realm/sync/noinst/client_history_impl.cpp",
"realm/sync/noinst/client_impl_base.cpp",
"realm/sync/noinst/client_reset.cpp",
Expand Down Expand Up @@ -564,7 +563,6 @@ let headers: [String] = [
"realm/sync/instructions.hpp",
"realm/sync/metrics.hpp",
"realm/sync/noinst/changeset_index.hpp",
"realm/sync/noinst/client_file_access_cache.hpp",
"realm/sync/noinst/client_history_impl.hpp",
"realm/sync/noinst/client_impl_base.hpp",
"realm/sync/noinst/client_reset.hpp",
Expand Down
2 changes: 1 addition & 1 deletion src/realm/alloc_slab.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ class SlabAlloc : public Allocator {
};
std::vector<MapEntry> m_mappings;
size_t m_translation_table_size = 0;
uint64_t m_mapping_version = 1;
std::atomic<uint64_t> m_mapping_version = 1;
uint64_t m_youngest_live_version = 1;
std::mutex m_mapping_mutex;
util::File m_file;
Expand Down
55 changes: 37 additions & 18 deletions src/realm/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
*
**************************************************************************/

#include <realm/db.hpp>

#include <algorithm>
#include <atomic>
#include <cerrno>
#include <fcntl.h>
#include <realm/db.hpp>
#include <iostream>
#include <mutex>
#include <sstream>
Expand Down Expand Up @@ -729,9 +730,9 @@ std::string DBOptions::sys_tmp_dir = getenv("TMPDIR") ? getenv("TMPDIR") : "";
// is associated with any resources that are local to the initializing
// process, because that would imply a leak.
//
// While it is not explicitely guaranteed in the man page, we shall
// While it is not explicitly guaranteed in the man page, we shall
// assume that is is valid to initialize a process-shared mutex twice
// without an intervending call to pthread_mutex_destroy(). We need to
// without an intervening call to pthread_mutex_destroy(). We need to
// be able to reinitialize a process-shared mutex if the first
// initializing process crashes and leaves the shared memory in an
// undefined state.
Expand Down Expand Up @@ -775,11 +776,9 @@ void DB::do_open(const std::string& path, bool no_create_file, bool is_backend,

Replication::HistoryType openers_hist_type = Replication::hist_None;
int openers_hist_schema_version = 0;
bool opener_is_sync_agent = false;
if (Replication* repl = get_replication()) {
openers_hist_type = repl->get_history_type();
openers_hist_schema_version = repl->get_history_schema_version();
opener_is_sync_agent = repl->is_sync_agent();
}

int current_file_format_version;
Expand Down Expand Up @@ -1234,11 +1233,6 @@ void DB::do_open(const std::string& path, bool no_create_file, bool is_backend,
throw IncompatibleLockFile(ss.str());
}

// We need to ensure that at most one sync agent can join a
// session
if (info->sync_agent_present && opener_is_sync_agent)
throw MultipleSyncAgents{};

// Even though this session participant is not the session initiator,
// it may be the one that has to perform the history schema upgrade.
// See upgrade_file_format(). However we cannot get the actual value
Expand Down Expand Up @@ -1281,11 +1275,6 @@ void DB::do_open(const std::string& path, bool no_create_file, bool is_backend,
// make our presence noted:
++info->num_participants;

if (opener_is_sync_agent) {
REALM_ASSERT(!info->sync_agent_present);
info->sync_agent_present = 1; // Set to true
}

// Keep the mappings and file open:
alloc_detach_guard.release();
fug_2.release(); // Do not unmap
Expand Down Expand Up @@ -1615,15 +1604,13 @@ void DB::close_internal(std::unique_lock<InterprocessMutex> lock, bool allow_ope
}
SharedInfo* info = m_file_map.get_addr();
{
bool is_sync_agent = m_replication ? m_replication->is_sync_agent() : false;

if (!lock.owns_lock())
lock.lock();

if (m_alloc.is_attached())
m_alloc.detach();

if (is_sync_agent) {
if (m_is_sync_agent) {
REALM_ASSERT(info->sync_agent_present);
info->sync_agent_present = 0; // Set to false
}
Expand Down Expand Up @@ -2867,6 +2854,15 @@ DBRef DB::create(Replication& repl, const DBOptions options)
return retval;
}

DBRef DB::create(std::unique_ptr<Replication> repl, const DBOptions options)
{
REALM_ASSERT(repl);
DBRef retval = std::make_shared<DBInit>(options);
retval->m_history = std::move(repl);
retval->open(*retval->m_history, options);
return retval;
}

DBRef DB::create(BinaryData buffer, bool take_ownership)
{
DBOptions options;
Expand All @@ -2876,6 +2872,29 @@ DBRef DB::create(BinaryData buffer, bool take_ownership)
return retval;
}

void DB::claim_sync_agent()
{
REALM_ASSERT(is_attached());
std::unique_lock<InterprocessMutex> lock(m_controlmutex);
SharedInfo* info = m_file_map.get_addr();
if (info->sync_agent_present)
throw MultipleSyncAgents{};
info->sync_agent_present = 1; // Set to true
m_is_sync_agent = true;
}

void DB::release_sync_agent()
{
REALM_ASSERT(is_attached());
std::unique_lock<InterprocessMutex> lock(m_controlmutex);
if (!m_is_sync_agent)
return;
SharedInfo* info = m_file_map.get_addr();
REALM_ASSERT(info->sync_agent_present);
info->sync_agent_present = 0;
m_is_sync_agent = false;
}

// HACK: Somewhat misplaced, but we have no replication.cpp
Replication::~Replication()
{
Expand Down
37 changes: 24 additions & 13 deletions src/realm/db.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,20 @@
#ifndef REALM_GROUP_SHARED_HPP
#define REALM_GROUP_SHARED_HPP

#include <functional>
#include <cstdint>
#include <limits>
#include <realm/util/features.h>
#include <realm/util/thread.hpp>
#include <realm/util/interprocess_condvar.hpp>
#include <realm/util/interprocess_mutex.hpp>
#include <realm/db_options.hpp>
#include <realm/group.hpp>
#include <realm/handover_defs.hpp>
#include <realm/impl/transact_log.hpp>
#include <realm/metrics/metrics.hpp>
#include <realm/replication.hpp>
#include <realm/util/features.h>
#include <realm/util/interprocess_condvar.hpp>
#include <realm/util/interprocess_mutex.hpp>
#include <realm/version_id.hpp>
#include <realm/db_options.hpp>
#include <realm/util/logger.hpp>

#include <functional>
#include <cstdint>
#include <limits>

namespace realm {

Expand Down Expand Up @@ -116,6 +115,7 @@ class DB : public std::enable_shared_from_this<DB> {
// file (or another file), a new DB object is needed.
static DBRef create(const std::string& file, bool no_create = false, const DBOptions options = DBOptions());
static DBRef create(Replication& repl, const DBOptions options = DBOptions());
static DBRef create(std::unique_ptr<Replication> repl, const DBOptions options = DBOptions());
static DBRef create(BinaryData, bool take_ownership = true);

~DB() noexcept;
Expand Down Expand Up @@ -160,6 +160,10 @@ class DB : public std::enable_shared_from_this<DB> {
m_replication = repl;
}

const std::string& get_path() const
{
return m_db_path;
}

#ifdef REALM_DEBUG
/// Deprecated method, only called from a unit test
Expand Down Expand Up @@ -388,13 +392,19 @@ class DB : public std::enable_shared_from_this<DB> {
static void delete_files(const std::string& base_path, bool* did_delete_realm = nullptr,
bool delete_lockfile = false);

/// Mark this DB as the sync agent for the file.
/// \throw MultipleSyncAgents if another DB is already the sync agent.
void claim_sync_agent();
void release_sync_agent();

protected:
explicit DB(const DBOptions& options); // Is this ever used?

private:
std::recursive_mutex m_mutex;
int m_transaction_count = 0;
SlabAlloc m_alloc;
std::unique_ptr<Replication> m_history;
Replication* m_replication = nullptr;
struct SharedInfo;
struct ReadCount;
Expand Down Expand Up @@ -446,8 +456,9 @@ class DB : public std::enable_shared_from_this<DB> {
util::InterprocessCondVar m_new_commit_available;
util::InterprocessCondVar m_pick_next_writer;
std::function<void(int, int)> m_upgrade_callback;

std::shared_ptr<metrics::Metrics> m_metrics;
bool m_is_sync_agent = false;

/// Attach this DB instance to the specified database file.
///
/// While at least one instance of DB exists for a specific
Expand Down Expand Up @@ -852,22 +863,22 @@ inline DB::TransactStage Transaction::get_transact_stage() const noexcept
class DB::ReadLockGuard {
public:
ReadLockGuard(DB& shared_group, ReadLockInfo& read_lock) noexcept
: m_shared_group(shared_group)
: m_db(shared_group)
, m_read_lock(&read_lock)
{
}
~ReadLockGuard() noexcept
{
if (m_read_lock)
m_shared_group.release_read_lock(*m_read_lock);
m_db.release_read_lock(*m_read_lock);
}
void release() noexcept
{
m_read_lock = 0;
}

private:
DB& m_shared_group;
DB& m_db;
ReadLockInfo* m_read_lock;
};

Expand Down
8 changes: 4 additions & 4 deletions src/realm/group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1787,12 +1787,12 @@ void Group::advance_transact(ref_type new_top_ref, _impl::NoCopyInputStream& in,
// database.
//
// Initially, when this function is invoked, we cannot assume any
// correspondance between the accessor state and the underlying node
// correspondence between the accessor state and the underlying node
// structure. We can assume that the hierarchy is in a state of minimal
// consistency, and that it can be brought to a state of structural
// correspondace using information in the transaction logs. When structural
// correspondace is achieved, we can reliably refresh the accessor hierarchy
// (Table::refresh_accessor_tree()) to bring it back to a fully concsistent
// correspondence using information in the transaction logs. When structural
// correspondence is achieved, we can reliably refresh the accessor hierarchy
// (Table::refresh_accessor_tree()) to bring it back to a fully consistent
// state. See AccessorConsistencyLevels.
//
// Much of the information in the transaction logs is not used in this
Expand Down
Loading

0 comments on commit 779c0a5

Please sign in to comment.