[Enhancement] Optimize memory usage of primary key table large load (#12068)

Currently, RowsetUpdateState::load preloads all segments' primary keys into memory. If the load (rowset) is very large, this consumes a lot of memory during the commit or apply phase.

For a large load (rowset), we no longer preload all segments' primary keys; instead we process the segments one by one, which reduces memory usage during apply.

Note that the limit is a soft limit: since a failed apply cannot be tolerated, memory usage may still exceed it.
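To illustrate the idea, here is a minimal, self-contained C++ sketch of the per-segment load/apply/release loop. All types and names in it are hypothetical stand-ins; the real logic lives in `RowsetUpdateState::_load_upserts`/`release_upserts` and `TabletUpdates::_apply_rowset_commit`:

```cpp
#include <cstdint>
#include <cstdio>
#include <memory>
#include <vector>

// Simplified stand-in for one segment's encoded primary-key column.
// (Hypothetical type, for illustration only.)
struct PkColumn {
    std::vector<uint64_t> keys;
    size_t memory_usage() const { return keys.size() * sizeof(uint64_t); }
};

// Instead of preloading every segment's primary keys, load one segment,
// apply it to the primary index, then release it before moving on. Peak
// memory tracks a single segment's keys rather than the whole rowset.
void apply_segment_by_segment(size_t num_segments) {
    size_t memory_usage = 0;
    for (size_t i = 0; i < num_segments; i++) {
        // load_upserts(i): read and encode only segment i's primary keys.
        auto pk = std::make_unique<PkColumn>();
        pk->keys.assign(100000, static_cast<uint64_t>(i)); // fake data
        memory_usage += pk->memory_usage();

        // _do_update(i): upsert these keys into the primary index (elided).

        // release_upserts(i): free segment i's keys right after applying them.
        memory_usage -= pk->memory_usage();
        pk.reset();
        std::printf("segment %zu applied, tracked memory: %zu bytes\n", i, memory_usage);
    }
}

int main() {
    apply_segment_by_segment(4);
    return 0;
}
```

This sketch only shows the control flow; the actual change also handles delete files, partial updates, and reports memory usage to the update state cache.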

In my test env (one BE with two HDDs), I used Broker Load to load TPC-DS data into a table created with a persistent index. The CREATE TABLE statement:

CREATE TABLE `store_sales` (
  `ss_item_sk` bigint(20) NOT NULL COMMENT "",
  `ss_ticket_number` bigint(20) NOT NULL COMMENT "",
  `ss_sold_date_sk` bigint(20) NULL COMMENT "",
  `ss_sold_time_sk` bigint(20) NULL COMMENT "",
  `ss_customer_sk` bigint(20) NULL COMMENT "",
  `ss_cdemo_sk` bigint(20) NULL COMMENT "",
  `ss_hdemo_sk` bigint(20) NULL COMMENT "",
  `ss_addr_sk` bigint(20) NULL COMMENT "",
  `ss_store_sk` bigint(20) NULL COMMENT "",
  `ss_promo_sk` bigint(20) NULL COMMENT "",
  `ss_quantity` bigint(20) NULL COMMENT "",
  `ss_wholesale_cost` decimal64(7, 2) NULL COMMENT "",
  `ss_list_price` decimal64(7, 2) NULL COMMENT "",
  `ss_sales_price` decimal64(7, 2) NULL COMMENT "",
  `ss_ext_discount_amt` decimal64(7, 2) NULL COMMENT "",
  `ss_ext_sales_price` decimal64(7, 2) NULL COMMENT "",
  `ss_ext_wholesale_cost` decimal64(7, 2) NULL COMMENT "",
  `ss_ext_list_price` decimal64(7, 2) NULL COMMENT "",
  `ss_ext_tax` decimal64(7, 2) NULL COMMENT "",
  `ss_coupon_amt` decimal64(7, 2) NULL COMMENT "",
  `ss_net_paid` decimal64(7, 2) NULL COMMENT "",
  `ss_net_paid_inc_tax` decimal64(7, 2) NULL COMMENT "",
  `ss_net_profit` decimal64(7, 2) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`ss_item_sk`, `ss_ticket_number`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`ss_item_sk`, `ss_ticket_number`) BUCKETS 2
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "true",
"compression" = "LZ4"
);
| Primary key length | Row num | Bucket num | Load time (s) | Apply time (ms) | Peak memory usage (GB) | Note |
|---|---|---|---|---|---|---|
| 16 bytes | 864001869 | 2 | 7643 | 355200 | 25.03 | branch-opt |
| 16 bytes | 864001869 | 2 | 7591 | 348465 | 46.45 | branch-main |
| 16 bytes | 864001869 | 100 | 7194 | 32705 | 25.11 | branch-opt |
| 16 bytes | 864001869 | 100 | 7104 | 30705 | 43.14 | branch-main |
Note that there are still some scenarios this PR does not resolve:

- In partial update, the read column data may be very large; this PR does not address it.
- We still need to load all primary keys into L0 of the persistent index first, which may cause OOM.

(cherry picked from commit 6c68734)

# Conflicts:
#	be/src/storage/rowset_update_state.cpp
sevev authored and mergify[bot] committed Nov 21, 2022
1 parent 9251c9a commit 4c2a647
Showing 7 changed files with 253 additions and 89 deletions.
1 change: 1 addition & 0 deletions be/src/common/config.h
@@ -879,4 +879,5 @@ CONF_mInt64(l0_max_file_size, "209715200"); // 200MB

// Used by query cache, cache entries are evicted when it exceeds its capacity(500MB in default)
CONF_Int64(query_cache_capacity, "536870912");

} // namespace starrocks::config
203 changes: 153 additions & 50 deletions be/src/storage/rowset_update_state.cpp
@@ -45,85 +45,176 @@ Status RowsetUpdateState::load(Tablet* tablet, Rowset* rowset) {
return _status;
}

Status RowsetUpdateState::_do_load(Tablet* tablet, Rowset* rowset) {
auto span = Tracer::Instance().start_trace_txn_tablet("rowset_update_state_load", rowset->txn_id(),
tablet->tablet_id());
_tablet_id = tablet->tablet_id();
auto& schema = rowset->schema();
vector<uint32_t> pk_columns;
for (size_t i = 0; i < schema.num_key_columns(); i++) {
pk_columns.push_back((uint32_t)i);
Status RowsetUpdateState::_load_deletes(Rowset* rowset, uint32_t idx, vectorized::Column* pk_column) {
DCHECK(_deletes.size() >= idx);
// always one file for now.
if (_deletes.size() == 0) {
_deletes.resize(rowset->num_delete_files());
}
vectorized::Schema pkey_schema = ChunkHelper::convert_schema_to_format_v2(schema, pk_columns);
std::unique_ptr<vectorized::Column> pk_column;
if (!PrimaryKeyEncoder::create_column(pkey_schema, &pk_column).ok()) {
CHECK(false) << "create column for primary key encoder failed";
if (_deletes.size() == 0 || _deletes[idx] != nullptr) {
return Status::OK();
}

ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(rowset->rowset_path()));
// always one file for now.
for (auto i = 0; i < rowset->num_delete_files(); i++) {
auto path = Rowset::segment_del_file_path(rowset->rowset_path(), rowset->rowset_id(), i);
ASSIGN_OR_RETURN(auto read_file, fs->new_random_access_file(path));
ASSIGN_OR_RETURN(auto file_size, read_file->get_size());
std::vector<uint8_t> read_buffer(file_size);
RETURN_IF_ERROR(read_file->read_at_fully(0, read_buffer.data(), read_buffer.size()));
auto col = pk_column->clone();
if (serde::ColumnArraySerde::deserialize(read_buffer.data(), col.get()) == nullptr) {
return Status::InternalError("column deserialization failed");
}
_deletes.emplace_back(std::move(col));
auto path = Rowset::segment_del_file_path(rowset->rowset_path(), rowset->rowset_id(), idx);
ASSIGN_OR_RETURN(auto read_file, fs->new_random_access_file(path));
ASSIGN_OR_RETURN(auto file_size, read_file->get_size());
std::vector<uint8_t> read_buffer(file_size);
RETURN_IF_ERROR(read_file->read_at_fully(0, read_buffer.data(), read_buffer.size()));
auto col = pk_column->clone();
if (serde::ColumnArraySerde::deserialize(read_buffer.data(), col.get()) == nullptr) {
return Status::InternalError("column deserialization failed");
}
col->raw_data();
_memory_usage += col != nullptr ? col->memory_usage() : 0;
_deletes[idx] = std::move(col);
return Status::OK();
}

Status RowsetUpdateState::_load_upserts(Rowset* rowset, uint32_t idx, vectorized::Column* pk_column) {
RowsetReleaseGuard guard(rowset->shared_from_this());
DCHECK(_upserts.size() >= idx);
if (_upserts.size() == 0) {
_upserts.resize(rowset->num_segments());
}
if (_upserts.size() == 0 || _upserts[idx] != nullptr) {
return Status::OK();
}

OlapReaderStatistics stats;
auto& schema = rowset->schema();
vector<uint32_t> pk_columns;
for (size_t i = 0; i < schema.num_key_columns(); i++) {
pk_columns.push_back((uint32_t)i);
}
vectorized::Schema pkey_schema = ChunkHelper::convert_schema_to_format_v2(schema, pk_columns);
auto res = rowset->get_segment_iterators2(pkey_schema, nullptr, 0, &stats);
if (!res.ok()) {
return res.status();
}
// TODO(cbl): auto close iterators on failure
auto& itrs = res.value();
CHECK(itrs.size() == rowset->num_segments()) << "itrs.size != num_segments";
_upserts.resize(rowset->num_segments());

// only hold pkey, so can use larger chunk size
auto chunk_shared_ptr = ChunkHelper::new_chunk(pkey_schema, 4096);
auto chunk = chunk_shared_ptr.get();
for (size_t i = 0; i < itrs.size(); i++) {
auto& dest = _upserts[i];
auto col = pk_column->clone();
auto itr = itrs[i].get();
if (itr != nullptr) {
auto num_rows = rowset->segments()[i]->num_rows();
col->reserve(num_rows);
while (true) {
chunk->reset();
auto st = itr->get_next(chunk);
if (st.is_end_of_file()) {
break;
} else if (!st.ok()) {
return st;
} else {
PrimaryKeyEncoder::encode(pkey_schema, *chunk, 0, chunk->num_rows(), col.get());
}
auto& dest = _upserts[idx];
auto col = pk_column->clone();
auto itr = itrs[idx].get();
if (itr != nullptr) {
auto num_rows = rowset->segments()[idx]->num_rows();
col->reserve(num_rows);
while (true) {
chunk->reset();
auto st = itr->get_next(chunk);
if (st.is_end_of_file()) {
break;
} else if (!st.ok()) {
return st;
} else {
PrimaryKeyEncoder::encode(pkey_schema, *chunk, 0, chunk->num_rows(), col.get());
}
itr->close();
CHECK(col->size() == num_rows) << "read segment: iter rows != num rows";
}
dest = std::move(col);
CHECK(col->size() == num_rows) << "read segment: iter rows != num rows";
}
for (const auto& itr : itrs) {
itr->close();
}
dest = std::move(col);
// This is a little bit tricky: if the pk column is a binary column, `raw_data()` will be called later,
// and `raw_data()` builds the slices of the pk column, which increases its memory usage.
// So we build the slices in advance here to make sure the memory statistics are correct.
dest->raw_data();
_memory_usage += dest != nullptr ? dest->memory_usage() : 0;

return Status::OK();
}

Status RowsetUpdateState::_do_load(Tablet* tablet, Rowset* rowset) {
auto span = Tracer::Instance().start_trace_txn_tablet("rowset_update_state_load", rowset->txn_id(),
tablet->tablet_id());
_tablet_id = tablet->tablet_id();
auto& schema = rowset->schema();
vector<uint32_t> pk_columns;
for (size_t i = 0; i < schema.num_key_columns(); i++) {
pk_columns.push_back((uint32_t)i);
}
for (const auto& upsert : _upserts) {
_memory_usage += upsert != nullptr ? upsert->memory_usage() : 0;
vectorized::Schema pkey_schema = ChunkHelper::convert_schema_to_format_v2(schema, pk_columns);
std::unique_ptr<vectorized::Column> pk_column;
if (!PrimaryKeyEncoder::create_column(pkey_schema, &pk_column).ok()) {
CHECK(false) << "create column for primary key encoder failed";
}
for (const auto& one_delete : _deletes) {
_memory_usage += one_delete != nullptr ? one_delete->memory_usage() : 0;
// if rowset is a partial rowset, we need to load the whole rowset because we don't support
// loading it multiple times for partial update so far
bool ignore_mem_limit = rowset->rowset_meta()->get_meta_pb().has_txn_meta() && rowset->num_segments() != 0;

if (ignore_mem_limit) {
for (size_t i = 0; i < rowset->num_delete_files(); i++) {
RETURN_IF_ERROR(_load_deletes(rowset, i, pk_column.get()));
}
for (size_t i = 0; i < rowset->num_segments(); i++) {
RETURN_IF_ERROR(_load_upserts(rowset, i, pk_column.get()));
}
} else {
RETURN_IF_ERROR(_load_deletes(rowset, 0, pk_column.get()));
RETURN_IF_ERROR(_load_upserts(rowset, 0, pk_column.get()));
}

if (!rowset->rowset_meta()->get_meta_pb().has_txn_meta() || rowset->num_segments() == 0 ||
rowset->rowset_meta()->get_meta_pb().txn_meta().has_merge_condition()) {
return Status::OK();
}
return _prepare_partial_update_states(tablet, rowset);
}

Status RowsetUpdateState::load_deletes(Rowset* rowset, uint32_t idx) {
auto& schema = rowset->schema();
vector<uint32_t> pk_columns;
for (size_t i = 0; i < schema.num_key_columns(); i++) {
pk_columns.push_back((uint32_t)i);
}
vectorized::Schema pkey_schema = ChunkHelper::convert_schema_to_format_v2(schema, pk_columns);
std::unique_ptr<vectorized::Column> pk_column;
if (!PrimaryKeyEncoder::create_column(pkey_schema, &pk_column).ok()) {
CHECK(false) << "create column for primary key encoder failed";
}
return _load_deletes(rowset, idx, pk_column.get());
}

Status RowsetUpdateState::load_upserts(Rowset* rowset, uint32_t upsert_id) {
auto& schema = rowset->schema();
vector<uint32_t> pk_columns;
for (size_t i = 0; i < schema.num_key_columns(); i++) {
pk_columns.push_back((uint32_t)i);
}
vectorized::Schema pkey_schema = ChunkHelper::convert_schema_to_format_v2(schema, pk_columns);
std::unique_ptr<vectorized::Column> pk_column;
if (!PrimaryKeyEncoder::create_column(pkey_schema, &pk_column).ok()) {
CHECK(false) << "create column for primary key encoder failed";
}
return _load_upserts(rowset, upsert_id, pk_column.get());
}

void RowsetUpdateState::release_upserts(uint32_t idx) {
if (idx >= _upserts.size()) {
return;
}
if (_upserts[idx] != nullptr) {
_memory_usage -= _upserts[idx]->memory_usage();
_upserts[idx].reset();
}
}

void RowsetUpdateState::release_deletes(uint32_t idx) {
if (idx >= _deletes.size()) {
return;
}
if (_deletes[idx] != nullptr) {
_memory_usage -= _deletes[idx]->memory_usage();
_deletes[idx].reset();
}
}

struct RowidSortEntry {
uint32_t rowid;
uint32_t idx;
@@ -256,6 +347,7 @@ Status RowsetUpdateState::_prepare_partial_update_states(Tablet* tablet, Rowset*
for (size_t col_idx = 0; col_idx < read_column_ids.size(); col_idx++) {
_partial_update_states[i].write_columns[col_idx]->append_selective(*read_columns[col_idx], idxes.data(), 0,
idxes.size());
_memory_usage += _partial_update_states[i].write_columns[col_idx]->memory_usage();
}
}
int64_t t_end = MonotonicMillis();
@@ -411,13 +503,24 @@ Status RowsetUpdateState::apply(Tablet* tablet, Rowset* rowset, uint32_t rowset_
// clean this to prevent DeferOp clean files
rewrite_files.clear();
RETURN_IF_ERROR(rowset->reload());
<<<<<<< HEAD
// Be may crash during the rewrite or after the rewrite
// So the data at the end of the segment_file may be illegal
// We use partial_rowset_footers to locate the partial_footer so that
// the segment can be read normally after be crash during rewrite
// If rewrite is finished, the partial_segment_footer should be removed from rowset_meta
// to make sure the new full rowset could be read normally after be restarted
RETURN_IF_ERROR(_update_rowset_meta(tablet, rowset));
=======
for (size_t i = 0; i < _partial_update_states.size(); i++) {
for (size_t col_idx = 0; col_idx < _partial_update_states[i].write_columns.size(); col_idx++) {
if (_partial_update_states[i].write_columns[col_idx] != nullptr) {
_memory_usage -= _partial_update_states[i].write_columns[col_idx]->memory_usage();
_partial_update_states[i].write_columns[col_idx].reset();
}
}
}
>>>>>>> 6c6873447 ([Enhancement] Optimize memory usage of primary key table large load (#12068))
return Status::OK();
}

8 changes: 8 additions & 0 deletions be/src/storage/rowset_update_state.h
@@ -49,7 +49,15 @@ class RowsetUpdateState {
static void plan_read_by_rssid(const vector<uint64_t>& rowids, size_t* num_default,
std::map<uint32_t, std::vector<uint32_t>>* rowids_by_rssid, vector<uint32_t>* idxes);

Status load_deletes(Rowset* rowset, uint32_t delete_id);
Status load_upserts(Rowset* rowset, uint32_t upsert_id);
void release_upserts(uint32_t idx);
void release_deletes(uint32_t idx);

private:
Status _load_deletes(Rowset* rowset, uint32_t delete_id, vectorized::Column* pk_column);
Status _load_upserts(Rowset* rowset, uint32_t upsert_id, vectorized::Column* pk_column);

Status _do_load(Tablet* tablet, Rowset* rowset);

Status _prepare_partial_update_states(Tablet* tablet, Rowset* rowset);
29 changes: 22 additions & 7 deletions be/src/storage/tablet_updates.cpp
@@ -835,6 +835,7 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
st = get_latest_applied_version(&latest_applied_version);
if (st.ok()) {
st = state.apply(&_tablet, rowset.get(), rowset_id, latest_applied_version, index);
manager->update_state_cache().update_object_size(state_entry, state.memory_usage());
}
if (!st.ok()) {
manager->update_state_cache().remove(state_entry);
@@ -874,17 +875,23 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
for (uint32_t i = 0; i < rowset->num_segments(); i++) {
new_deletes[rowset_id + i] = {};
}
auto& upserts = state.upserts();
for (uint32_t i = 0; i < upserts.size(); i++) {

for (uint32_t i = 0; i < rowset->num_segments(); i++) {
state.load_upserts(rowset.get(), i);
auto& upserts = state.upserts();
if (upserts[i] != nullptr) {
_do_update(rowset_id, i, conditional_column, upserts, index, tablet_id, &new_deletes);
manager->index_cache().update_object_size(index_entry, index.memory_usage());
}
state.release_upserts(i);
}

for (const auto& one_delete : state.deletes()) {
delete_op += one_delete->size();
index.erase(*one_delete, &new_deletes);
for (uint32_t i = 0; i < rowset->num_delete_files(); i++) {
state.load_deletes(rowset.get(), i);
auto& deletes = state.deletes();
delete_op += deletes[i]->size();
index.erase(*deletes[i], &new_deletes);
state.release_deletes(i);
}

PersistentIndexMetaPB index_meta;
@@ -1426,6 +1433,15 @@ void TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_info
uint32_t max_src_rssid = max_rowset_id + rowset->num_segments() - 1;

for (size_t i = 0; i < _compaction_state->pk_cols.size(); i++) {
if (st = _compaction_state->load_segments(rowset, i); !st.ok()) {
manager->index_cache().release(index_entry);
_compaction_state.reset();
std::string msg = Substitute("_apply_compaction_commit error: load compaction state failed: $0 $1",
st.to_string(), debug_string());
LOG(ERROR) << msg;
_set_error(msg);
return;
}
auto& pk_col = _compaction_state->pk_cols[i];
total_rows += pk_col->size();
uint32_t rssid = rowset_id + i;
@@ -1440,8 +1456,7 @@ void TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_info
total_deletes += tmp_deletes.size();
}
delvecs.emplace_back(rssid, dv);
// release memory early
pk_col.reset();
_compaction_state->release_segments(rowset, i);
}
// release memory
_compaction_state.reset();