From da0d19acedb2c59f82d093206503eb7309adcbb6 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 26 Sep 2019 13:21:41 +0800 Subject: [PATCH 1/6] Chunk binary version --- dbms/src/Storages/DeltaMerge/Chunk.cpp | 10 ++++++++++ dbms/src/Storages/DeltaMerge/Chunk.h | 4 ++++ 2 files changed, 14 insertions(+) diff --git a/dbms/src/Storages/DeltaMerge/Chunk.cpp b/dbms/src/Storages/DeltaMerge/Chunk.cpp index 67f31caf7be..44f4307b71d 100644 --- a/dbms/src/Storages/DeltaMerge/Chunk.cpp +++ b/dbms/src/Storages/DeltaMerge/Chunk.cpp @@ -10,8 +10,12 @@ namespace DB namespace DM { +const Chunk::Version Chunk::CURRENT_VERSION = 1; + void Chunk::serialize(WriteBuffer & buf) const { + writeVarUInt(Chunk::CURRENT_VERSION, buf); // Add binary version + writeIntBinary(handle_start, buf); writeIntBinary(handle_end, buf); writePODBinary(is_delete_range, buf); @@ -37,6 +41,12 @@ void Chunk::serialize(WriteBuffer & buf) const Chunk Chunk::deserialize(ReadBuffer & buf) { + // Check binary version + Chunk::Version chunk_batch_version; + readVarUInt(chunk_batch_version, buf); + if (chunk_batch_version != Chunk::CURRENT_VERSION) + throw Exception("Chunk binary version not match: " + DB::toString(chunk_batch_version), ErrorCodes::LOGICAL_ERROR); + Handle start, end; readIntBinary(start, buf); readIntBinary(end, buf); diff --git a/dbms/src/Storages/DeltaMerge/Chunk.h b/dbms/src/Storages/DeltaMerge/Chunk.h index d68994f7cbf..41c34805913 100644 --- a/dbms/src/Storages/DeltaMerge/Chunk.h +++ b/dbms/src/Storages/DeltaMerge/Chunk.h @@ -35,6 +35,10 @@ using ColumnMetas = std::vector; class Chunk { +public: + // Binary version of chunk + using Version = UInt32; + static const Version CURRENT_VERSION; public: using ColumnMetaMap = std::unordered_map; From d72a780d2a845e7cf47d6db0eed5755bcee8b086 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 26 Sep 2019 13:48:07 +0800 Subject: [PATCH 2/6] Add name to PageStorage for identify different storages --- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 7 +++-- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 3 +- dbms/src/Storages/DeltaMerge/StoragePool.cpp | 8 +++--- dbms/src/Storages/DeltaMerge/StoragePool.h | 2 +- .../tests/gtest_dm_delta_merge_store.cpp | 2 +- .../tests/gtest_dm_disk_value_space.cpp | 2 +- .../DeltaMerge/tests/gtest_dm_segment.cpp | 2 +- dbms/src/Storages/Page/PageStorage.cpp | 28 ++++++++++--------- dbms/src/Storages/Page/PageStorage.h | 3 +- .../Page/tests/gtest_page_storage.cpp | 2 +- dbms/src/Storages/StorageDeltaMerge.cpp | 9 +++--- .../Storages/Transaction/RegionPersister.h | 2 +- 12 files changed, 38 insertions(+), 32 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 1178d7b02af..9edd2c59695 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -38,13 +38,14 @@ namespace DM DeltaMergeStore::DeltaMergeStore(Context & db_context, const String & path_, - const String & name, + const String & db_name, + const String & tbl_name, const ColumnDefines & columns, const ColumnDefine & handle, const Settings & settings_) : path(path_), - storage_pool(path), - table_name(name), + storage_pool(db_name + "." + tbl_name, path), + table_name(tbl_name), table_handle_define(handle), background_pool(db_context.getBackgroundPool()), settings(settings_), diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 411b6c802c8..0f0886bb8dd 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -43,7 +43,8 @@ class DeltaMergeStore DeltaMergeStore(Context & db_context, // const String & path_, - const String & name, + const String & db_name, + const String & tbl_name, const ColumnDefines & columns, const ColumnDefine & handle, const Settings & settings_); diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index ff063aebfd7..a4bd62f14ca 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -6,10 +6,10 @@ namespace DM { // TODO: Load configs from settings. -StoragePool::StoragePool(const String & path) - : log_storage(path + "/log", {}), - data_storage(path + "/data", {}), - meta_storage(path + "/meta", {}), +StoragePool::StoragePool(const String &name, const String & path) + : log_storage(name + ".log", path + "/log", {}), + data_storage(name + ".data", path + "/data", {}), + meta_storage(name + ".meta", path + "/meta", {}), max_log_page_id(log_storage.getMaxId()), max_data_page_id(data_storage.getMaxId()), max_meta_page_id(meta_storage.getMaxId()) diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.h b/dbms/src/Storages/DeltaMerge/StoragePool.h index e58f62317d0..11f03c6bfd8 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.h +++ b/dbms/src/Storages/DeltaMerge/StoragePool.h @@ -20,7 +20,7 @@ class StoragePool : private boost::noncopyable using Duration = Clock::duration; using Seconds = std::chrono::seconds; - explicit StoragePool(const String & path); + StoragePool(const String & name, const String & path); PageId maxLogPageId() { return max_log_page_id; } PageId maxDataPageId() { return max_data_page_id; } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 7db1c3d2e11..c611bb62509 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -51,7 +51,7 @@ class DeltaMergeStore_test : public ::testing::Test ColumnDefine handle_column_define = cols[0]; DeltaMergeStorePtr s - = std::make_shared(*context, path, name, cols, handle_column_define, DeltaMergeStore::Settings()); + = std::make_shared(*context, path, "test", name, cols, handle_column_define, DeltaMergeStore::Settings()); return s; } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp index 3133758dcab..1f1896ce623 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp @@ -34,7 +34,7 @@ class DiskValueSpace_test : public ::testing::Test { dropDataInDisk(); - storage_pool = std::make_unique(path); + storage_pool = std::make_unique("test.t1", path); Context & context = DMTestEnv::getContext(); table_handle_define = ColumnDefine(1, "pk", std::make_shared()); table_columns.clear(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 812231f7240..e52fc67179e 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -53,7 +53,7 @@ class Segment_test : public ::testing::Test SegmentPtr reload(ColumnDefines && pre_define_columns = {}, DB::Settings && db_settings = DB::Settings()) { - storage_pool = std::make_unique(path); + storage_pool = std::make_unique("test.t1", path); *db_context = DMTestEnv::getContext(db_settings); ColumnDefines cols = pre_define_columns.empty() ? DMTestEnv::getDefaultColumns() : pre_define_columns; setColumns(cols); diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp index d3714e3fe86..1558ff06a0b 100644 --- a/dbms/src/Storages/Page/PageStorage.cpp +++ b/dbms/src/Storages/Page/PageStorage.cpp @@ -1,6 +1,7 @@ -#include - #include +#include + +#include #include #include @@ -51,8 +52,9 @@ PageStorage::listAllPageFiles(const String & storage_path, bool remove_tmp_file, return page_files; } -PageStorage::PageStorage(const String & storage_path_, const Config & config_) - : storage_path(storage_path_), +PageStorage::PageStorage(String name, const String & storage_path_, const Config & config_) + : storage_name(std::move(name)), + storage_path(storage_path_), config(config_), versioned_page_entries(), page_file_log(&Poco::Logger::get("PageFile")), @@ -120,7 +122,7 @@ PageEntry PageStorage::getEntry(PageId page_id, SnapshotPtr snapshot) } catch (DB::Exception & e) { - LOG_WARNING(log, e.message()); + LOG_WARNING(log, storage_name << " " << e.message()); return {}; // return invalid PageEntry } } @@ -336,8 +338,6 @@ bool PageStorage::gc() return false; } - LOG_DEBUG(log, "PageStorage GC start"); - PageFileIdAndLevel writing_file_id_level; { std::lock_guard lock(write_mutex); @@ -387,12 +387,14 @@ bool PageStorage::gc() || (merge_files.size() >= 2 && candidate_total_size >= config.merge_hint_low_used_file_total_size); if (!should_merge) { - LOG_DEBUG(log, - "GC exit without merging. merge file size: " << merge_files.size() << ", candidate size: " << candidate_total_size); + LOG_TRACE(log, + storage_name << " GC exit without merging. merge file size: " << merge_files.size() + << ", candidate size: " << candidate_total_size); return false; } - LOG_INFO(log, "GC decide to merge " << merge_files.size() << " files, containing " << migrate_page_count << " regions"); + LOG_INFO(log, + storage_name << " GC decide to merge " << merge_files.size() << " files, containing " << migrate_page_count << " regions"); // There are no valid pages to be migrated but valid ref pages, scan over all `merge_files` and do migrate. gc_file_entries_edit = gcMigratePages(snapshot, file_valid_pages, merge_files); @@ -530,7 +532,7 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f catch (DB::Exception & e) { // ignore if it2 is a ref to non-exist page - LOG_WARNING(log, "Ignore invalid RefPage while gcMigratePages: " + e.message()); + LOG_WARNING(log, storage_name << " Ignore invalid RefPage while gcMigratePages: " << e.message()); } } } @@ -580,8 +582,8 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f gc_file.setFormal(); const auto id = gc_file.fileIdLevel(); LOG_INFO(log, - "GC have migrated " << num_successful_migrate_pages << " regions and " << num_valid_ref_pages << " RefPages to PageFile_" - << id.first << "_" << id.second); + storage_name << " GC have migrated " << num_successful_migrate_pages << " regions and " << num_valid_ref_pages + << " RefPages to PageFile_" << id.first << "_" << id.second); } return gc_file_edit; } diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index 7471c872416..3e1aba5b9c8 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -56,7 +56,7 @@ class PageStorage using OpenReadFiles = std::map; public: - PageStorage(const String & storage_path, const Config & config_); + PageStorage(String name, const String & storage_path, const Config & config_); PageId getMaxId(); @@ -90,6 +90,7 @@ class PageStorage gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & file_valid_pages, const GcCandidates & merge_files) const; private: + String storage_name; // Identify between different Storage String storage_path; Config config; diff --git a/dbms/src/Storages/Page/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/tests/gtest_page_storage.cpp index 09966c3a6ee..b6f984b521a 100644 --- a/dbms/src/Storages/Page/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/tests/gtest_page_storage.cpp @@ -56,7 +56,7 @@ class PageStorage_test : public ::testing::Test std::shared_ptr reopenWithConfig(const PageStorage::Config & config_) { - return std::make_shared(path, config_); + return std::make_shared("test.t", path, config_); } protected: diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index c4b9bdff4ee..dcb51c85548 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -120,8 +120,8 @@ StorageDeltaMerge::StorageDeltaMerge(const String & path_, assert(!handle_column_define.name.empty()); assert(!table_column_defines.empty()); - store = std::make_shared( - global_context, path, table_name, std::move(table_column_defines), std::move(handle_column_define), DeltaMergeStore::Settings()); + store = std::make_shared(global_context, path, db_name, table_name, std::move(table_column_defines), + std::move(handle_column_define), DeltaMergeStore::Settings()); } void StorageDeltaMerge::drop() @@ -522,8 +522,9 @@ void StorageDeltaMerge::rename(const String & new_path_to_db, const String & new // rename path and generate a new store Poco::File(path).renameTo(new_path); - store = std::make_shared( - global_context, new_path, new_table_name, std::move(table_column_defines), std::move(handle_column_define), settings); + store = std::make_shared(global_context, // + new_path, new_database_name, new_table_name, // + std::move(table_column_defines), std::move(handle_column_define), settings); path = new_path; db_name = new_database_name; diff --git a/dbms/src/Storages/Transaction/RegionPersister.h b/dbms/src/Storages/Transaction/RegionPersister.h index f6fa5393ca4..e7c2489da83 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.h +++ b/dbms/src/Storages/Transaction/RegionPersister.h @@ -20,7 +20,7 @@ class RegionPersister final : private boost::noncopyable { public: RegionPersister(const std::string & storage_path, const RegionManager & region_manager_, const PageStorage::Config & config = {}) - : page_storage(storage_path, config), region_manager(region_manager_), log(&Logger::get("RegionPersister")) + : page_storage("RegionPersister", storage_path, config), region_manager(region_manager_), log(&Logger::get("RegionPersister")) {} void drop(RegionID region_id); From e22361acee3f9c9c3b837aa8869569c253140bb3 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 26 Sep 2019 15:48:19 +0800 Subject: [PATCH 3/6] Remove getMaxId of PageStorage --- dbms/src/Storages/DeltaMerge/StoragePool.cpp | 12 +++---- dbms/src/Storages/DeltaMerge/StoragePool.h | 9 ++--- dbms/src/Storages/Page/PageEntries.h | 12 +------ dbms/src/Storages/Page/PageFile.cpp | 34 +++++++++++++------ dbms/src/Storages/Page/PageFile.h | 2 +- dbms/src/Storages/Page/PageStorage.cpp | 10 ++---- dbms/src/Storages/Page/PageStorage.h | 4 +-- .../PageEntriesVersionSetWithDelta.cpp | 3 -- .../Page/VersionSet/PageEntriesView.cpp | 10 ------ .../Page/VersionSet/PageEntriesView.h | 2 -- .../Page/tests/gtest_page_entry_map.cpp | 3 -- 11 files changed, 40 insertions(+), 61 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index a4bd62f14ca..d88a60f959b 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -7,12 +7,12 @@ namespace DM // TODO: Load configs from settings. StoragePool::StoragePool(const String &name, const String & path) - : log_storage(name + ".log", path + "/log", {}), - data_storage(name + ".data", path + "/data", {}), - meta_storage(name + ".meta", path + "/meta", {}), - max_log_page_id(log_storage.getMaxId()), - max_data_page_id(data_storage.getMaxId()), - max_meta_page_id(meta_storage.getMaxId()) + : max_log_page_id(0), + max_data_page_id(0), + max_meta_page_id(0), + log_storage(name + ".log", path + "/log", {}, &max_log_page_id), + data_storage(name + ".data", path + "/data", {}, &max_data_page_id), + meta_storage(name + ".meta", path + "/meta", {}, &max_meta_page_id) { } diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.h b/dbms/src/Storages/DeltaMerge/StoragePool.h index 11f03c6bfd8..9b8a1f19298 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.h +++ b/dbms/src/Storages/DeltaMerge/StoragePool.h @@ -37,14 +37,15 @@ class StoragePool : private boost::noncopyable bool gc(const Seconds & try_gc_period = DELTA_MERGE_GC_PERIOD); private: - PageStorage log_storage; - PageStorage data_storage; - PageStorage meta_storage; - + // These should init before PageStorages std::atomic max_log_page_id; std::atomic max_data_page_id; std::atomic max_meta_page_id; + PageStorage log_storage; + PageStorage data_storage; + PageStorage meta_storage; + std::atomic last_try_gc_time = Clock::now(); std::mutex mutex; diff --git a/dbms/src/Storages/Page/PageEntries.h b/dbms/src/Storages/Page/PageEntries.h index 58bf16bfef9..c081da85158 100644 --- a/dbms/src/Storages/Page/PageEntries.h +++ b/dbms/src/Storages/Page/PageEntries.h @@ -26,7 +26,7 @@ template class PageEntriesMixin { public: - explicit PageEntriesMixin(bool is_base_) : normal_pages(), page_ref(), ref_deletions(), max_page_id(0), is_base(is_base_) {} + explicit PageEntriesMixin(bool is_base_) : normal_pages(), page_ref(), ref_deletions(), is_base(is_base_) {} public: static std::shared_ptr createBase() { return std::make_shared(true); } @@ -105,12 +105,9 @@ class PageEntriesMixin { page_ref.clear(); normal_pages.clear(); - max_page_id = 0; ref_deletions.clear(); } - PageId maxId() const { return max_page_id; } - public: using const_normal_page_iterator = std::unordered_map::const_iterator; // only scan over normal Pages, excluding RefPages @@ -123,7 +120,6 @@ class PageEntriesMixin // RefPageId deletions std::unordered_set ref_deletions; - PageId max_page_id; bool is_base; protected: @@ -158,7 +154,6 @@ class PageEntriesMixin { page_ref = rhs.page_ref; normal_pages = rhs.normal_pages; - max_page_id = rhs.max_page_id; ref_deletions = rhs.ref_deletions; } @@ -174,7 +169,6 @@ class PageEntriesMixin { normal_pages.swap(rhs.normal_pages); page_ref.swap(rhs.page_ref); - max_page_id = rhs.max_page_id; is_base = rhs.is_base; ref_deletions.swap(rhs.ref_deletions); } @@ -217,8 +211,6 @@ void PageEntriesMixin::put(PageId page_id, const PageEntry & entry) normal_pages[normal_page_id].ref = page_ref_count + is_new_ref_pair_inserted; } - // update max_page_id - max_page_id = std::max(max_page_id, page_id); } template @@ -273,7 +265,6 @@ void PageEntriesMixin::ref(const PageId ref_id, const PageId page_id) page_ref[ref_id] = normal_page_id; } } - max_page_id = std::max(max_page_id, std::max(ref_id, page_id)); } template @@ -443,7 +434,6 @@ class PageEntriesForDelta : public PageEntriesMixin, normal_pages[it.first] = it.second; } } - max_page_id = std::max(max_page_id, rhs.max_page_id); } bool shouldCompactToBase(const ::DB::MVCC::VersionSetConfig & config) diff --git a/dbms/src/Storages/Page/PageFile.cpp b/dbms/src/Storages/Page/PageFile.cpp index a6240c67381..417b36267d7 100644 --- a/dbms/src/Storages/Page/PageFile.cpp +++ b/dbms/src/Storages/Page/PageFile.cpp @@ -141,14 +141,17 @@ std::pair genWriteData( // /// Analyze meta file, and return . std::pair analyzeMetaFile( // - const String & path, - PageFileId file_id, - UInt32 level, - const char * meta_data, - const size_t meta_data_size, - PageEntriesEdit & edit, - Logger * log) + const String & path, + PageFileId file_id, + UInt32 level, + const char * meta_data, + const size_t meta_data_size, + PageEntriesEdit & edit, + std::atomic * max_page_id, + Logger * log) { + assert(max_page_id != nullptr); + const char * meta_data_end = meta_data + meta_data_size; UInt64 page_data_file_size = 0; @@ -201,6 +204,7 @@ std::pair analyzeMetaFile( // pc.checksum = PageUtil::get(pos); edit.put(page_id, pc); + *max_page_id = std::max(*max_page_id, page_id); page_data_file_size += pc.size; break; } @@ -208,6 +212,7 @@ std::pair analyzeMetaFile( // { auto page_id = PageUtil::get(pos); edit.del(page_id); // Reserve the order of removal. + *max_page_id = std::max(*max_page_id, page_id); break; } case WriteBatch::WriteType::REF: @@ -215,6 +220,7 @@ std::pair analyzeMetaFile( // const auto ref_id = PageUtil::get(pos); const auto page_id = PageUtil::get(pos); edit.ref(ref_id, page_id); + *max_page_id = std::max(*max_page_id, page_id); } } } @@ -280,7 +286,10 @@ void PageFile::Writer::write(const WriteBatch & wb, PageEntriesEdit & edit) // PageFile::Reader // ========================================================= -PageFile::Reader::Reader(PageFile & page_file) : data_file_path(page_file.dataPath()), data_file_fd(PageUtil::openFile(data_file_path)) {} +PageFile::Reader::Reader(PageFile & page_file) + : data_file_path(page_file.dataPath()), data_file_fd(PageUtil::openFile(data_file_path)) +{ +} PageFile::Reader::~Reader() { @@ -460,7 +469,7 @@ PageFile PageFile::openPageFileForRead(PageFileId file_id, UInt32 level, const s return PageFile(file_id, level, parent_path, false, false, log); } -void PageFile::readAndSetPageMetas(PageEntriesEdit & edit) +void PageFile::readAndSetPageMetas(PageEntriesEdit & edit, std::atomic * max_page_id) { const auto path = metaPath(); Poco::File file(path); @@ -475,9 +484,14 @@ void PageFile::readAndSetPageMetas(PageEntriesEdit & edit) PageUtil::readFile(file_fd, 0, data, file_size, path); + // If max_page_id is nullptr, we will ignore that + std::atomic ignored_max_page_id = 0; + if (max_page_id == nullptr) + max_page_id = &ignored_max_page_id; + // analyze meta file and update page_entries std::tie(this->meta_file_pos, this->data_file_pos) - = PageMetaFormat::analyzeMetaFile(folderPath(), file_id, level, data, file_size, edit, log); + = PageMetaFormat::analyzeMetaFile(folderPath(), file_id, level, data, file_size, edit, max_page_id, log); } void PageFile::setFormal() diff --git a/dbms/src/Storages/Page/PageFile.h b/dbms/src/Storages/Page/PageFile.h index 34d74386f6f..8aeec5e94bf 100644 --- a/dbms/src/Storages/Page/PageFile.h +++ b/dbms/src/Storages/Page/PageFile.h @@ -91,7 +91,7 @@ class PageFile : public Allocator /// Get pages' metadata by this method. Will also update file pos. /// Call this method after a page file recovered. /// if check_page_map_complete is true, do del or ref on non-exist page will throw exception. - void readAndSetPageMetas(PageEntriesEdit & edit); + void readAndSetPageMetas(PageEntriesEdit & edit, std::atomic * max_page_id = nullptr); /// Rename this page file into formal style. void setFormal(); diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp index 1558ff06a0b..328fb023792 100644 --- a/dbms/src/Storages/Page/PageStorage.cpp +++ b/dbms/src/Storages/Page/PageStorage.cpp @@ -52,7 +52,7 @@ PageStorage::listAllPageFiles(const String & storage_path, bool remove_tmp_file, return page_files; } -PageStorage::PageStorage(String name, const String & storage_path_, const Config & config_) +PageStorage::PageStorage(String name, const String & storage_path_, const Config & config_, std::atomic * max_page_id) : storage_name(std::move(name)), storage_path(storage_path_), config(config_), @@ -68,7 +68,7 @@ PageStorage::PageStorage(String name, const String & storage_path_, const Config for (auto & page_file : page_files) { PageEntriesEdit edit; - const_cast(page_file).readAndSetPageMetas(edit); + const_cast(page_file).readAndSetPageMetas(edit, max_page_id); // Only level 0 is writable. if (page_file.getLevel() == 0) @@ -99,12 +99,6 @@ PageStorage::PageStorage(String name, const String & storage_path_, const Config #endif } -PageId PageStorage::getMaxId() -{ - std::lock_guard write_lock(write_mutex); - return versioned_page_entries.getSnapshot()->version()->maxId(); -} - PageEntry PageStorage::getEntry(PageId page_id, SnapshotPtr snapshot) { if (!snapshot) diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index 3e1aba5b9c8..6f0dec366e0 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -56,9 +56,7 @@ class PageStorage using OpenReadFiles = std::map; public: - PageStorage(String name, const String & storage_path, const Config & config_); - - PageId getMaxId(); + PageStorage(String name, const String & storage_path, const Config & config_, std::atomic * max_page_id = nullptr); void write(const WriteBatch & write_batch); diff --git a/dbms/src/Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.cpp b/dbms/src/Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.cpp index 946fc04567b..19f31ca483f 100644 --- a/dbms/src/Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.cpp +++ b/dbms/src/Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.cpp @@ -199,8 +199,6 @@ void DeltaVersionEditAcceptor::applyPut(PageEntriesEdit::EditRecord & rec) rec.entry.ref = old_entry->ref + !is_ref_exist; current_version->normal_pages[normal_page_id] = rec.entry; } - - current_version->max_page_id = std::max(current_version->max_page_id, rec.page_id); } void DeltaVersionEditAcceptor::applyDel(PageEntriesEdit::EditRecord & rec) @@ -253,7 +251,6 @@ void DeltaVersionEditAcceptor::applyRef(PageEntriesEdit::EditRecord & rec) current_version->page_ref[rec.page_id] = normal_page_id; } } - current_version->max_page_id = std::max(current_version->max_page_id, rec.page_id); } void DeltaVersionEditAcceptor::applyInplace(const PageEntriesVersionSetWithDelta::VersionPtr & current, const PageEntriesEdit & edit) diff --git a/dbms/src/Storages/Page/VersionSet/PageEntriesView.cpp b/dbms/src/Storages/Page/VersionSet/PageEntriesView.cpp index 39eb443cb8c..02a90a95a1b 100644 --- a/dbms/src/Storages/Page/VersionSet/PageEntriesView.cpp +++ b/dbms/src/Storages/Page/VersionSet/PageEntriesView.cpp @@ -141,14 +141,4 @@ std::set PageEntriesView::validNormalPageIds() const return valid_normal_pages; } -PageId PageEntriesView::maxId() const -{ - PageId max_id = 0; - for (auto node = tail; node != nullptr; node = node->prev) - { - max_id = std::max(max_id, node->maxId()); - } - return max_id; -} - } // namespace DB diff --git a/dbms/src/Storages/Page/VersionSet/PageEntriesView.h b/dbms/src/Storages/Page/VersionSet/PageEntriesView.h index f6857fd213a..ddb188b7aa5 100644 --- a/dbms/src/Storages/Page/VersionSet/PageEntriesView.h +++ b/dbms/src/Storages/Page/VersionSet/PageEntriesView.h @@ -28,8 +28,6 @@ class PageEntriesView std::set validNormalPageIds() const; std::optional findNormalPageEntry(PageId page_id) const; - PageId maxId() const; - inline std::shared_ptr getSharedTailVersion() const { return tail; } inline std::shared_ptr transferTailVersionOwn() diff --git a/dbms/src/Storages/Page/tests/gtest_page_entry_map.cpp b/dbms/src/Storages/Page/tests/gtest_page_entry_map.cpp index 2bfb6be5e96..2ba00eb6852 100644 --- a/dbms/src/Storages/Page/tests/gtest_page_entry_map.cpp +++ b/dbms/src/Storages/Page/tests/gtest_page_entry_map.cpp @@ -37,7 +37,6 @@ TEST_F(PageEntryMap_test, Empty) item_count += 1; } ASSERT_EQ(item_count, 0UL); - ASSERT_EQ(map->maxId(), 0UL); // add some Pages, RefPages @@ -53,7 +52,6 @@ TEST_F(PageEntryMap_test, Empty) item_count += 1; } ASSERT_EQ(item_count, 2UL); - ASSERT_EQ(map->maxId(), 1UL); map->clear(); item_count = 0; @@ -62,7 +60,6 @@ TEST_F(PageEntryMap_test, Empty) item_count += 1; } ASSERT_EQ(item_count, 0UL); - ASSERT_EQ(map->maxId(), 0UL); } TEST_F(PageEntryMap_test, UpdatePageEntry) From 20412b2b0767cee46a4a7056884bd0492da4063c Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 26 Sep 2019 18:12:12 +0800 Subject: [PATCH 4/6] Migrate DelPage mark in doing GC --- dbms/src/Storages/Page/PageStorage.cpp | 73 +++++++++++++------ dbms/src/Storages/Page/PageStorage.h | 4 + .../Page/tests/gtest_page_storage.cpp | 59 +++++++++++++++ 3 files changed, 112 insertions(+), 24 deletions(-) diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp index 328fb023792..bd0f2c9661f 100644 --- a/dbms/src/Storages/Page/PageStorage.cpp +++ b/dbms/src/Storages/Page/PageStorage.cpp @@ -417,20 +417,7 @@ bool PageStorage::gc() } // Delete obsolete files that are not used by any version, without lock - for (const auto & page_file : page_files) - { - const auto page_id_and_lvl = page_file.fileIdLevel(); - if (page_id_and_lvl >= writing_file_id_level) - { - continue; - } - - if (live_files.count(page_id_and_lvl) == 0) - { - // the page file is not used by any version, remove reader cache - page_file.destroy(); - } - } + gcRemoveObsoleteFiles(page_files, writing_file_id_level, live_files); return true; } @@ -487,6 +474,7 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f size_t num_successful_migrate_pages = 0; size_t num_valid_ref_pages = 0; + size_t num_del_page_meta = 0; auto * current = snapshot->version(); { PageEntriesEdit legacy_edit; // All page entries in `merge_files` @@ -506,8 +494,8 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f continue; } + PageIdAndEntries page_id_and_entries; // The valid pages that we need to migrate to `gc_file` auto to_merge_file_reader = to_merge_file.createReader(); - PageIdAndEntries page_id_and_entries; { const auto & page_ids = it->second.second; for (auto page_id : page_ids) @@ -550,17 +538,26 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f } { - // Migrate RefPages which are still valid. + // Migrate valid RefPages and DelPage. WriteBatch batch; for (const auto & rec : legacy_edit.getRecords()) { - // Get `normal_page_id` from memory's `page_entry_map`. Note: can not get `normal_page_id` from disk, - // if it is a record of RefPage to another RefPage, the later ref-id is resolve to the actual `normal_page_id`. - auto [is_ref, normal_page_id] = current->isRefId(rec.page_id); - if (is_ref) + if (rec.type == WriteBatch::WriteType::REF) + { + // Get `normal_page_id` from memory's `page_entry_map`. Note: can not get `normal_page_id` from disk, + // if it is a record of RefPage to another RefPage, the later ref-id is resolve to the actual `normal_page_id`. + auto [is_ref, normal_page_id] = current->isRefId(rec.page_id); + if (is_ref) + { + batch.putRefPage(rec.page_id, normal_page_id); + num_valid_ref_pages += 1; + } + } + else if (rec.type == WriteBatch::WriteType::DEL) { - batch.putRefPage(rec.page_id, normal_page_id); - num_valid_ref_pages += 1; + // DelPage should be migrate to new PageFile + batch.delPage(rec.page_id); + num_del_page_meta += 1; } } gc_file_writer->write(batch, gc_file_edit); @@ -576,10 +573,38 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f gc_file.setFormal(); const auto id = gc_file.fileIdLevel(); LOG_INFO(log, - storage_name << " GC have migrated " << num_successful_migrate_pages << " regions and " << num_valid_ref_pages - << " RefPages to PageFile_" << id.first << "_" << id.second); + storage_name << " GC have migrated " << num_successful_migrate_pages // + << " regions and " << num_valid_ref_pages // + << " RefPages and " << num_del_page_meta // + << " DelPage to PageFile_" << id.first << "_" << id.second); } return gc_file_edit; } +/** + * Delete obsolete files that are not used by any version + * @param page_files All avaliable files in disk + * @param writing_file_id_level The PageFile id which is writing to + * @param live_files The live files after gc + */ +void PageStorage::gcRemoveObsoleteFiles(const std::set & page_files, + const PageFileIdAndLevel & writing_file_id_level, + const std::set & live_files) +{ + for (const auto & page_file : page_files) + { + const auto page_id_and_lvl = page_file.fileIdLevel(); + if (page_id_and_lvl >= writing_file_id_level) + { + continue; + } + + if (live_files.count(page_id_and_lvl) == 0) + { + // the page file is not used by any version, remove reader cache + page_file.destroy(); + } + } +} + } // namespace DB diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index 6f0dec366e0..54b1ce27f4c 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -87,6 +87,10 @@ class PageStorage PageEntriesEdit gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & file_valid_pages, const GcCandidates & merge_files) const; + static void gcRemoveObsoleteFiles(const std::set & page_files, + const PageFileIdAndLevel & writing_file_id_level, + const std::set & live_files); + private: String storage_name; // Identify between different Storage String storage_path; diff --git a/dbms/src/Storages/Page/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/tests/gtest_page_storage.cpp index b6f984b521a..79576777039 100644 --- a/dbms/src/Storages/Page/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/tests/gtest_page_storage.cpp @@ -324,6 +324,65 @@ TEST_F(PageStorage_test, GcMoveRefPage) ASSERT_EQ(normal_page_id, 1UL); } +TEST_F(PageStorage_test, GcMovePageDelMeta) +{ + const size_t buf_sz = 256; + char c_buff[buf_sz]; + + { + // Page1 should be written to PageFile{1, 0} + WriteBatch batch; + memset(c_buff, 0xf, buf_sz); + ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(1, 0, buff, buf_sz); + buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(2, 0, buff, buf_sz); + buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(3, 0, buff, buf_sz); + + storage->write(batch); + } + + { + // DelPage1 should be written to PageFile{2, 0} + WriteBatch batch; + batch.delPage(1); + + storage->write(batch); + } + + PageFileIdAndLevel id_and_lvl = {2, 0}; // PageFile{2, 0} is ready to be migrated by gc + PageStorage::GcLivesPages livesPages{{id_and_lvl, {0, {}}}}; + PageStorage::GcCandidates candidates{ + id_and_lvl, + }; + const auto page_files = PageStorage::listAllPageFiles(storage->storage_path, true, storage->page_file_log); + auto s0 = storage->getSnapshot(); + PageEntriesEdit edit = storage->gcMigratePages(s0, livesPages, candidates); + + // We should see migration of DelPage1 + bool exist = false; + for (const auto & rec : edit.getRecords()) + { + if (rec.type == WriteBatch::WriteType::DEL && rec.page_id == 1) + { + exist = true; + break; + } + } + EXPECT_TRUE(exist); + s0.reset(); + + auto live_files = storage->versioned_page_entries.gcApply(edit); + EXPECT_EQ(live_files.find(id_and_lvl), live_files.end()); + storage->gcRemoveObsoleteFiles(/* page_files= */ page_files, /* writing_file_id_level= */ {3, 0}, live_files); + + // reopen PageStorage, Page 1 should be deleted + storage = reopenWithConfig(config); + auto s1 = storage->getSnapshot(); + ASSERT_EQ(s1->version()->find(1), std::nullopt); +} + /** * PageStorage tests with predefine Page1 && Page2 */ From fb42fde4f829a91ff69b538789f2239a10b95074 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 27 Sep 2019 15:07:12 +0800 Subject: [PATCH 5/6] enable fullstack-test --- tests/docker/run.sh | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/tests/docker/run.sh b/tests/docker/run.sh index a9ff9dc2d56..4f526fd2be2 100755 --- a/tests/docker/run.sh +++ b/tests/docker/run.sh @@ -6,20 +6,19 @@ docker-compose down rm -rf ./data ./log -# temporary disable for branch DeltaMergeEngine -# ./build_learner_config.sh +./build_learner_config.sh -#docker-compose up -d --scale tics0=0 --scale tics-gtest=0 --scale tiflash0=0 --scale tikv-learner0=0 +docker-compose up -d --scale tics0=0 --scale tics-gtest=0 --scale tiflash0=0 --scale tikv-learner0=0 -#sleep 60 +sleep 60 -#docker-compose up -d --scale tics0=0 --scale tics-gtest=0 --scale tikv-learner0=0 --build +docker-compose up -d --scale tics0=0 --scale tics-gtest=0 --scale tikv-learner0=0 --build -#sleep 5 +sleep 5 -#docker-compose up -d --scale tics0=0 --scale tics-gtest=0 -#docker-compose exec -T tiflash0 bash -c 'cd /tests ; ./run-test.sh fullstack-test true' -#docker-compose down +docker-compose up -d --scale tics0=0 --scale tics-gtest=0 +docker-compose exec -T tiflash0 bash -c 'cd /tests ; ./run-test.sh fullstack-test true' +docker-compose down # (only tics0 up) docker-compose up -d --scale tics-gtest=0 --scale tiflash0=0 --scale tikv-learner0=0 --scale tikv0=0 --scale tidb0=0 --scale pd0=0 From 1d6e0f179282587bd7020baa5df0193b2fd411cf Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 27 Sep 2019 15:36:23 +0800 Subject: [PATCH 6/6] Revert "Remove getMaxId of PageStorage" This reverts commit 34d50eb6e9fb2f229f32e2d6b219b74c340d0d0a. --- dbms/src/Storages/DeltaMerge/StoragePool.cpp | 12 +++---- dbms/src/Storages/DeltaMerge/StoragePool.h | 9 +++-- dbms/src/Storages/Page/PageEntries.h | 12 ++++++- dbms/src/Storages/Page/PageFile.cpp | 34 ++++++------------- dbms/src/Storages/Page/PageFile.h | 2 +- dbms/src/Storages/Page/PageStorage.cpp | 10 ++++-- dbms/src/Storages/Page/PageStorage.h | 4 ++- .../PageEntriesVersionSetWithDelta.cpp | 3 ++ .../Page/VersionSet/PageEntriesView.cpp | 10 ++++++ .../Page/VersionSet/PageEntriesView.h | 2 ++ .../Page/tests/gtest_page_entry_map.cpp | 3 ++ 11 files changed, 61 insertions(+), 40 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index d88a60f959b..a4bd62f14ca 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -7,12 +7,12 @@ namespace DM // TODO: Load configs from settings. StoragePool::StoragePool(const String &name, const String & path) - : max_log_page_id(0), - max_data_page_id(0), - max_meta_page_id(0), - log_storage(name + ".log", path + "/log", {}, &max_log_page_id), - data_storage(name + ".data", path + "/data", {}, &max_data_page_id), - meta_storage(name + ".meta", path + "/meta", {}, &max_meta_page_id) + : log_storage(name + ".log", path + "/log", {}), + data_storage(name + ".data", path + "/data", {}), + meta_storage(name + ".meta", path + "/meta", {}), + max_log_page_id(log_storage.getMaxId()), + max_data_page_id(data_storage.getMaxId()), + max_meta_page_id(meta_storage.getMaxId()) { } diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.h b/dbms/src/Storages/DeltaMerge/StoragePool.h index 9b8a1f19298..11f03c6bfd8 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.h +++ b/dbms/src/Storages/DeltaMerge/StoragePool.h @@ -37,15 +37,14 @@ class StoragePool : private boost::noncopyable bool gc(const Seconds & try_gc_period = DELTA_MERGE_GC_PERIOD); private: - // These should init before PageStorages - std::atomic max_log_page_id; - std::atomic max_data_page_id; - std::atomic max_meta_page_id; - PageStorage log_storage; PageStorage data_storage; PageStorage meta_storage; + std::atomic max_log_page_id; + std::atomic max_data_page_id; + std::atomic max_meta_page_id; + std::atomic last_try_gc_time = Clock::now(); std::mutex mutex; diff --git a/dbms/src/Storages/Page/PageEntries.h b/dbms/src/Storages/Page/PageEntries.h index c081da85158..58bf16bfef9 100644 --- a/dbms/src/Storages/Page/PageEntries.h +++ b/dbms/src/Storages/Page/PageEntries.h @@ -26,7 +26,7 @@ template class PageEntriesMixin { public: - explicit PageEntriesMixin(bool is_base_) : normal_pages(), page_ref(), ref_deletions(), is_base(is_base_) {} + explicit PageEntriesMixin(bool is_base_) : normal_pages(), page_ref(), ref_deletions(), max_page_id(0), is_base(is_base_) {} public: static std::shared_ptr createBase() { return std::make_shared(true); } @@ -105,9 +105,12 @@ class PageEntriesMixin { page_ref.clear(); normal_pages.clear(); + max_page_id = 0; ref_deletions.clear(); } + PageId maxId() const { return max_page_id; } + public: using const_normal_page_iterator = std::unordered_map::const_iterator; // only scan over normal Pages, excluding RefPages @@ -120,6 +123,7 @@ class PageEntriesMixin // RefPageId deletions std::unordered_set ref_deletions; + PageId max_page_id; bool is_base; protected: @@ -154,6 +158,7 @@ class PageEntriesMixin { page_ref = rhs.page_ref; normal_pages = rhs.normal_pages; + max_page_id = rhs.max_page_id; ref_deletions = rhs.ref_deletions; } @@ -169,6 +174,7 @@ class PageEntriesMixin { normal_pages.swap(rhs.normal_pages); page_ref.swap(rhs.page_ref); + max_page_id = rhs.max_page_id; is_base = rhs.is_base; ref_deletions.swap(rhs.ref_deletions); } @@ -211,6 +217,8 @@ void PageEntriesMixin::put(PageId page_id, const PageEntry & entry) normal_pages[normal_page_id].ref = page_ref_count + is_new_ref_pair_inserted; } + // update max_page_id + max_page_id = std::max(max_page_id, page_id); } template @@ -265,6 +273,7 @@ void PageEntriesMixin::ref(const PageId ref_id, const PageId page_id) page_ref[ref_id] = normal_page_id; } } + max_page_id = std::max(max_page_id, std::max(ref_id, page_id)); } template @@ -434,6 +443,7 @@ class PageEntriesForDelta : public PageEntriesMixin, normal_pages[it.first] = it.second; } } + max_page_id = std::max(max_page_id, rhs.max_page_id); } bool shouldCompactToBase(const ::DB::MVCC::VersionSetConfig & config) diff --git a/dbms/src/Storages/Page/PageFile.cpp b/dbms/src/Storages/Page/PageFile.cpp index 417b36267d7..a6240c67381 100644 --- a/dbms/src/Storages/Page/PageFile.cpp +++ b/dbms/src/Storages/Page/PageFile.cpp @@ -141,17 +141,14 @@ std::pair genWriteData( // /// Analyze meta file, and return . std::pair analyzeMetaFile( // - const String & path, - PageFileId file_id, - UInt32 level, - const char * meta_data, - const size_t meta_data_size, - PageEntriesEdit & edit, - std::atomic * max_page_id, - Logger * log) + const String & path, + PageFileId file_id, + UInt32 level, + const char * meta_data, + const size_t meta_data_size, + PageEntriesEdit & edit, + Logger * log) { - assert(max_page_id != nullptr); - const char * meta_data_end = meta_data + meta_data_size; UInt64 page_data_file_size = 0; @@ -204,7 +201,6 @@ std::pair analyzeMetaFile( // pc.checksum = PageUtil::get(pos); edit.put(page_id, pc); - *max_page_id = std::max(*max_page_id, page_id); page_data_file_size += pc.size; break; } @@ -212,7 +208,6 @@ std::pair analyzeMetaFile( // { auto page_id = PageUtil::get(pos); edit.del(page_id); // Reserve the order of removal. - *max_page_id = std::max(*max_page_id, page_id); break; } case WriteBatch::WriteType::REF: @@ -220,7 +215,6 @@ std::pair analyzeMetaFile( // const auto ref_id = PageUtil::get(pos); const auto page_id = PageUtil::get(pos); edit.ref(ref_id, page_id); - *max_page_id = std::max(*max_page_id, page_id); } } } @@ -286,10 +280,7 @@ void PageFile::Writer::write(const WriteBatch & wb, PageEntriesEdit & edit) // PageFile::Reader // ========================================================= -PageFile::Reader::Reader(PageFile & page_file) - : data_file_path(page_file.dataPath()), data_file_fd(PageUtil::openFile(data_file_path)) -{ -} +PageFile::Reader::Reader(PageFile & page_file) : data_file_path(page_file.dataPath()), data_file_fd(PageUtil::openFile(data_file_path)) {} PageFile::Reader::~Reader() { @@ -469,7 +460,7 @@ PageFile PageFile::openPageFileForRead(PageFileId file_id, UInt32 level, const s return PageFile(file_id, level, parent_path, false, false, log); } -void PageFile::readAndSetPageMetas(PageEntriesEdit & edit, std::atomic * max_page_id) +void PageFile::readAndSetPageMetas(PageEntriesEdit & edit) { const auto path = metaPath(); Poco::File file(path); @@ -484,14 +475,9 @@ void PageFile::readAndSetPageMetas(PageEntriesEdit & edit, std::atomic * PageUtil::readFile(file_fd, 0, data, file_size, path); - // If max_page_id is nullptr, we will ignore that - std::atomic ignored_max_page_id = 0; - if (max_page_id == nullptr) - max_page_id = &ignored_max_page_id; - // analyze meta file and update page_entries std::tie(this->meta_file_pos, this->data_file_pos) - = PageMetaFormat::analyzeMetaFile(folderPath(), file_id, level, data, file_size, edit, max_page_id, log); + = PageMetaFormat::analyzeMetaFile(folderPath(), file_id, level, data, file_size, edit, log); } void PageFile::setFormal() diff --git a/dbms/src/Storages/Page/PageFile.h b/dbms/src/Storages/Page/PageFile.h index 8aeec5e94bf..34d74386f6f 100644 --- a/dbms/src/Storages/Page/PageFile.h +++ b/dbms/src/Storages/Page/PageFile.h @@ -91,7 +91,7 @@ class PageFile : public Allocator /// Get pages' metadata by this method. Will also update file pos. /// Call this method after a page file recovered. /// if check_page_map_complete is true, do del or ref on non-exist page will throw exception. - void readAndSetPageMetas(PageEntriesEdit & edit, std::atomic * max_page_id = nullptr); + void readAndSetPageMetas(PageEntriesEdit & edit); /// Rename this page file into formal style. void setFormal(); diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp index bd0f2c9661f..1f1d84bc5da 100644 --- a/dbms/src/Storages/Page/PageStorage.cpp +++ b/dbms/src/Storages/Page/PageStorage.cpp @@ -52,7 +52,7 @@ PageStorage::listAllPageFiles(const String & storage_path, bool remove_tmp_file, return page_files; } -PageStorage::PageStorage(String name, const String & storage_path_, const Config & config_, std::atomic * max_page_id) +PageStorage::PageStorage(String name, const String & storage_path_, const Config & config_) : storage_name(std::move(name)), storage_path(storage_path_), config(config_), @@ -68,7 +68,7 @@ PageStorage::PageStorage(String name, const String & storage_path_, const Config for (auto & page_file : page_files) { PageEntriesEdit edit; - const_cast(page_file).readAndSetPageMetas(edit, max_page_id); + const_cast(page_file).readAndSetPageMetas(edit); // Only level 0 is writable. if (page_file.getLevel() == 0) @@ -99,6 +99,12 @@ PageStorage::PageStorage(String name, const String & storage_path_, const Config #endif } +PageId PageStorage::getMaxId() +{ + std::lock_guard write_lock(write_mutex); + return versioned_page_entries.getSnapshot()->version()->maxId(); +} + PageEntry PageStorage::getEntry(PageId page_id, SnapshotPtr snapshot) { if (!snapshot) diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index 54b1ce27f4c..30c81b247b1 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -56,7 +56,9 @@ class PageStorage using OpenReadFiles = std::map; public: - PageStorage(String name, const String & storage_path, const Config & config_, std::atomic * max_page_id = nullptr); + PageStorage(String name, const String & storage_path, const Config & config_); + + PageId getMaxId(); void write(const WriteBatch & write_batch); diff --git a/dbms/src/Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.cpp b/dbms/src/Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.cpp index 19f31ca483f..946fc04567b 100644 --- a/dbms/src/Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.cpp +++ b/dbms/src/Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.cpp @@ -199,6 +199,8 @@ void DeltaVersionEditAcceptor::applyPut(PageEntriesEdit::EditRecord & rec) rec.entry.ref = old_entry->ref + !is_ref_exist; current_version->normal_pages[normal_page_id] = rec.entry; } + + current_version->max_page_id = std::max(current_version->max_page_id, rec.page_id); } void DeltaVersionEditAcceptor::applyDel(PageEntriesEdit::EditRecord & rec) @@ -251,6 +253,7 @@ void DeltaVersionEditAcceptor::applyRef(PageEntriesEdit::EditRecord & rec) current_version->page_ref[rec.page_id] = normal_page_id; } } + current_version->max_page_id = std::max(current_version->max_page_id, rec.page_id); } void DeltaVersionEditAcceptor::applyInplace(const PageEntriesVersionSetWithDelta::VersionPtr & current, const PageEntriesEdit & edit) diff --git a/dbms/src/Storages/Page/VersionSet/PageEntriesView.cpp b/dbms/src/Storages/Page/VersionSet/PageEntriesView.cpp index 02a90a95a1b..39eb443cb8c 100644 --- a/dbms/src/Storages/Page/VersionSet/PageEntriesView.cpp +++ b/dbms/src/Storages/Page/VersionSet/PageEntriesView.cpp @@ -141,4 +141,14 @@ std::set PageEntriesView::validNormalPageIds() const return valid_normal_pages; } +PageId PageEntriesView::maxId() const +{ + PageId max_id = 0; + for (auto node = tail; node != nullptr; node = node->prev) + { + max_id = std::max(max_id, node->maxId()); + } + return max_id; +} + } // namespace DB diff --git a/dbms/src/Storages/Page/VersionSet/PageEntriesView.h b/dbms/src/Storages/Page/VersionSet/PageEntriesView.h index ddb188b7aa5..f6857fd213a 100644 --- a/dbms/src/Storages/Page/VersionSet/PageEntriesView.h +++ b/dbms/src/Storages/Page/VersionSet/PageEntriesView.h @@ -28,6 +28,8 @@ class PageEntriesView std::set validNormalPageIds() const; std::optional findNormalPageEntry(PageId page_id) const; + PageId maxId() const; + inline std::shared_ptr getSharedTailVersion() const { return tail; } inline std::shared_ptr transferTailVersionOwn() diff --git a/dbms/src/Storages/Page/tests/gtest_page_entry_map.cpp b/dbms/src/Storages/Page/tests/gtest_page_entry_map.cpp index 2ba00eb6852..2bfb6be5e96 100644 --- a/dbms/src/Storages/Page/tests/gtest_page_entry_map.cpp +++ b/dbms/src/Storages/Page/tests/gtest_page_entry_map.cpp @@ -37,6 +37,7 @@ TEST_F(PageEntryMap_test, Empty) item_count += 1; } ASSERT_EQ(item_count, 0UL); + ASSERT_EQ(map->maxId(), 0UL); // add some Pages, RefPages @@ -52,6 +53,7 @@ TEST_F(PageEntryMap_test, Empty) item_count += 1; } ASSERT_EQ(item_count, 2UL); + ASSERT_EQ(map->maxId(), 1UL); map->clear(); item_count = 0; @@ -60,6 +62,7 @@ TEST_F(PageEntryMap_test, Empty) item_count += 1; } ASSERT_EQ(item_count, 0UL); + ASSERT_EQ(map->maxId(), 0UL); } TEST_F(PageEntryMap_test, UpdatePageEntry)