From cb182b7fba3977ddbb10a7fe37d09491153209f5 Mon Sep 17 00:00:00 2001 From: zhangqiang Date: Fri, 28 Jul 2023 14:29:20 +0800 Subject: [PATCH] [BugFix] Re-Order operation on a table with separated primary keys and sort keys may cause data corruption (#27850) This PR fixes two bugs: 1. When we reorder rowsets, we read a base chunk and reorder it into a new chunk. However, we don't clear the previous base chunk data after we finish reordering the base chunk, so we may write duplicate data into the new rowset. 2. We still use the primary key columns rather than the sort key columns to reorder the data chunk, so the resulting data order may not meet expectations. Signed-off-by: zhangqiang --- be/src/storage/schema_change.cpp | 47 +++++++++++++++++++------------ be/src/storage/tablet_updates.cpp | 5 ++-- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/be/src/storage/schema_change.cpp b/be/src/storage/schema_change.cpp index 6ec42aeac108e..ad7353f481ec9 100644 --- a/be/src/storage/schema_change.cpp +++ b/be/src/storage/schema_change.cpp @@ -47,10 +47,10 @@ namespace starrocks::vectorized { using ChunkRow = std::pair; -int compare_chunk_row(const ChunkRow& lhs, const ChunkRow& rhs) { - for (uint16_t i = 0; i < lhs.second->schema()->num_key_fields(); ++i) { - int res = lhs.second->get_column_by_index(i)->compare_at(lhs.first, rhs.first, - *rhs.second->get_column_by_index(i), -1); +int compare_chunk_row(const ChunkRow& lhs, const ChunkRow& rhs, const std::vector& sort_key_idxes) { + for (uint16_t i = 0; i < sort_key_idxes.size(); ++i) { + int res = lhs.second->get_column_by_index(sort_key_idxes[i]) + ->compare_at(lhs.first, rhs.first, *rhs.second->get_column_by_index(sort_key_idxes[i]), -1); if (res != 0) { return res; } @@ -58,32 +58,36 @@ int compare_chunk_row(const ChunkRow& lhs, const ChunkRow& rhs) { return 0; } +struct MergeElement; // TODO: optimize it with vertical sort class ChunkMerger { public: - explicit ChunkMerger(TabletSharedPtr tablet); + explicit
ChunkMerger(TabletSharedPtr tablet, std::vector sort_key_idxes); virtual ~ChunkMerger(); bool merge(std::vector& chunk_arr, RowsetWriter* rowset_writer); static void aggregate_chunk(ChunkAggregator& aggregator, ChunkPtr& chunk, RowsetWriter* rowset_writer); private: - struct MergeElement { - bool operator<(const MergeElement& other) const { - return compare_chunk_row(std::make_pair(row_index, chunk), std::make_pair(other.row_index, other.chunk)) > - 0; - } - - Chunk* chunk; - size_t row_index; - }; - + friend class MergeElement; bool _make_heap(std::vector& chunk_arr); bool _pop_heap(); TabletSharedPtr _tablet; std::priority_queue _heap; std::unique_ptr _aggregator; + std::vector _sort_key_idxes; +}; + +struct MergeElement { + bool operator<(const MergeElement& other) const { + return compare_chunk_row(std::make_pair(row_index, chunk), std::make_pair(other.row_index, other.chunk), + _merger->_sort_key_idxes) > 0; + } + + Chunk* chunk; + size_t row_index; + ChunkMerger* _merger; }; ChunkSorter::ChunkSorter(ChunkAllocator* chunk_allocator) : _swap_chunk(nullptr) {} @@ -153,7 +157,8 @@ Status ChunkAllocator::allocate(ChunkPtr& chunk, size_t num_rows, Schema& schema return Status::OK(); } -ChunkMerger::ChunkMerger(TabletSharedPtr tablet) : _tablet(std::move(tablet)), _aggregator(nullptr) {} +ChunkMerger::ChunkMerger(TabletSharedPtr tablet, std::vector sort_key_idxes) + : _tablet(std::move(tablet)), _aggregator(nullptr), _sort_key_idxes(std::move(sort_key_idxes)) {} ChunkMerger::~ChunkMerger() { if (_aggregator != nullptr) { @@ -247,6 +252,7 @@ bool ChunkMerger::_make_heap(std::vector& chunk_arr) { MergeElement element; element.chunk = chunk.get(); element.row_index = 0; + element._merger = this; _heap.push(element); } @@ -662,8 +668,13 @@ bool SchemaChangeWithSorting::_internal_sorting(std::vector& chunk_arr } } - ChunkMerger merger(std::move(tablet)); - if (!merger.merge(chunk_arr, new_rowset_writer)) { + std::vector sort_key_idxes = 
tablet->tablet_schema().sort_key_idxes(); + if (sort_key_idxes.empty()) { + sort_key_idxes.resize(tablet->tablet_schema().num_key_columns()); + std::iota(sort_key_idxes.begin(), sort_key_idxes.end(), 0); + } + ChunkMerger merger(std::move(tablet), std::move(sort_key_idxes)); + if (!merger.merge(chunk_arr, new_rowset_writer)) { LOG(WARNING) << "merge chunk arr failed"; return false; } diff --git a/be/src/storage/tablet_updates.cpp b/be/src/storage/tablet_updates.cpp index bd206e7b628ae..df0209b613d61 100644 --- a/be/src/storage/tablet_updates.cpp +++ b/be/src/storage/tablet_updates.cpp @@ -3076,15 +3076,14 @@ Status TabletUpdates::reorder_from(const std::shared_ptr& base_tablet, i } vectorized::ChunkPtr base_chunk = ChunkHelper::new_chunk(base_schema, config::vector_chunk_size); - vectorized::Schema new_schema = ChunkHelper::convert_schema_to_format_v2(_tablet.tablet_schema()); - vectorized::ChunkPtr new_chunk = ChunkHelper::new_chunk(new_schema, config::vector_chunk_size); - for (auto& seg_iterator : seg_iterators) { if (seg_iterator.get() == nullptr) { continue; } + } while (true) { + vectorized::ChunkPtr new_chunk = ChunkHelper::new_chunk(new_schema, config::vector_chunk_size); base_chunk->reset(); Status status = seg_iterator->get_next(base_chunk.get()); if (!status.ok()) {