From 871514864529b94178fc029a679a01ee16c54cce Mon Sep 17 00:00:00 2001 From: Liangliang Gu Date: Fri, 6 Sep 2019 11:57:38 +0800 Subject: [PATCH] [FLASH-466] improve code coverage (#219) --- dbms/CMakeLists.txt | 4 +- dbms/src/Storages/DeltaMerge/Chunk.cpp | 39 +- dbms/src/Storages/DeltaMerge/HandleFilter.h | 2 +- dbms/src/Storages/DeltaMerge/Segment.cpp | 11 +- .../DeltaMerge/tests/gtest_dm_chunk.cpp | 104 +++++- .../tests/gtest_dm_delta_merge_store.cpp | 344 ++++++++++++++++++ .../tests/gtest_dm_disk_value_space.cpp | 54 ++- .../DeltaMerge/tests/gtest_dm_segment.cpp | 24 +- dbms/src/Storages/Page/PageFile.cpp | 273 ++------------ dbms/src/Storages/Page/PageUtil.cpp | 86 +++++ dbms/src/Storages/Page/PageUtil.h | 157 ++++++++ .../Storages/Page/tests/gtest_page_util.cpp | 38 ++ dbms/src/Storages/StorageDeltaMerge.cpp | 52 +-- 13 files changed, 849 insertions(+), 339 deletions(-) create mode 100644 dbms/src/Storages/Page/PageUtil.cpp create mode 100644 dbms/src/Storages/Page/PageUtil.h create mode 100644 dbms/src/Storages/Page/tests/gtest_page_util.cpp diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index ed1a230b2d2..2e4a9f917ad 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -295,7 +295,7 @@ if (TEST_COVERAGE AND CMAKE_BUILD_TYPE STREQUAL "Debug") ) SETUP_TARGET_FOR_COVERAGE_GCOVR_HTML( NAME tiflash_gcovr_coverage - DEPENDENCIES unit_tests_dbms - EXECUTABLE unit_tests_dbms + DEPENDENCIES gtests_dbms + EXECUTABLE gtests_dbms ) endif() diff --git a/dbms/src/Storages/DeltaMerge/Chunk.cpp b/dbms/src/Storages/DeltaMerge/Chunk.cpp index 31da65744d8..7878917b925 100644 --- a/dbms/src/Storages/DeltaMerge/Chunk.cpp +++ b/dbms/src/Storages/DeltaMerge/Chunk.cpp @@ -431,6 +431,9 @@ bool castNonNullNumericColumn(const DataTypePtr & disk_type_not_null_, assert(!disk_type_not_null->isNullable()); assert(!read_type_not_null->isNullable()); + /// Caller should ensure that dist_type != read_type + assert(!disk_type_not_null->equals(*read_type_not_null)); + if (checkDataType(disk_type_not_null)) { using FromType = UInt32; @@ -440,12 +443,6 @@ bool castNonNullNumericColumn(const DataTypePtr & disk_type_not_null_, disk_col_not_null, null_map, read_define, memory_col_not_null, rows_offset, rows_limit); return true; } - else if (checkDataType(read_type_not_null)) - { - insertRangeFromWithNumericTypeCast( - disk_col_not_null, null_map, read_define, memory_col_not_null, rows_offset, rows_limit); - return true; - } } else if (checkDataType(disk_type_not_null)) { @@ -456,12 +453,6 @@ bool castNonNullNumericColumn(const DataTypePtr & disk_type_not_null_, disk_col_not_null, null_map, read_define, memory_col_not_null, rows_offset, rows_limit); return true; } - else if (checkDataType(read_type_not_null)) - { - insertRangeFromWithNumericTypeCast( - disk_col_not_null, null_map, read_define, memory_col_not_null, rows_offset, rows_limit); - return true; - } } else if (checkDataType(disk_type_not_null)) { @@ -478,12 +469,6 @@ bool castNonNullNumericColumn(const DataTypePtr & disk_type_not_null_, disk_col_not_null, null_map, read_define, memory_col_not_null, rows_offset, rows_limit); return true; } - else if (checkDataType(read_type_not_null)) - { - insertRangeFromWithNumericTypeCast( - disk_col_not_null, null_map, read_define, memory_col_not_null, rows_offset, rows_limit); - return true; - } } else if (checkDataType(disk_type_not_null)) { @@ -500,12 +485,6 @@ bool castNonNullNumericColumn(const DataTypePtr & disk_type_not_null_, disk_col_not_null, null_map, read_define, memory_col_not_null, rows_offset, rows_limit); return true; } - else if (checkDataType(read_type_not_null)) - { - insertRangeFromWithNumericTypeCast( - disk_col_not_null, null_map, read_define, memory_col_not_null, rows_offset, rows_limit); - return true; - } } else if (checkDataType(disk_type_not_null)) { @@ -528,12 +507,6 @@ bool castNonNullNumericColumn(const DataTypePtr & disk_type_not_null_, disk_col_not_null, null_map, read_define, memory_col_not_null, rows_offset, rows_limit); return true; } - else if (checkDataType(read_type_not_null)) - { - insertRangeFromWithNumericTypeCast( - disk_col_not_null, null_map, read_define, memory_col_not_null, rows_offset, rows_limit); - return true; - } } else if (checkDataType(disk_type_not_null)) { @@ -556,12 +529,6 @@ bool castNonNullNumericColumn(const DataTypePtr & disk_type_not_null_, disk_col_not_null, null_map, read_define, memory_col_not_null, rows_offset, rows_limit); return true; } - else if (checkDataType(read_type_not_null)) - { - insertRangeFromWithNumericTypeCast( - disk_col_not_null, null_map, read_define, memory_col_not_null, rows_offset, rows_limit); - return true; - } } // else is not support diff --git a/dbms/src/Storages/DeltaMerge/HandleFilter.h b/dbms/src/Storages/DeltaMerge/HandleFilter.h index 7d3b358bab4..ff97e62066c 100644 --- a/dbms/src/Storages/DeltaMerge/HandleFilter.h +++ b/dbms/src/Storages/DeltaMerge/HandleFilter.h @@ -62,7 +62,7 @@ inline Block filterUnsorted(const HandleRange & handle_range, Block && block, si IColumn::Filter filter(rows); size_t passed_count = 0; - for (size_t i = 0; i < rows - 1; ++i) + for (size_t i = 0; i < rows; ++i) { bool ok = handle_range.check(handle_col_data[i]); filter[i] = ok; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 869d9aca42b..e01ec7cbe6c 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -369,7 +369,7 @@ SegmentPtr Segment::flush(DMContext & dm_context) return new_me; } -void Segment::flushCache(DMContext &dm_context) +void Segment::flushCache(DMContext & dm_context) { std::unique_lock lock(read_write_mutex); delta->tryFlushCache(OpContext::createForLogStorage(dm_context), /* force= */ true); @@ -913,7 +913,14 @@ size_t Segment::estimatedRows() size_t Segment::estimatedBytes() { size_t stable_bytes = stable->num_bytes(); - return stable_bytes + delta->num_bytes() - (stable_bytes / stable->num_rows()) * delta_tree->numDeletes(); + if (stable->num_rows() == 0) + { + return stable_bytes + delta->num_bytes(); + } + else + { + return stable_bytes + delta->num_bytes() - (stable_bytes / stable->num_rows()) * delta_tree->numDeletes(); + } } } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_chunk.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_chunk.cpp index 221c96b2811..428ae18fb1e 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_chunk.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_chunk.cpp @@ -109,7 +109,7 @@ DataTypePtr typeFromString(const String & str) TEST(ChunkColumnCast_test, CastNumeric) { { - const Strings to_types = {"UInt16", "UInt32", "UInt64"}; + const Strings to_types = {"UInt8", "UInt16", "UInt32", "UInt64"}; DataTypePtr disk_data_type = typeFromString("UInt8"); MutableColumnPtr disk_col = disk_data_type->createColumn(); @@ -133,7 +133,55 @@ TEST(ChunkColumnCast_test, CastNumeric) } { - const Strings to_types = {"Int16", "Int32", "Int64"}; + const Strings to_types = {"UInt16", "UInt32", "UInt64"}; + + DataTypePtr disk_data_type = typeFromString("UInt16"); + MutableColumnPtr disk_col = disk_data_type->createColumn(); + disk_col->insert(Field(UInt64(15))); + disk_col->insert(Field(UInt64(255))); + + for (const String & to_type : to_types) + { + DataTypePtr read_data_type = typeFromString(to_type); + ColumnDefine read_define(0, "c", read_data_type); + MutableColumnPtr memory_column = read_data_type->createColumn(); + memory_column->reserve(2); + + castColumnAccordingToColumnDefine(disk_data_type, disk_col->getPtr(), read_define, memory_column->getPtr(), 0, 2); + + UInt64 val1 = memory_column->getUInt(0); + ASSERT_EQ(val1, 15UL); + UInt64 val2 = memory_column->getUInt(1); + ASSERT_EQ(val2, 255UL); + } + } + + { + const Strings to_types = {"UInt32", "UInt64"}; + + DataTypePtr disk_data_type = typeFromString("UInt32"); + MutableColumnPtr disk_col = disk_data_type->createColumn(); + disk_col->insert(Field(UInt64(15))); + disk_col->insert(Field(UInt64(255))); + + for (const String & to_type : to_types) + { + DataTypePtr read_data_type = typeFromString(to_type); + ColumnDefine read_define(0, "c", read_data_type); + MutableColumnPtr memory_column = read_data_type->createColumn(); + memory_column->reserve(2); + + castColumnAccordingToColumnDefine(disk_data_type, disk_col->getPtr(), read_define, memory_column->getPtr(), 0, 2); + + UInt64 val1 = memory_column->getUInt(0); + ASSERT_EQ(val1, 15UL); + UInt64 val2 = memory_column->getUInt(1); + ASSERT_EQ(val2, 255UL); + } + } + + { + const Strings to_types = {"Int8", "Int16", "Int32", "Int64"}; DataTypePtr disk_data_type = typeFromString("Int8"); MutableColumnPtr disk_col = disk_data_type->createColumn(); @@ -155,6 +203,54 @@ TEST(ChunkColumnCast_test, CastNumeric) ASSERT_EQ(val2, -1L); } } + + { + const Strings to_types = {"Int16", "Int32", "Int64"}; + + DataTypePtr disk_data_type = typeFromString("Int16"); + MutableColumnPtr disk_col = disk_data_type->createColumn(); + disk_col->insert(Field(Int64(127))); + disk_col->insert(Field(Int64(-1))); + + for (const String & to_type : to_types) + { + DataTypePtr read_data_type = typeFromString(to_type); + ColumnDefine read_define(0, "c", read_data_type); + MutableColumnPtr memory_column = read_data_type->createColumn(); + memory_column->reserve(2); + + castColumnAccordingToColumnDefine(disk_data_type, disk_col->getPtr(), read_define, memory_column->getPtr(), 0, 2); + + Int64 val1 = memory_column->getInt(0); + ASSERT_EQ(val1, 127L); + Int64 val2 = memory_column->getInt(1); + ASSERT_EQ(val2, -1L); + } + } + + { + const Strings to_types = {"Int32", "Int64"}; + + DataTypePtr disk_data_type = typeFromString("Int32"); + MutableColumnPtr disk_col = disk_data_type->createColumn(); + disk_col->insert(Field(Int64(127))); + disk_col->insert(Field(Int64(-1))); + + for (const String & to_type : to_types) + { + DataTypePtr read_data_type = typeFromString(to_type); + ColumnDefine read_define(0, "c", read_data_type); + MutableColumnPtr memory_column = read_data_type->createColumn(); + memory_column->reserve(2); + + castColumnAccordingToColumnDefine(disk_data_type, disk_col->getPtr(), read_define, memory_column->getPtr(), 0, 2); + + Int64 val1 = memory_column->getInt(0); + ASSERT_EQ(val1, 127L); + Int64 val2 = memory_column->getInt(1); + ASSERT_EQ(val2, -1L); + } + } } TEST(ChunkColumnCast_test, CastNullableToNotNull) @@ -216,7 +312,7 @@ TEST(ChunkColumnCast_test, DISABLED_CastNullableToNotNullWithNonZeroDefaultValue TEST(ChunkColumnCast_test, CastNullableToNullable) { - const Strings to_types = {"Nullable(Int16)", "Nullable(Int32)", "Nullable(Int64)"}; + const Strings to_types = {"Nullable(Int8)", "Nullable(Int16)", "Nullable(Int32)", "Nullable(Int64)"}; DataTypePtr disk_data_type = typeFromString("Nullable(Int8)"); MutableColumnPtr disk_col = disk_data_type->createColumn(); @@ -251,7 +347,7 @@ TEST(ChunkColumnCast_test, CastNullableToNullable) TEST(ChunkColumnCast_test, CastNotNullToNullable) { - const Strings to_types = {"Nullable(Int16)", "Nullable(Int32)", "Nullable(Int64)"}; + const Strings to_types = {"Nullable(Int8)", "Nullable(Int16)", "Nullable(Int32)", "Nullable(Int64)"}; DataTypePtr disk_data_type = typeFromString("Int8"); MutableColumnPtr disk_col = disk_data_type->createColumn(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 165f479553d..cb79f3c36a1 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -27,6 +27,8 @@ class DeltaMergeStore_test : public ::testing::Test context = std::make_unique(DMTestEnv::getContext()); store = reload(); + + Logger::get("DeltaMergeStore").setLevel("trace"); } DeltaMergeStorePtr reload(const ColumnDefines & pre_define_columns = {}) @@ -179,6 +181,114 @@ TEST_F(DeltaMergeStore_test, SimpleWriteRead) in->readSuffix(); ASSERT_EQ(num_rows_read, num_rows_write); } + + { + // test readRaw + const auto & columns = store->getTableColumns(); + BlockInputStreamPtr in = store->readRaw(*context, + context->getSettingsRef(), + columns, 1)[0]; + + size_t num_rows_read = 0; + in->readPrefix(); + while (Block block = in->read()) + { + num_rows_read += block.rows(); + for (auto && iter : block) + { + auto c = iter.column; + for (Int64 i = 0; i < Int64(c->size()); ++i) + { + if (iter.name == "pk") + { + EXPECT_EQ(c->getInt(i), i); + } + else if (iter.name == col_str_define.name) + { + EXPECT_EQ(c->getDataAt(i), DB::toString(i)); + } + else if (iter.name == col_i8_define.name) + { + Int64 num = i * (i % 2 == 0 ? -1 : 1); + EXPECT_EQ(c->getInt(i), num); + } + } + } + } + in->readSuffix(); + ASSERT_EQ(num_rows_read, num_rows_write); + } +} + +TEST_F(DeltaMergeStore_test, Split) +{ + const String col_name_c1 = "col2"; + const ColId col_id_c1 = 2; + const DataTypePtr col_type_c1 = DataTypeFactory::instance().get("Int64"); + { + ColumnDefines table_column_defines = DMTestEnv::getDefaultColumns(); + ColumnDefine cd(col_id_c1, col_name_c1, col_type_c1); + table_column_defines.emplace_back(cd); + store = reload(table_column_defines); + } + + const size_t num_rows_write = 2097153; + { + // write to store + Block block; + { + block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); + // Add a column of col1:String for test + ColumnWithTypeAndName col1(std::make_shared(), col_name_c1); + { + IColumn::MutablePtr m_col2 = col1.type->createColumn(); + for (size_t i = 0; i < num_rows_write; i++) + { + Int64 num = i; + m_col2->insert(Field(num)); + } + col1.column = std::move(m_col2); + } + block.insert(col1); + } + store->write(*context, context->getSettingsRef(), block); + } + + { + // read all columns from store + const auto & columns = store->getTableColumns(); + BlockInputStreamPtr in = store->read(*context, + context->getSettingsRef(), + columns, + {HandleRange::newAll()}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024)[0]; + + size_t num_rows_read = 0; + in->readPrefix(); + while (Block block = in->read()) + { + num_rows_read += block.rows(); + for (auto && iter : block) + { + auto c = iter.column; + for (Int64 i = 0; i < Int64(c->size()); ++i) + { + if (iter.name == "pk") + { + EXPECT_EQ(c->getInt(i) % 1024, i); + } + else if (iter.name == col_name_c1) + { + EXPECT_EQ(c->getInt(i) % 1024, i); + } + } + } + } + in->readSuffix(); + ASSERT_EQ(num_rows_read, num_rows_write); + } } TEST_F(DeltaMergeStore_test, DDLChanegInt8ToInt32) @@ -302,6 +412,240 @@ catch (const Exception & e) throw; } + +TEST_F(DeltaMergeStore_test, DDLDropColumn) +try +{ + const String col_name_to_drop = "i8"; + const ColId col_id_to_drop = 2; + const DataTypePtr col_type_to_drop = DataTypeFactory::instance().get("Int8"); + { + ColumnDefines table_column_defines = DMTestEnv::getDefaultColumns(); + ColumnDefine cd(col_id_to_drop, col_name_to_drop, col_type_to_drop); + table_column_defines.emplace_back(cd); + store = reload(table_column_defines); + } + + { + // check column structure + const auto & cols = store->getTableColumns(); + ASSERT_EQ(cols.size(), 4UL); + const auto & str_col = cols[3]; + ASSERT_EQ(str_col.name, col_name_to_drop); + ASSERT_EQ(str_col.id, col_id_to_drop); + ASSERT_TRUE(str_col.type->equals(*col_type_to_drop)); + } + + const size_t num_rows_write = 128; + { + // write to store + Block block; + { + block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); + // Add a column of col2:String for test + ColumnWithTypeAndName col2(std::make_shared(), col_name_to_drop); + { + IColumn::MutablePtr m_col2 = col2.type->createColumn(); + for (size_t i = 0; i < num_rows_write; i++) + { + Int64 num = i * (i % 2 == 0 ? -1 : 1); + m_col2->insert(Field(num)); + } + col2.column = std::move(m_col2); + } + block.insert(col2); + } + store->write(*context, context->getSettingsRef(), block); + } + + { + // DDL change delete col i8 + AlterCommands commands; + { + AlterCommand com; + com.type = AlterCommand::DROP_COLUMN; + com.data_type = col_type_to_drop; + com.column_name = col_name_to_drop; + commands.emplace_back(std::move(com)); + } + ColumnID _col_to_drop = col_id_to_drop; + store->applyAlters(commands, std::nullopt, _col_to_drop, *context); + } + + { + // read all columns from store + const auto & columns = store->getTableColumns(); + BlockInputStreams ins = store->read(*context, + context->getSettingsRef(), + columns, + {HandleRange::newAll()}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024); + ASSERT_EQ(ins.size(), 1UL); + BlockInputStreamPtr & in = ins[0]; + { + const Block head = in->getHeader(); + ASSERT_FALSE(head.has(col_name_to_drop)); + } + + size_t num_rows_read = 0; + in->readPrefix(); + while (Block block = in->read()) + { + num_rows_read += block.rows(); + for (auto && iter : block) + { + auto c = iter.column; + for (Int64 i = 0; i < Int64(c->size()); ++i) + { + if (iter.name == "pk") + { + EXPECT_EQ(c->getInt(i), i); + } + } + } + } + in->readSuffix(); + ASSERT_EQ(num_rows_read, num_rows_write); + } +} +catch (const Exception & e) +{ + std::string text = e.displayText(); + + auto embedded_stack_trace_pos = text.find("Stack trace"); + std::cerr << "Code: " << e.code() << ". " << text << std::endl << std::endl; + if (std::string::npos == embedded_stack_trace_pos) + std::cerr << "Stack trace:" << std::endl << e.getStackTrace().toString() << std::endl; + + throw; +} + +TEST_F(DeltaMergeStore_test, DDLAddColumn) +try +{ + const String col_name_c1 = "i8"; + const ColId col_id_c1 = 2; + const DataTypePtr col_type_c1 = DataTypeFactory::instance().get("Int8"); + + const String col_name_to_add = "i32"; + const ColId col_id_to_add = 3; + const DataTypePtr col_type_to_add = DataTypeFactory::instance().get("Int32"); + { + ColumnDefines table_column_defines = DMTestEnv::getDefaultColumns(); + ColumnDefine cd(col_id_c1, col_name_c1, col_type_c1); + table_column_defines.emplace_back(cd); + store = reload(table_column_defines); + } + + const size_t num_rows_write = 128; + { + // write to store + Block block; + { + block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); + // Add a column of col1:String for test + ColumnWithTypeAndName col1(std::make_shared(), col_name_c1); + { + IColumn::MutablePtr m_col2 = col1.type->createColumn(); + for (size_t i = 0; i < num_rows_write; i++) + { + Int64 num = i * (i % 2 == 0 ? -1 : 1); + m_col2->insert(Field(num)); + } + col1.column = std::move(m_col2); + } + block.insert(col1); + } + store->write(*context, context->getSettingsRef(), block); + } + + { + // DDL change add col i32 + AlterCommands commands; + { + AlterCommand com; + com.type = AlterCommand::ADD_COLUMN; + com.data_type = col_type_to_add; + com.column_name = col_name_to_add; + commands.emplace_back(std::move(com)); + } + ColumnID _col_to_add = col_id_to_add; + store->applyAlters(commands, std::nullopt, _col_to_add, *context); + } + + { + // read all columns from store + const auto & columns = store->getTableColumns(); + BlockInputStreams ins = store->read(*context, + context->getSettingsRef(), + columns, + {HandleRange::newAll()}, + /* num_streams= */ 1, + /* max_version= */ std::numeric_limits::max(), + /* expected_block_size= */ 1024); + ASSERT_EQ(ins.size(), 1UL); + BlockInputStreamPtr & in = ins[0]; + { + const Block head = in->getHeader(); + { + const auto & col = head.getByName(col_name_c1); + ASSERT_EQ(col.name, col_name_c1); + ASSERT_EQ(col.column_id, col_id_c1); + ASSERT_TRUE(col.type->equals(*col_type_c1)); + } + + { + const auto & col = head.getByName(col_name_to_add); + ASSERT_EQ(col.name, col_name_to_add); + ASSERT_EQ(col.column_id, col_id_to_add); + ASSERT_TRUE(col.type->equals(*col_type_to_add)); + } + } + + size_t num_rows_read = 0; + in->readPrefix(); + while (Block block = in->read()) + { + num_rows_read += block.rows(); + for (auto && iter : block) + { + auto c = iter.column; + for (Int64 i = 0; i < Int64(c->size()); ++i) + { + if (iter.name == "pk") + { + EXPECT_EQ(c->getInt(i), i); + } + else if (iter.name == col_name_c1) + { + Int64 num = i * (i % 2 == 0 ? -1 : 1); + EXPECT_EQ(c->getInt(i), num); + } + else if (iter.name == col_name_to_add) + { + EXPECT_EQ(c->getInt(i), 0); + } + } + } + } + in->readSuffix(); + ASSERT_EQ(num_rows_read, num_rows_write); + } +} +catch (const Exception & e) +{ + std::string text = e.displayText(); + + auto embedded_stack_trace_pos = text.find("Stack trace"); + std::cerr << "Code: " << e.code() << ". " << text << std::endl << std::endl; + if (std::string::npos == embedded_stack_trace_pos) + std::cerr << "Stack trace:" << std::endl << e.getStackTrace().toString() << std::endl; + + throw; +} + } // namespace tests } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp index 461bb600087..33057c89f8f 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp @@ -2,8 +2,10 @@ #include +#include #include +#include #include #include @@ -64,7 +66,7 @@ class DiskValueSpace_test : public ::testing::Test String path; /// all these var lives as ref in dm_context std::unique_ptr storage_pool; - TiDB::TableInfo table_info; + TiDB::TableInfo table_info; ColumnDefine table_handle_define; ColumnDefines table_columns; DM::DeltaMergeStore::Settings settings; @@ -79,13 +81,24 @@ TEST_F(DiskValueSpace_test, LogStorageWriteRead) DiskValueSpace delta(true, 0); { // write to DiskValueSpace - Block block = DMTestEnv::prepareSimpleWriteBlock(value_beg, value_beg + num_rows_write, false); - DiskValueSpace::OpContext opc = DiskValueSpace::OpContext::createForLogStorage(*dm_context); - Chunks chunks = DiskValueSpace::writeChunks(opc, std::make_shared(block)); - for (auto & chunk : chunks) + Block block1 = DMTestEnv::prepareSimpleWriteBlock(value_beg, value_beg + num_rows_write / 2, false); + Block block2 = DMTestEnv::prepareSimpleWriteBlock(value_beg + num_rows_write / 2, value_beg + num_rows_write, false); + DiskValueSpace::OpContext opc = DiskValueSpace::OpContext::createForLogStorage(*dm_context); + Chunks chunks1 = DiskValueSpace::writeChunks(opc, std::make_shared(block1)); + Chunks chunks2 = DiskValueSpace::writeChunks(opc, std::make_shared(block2)); + for (auto & chunk : chunks1) { - delta.appendChunkWithCache(opc, std::move(chunk), block); + delta.appendChunkWithCache(opc, std::move(chunk), block1); } + + for (auto & chunk : chunks2) + { + delta.appendChunkWithCache(opc, std::move(chunk), block2); + } + + EXPECT_EQ(num_rows_write, delta.num_rows(0, 2)); + delta.tryFlushCache(opc, true); + EXPECT_EQ(num_rows_write, delta.num_rows(0, 1)); } { @@ -134,6 +147,35 @@ TEST_F(DiskValueSpace_test, LogStorageWriteRead) } } } + + { + // read using `read` of chunk_index + const size_t chunk_index = 0; + PageReader page_reader(dm_context->storage_pool.log()); + Block block = delta.read(table_columns, page_reader, chunk_index); + + // check the order of cols is the same as read_columns + const Names col_names = block.getNames(); + ASSERT_EQ(col_names.size(), table_columns.size()); + for (size_t i = 0; i < col_names.size(); ++i) + { + EXPECT_EQ(col_names[i], table_columns[i].name); + } + + // check the value + ASSERT_EQ(block.rows(), num_rows_write); + for (const auto & iter : block) + { + auto c = iter.column; + for (size_t i = 0; i < c->size(); ++i) + { + if (iter.name == "pk") + { + EXPECT_EQ(c->getInt(i), static_cast(value_beg + i)); + } + } + } + } } } // namespace tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index b289e3291b9..d0bde10a3e1 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -112,6 +112,27 @@ TEST_F(Segment_test, WriteRead) segment->write(dmContext(), std::move(block)); } + { + // estimate segment + auto estimatedRows = segment->getEstimatedRows(); + ASSERT_GT(estimatedRows, num_rows_write / 2); + ASSERT_LT(estimatedRows, num_rows_write * 2); + + auto estimatedBytes = segment->getEstimatedBytes(); + ASSERT_GT(estimatedBytes, num_rows_write * 5 / 2); + ASSERT_LT(estimatedBytes, num_rows_write * 5 * 2); + } + + { + // check segment + segment->check(dmContext(), "test"); + } + + { + // flush segment + segment->flush(dmContext()); + } + { // read written data auto in = segment->getInputStream(/* dm_context= */ dmContext(), @@ -338,7 +359,8 @@ TEST_F(Segment_test, DDLAlterInt8ToInt32) { num_rows_read += block.rows(); const ColumnWithTypeAndName & col = block.getByName(column_name_i8_to_i32); - ASSERT_TRUE(col.type->equals(*column_i32_after_ddl.type)) << "col.type: " + col.type->getName() + " expect type: " + column_i32_after_ddl.type->getName(); + ASSERT_TRUE(col.type->equals(*column_i32_after_ddl.type)) + << "col.type: " + col.type->getName() + " expect type: " + column_i32_after_ddl.type->getName(); ASSERT_EQ(col.name, column_i32_after_ddl.name); ASSERT_EQ(col.column_id, column_i32_after_ddl.id); for (size_t i = 0; i < block.rows(); ++i) diff --git a/dbms/src/Storages/Page/PageFile.cpp b/dbms/src/Storages/Page/PageFile.cpp index 16c204250a7..a6240c67381 100644 --- a/dbms/src/Storages/Page/PageFile.cpp +++ b/dbms/src/Storages/Page/PageFile.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include #include @@ -16,55 +15,15 @@ #include #include -#include #include #include - -namespace ProfileEvents -{ -extern const Event FileOpen; -extern const Event FileOpenFailed; -extern const Event Seek; -extern const Event PSMWritePages; -extern const Event PSMWriteCalls; -extern const Event PSMWriteIOCalls; -extern const Event PSMWriteBytes; -extern const Event PSMReadPages; -extern const Event PSMReadCalls; -extern const Event PSMReadIOCalls; -extern const Event PSMReadBytes; -extern const Event PSMWriteFailed; -extern const Event PSMReadFailed; -} // namespace ProfileEvents - -namespace CurrentMetrics -{ -extern const Metric Write; -extern const Metric Read; -} // namespace CurrentMetrics +#include namespace DB { - -namespace ErrorCodes -{ -extern const int UNKNOWN_FORMAT_VERSION; -extern const int CHECKSUM_DOESNT_MATCH; -extern const int FILE_DOESNT_EXIST; -extern const int CANNOT_OPEN_FILE; -extern const int CANNOT_FSYNC; -extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR; -extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR; -extern const int CANNOT_SEEK_THROUGH_FILE; -extern const int PAGE_SIZE_NOT_MATCH; -extern const int LOGICAL_ERROR; -extern const int ILLFORMED_PAGE_NAME; -extern const int FILE_SIZE_NOT_MATCH; -} // namespace ErrorCodes - // ========================================================= -// Helper functions +// Page Meta format // ========================================================= static constexpr bool PAGE_CHECKSUM_ON_READ = true; @@ -73,164 +32,6 @@ static constexpr bool PAGE_CHECKSUM_ON_READ = true; #define O_DIRECT 00040000 #endif -template -int openFile(const std::string & path) -{ - ProfileEvents::increment(ProfileEvents::FileOpen); - - int flags; - if constexpr (read) - { - flags = O_RDONLY; - } - else - { - flags = O_WRONLY | O_CREAT; - } - - int fd = ::open(path.c_str(), flags, 0666); - if (-1 == fd) - { - ProfileEvents::increment(ProfileEvents::FileOpenFailed); - if constexpr (!must_exist) - { - if (errno == ENOENT) - { - return 0; - } - } - throwFromErrno("Cannot open file " + path, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE); - } - - return fd; -} - -void syncFile(int fd, const std::string & path) -{ - if (-1 == ::fsync(fd)) - throwFromErrno("Cannot fsync " + path, ErrorCodes::CANNOT_FSYNC); -} - -void seekFile(int fd, off_t pos, const std::string & path) -{ - ProfileEvents::increment(ProfileEvents::Seek); - - off_t res = lseek(fd, pos, SEEK_SET); - if (-1 == res) - throwFromErrno("Cannot seek through file " + path, ErrorCodes::CANNOT_SEEK_THROUGH_FILE); -} - -void writeFile(int fd, UInt64 offset, const char * data, size_t to_write, const std::string & path) -{ - ProfileEvents::increment(ProfileEvents::PSMWriteCalls); - ProfileEvents::increment(ProfileEvents::PSMWriteBytes, to_write); - - size_t bytes_written = 0; - while (bytes_written != to_write) - { - ProfileEvents::increment(ProfileEvents::PSMWriteIOCalls); - ssize_t res = 0; - { - CurrentMetrics::Increment metric_increment{CurrentMetrics::Write}; - res = ::pwrite(fd, data + bytes_written, to_write - bytes_written, offset + bytes_written); - } - - if ((-1 == res || 0 == res) && errno != EINTR) - { - ProfileEvents::increment(ProfileEvents::PSMWriteFailed); - throwFromErrno("Cannot write to file " + path, ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR); - } - - if (res > 0) - bytes_written += res; - } -} - - -void readFile(int fd, const off_t offset, const char * buf, size_t expected_bytes, const std::string & path) -{ - ProfileEvents::increment(ProfileEvents::PSMReadCalls); - - size_t bytes_read = 0; - while (bytes_read < expected_bytes) - { - ProfileEvents::increment(ProfileEvents::PSMReadIOCalls); - - ssize_t res = 0; - { - CurrentMetrics::Increment metric_increment{CurrentMetrics::Read}; - res = ::pread(fd, const_cast(buf + bytes_read), expected_bytes - bytes_read, offset + bytes_read); - } - if (!res) - break; - - if (-1 == res && errno != EINTR) - { - ProfileEvents::increment(ProfileEvents::PSMReadFailed); - throwFromErrno("Cannot read from file " + path, ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); - } - - if (res > 0) - bytes_read += res; - } - ProfileEvents::increment(ProfileEvents::PSMReadBytes, bytes_read); - - if (unlikely(bytes_read != expected_bytes)) - throw Exception("Not enough data in file " + path, ErrorCodes::FILE_SIZE_NOT_MATCH); -} - -/// Write and advance sizeof(T) bytes. -template -inline void put(char *& pos, const T & v) -{ - std::memcpy(pos, reinterpret_cast(&v), sizeof(T)); - pos += sizeof(T); -} - -/// Read and advance sizeof(T) bytes. -template -inline T get(std::conditional_t pos) -{ - T v; - std::memcpy(reinterpret_cast(&v), pos, sizeof(T)); - if constexpr (advance) - pos += sizeof(T); - return v; -} - -template -std::unique_ptr readValuesFromFile(const std::string & path, Allocator & allocator) -{ - Poco::File file(path); - if (!file.exists()) - return {}; - - size_t file_size = file.getSize(); - int file_fd = openFile(path); - char * data = (char *)allocator.alloc(file_size); - SCOPE_EXIT({ allocator.free(data, file_size); }); - char * pos = data; - - readFile(file_fd, 0, data, file_size, path); - - auto size = get(pos); - std::unique_ptr values = std::make_unique(); - for (size_t i = 0; i < size; ++i) - { - T v = get(pos); - values->push_back(v); - } - - if (unlikely(pos != data + file_size)) - throw Exception("pos not match", ErrorCodes::FILE_SIZE_NOT_MATCH); - - return values; -} - -// ========================================================= -// Page Meta format -// ========================================================= - namespace PageMetaFormat { using WBSize = UInt32; @@ -282,13 +83,13 @@ std::pair genWriteData( // char * meta_pos = meta_buffer; char * data_pos = data_buffer; - put(meta_pos, meta_write_bytes); - put(meta_pos, PageFile::CURRENT_VERSION); + PageUtil::put(meta_pos, meta_write_bytes); + PageUtil::put(meta_pos, PageFile::CURRENT_VERSION); PageOffset page_data_file_off = page_file.getDataFileAppendPos(); for (const auto & write : wb.getWrites()) { - put(meta_pos, static_cast(write.type)); + PageUtil::put(meta_pos, static_cast(write.type)); switch (write.type) { case WriteBatch::WriteType::PUT: @@ -306,23 +107,23 @@ std::pair genWriteData( // edit.put(write.page_id, pc); - put(meta_pos, (PageId)write.page_id); - put(meta_pos, (PageTag)write.tag); - put(meta_pos, (PageOffset)page_data_file_off); - put(meta_pos, (PageSize)write.size); - put(meta_pos, (Checksum)page_checksum); + PageUtil::put(meta_pos, (PageId)write.page_id); + PageUtil::put(meta_pos, (PageTag)write.tag); + PageUtil::put(meta_pos, (PageOffset)page_data_file_off); + PageUtil::put(meta_pos, (PageSize)write.size); + PageUtil::put(meta_pos, (Checksum)page_checksum); page_data_file_off += write.size; break; } case WriteBatch::WriteType::DEL: - put(meta_pos, (PageId)write.page_id); + PageUtil::put(meta_pos, (PageId)write.page_id); edit.del(write.page_id); break; case WriteBatch::WriteType::REF: - put(meta_pos, static_cast(write.page_id)); - put(meta_pos, static_cast(write.ori_page_id)); + PageUtil::put(meta_pos, static_cast(write.page_id)); + PageUtil::put(meta_pos, static_cast(write.ori_page_id)); edit.ref(write.page_id, write.ori_page_id); break; @@ -330,7 +131,7 @@ std::pair genWriteData( // } const Checksum wb_checksum = CityHash_v1_0_2::CityHash64(meta_buffer, meta_write_bytes - sizeof(Checksum)); - put(meta_pos, wb_checksum); + PageUtil::put(meta_pos, wb_checksum); if (unlikely(meta_pos != meta_buffer + meta_write_bytes || data_pos != data_buffer + data_write_bytes)) throw Exception("pos not match", ErrorCodes::LOGICAL_ERROR); @@ -360,7 +161,7 @@ std::pair analyzeMetaFile( // break; } const char * wb_start_pos = pos; - const auto wb_bytes = get(pos); + const auto wb_bytes = PageUtil::get(pos); if (wb_start_pos + wb_bytes > meta_data_end) { LOG_WARNING(log, "Incomplete write batch, ignored."); @@ -368,13 +169,13 @@ std::pair analyzeMetaFile( // } // this field is always true now - const auto version = get(pos); + const auto version = PageUtil::get(pos); if (version != PageFile::CURRENT_VERSION) throw Exception("Version not match", ErrorCodes::LOGICAL_ERROR); // check the checksum of WriteBatch const auto wb_bytes_without_checksum = wb_bytes - sizeof(Checksum); - const auto wb_checksum = get(wb_start_pos + wb_bytes_without_checksum); + const auto wb_checksum = PageUtil::get(wb_start_pos + wb_bytes_without_checksum); if (wb_checksum != CityHash_v1_0_2::CityHash64(wb_start_pos, wb_bytes_without_checksum)) { throw Exception("Write batch checksum not match, path: " + path + ", offset: " + DB::toString(wb_start_pos - meta_data), @@ -384,20 +185,20 @@ std::pair analyzeMetaFile( // // recover WriteBatch while (pos < wb_start_pos + wb_bytes_without_checksum) { - const auto is_put = get(pos); + const auto is_put = PageUtil::get(pos); const auto write_type = static_cast(is_put); switch (write_type) { case WriteBatch::WriteType::PUT: { - auto page_id = get(pos); + auto page_id = PageUtil::get(pos); PageEntry pc; pc.file_id = file_id; pc.level = level; - pc.tag = get(pos); - pc.offset = get(pos); - pc.size = get(pos); - pc.checksum = get(pos); + pc.tag = PageUtil::get(pos); + pc.offset = PageUtil::get(pos); + pc.size = PageUtil::get(pos); + pc.checksum = PageUtil::get(pos); edit.put(page_id, pc); page_data_file_size += pc.size; @@ -405,14 +206,14 @@ std::pair analyzeMetaFile( // } case WriteBatch::WriteType::DEL: { - auto page_id = get(pos); + auto page_id = PageUtil::get(pos); edit.del(page_id); // Reserve the order of removal. break; } case WriteBatch::WriteType::REF: { - const auto ref_id = get(pos); - const auto page_id = get(pos); + const auto ref_id = PageUtil::get(pos); + const auto page_id = PageUtil::get(pos); edit.ref(ref_id, page_id); } } @@ -436,8 +237,8 @@ PageFile::Writer::Writer(PageFile & page_file_, bool sync_on_write_) sync_on_write(sync_on_write_), data_file_path(page_file.dataPath()), meta_file_path(page_file.metaPath()), - data_file_fd(openFile(data_file_path)), - meta_file_fd(openFile(meta_file_path)) + data_file_fd(PageUtil::openFile(data_file_path)), + meta_file_fd(PageUtil::openFile(meta_file_path)) { } @@ -447,8 +248,8 @@ PageFile::Writer::~Writer() ::close(data_file_fd); ::close(meta_file_fd); }); - syncFile(data_file_fd, data_file_path); - syncFile(meta_file_fd, meta_file_path); + PageUtil::syncFile(data_file_fd, data_file_path); + PageUtil::syncFile(meta_file_fd, meta_file_path); } void PageFile::Writer::write(const WriteBatch & wb, PageEntriesEdit & edit) @@ -463,9 +264,9 @@ void PageFile::Writer::write(const WriteBatch & wb, PageEntriesEdit & edit) SCOPE_EXIT({ page_file.free(data_buf.begin(), data_buf.size()); }); auto write_buf = [&](int fd, UInt64 offset, const std::string & path, ByteBuffer buf) { - writeFile(fd, offset, buf.begin(), buf.size(), path); + PageUtil::writeFile(fd, offset, buf.begin(), buf.size(), path); if (sync_on_write) - syncFile(fd, path); + PageUtil::syncFile(fd, path); }; write_buf(data_file_fd, page_file.data_file_pos, data_file_path, data_buf); @@ -479,7 +280,7 @@ void PageFile::Writer::write(const WriteBatch & wb, PageEntriesEdit & edit) // PageFile::Reader // ========================================================= -PageFile::Reader::Reader(PageFile & page_file) : data_file_path(page_file.dataPath()), data_file_fd(openFile(data_file_path)) {} +PageFile::Reader::Reader(PageFile & page_file) : data_file_path(page_file.dataPath()), data_file_fd(PageUtil::openFile(data_file_path)) {} PageFile::Reader::~Reader() { @@ -513,7 +314,7 @@ PageMap PageFile::Reader::read(PageIdAndEntries & to_read) PageMap page_map; for (const auto & [page_id, page_cache] : to_read) { - readFile(data_file_fd, page_cache.offset, pos, page_cache.size, data_file_path); + PageUtil::readFile(data_file_fd, page_cache.offset, pos, page_cache.size, data_file_path); if constexpr (PAGE_CHECKSUM_ON_READ) { @@ -562,7 +363,7 @@ void PageFile::Reader::read(PageIdAndEntries & to_read, const PageHandler & hand { auto && [page_id, page_cache] = *it; - readFile(data_file_fd, page_cache.offset, data_buf, page_cache.size, data_file_path); + PageUtil::readFile(data_file_fd, page_cache.offset, data_buf, page_cache.size, data_file_path); if constexpr (PAGE_CHECKSUM_ON_READ) { @@ -665,14 +466,14 @@ void PageFile::readAndSetPageMetas(PageEntriesEdit & edit) Poco::File file(path); const size_t file_size = file.getSize(); - int file_fd = openFile(path); + int file_fd = PageUtil::openFile(path); // File not exists. if (!file_fd) return; char * data = (char *)alloc(file_size); SCOPE_EXIT({ free(data, file_size); }); - readFile(file_fd, 0, data, file_size, path); + PageUtil::readFile(file_fd, 0, data, file_size, path); // analyze meta file and update page_entries std::tie(this->meta_file_pos, this->data_file_pos) diff --git a/dbms/src/Storages/Page/PageUtil.cpp b/dbms/src/Storages/Page/PageUtil.cpp new file mode 100644 index 00000000000..98cadeaa459 --- /dev/null +++ b/dbms/src/Storages/Page/PageUtil.cpp @@ -0,0 +1,86 @@ +#include + +#include +#include +#include +#include + +#include + +#ifndef __APPLE__ +#include +#endif + +#include + +#include + +namespace DB::PageUtil +{ + +void syncFile(int fd, const std::string & path) +{ + if (-1 == ::fsync(fd)) + DB::throwFromErrno("Cannot fsync " + path, ErrorCodes::CANNOT_FSYNC); +} + +void writeFile(int fd, UInt64 offset, const char * data, size_t to_write, const std::string & path) +{ + ProfileEvents::increment(ProfileEvents::PSMWriteCalls); + ProfileEvents::increment(ProfileEvents::PSMWriteBytes, to_write); + + size_t bytes_written = 0; + while (bytes_written != to_write) + { + ProfileEvents::increment(ProfileEvents::PSMWriteIOCalls); + ssize_t res = 0; + { + CurrentMetrics::Increment metric_increment{CurrentMetrics::Write}; + res = ::pwrite(fd, data + bytes_written, to_write - bytes_written, offset + bytes_written); + } + + if ((-1 == res || 0 == res) && errno != EINTR) + { + ProfileEvents::increment(ProfileEvents::PSMWriteFailed); + DB::throwFromErrno("Cannot write to file " + path, ErrorCodes::CANNOT_WRITE_TO_FILE_DESCRIPTOR); + } + + if (res > 0) + bytes_written += res; + } +} + + +void readFile(int fd, const off_t offset, const char * buf, size_t expected_bytes, const std::string & path) +{ + ProfileEvents::increment(ProfileEvents::PSMReadCalls); + + size_t bytes_read = 0; + while (bytes_read < expected_bytes) + { + ProfileEvents::increment(ProfileEvents::PSMReadIOCalls); + + ssize_t res = 0; + { + CurrentMetrics::Increment metric_increment{CurrentMetrics::Read}; + res = ::pread(fd, const_cast(buf + bytes_read), expected_bytes - bytes_read, offset + bytes_read); + } + if (!res) + break; + + if (-1 == res && errno != EINTR) + { + ProfileEvents::increment(ProfileEvents::PSMReadFailed); + DB::throwFromErrno("Cannot read from file " + path, ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR); + } + + if (res > 0) + bytes_read += res; + } + ProfileEvents::increment(ProfileEvents::PSMReadBytes, bytes_read); + + if (unlikely(bytes_read != expected_bytes)) + throw DB::Exception("Not enough data in file " + path, ErrorCodes::FILE_SIZE_NOT_MATCH); +} + +} // namespace DB::PageUtil \ No newline at end of file diff --git a/dbms/src/Storages/Page/PageUtil.h b/dbms/src/Storages/Page/PageUtil.h new file mode 100644 index 00000000000..4e28f1005a6 --- /dev/null +++ b/dbms/src/Storages/Page/PageUtil.h @@ -0,0 +1,157 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include + +#include + +#ifndef __APPLE__ +#include +#endif + +#include +#include +#include + +#include + +namespace ProfileEvents +{ +extern const Event FileOpen; +extern const Event FileOpenFailed; +extern const Event Seek; +extern const Event PSMWritePages; +extern const Event PSMWriteCalls; +extern const Event PSMWriteIOCalls; +extern const Event PSMWriteBytes; +extern const Event PSMReadPages; +extern const Event PSMReadCalls; +extern const Event PSMReadIOCalls; +extern const Event PSMReadBytes; +extern const Event PSMWriteFailed; +extern const Event PSMReadFailed; +} // namespace ProfileEvents + +namespace CurrentMetrics +{ +extern const Metric Write; +extern const Metric Read; +} // namespace CurrentMetrics + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int UNKNOWN_FORMAT_VERSION; +extern const int CHECKSUM_DOESNT_MATCH; +extern const int FILE_DOESNT_EXIST; +extern const int CANNOT_OPEN_FILE; +extern const int CANNOT_FSYNC; +extern const int CANNOT_WRITE_TO_FILE_DESCRIPTOR; +extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR; +extern const int CANNOT_SEEK_THROUGH_FILE; +extern const int FILE_SIZE_NOT_MATCH; +} // namespace ErrorCodes + +namespace PageUtil +{ +// ========================================================= +// Helper functions +// ========================================================= + +template +int openFile(const std::string & path) +{ + ProfileEvents::increment(ProfileEvents::FileOpen); + + int flags; + if constexpr (read) + { + flags = O_RDONLY; + } + else + { + flags = O_WRONLY | O_CREAT; + } + + int fd = ::open(path.c_str(), flags, 0666); + if (-1 == fd) + { + ProfileEvents::increment(ProfileEvents::FileOpenFailed); + if constexpr (!must_exist) + { + if (errno == ENOENT) + { + return 0; + } + } + DB::throwFromErrno("Cannot open file " + path, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE); + } + + return fd; +} + +void syncFile(int fd, const std::string & path); + +void writeFile(int fd, UInt64 offset, const char * data, size_t to_write, const std::string & path); + +void readFile(int fd, const off_t offset, const char * buf, size_t expected_bytes, const std::string & path); + +/// Write and advance sizeof(T) bytes. +template +inline void put(char *& pos, const T & v) +{ + std::memcpy(pos, reinterpret_cast(&v), sizeof(T)); + pos += sizeof(T); +} + +/// Read and advance sizeof(T) bytes. +template +inline T get(std::conditional_t pos) +{ + T v; + std::memcpy(reinterpret_cast(&v), pos, sizeof(T)); + if constexpr (advance) + pos += sizeof(T); + return v; +} + +template +std::unique_ptr readValuesFromFile(const std::string & path, Allocator & allocator) +{ + Poco::File file(path); + if (!file.exists()) + return {}; + + size_t file_size = file.getSize(); + int file_fd = openFile(path); + char * data = (char *)allocator.alloc(file_size); + SCOPE_EXIT({ allocator.free(data, file_size); }); + char * pos = data; + + readFile(file_fd, 0, data, file_size, path); + + auto size = get(pos); + std::unique_ptr values = std::make_unique(); + for (size_t i = 0; i < size; ++i) + { + T v = get(pos); + values->push_back(v); + } + + if (unlikely(pos != data + file_size)) + throw DB::Exception("pos not match", ErrorCodes::FILE_SIZE_NOT_MATCH); + + return values; +} + +} // namespace PageUtil + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/Page/tests/gtest_page_util.cpp b/dbms/src/Storages/Page/tests/gtest_page_util.cpp new file mode 100644 index 00000000000..8c1de9b045f --- /dev/null +++ b/dbms/src/Storages/Page/tests/gtest_page_util.cpp @@ -0,0 +1,38 @@ +#include "gtest/gtest.h" + +#include + +#include + +namespace DB +{ +namespace tests +{ +static const std::string FileName = "page_util_test"; + +TEST(PageUtils_test, ReadWriteFile) +{ + ::remove(FileName.c_str()); + + int fd = PageUtil::openFile(FileName); + PageUtil::writeFile(fd, 0, "123", 3, FileName); + PageUtil::syncFile(fd, FileName); + ::close(fd); + + int fd2 = PageUtil::openFile(FileName); + ASSERT_GT(fd2, 0); + ::close(fd2); + + ::remove(FileName.c_str()); +} + +TEST(PageUtils_test, FileNotExists) +{ + ::remove(FileName.c_str()); + + int fd = PageUtil::openFile(FileName); + ASSERT_EQ(fd, 0); +} + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 1223cf46b8f..21d86bb48bd 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -31,8 +31,6 @@ extern const int DIRECTORY_ALREADY_EXISTS; using namespace DM; -constexpr bool TEST_SPLIT = false; - StorageDeltaMerge::StorageDeltaMerge(const std::string & path_, const std::string & name_, const OptionTableInfoConstRef table_info_, @@ -270,56 +268,8 @@ BlockInputStreams StorageDeltaMerge::read( // HandleRanges ranges; - if (TEST_SPLIT) - { - /// TODO Those code is used to test range read, should be removed later - - size_t rate_base = 1000000; - Handle start = N_INF_HANDLE; - { - auto streams = store->readRaw(context, context.getSettingsRef(), {store->getHandle()}, 1); - auto stream = streams[0]; - stream->readPrefix(); - - while (true) - { - Block block = stream->read(); - if (!block) - break; - auto & handle_data = DB::DM::getColumnVectorData(block, 0); - for (size_t i = 0; i < handle_data.size(); ++i) - { - if ((std::abs(random()) % rate_base) == 0) - { - ranges.emplace_back(start, handle_data[i]); - start = handle_data[i]; - } - } - } - - ranges.emplace_back(start, DB::DM::P_INF_HANDLE); - - stream->readSuffix(); - } - LOG_DEBUG(log, "Random split ranges: " + DB::toString(ranges.size())); - - Handle prev = N_INF_HANDLE; - for (auto & r : ranges) - { - if (r.start != prev) - LOG_DEBUG(log, "illegal, expected " + DB::toString(prev) + ". got " + DB::toString(r.start)); - - prev = r.end; - - if (r.start == r.end) - LOG_DEBUG(log, "range start and end are identical"); - } - } - else - { - ranges.emplace_back(DB::DM::HandleRange::newAll()); - } + ranges.emplace_back(DB::DM::HandleRange::newAll()); const ASTSelectQuery & select_query = typeid_cast(*query_info.query); if (select_query.raw_for_mutable)