From ed38c2a78c5e17de6ffe762eedd1a2908ff91c9e Mon Sep 17 00:00:00 2001
From: JaySon <jayson.hjs@gmail.com>
Date: Sat, 12 Oct 2019 17:51:15 +0800
Subject: [PATCH] [FLASH-546] Fix exception after reload PageStorage from disk
 (#274)

* Idempotent del for PageStorage
* Fix the won't decrease of Normal Page in PageStorage
* Add more test case for checking the ref-count of NormalPage

* add metrics: PSMVCCNumSnapshots
---
 dbms/src/Common/CurrentMetrics.cpp            |   1 +
 dbms/src/Common/CurrentMetrics.h              |   2 +-
 dbms/src/Storages/Page/PageEntries.h          |  11 +-
 .../PageEntriesVersionSetWithDelta.cpp        |  11 +-
 .../Page/VersionSet/PageEntriesView.cpp       |   2 +
 .../Storages/Page/mvcc/VersionSetWithDelta.h  |  20 ++-
 .../Page/tests/gtest_page_entry_map.cpp       |  49 +++++
 .../Page/tests/gtest_page_map_version_set.cpp | 169 ++++++++++++++++--
 .../Page/tests/gtest_page_storage.cpp         | 117 +++++++++++-
 .../Page/tests/utils_get_valid_pages.cpp      |  16 +-
 10 files changed, 369 insertions(+), 29 deletions(-)

diff --git a/dbms/src/Common/CurrentMetrics.cpp b/dbms/src/Common/CurrentMetrics.cpp
index af29cb4912f..c7e83fb9572 100644
--- a/dbms/src/Common/CurrentMetrics.cpp
+++ b/dbms/src/Common/CurrentMetrics.cpp
@@ -37,6 +37,7 @@
     M(StorageBufferBytes) \
     M(DictCacheRequests) \
     M(Revision) \
+    M(PSMVCCNumSnapshots) \
     M(RWLockWaitingReaders) \
     M(RWLockWaitingWriters) \
     M(RWLockActiveReaders) \
diff --git a/dbms/src/Common/CurrentMetrics.h b/dbms/src/Common/CurrentMetrics.h
index 82384475e3c..19e76fbd7e9 100644
--- a/dbms/src/Common/CurrentMetrics.h
+++ b/dbms/src/Common/CurrentMetrics.h
@@ -50,7 +50,7 @@ namespace CurrentMetrics
         add(metric, -value);
     }
 
-    /// For lifetime of object, add amout for specified metric. Then subtract.
+    /// For lifetime of object, add amount for specified metric. Then subtract.
     class Increment
     {
     private:
diff --git a/dbms/src/Storages/Page/PageEntries.h b/dbms/src/Storages/Page/PageEntries.h
index faf4cada80a..0543498f076 100644
--- a/dbms/src/Storages/Page/PageEntries.h
+++ b/dbms/src/Storages/Page/PageEntries.h
@@ -107,7 +107,7 @@ class PageEntriesMixin
         {
             return {false, 0UL};
         }
-        return {ref_pair->second != page_id, ref_pair->second};
+        return {true, ref_pair->second};
     }
 
     inline void clear()
@@ -237,10 +237,13 @@ void PageEntriesMixin<T>::del(PageId page_id)
     assert(is_base); // can only call by base
     // Note: must resolve ref-id before erasing entry in `page_ref`
     const PageId normal_page_id = resolveRefId(page_id);
-    page_ref.erase(page_id);
 
-    // decrease origin page's ref counting
-    decreasePageRef<must_exist>(normal_page_id);
+    const size_t num_erase = page_ref.erase(page_id);
+    if (num_erase > 0)
+    {
+        // decrease origin page's ref counting
+        decreasePageRef<must_exist>(normal_page_id);
+    }
 }
 
 template <typename T>
diff --git a/dbms/src/Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.cpp b/dbms/src/Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.cpp
index 939ed67160b..0567b0c5321 100644
--- a/dbms/src/Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.cpp
+++ b/dbms/src/Storages/Page/VersionSet/PageEntriesVersionSetWithDelta.cpp
@@ -208,10 +208,15 @@ void DeltaVersionEditAcceptor::applyPut(PageEntriesEdit::EditRecord & rec)
 void DeltaVersionEditAcceptor::applyDel(PageEntriesEdit::EditRecord & rec)
 {
     assert(rec.type == WriteBatch::WriteType::DEL);
-    const PageId normal_page_id = view->resolveRefId(rec.page_id);
+    auto [is_ref, normal_page_id] = view->isRefId(rec.page_id);
+    view->resolveRefId(rec.page_id);
     current_version->ref_deletions.insert(rec.page_id);
     current_version->page_ref.erase(rec.page_id);
-    this->decreasePageRef(normal_page_id);
+    if (is_ref)
+    {
+        // If ref exists, we need to decrease entry ref-count
+        this->decreasePageRef(normal_page_id);
+    }
 }
 
 void DeltaVersionEditAcceptor::applyRef(PageEntriesEdit::EditRecord & rec)
@@ -283,7 +288,7 @@ void DeltaVersionEditAcceptor::applyInplace(const PageEntriesVersionSetWithDelta
 void DeltaVersionEditAcceptor::decreasePageRef(const PageId page_id)
 {
     const auto old_entry = view->findNormalPageEntry(page_id);
-    if (!old_entry)
+    if (old_entry)
     {
         auto entry = *old_entry;
         entry.ref  = old_entry->ref <= 1 ? 0 : old_entry->ref - 1;
diff --git a/dbms/src/Storages/Page/VersionSet/PageEntriesView.cpp b/dbms/src/Storages/Page/VersionSet/PageEntriesView.cpp
index 39eb443cb8c..8553ae74b0a 100644
--- a/dbms/src/Storages/Page/VersionSet/PageEntriesView.cpp
+++ b/dbms/src/Storages/Page/VersionSet/PageEntriesView.cpp
@@ -67,6 +67,7 @@ std::optional<PageEntry> PageEntriesView::findNormalPageEntry(PageId page_id) co
 std::pair<bool, PageId> PageEntriesView::isRefId(PageId page_id) const
 {
     auto node = tail;
+    // For delta, we need to check if page_id is deleted, then try to find in page_ref
     for (; !node->isBase(); node = node->prev)
     {
         if (node->ref_deletions.count(page_id) > 0)
@@ -75,6 +76,7 @@ std::pair<bool, PageId> PageEntriesView::isRefId(PageId page_id) const
         if (iter != node->page_ref.end())
             return {true, iter->second};
     }
+    // For base
     return node->isRefId(page_id);
 }
 
diff --git a/dbms/src/Storages/Page/mvcc/VersionSetWithDelta.h b/dbms/src/Storages/Page/mvcc/VersionSetWithDelta.h
index 784d35c2581..c969c61b722 100644
--- a/dbms/src/Storages/Page/mvcc/VersionSetWithDelta.h
+++ b/dbms/src/Storages/Page/mvcc/VersionSetWithDelta.h
@@ -8,6 +8,7 @@
 #include <stack>
 #include <unordered_set>
 
+#include <Common/CurrentMetrics.h>
 #include <Common/ProfileEvents.h>
 #include <IO/WriteHelpers.h>
 #include <Storages/Page/mvcc/VersionSet.h>
@@ -22,6 +23,11 @@ extern const Event PSMVCCApplyOnCurrentDelta;
 extern const Event PSMVCCApplyOnNewDelta;
 } // namespace ProfileEvents
 
+namespace CurrentMetrics
+{
+extern const Metric PSMVCCNumSnapshots;
+} // namespace CurrentMetrics
+
 namespace DB
 {
 namespace MVCC
@@ -61,6 +67,8 @@ class VersionSetWithDelta
           snapshots(std::move(std::make_shared<Snapshot>(this, nullptr))), //
           config(config_)
     {
+        // The placeholder snapshot should not be counted.
+        CurrentMetrics::sub(CurrentMetrics::PSMVCCNumSnapshots);
     }
 
     virtual ~VersionSetWithDelta()
@@ -68,6 +76,9 @@ class VersionSetWithDelta
         current.reset();
         // snapshot list is empty
         assert(snapshots->prev == snapshots.get());
+
+        // Ignore the destructor of placeholder snapshot
+        CurrentMetrics::add(CurrentMetrics::PSMVCCNumSnapshots);
     }
 
     void apply(TVersionEdit & edit)
@@ -114,7 +125,10 @@ class VersionSetWithDelta
         Snapshot * next;
 
     public:
-        Snapshot(VersionSetWithDelta * vset_, VersionPtr tail_) : vset(vset_), view(std::move(tail_)), prev(this), next(this) {}
+        Snapshot(VersionSetWithDelta * vset_, VersionPtr tail_) : vset(vset_), view(std::move(tail_)), prev(this), next(this)
+        {
+            CurrentMetrics::add(CurrentMetrics::PSMVCCNumSnapshots);
+        }
 
         ~Snapshot()
         {
@@ -123,6 +137,8 @@ class VersionSetWithDelta
             std::unique_lock lock = vset->acquireForLock();
             prev->next            = next;
             next->prev            = prev;
+
+            CurrentMetrics::sub(CurrentMetrics::PSMVCCNumSnapshots);
         }
 
         const TVersionView * version() const { return &view; }
@@ -327,6 +343,8 @@ class VersionSetWithDelta
     VersionPtr                   current;
     SnapshotPtr                  snapshots;
     ::DB::MVCC::VersionSetConfig config;
+
+    friend class VersionSetWithDeltaCompactTest;
 };
 
 } // namespace MVCC
diff --git a/dbms/src/Storages/Page/tests/gtest_page_entry_map.cpp b/dbms/src/Storages/Page/tests/gtest_page_entry_map.cpp
index 2bfb6be5e96..267ce863509 100644
--- a/dbms/src/Storages/Page/tests/gtest_page_entry_map.cpp
+++ b/dbms/src/Storages/Page/tests/gtest_page_entry_map.cpp
@@ -125,6 +125,55 @@ TEST_F(PageEntryMap_test, PutDel)
     ASSERT_EQ(map->find(2), std::nullopt);
 }
 
+TEST_F(PageEntryMap_test, IdempotentDel)
+{
+    PageEntry p0entry;
+    p0entry.file_id  = 1;
+    p0entry.checksum = 0x123;
+    map->put(0, p0entry);
+    {
+        ASSERT_NE(map->find(0), std::nullopt);
+        const PageEntry & entry = map->at(0);
+        EXPECT_EQ(entry.file_id, p0entry.file_id);
+        EXPECT_EQ(entry.checksum, p0entry.checksum);
+    }
+    map->ref(2, 0);
+    {
+        ASSERT_NE(map->find(2), std::nullopt);
+        const PageEntry & entry = map->at(2);
+        EXPECT_EQ(entry.file_id, p0entry.file_id);
+        EXPECT_EQ(entry.checksum, p0entry.checksum);
+    }
+
+    map->del(0);
+    {
+        // Should not found Page0, but Page2 is still available
+        ASSERT_EQ(map->find(0), std::nullopt);
+        auto entry = map->find(2);
+        ASSERT_TRUE(entry);
+        EXPECT_EQ(entry->file_id, p0entry.file_id);
+        EXPECT_EQ(entry->checksum, p0entry.checksum);
+        entry = map->findNormalPageEntry(0);
+        ASSERT_TRUE(entry);
+        EXPECT_EQ(entry->file_id, p0entry.file_id);
+        EXPECT_EQ(entry->checksum, p0entry.checksum);
+    }
+
+    // Del should be idempotent
+    map->del(0);
+    {
+        ASSERT_EQ(map->find(0), std::nullopt);
+        auto entry = map->find(2);
+        ASSERT_TRUE(entry);
+        EXPECT_EQ(entry->file_id, p0entry.file_id);
+        EXPECT_EQ(entry->checksum, p0entry.checksum);
+        entry = map->findNormalPageEntry(0);
+        ASSERT_TRUE(entry);
+        EXPECT_EQ(entry->file_id, p0entry.file_id);
+        EXPECT_EQ(entry->checksum, p0entry.checksum);
+    }
+}
+
 TEST_F(PageEntryMap_test, UpdateRefPageEntry)
 {
     const PageId page_id = 0;
diff --git a/dbms/src/Storages/Page/tests/gtest_page_map_version_set.cpp b/dbms/src/Storages/Page/tests/gtest_page_map_version_set.cpp
index 0f19832e672..63a075f01c6 100644
--- a/dbms/src/Storages/Page/tests/gtest_page_map_version_set.cpp
+++ b/dbms/src/Storages/Page/tests/gtest_page_map_version_set.cpp
@@ -3,6 +3,10 @@
 
 #include <type_traits>
 
+#define protected public
+#include <Storages/Page/mvcc/VersionSetWithDelta.h>
+#undef protected
+
 #include <Poco/AutoPtr.h>
 #include <Poco/ConsoleChannel.h>
 #include <Poco/FormattingChannel.h>
@@ -62,6 +66,7 @@ TYPED_TEST_P(PageMapVersionSet_test, ApplyEdit)
         PageEntry       e;
         e.checksum = 0x456;
         edit.put(1, e);
+        edit.ref(2, 0);
         versions.apply(edit);
     }
     LOG_TRACE(&Logger::root(), "apply    B:" + versions.toDebugStringUnlocked());
@@ -69,8 +74,10 @@ TYPED_TEST_P(PageMapVersionSet_test, ApplyEdit)
     EXPECT_EQ(versions.size(), 1UL);
     auto entry = s2->version()->at(0);
     ASSERT_EQ(entry.checksum, 0x123UL);
+    ASSERT_EQ(entry.ref, 2UL);
     auto entry2 = s2->version()->at(1);
     ASSERT_EQ(entry2.checksum, 0x456UL);
+    ASSERT_EQ(entry2.ref, 1UL);
     s2.reset(); // release snapshot
     EXPECT_EQ(versions.size(), 1UL);
 }
@@ -225,7 +232,8 @@ namespace
 
 #pragma clang diagnostic push
 #pragma clang diagnostic ignored "-Wunused-function"
-std::set<PageId> getNormalPageIDs(const PageEntriesVersionSet::SnapshotPtr &s)
+
+std::set<PageId> getNormalPageIDs(const PageEntriesVersionSet::SnapshotPtr & s)
 {
     std::set<PageId> ids;
     for (auto iter = s->version()->pages_cbegin(); iter != s->version()->pages_cend(); iter++)
@@ -233,10 +241,11 @@ std::set<PageId> getNormalPageIDs(const PageEntriesVersionSet::SnapshotPtr &s)
     return ids;
 }
 
-std::set<PageId> getNormalPageIDs(const PageEntriesVersionSetWithDelta::SnapshotPtr &s)
+std::set<PageId> getNormalPageIDs(const PageEntriesVersionSetWithDelta::SnapshotPtr & s)
 {
     return s->version()->validNormalPageIds();
 }
+
 #pragma clang diagnostic pop
 
 } // namespace
@@ -246,10 +255,10 @@ TYPED_TEST_P(PageMapVersionSet_test, Restore)
     TypeParam versions(this->config_);
     if constexpr (std::is_same_v<TypeParam, PageEntriesVersionSet>)
     {
+        // For PageEntriesVersionSet, we need a builder
         auto s1 = versions.getSnapshot();
 
         typename TypeParam::BuilderType builder(s1->version(), true, &Poco::Logger::root());
-
         {
             PageEntriesEdit edit;
             PageEntry       e;
@@ -271,6 +280,7 @@ TYPED_TEST_P(PageMapVersionSet_test, Restore)
     }
     else
     {
+        // For PageEntriesVersionSetWithDelta, we directly apply edit to versions
         {
             PageEntriesEdit edit;
             PageEntry       e;
@@ -305,12 +315,12 @@ TYPED_TEST_P(PageMapVersionSet_test, Restore)
     ASSERT_TRUE(valid_normal_page_ids.count(3) > 0);
 }
 
-TYPED_TEST_P(PageMapVersionSet_test, PutRefPage)
+TYPED_TEST_P(PageMapVersionSet_test, PutOrDelRefPage)
 {
     TypeParam versions(this->config_);
     {
         PageEntriesEdit edit;
-        PageEntry e;
+        PageEntry       e;
         e.checksum = 0xf;
         edit.put(2, e);
         versions.apply(edit);
@@ -324,12 +334,144 @@ TYPED_TEST_P(PageMapVersionSet_test, PutRefPage)
         edit.ref(3, 2);
         versions.apply(edit);
     }
+    auto s2                      = versions.getSnapshot();
+    auto ensure_snapshot2_status = [&s2]() {
+        // Check the ref-count
+        auto entry3 = s2->version()->at(3);
+        ASSERT_EQ(entry3.checksum, 0xfUL);
+        ASSERT_EQ(entry3.ref, 2UL);
+
+        auto entry2 = s2->version()->at(2);
+        ASSERT_EQ(entry2.checksum, 0xfUL);
+        ASSERT_EQ(entry2.ref, 2UL);
+
+        auto normal_entry2 = s2->version()->findNormalPageEntry(2);
+        ASSERT_TRUE(normal_entry2);
+        ASSERT_EQ(normal_entry2->checksum, 0xfUL);
+        ASSERT_EQ(normal_entry2->ref, 2UL);
+
+        std::set<PageId> valid_normal_page_ids = getNormalPageIDs(s2);
+        ASSERT_TRUE(valid_normal_page_ids.count(2) > 0);
+        ASSERT_FALSE(valid_normal_page_ids.count(3) > 0);
+    };
+    ensure_snapshot2_status();
+
+    // Del Page2
+    {
+        PageEntriesEdit edit;
+        edit.del(2);
+        versions.apply(edit);
+    }
+    auto s3                      = versions.getSnapshot();
+    auto ensure_snapshot3_status = [&s3]() {
+        // Check that NormalPage2's ref-count is decreased.
+        auto entry3 = s3->version()->at(3);
+        ASSERT_EQ(entry3.checksum, 0xfUL);
+        ASSERT_EQ(entry3.ref, 1UL);
+
+        auto entry2 = s3->version()->find(2);
+        ASSERT_FALSE(entry2);
+
+        auto normal_entry2 = s3->version()->findNormalPageEntry(2);
+        ASSERT_TRUE(normal_entry2);
+        ASSERT_EQ(normal_entry2->checksum, 0xfUL);
+        ASSERT_EQ(normal_entry2->ref, 1UL);
+
+        std::set<PageId> valid_normal_page_ids = getNormalPageIDs(s3);
+        ASSERT_TRUE(valid_normal_page_ids.count(2) > 0);
+        ASSERT_FALSE(valid_normal_page_ids.count(3) > 0);
+    };
+    ensure_snapshot3_status();
+
+    // Del RefPage3
+    {
+        PageEntriesEdit edit;
+        edit.del(3);
+        versions.apply(edit);
+    }
+    auto s4                      = versions.getSnapshot();
+    auto ensure_snapshot4_status = [&s4]() {
+        auto entry3 = s4->version()->find(3);
+        ASSERT_FALSE(entry3);
+
+        auto entry2 = s4->version()->find(2);
+        ASSERT_FALSE(entry2);
+
+        auto normal_entry2 = s4->version()->findNormalPageEntry(2);
+        if constexpr (std::is_same_v<TypeParam, PageEntriesVersionSet>)
+        {
+            // For PageEntriesVersionSet, we delete the normal page
+            ASSERT_FALSE(normal_entry2);
+        }
+        else
+        {
+            // For PageEntriesVersionSetWithDelta, a tombstone is left.
+            ASSERT_TRUE(normal_entry2);
+            ASSERT_EQ(normal_entry2->checksum, 0xfUL);
+            ASSERT_TRUE(normal_entry2->isTombstone());
+        }
+
+        // We can not get 2 or 3 as normal page
+        std::set<PageId> valid_normal_page_ids = getNormalPageIDs(s4);
+        ASSERT_FALSE(valid_normal_page_ids.count(2) > 0);
+        ASSERT_FALSE(valid_normal_page_ids.count(3) > 0);
+    };
+    ensure_snapshot4_status();
+
+    // Test if one snapshot removed, other snapshot is not affected.
+    s3.reset();
+    ensure_snapshot4_status();
+    ensure_snapshot2_status();
+
+    s2.reset();
+    ensure_snapshot4_status();
+}
+
+TYPED_TEST_P(PageMapVersionSet_test, IdempotentDel)
+{
+    TypeParam versions(this->config_);
+    {
+        PageEntriesEdit edit;
+        PageEntry       e;
+        e.checksum = 0xf;
+        edit.put(2, e);
+        edit.ref(3, 2);
+        versions.apply(edit);
+    }
+    auto s1 = versions.getSnapshot();
+    ASSERT_EQ(s1->version()->at(2).checksum, 0xfUL);
+
+    // Del Page2
+    {
+        PageEntriesEdit edit;
+        edit.del(2);
+        versions.apply(edit);
+    }
     auto s2 = versions.getSnapshot();
-    ASSERT_EQ(s2->version()->at(3).checksum, 0xfUL);
+    {
+        auto ref_entry = s2->version()->at(3);
+        ASSERT_EQ(ref_entry.checksum, 0xfUL);
+        auto normal_entry = s2->version()->findNormalPageEntry(2);
+        ASSERT_TRUE(normal_entry);
+        ASSERT_EQ(normal_entry->ref, 1UL);
+        ASSERT_EQ(ref_entry.checksum, normal_entry->checksum);
+    }
 
-    std::set<PageId> valid_normal_page_ids = getNormalPageIDs(s2);
-    ASSERT_TRUE(valid_normal_page_ids.count(2) > 0);
-    ASSERT_FALSE(valid_normal_page_ids.count(3) > 0);
+    // Del Page2 again, should be idempotent.
+    {
+        PageEntriesEdit edit;
+        edit.del(2);
+        versions.apply(edit);
+    }
+    auto s3 = versions.getSnapshot();
+    {
+        auto ref_entry = s3->version()->at(3);
+        ASSERT_EQ(ref_entry.checksum, 0xfUL);
+        auto normal_entry = s3->version()->findNormalPageEntry(2);
+        ASSERT_TRUE(normal_entry);
+        ASSERT_EQ(normal_entry->ref, 1UL);
+        ASSERT_EQ(ref_entry.checksum, normal_entry->checksum);
+    }
 }
 
 TYPED_TEST_P(PageMapVersionSet_test, GcConcurrencyDelPage)
@@ -617,11 +759,11 @@ namespace
 
 #pragma clang diagnostic push
 #pragma clang diagnostic ignored "-Wunused-function"
-String liveFilesToString(const std::set<PageFileIdAndLevel> &files)
+String                   liveFilesToString(const std::set<PageFileIdAndLevel> & files)
 {
     std::stringstream ss;
-    bool is_first = true;
-    for (const auto & file: files)
+    bool              is_first = true;
+    for (const auto & file : files)
     {
         if (!is_first)
         {
@@ -703,7 +845,8 @@ REGISTER_TYPED_TEST_CASE_P(PageMapVersionSet_test,
                            GcConcurrencyDelPage,
                            GcPageMove,
                            GcConcurrencySetPage,
-                           PutRefPage,
+                           PutOrDelRefPage,
+                           IdempotentDel,
                            UpdateOnRefPage,
                            UpdateOnRefPage2,
                            IsRefId,
diff --git a/dbms/src/Storages/Page/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/tests/gtest_page_storage.cpp
index b25a3d02204..16b5da2307c 100644
--- a/dbms/src/Storages/Page/tests/gtest_page_storage.cpp
+++ b/dbms/src/Storages/Page/tests/gtest_page_storage.cpp
@@ -1,5 +1,6 @@
 #include "gtest/gtest.h"
 
+#include <Common/CurrentMetrics.h>
 #include <IO/ReadBufferFromMemory.h>
 #include <Poco/AutoPtr.h>
 #include <Poco/ConsoleChannel.h>
@@ -204,6 +205,82 @@ TEST_F(PageStorage_test, WriteReadAfterGc)
     }
 }
 
+TEST_F(PageStorage_test, IdempotentDelAndRef)
+{
+    const size_t buf_sz = 1024;
+    char         c_buff[buf_sz];
+
+    {
+        // Page1 should be written to PageFile{1, 0}
+        WriteBatch batch;
+        memset(c_buff, 0xf, buf_sz);
+        ReadBufferPtr buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
+        batch.putPage(1, 0, buff, buf_sz);
+
+        storage->write(batch);
+    }
+
+    {
+        // RefPage 2 -> 1, Del Page 1 should be written to PageFile{2, 0}
+        WriteBatch batch;
+        batch.putRefPage(2, 1);
+        batch.delPage(1);
+        ReadBufferPtr buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
+        batch.putPage(1000, 0, buff, buf_sz);
+
+        storage->write(batch);
+    }
+
+    {
+        // Another RefPage 2 -> 1, Del Page 1 should be written to PageFile{3, 0}
+        WriteBatch batch;
+        batch.putRefPage(2, 1);
+        batch.delPage(1);
+
+        storage->write(batch);
+    }
+
+    {
+        auto snap      = storage->getSnapshot();
+        auto ref_entry = snap->version()->find(1);
+        ASSERT_FALSE(ref_entry);
+
+        ref_entry = snap->version()->find(2);
+        ASSERT_TRUE(ref_entry);
+        ASSERT_EQ(ref_entry->file_id, 1UL);
+        ASSERT_EQ(ref_entry->ref, 1UL);
+
+        auto normal_entry = snap->version()->findNormalPageEntry(1);
+        ASSERT_TRUE(normal_entry);
+        ASSERT_EQ(normal_entry->file_id, 1UL);
+        ASSERT_EQ(normal_entry->ref, 1UL);
+
+        // Point to the same entry
+        ASSERT_EQ(ref_entry->offset, normal_entry->offset);
+    }
+
+    storage = reopenWithConfig(config);
+
+    {
+        auto snap      = storage->getSnapshot();
+        auto ref_entry = snap->version()->find(1);
+        ASSERT_FALSE(ref_entry);
+
+        ref_entry = snap->version()->find(2);
+        ASSERT_TRUE(ref_entry);
+        ASSERT_EQ(ref_entry->file_id, 1UL);
+        ASSERT_EQ(ref_entry->ref, 1UL);
+
+        auto normal_entry = snap->version()->findNormalPageEntry(1);
+        ASSERT_TRUE(normal_entry);
+        ASSERT_EQ(normal_entry->file_id, 1UL);
+        ASSERT_EQ(normal_entry->ref, 1UL);
+
+        // Point to the same entry
+        ASSERT_EQ(ref_entry->offset, normal_entry->offset);
+    }
+}
+
 TEST_F(PageStorage_test, GcMigrateValidRefPages)
 {
     const size_t buf_sz      = 1024;
@@ -723,11 +800,32 @@ TEST_F(PageStorageWith2Pages_test, AddRefPageToNonExistPage)
     ASSERT_FALSE(storage->getEntry(3).isValid());
 }
 
+namespace
+{
+
+CurrentMetrics::Value getPSMVCCNumSnapshots()
+{
+    for (size_t i = 0, end = CurrentMetrics::end(); i < end; ++i)
+    {
+        if (i == CurrentMetrics::PSMVCCNumSnapshots)
+        {
+            return CurrentMetrics::values[i].load(std::memory_order_relaxed);
+        }
+    }
+    throw Exception(std::string(CurrentMetrics::getDescription(CurrentMetrics::PSMVCCNumSnapshots)) + " not found.");
+}
+
+} // namespace
+
+
 TEST_F(PageStorageWith2Pages_test, SnapshotReadSnapshotVersion)
 {
-    char      ch_before         = 0x01;
-    char      ch_update         = 0xFF;
-    auto      snapshot          = storage->getSnapshot();
+    char ch_before = 0x01;
+    char ch_update = 0xFF;
+
+    EXPECT_EQ(getPSMVCCNumSnapshots(), 0);
+    auto snapshot = storage->getSnapshot();
+    EXPECT_EQ(getPSMVCCNumSnapshots(), 1);
     PageEntry p1_snapshot_entry = storage->getEntry(1, snapshot);
 
     {
@@ -789,9 +887,13 @@ TEST_F(PageStorageWith2Pages_test, GetIdenticalSnapshots)
     char      ch_before         = 0x01;
     char      ch_update         = 0xFF;
     PageEntry p1_snapshot_entry = storage->getEntry(1);
-    auto      s1                = storage->getSnapshot();
-    auto      s2                = storage->getSnapshot();
-    auto      s3                = storage->getSnapshot();
+    EXPECT_EQ(getPSMVCCNumSnapshots(), 0);
+    auto s1 = storage->getSnapshot();
+    EXPECT_EQ(getPSMVCCNumSnapshots(), 1);
+    auto s2 = storage->getSnapshot();
+    EXPECT_EQ(getPSMVCCNumSnapshots(), 2);
+    auto s3 = storage->getSnapshot();
+    EXPECT_EQ(getPSMVCCNumSnapshots(), 3);
 
     {
         // write new version of Page1
@@ -840,6 +942,7 @@ TEST_F(PageStorageWith2Pages_test, GetIdenticalSnapshots)
     ASSERT_NE(p1_entry.checksum, p1_snapshot_entry.checksum);
 
     s1.reset(); /// free snapshot 1
+    EXPECT_EQ(getPSMVCCNumSnapshots(), 2);
 
     // getEntry with snapshot
     p1_entry = storage->getEntry(1, s2);
@@ -865,6 +968,7 @@ TEST_F(PageStorageWith2Pages_test, GetIdenticalSnapshots)
     ASSERT_NE(p1_entry.checksum, p1_snapshot_entry.checksum);
 
     s2.reset(); /// free snapshot 2
+    EXPECT_EQ(getPSMVCCNumSnapshots(), 1);
 
     // getEntry with snapshot
     p1_entry = storage->getEntry(1, s3);
@@ -882,6 +986,7 @@ TEST_F(PageStorageWith2Pages_test, GetIdenticalSnapshots)
     ASSERT_NE(p1_entry.checksum, p1_snapshot_entry.checksum);
 
     s3.reset(); /// free snapshot 3
+    EXPECT_EQ(getPSMVCCNumSnapshots(), 0);
 
     // without snapshot
     p1_entry = storage->getEntry(1);
diff --git a/dbms/src/Storages/Page/tests/utils_get_valid_pages.cpp b/dbms/src/Storages/Page/tests/utils_get_valid_pages.cpp
index 5d5b31fa962..ed938b9acd9 100644
--- a/dbms/src/Storages/Page/tests/utils_get_valid_pages.cpp
+++ b/dbms/src/Storages/Page/tests/utils_get_valid_pages.cpp
@@ -32,6 +32,7 @@ void printPageEntry(const DB::PageId pid, const DB::PageEntry & entry)
 }
 
 int main(int argc, char ** argv)
+try
 {
     (void)argc;
     (void)argv;
@@ -59,7 +60,9 @@ int main(int argc, char ** argv)
         Usage(argv[0]);
         return 1;
     }
-    auto page_files = DB::PageStorage::listAllPageFiles(path, true, &Poco::Logger::get("root"));
+
+    // Do not remove any files.
+    auto page_files = DB::PageStorage::listAllPageFiles(path, /* remove_tmp_file= */ false, &Poco::Logger::get("root"));
 
     //DB::PageEntriesVersionSet versions;
     DB::PageEntriesVersionSetWithDelta versions;
@@ -126,3 +129,14 @@ int main(int argc, char ** argv)
     }
     return 0;
 }
+catch (const DB::Exception & e)
+{
+    std::string text = e.displayText();
+
+    auto embedded_stack_trace_pos = text.find("Stack trace");
+    std::cerr << "Code: " << e.code() << ". " << text << std::endl << std::endl;
+    if (std::string::npos == embedded_stack_trace_pos)
+        std::cerr << "Stack trace:" << std::endl << e.getStackTrace().toString() << std::endl;
+
+    return -1;
+}