Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Binary compatibility version for DeltaMerge's chunk; Fix disappear of PageStorage del meta #257

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Revert "Remove getMaxId of PageStorage"
This reverts commit 34d50eb6e9fb2f229f32e2d6b219b74c340d0d0a.
JaySon-Huang committed Sep 27, 2019

Verified

This commit was signed with the committer’s verified signature.
ViBiOh Vincent Boutour
commit 1d6e0f179282587bd7020baa5df0193b2fd411cf
12 changes: 6 additions & 6 deletions dbms/src/Storages/DeltaMerge/StoragePool.cpp
Original file line number Diff line number Diff line change
@@ -7,12 +7,12 @@ namespace DM

// TODO: Load configs from settings.
StoragePool::StoragePool(const String &name, const String & path)
: max_log_page_id(0),
max_data_page_id(0),
max_meta_page_id(0),
log_storage(name + ".log", path + "/log", {}, &max_log_page_id),
data_storage(name + ".data", path + "/data", {}, &max_data_page_id),
meta_storage(name + ".meta", path + "/meta", {}, &max_meta_page_id)
: log_storage(name + ".log", path + "/log", {}),
data_storage(name + ".data", path + "/data", {}),
meta_storage(name + ".meta", path + "/meta", {}),
max_log_page_id(log_storage.getMaxId()),
max_data_page_id(data_storage.getMaxId()),
max_meta_page_id(meta_storage.getMaxId())
{
}

9 changes: 4 additions & 5 deletions dbms/src/Storages/DeltaMerge/StoragePool.h
Original file line number Diff line number Diff line change
@@ -37,15 +37,14 @@ class StoragePool : private boost::noncopyable
bool gc(const Seconds & try_gc_period = DELTA_MERGE_GC_PERIOD);

private:
// These should init before PageStorages
std::atomic<PageId> max_log_page_id;
std::atomic<PageId> max_data_page_id;
std::atomic<PageId> max_meta_page_id;

PageStorage log_storage;
PageStorage data_storage;
PageStorage meta_storage;

std::atomic<PageId> max_log_page_id;
std::atomic<PageId> max_data_page_id;
std::atomic<PageId> max_meta_page_id;

std::atomic<Timepoint> last_try_gc_time = Clock::now();

std::mutex mutex;
12 changes: 11 additions & 1 deletion dbms/src/Storages/Page/PageEntries.h
Original file line number Diff line number Diff line change
@@ -26,7 +26,7 @@ template <typename T>
class PageEntriesMixin
{
public:
explicit PageEntriesMixin(bool is_base_) : normal_pages(), page_ref(), ref_deletions(), is_base(is_base_) {}
explicit PageEntriesMixin(bool is_base_) : normal_pages(), page_ref(), ref_deletions(), max_page_id(0), is_base(is_base_) {}

public:
static std::shared_ptr<T> createBase() { return std::make_shared<T>(true); }
@@ -105,9 +105,12 @@ class PageEntriesMixin
{
page_ref.clear();
normal_pages.clear();
max_page_id = 0;
ref_deletions.clear();
}

PageId maxId() const { return max_page_id; }

public:
using const_normal_page_iterator = std::unordered_map<PageId, PageEntry>::const_iterator;
// only scan over normal Pages, excluding RefPages
@@ -120,6 +123,7 @@ class PageEntriesMixin
// RefPageId deletions
std::unordered_set<PageId> ref_deletions;

PageId max_page_id;
bool is_base;

protected:
@@ -154,6 +158,7 @@ class PageEntriesMixin
{
page_ref = rhs.page_ref;
normal_pages = rhs.normal_pages;
max_page_id = rhs.max_page_id;
ref_deletions = rhs.ref_deletions;
}

@@ -169,6 +174,7 @@ class PageEntriesMixin
{
normal_pages.swap(rhs.normal_pages);
page_ref.swap(rhs.page_ref);
max_page_id = rhs.max_page_id;
is_base = rhs.is_base;
ref_deletions.swap(rhs.ref_deletions);
}
@@ -211,6 +217,8 @@ void PageEntriesMixin<T>::put(PageId page_id, const PageEntry & entry)
normal_pages[normal_page_id].ref = page_ref_count + is_new_ref_pair_inserted;
}

// update max_page_id
max_page_id = std::max(max_page_id, page_id);
}

template <typename T>
@@ -265,6 +273,7 @@ void PageEntriesMixin<T>::ref(const PageId ref_id, const PageId page_id)
page_ref[ref_id] = normal_page_id;
}
}
max_page_id = std::max(max_page_id, std::max(ref_id, page_id));
}

template <typename T>
@@ -434,6 +443,7 @@ class PageEntriesForDelta : public PageEntriesMixin<PageEntriesForDelta>,
normal_pages[it.first] = it.second;
}
}
max_page_id = std::max(max_page_id, rhs.max_page_id);
}

bool shouldCompactToBase(const ::DB::MVCC::VersionSetConfig & config)
34 changes: 10 additions & 24 deletions dbms/src/Storages/Page/PageFile.cpp
Original file line number Diff line number Diff line change
@@ -141,17 +141,14 @@ std::pair<ByteBuffer, ByteBuffer> genWriteData( //

/// Analyze meta file, and return <available meta size, available data size>.
std::pair<UInt64, UInt64> analyzeMetaFile( //
const String & path,
PageFileId file_id,
UInt32 level,
const char * meta_data,
const size_t meta_data_size,
PageEntriesEdit & edit,
std::atomic<PageId> * max_page_id,
Logger * log)
const String & path,
PageFileId file_id,
UInt32 level,
const char * meta_data,
const size_t meta_data_size,
PageEntriesEdit & edit,
Logger * log)
{
assert(max_page_id != nullptr);

const char * meta_data_end = meta_data + meta_data_size;

UInt64 page_data_file_size = 0;
@@ -204,23 +201,20 @@ std::pair<UInt64, UInt64> analyzeMetaFile( //
pc.checksum = PageUtil::get<Checksum>(pos);

edit.put(page_id, pc);
*max_page_id = std::max<PageId>(*max_page_id, page_id);
page_data_file_size += pc.size;
break;
}
case WriteBatch::WriteType::DEL:
{
auto page_id = PageUtil::get<PageId>(pos);
edit.del(page_id); // Reserve the order of removal.
*max_page_id = std::max<PageId>(*max_page_id, page_id);
break;
}
case WriteBatch::WriteType::REF:
{
const auto ref_id = PageUtil::get<PageId>(pos);
const auto page_id = PageUtil::get<PageId>(pos);
edit.ref(ref_id, page_id);
*max_page_id = std::max<PageId>(*max_page_id, page_id);
}
}
}
@@ -286,10 +280,7 @@ void PageFile::Writer::write(const WriteBatch & wb, PageEntriesEdit & edit)
// PageFile::Reader
// =========================================================

PageFile::Reader::Reader(PageFile & page_file)
: data_file_path(page_file.dataPath()), data_file_fd(PageUtil::openFile<true>(data_file_path))
{
}
PageFile::Reader::Reader(PageFile & page_file) : data_file_path(page_file.dataPath()), data_file_fd(PageUtil::openFile<true>(data_file_path)) {}

PageFile::Reader::~Reader()
{
@@ -469,7 +460,7 @@ PageFile PageFile::openPageFileForRead(PageFileId file_id, UInt32 level, const s
return PageFile(file_id, level, parent_path, false, false, log);
}

void PageFile::readAndSetPageMetas(PageEntriesEdit & edit, std::atomic<PageId> * max_page_id)
void PageFile::readAndSetPageMetas(PageEntriesEdit & edit)
{
const auto path = metaPath();
Poco::File file(path);
@@ -484,14 +475,9 @@ void PageFile::readAndSetPageMetas(PageEntriesEdit & edit, std::atomic<PageId> *

PageUtil::readFile(file_fd, 0, data, file_size, path);

// If max_page_id is nullptr, we will ignore that
std::atomic<PageId> ignored_max_page_id = 0;
if (max_page_id == nullptr)
max_page_id = &ignored_max_page_id;

// analyze meta file and update page_entries
std::tie(this->meta_file_pos, this->data_file_pos)
= PageMetaFormat::analyzeMetaFile(folderPath(), file_id, level, data, file_size, edit, max_page_id, log);
= PageMetaFormat::analyzeMetaFile(folderPath(), file_id, level, data, file_size, edit, log);
}

void PageFile::setFormal()
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/PageFile.h
Original file line number Diff line number Diff line change
@@ -91,7 +91,7 @@ class PageFile : public Allocator<false>
/// Get pages' metadata by this method. Will also update file pos.
/// Call this method after a page file recovered.
/// if check_page_map_complete is true, do del or ref on non-exist page will throw exception.
void readAndSetPageMetas(PageEntriesEdit & edit, std::atomic<PageId> * max_page_id = nullptr);
void readAndSetPageMetas(PageEntriesEdit & edit);

/// Rename this page file into formal style.
void setFormal();
10 changes: 8 additions & 2 deletions dbms/src/Storages/Page/PageStorage.cpp
Original file line number Diff line number Diff line change
@@ -52,7 +52,7 @@ PageStorage::listAllPageFiles(const String & storage_path, bool remove_tmp_file,
return page_files;
}

PageStorage::PageStorage(String name, const String & storage_path_, const Config & config_, std::atomic<PageId> * max_page_id)
PageStorage::PageStorage(String name, const String & storage_path_, const Config & config_)
: storage_name(std::move(name)),
storage_path(storage_path_),
config(config_),
@@ -68,7 +68,7 @@ PageStorage::PageStorage(String name, const String & storage_path_, const Config
for (auto & page_file : page_files)
{
PageEntriesEdit edit;
const_cast<PageFile &>(page_file).readAndSetPageMetas(edit, max_page_id);
const_cast<PageFile &>(page_file).readAndSetPageMetas(edit);

// Only level 0 is writable.
if (page_file.getLevel() == 0)
@@ -99,6 +99,12 @@ PageStorage::PageStorage(String name, const String & storage_path_, const Config
#endif
}

PageId PageStorage::getMaxId()
{
std::lock_guard<std::mutex> write_lock(write_mutex);
return versioned_page_entries.getSnapshot()->version()->maxId();
}

PageEntry PageStorage::getEntry(PageId page_id, SnapshotPtr snapshot)
{
if (!snapshot)
4 changes: 3 additions & 1 deletion dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
@@ -56,7 +56,9 @@ class PageStorage
using OpenReadFiles = std::map<PageFileIdAndLevel, ReaderPtr>;

public:
PageStorage(String name, const String & storage_path, const Config & config_, std::atomic<PageId> * max_page_id = nullptr);
PageStorage(String name, const String & storage_path, const Config & config_);

PageId getMaxId();

void write(const WriteBatch & write_batch);

Original file line number Diff line number Diff line change
@@ -199,6 +199,8 @@ void DeltaVersionEditAcceptor::applyPut(PageEntriesEdit::EditRecord & rec)
rec.entry.ref = old_entry->ref + !is_ref_exist;
current_version->normal_pages[normal_page_id] = rec.entry;
}

current_version->max_page_id = std::max(current_version->max_page_id, rec.page_id);
}

void DeltaVersionEditAcceptor::applyDel(PageEntriesEdit::EditRecord & rec)
@@ -251,6 +253,7 @@ void DeltaVersionEditAcceptor::applyRef(PageEntriesEdit::EditRecord & rec)
current_version->page_ref[rec.page_id] = normal_page_id;
}
}
current_version->max_page_id = std::max(current_version->max_page_id, rec.page_id);
}

void DeltaVersionEditAcceptor::applyInplace(const PageEntriesVersionSetWithDelta::VersionPtr & current, const PageEntriesEdit & edit)
10 changes: 10 additions & 0 deletions dbms/src/Storages/Page/VersionSet/PageEntriesView.cpp
Original file line number Diff line number Diff line change
@@ -141,4 +141,14 @@ std::set<PageId> PageEntriesView::validNormalPageIds() const
return valid_normal_pages;
}

PageId PageEntriesView::maxId() const
{
PageId max_id = 0;
for (auto node = tail; node != nullptr; node = node->prev)
{
max_id = std::max(max_id, node->maxId());
}
return max_id;
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Storages/Page/VersionSet/PageEntriesView.h
Original file line number Diff line number Diff line change
@@ -28,6 +28,8 @@ class PageEntriesView
std::set<PageId> validNormalPageIds() const;
std::optional<PageEntry> findNormalPageEntry(PageId page_id) const;

PageId maxId() const;

inline std::shared_ptr<PageEntriesForDelta> getSharedTailVersion() const { return tail; }

inline std::shared_ptr<PageEntriesForDelta> transferTailVersionOwn()
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/tests/gtest_page_entry_map.cpp
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ TEST_F(PageEntryMap_test, Empty)
item_count += 1;
}
ASSERT_EQ(item_count, 0UL);
ASSERT_EQ(map->maxId(), 0UL);


// add some Pages, RefPages
@@ -52,6 +53,7 @@ TEST_F(PageEntryMap_test, Empty)
item_count += 1;
}
ASSERT_EQ(item_count, 2UL);
ASSERT_EQ(map->maxId(), 1UL);

map->clear();
item_count = 0;
@@ -60,6 +62,7 @@ TEST_F(PageEntryMap_test, Empty)
item_count += 1;
}
ASSERT_EQ(item_count, 0UL);
ASSERT_EQ(map->maxId(), 0UL);
}

TEST_F(PageEntryMap_test, UpdatePageEntry)