diff --git a/db/db_impl.h b/db/db_impl.h index 29acca5e106a..bb3c445c8ec6 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -1162,6 +1162,8 @@ class DBImpl : public DB { // and log_empty_. Refer to the definition of each variable below for more // details. InstrumentedMutex log_write_mutex_; + + protected: // State below is protected by mutex_ // With two_write_queues enabled, some of the variables that accessed during // WriteToWAL need different synchronization: log_empty_, alive_log_files_, @@ -1169,6 +1171,7 @@ class DBImpl : public DB { // more description. mutable InstrumentedMutex mutex_; + private: std::atomic shutting_down_; // This condition variable is signaled on these conditions: // * whenever bg_compaction_scheduled_ goes down to 0 @@ -1199,8 +1202,12 @@ class DBImpl : public DB { // read and writes are protected by log_write_mutex_ instead. This is to avoid // expesnive mutex_ lock during WAL write, which update log_empty_. bool log_empty_; + + protected: ColumnFamilyHandleImpl* default_cf_handle_; InternalStats* default_cf_internal_stats_; + + private: std::unique_ptr column_family_memtables_; struct LogFileNumberSize { explicit LogFileNumberSize(uint64_t _number) @@ -1267,12 +1274,16 @@ class DBImpl : public DB { WriteBatch cached_recoverable_state_; std::atomic cached_recoverable_state_empty_ = {true}; std::atomic total_log_size_; + + protected: // only used for dynamically adjusting max_total_wal_size. it is a sum of // [write_buffer_size * max_write_buffer_number] over all column families uint64_t max_total_in_memory_state_; // If true, we have only one (default) column family. We use this to optimize // some code-paths bool single_column_family_mode_; + + private: // If this is non-empty, we need to delete these log files in background // threads. Protected by db mutex. 
autovector logs_to_free_; @@ -1485,12 +1496,14 @@ class DBImpl : public DB { std::string db_absolute_path_; + protected: // The options to access storage files const EnvOptions env_options_; // Additonal options for compaction and flush EnvOptions env_options_for_compaction_; + private: // Number of running IngestExternalFile() calls. // REQUIRES: mutex held int num_running_ingest_file_; diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 2a33741568ba..559b14aeddbb 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -405,7 +405,6 @@ Status DBImpl::Recover( } if (s.ok()) { - SequenceNumber next_sequence(kMaxSequenceNumber); default_cf_handle_ = new ColumnFamilyHandleImpl( versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_); default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats(); @@ -465,6 +464,7 @@ Status DBImpl::Recover( if (!logs.empty()) { // Recover in the order in which the logs were generated std::sort(logs.begin(), logs.end()); + SequenceNumber next_sequence(kMaxSequenceNumber); s = RecoverLogFiles(logs, &next_sequence, read_only); if (!s.ok()) { // Clear memtables if recovery failed diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index bd7099f00d02..0c0a8482b599 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -159,7 +159,6 @@ Status DB::OpenForReadOnly( *dbptr = nullptr; handles->clear(); - SuperVersionContext sv_context(/* create_superversion */ true); DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname); impl->mutex_.Lock(); Status s = impl->Recover(column_families, true /* read only */, @@ -176,6 +175,7 @@ Status DB::OpenForReadOnly( handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); } } + SuperVersionContext sv_context(/* create_superversion */ true); if (s.ok()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) { sv_context.NewSuperVersion(); diff --git a/db/db_impl_secondary.cc b/db/db_impl_secondary.cc new file mode 100644 index 
000000000000..205339a00a2b
--- /dev/null
+++ b/db/db_impl_secondary.cc
@@ -0,0 +1,393 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "db/db_impl_secondary.h"
+#include "db/db_impl.h"
+#include "db/db_iter.h"
+#include "db/forward_iterator.h"
+#include "db/merge_context.h"
+#include "db/range_del_aggregator.h"
+#include "monitoring/perf_context_imp.h"
+#include "util/auto_roll_logger.h"
+
+namespace rocksdb {
+
+#ifndef ROCKSDB_LITE
+
+// Constructs the instance on top of DBImpl and records in the info log that
+// the DB is being opened in secondary mode.
+DBImplSecondary::DBImplSecondary(const DBOptions& db_options,
+                                 const std::string& dbname)
+    : DBImpl(db_options, dbname) {
+  ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                 "Opening the db in secondary mode");
+  LogFlush(immutable_db_options_.info_log);
+}
+
+DBImplSecondary::~DBImplSecondary() {}
+
+// Recovers the version state via VersionSet::RecoverAsSecondary() and sets up
+// the default column family handle and the in-memory-state accounting.
+// REQUIRES: mutex_ held.
+Status DBImplSecondary::Recover(
+    const std::vector& column_families) {
+  mutex_.AssertHeld();
+
+  Status s;
+  s = versions_->RecoverAsSecondary(column_families, &manifest_reader_,
+                                    &manifest_reporter_,
+                                    &manifest_reader_status_);
+  if (!s.ok()) {
+    return s;
+  }
+  if (immutable_db_options_.paranoid_checks) {
+    s = CheckConsistency();
+  }
+  // Initial max_total_in_memory_state_ before recovery logs. Log recovery
+  // may check this value to decide whether to flush.
+  max_total_in_memory_state_ = 0;
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
+    max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
+                                  mutable_cf_options->max_write_buffer_number;
+  }
+  if (s.ok()) {
+    default_cf_handle_ = new ColumnFamilyHandleImpl(
+        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
+    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
+    single_column_family_mode_ =
+        versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
+  }
+
+  // TODO: attempt to recover from WAL files.
+  return s;
+}
+
+// Implementation of the DB interface
+Status DBImplSecondary::Get(const ReadOptions& read_options,
+                            ColumnFamilyHandle* column_family, const Slice& key,
+                            PinnableSlice* value) {
+  return GetImpl(read_options, column_family, key, value);
+}
+
+// Reads `key` as of the last sequence number known to this instance. The
+// mutable memtable is consulted first, then the immutable memtables, and
+// finally the table files of the SuperVersion's current version.
+Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
+                                ColumnFamilyHandle* column_family,
+                                const Slice& key, PinnableSlice* pinnable_val) {
+  assert(pinnable_val != nullptr);
+  PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
+  StopWatch sw(env_, stats_, DB_GET);
+  PERF_TIMER_GUARD(get_snapshot_time);
+
+  auto cfh = reinterpret_cast(column_family);
+  auto cfd = cfh->cfd();
+  if (tracer_) {
+    InstrumentedMutexLock lock(&trace_mutex_);
+    if (tracer_) {
+      tracer_->Get(column_family, key);
+    }
+  }
+  // Acquire SuperVersion
+  SuperVersion* super_version = GetAndRefSuperVersion(cfd);
+  SequenceNumber snapshot = versions_->LastSequence();
+  MergeContext merge_context;
+  SequenceNumber max_covering_tombstone_seq = 0;
+  Status s;
+  LookupKey lkey(key, snapshot);
+  PERF_TIMER_STOP(get_snapshot_time);
+
+  bool done = false;
+  if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
+                              &max_covering_tombstone_seq, read_options)) {
+    done = true;
+    pinnable_val->PinSelf();
+    RecordTick(stats_, MEMTABLE_HIT);
+  } else if ((s.ok() || s.IsMergeInProgress()) &&
+             super_version->imm->Get(
+                 lkey, pinnable_val->GetSelf(), &s, &merge_context,
+                 &max_covering_tombstone_seq, read_options)) {
+    done = true;
+    pinnable_val->PinSelf();
+    RecordTick(stats_, MEMTABLE_HIT);
+  }
+  if (!done && !s.ok() && !s.IsMergeInProgress()) {
+    ReturnAndCleanupSuperVersion(cfd, super_version);
+    return s;
+  }
+  if (!done) {
+    PERF_TIMER_GUARD(get_from_output_files_time);
+    super_version->current->Get(read_options, lkey, pinnable_val, &s,
+                                &merge_context, &max_covering_tombstone_seq);
+    RecordTick(stats_, MEMTABLE_MISS);
+  }
+  {
+    PERF_TIMER_GUARD(get_post_process_time);
+    ReturnAndCleanupSuperVersion(cfd, super_version);
+    RecordTick(stats_, NUMBER_KEYS_READ);
+    size_t size = pinnable_val->size();
+    RecordTick(stats_, BYTES_READ, size);
+    MeasureTime(stats_, BYTES_PER_READ, size);
+    PERF_COUNTER_ADD(get_read_bytes, size);
+  }
+  return s;
+}
+
+// Returns an iterator over `column_family`. Managed iterators and
+// kPersistedTier reads are rejected through an error iterator.
+Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
+                                       ColumnFamilyHandle* column_family) {
+  if (read_options.managed) {
+    return NewErrorIterator(
+        Status::NotSupported("Managed iterator is not supported anymore."));
+  }
+  if (read_options.read_tier == kPersistedTier) {
+    return NewErrorIterator(Status::NotSupported(
+        "ReadTier::kPersistedData is not yet supported in iterators."));
+  }
+  Iterator* result = nullptr;
+  auto cfh = reinterpret_cast(column_family);
+  auto cfd = cfh->cfd();
+  ReadCallback* read_callback = nullptr;  // No read callback provided.
+  if (read_options.tailing) {
+    SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+    auto iter = new ForwardIterator(this, read_options, cfd, super_version);
+    result = NewDBIterator(
+        env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
+        cfd->user_comparator(), iter, kMaxSequenceNumber,
+        super_version->mutable_cf_options.max_sequential_skip_in_iterations,
+        read_callback, this, cfd);
+  } else {
+    auto snapshot = read_options.snapshot != nullptr
+                        ? read_options.snapshot->GetSequenceNumber()
+                        : versions_->LastSequence();
+    result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
+  }
+  return result;
+}
+
+// Builds an arena-backed DB iterator on a referenced SuperVersion of `cfd`
+// that reads as of `snapshot`.
+ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
+    const ReadOptions& read_options, ColumnFamilyData* cfd,
+    SequenceNumber snapshot, ReadCallback* read_callback) {
+  SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+  auto db_iter = NewArenaWrappedDbIterator(
+      env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
+      snapshot,
+      super_version->mutable_cf_options.max_sequential_skip_in_iterations,
+      super_version->version_number, read_callback);
+  auto internal_iter =
+      NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
+                          db_iter->GetRangeDelAggregator(), snapshot);
+  db_iter->SetIterUnderDBIter(internal_iter);
+  return db_iter;
+}
+
+// Creates one iterator per entry of `column_families`, in the same order.
+// `iterators` must not be nullptr; it is cleared before being filled.
+Status DBImplSecondary::NewIterators(
+    const ReadOptions& read_options,
+    const std::vector& column_families,
+    std::vector* iterators) {
+  if (read_options.managed) {
+    return Status::NotSupported("Managed iterator is not supported anymore.");
+  }
+  if (read_options.read_tier == kPersistedTier) {
+    return Status::NotSupported(
+        "ReadTier::kPersistedData is not yet supported in iterators.");
+  }
+  ReadCallback* read_callback = nullptr;  // No read callback provided.
+  if (iterators == nullptr) {
+    return Status::InvalidArgument("iterators not allowed to be nullptr");
+  }
+  iterators->clear();
+  iterators->reserve(column_families.size());
+  if (read_options.tailing) {
+    for (auto cfh : column_families) {
+      auto cfd = reinterpret_cast(cfh)->cfd();
+      SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
+      auto iter = new ForwardIterator(this, read_options, cfd, super_version);
+      iterators->push_back(NewDBIterator(
+          env_, read_options, *cfd->ioptions(),
+          super_version->mutable_cf_options, cfd->user_comparator(), iter,
+          kMaxSequenceNumber,
+          super_version->mutable_cf_options.max_sequential_skip_in_iterations,
+          read_callback, this, cfd));
+    }
+  } else {
+    // Resolve the read sequence the same way NewIterator() does.
+    const SequenceNumber read_seq =
+        read_options.snapshot != nullptr
+            ? read_options.snapshot->GetSequenceNumber()
+            : versions_->LastSequence();
+
+    for (auto cfh : column_families) {
+      auto* cfd = reinterpret_cast(cfh)->cfd();
+      iterators->push_back(
+          NewIteratorImpl(read_options, cfd, read_seq, read_callback));
+    }
+  }
+
+  return Status::OK();
+}
+
+// Reads and applies further MANIFEST records through
+// VersionSet::ReadAndApply(), then installs a new SuperVersion for every
+// column family it reported as changed.
+Status DBImplSecondary::TryCatchUpWithPrimary() {
+  assert(versions_.get() != nullptr);
+  assert(manifest_reader_.get() != nullptr);
+  Status s;
+  std::unordered_set cfds_changed;
+  InstrumentedMutexLock lock_guard(mutex());
+  s = versions_->ReadAndApply(mutex(), &manifest_reader_, &cfds_changed);
+  if (s.ok()) {
+    SuperVersionContext sv_context(true /* create_superversion */);
+    for (auto cfd : cfds_changed) {
+      sv_context.NewSuperVersion();
+      cfd->InstallSuperVersion(&sv_context, mutex());
+    }
+    sv_context.Clean();
+  }
+  return s;
+}
+
+// Convenience overload that opens only the default column family and deletes
+// the handle returned for it by the overload below.
+Status DB::OpenAsSecondary(const Options& options, const std::string& dbname,
+                           const std::string& secondary_dbname, DB** dbptr) {
+  *dbptr = nullptr;
+
+  DBOptions db_options(options);
+  ColumnFamilyOptions cf_options(options);
+  std::vector column_families;
+  column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
+  std::vector handles;
+
+  Status s = DB::OpenAsSecondary(db_options, dbname, secondary_dbname,
+                                 column_families, &handles, dbptr);
+  if (s.ok()) {
+    assert(handles.size() == 1);
+    delete handles[0];
+  }
+  return s;
+}
+
+// Opens `dbname` as a secondary instance. If db_options carries no info log,
+// one is created under `secondary_dbname`. Requires max_open_files == -1.
+// On success *dbptr points to the new instance; otherwise it stays nullptr.
+Status DB::OpenAsSecondary(
+    const DBOptions& db_options, const std::string& dbname,
+    const std::string& secondary_dbname,
+    const std::vector& column_families,
+    std::vector* handles, DB** dbptr) {
+  *dbptr = nullptr;
+  if (db_options.max_open_files != -1) {
+    // TODO (yanqin) maybe support max_open_files != -1 by creating hard links
+    // on SST files so that db secondary can still have access to old SSTs
+    // while primary instance may delete original.
+    return Status::InvalidArgument("require max_open_files to be -1");
+  }
+
+  DBOptions tmp_opts(db_options);
+  if (nullptr == tmp_opts.info_log) {
+    Env* env = tmp_opts.env;
+    assert(env != nullptr);
+    std::string secondary_db_abs_path;
+    env->GetAbsolutePath(secondary_dbname, &secondary_db_abs_path);
+    std::string fname = InfoLogFileName(secondary_dbname, secondary_db_abs_path,
+                                        tmp_opts.db_log_dir);
+
+    env->CreateDirIfMissing(secondary_dbname);
+    if (tmp_opts.log_file_time_to_roll > 0 || tmp_opts.max_log_file_size > 0) {
+      AutoRollLogger* result = new AutoRollLogger(
+          env, secondary_dbname, tmp_opts.db_log_dir,
+          tmp_opts.max_log_file_size, tmp_opts.log_file_time_to_roll,
+          tmp_opts.info_log_level);
+      Status s = result->GetStatus();
+      if (!s.ok()) {
+        delete result;
+      } else {
+        tmp_opts.info_log.reset(result);
+      }
+    }
+    if (nullptr == tmp_opts.info_log) {
+      env->RenameFile(fname, OldInfoLogFileName(
+                                 secondary_dbname, env->NowMicros(),
+                                 secondary_db_abs_path, tmp_opts.db_log_dir));
+      Status s = env->NewLogger(fname, &(tmp_opts.info_log));
+      if (!s.ok()) {
+        // The assert below is compiled out of release builds, so report the
+        // failure to the caller instead of continuing without an info log.
+        return s;
+      }
+      if (tmp_opts.info_log != nullptr) {
+        tmp_opts.info_log->SetInfoLogLevel(tmp_opts.info_log_level);
+      }
+    }
+  }
+
+  assert(tmp_opts.info_log != nullptr);
+
+  handles->clear();
+  DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname);
+  impl->mutex_.Lock();
+  Status s = impl->Recover(column_families);
+  if (s.ok()) {
+    for (const auto& cf : column_families) {
+      auto cfd =
+          impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
+      if (nullptr == cfd) {
+        s = Status::InvalidArgument("Column family not found: ", cf.name);
+        break;
+      }
+      handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
+    }
+  }
+  SuperVersionContext sv_context(true /* create_superversion */);
+  if (s.ok()) {
+    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
+      sv_context.NewSuperVersion();
+      cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
+    }
+  }
+  impl->mutex_.Unlock();
+  sv_context.Clean();
+  if (s.ok()) {
+    *dbptr = impl;
+    for (auto h : *handles) {
+      impl->NewThreadStatusCfInfo(
+          reinterpret_cast(h)->cfd());
+    }
+  } else {
+    for (auto h : *handles) {
+      delete h;
+    }
+    handles->clear();
+    delete impl;
+  }
+  return s;
+}
+#else   // !ROCKSDB_LITE
+
+Status DB::OpenAsSecondary(const Options& /*options*/,
+                           const std::string& /*name*/,
+                           const std::string& /*secondary_name*/,
+                           DB** /*dbptr*/) {
+  return Status::NotSupported("Not supported in ROCKSDB_LITE.");
+}
+
+Status DB::OpenAsSecondary(
+    const DBOptions& /*db_options*/, const std::string& /*dbname*/,
+    const std::string& /*secondary_name*/,
+    const std::vector& /*column_families*/,
+    std::vector* /*handles*/, DB** /*dbptr*/) {
+  return Status::NotSupported("Not supported in ROCKSDB_LITE.");
+}
+#endif  // !ROCKSDB_LITE
+
+}  // namespace rocksdb
diff --git a/db/db_impl_secondary.h b/db/db_impl_secondary.h
new file mode 100644
index 000000000000..6b7570c414d8
--- /dev/null
+++ b/db/db_impl_secondary.h
@@ -0,0 +1,149 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include
+#include
+#include "db/db_impl.h"
+
+namespace rocksdb {
+
+// A DB instance that follows a primary instance: it serves reads from the
+// state recovered at open time and picks up later changes when
+// TryCatchUpWithPrimary() is called. Modifying operations are not supported.
+class DBImplSecondary : public DBImpl {
+ public:
+  DBImplSecondary(const DBOptions& options, const std::string& dbname);
+  virtual ~DBImplSecondary();
+
+  Status Recover(const std::vector& column_families);
+
+  // Implementations of the DB interface
+  using DB::Get;
+  virtual Status Get(const ReadOptions& options,
+                     ColumnFamilyHandle* column_family, const Slice& key,
+                     PinnableSlice* value) override;
+
+  Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
+                 const Slice& key, PinnableSlice* value);
+
+  using DBImpl::NewIterator;
+  virtual Iterator* NewIterator(const ReadOptions&,
+                                ColumnFamilyHandle* column_family) override;
+
+  ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options,
+                                      ColumnFamilyData* cfd,
+                                      SequenceNumber snapshot,
+                                      ReadCallback* read_callback);
+
+  virtual Status NewIterators(
+      const ReadOptions& options,
+      const std::vector& column_families,
+      std::vector* iterators) override;
+
+  // None of the operations below is available on a secondary instance; each
+  // one returns Status::NotSupported.
+  using DBImpl::Put;
+  virtual Status Put(const WriteOptions& /*options*/,
+                     ColumnFamilyHandle* /*column_family*/,
+                     const Slice& /*key*/, const Slice& /*value*/) override {
+    return Status::NotSupported("Not supported operation in secondary mode.");
+  }
+  using DBImpl::Merge;
+  virtual Status Merge(const WriteOptions& /*options*/,
+                       ColumnFamilyHandle* /*column_family*/,
+                       const Slice& /*key*/, const Slice& /*value*/) override {
+    return Status::NotSupported("Not supported operation in secondary mode.");
+  }
+  using DBImpl::Delete;
+  virtual Status Delete(const WriteOptions& /*options*/,
+                        ColumnFamilyHandle* /*column_family*/,
+                        const Slice& /*key*/) override {
+    return Status::NotSupported("Not supported operation in secondary mode.");
+  }
+  using DBImpl::SingleDelete;
+  virtual Status SingleDelete(const WriteOptions& /*options*/,
+                              ColumnFamilyHandle* /*column_family*/,
+                              const Slice& /*key*/) override {
+    return Status::NotSupported("Not supported operation in secondary mode.");
+  }
+  virtual Status Write(const WriteOptions& /*options*/,
+                       WriteBatch* /*updates*/) override {
+    return Status::NotSupported("Not supported operation in secondary mode.");
+  }
+  using DBImpl::CompactRange;
+  virtual Status CompactRange(const CompactRangeOptions& /*options*/,
+                              ColumnFamilyHandle* /*column_family*/,
+                              const Slice* /*begin*/,
+                              const Slice* /*end*/) override {
+    return Status::NotSupported("Not supported operation in secondary mode.");
+  }
+
+  using DBImpl::CompactFiles;
+  virtual Status CompactFiles(
+      const CompactionOptions& /*compact_options*/,
+      ColumnFamilyHandle* /*column_family*/,
+      const std::vector& /*input_file_names*/,
+      const int /*output_level*/, const int /*output_path_id*/ = -1,
+      std::vector* const /*output_file_names*/ = nullptr,
+      CompactionJobInfo* /*compaction_job_info*/ = nullptr) override {
+    return Status::NotSupported("Not supported operation in secondary mode.");
+  }
+
+  virtual Status DisableFileDeletions() override {
+    return Status::NotSupported("Not supported operation in secondary mode.");
+  }
+
+  virtual Status EnableFileDeletions(bool /*force*/) override {
+    return Status::NotSupported("Not supported operation in secondary mode.");
+  }
+  virtual Status GetLiveFiles(std::vector&,
+                              uint64_t* /*manifest_file_size*/,
+                              bool /*flush_memtable*/ = true) override {
+    return Status::NotSupported("Not supported operation in secondary mode.");
+  }
+
+  using DBImpl::Flush;
+  virtual Status Flush(const FlushOptions& /*options*/,
+                       ColumnFamilyHandle* /*column_family*/) override {
+    return Status::NotSupported("Not supported operation in secondary mode.");
+  }
+
+  using DBImpl::SyncWAL;
+  virtual Status SyncWAL() override {
+    return Status::NotSupported("Not supported operation in secondary mode.");
+  }
+
+  using DB::IngestExternalFile;
+  virtual Status IngestExternalFile(
+      ColumnFamilyHandle* /*column_family*/,
+      const std::vector& /*external_files*/,
+      const IngestExternalFileOptions& /*ingestion_options*/) override {
+    return Status::NotSupported("Not supported operation in secondary mode.");
+  }
+
+  // Reads MANIFEST records that have not been applied yet and installs new
+  // SuperVersions for the column families they changed.
+  Status TryCatchUpWithPrimary();
+
+ private:
+  friend class DB;
+
+  // No copying allowed
+  DBImplSecondary(const DBImplSecondary&) = delete;
+  void operator=(const DBImplSecondary&) = delete;
+
+  // MANIFEST reading state: handed to VersionSet::RecoverAsSecondary() by
+  // Recover(); manifest_reader_ is reused by TryCatchUpWithPrimary().
+  std::unique_ptr manifest_reader_;
+  std::unique_ptr manifest_reporter_;
+  std::unique_ptr manifest_reader_status_;
+};
+}  // namespace rocksdb
+
+#endif  // !ROCKSDB_LITE
diff --git a/db/log_reader.h b/db/log_reader.h index 2c4f4f059901..79d7c2b471d3 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -85,6 +85,8 @@ class Reader { SequentialFileReader* file() { return file_.get(); } + Reporter* GetReporter() const { return reporter_; } + private: std::shared_ptr info_log_; const std::unique_ptr file_; diff --git a/db/version_builder.cc b/db/version_builder.cc index 7b45347c1240..a920e28d6514 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -364,10 +364,10 @@ class VersionBuilder::Rep { CheckConsistency(vstorage); } - void LoadTableHandlers(InternalStats* internal_stats, int max_threads, - bool prefetch_index_and_filter_in_cache, - bool is_initial_load, - const SliceTransform* prefix_extractor) { + Status LoadTableHandlers(InternalStats* internal_stats, int max_threads, + bool prefetch_index_and_filter_in_cache, + bool is_initial_load, + const SliceTransform* prefix_extractor) { assert(table_cache_ != nullptr); size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity(); @@ -394,7 +394,8 @@ class VersionBuilder::Rep { size_t table_cache_usage = table_cache_->get_cache()->GetUsage(); if (table_cache_usage >= load_limit) { - return; + // TODO (yanqin) find a suitable status code.
+ return Status::OK(); } else { max_load = load_limit - table_cache_usage; } @@ -402,11 +403,15 @@ class VersionBuilder::Rep { // std::vector> files_meta; + std::vector statuses; for (int level = 0; level < num_levels_; level++) { for (auto& file_meta_pair : levels_[level].added_files) { auto* file_meta = file_meta_pair.second; - assert(!file_meta->table_reader_handle); - files_meta.emplace_back(file_meta, level); + // If the file has been opened before, just skip it. + if (!file_meta->table_reader_handle) { + files_meta.emplace_back(file_meta, level); + statuses.emplace_back(Status::OK()); + } if (files_meta.size() >= max_load) { break; } @@ -426,7 +431,7 @@ class VersionBuilder::Rep { auto* file_meta = files_meta[file_idx].first; int level = files_meta[file_idx].second; - table_cache_->FindTable( + statuses[file_idx] = table_cache_->FindTable( env_options_, *(base_vstorage_->InternalComparator()), file_meta->fd, &file_meta->table_reader_handle, prefix_extractor, false /*no_io */, true /* record_read_stats */, @@ -448,6 +453,12 @@ class VersionBuilder::Rep { for (auto& t : threads) { t.join(); } + for (const auto& s : statuses) { + if (!s.ok()) { + return s; + } + } + return Status::OK(); } void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) { @@ -487,14 +498,15 @@ void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) { rep_->SaveTo(vstorage); } -void VersionBuilder::LoadTableHandlers(InternalStats* internal_stats, - int max_threads, - bool prefetch_index_and_filter_in_cache, - bool is_initial_load, - const SliceTransform* prefix_extractor) { - rep_->LoadTableHandlers(internal_stats, max_threads, - prefetch_index_and_filter_in_cache, is_initial_load, - prefix_extractor); +Status VersionBuilder::LoadTableHandlers( + InternalStats* internal_stats, int max_threads, + bool prefetch_index_and_filter_in_cache, + bool is_initial_load, + const SliceTransform* prefix_extractor) { + return rep_->LoadTableHandlers(internal_stats, max_threads, + 
prefetch_index_and_filter_in_cache, + is_initial_load, + prefix_extractor); } void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level, diff --git a/db/version_builder.h b/db/version_builder.h index d6ee37e08ffa..168301fdd619 100644 --- a/db/version_builder.h +++ b/db/version_builder.h @@ -33,10 +33,10 @@ class VersionBuilder { bool CheckConsistencyForNumLevels(); void Apply(VersionEdit* edit); void SaveTo(VersionStorageInfo* vstorage); - void LoadTableHandlers(InternalStats* internal_stats, int max_threads, - bool prefetch_index_and_filter_in_cache, - bool is_initial_load, - const SliceTransform* prefix_extractor); + Status LoadTableHandlers(InternalStats* internal_stats, int max_threads, + bool prefetch_index_and_filter_in_cache, + bool is_initial_load, + const SliceTransform* prefix_extractor); void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f); private: diff --git a/db/version_set.cc b/db/version_set.cc index 9acafc588f10..64b32e25b0c6 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -713,6 +713,7 @@ void LevelIterator::InitFileIterator(size_t new_file_index) { } } } +} // anonymous namespace // A wrapper of version builder which references the current version in // constructor and unref it in the destructor. 
@@ -736,7 +737,6 @@ class BaseReferencedVersionBuilder { VersionBuilder* version_builder_; Version* version_; }; -} // anonymous namespace Status Version::GetTableProperties(std::shared_ptr* tp, const FileMetaData* file_meta, @@ -2938,7 +2938,7 @@ Status VersionSet::ProcessManifestWrites( } else if (group_start != std::numeric_limits::max()) { group_start = std::numeric_limits::max(); } - LogAndApplyHelper(last_writer->cfd, builder, version, e, mu); + LogAndApplyHelper(last_writer->cfd, builder, e, mu); batch_edits.push_back(e); } } @@ -2992,6 +2992,7 @@ Status VersionSet::ProcessManifestWrites( assert(pending_manifest_file_number_ == 0); if (!descriptor_log_ || manifest_file_size_ > db_options_->max_manifest_file_size) { + TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest"); pending_manifest_file_number_ = NewFileNumber(); batch_edits.back()->SetNextFile(next_file_number_.load()); new_descriptor_log = true; @@ -3088,6 +3089,7 @@ Status VersionSet::ProcessManifestWrites( if (s.ok() && new_descriptor_log) { s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_, db_directory); + TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest"); } if (s.ok()) { @@ -3215,7 +3217,7 @@ Status VersionSet::ProcessManifestWrites( return s; } -// 'datas' is gramatically incorrect. We still use this notation is to indicate +// 'datas' is grammatically incorrect. We still use this notation to indicate // that this variable represents a collection of column_family_data. 
Status VersionSet::LogAndApply( const autovector& column_family_datas, @@ -3297,6 +3299,133 @@ Status VersionSet::LogAndApply( new_cf_options); } +Status VersionSet::ReadAndApply( + InstrumentedMutex* mu, std::unique_ptr* manifest_reader, + std::unordered_set* cfds_changed) { + assert(manifest_reader != nullptr); + assert(cfds_changed != nullptr); + mu->AssertHeld(); + + Status s; + bool have_log_number = false; + bool have_prev_log_number = false; + bool have_next_file = false; + bool have_last_sequence = false; + uint64_t next_file = 0; + uint64_t last_sequence = 0; + uint64_t log_number = 0; + uint64_t previous_log_number = 0; + uint32_t max_column_family = 0; + uint64_t min_log_number_to_keep = 0; + + while (s.ok()) { + Slice record; + std::string scratch; + bool read_success = false; // Make lint happy + log::Reader* reader = manifest_reader->get(); + std::string old_manifest_path = reader->file()->file_name(); + while ((read_success = reader->TryReadRecord(&record, &scratch))) { + VersionEdit edit; + s = edit.DecodeFrom(record); + if (!s.ok()) { + break; + } + auto cfd = column_family_set_->GetColumnFamily(edit.column_family_); + if (active_version_builders_.find(edit.column_family_) == + active_version_builders_.end()) { + std::unique_ptr builder_guard( + new BaseReferencedVersionBuilder(cfd)); + active_version_builders_.insert( + std::make_pair(edit.column_family_, std::move(builder_guard))); + } + s = ApplyOneVersionEditToBuilder( + edit, &have_log_number, &log_number, &have_prev_log_number, + &previous_log_number, &have_next_file, &next_file, + &have_last_sequence, &last_sequence, &min_log_number_to_keep, + &max_column_family); + if (!s.ok()) { + break; + } + if (column_family_set_->get_table_cache()->GetCapacity() == + TableCache::kInfiniteCapacity) { + // Unlimited table cache. Pre-load table handle now so that the table + // files are still accessible to us after the primary unlinks them. 
+ auto builder_iter = active_version_builders_.find(edit.column_family_); + assert(builder_iter != active_version_builders_.end()); + auto builder = builder_iter->second->version_builder(); + assert(builder != nullptr); + s = builder->LoadTableHandlers( + cfd->internal_stats(), db_options_->max_file_opening_threads, + false /* prefetch_index_and_filter_in_cache */, + false /* is_initial_load */, + cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); + if (!s.ok() && !s.IsPathNotFound()) { + break; + } else if (s.IsPathNotFound()) { + s = Status::OK(); + // TODO (yanqin) release file descriptors already opened, or modify + // LoadTableHandlers so that opened files are not re-opened. + } else { // s.ok() == true + auto version = new Version(cfd, this, env_options_, + *cfd->GetLatestMutableCFOptions(), + current_version_number_++); + builder->SaveTo(version->storage_info()); + version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true); + AppendVersion(cfd, version); + active_version_builders_.erase(builder_iter); + if (cfds_changed->count(cfd) == 0) { + cfds_changed->insert(cfd); + } + } + } + if (have_next_file) { + next_file_number_.store(next_file + 1); + } + if (have_last_sequence) { + last_allocated_sequence_ = last_sequence; + last_published_sequence_ = last_sequence; + last_sequence_ = last_sequence; + } + if (have_prev_log_number) { + prev_log_number_ = previous_log_number; + MarkFileNumberUsed(previous_log_number); + } + if (have_log_number) { + MarkFileNumberUsed(log_number); + } + column_family_set_->UpdateMaxColumnFamily(max_column_family); + MarkMinLogNumberToKeep2PC(min_log_number_to_keep); + } + if (s.ok() && !read_success) { + // It's possible that we have finished reading the current MANIFEST, and + // the primary has created a new MANIFEST. 
+ log::Reader::Reporter* reporter = reader->GetReporter(); + s = MaybeSwitchManifest(reporter, manifest_reader); + reader = manifest_reader->get(); + } + if (s.ok() && reader->file()->file_name() == old_manifest_path) { + break; + } + } + + if (s.ok()) { + for (auto cfd : *column_family_set_) { + auto builder_iter = active_version_builders_.find(cfd->GetID()); + if (builder_iter == active_version_builders_.end()) { + continue; + } + auto builder = builder_iter->second->version_builder(); + if (!builder->CheckConsistencyForNumLevels()) { + s = Status::InvalidArgument( + "db has more levels than options.num_levels"); + break; + } + } + } + + return s; +} + void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { assert(edit->IsColumnFamilyManipulation()); edit->SetNextFile(next_file_number_.load()); @@ -3315,8 +3444,8 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { } void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, - VersionBuilder* builder, Version* /*v*/, - VersionEdit* edit, InstrumentedMutex* mu) { + VersionBuilder* builder, VersionEdit* edit, + InstrumentedMutex* mu) { #ifdef NDEBUG (void)cfd; #endif @@ -3343,7 +3472,7 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, builder->Apply(edit); } -Status VersionSet::ApplyOneVersionEdit( +Status VersionSet::ApplyOneVersionEditToBuilder( VersionEdit& edit, const std::unordered_map& name_to_options, std::unordered_map& column_families_not_found, @@ -3470,6 +3599,152 @@ Status VersionSet::ApplyOneVersionEdit( return Status::OK(); } +Status VersionSet::ApplyOneVersionEditToBuilder( + VersionEdit& edit, bool* have_log_number, uint64_t* /* log_number */, + bool* have_prev_log_number, uint64_t* previous_log_number, + bool* have_next_file, uint64_t* next_file, bool* have_last_sequence, + SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep, + uint32_t* max_column_family) { + ColumnFamilyData* cfd = nullptr; + Status status; + if (edit.is_column_family_add_) { + // TODO (yanqin) for 
now the secondary ignores column families created + // after Open. This also simplifies handling of switching to a new MANIFEST + // and processing the snapshot of the system at the beginning of the + // MANIFEST. + return Status::OK(); + } else if (edit.is_column_family_drop_) { + // Drop the column family by setting it to be 'dropped' without destroying + // the column family handle. + cfd = column_family_set_->GetColumnFamily(edit.column_family_); + // Drop a CF created after Open? Then ignore + if (cfd == nullptr) { + return Status::OK(); + } + cfd->SetDropped(); + if (cfd->Unref()) { + delete cfd; + cfd = nullptr; + } + } else { + cfd = column_family_set_->GetColumnFamily(edit.column_family_); + // Operation on a CF created after Open? Then ignore + if (cfd == nullptr) { + return Status::OK(); + } + auto builder_iter = active_version_builders_.find(edit.column_family_); + assert(builder_iter != active_version_builders_.end()); + auto builder = builder_iter->second->version_builder(); + assert(builder != nullptr); + builder->Apply(&edit); + } + if (cfd != nullptr) { + if (edit.has_log_number_) { + if (cfd->GetLogNumber() > edit.log_number_) { + // TODO (yanqin) use a separate info log for secondary instance. 
+ } else { + cfd->SetLogNumber(edit.log_number_); + *have_log_number = true; + } + } + if (edit.has_comparator_ && + edit.comparator_ != cfd->user_comparator()->Name()) { + return Status::InvalidArgument( + cfd->user_comparator()->Name(), + "does not match existing comparator " + edit.comparator_); + } + } + + if (edit.has_prev_log_number_) { + *previous_log_number = edit.prev_log_number_; + *have_prev_log_number = true; + } + + if (edit.has_next_file_number_) { + *next_file = edit.next_file_number_; + *have_next_file = true; + } + + if (edit.has_max_column_family_) { + *max_column_family = edit.max_column_family_; + } + + if (edit.has_min_log_number_to_keep_) { + *min_log_number_to_keep = + std::max(*min_log_number_to_keep, edit.min_log_number_to_keep_); + } + + if (edit.has_last_sequence_) { + *last_sequence = edit.last_sequence_; + *have_last_sequence = true; + } + return status; +} + +Status VersionSet::MaybeSwitchManifest( + log::Reader::Reporter* reporter, + std::unique_ptr<log::Reader>* manifest_reader) { + assert(manifest_reader != nullptr); + Status s; + do { + std::string manifest_path; + s = GetCurrentManifestPath(&manifest_path); + std::unique_ptr<SequentialFile> manifest_file; + if (s.ok()) { + if (nullptr == manifest_reader->get() || + manifest_reader->get()->file()->file_name() != manifest_path) { + TEST_SYNC_POINT( + "VersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0"); + TEST_SYNC_POINT( + "VersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:1"); + s = env_->NewSequentialFile( + manifest_path, &manifest_file, + env_->OptimizeForManifestRead(env_options_)); + } else { + // No need to switch manifest. + break; + } + } + std::unique_ptr<SequentialFileReader> manifest_file_reader; + if (s.ok()) { + manifest_file_reader.reset( + new SequentialFileReader(std::move(manifest_file), manifest_path)); + // TODO(yanqin) secondary instance needs a separate info log file. 
+ manifest_reader->reset( + new log::Reader(nullptr, std::move(manifest_file_reader), reporter, + true /* checksum */, 0 /* log_number */)); + ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n", + manifest_path.c_str()); + } + } while (s.IsPathNotFound()); + return s; +} + +Status VersionSet::GetCurrentManifestPath(std::string* manifest_path) { + assert(manifest_path != nullptr); + std::string fname; + Status s = ReadFileToString(env_, CurrentFileName(dbname_), &fname); + if (!s.ok()) { + return s; + } + if (fname.empty() || fname.back() != '\n') { + return Status::Corruption("CURRENT file does not end with newline"); + } + // remove the trailing '\n' + fname.resize(fname.size() - 1); + FileType type; + bool parse_ok = ParseFileName(fname, &manifest_file_number_, &type); + if (!parse_ok || type != kDescriptorFile) { + return Status::Corruption("CURRENT file corrupted"); + } + *manifest_path = dbname_; + if (dbname_.back() != '/') { + manifest_path->push_back('/'); + } + *manifest_path += fname; + return Status::OK(); +} + Status VersionSet::Recover( const std::vector& column_families, bool read_only) { @@ -3483,43 +3758,28 @@ Status VersionSet::Recover( std::unordered_map column_families_not_found; // Read "CURRENT" file, which contains a pointer to the current manifest file - std::string manifest_filename; - Status s = ReadFileToString( - env_, CurrentFileName(dbname_), &manifest_filename - ); + std::string manifest_path; + Status s = GetCurrentManifestPath(&manifest_path); if (!s.ok()) { return s; } - if (manifest_filename.empty() || - manifest_filename.back() != '\n') { - return Status::Corruption("CURRENT file does not end with newline"); - } - // remove the trailing '\n' - manifest_filename.resize(manifest_filename.size() - 1); - FileType type; - bool parse_ok = - ParseFileName(manifest_filename, &manifest_file_number_, &type); - if (!parse_ok || type != kDescriptorFile) { - return Status::Corruption("CURRENT file corrupted"); - } 
ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n", - manifest_filename.c_str()); + manifest_path.c_str()); - manifest_filename = dbname_ + "/" + manifest_filename; std::unique_ptr<SequentialFileReader> manifest_file_reader; { std::unique_ptr<SequentialFile> manifest_file; - s = env_->NewSequentialFile(manifest_filename, &manifest_file, + s = env_->NewSequentialFile(manifest_path, &manifest_file, env_->OptimizeForManifestRead(env_options_)); if (!s.ok()) { return s; } manifest_file_reader.reset( - new SequentialFileReader(std::move(manifest_file), manifest_filename)); + new SequentialFileReader(std::move(manifest_file), manifest_path)); } uint64_t current_manifest_file_size; - s = env_->GetFileSize(manifest_filename, &current_manifest_file_size); + s = env_->GetFileSize(manifest_path, &current_manifest_file_size); if (!s.ok()) { return s; } @@ -3587,7 +3847,7 @@ Status VersionSet::Recover( TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:LastInAtomicGroup", &edit); for (auto& e : replay_buffer) { - s = ApplyOneVersionEdit( + s = ApplyOneVersionEditToBuilder( e, cf_name_to_options, column_families_not_found, builders, &have_log_number, &log_number, &have_prev_log_number, &previous_log_number, &have_next_file, &next_file, @@ -3608,7 +3868,7 @@ Status VersionSet::Recover( s = Status::Corruption("corrupted atomic group"); break; } - s = ApplyOneVersionEdit( + s = ApplyOneVersionEditToBuilder( edit, cf_name_to_options, column_families_not_found, builders, &have_log_number, &log_number, &have_prev_log_number, &previous_log_number, &have_next_file, &next_file, @@ -3715,7 +3975,7 @@ Status VersionSet::Recover( "prev_log_number is %lu," "max_column_family is %u," "min_log_number_to_keep is %lu\n", - manifest_filename.c_str(), (unsigned long)manifest_file_number_, + manifest_path.c_str(), (unsigned long)manifest_file_number_, (unsigned long)next_file_number_.load(), (unsigned long)last_sequence_, (unsigned long)log_number, (unsigned long)prev_log_number_, column_family_set_->GetMaxColumnFamily(), 
min_log_number_to_keep_2pc()); @@ -3737,6 +3997,179 @@ Status VersionSet::Recover( return s; } +Status VersionSet::RecoverAsSecondary( + const std::vector<ColumnFamilyDescriptor>& column_families, + std::unique_ptr<log::Reader>* manifest_reader, + std::unique_ptr<log::Reader::Reporter>* manifest_reporter, + std::unique_ptr<Status>* manifest_reader_status) { + assert(manifest_reader != nullptr); + assert(manifest_reporter != nullptr); + assert(manifest_reader_status != nullptr); + + std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options; + for (const auto& cf : column_families) { + cf_name_to_options.insert({cf.name, cf.options}); + } + + // add default column family + auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName); + if (default_cf_iter == cf_name_to_options.end()) { + return Status::InvalidArgument("Default column family not specified"); + } + VersionEdit default_cf_edit; + default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName); + default_cf_edit.SetColumnFamily(0); + ColumnFamilyData* default_cfd = + CreateColumnFamily(default_cf_iter->second, &default_cf_edit); + // In recovery, nobody else can access it, so it's fine to set it to be + // initialized earlier. 
+ default_cfd->set_initialized(); + + bool have_log_number = false; + bool have_prev_log_number = false; + bool have_next_file = false; + bool have_last_sequence = false; + uint64_t next_file = 0; + uint64_t last_sequence = 0; + uint64_t log_number = 0; + uint64_t previous_log_number = 0; + uint32_t max_column_family = 0; + uint64_t min_log_number_to_keep = 0; + std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders; + std::unordered_map<int, std::string> column_families_not_found; + builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)}); + + manifest_reader_status->reset(new Status()); + manifest_reporter->reset(new LogReporter()); + static_cast<LogReporter*>(manifest_reporter->get())->status = + manifest_reader_status->get(); + Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader); + log::Reader* reader = manifest_reader->get(); + + while (s.ok()) { + assert(reader != nullptr); + Slice record; + std::string scratch; + while (s.ok() && reader->TryReadRecord(&record, &scratch)) { + VersionEdit edit; + s = edit.DecodeFrom(record); + if (!s.ok()) { + break; + } + s = ApplyOneVersionEditToBuilder( + edit, cf_name_to_options, column_families_not_found, builders, + &have_log_number, &log_number, &have_prev_log_number, + &previous_log_number, &have_next_file, &next_file, + &have_last_sequence, &last_sequence, &min_log_number_to_keep, + &max_column_family); + } + if (s.ok()) { + bool enough = have_next_file && have_log_number && have_last_sequence; + if (enough) { + for (const auto& cf : column_families) { + auto cfd = column_family_set_->GetColumnFamily(cf.name); + if (cfd == nullptr) { + enough = false; + break; + } + } + } + if (enough && column_family_set_->get_table_cache()->GetCapacity() == + TableCache::kInfiniteCapacity) { + for (const auto& cf : column_families) { + auto cfd = column_family_set_->GetColumnFamily(cf.name); + assert(cfd != nullptr); + if (!cfd->IsDropped()) { + auto builder_iter = builders.find(cfd->GetID()); + assert(builder_iter != builders.end()); + auto builder = 
builder_iter->second->version_builder(); + assert(builder != nullptr); + s = builder->LoadTableHandlers( + cfd->internal_stats(), db_options_->max_file_opening_threads, + false /* prefetch_index_and_filter_in_cache */, + false /* is_initial_load */, + cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); + if (!s.ok()) { + enough = false; + if (s.IsPathNotFound()) { + s = Status::OK(); + } + break; + } + } + } + if (!enough) { + // TODO (yanqin) release table handlers if any of the files are not + // found. + } + } + if (enough) { + break; + } + } + } + + if (s.ok()) { + if (!have_prev_log_number) { + previous_log_number = 0; + } + column_family_set_->UpdateMaxColumnFamily(max_column_family); + + MarkMinLogNumberToKeep2PC(min_log_number_to_keep); + MarkFileNumberUsed(previous_log_number); + MarkFileNumberUsed(log_number); + + for (auto cfd : *column_family_set_) { + assert(builders.count(cfd->GetID()) > 0); + auto builder = builders[cfd->GetID()]->version_builder(); + if (!builder->CheckConsistencyForNumLevels()) { + s = Status::InvalidArgument( + "db has more levels than options.num_levels"); + break; + } + } + } + + if (s.ok()) { + for (auto cfd : *column_family_set_) { + if (cfd->IsDropped()) { + continue; + } + assert(cfd->initialized()); + auto builders_iter = builders.find(cfd->GetID()); + assert(builders_iter != builders.end()); + auto* builder = builders_iter->second->version_builder(); + + Version* v = new Version(cfd, this, env_options_, + *cfd->GetLatestMutableCFOptions(), + current_version_number_++); + builder->SaveTo(v->storage_info()); + + // Install recovered version + v->PrepareApply(*cfd->GetLatestMutableCFOptions(), + !(db_options_->skip_stats_update_on_db_open)); + AppendVersion(cfd, v); + } + next_file_number_.store(next_file + 1); + last_allocated_sequence_ = last_sequence; + last_published_sequence_ = last_sequence; + last_sequence_ = last_sequence; + prev_log_number_ = previous_log_number; + for (auto cfd : *column_family_set_) { + if 
(cfd->IsDropped()) { + continue; + } + ROCKS_LOG_INFO(db_options_->info_log, + "Column family [%s] (ID %u), log number is %" PRIu64 "\n", + cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber()); + } + } + for (auto& builder : builders) { + delete builder.second; + } + return s; +} + Status VersionSet::ListColumnFamilies(std::vector* column_families, const std::string& dbname, Env* env) { // these are just for performance reasons, not correcntes, diff --git a/db/version_set.h b/db/version_set.h index b50f653ba436..c4c1b2445b12 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -735,9 +735,7 @@ struct ObsoleteFileInfo { } }; -namespace { class BaseReferencedVersionBuilder; -} class VersionSet { public: @@ -799,12 +797,24 @@ class VersionSet { bool new_descriptor_log = false, const ColumnFamilyOptions* new_cf_options = nullptr); + Status ReadAndApply(InstrumentedMutex* mu, + std::unique_ptr* manifest_reader, + std::unordered_set* cfds_changed); + + Status GetCurrentManifestPath(std::string* manifest_filename); + // Recover the last saved descriptor from persistent storage. // If read_only == true, Recover() will not complain if some column families // are not opened Status Recover(const std::vector& column_families, bool read_only = false); + Status RecoverAsSecondary( + const std::vector& column_families, + std::unique_ptr* manifest_reader, + std::unique_ptr* manifest_reporter, + std::unique_ptr* manifest_reader_status); + // Reads a manifest file and returns a list of column families in // column_families. 
static Status ListColumnFamilies(std::vector* column_families, @@ -984,6 +994,7 @@ class VersionSet { friend class Version; friend class DBImpl; + friend class DBImplReadOnly; struct LogReporter : public log::Reader::Reporter { Status* status; @@ -1007,7 +1018,8 @@ class VersionSet { ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options, VersionEdit* edit); - Status ApplyOneVersionEdit( + // REQUIRES db mutex + Status ApplyOneVersionEditToBuilder( VersionEdit& edit, const std::unordered_map& name_to_opts, std::unordered_map& column_families_not_found, @@ -1017,6 +1029,18 @@ class VersionSet { bool* have_last_sequence, SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep, uint32_t* max_column_family); + // REQUIRES db mutex + Status ApplyOneVersionEditToBuilder( + VersionEdit& edit, bool* have_log_number, uint64_t* log_number, + bool* have_prev_log_number, uint64_t* previous_log_number, + bool* have_next_file, uint64_t* next_file, bool* have_last_sequence, + SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep, + uint32_t* max_column_family); + + Status MaybeSwitchManifest(log::Reader::Reporter* reporter, + std::unique_ptr* manifest_reader); + + // REQUIRES db mutex at beginning. 
may release and re-acquire db mutex Status ProcessManifestWrites(std::deque& writers, InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log, @@ -1070,12 +1094,15 @@ class VersionSet { // env options for all reads and writes except compactions EnvOptions env_options_; + std::unordered_map> + active_version_builders_; + // No copying allowed VersionSet(const VersionSet&); void operator=(const VersionSet&); void LogAndApplyCFHelper(VersionEdit* edit); - void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, Version* v, + void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, VersionEdit* edit, InstrumentedMutex* mu); }; diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 53fb52c9494a..b1adef377fb1 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -155,6 +155,15 @@ class DB { std::vector* handles, DB** dbptr, bool error_if_log_file_exist = false); + static Status OpenAsSecondary(const Options& options, const std::string& name, + const std::string& secondary_name, DB** dbptr); + + static Status OpenAsSecondary( + const DBOptions& db_options, const std::string& name, + const std::string& secondary_name, + const std::vector& column_families, + std::vector* handles, DB** dbptr); + // Open DB with column families. // db_options specify database specific options // column_families is the vector of all column families in the database, diff --git a/src.mk b/src.mk index 8b3ab68d8ffc..73a0c93946cc 100644 --- a/src.mk +++ b/src.mk @@ -22,6 +22,7 @@ LIB_SOURCES = \ db/db_impl_files.cc \ db/db_impl_open.cc \ db/db_impl_readonly.cc \ + db/db_impl_secondary.cc \ db/db_impl_write.cc \ db/db_info_dumper.cc \ db/db_iter.cc \