From 779acfa849488987a367ac02c8183efa193f92ee Mon Sep 17 00:00:00 2001 From: JaySon Date: Sat, 28 Sep 2019 00:25:51 +0800 Subject: [PATCH] Binary compatibility version for DeltaMerge's chunk; Fix disappear of PageStorage del meta (#257) * Chunk binary version * Add name to PageStorage for identify different storages * Remove getMaxId of PageStorage * Migrate DelPage mark in doing GC * enable fullstack-test * Revert "Remove getMaxId of PageStorage" This reverts commit 34d50eb6e9fb2f229f32e2d6b219b74c340d0d0a. --- dbms/src/Storages/DeltaMerge/Chunk.cpp | 10 ++ dbms/src/Storages/DeltaMerge/Chunk.h | 4 + .../Storages/DeltaMerge/DeltaMergeStore.cpp | 7 +- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 3 +- dbms/src/Storages/DeltaMerge/StoragePool.cpp | 8 +- dbms/src/Storages/DeltaMerge/StoragePool.h | 2 +- .../tests/gtest_dm_delta_merge_store.cpp | 2 +- .../tests/gtest_dm_disk_value_space.cpp | 2 +- .../DeltaMerge/tests/gtest_dm_segment.cpp | 2 +- dbms/src/Storages/Page/PageStorage.cpp | 97 ++++++++++++------- dbms/src/Storages/Page/PageStorage.h | 7 +- .../Page/tests/gtest_page_storage.cpp | 61 +++++++++++- dbms/src/Storages/StorageDeltaMerge.cpp | 9 +- .../Storages/Transaction/RegionPersister.h | 2 +- tests/docker/run.sh | 17 ++-- 15 files changed, 170 insertions(+), 63 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Chunk.cpp b/dbms/src/Storages/DeltaMerge/Chunk.cpp index 67f31caf7be..44f4307b71d 100644 --- a/dbms/src/Storages/DeltaMerge/Chunk.cpp +++ b/dbms/src/Storages/DeltaMerge/Chunk.cpp @@ -10,8 +10,12 @@ namespace DB namespace DM { +const Chunk::Version Chunk::CURRENT_VERSION = 1; + void Chunk::serialize(WriteBuffer & buf) const { + writeVarUInt(Chunk::CURRENT_VERSION, buf); // Add binary version + writeIntBinary(handle_start, buf); writeIntBinary(handle_end, buf); writePODBinary(is_delete_range, buf); @@ -37,6 +41,12 @@ void Chunk::serialize(WriteBuffer & buf) const Chunk Chunk::deserialize(ReadBuffer & buf) { + // Check binary version + Chunk::Version chunk_batch_version; + readVarUInt(chunk_batch_version, buf); + if (chunk_batch_version != Chunk::CURRENT_VERSION) + throw Exception("Chunk binary version not match: " + DB::toString(chunk_batch_version), ErrorCodes::LOGICAL_ERROR); + Handle start, end; readIntBinary(start, buf); readIntBinary(end, buf); diff --git a/dbms/src/Storages/DeltaMerge/Chunk.h b/dbms/src/Storages/DeltaMerge/Chunk.h index d68994f7cbf..41c34805913 100644 --- a/dbms/src/Storages/DeltaMerge/Chunk.h +++ b/dbms/src/Storages/DeltaMerge/Chunk.h @@ -35,6 +35,10 @@ using ColumnMetas = std::vector; class Chunk { +public: + // Binary version of chunk + using Version = UInt32; + static const Version CURRENT_VERSION; public: using ColumnMetaMap = std::unordered_map; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 1178d7b02af..9edd2c59695 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -38,13 +38,14 @@ namespace DM DeltaMergeStore::DeltaMergeStore(Context & db_context, const String & path_, - const String & name, + const String & db_name, + const String & tbl_name, const ColumnDefines & columns, const ColumnDefine & handle, const Settings & settings_) : path(path_), - storage_pool(path), - table_name(name), + storage_pool(db_name + "." + tbl_name, path), + table_name(tbl_name), table_handle_define(handle), background_pool(db_context.getBackgroundPool()), settings(settings_), diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 411b6c802c8..0f0886bb8dd 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -43,7 +43,8 @@ class DeltaMergeStore DeltaMergeStore(Context & db_context, // const String & path_, - const String & name, + const String & db_name, + const String & tbl_name, const ColumnDefines & columns, const ColumnDefine & handle, const Settings & settings_); diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index ff063aebfd7..a4bd62f14ca 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -6,10 +6,10 @@ namespace DM { // TODO: Load configs from settings. -StoragePool::StoragePool(const String & path) - : log_storage(path + "/log", {}), - data_storage(path + "/data", {}), - meta_storage(path + "/meta", {}), +StoragePool::StoragePool(const String &name, const String & path) + : log_storage(name + ".log", path + "/log", {}), + data_storage(name + ".data", path + "/data", {}), + meta_storage(name + ".meta", path + "/meta", {}), max_log_page_id(log_storage.getMaxId()), max_data_page_id(data_storage.getMaxId()), max_meta_page_id(meta_storage.getMaxId()) diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.h b/dbms/src/Storages/DeltaMerge/StoragePool.h index e58f62317d0..11f03c6bfd8 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.h +++ b/dbms/src/Storages/DeltaMerge/StoragePool.h @@ -20,7 +20,7 @@ class StoragePool : private boost::noncopyable using Duration = Clock::duration; using Seconds = std::chrono::seconds; - explicit StoragePool(const String & path); + StoragePool(const String & name, const String & path); PageId maxLogPageId() { return max_log_page_id; } PageId maxDataPageId() { return max_data_page_id; } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 7db1c3d2e11..c611bb62509 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -51,7 +51,7 @@ class DeltaMergeStore_test : public ::testing::Test ColumnDefine handle_column_define = cols[0]; DeltaMergeStorePtr s - = std::make_shared(*context, path, name, cols, handle_column_define, DeltaMergeStore::Settings()); + = std::make_shared(*context, path, "test", name, cols, handle_column_define, DeltaMergeStore::Settings()); return s; } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp index 3133758dcab..1f1896ce623 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp @@ -34,7 +34,7 @@ class DiskValueSpace_test : public ::testing::Test { dropDataInDisk(); - storage_pool = std::make_unique(path); + storage_pool = std::make_unique("test.t1", path); Context & context = DMTestEnv::getContext(); table_handle_define = ColumnDefine(1, "pk", std::make_shared()); table_columns.clear(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 812231f7240..e52fc67179e 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -53,7 +53,7 @@ class Segment_test : public ::testing::Test SegmentPtr reload(ColumnDefines && pre_define_columns = {}, DB::Settings && db_settings = DB::Settings()) { - storage_pool = std::make_unique(path); + storage_pool = std::make_unique("test.t1", path); *db_context = DMTestEnv::getContext(db_settings); ColumnDefines cols = pre_define_columns.empty() ? DMTestEnv::getDefaultColumns() : pre_define_columns; setColumns(cols); diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp index d3714e3fe86..1f1d84bc5da 100644 --- a/dbms/src/Storages/Page/PageStorage.cpp +++ b/dbms/src/Storages/Page/PageStorage.cpp @@ -1,6 +1,7 @@ -#include - #include +#include + +#include #include #include @@ -51,8 +52,9 @@ PageStorage::listAllPageFiles(const String & storage_path, bool remove_tmp_file, return page_files; } -PageStorage::PageStorage(const String & storage_path_, const Config & config_) - : storage_path(storage_path_), +PageStorage::PageStorage(String name, const String & storage_path_, const Config & config_) + : storage_name(std::move(name)), + storage_path(storage_path_), config(config_), versioned_page_entries(), page_file_log(&Poco::Logger::get("PageFile")), @@ -120,7 +122,7 @@ PageEntry PageStorage::getEntry(PageId page_id, SnapshotPtr snapshot) } catch (DB::Exception & e) { - LOG_WARNING(log, e.message()); + LOG_WARNING(log, storage_name << " " << e.message()); return {}; // return invalid PageEntry } } @@ -336,8 +338,6 @@ bool PageStorage::gc() return false; } - LOG_DEBUG(log, "PageStorage GC start"); - PageFileIdAndLevel writing_file_id_level; { std::lock_guard lock(write_mutex); @@ -387,12 +387,14 @@ bool PageStorage::gc() || (merge_files.size() >= 2 && candidate_total_size >= config.merge_hint_low_used_file_total_size); if (!should_merge) { - LOG_DEBUG(log, - "GC exit without merging. merge file size: " << merge_files.size() << ", candidate size: " << candidate_total_size); + LOG_TRACE(log, + storage_name << " GC exit without merging. merge file size: " << merge_files.size() + << ", candidate size: " << candidate_total_size); return false; } - LOG_INFO(log, "GC decide to merge " << merge_files.size() << " files, containing " << migrate_page_count << " regions"); + LOG_INFO(log, + storage_name << " GC decide to merge " << merge_files.size() << " files, containing " << migrate_page_count << " regions"); // There are no valid pages to be migrated but valid ref pages, scan over all `merge_files` and do migrate. gc_file_entries_edit = gcMigratePages(snapshot, file_valid_pages, merge_files); @@ -421,20 +423,7 @@ bool PageStorage::gc() } // Delete obsolete files that are not used by any version, without lock - for (const auto & page_file : page_files) - { - const auto page_id_and_lvl = page_file.fileIdLevel(); - if (page_id_and_lvl >= writing_file_id_level) - { - continue; - } - - if (live_files.count(page_id_and_lvl) == 0) - { - // the page file is not used by any version, remove reader cache - page_file.destroy(); - } - } + gcRemoveObsoleteFiles(page_files, writing_file_id_level, live_files); return true; } @@ -491,6 +480,7 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f size_t num_successful_migrate_pages = 0; size_t num_valid_ref_pages = 0; + size_t num_del_page_meta = 0; auto * current = snapshot->version(); { PageEntriesEdit legacy_edit; // All page entries in `merge_files` @@ -510,8 +500,8 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f continue; } + PageIdAndEntries page_id_and_entries; // The valid pages that we need to migrate to `gc_file` auto to_merge_file_reader = to_merge_file.createReader(); - PageIdAndEntries page_id_and_entries; { const auto & page_ids = it->second.second; for (auto page_id : page_ids) @@ -530,7 +520,7 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f catch (DB::Exception & e) { // ignore if it2 is a ref to non-exist page - LOG_WARNING(log, "Ignore invalid RefPage while gcMigratePages: " + e.message()); + LOG_WARNING(log, storage_name << " Ignore invalid RefPage while gcMigratePages: " << e.message()); } } } @@ -554,17 +544,26 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f } { - // Migrate RefPages which are still valid. + // Migrate valid RefPages and DelPage. WriteBatch batch; for (const auto & rec : legacy_edit.getRecords()) { - // Get `normal_page_id` from memory's `page_entry_map`. Note: can not get `normal_page_id` from disk, - // if it is a record of RefPage to another RefPage, the later ref-id is resolve to the actual `normal_page_id`. - auto [is_ref, normal_page_id] = current->isRefId(rec.page_id); - if (is_ref) + if (rec.type == WriteBatch::WriteType::REF) { - batch.putRefPage(rec.page_id, normal_page_id); - num_valid_ref_pages += 1; + // Get `normal_page_id` from memory's `page_entry_map`. Note: can not get `normal_page_id` from disk, + // if it is a record of RefPage to another RefPage, the later ref-id is resolve to the actual `normal_page_id`. + auto [is_ref, normal_page_id] = current->isRefId(rec.page_id); + if (is_ref) + { + batch.putRefPage(rec.page_id, normal_page_id); + num_valid_ref_pages += 1; + } + } + else if (rec.type == WriteBatch::WriteType::DEL) + { + // DelPage should be migrate to new PageFile + batch.delPage(rec.page_id); + num_del_page_meta += 1; } } gc_file_writer->write(batch, gc_file_edit); @@ -580,10 +579,38 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f gc_file.setFormal(); const auto id = gc_file.fileIdLevel(); LOG_INFO(log, - "GC have migrated " << num_successful_migrate_pages << " regions and " << num_valid_ref_pages << " RefPages to PageFile_" - << id.first << "_" << id.second); + storage_name << " GC have migrated " << num_successful_migrate_pages // + << " regions and " << num_valid_ref_pages // + << " RefPages and " << num_del_page_meta // + << " DelPage to PageFile_" << id.first << "_" << id.second); } return gc_file_edit; } +/** + * Delete obsolete files that are not used by any version + * @param page_files All avaliable files in disk + * @param writing_file_id_level The PageFile id which is writing to + * @param live_files The live files after gc + */ +void PageStorage::gcRemoveObsoleteFiles(const std::set & page_files, + const PageFileIdAndLevel & writing_file_id_level, + const std::set & live_files) +{ + for (const auto & page_file : page_files) + { + const auto page_id_and_lvl = page_file.fileIdLevel(); + if (page_id_and_lvl >= writing_file_id_level) + { + continue; + } + + if (live_files.count(page_id_and_lvl) == 0) + { + // the page file is not used by any version, remove reader cache + page_file.destroy(); + } + } +} + } // namespace DB diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index 7471c872416..30c81b247b1 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -56,7 +56,7 @@ class PageStorage using OpenReadFiles = std::map; public: - PageStorage(const String & storage_path, const Config & config_); + PageStorage(String name, const String & storage_path, const Config & config_); PageId getMaxId(); @@ -89,7 +89,12 @@ class PageStorage PageEntriesEdit gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & file_valid_pages, const GcCandidates & merge_files) const; + static void gcRemoveObsoleteFiles(const std::set & page_files, + const PageFileIdAndLevel & writing_file_id_level, + const std::set & live_files); + private: + String storage_name; // Identify between different Storage String storage_path; Config config; diff --git a/dbms/src/Storages/Page/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/tests/gtest_page_storage.cpp index 09966c3a6ee..79576777039 100644 --- a/dbms/src/Storages/Page/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/tests/gtest_page_storage.cpp @@ -56,7 +56,7 @@ class PageStorage_test : public ::testing::Test std::shared_ptr reopenWithConfig(const PageStorage::Config & config_) { - return std::make_shared(path, config_); + return std::make_shared("test.t", path, config_); } protected: @@ -324,6 +324,65 @@ TEST_F(PageStorage_test, GcMoveRefPage) ASSERT_EQ(normal_page_id, 1UL); } +TEST_F(PageStorage_test, GcMovePageDelMeta) +{ + const size_t buf_sz = 256; + char c_buff[buf_sz]; + + { + // Page1 should be written to PageFile{1, 0} + WriteBatch batch; + memset(c_buff, 0xf, buf_sz); + ReadBufferPtr buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(1, 0, buff, buf_sz); + buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(2, 0, buff, buf_sz); + buff = std::make_shared(c_buff, sizeof(c_buff)); + batch.putPage(3, 0, buff, buf_sz); + + storage->write(batch); + } + + { + // DelPage1 should be written to PageFile{2, 0} + WriteBatch batch; + batch.delPage(1); + + storage->write(batch); + } + + PageFileIdAndLevel id_and_lvl = {2, 0}; // PageFile{2, 0} is ready to be migrated by gc + PageStorage::GcLivesPages livesPages{{id_and_lvl, {0, {}}}}; + PageStorage::GcCandidates candidates{ + id_and_lvl, + }; + const auto page_files = PageStorage::listAllPageFiles(storage->storage_path, true, storage->page_file_log); + auto s0 = storage->getSnapshot(); + PageEntriesEdit edit = storage->gcMigratePages(s0, livesPages, candidates); + + // We should see migration of DelPage1 + bool exist = false; + for (const auto & rec : edit.getRecords()) + { + if (rec.type == WriteBatch::WriteType::DEL && rec.page_id == 1) + { + exist = true; + break; + } + } + EXPECT_TRUE(exist); + s0.reset(); + + auto live_files = storage->versioned_page_entries.gcApply(edit); + EXPECT_EQ(live_files.find(id_and_lvl), live_files.end()); + storage->gcRemoveObsoleteFiles(/* page_files= */ page_files, /* writing_file_id_level= */ {3, 0}, live_files); + + // reopen PageStorage, Page 1 should be deleted + storage = reopenWithConfig(config); + auto s1 = storage->getSnapshot(); + ASSERT_EQ(s1->version()->find(1), std::nullopt); +} + /** * PageStorage tests with predefine Page1 && Page2 */ diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index c4b9bdff4ee..dcb51c85548 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -120,8 +120,8 @@ StorageDeltaMerge::StorageDeltaMerge(const String & path_, assert(!handle_column_define.name.empty()); assert(!table_column_defines.empty()); - store = std::make_shared( - global_context, path, table_name, std::move(table_column_defines), std::move(handle_column_define), DeltaMergeStore::Settings()); + store = std::make_shared(global_context, path, db_name, table_name, std::move(table_column_defines), + std::move(handle_column_define), DeltaMergeStore::Settings()); } void StorageDeltaMerge::drop() @@ -522,8 +522,9 @@ void StorageDeltaMerge::rename(const String & new_path_to_db, const String & new // rename path and generate a new store Poco::File(path).renameTo(new_path); - store = std::make_shared( - global_context, new_path, new_table_name, std::move(table_column_defines), std::move(handle_column_define), settings); + store = std::make_shared(global_context, // + new_path, new_database_name, new_table_name, // + std::move(table_column_defines), std::move(handle_column_define), settings); path = new_path; db_name = new_database_name; diff --git a/dbms/src/Storages/Transaction/RegionPersister.h b/dbms/src/Storages/Transaction/RegionPersister.h index f6fa5393ca4..e7c2489da83 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.h +++ b/dbms/src/Storages/Transaction/RegionPersister.h @@ -20,7 +20,7 @@ class RegionPersister final : private boost::noncopyable { public: RegionPersister(const std::string & storage_path, const RegionManager & region_manager_, const PageStorage::Config & config = {}) - : page_storage(storage_path, config), region_manager(region_manager_), log(&Logger::get("RegionPersister")) + : page_storage("RegionPersister", storage_path, config), region_manager(region_manager_), log(&Logger::get("RegionPersister")) {} void drop(RegionID region_id); diff --git a/tests/docker/run.sh b/tests/docker/run.sh index a9ff9dc2d56..4f526fd2be2 100755 --- a/tests/docker/run.sh +++ b/tests/docker/run.sh @@ -6,20 +6,19 @@ docker-compose down rm -rf ./data ./log -# temporary disable for branch DeltaMergeEngine -# ./build_learner_config.sh +./build_learner_config.sh -#docker-compose up -d --scale tics0=0 --scale tics-gtest=0 --scale tiflash0=0 --scale tikv-learner0=0 +docker-compose up -d --scale tics0=0 --scale tics-gtest=0 --scale tiflash0=0 --scale tikv-learner0=0 -#sleep 60 +sleep 60 -#docker-compose up -d --scale tics0=0 --scale tics-gtest=0 --scale tikv-learner0=0 --build +docker-compose up -d --scale tics0=0 --scale tics-gtest=0 --scale tikv-learner0=0 --build -#sleep 5 +sleep 5 -#docker-compose up -d --scale tics0=0 --scale tics-gtest=0 -#docker-compose exec -T tiflash0 bash -c 'cd /tests ; ./run-test.sh fullstack-test true' -#docker-compose down +docker-compose up -d --scale tics0=0 --scale tics-gtest=0 +docker-compose exec -T tiflash0 bash -c 'cd /tests ; ./run-test.sh fullstack-test true' +docker-compose down # (only tics0 up) docker-compose up -d --scale tics-gtest=0 --scale tiflash0=0 --scale tikv-learner0=0 --scale tikv0=0 --scale tidb0=0 --scale pd0=0