diff --git a/be/src/storage/rowset/rowset.cpp b/be/src/storage/rowset/rowset.cpp index c57e210187f91..fdb08b9031609 100644 --- a/be/src/storage/rowset/rowset.cpp +++ b/be/src/storage/rowset/rowset.cpp @@ -184,6 +184,16 @@ Status Rowset::reload_segment(int32_t segment_id) { return Status::OK(); } +int64_t Rowset::total_segment_data_size() { + int64_t res = 0; + for (auto& seg : _segments) { + if (seg != nullptr) { + res += seg->get_data_size(); + } + } + return res; +} + StatusOr Rowset::estimate_compaction_segment_iterator_num() { if (num_segments() == 0) { return 0; diff --git a/be/src/storage/rowset/rowset.h b/be/src/storage/rowset/rowset.h index 710c210da5297..a99ec3ba2d9c6 100644 --- a/be/src/storage/rowset/rowset.h +++ b/be/src/storage/rowset/rowset.h @@ -141,6 +141,7 @@ class Rowset : public std::enable_shared_from_this { // reload this rowset after the underlying segment file is changed Status reload(); Status reload_segment(int32_t segment_id); + int64_t total_segment_data_size(); const TabletSchema& schema() const { return *_schema; } void set_schema(const TabletSchema* schema) { _schema = schema; } diff --git a/be/src/storage/rowset/rowset_meta.h b/be/src/storage/rowset/rowset_meta.h index 5be6f2f5772e9..e28f18f30fb66 100644 --- a/be/src/storage/rowset/rowset_meta.h +++ b/be/src/storage/rowset/rowset_meta.h @@ -79,10 +79,18 @@ class RowsetMeta { int64_t total_row_size() { return _rowset_meta_pb->total_row_size(); } + void set_total_row_size(int64_t total_size) { _rowset_meta_pb->set_total_row_size(total_size); } + + int64_t total_update_row_size() { return _rowset_meta_pb->total_update_row_size(); } + size_t total_disk_size() const { return _rowset_meta_pb->total_disk_size(); } + void set_total_disk_size(size_t disk_size) { _rowset_meta_pb->set_total_disk_size(disk_size); } + size_t data_disk_size() const { return _rowset_meta_pb->data_disk_size(); } + void set_data_disk_size(size_t data_size) { _rowset_meta_pb->set_data_disk_size(data_size); } + size_t index_disk_size() const { return _rowset_meta_pb->index_disk_size(); } bool has_delete_predicate() const { return _rowset_meta_pb->has_delete_predicate(); } diff --git a/be/src/storage/rowset/segment.h b/be/src/storage/rowset/segment.h index 3133cfac73253..2ef69689439d8 100644 --- a/be/src/storage/rowset/segment.h +++ b/be/src/storage/rowset/segment.h @@ -153,6 +153,14 @@ class Segment : public std::enable_shared_from_this { int64_t mem_usage() { return _basic_info_mem_usage() + _short_key_index_mem_usage(); } + int64_t get_data_size() { + auto res = _fs->get_file_size(_fname); + if (res.ok()) { + return res.value(); + } + return 0; + } + // read short_key_index, for data check, just used in unit test now Status get_short_key_index(std::vector* sk_index_values); diff --git a/be/src/storage/rowset_update_state.cpp b/be/src/storage/rowset_update_state.cpp index 84497699143ae..55bb95a2ee2e2 100644 --- a/be/src/storage/rowset_update_state.cpp +++ b/be/src/storage/rowset_update_state.cpp @@ -356,6 +356,7 @@ Status RowsetUpdateState::_prepare_partial_update_states(Tablet* tablet, Rowset* _memory_usage += _partial_update_states[idx].write_columns[col_idx]->memory_usage(); } int64_t t_end = MonotonicMillis(); + _partial_update_states[idx].update_byte_size(); _partial_update_states[idx].inited = true; LOG(INFO) << strings::Substitute( @@ -452,7 +453,8 @@ Status RowsetUpdateState::_check_and_resolve_conflict(Tablet* tablet, Rowset* ro } Status RowsetUpdateState::apply(Tablet* tablet, Rowset* rowset, uint32_t rowset_id, uint32_t segment_id, - EditVersion latest_applied_version, const PrimaryIndex& index) { + EditVersion latest_applied_version, const PrimaryIndex& index, + int64_t* append_column_size) { const auto& rowset_meta_pb = rowset->rowset_meta()->get_meta_pb(); if (!rowset_meta_pb.has_txn_meta() || rowset->num_segments() == 0 || rowset_meta_pb.txn_meta().has_merge_condition()) { @@ -505,8 +507,9 @@ Status RowsetUpdateState::apply(Tablet* tablet, Rowset* rowset, uint32_t rowset_ if (_partial_update_states[segment_id].write_columns[col_idx] != nullptr) { _memory_usage -= _partial_update_states[segment_id].write_columns[col_idx]->memory_usage(); } + *append_column_size += _partial_update_states[segment_id].byte_size; + _partial_update_states[segment_id].release(); } - _partial_update_states[segment_id].release(); return Status::OK(); } diff --git a/be/src/storage/rowset_update_state.h b/be/src/storage/rowset_update_state.h index 64f84d604b34a..894c2d488f6e4 100644 --- a/be/src/storage/rowset_update_state.h +++ b/be/src/storage/rowset_update_state.h @@ -18,6 +18,15 @@ struct PartialUpdateState { std::vector> write_columns; bool inited = false; EditVersion read_version; + int64_t byte_size = 0; + + void update_byte_size() { + for (size_t i = 0; i < write_columns.size(); i++) { + if (write_columns[i] != nullptr) { + byte_size += write_columns[i]->byte_size(); + } + } + } void release() { src_rss_rowids.clear(); @@ -41,7 +50,7 @@ class RowsetUpdateState { Status load(Tablet* tablet, Rowset* rowset); Status apply(Tablet* tablet, Rowset* rowset, uint32_t rowset_id, uint32_t segment_id, - EditVersion latest_applied_version, const PrimaryIndex& index); + EditVersion latest_applied_version, const PrimaryIndex& index, int64_t* append_column_size); const std::vector& upserts() const { return _upserts; } const std::vector& deletes() const { return _deletes; } diff --git a/be/src/storage/tablet_updates.cpp b/be/src/storage/tablet_updates.cpp index bfbc402881aba..e353fab886af7 100644 --- a/be/src/storage/tablet_updates.cpp +++ b/be/src/storage/tablet_updates.cpp @@ -624,6 +624,7 @@ Status TabletUpdates::_rowset_commit_unlocked(int64_t version, const RowsetShare rowset_stats->num_rows = rowset->num_rows(); rowset_stats->num_dels = 0; rowset_stats->byte_size = rowset->data_disk_size(); + rowset_stats->row_size = rowset->total_row_size(); _calc_compaction_score(rowset_stats.get()); std::lock_guard lg(_rowset_stats_lock); @@ -903,13 +904,16 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) { EditVersion latest_applied_version; st = get_latest_applied_version(&latest_applied_version); + int64_t full_row_size = 0; + int64_t full_rowset_size = 0; if (rowset->rowset_meta()->get_meta_pb().delfile_idxes_size() == 0) { for (uint32_t i = 0; i < rowset->num_segments(); i++) { state.load_upserts(rowset.get(), i); auto& upserts = state.upserts(); if (upserts[i] != nullptr) { // apply partial rowset segment - st = state.apply(&_tablet, rowset.get(), rowset_id, i, latest_applied_version, index); + st = state.apply(&_tablet, rowset.get(), rowset_id, i, latest_applied_version, index, + &full_row_size); if (!st.ok()) { manager->update_state_cache().remove(state_entry); std::string msg = @@ -961,7 +965,8 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) { auto& upserts = state.upserts(); if (upserts[loaded_upsert] != nullptr) { // apply partial rowset segment - st = state.apply(&_tablet, rowset.get(), rowset_id, loaded_upsert, latest_applied_version, index); + st = state.apply(&_tablet, rowset.get(), rowset_id, loaded_upsert, latest_applied_version, index, + &full_row_size); if (!st.ok()) { manager->update_state_cache().remove(state_entry); std::string msg = strings::Substitute( @@ -990,6 +995,8 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) { } } } + full_row_size += rowset->rowset_meta()->total_row_size(); + full_rowset_size = rowset->total_segment_data_size(); PersistentIndexMetaPB index_meta; if (enable_persistent_index) { @@ -1108,7 +1115,11 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) { // 4. write meta const auto& rowset_meta_pb = rowset->rowset_meta()->get_meta_pb(); if (rowset_meta_pb.has_txn_meta()) { + full_rowset_size = rowset->total_segment_data_size(); rowset->rowset_meta()->clear_txn_meta(); + rowset->rowset_meta()->set_total_row_size(full_row_size); + rowset->rowset_meta()->set_total_disk_size(full_rowset_size); + rowset->rowset_meta()->set_data_disk_size(full_rowset_size); st = TabletMetaManager::apply_rowset_commit(_tablet.data_dir(), tablet_id, _next_log_id, version, new_del_vecs, index_meta, enable_persistent_index, &(rowset->rowset_meta()->get_meta_pb())); @@ -1137,6 +1148,19 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) { _apply_version_changed.notify_all(); } + { + std::lock_guard lg(_rowset_stats_lock); + auto iter = _rowset_stats.find(rowset_id); + if (iter == _rowset_stats.end()) { + string msg = strings::Substitute("inconsistent rowset_stats, rowset not found tablet=$0 rowsetid=$1", + _tablet.tablet_id(), rowset_id); + LOG(ERROR) << msg; + } else { + iter->second->byte_size = full_rowset_size; + iter->second->row_size = full_row_size; + } + } + st = index.on_commited(); if (!st.ok()) { std::string msg = Substitute("primary index on_commit failed: $0", st.to_string()); @@ -2309,12 +2333,32 @@ void TabletUpdates::get_tablet_info_extra(TTabletInfo* info) { } int64_t TabletUpdates::get_average_row_size() { - TTabletInfo info; - get_tablet_info_extra(&info); - int64_t total_row = info.row_count; - int64_t total_size = info.data_size; - if (total_row != 0) { - return total_size / total_row; + int64_t row_num = 0; + int64_t total_row_size = 0; + vector rowsets; + { + std::lock_guard rl(_lock); + if (_edit_version_infos.empty()) { + LOG(WARNING) << "tablet delete when get_tablet_info_extra tablet:" << _tablet.tablet_id(); + } else { + auto& last = _edit_version_infos.back(); + rowsets = last->rowsets; + } + } + { + std::lock_guard lg(_rowset_stats_lock); + for (uint32_t rowsetid : rowsets) { + auto itr = _rowset_stats.find(rowsetid); + if (itr != _rowset_stats.end()) { + // TODO(cbl): also report num deletes + row_num += itr->second->num_rows; + total_row_size += itr->second->row_size; + } + } + } + + if (row_num != 0) { + return total_row_size / row_num; } else { return 0; } diff --git a/be/src/storage/tablet_updates.h b/be/src/storage/tablet_updates.h index 1acd2fd8ac8ab..4f5ed48a10e9f 100644 --- a/be/src/storage/tablet_updates.h +++ b/be/src/storage/tablet_updates.h @@ -302,6 +302,7 @@ class TabletUpdates { size_t num_rows = 0; size_t num_dels = 0; size_t byte_size = 0; + size_t row_size = 0; int64_t compaction_score = 0; std::string to_string() const; };