diff --git a/dbms/src/IO/Compression/CompressedReadBufferBase.cpp b/dbms/src/IO/Compression/CompressedReadBufferBase.cpp index aeed4e7f69c..d4369d11a74 100644 --- a/dbms/src/IO/Compression/CompressedReadBufferBase.cpp +++ b/dbms/src/IO/Compression/CompressedReadBufferBase.cpp @@ -46,7 +46,7 @@ static void readHeaderAndGetCodec(const char * compressed_buffer, CompressionCod throw Exception( ErrorCodes::CANNOT_DECOMPRESS, "Data compressed with different methods, given method " - "byte {#x}, previous method byte {#x}", + "byte {:#x}, previous method byte {:#x}", method_byte, codec->getMethodByte()); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMerge.h b/dbms/src/Storages/DeltaMerge/DeltaMerge.h index 167c7a794b9..acef20e68a4 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMerge.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMerge.h @@ -149,7 +149,7 @@ class DeltaMergeBlockInputStream final } else { - use_stable_rows = delta_index_it.getSid(); + use_stable_rows = delta_index_it->getSid(); } auto all_range = RowKeyRange::newAll(is_common_handle, rowkey_column_size); last_value = all_range.getStart().toRowKeyValue(); @@ -365,10 +365,10 @@ class DeltaMergeBlockInputStream final } else { - if (delta_index_it.isDelete()) + if (delta_index_it->isDelete()) { // Delete. 
- writeDeleteFromDelta(delta_index_it.getCount()); + writeDeleteFromDelta(delta_index_it->getCount()); } else { @@ -376,17 +376,17 @@ class DeltaMergeBlockInputStream final bool do_write = true; if constexpr (skippable_place) { - if (delta_index_it.getSid() < sk_skip_stable_rows) + if (delta_index_it->getSid() < sk_skip_stable_rows) { do_write = false; - sk_skip_total_rows += delta_index_it.getCount(); + sk_skip_total_rows += delta_index_it->getCount(); } } if (do_write) { - use_delta_offset = delta_index_it.getValue(); - use_delta_rows = delta_index_it.getCount(); + use_delta_offset = delta_index_it->getValue(); + use_delta_rows = delta_index_it->getCount(); writeInsertFromDelta(output_columns, output_write_limit); } } @@ -630,9 +630,9 @@ class DeltaMergeBlockInputStream final { UInt64 prev_sid; { - prev_sid = delta_index_it.getSid(); - if (delta_index_it.isDelete()) - prev_sid += delta_index_it.getCount(); + prev_sid = delta_index_it->getSid(); + if (delta_index_it->isDelete()) + prev_sid += delta_index_it->getCount(); } ++delta_index_it; @@ -644,7 +644,7 @@ class DeltaMergeBlockInputStream final } else { - use_stable_rows = delta_index_it.getSid() - prev_sid; + use_stable_rows = delta_index_it->getSid() - prev_sid; } } }; diff --git a/dbms/src/Storages/DeltaMerge/DeltaTree.h b/dbms/src/Storages/DeltaMerge/DeltaTree.h index e3fe7203f27..57cbbc12618 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaTree.h +++ b/dbms/src/Storages/DeltaMerge/DeltaTree.h @@ -732,34 +732,30 @@ class DTCompactedEntries public: struct Entry { + friend class DTCompactedEntries; + + public: + Entry(UInt64 sid_, bool is_insert_, UInt32 count_, UInt64 value_) + : sid(sid_) + , is_insert(is_insert_) + , count(count_) + , value(value_) + {} + + UInt64 getSid() const { return sid; } + bool isInsert() const { return is_insert; } + bool isDelete() const { return !is_insert; } + UInt32 getCount() const { return count; } + UInt64 getValue() const { return value; } + + private: UInt64 sid; bool 
is_insert; UInt32 count; UInt64 value; }; using Entries = std::vector; - - struct Iterator - { - typename Entries::iterator it; - - explicit Iterator(typename Entries::iterator it_) - : it(it_) - {} - bool operator==(const Iterator & rhs) const { return it == rhs.it; } - bool operator!=(const Iterator & rhs) const { return it != rhs.it; } - Iterator & operator++() - { - ++it; - return *this; - } - - UInt64 getSid() const { return it->sid; } - bool isInsert() const { return it->is_insert; } - bool isDelete() const { return !it->is_insert; } - UInt32 getCount() const { return it->count; } - UInt64 getValue() const { return it->value; } - }; + using Iterator = typename Entries::iterator; private: Entries entries; @@ -783,14 +779,12 @@ class DTCompactedEntries continue; } } - Entry entry - = {.sid = it.getSid(), .is_insert = it.isInsert(), .count = it.getCount(), .value = it.getValue()}; - entries.emplace_back(entry); + entries.emplace_back(it.getSid(), it.isInsert(), it.getCount(), it.getValue()); } } - auto begin() { return Iterator(entries.begin()); } - auto end() { return Iterator(entries.end()); } + auto begin() { return entries.begin(); } + auto end() { return entries.end(); } }; template diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp index b0fd3c26b08..9c5919670dd 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp @@ -303,7 +303,7 @@ void DMFilePackFilter::tryLoadIndex(RSCheckParam & param, ColId col_id) } std::pair, DMFilePackFilterResults> DMFilePackFilter:: - getSkippedRangeAndFilterForBitmap( + getSkippedRangeAndFilterForBitmapStableOnly( const DMContext & dm_context, const DMFiles & dmfiles, const DMFilePackFilterResults & pack_filter_results, @@ -318,12 +318,9 @@ std::pair, DMFilePackFilterResults> DMFileP // We need to read these packs and do RowKey filter and MVCC filter for them. 
DMFilePackFilterResults new_pack_filter_results; new_pack_filter_results.reserve(dmfiles.size()); + RUNTIME_CHECK(pack_filter_results.size() == dmfiles.size()); - // The offset of the first row in the current range. - size_t offset = 0; - // The number of rows in the current range. - size_t rows = 0; - UInt32 preceded_rows = 0; + UInt64 current_offset = 0; auto file_provider = dm_context.global_context.getFileProvider(); for (size_t i = 0; i < dmfiles.size(); ++i) @@ -337,7 +334,8 @@ std::pair, DMFilePackFilterResults> DMFileP for (size_t pack_id = 0; pack_id < pack_stats.size(); ++pack_id) { const auto & pack_stat = pack_stats[pack_id]; - preceded_rows += pack_stat.rows; + auto prev_offset = current_offset; + current_offset += pack_stat.rows; if (!pack_res[pack_id].isUse()) continue; @@ -355,18 +353,156 @@ std::pair, DMFilePackFilterResults> DMFileP // This pack is skipped by the skipped_range, do not need to read the rows from disk new_pack_filter->pack_res[pack_id] = RSResult::None; - // When this pack is next to the previous pack, we merge them. - // Otherwise, we record the previous continuous packs and start a new one. - if (offset + rows == preceded_rows - pack_stat.rows) + // When this pack is next to the previous skipped pack, we merge them. 
+ if (!skipped_ranges.empty() && skipped_ranges.back().offset + skipped_ranges.back().rows == prev_offset) + skipped_ranges.back().rows += pack_stat.rows; + else + skipped_ranges.emplace_back(prev_offset, pack_stat.rows); + } + + if (new_pack_filter) + new_pack_filter_results.emplace_back(std::move(new_pack_filter)); + else + new_pack_filter_results.emplace_back(pack_filter); + } + + return {skipped_ranges, new_pack_filter_results}; +} + +std::pair, DMFilePackFilterResults> DMFilePackFilter:: + getSkippedRangeAndFilterForBitmapNormal( + const DMContext & dm_context, + const DMFiles & dmfiles, + const DMFilePackFilterResults & pack_filter_results, + UInt64 start_ts, + const DeltaIndexIterator & delta_index_begin, + const DeltaIndexIterator & delta_index_end) +{ + // Packs that all rows compliant with MVCC filter and RowKey filter requirements. + // For building bitmap filter, we don't need to read these packs, + // just set corresponding positions in the bitmap to true. + // So we record the offset and rows of these packs and merge continuous ranges. + std::vector skipped_ranges; + // Packs that some rows compliant with MVCC filter and RowKey filter requirements. + // We need to read these packs and do RowKey filter and MVCC filter for them. 
+ DMFilePackFilterResults new_pack_filter_results; + new_pack_filter_results.reserve(dmfiles.size()); + RUNTIME_CHECK(pack_filter_results.size() == dmfiles.size()); + + UInt64 current_offset = 0; + UInt64 prev_sid = 0; + UInt64 sid = 0; + UInt32 prev_delete_count = 0; + + auto delta_index_it = delta_index_begin; + auto file_provider = dm_context.global_context.getFileProvider(); + for (size_t i = 0; i < dmfiles.size(); ++i) + { + const auto & dmfile = dmfiles[i]; + const auto & pack_filter = pack_filter_results[i]; + const auto & pack_res = pack_filter->getPackRes(); + const auto & handle_res = pack_filter->getHandleRes(); + const auto & pack_stats = dmfile->getPackStats(); + DMFilePackFilterResultPtr new_pack_filter; + for (size_t pack_id = 0; pack_id < pack_stats.size(); ++pack_id) + { + const auto & pack_stat = pack_stats[pack_id]; + auto prev_offset = current_offset; + current_offset += pack_stat.rows; + if (!pack_res[pack_id].isUse()) + continue; + + // Find the first `delta_index_it` whose sid > prev_offset + auto new_it = std::upper_bound( + delta_index_it, + delta_index_end, + prev_offset, + [](UInt64 val, const DeltaIndexCompacted::Entry & e) { return val < e.getSid(); }); + if (new_it != delta_index_it) { - rows += pack_stat.rows; + auto prev_it = std::prev(new_it); + prev_sid = prev_it->getSid(); + prev_delete_count = prev_it->isDelete() ? prev_it->getCount() : 0; + delta_index_it = new_it; } - else + sid = delta_index_it != delta_index_end ? delta_index_it->getSid() : std::numeric_limits::max(); + + // The sid range of the pack: (prev_offset, current_offset]. + // The continuously sorted sid range in delta index: (prev_sid, sid]. + + // Since `delta_index_it` is the first element with sid > prev_offset, + // the preceding element’s sid (prev_sid) must be <= prev_offset. 
+ RUNTIME_CHECK(prev_offset >= prev_sid); + if (prev_offset == prev_sid) + { + // If `prev_offset == prev_sid`, the RowKey of the delta row preceding `prev_sid` should not + // be the same as the RowKey of `prev_sid`. This is because for the same RowKey, the version + // in the delta data should be greater than the version in the stable data. + // However, this is not always the case and many situations need to be confirmed. For safety + // reasons, the pack will not be skipped in this situation. + // TODO: It might be possible to use a minmax index to compare the RowKey of the + // `prev_sid` row with the RowKey of the preceding delta row. + continue; + } + + // Now check the right boundary of this pack(i.e. current_offset) + if (current_offset >= sid) + { + // If `current_offset > sid`, it means some data in pack exceeds the right boundary of + // (prev_sid, sid] so this pack can not be skipped. + // + // If `current_offset == sid`, the delta row following this sid row might have the same + // RowKey. The pack also can not be skipped because delta merge and MVCC filter is necessary. + // TODO: It might be possible to use a minmax index to compare the RowKey of the + // current sid row with the RowKey of the following delta row. + continue; + } + + if (prev_delete_count > 0) { - skipped_ranges.emplace_back(offset, rows); - offset = preceded_rows - pack_stat.rows; - rows = pack_stat.rows; + // The previous delta index iterator is a delete, we must check if the sid range of the + // pack intersects with the delete range. + // The sid range of the pack: (prev_offset, current_offset]. + // The delete sid range: (prev_sid, prev_sid + prev_delete_count]. + if (current_offset <= prev_sid + prev_delete_count) + { + // The sid range of the pack is fully covered by the delete sid range, it means that + // every row in this pack has been deleted. In this case, the pack can be safely skipped. 
+ if unlikely (!new_pack_filter) + new_pack_filter = std::make_shared(*pack_filter); + + new_pack_filter->pack_res[pack_id] = RSResult::None; + continue; + } + if (prev_offset < prev_sid + prev_delete_count) + { + // Some rows in the pack are deleted while others are not, it means the pack cannot + // be skipped. + continue; + } + // None of the rows in the pack have been deleted } + + // Check other conditions that may allow the pack to be skipped + if (handle_res[pack_id] == RSResult::Some || pack_stat.not_clean > 0 + || pack_filter->getMaxVersion(dmfile, pack_id, file_provider, dm_context.scan_context) > start_ts) + { + // `not_clean > 0` means there is more than one version for some rowkeys in this pack + // `pack.max_version > start_ts` means some rows will be filtered by MVCC reading + // We need to read this pack to do delta merge, RowKey or MVCC filter. + continue; + } + + if unlikely (!new_pack_filter) + new_pack_filter = std::make_shared(*pack_filter); + + // This pack is skipped by the skipped_range, do not need to read the rows from disk + new_pack_filter->pack_res[pack_id] = RSResult::None; + // When this pack is next to the previous skipped pack, we merge them. 
+ if (!skipped_ranges.empty() && skipped_ranges.back().offset + skipped_ranges.back().rows == prev_offset) + skipped_ranges.back().rows += pack_stat.rows; + else + skipped_ranges.emplace_back(prev_offset, pack_stat.rows); } if (new_pack_filter) @@ -374,8 +510,6 @@ std::pair, DMFilePackFilterResults> DMFileP else new_pack_filter_results.emplace_back(pack_filter); } - if (rows > 0) - skipped_ranges.emplace_back(offset, rows); return {skipped_ranges, new_pack_filter_results}; } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h index 4e6a6c4ab35..116b6c8e87f 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -110,6 +111,8 @@ class DMFilePackFilter : offset(offset_) , rows(rows_) {} + + bool operator==(const Range &) const = default; }; /** * @brief For all the packs in `pack_filter_results`, if all the rows in the pack @@ -121,12 +124,31 @@ class DMFilePackFilter * - NewPackFilterResults: Those packs should be read from disk and go through the * RowKey filter and MVCC filter */ - static std::pair, DMFilePackFilterResults> getSkippedRangeAndFilterForBitmap( + static std::pair, DMFilePackFilterResults> getSkippedRangeAndFilterForBitmapStableOnly( const DMContext & dm_context, const DMFiles & dmfiles, const DMFilePackFilterResults & pack_filter_results, UInt64 start_ts); + /** + * @brief For all the packs in `pack_filter_results`, if all the rows in the pack + * comply with RowKey filter and MVCC filter (by `start_ts`) requirements, + * and are continuously sorted in delta index, or are deleted, then we skip + * reading the packs from disk and return the skipped ranges(not deleted), + * and new PackFilterResults for building bitmap. + * @return + * - SkippedRanges: All the rows in the ranges that comply with the requirements. 
+ * - NewPackFilterResults: Those packs should be read from disk and go through + * the delta merge, RowKey filter, and MVCC filter. + */ + static std::pair, DMFilePackFilterResults> getSkippedRangeAndFilterForBitmapNormal( + const DMContext & dm_context, + const DMFiles & dmfiles, + const DMFilePackFilterResults & pack_filter_results, + UInt64 start_ts, + const DeltaIndexIterator & delta_index_begin, + const DeltaIndexIterator & delta_index_end); + private: DMFilePackFilter( const DMFilePtr & dmfile_, diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 93fa3f8b26b..cf88e802dad 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -2806,7 +2806,7 @@ std::pair Segment::ensurePlace( { const auto & stable_snap = segment_snap->stable; auto delta_snap = delta_reader->getDeltaSnap(); - // Try to clone from the sahred delta index, if it fails to reuse the shared delta index, + // Try to clone from the shared delta index, if it fails to reuse the shared delta index, // it will return an empty delta index and we should place it in the following branch. 
auto my_delta_index = delta_snap->getSharedDeltaIndex()->tryClone(delta_snap->getRows(), delta_snap->getDeletes()); auto my_delta_tree = my_delta_index->getDeltaTree(); @@ -3116,20 +3116,59 @@ BitmapFilterPtr Segment::buildBitmapFilterNormal( ColumnDefines columns_to_read{ getExtraHandleColumnDefine(is_common_handle), }; - // Generate the bitmap according to the MVCC filter result - auto stream = getInputStreamModeNormal( + sanitizeCheckReadRanges(__FUNCTION__, read_ranges, rowkey_range, log); + + LOG_TRACE(segment_snap->log, "Begin segment create input stream"); + + auto read_tag = ReadTag::MVCC; + auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, read_ranges, read_tag, start_ts); + + const auto & dmfiles = segment_snap->stable->getDMFiles(); + auto [skipped_ranges, new_pack_filter_results] = DMFilePackFilter::getSkippedRangeAndFilterForBitmapNormal( dm_context, - columns_to_read, - segment_snap, - read_ranges, + dmfiles, pack_filter_results, start_ts, + read_info.index_begin, + read_info.index_end); + + BlockInputStreamPtr stream = getPlacedStream( + dm_context, + *read_info.read_columns, + read_ranges, + segment_snap->stable, + read_info.getDeltaReader(read_tag), + read_info.index_begin, + read_info.index_end, expected_block_size, - /*need_row_id*/ true); + read_tag, + new_pack_filter_results, + start_ts, + true); + + stream = std::make_shared>(stream, read_ranges, 0); + stream = std::make_shared>( + stream, + columns_to_read, + start_ts, + is_common_handle, + dm_context.tracing_id, + dm_context.scan_context); + + LOG_TRACE( + segment_snap->log, + "Finish segment create input stream, start_ts={} range_size={} ranges={}", + start_ts, + read_ranges.size(), + DB::DM::toDebugString(read_ranges)); + // `total_rows` is the rows read for building bitmap auto total_rows = segment_snap->delta->getRows() + segment_snap->stable->getDMFilesRows(); auto bitmap_filter = std::make_shared(total_rows, /*default_value*/ false); + // Generate the bitmap 
according to the MVCC filter result bitmap_filter->set(stream); + for (const auto & range : skipped_ranges) + bitmap_filter->set(range.offset, range.rows); bitmap_filter->runOptimize(); const auto elapse_ns = sw_total.elapsed(); @@ -3160,8 +3199,11 @@ BitmapFilterPtr Segment::buildBitmapFilterStableOnly( return elapse_ns / 1'000'000.0; }; - auto [skipped_ranges, new_pack_filter_results] - = DMFilePackFilter::getSkippedRangeAndFilterForBitmap(dm_context, dmfiles, pack_filter_results, start_ts); + auto [skipped_ranges, new_pack_filter_results] = DMFilePackFilter::getSkippedRangeAndFilterForBitmapStableOnly( + dm_context, + dmfiles, + pack_filter_results, + start_ts); if (skipped_ranges.size() == 1 && skipped_ranges[0].offset == 0 && skipped_ranges[0].rows == segment_snap->stable->getDMFilesRows()) { diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 1aac3acc6b8..3c140fbce86 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -1085,11 +1086,18 @@ try // test built bitmap filter auto segment_snap = segment->createSnapshot(dmContext(), false, CurrentMetrics::DT_SnapshotOfRead); auto real_ranges = segment->shrinkRowKeyRanges({read_range}); + DMFilePackFilterResults pack_filter_results; + pack_filter_results.reserve(segment_snap->stable->getDMFiles().size()); + for (const auto & file : segment_snap->stable->getDMFiles()) + { + auto pack_filter = DMFilePackFilter::loadFrom(*dm_context, file, true, real_ranges, EMPTY_RS_OPERATOR, {}); + pack_filter_results.push_back(pack_filter); + } auto bitmap_filter = segment->buildBitmapFilter( // dmContext(), segment_snap, real_ranges, - {}, + pack_filter_results, std::numeric_limits::max(), DEFAULT_BLOCK_SIZE); // the bitmap only contains the overlapped packs of ColumnFileBig. So only 60 here. 
diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_bitmap.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_bitmap.cpp index 854ed2ced9b..aadbf595707 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_bitmap.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_bitmap.cpp @@ -55,7 +55,7 @@ class SegmentBitmapFilterTest : public SegmentTestBasic `seg_data`: s:[a, b)|d_tiny:[a, b)|d_tiny_del:[a, b)|d_big:[a, b)|d_dr:[a, b)|d_mem:[a, b)|d_mem_del - s: stable - d_tiny: delta ColumnFileTiny - - d_del_tiny: delta ColumnFileTiny with delete flag + - d_tiny_del: delta ColumnFileTiny with delete flag - d_big: delta ColumnFileBig - d_dr: delta delete range @@ -565,4 +565,194 @@ try } CATCH +TEST_F(SegmentBitmapFilterTest, testSkipPackStableOnly) +{ + std::string expect_result; + expect_result.append(std::string(200, '0')); + expect_result.append(std::string(300, '1')); + for (size_t i = 0; i < 500; i++) + expect_result.append(std::string("01")); + expect_result.append(std::string(500, '1')); + expect_result.append(std::string(500, '0')); + for (size_t pack_rows : {1, 2, 10, 100, 8192}) + { + db_context->getSettingsRef().dt_segment_stable_pack_rows = pack_rows; + db_context->getGlobalContext().getSettingsRef().dt_segment_stable_pack_rows = pack_rows; + reloadDMContext(); + + version = 0; + writeSegment("d_mem:[0, 1000)|d_mem:[500, 1500)|d_mem:[1500, 2000)"); + mergeSegmentDelta(SEG_ID, true); + auto [seg, snap] = getSegmentForRead(SEG_ID); + ASSERT_EQ(seg->getDelta()->getRows(), 0); + ASSERT_EQ(seg->getDelta()->getDeletes(), 0); + ASSERT_EQ(seg->getStable()->getRows(), 2500); + + auto ranges = std::vector{buildRowKeyRange(200, 2000)}; + auto pack_filter_results = loadPackFilterResults(snap, ranges); + + if (pack_rows == 1) + { + ASSERT_EQ(version, 3); + const auto & dmfiles = snap->stable->getDMFiles(); + auto [skipped_ranges, new_pack_filter_results] + = DMFilePackFilter::getSkippedRangeAndFilterForBitmapStableOnly( + *dm_context, + dmfiles, + 
pack_filter_results, + 2); + // [200, 500), [1500, 2000) + ASSERT_EQ(skipped_ranges.size(), 2); + ASSERT_EQ(skipped_ranges[0], DMFilePackFilter::Range(200, 300)); + ASSERT_EQ(skipped_ranges[1], DMFilePackFilter::Range(1500, 500)); + ASSERT_EQ(new_pack_filter_results.size(), 1); + const auto & pack_res = new_pack_filter_results[0]->getPackRes(); + ASSERT_EQ(pack_res.size(), 2000); + // [0, 200) is not in range, [200, 500) is skipped. + for (size_t i = 0; i < 500; ++i) + ASSERT_EQ(pack_res[i], RSResult::None); + // Not clean + for (size_t i = 500; i < 1000; ++i) + ASSERT_EQ(pack_res[i], RSResult::Some); + for (size_t i = 1000; i < 1500; ++i) + ASSERT_EQ(pack_res[i], RSResult::None); + // min version > start_ts + for (size_t i = 1500; i < 2000; ++i) + ASSERT_EQ(pack_res[i], RSResult::Some); + } + + auto bitmap_filter + = seg->buildBitmapFilterStableOnly(*dm_context, snap, ranges, pack_filter_results, 2, DEFAULT_BLOCK_SIZE); + + ASSERT_EQ(expect_result, bitmap_filter->toDebugString()); + + deleteRangeSegment(SEG_ID); + } +} + +TEST_F(SegmentBitmapFilterTest, testSkipPackNormal) +{ + std::string expect_result; + expect_result.append(std::string(50, '0')); + expect_result.append(std::string(450, '1')); + for (size_t i = 0; i < 500; i++) + expect_result.append(std::string("01")); + expect_result.append(std::string(1000, '1')); + expect_result.append(std::string(16, '1')); + + expect_result[99] = '0'; + expect_result[200] = '0'; + for (size_t i = 301; i < 315; ++i) + expect_result[i] = '0'; + for (size_t i = 355; i < 370; ++i) + expect_result[i] = '0'; + for (size_t i = 409; i < 481; ++i) + expect_result[i] = '0'; + + for (size_t pack_rows : {1, 2, 10, 100, 8192}) + { + db_context->getSettingsRef().dt_segment_stable_pack_rows = pack_rows; + db_context->getGlobalContext().getSettingsRef().dt_segment_stable_pack_rows = pack_rows; + reloadDMContext(); + + version = 0; + writeSegment("d_mem:[0, 1000)|d_mem:[500, 1500)|d_mem:[1500, 2000)"); + mergeSegmentDelta(SEG_ID, true); + 
writeSegment("d_tiny:[99, 100)|d_dr:[355, 370)|d_dr:[409, 481)|d_mem:[200, 201)|d_mem:[301, 315)"); + auto [seg, snap] = getSegmentForRead(SEG_ID); + ASSERT_EQ(seg->getDelta()->getRows(), 16); + ASSERT_EQ(seg->getDelta()->getDeletes(), 2); + ASSERT_EQ(seg->getStable()->getRows(), 2500); + + auto ranges = std::vector{buildRowKeyRange(50, 2000)}; + auto pack_filter_results = loadPackFilterResults(snap, ranges); + UInt64 start_ts = 6; + if (pack_rows == 10) + { + ASSERT_EQ(version, 6); + const auto & dmfiles = snap->stable->getDMFiles(); + ColumnDefines columns_to_read{ + getExtraHandleColumnDefine(seg->is_common_handle), + }; + auto read_info = seg->getReadInfo(*dm_context, columns_to_read, snap, ranges, ReadTag::MVCC, start_ts); + auto [skipped_ranges, new_pack_filter_results] = DMFilePackFilter::getSkippedRangeAndFilterForBitmapNormal( + *dm_context, + dmfiles, + pack_filter_results, + start_ts, + read_info.index_begin, + read_info.index_end); + + std::vector> skip_ranges_result = { + // [50, 90) + {50, 40, false}, + // Due to changing 99, [90, 100) pack can not be skipped. + // However, [100, 110) pack also can not be skipped because there is a preceding delta row, + // which can be optimized further if we can identify whether the rowkeys are the same. + // [110, 200) + {110, 90, false}, + // Due to changing 200, [200, 210) pack can not be skipped. + // [210, 300) + {210, 90, false}, + // Due to changing [301, 315), [300, 310) and [310, 320) packs can not be skipped. + // [320, 350) + {320, 30, false}, + // Due to deleting [355, 370), [350, 360) pack can not be skipped. + // [360, 370) pack can be skipped due to be totally deleted. + {360, 10, true}, + // [370, 400) + {370, 30, false}, + // Due to deleting [409, 481), [400, 410) and [480, 490) packs can not be skipped. + // Packs between [410, 480) can be skipped due to be totally deleted. 
+ {410, 70, true}, + // [490, 500) + {490, 10, false}, + // [1500, 2500) + {1500, 1000, false}, + }; + + size_t skipped_ranges_idx = 0; + const auto & pack_res = new_pack_filter_results[0]->getPackRes(); + ASSERT_EQ(pack_filter_results.size(), 1); + ASSERT_EQ(pack_res.size(), 250); + std::vector flag(250); + for (auto [offset, limit, is_delete] : skip_ranges_result) + { + if (!is_delete) + ASSERT_EQ(skipped_ranges.at(skipped_ranges_idx++), DMFilePackFilter::Range(offset, limit)); + + for (size_t i = 0; i < limit; i += 10) + { + size_t pack_id = (offset + i) / 10; + ASSERT_EQ(pack_res.at(pack_id), RSResult::None); + flag[pack_id] = 1; + } + } + ASSERT_EQ(skipped_ranges_idx, skipped_ranges.size()); + // Check if other pack results are the same as the original pack results + ASSERT_EQ(new_pack_filter_results.size(), 1); + const auto & original_pack_res = pack_filter_results[0]->getPackRes(); + ASSERT_EQ(original_pack_res.size(), 250); + for (size_t i = 0; i < 250; ++i) + { + if (flag[i]) + continue; + ASSERT_EQ(original_pack_res[i], pack_res[i]); + } + } + + auto bitmap_filter = seg->buildBitmapFilterNormal( + *dm_context, + snap, + ranges, + pack_filter_results, + start_ts, + DEFAULT_BLOCK_SIZE); + + ASSERT_EQ(expect_result, bitmap_filter->toDebugString()); + + deleteRangeSegment(SEG_ID); + } +} + } // namespace DB::DM::tests