Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Support assign sort key of primary key model by schema change reorder. #13642

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions be/src/storage/rowset/rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,8 @@ Status HorizontalRowsetWriter::_final_merge() {
ChunkIteratorPtr itr;
// create temporary segment files at first, then merge them and create final segment files if schema change with sorting
if (_context.schema_change_sorting) {
if (_context.tablet_schema->keys_type() == KeysType::DUP_KEYS) {
if (_context.tablet_schema->keys_type() == KeysType::DUP_KEYS ||
Linkerist marked this conversation as resolved.
Show resolved Hide resolved
_context.tablet_schema->keys_type() == KeysType::PRIMARY_KEYS) {
itr = new_heap_merge_iterator(seg_iterators);
} else if (_context.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS ||
_context.tablet_schema->keys_type() == KeysType::AGG_KEYS) {
Expand Down Expand Up @@ -670,7 +671,8 @@ Status HorizontalRowsetWriter::_final_merge() {
ChunkIteratorPtr itr;
// create temporary segment files at first, then merge them and create final segment files if schema change with sorting
if (_context.schema_change_sorting) {
if (_context.tablet_schema->keys_type() == KeysType::DUP_KEYS) {
if (_context.tablet_schema->keys_type() == KeysType::DUP_KEYS ||
_context.tablet_schema->keys_type() == KeysType::PRIMARY_KEYS) {
itr = new_mask_merge_iterator(seg_iterators, mask_buffer.get());
} else if (_context.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS ||
_context.tablet_schema->keys_type() == KeysType::AGG_KEYS) {
Expand Down Expand Up @@ -740,7 +742,8 @@ Status HorizontalRowsetWriter::_final_merge() {
ChunkIteratorPtr itr;
// create temporary segment files at first, then merge them and create final segment files if schema change with sorting
if (_context.schema_change_sorting) {
if (_context.tablet_schema->keys_type() == KeysType::DUP_KEYS) {
if (_context.tablet_schema->keys_type() == KeysType::DUP_KEYS ||
_context.tablet_schema->keys_type() == KeysType::PRIMARY_KEYS) {
itr = new_heap_merge_iterator(seg_iterators);
} else if (_context.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS ||
_context.tablet_schema->keys_type() == KeysType::AGG_KEYS) {
Expand Down
38 changes: 20 additions & 18 deletions be/src/storage/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,6 @@ int compare_chunk_row(const ChunkRow& lhs, const ChunkRow& rhs) {
return 0;
}

class ChunkSorter {
public:
explicit ChunkSorter(ChunkAllocator* allocator);
virtual ~ChunkSorter();

bool sort(ChunkPtr& chunk, const TabletSharedPtr& new_tablet);

private:
ChunkPtr _swap_chunk;
size_t _max_allocated_rows{0};
};

// TODO: optimize it with vertical sort
class ChunkMerger {
public:
Expand Down Expand Up @@ -116,14 +104,23 @@ bool ChunkSorter::sort(ChunkPtr& chunk, const TabletSharedPtr& new_tablet) {

_swap_chunk->reset();

std::vector<ColumnId> sort_key_idxes;
if (new_schema.sort_key_idxes().empty()) {
int num_key_columns = chunk->schema()->num_key_fields();
for (ColumnId i = 0; i < num_key_columns; ++i) {
sort_key_idxes.push_back(i);
}
} else {
sort_key_idxes = new_schema.sort_key_idxes();
}
Columns key_columns;
int num_key_columns = chunk->schema()->num_key_fields();
for (int i = 0; i < num_key_columns; i++) {
key_columns.push_back(chunk->get_column_by_index(i));
for (const auto sort_key_idx : sort_key_idxes) {
key_columns.push_back(chunk->get_column_by_index(sort_key_idx));
}

SmallPermutation perm = create_small_permutation(chunk->num_rows());
Status st = stable_sort_and_tie_columns(false, key_columns, SortDescs::asc_null_first(num_key_columns), &perm);
Status st =
stable_sort_and_tie_columns(false, key_columns, SortDescs::asc_null_first(sort_key_idxes.size()), &perm);
CHECK(st.ok());
std::vector<uint32_t> selective;
permutate_to_selective(perm, &selective);
Expand Down Expand Up @@ -762,12 +759,17 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
}

if (base_tablet->keys_type() == KeysType::PRIMARY_KEYS) {
const auto& base_sort_key_idxes = base_tablet->tablet_schema().sort_key_idxes();
const auto& new_sort_key_idxes = new_tablet->tablet_schema().sort_key_idxes();
if (std::mismatch(new_sort_key_idxes.begin(), new_sort_key_idxes.end(), base_sort_key_idxes.begin()).first !=
new_sort_key_idxes.end()) {
sc_params.sc_directly = !(sc_params.sc_sorting = true);
}
if (sc_params.sc_directly) {
status = new_tablet->updates()->convert_from(base_tablet, request.alter_version,
sc_params.chunk_changer.get());
} else if (sc_params.sc_sorting) {
LOG(WARNING) << "schema change of primary key model do not support sorting.";
status = Status::NotSupported("schema change of primary key model do not support sorting.");
status = new_tablet->updates()->reorder_from(base_tablet, request.alter_version);
} else {
status = new_tablet->updates()->link_from(base_tablet.get(), request.alter_version);
}
Expand Down
15 changes: 14 additions & 1 deletion be/src/storage/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ class ChunkAllocator {
size_t _memory_limitation;
};

// Sorts the rows of a chunk according to the sort key of the destination
// tablet's schema (see ChunkSorter::sort in schema_change.cpp, which falls
// back to the key columns when the new schema declares no explicit sort key).
// Used during schema change when the new tablet requires a row order
// different from the source data.
class ChunkSorter {
public:
    explicit ChunkSorter(ChunkAllocator* allocator);
    virtual ~ChunkSorter();

    // Sorts |chunk| by the sort-key columns of |new_tablet|'s schema.
    // Returns true on success, false on failure.
    bool sort(ChunkPtr& chunk, const TabletSharedPtr& new_tablet);

private:
    ChunkAllocator* _chunk_allocator = nullptr;
    // Scratch chunk reused across sort() calls (reset at the start of each sort).
    ChunkPtr _swap_chunk;
    // Largest row count seen so far; must start at 0 so the first sort() call
    // sizes _swap_chunk. The previous in-.cpp definition of this class
    // initialized it with {0}; keep that initializer here to avoid reading an
    // indeterminate value.
    size_t _max_allocated_rows{0};
};

class SchemaChange {
public:
SchemaChange() = default;
Expand Down Expand Up @@ -116,10 +129,10 @@ class SchemaChangeWithSorting : public SchemaChange {
Status process_v2(TabletReader* reader, RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet,
TabletSharedPtr base_tablet, RowsetSharedPtr rowset) override;

private:
static bool _internal_sorting(std::vector<ChunkPtr>& chunk_arr, RowsetWriter* new_rowset_writer,
TabletSharedPtr tablet);

private:
ChunkChanger* _chunk_changer = nullptr;
size_t _memory_limitation;
ChunkAllocator* _chunk_allocator = nullptr;
Expand Down
215 changes: 215 additions & 0 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "storage/chunk_iterator.h"
#include "storage/compaction_utils.h"
#include "storage/del_vector.h"
#include "storage/merge_iterator.h"
#include "storage/rowset/default_value_column_iterator.h"
#include "storage/rowset/rowset_factory.h"
#include "storage/rowset/rowset_meta_manager.h"
Expand Down Expand Up @@ -2690,6 +2691,220 @@ Status TabletUpdates::_convert_from_base_rowset(const std::shared_ptr<Tablet>& b
return rowset_writer->flush();
}

// Rebuilds this (primary-key) tablet from |base_tablet| by re-sorting every
// applied rowset up to |request_version| into this tablet's sort-key order,
// then atomically replacing this tablet's meta/rowsets/delvecs in the kv store
// and reloading the updates state. Requires this tablet to be in
// TABLET_NOTREADY state; on success the tablet is switched to TABLET_RUNNING.
Status TabletUpdates::reorder_from(const std::shared_ptr<Tablet>& base_tablet, int64_t request_version) {
OlapStopWatch watch;
DCHECK(_tablet.tablet_state() == TABLET_NOTREADY)
<< "tablet state is not TABLET_NOTREADY, reorder_from is not allowed"
<< " tablet_id:" << _tablet.tablet_id() << " tablet_state:" << _tablet.tablet_state();
LOG(INFO) << "reorder_from start tablet:" << _tablet.tablet_id() << " #pending:" << _pending_commits.size()
<< " base_tablet:" << base_tablet->tablet_id() << " request_version:" << request_version;
// The base tablet must already contain all data up to the requested alter
// version, otherwise the conversion would miss rows.
int64_t max_version = base_tablet->updates()->max_version();
if (max_version < request_version) {
LOG(WARNING) << "reorder_from: base_tablet's max_version:" << max_version
<< " < alter_version:" << request_version << " tablet:" << _tablet.tablet_id()
<< " base_tablet:" << base_tablet->tablet_id();
return Status::InternalError("reorder_from: max_version < request_version");
}
// Snapshot the applied rowsets of the base tablet at |request_version|;
// |version| receives the edit version actually captured.
std::vector<RowsetSharedPtr> src_rowsets;
EditVersion version;
Status status = base_tablet->updates()->get_applied_rowsets(request_version, &src_rowsets, &version);
if (!status.ok()) {
LOG(WARNING) << "reorder_from: get base tablet rowsets error tablet:" << base_tablet->tablet_id()
<< " request_version:" << request_version << " reason:" << status;
return status;
}

// disable compaction temporarily when tablet just loaded
_last_compaction_time_ms = UnixMillis();

auto kv_store = _tablet.data_dir()->get_meta();
auto tablet_id = _tablet.tablet_id();
// Segment ids for the rebuilt rowsets, assigned contiguously below.
uint32_t next_rowset_id = 0;
std::vector<RowsetLoadInfo> new_rowset_load_infos(src_rowsets.size());

// Accumulates the sorted chunks of the rowset currently being converted.
// NOTE(review): this vector is declared outside the per-rowset loop and is
// never cleared between iterations — presumably each rowset should start
// from an empty vector; verify against the merged upstream code.
std::vector<vectorized::ChunkPtr> chunk_arr;

vectorized::VectorizedSchema base_schema = ChunkHelper::convert_schema_to_format_v2(base_tablet->tablet_schema());
vectorized::ChunkSorter chunk_sorter(_chunk_allocator);

OlapReaderStatistics stats;

// Totals reported in the final log line. NOTE(review): these are size_t but
// are fed with static_cast<double> below — the cast is lossy/no-op noise.
size_t total_bytes = 0;
size_t total_rows = 0;
size_t total_files = 0;
// Convert each source rowset into one new rowset with the same version
// range but rows re-sorted by the new schema's sort key.
for (int i = 0; i < src_rowsets.size(); i++) {
const auto& src_rowset = src_rowsets[i];

// Keep the source rowset alive while its segments are being read.
RowsetReleaseGuard guard(src_rowset->shared_from_this());
auto res = src_rowset->get_segment_iterators2(base_schema, base_tablet->data_dir()->get_meta(), version.major(),
&stats);
if (!res.ok()) {
return res.status();
}
const auto& seg_iterators = res.value();

RowsetId rid = StorageEngine::instance()->next_rowset_id();

// The new rowset mirrors the source rowset's version and overlap flags but
// is written against this tablet's (new) schema; schema_change_sorting
// makes the writer's _final_merge use the sorting merge path.
RowsetWriterContext writer_context;
writer_context.rowset_id = rid;
writer_context.tablet_uid = _tablet.tablet_uid();
writer_context.tablet_id = _tablet.tablet_id();
writer_context.partition_id = _tablet.partition_id();
writer_context.tablet_schema_hash = _tablet.schema_hash();
writer_context.rowset_path_prefix = _tablet.schema_hash_path();
writer_context.tablet_schema = &_tablet.tablet_schema();
writer_context.rowset_state = VISIBLE;
writer_context.version = src_rowset->version();
writer_context.segments_overlap = src_rowset->rowset_meta()->segments_overlap();
writer_context.schema_change_sorting = true;

std::unique_ptr<RowsetWriter> rowset_writer;
status = RowsetFactory::create_rowset_writer(writer_context, &rowset_writer);
if (!status.ok()) {
LOG(INFO) << "build rowset writer failed";
return Status::InternalError("build rowset writer failed");
}

vectorized::ChunkPtr base_chunk = ChunkHelper::new_chunk(base_schema, config::vector_chunk_size);

vectorized::VectorizedSchema new_schema = ChunkHelper::convert_schema_to_format_v2(_tablet.tablet_schema());
vectorized::ChunkPtr new_chunk = ChunkHelper::new_chunk(new_schema, config::vector_chunk_size);

// Read every segment of the source rowset chunk by chunk.
for (auto& seg_iterator : seg_iterators) {
if (seg_iterator.get() == nullptr) {
continue;
}
while (true) {
base_chunk->reset();
Status status = seg_iterator->get_next(base_chunk.get());
if (!status.ok()) {
if (status.is_end_of_file()) {
break;
} else {
std::stringstream ss;
ss << "segment iterator failed to get next chunk, status is:" << status.to_string();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
}

// Move column data from base_chunk into new_chunk without copying
// (reorder keeps the column set identical; only row order changes).
for (auto i = 0; i < base_chunk->num_columns(); ++i) {
base_chunk->get_column_by_index(i)->swap_column(*new_chunk->get_column_by_index(i));
}

total_bytes += static_cast<double>(new_chunk->memory_usage());
total_rows += static_cast<double>(new_chunk->num_rows());

// Sort each chunk individually by the new sort key; the cross-chunk
// merge happens later in _internal_sorting.
if (new_chunk->num_rows() > 0) {
if (!chunk_sorter.sort(new_chunk, std::static_pointer_cast<Tablet>(_tablet.shared_from_this()))) {
LOG(WARNING) << "chunk data sort failed";
return Status::InternalError("chunk data sort failed");
}
}
// NOTE(review): the same ChunkPtr (new_chunk) is pushed on every loop
// iteration and its columns are swapped/overwritten on the next
// get_next — it looks like all chunk_arr entries alias one chunk;
// confirm a fresh chunk is (or should be) allocated per iteration.
chunk_arr.push_back(new_chunk);
}
}

// Merge all buffered sorted chunks into the new rowset's segments.
if (!chunk_arr.empty()) {
if (!vectorized::SchemaChangeWithSorting::_internal_sorting(
chunk_arr, rowset_writer.get(), std::static_pointer_cast<Tablet>(_tablet.shared_from_this()))) {
return Status::InternalError("failed to sorting internally.");
}
}

status = rowset_writer->flush();
if (!status.ok()) {
LOG(WARNING) << "failed to convert from base rowset, exit alter process";
return status;
}

auto new_rowset = rowset_writer->build();
if (!new_rowset.ok()) return new_rowset.status();

// Record what is needed to register this rowset in the new tablet meta:
// its meta pb, its contiguous segment-id base, and its segment count.
auto& new_rowset_load_info = new_rowset_load_infos[i];
new_rowset_load_info.num_segments = (*new_rowset)->num_segments();
new_rowset_load_info.rowset_id = next_rowset_id;

auto& rowset_meta_pb = new_rowset_load_info.rowset_meta_pb;
(*new_rowset)->rowset_meta()->to_rowset_pb(&rowset_meta_pb);
rowset_meta_pb.set_rowset_seg_id(new_rowset_load_info.rowset_id);
rowset_meta_pb.set_rowset_id(rid.to_string());

// Reserve at least one id per rowset even if it produced no segments.
next_rowset_id += std::max(1U, (uint32_t)new_rowset_load_info.num_segments);

total_bytes += rowset_meta_pb.total_disk_size();
total_rows += rowset_meta_pb.num_rows();
total_files += rowset_meta_pb.num_segments() + rowset_meta_pb.num_delete_files();
}

// Build a fresh tablet meta: a single version containing all rebuilt
// rowsets, applied up to |version|, with the log replay state reset.
TabletMetaPB meta_pb;
_tablet.tablet_meta()->to_meta_pb(&meta_pb);
meta_pb.set_tablet_state(TabletStatePB::PB_RUNNING);
TabletUpdatesPB* updates_pb = meta_pb.mutable_updates();
updates_pb->clear_versions();
auto version_pb = updates_pb->add_versions();
version_pb->mutable_version()->set_major(version.major());
version_pb->mutable_version()->set_minor(version.minor());
int64_t creation_time = time(nullptr);
version_pb->set_creation_time(creation_time);
for (auto& new_rowset_load_info : new_rowset_load_infos) {
version_pb->mutable_rowsets()->Add(new_rowset_load_info.rowset_id);
}
version_pb->set_rowsetid_add(next_rowset_id);
auto apply_version_pb = updates_pb->mutable_apply_version();
apply_version_pb->set_major(version.major());
apply_version_pb->set_minor(version.minor());
updates_pb->set_next_log_id(1);
updates_pb->set_next_rowset_id(next_rowset_id);

// delete old meta & write new meta
// All deletions and insertions go into one WriteBatch so the swap of old
// state for new state is applied atomically below.
auto data_dir = _tablet.data_dir();
rocksdb::WriteBatch wb;
RETURN_IF_ERROR(TabletMetaManager::clear_log(data_dir, &wb, tablet_id));
RETURN_IF_ERROR(TabletMetaManager::clear_rowset(data_dir, &wb, tablet_id));
RETURN_IF_ERROR(TabletMetaManager::clear_del_vector(data_dir, &wb, tablet_id));
RETURN_IF_ERROR(TabletMetaManager::clear_persistent_index(data_dir, &wb, tablet_id));
// do not clear pending rowsets, because these pending rowsets should be committed after schemachange is done
RETURN_IF_ERROR(TabletMetaManager::put_tablet_meta(data_dir, &wb, meta_pb));
// Every segment of every new rowset starts with an empty delete vector.
DelVector delvec;
for (const auto& new_rowset_load_info : new_rowset_load_infos) {
RETURN_IF_ERROR(
TabletMetaManager::put_rowset_meta(data_dir, &wb, tablet_id, new_rowset_load_info.rowset_meta_pb));
for (int j = 0; j < new_rowset_load_info.num_segments; j++) {
RETURN_IF_ERROR(TabletMetaManager::put_del_vector(data_dir, &wb, tablet_id,
new_rowset_load_info.rowset_id + j, delvec));
}
}

// Commit the batch while holding the header lock so readers never observe
// a half-swapped meta state.
std::unique_lock wrlock(_tablet.get_header_lock());
status = kv_store->write_batch(&wb);
if (!status.ok()) {
LOG(WARNING) << "Fail to delete old meta and write new meta" << tablet_id << ": " << status;
return Status::InternalError("Fail to delete old meta and write new meta");
}

// Drop any cached primary index for this tablet id so it will be rebuilt
// from the reordered rowsets on next use.
auto update_manager = StorageEngine::instance()->update_manager();
auto index_entry = update_manager->index_cache().get_or_create(tablet_id);
index_entry->update_expire_time(MonotonicMillis() + update_manager->get_cache_expire_ms());
auto& index = index_entry->value();
index.unload();
update_manager->index_cache().release(index_entry);
// 4. load from new meta
status = _load_from_pb(*updates_pb);
if (!status.ok()) {
LOG(WARNING) << "_load_from_pb failed tablet_id:" << tablet_id << " " << status;
return status;
}

_tablet.set_tablet_state(TabletState::TABLET_RUNNING);
LOG(INFO) << "reorder_from finish tablet:" << _tablet.tablet_id() << " version:" << this->max_version()
<< " base tablet:" << base_tablet->tablet_id() << " #pending:" << _pending_commits.size()
<< " time:" << watch.get_elapse_second() << "s"
<< " #column:" << _tablet.tablet_schema().num_columns() << " #rowset:" << src_rowsets.size()
<< " #file:" << total_files << " #row:" << total_rows << " bytes:" << total_bytes;
return Status::OK();
}

void TabletUpdates::_remove_unused_rowsets() {
size_t removed = 0;
std::vector<RowsetSharedPtr> skipped_rowsets;
Expand Down
5 changes: 5 additions & 0 deletions be/src/storage/tablet_updates.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class VectorizedSchema;
class TabletReader;
class ChunkChanger;
class SegmentIterator;
class ChunkAllocator;
} // namespace vectorized

struct CompactionInfo {
Expand Down Expand Up @@ -163,6 +164,8 @@ class TabletUpdates {
Status convert_from(const std::shared_ptr<Tablet>& base_tablet, int64_t request_version,
vectorized::ChunkChanger* chunk_changer);

Status reorder_from(const std::shared_ptr<Tablet>& base_tablet, int64_t request_version);

Status load_snapshot(const SnapshotMeta& snapshot_meta, bool restore_from_backup = false);

Status get_latest_applied_version(EditVersion* latest_applied_version);
Expand Down Expand Up @@ -408,6 +411,8 @@ class TabletUpdates {
std::atomic<bool> _error{false};
std::string _error_msg;

vectorized::ChunkAllocator* _chunk_allocator = nullptr;

TabletUpdates(const TabletUpdates&) = delete;
const TabletUpdates& operator=(const TabletUpdates&) = delete;
};
Expand Down
Loading