Skip to content

Commit

Permalink
[Feature] Support sort key be able to nullable. (StarRocks#15641)
Browse files Browse the repository at this point in the history
Support sort key be able to nullable.

Signed-off-by: zhangqiang <[email protected]>
  • Loading branch information
Linkerist authored and sevev committed Aug 15, 2023
1 parent 9c92660 commit 2bfaa76
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 8 deletions.
45 changes: 37 additions & 8 deletions be/src/storage/primary_key_encoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@

namespace starrocks {

constexpr uint8_t SORT_KEY_NULL_FIRST_MARKER = 0x00;
constexpr uint8_t SORT_KEY_NORMAL_MARKER = 0x01;

template <class UT>
UT to_bigendian(UT v);

Expand Down Expand Up @@ -226,9 +229,6 @@ inline Status decode_slice(Slice* src, string* dest, bool is_last) {
}

bool PrimaryKeyEncoder::is_supported(const vectorized::Field& f) {
if (f.is_nullable()) {
return false;
}
switch (f.type()->type()) {
case OLAP_FIELD_TYPE_BOOL:
case OLAP_FIELD_TYPE_TINYINT:
Expand Down Expand Up @@ -260,6 +260,9 @@ FieldType PrimaryKeyEncoder::encoded_primary_key_type(const vectorized::Schema&
return OLAP_FIELD_TYPE_NONE;
}
if (key_idxes.size() == 1) {
if (!schema.sort_key_idxes().empty() && schema.field(schema.sort_key_idxes()[0])->is_nullable()) {
return TYPE_VARCHAR;
}
return schema.field(key_idxes[0])->type()->type();
}
return OLAP_FIELD_TYPE_VARCHAR;
Expand Down Expand Up @@ -443,13 +446,39 @@ void PrimaryKeyEncoder::encode_sort_key(const vectorized::Schema& schema, const
prepare_ops_datas(schema, schema.sort_key_idxes(), chunk, &ops, &datas);
auto& bdest = down_cast<vectorized::BinaryColumn&>(*dest);
bdest.reserve(bdest.size() + len);
std::vector<std::shared_ptr<Column>> cols(ncol);
for (int i = 0; i < ncol; i++) {
cols[i] = chunk.get_column_by_index(schema.sort_key_idxes()[i]);
}
bool has_nullable_sort_key = false;
for (int i = 0; i < ncol; i++) {
if (schema.field(schema.sort_key_idxes()[i])->is_nullable()) {
has_nullable_sort_key = true;
break;
}
}
std::string buff;
for (size_t i = 0; i < len; i++) {
buff.clear();
for (int j = 0; j < ncol; j++) {
ops[j](datas[j], offset + i, &buff);
if (!has_nullable_sort_key) {
for (size_t i = 0; i < len; i++) {
buff.clear();
for (int j = 0; j < ncol; j++) {
ops[j](datas[j], offset + i, &buff);
}
bdest.append(buff);
}
} else {
for (size_t i = 0; i < len; i++) {
buff.clear();
for (int j = 0; j < ncol; j++) {
if (cols[j]->is_null(i)) {
buff.push_back(SORT_KEY_NULL_FIRST_MARKER);
} else {
buff.push_back(SORT_KEY_NORMAL_MARKER);
ops[j](datas[j], offset + i, &buff);
}
}
bdest.append(buff);
}
bdest.append(buff);
}
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/rowset_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <memory>
#include <queue>

#include "column/binary_column.h"
#include "gutil/stl_util.h"
#include "storage/chunk_helper.h"
#include "storage/empty_iterator.h"
Expand Down Expand Up @@ -263,6 +264,8 @@ class RowsetMergerImpl : public RowsetMerger {
if (!PrimaryKeyEncoder::create_column(schema, &sort_column, schema.sort_key_idxes()).ok()) {
LOG(FATAL) << "create column for primary key encoder failed";
}
} else if (schema.sort_key_idxes().size() == 1 && schema.field(schema.sort_key_idxes()[0])->is_nullable()) {
sort_column = std::make_unique<BinaryColumn>();
}
std::vector<std::unique_ptr<vector<RowSourceMask>>> rowsets_source_masks;
uint16_t order = 0;
Expand Down
201 changes: 201 additions & 0 deletions be/test/storage/tablet_updates_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,43 @@ class TabletUpdatesTest : public testing::Test {
return *writer->build();
}

RowsetSharedPtr create_nullable_sort_key_rowset(const TabletSharedPtr& tablet,
const vector<vector<int64_t>>& all_cols) {
RowsetWriterContext writer_context;
RowsetId rowset_id = StorageEngine::instance()->next_rowset_id();
writer_context.rowset_id = rowset_id;
writer_context.tablet_id = tablet->tablet_id();
writer_context.tablet_schema_hash = tablet->schema_hash();
writer_context.partition_id = 0;
writer_context.rowset_path_prefix = tablet->schema_hash_path();
writer_context.rowset_state = COMMITTED;
writer_context.tablet_schema = &tablet->tablet_schema();
writer_context.version.first = 0;
writer_context.version.second = 0;
writer_context.segments_overlap = NONOVERLAPPING;
std::unique_ptr<RowsetWriter> writer;
EXPECT_TRUE(RowsetFactory::create_rowset_writer(writer_context, &writer).ok());
auto schema = ChunkHelper::convert_schema_to_format_v2(tablet->tablet_schema());
const auto keys_size = all_cols[0].size();
auto chunk = ChunkHelper::new_chunk(schema, keys_size);
auto& cols = chunk->columns();
for (auto i = 0; i < keys_size; ++i) {
auto append_datum_func = []<typename T>(std::shared_ptr<Column> col, T val) {
if (val == -1) {
col->append_nulls(1);
} else {
col->append_datum(Datum(val));
}
};
append_datum_func(cols[0], static_cast<int64_t>(all_cols[0][i]));
append_datum_func(cols[1], static_cast<int16_t>(all_cols[1][i]));
append_datum_func(cols[2], static_cast<int32_t>(all_cols[2][i]));
}

CHECK_OK(writer->flush_chunk(*chunk));
return *writer->build();
}

TabletSharedPtr create_tablet(int64_t tablet_id, int32_t schema_hash, bool multi_column_pk = false) {
TCreateTabletReq request;
request.tablet_id = tablet_id;
Expand Down Expand Up @@ -366,6 +403,41 @@ class TabletUpdatesTest : public testing::Test {
return StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, false);
}

TabletSharedPtr create_tablet_with_nullable_sort_key(int64_t tablet_id, int32_t schema_hash,
std::vector<int32_t> sort_key_idxes) {
TCreateTabletReq request;
request.tablet_id = tablet_id;
request.__set_version(1);
request.tablet_schema.schema_hash = schema_hash;
request.tablet_schema.short_key_column_count = 1;
request.tablet_schema.keys_type = TKeysType::PRIMARY_KEYS;
request.tablet_schema.storage_type = TStorageType::COLUMN;
request.tablet_schema.sort_key_idxes = sort_key_idxes;

TColumn k1;
k1.column_name = "pk";
k1.__set_is_key(true);
k1.column_type.type = TPrimitiveType::BIGINT;
request.tablet_schema.columns.push_back(k1);

TColumn k2;
k2.column_name = "v1";
k2.__set_is_key(false);
k2.__set_is_allow_null(true);
k2.column_type.type = TPrimitiveType::SMALLINT;
request.tablet_schema.columns.push_back(k2);

TColumn k3;
k3.column_name = "v2";
k3.__set_is_key(false);
k3.__set_is_allow_null(true);
k3.column_type.type = TPrimitiveType::INT;
request.tablet_schema.columns.push_back(k3);
auto st = StorageEngine::instance()->create_tablet(request);
CHECK(st.ok()) << st.to_string();
return StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, false);
}

TabletSharedPtr create_tablet2(int64_t tablet_id, int32_t schema_hash) {
TCreateTabletReq request;
request.tablet_id = tablet_id;
Expand Down Expand Up @@ -876,6 +948,48 @@ static ssize_t read_tablet_and_compare_sort_key_error_encode_case(const TabletSh
return count;
}

static ssize_t read_tablet_and_compare_nullable_sort_key(const TabletSharedPtr& tablet, int64_t version,
const vector<vector<int64_t>>& all_cols) {
VectorizedSchema schema = ChunkHelper::convert_schema_to_format_v2(tablet->tablet_schema());
TabletReader reader(tablet, Version(0, version), schema);
auto iter = create_tablet_iterator(reader, schema);
if (iter == nullptr) {
return -1;
}
const auto keys_size = all_cols[0].size();
auto full_chunk = ChunkHelper::new_chunk(iter->schema(), keys_size);
auto& cols = full_chunk->columns();
for (auto i = 0; i < keys_size; ++i) {
auto append_datum_func = []<typename T>(std::shared_ptr<Column> col, T val) {
if (val == -1) {
col->append_nulls(1);
} else {
col->append_datum(Datum(val));
}
};
append_datum_func(cols[0], static_cast<int64_t>(all_cols[0][i]));
append_datum_func(cols[1], static_cast<int16_t>(all_cols[1][i]));
append_datum_func(cols[2], static_cast<int32_t>(all_cols[2][i]));
}
auto chunk = ChunkHelper::new_chunk(iter->schema(), 100);
size_t count = 0;
while (true) {
auto st = iter->get_next(chunk.get());
if (st.is_end_of_file()) {
break;
} else if (st.ok()) {
for (auto i = 0; i < chunk->num_rows(); i++) {
EXPECT_EQ(full_chunk->get(count + i).compare(iter->schema(), chunk->get(i)), 0);
}
count += chunk->num_rows();
chunk->reset();
} else {
return -1;
}
}
return count;
}

void TabletUpdatesTest::test_writeread(bool enable_persistent_index) {
srand(GetCurrentTimeMicros());
_tablet = create_tablet(rand(), rand());
Expand Down Expand Up @@ -1637,6 +1751,93 @@ TEST_F(TabletUpdatesTest, horizontal_compaction_with_sort_key_error_encode_case)
EXPECT_EQ(best_tablet->updates()->get_compaction_score(), -1);
}

TEST_F(TabletUpdatesTest, horizontal_compaction_with_nullable_sort_key) {
auto orig = config::vertical_compaction_max_columns_per_group;
config::vertical_compaction_max_columns_per_group = 5;
DeferOp unset_config([&] { config::vertical_compaction_max_columns_per_group = orig; });

{
int N = 10;
srand(GetCurrentTimeMicros());
_tablet = create_tablet_with_nullable_sort_key(rand(), rand(), {2});
std::vector<int64_t> keys;
for (int i = 0; i < N; i++) {
keys.push_back(i);
}
ASSERT_TRUE(_tablet->rowset_commit(2, create_nullable_sort_key_rowset(_tablet, {{1, 2, 3, 4, 5, 6, 7, 8},
{8, 7, 6, 5, 4, 3, 2, 1},
{-1, -1, -1, -1, 5, 6, 7, 8}}))
.ok());
std::this_thread::sleep_for(std::chrono::milliseconds(200));
ASSERT_TRUE(
_tablet->rowset_commit(3, create_nullable_sort_key_rowset(_tablet, {{5, 6, 7, 8, 9, 10, 11, 12},
{12, 11, 10, 9, 8, 7, 6, 5},
{-1, -1, -1, -1, 9, 10, 11, 12}}))
.ok());
std::this_thread::sleep_for(std::chrono::milliseconds(200));
ASSERT_EQ(_tablet->updates()->version_history_count(), 3);
ASSERT_EQ(12, read_tablet(_tablet, 3));
const auto& best_tablet = StorageEngine::instance()->tablet_manager()->find_best_tablet_to_do_update_compaction(
_tablet->data_dir());
EXPECT_EQ(best_tablet->tablet_id(), _tablet->tablet_id());
EXPECT_GT(best_tablet->updates()->get_compaction_score(), 0);
ASSERT_TRUE(best_tablet->updates()->compaction(_compaction_mem_tracker.get()).ok());
std::this_thread::sleep_for(std::chrono::seconds(1));
EXPECT_EQ(12, read_tablet_and_compare_nullable_sort_key(best_tablet, 3,
{{1, 5, 2, 6, 3, 7, 4, 8, 9, 10, 11, 12},
{8, 12, 7, 11, 6, 10, 5, 9, 8, 7, 6, 5},
{-1, -1, -1, -1, -1, -1, -1, -1, 9, 10, 11, 12}}));
ASSERT_EQ(best_tablet->updates()->num_rowsets(), 1);
ASSERT_EQ(best_tablet->updates()->version_history_count(), 4);
// the time interval is not enough after last compaction
EXPECT_EQ(best_tablet->updates()->get_compaction_score(), -1);
}

{
int N = 10;
srand(GetCurrentTimeMicros());
_tablet = create_tablet_with_nullable_sort_key(rand(), rand(), {1, 2});
std::vector<int64_t> keys;
for (int i = 0; i < N; i++) {
keys.push_back(i);
}
ASSERT_TRUE(_tablet->rowset_commit(2, create_nullable_sort_key_rowset(_tablet,
{
{1, 2, 3, 4, 5, 6, 7, 8},
{-1, -1, -1, -1, 5, 6, 7, 8},
{8, 7, 6, 5, 4, 3, 2, 1},
}))
.ok());
std::this_thread::sleep_for(std::chrono::milliseconds(200));
ASSERT_TRUE(_tablet->rowset_commit(3, create_nullable_sort_key_rowset(_tablet,
{
{5, 6, 7, 8, 9, 10, 11, 12},
{-1, -1, -1, -1, 9, 10, 11, 12},
{12, 11, 10, 9, 8, 7, 6, 5},
}))
.ok());
std::this_thread::sleep_for(std::chrono::milliseconds(200));
ASSERT_EQ(_tablet->updates()->version_history_count(), 3);
ASSERT_EQ(12, read_tablet(_tablet, 3));
const auto& best_tablet = StorageEngine::instance()->tablet_manager()->find_best_tablet_to_do_update_compaction(
_tablet->data_dir());
EXPECT_EQ(best_tablet->tablet_id(), _tablet->tablet_id());
EXPECT_GT(best_tablet->updates()->get_compaction_score(), 0);
ASSERT_TRUE(best_tablet->updates()->compaction(_compaction_mem_tracker.get()).ok());
std::this_thread::sleep_for(std::chrono::seconds(1));
EXPECT_EQ(12, read_tablet_and_compare_nullable_sort_key(best_tablet, 3,
{
{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
{-1, -1, -1, -1, -1, -1, -1, -1, 9, 10, 11, 12},
{8, 7, 6, 5, 12, 11, 10, 9, 8, 7, 6, 5},
}));
ASSERT_EQ(best_tablet->updates()->num_rowsets(), 1);
ASSERT_EQ(best_tablet->updates()->version_history_count(), 4);
// the time interval is not enough after last compaction
EXPECT_EQ(best_tablet->updates()->get_compaction_score(), -1);
}
}

void TabletUpdatesTest::test_vertical_compaction(bool enable_persistent_index) {
auto orig = config::vertical_compaction_max_columns_per_group;
config::vertical_compaction_max_columns_per_group = 1;
Expand Down

0 comments on commit 2bfaa76

Please sign in to comment.