Skip to content

Commit

Permalink
[BugFix] PersistentIndex may keep unused delete operations (StarRocks…
Browse files Browse the repository at this point in the history
…#34352)

When we ingest a large amount of data to the primary key table with a persistent index, we may flush many temp l1 files in advance to reduce memory usage during ingestion. These temp l1 files will keep deleting operations to ensure data is correct. After ingestion, we will merge these temp l1 files and merge delete operations. However, if we have only one temp l1 file, the delete operation also writes into the new l1 file, which can make the l1 file larger than expected, leading to a waste of disk IO.

Signed-off-by: zhangqiang <[email protected]>
  • Loading branch information
sevev committed Nov 8, 2023
1 parent c475b83 commit ae6f439
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 9 deletions.
24 changes: 15 additions & 9 deletions be/src/storage/persistent_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3598,6 +3598,10 @@ Status merge_shard_kvs_fixed_len(std::vector<KVRef>& l0_kvs, std::vector<std::ve
kvs_set.reserve(estimated_size);
DCHECK(!l1_kvs.empty());
for (const auto& kv : l1_kvs[0]) {
const auto v = UNALIGNED_LOAD64(kv.kv_pos + KeySize);
if (v == NullIndexValue) {
continue;
}
const auto [_, inserted] = kvs_set.emplace(kv);
DCHECK(inserted) << "duplicate key found when in l1 index";
if (!inserted) {
Expand Down Expand Up @@ -3648,6 +3652,10 @@ Status merge_shard_kvs_var_len(std::vector<KVRef>& l0_kvs, std::vector<std::vect
kvs_set.reserve(estimate_size);
DCHECK(!l1_kvs.empty());
for (const auto& kv : l1_kvs[0]) {
const auto v = UNALIGNED_LOAD64(kv.kv_pos + kv.size - kIndexValueSize);
if (v == NullIndexValue) {
continue;
}
const auto [_, inserted] = kvs_set.emplace(kv);
DCHECK(inserted) << "duplicate key found when in l1 index";
if (!inserted) {
Expand Down Expand Up @@ -4041,15 +4049,6 @@ Status PersistentIndex::_merge_compaction() {
if (_l1_vec.empty()) {
return Status::InternalError("cannot do merge_compaction without l1");
}
// if _l0 is empty() and _l1_vec only has one _l1, we can rename it directly
if (_l0->size() == 0) {
if (!_has_l1 && _l1_vec.size() == 1) {
const std::string idx_file_path =
strings::Substitute("$0/index.l1.$1.$2", _path, _version.major(), _version.minor());
const std::string idx_file_path_tmp = _l1_vec[0]->_file->filename();
return FileSystem::Default()->rename_file(idx_file_path_tmp, idx_file_path);
}
}
auto writer = std::make_unique<ImmutableIndexWriter>();
const std::string idx_file_path =
strings::Substitute("$0/index.l1.$1.$2", _path, _version.major(), _version.minor());
Expand All @@ -4062,6 +4061,13 @@ Status PersistentIndex::_merge_compaction() {
if (_usage != writer->total_kv_size()) {
_usage = writer->total_kv_size();
}
if (_size != writer->total_kv_num()) {
std::string msg =
strings::Substitute("inconsistent kv num after merge compaction, actual:$0, expect:$1, index_file:$2",
writer->total_kv_num(), _size, writer->index_file());
LOG(ERROR) << msg;
return Status::InternalError(msg);
}
return writer->finish();
}

Expand Down
16 changes: 16 additions & 0 deletions be/src/storage/persistent_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,10 @@ class ImmutableIndexWriter {

size_t file_size() { return _total_bytes; }

size_t total_kv_num() { return _total; }

std::string index_file() { return _idx_file_path; }

private:
EditVersion _version;
string _idx_file_path_tmp;
Expand Down Expand Up @@ -642,6 +646,18 @@ class PersistentIndex {

static double major_compaction_score(size_t l1_count, size_t l2_count);

// not thread safe, just for unit test
size_t kv_num_in_immutable_index() {
size_t res = 0;
for (int i = 0; i < _l1_vec.size(); i++) {
res += _l1_vec[i]->total_size();
}
for (int i = 0; i < _l2_vec.size(); i++) {
res += _l2_vec[i]->total_size();
}
return res;
}

protected:
Status _delete_expired_index_file(const EditVersion& l0_version, const EditVersion& l1_version,
const EditVersionWithMerge& min_l2_version);
Expand Down
76 changes: 76 additions & 0 deletions be/test/storage/persistent_index_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2206,4 +2206,80 @@ PARALLEL_TEST(PersistentIndexTest, test_l2_versions) {
ASSERT_TRUE(m10 < m9);
}

PARALLEL_TEST(PersistentIndexTest, test_index_keep_delete) {
config::l0_max_mem_usage = 1024;
config::enable_pindex_minor_compaction = false;
FileSystem* fs = FileSystem::Default();
const std::string kPersistentIndexDir = "./PersistentIndexTest_test_index_keep_delete";
const std::string kIndexFile = "./PersistentIndexTest_test_index_keep_delete/index.l0.0.0";
bool created;
ASSERT_OK(fs->create_dir_if_missing(kPersistentIndexDir, &created));

using Key = std::string;
PersistentIndexMetaPB index_meta;
const int N = 10000;
const int DEL_N = 90000;
int64_t cur_version = 0;

// insert
vector<Key> keys(N);
vector<Slice> key_slices;
vector<IndexValue> values;
key_slices.reserve(N);
for (int i = 0; i < N; i++) {
keys[i] = "test_varlen_" + std::to_string(i);
values.emplace_back(i);
key_slices.emplace_back(keys[i]);
}
// erase
vector<Key> erase_keys(DEL_N);
vector<Slice> erase_key_slices;
erase_key_slices.reserve(DEL_N);
for (int i = 0; i < DEL_N; i++) {
erase_keys[i] = "test_varlen_" + std::to_string(i);
erase_key_slices.emplace_back(erase_keys[i]);
}

{
ASSIGN_OR_ABORT(auto wfile, FileSystem::Default()->new_writable_file(kIndexFile));
ASSERT_OK(wfile->close());
}

{
EditVersion version(cur_version++, 0);
index_meta.set_key_size(0);
index_meta.set_size(0);
version.to_pb(index_meta.mutable_version());
MutableIndexMetaPB* l0_meta = index_meta.mutable_l0_meta();
l0_meta->set_format_version(PERSISTENT_INDEX_VERSION_3);
IndexSnapshotMetaPB* snapshot_meta = l0_meta->mutable_snapshot();
version.to_pb(snapshot_meta->mutable_version());

PersistentIndex index(kPersistentIndexDir);

ASSERT_OK(index.load(index_meta));
ASSERT_OK(index.prepare(EditVersion(cur_version++, 0), N));
// erase non-exist keys
// flush advance
vector<IndexValue> erase_old_values(erase_keys.size());
ASSERT_TRUE(index.erase(erase_keys.size(), erase_key_slices.data(), erase_old_values.data()).ok());
ASSERT_TRUE(index.commit(&index_meta).ok());
ASSERT_TRUE(index.on_commited().ok());
ASSERT_EQ(0, index.kv_num_in_immutable_index());

ASSERT_OK(index.prepare(EditVersion(cur_version++, 0), N));
// erase non-exist keys
// flush advance
ASSERT_TRUE(index.erase(erase_keys.size(), erase_key_slices.data(), erase_old_values.data()).ok());
// not trigger flush advance
config::l0_max_mem_usage = 100 * 1024 * 1024; // 100MB
std::vector<IndexValue> old_values(keys.size());
ASSERT_TRUE(index.upsert(keys.size(), key_slices.data(), values.data(), old_values.data()).ok());
ASSERT_TRUE(index.commit(&index_meta).ok());
ASSERT_TRUE(index.on_commited().ok());
ASSERT_EQ(N, index.kv_num_in_immutable_index());
}
ASSERT_TRUE(fs::remove_all(kPersistentIndexDir).ok());
}

} // namespace starrocks

0 comments on commit ae6f439

Please sign in to comment.