diff --git a/be/src/storage/delta_writer.cpp b/be/src/storage/delta_writer.cpp
index 3b98113dffb26..78610eaaca6cd 100644
--- a/be/src/storage/delta_writer.cpp
+++ b/be/src/storage/delta_writer.cpp
@@ -185,6 +185,17 @@ Status DeltaWriter::_init() {
             }
             writer_context.referenced_column_ids.push_back(index);
         }
+        int64_t average_row_size = _tablet->updates()->get_average_row_size();
+        if (average_row_size <= 0) {
+            // A newly created tablet has no historical data, so its measured average is 0. Fall back to the
+            // schema-based estimate, counting each complex (BITMAP/ARRAY) or varchar column as 16 bytes.
+            average_row_size = _tablet->tablet_schema().estimate_row_size(16);
+        }
+        // Keep both the divisor and the resulting limit >= 1: a zero estimate must not divide by zero, and a
+        // zero row limit would make MemTable::is_full() report an empty memtable as full.
+        const int64_t buffer_rows = config::write_buffer_size / (average_row_size > 0 ? average_row_size : 1);
+        _memtable_buffer_row = static_cast<size_t>(buffer_rows > 0 ? buffer_rows : 1);
+
         writer_context.partial_update_tablet_schema =
                 TabletSchema::create(_tablet->tablet_schema(), writer_context.referenced_column_ids);
         auto sort_key_idxes = _tablet->tablet_schema().sort_key_idxes();
@@ -388,6 +399,7 @@ void DeltaWriter::_reset_mem_table() {
         _mem_table = std::make_unique(_tablet->tablet_id(), &_vectorized_schema, _opt.slots,
                                       _mem_table_sink.get(), "", _mem_tracker);
     }
+    _mem_table->set_write_buffer_row(_memtable_buffer_row);
 }
 
 Status DeltaWriter::commit() {
diff --git a/be/src/storage/delta_writer.h b/be/src/storage/delta_writer.h
index ef0212ff86532..67356deae11a3 100644
--- a/be/src/storage/delta_writer.h
+++ b/be/src/storage/delta_writer.h
@@ -164,6 +164,8 @@ class DeltaWriter {
     std::unique_ptr _flush_token;
     std::unique_ptr _replicate_token;
     bool _with_rollback_log;
+    // initial value is max value
+    size_t _memtable_buffer_row = -1;
 };
 
 } // namespace vectorized
diff --git a/be/src/storage/memtable.cpp b/be/src/storage/memtable.cpp
index 5788fbbd6f94f..f4fdadf311697 100644
--- a/be/src/storage/memtable.cpp
+++ b/be/src/storage/memtable.cpp
@@ -116,8 +116,12 @@ size_t
MemTable::write_buffer_size() const {
     return _chunk_bytes_usage + _aggregator_bytes_usage;
 }
 
+size_t MemTable::write_buffer_rows() const {
+    return _total_rows - _merged_rows;
+}
+
 bool MemTable::is_full() const {
-    return write_buffer_size() >= _max_buffer_size;
+    return write_buffer_size() >= _max_buffer_size || write_buffer_rows() >= _max_buffer_row;
 }
 
 bool MemTable::insert(const Chunk& chunk, const uint32_t* indexes, uint32_t from, uint32_t size) {
@@ -147,6 +151,7 @@ bool MemTable::insert(const Chunk& chunk, const uint32_t* indexes, uint32_t from
     if (chunk.has_rows()) {
         _chunk_memory_usage += chunk.memory_usage() * size / chunk.num_rows();
         _chunk_bytes_usage += _chunk->bytes_usage(cur_row_count, size);
+        _total_rows += size; // rows appended by this call, not the row count of the whole source chunk
     }
 
     // if memtable is full, push it to the flush executor,
@@ -303,6 +308,7 @@ void MemTable::_aggregate(bool is_final) {
     // impossible finish
     DCHECK(!_aggregator->is_finish());
     DCHECK(_aggregator->source_exhausted());
+    _merged_rows = _aggregator->merged_rows();
 
     if (is_final) {
         _result_chunk.reset();
diff --git a/be/src/storage/memtable.h b/be/src/storage/memtable.h
index 1c3c0d64e6393..5821d14764716 100644
--- a/be/src/storage/memtable.h
+++ b/be/src/storage/memtable.h
@@ -41,6 +41,7 @@ class MemTable {
 
     // buffer memory usage for write segment
     size_t write_buffer_size() const;
+    size_t write_buffer_rows() const;
 
     // return true suggests caller should flush this memory table
     bool insert(const Chunk& chunk, const uint32_t* indexes, uint32_t from, uint32_t size);
@@ -51,6 +52,8 @@ class MemTable {
 
     bool is_full() const;
 
+    void set_write_buffer_row(size_t max_buffer_row) { _max_buffer_row = max_buffer_row; }
+
     static Schema convert_schema(const TabletSchema* tablet_schema, const std::vector* slot_descs);
 
 private:
@@ -92,6 +95,10 @@ class MemTable {
     std::string _merge_condition;
 
     int64_t _max_buffer_size = config::write_buffer_size;
+    // initial value is max size
+    size_t _max_buffer_row = -1;
+    size_t _total_rows = 0;
+    size_t
_merged_rows = 0; // memory statistic MemTracker* _mem_tracker = nullptr; diff --git a/be/src/storage/rowset/rowset.cpp b/be/src/storage/rowset/rowset.cpp index 83cb2beb935e0..eb857223a4de4 100644 --- a/be/src/storage/rowset/rowset.cpp +++ b/be/src/storage/rowset/rowset.cpp @@ -166,6 +166,24 @@ Status Rowset::reload() { return Status::OK(); } +Status Rowset::reload_segment(int32_t segment_id) { + DCHECK(_segments.size() > segment_id); + if (_segments.size() <= segment_id) { + LOG(WARNING) << "Error segment id: " << segment_id; + return Status::InternalError("Error segment id"); + } + ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(_rowset_path)); + size_t footer_size_hint = 16 * 1024; + std::string seg_path = segment_file_path(_rowset_path, rowset_id(), segment_id); + auto res = Segment::open(fs, seg_path, segment_id, _schema, &footer_size_hint); + if (!res.ok()) { + LOG(WARNING) << "Fail to open " << seg_path << ": " << res.status(); + return res.status(); + } + _segments[segment_id] = std::move(res).value(); + return Status::OK(); +} + StatusOr Rowset::estimate_compaction_segment_iterator_num() { if (num_segments() == 0) { return 0; diff --git a/be/src/storage/rowset/rowset.h b/be/src/storage/rowset/rowset.h index 212b6df1c20ef..00386113fcfdb 100644 --- a/be/src/storage/rowset/rowset.h +++ b/be/src/storage/rowset/rowset.h @@ -140,6 +140,7 @@ class Rowset : public std::enable_shared_from_this { // reload this rowset after the underlying segment file is changed Status reload(); + Status reload_segment(int32_t segment_id); const TabletSchema& schema() const { return *_schema; } void set_schema(const TabletSchema* schema) { _schema = schema; } diff --git a/be/src/storage/rowset_update_state.cpp b/be/src/storage/rowset_update_state.cpp index db359f4be7e79..19c336acdfaaf 100644 --- a/be/src/storage/rowset_update_state.cpp +++ b/be/src/storage/rowset_update_state.cpp @@ -3,6 +3,7 @@ #include "rowset_update_state.h" #include "common/tracer.h" +#include 
"fs/fs_util.h" #include "gutil/strings/substitute.h" #include "serde/column_array_serde.h" #include "storage/chunk_helper.h" @@ -163,7 +164,8 @@ Status RowsetUpdateState::_do_load(Tablet* tablet, Rowset* rowset) { if (!_check_partial_update(rowset)) { return Status::OK(); } - return _prepare_partial_update_states(tablet, rowset); + + return _prepare_partial_update_states(tablet, rowset, 0, true); } Status RowsetUpdateState::load_deletes(Rowset* rowset, uint32_t idx) { @@ -289,7 +291,17 @@ void RowsetUpdateState::plan_read_by_rssid(const vector& rowids, size_ } } -Status RowsetUpdateState::_prepare_partial_update_states(Tablet* tablet, Rowset* rowset) { +// Assume segment idx has been loaded and _upserts[idx] is not null +// The caller should make sure `load_upserts` has been called success before call this function +Status RowsetUpdateState::_prepare_partial_update_states(Tablet* tablet, Rowset* rowset, uint32_t idx, bool need_lock) { + if (_partial_update_states.size() == 0) { + _partial_update_states.resize(rowset->num_segments()); + } + + if (_partial_update_states[idx].inited == true) { + return Status::OK(); + } + int64_t t_start = MonotonicMillis(); const auto& txn_meta = rowset->rowset_meta()->get_meta_pb().txn_meta(); const auto& tablet_schema = tablet->tablet_schema(); @@ -305,57 +317,52 @@ Status RowsetUpdateState::_prepare_partial_update_states(Tablet* tablet, Rowset* } } + DCHECK(_upserts[idx] != nullptr); auto read_column_schema = ChunkHelper::convert_schema_to_format_v2(tablet_schema, read_column_ids); std::vector> read_columns(read_column_ids.size()); - size_t num_segments = rowset->num_segments(); - _partial_update_states.resize(num_segments); - for (size_t i = 0; i < num_segments; i++) { - _partial_update_states[i].write_columns.resize(read_columns.size()); - _partial_update_states[i].src_rss_rowids.resize(_upserts[i]->size()); - for (uint32_t j = 0; j < read_columns.size(); ++j) { - auto column = 
ChunkHelper::column_from_field(*read_column_schema.field(j).get()); - read_columns[j] = column->clone_empty(); - _partial_update_states[i].write_columns[j] = column->clone_empty(); - } + + _partial_update_states[idx].write_columns.resize(read_columns.size()); + _partial_update_states[idx].src_rss_rowids.resize(_upserts[idx]->size()); + for (uint32_t i = 0; i < read_columns.size(); ++i) { + auto column = ChunkHelper::column_from_field(*read_column_schema.field(i).get()); + read_columns[i] = column->clone_empty(); + _partial_update_states[idx].write_columns[i] = column->clone_empty(); } int64_t t_read_index = MonotonicMillis(); - std::vector*> rss_rowids; - rss_rowids.resize(num_segments); - for (size_t i = 0; i < num_segments; ++i) { - rss_rowids[i] = &(_partial_update_states[i].src_rss_rowids); + if (need_lock) { + RETURN_IF_ERROR(tablet->updates()->prepare_partial_update_states( + tablet, _upserts[idx], &(_partial_update_states[idx].read_version), + &(_partial_update_states[idx].src_rss_rowids))); + } else { + RETURN_IF_ERROR(tablet->updates()->prepare_partial_update_states_unlock( + tablet, _upserts[idx], &(_partial_update_states[idx].read_version), + &(_partial_update_states[idx].src_rss_rowids))); } - DCHECK_EQ(_upserts.size(), num_segments); - RETURN_IF_ERROR(tablet->updates()->prepare_partial_update_states(tablet, _upserts, &_read_version, &_next_rowset_id, - &rss_rowids)); int64_t t_read_values = MonotonicMillis(); size_t total_rows = 0; // rows actually needed to be read, excluding rows with default values - size_t total_nondefault_rows = 0; - for (size_t i = 0; i < num_segments; i++) { - size_t num_default = 0; - std::map> rowids_by_rssid; - vector idxes; - plan_read_by_rssid(_partial_update_states[i].src_rss_rowids, &num_default, &rowids_by_rssid, &idxes); - total_rows += _partial_update_states[i].src_rss_rowids.size(); - total_nondefault_rows += _partial_update_states[i].src_rss_rowids.size() - num_default; - // get column values by rowid, also get 
default values if needed - RETURN_IF_ERROR( - tablet->updates()->get_column_values(read_column_ids, num_default > 0, rowids_by_rssid, &read_columns)); - for (size_t col_idx = 0; col_idx < read_column_ids.size(); col_idx++) { - _partial_update_states[i].write_columns[col_idx]->append_selective(*read_columns[col_idx], idxes.data(), 0, - idxes.size()); - _memory_usage += _partial_update_states[i].write_columns[col_idx]->memory_usage(); - } + size_t num_default = 0; + std::map> rowids_by_rssid; + vector idxes; + plan_read_by_rssid(_partial_update_states[idx].src_rss_rowids, &num_default, &rowids_by_rssid, &idxes); + total_rows += _partial_update_states[idx].src_rss_rowids.size(); + RETURN_IF_ERROR( + tablet->updates()->get_column_values(read_column_ids, num_default > 0, rowids_by_rssid, &read_columns)); + for (size_t col_idx = 0; col_idx < read_column_ids.size(); col_idx++) { + _partial_update_states[idx].write_columns[col_idx]->append_selective(*read_columns[col_idx], idxes.data(), 0, + idxes.size()); + _memory_usage += _partial_update_states[idx].write_columns[col_idx]->memory_usage(); } int64_t t_end = MonotonicMillis(); + _partial_update_states[idx].inited = true; - LOG(INFO) << Substitute( - "prepare PartialUpdateState tablet:$0 read_version:$1 #segment:$2 #row:$3(#non-default:$4) #column:$5 " - "time:$6ms(index:$7/value:$8)", - _tablet_id, _read_version.to_string(), num_segments, total_rows, total_nondefault_rows, read_columns.size(), - t_end - t_start, t_read_values - t_read_index, t_end - t_read_values); + LOG(INFO) << strings::Substitute( + "prepare PartialUpdateState tablet:$0 segment:$1 #row:$2(#non-default:$3) #column:$4 " + "time:$5ms(index:$6/value:$7)", + _tablet_id, idx, total_rows, total_rows - num_default, read_columns.size(), t_end - t_start, + t_read_values - t_read_index, t_end - t_read_values); return Status::OK(); } @@ -369,87 +376,86 @@ bool RowsetUpdateState::_check_partial_update(Rowset* rowset) { } Status 
RowsetUpdateState::_check_and_resolve_conflict(Tablet* tablet, Rowset* rowset, uint32_t rowset_id, - EditVersion latest_applied_version, + uint32_t segment_id, EditVersion latest_applied_version, std::vector& read_column_ids, const PrimaryIndex& index) { - // _partial_update_states is empty which means write column is empty - if (_partial_update_states.empty()) { - return Status::InternalError("write column is empty"); + if (_partial_update_states.size() <= segment_id || !_partial_update_states[segment_id].inited) { + std::string msg = strings::Substitute( + "_check_and_reslove_conflict tablet:$0 rowset:$1 segment:$2 failed, partial_update_states size:$3", + tablet->tablet_id(), rowset_id, segment_id, _partial_update_states.size()); + LOG(WARNING) << msg; + return Status::InternalError(msg); } // _read_version is equal to latest_applied_version which means there is no other rowset is applied // the data of write_columns can be write to segment file directly - if (latest_applied_version == _read_version) { + LOG(INFO) << "latest_applied_version is " << latest_applied_version.to_string() << " read version is " + << _partial_update_states[segment_id].read_version.to_string(); + if (latest_applied_version == _partial_update_states[segment_id].read_version) { return Status::OK(); } // get rss_rowids to identify conflict exist or not int64_t t_start = MonotonicMillis(); - uint32_t num_segments = _upserts.size(); - std::vector> new_rss_rowids; - new_rss_rowids.resize(num_segments); - for (uint32_t i = 0; i < num_segments; ++i) { - auto& pks = *_upserts[i]; - new_rss_rowids[i].resize(pks.size()); - index.get(pks, &new_rss_rowids[i]); - } + std::vector new_rss_rowids(_upserts[segment_id]->size()); + index.get(*_upserts[segment_id], &new_rss_rowids); int64_t t_read_index = MonotonicMillis(); size_t total_conflicts = 0; - for (uint32_t i = 0; i < num_segments; ++i) { - uint32_t num_rows = new_rss_rowids[i].size(); - std::vector conflict_idxes; - std::vector conflict_rowids; - 
DCHECK_EQ(num_rows, _partial_update_states[i].src_rss_rowids.size()); - for (size_t j = 0; j < new_rss_rowids[i].size(); ++j) { - uint64_t new_rss_rowid = new_rss_rowids[i][j]; - uint32_t new_rssid = new_rss_rowid >> 32; - uint64_t rss_rowid = _partial_update_states[i].src_rss_rowids[j]; - uint32_t rssid = rss_rowid >> 32; - - if (rssid != new_rssid) { - conflict_idxes.emplace_back(j); - conflict_rowids.emplace_back(new_rss_rowid); - } + uint32_t num_rows = new_rss_rowids.size(); + std::vector conflict_idxes; + std::vector conflict_rowids; + DCHECK_EQ(num_rows, _partial_update_states[segment_id].src_rss_rowids.size()); + for (size_t i = 0; i < new_rss_rowids.size(); ++i) { + uint64_t new_rss_rowid = new_rss_rowids[i]; + uint32_t new_rssid = new_rss_rowid >> 32; + uint64_t rss_rowid = _partial_update_states[segment_id].src_rss_rowids[i]; + uint32_t rssid = rss_rowid >> 32; + + if (rssid != new_rssid) { + conflict_idxes.emplace_back(i); + conflict_rowids.emplace_back(new_rss_rowid); } - if (!conflict_idxes.empty()) { - total_conflicts += conflict_idxes.size(); - std::vector> read_columns; - read_columns.resize(_partial_update_states[i].write_columns.size()); - for (uint32_t j = 0; j < read_columns.size(); ++j) { - read_columns[j] = _partial_update_states[i].write_columns[j]->clone_empty(); - } - size_t num_default = 0; - std::map> rowids_by_rssid; - std::vector read_idxes; - plan_read_by_rssid(conflict_rowids, &num_default, &rowids_by_rssid, &read_idxes); - DCHECK_EQ(conflict_idxes.size(), read_idxes.size()); - RETURN_IF_ERROR(tablet->updates()->get_column_values(read_column_ids, num_default > 0, rowids_by_rssid, - &read_columns)); - - for (size_t col_idx = 0; col_idx < read_column_ids.size(); col_idx++) { - std::unique_ptr new_write_column = - _partial_update_states[i].write_columns[col_idx]->clone_empty(); - new_write_column->append_selective(*read_columns[col_idx], read_idxes.data(), 0, read_idxes.size()); - 
RETURN_IF_ERROR(_partial_update_states[i].write_columns[col_idx]->update_rows(*new_write_column, - conflict_idxes.data())); - } + } + if (!conflict_idxes.empty()) { + total_conflicts += conflict_idxes.size(); + std::vector> read_columns; + read_columns.resize(_partial_update_states[segment_id].write_columns.size()); + for (uint32_t i = 0; i < read_columns.size(); ++i) { + read_columns[i] = _partial_update_states[segment_id].write_columns[i]->clone_empty(); + } + size_t num_default = 0; + std::map> rowids_by_rssid; + std::vector read_idxes; + plan_read_by_rssid(conflict_rowids, &num_default, &rowids_by_rssid, &read_idxes); + DCHECK_EQ(conflict_idxes.size(), read_idxes.size()); + RETURN_IF_ERROR( + tablet->updates()->get_column_values(read_column_ids, num_default > 0, rowids_by_rssid, &read_columns)); + + for (size_t col_idx = 0; col_idx < read_column_ids.size(); col_idx++) { + std::unique_ptr new_write_column = + _partial_update_states[segment_id].write_columns[col_idx]->clone_empty(); + new_write_column->append_selective(*read_columns[col_idx], read_idxes.data(), 0, read_idxes.size()); + RETURN_IF_ERROR(_partial_update_states[segment_id].write_columns[col_idx]->update_rows( + *new_write_column, conflict_idxes.data())); } } int64_t t_end = MonotonicMillis(); - LOG(INFO) << Substitute( - "_check_and_resolve_conflict tablet:$0 rowset:$1 version:($2 $3) #conflict-row:$4 #column:$5 " - "time:$6ms(index:$7/value:$8)", - tablet->tablet_id(), rowset_id, _read_version.to_string(), latest_applied_version.to_string(), - total_conflicts, read_column_ids.size(), t_end - t_start, t_read_index - t_start, t_end - t_read_index); + LOG(INFO) << strings::Substitute( + "_check_and_resolve_conflict tablet:$0 rowset:$1 segmet:$2 version:($3 $4) #conflict-row:$5 #column:$6 " + "time:$7ms(index:$8/value:$9)", + tablet->tablet_id(), rowset_id, segment_id, _partial_update_states[segment_id].read_version.to_string(), + latest_applied_version.to_string(), total_conflicts, 
read_column_ids.size(), t_end - t_start, + t_read_index - t_start, t_end - t_read_index); return Status::OK(); } -Status RowsetUpdateState::apply(Tablet* tablet, Rowset* rowset, uint32_t rowset_id, EditVersion latest_applied_version, - const PrimaryIndex& index) { +Status RowsetUpdateState::apply(Tablet* tablet, Rowset* rowset, uint32_t rowset_id, uint32_t segment_id, + EditVersion latest_applied_version, const PrimaryIndex& index) { const auto& rowset_meta_pb = rowset->rowset_meta()->get_meta_pb(); - if (!_check_partial_update(rowset)) { + if (!rowset_meta_pb.has_txn_meta() || rowset->num_segments() == 0 || + rowset_meta_pb.txn_meta().has_merge_condition()) { return Status::OK(); } // currently assume it's a partial update @@ -467,57 +473,40 @@ Status RowsetUpdateState::apply(Tablet* tablet, Rowset* rowset, uint32_t rowset_ } } - size_t num_segments = rowset->num_segments(); - DCHECK(num_segments == _upserts.size()); - vector> rewrite_files; - DeferOp clean_temp_files([&] { - for (auto& e : rewrite_files) { - FileSystem::Default()->delete_file(e.second); - } - }); - bool is_rewrite = config::rewrite_partial_segment; - RETURN_IF_ERROR( - _check_and_resolve_conflict(tablet, rowset, rowset_id, latest_applied_version, read_column_ids, index)); - - for (size_t i = 0; i < num_segments; i++) { - auto src_path = Rowset::segment_file_path(tablet->schema_hash_path(), rowset->rowset_id(), i); - auto dest_path = Rowset::segment_temp_file_path(tablet->schema_hash_path(), rowset->rowset_id(), i); - rewrite_files.emplace_back(src_path, dest_path); - - int64_t t_rewrite_start = MonotonicMillis(); - FooterPointerPB partial_rowset_footer = txn_meta.partial_rowset_footers(i); - // if is_rewrite is true, rewrite partial segment file into dest_path first, then append write_columns - // if is_rewrite is false, append write_columns into src_path and rebuild segment footer - if (is_rewrite) { - RETURN_IF_ERROR(SegmentRewriter::rewrite(src_path, dest_path, tablet->tablet_schema(), 
read_column_ids, - _partial_update_states[i].write_columns, i, - partial_rowset_footer)); - } else { - RETURN_IF_ERROR(SegmentRewriter::rewrite(src_path, tablet->tablet_schema(), read_column_ids, - _partial_update_states[i].write_columns, i, - partial_rowset_footer)); - } - int64_t t_rewrite_end = MonotonicMillis(); - LOG(INFO) << Substitute("apply partial segment tablet:$0 rowset:$1 seg:$2 #column:$3 #rewrite:$4ms", - tablet->tablet_id(), rowset_id, i, read_column_ids.size(), - t_rewrite_end - t_rewrite_start); - } - if (is_rewrite) { - for (size_t i = 0; i < num_segments; i++) { - RETURN_IF_ERROR(FileSystem::Default()->rename_file(rewrite_files[i].second, rewrite_files[i].first)); - } - } - // clean this to prevent DeferOp clean files - rewrite_files.clear(); - RETURN_IF_ERROR(rowset->reload()); - for (size_t i = 0; i < _partial_update_states.size(); i++) { - for (size_t col_idx = 0; col_idx < _partial_update_states[i].write_columns.size(); col_idx++) { - if (_partial_update_states[i].write_columns[col_idx] != nullptr) { - _memory_usage -= _partial_update_states[i].write_columns[col_idx]->memory_usage(); - _partial_update_states[i].write_columns[col_idx].reset(); - } + DCHECK(_upserts[segment_id] != nullptr); + if (_partial_update_states.size() == 0 || !_partial_update_states[segment_id].inited) { + RETURN_IF_ERROR(_prepare_partial_update_states(tablet, rowset, segment_id, false)); + } else { + // reslove conflict of segment + RETURN_IF_ERROR(_check_and_resolve_conflict(tablet, rowset, rowset_id, segment_id, latest_applied_version, + read_column_ids, index)); + } + + auto src_path = Rowset::segment_file_path(tablet->schema_hash_path(), rowset->rowset_id(), segment_id); + auto dest_path = Rowset::segment_temp_file_path(tablet->schema_hash_path(), rowset->rowset_id(), segment_id); + DeferOp clean_temp_files([&] { FileSystem::Default()->delete_file(dest_path); }); + int64_t t_rewrite_start = MonotonicMillis(); + FooterPointerPB partial_rowset_footer = 
txn_meta.partial_rowset_footers(segment_id);
+    RETURN_IF_ERROR(SegmentRewriter::rewrite(src_path, dest_path, tablet->tablet_schema(), read_column_ids,
+                                             _partial_update_states[segment_id].write_columns, segment_id,
+                                             partial_rowset_footer));
+    int64_t t_rewrite_end = MonotonicMillis();
+    LOG(INFO) << strings::Substitute("apply partial segment tablet:$0 rowset:$1 seg:$2 #column:$3 #rewrite:$4ms",
+                                     tablet->tablet_id(), rowset_id, segment_id, read_column_ids.size(),
+                                     t_rewrite_end - t_rewrite_start);
+
+    // Move the rewritten file over the original and fail the apply if the rename fails; otherwise the
+    // reload below would reopen the un-rewritten partial segment. The reload is required because later
+    // steps of the apply process read segment[segment_id] and treat it as a full segment.
+    RETURN_IF_ERROR(FileSystem::Default()->rename_file(dest_path, src_path));
+    RETURN_IF_ERROR(rowset->reload_segment(segment_id));
+
+    for (size_t col_idx = 0; col_idx < _partial_update_states[segment_id].write_columns.size(); col_idx++) {
+        if (_partial_update_states[segment_id].write_columns[col_idx] != nullptr) {
+            _memory_usage -= _partial_update_states[segment_id].write_columns[col_idx]->memory_usage();
         }
     }
+    _partial_update_states[segment_id].release();
     return Status::OK();
 }
 
diff --git a/be/src/storage/rowset_update_state.h b/be/src/storage/rowset_update_state.h
index 5106254db13f1..64f84d604b34a 100644
--- a/be/src/storage/rowset_update_state.h
+++ b/be/src/storage/rowset_update_state.h
@@ -16,6 +16,19 @@ class Tablet;
 struct PartialUpdateState {
     std::vector src_rss_rowids;
     std::vector> write_columns;
+    bool inited = false;       // true once this segment's state has been prepared; cleared by release()
+    EditVersion read_version;  // version the source rows were read at; compared when resolving conflicts
+
+    void release() {
+        src_rss_rowids.clear();
+        for (size_t i = 0; i < write_columns.size(); i++) {
+            if (write_columns[i] != nullptr) {
+                write_columns[i].reset();
+            }
+        }
+        write_columns.clear();
+        inited = false;
+    }
 };
 
 class RowsetUpdateState {
@@ -27,8 +40,8 @@ class RowsetUpdateState {
 
     Status load(Tablet* tablet, Rowset* rowset);
 
-    Status apply(Tablet* tablet, Rowset*
rowset, uint32_t rowset_id, EditVersion latest_applied_version, - const PrimaryIndex& index); + Status apply(Tablet* tablet, Rowset* rowset, uint32_t rowset_id, uint32_t segment_id, + EditVersion latest_applied_version, const PrimaryIndex& index); const std::vector& upserts() const { return _upserts; } const std::vector& deletes() const { return _deletes; } @@ -41,9 +54,11 @@ class RowsetUpdateState { // call check conflict directly // only use for ut of partial update - Status test_check_conflict(Tablet* tablet, Rowset* rowset, uint32_t rowset_id, EditVersion latest_applied_version, - std::vector& read_column_ids, const PrimaryIndex& index) { - return _check_and_resolve_conflict(tablet, rowset, rowset_id, latest_applied_version, read_column_ids, index); + Status test_check_conflict(Tablet* tablet, Rowset* rowset, uint32_t rowset_id, uint32_t segment_id, + EditVersion latest_applied_version, std::vector& read_column_ids, + const PrimaryIndex& index) { + return _check_and_resolve_conflict(tablet, rowset, rowset_id, segment_id, latest_applied_version, + read_column_ids, index); } static void plan_read_by_rssid(const vector& rowids, size_t* num_default, @@ -60,9 +75,15 @@ class RowsetUpdateState { Status _do_load(Tablet* tablet, Rowset* rowset); - Status _prepare_partial_update_states(Tablet* tablet, Rowset* rowset); + // `need_lock` means whether the `_index_lock` in TabletUpdates needs to held. + // `index_lock` is used to avoid access the PrimaryIndex at the same time as the apply thread. + // This function will be called in two places, one is the commit phase and the other is the apply phase. + // In rowset commit phase, `need_lock` should be set as true to prevent concurrent access. + // In rowset apply phase, `_index_lock` is already held by apply thread, `need_lock` should be set as false + // to avoid dead lock. 
+ Status _prepare_partial_update_states(Tablet* tablet, Rowset* rowset, uint32_t idx, bool need_lock); - Status _check_and_resolve_conflict(Tablet* tablet, Rowset* rowset, uint32_t rowset_id, + Status _check_and_resolve_conflict(Tablet* tablet, Rowset* rowset, uint32_t rowset_id, uint32_t segment_id, EditVersion latest_applied_version, std::vector& read_column_ids, const PrimaryIndex& index); @@ -77,10 +98,6 @@ class RowsetUpdateState { size_t _memory_usage = 0; int64_t _tablet_id = 0; - // states for partial update - EditVersion _read_version; - uint32_t _next_rowset_id = 0; - // TODO: dump to disk if memory usage is too large std::vector _partial_update_states; diff --git a/be/src/storage/tablet_updates.cpp b/be/src/storage/tablet_updates.cpp index e1b1fe5b3b5a2..de856a60c22e8 100644 --- a/be/src/storage/tablet_updates.cpp +++ b/be/src/storage/tablet_updates.cpp @@ -830,24 +830,7 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) { return; } - span->AddEvent("reslove_conflict"); - int64_t t_load = MonotonicMillis(); - EditVersion latest_applied_version; - st = get_latest_applied_version(&latest_applied_version); - if (st.ok()) { - st = state.apply(&_tablet, rowset.get(), rowset_id, latest_applied_version, index); - manager->update_state_cache().update_object_size(state_entry, state.memory_usage()); - } - if (!st.ok()) { - manager->update_state_cache().remove(state_entry); - std::string msg = Substitute("_apply_rowset_commit error: apply rowset update state failed: $0 $1", - st.to_string(), debug_string()); - LOG(ERROR) << msg; - _set_error(msg); - return; - } int64_t t_apply = MonotonicMillis(); - std::int32_t conditional_column = -1; const auto& txn_meta = rowset->rowset_meta()->get_meta_pb().txn_meta(); if (txn_meta.has_merge_condition()) { @@ -867,11 +850,24 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) { for (uint32_t i = 0; i < rowset->num_segments(); i++) { new_deletes[rowset_id + i] = {}; } + 
EditVersion latest_applied_version;
+    st = get_latest_applied_version(&latest_applied_version);
     for (uint32_t i = 0; i < rowset->num_segments(); i++) {
         state.load_upserts(rowset.get(), i);
         auto& upserts = state.upserts();
         if (upserts[i] != nullptr) {
+            // apply partial rowset segment; skipped if the version lookup failed so that error is reported below
+            if (st.ok()) st = state.apply(&_tablet, rowset.get(), rowset_id, i, latest_applied_version, index);
+            if (!st.ok()) {
+                manager->update_state_cache().remove(state_entry);
+                std::string msg =
+                        strings::Substitute("_apply_rowset_commit error: apply rowset update state failed: $0 $1",
+                                            st.to_string(), debug_string());
+                LOG(ERROR) << msg;
+                _set_error(msg);
+                return;
+            }
             _do_update(rowset_id, i, conditional_column, upserts, index, tablet_id, &new_deletes);
             manager->index_cache().update_object_size(index_entry, index.memory_usage());
         }
@@ -1057,8 +1053,8 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
               << " " << del_percent << "% rowset:" << rowset_id << " #seg:" << rowset->num_segments()
               << " #op(upsert:" << rowset->num_rows() << " del:" << delete_op << ") #del:" << old_total_del << "+"
               << new_del << "=" << total_del << " #dv:" << ndelvec << " duration:" << t_write - t_start << "ms"
-              << Substitute("($0/$1/$2/$3/$4)", t_load - t_start, t_apply - t_load, t_index - t_apply,
-                            t_delvec - t_index, t_write - t_delvec);
+              << strings::Substitute("($0/$1/$2/$3)", t_apply - t_start, t_index - t_apply, t_delvec - t_index,
+                                     t_write - t_delvec);
     VLOG(1) << "rowset commit apply " << delvec_change_info << " " << _debug_string(true, true);
 }
 
@@ -2146,6 +2142,18 @@ void TabletUpdates::get_tablet_info_extra(TTabletInfo* info) {
     info->__set_data_size(total_size);
 }
 
+int64_t TabletUpdates::get_average_row_size() {
+    TTabletInfo info;
+    get_tablet_info_extra(&info);
+    int64_t total_row = info.row_count;
+    int64_t total_size = info.data_size;
+    if (total_row != 0) {
+        return total_size / total_row;
+    } else {
+        return 0;
+    }
+}
+
 std::string TabletUpdates::RowsetStats::to_string()
const { return Substitute("[seg:$0 row:$1 del:$2 bytes:$3 compaction:$4]", num_segments, num_rows, num_dels, byte_size, compaction_score); @@ -3302,6 +3310,7 @@ Status TabletUpdates::get_column_values(std::vector& column_ids, bool } std::string seg_path = Rowset::segment_file_path(rowset->rowset_path(), rowset->rowset_id(), rssid - iter->first); + LOG(INFO) << "segment path is " << seg_path; auto segment = Segment::open(fs, seg_path, rssid - iter->first, &rowset->schema()); if (!segment.ok()) { LOG(WARNING) << "Fail to open " << seg_path << ": " << segment.status(); @@ -3326,10 +3335,15 @@ Status TabletUpdates::get_column_values(std::vector& column_ids, bool return Status::OK(); } -Status TabletUpdates::prepare_partial_update_states(Tablet* tablet, const std::vector& upserts, - EditVersion* read_version, uint32_t* next_rowset_id, - std::vector*>* rss_rowids) { +Status TabletUpdates::prepare_partial_update_states(Tablet* tablet, const ColumnUniquePtr& upsert, + EditVersion* read_version, std::vector* rss_rowids) { std::lock_guard lg(_index_lock); + return prepare_partial_update_states_unlock(tablet, upsert, read_version, rss_rowids); +} + +Status TabletUpdates::prepare_partial_update_states_unlock(Tablet* tablet, const ColumnUniquePtr& upsert, + EditVersion* read_version, + std::vector* rss_rowids) { { // get next_rowset_id and read_version to identify conflict std::lock_guard wl(_lock); @@ -3338,10 +3352,8 @@ Status TabletUpdates::prepare_partial_update_states(Tablet* tablet, const std::v LOG(WARNING) << msg; return Status::InternalError(msg); } - *next_rowset_id = _next_rowset_id; *read_version = _edit_version_infos[_apply_version_idx]->version; } - auto manager = StorageEngine::instance()->update_manager(); auto index_entry = manager->index_cache().get_or_create(tablet->tablet_id()); index_entry->update_expire_time(MonotonicMillis() + manager->get_cache_expire_ms()); @@ -3358,12 +3370,7 @@ Status TabletUpdates::prepare_partial_update_states(Tablet* tablet, const 
std::v return Status::InternalError(msg); } - // get rss_rowids for each segment of rowset - uint32_t num_segments = upserts.size(); - for (size_t i = 0; i < num_segments; i++) { - auto& pks = *upserts[i]; - index.get(pks, (*rss_rowids)[i]); - } + index.get(*upsert, rss_rowids); // if `enable_persistent_index` of tablet is change(maybe changed by alter table) // we should try to remove the index_entry from cache diff --git a/be/src/storage/tablet_updates.h b/be/src/storage/tablet_updates.h index 658582c6d3c64..dc3c1a1c13725 100644 --- a/be/src/storage/tablet_updates.h +++ b/be/src/storage/tablet_updates.h @@ -146,6 +146,9 @@ class TabletUpdates { // get info's version, version_count, row_count, data_size void get_tablet_info_extra(TTabletInfo* info); + // get average row size + int64_t get_average_row_size(); + std::string debug_string() const; // Return nullptr if the delta rowset does not exist. @@ -225,9 +228,16 @@ class TabletUpdates { std::map>& rowids_by_rssid, vector>* columns); + /* Status prepare_partial_update_states(Tablet* tablet, const std::vector& upserts, EditVersion* read_version, uint32_t* next_rowset_id, std::vector*>* rss_rowids); + */ + Status prepare_partial_update_states(Tablet* tablet, const ColumnUniquePtr& upserts, EditVersion* read_version, + std::vector* rss_rowids); + + Status prepare_partial_update_states_unlock(Tablet* tablet, const ColumnUniquePtr& upserts, + EditVersion* read_version, std::vector* rss_rowids); Status get_missing_version_ranges(std::vector& missing_version_ranges); diff --git a/be/test/storage/rowset/rowset_test.cpp b/be/test/storage/rowset/rowset_test.cpp index d1b8bec07eeb7..43635b5775f1b 100644 --- a/be/test/storage/rowset/rowset_test.cpp +++ b/be/test/storage/rowset/rowset_test.cpp @@ -665,6 +665,7 @@ TEST_F(RowsetTest, FinalMergeVerticalPartialTest) { } auto rowset = rowset_writer->build().value(); + rowset->set_schema(&tablet->tablet_schema()); ASSERT_TRUE(rowset != nullptr); ASSERT_EQ(3, 
rowset->rowset_meta()->num_segments()); ASSERT_EQ(rows_per_segment * 3, rowset->rowset_meta()->num_rows()); diff --git a/be/test/storage/rowset_update_state_test.cpp b/be/test/storage/rowset_update_state_test.cpp index cd389dad62665..22f550bd5ca80 100644 --- a/be/test/storage/rowset_update_state_test.cpp +++ b/be/test/storage/rowset_update_state_test.cpp @@ -312,7 +312,7 @@ TEST_F(RowsetUpdateStateTest, check_conflict) { st = index.load(_tablet.get()); std::vector read_column_ids = {2}; state.test_check_conflict(_tablet.get(), partial_rowset.get(), partial_rowset->rowset_meta()->get_rowset_seg_id(), - latest_applied_version, read_column_ids, index); + 0, latest_applied_version, read_column_ids, index); // check data of write column const std::vector& new_parital_update_states = state.parital_update_states(); diff --git a/be/test/storage/tablet_updates_test.cpp b/be/test/storage/tablet_updates_test.cpp index 11435d0dc7143..2706520de3520 100644 --- a/be/test/storage/tablet_updates_test.cpp +++ b/be/test/storage/tablet_updates_test.cpp @@ -2219,6 +2219,7 @@ void TabletUpdatesTest::load_snapshot(const std::string& meta_dir, const TabletS ASSIGN_OR_ABORT(auto read_file, fs->new_random_access_file(segment_path)); ASSERT_TRUE(Segment::parse_segment_footer(read_file.get(), footer, nullptr, nullptr).ok()); + LOG(INFO) << "parse segment footer success"; } void TabletUpdatesTest::test_load_snapshot_incremental_with_partial_rowset_old(bool enable_persistent_index) {