Skip to content

Commit

Permalink
[Feature] Support assign sort key of primary key model by schema change reorder. (#13642) (#14930)
Browse files Browse the repository at this point in the history

* [cherry-pick][branch-2.5][Feature] Support assign sort key of primary key model by schema change reorder. (#13642)

```
MySQL [test]> CREATE TABLE test (
    ->   `k1` int,
    ->   `k2` int,
    ->   `k3` int,
    ->   `v1` int,
    ->   `v2` int,
    ->   `v3` int,
    ->   `v4` int,
    ->   `v5` int
    -> ) ENGINE=OLAP
    -> PRIMARY KEY(`k1`, `k2`, `k3`)
    -> COMMENT "OLAP"
    -> DISTRIBUTED BY HASH(`k1`, `k2`, `k3`) BUCKETS 1
    -> ORDER BY(`v2`, `v3`)
    -> PROPERTIES ("replication_num" = "1");
Query OK, 0 rows affected (0.11 sec)

MySQL [test]> ALTER TABLE test ORDER BY (v3, v2);
Primary key table do not support reorder column
```
should be:
```
MySQL [test]> select v2, v3 from test;
+------+------+
| v2   | v3   |
+------+------+
|    1 |    5 |
|    2 |    4 |
|    3 |    3 |
|    4 |    2 |
|    5 |    1 |
+------+------+
5 rows in set (0.02 sec)

MySQL [test]> show create table test;
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| test  | CREATE TABLE `test` (
  `k1` int(11) NOT NULL COMMENT "",
  `k2` int(11) NOT NULL COMMENT "",
  `k3` int(11) NOT NULL COMMENT "",
  `v1` int(11) NULL COMMENT "",
  `v2` int(11) NULL COMMENT "",
  `v3` int(11) NULL COMMENT "",
  `v4` int(11) NULL COMMENT "",
  `v5` int(11) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`k1`, `k2`, `k3`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`k1`, `k2`, `k3`) BUCKETS 1
ORDER BY(`v2`, `v3`)
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false",
"compression" = "LZ4"
); |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.01 sec)

MySQL [test]> ALTER TABLE test ORDER BY (v3, v2);
Query OK, 0 rows affected (0.03 sec)

MySQL [test]> select v3, v2 from test;
+------+------+
| v3   | v2   |
+------+------+
|    1 |    5 |
|    2 |    4 |
|    3 |    3 |
|    4 |    2 |
|    5 |    1 |
+------+------+
5 rows in set (0.01 sec)

MySQL [test]> show create table test;
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| test  | CREATE TABLE `test` (
  `k1` int(11) NOT NULL COMMENT "",
  `k2` int(11) NOT NULL COMMENT "",
  `k3` int(11) NOT NULL COMMENT "",
  `v1` int(11) NULL COMMENT "",
  `v2` int(11) NULL COMMENT "",
  `v3` int(11) NULL COMMENT "",
  `v4` int(11) NULL COMMENT "",
  `v5` int(11) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`k1`, `k2`, `k3`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`k1`, `k2`, `k3`) BUCKETS 1
ORDER BY(`v3`, `v2`)
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false",
"compression" = "LZ4"
); |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.01 sec)
```

* resolve conflict.
  • Loading branch information
Linkerist authored Dec 8, 2022
1 parent 11c5280 commit dc4ae31
Show file tree
Hide file tree
Showing 10 changed files with 478 additions and 31 deletions.
9 changes: 6 additions & 3 deletions be/src/storage/rowset/rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,8 @@ Status HorizontalRowsetWriter::_final_merge() {
ChunkIteratorPtr itr;
// create temporary segment files at first, then merge them and create final segment files if schema change with sorting
if (_context.schema_change_sorting) {
if (_context.tablet_schema->keys_type() == KeysType::DUP_KEYS) {
if (_context.tablet_schema->keys_type() == KeysType::DUP_KEYS ||
_context.tablet_schema->keys_type() == KeysType::PRIMARY_KEYS) {
itr = new_heap_merge_iterator(seg_iterators);
} else if (_context.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS ||
_context.tablet_schema->keys_type() == KeysType::AGG_KEYS) {
Expand Down Expand Up @@ -677,7 +678,8 @@ Status HorizontalRowsetWriter::_final_merge() {
ChunkIteratorPtr itr;
// create temporary segment files at first, then merge them and create final segment files if schema change with sorting
if (_context.schema_change_sorting) {
if (_context.tablet_schema->keys_type() == KeysType::DUP_KEYS) {
if (_context.tablet_schema->keys_type() == KeysType::DUP_KEYS ||
_context.tablet_schema->keys_type() == KeysType::PRIMARY_KEYS) {
itr = new_mask_merge_iterator(seg_iterators, mask_buffer.get());
} else if (_context.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS ||
_context.tablet_schema->keys_type() == KeysType::AGG_KEYS) {
Expand Down Expand Up @@ -747,7 +749,8 @@ Status HorizontalRowsetWriter::_final_merge() {
ChunkIteratorPtr itr;
// create temporary segment files at first, then merge them and create final segment files if schema change with sorting
if (_context.schema_change_sorting) {
if (_context.tablet_schema->keys_type() == KeysType::DUP_KEYS) {
if (_context.tablet_schema->keys_type() == KeysType::DUP_KEYS ||
_context.tablet_schema->keys_type() == KeysType::PRIMARY_KEYS) {
itr = new_heap_merge_iterator(seg_iterators);
} else if (_context.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS ||
_context.tablet_schema->keys_type() == KeysType::AGG_KEYS) {
Expand Down
38 changes: 20 additions & 18 deletions be/src/storage/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,6 @@ int compare_chunk_row(const ChunkRow& lhs, const ChunkRow& rhs) {
return 0;
}

class ChunkSorter {
public:
explicit ChunkSorter(ChunkAllocator* allocator);
virtual ~ChunkSorter();

bool sort(ChunkPtr& chunk, const TabletSharedPtr& new_tablet);

private:
ChunkPtr _swap_chunk;
size_t _max_allocated_rows{0};
};

// TODO: optimize it with vertical sort
class ChunkMerger {
public:
Expand Down Expand Up @@ -115,14 +103,23 @@ bool ChunkSorter::sort(ChunkPtr& chunk, const TabletSharedPtr& new_tablet) {

_swap_chunk->reset();

std::vector<ColumnId> sort_key_idxes;
if (new_schema.sort_key_idxes().empty()) {
int num_key_columns = chunk->schema()->num_key_fields();
for (ColumnId i = 0; i < num_key_columns; ++i) {
sort_key_idxes.push_back(i);
}
} else {
sort_key_idxes = new_schema.sort_key_idxes();
}
Columns key_columns;
int num_key_columns = chunk->schema()->num_key_fields();
for (int i = 0; i < num_key_columns; i++) {
key_columns.push_back(chunk->get_column_by_index(i));
for (const auto sort_key_idx : sort_key_idxes) {
key_columns.push_back(chunk->get_column_by_index(sort_key_idx));
}

SmallPermutation perm = create_small_permutation(chunk->num_rows());
Status st = stable_sort_and_tie_columns(false, key_columns, SortDescs::asc_null_first(num_key_columns), &perm);
Status st =
stable_sort_and_tie_columns(false, key_columns, SortDescs::asc_null_first(sort_key_idxes.size()), &perm);
CHECK(st.ok());
std::vector<uint32_t> selective;
permutate_to_selective(perm, &selective);
Expand Down Expand Up @@ -761,12 +758,17 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
}

if (base_tablet->keys_type() == KeysType::PRIMARY_KEYS) {
const auto& base_sort_key_idxes = base_tablet->tablet_schema().sort_key_idxes();
const auto& new_sort_key_idxes = new_tablet->tablet_schema().sort_key_idxes();
if (std::mismatch(new_sort_key_idxes.begin(), new_sort_key_idxes.end(), base_sort_key_idxes.begin()).first !=
new_sort_key_idxes.end()) {
sc_params.sc_directly = !(sc_params.sc_sorting = true);
}
if (sc_params.sc_directly) {
status = new_tablet->updates()->convert_from(base_tablet, request.alter_version,
sc_params.chunk_changer.get());
} else if (sc_params.sc_sorting) {
LOG(WARNING) << "schema change of primary key model do not support sorting.";
status = Status::NotSupported("schema change of primary key model do not support sorting.");
status = new_tablet->updates()->reorder_from(base_tablet, request.alter_version);
} else {
status = new_tablet->updates()->link_from(base_tablet.get(), request.alter_version);
}
Expand Down
15 changes: 14 additions & 1 deletion be/src/storage/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ class ChunkAllocator {
size_t _memory_limitation;
};

// Sorts the rows of a chunk according to the sort key of the destination
// tablet's schema, used during schema change when the new table declares a
// different sort order (e.g. primary key table with ORDER BY reorder).
class ChunkSorter {
public:
    // `allocator` supplies the scratch chunk used while sorting; not owned.
    explicit ChunkSorter(ChunkAllocator* allocator);
    virtual ~ChunkSorter();

    // Stable-sorts `chunk` in place by the sort key columns of `new_tablet`'s
    // schema. Returns false on failure (e.g. scratch chunk allocation).
    bool sort(ChunkPtr& chunk, const TabletSharedPtr& new_tablet);

private:
    ChunkAllocator* _chunk_allocator = nullptr; // borrowed, not owned
    ChunkPtr _swap_chunk;                       // scratch buffer reused across sort() calls
    // Rows currently allocated in _swap_chunk. Must start at 0: the original
    // in-.cpp definition of this class initialized it with {0}; dropping the
    // initializer when the class moved to the header leaves it indeterminate.
    size_t _max_allocated_rows{0};
};

class SchemaChange {
public:
SchemaChange() = default;
Expand Down Expand Up @@ -116,10 +129,10 @@ class SchemaChangeWithSorting : public SchemaChange {
Status process_v2(TabletReader* reader, RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet,
TabletSharedPtr base_tablet, RowsetSharedPtr rowset) override;

private:
static bool _internal_sorting(std::vector<ChunkPtr>& chunk_arr, RowsetWriter* new_rowset_writer,
TabletSharedPtr tablet);

private:
ChunkChanger* _chunk_changer = nullptr;
size_t _memory_limitation;
ChunkAllocator* _chunk_allocator = nullptr;
Expand Down
215 changes: 215 additions & 0 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "storage/chunk_iterator.h"
#include "storage/compaction_utils.h"
#include "storage/del_vector.h"
#include "storage/merge_iterator.h"
#include "storage/rowset/default_value_column_iterator.h"
#include "storage/rowset/rowset_factory.h"
#include "storage/rowset/rowset_meta_manager.h"
Expand Down Expand Up @@ -2693,6 +2694,220 @@ Status TabletUpdates::_convert_from_base_rowset(const std::shared_ptr<Tablet>& b
return rowset_writer->flush();
}

// Rebuilds this (new) primary-key tablet from `base_tablet`'s applied rowsets
// up to `request_version`, re-sorting every rowset's data by the new schema's
// sort key, then atomically replaces this tablet's meta/rowsets/delvecs in the
// meta store and reloads the tablet. Used by schema change ORDER BY reorder.
// Returns an error (and leaves the tablet NOTREADY) on any failure.
Status TabletUpdates::reorder_from(const std::shared_ptr<Tablet>& base_tablet, int64_t request_version) {
    OlapStopWatch watch;
    // Only a tablet still in the schema-change "not ready" state may be rebuilt.
    DCHECK(_tablet.tablet_state() == TABLET_NOTREADY)
            << "tablet state is not TABLET_NOTREADY, reorder_from is not allowed"
            << " tablet_id:" << _tablet.tablet_id() << " tablet_state:" << _tablet.tablet_state();
    LOG(INFO) << "reorder_from start tablet:" << _tablet.tablet_id() << " #pending:" << _pending_commits.size()
              << " base_tablet:" << base_tablet->tablet_id() << " request_version:" << request_version;
    // The base tablet must have applied at least the requested alter version.
    int64_t max_version = base_tablet->updates()->max_version();
    if (max_version < request_version) {
        LOG(WARNING) << "reorder_from: base_tablet's max_version:" << max_version
                     << " < alter_version:" << request_version << " tablet:" << _tablet.tablet_id()
                     << " base_tablet:" << base_tablet->tablet_id();
        return Status::InternalError("reorder_from: max_version < request_version");
    }
    // Snapshot the base tablet's applied rowsets at `request_version`.
    std::vector<RowsetSharedPtr> src_rowsets;
    EditVersion version;
    Status status = base_tablet->updates()->get_applied_rowsets(request_version, &src_rowsets, &version);
    if (!status.ok()) {
        LOG(WARNING) << "reorder_from: get base tablet rowsets error tablet:" << base_tablet->tablet_id()
                     << " request_version:" << request_version << " reason:" << status;
        return status;
    }

    // disable compaction temporarily when tablet just loaded
    _last_compaction_time_ms = UnixMillis();

    auto kv_store = _tablet.data_dir()->get_meta();
    auto tablet_id = _tablet.tablet_id();
    // Segment ids for the rebuilt rowsets are assigned contiguously from 0.
    uint32_t next_rowset_id = 0;
    std::vector<RowsetLoadInfo> new_rowset_load_infos(src_rowsets.size());

    // Accumulates the sorted chunks handed to _internal_sorting below.
    // NOTE(review): declared outside the per-rowset loop and never cleared
    // between iterations — chunks read from rowset i appear to be merged again
    // into rowset i+1's writer. Confirm this is intended (it is harmless only
    // when there is a single source rowset).
    std::vector<vectorized::ChunkPtr> chunk_arr;

    vectorized::Schema base_schema = ChunkHelper::convert_schema_to_format_v2(base_tablet->tablet_schema());
    // NOTE(review): _chunk_allocator is a nullptr-initialized member with no
    // visible assignment here — confirm ChunkSorter tolerates a null allocator
    // or that it is set elsewhere before reorder_from runs.
    vectorized::ChunkSorter chunk_sorter(_chunk_allocator);

    OlapReaderStatistics stats;

    // Totals reported in the final log line.
    size_t total_bytes = 0;
    size_t total_rows = 0;
    size_t total_files = 0;
    // Rebuild each source rowset into a new sorted rowset with the same version.
    for (int i = 0; i < src_rowsets.size(); i++) {
        const auto& src_rowset = src_rowsets[i];

        // Keep the source rowset alive while its segments are being read.
        RowsetReleaseGuard guard(src_rowset->shared_from_this());
        auto res = src_rowset->get_segment_iterators2(base_schema, base_tablet->data_dir()->get_meta(), version.major(),
                                                      &stats);
        if (!res.ok()) {
            return res.status();
        }
        const auto& seg_iterators = res.value();

        RowsetId rid = StorageEngine::instance()->next_rowset_id();

        // Writer targets this tablet's (new) schema but preserves the source
        // rowset's version and overlap property; schema_change_sorting makes
        // the writer merge-sort temporary segments on final flush.
        RowsetWriterContext writer_context;
        writer_context.rowset_id = rid;
        writer_context.tablet_uid = _tablet.tablet_uid();
        writer_context.tablet_id = _tablet.tablet_id();
        writer_context.partition_id = _tablet.partition_id();
        writer_context.tablet_schema_hash = _tablet.schema_hash();
        writer_context.rowset_path_prefix = _tablet.schema_hash_path();
        writer_context.tablet_schema = &_tablet.tablet_schema();
        writer_context.rowset_state = VISIBLE;
        writer_context.version = src_rowset->version();
        writer_context.segments_overlap = src_rowset->rowset_meta()->segments_overlap();
        writer_context.schema_change_sorting = true;

        std::unique_ptr<RowsetWriter> rowset_writer;
        status = RowsetFactory::create_rowset_writer(writer_context, &rowset_writer);
        if (!status.ok()) {
            LOG(INFO) << "build rowset writer failed";
            return Status::InternalError("build rowset writer failed");
        }

        vectorized::ChunkPtr base_chunk = ChunkHelper::new_chunk(base_schema, config::vector_chunk_size);

        vectorized::Schema new_schema = ChunkHelper::convert_schema_to_format_v2(_tablet.tablet_schema());
        vectorized::ChunkPtr new_chunk = ChunkHelper::new_chunk(new_schema, config::vector_chunk_size);

        // Drain every segment of the source rowset chunk by chunk.
        for (auto& seg_iterator : seg_iterators) {
            if (seg_iterator.get() == nullptr) {
                continue;
            }
            while (true) {
                base_chunk->reset();
                Status status = seg_iterator->get_next(base_chunk.get());
                if (!status.ok()) {
                    if (status.is_end_of_file()) {
                        break;
                    } else {
                        std::stringstream ss;
                        ss << "segment iterator failed to get next chunk, status is:" << status.to_string();
                        LOG(WARNING) << ss.str();
                        return Status::InternalError(ss.str());
                    }
                }

                // Move the just-read columns into new_chunk (same column order;
                // only the sort key differs between old and new schema here).
                for (auto i = 0; i < base_chunk->num_columns(); ++i) {
                    base_chunk->get_column_by_index(i)->swap_column(*new_chunk->get_column_by_index(i));
                }

                // NOTE(review): the double casts are immediately converted back
                // to size_t by += — they have no effect; likely leftover.
                total_bytes += static_cast<double>(new_chunk->memory_usage());
                total_rows += static_cast<double>(new_chunk->num_rows());

                // Sort this chunk by the new tablet's sort key before buffering.
                if (new_chunk->num_rows() > 0) {
                    if (!chunk_sorter.sort(new_chunk, std::static_pointer_cast<Tablet>(_tablet.shared_from_this()))) {
                        LOG(WARNING) << "chunk data sort failed";
                        return Status::InternalError("chunk data sort failed");
                    }
                }
                // NOTE(review): the same new_chunk shared_ptr is pushed every
                // iteration and its columns are swapped again on the next
                // get_next — confirm chunk_sorter.sort replaces `new_chunk`
                // with a fresh chunk; otherwise all entries alias one buffer.
                chunk_arr.push_back(new_chunk);
            }
        }

        // Merge-sort all buffered chunks into the writer's segments.
        if (!chunk_arr.empty()) {
            if (!vectorized::SchemaChangeWithSorting::_internal_sorting(
                        chunk_arr, rowset_writer.get(), std::static_pointer_cast<Tablet>(_tablet.shared_from_this()))) {
                return Status::InternalError("failed to sorting internally.");
            }
        }

        status = rowset_writer->flush();
        if (!status.ok()) {
            LOG(WARNING) << "failed to convert from base rowset, exit alter process";
            return status;
        }

        auto new_rowset = rowset_writer->build();
        if (!new_rowset.ok()) return new_rowset.status();

        // Record meta for the rebuilt rowset; its seg id range starts at
        // next_rowset_id and spans num_segments ids (minimum 1).
        auto& new_rowset_load_info = new_rowset_load_infos[i];
        new_rowset_load_info.num_segments = (*new_rowset)->num_segments();
        new_rowset_load_info.rowset_id = next_rowset_id;

        auto& rowset_meta_pb = new_rowset_load_info.rowset_meta_pb;
        (*new_rowset)->rowset_meta()->to_rowset_pb(&rowset_meta_pb);
        rowset_meta_pb.set_rowset_seg_id(new_rowset_load_info.rowset_id);
        rowset_meta_pb.set_rowset_id(rid.to_string());

        next_rowset_id += std::max(1U, (uint32_t)new_rowset_load_info.num_segments);

        total_bytes += rowset_meta_pb.total_disk_size();
        total_rows += rowset_meta_pb.num_rows();
        total_files += rowset_meta_pb.num_segments() + rowset_meta_pb.num_delete_files();
    }

    // Build a fresh TabletMetaPB: state RUNNING, a single version entry at
    // the base tablet's applied version containing all rebuilt rowsets.
    TabletMetaPB meta_pb;
    _tablet.tablet_meta()->to_meta_pb(&meta_pb);
    meta_pb.set_tablet_state(TabletStatePB::PB_RUNNING);
    TabletUpdatesPB* updates_pb = meta_pb.mutable_updates();
    updates_pb->clear_versions();
    auto version_pb = updates_pb->add_versions();
    version_pb->mutable_version()->set_major(version.major());
    version_pb->mutable_version()->set_minor(version.minor());
    int64_t creation_time = time(nullptr);
    version_pb->set_creation_time(creation_time);
    for (auto& new_rowset_load_info : new_rowset_load_infos) {
        version_pb->mutable_rowsets()->Add(new_rowset_load_info.rowset_id);
    }
    version_pb->set_rowsetid_add(next_rowset_id);
    auto apply_version_pb = updates_pb->mutable_apply_version();
    apply_version_pb->set_major(version.major());
    apply_version_pb->set_minor(version.minor());
    updates_pb->set_next_log_id(1);
    updates_pb->set_next_rowset_id(next_rowset_id);

    // delete old meta & write new meta — staged in one WriteBatch so the
    // replacement is applied atomically by the kv store.
    auto data_dir = _tablet.data_dir();
    rocksdb::WriteBatch wb;
    RETURN_IF_ERROR(TabletMetaManager::clear_log(data_dir, &wb, tablet_id));
    RETURN_IF_ERROR(TabletMetaManager::clear_rowset(data_dir, &wb, tablet_id));
    RETURN_IF_ERROR(TabletMetaManager::clear_del_vector(data_dir, &wb, tablet_id));
    RETURN_IF_ERROR(TabletMetaManager::clear_persistent_index(data_dir, &wb, tablet_id));
    // do not clear pending rowsets, because these pending rowsets should be committed after schemachange is done
    RETURN_IF_ERROR(TabletMetaManager::put_tablet_meta(data_dir, &wb, meta_pb));
    // Every segment of every rebuilt rowset gets an empty delete vector.
    DelVector delvec;
    for (const auto& new_rowset_load_info : new_rowset_load_infos) {
        RETURN_IF_ERROR(
                TabletMetaManager::put_rowset_meta(data_dir, &wb, tablet_id, new_rowset_load_info.rowset_meta_pb));
        for (int j = 0; j < new_rowset_load_info.num_segments; j++) {
            RETURN_IF_ERROR(TabletMetaManager::put_del_vector(data_dir, &wb, tablet_id,
                                                              new_rowset_load_info.rowset_id + j, delvec));
        }
    }

    // Commit the batch under the header lock so concurrent readers never see
    // a half-replaced meta.
    std::unique_lock wrlock(_tablet.get_header_lock());
    status = kv_store->write_batch(&wb);
    if (!status.ok()) {
        LOG(WARNING) << "Fail to delete old meta and write new meta" << tablet_id << ": " << status;
        return Status::InternalError("Fail to delete old meta and write new meta");
    }

    // Drop any cached primary index built against the old data so it is
    // rebuilt lazily from the new rowsets.
    auto update_manager = StorageEngine::instance()->update_manager();
    auto index_entry = update_manager->index_cache().get_or_create(tablet_id);
    index_entry->update_expire_time(MonotonicMillis() + update_manager->get_cache_expire_ms());
    auto& index = index_entry->value();
    index.unload();
    update_manager->index_cache().release(index_entry);
    // 4. load from new meta
    status = _load_from_pb(*updates_pb);
    if (!status.ok()) {
        LOG(WARNING) << "_load_from_pb failed tablet_id:" << tablet_id << " " << status;
        return status;
    }

    _tablet.set_tablet_state(TabletState::TABLET_RUNNING);
    LOG(INFO) << "reorder_from finish tablet:" << _tablet.tablet_id() << " version:" << this->max_version()
              << " base tablet:" << base_tablet->tablet_id() << " #pending:" << _pending_commits.size()
              << " time:" << watch.get_elapse_second() << "s"
              << " #column:" << _tablet.tablet_schema().num_columns() << " #rowset:" << src_rowsets.size()
              << " #file:" << total_files << " #row:" << total_rows << " bytes:" << total_bytes;
    return Status::OK();
}

void TabletUpdates::_remove_unused_rowsets() {
size_t removed = 0;
std::vector<RowsetSharedPtr> skipped_rowsets;
Expand Down
5 changes: 5 additions & 0 deletions be/src/storage/tablet_updates.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Schema;
class TabletReader;
class ChunkChanger;
class SegmentIterator;
class ChunkAllocator;
} // namespace vectorized

struct CompactionInfo {
Expand Down Expand Up @@ -163,6 +164,8 @@ class TabletUpdates {
Status convert_from(const std::shared_ptr<Tablet>& base_tablet, int64_t request_version,
vectorized::ChunkChanger* chunk_changer);

Status reorder_from(const std::shared_ptr<Tablet>& base_tablet, int64_t request_version);

Status load_snapshot(const SnapshotMeta& snapshot_meta, bool restore_from_backup = false);

Status get_latest_applied_version(EditVersion* latest_applied_version);
Expand Down Expand Up @@ -408,6 +411,8 @@ class TabletUpdates {
std::atomic<bool> _error{false};
std::string _error_msg;

vectorized::ChunkAllocator* _chunk_allocator = nullptr;

TabletUpdates(const TabletUpdates&) = delete;
const TabletUpdates& operator=(const TabletUpdates&) = delete;
};
Expand Down
Loading

0 comments on commit dc4ae31

Please sign in to comment.