Skip to content

Commit

Permalink
count obsolete records
Browse files Browse the repository at this point in the history
* count obsolete records

* fix
  • Loading branch information
artpaul authored Aug 2, 2024
1 parent e6606ed commit 4f3208c
Showing 1 changed file with 64 additions and 12 deletions.
76 changes: 64 additions & 12 deletions include/bitcask/bitcask.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,14 @@ class Database {
/// The size is only updated on writing or set on loading.
std::atomic_uint64_t size{0};

/// Number of records with a value.
std::atomic_uint64_t records{0};
/// Number of obsolete records. An obsolete record is a record that has been superseded, either by
/// a newer record with the same key or by a tombstone.
std::atomic_uint64_t obsolete{0};
/// Number of tombstones.
std::atomic_uint64_t tombstones{0};

public:
/// Creates a file descriptor entry for the file located at `p` whose size is initially `s`.
///
/// Only `path` and `size` are initialized here; every other member keeps its default member
/// initializer. `p` is taken by value and moved into `path`.
FileInfo(std::filesystem::path p, uint64_t s) noexcept : path(std::move(p)), size(s) {}

Expand Down Expand Up @@ -390,7 +398,7 @@ class Database {
}
// Close read-only files.
for (const auto& parts : files_) {
std::for_each(parts.begin(), parts.end(), [this](const auto& f) { f->CloseFile(); });
std::for_each(parts.begin(), parts.end(), [](const auto& f) { f->CloseFile(); });
}
}

Expand Down Expand Up @@ -442,6 +450,7 @@ class Database {
const bool present_in_main = keys_.contains(key);
// During the merging process only the updated_keys_ can be modified.
if (auto ki = updated_keys_->find(key); ki != updated_keys_->end()) {
ki->second.file->obsolete.fetch_add(1);
if (present_in_main) {
ki->second = Record{};
} else {
Expand All @@ -452,6 +461,7 @@ class Database {
}
} else {
if (auto ki = keys_.find(key); ki != keys_.end()) {
ki->second.file->obsolete.fetch_add(1);
keys_.erase(ki);
}
}
Expand Down Expand Up @@ -520,9 +530,20 @@ class Database {
// Update key-set
if (updated_keys_) {
// During the merging process only the updated_keys_ can be modified.
updated_keys_->insert_or_assign(std::string(key), record);
if (auto ki = updated_keys_->find(key); ki == updated_keys_->end()) {
updated_keys_->emplace(key, record);
} else {
ki->second.file->obsolete.fetch_add(1);
ki->second = record;
}

} else {
keys_.insert_or_assign(std::string(key), record);
if (auto ki = keys_.find(key); ki == keys_.end()) {
keys_.emplace(key, record);
} else {
ki->second.file->obsolete.fetch_add(1);
ki->second = record;
}
}

return {};
Expand All @@ -536,7 +557,7 @@ class Database {
std::lock_guard file_lock(file_mutex_);

for (const auto& parts : files_) {
std::for_each(parts.begin(), parts.end(), [this](const auto& f) { f->CloseFile(); });
std::for_each(parts.begin(), parts.end(), [](const auto& f) { f->CloseFile(); });
}
}

Expand All @@ -562,13 +583,13 @@ class Database {
if (files_[i].empty()) {
continue;
}
if (i == 0) {
mode = CompactionMode::kScatter;
} else if (IsLastCompactionLevel(i)) {
if (IsLastCompactionLevel(i)) {
if (!force && files_[i].size() < 4) {
continue;
}
mode = CompactionMode::kGather;
} else if (i == 0) {
mode = CompactionMode::kScatter;
} else {
// Total size of the files in the slot.
const size_t total_size = std::accumulate(files_[i].begin(), files_[i].end(), 0ull,
Expand All @@ -577,7 +598,22 @@ class Database {
if (total_size > options_.max_file_size) {
mode = CompactionMode::kScatter;
} else {
if (!force && files_[i].size() < 4) {
bool process = false;

for (size_t j = 0, end = files_[i].size(); j != end; ++j) {
const auto records = files_[i][j]->records.load();
const auto obsolete = files_[i][j]->obsolete.load();

if (records == 0) {
continue;
}
if (double(obsolete) / double(records) > 0.5) {
process = true;
break;
}
}

if (!process && !force && files_[i].size() < 4) {
continue;
}
mode = CompactionMode::kGather;
Expand Down Expand Up @@ -684,8 +720,6 @@ class Database {

Database(const Options& options, const std::filesystem::path& path)
: options_(options), base_path_(path), active_files_(std::max<unsigned>(1u, options.active_files)) {
// Adjust the number of compaction levels.
options_.compaction_levels = std::max<unsigned>(1u, options_.compaction_levels);
// Calculate the number of slots for an LSM-tree in which every node has up to 8 children:
// 1 + 8 + ... + 8^compaction_levels = (8^(compaction_levels + 1) - 1) / 7.
compaction_slots_count_ = ((1ull << (3 * (options_.compaction_levels + 1))) - 1) / 7;

Expand All @@ -705,6 +739,12 @@ class Database {

const auto cb = [&](const Record& record, const bool is_tombstone, std::string_view key) -> Status {
clock_ = std::max<uint64_t>(clock_, record.timestamp);
// Count entries.
if (is_tombstone) {
record.file->tombstones.fetch_add(1);
} else {
record.file->records.fetch_add(1);
}

const auto ti = tombstones.find(key);
// Process tombstone.
Expand All @@ -717,6 +757,7 @@ class Database {

if (auto ki = keys_.find(key); ki != keys_.end()) {
if (ki->second.timestamp < record.timestamp) {
ki->second.file->obsolete.fetch_add(1);
keys_.erase(ki);
}
}
Expand All @@ -726,12 +767,14 @@ class Database {

// The record will be deleted in the future. Skip it.
if (ti != tombstones.end() && record.timestamp < ti->second) {
record.file->obsolete.fetch_add(1);
return {};
}

if (const auto ki = keys_.find(key); ki == keys_.end()) {
keys_.emplace(key, record);
} else if (ki->second.timestamp < record.timestamp) {
ki->second.file->obsolete.fetch_add(1);
ki->second = record;
}

Expand Down Expand Up @@ -847,7 +890,9 @@ class Database {
});

if (status) {
updates.emplace_back(ki, rec);
if (ki != keys_.end()) {
updates.emplace_back(ki, rec);
}
} else {
return status;
}
Expand Down Expand Up @@ -1020,7 +1065,7 @@ class Database {
for (size_t offset = range.first, end = range.second; offset < end;) {
detail::Entry e;

auto [read, status] = ReadEntryImpl(file->fd, offset, false, e, key, value);
auto [read, status] = ReadEntryImpl(fd, offset, false, e, key, value);
if (!status) {
return status;
}
Expand Down Expand Up @@ -1371,6 +1416,13 @@ class Database {
return {{}, status};
}

// Count entries.
if (is_tombstone) {
file->tombstones.fetch_add(1);
} else {
file->records.fetch_add(1);
}

const Record record{
.file = file.get(),
.timestamp = timestamp,
Expand Down

0 comments on commit 4f3208c

Please sign in to comment.