Skip to content

Commit 0234d7a

Browse files
committed
Binary compatibility version for DeltaMerge's chunk; Fix disappear of PageStorage del meta (pingcap#257)
* Chunk binary version * Add name to PageStorage for identify different storages * Remove getMaxId of PageStorage * Migrate DelPage mark in doing GC * enable fullstack-test * Revert "Remove getMaxId of PageStorage" This reverts commit 34d50eb6e9fb2f229f32e2d6b219b74c340d0d0a.
1 parent f430bae commit 0234d7a

15 files changed

+170
-63
lines changed

dbms/src/Storages/DeltaMerge/Chunk.cpp

+10
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@ namespace DB
1010
namespace DM
1111
{
1212

13+
const Chunk::Version Chunk::CURRENT_VERSION = 1;
14+
1315
void Chunk::serialize(WriteBuffer & buf) const
1416
{
17+
writeVarUInt(Chunk::CURRENT_VERSION, buf); // Add binary version
18+
1519
writeIntBinary(handle_start, buf);
1620
writeIntBinary(handle_end, buf);
1721
writePODBinary(is_delete_range, buf);
@@ -37,6 +41,12 @@ void Chunk::serialize(WriteBuffer & buf) const
3741

3842
Chunk Chunk::deserialize(ReadBuffer & buf)
3943
{
44+
// Check binary version
45+
Chunk::Version chunk_batch_version;
46+
readVarUInt(chunk_batch_version, buf);
47+
if (chunk_batch_version != Chunk::CURRENT_VERSION)
48+
throw Exception("Chunk binary version not match: " + DB::toString(chunk_batch_version), ErrorCodes::LOGICAL_ERROR);
49+
4050
Handle start, end;
4151
readIntBinary(start, buf);
4252
readIntBinary(end, buf);

dbms/src/Storages/DeltaMerge/Chunk.h

+4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ using ColumnMetas = std::vector<ColumnMeta>;
3535

3636
class Chunk
3737
{
38+
public:
39+
// Binary version of chunk
40+
using Version = UInt32;
41+
static const Version CURRENT_VERSION;
3842
public:
3943
using ColumnMetaMap = std::unordered_map<ColId, ColumnMeta>;
4044

dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp

+4-3
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,14 @@ namespace DM
3838

3939
DeltaMergeStore::DeltaMergeStore(Context & db_context,
4040
const String & path_,
41-
const String & name,
41+
const String & db_name,
42+
const String & tbl_name,
4243
const ColumnDefines & columns,
4344
const ColumnDefine & handle,
4445
const Settings & settings_)
4546
: path(path_),
46-
storage_pool(path),
47-
table_name(name),
47+
storage_pool(db_name + "." + tbl_name, path),
48+
table_name(tbl_name),
4849
table_handle_define(handle),
4950
background_pool(db_context.getBackgroundPool()),
5051
settings(settings_),

dbms/src/Storages/DeltaMerge/DeltaMergeStore.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ class DeltaMergeStore
4343

4444
DeltaMergeStore(Context & db_context, //
4545
const String & path_,
46-
const String & name,
46+
const String & db_name,
47+
const String & tbl_name,
4748
const ColumnDefines & columns,
4849
const ColumnDefine & handle,
4950
const Settings & settings_);

dbms/src/Storages/DeltaMerge/StoragePool.cpp

+4-4
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ namespace DM
66
{
77

88
// TODO: Load configs from settings.
9-
StoragePool::StoragePool(const String & path)
10-
: log_storage(path + "/log", {}),
11-
data_storage(path + "/data", {}),
12-
meta_storage(path + "/meta", {}),
9+
StoragePool::StoragePool(const String &name, const String & path)
10+
: log_storage(name + ".log", path + "/log", {}),
11+
data_storage(name + ".data", path + "/data", {}),
12+
meta_storage(name + ".meta", path + "/meta", {}),
1313
max_log_page_id(log_storage.getMaxId()),
1414
max_data_page_id(data_storage.getMaxId()),
1515
max_meta_page_id(meta_storage.getMaxId())

dbms/src/Storages/DeltaMerge/StoragePool.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class StoragePool : private boost::noncopyable
2020
using Duration = Clock::duration;
2121
using Seconds = std::chrono::seconds;
2222

23-
explicit StoragePool(const String & path);
23+
StoragePool(const String & name, const String & path);
2424

2525
PageId maxLogPageId() { return max_log_page_id; }
2626
PageId maxDataPageId() { return max_data_page_id; }

dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class DeltaMergeStore_test : public ::testing::Test
5151
ColumnDefine handle_column_define = cols[0];
5252

5353
DeltaMergeStorePtr s
54-
= std::make_shared<DeltaMergeStore>(*context, path, name, cols, handle_column_define, DeltaMergeStore::Settings());
54+
= std::make_shared<DeltaMergeStore>(*context, path, "test", name, cols, handle_column_define, DeltaMergeStore::Settings());
5555
return s;
5656
}
5757

dbms/src/Storages/DeltaMerge/tests/gtest_dm_disk_value_space.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class DiskValueSpace_test : public ::testing::Test
3434
{
3535
dropDataInDisk();
3636

37-
storage_pool = std::make_unique<StoragePool>(path);
37+
storage_pool = std::make_unique<StoragePool>("test.t1", path);
3838
Context & context = DMTestEnv::getContext();
3939
table_handle_define = ColumnDefine(1, "pk", std::make_shared<DataTypeInt64>());
4040
table_columns.clear();

dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class Segment_test : public ::testing::Test
5353

5454
SegmentPtr reload(ColumnDefines && pre_define_columns = {}, DB::Settings && db_settings = DB::Settings())
5555
{
56-
storage_pool = std::make_unique<StoragePool>(path);
56+
storage_pool = std::make_unique<StoragePool>("test.t1", path);
5757
*db_context = DMTestEnv::getContext(db_settings);
5858
ColumnDefines cols = pre_define_columns.empty() ? DMTestEnv::getDefaultColumns() : pre_define_columns;
5959
setColumns(cols);

dbms/src/Storages/Page/PageStorage.cpp

+62-35
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
#include <Storages/Page/PageStorage.h>
2-
31
#include <set>
2+
#include <utility>
3+
4+
#include <Storages/Page/PageStorage.h>
45

56
#include <IO/ReadBufferFromMemory.h>
67
#include <Poco/File.h>
@@ -51,8 +52,9 @@ PageStorage::listAllPageFiles(const String & storage_path, bool remove_tmp_file,
5152
return page_files;
5253
}
5354

54-
PageStorage::PageStorage(const String & storage_path_, const Config & config_)
55-
: storage_path(storage_path_),
55+
PageStorage::PageStorage(String name, const String & storage_path_, const Config & config_)
56+
: storage_name(std::move(name)),
57+
storage_path(storage_path_),
5658
config(config_),
5759
versioned_page_entries(),
5860
page_file_log(&Poco::Logger::get("PageFile")),
@@ -120,7 +122,7 @@ PageEntry PageStorage::getEntry(PageId page_id, SnapshotPtr snapshot)
120122
}
121123
catch (DB::Exception & e)
122124
{
123-
LOG_WARNING(log, e.message());
125+
LOG_WARNING(log, storage_name << " " << e.message());
124126
return {}; // return invalid PageEntry
125127
}
126128
}
@@ -336,8 +338,6 @@ bool PageStorage::gc()
336338
return false;
337339
}
338340

339-
LOG_DEBUG(log, "PageStorage GC start");
340-
341341
PageFileIdAndLevel writing_file_id_level;
342342
{
343343
std::lock_guard<std::mutex> lock(write_mutex);
@@ -387,12 +387,14 @@ bool PageStorage::gc()
387387
|| (merge_files.size() >= 2 && candidate_total_size >= config.merge_hint_low_used_file_total_size);
388388
if (!should_merge)
389389
{
390-
LOG_DEBUG(log,
391-
"GC exit without merging. merge file size: " << merge_files.size() << ", candidate size: " << candidate_total_size);
390+
LOG_TRACE(log,
391+
storage_name << " GC exit without merging. merge file size: " << merge_files.size()
392+
<< ", candidate size: " << candidate_total_size);
392393
return false;
393394
}
394395

395-
LOG_INFO(log, "GC decide to merge " << merge_files.size() << " files, containing " << migrate_page_count << " regions");
396+
LOG_INFO(log,
397+
storage_name << " GC decide to merge " << merge_files.size() << " files, containing " << migrate_page_count << " regions");
396398

397399
// There are no valid pages to be migrated but valid ref pages, scan over all `merge_files` and do migrate.
398400
gc_file_entries_edit = gcMigratePages(snapshot, file_valid_pages, merge_files);
@@ -421,20 +423,7 @@ bool PageStorage::gc()
421423
}
422424

423425
// Delete obsolete files that are not used by any version, without lock
424-
for (const auto & page_file : page_files)
425-
{
426-
const auto page_id_and_lvl = page_file.fileIdLevel();
427-
if (page_id_and_lvl >= writing_file_id_level)
428-
{
429-
continue;
430-
}
431-
432-
if (live_files.count(page_id_and_lvl) == 0)
433-
{
434-
// the page file is not used by any version, remove reader cache
435-
page_file.destroy();
436-
}
437-
}
426+
gcRemoveObsoleteFiles(page_files, writing_file_id_level, live_files);
438427
return true;
439428
}
440429

@@ -491,6 +480,7 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f
491480

492481
size_t num_successful_migrate_pages = 0;
493482
size_t num_valid_ref_pages = 0;
483+
size_t num_del_page_meta = 0;
494484
auto * current = snapshot->version();
495485
{
496486
PageEntriesEdit legacy_edit; // All page entries in `merge_files`
@@ -510,8 +500,8 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f
510500
continue;
511501
}
512502

503+
PageIdAndEntries page_id_and_entries; // The valid pages that we need to migrate to `gc_file`
513504
auto to_merge_file_reader = to_merge_file.createReader();
514-
PageIdAndEntries page_id_and_entries;
515505
{
516506
const auto & page_ids = it->second.second;
517507
for (auto page_id : page_ids)
@@ -530,7 +520,7 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f
530520
catch (DB::Exception & e)
531521
{
532522
// ignore if it2 is a ref to non-exist page
533-
LOG_WARNING(log, "Ignore invalid RefPage while gcMigratePages: " + e.message());
523+
LOG_WARNING(log, storage_name << " Ignore invalid RefPage while gcMigratePages: " << e.message());
534524
}
535525
}
536526
}
@@ -554,17 +544,26 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f
554544
}
555545

556546
{
557-
// Migrate RefPages which are still valid.
547+
// Migrate valid RefPages and DelPage.
558548
WriteBatch batch;
559549
for (const auto & rec : legacy_edit.getRecords())
560550
{
561-
// Get `normal_page_id` from memory's `page_entry_map`. Note: can not get `normal_page_id` from disk,
562-
// if it is a record of RefPage to another RefPage, the later ref-id is resolve to the actual `normal_page_id`.
563-
auto [is_ref, normal_page_id] = current->isRefId(rec.page_id);
564-
if (is_ref)
551+
if (rec.type == WriteBatch::WriteType::REF)
565552
{
566-
batch.putRefPage(rec.page_id, normal_page_id);
567-
num_valid_ref_pages += 1;
553+
// Get `normal_page_id` from memory's `page_entry_map`. Note: can not get `normal_page_id` from disk,
554+
// if it is a record of RefPage to another RefPage, the later ref-id is resolve to the actual `normal_page_id`.
555+
auto [is_ref, normal_page_id] = current->isRefId(rec.page_id);
556+
if (is_ref)
557+
{
558+
batch.putRefPage(rec.page_id, normal_page_id);
559+
num_valid_ref_pages += 1;
560+
}
561+
}
562+
else if (rec.type == WriteBatch::WriteType::DEL)
563+
{
564+
// DelPage should be migrate to new PageFile
565+
batch.delPage(rec.page_id);
566+
num_del_page_meta += 1;
568567
}
569568
}
570569
gc_file_writer->write(batch, gc_file_edit);
@@ -580,10 +579,38 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f
580579
gc_file.setFormal();
581580
const auto id = gc_file.fileIdLevel();
582581
LOG_INFO(log,
583-
"GC have migrated " << num_successful_migrate_pages << " regions and " << num_valid_ref_pages << " RefPages to PageFile_"
584-
<< id.first << "_" << id.second);
582+
storage_name << " GC have migrated " << num_successful_migrate_pages //
583+
<< " regions and " << num_valid_ref_pages //
584+
<< " RefPages and " << num_del_page_meta //
585+
<< " DelPage to PageFile_" << id.first << "_" << id.second);
585586
}
586587
return gc_file_edit;
587588
}
588589

590+
/**
591+
* Delete obsolete files that are not used by any version
592+
* @param page_files All avaliable files in disk
593+
* @param writing_file_id_level The PageFile id which is writing to
594+
* @param live_files The live files after gc
595+
*/
596+
void PageStorage::gcRemoveObsoleteFiles(const std::set<PageFile, PageFile::Comparator> & page_files,
597+
const PageFileIdAndLevel & writing_file_id_level,
598+
const std::set<PageFileIdAndLevel> & live_files)
599+
{
600+
for (const auto & page_file : page_files)
601+
{
602+
const auto page_id_and_lvl = page_file.fileIdLevel();
603+
if (page_id_and_lvl >= writing_file_id_level)
604+
{
605+
continue;
606+
}
607+
608+
if (live_files.count(page_id_and_lvl) == 0)
609+
{
610+
// the page file is not used by any version, remove reader cache
611+
page_file.destroy();
612+
}
613+
}
614+
}
615+
589616
} // namespace DB

dbms/src/Storages/Page/PageStorage.h

+6-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class PageStorage
5656
using OpenReadFiles = std::map<PageFileIdAndLevel, ReaderPtr>;
5757

5858
public:
59-
PageStorage(const String & storage_path, const Config & config_);
59+
PageStorage(String name, const String & storage_path, const Config & config_);
6060

6161
PageId getMaxId();
6262

@@ -89,7 +89,12 @@ class PageStorage
8989
PageEntriesEdit
9090
gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & file_valid_pages, const GcCandidates & merge_files) const;
9191

92+
static void gcRemoveObsoleteFiles(const std::set<PageFile, PageFile::Comparator> & page_files,
93+
const PageFileIdAndLevel & writing_file_id_level,
94+
const std::set<PageFileIdAndLevel> & live_files);
95+
9296
private:
97+
String storage_name; // Identify between different Storage
9398
String storage_path;
9499
Config config;
95100

dbms/src/Storages/Page/tests/gtest_page_storage.cpp

+60-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class PageStorage_test : public ::testing::Test
5656

5757
std::shared_ptr<PageStorage> reopenWithConfig(const PageStorage::Config & config_)
5858
{
59-
return std::make_shared<PageStorage>(path, config_);
59+
return std::make_shared<PageStorage>("test.t", path, config_);
6060
}
6161

6262
protected:
@@ -324,6 +324,65 @@ TEST_F(PageStorage_test, GcMoveRefPage)
324324
ASSERT_EQ(normal_page_id, 1UL);
325325
}
326326

327+
TEST_F(PageStorage_test, GcMovePageDelMeta)
328+
{
329+
const size_t buf_sz = 256;
330+
char c_buff[buf_sz];
331+
332+
{
333+
// Page1 should be written to PageFile{1, 0}
334+
WriteBatch batch;
335+
memset(c_buff, 0xf, buf_sz);
336+
ReadBufferPtr buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
337+
batch.putPage(1, 0, buff, buf_sz);
338+
buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
339+
batch.putPage(2, 0, buff, buf_sz);
340+
buff = std::make_shared<ReadBufferFromMemory>(c_buff, sizeof(c_buff));
341+
batch.putPage(3, 0, buff, buf_sz);
342+
343+
storage->write(batch);
344+
}
345+
346+
{
347+
// DelPage1 should be written to PageFile{2, 0}
348+
WriteBatch batch;
349+
batch.delPage(1);
350+
351+
storage->write(batch);
352+
}
353+
354+
PageFileIdAndLevel id_and_lvl = {2, 0}; // PageFile{2, 0} is ready to be migrated by gc
355+
PageStorage::GcLivesPages livesPages{{id_and_lvl, {0, {}}}};
356+
PageStorage::GcCandidates candidates{
357+
id_and_lvl,
358+
};
359+
const auto page_files = PageStorage::listAllPageFiles(storage->storage_path, true, storage->page_file_log);
360+
auto s0 = storage->getSnapshot();
361+
PageEntriesEdit edit = storage->gcMigratePages(s0, livesPages, candidates);
362+
363+
// We should see migration of DelPage1
364+
bool exist = false;
365+
for (const auto & rec : edit.getRecords())
366+
{
367+
if (rec.type == WriteBatch::WriteType::DEL && rec.page_id == 1)
368+
{
369+
exist = true;
370+
break;
371+
}
372+
}
373+
EXPECT_TRUE(exist);
374+
s0.reset();
375+
376+
auto live_files = storage->versioned_page_entries.gcApply(edit);
377+
EXPECT_EQ(live_files.find(id_and_lvl), live_files.end());
378+
storage->gcRemoveObsoleteFiles(/* page_files= */ page_files, /* writing_file_id_level= */ {3, 0}, live_files);
379+
380+
// reopen PageStorage, Page 1 should be deleted
381+
storage = reopenWithConfig(config);
382+
auto s1 = storage->getSnapshot();
383+
ASSERT_EQ(s1->version()->find(1), std::nullopt);
384+
}
385+
327386
/**
328387
* PageStorage tests with predefine Page1 && Page2
329388
*/

0 commit comments

Comments
 (0)