Skip to content

Commit

Permalink
dedup ReadOptions in iterator hierarchy (facebook#7210)
Browse files Browse the repository at this point in the history
Summary:
Previously, a `ReadOptions` object was stored in every `BlockBasedTableIterator`
and every `LevelIterator`. This redundancy consumes extra memory,
resulting in the `Arena` making more allocations, and iteration
observing worse cache performance.

This PR migrates callers of `NewInternalIterator()` and
`MakeInputIterator()` to provide a `ReadOptions` object guaranteed to
outlive the returned iterator. When the iterator's lifetime will be managed by the
user, this lifetime guarantee is achieved by storing the `ReadOptions`
value in `ArenaWrappedDBIter`. Then, sub-iterators of `NewInternalIterator()` and
`MakeInputIterator()` can hold a reference-to-const `ReadOptions`.

Pull Request resolved: facebook#7210

Test Plan:
- `make check` under ASAN and valgrind
- benchmark: on a DB with 2 L0 files and 3 L1+ levels, this PR reduced `Arena` allocation 4792 -> 4160 bytes.

Reviewed By: anand1976

Differential Revision: D22861323

Pulled By: ajkr

fbshipit-source-id: 54aebb3e89c872eeab0f5793b4b6e42878d093ce
  • Loading branch information
ajkr authored and facebook-github-bot committed Aug 3, 2020
1 parent 18efd76 commit a4a4a2d
Show file tree
Hide file tree
Showing 19 changed files with 155 additions and 99 deletions.
4 changes: 2 additions & 2 deletions db/arena_wrapped_db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options,
true, max_sequential_skip_in_iteration,
read_callback, db_impl, cfd, allow_blob);
sv_number_ = version_number;
read_options_ = read_options;
allow_refresh_ = allow_refresh;
}

Expand Down Expand Up @@ -98,8 +99,7 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator(
max_sequential_skip_in_iterations, version_number, read_callback,
db_impl, cfd, allow_blob, allow_refresh);
if (db_impl != nullptr && cfd != nullptr && allow_refresh) {
iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback,
allow_blob);
iter->StoreRefreshInfo(db_impl, cfd, read_callback, allow_blob);
}

return iter;
Expand Down
7 changes: 3 additions & 4 deletions db/arena_wrapped_db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class ArenaWrappedDBIter : public Iterator {
virtual ReadRangeDelAggregator* GetRangeDelAggregator() {
return db_iter_->GetRangeDelAggregator();
}
const ReadOptions& GetReadOptions() { return read_options_; }

// Set the internal iterator wrapped inside the DB Iterator. Usually it is
// a merging iterator.
Expand Down Expand Up @@ -79,10 +80,8 @@ class ArenaWrappedDBIter : public Iterator {

// Store some parameters so we can refresh the iterator at a later point
// with these same params
void StoreRefreshInfo(const ReadOptions& read_options, DBImpl* db_impl,
ColumnFamilyData* cfd, ReadCallback* read_callback,
bool allow_blob) {
read_options_ = read_options;
void StoreRefreshInfo(DBImpl* db_impl, ColumnFamilyData* cfd,
ReadCallback* read_callback, bool allow_blob) {
db_impl_ = db_impl;
cfd_ = cfd;
read_callback_ = read_callback;
Expand Down
3 changes: 2 additions & 1 deletion db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,9 @@ Status BuildTable(
// No matter whether use_direct_io_for_flush_and_compaction is true,
// we will regrad this verification as user reads since the goal is
// to cache it here for further user reads
ReadOptions read_options;
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
ReadOptions(), file_options, internal_comparator, *meta,
read_options, file_options, internal_comparator, *meta,
nullptr /* range_del_agg */,
mutable_cf_options.prefix_extractor.get(), nullptr,
(internal_stats == nullptr) ? nullptr
Expand Down
16 changes: 13 additions & 3 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -676,8 +676,9 @@ Status CompactionJob::Run() {
// No matter whether use_direct_io_for_flush_and_compaction is true,
// we will regard this verification as user reads since the goal is
// to cache it here for further user reads
ReadOptions read_options;
InternalIterator* iter = cfd->table_cache()->NewIterator(
ReadOptions(), file_options_, cfd->internal_comparator(),
read_options, file_options_, cfd->internal_comparator(),
files_output[file_idx]->meta, /*range_del_agg=*/nullptr,
prefix_extractor,
/*table_reader_ptr=*/nullptr,
Expand Down Expand Up @@ -877,11 +878,20 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {

CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
existing_snapshots_);
ReadOptions read_options;
read_options.verify_checksums = true;
read_options.fill_cache = false;
// Compaction iterators shouldn't be confined to a single prefix.
// Compactions use Seek() for
// (a) concurrent compactions,
// (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
read_options.total_order_seek = true;

// Although the v2 aggregator is what the level iterator(s) know about,
// the AddTombstones calls will be propagated down to the v1 aggregator.
std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
sub_compact->compaction, &range_del_agg, file_options_for_read_));
std::unique_ptr<InternalIterator> input(
versions_->MakeInputIterator(read_options, sub_compact->compaction,
&range_del_agg, file_options_for_read_));

AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
Expand Down
33 changes: 25 additions & 8 deletions db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,35 @@ TEST_F(DBBasicTest, ReadOnlyDB) {
ASSERT_OK(Put("foo", "v3"));
Close();

auto verify_one_iter = [&](Iterator* iter) {
int count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
++count;
}
// Always expect two keys: "foo" and "bar"
ASSERT_EQ(count, 2);
};

auto verify_all_iters = [&]() {
Iterator* iter = db_->NewIterator(ReadOptions());
verify_one_iter(iter);
delete iter;

std::vector<Iterator*> iters;
ASSERT_OK(db_->NewIterators(ReadOptions(),
{dbfull()->DefaultColumnFamily()}, &iters));
ASSERT_EQ(static_cast<uint64_t>(1), iters.size());
verify_one_iter(iters[0]);
delete iters[0];
};

auto options = CurrentOptions();
assert(options.env == env_);
ASSERT_OK(ReadOnlyReopen(options));
ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v2", Get("bar"));
Iterator* iter = db_->NewIterator(ReadOptions());
int count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
++count;
}
ASSERT_EQ(count, 2);
delete iter;
verify_all_iters();
Close();

// Reopen and flush memtable.
Expand All @@ -124,6 +140,7 @@ TEST_F(DBBasicTest, ReadOnlyDB) {
ASSERT_OK(ReadOnlyReopen(options));
ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v2", Get("bar"));
verify_all_iters();
ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
}

Expand Down
9 changes: 6 additions & 3 deletions db/db_compaction_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,9 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
InternalKeyComparator icmp(options.comparator);
ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
ReadOptions read_options;
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
&arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
read_options, &arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
iter->SeekToFirst();
ASSERT_OK(iter->status());
while (iter->Valid()) {
Expand Down Expand Up @@ -426,8 +427,9 @@ TEST_F(DBTestCompactionFilter, CompactionFilter) {
InternalKeyComparator icmp(options.comparator);
ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
ReadOptions read_options;
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
&arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
read_options, &arena, &range_del_agg, kMaxSequenceNumber, handles_[1]));
iter->SeekToFirst();
ASSERT_OK(iter->status());
while (iter->Valid()) {
Expand Down Expand Up @@ -644,8 +646,9 @@ TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) {
InternalKeyComparator icmp(options.comparator);
ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* snapshots */);
ReadOptions read_options;
ScopedArenaIterator iter(dbfull()->NewInternalIterator(
&arena, &range_del_agg, kMaxSequenceNumber));
read_options, &arena, &range_del_agg, kMaxSequenceNumber));
iter->SeekToFirst();
ASSERT_OK(iter->status());
while (iter->Valid()) {
Expand Down
22 changes: 12 additions & 10 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1364,9 +1364,12 @@ bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
}
}

InternalIterator* DBImpl::NewInternalIterator(
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
ColumnFamilyHandle* column_family, bool allow_unprepared_value) {
InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
Arena* arena,
RangeDelAggregator* range_del_agg,
SequenceNumber sequence,
ColumnFamilyHandle* column_family,
bool allow_unprepared_value) {
ColumnFamilyData* cfd;
if (column_family == nullptr) {
cfd = default_cf_handle_->cfd();
Expand All @@ -1378,9 +1381,8 @@ InternalIterator* DBImpl::NewInternalIterator(
mutex_.Lock();
SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
mutex_.Unlock();
ReadOptions roptions;
return NewInternalIterator(roptions, cfd, super_version, arena, range_del_agg,
sequence, allow_unprepared_value);
return NewInternalIterator(read_options, cfd, super_version, arena,
range_del_agg, sequence, allow_unprepared_value);
}

void DBImpl::SchedulePurge() {
Expand Down Expand Up @@ -2829,10 +2831,10 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
sv->version_number, read_callback, this, cfd, allow_blob,
read_options.snapshot != nullptr ? false : allow_refresh);

InternalIterator* internal_iter =
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), snapshot,
/* allow_unprepared_value */ true);
InternalIterator* internal_iter = NewInternalIterator(
db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), snapshot,
/* allow_unprepared_value */ true);
db_iter->SetIterUnderDBIter(internal_iter);

return db_iter;
Expand Down
16 changes: 11 additions & 5 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,10 @@ class DBImpl : public DB {
// the value and so will require PrepareValue() to be called before value();
// allow_unprepared_value = false is convenient when this optimization is not
// useful, e.g. when reading the whole column family.
// @param read_options Must outlive the returned iterator.
InternalIterator* NewInternalIterator(
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
const ReadOptions& read_options, Arena* arena,
RangeDelAggregator* range_del_agg, SequenceNumber sequence,
ColumnFamilyHandle* column_family = nullptr,
bool allow_unprepared_value = false);

Expand Down Expand Up @@ -721,10 +723,14 @@ class DBImpl : public DB {

const WriteController& write_controller() { return write_controller_; }

InternalIterator* NewInternalIterator(
const ReadOptions&, ColumnFamilyData* cfd, SuperVersion* super_version,
Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
bool allow_unprepared_value);
// @param read_options Must outlive the returned iterator.
InternalIterator* NewInternalIterator(const ReadOptions& read_options,
ColumnFamilyData* cfd,
SuperVersion* super_version,
Arena* arena,
RangeDelAggregator* range_del_agg,
SequenceNumber sequence,
bool allow_unprepared_value);

// hollow transactions shell used for recovery.
// these will then be passed to TransactionDB so that
Expand Down
16 changes: 8 additions & 8 deletions db/db_impl/db_impl_readonly.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options,
read_seq,
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number, read_callback);
auto internal_iter =
NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), read_seq,
/* allow_unprepared_value */ true);
auto internal_iter = NewInternalIterator(
db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), read_seq,
/* allow_unprepared_value */ true);
db_iter->SetIterUnderDBIter(internal_iter);
return db_iter;
}
Expand Down Expand Up @@ -118,10 +118,10 @@ Status DBImplReadOnly::NewIterators(
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, read_seq,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
sv->version_number, read_callback);
auto* internal_iter =
NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), read_seq,
/* allow_unprepared_value */ true);
auto* internal_iter = NewInternalIterator(
db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), read_seq,
/* allow_unprepared_value */ true);
db_iter->SetIterUnderDBIter(internal_iter);
iterators->push_back(db_iter);
}
Expand Down
8 changes: 4 additions & 4 deletions db/db_impl/db_impl_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,10 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
snapshot,
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number, read_callback);
auto internal_iter =
NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), snapshot,
/* allow_unprepared_value */ true);
auto internal_iter = NewInternalIterator(
db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), snapshot,
/* allow_unprepared_value */ true);
db_iter->SetIterUnderDBIter(internal_iter);
return db_iter;
}
Expand Down
15 changes: 9 additions & 6 deletions db/db_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -907,12 +907,13 @@ std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) {
InternalKeyComparator icmp(options.comparator);
ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
ReadOptions read_options;
ScopedArenaIterator iter;
if (cf == 0) {
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg,
iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg,
kMaxSequenceNumber));
} else {
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg,
iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg,
kMaxSequenceNumber, handles_[cf]));
}
InternalKey target(user_key, kMaxSequenceNumber, kTypeValue);
Expand Down Expand Up @@ -1322,12 +1323,13 @@ void DBTestBase::validateNumberOfEntries(int numValues, int cf) {
kMaxSequenceNumber /* upper_bound */);
// This should be defined after range_del_agg so that it destructs the
// assigned iterator before it range_del_agg is already destructed.
ReadOptions read_options;
ScopedArenaIterator iter;
if (cf != 0) {
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg,
iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg,
kMaxSequenceNumber, handles_[cf]));
} else {
iter.set(dbfull()->NewInternalIterator(&arena, &range_del_agg,
iter.set(dbfull()->NewInternalIterator(read_options, &arena, &range_del_agg,
kMaxSequenceNumber));
}
iter->SeekToFirst();
Expand Down Expand Up @@ -1530,8 +1532,9 @@ void DBTestBase::VerifyDBInternal(
InternalKeyComparator icmp(last_options_.comparator);
ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
auto iter =
dbfull()->NewInternalIterator(&arena, &range_del_agg, kMaxSequenceNumber);
ReadOptions read_options;
auto iter = dbfull()->NewInternalIterator(read_options, &arena,
&range_del_agg, kMaxSequenceNumber);
iter->SeekToFirst();
for (auto p : true_data) {
ASSERT_TRUE(iter->Valid());
Expand Down
1 change: 1 addition & 0 deletions db/table_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class TableCache {
// the returned iterator. The returned "*table_reader_ptr" object is owned
// by the cache and should not be deleted, and is valid for as long as the
// returned iterator is live.
// @param options Must outlive the returned iterator.
// @param range_del_agg If non-nullptr, adds range deletions to the
// aggregator. If an error occurs, returns it in a NewErrorInternalIterator
// @param for_compaction If true, a new TableReader may be allocated (but
Expand Down
15 changes: 4 additions & 11 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,7 @@ namespace {

class LevelIterator final : public InternalIterator {
public:
// @param read_options Must outlive this iterator.
LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
const FileOptions& file_options,
const InternalKeyComparator& icomparator,
Expand Down Expand Up @@ -1020,7 +1021,7 @@ class LevelIterator final : public InternalIterator {
}

TableCache* table_cache_;
const ReadOptions read_options_;
const ReadOptions& read_options_;
const FileOptions& file_options_;
const InternalKeyComparator& icomparator_;
const UserComparatorWrapper user_comparator_;
Expand Down Expand Up @@ -5670,18 +5671,10 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_table_files,
}

InternalIterator* VersionSet::MakeInputIterator(
const Compaction* c, RangeDelAggregator* range_del_agg,
const ReadOptions& read_options, const Compaction* c,
RangeDelAggregator* range_del_agg,
const FileOptions& file_options_compactions) {
auto cfd = c->column_family_data();
ReadOptions read_options;
read_options.verify_checksums = true;
read_options.fill_cache = false;
// Compaction iterators shouldn't be confined to a single prefix.
// Compactions use Seek() for
// (a) concurrent compactions,
// (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
read_options.total_order_seek = true;

// Level-0 files have to be merged together. For other levels,
// we will make a concatenating iterator per level.
// TODO(opt): use concatenating iterator for level-0 if there is no overlap
Expand Down
Loading

0 comments on commit a4a4a2d

Please sign in to comment.