Skip to content

Commit

Permalink
[BugFix] remove primary index entry when apply rowset failed (StarRoc…
Browse files Browse the repository at this point in the history
…ks#27488)

If primary key tablet apply rowset failed, we should remove primary
index cache to release memory. This pr remove primary index cache when
apply rowset failed.

Signed-off-by: zhangqiang <[email protected]>
  • Loading branch information
sevev committed Jan 4, 2024
1 parent 8bc9035 commit 4fddda6
Showing 1 changed file with 42 additions and 40 deletions.
82 changes: 42 additions & 40 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -905,16 +905,23 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
auto index_entry = manager->index_cache().get_or_create(tablet_id);
index_entry->update_expire_time(MonotonicMillis() + manager->get_cache_expire_ms());
auto& index = index_entry->value();

auto failure_handler = [&](const std::string& msg, bool remove_update_state) {
if (remove_update_state) {
manager->update_state_cache().remove(state_entry);
}
manager->index_cache().remove(index_entry);
LOG(ERROR) << msg;
_set_error(msg);
};
// empty rowset does not need to load in-memory primary index, so skip it
if (rowset->has_data_files() || _tablet.get_enable_persistent_index()) {
auto st = index.load(&_tablet);
manager->index_cache().update_object_size(index_entry, index.memory_usage());
if (!st.ok()) {
manager->index_cache().remove(index_entry);
std::string msg = Substitute("_apply_rowset_commit error: load primary index failed: $0 $1", st.to_string(),
debug_string());
LOG(ERROR) << msg;
_set_error(msg);
std::string msg = strings::Substitute("_apply_rowset_commit error: load primary index failed: $0 $1",
st.to_string(), debug_string());
failure_handler(msg, true);
return;
}
}
Expand All @@ -928,9 +935,7 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
if (iter == _rowset_stats.end()) {
string msg = strings::Substitute("inconsistent rowset_stats, rowset not found tablet=$0 rowsetid=$1",
_tablet.tablet_id(), rowset_id);
DCHECK(false) << msg;
LOG(ERROR) << msg;
_set_error(msg);
failure_handler(msg, true);
return;
} else {
size_t num_adds = iter->second->num_rows;
Expand All @@ -940,11 +945,9 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
}
st = index.prepare(version, merge_num);
if (!st.ok()) {
manager->index_cache().remove(index_entry);
std::string msg = Substitute("_apply_rowset_commit error: primary index prepare failed: $0 $1", st.to_string(),
debug_string());
LOG(ERROR) << msg;
_set_error(msg);
std::string msg = strings::Substitute("_apply_rowset_commit error: primary index prepare failed: $0 $1",
st.to_string(), debug_string());
failure_handler(msg, true);
return;
}

Expand Down Expand Up @@ -981,22 +984,18 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
// apply partial rowset segment
st = state.apply(&_tablet, rowset.get(), rowset_id, i, latest_applied_version, index, &full_row_size);
if (!st.ok()) {
manager->update_state_cache().remove(state_entry);
std::string msg =
strings::Substitute("_apply_rowset_commit error: apply rowset update state failed: $0 $1",
st.to_string(), debug_string());
LOG(ERROR) << msg;
_set_error(msg);
failure_handler(msg, true);
return;
}
st = _do_update(rowset_id, i, conditional_column, upserts, index, tablet_id, &new_deletes);
if (!st.ok()) {
manager->update_state_cache().remove(state_entry);
std::string msg =
strings::Substitute("_apply_rowset_commit error: apply rowset update state failed: $0 $1",
st.to_string(), debug_string());
LOG(ERROR) << msg;
_set_error(msg);
failure_handler(msg, true);
return;
}
manager->index_cache().update_object_size(index_entry, index.memory_usage());
Expand Down Expand Up @@ -1034,15 +1033,21 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
st = state.apply(&_tablet, rowset.get(), rowset_id, loaded_upsert, latest_applied_version, index,
&full_row_size);
if (!st.ok()) {
manager->update_state_cache().remove(state_entry);
std::string msg = strings::Substitute(
"_apply_rowset_commit error: apply rowset update state failed: $0 $1", st.to_string(),
debug_string());
LOG(ERROR) << msg;
_set_error(msg);
failure_handler(msg, true);
return;
}
st = _do_update(rowset_id, loaded_upsert, conditional_column, upserts, index, tablet_id,
&new_deletes);
if (!st.ok()) {
std::string msg =
strings::Substitute("_apply_rowset_commit error: update primary index failed: $0 $1",
st.to_string(), debug_string());
failure_handler(msg, true);
return;
}
_do_update(rowset_id, loaded_upsert, conditional_column, upserts, index, tablet_id, &new_deletes);
manager->index_cache().update_object_size(index_entry, index.memory_usage());
}
i++;
Expand All @@ -1068,18 +1073,17 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
if (enable_persistent_index) {
st = TabletMetaManager::get_persistent_index_meta(_tablet.data_dir(), tablet_id, &index_meta);
if (!st.ok() && !st.is_not_found()) {
std::string msg = Substitute("get persistent index meta failed: $0", st.to_string());
LOG(ERROR) << msg << " " << _debug_string(false, true);
_set_error(msg);
std::string msg = strings::Substitute("get persistent index meta failed: $0 $1", st.to_string(),
_debug_string(false, true));
failure_handler(msg, true);
return;
}
}
span->AddEvent("commit_index");
st = index.commit(&index_meta);
if (!st.ok()) {
std::string msg = Substitute("primary index commit failed: $0", st.to_string());
LOG(ERROR) << msg << " " << _debug_string(false, true);
_set_error(msg);
std::string msg = strings::Substitute("primary index commit failed: $0", st.to_string());
failure_handler(msg, true);
return;
}

Expand Down Expand Up @@ -1118,10 +1122,9 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
// TODO(cbl): should get the version before this apply version, to be safe
st = manager->get_latest_del_vec(_tablet.data_dir()->get_meta(), tsid, &old_del_vec);
if (!st.ok()) {
std::string msg = Substitute("_apply_rowset_commit error: get_latest_del_vec failed: $0 $1",
st.to_string(), debug_string());
LOG(ERROR) << msg;
_set_error(msg);
std::string msg = strings::Substitute("_apply_rowset_commit error: get_latest_del_vec failed: $0 $1",
st.to_string(), debug_string());
failure_handler(msg, false);
return;
}
new_del_vecs[idx].first = rssid;
Expand Down Expand Up @@ -1175,6 +1178,7 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
{
std::lock_guard wl(_lock);
if (_edit_version_infos.empty()) {
manager->index_cache().remove(index_entry);
LOG(WARNING) << "tablet deleted when apply rowset commmit tablet:" << tablet_id;
return;
}
Expand All @@ -1195,10 +1199,9 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
}

if (!st.ok()) {
std::string msg = Substitute("_apply_rowset_commit error: write meta failed: $0 $1", st.to_string(),
_debug_string(false));
LOG(ERROR) << msg;
_set_error(msg);
std::string msg = strings::Substitute("_apply_rowset_commit error: write meta failed: $0 $1",
st.to_string(), _debug_string(false));
failure_handler(msg, false);
return;
}
// put delvec in cache
Expand Down Expand Up @@ -1229,9 +1232,8 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {

st = index.on_commited();
if (!st.ok()) {
std::string msg = Substitute("primary index on_commit failed: $0", st.to_string());
LOG(ERROR) << msg;
_set_error(msg);
std::string msg = strings::Substitute("primary index on_commit failed: $0", st.to_string());
failure_handler(msg, false);
return;
}

Expand Down

0 comments on commit 4fddda6

Please sign in to comment.