Skip to content

Commit

Permalink
add naive monkey
Browse files Browse the repository at this point in the history
  • Loading branch information
littlepig2013 committed Jul 9, 2024
1 parent dd74ad5 commit e20bbc7
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 64 deletions.
54 changes: 37 additions & 17 deletions db/bpk_alloc_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,23 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {
bpk_alloc_type_ = tbo.bpk_alloc_type;

if (bpk_alloc_type_ == BitsPerKeyAllocationType::kDefaultBpkAlloc) return;
if (bpk_alloc_type_ == BitsPerKeyAllocationType::kNaiveMonkeyBpkAlloc) {
int output_level = 0;
if (compaction != NULL) {
output_level = compaction->output_level();
}
if (output_level < (int)tbo.naive_monkey_bpk_list.size()) {
naive_monkey_bpk = tbo.naive_monkey_bpk_list[output_level];
} else {
naive_monkey_bpk = overall_bits_per_key;
}
return;
}
uint64_t tmp_num_entries_in_filter_by_file = 0;
if (bpk_alloc_type_ == BitsPerKeyAllocationType::kMonkeyBpkAlloc ||
if (bpk_alloc_type_ == BitsPerKeyAllocationType::kDynamicMonkeyBpkAlloc ||
(bpk_alloc_type_ == BitsPerKeyAllocationType::kWorkloadAwareBpkAlloc &&
vstorage_->GetAccumulatedNumPointReads() == 0)) {
bpk_alloc_type_ = BitsPerKeyAllocationType::kMonkeyBpkAlloc;
bpk_alloc_type_ = BitsPerKeyAllocationType::kDynamicMonkeyBpkAlloc;

if (compaction == nullptr &&
!flush_flag_) { // skip the preparation phase for flush
Expand Down Expand Up @@ -142,20 +154,20 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {
std::log(num_entries_in_filter_by_level) *
num_entries_in_filter_by_level;
}
monkey_num_entries_ += num_entries_in_filter_by_level;
dynamic_monkey_num_entries_ += num_entries_in_filter_by_level;
}
if (added_entries_in_max_level > 0 && max_level == 0) {
level_states_pq_.push(
LevelState(0, added_entries_in_max_level, max_fd_number + 11));
temp_sum_in_bpk_optimization_ +=
std::log(added_entries_in_max_level) * added_entries_in_max_level;
}
total_bits_for_filter_ = monkey_num_entries_ * overall_bits_per_key;
total_bits_for_filter_ = dynamic_monkey_num_entries_ * overall_bits_per_key;
common_constant_in_bpk_optimization_ =
-(total_bits_for_filter_ * log_2_squared +
temp_sum_in_bpk_optimization_) /
monkey_num_entries_;
std::unordered_set<size_t> levelIDs_with_bpk0_in_monkey;
dynamic_monkey_num_entries_;
std::unordered_set<size_t> levelIDs_with_bpk0_in_dynamic_monkey;
while (!level_states_pq_.empty() &&
std::log(level_states_pq_.top().num_entries) +
common_constant_in_bpk_optimization_ >
Expand All @@ -167,24 +179,27 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {
}
}
} else {
levelIDs_with_bpk0_in_monkey.insert(level_states_pq_.top().level);
levelIDs_with_bpk0_in_dynamic_monkey.insert(
level_states_pq_.top().level);
}
tmp_num_entries_in_filter_by_file = level_states_pq_.top().num_entries;
temp_sum_in_bpk_optimization_ -=
std::log(tmp_num_entries_in_filter_by_file) *
tmp_num_entries_in_filter_by_file;
monkey_num_entries_ -= tmp_num_entries_in_filter_by_file;
dynamic_monkey_num_entries_ -= tmp_num_entries_in_filter_by_file;
common_constant_in_bpk_optimization_ =
-(total_bits_for_filter_ * log_2_squared +
temp_sum_in_bpk_optimization_) /
monkey_num_entries_;
dynamic_monkey_num_entries_;
level_states_pq_.pop();
}

vstorage_->SetLevelIDsWithEmptyBpkInMonkey(levelIDs_with_bpk0_in_monkey);
vstorage_->SetLevelIDsWithEmptyBpkInDynamicMonkey(
levelIDs_with_bpk0_in_dynamic_monkey);

if (!level_states_pq_.empty()) {
monkey_bpk_num_entries_threshold_ = level_states_pq_.top().num_entries;
dynamic_monkey_bpk_num_entries_threshold_ =
level_states_pq_.top().num_entries;
}
} else if (bpk_alloc_type_ ==
BitsPerKeyAllocationType::kWorkloadAwareBpkAlloc) {
Expand Down Expand Up @@ -319,9 +334,9 @@ void BitsPerKeyAllocHelper::PrepareBpkAllocation(const Compaction* compaction) {
std::log(file_workload_state_pq_.top().weight * total_empty_queries_) +
common_constant_in_bpk_optimization_ >
-log_2_squared) {
// if (file_workload_state_pq_.top().meta != nullptr) {
// file_workload_state_pq_.top().meta->bpk = 0.0;
// }
if (file_workload_state_pq_.top().meta != nullptr) {
file_workload_state_pq_.top().meta->bpk = 0.0;
}
weight = file_workload_state_pq_.top().weight;
temp_sum_in_bpk_optimization_ -=
std::log(weight * total_empty_queries_) *
Expand Down Expand Up @@ -359,22 +374,27 @@ bool BitsPerKeyAllocHelper::IfNeedAllocateBitsPerKey(
if (bpk_alloc_type_ == BitsPerKeyAllocationType::kDefaultBpkAlloc)
return false;
assert(bits_per_key);
if (bpk_alloc_type_ == BitsPerKeyAllocationType::kNaiveMonkeyBpkAlloc) {
*bits_per_key = naive_monkey_bpk;
return true;
}

if (bpk_alloc_type_ == BitsPerKeyAllocationType::kMonkeyBpkAlloc ||
if (bpk_alloc_type_ == BitsPerKeyAllocationType::kDynamicMonkeyBpkAlloc ||
(bpk_alloc_type_ == BitsPerKeyAllocationType::kWorkloadAwareBpkAlloc &&
vstorage_->GetAccumulatedNumPointReads() == 0)) {
if (!bpk_optimization_prepared_flag_) {
flush_flag_ = true;
temp_sum_in_bpk_optimization_ +=
num_entries_in_output_level * std::log(num_entries_in_output_level);
monkey_num_entries_ += num_entries_in_output_level;
dynamic_monkey_num_entries_ += num_entries_in_output_level;
level_states_pq_.push(
LevelState(0, num_entries_in_output_level, meta.fd.GetNumber()));
PrepareBpkAllocation();
}

// for bits-per-key < 1, give it 1 if it is larger than or equal to 0.5
if (/*num_entries_in_output_level > monkey_bpk_num_entries_threshold_ ||*/
if (/*num_entries_in_output_level >
dynamic_monkey_bpk_num_entries_threshold_ ||*/
std::log(num_entries_in_output_level) +
common_constant_in_bpk_optimization_ >
-log_2_squared * 0.5) {
Expand Down
5 changes: 3 additions & 2 deletions db/bpk_alloc_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ class BitsPerKeyAllocHelper {
BitsPerKeyAllocationType bpk_alloc_type_ =
BitsPerKeyAllocationType::kDefaultBpkAlloc;
bool flush_flag_ = false;
double naive_monkey_bpk = 0.0;
bool bpk_optimization_prepared_flag_ = false;
double workload_aware_bpk_weight_threshold_ =
std::numeric_limits<double>::max();
uint64_t monkey_bpk_num_entries_threshold_ =
uint64_t dynamic_monkey_bpk_num_entries_threshold_ =
std::numeric_limits<uint64_t>::max();
uint64_t monkey_num_entries_ = 0;
uint64_t dynamic_monkey_num_entries_ = 0;
uint64_t workload_aware_num_entries_ = 0;
uint64_t workload_aware_num_entries_with_empty_queries_ = 0;
double temp_sum_in_bpk_optimization_ = 0;
Expand Down
122 changes: 100 additions & 22 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
#include "db/wide/wide_columns_helper.h"
#include "file/file_util.h"
#include "logging/logging.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/filter_policy_internal.h"
#include "table/compaction_merging_iterator.h"

#if USE_COROUTINES
Expand Down Expand Up @@ -98,6 +100,8 @@ namespace ROCKSDB_NAMESPACE {

namespace {

const double log_2_squared = std::pow(std::log(2), 2);

// Find File in LevelFilesBrief data structure
// Within an index range defined by left and right
int FindFileInRange(const InternalKeyComparator& icmp,
Expand Down Expand Up @@ -2312,6 +2316,39 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
FSSupportedOps::kAsyncIO)) {
use_async_io_ = true;
}

max_modulars_in_cache_ = 0;
data_block_size_ = 1;
overall_bits_per_key_ = 0.0;
max_bits_per_key_granularity_ = 0.0;
max_modulars_ = 1;
if (cfd_ != nullptr && cfd_->ioptions() != nullptr) {
if (cfd_->ioptions()->table_factory != nullptr) {
if (strcmp(cfd_->ioptions()->table_factory->Name(),
TableFactory::kBlockBasedTableName()) == 0) {
const BlockBasedTableOptions tbo =
std::static_pointer_cast<BlockBasedTableFactory>(
cfd_->ioptions()->table_factory)
->GetBlockBasedTableOptions();
if (tbo.filter_policy != nullptr) {
overall_bits_per_key_ =
std::static_pointer_cast<const BloomLikeFilterPolicy>(
tbo.filter_policy)
->GetBitsPerKey();
}
max_bits_per_key_granularity_ = tbo.max_bits_per_key_granularity;
if (tbo.block_cache) {
uint32_t num_entries_per_file =
mutable_cf_options_.target_file_size_base / tbo.block_size;
max_modulars_in_cache_ =
tbo.block_cache->GetCapacity() /
(num_entries_per_file * max_bits_per_key_granularity_);
}
data_block_size_ = tbo.block_size;
max_modulars_ = tbo.max_modulars;
}
}
}
}

Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
Expand Down Expand Up @@ -2575,14 +2612,15 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
get_perf_context()->per_level_perf_context_enabled;
StopWatchNano timer(clock_, timer_enabled /* auto_start */);
bool skip_filter = IsFilterSkipped(
static_cast<int>(fp.GetHitFileLevel()), fp.IsHitFileLastInLevel(),
f->file_metadata, &get_context.max_accessed_modulars_);
*status = table_cache_->Get(
read_options, *internal_comparator(), *f->file_metadata, ikey,
&get_context, mutable_cf_options_.block_protection_bytes_per_key,
mutable_cf_options_.prefix_extractor,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel(), f->file_metadata),
fp.GetHitFileLevel(), max_file_size_for_l0_meta_pin_);
skip_filter, fp.GetHitFileLevel(), max_file_size_for_l0_meta_pin_);
// TODO: examine the behavior for corrupted key
if (timer_enabled) {
PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
Expand Down Expand Up @@ -3219,7 +3257,10 @@ Status Version::MultiGetAsync(
#endif

bool Version::IsFilterSkipped(int level, bool is_file_last_in_level,
const FileMetaData* meta) {
const FileMetaData* meta,
uint8_t* max_accessed_modulars) {
// read no modules by default
if (max_accessed_modulars) *max_accessed_modulars = 0;
// Reaching the bottom level implies misses at all upper levels, so we'll
// skip checking the filters when we predict a hit.
bool result = cfd_->ioptions()->optimize_filters_for_hits &&
Expand All @@ -3232,13 +3273,17 @@ bool Version::IsFilterSkipped(int level, bool is_file_last_in_level,
return result;
}

if (meta->bpk == 0)
// we already mark the bpk as 0 for each FileMetaData that is supposed to be
// skipped, so in most cases we can read the filter if bpk is not 0.
if (meta->bpk == 0) {
return true; // bpk == 0 means we choose not to build filter, thus we
// should skip it
}

if (storage_info_.GetBitsPerKeyAllocationType() ==
BitsPerKeyAllocationType::kMonkeyBpkAlloc) {
return storage_info_.IsFilterSkippedWithEmptyBpkInMonkey(level);
BitsPerKeyAllocationType::kDynamicMonkeyBpkAlloc) {
if (max_accessed_modulars) *max_accessed_modulars = max_modulars_;
return storage_info_.IsFilterSkippedWithEmptyBpkInDynamicMonkey(level);
} else if (storage_info_.GetBitsPerKeyAllocationType() ==
BitsPerKeyAllocationType::kWorkloadAwareBpkAlloc) {
std::pair<uint64_t, uint64_t> num_point_read_stats =
Expand All @@ -3250,33 +3295,66 @@ bool Version::IsFilterSkipped(int level, bool is_file_last_in_level,
if (num_point_reads == 0) {
meta->stats.start_global_point_read_number =
storage_info_.GetAccumulatedNumPointReads();
// first time read a file, only read the first module
if (max_accessed_modulars) *max_accessed_modulars = 1;
return false;
}
if (num_point_reads <= num_existing_point_reads ||
storage_info_.accumulated_num_empty_point_reads_by_file_ == 0) {
// all queries are existing queries, we can skip all the modules
return true;
}
// we already mark the bpk as 0 for each FileMetaData that is supposed to be
// skipped, so in most cases we can read the filter if bpk is not 0.
// However, in case of workload shifting, we can use dynamic tracking method
// to re-evaluate whether we need to skip the current filter when we have
// sufficient recent data (the tracking window is fulfilled, otherwise we
// still read the filter)
if (cfd_->ioptions()->point_reads_track_method !=
kDynamicCompactionAwareTrack) {
return false;
}
// if (meta->stats.global_point_read_number_window.size() <
// cfd_->ioptions()->track_point_read_number_window_size) {
// return false;

// if (cfd_->ioptions()->point_reads_track_method !=
// kDynamicCompactionAwareTrack) {
// return false;
// }

uint64_t num_empty_point_reads = num_point_reads - num_existing_point_reads;

result =
(std::log((meta->num_entries - meta->num_range_deletions) * 1.0 /
(num_point_reads - num_existing_point_reads) *
num_empty_point_reads *
storage_info_.accumulated_num_empty_point_reads_by_file_.load(
std::memory_order_relaxed)) +
storage_info_.GetBitsPerKeyCommonConstant()) > 0;
return result;
// skip this filter if no bpk should be assigned
if (result || max_accessed_modulars == NULL) return result;
*max_accessed_modulars = max_modulars_;
/*
size_t filter_size = 0;
double bpk = 0.0;
double last_fpr = 1.0;
double fpr = 0.0;
if (meta->bpk == -1) {
bpk = overall_bits_per_key_;
} else {
bpk = std::min(meta->bpk, max_bits_per_key_granularity_);
}
filter_size = bpk*(meta->num_entries - meta->num_range_deletions)/8;
fpr = std::exp(-1.0 * bpk * log_2_squared);
double expected_accesses = 0.0;
while (true) {
expected_accesses = num_empty_point_reads*last_fpr +
num_existing_point_reads;
if(std::log(filter_size*1.0/data_block_size_*expected_accesses/((1.0 -
fpr)*num_empty_point_reads*last_fpr)) < -std::log(1.0 -
expected_accesses*1.0/storage_info_.GetAccumulatedNumPointReads())*max_modulars_in_cache_
- 1) {
(*max_accessed_modulars)++;
result = false;
last_fpr *= fpr;
if (bpk <= max_bits_per_key_granularity_) return result;
bpk -= max_bits_per_key_granularity_;
fpr = std::exp(-1.0 * bpk * log_2_squared);
filter_size = bpk*(meta->num_entries - meta->num_range_deletions)/8;
} else {
if (*max_accessed_modulars == 0) result = true;
break;
}
}
return result;*/
}
return false;
}
Expand Down
13 changes: 10 additions & 3 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -688,12 +688,12 @@ class VersionStorageInfo {
return common_constant_in_bpk_optimization_;
}

void SetLevelIDsWithEmptyBpkInMonkey(
void SetLevelIDsWithEmptyBpkInDynamicMonkey(
const std::unordered_set<size_t>& _levelIDs_with_bpk0_in_monkey) const {
levelIDs_with_bpk0_in_monkey_ = _levelIDs_with_bpk0_in_monkey;
}

bool IsFilterSkippedWithEmptyBpkInMonkey(size_t level) const {
bool IsFilterSkippedWithEmptyBpkInDynamicMonkey(size_t level) const {
return levelIDs_with_bpk0_in_monkey_.find(level) !=
levelIDs_with_bpk0_in_monkey_.end();
}
Expand Down Expand Up @@ -1158,7 +1158,8 @@ class Version {
// the filter block may already be cached, but we still do not access it such
// that it eventually expires from the cache.
bool IsFilterSkipped(int level, bool is_file_last_in_level = false,
const FileMetaData* meta = nullptr);
const FileMetaData* meta = nullptr,
uint8_t* max_accessed_modulars = nullptr);

// The helper function of UpdateAccumulatedStats, which may fill the missing
// fields of file_meta from its associated TableProperties.
Expand Down Expand Up @@ -1222,6 +1223,12 @@ class Version {
uint64_t version_number_;
std::shared_ptr<IOTracer> io_tracer_;
bool use_async_io_;
// for modular filter skipping
double overall_bits_per_key_;
double max_bits_per_key_granularity_;
uint64_t max_modulars_in_cache_;
size_t data_block_size_;
uint32_t max_modulars_;

Version(ColumnFamilyData* cfd, VersionSet* vset, const FileOptions& file_opt,
MutableCFOptions mutable_cf_options,
Expand Down
Loading

0 comments on commit e20bbc7

Please sign in to comment.