Skip to content

Commit

Permalink
[FLASH-546] Fix exception after reload PageStorage from disk (pingcap…
Browse files Browse the repository at this point in the history
…#274)

* Idempotent del for PageStorage
* Fix the won't decrease of Normal Page in PageStorage
* Add more test case for checking the ref-count of NormalPage

* add metrics: PSMVCCNumSnapshots
  • Loading branch information
JaySon-Huang committed Oct 18, 2019
1 parent 234f816 commit ed38c2a
Show file tree
Hide file tree
Showing 10 changed files with 369 additions and 29 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
M(StorageBufferBytes) \
M(DictCacheRequests) \
M(Revision) \
M(PSMVCCNumSnapshots) \
M(RWLockWaitingReaders) \
M(RWLockWaitingWriters) \
M(RWLockActiveReaders) \
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Common/CurrentMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ namespace CurrentMetrics
add(metric, -value);
}

/// For lifetime of object, add amout for specified metric. Then subtract.
/// For lifetime of object, add amount for specified metric. Then subtract.
class Increment
{
private:
Expand Down
11 changes: 7 additions & 4 deletions dbms/src/Storages/Page/PageEntries.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class PageEntriesMixin
{
return {false, 0UL};
}
return {ref_pair->second != page_id, ref_pair->second};
return {true, ref_pair->second};
}

inline void clear()
Expand Down Expand Up @@ -237,10 +237,13 @@ void PageEntriesMixin<T>::del(PageId page_id)
assert(is_base); // can only call by base
// Note: must resolve ref-id before erasing entry in `page_ref`
const PageId normal_page_id = resolveRefId(page_id);
page_ref.erase(page_id);

// decrease origin page's ref counting
decreasePageRef<must_exist>(normal_page_id);
const size_t num_erase = page_ref.erase(page_id);
if (num_erase > 0)
{
// decrease origin page's ref counting
decreasePageRef<must_exist>(normal_page_id);
}
}

template <typename T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,15 @@ void DeltaVersionEditAcceptor::applyPut(PageEntriesEdit::EditRecord & rec)
void DeltaVersionEditAcceptor::applyDel(PageEntriesEdit::EditRecord & rec)
{
assert(rec.type == WriteBatch::WriteType::DEL);
const PageId normal_page_id = view->resolveRefId(rec.page_id);
auto [is_ref, normal_page_id] = view->isRefId(rec.page_id);
view->resolveRefId(rec.page_id);
current_version->ref_deletions.insert(rec.page_id);
current_version->page_ref.erase(rec.page_id);
this->decreasePageRef(normal_page_id);
if (is_ref)
{
// If ref exists, we need to decrease entry ref-count
this->decreasePageRef(normal_page_id);
}
}

void DeltaVersionEditAcceptor::applyRef(PageEntriesEdit::EditRecord & rec)
Expand Down Expand Up @@ -283,7 +288,7 @@ void DeltaVersionEditAcceptor::applyInplace(const PageEntriesVersionSetWithDelta
void DeltaVersionEditAcceptor::decreasePageRef(const PageId page_id)
{
const auto old_entry = view->findNormalPageEntry(page_id);
if (!old_entry)
if (old_entry)
{
auto entry = *old_entry;
entry.ref = old_entry->ref <= 1 ? 0 : old_entry->ref - 1;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/Page/VersionSet/PageEntriesView.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ std::optional<PageEntry> PageEntriesView::findNormalPageEntry(PageId page_id) co
std::pair<bool, PageId> PageEntriesView::isRefId(PageId page_id) const
{
auto node = tail;
// For delta, we need to check if page_id is deleted, then try to find in page_ref
for (; !node->isBase(); node = node->prev)
{
if (node->ref_deletions.count(page_id) > 0)
Expand All @@ -75,6 +76,7 @@ std::pair<bool, PageId> PageEntriesView::isRefId(PageId page_id) const
if (iter != node->page_ref.end())
return {true, iter->second};
}
// For base
return node->isRefId(page_id);
}

Expand Down
20 changes: 19 additions & 1 deletion dbms/src/Storages/Page/mvcc/VersionSetWithDelta.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <stack>
#include <unordered_set>

#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <IO/WriteHelpers.h>
#include <Storages/Page/mvcc/VersionSet.h>
Expand All @@ -22,6 +23,11 @@ extern const Event PSMVCCApplyOnCurrentDelta;
extern const Event PSMVCCApplyOnNewDelta;
} // namespace ProfileEvents

namespace CurrentMetrics
{
extern const Metric PSMVCCNumSnapshots;
} // namespace CurrentMetrics

namespace DB
{
namespace MVCC
Expand Down Expand Up @@ -61,13 +67,18 @@ class VersionSetWithDelta
snapshots(std::move(std::make_shared<Snapshot>(this, nullptr))), //
config(config_)
{
// The placeholder snapshot should not be counted.
CurrentMetrics::sub(CurrentMetrics::PSMVCCNumSnapshots);
}

virtual ~VersionSetWithDelta()
{
current.reset();
// snapshot list is empty
assert(snapshots->prev == snapshots.get());

// Ignore the destructor of placeholder snapshot
CurrentMetrics::add(CurrentMetrics::PSMVCCNumSnapshots);
}

void apply(TVersionEdit & edit)
Expand Down Expand Up @@ -114,7 +125,10 @@ class VersionSetWithDelta
Snapshot * next;

public:
Snapshot(VersionSetWithDelta * vset_, VersionPtr tail_) : vset(vset_), view(std::move(tail_)), prev(this), next(this) {}
Snapshot(VersionSetWithDelta * vset_, VersionPtr tail_) : vset(vset_), view(std::move(tail_)), prev(this), next(this)
{
CurrentMetrics::add(CurrentMetrics::PSMVCCNumSnapshots);
}

~Snapshot()
{
Expand All @@ -123,6 +137,8 @@ class VersionSetWithDelta
std::unique_lock lock = vset->acquireForLock();
prev->next = next;
next->prev = prev;

CurrentMetrics::sub(CurrentMetrics::PSMVCCNumSnapshots);
}

const TVersionView * version() const { return &view; }
Expand Down Expand Up @@ -327,6 +343,8 @@ class VersionSetWithDelta
VersionPtr current;
SnapshotPtr snapshots;
::DB::MVCC::VersionSetConfig config;

friend class VersionSetWithDeltaCompactTest;
};

} // namespace MVCC
Expand Down
49 changes: 49 additions & 0 deletions dbms/src/Storages/Page/tests/gtest_page_entry_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,55 @@ TEST_F(PageEntryMap_test, PutDel)
ASSERT_EQ(map->find(2), std::nullopt);
}

TEST_F(PageEntryMap_test, IdempotentDel)
{
PageEntry p0entry;
p0entry.file_id = 1;
p0entry.checksum = 0x123;
map->put(0, p0entry);
{
ASSERT_NE(map->find(0), std::nullopt);
const PageEntry & entry = map->at(0);
EXPECT_EQ(entry.file_id, p0entry.file_id);
EXPECT_EQ(entry.checksum, p0entry.checksum);
}
map->ref(2, 0);
{
ASSERT_NE(map->find(2), std::nullopt);
const PageEntry & entry = map->at(2);
EXPECT_EQ(entry.file_id, p0entry.file_id);
EXPECT_EQ(entry.checksum, p0entry.checksum);
}

map->del(0);
{
// Should not found Page0, but Page2 is still available
ASSERT_EQ(map->find(0), std::nullopt);
auto entry = map->find(2);
ASSERT_TRUE(entry);
EXPECT_EQ(entry->file_id, p0entry.file_id);
EXPECT_EQ(entry->checksum, p0entry.checksum);
entry = map->findNormalPageEntry(0);
ASSERT_TRUE(entry);
EXPECT_EQ(entry->file_id, p0entry.file_id);
EXPECT_EQ(entry->checksum, p0entry.checksum);
}

// Del should be idempotent
map->del(0);
{
ASSERT_EQ(map->find(0), std::nullopt);
auto entry = map->find(2);
ASSERT_TRUE(entry);
EXPECT_EQ(entry->file_id, p0entry.file_id);
EXPECT_EQ(entry->checksum, p0entry.checksum);
entry = map->findNormalPageEntry(0);
ASSERT_TRUE(entry);
EXPECT_EQ(entry->file_id, p0entry.file_id);
EXPECT_EQ(entry->checksum, p0entry.checksum);
}
}

TEST_F(PageEntryMap_test, UpdateRefPageEntry)
{
const PageId page_id = 0;
Expand Down
Loading

0 comments on commit ed38c2a

Please sign in to comment.