Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-546] Fix exception after reload PageStorage from disk #274

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
M(StorageBufferBytes) \
M(DictCacheRequests) \
M(Revision) \
M(PSMVCCNumSnapshots) \
M(RWLockWaitingReaders) \
M(RWLockWaitingWriters) \
M(RWLockActiveReaders) \
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Common/CurrentMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ namespace CurrentMetrics
add(metric, -value);
}

/// For lifetime of object, add amout for specified metric. Then subtract.
/// For lifetime of object, add amount for specified metric. Then subtract.
class Increment
{
private:
Expand Down
11 changes: 7 additions & 4 deletions dbms/src/Storages/Page/PageEntries.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class PageEntriesMixin
{
return {false, 0UL};
}
return {ref_pair->second != page_id, ref_pair->second};
return {true, ref_pair->second};
}

inline void clear()
Expand Down Expand Up @@ -237,10 +237,13 @@ void PageEntriesMixin<T>::del(PageId page_id)
assert(is_base); // can only call by base
// Note: must resolve ref-id before erasing entry in `page_ref`
const PageId normal_page_id = resolveRefId(page_id);
page_ref.erase(page_id);

// decrease origin page's ref counting
decreasePageRef<must_exist>(normal_page_id);
const size_t num_erase = page_ref.erase(page_id);
if (num_erase > 0)
{
// decrease origin page's ref counting
decreasePageRef<must_exist>(normal_page_id);
}
}

template <typename T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,15 @@ void DeltaVersionEditAcceptor::applyPut(PageEntriesEdit::EditRecord & rec)
void DeltaVersionEditAcceptor::applyDel(PageEntriesEdit::EditRecord & rec)
{
assert(rec.type == WriteBatch::WriteType::DEL);
const PageId normal_page_id = view->resolveRefId(rec.page_id);
auto [is_ref, normal_page_id] = view->isRefId(rec.page_id);
view->resolveRefId(rec.page_id);
current_version->ref_deletions.insert(rec.page_id);
current_version->page_ref.erase(rec.page_id);
this->decreasePageRef(normal_page_id);
if (is_ref)
{
// If ref exists, we need to decrease entry ref-count
this->decreasePageRef(normal_page_id);
}
}

void DeltaVersionEditAcceptor::applyRef(PageEntriesEdit::EditRecord & rec)
Expand Down Expand Up @@ -283,7 +288,7 @@ void DeltaVersionEditAcceptor::applyInplace(const PageEntriesVersionSetWithDelta
void DeltaVersionEditAcceptor::decreasePageRef(const PageId page_id)
{
const auto old_entry = view->findNormalPageEntry(page_id);
if (!old_entry)
if (old_entry)
{
auto entry = *old_entry;
entry.ref = old_entry->ref <= 1 ? 0 : old_entry->ref - 1;
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/Page/VersionSet/PageEntriesView.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ std::optional<PageEntry> PageEntriesView::findNormalPageEntry(PageId page_id) co
std::pair<bool, PageId> PageEntriesView::isRefId(PageId page_id) const
{
auto node = tail;
// For delta, we need to check if page_id is deleted, then try to find in page_ref
for (; !node->isBase(); node = node->prev)
{
if (node->ref_deletions.count(page_id) > 0)
Expand All @@ -75,6 +76,7 @@ std::pair<bool, PageId> PageEntriesView::isRefId(PageId page_id) const
if (iter != node->page_ref.end())
return {true, iter->second};
}
// For base
return node->isRefId(page_id);
}

Expand Down
20 changes: 19 additions & 1 deletion dbms/src/Storages/Page/mvcc/VersionSetWithDelta.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <stack>
#include <unordered_set>

#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <IO/WriteHelpers.h>
#include <Storages/Page/mvcc/VersionSet.h>
Expand All @@ -22,6 +23,11 @@ extern const Event PSMVCCApplyOnCurrentDelta;
extern const Event PSMVCCApplyOnNewDelta;
} // namespace ProfileEvents

namespace CurrentMetrics
{
extern const Metric PSMVCCNumSnapshots;
} // namespace CurrentMetrics

namespace DB
{
namespace MVCC
Expand Down Expand Up @@ -61,13 +67,18 @@ class VersionSetWithDelta
snapshots(std::move(std::make_shared<Snapshot>(this, nullptr))), //
config(config_)
{
// The placeholder snapshot should not be counted.
CurrentMetrics::sub(CurrentMetrics::PSMVCCNumSnapshots);
}

virtual ~VersionSetWithDelta()
{
current.reset();
// snapshot list is empty
assert(snapshots->prev == snapshots.get());

// Ignore the destructor of placeholder snapshot
CurrentMetrics::add(CurrentMetrics::PSMVCCNumSnapshots);
}

void apply(TVersionEdit & edit)
Expand Down Expand Up @@ -114,7 +125,10 @@ class VersionSetWithDelta
Snapshot * next;

public:
Snapshot(VersionSetWithDelta * vset_, VersionPtr tail_) : vset(vset_), view(std::move(tail_)), prev(this), next(this) {}
Snapshot(VersionSetWithDelta * vset_, VersionPtr tail_) : vset(vset_), view(std::move(tail_)), prev(this), next(this)
{
CurrentMetrics::add(CurrentMetrics::PSMVCCNumSnapshots);
}

~Snapshot()
{
Expand All @@ -123,6 +137,8 @@ class VersionSetWithDelta
std::unique_lock lock = vset->acquireForLock();
prev->next = next;
next->prev = prev;

CurrentMetrics::sub(CurrentMetrics::PSMVCCNumSnapshots);
}

const TVersionView * version() const { return &view; }
Expand Down Expand Up @@ -327,6 +343,8 @@ class VersionSetWithDelta
VersionPtr current;
SnapshotPtr snapshots;
::DB::MVCC::VersionSetConfig config;

friend class VersionSetWithDeltaCompactTest;
};

} // namespace MVCC
Expand Down
49 changes: 49 additions & 0 deletions dbms/src/Storages/Page/tests/gtest_page_entry_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,55 @@ TEST_F(PageEntryMap_test, PutDel)
ASSERT_EQ(map->find(2), std::nullopt);
}

TEST_F(PageEntryMap_test, IdempotentDel)
{
PageEntry p0entry;
p0entry.file_id = 1;
p0entry.checksum = 0x123;
map->put(0, p0entry);
{
ASSERT_NE(map->find(0), std::nullopt);
const PageEntry & entry = map->at(0);
EXPECT_EQ(entry.file_id, p0entry.file_id);
EXPECT_EQ(entry.checksum, p0entry.checksum);
}
map->ref(2, 0);
{
ASSERT_NE(map->find(2), std::nullopt);
const PageEntry & entry = map->at(2);
EXPECT_EQ(entry.file_id, p0entry.file_id);
EXPECT_EQ(entry.checksum, p0entry.checksum);
}

map->del(0);
{
// Should not found Page0, but Page2 is still available
ASSERT_EQ(map->find(0), std::nullopt);
auto entry = map->find(2);
ASSERT_TRUE(entry);
EXPECT_EQ(entry->file_id, p0entry.file_id);
EXPECT_EQ(entry->checksum, p0entry.checksum);
entry = map->findNormalPageEntry(0);
ASSERT_TRUE(entry);
EXPECT_EQ(entry->file_id, p0entry.file_id);
EXPECT_EQ(entry->checksum, p0entry.checksum);
}

// Del should be idempotent
map->del(0);
{
ASSERT_EQ(map->find(0), std::nullopt);
auto entry = map->find(2);
ASSERT_TRUE(entry);
EXPECT_EQ(entry->file_id, p0entry.file_id);
EXPECT_EQ(entry->checksum, p0entry.checksum);
entry = map->findNormalPageEntry(0);
ASSERT_TRUE(entry);
EXPECT_EQ(entry->file_id, p0entry.file_id);
EXPECT_EQ(entry->checksum, p0entry.checksum);
}
}

TEST_F(PageEntryMap_test, UpdateRefPageEntry)
{
const PageId page_id = 0;
Expand Down
Loading