Skip to content

Commit

Permalink
[BugFix] Re-Order operation on a table with seperated primary keys an…
Browse files Browse the repository at this point in the history
…d sort keys may cause data corruption (StarRocks#27850)

This pr fixes two bugs:
1. When we reorder rowsets, we will read a base chunk and reorder it as
a new chunk. However, we don't clear the previous base chunk data after
we finish reorder the base chunk. So we may write duplicate data into
new rowset.
2. We still use primary key columns but not sort key columns to reorder
the data chunk which may cause the data order does not meet the
expectations.

Signed-off-by: zhangqiang <[email protected]>
  • Loading branch information
sevev committed Aug 1, 2023
1 parent e53115e commit cb182b7
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 21 deletions.
47 changes: 29 additions & 18 deletions be/src/storage/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,43 +47,47 @@ namespace starrocks::vectorized {

using ChunkRow = std::pair<size_t, Chunk*>;

int compare_chunk_row(const ChunkRow& lhs, const ChunkRow& rhs) {
for (uint16_t i = 0; i < lhs.second->schema()->num_key_fields(); ++i) {
int res = lhs.second->get_column_by_index(i)->compare_at(lhs.first, rhs.first,
*rhs.second->get_column_by_index(i), -1);
int compare_chunk_row(const ChunkRow& lhs, const ChunkRow& rhs, const std::vector<ColumnId>& sort_key_idxes) {
for (uint16_t i = 0; i < sort_key_idxes.size(); ++i) {
int res = lhs.second->get_column_by_index(sort_key_idxes[i])
->compare_at(lhs.first, rhs.first, *rhs.second->get_column_by_index(sort_key_idxes[i]), -1);
if (res != 0) {
return res;
}
}
return 0;
}

struct MergeElement;
// TODO: optimize it with vertical sort
class ChunkMerger {
public:
explicit ChunkMerger(TabletSharedPtr tablet);
explicit ChunkMerger(TabletSharedPtr tablet, std::vector<ColumnId> sort_key_idxes);
virtual ~ChunkMerger();

bool merge(std::vector<ChunkPtr>& chunk_arr, RowsetWriter* rowset_writer);
static void aggregate_chunk(ChunkAggregator& aggregator, ChunkPtr& chunk, RowsetWriter* rowset_writer);

private:
struct MergeElement {
bool operator<(const MergeElement& other) const {
return compare_chunk_row(std::make_pair(row_index, chunk), std::make_pair(other.row_index, other.chunk)) >
0;
}

Chunk* chunk;
size_t row_index;
};

friend class MergeElement;
bool _make_heap(std::vector<ChunkPtr>& chunk_arr);
bool _pop_heap();

TabletSharedPtr _tablet;
std::priority_queue<MergeElement> _heap;
std::unique_ptr<ChunkAggregator> _aggregator;
std::vector<ColumnId> _sort_key_idxes;
};

struct MergeElement {
bool operator<(const MergeElement& other) const {
return compare_chunk_row(std::make_pair(row_index, chunk), std::make_pair(other.row_index, other.chunk),
_merger->_sort_key_idxes) > 0;
}

Chunk* chunk;
size_t row_index;
ChunkMerger* _merger;
};

ChunkSorter::ChunkSorter(ChunkAllocator* chunk_allocator) : _swap_chunk(nullptr) {}
Expand Down Expand Up @@ -153,7 +157,8 @@ Status ChunkAllocator::allocate(ChunkPtr& chunk, size_t num_rows, Schema& schema
return Status::OK();
}

ChunkMerger::ChunkMerger(TabletSharedPtr tablet) : _tablet(std::move(tablet)), _aggregator(nullptr) {}
ChunkMerger::ChunkMerger(TabletSharedPtr tablet, std::vector<ColumnId> sort_key_idxes)
: _tablet(std::move(tablet)), _aggregator(nullptr), _sort_key_idxes(std::move(sort_key_idxes)) {}

ChunkMerger::~ChunkMerger() {
if (_aggregator != nullptr) {
Expand Down Expand Up @@ -247,6 +252,7 @@ bool ChunkMerger::_make_heap(std::vector<ChunkPtr>& chunk_arr) {
MergeElement element;
element.chunk = chunk.get();
element.row_index = 0;
element._merger = this;

_heap.push(element);
}
Expand Down Expand Up @@ -662,8 +668,13 @@ bool SchemaChangeWithSorting::_internal_sorting(std::vector<ChunkPtr>& chunk_arr
}
}

ChunkMerger merger(std::move(tablet));
if (!merger.merge(chunk_arr, new_rowset_writer)) {
std::vector<ColumnId> sort_key_idxes = tablet->tablet_schema().sort_key_idxes();
if (sort_key_idxes.empty()) {
sort_key_idxes.resize(tablet->tablet_schema().num_key_columns());
std::iota(sort_key_idxes.begin(), sort_key_idxes.end(), 0);
}
ChunkMerger merger(std::move(tablet), std::move(sort_key_idxes));
if (merger.merge(chunk_arr, new_rowset_writer)) {
LOG(WARNING) << "merge chunk arr failed";
return false;
}
Expand Down
5 changes: 2 additions & 3 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3076,15 +3076,14 @@ Status TabletUpdates::reorder_from(const std::shared_ptr<Tablet>& base_tablet, i
}

vectorized::ChunkPtr base_chunk = ChunkHelper::new_chunk(base_schema, config::vector_chunk_size);

vectorized::Schema new_schema = ChunkHelper::convert_schema_to_format_v2(_tablet.tablet_schema());
vectorized::ChunkPtr new_chunk = ChunkHelper::new_chunk(new_schema, config::vector_chunk_size);

for (auto& seg_iterator : seg_iterators) {
if (seg_iterator.get() == nullptr) {
continue;
}
}
while (true) {
vectorized::ChunkPtr new_chunk = ChunkHelper::new_chunk(new_schema, config::vector_chunk_size);
base_chunk->reset();
Status status = seg_iterator->get_next(base_chunk.get());
if (!status.ok()) {
Expand Down

0 comments on commit cb182b7

Please sign in to comment.