From d9a49e54f124d36cf5b509327f31fc93cb5cac06 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> Date: Tue, 21 May 2024 16:15:39 +0800 Subject: [PATCH] Storage: Separate DMFile (#195) Signed-off-by: Lloyd-Pottiger Co-authored-by: Wenxuan --- contrib/libunwind-cmake/CMakeLists.txt | 2 +- .../Flash/Disaggregated/MockS3LockClient.h | 2 +- .../src/Flash/Disaggregated/S3LockService.cpp | 2 +- .../tests/gtest_s3_lock_service.cpp | 4 +- dbms/src/Server/DTTool/DTToolInspect.cpp | 2 +- dbms/src/Server/DTTool/DTToolMigrate.cpp | 18 +- dbms/src/Server/tests/gtest_dttool.cpp | 2 +- .../DeltaMerge/ColumnFile/ColumnFileBig.cpp | 6 +- .../DeltaMerge/Delta/DeltaValueSpace.cpp | 2 +- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 2 +- .../DeltaMerge/DeltaMergeStore_Ingest.cpp | 10 +- .../DeltaMerge/DeltaMergeStore_InternalBg.cpp | 4 +- dbms/src/Storages/DeltaMerge/File/DMFile.cpp | 969 +----------------- dbms/src/Storages/DeltaMerge/File/DMFile.h | 391 ++----- .../File/DMFileBlockInputStream.cpp | 7 +- .../Storages/DeltaMerge/File/DMFileMeta.cpp | 438 ++++++++ .../src/Storages/DeltaMerge/File/DMFileMeta.h | 258 +++++ .../Storages/DeltaMerge/File/DMFileMetaV2.cpp | 413 ++++++++ .../Storages/DeltaMerge/File/DMFileMetaV2.h | 111 ++ .../DeltaMerge/File/DMFilePackFilter.h | 30 +- .../Storages/DeltaMerge/File/DMFileReader.cpp | 49 +- .../Storages/DeltaMerge/File/DMFileReader.h | 2 +- .../Storages/DeltaMerge/File/DMFileUtil.cpp | 58 ++ .../src/Storages/DeltaMerge/File/DMFileUtil.h | 54 + .../Storages/DeltaMerge/File/DMFileWriter.cpp | 109 +- .../Storages/DeltaMerge/File/DMFileWriter.h | 17 +- .../File/VectorColumnFromIndexReader.cpp | 4 +- .../File/VectorColumnFromIndexReader.h | 7 +- .../DeltaMerge/Remote/DataStore/DataStore.h | 4 +- .../Remote/DataStore/DataStoreS3.cpp | 10 +- .../DeltaMerge/Remote/DataStore/DataStoreS3.h | 2 +- .../Storages/DeltaMerge/Remote/Serializer.cpp | 4 +- dbms/src/Storages/DeltaMerge/Segment.cpp | 8 +- 
.../Storages/DeltaMerge/StableValueSpace.cpp | 8 +- ...est_dm_delta_merge_store_fast_add_peer.cpp | 2 +- .../gtest_dm_delta_merge_store_test_basic.h | 4 + .../DeltaMerge/tests/gtest_dm_file.cpp | 14 +- .../DeltaMerge/tests/gtest_dm_segment.cpp | 12 +- .../tests/gtest_dm_vector_index.cpp | 2 +- .../DeltaMerge/tests/gtest_segment_reader.cpp | 2 +- .../tests/gtest_segment_test_basic.cpp | 4 +- .../tests/gtest_sst_files_stream.cpp | 4 +- .../V3/Universal/tests/gtest_checkpoint.cpp | 2 +- .../Universal/tests/gtest_lock_local_mgr.cpp | 4 +- dbms/src/Storages/S3/FileCache.cpp | 4 +- dbms/src/Storages/S3/FileCache.h | 3 +- dbms/src/Storages/S3/tests/gtest_s3file.cpp | 9 +- 47 files changed, 1635 insertions(+), 1440 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp create mode 100644 dbms/src/Storages/DeltaMerge/File/DMFileMeta.h create mode 100644 dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.cpp create mode 100644 dbms/src/Storages/DeltaMerge/File/DMFileMetaV2.h create mode 100644 dbms/src/Storages/DeltaMerge/File/DMFileUtil.cpp create mode 100644 dbms/src/Storages/DeltaMerge/File/DMFileUtil.h diff --git a/contrib/libunwind-cmake/CMakeLists.txt b/contrib/libunwind-cmake/CMakeLists.txt index 524a7206d16..6c469ca7591 100644 --- a/contrib/libunwind-cmake/CMakeLists.txt +++ b/contrib/libunwind-cmake/CMakeLists.txt @@ -152,5 +152,5 @@ target_include_directories (unwind PUBLIC ${LIBUNWIND_SOURCE_DIR}/include) target_include_directories (unwind PRIVATE ${LIBUNWIND_SOURCE_DIR}/include/tdep) target_include_directories (unwind PRIVATE ${LIBUNWIND_SOURCE_DIR}/src) target_include_directories (unwind PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/include) -target_compile_options (unwind PRIVATE "-Wno-single-bit-bitfield-constant-conversion") +target_compile_options (unwind PRIVATE "-Wno-bitfield-constant-conversion") target_compile_options (unwind PRIVATE "-Wno-absolute-value") diff --git a/dbms/src/Flash/Disaggregated/MockS3LockClient.h 
b/dbms/src/Flash/Disaggregated/MockS3LockClient.h index aad835bbee6..a2f754f6582 100644 --- a/dbms/src/Flash/Disaggregated/MockS3LockClient.h +++ b/dbms/src/Flash/Disaggregated/MockS3LockClient.h @@ -43,7 +43,7 @@ class MockS3LockClient : public IS3LockClient // If the data file exist and no delmark exist, then create a lock file on `data_file_key` auto view = S3FilenameView::fromKey(data_file_key); auto object_key - = view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFile::metav2FileName()) : data_file_key; + = view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName()) : data_file_key; if (!objectExists(*s3_client, object_key)) { return {false, ""}; diff --git a/dbms/src/Flash/Disaggregated/S3LockService.cpp b/dbms/src/Flash/Disaggregated/S3LockService.cpp index d894345f2cc..85524bdadea 100644 --- a/dbms/src/Flash/Disaggregated/S3LockService.cpp +++ b/dbms/src/Flash/Disaggregated/S3LockService.cpp @@ -206,7 +206,7 @@ bool S3LockService::tryAddLockImpl( auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); // make sure data file exists auto object_key - = key_view.isDMFile() ? fmt::format("{}/{}", data_file_key, DM::DMFile::metav2FileName()) : data_file_key; + = key_view.isDMFile() ? 
fmt::format("{}/{}", data_file_key, DM::DMFileMetaV2::metaFileName()) : data_file_key; if (!DB::S3::objectExists(*s3_client, object_key)) { auto * e = response->mutable_result()->mutable_conflict(); diff --git a/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp b/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp index a79a1c6e03a..c4f6176e93d 100644 --- a/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp +++ b/dbms/src/Flash/Disaggregated/tests/gtest_s3_lock_service.cpp @@ -68,7 +68,7 @@ class S3LockServiceTest : public DB::base::TiFlashStorageTestBasic DMFileOID{.store_id = store_id, .table_id = physical_table_id, .file_id = dm_file_id}); DB::S3::uploadEmptyFile( *s3_client, - fmt::format("{}/{}", data_filename.toFullKey(), DM::DMFile::metav2FileName())); + fmt::format("{}/{}", data_filename.toFullKey(), DM::DMFileMetaV2::metaFileName())); ++dm_file_id; } } @@ -110,7 +110,7 @@ class S3LockServiceTest : public DB::base::TiFlashStorageTestBasic #define CHECK_S3_ENABLED \ if (!is_s3_test_enabled) \ { \ - const auto * t = ::testing::UnitTest::GetInstance()->current_test_info(); \ + const auto * t = ::testing::UnitTest::GetInstance() -> current_test_info(); \ LOG_INFO(log, "{}.{} is skipped because S3ClientFactory is not inited.", t->test_case_name(), t->name()); \ return; \ } diff --git a/dbms/src/Server/DTTool/DTToolInspect.cpp b/dbms/src/Server/DTTool/DTToolInspect.cpp index f866f531cdd..b399b67c6e4 100644 --- a/dbms/src/Server/DTTool/DTToolInspect.cpp +++ b/dbms/src/Server/DTTool/DTToolInspect.cpp @@ -46,7 +46,7 @@ int inspectServiceMain(DB::Context & context, const InspectArgs & args) // Open the DMFile at `workdir/dmf_` auto fp = context.getFileProvider(); - auto dmfile = DB::DM::DMFile::restore(fp, args.file_id, 0, args.workdir, DB::DM::DMFile::ReadMetaMode::all()); + auto dmfile = DB::DM::DMFile::restore(fp, args.file_id, 0, args.workdir, DB::DM::DMFileMeta::ReadMode::all()); LOG_INFO(logger, "bytes on disk: {}", 
dmfile->getBytesOnDisk()); diff --git a/dbms/src/Server/DTTool/DTToolMigrate.cpp b/dbms/src/Server/DTTool/DTToolMigrate.cpp index 53b5ec22ed9..ee08cc45321 100644 --- a/dbms/src/Server/DTTool/DTToolMigrate.cpp +++ b/dbms/src/Server/DTTool/DTToolMigrate.cpp @@ -32,16 +32,16 @@ bool isIgnoredInMigration(const DB::DM::DMFile & file, const std::string & targe UNUSED(file); return target == "NGC"; // this is not exported } -bool needFrameMigration(const DB::DM::DMFile & file, const std::string & target) +bool needFrameMigration(const DB::DM::DMFile & /*file*/, const std::string & target) { return endsWith(target, ".mrk") || endsWith(target, ".dat") || endsWith(target, ".idx") - || endsWith(target, ".merged") || file.packStatFileName() == target; + || endsWith(target, ".merged") || DB::DM::DMFileMeta::packStatFileName() == target; } bool isRecognizable(const DB::DM::DMFile & file, const std::string & target) { - return file.metaFileName() == target || file.configurationFileName() == target - || file.packPropertyFileName() == target || needFrameMigration(file, target) - || isIgnoredInMigration(file, target) || file.metav2FileName() == target; + return DB::DM::DMFileMeta::metaFileName() == target || DB::DM::DMFileMeta::configurationFileName() == target + || DB::DM::DMFileMeta::packPropertyFileName() == target || needFrameMigration(file, target) + || isIgnoredInMigration(file, target) || DB::DM::DMFileMetaV2::metaFileName() == target; } namespace bpo = boost::program_options; @@ -194,7 +194,7 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args) args.file_id, 0, args.workdir, - DB::DM::DMFile::ReadMetaMode::all()); + DB::DM::DMFileMeta::ReadMode::all()); auto source_version = 0; if (src_file->useMetaV2()) { @@ -239,8 +239,8 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args) input_stream->readPrefix(); if (!args.dry_mode) output_stream.writePrefix(); - auto stat_iter = src_file->pack_stats.begin(); - auto properties_iter = 
src_file->pack_properties.property().begin(); + auto stat_iter = src_file->getPackStats().begin(); + auto properties_iter = src_file->getPackProperties().property().begin(); size_t counter = 0; // iterate all blocks and rewrite them to new dtfile while (auto block = input_stream->read()) @@ -271,7 +271,7 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args) args.file_id, 1, keeper.migration_temp_dir.path(), - DB::DM::DMFile::ReadMetaMode::all()); + DB::DM::DMFileMeta::ReadMode::all()); } } LOG_INFO(logger, "migration finished"); diff --git a/dbms/src/Server/tests/gtest_dttool.cpp b/dbms/src/Server/tests/gtest_dttool.cpp index 6af026c25ee..6797d1330a7 100644 --- a/dbms/src/Server/tests/gtest_dttool.cpp +++ b/dbms/src/Server/tests/gtest_dttool.cpp @@ -318,7 +318,7 @@ TEST_F(DTToolTest, BlockwiseInvariant) 1, 0, getTemporaryPath(), - DB::DM::DMFile::ReadMetaMode::all()); + DB::DM::DMFileMeta::ReadMode::all()); if (version == 2) { EXPECT_EQ(refreshed_file->getConfiguration()->getChecksumFrameLength(), frame_size); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index eac4c7c52fa..947206766d4 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -106,7 +106,7 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata( const auto & lock_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id)); auto file_oid = lock_key_view.asDataFile().getDMFileOID(); auto prepared = remote_data_store->prepareDMFile(file_oid, file_page_id); - dmfile = prepared->restore(DMFile::ReadMetaMode::all()); + dmfile = prepared->restore(DMFileMeta::ReadMode::all()); // gc only begin to run after restore so we can safely call addRemoteDTFileIfNotExists here path_delegate.addRemoteDTFileIfNotExists(local_external_id, dmfile->getBytesOnDisk()); } @@ -119,7 +119,7 @@ ColumnFilePersistedPtr 
ColumnFileBig::deserializeMetadata( file_id, file_page_id, file_parent_path, - DMFile::ReadMetaMode::all()); + DMFileMeta::ReadMode::all()); auto res = path_delegate.updateDTFileSize(file_id, dmfile->getBytesOnDisk()); RUNTIME_CHECK_MSG(res, "update dt file size failed, path={}", dmfile->path()); } @@ -159,7 +159,7 @@ ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint( wbs.data.putRemoteExternal(new_local_page_id, loc); auto remote_data_store = context.db_context.getSharedContextDisagg()->remote_data_store; auto prepared = remote_data_store->prepareDMFile(file_oid, new_local_page_id); - auto dmfile = prepared->restore(DMFile::ReadMetaMode::all()); + auto dmfile = prepared->restore(DMFileMeta::ReadMode::all()); wbs.writeLogAndData(); // new_local_page_id is already applied to PageDirectory so we can safely call addRemoteDTFileIfNotExists here delegator.addRemoteDTFileIfNotExists(new_local_page_id, dmfile->getBytesOnDisk()); diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp index 42f1d253ec0..90f524f8d8e 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp @@ -158,7 +158,7 @@ std::vector CloneColumnFilesHelper::clone( file_id, /* page_id= */ new_page_id, file_parent_path, - DMFile::ReadMetaMode::all()); + DMFileMeta::ReadMode::all()); auto new_column_file = f->cloneWith(context, new_file, target_range); cloned.push_back(new_column_file); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 5e96463e6b5..24cfa05d0a7 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1969,7 +1969,7 @@ void DeltaMergeStore::restoreStableFilesFromLocal() const void DeltaMergeStore::removeLocalStableFilesIfDisagg() const { listLocalStableFiles([](UInt64 file_id, const String & root_path) { - auto 
path = DMFile::getPathByStatus(root_path, file_id, DMFile::Status::READABLE); + auto path = getPathByStatus(root_path, file_id, DMFileStatus::READABLE); Poco::File file(path); if (file.exists()) { diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp index 0f17cc047c9..08af6a96a2f 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp @@ -120,7 +120,7 @@ void DeltaMergeStore::cleanPreIngestFiles( f.id, f.id, file_parent_path, - DM::DMFile::ReadMetaMode::memoryAndDiskSize()); + DM::DMFileMeta::ReadMode::memoryAndDiskSize()); removePreIngestFile(f.id, false); file->remove(file_provider); } @@ -182,7 +182,7 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile( auto page_id = storage_pool->newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__); auto ref_file - = DMFile::restore(file_provider, file_id, page_id, file_parent_path, DMFile::ReadMetaMode::all()); + = DMFile::restore(file_provider, file_id, page_id, file_parent_path, DMFileMeta::ReadMode::all()); data_files.emplace_back(std::move(ref_file)); wbs.data.putRefPage(page_id, file->pageId()); } @@ -464,7 +464,7 @@ bool DeltaMergeStore::ingestDTFileIntoSegmentUsingSplit( file->fileId(), new_page_id, file->parentPath(), - DMFile::ReadMetaMode::all()); + DMFileMeta::ReadMode::all()); wbs.data.putRefPage(new_page_id, file->pageId()); // We have to commit those file_ids to PageStorage before applying the ingest, because after the write @@ -653,7 +653,7 @@ UInt64 DeltaMergeStore::ingestFiles( external_file.id, external_file.id, file_parent_path, - DMFile::ReadMetaMode::memoryAndDiskSize()); + DMFileMeta::ReadMode::memoryAndDiskSize()); } else { @@ -663,7 +663,7 @@ UInt64 DeltaMergeStore::ingestFiles( .table_id = dm_context->physical_table_id, .file_id = external_file.id}; file = remote_data_store->prepareDMFile(oid, external_file.id) - 
->restore(DMFile::ReadMetaMode::memoryAndDiskSize()); + ->restore(DMFileMeta::ReadMode::memoryAndDiskSize()); } rows += file->getRows(); bytes += file->getBytes(); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index 9bdcd4900e7..3be37e98633 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -129,7 +129,7 @@ class LocalDMFileGcRemover final continue; // Note that page_id is useless here. - auto dmfile = DMFile::restore(file_provider, id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none()); + auto dmfile = DMFile::restore(file_provider, id, /* page_id= */ 0, path, DMFileMeta::ReadMode::none()); if (unlikely(!dmfile)) { // If the dtfile directory is not exist, it means `StoragePathPool::drop` have been @@ -145,7 +145,7 @@ class LocalDMFileGcRemover final LOG_INFO( logger, "GC try remove useless DM file, but file not found and may have been removed, dmfile={}", - DMFile::getPathByStatus(path, id, DMFile::Status::READABLE)); + getPathByStatus(path, id, DMFileStatus::READABLE)); continue; // next file } else if (dmfile->canGC()) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index efdc090057e..73ef59c30c0 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -13,21 +13,10 @@ // limitations under the License. 
#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include #include #include -#include #include #include #include @@ -36,8 +25,6 @@ #include #include -#include -#include #include #include @@ -46,9 +33,7 @@ namespace DB namespace ErrorCodes { extern const int CANNOT_READ_ALL_DATA; -extern const int CORRUPTED_DATA; extern const int INCORRECT_DATA; -extern const int BAD_ARGUMENTS; } // namespace ErrorCodes namespace FailPoints @@ -60,59 +45,15 @@ extern const char force_use_dmfile_format_v3[]; namespace DM { -namespace details -{ -inline constexpr static const char * NGC_FILE_NAME = "NGC"; -inline constexpr static const char * FOLDER_PREFIX_WRITABLE = ".tmp.dmf_"; -inline constexpr static const char * FOLDER_PREFIX_READABLE = "dmf_"; -inline constexpr static const char * FOLDER_PREFIX_DROPPED = ".del.dmf_"; -inline constexpr static const char * DATA_FILE_SUFFIX = ".dat"; -inline constexpr static const char * INDEX_FILE_SUFFIX = ".idx"; -inline constexpr static const char * MARK_FILE_SUFFIX = ".mrk"; - -inline String getNGCPath(const String & prefix) -{ - return prefix + "/" + NGC_FILE_NAME; -} -} // namespace details - -// Some static helper functions - -String DMFile::getPathByStatus(const String & parent_path, UInt64 file_id, DMFile::Status status) -{ - String s = parent_path + "/"; - switch (status) - { - case DMFile::Status::READABLE: - s += details::FOLDER_PREFIX_READABLE; - break; - case DMFile::Status::WRITABLE: - case DMFile::Status::WRITING: - s += details::FOLDER_PREFIX_WRITABLE; - break; - case DMFile::Status::DROPPED: - s += details::FOLDER_PREFIX_DROPPED; - break; - } - s += DB::toString(file_id); - return s; -} - -String DMFile::getNGCPath(const String & parent_path, UInt64 file_id, DMFile::Status status) -{ - return details::getNGCPath(getPathByStatus(parent_path, file_id, status)); -} - -// String DMFile::path() const { - return getPathByStatus(parent_path, file_id, status); + return 
getPathByStatus(parentPath(), fileId(), getStatus()); } String DMFile::ngcPath() const { - return getNGCPath(parent_path, file_id, status); + return getNGCPath(parentPath(), fileId(), getStatus()); } DMFilePtr DMFile::create( @@ -139,7 +80,7 @@ DMFilePtr DMFile::create( file_id, file_id, parent_path, - Status::WRITABLE, + DMFileStatus::WRITABLE, small_file_size_threshold, merged_file_max_size, std::move(configuration), @@ -168,30 +109,42 @@ DMFilePtr DMFile::restore( UInt64 file_id, UInt64 page_id, const String & parent_path, - const ReadMetaMode & read_meta_mode) + const DMFileMeta::ReadMode & read_meta_mode) { auto is_s3_file = S3::S3FilenameView::fromKeyWithPrefix(parent_path).isDataFile(); if (!is_s3_file) { // Unrecognized xx:// protocol. RUNTIME_CHECK_MSG(parent_path.find("://") == std::string::npos, "Unsupported protocol in path {}", parent_path); - String path = getPathByStatus(parent_path, file_id, DMFile::Status::READABLE); + String path = getPathByStatus(parent_path, file_id, DMFileStatus::READABLE); // The path may be dropped by another thread in some cases auto poco_file = Poco::File(path); if (!poco_file.exists()) return nullptr; } - DMFilePtr dmfile(new DMFile(file_id, page_id, parent_path, Status::READABLE)); + DMFilePtr dmfile(new DMFile(file_id, page_id, parent_path, DMFileStatus::READABLE)); if (is_s3_file || Poco::File(dmfile->metav2Path()).exists()) { - auto s = dmfile->readMetaV2(file_provider); - dmfile->parseMetaV2(std::string_view(s.data(), s.size())); + dmfile->meta = std::make_unique( + file_id, + parent_path, + DMFileStatus::READABLE, + 128 * 1024, + 16 * 1024 * 1024, + std::nullopt, + STORAGE_FORMAT_CURRENT.dm_file); + dmfile->meta->read(file_provider, read_meta_mode); } else if (!read_meta_mode.isNone()) { - dmfile->readConfiguration(file_provider); - dmfile->readMetadata(file_provider, read_meta_mode); + dmfile->meta = std::make_unique( + file_id, + parent_path, + DMFileStatus::READABLE, + std::nullopt, + 
STORAGE_FORMAT_CURRENT.dm_file); + dmfile->meta->read(file_provider, read_meta_mode); } return dmfile; } @@ -210,20 +163,20 @@ bool DMFile::isColIndexExist(const ColId & col_id) const { if (useMetaV2()) { - auto itr = column_stats.find(col_id); - return itr != column_stats.end() && itr->second.index_bytes > 0; + auto itr = meta->column_stats.find(col_id); + return itr != meta->column_stats.end() && itr->second.index_bytes > 0; } else { - return column_indices.count(col_id) != 0; + return meta->column_indices.count(col_id) != 0; } } -size_t DMFile::colIndexSize(ColId id) +size_t DMFile::colIndexSize(ColId id) const { if (useMetaV2()) { - if (auto itr = column_stats.find(id); itr != column_stats.end() && itr->second.index_bytes > 0) + if (auto itr = meta->column_stats.find(id); itr != meta->column_stats.end() && itr->second.index_bytes > 0) { return itr->second.index_bytes; } @@ -239,12 +192,12 @@ size_t DMFile::colIndexSize(ColId id) } // Only used when metav2 is not enabled, clean it up -size_t DMFile::colDataSize(ColId id, ColDataType type) +size_t DMFile::colDataSize(ColId id, ColDataType type) const { if (useMetaV2()) { - auto itr = column_stats.find(id); - RUNTIME_CHECK_MSG(itr != column_stats.end(), "Data of column not exist, col_id={} path={}", id, path()); + auto itr = meta->column_stats.find(id); + RUNTIME_CHECK_MSG(itr != meta->column_stats.end(), "Data of column not exist, col_id={} path={}", id, path()); switch (type) { case ColDataType::Elements: @@ -280,10 +233,9 @@ size_t DMFile::colDataSize(ColId id, ColDataType type) String DMFile::encryptionBasePath() const { - return getPathByStatus(parent_path, file_id, DMFile::Status::READABLE); + return getPathByStatus(parentPath(), fileId(), DMFileStatus::READABLE); } - EncryptionPath DMFile::encryptionDataPath(const FileNameBase & file_name_base) const { return EncryptionPath(encryptionBasePath(), file_name_base + details::DATA_FILE_SUFFIX); @@ -299,386 +251,26 @@ EncryptionPath 
DMFile::encryptionMarkPath(const FileNameBase & file_name_base) c return EncryptionPath(encryptionBasePath(), file_name_base + details::MARK_FILE_SUFFIX); } -EncryptionPath DMFile::encryptionMetaPath() const -{ - return EncryptionPath(encryptionBasePath(), metaFileName()); -} - -EncryptionPath DMFile::encryptionPackStatPath() const -{ - return EncryptionPath(encryptionBasePath(), packStatFileName()); -} - -EncryptionPath DMFile::encryptionPackPropertyPath() const -{ - return EncryptionPath(encryptionBasePath(), packPropertyFileName()); -} - -EncryptionPath DMFile::encryptionConfigurationPath() const -{ - return EncryptionPath(encryptionBasePath(), configurationFileName()); -} - -EncryptionPath DMFile::encryptionMetav2Path() const -{ - return EncryptionPath(encryptionBasePath(), metav2FileName()); -} - -EncryptionPath DMFile::encryptionMergedPath(UInt32 number) const -{ - return EncryptionPath(encryptionBasePath(), mergedFilename(number)); -} - -String DMFile::colDataFileName(const FileNameBase & file_name_base) -{ - return file_name_base + details::DATA_FILE_SUFFIX; -} -String DMFile::colIndexFileName(const FileNameBase & file_name_base) -{ - return file_name_base + details::INDEX_FILE_SUFFIX; -} -String DMFile::colMarkFileName(const FileNameBase & file_name_base) -{ - return file_name_base + details::MARK_FILE_SUFFIX; -} - -DMFile::OffsetAndSize DMFile::writeMetaToBuffer(WriteBuffer & buffer) -{ - size_t meta_offset = buffer.count(); - writeString("DTFile format: ", buffer); - writeIntText(configuration ? 
DMFileFormat::V2 : DMFileFormat::V1, buffer); - writeString("\n", buffer); - writeText(column_stats, STORAGE_FORMAT_CURRENT.dm_file, buffer); - size_t meta_size = buffer.count() - meta_offset; - return std::make_tuple(meta_offset, meta_size); -} - -DMFile::OffsetAndSize DMFile::writePackStatToBuffer(WriteBuffer & buffer) -{ - size_t pack_offset = buffer.count(); - for (auto & stat : pack_stats) - { - writePODBinary(stat, buffer); - } - size_t pack_size = buffer.count() - pack_offset; - return std::make_tuple(pack_offset, pack_size); -} - -DMFile::OffsetAndSize DMFile::writePackPropertyToBuffer(WriteBuffer & buffer, UnifiedDigestBase * digest) -{ - size_t offset = buffer.count(); - auto data = pack_properties.SerializeAsString(); - if (digest) - { - digest->update(data.data(), data.size()); - } - writeStringBinary(data, buffer); - size_t size = buffer.count() - offset; - return std::make_tuple(offset, size); -} - -void DMFile::writeMeta(const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter) -{ - String meta_path = metaPath(); - String tmp_meta_path = meta_path + ".tmp"; - - { - WriteBufferFromFileProvider buf(file_provider, tmp_meta_path, encryptionMetaPath(), false, write_limiter, 4096); - if (configuration) - { - auto digest = configuration->createUnifiedDigest(); - auto tmp_buffer = WriteBufferFromOwnString{}; - writeMetaToBuffer(tmp_buffer); - auto serialized = tmp_buffer.releaseStr(); - digest->update(serialized.data(), serialized.length()); - configuration->addChecksum(metaFileName(), digest->raw()); - buf.write(serialized.data(), serialized.size()); - } - else - { - writeMetaToBuffer(buf); - } - buf.sync(); - } - Poco::File(tmp_meta_path).renameTo(meta_path); -} - -void DMFile::writePackProperty(const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter) -{ - String property_path = packPropertyPath(); - String tmp_property_path = property_path + ".tmp"; - { - WriteBufferFromFileProvider - buf(file_provider, 
tmp_property_path, encryptionPackPropertyPath(), false, write_limiter, 4096); - if (configuration) - { - auto digest = configuration->createUnifiedDigest(); - writePackPropertyToBuffer(buf, digest.get()); - configuration->addChecksum(packPropertyFileName(), digest->raw()); - } - else - { - writePackPropertyToBuffer(buf); - } - buf.sync(); - } - Poco::File(tmp_property_path).renameTo(property_path); -} - - -void DMFile::writeConfiguration(const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter) -{ - assert(configuration); - String config_path = configurationPath(); - String tmp_config_path = config_path + ".tmp"; - { - WriteBufferFromFileProvider buf( - file_provider, - tmp_config_path, - encryptionConfigurationPath(), - false, - write_limiter, - DBMS_DEFAULT_BUFFER_SIZE); - { - auto stream = OutputStreamWrapper{buf}; - stream << *configuration; - } - buf.sync(); - } - Poco::File(tmp_config_path).renameTo(config_path); -} - -void DMFile::writeMetadata(const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter) -{ - writePackProperty(file_provider, write_limiter); - writeMeta(file_provider, write_limiter); - if (configuration) - { - writeConfiguration(file_provider, write_limiter); - } -} - -void DMFile::upgradeMetaIfNeed(const FileProviderPtr & file_provider, DMFileFormat::Version ver) -{ - if (unlikely(ver == DMFileFormat::V0)) - { - // Update ColumnStat.serialized_bytes - for (auto && c : column_stats) - { - auto col_id = c.first; - auto & stat = c.second; - c.second.type->enumerateStreams( - [col_id, &stat, this](const IDataType::SubstreamPath & substream) { - String stream_name = DMFile::getFileNameBase(col_id, substream); - String data_file = colDataPath(stream_name); - if (Poco::File f(data_file); f.exists()) - stat.serialized_bytes += f.getSize(); - String mark_file = colDataPath(stream_name); - if (Poco::File f(mark_file); f.exists()) - stat.serialized_bytes += f.getSize(); - String index_file = 
colIndexPath(stream_name); - if (Poco::File f(index_file); f.exists()) - stat.serialized_bytes += f.getSize(); - }, - {}); - } - // Update ColumnStat in meta. - writeMeta(file_provider, nullptr); - } -} - -void DMFile::readColumnStat(const FileProviderPtr & file_provider, const MetaPackInfo & meta_pack_info) -{ - const auto name = metaFileName(); - auto file_buf = openForRead(file_provider, metaPath(), encryptionMetaPath(), meta_pack_info.column_stat_size); - auto meta_buf = std::vector(meta_pack_info.column_stat_size); - auto meta_reader = ReadBufferFromMemory{meta_buf.data(), meta_buf.size()}; - ReadBuffer * buf = &file_buf; - file_buf.seek(meta_pack_info.column_stat_offset); - - // checksum examination - if (configuration) - { - auto location = configuration->getEmbeddedChecksum().find(name); - if (location != configuration->getEmbeddedChecksum().end()) - { - auto digest = configuration->createUnifiedDigest(); - file_buf.readBig(meta_buf.data(), meta_buf.size()); - digest->update(meta_buf.data(), meta_buf.size()); - if (unlikely(!digest->compareRaw(location->second))) - { - throw TiFlashException( - fmt::format("checksum mismatch for {}", metaPath()), - Errors::Checksum::DataCorruption); - } - buf = &meta_reader; - } - else - { - LOG_WARNING(log, "checksum for {} not found", name); - } - } - - DMFileFormat::Version ver; // Binary version - assertString("DTFile format: ", *buf); - DB::readText(ver, *buf); - assertString("\n", *buf); - readText(column_stats, ver, *buf); - - // for V2, we do not apply in-place upgrade for now - // but it should not affect the normal read procedure - if (unlikely(ver >= DMFileFormat::V2 && !configuration)) - { - throw TiFlashException("configuration expected but not loaded", Errors::Checksum::Missing); - } - upgradeMetaIfNeed(file_provider, ver); -} - -void DMFile::readPackStat(const FileProviderPtr & file_provider, const MetaPackInfo & meta_pack_info) +void DMFile::finalize() { - size_t packs = meta_pack_info.pack_stat_size / 
sizeof(PackStat); - pack_stats.resize(packs); - const auto path = packStatPath(); - if (configuration) - { - auto buf = createReadBufferFromFileBaseByFileProvider( - file_provider, - path, - encryptionPackStatPath(), - configuration->getChecksumFrameLength(), - nullptr, - configuration->getChecksumAlgorithm(), - configuration->getChecksumFrameLength()); - buf->seek(meta_pack_info.pack_stat_offset); - if (sizeof(PackStat) * packs - != buf->readBig(reinterpret_cast(pack_stats.data()), sizeof(PackStat) * packs)) - { - throw Exception("Cannot read all data", ErrorCodes::CANNOT_READ_ALL_DATA); - } - } - else - { - auto buf = openForRead(file_provider, path, encryptionPackStatPath(), meta_pack_info.pack_stat_size); - buf.seek(meta_pack_info.pack_stat_offset); - if (sizeof(PackStat) * packs - != buf.readBig(reinterpret_cast(pack_stats.data()), sizeof(PackStat) * packs)) - { - throw Exception("Cannot read all data", ErrorCodes::CANNOT_READ_ALL_DATA); - } - } -} - -void DMFile::readConfiguration(const FileProviderPtr & file_provider) -{ - if (Poco::File(configurationPath()).exists()) - { - auto file - = openForRead(file_provider, configurationPath(), encryptionConfigurationPath(), DBMS_DEFAULT_BUFFER_SIZE); - auto stream = InputStreamWrapper{file}; - configuration.emplace(stream); - version = DMFileFormat::V2; - } - else - { - configuration.reset(); - version = DMFileFormat::V1; - } -} - -void DMFile::readPackProperty(const FileProviderPtr & file_provider, const MetaPackInfo & meta_pack_info) -{ - String tmp_buf; - const auto name = packPropertyFileName(); - auto buf = openForRead( - file_provider, - packPropertyPath(), - encryptionPackPropertyPath(), - meta_pack_info.pack_property_size); - buf.seek(meta_pack_info.pack_property_offset); - - readStringBinary(tmp_buf, buf); - pack_properties.ParseFromString(tmp_buf); - - if (configuration) - { - auto location = configuration->getEmbeddedChecksum().find(name); - if (location != configuration->getEmbeddedChecksum().end()) - { - 
auto digest = configuration->createUnifiedDigest(); - const auto & target = location->second; - digest->update(tmp_buf.data(), tmp_buf.size()); - if (unlikely(!digest->compareRaw(target))) - { - throw TiFlashException( - fmt::format("checksum mismatch for {}", packPropertyPath()), - Errors::Checksum::DataCorruption); - } - } - else - { - LOG_WARNING(log, "checksum for {} not found", name); - } - } -} - -// Only used when metav2 is not enabled -void DMFile::readMetadata(const FileProviderPtr & file_provider, const ReadMetaMode & read_meta_mode) -{ - Footer footer; - - if (read_meta_mode.isAll()) - { - initializeIndices(); - } - if (auto file = Poco::File(packPropertyPath()); file.exists()) - footer.meta_pack_info.pack_property_size = file.getSize(); - - auto recheck = [&](size_t size) { - if (this->configuration) - { - auto total_size - = this->configuration->getChecksumFrameLength() + this->configuration->getChecksumHeaderLength(); - auto frame_count = size / total_size + (0 != size % total_size); - size -= frame_count * this->configuration->getChecksumHeaderLength(); - } - return size; - }; - - if (auto file = Poco::File(packPropertyPath()); file.exists()) - footer.meta_pack_info.pack_property_size = file.getSize(); - - footer.meta_pack_info.column_stat_size = Poco::File(metaPath()).getSize(); - footer.meta_pack_info.pack_stat_size = recheck(Poco::File(packStatPath()).getSize()); - - if (read_meta_mode.needPackProperty() && footer.meta_pack_info.pack_property_size != 0) - readPackProperty(file_provider, footer.meta_pack_info); - - if (read_meta_mode.needColumnStat()) - readColumnStat(file_provider, footer.meta_pack_info); - - if (read_meta_mode.needPackStat()) - readPackStat(file_provider, footer.meta_pack_info); -} - -void DMFile::finalizeForFolderMode(const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter) -{ - if (STORAGE_FORMAT_CURRENT.dm_file >= DMFileFormat::V2 && !configuration) + if (STORAGE_FORMAT_CURRENT.dm_file >= DMFileFormat::V2 
&& !meta->configuration) { LOG_WARNING(log, "checksum disabled due to lack of configuration"); } - writeMetadata(file_provider, write_limiter); - if (unlikely(status != Status::WRITING)) - throw Exception("Expected WRITING status, now " + statusString(status)); + RUNTIME_CHECK_MSG( + getStatus() == DMFileStatus::WRITING, + "FileId={} Expected WRITING status, but {}", + fileId(), + magic_enum::enum_name(getStatus())); Poco::File old_file(path()); - setStatus(Status::READABLE); + setStatus(DMFileStatus::READABLE); auto new_path = path(); - Poco::File file(new_path); if (file.exists()) { LOG_WARNING(log, "Existing dmfile, removing: {}", new_path); - const String deleted_path = getPathByStatus(parent_path, file_id, Status::DROPPED); + const String deleted_path = getPathByStatus(parentPath(), fileId(), DMFileStatus::DROPPED); // no need to delete the encryption info associated with the dmfile path here. // because this dmfile path is still a valid path and no obsolete encryption info will be left. file.renameTo(deleted_path); @@ -686,7 +278,9 @@ void DMFile::finalizeForFolderMode(const FileProviderPtr & file_provider, const LOG_WARNING(log, "Existing dmfile, removed: {}", deleted_path); } old_file.renameTo(new_path); - initializeIndices(); + // MetaV2 column index may be merged into merged file, so column_indices is unused. 
+ if (auto * metav1 = typeid_cast(meta.get()); metav1) + meta->initializeIndices(); } std::vector DMFile::listLocal(const String & parent_path) @@ -758,7 +352,7 @@ std::set DMFile::listAllInPath( continue; } UInt64 file_id = *res; - const String readable_path = getPathByStatus(parent_path, file_id, DMFile::Status::READABLE); + const String readable_path = getPathByStatus(parent_path, file_id, DMFileStatus::READABLE); file_provider->deleteEncryptionInfo(EncryptionPath(readable_path, ""), /* throw_on_error= */ false); const auto full_path = parent_path + "/" + name; if (Poco::File file(full_path); file.exists()) @@ -819,7 +413,7 @@ void DMFile::remove(const FileProviderPtr & file_provider) const String dir_path = path(); if (Poco::File dir_file(dir_path); dir_file.exists()) { - setStatus(Status::DROPPED); + setStatus(DMFileStatus::DROPPED); const String deleted_path = path(); // Rename the directory first (note that we should do it before deleting encryption info) dir_file.renameTo(deleted_path); @@ -831,318 +425,7 @@ void DMFile::remove(const FileProviderPtr & file_provider) } } -void DMFile::initializeIndices() -{ - auto decode = [](const StringRef & data) { - try - { - auto original = unescapeForFileName(data); - return std::stoll(original); - } - catch (const std::invalid_argument & err) - { - throw DB::Exception(fmt::format("invalid ColId: {} from file: {}", err.what(), data)); - } - catch (const std::out_of_range & err) - { - throw DB::Exception(fmt::format("invalid ColId: {} from file: {}", err.what(), data)); - } - }; - - Poco::File directory{path()}; - std::vector sub_files{}; - directory.list(sub_files); - for (const auto & name : sub_files) - { - if (endsWith(name, details::INDEX_FILE_SUFFIX)) - { - column_indices.insert( - decode(removeSuffix(name, strlen(details::INDEX_FILE_SUFFIX)))); // strip tailing `.idx` - } - } -} - -DMFile::MetaBlockHandle DMFile::writeSLPackStatToBuffer(WriteBuffer & buffer) -{ - auto offset = buffer.count(); - const char * data 
= reinterpret_cast(&pack_stats[0]); - size_t size = pack_stats.size() * sizeof(PackStat); - writeString(data, size, buffer); - return MetaBlockHandle{MetaBlockType::PackStat, offset, buffer.count() - offset}; -} - -DMFile::MetaBlockHandle DMFile::writeSLPackPropertyToBuffer(WriteBuffer & buffer) -{ - auto offset = buffer.count(); - for (const auto & pb : pack_properties.property()) - { - PackProperty tmp{pb}; - const char * data = reinterpret_cast(&tmp); - size_t size = sizeof(PackProperty); - writeString(data, size, buffer); - } - return MetaBlockHandle{MetaBlockType::PackProperty, offset, buffer.count() - offset}; -} - -DMFile::MetaBlockHandle DMFile::writeColumnStatToBuffer(WriteBuffer & buffer) -{ - auto offset = buffer.count(); - writeIntBinary(column_stats.size(), buffer); - for (const auto & [id, stat] : column_stats) - { - stat.serializeToBuffer(buffer); - } - return MetaBlockHandle{MetaBlockType::ColumnStat, offset, buffer.count() - offset}; -} - -DMFile::MetaBlockHandle DMFile::writeExtendColumnStatToBuffer(WriteBuffer & buffer) -{ - auto offset = buffer.count(); - dtpb::ColumnStats msg_stats; - for (const auto & [id, stat] : column_stats) - { - auto msg = stat.toProto(); - msg_stats.add_column_stats()->Swap(&msg); - } - String output; - msg_stats.SerializeToString(&output); - writeString(output.data(), output.length(), buffer); - return MetaBlockHandle{MetaBlockType::ExtendColumnStat, offset, buffer.count() - offset}; -} - -DMFile::MetaBlockHandle DMFile::writeMergedSubFilePosotionsToBuffer(WriteBuffer & buffer) -{ - auto offset = buffer.count(); - - writeIntBinary(merged_files.size(), buffer); - const auto * data = reinterpret_cast(&merged_files[0]); - auto bytes = merged_files.size() * sizeof(MergedFile); - writeString(data, bytes, buffer); - - writeIntBinary(merged_sub_file_infos.size(), buffer); - for (const auto & [fname, info] : merged_sub_file_infos) - { - info.serializeToBuffer(buffer); - } - return 
MetaBlockHandle{MetaBlockType::MergedSubFilePos, offset, buffer.count() - offset}; -} - -void DMFile::finalizeMetaV2(WriteBuffer & buffer) -{ - auto tmp_buffer = WriteBufferFromOwnString{}; - std::array meta_block_handles = { - // - writeSLPackStatToBuffer(tmp_buffer), - writeSLPackPropertyToBuffer(tmp_buffer), - writeColumnStatToBuffer(tmp_buffer), - writeExtendColumnStatToBuffer(tmp_buffer), - writeMergedSubFilePosotionsToBuffer(tmp_buffer), - }; - writePODBinary(meta_block_handles, tmp_buffer); - writeIntBinary(static_cast(meta_block_handles.size()), tmp_buffer); - writeIntBinary(version, tmp_buffer); - - // Write to file and do checksums. - auto s = tmp_buffer.releaseStr(); - writeString(s.data(), s.size(), buffer); - if (configuration) - { - auto digest = configuration->createUnifiedDigest(); - digest->update(s.data(), s.size()); - auto checksum_result = digest->raw(); - writeString(checksum_result.data(), checksum_result.size(), buffer); - } - - MetaFooter footer{}; - if (configuration) - { - footer.checksum_algorithm = static_cast(configuration->getChecksumAlgorithm()); - footer.checksum_frame_length = configuration->getChecksumFrameLength(); - } - writePODBinary(footer, buffer); -} - -std::vector DMFile::readMetaV2(const FileProviderPtr & file_provider) -{ - auto rbuf = openForRead(file_provider, metav2Path(), encryptionMetav2Path(), meta_buffer_size); - std::vector buf(meta_buffer_size); - size_t read_bytes = 0; - for (;;) - { - read_bytes += rbuf.readBig(buf.data() + read_bytes, meta_buffer_size); - if (likely(read_bytes < buf.size())) - { - break; - } - LOG_WARNING(log, "{}'s size is larger than {}", metav2Path(), buf.size()); - buf.resize(buf.size() + meta_buffer_size); - } - buf.resize(read_bytes); - return buf; -} - -void DMFile::parseMetaV2(std::string_view buffer) -{ - // MetaFooter - const auto * footer = reinterpret_cast(buffer.data() + buffer.size() - sizeof(MetaFooter)); - if (footer->checksum_algorithm != 0 && footer->checksum_frame_length != 
0) - { - configuration = DMChecksumConfig{ - /*embedded_checksum*/ {}, - footer->checksum_frame_length, - static_cast(footer->checksum_algorithm)}; - } - else - { - configuration.reset(); - } - - const auto * ptr = reinterpret_cast(footer); - - // Checksum - if (configuration) - { - auto digest = configuration->createUnifiedDigest(); - auto hash_size = digest->hashSize(); - ptr = ptr - hash_size; - digest->update(buffer.data(), buffer.size() - sizeof(MetaFooter) - hash_size); - if (unlikely(!digest->compareRaw(ptr))) - { - LOG_ERROR(log, "{} checksum invalid", metav2Path()); - throw Exception(ErrorCodes::CORRUPTED_DATA, "{} checksum invalid", metav2Path()); - } - } - - ptr = ptr - sizeof(DMFileFormat::Version); - version = *(reinterpret_cast(ptr)); - - ptr = ptr - sizeof(UInt64); - auto meta_block_handle_count = *(reinterpret_cast(ptr)); - - for (UInt64 i = 0; i < meta_block_handle_count; ++i) - { - ptr = ptr - sizeof(MetaBlockHandle); - const auto * handle = reinterpret_cast(ptr); - // omit the default branch. 
If there are unknown MetaBlock (after in-place downgrade), just ignore and throw away - switch (handle->type) - { - case MetaBlockType::ColumnStat: // parse the `ColumnStat` from old version - parseColumnStat(buffer.substr(handle->offset, handle->size)); - break; - case MetaBlockType::ExtendColumnStat: - parseExtendColumnStat(buffer.substr(handle->offset, handle->size)); - break; - case MetaBlockType::PackProperty: - parsePackProperty(buffer.substr(handle->offset, handle->size)); - break; - case MetaBlockType::PackStat: - parsePackStat(buffer.substr(handle->offset, handle->size)); - break; - case MetaBlockType::MergedSubFilePos: - parseMergedSubFilePos(buffer.substr(handle->offset, handle->size)); - break; - } - } -} - -void DMFile::parseColumnStat(std::string_view buffer) -{ - ReadBufferFromString rbuf(buffer); - size_t count; - readIntBinary(count, rbuf); - column_stats.reserve(count); - for (size_t i = 0; i < count; ++i) - { - ColumnStat stat; - stat.parseFromBuffer(rbuf); - // Do not overwrite the ColumnStat if already exist, it may - // created by `ExteandColumnStat` - column_stats.emplace(stat.col_id, std::move(stat)); - } -} - -void DMFile::parseExtendColumnStat(std::string_view buffer) -{ - dtpb::ColumnStats msg_stats; - auto parse_ok = msg_stats.ParseFromArray(buffer.begin(), buffer.size()); - RUNTIME_CHECK_MSG(parse_ok, "Parse extend column stat fail! 
filename={}", path()); - column_stats.reserve(msg_stats.column_stats_size()); - for (int i = 0; i < msg_stats.column_stats_size(); ++i) - { - const auto & msg = msg_stats.column_stats(i); - ColumnStat stat; - stat.mergeFromProto(msg); - // replace the ColumnStat if exists - if (auto [iter, inserted] = column_stats.emplace(stat.col_id, stat); unlikely(!inserted)) - { - iter->second = stat; - } - } -} - -void DMFile::parseMergedSubFilePos(std::string_view buffer) -{ - ReadBufferFromString rbuf(buffer); - - UInt64 merged_files_count; - readIntBinary(merged_files_count, rbuf); - merged_files.resize(merged_files_count); - readString(reinterpret_cast(merged_files.data()), merged_files_count * sizeof(MergedFile), rbuf); - - UInt64 count; - readIntBinary(count, rbuf); - merged_sub_file_infos.reserve(count); - for (UInt64 i = 0; i < count; ++i) - { - auto t = MergedSubFileInfo::parseFromBuffer(rbuf); - auto fname = t.fname; - merged_sub_file_infos.emplace(std::move(fname), std::move(t)); - } -} - -void DMFile::parsePackProperty(std::string_view buffer) -{ - const auto * pp = reinterpret_cast(buffer.data()); - auto count = buffer.size() / sizeof(PackProperty); - pack_properties.mutable_property()->Reserve(count); - for (size_t i = 0; i < count; ++i) - { - pp[i].toProtobuf(pack_properties.add_property()); - } -} - -void DMFile::parsePackStat(std::string_view buffer) -{ - auto count = buffer.size() / sizeof(PackStat); - pack_stats.resize(count); - memcpy(reinterpret_cast(pack_stats.data()), buffer.data(), buffer.size()); -} - -void DMFile::finalizeDirName() -{ - RUNTIME_CHECK_MSG( - status == Status::WRITING, - "FileId={} Expected WRITING status, but {}", - file_id, - statusString(status)); - Poco::File old_file(path()); - setStatus(Status::READABLE); - auto new_path = path(); - Poco::File file(new_path); - if (file.exists()) - { - LOG_WARNING(log, "Existing dmfile, removing: {}", new_path); - const String deleted_path = getPathByStatus(parent_path, file_id, Status::DROPPED); 
- // no need to delete the encryption info associated with the dmfile path here. - // because this dmfile path is still a valid path and no obsolete encryption info will be left. - file.renameTo(deleted_path); - file.remove(true); - LOG_WARNING(log, "Existing dmfile, removed: {}", deleted_path); - } - old_file.renameTo(new_path); -} - -std::vector DMFile::listFilesForUpload() +std::vector DMFile::listFilesForUpload() const { RUNTIME_CHECK(useMetaV2()); std::vector fnames; @@ -1160,171 +443,15 @@ std::vector DMFile::listFilesForUpload() void DMFile::switchToRemote(const S3::DMFileOID & oid) { RUNTIME_CHECK(useMetaV2()); - RUNTIME_CHECK(status == Status::READABLE); + RUNTIME_CHECK(getStatus() == DMFileStatus::READABLE); auto local_path = path(); // Update the parent_path so that it will read data from remote storage. - parent_path = S3::S3Filename::fromTableID(oid.store_id, oid.keyspace_id, oid.table_id).toFullKeyWithPrefix(); + meta->parent_path = S3::S3Filename::fromTableID(oid.store_id, oid.keyspace_id, oid.table_id).toFullKeyWithPrefix(); // Remove local directory. 
std::filesystem::remove_all(local_path); } - -void DMFile::checkMergedFile( - MergedFileWriter & writer, - FileProviderPtr & file_provider, - WriteLimiterPtr & write_limiter) -{ - if (writer.file_info.size >= merged_file_max_size) - { - // finialize cur merged file - writer.buffer->sync(); - merged_files.push_back(writer.file_info); - auto cur_number = writer.file_info.number; - - // create a new merge file - writer.file_info.number = cur_number + 1; - writer.file_info.size = 0; - writer.buffer.reset(); - - writer.buffer = std::make_unique( - file_provider, - mergedPath(writer.file_info.number), - encryptionMergedPath(writer.file_info.number), - /*create_new_encryption_info*/ false, - write_limiter); - } -} - -// Merge the small files into a single file to avoid -// filesystem inodes exhausting -void DMFile::finalizeSmallFiles( - MergedFileWriter & writer, - FileProviderPtr & file_provider, - WriteLimiterPtr & write_limiter) -{ - auto copy_file_to_cur = [&](const String & fname, UInt64 fsize) { - checkMergedFile(writer, file_provider, write_limiter); - - auto read_file - = openForRead(file_provider, subFilePath(fname), EncryptionPath(encryptionBasePath(), fname), fsize); - std::vector read_buf(fsize); - auto read_size = read_file.readBig(read_buf.data(), read_buf.size()); - RUNTIME_CHECK(read_size == fsize, fname, read_size, fsize); - - writer.buffer->write(read_buf.data(), read_buf.size()); - merged_sub_file_infos.emplace( - fname, - MergedSubFileInfo( - fname, - writer.file_info.number, - /*offset*/ writer.file_info.size, - /*size*/ read_buf.size())); - writer.file_info.size += read_buf.size(); - }; - - std::vector delete_file_name; - for (const auto & [col_id, stat] : column_stats) - { - // check .data - if (stat.data_bytes <= small_file_size_threshold) - { - auto fname = colDataFileName(getFileNameBase(col_id, {})); - auto fsize = stat.data_bytes; - copy_file_to_cur(fname, fsize); - delete_file_name.emplace_back(std::move(fname)); - } - - // check .null.data - 
if (stat.type->isNullable() && stat.nullmap_data_bytes <= small_file_size_threshold) - { - auto fname = colDataFileName(getFileNameBase(col_id, {IDataType::Substream::NullMap})); - auto fsize = stat.nullmap_data_bytes; - copy_file_to_cur(fname, fsize); - delete_file_name.emplace_back(std::move(fname)); - } - - // check .size0.dat - if (stat.array_sizes_bytes > 0 && stat.array_sizes_bytes <= small_file_size_threshold) - { - auto fname = colDataFileName(getFileNameBase(col_id, {IDataType::Substream::ArraySizes})); - auto fsize = stat.array_sizes_bytes; - copy_file_to_cur(fname, fsize); - delete_file_name.emplace_back(std::move(fname)); - } - } - - writer.buffer->sync(); - merged_files.push_back(writer.file_info); - - for (auto & fname : delete_file_name) - { - std::filesystem::remove(subFilePath(fname)); - } -} - -UInt64 DMFile::getFileSize(ColId col_id, const String & filename) const -{ - auto itr = column_stats.find(col_id); - RUNTIME_CHECK(itr != column_stats.end(), col_id); - if (endsWith(filename, ".idx")) - { - return itr->second.index_bytes; - } - // Note that ".null.dat"/"null.mrk" must be check before ".dat"/".mrk" - else if (endsWith(filename, ".null.dat")) - { - return itr->second.nullmap_data_bytes; - } - else if (endsWith(filename, ".null.mrk")) - { - return itr->second.nullmap_mark_bytes; - } - // Note that ".size0.dat"/".size0.mrk" must be check before ".dat"/".mrk" - else if (endsWith(filename, ".size0.dat")) - { - return itr->second.array_sizes_bytes; - } - else if (endsWith(filename, ".size0.mrk")) - { - return itr->second.array_sizes_mark_bytes; - } - else if (endsWith(filename, ".dat")) - { - return itr->second.data_bytes; - } - else if (endsWith(filename, ".mrk")) - { - return itr->second.mark_bytes; - } - else - { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknow filename={} col_id={}", filename, col_id); - } -} - -UInt64 DMFile::getReadFileSize(ColId col_id, const String & filename) const -{ - auto itr = merged_sub_file_infos.find(filename); 
- if (itr != merged_sub_file_infos.end()) - { - return getMergedFileSizeOfColumn(itr->second); - } - else - { - return getFileSize(col_id, filename); - } -} - -UInt64 DMFile::getMergedFileSizeOfColumn(const MergedSubFileInfo & file_info) const -{ - // Get filesize of merged file. - auto itr = std::find_if(merged_files.begin(), merged_files.end(), [&file_info](const auto & merged_file) { - return merged_file.number == file_info.number; - }); - RUNTIME_CHECK(itr != merged_files.end()); - return itr->size; -} } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index efa719478da..8270a7869a7 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -14,20 +14,15 @@ #pragma once -#include -#include -#include -#include #include -#include -#include #include -#include +#include #include #include #include #include #include + namespace DB::DM { class DMFile; @@ -55,169 +50,6 @@ class DMFile : private boost::noncopyable friend class DMFileWithVectorIndexBlockInputStream; public: - enum Status : int - { - WRITABLE, - WRITING, - READABLE, - DROPPED, - }; - - static String statusString(Status status) - { - switch (status) - { - case WRITABLE: - return "WRITABLE"; - case WRITING: - return "WRITING"; - case READABLE: - return "READABLE"; - case DROPPED: - return "DROPPED"; - default: - throw Exception("Unexpected status: " + DB::toString(static_cast(status))); - } - } - - struct ReadMetaMode - { - private: - static constexpr size_t READ_NONE = 0x00; - static constexpr size_t READ_COLUMN_STAT = 0x01; - static constexpr size_t READ_PACK_STAT = 0x02; - static constexpr size_t READ_PACK_PROPERTY = 0x04; - - size_t value; - - public: - explicit ReadMetaMode(size_t value_) - : value(value_) - {} - - static ReadMetaMode all() { return ReadMetaMode(READ_COLUMN_STAT | READ_PACK_STAT | READ_PACK_PROPERTY); } - static ReadMetaMode none() { return 
ReadMetaMode(READ_NONE); } - // after restore with mode, you can call `getBytesOnDisk` to get disk size of this DMFile - static ReadMetaMode diskSizeOnly() { return ReadMetaMode(READ_COLUMN_STAT); } - // after restore with mode, you can call `getRows`, `getBytes` to get memory size of this DMFile, - // and call `getBytesOnDisk` to get disk size of this DMFile - static ReadMetaMode memoryAndDiskSize() { return ReadMetaMode(READ_COLUMN_STAT | READ_PACK_STAT); } - - inline bool needColumnStat() const { return value & READ_COLUMN_STAT; } - inline bool needPackStat() const { return value & READ_PACK_STAT; } - inline bool needPackProperty() const { return value & READ_PACK_PROPERTY; } - - inline bool isNone() const { return value == READ_NONE; } - inline bool isAll() const { return needColumnStat() && needPackStat() && needPackProperty(); } - }; - - struct PackStat - { - UInt32 rows; - UInt32 not_clean; - UInt64 first_version; - UInt64 bytes; - UInt8 first_tag; - - String toDebugString() const - { - return fmt::format( - "rows={}, not_clean={}, first_version={}, bytes={}, first_tag={}", - rows, - not_clean, - first_version, - bytes, - first_tag); - } - }; - static_assert(std::is_standard_layout_v); - - struct PackProperty - { - // when gc_safe_point exceed this version, there must be some data obsolete in this pack - UInt64 gc_hint_version{}; - // effective rows(multiple versions of one row is count as one include delete) - UInt64 num_rows{}; - // the number of rows in this pack which are deleted - UInt64 deleted_rows{}; - - void toProtobuf(dtpb::PackProperty * p) const - { - p->set_gc_hint_version(gc_hint_version); - p->set_num_rows(num_rows); - p->set_deleted_rows(deleted_rows); - } - - void fromProtoBuf(const dtpb::PackProperty & p) - { - gc_hint_version = p.gc_hint_version(); - num_rows = p.num_rows(); - deleted_rows = p.deleted_rows(); - } - - explicit PackProperty(const dtpb::PackProperty & p) { fromProtoBuf(p); } - }; - static_assert(std::is_standard_layout_v); - 
- enum class MetaBlockType : UInt64 - { - PackStat = 0, - PackProperty, - ColumnStat, // Deprecated, use `ExtendColumnStat` instead - MergedSubFilePos, - ExtendColumnStat, - }; - struct MetaBlockHandle - { - MetaBlockType type; - UInt64 offset; - UInt64 size; - }; - static_assert(std::is_standard_layout_v && sizeof(MetaBlockHandle) == sizeof(UInt64) * 3); - - struct MetaFooter - { - UInt64 checksum_frame_length = 0; - UInt64 checksum_algorithm = 0; - }; - static_assert(std::is_standard_layout_v && sizeof(MetaFooter) == sizeof(UInt64) * 2); - - struct MetaPackInfo - { - UInt64 pack_property_offset; - UInt64 pack_property_size; - UInt64 column_stat_offset; - UInt64 column_stat_size; - UInt64 pack_stat_offset; - UInt64 pack_stat_size; - - MetaPackInfo() - : pack_property_offset(0) - , pack_property_size(0) - , column_stat_offset(0) - , column_stat_size(0) - , pack_stat_offset(0) - , pack_stat_size(0) - {} - }; - - struct Footer - { - MetaPackInfo meta_pack_info; - UInt64 sub_file_stat_offset; - UInt32 sub_file_num; - - Footer() - : sub_file_stat_offset(0) - , sub_file_num(0) - {} - }; - - using PackStats = PaddedPODArray; - // `PackProperties` is similar to `PackStats` except it uses protobuf to do serialization - using PackProperties = dtpb::PackProperties; - - // Normally, we use STORAGE_FORMAT_CURRENT to determine whether use meta v2. 
static DMFilePtr create( UInt64 file_id, @@ -232,7 +64,7 @@ class DMFile : private boost::noncopyable UInt64 file_id, UInt64 page_id, const String & parent_path, - const ReadMetaMode & read_meta_mode); + const DMFileMeta::ReadMode & read_meta_mode); struct ListOptions { @@ -248,27 +80,23 @@ class DMFile : private boost::noncopyable const String & parent_path, const ListOptions & options); - // static helper function for getting path - static String getPathByStatus(const String & parent_path, UInt64 file_id, DMFile::Status status); - static String getNGCPath(const String & parent_path, UInt64 file_id, DMFile::Status status); - bool canGC() const; void enableGC() const; void remove(const FileProviderPtr & file_provider); // The ID for locating DTFile on disk - UInt64 fileId() const { return file_id; } + UInt64 fileId() const { return meta->file_id; } // The PageID for locating this object in the StoragePool.data UInt64 pageId() const { return page_id; } String path() const; - const String & parentPath() const { return parent_path; } + const String & parentPath() const { return meta->parent_path; } size_t getRows() const { size_t rows = 0; - for (const auto & s : pack_stats) + for (const auto & s : meta->pack_stats) rows += s.rows; return rows; } @@ -276,7 +104,7 @@ class DMFile : private boost::noncopyable size_t getBytes() const { size_t bytes = 0; - for (const auto & s : pack_stats) + for (const auto & s : meta->pack_stats) bytes += s.bytes; return bytes; } @@ -286,24 +114,29 @@ class DMFile : private boost::noncopyable // This include column data & its index bytes in disk. // Not counting DMFile's meta and pack stat, they are usally small enough to ignore. 
size_t bytes = 0; - for (const auto & c : column_stats) + for (const auto & c : meta->column_stats) bytes += c.second.serialized_bytes; return bytes; } - size_t getPacks() const { return pack_stats.size(); } - const PackStats & getPackStats() const { return pack_stats; } - PackProperties & getPackProperties() { return pack_properties; } + size_t getPacks() const { return meta->pack_stats.size(); } + const DMFileMeta::PackStats & getPackStats() const { return meta->pack_stats; } + const DMFileMeta::PackProperties & getPackProperties() const { return meta->pack_properties; } + const ColumnStats & getColumnStats() const { return meta->column_stats; } + const std::unordered_set & getColumnIndices() const { return meta->column_indices; } + + // only used in gtest + void clearPackProperties() { meta->pack_properties.clear_property(); } const ColumnStat & getColumnStat(ColId col_id) const { - if (auto it = column_stats.find(col_id); likely(it != column_stats.end())) + if (auto it = meta->column_stats.find(col_id); likely(it != meta->column_stats.end())) { return it->second; } throw Exception("Column [" + DB::toString(col_id) + "] not found in dm file [" + path() + "]"); } - bool isColumnExist(ColId col_id) const { return column_stats.find(col_id) != column_stats.end(); } + bool isColumnExist(ColId col_id) const { return meta->column_stats.contains(col_id); } /* * TODO: This function is currently unused. We could use it when: @@ -316,26 +149,26 @@ class DMFile : private boost::noncopyable // + ", file size: " + DB::toString(getBytesOnDisk()) + "}"; // } - DMConfigurationOpt & getConfiguration() { return configuration; } + const DMConfigurationOpt & getConfiguration() const { return meta->configuration; } /** * Return all column defines. This is useful if you want to read all data from a dmfile. * Note that only the column id and type is valid. 
* @return All columns */ - ColumnDefines getColumnDefines() + ColumnDefines getColumnDefines() const { ColumnDefines results{}; - results.reserve(this->column_stats.size()); - for (const auto & cs : this->column_stats) + results.reserve(this->meta->column_stats.size()); + for (const auto & cs : this->meta->column_stats) { results.emplace_back(cs.first, "", cs.second.type); } return results; } - static String metav2FileName() { return "meta"; } - std::vector listFilesForUpload(); + bool useMetaV2() const { return meta->version == DMFileFormat::V3; } + std::vector listFilesForUpload() const; void switchToRemote(const S3::DMFileOID & oid); #ifndef DBMS_PUBLIC_GTEST @@ -347,30 +180,43 @@ class DMFile : private boost::noncopyable UInt64 file_id_, UInt64 page_id_, String parent_path_, - Status status_, + DMFileStatus status_, UInt64 small_file_size_threshold_ = 128 * 1024, UInt64 merged_file_max_size_ = 16 * 1024 * 1024, DMConfigurationOpt configuration_ = std::nullopt, DMFileFormat::Version version_ = STORAGE_FORMAT_CURRENT.dm_file) - : file_id(file_id_) - , page_id(page_id_) - , parent_path(std::move(parent_path_)) - , status(status_) - , configuration(std::move(configuration_)) + : page_id(page_id_) , log(Logger::get()) - , version(version_) - , small_file_size_threshold(small_file_size_threshold_) - , merged_file_max_size(merged_file_max_size_) - {} + { + if (version_ == DMFileFormat::V3) + { + meta = std::make_unique( + file_id_, + std::move(parent_path_), + status_, + small_file_size_threshold_, + merged_file_max_size_, + configuration_, + version_); + } + else + { + meta = std::make_unique( // + file_id_, + parent_path_, + status_, + configuration_, + version_); + } + } // Do not gc me. 
String ngcPath() const; - String metaPath() const { return subFilePath(metaFileName()); } - String packStatPath() const { return subFilePath(packStatFileName()); } - String packPropertyPath() const { return subFilePath(packPropertyFileName()); } - String configurationPath() const { return subFilePath(configurationFileName()); } - String metav2Path() const { return subFilePath(metav2FileName()); } - String mergedPath(UInt32 number) const { return subFilePath(mergedFilename(number)); } + String metav2Path() const { return subFilePath(DMFileMetaV2::metaFileName()); } + UInt64 getReadFileSize(ColId col_id, const String & filename) const + { + return meta->getReadFileSize(col_id, filename); + } using FileNameBase = String; size_t colIndexSizeByName(const FileNameBase & file_name_base) const @@ -381,14 +227,14 @@ class DMFile : private boost::noncopyable { return Poco::File(colDataPath(file_name_base)).getSize(); } - size_t colIndexSize(ColId id); + size_t colIndexSize(ColId id) const; enum class ColDataType { Elements, NullMap, ArraySizes, }; - size_t colDataSize(ColId id, ColDataType type); + size_t colDataSize(ColId id, ColDataType type) const; String colDataPath(const FileNameBase & file_name_base) const { @@ -412,130 +258,26 @@ class DMFile : private boost::noncopyable EncryptionPath encryptionDataPath(const FileNameBase & file_name_base) const; EncryptionPath encryptionIndexPath(const FileNameBase & file_name_base) const; EncryptionPath encryptionMarkPath(const FileNameBase & file_name_base) const; - EncryptionPath encryptionMetaPath() const; - EncryptionPath encryptionPackStatPath() const; - EncryptionPath encryptionPackPropertyPath() const; - EncryptionPath encryptionConfigurationPath() const; - EncryptionPath encryptionMetav2Path() const; - EncryptionPath encryptionMergedPath(UInt32 number) const; static FileNameBase getFileNameBase(ColId col_id, const IDataType::SubstreamPath & substream = {}) { return IDataType::getFileNameForStream(DB::toString(col_id), 
substream); } - static String metaFileName() { return "meta.txt"; } - static String packStatFileName() { return "pack"; } - static String packPropertyFileName() { return "property"; } - static String configurationFileName() { return "config"; } - static String mergedFilename(UInt32 number) { return fmt::format("{}.merged", number); } - - static String colDataFileName(const FileNameBase & file_name_base); - static String colIndexFileName(const FileNameBase & file_name_base); - static String colMarkFileName(const FileNameBase & file_name_base); - - using OffsetAndSize = std::tuple; - OffsetAndSize writeMetaToBuffer(WriteBuffer & buffer); - OffsetAndSize writePackStatToBuffer(WriteBuffer & buffer); - OffsetAndSize writePackPropertyToBuffer(WriteBuffer & buffer, UnifiedDigestBase * digest = nullptr); - - void writeMeta(const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter); - void writePackProperty(const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter); - void writeConfiguration(const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter); - void readColumnStat(const FileProviderPtr & file_provider, const MetaPackInfo & meta_pack_info); - void readMeta(const FileProviderPtr & file_provider, const MetaPackInfo & meta_pack_info); - void readPackStat(const FileProviderPtr & file_provider, const MetaPackInfo & meta_pack_info); - void readPackProperty(const FileProviderPtr & file_provider, const MetaPackInfo & meta_pack_info); - void readConfiguration(const FileProviderPtr & file_provider); - - void writeMetadata(const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter); - void readMetadata(const FileProviderPtr & file_provider, const ReadMetaMode & read_meta_mode); + void addPack(const DMFileMeta::PackStat & pack_stat) { meta->pack_stats.push_back(pack_stat); } - void upgradeMetaIfNeed(const FileProviderPtr & file_provider, DMFileFormat::Version ver); + DMFileStatus getStatus() const { 
return meta->status; } + void setStatus(DMFileStatus status_) { meta->status = status_; } - void addPack(const PackStat & pack_stat) { pack_stats.push_back(pack_stat); } - - Status getStatus() const { return status; } - void setStatus(Status status_) { status = status_; } - - void finalizeForFolderMode(const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter); + void finalize(); String subFilePath(const String & file_name) const { return path() + "/" + file_name; } - void initializeIndices(); - - /* New metadata file format: - * |Pack Stats|Pack Properties|Column Stats|Pack Stats Handle|Pack Properties Handle|Column Stats Handle|Meta Block Handle Count|DMFile Version|Checksum|MetaFooter| - * |----------------------------------------Checksum include-----------------------------------------------------------------------------------| - * `MetaFooter` is saved at the end of the file, with fixed length, it contains checksum algorithm and checksum frame length. - * First, read `MetaFooter` and `Checksum`, and check data integrity. - * Second, parse handle and parse corresponding data. - * `PackStatsHandle`, `PackPropertiesHandle` and `ColumnStatsHandle` are offset and size of `PackStats`, `PackProperties` and `ColumnStats`. - */ - // Meta data is small and 64KB is enough. 
- static constexpr size_t meta_buffer_size = 64 * 1024; - void finalizeMetaV2(WriteBuffer & buffer); - MetaBlockHandle writeSLPackStatToBuffer(WriteBuffer & buffer); - MetaBlockHandle writeSLPackPropertyToBuffer(WriteBuffer & buffer); - MetaBlockHandle writeColumnStatToBuffer(WriteBuffer & buffer); - MetaBlockHandle writeExtendColumnStatToBuffer(WriteBuffer & buffer); - MetaBlockHandle writeMergedSubFilePosotionsToBuffer(WriteBuffer & buffer); - std::vector readMetaV2(const FileProviderPtr & file_provider); - void parseMetaV2(std::string_view buffer); - void parseColumnStat(std::string_view buffer); - void parseExtendColumnStat(std::string_view buffer); - void parseMergedSubFilePos(std::string_view buffer); - void parsePackProperty(std::string_view buffer); - void parsePackStat(std::string_view buffer); - void finalizeDirName(); - bool useMetaV2() const { return version == DMFileFormat::V3; } - - UInt64 getFileSize(ColId col_id, const String & filename) const; - UInt64 getReadFileSize(ColId col_id, const String & filename) const; - UInt64 getMergedFileSizeOfColumn(const MergedSubFileInfo & file_info) const; - - // The id to construct the file path on disk. - UInt64 file_id; // It is the page_id that represent this file in the PageStorage. It could be the same as file id. 
UInt64 page_id; - String parent_path; - - PackStats pack_stats; - PackProperties pack_properties; - ColumnStats column_stats; - std::unordered_set column_indices; - - Status status; - DMConfigurationOpt configuration; // configuration LoggerPtr log; - - DMFileFormat::Version version; - - struct MergedFile - { - UInt64 number = 0; - UInt64 size = 0; - }; - - struct MergedFileWriter - { - MergedFile file_info; - std::unique_ptr buffer; - }; - PaddedPODArray merged_files; - // Filename -> MergedSubFileInfo - std::unordered_map merged_sub_file_infos; - - UInt64 small_file_size_threshold; - UInt64 merged_file_max_size; - - void finalizeSmallFiles( - MergedFileWriter & writer, - FileProviderPtr & file_provider, - WriteLimiterPtr & write_limiter); - // check if the size of merged file is larger then the threshold. If so, create a new merged file. - void checkMergedFile(MergedFileWriter & writer, FileProviderPtr & file_provider, WriteLimiterPtr & write_limiter); + DMFileMetaPtr meta; friend class DMFileWriter; friend class DMFileWriterRemote; @@ -549,18 +291,5 @@ class DMFile : private boost::noncopyable friend bool ::DTTool::Migrate::needFrameMigration(const DB::DM::DMFile & file, const std::string & target); }; -inline ReadBufferFromFileProvider openForRead( - const FileProviderPtr & file_provider, - const String & path, - const EncryptionPath & encryption_path, - const size_t & file_size) -{ - return ReadBufferFromFileProvider( - file_provider, - path, - encryption_path, - std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), file_size)); -} - } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index d1bd1b9760c..62b1fad24e6 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -19,6 +19,9 @@ #include #include +#include +#include + namespace DB::DM { 
DMFileBlockInputStreamBuilder::DMFileBlockInputStreamBuilder(const Context & context) @@ -42,9 +45,9 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build( const ScanContextPtr & scan_context) { RUNTIME_CHECK( - dmfile->getStatus() == DMFile::Status::READABLE, + dmfile->getStatus() == DMFileStatus::READABLE, dmfile->fileId(), - DMFile::statusString(dmfile->getStatus())); + magic_enum::enum_name(dmfile->getStatus())); // if `rowkey_ranges` is empty, we unconditionally read all packs // `rowkey_ranges` and `is_common_handle` will only be useful in clean read mode. diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp new file mode 100644 index 00000000000..3215b3b0c3a --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp @@ -0,0 +1,438 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include + +#include "Encryption/WriteBufferFromFileProvider.h" +#include "Encryption/createReadBufferFromFileBaseByFileProvider.h" + +namespace DB::ErrorCodes +{ +extern const int BAD_ARGUMENTS; +} // namespace DB::ErrorCodes + +namespace DB::DM +{ + +void DMFileMeta::initializeIndices() +{ + auto decode = [](const StringRef & data) { + try + { + auto original = unescapeForFileName(data); + return std::stoll(original); + } + catch (const std::invalid_argument & err) + { + throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "invalid ColId: {} from file: {}", err.what(), data); + } + catch (const std::out_of_range & err) + { + throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "invalid ColId: {} from file: {}", err.what(), data); + } + }; + + Poco::File directory{path()}; + std::vector sub_files{}; + directory.list(sub_files); + for (const auto & name : sub_files) + { + if (endsWith(name, details::INDEX_FILE_SUFFIX)) + { + column_indices.insert( + decode(removeSuffix(name, strlen(details::INDEX_FILE_SUFFIX)))); // strip trailing `.idx` + } + } +} + +void DMFileMeta::read(const FileProviderPtr & file_provider, const DMFileMeta::ReadMode & read_meta_mode) +{ + readConfiguration(file_provider); + if (read_meta_mode.isAll()) + { + initializeIndices(); + } + + auto recheck = [&](size_t size) { + if (this->configuration) + { + auto total_size + = this->configuration->getChecksumFrameLength() + this->configuration->getChecksumHeaderLength(); + auto frame_count = size / total_size + (0 != size % total_size); + size -= frame_count * this->configuration->getChecksumHeaderLength(); + } + return size; + }; + + size_t pack_property_size = 0; + if (auto file = Poco::File(packPropertyPath()); file.exists()) + pack_property_size = file.getSize(); + size_t column_stat_size = Poco::File(metaPath()).getSize(); + size_t pack_stat_size = recheck(Poco::File(packStatPath()).getSize()); + + if (read_meta_mode.needPackProperty() && pack_property_size != 0) +
readPackProperty(file_provider, pack_property_size); + + if (read_meta_mode.needColumnStat()) + readColumnStat(file_provider, column_stat_size); + + if (read_meta_mode.needPackStat()) + readPackStat(file_provider, pack_stat_size); +} + +void DMFileMeta::readColumnStat(const FileProviderPtr & file_provider, size_t size) +{ + const auto name = metaFileName(); + auto file_buf = openForRead( + file_provider, + metaPath(), + encryptionMetaPath(), + std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), size)); + auto meta_buf = std::vector(size); + auto meta_reader = ReadBufferFromMemory{meta_buf.data(), meta_buf.size()}; + ReadBuffer * buf = &file_buf; + + // checksum examination + if (configuration) + { + auto location = configuration->getEmbeddedChecksum().find(name); + if (location != configuration->getEmbeddedChecksum().end()) + { + auto digest = configuration->createUnifiedDigest(); + file_buf.readBig(meta_buf.data(), meta_buf.size()); + digest->update(meta_buf.data(), meta_buf.size()); + if (unlikely(!digest->compareRaw(location->second))) + { + throw TiFlashException(Errors::Checksum::DataCorruption, "checksum mismatch for {}", metaPath()); + } + buf = &meta_reader; + } + else + { + LOG_WARNING(log, "checksum for {} not found", name); + } + } + + DMFileFormat::Version ver; // Binary version + assertString("DTFile format: ", *buf); + DB::readText(ver, *buf); + assertString("\n", *buf); + readText(column_stats, ver, *buf); + + // for V2, we do not apply in-place upgrade for now + // but it should not affect the normal read procedure + if (unlikely(ver >= DMFileFormat::V2 && !configuration)) + { + throw TiFlashException("configuration expected but not loaded", Errors::Checksum::Missing); + } + tryUpgradeColumnStatInMetaV1(file_provider, ver); +} + +void DMFileMeta::readPackStat(const FileProviderPtr & file_provider, size_t size) +{ + size_t packs = size / sizeof(PackStat); + pack_stats.resize(packs); + const auto path = packStatPath(); + if (configuration) + { + auto buf 
= createReadBufferFromFileBaseByFileProvider( + file_provider, + path, + encryptionPackStatPath(), + configuration->getChecksumFrameLength(), + nullptr, + configuration->getChecksumAlgorithm(), + configuration->getChecksumFrameLength()); + if (sizeof(PackStat) * packs + != buf->readBig(reinterpret_cast(pack_stats.data()), sizeof(PackStat) * packs)) + { + throw Exception("Cannot read all data", ErrorCodes::CANNOT_READ_ALL_DATA); + } + } + else + { + auto buf = openForRead(file_provider, path, encryptionPackStatPath(), size); + if (sizeof(PackStat) * packs + != buf.readBig(reinterpret_cast(pack_stats.data()), sizeof(PackStat) * packs)) + { + throw Exception("Cannot read all data", ErrorCodes::CANNOT_READ_ALL_DATA); + } + } +} + +void DMFileMeta::readConfiguration(const FileProviderPtr & file_provider) +{ + if (Poco::File(configurationPath()).exists()) + { + auto buf + = openForRead(file_provider, configurationPath(), encryptionConfigurationPath(), DBMS_DEFAULT_BUFFER_SIZE); + auto stream = InputStreamWrapper{buf}; + configuration.emplace(stream); + version = DMFileFormat::V2; + } + else + { + configuration.reset(); + version = DMFileFormat::V1; + } +} + +void DMFileMeta::readPackProperty(const FileProviderPtr & file_provider, size_t size) +{ + String tmp_buf; + const auto name = packPropertyFileName(); + auto buf = openForRead(file_provider, packPropertyPath(), encryptionPackPropertyPath(), size); + + readStringBinary(tmp_buf, buf); + pack_properties.ParseFromString(tmp_buf); + + if (configuration) + { + auto location = configuration->getEmbeddedChecksum().find(name); + if (location != configuration->getEmbeddedChecksum().end()) + { + auto digest = configuration->createUnifiedDigest(); + const auto & target = location->second; + digest->update(tmp_buf.data(), tmp_buf.size()); + if (unlikely(!digest->compareRaw(target))) + { + throw TiFlashException( + Errors::Checksum::DataCorruption, + "checksum mismatch for {}", + packPropertyPath()); + } + } + else + { + 
LOG_WARNING(log, "checksum for {} not found", name); + } + } +} + +void DMFileMeta::tryUpgradeColumnStatInMetaV1(const FileProviderPtr & file_provider, DMFileFormat::Version ver) +{ + if (likely(ver != DMFileFormat::V0)) + return; + + // Update ColumnStat.serialized_bytes with the on-disk size of each substream's data, mark and index files + for (auto && c : column_stats) + { + auto col_id = c.first; + auto & stat = c.second; + c.second.type->enumerateStreams( + [col_id, &stat, this](const IDataType::SubstreamPath & substream) { + String stream_name = DMFileMeta::getFileNameBase(col_id, substream); + String data_file = colDataPath(stream_name); + if (Poco::File f(data_file); f.exists()) + stat.serialized_bytes += f.getSize(); + String mark_file = colMarkPath(stream_name); + if (Poco::File f(mark_file); f.exists()) + stat.serialized_bytes += f.getSize(); + String index_file = colIndexPath(stream_name); + if (Poco::File f(index_file); f.exists()) + stat.serialized_bytes += f.getSize(); + }, + {}); + } + // Update ColumnStat in metaV1. + writeMeta(file_provider, nullptr); +} + +void DMFileMeta::writeMeta(const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter) +{ + String meta_path = metaPath(); + String tmp_meta_path = meta_path + ".tmp"; + + { + WriteBufferFromFileProvider buf(file_provider, tmp_meta_path, encryptionMetaPath(), false, write_limiter, 4096); + if (configuration) + { + auto digest = configuration->createUnifiedDigest(); + auto tmp_buffer = WriteBufferFromOwnString{}; + writeMetaToBuffer(tmp_buffer); + auto serialized = tmp_buffer.releaseStr(); + digest->update(serialized.data(), serialized.length()); + configuration->addChecksum(metaFileName(), digest->raw()); + buf.write(serialized.data(), serialized.size()); + } + else + { + writeMetaToBuffer(buf); + } + buf.sync(); + } + Poco::File(tmp_meta_path).renameTo(meta_path); +} + +void DMFileMeta::writePackProperty(const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter) +{ + String property_path = packPropertyPath(); + String
tmp_property_path = property_path + ".tmp"; + { + WriteBufferFromFileProvider + buf(file_provider, tmp_property_path, encryptionPackPropertyPath(), false, write_limiter, 4096); + if (configuration) + { + auto digest = configuration->createUnifiedDigest(); + writePackPropertyToBuffer(buf, digest.get()); + configuration->addChecksum(packPropertyFileName(), digest->raw()); + } + else + { + writePackPropertyToBuffer(buf); + } + buf.sync(); + } + Poco::File(tmp_property_path).renameTo(property_path); +} + +void DMFileMeta::writeConfiguration(const FileProviderPtr & file_provider, const WriteLimiterPtr & write_limiter) +{ + assert(configuration); + String config_path = configurationPath(); + String tmp_config_path = config_path + ".tmp"; + { + WriteBufferFromFileProvider buf( + file_provider, + tmp_config_path, + encryptionConfigurationPath(), + false, + write_limiter, + DBMS_DEFAULT_BUFFER_SIZE); + { + auto stream = OutputStreamWrapper{buf}; + stream << *configuration; + } + buf.sync(); + } + Poco::File(tmp_config_path).renameTo(config_path); +} + +DMFileMeta::OffsetAndSize DMFileMeta::writeMetaToBuffer(WriteBuffer & buffer) const +{ + size_t meta_offset = buffer.count(); + writeString("DTFile format: ", buffer); + writeIntText(configuration ? 
DMFileFormat::V2 : DMFileFormat::V1, buffer); + writeString("\n", buffer); + writeText(column_stats, STORAGE_FORMAT_CURRENT.dm_file, buffer); + size_t meta_size = buffer.count() - meta_offset; + return std::make_tuple(meta_offset, meta_size); +} + +DMFileMeta::OffsetAndSize DMFileMeta::writePackPropertyToBuffer(WriteBuffer & buffer, UnifiedDigestBase * digest) +{ + size_t offset = buffer.count(); + auto data = pack_properties.SerializeAsString(); + if (digest) + { + digest->update(data.data(), data.size()); + } + writeStringBinary(data, buffer); + size_t size = buffer.count() - offset; + return std::make_tuple(offset, size); +} + +void DMFileMeta::finalize( + WriteBuffer & buffer, + const FileProviderPtr & file_provider, + const WriteLimiterPtr & write_limiter) +{ + // write pack stats + for (const auto & pack_stat : pack_stats) + { + writePODBinary(pack_stat, buffer); + } + writePackProperty(file_provider, write_limiter); + writeMeta(file_provider, write_limiter); + if (configuration) + { + writeConfiguration(file_provider, write_limiter); + } +} + +String DMFileMeta::encryptionBasePath() const +{ + return getPathByStatus(parent_path, file_id, DMFileStatus::READABLE); +} + +EncryptionPath DMFileMeta::encryptionMetaPath() const +{ + return EncryptionPath(encryptionBasePath(), metaFileName()); +} + +EncryptionPath DMFileMeta::encryptionPackStatPath() const +{ + return EncryptionPath(encryptionBasePath(), packStatFileName()); +} + +EncryptionPath DMFileMeta::encryptionPackPropertyPath() const +{ + return EncryptionPath(encryptionBasePath(), packPropertyFileName()); +} + +EncryptionPath DMFileMeta::encryptionConfigurationPath() const +{ + return EncryptionPath(encryptionBasePath(), configurationFileName()); +} + +UInt64 DMFileMeta::getFileSize(ColId col_id, const String & filename) const +{ + auto itr = column_stats.find(col_id); + RUNTIME_CHECK(itr != column_stats.end(), col_id); + if (endsWith(filename, ".idx")) + { + return itr->second.index_bytes; + } + // Note 
that ".null.dat"/".null.mrk" must be checked before ".dat"/".mrk" + else if (endsWith(filename, ".null.dat")) + { + return itr->second.nullmap_data_bytes; + } + else if (endsWith(filename, ".null.mrk")) + { + return itr->second.nullmap_mark_bytes; + } + // Note that ".size0.dat"/".size0.mrk" must be checked before ".dat"/".mrk" + else if (endsWith(filename, ".size0.dat")) + { + return itr->second.array_sizes_bytes; + } + else if (endsWith(filename, ".size0.mrk")) + { + return itr->second.array_sizes_mark_bytes; + } + else if (endsWith(filename, ".dat")) + { + return itr->second.data_bytes; + } + else if (endsWith(filename, ".mrk")) + { + return itr->second.mark_bytes; + } + else + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknow filename={} col_id={}", filename, col_id); + } +} + +UInt64 DMFileMeta::getReadFileSize(ColId col_id, const String & filename) const +{ + return getFileSize(col_id, filename); +} + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h new file mode 100644 index 00000000000..b3476b11af4 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFileMeta.h @@ -0,0 +1,258 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB::DM +{ + +namespace tests +{ +class DMStoreForSegmentReadTaskTest; +class DMFileMetaV2Test; +} // namespace tests + +class DMFile; +class DMFileWriter; + +class DMFileMeta +{ +public: + DMFileMeta( + UInt64 file_id_, + const String & parent_path_, + DMFileStatus status_, + DMConfigurationOpt configuration_, + DMFileFormat::Version version_) + : file_id(file_id_) + , parent_path(parent_path_) + , status(status_) + , configuration(configuration_) + , log(Logger::get()) + , version(version_) + {} + + virtual ~DMFileMeta() = default; + + struct ReadMode + { + private: + static constexpr UInt8 READ_NONE = 0x00; + static constexpr UInt8 READ_COLUMN_STAT = 0x01; + static constexpr UInt8 READ_PACK_STAT = 0x02; + static constexpr UInt8 READ_PACK_PROPERTY = 0x04; + + UInt8 value; + + public: + explicit ReadMode(UInt8 value_) + : value(value_) + {} + + static ReadMode all() { return ReadMode(READ_COLUMN_STAT | READ_PACK_STAT | READ_PACK_PROPERTY); } + static ReadMode none() { return ReadMode(READ_NONE); } + // after restore with mode, you can call `getBytesOnDisk` to get disk size of this DMFile + static ReadMode diskSizeOnly() { return ReadMode(READ_COLUMN_STAT); } + // after restore with mode, you can call `getRows`, `getBytes` to get memory size of this DMFile, + // and call `getBytesOnDisk` to get disk size of this DMFile + static ReadMode memoryAndDiskSize() { return ReadMode(READ_COLUMN_STAT | READ_PACK_STAT); } + + inline bool needColumnStat() const { return value & READ_COLUMN_STAT; } + inline bool needPackStat() const { return value & READ_PACK_STAT; } + inline bool needPackProperty() const { return value & READ_PACK_PROPERTY; } + + inline bool isNone() const { return value == READ_NONE; } + inline bool isAll() const { return needColumnStat() && needPackStat() && needPackProperty(); } + }; + + struct PackStat + { + UInt32 rows; + UInt32 not_clean; + 
UInt64 first_version; + UInt64 bytes; + UInt8 first_tag; + + String toDebugString() const + { + return fmt::format( + "rows={}, not_clean={}, first_version={}, bytes={}, first_tag={}", + rows, + not_clean, + first_version, + bytes, + first_tag); + } + }; + static_assert(std::is_standard_layout_v); + + struct PackProperty + { + // when gc_safe_point exceed this version, there must be some data obsolete in this pack + UInt64 gc_hint_version{}; + // effective rows(multiple versions of one row is count as one include delete) + UInt64 num_rows{}; + // the number of rows in this pack which are deleted + UInt64 deleted_rows{}; + + void toProtobuf(dtpb::PackProperty * p) const + { + p->set_gc_hint_version(gc_hint_version); + p->set_num_rows(num_rows); + p->set_deleted_rows(deleted_rows); + } + + void fromProtoBuf(const dtpb::PackProperty & p) + { + gc_hint_version = p.gc_hint_version(); + num_rows = p.num_rows(); + deleted_rows = p.deleted_rows(); + } + + explicit PackProperty(const dtpb::PackProperty & p) { fromProtoBuf(p); } + }; + static_assert(std::is_standard_layout_v); + + enum class BlockType : UInt64 + { + PackStat = 0, + PackProperty, + ColumnStat, // Deprecated, use `ExtendColumnStat` instead + MergedSubFilePos, + ExtendColumnStat, + }; + struct BlockHandle + { + BlockType type; + UInt64 offset; + UInt64 size; + }; + static_assert(std::is_standard_layout_v && sizeof(BlockHandle) == sizeof(UInt64) * 3); + + struct Footer + { + UInt64 checksum_frame_length = 0; + UInt64 checksum_algorithm = 0; + }; + static_assert(std::is_standard_layout_v