diff --git a/CMakeLists.txt b/CMakeLists.txt index b38ab3d931fd..34ca88ddb9ef 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -489,6 +489,7 @@ set(SOURCES db/db_impl_debug.cc db/db_impl_experimental.cc db/db_impl_readonly.cc + db/db_impl_secondary.cc db/db_info_dumper.cc db/db_iter.cc db/dbformat.cc @@ -874,6 +875,7 @@ if(WITH_TESTS) db/db_options_test.cc db/db_properties_test.cc db/db_range_del_test.cc + db/db_secondary_test.cc db/db_sst_test.cc db/db_statistics_test.cc db/db_table_properties_test.cc diff --git a/HISTORY.md b/HISTORY.md index 4ea7b9ef28bb..2e441967bbc9 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ * RocksDB may choose to preopen some files even if options.max_open_files != -1. This may make DB open slightly longer. * For users of dictionary compression with ZSTD v0.7.0+, we now reuse the same digested dictionary when compressing each of an SST file's data blocks for faster compression speeds. * For all users of dictionary compression who set `cache_index_and_filter_blocks == true`, we now store dictionary data used for decompression in the block cache for better control over memory usage. For users of ZSTD v1.1.4+ who compile with -DZSTD_STATIC_LINKING_ONLY, this includes a digested dictionary, which is used to increase decompression speed. +* Introduce a new IOError subcode, PathNotFound, to indicate trying to open a nonexistent file or directory for read. ### Public API Change * CompactionPri = kMinOverlappingRatio also uses compensated file size, which boosts file with lots of tombstones to be compacted first. diff --git a/Makefile b/Makefile index 544c45773881..d2294fd72763 100644 --- a/Makefile +++ b/Makefile @@ -443,6 +443,7 @@ TESTS = \ db_merge_operator_test \ db_options_test \ db_range_del_test \ + db_secondary_test \ db_sst_test \ db_tailing_iter_test \ db_io_failure_test \ @@ -548,6 +549,7 @@ TESTS = \ range_tombstone_fragmenter_test \ range_del_aggregator_test \ sst_file_reader_test \ + db_secondary_test \ PARALLEL_TEST = \ backupable_db_test \ @@ -1558,6 +1560,9 @@ range_tombstone_fragmenter_test: db/range_tombstone_fragmenter_test.o db/db_test sst_file_reader_test: table/sst_file_reader_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_secondary_test: db/db_secondary_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + #------------------------------------------------- # make install related stuff INSTALL_PATH ?= /usr/local diff --git a/TARGETS b/TARGETS index ee0c1774a783..4293fda38ecb 100644 --- a/TARGETS +++ b/TARGETS @@ -98,6 +98,7 @@ cpp_library( "db/db_impl_files.cc", "db/db_impl_open.cc", "db/db_impl_readonly.cc", + "db/db_impl_secondary.cc", "db/db_impl_write.cc", "db/db_info_dumper.cc", "db/db_iter.cc", diff --git a/db/db_impl.h b/db/db_impl.h index 4b663cf23885..815d1d6fdd06 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -1176,6 +1176,8 @@ class DBImpl : public DB { // and log_empty_. Refer to the definition of each variable below for more // details. InstrumentedMutex log_write_mutex_; + + protected: // State below is protected by mutex_ // With two_write_queues enabled, some of the variables that accessed during // WriteToWAL need different synchronization: log_empty_, alive_log_files_, @@ -1183,6 +1185,7 @@ class DBImpl : public DB { // more description. mutable InstrumentedMutex mutex_; + private: std::atomic shutting_down_; // This condition variable is signaled on these conditions: // * whenever bg_compaction_scheduled_ goes down to 0 @@ -1213,8 +1216,12 @@ class DBImpl : public DB { // read and writes are protected by log_write_mutex_ instead. This is to avoid // expesnive mutex_ lock during WAL write, which update log_empty_. bool log_empty_; + + protected: ColumnFamilyHandleImpl* default_cf_handle_; InternalStats* default_cf_internal_stats_; + + private: std::unique_ptr column_family_memtables_; struct LogFileNumberSize { explicit LogFileNumberSize(uint64_t _number) @@ -1281,12 +1288,16 @@ class DBImpl : public DB { WriteBatch cached_recoverable_state_; std::atomic cached_recoverable_state_empty_ = {true}; std::atomic total_log_size_; + + protected: // only used for dynamically adjusting max_total_wal_size. it is a sum of // [write_buffer_size * max_write_buffer_number] over all column families uint64_t max_total_in_memory_state_; // If true, we have only one (default) column family. We use this to optimize // some code-paths bool single_column_family_mode_; + + private: // If this is non-empty, we need to delete these log files in background // threads. Protected by db mutex. autovector logs_to_free_; @@ -1499,12 +1510,14 @@ class DBImpl : public DB { std::string db_absolute_path_; + protected: // The options to access storage files const EnvOptions env_options_; // Additonal options for compaction and flush EnvOptions env_options_for_compaction_; + private: // Number of running IngestExternalFile() calls. // REQUIRES: mutex held int num_running_ingest_file_; diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 51c9fb7ca67e..e6e241b179a1 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -405,7 +405,6 @@ Status DBImpl::Recover( } if (s.ok()) { - SequenceNumber next_sequence(kMaxSequenceNumber); default_cf_handle_ = new ColumnFamilyHandleImpl( versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_); default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats(); @@ -468,6 +467,7 @@ Status DBImpl::Recover( if (!logs.empty()) { // Recover in the order in which the logs were generated std::sort(logs.begin(), logs.end()); + SequenceNumber next_sequence(kMaxSequenceNumber); s = RecoverLogFiles(logs, &next_sequence, read_only); if (!s.ok()) { // Clear memtables if recovery failed @@ -629,8 +629,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // to be skipped instead of propagating bad information (like overly // large sequence numbers). log::Reader reader(immutable_db_options_.info_log, std::move(file_reader), - &reporter, true /*checksum*/, log_number, - false /* retry_after_eof */); + &reporter, true /*checksum*/, log_number); // Determine if we should tolerate incomplete records at the tail end of the // Read all the records and add to a memtable diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index bd7099f00d02..0c0a8482b599 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -159,7 +159,6 @@ Status DB::OpenForReadOnly( *dbptr = nullptr; handles->clear(); - SuperVersionContext sv_context(/* create_superversion */ true); DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname); impl->mutex_.Lock(); Status s = impl->Recover(column_families, true /* read only */, @@ -176,6 +175,7 @@ Status DB::OpenForReadOnly( handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); } } + SuperVersionContext sv_context(/* create_superversion */ true); if (s.ok()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) { sv_context.NewSuperVersion(); diff --git a/db/db_impl_secondary.cc b/db/db_impl_secondary.cc new file mode 100644 index 000000000000..205339a00a2b --- /dev/null +++ b/db/db_impl_secondary.cc @@ -0,0 +1,368 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_impl_secondary.h" +#include "db/db_impl.h" +#include "db/db_iter.h" +#include "db/forward_iterator.h" +#include "db/merge_context.h" +#include "db/range_del_aggregator.h" +#include "monitoring/perf_context_imp.h" +#include "util/auto_roll_logger.h" + +namespace rocksdb { + +#ifndef ROCKSDB_LITE + +DBImplSecondary::DBImplSecondary(const DBOptions& db_options, + const std::string& dbname) + : DBImpl(db_options, dbname) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Opening the db in secondary mode"); + LogFlush(immutable_db_options_.info_log); +} + +DBImplSecondary::~DBImplSecondary() {} + +Status DBImplSecondary::Recover( + const std::vector& column_families) { + mutex_.AssertHeld(); + + Status s; + s = versions_->RecoverAsSecondary(column_families, &manifest_reader_, + &manifest_reporter_, + &manifest_reader_status_); + if (!s.ok()) { + return s; + } + if (immutable_db_options_.paranoid_checks && s.ok()) { + s = CheckConsistency(); + } + // Initial max_total_in_memory_state_ before recovery logs. Log recovery + // may check this value to decide whether to flush. + max_total_in_memory_state_ = 0; + for (auto cfd : *versions_->GetColumnFamilySet()) { + auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); + max_total_in_memory_state_ += mutable_cf_options->write_buffer_size * + mutable_cf_options->max_write_buffer_number; + } + if (s.ok()) { + default_cf_handle_ = new ColumnFamilyHandleImpl( + versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_); + default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats(); + single_column_family_mode_ = + versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1; + } + + // TODO: attempt to recover from WAL files. + return s; +} + +// Implementation of the DB interface +Status DBImplSecondary::Get(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value) { + return GetImpl(read_options, column_family, key, value); +} + +Status DBImplSecondary::GetImpl(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* pinnable_val) { + assert(pinnable_val != nullptr); + PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_); + StopWatch sw(env_, stats_, DB_GET); + PERF_TIMER_GUARD(get_snapshot_time); + + auto cfh = reinterpret_cast(column_family); + auto cfd = cfh->cfd(); + if (tracer_) { + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_) { + tracer_->Get(column_family, key); + } + } + // Acquire SuperVersion + SuperVersion* super_version = GetAndRefSuperVersion(cfd); + SequenceNumber snapshot = versions_->LastSequence(); + ; + MergeContext merge_context; + SequenceNumber max_covering_tombstone_seq = 0; + Status s; + LookupKey lkey(key, snapshot); + PERF_TIMER_STOP(get_snapshot_time); + + bool done = false; + if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context, + &max_covering_tombstone_seq, read_options)) { + done = true; + pinnable_val->PinSelf(); + RecordTick(stats_, MEMTABLE_HIT); + } else if ((s.ok() || s.IsMergeInProgress()) && + super_version->imm->Get( + lkey, pinnable_val->GetSelf(), &s, &merge_context, + &max_covering_tombstone_seq, read_options)) { + done = true; + pinnable_val->PinSelf(); + RecordTick(stats_, MEMTABLE_HIT); + } + if (!done && !s.ok() && !s.IsMergeInProgress()) { + ReturnAndCleanupSuperVersion(cfd, super_version); + return s; + } + if (!done) { + PERF_TIMER_GUARD(get_from_output_files_time); + super_version->current->Get(read_options, lkey, pinnable_val, &s, + &merge_context, &max_covering_tombstone_seq); + RecordTick(stats_, MEMTABLE_MISS); + } + { + PERF_TIMER_GUARD(get_post_process_time); + ReturnAndCleanupSuperVersion(cfd, super_version); + RecordTick(stats_, NUMBER_KEYS_READ); + size_t size = pinnable_val->size(); + RecordTick(stats_, BYTES_READ, size); + MeasureTime(stats_, BYTES_PER_READ, size); + PERF_COUNTER_ADD(get_read_bytes, size); + } + return s; +} + +Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options, + ColumnFamilyHandle* column_family) { + if (read_options.managed) { + return NewErrorIterator( + Status::NotSupported("Managed iterator is not supported anymore.")); + } + if (read_options.read_tier == kPersistedTier) { + return NewErrorIterator(Status::NotSupported( + "ReadTier::kPersistedData is not yet supported in iterators.")); + } + Iterator* result = nullptr; + auto cfh = reinterpret_cast(column_family); + auto cfd = cfh->cfd(); + ReadCallback* read_callback = nullptr; // No read callback provided. + if (read_options.tailing) { + SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); + auto iter = new ForwardIterator(this, read_options, cfd, super_version); + result = NewDBIterator( + env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options, + cfd->user_comparator(), iter, kMaxSequenceNumber, + super_version->mutable_cf_options.max_sequential_skip_in_iterations, + read_callback, this, cfd); + } else { + auto snapshot = read_options.snapshot != nullptr + ? read_options.snapshot->GetSequenceNumber() + : versions_->LastSequence(); + result = NewIteratorImpl(read_options, cfd, snapshot, read_callback); + } + return result; +} + +ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl( + const ReadOptions& read_options, ColumnFamilyData* cfd, + SequenceNumber snapshot, ReadCallback* read_callback) { + SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); + auto db_iter = NewArenaWrappedDbIterator( + env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options, + snapshot, + super_version->mutable_cf_options.max_sequential_skip_in_iterations, + super_version->version_number, read_callback); + auto internal_iter = + NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(), + db_iter->GetRangeDelAggregator(), snapshot); + db_iter->SetIterUnderDBIter(internal_iter); + return db_iter; +} + +Status DBImplSecondary::NewIterators( + const ReadOptions& read_options, + const std::vector& column_families, + std::vector* iterators) { + if (read_options.managed) { + return Status::NotSupported("Managed iterator is not supported anymore."); + } + if (read_options.read_tier == kPersistedTier) { + return Status::NotSupported( + "ReadTier::kPersistedData is not yet supported in iterators."); + } + ReadCallback* read_callback = nullptr; // No read callback provided. + if (iterators == nullptr) { + return Status::InvalidArgument("iterators not allowed to be nullptr"); + } + iterators->clear(); + iterators->reserve(column_families.size()); + if (read_options.tailing) { + for (auto cfh : column_families) { + auto cfd = reinterpret_cast(cfh)->cfd(); + SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_); + auto iter = new ForwardIterator(this, read_options, cfd, super_version); + iterators->push_back(NewDBIterator( + env_, read_options, *cfd->ioptions(), + super_version->mutable_cf_options, cfd->user_comparator(), iter, + kMaxSequenceNumber, + super_version->mutable_cf_options.max_sequential_skip_in_iterations, + read_callback, this, cfd)); + } + } else { + SequenceNumber latest_snapshot = versions_->LastSequence(); + SequenceNumber read_seq = + read_options.snapshot != nullptr + ? reinterpret_cast(read_options.snapshot) + ->number_ + : latest_snapshot; + + for (auto cfh : column_families) { + auto* cfd = reinterpret_cast(cfh)->cfd(); + iterators->push_back( + NewIteratorImpl(read_options, cfd, read_seq, read_callback)); + } + } + + return Status::OK(); +} + +Status DBImplSecondary::TryCatchUpWithPrimary() { + assert(versions_.get() != nullptr); + assert(manifest_reader_.get() != nullptr); + Status s; + std::unordered_set cfds_changed; + InstrumentedMutexLock lock_guard(mutex()); + s = versions_->ReadAndApply(mutex(), &manifest_reader_, &cfds_changed); + if (s.ok()) { + SuperVersionContext sv_context(true /* create_superversion */); + for (auto cfd : cfds_changed) { + sv_context.NewSuperVersion(); + cfd->InstallSuperVersion(&sv_context, mutex()); + } + sv_context.Clean(); + } + return s; +} + +Status DB::OpenAsSecondary(const Options& options, const std::string& dbname, + const std::string& secondary_dbname, DB** dbptr) { + *dbptr = nullptr; + + DBOptions db_options(options); + ColumnFamilyOptions cf_options(options); + std::vector column_families; + column_families.emplace_back(kDefaultColumnFamilyName, cf_options); + std::vector handles; + + Status s = DB::OpenAsSecondary(db_options, dbname, secondary_dbname, + column_families, &handles, dbptr); + if (s.ok()) { + assert(handles.size() == 1); + delete handles[0]; + } + return s; +} + +Status DB::OpenAsSecondary( + const DBOptions& db_options, const std::string& dbname, + const std::string& secondary_dbname, + const std::vector& column_families, + std::vector* handles, DB** dbptr) { + *dbptr = nullptr; + if (db_options.max_open_files != -1) { + // TODO (yanqin) maybe support max_open_files != -1 by creating hard links + // on SST files so that db secondary can still have access to old SSTs + // while primary instance may delete original. + return Status::InvalidArgument("require max_open_files to be -1"); + } + + DBOptions tmp_opts(db_options); + if (nullptr == tmp_opts.info_log) { + Env* env = tmp_opts.env; + assert(env != nullptr); + std::string secondary_db_abs_path; + env->GetAbsolutePath(secondary_dbname, &secondary_db_abs_path); + std::string fname = InfoLogFileName(secondary_dbname, secondary_db_abs_path, + tmp_opts.db_log_dir); + + env->CreateDirIfMissing(secondary_dbname); + if (tmp_opts.log_file_time_to_roll > 0 || tmp_opts.max_log_file_size > 0) { + AutoRollLogger* result = new AutoRollLogger( + env, secondary_dbname, tmp_opts.db_log_dir, + tmp_opts.max_log_file_size, tmp_opts.log_file_time_to_roll, + tmp_opts.info_log_level); + Status s = result->GetStatus(); + if (!s.ok()) { + delete result; + } else { + tmp_opts.info_log.reset(result); + } + } + if (nullptr == tmp_opts.info_log) { + env->RenameFile(fname, OldInfoLogFileName( + secondary_dbname, env->NowMicros(), + secondary_db_abs_path, tmp_opts.db_log_dir)); + Status s = env->NewLogger(fname, &(tmp_opts.info_log)); + if (tmp_opts.info_log != nullptr) { + tmp_opts.info_log->SetInfoLogLevel(tmp_opts.info_log_level); + } + } + } + + assert(tmp_opts.info_log != nullptr); + + handles->clear(); + DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname); + impl->mutex_.Lock(); + Status s = impl->Recover(column_families); + if (s.ok()) { + for (auto cf : column_families) { + auto cfd = + impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); + if (nullptr == cfd) { + s = Status::InvalidArgument("Column family not found: ", cf.name); + break; + } + handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); + } + } + SuperVersionContext sv_context(true /* create_superversion */); + if (s.ok()) { + for (auto cfd : *impl->versions_->GetColumnFamilySet()) { + sv_context.NewSuperVersion(); + cfd->InstallSuperVersion(&sv_context, &impl->mutex_); + } + } + impl->mutex_.Unlock(); + sv_context.Clean(); + if (s.ok()) { + *dbptr = impl; + for (auto h : *handles) { + impl->NewThreadStatusCfInfo( + reinterpret_cast(h)->cfd()); + } + } else { + for (auto h : *handles) { + delete h; + } + handles->clear(); + delete impl; + } + return s; +} +#else // !ROCKSDB_LITE + +Status DB::OpenAsSecondary(const Options& /*options*/, + const std::string& /*name*/, + const std::string& /*secondary_name*/, + DB** /*dbptr*/) { + return Status::NotSupported("Not supported in ROCKSDB_LITE."); +} + +Status DB::OpenAsSecondary( + const DBOptions& /*db_options*/, const std::string& /*dbname*/, + const std::string& /*secondary_name*/, + const std::vector& /*column_families*/, + std::vector* /*handles*/, DB** /*dbptr*/) { + return Status::NotSupported("Not supported in ROCKSDB_LITE."); +} +#endif // !ROCKSDB_LITE + +} // namespace rocksdb diff --git a/db/db_impl_secondary.h b/db/db_impl_secondary.h new file mode 100644 index 000000000000..6b7570c414d8 --- /dev/null +++ b/db/db_impl_secondary.h @@ -0,0 +1,140 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef ROCKSDB_LITE + +#include +#include +#include "db/db_impl.h" + +namespace rocksdb { + +class DBImplSecondary : public DBImpl { + public: + DBImplSecondary(const DBOptions& options, const std::string& dbname); + virtual ~DBImplSecondary(); + + Status Recover(const std::vector& column_families); + + // Implementations of the DB interface + using DB::Get; + virtual Status Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value) override; + + Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* value); + + using DBImpl::NewIterator; + virtual Iterator* NewIterator(const ReadOptions&, + ColumnFamilyHandle* column_family) override; + + ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options, + ColumnFamilyData* cfd, + SequenceNumber snapshot, + ReadCallback* read_callback); + + virtual Status NewIterators( + const ReadOptions& options, + const std::vector& column_families, + std::vector* iterators) override; + + using DBImpl::Put; + virtual Status Put(const WriteOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/, + const Slice& /*key*/, const Slice& /*value*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + using DBImpl::Merge; + virtual Status Merge(const WriteOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/, + const Slice& /*key*/, const Slice& /*value*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + using DBImpl::Delete; + virtual Status Delete(const WriteOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/, + const Slice& /*key*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + using DBImpl::SingleDelete; + virtual Status SingleDelete(const WriteOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/, + const Slice& /*key*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + virtual Status Write(const WriteOptions& /*options*/, + WriteBatch* /*updates*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + using DBImpl::CompactRange; + virtual Status CompactRange(const CompactRangeOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/, + const Slice* /*begin*/, + const Slice* /*end*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + using DBImpl::CompactFiles; + virtual Status CompactFiles( + const CompactionOptions& /*compact_options*/, + ColumnFamilyHandle* /*column_family*/, + const std::vector& /*input_file_names*/, + const int /*output_level*/, const int /*output_path_id*/ = -1, + std::vector* const /*output_file_names*/ = nullptr, + CompactionJobInfo* /*compaction_job_info*/ = nullptr) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + virtual Status DisableFileDeletions() override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + virtual Status EnableFileDeletions(bool /*force*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + virtual Status GetLiveFiles(std::vector&, + uint64_t* /*manifest_file_size*/, + bool /*flush_memtable*/ = true) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + using DBImpl::Flush; + virtual Status Flush(const FlushOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + using DBImpl::SyncWAL; + virtual Status SyncWAL() override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + using DB::IngestExternalFile; + virtual Status IngestExternalFile( + ColumnFamilyHandle* /*column_family*/, + const std::vector& /*external_files*/, + const IngestExternalFileOptions& /*ingestion_options*/) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + + Status TryCatchUpWithPrimary(); + + private: + friend class DB; + + // No copying allowed + DBImplSecondary(const DBImplSecondary&); + void operator=(const DBImplSecondary&); + + std::unique_ptr manifest_reader_; + std::unique_ptr manifest_reporter_; + std::unique_ptr manifest_reader_status_; +}; +} // namespace rocksdb + +#endif // !ROCKSDB_LITE diff --git a/db/db_secondary_test.cc b/db/db_secondary_test.cc new file mode 100644 index 000000000000..d960247d68c0 --- /dev/null +++ b/db/db_secondary_test.cc @@ -0,0 +1,294 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/db_impl_secondary.h" +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "util/fault_injection_test_env.h" +#include "util/sync_point.h" + +namespace rocksdb { + +#ifndef ROCKSDB_LITE +class DBSecondaryTest : public DBTestBase { + public: + DBSecondaryTest() : DBTestBase("/db_secondary_test"), secondary_dbname_() { + secondary_dbname_ = + test::PerThreadDBPath(env_, "/db_secondary_test_secondary"); + } + + ~DBSecondaryTest() { + if (getenv("KEEP_DB") != nullptr) { + fprintf(stdout, "Secondary DB is still at %s\n", + secondary_dbname_.c_str()); + } else { + Options options; + options.env = env_; + EXPECT_OK(DestroyDB(secondary_dbname_, options)); + } + } + + protected: + Status ReopenAsSecondary(const Options& options) { + return DB::OpenAsSecondary(options, dbname_, secondary_dbname_, &db_); + } + + std::string secondary_dbname_; +}; + +TEST_F(DBSecondaryTest, ReopenAsSecondary) { + Options options; + options.env = env_; + Reopen(options); + ASSERT_OK(Put("foo", "foo_value")); + ASSERT_OK(Put("bar", "bar_value")); + ASSERT_OK(dbfull()->Flush(FlushOptions())); + Close(); + + ASSERT_OK(ReopenAsSecondary(options)); + ASSERT_EQ("foo_value", Get("foo")); + ASSERT_EQ("bar_value", Get("bar")); + ReadOptions ropts; + ropts.verify_checksums = true; + auto db1 = static_cast(db_); + ASSERT_NE(nullptr, db1); + Iterator* iter = db1->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + if (0 == count) { + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ("bar_value", iter->value().ToString()); + } else if (1 == count) { + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ("foo_value", iter->value().ToString()); + } + ++count; + } + delete iter; + ASSERT_EQ(2, count); + Close(); +} + +TEST_F(DBSecondaryTest, OpenAsSecondary) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + for (int i = 0; i < 3; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(Flush()); + } + DB* db_secondary = nullptr; + Options options1; + options1.env = env_; + options1.max_open_files = -1; + Status s = + DB::OpenAsSecondary(options1, dbname_, secondary_dbname_, &db_secondary); + ASSERT_OK(s); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ReadOptions ropts; + ropts.verify_checksums = true; + const auto verify_db_func = [&](const std::string& foo_val, + const std::string& bar_val) { + std::string value; + ASSERT_OK(db_secondary->Get(ropts, "foo", &value)); + ASSERT_EQ(foo_val, value); + ASSERT_OK(db_secondary->Get(ropts, "bar", &value)); + ASSERT_EQ(bar_val, value); + Iterator* iter = db_secondary->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + iter->Seek("foo"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ(foo_val, iter->value().ToString()); + iter->Seek("bar"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ(bar_val, iter->value().ToString()); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ++count; + } + ASSERT_EQ(2, count); + delete iter; + }; + + verify_db_func("foo_value2", "bar_value2"); + + ASSERT_OK(Put("foo", "new_foo_value")); + ASSERT_OK(Put("bar", "new_bar_value")); + ASSERT_OK(Flush()); + + ASSERT_OK( + static_cast(db_secondary)->TryCatchUpWithPrimary()); + verify_db_func("new_foo_value", "new_bar_value"); + + delete db_secondary; + Close(); +} + +TEST_F(DBSecondaryTest, SwitchToNewManifestDuringOpen) { + Options options; + options.env = env_; + Reopen(options); + Close(); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->LoadDependency( + {{"VersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0", + "VersionSet::ProcessManifestWrites:BeforeNewManifest"}, + {"VersionSet::ProcessManifestWrites:AfterNewManifest", + "VersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:1"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + // Make sure db calls RecoverLogFiles so as to trigger a manifest write, + // which causes the db to switch to a new MANIFEST upon start. + port::Thread ro_db_thread([&]() { + DB* db_secondary = nullptr; + Options options1; + options1.env = env_; + options1.max_open_files = -1; + Status s = DB::OpenAsSecondary(options1, dbname_, secondary_dbname_, + &db_secondary); + ASSERT_OK(s); + delete db_secondary; + }); + Reopen(options); + ro_db_thread.join(); + Close(); +} + +TEST_F(DBSecondaryTest, MissingTableFileDuringOpen) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(dbfull()->Flush(FlushOptions())); + } + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + DB* db1 = nullptr; + Options options1; + options1.env = env_; + options1.max_open_files = -1; + Status s = DB::OpenAsSecondary(options1, dbname_, secondary_dbname_, &db1); + ASSERT_OK(s); + ReadOptions ropts; + ropts.verify_checksums = true; + std::string value; + ASSERT_OK(db1->Get(ropts, "foo", &value)); + ASSERT_EQ("foo_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + value); + ASSERT_OK(db1->Get(ropts, "bar", &value)); + ASSERT_EQ("bar_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + value); + Iterator* iter = db1->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + iter->Seek("bar"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ("bar_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + iter->value().ToString()); + iter->Seek("foo"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ("foo_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + iter->value().ToString()); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ++count; + } + ASSERT_EQ(2, count); + delete iter; + delete db1; + Close(); +} + +TEST_F(DBSecondaryTest, MissingTableFile) { + Options options; + options.env = env_; + options.level0_file_num_compaction_trigger = 4; + Reopen(options); + + DB* db1 = nullptr; + Options options1; + options1.env = env_; + options1.max_open_files = -1; + Status s = DB::OpenAsSecondary(options1, dbname_, secondary_dbname_, &db1); + ASSERT_OK(s); + + for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) { + ASSERT_OK(Put("foo", "foo_value" + std::to_string(i))); + ASSERT_OK(Put("bar", "bar_value" + std::to_string(i))); + ASSERT_OK(dbfull()->Flush(FlushOptions())); + } + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + auto db_secondary = static_cast(db1); + ASSERT_NE(nullptr, db_secondary); + ReadOptions ropts; + ropts.verify_checksums = true; + std::string value; + ASSERT_NOK(db_secondary->Get(ropts, "foo", &value)); + ASSERT_NOK(db_secondary->Get(ropts, "bar", &value)); + + ASSERT_OK(db_secondary->TryCatchUpWithPrimary()); + ASSERT_OK(db_secondary->Get(ropts, "foo", &value)); + ASSERT_EQ("foo_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + value); + ASSERT_OK(db_secondary->Get(ropts, "bar", &value)); + ASSERT_EQ("bar_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + value); + Iterator* iter = db1->NewIterator(ropts); + ASSERT_NE(nullptr, iter); + iter->Seek("bar"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("bar", iter->key().ToString()); + ASSERT_EQ("bar_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + iter->value().ToString()); + iter->Seek("foo"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("foo", iter->key().ToString()); + ASSERT_EQ("foo_value" + + std::to_string(options.level0_file_num_compaction_trigger - 1), + iter->value().ToString()); + size_t count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ++count; + } + ASSERT_EQ(2, count); + delete iter; + delete db1; + Close(); +} +#endif //! ROCKSDB_LITE + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/log_reader.cc b/db/log_reader.cc index 2c57cde5d593..237fd1929486 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -24,8 +24,7 @@ Reader::Reporter::~Reporter() { Reader::Reader(std::shared_ptr info_log, std::unique_ptr&& _file, - Reporter* reporter, bool checksum, uint64_t log_num, - bool retry_after_eof) + Reporter* reporter, bool checksum, uint64_t log_num) : info_log_(info_log), file_(std::move(_file)), reporter_(reporter), @@ -39,7 +38,8 @@ Reader::Reader(std::shared_ptr info_log, end_of_buffer_offset_(0), log_number_(log_num), recycled_(false), - retry_after_eof_(retry_after_eof) {} + fragments_(), + in_fragmented_record_(false) {} Reader::~Reader() { delete[] backing_store_; @@ -199,6 +199,118 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, return false; } +// return true if a complete record has been read successfully. +bool Reader::TryReadRecord(Slice* record, std::string* scratch) { + assert(record != nullptr); + assert(scratch != nullptr); + record->clear(); + scratch->clear(); + + uint64_t prospective_record_offset = 0; + uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); + size_t drop_size = 0; + unsigned int fragment_type_or_err = 0; // Initialize to make compiler happy + Slice fragment; + while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) { + switch (fragment_type_or_err) { + case kFullType: + case kRecyclableFullType: + if (in_fragmented_record_ && !fragments_.empty()) { + ReportCorruption(fragments_.size(), "partial record without end(1)"); + } + fragments_.clear(); + *record = fragment; + prospective_record_offset = physical_record_offset; + last_record_offset_ = prospective_record_offset; + in_fragmented_record_ = false; + return true; + + case kFirstType: + case kRecyclableFirstType: + if (in_fragmented_record_ && !fragments_.empty()) { + ReportCorruption(fragments_.size(), "partial record without end(2)"); + } + prospective_record_offset = physical_record_offset; + fragments_.assign(fragment.data(), fragment.size()); + in_fragmented_record_ = true; + break; + + case kMiddleType: + case kRecyclableMiddleType: + if (!in_fragmented_record_) { + ReportCorruption(fragment.size(), + "missing start of fragmented record(1)"); + } else { + fragments_.append(fragment.data(), fragment.size()); + } + break; + + case kLastType: + case kRecyclableLastType: + if (!in_fragmented_record_) { + ReportCorruption(fragment.size(), + "missing start of fragmented record(2)"); + } else { + fragments_.append(fragment.data(), fragment.size()); + scratch->assign(fragments_.data(), fragments_.size()); + fragments_.clear(); + *record = Slice(*scratch); + last_record_offset_ = prospective_record_offset; + in_fragmented_record_ = false; + return true; + } + break; + + case kBadHeader: + case kEof: + case kOldRecord: + if (in_fragmented_record_) { + fragments_.clear(); + } + return false; + + case kBadRecord: + if (in_fragmented_record_) { + ReportCorruption(fragments_.size(), "error in middle of record"); + in_fragmented_record_ = false; + fragments_.clear(); + } + break; + + case kBadRecordLen: + case kBadRecordChecksum: + if (recycled_) { + fragments_.clear(); + return false; + } + if (fragment_type_or_err == kBadRecordLen) { + ReportCorruption(drop_size, "bad record length"); + } else { + ReportCorruption(drop_size, "checksum mismatch"); + } + if (in_fragmented_record_) { + ReportCorruption(fragments_.size(), "error in middle of record"); + in_fragmented_record_ = false; + fragments_.clear(); + } + break; + + default: { + char buf[40]; + snprintf(buf, sizeof(buf), "unknown record type %u", + fragment_type_or_err); + ReportCorruption( + fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0), + buf); + in_fragmented_record_ = false; + fragments_.clear(); + break; + } + } + } + return false; +} + uint64_t Reader::LastRecordOffset() { return last_record_offset_; } @@ -207,14 +319,22 @@ void Reader::UnmarkEOF() { if (read_error_) { return; } - eof_ = false; + if (eof_offset_ == 0) { + return; + } + UnmarkEOFInternal(); +} - // If retry_after_eof_ is true, we have to proceed to read anyway. - if (!retry_after_eof_ && eof_offset_ == 0) { +void Reader::ForceUnmarkEOF() { + if (read_error_) { return; } + eof_ = false; + UnmarkEOFInternal(); +} +void Reader::UnmarkEOFInternal() { // If the EOF was in the middle of a block (a partial block was read) we have // to read the rest of the block as ReadPhysicalRecord can only read full // blocks and expects the file position indicator to be aligned to the start @@ -292,12 +412,8 @@ bool Reader::ReadMore(size_t* drop_size, int *error) { } else if (buffer_.size() < static_cast(kBlockSize)) { eof_ = true; eof_offset_ = buffer_.size(); - TEST_SYNC_POINT("LogReader::ReadMore:FirstEOF"); } return true; - } else if (retry_after_eof_ && !read_error_) { - UnmarkEOF(); - return !read_error_; } else { // Note that if buffer_ is non-empty, we have a truncated header at the // end of the file, which can be caused by the writer crashing in the @@ -355,24 +471,16 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) { } } if (header_size + length > buffer_.size()) { - if (!retry_after_eof_) { - *drop_size = buffer_.size(); - buffer_.clear(); - if (!eof_) { - return kBadRecordLen; - } - // If the end of the file has been reached without reading |length| - // bytes of payload, assume the writer died in the middle of writing the - // record. Don't report a corruption unless requested. - if (*drop_size) { - return kBadHeader; - } - } else { - int r = kEof; - if (!ReadMore(drop_size, &r)) { - return r; - } - continue; + *drop_size = buffer_.size(); + buffer_.clear(); + if (!eof_) { + return kBadRecordLen; + } + // If the end of the file has been reached without reading |length| + // bytes of payload, assume the writer died in the middle of writing the + // record. Don't report a corruption unless requested. + if (*drop_size) { + return kBadHeader; } return kEof; } @@ -409,5 +517,123 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) { } } +bool Reader::TryReadMore(size_t* drop_size, int* error) { + if (!eof_ && !read_error_) { + // Last read was a full read, so this is a trailer to skip + buffer_.clear(); + Status status = file_->Read(kBlockSize, &buffer_, backing_store_); + end_of_buffer_offset_ += buffer_.size(); + if (!status.ok()) { + buffer_.clear(); + ReportDrop(kBlockSize, status); + read_error_ = true; + *error = kEof; + return false; + } else if (buffer_.size() < static_cast(kBlockSize)) { + eof_ = true; + eof_offset_ = buffer_.size(); + TEST_SYNC_POINT_CALLBACK("LogReader::TryReadMore:FirstEOF", nullptr); + } + return true; + } else if (!read_error_) { + ForceUnmarkEOF(); + return !read_error_; + } else { + // Note that if buffer_ is non-empty, we have a truncated header at the + // end of the file, which can be caused by the writer crashing in the + // middle of writing the header. Unless explicitly requested we don't + // considering this an error, just report EOF. + if (buffer_.size()) { + *drop_size = buffer_.size(); + buffer_.clear(); + *error = kBadHeader; + return false; + } + buffer_.clear(); + *error = kEof; + return false; + } +} + +// return true if the caller should process the fragment_type_or_err. +bool Reader::TryReadFragment(Slice* fragment, size_t* drop_size, + unsigned int* fragment_type_or_err) { + assert(fragment != nullptr); + assert(drop_size != nullptr); + assert(fragment_type_or_err != nullptr); + + while (buffer_.size() < static_cast(kHeaderSize)) { + size_t old_size = buffer_.size(); + int error = kEof; + if (!TryReadMore(drop_size, &error)) { + *fragment_type_or_err = error; + return false; + } else if (old_size == buffer_.size()) { + return false; + } + } + const char* header = buffer_.data(); + const uint32_t a = static_cast(header[4]) & 0xff; + const uint32_t b = static_cast(header[5]) & 0xff; + const unsigned int type = header[6]; + const uint32_t length = a | (b << 8); + int header_size = kHeaderSize; + if (type >= kRecyclableFullType && type <= kRecyclableLastType) { + if (end_of_buffer_offset_ - buffer_.size() == 0) { + recycled_ = true; + } + header_size = kRecyclableHeaderSize; + while (buffer_.size() < static_cast(kRecyclableHeaderSize)) { + size_t old_size = buffer_.size(); + int error = kEof; + if (!TryReadMore(drop_size, &error)) { + *fragment_type_or_err = error; + return false; + } else if (old_size == buffer_.size()) { + return false; + } + } + const uint32_t log_num = DecodeFixed32(header + 7); + if (log_num != log_number_) { + *fragment_type_or_err = kOldRecord; + return true; + } + } + + while (header_size + length > buffer_.size()) { + size_t old_size = buffer_.size(); + int error = kEof; + if (!TryReadMore(drop_size, &error)) { + *fragment_type_or_err = error; + return false; + } else if (old_size == buffer_.size()) { + return false; + } + } + + if (type == kZeroType && length == 0) { + buffer_.clear(); + *fragment_type_or_err = kBadRecord; + return true; + } + + if (checksum_) { + uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); + uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6); + if (actual_crc != expected_crc) { + *drop_size = buffer_.size(); + buffer_.clear(); + *fragment_type_or_err = kBadRecordChecksum; + return true; + } + } + + buffer_.remove_prefix(header_size + length); + + *fragment = Slice(header + header_size, length); + *fragment_type_or_err = type; + return true; +} + } // namespace log } // namespace rocksdb diff --git a/db/log_reader.h b/db/log_reader.h index 2c4f4f059901..83d05ddcb11f 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -53,7 +53,7 @@ class Reader { Reader(std::shared_ptr info_log, // @lint-ignore TXT2 T25377293 Grandfathered in std::unique_ptr&& file, Reporter* reporter, - bool checksum, uint64_t log_num, bool retry_after_eof); + bool checksum, uint64_t log_num); ~Reader(); @@ -66,6 +66,8 @@ class Reader { WALRecoveryMode wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords); + bool TryReadRecord(Slice* record, std::string* scratch); + // Returns the physical offset of the last record returned by ReadRecord. // // Undefined before the first call to ReadRecord. @@ -76,6 +78,9 @@ class Reader { return eof_; } + // returns true if the reader has encountered read error. + bool hasReadError() const { return read_error_; } + // when we know more data has been written to the file. we can use this // function to force the reader to look again in the file. // Also aligns the file position indicator to the start of the next block @@ -83,14 +88,20 @@ class Reader { // block that was partially read. void UnmarkEOF(); + void ForceUnmarkEOF(); + SequentialFileReader* file() { return file_.get(); } + Reporter* GetReporter() const { return reporter_; } + private: std::shared_ptr info_log_; const std::unique_ptr file_; Reporter* const reporter_; bool const checksum_; char* const backing_store_; + + // Internal state variables used for reading records Slice buffer_; bool eof_; // Last Read() indicated EOF by returning < kBlockSize bool read_error_; // Error occurred while reading from file @@ -110,10 +121,8 @@ class Reader { // Whether this is a recycled log file bool recycled_; - // Whether retry after encountering EOF - // TODO (yanqin) add support for retry policy, e.g. sleep, max retry limit, - // etc. - const bool retry_after_eof_; + std::string fragments_; + bool in_fragmented_record_; // Extend record types with the following special values enum { @@ -136,9 +145,16 @@ class Reader { // Return type, or one of the preceding special values unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size); + bool TryReadFragment(Slice* result, size_t* drop_size, + unsigned int* fragment_type_or_err); + // Read some more bool ReadMore(size_t* drop_size, int *error); + bool TryReadMore(size_t* drop_size, int* error); + + void UnmarkEOFInternal(); + // Reports dropped bytes to the reporter. // buffer_ must be updated to remove the dropped bytes prior to invocation. void ReportCorruption(size_t bytes, const char* reason); diff --git a/db/log_test.cc b/db/log_test.cc index 9e8148f65396..c2f090c77543 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -43,7 +43,10 @@ static std::string RandomSkewedString(int i, Random* rnd) { return BigString(NumberString(i), rnd->Skewed(17)); } -class LogTest : public ::testing::TestWithParam { +// Param type is tuple +// get<0>(tuple): non-zero if recycling log, zero if regular log +// get<1>(tuple): true if allow retry after read EOF, false otherwise +class LogTest : public ::testing::TestWithParam> { private: class StringSource : public SequentialFile { public: @@ -53,16 +56,20 @@ class LogTest : public ::testing::TestWithParam { bool force_eof_; size_t force_eof_position_; bool returned_partial_; - explicit StringSource(Slice& contents) : - contents_(contents), - force_error_(false), - force_error_position_(0), - force_eof_(false), - force_eof_position_(0), - returned_partial_(false) { } + bool fail_after_read_partial_; + explicit StringSource(Slice& contents, bool fail_after_read_partial) + : contents_(contents), + force_error_(false), + force_error_position_(0), + force_eof_(false), + force_eof_position_(0), + returned_partial_(false), + fail_after_read_partial_(fail_after_read_partial) {} virtual Status Read(size_t n, Slice* result, char* scratch) override { - EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error"; + if (fail_after_read_partial_) { + EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error"; + } if (force_error_) { if (force_error_position_ >= n) { @@ -151,9 +158,8 @@ class LogTest : public ::testing::TestWithParam { Writer writer_; Reader reader_; - // Record metadata for testing initial offset functionality - static size_t initial_offset_record_sizes_[]; - uint64_t initial_offset_last_record_offsets_[4]; + protected: + bool allow_retry_read_; public: LogTest() @@ -161,19 +167,12 @@ class LogTest : public ::testing::TestWithParam { dest_holder_(test::GetWritableFileWriter( new test::StringSink(&reader_contents_), "" /* don't care */)), source_holder_(test::GetSequentialFileReader( - new StringSource(reader_contents_), "" /* file name */)), - writer_(std::move(dest_holder_), 123, GetParam()), + new StringSource(reader_contents_, !std::get<1>(GetParam())), + "" /* file name */)), + writer_(std::move(dest_holder_), 123, std::get<0>(GetParam())), reader_(nullptr, std::move(source_holder_), &report_, - true /* checksum */, 123 /* log_number */, - false /* retry_after_eof */) { - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; - initial_offset_last_record_offsets_[0] = 0; - initial_offset_last_record_offsets_[1] = header_size + 10000; - initial_offset_last_record_offsets_[2] = 2 * (header_size + 10000); - initial_offset_last_record_offsets_[3] = 2 * (header_size + 10000) + - (2 * log::kBlockSize - 1000) + - 3 * header_size; - } + true /* checksum */, 123 /* log_number */), + allow_retry_read_(std::get<1>(GetParam())) {} Slice* get_reader_contents() { return &reader_contents_; } @@ -189,7 +188,13 @@ class LogTest : public ::testing::TestWithParam { WALRecoveryMode::kTolerateCorruptedTailRecords) { std::string scratch; Slice record; - if (reader_.ReadRecord(&record, &scratch, wal_recovery_mode)) { + bool ret = false; + if (allow_retry_read_) { + ret = reader_.TryReadRecord(&record, &scratch); + } else { + ret = reader_.ReadRecord(&record, &scratch, wal_recovery_mode); + } + if (ret) { return record.ToString(); } else { return "EOF"; @@ -258,23 +263,8 @@ class LogTest : public ::testing::TestWithParam { return "OK"; } } - - void WriteInitialOffsetLog() { - for (int i = 0; i < 4; i++) { - std::string record(initial_offset_record_sizes_[i], - static_cast('a' + i)); - Write(record); - } - } - }; -size_t LogTest::initial_offset_record_sizes_[] = - {10000, // Two sizable records in first block - 10000, - 2 * log::kBlockSize - 1000, // Span three blocks - 1}; - TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); } TEST_P(LogTest, ReadWrite) { @@ -312,7 +302,8 @@ TEST_P(LogTest, Fragmentation) { TEST_P(LogTest, MarginalTrailer) { // Make a trailer that is exactly the same length as an empty record. - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + int header_size = + std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; const int n = kBlockSize - 2 * header_size; Write(BigString("foo", n)); ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes()); @@ -326,7 +317,8 @@ TEST_P(LogTest, MarginalTrailer) { TEST_P(LogTest, MarginalTrailer2) { // Make a trailer that is exactly the same length as an empty record. - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + int header_size = + std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; const int n = kBlockSize - 2 * header_size; Write(BigString("foo", n)); ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes()); @@ -339,7 +331,8 @@ TEST_P(LogTest, MarginalTrailer2) { } TEST_P(LogTest, ShortTrailer) { - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + int header_size = + std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; const int n = kBlockSize - 2 * header_size + 4; Write(BigString("foo", n)); ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes()); @@ -352,7 +345,8 @@ TEST_P(LogTest, ShortTrailer) { } TEST_P(LogTest, AlignedEof) { - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + int header_size = + std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize; const int n = kBlockSize - 2 * header_size + 4; Write(BigString("foo", n)); ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes()); @@ -403,6 +397,11 @@ TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) { } TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) { + if (allow_retry_read_) { + // If read retry is allowed, then truncated trailing record should not + // raise an error. + return; + } Write("foo"); ShrinkSize(4); // Drop all payload as well as a header byte ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); @@ -412,13 +411,20 @@ TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) { } TEST_P(LogTest, BadLength) { - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + if (allow_retry_read_) { + // If read retry is allowed, then we should not raise an error when the + // record length specified in header is longer than data currently + // available. It's possible that the body of the record is not written yet. + return; + } + bool recyclable_log = (std::get<0>(GetParam()) != 0); + int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize; const int kPayloadSize = kBlockSize - header_size; Write(BigString("bar", kPayloadSize)); Write("foo"); // Least significant size byte is stored in header[4]. IncrementByte(4, 1); - if (!GetParam()) { + if (!recyclable_log) { ASSERT_EQ("foo", Read()); ASSERT_EQ(kBlockSize, DroppedBytes()); ASSERT_EQ("OK", MatchError("bad record length")); @@ -428,6 +434,12 @@ TEST_P(LogTest, BadLength) { } TEST_P(LogTest, BadLengthAtEndIsIgnored) { + if (allow_retry_read_) { + // If read retry is allowed, then we should not raise an error when the + // record length specified in header is longer than data currently + // available. It's possible that the body of the record is not written yet. + return; + } Write("foo"); ShrinkSize(1); ASSERT_EQ("EOF", Read()); @@ -436,6 +448,12 @@ TEST_P(LogTest, BadLengthAtEndIsIgnored) { } TEST_P(LogTest, BadLengthAtEndIsNotIgnored) { + if (allow_retry_read_) { + // If read retry is allowed, then we should not raise an error when the + // record length specified in header is longer than data currently + // available. It's possible that the body of the record is not written yet. + return; + } Write("foo"); ShrinkSize(1); ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency)); @@ -447,7 +465,8 @@ TEST_P(LogTest, ChecksumMismatch) { Write("foooooo"); IncrementByte(0, 14); ASSERT_EQ("EOF", Read()); - if (!GetParam()) { + bool recyclable_log = (std::get<0>(GetParam()) != 0); + if (!recyclable_log) { ASSERT_EQ(14U, DroppedBytes()); ASSERT_EQ("OK", MatchError("checksum mismatch")); } else { @@ -458,8 +477,10 @@ TEST_P(LogTest, ChecksumMismatch) { TEST_P(LogTest, UnexpectedMiddleType) { Write("foo"); - SetByte(6, static_cast(GetParam() ? kRecyclableMiddleType : kMiddleType)); - FixChecksum(0, 3, !!GetParam()); + bool recyclable_log = (std::get<0>(GetParam()) != 0); + SetByte(6, static_cast(recyclable_log ? kRecyclableMiddleType + : kMiddleType)); + FixChecksum(0, 3, !!recyclable_log); ASSERT_EQ("EOF", Read()); ASSERT_EQ(3U, DroppedBytes()); ASSERT_EQ("OK", MatchError("missing start")); @@ -467,8 +488,10 @@ TEST_P(LogTest, UnexpectedMiddleType) { TEST_P(LogTest, UnexpectedLastType) { Write("foo"); - SetByte(6, static_cast(GetParam() ? kRecyclableLastType : kLastType)); - FixChecksum(0, 3, !!GetParam()); + bool recyclable_log = (std::get<0>(GetParam()) != 0); + SetByte(6, + static_cast(recyclable_log ? kRecyclableLastType : kLastType)); + FixChecksum(0, 3, !!recyclable_log); ASSERT_EQ("EOF", Read()); ASSERT_EQ(3U, DroppedBytes()); ASSERT_EQ("OK", MatchError("missing start")); @@ -477,8 +500,10 @@ TEST_P(LogTest, UnexpectedLastType) { TEST_P(LogTest, UnexpectedFullType) { Write("foo"); Write("bar"); - SetByte(6, static_cast(GetParam() ? kRecyclableFirstType : kFirstType)); - FixChecksum(0, 3, !!GetParam()); + bool recyclable_log = (std::get<0>(GetParam()) != 0); + SetByte( + 6, static_cast(recyclable_log ? kRecyclableFirstType : kFirstType)); + FixChecksum(0, 3, !!recyclable_log); ASSERT_EQ("bar", Read()); ASSERT_EQ("EOF", Read()); ASSERT_EQ(3U, DroppedBytes()); @@ -488,8 +513,10 @@ TEST_P(LogTest, UnexpectedFullType) { TEST_P(LogTest, UnexpectedFirstType) { Write("foo"); Write(BigString("bar", 100000)); - SetByte(6, static_cast(GetParam() ? kRecyclableFirstType : kFirstType)); - FixChecksum(0, 3, !!GetParam()); + bool recyclable_log = (std::get<0>(GetParam()) != 0); + SetByte( + 6, static_cast(recyclable_log ? kRecyclableFirstType : kFirstType)); + FixChecksum(0, 3, !!recyclable_log); ASSERT_EQ(BigString("bar", 100000), Read()); ASSERT_EQ("EOF", Read()); ASSERT_EQ(3U, DroppedBytes()); @@ -506,6 +533,11 @@ TEST_P(LogTest, MissingLastIsIgnored) { } TEST_P(LogTest, MissingLastIsNotIgnored) { + if (allow_retry_read_) { + // If read retry is allowed, then truncated trailing record should not + // raise an error. + return; + } Write(BigString("bar", kBlockSize)); // Remove the LAST block, including header. ShrinkSize(14); @@ -524,6 +556,11 @@ TEST_P(LogTest, PartialLastIsIgnored) { } TEST_P(LogTest, PartialLastIsNotIgnored) { + if (allow_retry_read_) { + // If read retry is allowed, then truncated trailing record should not + // raise an error. + return; + } Write(BigString("bar", kBlockSize)); // Cause a bad record length in the LAST block. ShrinkSize(1); @@ -550,7 +587,8 @@ TEST_P(LogTest, ErrorJoinsRecords) { SetByte(offset, 'x'); } - if (!GetParam()) { + bool recyclable_log = (std::get<0>(GetParam()) != 0); + if (!recyclable_log) { ASSERT_EQ("correct", Read()); ASSERT_EQ("EOF", Read()); size_t dropped = DroppedBytes(); @@ -564,7 +602,8 @@ TEST_P(LogTest, ErrorJoinsRecords) { TEST_P(LogTest, ClearEofSingleBlock) { Write("foo"); Write("bar"); - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + bool recyclable_log = (std::get<0>(GetParam()) != 0); + int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize; ForceEOF(3 + header_size + 2); ASSERT_EQ("foo", Read()); UnmarkEOF(); @@ -579,7 +618,8 @@ TEST_P(LogTest, ClearEofSingleBlock) { TEST_P(LogTest, ClearEofMultiBlock) { size_t num_full_blocks = 5; - int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + bool recyclable_log = (std::get<0>(GetParam()) != 0); + int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize; size_t n = (kBlockSize - header_size) * num_full_blocks + 25; Write(BigString("foo", n)); Write(BigString("bar", n)); @@ -628,7 +668,8 @@ TEST_P(LogTest, ClearEofError2) { } TEST_P(LogTest, Recycle) { - if (!GetParam()) { + bool recyclable_log = (std::get<0>(GetParam()) != 0); + if (!recyclable_log) { return; // test is only valid for recycled logs } Write("foo"); @@ -651,7 +692,11 @@ TEST_P(LogTest, Recycle) { ASSERT_EQ("EOF", Read()); } -INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2)); +INSTANTIATE_TEST_CASE_P(bool, LogTest, + ::testing::Values(std::make_tuple(0, false), + std::make_tuple(0, true), + std::make_tuple(1, false), + std::make_tuple(1, true))); class RetriableLogTest : public ::testing::TestWithParam { private: @@ -717,8 +762,7 @@ class RetriableLogTest : public ::testing::TestWithParam { reader_.reset(new SequentialFileReader(std::move(seq_file), log_file_)); assert(reader_ != nullptr); log_reader_.reset(new Reader(nullptr, std::move(reader_), &report_, - true /* checksum */, 123 /* log_number */, - true /* retry_after_eof */)); + true /* checksum */, 123 /* log_number */)); assert(log_reader_ != nullptr); } return s; @@ -738,14 +782,17 @@ class RetriableLogTest : public ::testing::TestWithParam { writer_->Sync(true); } - std::string Read() { - auto wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords; + bool TryRead(std::string* result) { + assert(result != nullptr); + result->clear(); std::string scratch; Slice record; - if (log_reader_->ReadRecord(&record, &scratch, wal_recovery_mode)) { - return record.ToString(); + bool r = log_reader_->TryReadRecord(&record, &scratch); + if (r) { + result->assign(record.data(), record.size()); + return true; } else { - return "Read error"; + return false; } } }; @@ -754,12 +801,16 @@ TEST_P(RetriableLogTest, TailLog_PartialHeader) { ASSERT_OK(SetupTestEnv()); std::vector remaining_bytes_in_last_record; size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + bool eof = false; SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->LoadDependency( {{"RetriableLogTest::TailLog:AfterPart1", "RetriableLogTest::TailLog:BeforeReadRecord"}, - {"LogReader::ReadMore:FirstEOF", + {"LogReader::TryReadMore:FirstEOF", "RetriableLogTest::TailLog:BeforePart2"}}); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack("LogReader::TryReadMore:FirstEOF", + [&](void* /*arg*/) { eof = true; }); SyncPoint::GetInstance()->EnableProcessing(); size_t delta = header_size - 1; @@ -779,23 +830,29 @@ TEST_P(RetriableLogTest, TailLog_PartialHeader) { std::string record; port::Thread log_reader_thread([&]() { TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord"); - record = Read(); + while (!TryRead(&record)) { + } }); log_reader_thread.join(); log_writer_thread.join(); ASSERT_EQ("foo", record); + ASSERT_TRUE(eof); } TEST_P(RetriableLogTest, TailLog_FullHeader) { ASSERT_OK(SetupTestEnv()); std::vector remaining_bytes_in_last_record; size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + bool eof = false; SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->LoadDependency( {{"RetriableLogTest::TailLog:AfterPart1", "RetriableLogTest::TailLog:BeforeReadRecord"}, - {"LogReader::ReadMore:FirstEOF", + {"LogReader::TryReadMore:FirstEOF", "RetriableLogTest::TailLog:BeforePart2"}}); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack("LogReader::TryReadMore:FirstEOF", + [&](void* /*arg*/) { eof = true; }); SyncPoint::GetInstance()->EnableProcessing(); size_t delta = header_size + 1; @@ -810,18 +867,45 @@ TEST_P(RetriableLogTest, TailLog_FullHeader) { TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1"); TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2"); Write(Slice(part2)); + ASSERT_TRUE(eof); }); std::string record; port::Thread log_reader_thread([&]() { TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord"); - record = Read(); + while (!TryRead(&record)) { + } }); log_reader_thread.join(); log_writer_thread.join(); ASSERT_EQ("foo", record); } +TEST_P(RetriableLogTest, NonBlockingReadFullRecord) { + // Clear all sync point callbacks even if this test does not use sync point. + // It is necessary, otherwise the execute of this test may hit a sync point + // with which a callback is registered. The registered callback may access + // some dead variable, causing segfault. + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + ASSERT_OK(SetupTestEnv()); + size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize; + size_t delta = header_size - 1; + size_t old_sz = contents().size(); + Encode("foo-bar"); + size_t new_sz = contents().size(); + std::string part1 = contents().substr(old_sz, delta); + std::string part2 = + contents().substr(old_sz + delta, new_sz - old_sz - delta); + Write(Slice(part1)); + std::string record; + ASSERT_FALSE(TryRead(&record)); + ASSERT_TRUE(record.empty()); + Write(Slice(part2)); + ASSERT_TRUE(TryRead(&record)); + ASSERT_EQ("foo-bar", record); +} + INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2)); } // namespace log diff --git a/db/repair.cc b/db/repair.cc index 4e93a161cf1d..e6f94338982a 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -364,8 +364,7 @@ class Repairer { // propagating bad information (like overly large sequence // numbers). log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter, - true /*enable checksum*/, log, - false /* retry_after_eof */); + true /*enable checksum*/, log); // Initialize per-column family memtables for (auto* cfd : *vset_.GetColumnFamilySet()) { diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 4d6671ef66d7..4f55a30d30af 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -315,8 +315,7 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) { assert(file); currentLogReader_.reset( new log::Reader(options_->info_log, std::move(file), &reporter_, - read_options_.verify_checksums_, logFile->LogNumber(), - false /* retry_after_eof */)); + read_options_.verify_checksums_, logFile->LogNumber())); return Status::OK(); } } // namespace rocksdb diff --git a/db/version_builder.cc b/db/version_builder.cc index 7b45347c1240..84e4dc6579ae 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -364,10 +364,10 @@ class VersionBuilder::Rep { CheckConsistency(vstorage); } - void LoadTableHandlers(InternalStats* internal_stats, int max_threads, - bool prefetch_index_and_filter_in_cache, - bool is_initial_load, - const SliceTransform* prefix_extractor) { + Status LoadTableHandlers(InternalStats* internal_stats, int max_threads, + bool prefetch_index_and_filter_in_cache, + bool is_initial_load, + const SliceTransform* prefix_extractor) { assert(table_cache_ != nullptr); size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity(); @@ -394,7 +394,8 @@ class VersionBuilder::Rep { size_t table_cache_usage = table_cache_->get_cache()->GetUsage(); if (table_cache_usage >= load_limit) { - return; + // TODO (yanqin) find a suitable status code. + return Status::OK(); } else { max_load = load_limit - table_cache_usage; } @@ -402,11 +403,15 @@ class VersionBuilder::Rep { // std::vector> files_meta; + std::vector statuses; for (int level = 0; level < num_levels_; level++) { for (auto& file_meta_pair : levels_[level].added_files) { auto* file_meta = file_meta_pair.second; - assert(!file_meta->table_reader_handle); - files_meta.emplace_back(file_meta, level); + // If the file has been opened before, just skip it. + if (!file_meta->table_reader_handle) { + files_meta.emplace_back(file_meta, level); + statuses.emplace_back(Status::OK()); + } if (files_meta.size() >= max_load) { break; } @@ -426,7 +431,7 @@ class VersionBuilder::Rep { auto* file_meta = files_meta[file_idx].first; int level = files_meta[file_idx].second; - table_cache_->FindTable( + statuses[file_idx] = table_cache_->FindTable( env_options_, *(base_vstorage_->InternalComparator()), file_meta->fd, &file_meta->table_reader_handle, prefix_extractor, false /*no_io */, true /* record_read_stats */, @@ -448,6 +453,12 @@ class VersionBuilder::Rep { for (auto& t : threads) { t.join(); } + for (const auto& s : statuses) { + if (!s.ok()) { + return s; + } + } + return Status::OK(); } void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) { @@ -487,14 +498,13 @@ void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) { rep_->SaveTo(vstorage); } -void VersionBuilder::LoadTableHandlers(InternalStats* internal_stats, - int max_threads, - bool prefetch_index_and_filter_in_cache, - bool is_initial_load, - const SliceTransform* prefix_extractor) { - rep_->LoadTableHandlers(internal_stats, max_threads, - prefetch_index_and_filter_in_cache, is_initial_load, - prefix_extractor); +Status VersionBuilder::LoadTableHandlers( + InternalStats* internal_stats, int max_threads, + bool prefetch_index_and_filter_in_cache, bool is_initial_load, + const SliceTransform* prefix_extractor) { + return rep_->LoadTableHandlers(internal_stats, max_threads, + prefetch_index_and_filter_in_cache, + is_initial_load, prefix_extractor); } void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level, diff --git a/db/version_builder.h b/db/version_builder.h index d6ee37e08ffa..168301fdd619 100644 --- a/db/version_builder.h +++ b/db/version_builder.h @@ -33,10 +33,10 @@ class VersionBuilder { bool CheckConsistencyForNumLevels(); void Apply(VersionEdit* edit); void SaveTo(VersionStorageInfo* vstorage); - void LoadTableHandlers(InternalStats* internal_stats, int max_threads, - bool prefetch_index_and_filter_in_cache, - bool is_initial_load, - const SliceTransform* prefix_extractor); + Status LoadTableHandlers(InternalStats* internal_stats, int max_threads, + bool prefetch_index_and_filter_in_cache, + bool is_initial_load, + const SliceTransform* prefix_extractor); void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f); private: diff --git a/db/version_set.cc b/db/version_set.cc index 8cead5dc618a..4b419c3ff29b 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -713,6 +713,7 @@ void LevelIterator::InitFileIterator(size_t new_file_index) { } } } +} // anonymous namespace // A wrapper of version builder which references the current version in // constructor and unref it in the destructor. @@ -736,7 +737,6 @@ class BaseReferencedVersionBuilder { VersionBuilder* version_builder_; Version* version_; }; -} // anonymous namespace Status Version::GetTableProperties(std::shared_ptr* tp, const FileMetaData* file_meta, @@ -2938,7 +2938,7 @@ Status VersionSet::ProcessManifestWrites( } else if (group_start != std::numeric_limits::max()) { group_start = std::numeric_limits::max(); } - LogAndApplyHelper(last_writer->cfd, builder, version, e, mu); + LogAndApplyHelper(last_writer->cfd, builder, e, mu); batch_edits.push_back(e); } } @@ -2992,6 +2992,7 @@ Status VersionSet::ProcessManifestWrites( assert(pending_manifest_file_number_ == 0); if (!descriptor_log_ || manifest_file_size_ > db_options_->max_manifest_file_size) { + TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest"); pending_manifest_file_number_ = NewFileNumber(); batch_edits.back()->SetNextFile(next_file_number_.load()); new_descriptor_log = true; @@ -3088,6 +3089,7 @@ Status VersionSet::ProcessManifestWrites( if (s.ok() && new_descriptor_log) { s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_, db_directory); + TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest"); } if (s.ok()) { @@ -3215,7 +3217,7 @@ Status VersionSet::ProcessManifestWrites( return s; } -// 'datas' is gramatically incorrect. We still use this notation is to indicate +// 'datas' is gramatically incorrect. We still use this notation to indicate // that this variable represents a collection of column_family_data. Status VersionSet::LogAndApply( const autovector& column_family_datas, @@ -3297,6 +3299,132 @@ Status VersionSet::LogAndApply( new_cf_options); } +Status VersionSet::ReadAndApply( + InstrumentedMutex* mu, std::unique_ptr* manifest_reader, + std::unordered_set* cfds_changed) { + assert(manifest_reader != nullptr); + assert(cfds_changed != nullptr); + mu->AssertHeld(); + + Status s; + bool have_log_number = false; + bool have_prev_log_number = false; + bool have_next_file = false; + bool have_last_sequence = false; + uint64_t next_file = 0; + uint64_t last_sequence = 0; + uint64_t log_number = 0; + uint64_t previous_log_number = 0; + uint32_t max_column_family = 0; + uint64_t min_log_number_to_keep = 0; + + while (s.ok()) { + Slice record; + std::string scratch; + log::Reader* reader = manifest_reader->get(); + std::string old_manifest_path = reader->file()->file_name(); + while (reader->TryReadRecord(&record, &scratch)) { + VersionEdit edit; + s = edit.DecodeFrom(record); + if (!s.ok()) { + break; + } + auto cfd = column_family_set_->GetColumnFamily(edit.column_family_); + if (active_version_builders_.find(edit.column_family_) == + active_version_builders_.end()) { + std::unique_ptr builder_guard( + new BaseReferencedVersionBuilder(cfd)); + active_version_builders_.insert( + std::make_pair(edit.column_family_, std::move(builder_guard))); + } + s = ApplyOneVersionEditToBuilder( + edit, &have_log_number, &log_number, &have_prev_log_number, + &previous_log_number, &have_next_file, &next_file, + &have_last_sequence, &last_sequence, &min_log_number_to_keep, + &max_column_family); + if (!s.ok()) { + break; + } + if (column_family_set_->get_table_cache()->GetCapacity() == + TableCache::kInfiniteCapacity) { + // Unlimited table cache. Pre-load table handle now so that the table + // files are still accessible to us after the primary unlinks them. + auto builder_iter = active_version_builders_.find(edit.column_family_); + assert(builder_iter != active_version_builders_.end()); + auto builder = builder_iter->second->version_builder(); + assert(builder != nullptr); + s = builder->LoadTableHandlers( + cfd->internal_stats(), db_options_->max_file_opening_threads, + false /* prefetch_index_and_filter_in_cache */, + false /* is_initial_load */, + cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); + if (!s.ok() && !s.IsPathNotFound()) { + break; + } else if (s.IsPathNotFound()) { + s = Status::OK(); + // TODO (yanqin) release file descriptors already opened, or modify + // LoadTableHandlers so that opened files are not re-opened. + } else { // s.ok() == true + auto version = new Version(cfd, this, env_options_, + *cfd->GetLatestMutableCFOptions(), + current_version_number_++); + builder->SaveTo(version->storage_info()); + version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true); + AppendVersion(cfd, version); + active_version_builders_.erase(builder_iter); + if (cfds_changed->count(cfd) == 0) { + cfds_changed->insert(cfd); + } + } + } + if (have_next_file) { + next_file_number_.store(next_file + 1); + } + if (have_last_sequence) { + last_allocated_sequence_ = last_sequence; + last_published_sequence_ = last_sequence; + last_sequence_ = last_sequence; + } + if (have_prev_log_number) { + prev_log_number_ = previous_log_number; + MarkFileNumberUsed(previous_log_number); + } + if (have_log_number) { + MarkFileNumberUsed(log_number); + } + column_family_set_->UpdateMaxColumnFamily(max_column_family); + MarkMinLogNumberToKeep2PC(min_log_number_to_keep); + } + if (s.ok()) { + // It's possible that we have finished reading the current MANIFEST, and + // the primary has created a new MANIFEST. + log::Reader::Reporter* reporter = reader->GetReporter(); + s = MaybeSwitchManifest(reporter, manifest_reader); + reader = manifest_reader->get(); + } + if (s.ok() && reader->file()->file_name() == old_manifest_path) { + break; + } + } + + if (s.ok()) { + for (auto cfd : *column_family_set_) { + auto builder_iter = active_version_builders_.find(cfd->GetID()); + if (builder_iter == active_version_builders_.end()) { + continue; + } + auto builder = builder_iter->second->version_builder(); + if (!builder->CheckConsistencyForNumLevels()) { + s = Status::InvalidArgument( + "db has more levels than options.num_levels"); + break; + } + } + } + + return s; +} + void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { assert(edit->IsColumnFamilyManipulation()); edit->SetNextFile(next_file_number_.load()); @@ -3315,8 +3443,8 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { } void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, - VersionBuilder* builder, Version* /*v*/, - VersionEdit* edit, InstrumentedMutex* mu) { + VersionBuilder* builder, VersionEdit* edit, + InstrumentedMutex* mu) { #ifdef NDEBUG (void)cfd; #endif @@ -3343,7 +3471,7 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, builder->Apply(edit); } -Status VersionSet::ApplyOneVersionEdit( +Status VersionSet::ApplyOneVersionEditToBuilder( VersionEdit& edit, const std::unordered_map& name_to_options, std::unordered_map& column_families_not_found, @@ -3470,6 +3598,152 @@ Status VersionSet::ApplyOneVersionEdit( return Status::OK(); } +Status VersionSet::ApplyOneVersionEditToBuilder( + VersionEdit& edit, bool* have_log_number, uint64_t* /* log_number */, + bool* have_prev_log_number, uint64_t* previous_log_number, + bool* have_next_file, uint64_t* next_file, bool* have_last_sequence, + SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep, + uint32_t* max_column_family) { + ColumnFamilyData* cfd = nullptr; + Status status; + if (edit.is_column_family_add_) { + // TODO (yanqin) for now the secondary ignores column families created + // after Open. This also simplifies handling of switching to a new MANIFEST + // and processing the snapshot of the system at the beginning of the + // MANIFEST. + return Status::OK(); + } else if (edit.is_column_family_drop_) { + // Drop the column family by setting it to be 'dropped' without destroying + // the column family handle. + cfd = column_family_set_->GetColumnFamily(edit.column_family_); + // Drop a CF created after Open? Then ignore + if (cfd == nullptr) { + return Status::OK(); + } + cfd->SetDropped(); + if (cfd->Unref()) { + delete cfd; + cfd = nullptr; + } + } else { + cfd = column_family_set_->GetColumnFamily(edit.column_family_); + // Operation on a CF created after Open? Then ignore + if (cfd == nullptr) { + return Status::OK(); + } + auto builder_iter = active_version_builders_.find(edit.column_family_); + assert(builder_iter != active_version_builders_.end()); + auto builder = builder_iter->second->version_builder(); + assert(builder != nullptr); + builder->Apply(&edit); + } + if (cfd != nullptr) { + if (edit.has_log_number_) { + if (cfd->GetLogNumber() > edit.log_number_) { + // TODO (yanqin) use a separate info log for secondary instance. + } else { + cfd->SetLogNumber(edit.log_number_); + *have_log_number = true; + } + } + if (edit.has_comparator_ && + edit.comparator_ != cfd->user_comparator()->Name()) { + return Status::InvalidArgument( + cfd->user_comparator()->Name(), + "does not match existing comparator " + edit.comparator_); + } + } + + if (edit.has_prev_log_number_) { + *previous_log_number = edit.prev_log_number_; + *have_prev_log_number = true; + } + + if (edit.has_next_file_number_) { + *next_file = edit.next_file_number_; + *have_next_file = true; + } + + if (edit.has_max_column_family_) { + *max_column_family = edit.max_column_family_; + } + + if (edit.has_min_log_number_to_keep_) { + *min_log_number_to_keep = + std::max(*min_log_number_to_keep, edit.min_log_number_to_keep_); + } + + if (edit.has_last_sequence_) { + *last_sequence = edit.last_sequence_; + *have_last_sequence = true; + } + return status; +} + +Status VersionSet::MaybeSwitchManifest( + log::Reader::Reporter* reporter, + std::unique_ptr* manifest_reader) { + assert(manifest_reader != nullptr); + Status s; + do { + std::string manifest_path; + s = GetCurrentManifestPath(&manifest_path); + std::unique_ptr manifest_file; + if (s.ok()) { + if (nullptr == manifest_reader->get() || + manifest_reader->get()->file()->file_name() != manifest_path) { + TEST_SYNC_POINT( + "VersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0"); + TEST_SYNC_POINT( + "VersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:1"); + s = env_->NewSequentialFile( + manifest_path, &manifest_file, + env_->OptimizeForManifestRead(env_options_)); + } else { + // No need to switch manifest. + break; + } + } + std::unique_ptr manifest_file_reader; + if (s.ok()) { + manifest_file_reader.reset( + new SequentialFileReader(std::move(manifest_file), manifest_path)); + // TODO(yanqin) secondary instance needs a separate info log file. + manifest_reader->reset( + new log::Reader(nullptr, std::move(manifest_file_reader), reporter, + true /* checksum */, 0 /* log_number */)); + ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n", + manifest_path.c_str()); + } + } while (s.IsPathNotFound()); + return s; +} + +Status VersionSet::GetCurrentManifestPath(std::string* manifest_path) { + assert(manifest_path != nullptr); + std::string fname; + Status s = ReadFileToString(env_, CurrentFileName(dbname_), &fname); + if (!s.ok()) { + return s; + } + if (fname.empty() || fname.back() != '\n') { + return Status::Corruption("CURRENT file does not end with newline"); + } + // remove the trailing '\n' + fname.resize(fname.size() - 1); + FileType type; + bool parse_ok = ParseFileName(fname, &manifest_file_number_, &type); + if (!parse_ok || type != kDescriptorFile) { + return Status::Corruption("CURRENT file corrupted"); + } + *manifest_path = dbname_; + if (dbname_.back() != '/') { + manifest_path->push_back('/'); + } + *manifest_path += fname; + return Status::OK(); +} + Status VersionSet::Recover( const std::vector& column_families, bool read_only) { @@ -3483,43 +3757,28 @@ Status VersionSet::Recover( std::unordered_map column_families_not_found; // Read "CURRENT" file, which contains a pointer to the current manifest file - std::string manifest_filename; - Status s = ReadFileToString( - env_, CurrentFileName(dbname_), &manifest_filename - ); + std::string manifest_path; + Status s = GetCurrentManifestPath(&manifest_path); if (!s.ok()) { return s; } - if (manifest_filename.empty() || - manifest_filename.back() != '\n') { - return Status::Corruption("CURRENT file does not end with newline"); - } - // remove the trailing '\n' - manifest_filename.resize(manifest_filename.size() - 1); - FileType type; - bool parse_ok = - ParseFileName(manifest_filename, &manifest_file_number_, &type); - if (!parse_ok || type != kDescriptorFile) { - return Status::Corruption("CURRENT file corrupted"); - } ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n", - manifest_filename.c_str()); + manifest_path.c_str()); - manifest_filename = dbname_ + "/" + manifest_filename; std::unique_ptr manifest_file_reader; { std::unique_ptr manifest_file; - s = env_->NewSequentialFile(manifest_filename, &manifest_file, + s = env_->NewSequentialFile(manifest_path, &manifest_file, env_->OptimizeForManifestRead(env_options_)); if (!s.ok()) { return s; } manifest_file_reader.reset( - new SequentialFileReader(std::move(manifest_file), manifest_filename)); + new SequentialFileReader(std::move(manifest_file), manifest_path)); } uint64_t current_manifest_file_size; - s = env_->GetFileSize(manifest_filename, ¤t_manifest_file_size); + s = env_->GetFileSize(manifest_path, ¤t_manifest_file_size); if (!s.ok()) { return s; } @@ -3555,8 +3814,7 @@ Status VersionSet::Recover( VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter, - true /* checksum */, 0 /* log_number */, - false /* retry_after_eof */); + true /* checksum */, 0 /* log_number */); Slice record; std::string scratch; std::vector replay_buffer; @@ -3587,7 +3845,7 @@ Status VersionSet::Recover( TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:LastInAtomicGroup", &edit); for (auto& e : replay_buffer) { - s = ApplyOneVersionEdit( + s = ApplyOneVersionEditToBuilder( e, cf_name_to_options, column_families_not_found, builders, &have_log_number, &log_number, &have_prev_log_number, &previous_log_number, &have_next_file, &next_file, @@ -3608,7 +3866,7 @@ Status VersionSet::Recover( s = Status::Corruption("corrupted atomic group"); break; } - s = ApplyOneVersionEdit( + s = ApplyOneVersionEditToBuilder( edit, cf_name_to_options, column_families_not_found, builders, &have_log_number, &log_number, &have_prev_log_number, &previous_log_number, &have_next_file, &next_file, @@ -3715,7 +3973,7 @@ Status VersionSet::Recover( "prev_log_number is %lu," "max_column_family is %u," "min_log_number_to_keep is %lu\n", - manifest_filename.c_str(), (unsigned long)manifest_file_number_, + manifest_path.c_str(), (unsigned long)manifest_file_number_, (unsigned long)next_file_number_.load(), (unsigned long)last_sequence_, (unsigned long)log_number, (unsigned long)prev_log_number_, column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc()); @@ -3737,6 +3995,179 @@ Status VersionSet::Recover( return s; } +Status VersionSet::RecoverAsSecondary( + const std::vector& column_families, + std::unique_ptr* manifest_reader, + std::unique_ptr* manifest_reporter, + std::unique_ptr* manifest_reader_status) { + assert(manifest_reader != nullptr); + assert(manifest_reporter != nullptr); + assert(manifest_reader_status != nullptr); + + std::unordered_map cf_name_to_options; + for (const auto& cf : column_families) { + cf_name_to_options.insert({cf.name, cf.options}); + } + + // add default column family + auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName); + if (default_cf_iter == cf_name_to_options.end()) { + return Status::InvalidArgument("Default column family not specified"); + } + VersionEdit default_cf_edit; + default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName); + default_cf_edit.SetColumnFamily(0); + ColumnFamilyData* default_cfd = + CreateColumnFamily(default_cf_iter->second, &default_cf_edit); + // In recovery, nobody else can access it, so it's fine to set it to be + // initialized earlier. + default_cfd->set_initialized(); + + bool have_log_number = false; + bool have_prev_log_number = false; + bool have_next_file = false; + bool have_last_sequence = false; + uint64_t next_file = 0; + uint64_t last_sequence = 0; + uint64_t log_number = 0; + uint64_t previous_log_number = 0; + uint32_t max_column_family = 0; + uint64_t min_log_number_to_keep = 0; + std::unordered_map builders; + std::unordered_map column_families_not_found; + builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)}); + + manifest_reader_status->reset(new Status()); + manifest_reporter->reset(new LogReporter()); + static_cast(manifest_reporter->get())->status = + manifest_reader_status->get(); + Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader); + log::Reader* reader = manifest_reader->get(); + + while (s.ok()) { + assert(reader != nullptr); + Slice record; + std::string scratch; + while (s.ok() && reader->TryReadRecord(&record, &scratch)) { + VersionEdit edit; + s = edit.DecodeFrom(record); + if (!s.ok()) { + break; + } + s = ApplyOneVersionEditToBuilder( + edit, cf_name_to_options, column_families_not_found, builders, + &have_log_number, &log_number, &have_prev_log_number, + &previous_log_number, &have_next_file, &next_file, + &have_last_sequence, &last_sequence, &min_log_number_to_keep, + &max_column_family); + } + if (s.ok()) { + bool enough = have_next_file && have_log_number && have_last_sequence; + if (enough) { + for (const auto& cf : column_families) { + auto cfd = column_family_set_->GetColumnFamily(cf.name); + if (cfd == nullptr) { + enough = false; + break; + } + } + } + if (enough && column_family_set_->get_table_cache()->GetCapacity() == + TableCache::kInfiniteCapacity) { + for (const auto& cf : column_families) { + auto cfd = column_family_set_->GetColumnFamily(cf.name); + assert(cfd != nullptr); + if (!cfd->IsDropped()) { + auto builder_iter = builders.find(cfd->GetID()); + assert(builder_iter != builders.end()); + auto builder = builder_iter->second->version_builder(); + assert(builder != nullptr); + s = builder->LoadTableHandlers( + cfd->internal_stats(), db_options_->max_file_opening_threads, + false /* prefetch_index_and_filter_in_cache */, + false /* is_initial_load */, + cfd->GetLatestMutableCFOptions()->prefix_extractor.get()); + if (!s.ok()) { + enough = false; + if (s.IsPathNotFound()) { + s = Status::OK(); + } + break; + } + } + } + if (!enough) { + // TODO (yanqin) release table handlers if any of the files are not + // found. + } + } + if (enough) { + break; + } + } + } + + if (s.ok()) { + if (!have_prev_log_number) { + previous_log_number = 0; + } + column_family_set_->UpdateMaxColumnFamily(max_column_family); + + MarkMinLogNumberToKeep2PC(min_log_number_to_keep); + MarkFileNumberUsed(previous_log_number); + MarkFileNumberUsed(log_number); + + for (auto cfd : *column_family_set_) { + assert(builders.count(cfd->GetID()) > 0); + auto builder = builders[cfd->GetID()]->version_builder(); + if (!builder->CheckConsistencyForNumLevels()) { + s = Status::InvalidArgument( + "db has more levels than options.num_levels"); + break; + } + } + } + + if (s.ok()) { + for (auto cfd : *column_family_set_) { + if (cfd->IsDropped()) { + continue; + } + assert(cfd->initialized()); + auto builders_iter = builders.find(cfd->GetID()); + assert(builders_iter != builders.end()); + auto* builder = builders_iter->second->version_builder(); + + Version* v = new Version(cfd, this, env_options_, + *cfd->GetLatestMutableCFOptions(), + current_version_number_++); + builder->SaveTo(v->storage_info()); + + // Install recovered version + v->PrepareApply(*cfd->GetLatestMutableCFOptions(), + !(db_options_->skip_stats_update_on_db_open)); + AppendVersion(cfd, v); + } + next_file_number_.store(next_file + 1); + last_allocated_sequence_ = last_sequence; + last_published_sequence_ = last_sequence; + last_sequence_ = last_sequence; + prev_log_number_ = previous_log_number; + for (auto cfd : *column_family_set_) { + if (cfd->IsDropped()) { + continue; + } + ROCKS_LOG_INFO(db_options_->info_log, + "Column family [%s] (ID %u), log number is %" PRIu64 "\n", + cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber()); + } + } + for (auto& builder : builders) { + delete builder.second; + } + return s; +} + Status VersionSet::ListColumnFamilies(std::vector* column_families, const std::string& dbname, Env* env) { // these are just for performance reasons, not correcntes, @@ -3771,8 +4202,7 @@ Status VersionSet::ListColumnFamilies(std::vector* column_families, VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(nullptr, std::move(file_reader), &reporter, - true /* checksum */, 0 /* log_number */, - false /* retry_after_eof */); + true /* checksum */, 0 /* log_number */); Slice record; std::string scratch; while (reader.ReadRecord(&record, &scratch) && s.ok()) { @@ -3932,8 +4362,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(nullptr, std::move(file_reader), &reporter, - true /* checksum */, 0 /* log_number */, - false /* retry_after_eof */); + true /* checksum */, 0 /* log_number */); Slice record; std::string scratch; while (reader.ReadRecord(&record, &scratch) && s.ok()) { diff --git a/db/version_set.h b/db/version_set.h index b50f653ba436..c4c1b2445b12 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -735,9 +735,7 @@ struct ObsoleteFileInfo { } }; -namespace { class BaseReferencedVersionBuilder; -} class VersionSet { public: @@ -799,12 +797,24 @@ class VersionSet { bool new_descriptor_log = false, const ColumnFamilyOptions* new_cf_options = nullptr); + Status ReadAndApply(InstrumentedMutex* mu, + std::unique_ptr* manifest_reader, + std::unordered_set* cfds_changed); + + Status GetCurrentManifestPath(std::string* manifest_filename); + // Recover the last saved descriptor from persistent storage. // If read_only == true, Recover() will not complain if some column families // are not opened Status Recover(const std::vector& column_families, bool read_only = false); + Status RecoverAsSecondary( + const std::vector& column_families, + std::unique_ptr* manifest_reader, + std::unique_ptr* manifest_reporter, + std::unique_ptr* manifest_reader_status); + // Reads a manifest file and returns a list of column families in // column_families. static Status ListColumnFamilies(std::vector* column_families, @@ -984,6 +994,7 @@ class VersionSet { friend class Version; friend class DBImpl; + friend class DBImplReadOnly; struct LogReporter : public log::Reader::Reporter { Status* status; @@ -1007,7 +1018,8 @@ class VersionSet { ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options, VersionEdit* edit); - Status ApplyOneVersionEdit( + // REQUIRES db mutex + Status ApplyOneVersionEditToBuilder( VersionEdit& edit, const std::unordered_map& name_to_opts, std::unordered_map& column_families_not_found, @@ -1017,6 +1029,18 @@ class VersionSet { bool* have_last_sequence, SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep, uint32_t* max_column_family); + // REQUIRES db mutex + Status ApplyOneVersionEditToBuilder( + VersionEdit& edit, bool* have_log_number, uint64_t* log_number, + bool* have_prev_log_number, uint64_t* previous_log_number, + bool* have_next_file, uint64_t* next_file, bool* have_last_sequence, + SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep, + uint32_t* max_column_family); + + Status MaybeSwitchManifest(log::Reader::Reporter* reporter, + std::unique_ptr* manifest_reader); + + // REQUIRES db mutex at beginning. may release and re-acquire db mutex Status ProcessManifestWrites(std::deque& writers, InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log, @@ -1070,12 +1094,15 @@ class VersionSet { // env options for all reads and writes except compactions EnvOptions env_options_; + std::unordered_map> + active_version_builders_; + // No copying allowed VersionSet(const VersionSet&); void operator=(const VersionSet&); void LogAndApplyCFHelper(VersionEdit* edit); - void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, Version* v, + void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, VersionEdit* edit, InstrumentedMutex* mu); }; diff --git a/db/wal_manager.cc b/db/wal_manager.cc index 667ecae41add..dc513e9997ef 100644 --- a/db/wal_manager.cc +++ b/db/wal_manager.cc @@ -457,7 +457,7 @@ Status WalManager::ReadFirstLine(const std::string& fname, reporter.status = &status; reporter.ignore_error = !db_options_.paranoid_checks; log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter, - true /*checksum*/, number, false /* retry_after_eof */); + true /*checksum*/, number); std::string scratch; Slice record; diff --git a/env/env_hdfs.cc b/env/env_hdfs.cc index 14fb902f0d40..7c0e14fe23e7 100644 --- a/env/env_hdfs.cc +++ b/env/env_hdfs.cc @@ -36,9 +36,11 @@ namespace { // Log error message static Status IOError(const std::string& context, int err_number) { - return (err_number == ENOSPC) ? - Status::NoSpace(context, strerror(err_number)) : - Status::IOError(context, strerror(err_number)); + return (err_number == ENOSPC) + ? Status::NoSpace(context, strerror(err_number)) + : (err_number == ENOENT) + ? Status::PathNotFound(context, strerror(err_number)) + : Status::IOError(context, strerror(err_number)); } // assume that there is one global logger for now. It is not thread-safe, diff --git a/env/io_posix.h b/env/io_posix.h index 106f6df6507f..e6824d3e8706 100644 --- a/env/io_posix.h +++ b/env/io_posix.h @@ -41,6 +41,9 @@ static Status IOError(const std::string& context, const std::string& file_name, strerror(err_number)); case ESTALE: return Status::IOError(Status::kStaleFile); + case ENOENT: + return Status::PathNotFound(IOErrorMsg(context, file_name), + strerror(err_number)); default: return Status::IOError(IOErrorMsg(context, file_name), strerror(err_number)); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 53fb52c9494a..b1adef377fb1 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -155,6 +155,15 @@ class DB { std::vector* handles, DB** dbptr, bool error_if_log_file_exist = false); + static Status OpenAsSecondary(const Options& options, const std::string& name, + const std::string& secondary_name, DB** dbptr); + + static Status OpenAsSecondary( + const DBOptions& db_options, const std::string& name, + const std::string& secondary_name, + const std::vector& column_families, + std::vector* handles, DB** dbptr); + // Open DB with column families. // db_options specify database specific options // column_families is the vector of all column families in the database, diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index 40b374ecf6e8..f8f66bf42262 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -73,6 +73,7 @@ class Status { kStaleFile = 6, kMemoryLimit = 7, kSpaceLimit = 8, + kPathNotFound = 9, kMaxSubCode }; @@ -198,6 +199,11 @@ class Status { return Status(kIOError, kSpaceLimit, msg, msg2); } + static Status PathNotFound() { return Status(kIOError, kPathNotFound); } + static Status PathNotFound(const Slice& msg, const Slice& msg2 = Slice()) { + return Status(kIOError, kPathNotFound, msg, msg2); + } + // Returns true iff the status indicates success. bool ok() const { return code() == kOk; } @@ -266,6 +272,14 @@ class Status { return (code() == kAborted) && (subcode() == kMemoryLimit); } + // Returns true iff the status indicates a PathNotFound error + // This is caused by an I/O error returning the specific "no such file or + // directory" error condition. A PathNotFound error is an I/O error with + // a specific subcode, enabling users to take appropriate action if necessary + bool IsPathNotFound() const { + return (code() == kIOError) && (subcode() == kPathNotFound); + } + // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. std::string ToString() const; diff --git a/port/win/io_win.h b/port/win/io_win.h index c46876b8c0c0..1c9d803b13ff 100644 --- a/port/win/io_win.h +++ b/port/win/io_win.h @@ -27,7 +27,9 @@ std::string GetWindowsErrSz(DWORD err); inline Status IOErrorFromWindowsError(const std::string& context, DWORD err) { return ((err == ERROR_HANDLE_DISK_FULL) || (err == ERROR_DISK_FULL)) ? Status::NoSpace(context, GetWindowsErrSz(err)) - : Status::IOError(context, GetWindowsErrSz(err)); + : ((err == ERROR_FILE_NOT_FOUND) || (err == ERROR_PATH_NOT_FOUND)) + ? Status::PathNotFound(context, GetWindowsErrSz(err)) + : Status::IOError(context, GetWindowsErrSz(err)); } inline Status IOErrorFromLastWindowsError(const std::string& context) { @@ -37,7 +39,9 @@ inline Status IOErrorFromLastWindowsError(const std::string& context) { inline Status IOError(const std::string& context, int err_number) { return (err_number == ENOSPC) ? Status::NoSpace(context, strerror(err_number)) - : Status::IOError(context, strerror(err_number)); + : (err_number == ENOENT) + ? Status::PathNotFound(context, strerror(err_number)) + : Status::IOError(context, strerror(err_number)); } class WinFileData; @@ -426,9 +430,7 @@ class WinMemoryMappedBuffer : public MemoryMappedFileBuffer { class WinDirectory : public Directory { HANDLE handle_; public: - explicit - WinDirectory(HANDLE h) noexcept : - handle_(h) { + explicit WinDirectory(HANDLE h) noexcept : handle_(h) { assert(handle_ != INVALID_HANDLE_VALUE); } ~WinDirectory() { diff --git a/src.mk b/src.mk index 5cc599fda255..858b3d6efad1 100644 --- a/src.mk +++ b/src.mk @@ -22,6 +22,7 @@ LIB_SOURCES = \ db/db_impl_files.cc \ db/db_impl_open.cc \ db/db_impl_readonly.cc \ + db/db_impl_secondary.cc \ db/db_impl_write.cc \ db/db_info_dumper.cc \ db/db_iter.cc \ @@ -280,6 +281,7 @@ MAIN_SOURCES = \ db/db_options_test.cc \ db/db_properties_test.cc \ db/db_range_del_test.cc \ + db/db_secondary_test.cc \ db/db_sst_test.cc \ db/db_statistics_test.cc \ db/db_table_properties_test.cc \ diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 350832937d51..002abd8f1b3b 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -2008,8 +2008,7 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header, log_number = 0; } log::Reader reader(options.info_log, std::move(wal_file_reader), &reporter, - true /* checksum */, log_number, - false /* retry_after_eof */); + true /* checksum */, log_number); std::string scratch; WriteBatch batch; Slice record; diff --git a/util/status.cc b/util/status.cc index 5b3dcf8e92e4..c66bf6f8e163 100644 --- a/util/status.cc +++ b/util/status.cc @@ -41,7 +41,8 @@ static const char* msgs[static_cast(Status::kMaxSubCode)] = { "Deadlock", // kDeadlock "Stale file handle", // kStaleFile "Memory limit reached", // kMemoryLimit - "Space limit reached" // kSpaceLimit + "Space limit reached", // kSpaceLimit + "No such file or directory", // kPathNotFound }; Status::Status(Code _code, SubCode _subcode, const Slice& msg,