Skip to content

Commit

Permalink
[BugFix] Fix incorrect estimation of average row size when doing part…
Browse files Browse the repository at this point in the history
…ial update (StarRocks#27485)

When we do a partial update, we first write a partial rowset. During the
following apply phase we load the data of the remaining columns into memory,
processing the rowset segment by segment. If one segment file has too many
rows, a large amount of memory is needed, so we use the average row size of
historical data to avoid large memory allocations.

The following code is how we get the average row size. However, when we
do a partial update, `data_size` is the compressed size of the partial
rowset data, which is far less than the total row size, so the computed
`average_row_size` may be far less than the real average row size. As a
result, we may write too many rows into one segment, which causes large
memory allocations.
```
int64_t TabletUpdates::get_average_row_size() {
    TTabletInfo info;
    get_tablet_info_extra(&info);
    int64_t total_row = info.row_count;
    int64_t total_size = info.data_size;
    if (total_row != 0) {
        return total_size / total_row;
    } else {
        return 0;
    }
}
```
This PR fixes the issue and returns the correct average row size of
historical data.

Signed-off-by: zhangqiang <[email protected]>
  • Loading branch information
sevev committed Jul 25, 2023
1 parent 5aa93bf commit 7b5466b
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 11 deletions.
10 changes: 10 additions & 0 deletions be/src/storage/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ Status Rowset::reload_segment(int32_t segment_id) {
return Status::OK();
}

int64_t Rowset::total_segment_data_size() {
int64_t res = 0;
for (auto& seg : _segments) {
if (seg != nullptr) {
res += seg->get_data_size();
}
}
return res;
}

StatusOr<int64_t> Rowset::estimate_compaction_segment_iterator_num() {
if (num_segments() == 0) {
return 0;
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
// reload this rowset after the underlying segment file is changed
Status reload();
Status reload_segment(int32_t segment_id);
int64_t total_segment_data_size();

const TabletSchema& schema() const { return *_schema; }
void set_schema(const TabletSchema* schema) { _schema = schema; }
Expand Down
8 changes: 8 additions & 0 deletions be/src/storage/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,18 @@ class RowsetMeta {

int64_t total_row_size() { return _rowset_meta_pb->total_row_size(); }

void set_total_row_size(int64_t total_size) { _rowset_meta_pb->set_total_row_size(total_size); }

int64_t total_update_row_size() { return _rowset_meta_pb->total_update_row_size(); }

size_t total_disk_size() const { return _rowset_meta_pb->total_disk_size(); }

void set_total_disk_size(size_t disk_size) { _rowset_meta_pb->set_total_disk_size(disk_size); }

size_t data_disk_size() const { return _rowset_meta_pb->data_disk_size(); }

void set_data_disk_size(size_t data_size) { _rowset_meta_pb->set_data_disk_size(data_size); }

size_t index_disk_size() const { return _rowset_meta_pb->index_disk_size(); }

bool has_delete_predicate() const { return _rowset_meta_pb->has_delete_predicate(); }
Expand Down
8 changes: 8 additions & 0 deletions be/src/storage/rowset/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ class Segment : public std::enable_shared_from_this<Segment> {

int64_t mem_usage() { return _basic_info_mem_usage() + _short_key_index_mem_usage(); }

// Asks the file system (_fs) for the size of this segment's file (_fname).
// Returns that size, or 0 when the file-size lookup does not succeed.
int64_t get_data_size() {
    auto size_or = _fs->get_file_size(_fname);
    if (!size_or.ok()) {
        // The lookup error is not propagated; callers just see a size of zero.
        return 0;
    }
    return size_or.value();
}

// read short_key_index, for data check, just used in unit test now
Status get_short_key_index(std::vector<std::string>* sk_index_values);

Expand Down
7 changes: 5 additions & 2 deletions be/src/storage/rowset_update_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ Status RowsetUpdateState::_prepare_partial_update_states(Tablet* tablet, Rowset*
_memory_usage += _partial_update_states[idx].write_columns[col_idx]->memory_usage();
}
int64_t t_end = MonotonicMillis();
_partial_update_states[idx].update_byte_size();
_partial_update_states[idx].inited = true;

LOG(INFO) << strings::Substitute(
Expand Down Expand Up @@ -452,7 +453,8 @@ Status RowsetUpdateState::_check_and_resolve_conflict(Tablet* tablet, Rowset* ro
}

Status RowsetUpdateState::apply(Tablet* tablet, Rowset* rowset, uint32_t rowset_id, uint32_t segment_id,
EditVersion latest_applied_version, const PrimaryIndex& index) {
EditVersion latest_applied_version, const PrimaryIndex& index,
int64_t* append_column_size) {
const auto& rowset_meta_pb = rowset->rowset_meta()->get_meta_pb();
if (!rowset_meta_pb.has_txn_meta() || rowset->num_segments() == 0 ||
rowset_meta_pb.txn_meta().has_merge_condition()) {
Expand Down Expand Up @@ -505,8 +507,9 @@ Status RowsetUpdateState::apply(Tablet* tablet, Rowset* rowset, uint32_t rowset_
if (_partial_update_states[segment_id].write_columns[col_idx] != nullptr) {
_memory_usage -= _partial_update_states[segment_id].write_columns[col_idx]->memory_usage();
}
*append_column_size += _partial_update_states[segment_id].byte_size;
_partial_update_states[segment_id].release();
}
_partial_update_states[segment_id].release();
return Status::OK();
}

Expand Down
11 changes: 10 additions & 1 deletion be/src/storage/rowset_update_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ struct PartialUpdateState {
std::vector<std::unique_ptr<vectorized::Column>> write_columns;
bool inited = false;
EditVersion read_version;
int64_t byte_size = 0;

// Recomputes `byte_size` as the sum of byte_size() over all non-null entries
// of `write_columns`.
//
// The total is rebuilt from scratch on every call, so calling this more than
// once for the same state does not count the same columns twice.
void update_byte_size() {
    int64_t total = 0;
    for (const auto& column : write_columns) {
        // Slots without a loaded column contribute nothing.
        if (column != nullptr) {
            total += column->byte_size();
        }
    }
    byte_size = total;
}

void release() {
src_rss_rowids.clear();
Expand All @@ -41,7 +50,7 @@ class RowsetUpdateState {
Status load(Tablet* tablet, Rowset* rowset);

Status apply(Tablet* tablet, Rowset* rowset, uint32_t rowset_id, uint32_t segment_id,
EditVersion latest_applied_version, const PrimaryIndex& index);
EditVersion latest_applied_version, const PrimaryIndex& index, int64_t* append_column_size);

const std::vector<ColumnUniquePtr>& upserts() const { return _upserts; }
const std::vector<ColumnUniquePtr>& deletes() const { return _deletes; }
Expand Down
60 changes: 52 additions & 8 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,7 @@ Status TabletUpdates::_rowset_commit_unlocked(int64_t version, const RowsetShare
rowset_stats->num_rows = rowset->num_rows();
rowset_stats->num_dels = 0;
rowset_stats->byte_size = rowset->data_disk_size();
rowset_stats->row_size = rowset->total_row_size();
_calc_compaction_score(rowset_stats.get());

std::lock_guard lg(_rowset_stats_lock);
Expand Down Expand Up @@ -903,13 +904,16 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
EditVersion latest_applied_version;
st = get_latest_applied_version(&latest_applied_version);

int64_t full_row_size = 0;
int64_t full_rowset_size = 0;
if (rowset->rowset_meta()->get_meta_pb().delfile_idxes_size() == 0) {
for (uint32_t i = 0; i < rowset->num_segments(); i++) {
state.load_upserts(rowset.get(), i);
auto& upserts = state.upserts();
if (upserts[i] != nullptr) {
// apply partial rowset segment
st = state.apply(&_tablet, rowset.get(), rowset_id, i, latest_applied_version, index);
st = state.apply(&_tablet, rowset.get(), rowset_id, i, latest_applied_version, index,
&full_row_size);
if (!st.ok()) {
manager->update_state_cache().remove(state_entry);
std::string msg =
Expand Down Expand Up @@ -961,7 +965,8 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
auto& upserts = state.upserts();
if (upserts[loaded_upsert] != nullptr) {
// apply partial rowset segment
st = state.apply(&_tablet, rowset.get(), rowset_id, loaded_upsert, latest_applied_version, index);
st = state.apply(&_tablet, rowset.get(), rowset_id, loaded_upsert, latest_applied_version, index,
&full_row_size);
if (!st.ok()) {
manager->update_state_cache().remove(state_entry);
std::string msg = strings::Substitute(
Expand Down Expand Up @@ -990,6 +995,8 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
}
}
}
full_row_size += rowset->rowset_meta()->total_row_size();
full_rowset_size = rowset->total_segment_data_size();

PersistentIndexMetaPB index_meta;
if (enable_persistent_index) {
Expand Down Expand Up @@ -1108,7 +1115,11 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
// 4. write meta
const auto& rowset_meta_pb = rowset->rowset_meta()->get_meta_pb();
if (rowset_meta_pb.has_txn_meta()) {
full_rowset_size = rowset->total_segment_data_size();
rowset->rowset_meta()->clear_txn_meta();
rowset->rowset_meta()->set_total_row_size(full_row_size);
rowset->rowset_meta()->set_total_disk_size(full_rowset_size);
rowset->rowset_meta()->set_data_disk_size(full_rowset_size);
st = TabletMetaManager::apply_rowset_commit(_tablet.data_dir(), tablet_id, _next_log_id, version,
new_del_vecs, index_meta, enable_persistent_index,
&(rowset->rowset_meta()->get_meta_pb()));
Expand Down Expand Up @@ -1137,6 +1148,19 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
_apply_version_changed.notify_all();
}

{
std::lock_guard lg(_rowset_stats_lock);
auto iter = _rowset_stats.find(rowset_id);
if (iter == _rowset_stats.end()) {
string msg = strings::Substitute("inconsistent rowset_stats, rowset not found tablet=$0 rowsetid=$1",
_tablet.tablet_id(), rowset_id);
LOG(ERROR) << msg;
} else {
iter->second->byte_size = full_rowset_size;
iter->second->row_size = full_row_size;
}
}

st = index.on_commited();
if (!st.ok()) {
std::string msg = Substitute("primary index on_commit failed: $0", st.to_string());
Expand Down Expand Up @@ -2309,12 +2333,32 @@ void TabletUpdates::get_tablet_info_extra(TTabletInfo* info) {
}

int64_t TabletUpdates::get_average_row_size() {
TTabletInfo info;
get_tablet_info_extra(&info);
int64_t total_row = info.row_count;
int64_t total_size = info.data_size;
if (total_row != 0) {
return total_size / total_row;
int64_t row_num = 0;
int64_t total_row_size = 0;
vector<uint32_t> rowsets;
{
std::lock_guard rl(_lock);
if (_edit_version_infos.empty()) {
LOG(WARNING) << "tablet delete when get_tablet_info_extra tablet:" << _tablet.tablet_id();
} else {
auto& last = _edit_version_infos.back();
rowsets = last->rowsets;
}
}
{
std::lock_guard lg(_rowset_stats_lock);
for (uint32_t rowsetid : rowsets) {
auto itr = _rowset_stats.find(rowsetid);
if (itr != _rowset_stats.end()) {
// TODO(cbl): also report num deletes
row_num += itr->second->num_rows;
total_row_size += itr->second->row_size;
}
}
}

if (row_num != 0) {
return total_row_size / row_num;
} else {
return 0;
}
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/tablet_updates.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ class TabletUpdates {
size_t num_rows = 0;
size_t num_dels = 0;
size_t byte_size = 0;
size_t row_size = 0;
int64_t compaction_score = 0;
std::string to_string() const;
};
Expand Down

0 comments on commit 7b5466b

Please sign in to comment.