1
- #include < Storages/Page/PageStorage.h>
2
-
3
1
#include < set>
2
+ #include < utility>
3
+
4
+ #include < Storages/Page/PageStorage.h>
4
5
5
6
#include < IO/ReadBufferFromMemory.h>
6
7
#include < Poco/File.h>
@@ -51,8 +52,9 @@ PageStorage::listAllPageFiles(const String & storage_path, bool remove_tmp_file,
51
52
return page_files;
52
53
}
53
54
54
- PageStorage::PageStorage (const String & storage_path_, const Config & config_)
55
- : storage_path(storage_path_),
55
+ PageStorage::PageStorage (String name, const String & storage_path_, const Config & config_)
56
+ : storage_name(std::move(name)),
57
+ storage_path (storage_path_),
56
58
config(config_),
57
59
versioned_page_entries(),
58
60
page_file_log(&Poco::Logger::get (" PageFile" )),
@@ -120,7 +122,7 @@ PageEntry PageStorage::getEntry(PageId page_id, SnapshotPtr snapshot)
120
122
}
121
123
catch (DB::Exception & e)
122
124
{
123
- LOG_WARNING (log , e.message ());
125
+ LOG_WARNING (log , storage_name << " " << e.message ());
124
126
return {}; // return invalid PageEntry
125
127
}
126
128
}
@@ -336,8 +338,6 @@ bool PageStorage::gc()
336
338
return false ;
337
339
}
338
340
339
- LOG_DEBUG (log , " PageStorage GC start" );
340
-
341
341
PageFileIdAndLevel writing_file_id_level;
342
342
{
343
343
std::lock_guard<std::mutex> lock (write_mutex);
@@ -387,12 +387,14 @@ bool PageStorage::gc()
387
387
|| (merge_files.size () >= 2 && candidate_total_size >= config.merge_hint_low_used_file_total_size );
388
388
if (!should_merge)
389
389
{
390
- LOG_DEBUG (log ,
391
- " GC exit without merging. merge file size: " << merge_files.size () << " , candidate size: " << candidate_total_size);
390
+ LOG_TRACE (log ,
391
+ storage_name << " GC exit without merging. merge file size: " << merge_files.size ()
392
+ << " , candidate size: " << candidate_total_size);
392
393
return false ;
393
394
}
394
395
395
- LOG_INFO (log , " GC decide to merge " << merge_files.size () << " files, containing " << migrate_page_count << " regions" );
396
+ LOG_INFO (log ,
397
+ storage_name << " GC decide to merge " << merge_files.size () << " files, containing " << migrate_page_count << " regions" );
396
398
397
399
// There are no valid pages to be migrated but valid ref pages, scan over all `merge_files` and do migrate.
398
400
gc_file_entries_edit = gcMigratePages (snapshot, file_valid_pages, merge_files);
@@ -421,20 +423,7 @@ bool PageStorage::gc()
421
423
}
422
424
423
425
// Delete obsolete files that are not used by any version, without lock
424
- for (const auto & page_file : page_files)
425
- {
426
- const auto page_id_and_lvl = page_file.fileIdLevel ();
427
- if (page_id_and_lvl >= writing_file_id_level)
428
- {
429
- continue ;
430
- }
431
-
432
- if (live_files.count (page_id_and_lvl) == 0 )
433
- {
434
- // the page file is not used by any version, remove reader cache
435
- page_file.destroy ();
436
- }
437
- }
426
+ gcRemoveObsoleteFiles (page_files, writing_file_id_level, live_files);
438
427
return true ;
439
428
}
440
429
@@ -491,6 +480,7 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f
491
480
492
481
size_t num_successful_migrate_pages = 0 ;
493
482
size_t num_valid_ref_pages = 0 ;
483
+ size_t num_del_page_meta = 0 ;
494
484
auto * current = snapshot->version ();
495
485
{
496
486
PageEntriesEdit legacy_edit; // All page entries in `merge_files`
@@ -510,8 +500,8 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f
510
500
continue ;
511
501
}
512
502
503
+ PageIdAndEntries page_id_and_entries; // The valid pages that we need to migrate to `gc_file`
513
504
auto to_merge_file_reader = to_merge_file.createReader ();
514
- PageIdAndEntries page_id_and_entries;
515
505
{
516
506
const auto & page_ids = it->second .second ;
517
507
for (auto page_id : page_ids)
@@ -530,7 +520,7 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f
530
520
catch (DB::Exception & e)
531
521
{
532
522
// ignore if it2 is a ref to non-exist page
533
- LOG_WARNING (log , " Ignore invalid RefPage while gcMigratePages: " + e.message ());
523
+ LOG_WARNING (log , storage_name << " Ignore invalid RefPage while gcMigratePages: " << e.message ());
534
524
}
535
525
}
536
526
}
@@ -554,17 +544,26 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f
554
544
}
555
545
556
546
{
557
- // Migrate RefPages which are still valid .
547
+ // Migrate valid RefPages and DelPages.
558
548
WriteBatch batch;
559
549
for (const auto & rec : legacy_edit.getRecords ())
560
550
{
561
- // Get `normal_page_id` from memory's `page_entry_map`. Note: can not get `normal_page_id` from disk,
562
- // if it is a record of RefPage to another RefPage, the later ref-id is resolve to the actual `normal_page_id`.
563
- auto [is_ref, normal_page_id] = current->isRefId (rec.page_id );
564
- if (is_ref)
551
+ if (rec.type == WriteBatch::WriteType::REF)
565
552
{
566
- batch.putRefPage (rec.page_id , normal_page_id);
567
- num_valid_ref_pages += 1 ;
553
+ // Get `normal_page_id` from memory's `page_entry_map`. Note: can not get `normal_page_id` from disk,
554
+ // if it is a record of a RefPage to another RefPage, the latter ref-id is resolved to the actual `normal_page_id`.
555
+ auto [is_ref, normal_page_id] = current->isRefId (rec.page_id );
556
+ if (is_ref)
557
+ {
558
+ batch.putRefPage (rec.page_id , normal_page_id);
559
+ num_valid_ref_pages += 1 ;
560
+ }
561
+ }
562
+ else if (rec.type == WriteBatch::WriteType::DEL)
563
+ {
564
+ // DelPage should be migrated to the new PageFile
565
+ batch.delPage (rec.page_id );
566
+ num_del_page_meta += 1 ;
568
567
}
569
568
}
570
569
gc_file_writer->write (batch, gc_file_edit);
@@ -580,10 +579,38 @@ PageStorage::gcMigratePages(const SnapshotPtr & snapshot, const GcLivesPages & f
580
579
gc_file.setFormal ();
581
580
const auto id = gc_file.fileIdLevel ();
582
581
LOG_INFO (log ,
583
- " GC have migrated " << num_successful_migrate_pages << " regions and " << num_valid_ref_pages << " RefPages to PageFile_"
584
- << id.first << " _" << id.second );
582
+ storage_name << " GC have migrated " << num_successful_migrate_pages //
583
+ << " regions and " << num_valid_ref_pages //
584
+ << " RefPages and " << num_del_page_meta //
585
+ << " DelPage to PageFile_" << id.first << " _" << id.second );
585
586
}
586
587
return gc_file_edit;
587
588
}
588
589
590
+ /* *
591
+ * Delete obsolete files that are not used by any version
592
+ * @param page_files All avaliable files in disk
593
+ * @param writing_file_id_level The PageFile id which is writing to
594
+ * @param live_files The live files after gc
595
+ */
596
+ void PageStorage::gcRemoveObsoleteFiles (const std::set<PageFile, PageFile::Comparator> & page_files,
597
+ const PageFileIdAndLevel & writing_file_id_level,
598
+ const std::set<PageFileIdAndLevel> & live_files)
599
+ {
600
+ for (const auto & page_file : page_files)
601
+ {
602
+ const auto page_id_and_lvl = page_file.fileIdLevel ();
603
+ if (page_id_and_lvl >= writing_file_id_level)
604
+ {
605
+ continue ;
606
+ }
607
+
608
+ if (live_files.count (page_id_and_lvl) == 0 )
609
+ {
610
+ // the page file is not used by any version, remove reader cache
611
+ page_file.destroy ();
612
+ }
613
+ }
614
+ }
615
+
589
616
} // namespace DB
0 commit comments