Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce redundant pack reads during building bitmap filter for delta merge case #9876

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion dbms/src/IO/Compression/CompressedReadBufferBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ static void readHeaderAndGetCodec(const char * compressed_buffer, CompressionCod
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Data compressed with different methods, given method "
"byte {#x}, previous method byte {#x}",
"byte {:#x}, previous method byte {:#x}",
method_byte,
codec->getMethodByte());
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ struct Settings
M(SettingUInt64, dt_max_sharing_column_bytes_for_all, 2048 * Constant::MB, "Memory limitation for data sharing of all requests, include those sharing blocks in block queue. 0 means disable data sharing") \
M(SettingUInt64, dt_max_sharing_column_count, 5, "Deprecated") \
M(SettingBool, dt_enable_bitmap_filter, true, "Use bitmap filter to read data or not") \
M(SettingBool, dt_enable_bitmap_filter_skip_pack, true, "Enable bitmap filter skip pack or not") \
M(SettingDouble, dt_read_thread_count_scale, 2.0, "Number of read thread = number of logical cpu cores * dt_read_thread_count_scale. Only has meaning at server startup.") \
M(SettingDouble, io_thread_count_scale, 5.0, "Number of thread of IOThreadPool = number of logical cpu cores * io_thread_count_scale. Only has meaning at server startup.") \
M(SettingUInt64, init_thread_count_scale, 100, "Number of thread = number of logical cpu cores * init_thread_count_scale. It just works for thread pool for initStores and loadMetadata. Only has meaning at server startup.") \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/DMContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ DMContext::DMContext(
, read_stable_only(settings.dt_read_stable_only)
, enable_relevant_place(settings.dt_enable_relevant_place)
, enable_skippable_place(settings.dt_enable_skippable_place)
, enable_bitmap_filter_skip_pack(settings.dt_enable_bitmap_filter_skip_pack)
, tracing_id(tracing_id_)
, scan_context(scan_context_ ? scan_context_ : std::make_shared<ScanContext>())
{}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ struct DMContext : private boost::noncopyable
const bool read_stable_only;
const bool enable_relevant_place;
const bool enable_skippable_place;
const bool enable_bitmap_filter_skip_pack;

String tracing_id;

Expand Down
22 changes: 11 additions & 11 deletions dbms/src/Storages/DeltaMerge/DeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class DeltaMergeBlockInputStream final
}
else
{
use_stable_rows = delta_index_it.getSid();
use_stable_rows = delta_index_it->getSid();
}
auto all_range = RowKeyRange::newAll(is_common_handle, rowkey_column_size);
last_value = all_range.getStart().toRowKeyValue();
Expand Down Expand Up @@ -365,28 +365,28 @@ class DeltaMergeBlockInputStream final
}
else
{
if (delta_index_it.isDelete())
if (delta_index_it->isDelete())
{
// Delete.
writeDeleteFromDelta(delta_index_it.getCount());
writeDeleteFromDelta(delta_index_it->getCount());
}
else
{
// Insert.
bool do_write = true;
if constexpr (skippable_place)
{
if (delta_index_it.getSid() < sk_skip_stable_rows)
if (delta_index_it->getSid() < sk_skip_stable_rows)
{
do_write = false;
sk_skip_total_rows += delta_index_it.getCount();
sk_skip_total_rows += delta_index_it->getCount();
}
}

if (do_write)
{
use_delta_offset = delta_index_it.getValue();
use_delta_rows = delta_index_it.getCount();
use_delta_offset = delta_index_it->getValue();
use_delta_rows = delta_index_it->getCount();
writeInsertFromDelta(output_columns, output_write_limit);
}
}
Expand Down Expand Up @@ -630,9 +630,9 @@ class DeltaMergeBlockInputStream final
{
UInt64 prev_sid;
{
prev_sid = delta_index_it.getSid();
if (delta_index_it.isDelete())
prev_sid += delta_index_it.getCount();
prev_sid = delta_index_it->getSid();
if (delta_index_it->isDelete())
prev_sid += delta_index_it->getCount();
}

++delta_index_it;
Expand All @@ -644,7 +644,7 @@ class DeltaMergeBlockInputStream final
}
else
{
use_stable_rows = delta_index_it.getSid() - prev_sid;
use_stable_rows = delta_index_it->getSid() - prev_sid;
}
}
};
Expand Down
48 changes: 21 additions & 27 deletions dbms/src/Storages/DeltaMerge/DeltaTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -732,34 +732,30 @@ class DTCompactedEntries
public:
struct Entry
{
friend class DTCompactedEntries<M, F, S>;

public:
Entry(UInt64 sid_, bool is_insert_, UInt32 count_, UInt64 value_)
: sid(sid_)
, is_insert(is_insert_)
, count(count_)
, value(value_)
{}

UInt64 getSid() const { return sid; }
bool isInsert() const { return is_insert; }
bool isDelete() const { return !is_insert; }
UInt32 getCount() const { return count; }
UInt64 getValue() const { return value; }

private:
UInt64 sid;
bool is_insert;
UInt32 count;
UInt64 value;
};
using Entries = std::vector<Entry>;

/// Thin wrapper around `Entries::iterator` that exposes the fields of the
/// pointed-to entry through accessor methods instead of `operator->`.
struct Iterator
{
// The wrapped position inside the entries vector.
typename Entries::iterator it;

explicit Iterator(typename Entries::iterator it_)
: it(it_)
{}
// Two wrappers compare equal exactly when their underlying iterators do.
bool operator==(const Iterator & rhs) const { return it == rhs.it; }
bool operator!=(const Iterator & rhs) const { return it != rhs.it; }
// Pre-increment: advances the wrapped iterator by one entry and returns *this.
Iterator & operator++()
{
++it;
return *this;
}

// Accessors forwarding to the fields of the entry currently pointed to.
// isDelete() is defined as the negation of the entry's is_insert flag.
UInt64 getSid() const { return it->sid; }
bool isInsert() const { return it->is_insert; }
bool isDelete() const { return !it->is_insert; }
UInt32 getCount() const { return it->count; }
UInt64 getValue() const { return it->value; }
};
using Iterator = typename Entries::iterator;

private:
Entries entries;
Expand All @@ -783,14 +779,12 @@ class DTCompactedEntries
continue;
}
}
Entry entry
= {.sid = it.getSid(), .is_insert = it.isInsert(), .count = it.getCount(), .value = it.getValue()};
entries.emplace_back(entry);
entries.emplace_back(it.getSid(), it.isInsert(), it.getCount(), it.getValue());
}
}

auto begin() { return Iterator(entries.begin()); }
auto end() { return Iterator(entries.end()); }
auto begin() { return entries.begin(); }
auto end() { return entries.end(); }
};

template <class ValueSpace, size_t M, size_t F, size_t S, typename Allocator>
Expand Down
155 changes: 154 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ void DMFilePackFilter::tryLoadIndex(RSCheckParam & param, ColId col_id)
}

std::pair<std::vector<DMFilePackFilter::Range>, DMFilePackFilterResults> DMFilePackFilter::
getSkippedRangeAndFilterForBitmap(
getSkippedRangeAndFilterForBitmapStableOnly(
const DMContext & dm_context,
const DMFiles & dmfiles,
const DMFilePackFilterResults & pack_filter_results,
Expand All @@ -318,6 +318,7 @@ std::pair<std::vector<DMFilePackFilter::Range>, DMFilePackFilterResults> DMFileP
// We need to read these packs and do RowKey filter and MVCC filter for them.
DMFilePackFilterResults new_pack_filter_results;
new_pack_filter_results.reserve(dmfiles.size());
RUNTIME_CHECK(pack_filter_results.size() == dmfiles.size());

// The offset of the first row in the current range.
size_t offset = 0;
Expand Down Expand Up @@ -380,4 +381,156 @@ std::pair<std::vector<DMFilePackFilter::Range>, DMFilePackFilterResults> DMFileP
return {skipped_ranges, new_pack_filter_results};
}

/// Walks every pack of `dmfiles` and decides, using the delta index entries in
/// [`delta_index_begin`, `delta_index_end`), whether the pack has to be read from disk
/// to build the bitmap filter.
///
/// A used pack is taken out of the read set (its result is set to `RSResult::None` in a
/// copy of the corresponding pack filter result) when either
///   - its sid range lies strictly inside one gap between two consecutive delta index
///     entries, is not touched by a preceding delete, its handle result is not `Some`,
///     it has no `not_clean` rows and its max version is not greater than `start_ts`; or
///   - its sid range is fully covered by the delete range of the preceding delta index entry.
///
/// @param dm_context           Supplies the file provider and scan context used by `getMaxVersion`.
/// @param dmfiles              The files to inspect; must have the same size as `pack_filter_results`.
/// @param pack_filter_results  Per-file pack filter results; they are not modified, a copy is
///                             made lazily for files in which at least one pack is skipped as
///                             fully compliant.
/// @param start_ts             Read timestamp compared against each candidate pack's max version.
/// @param delta_index_begin    First delta index entry to consider.
/// @param delta_index_end      Past-the-end delta index entry.
/// @return <SkippedRanges, SkippedDelRanges, NewPackFilterResults>
std::tuple<std::vector<DMFilePackFilter::Range>, std::vector<DMFilePackFilter::Range>, DMFilePackFilterResults> DMFilePackFilter::
    getSkippedRangeAndFilterForBitmapNormal(
        const DMContext & dm_context,
        const DMFiles & dmfiles,
        const DMFilePackFilterResults & pack_filter_results,
        UInt64 start_ts,
        const DeltaIndexIterator & delta_index_begin,
        const DeltaIndexIterator & delta_index_end)
{
    // Packs in which all rows comply with the MVCC filter and RowKey filter requirements.
    // For building the bitmap filter, we don't need to read these packs,
    // just set the corresponding positions in the bitmap to true.
    // So we record the offset and rows of these packs and merge continuous ranges.
    std::vector<Range> skipped_ranges;
    // Packs that are deleted by delete_range.
    // Need to set the corresponding positions in the bitmap to false.
    std::vector<Range> skipped_del_ranges;
    // Packs in which only some rows comply with the MVCC filter and RowKey filter requirements.
    // We need to read these packs and do RowKey filter and MVCC filter for them.
    DMFilePackFilterResults new_pack_filter_results;
    new_pack_filter_results.reserve(dmfiles.size());
    RUNTIME_CHECK(pack_filter_results.size() == dmfiles.size());

    // The offset of the first row in the current range.
    size_t range_offset = 0;
    // The number of rows in the current range.
    size_t range_rows = 0;

    // Row offsets of the current pack, accumulated over all files: the pack covers
    // rows [prev_offset, current_offset).
    UInt32 prev_offset = 0;
    UInt32 current_offset = 0;
    // State of the delta index entry preceding `delta_index_it`, and the sid of
    // `delta_index_it` itself. They are carried over from pack to pack.
    UInt64 prev_sid = 0;
    UInt64 sid = 0;
    UInt32 prev_delete_count = 0;

    auto delta_index_it = delta_index_begin;
    auto file_provider = dm_context.global_context.getFileProvider();
    for (size_t i = 0; i < dmfiles.size(); ++i)
    {
        const auto & dmfile = dmfiles[i];
        const auto & pack_filter = pack_filter_results[i];
        const auto & pack_res = pack_filter->getPackRes();
        const auto & handle_res = pack_filter->getHandleRes();
        const auto & pack_stats = dmfile->getPackStats();
        // Created lazily: stays null while no pack of this file is skipped as fully compliant.
        DMFilePackFilterResultPtr new_pack_filter;
        for (size_t pack_id = 0; pack_id < pack_stats.size(); ++pack_id)
        {
            const auto & pack_stat = pack_stats[pack_id];
            // Offsets advance for every pack, including the ones that are not used.
            prev_offset = current_offset;
            current_offset += pack_stat.rows;
            if (!pack_res[pack_id].isUse())
                continue;

            // Find the first delta_index_it whose sid > prev_offset
            auto new_it = std::upper_bound(
                delta_index_it,
                delta_index_end,
                prev_offset,
                [](UInt32 val, const DeltaIndexCompacted::Entry & e) { return val < e.getSid(); });
            if (new_it != delta_index_it)
            {
                auto prev_it = std::prev(new_it);
                prev_sid = prev_it->getSid();
                prev_delete_count = prev_it->isDelete() ? prev_it->getCount() : 0;
                delta_index_it = new_it;
            }
            // When the delta index is exhausted, use the maximum value so that no pack
            // is rejected by the right-boundary check below.
            sid = delta_index_it != delta_index_end ? delta_index_it->getSid() : std::numeric_limits<UInt64>::max();
            // Since delta_index_it is the first element with sid > prev_offset,
            // the preceding element's sid (prev_sid) must be <= prev_offset.
            RUNTIME_CHECK(prev_offset >= prev_sid);
            // Note: If `prev_offset == prev_sid`, the RowKey of the delta row preceding `prev_sid`
            // must be smaller than the RowKey of `prev_sid`. This is because for the same RowKey,
            // the version in the delta data will always be greater than the version in the stable data.
            // Therefore, the RowKey in the preceding row will definitely be smaller than the RowKey of `prev_sid`.

            // The sid range of the pack: (prev_offset, current_offset].
            // The continuously sorted sid range in delta index: (prev_sid, sid].

            // Now check the right boundary of this pack (i.e. current_offset)
            if (current_offset >= sid)
            {
                // If `current_offset > sid`, it means some data in the pack exceeds the right boundary of
                // (prev_sid, sid] so this pack can not be skipped.
                //
                // If `current_offset == sid`, the delta row following this sid row might have the same
                // RowKey. The pack also can not be skipped because delta merge and MVCC filter are necessary.
                // TODO: It might be possible to use a minmax index to compare the RowKey of the
                // current sid row with the RowKey of the following delta row.
                continue;
            }

            if (prev_delete_count > 0)
            {
                // The previous delta index entry is a delete, we must check if the sid range of the
                // pack intersects with the delete range.
                // The sid range of the pack: (prev_offset, current_offset].
                // The delete sid range: (prev_sid, prev_sid + prev_delete_count].
                if (current_offset <= prev_sid + prev_delete_count)
                {
                    // The sid range of the pack is fully covered by the delete sid range, it means that
                    // every row in this pack has been deleted. In this case, the pack can be safely skipped.
                    // Note that `pack_res` is left untouched for such packs; only the range is recorded.
                    skipped_del_ranges.emplace_back(prev_offset, pack_stat.rows);
                    continue;
                }
                if (prev_offset < prev_sid + prev_delete_count)
                {
                    // Some rows in the pack are deleted while others are not, it means the pack cannot
                    // be skipped.
                    continue;
                }
                // None of the rows in the pack have been deleted
            }

            // Check other conditions that may allow the pack to be skipped.
            // `getMaxVersion` is evaluated last, only when the two cheaper checks pass.
            if (handle_res[pack_id] == RSResult::Some || pack_stat.not_clean > 0
                || pack_filter->getMaxVersion(dmfile, pack_id, file_provider, dm_context.scan_context) > start_ts)
            {
                // `not_clean > 0` means there are more than one version for some rowkeys in this pack
                // `pack.max_version > start_ts` means some rows will be filtered by MVCC reading
                // We need to read this pack to do delta merge, RowKey or MVCC filter.
                continue;
            }

            if unlikely (!new_pack_filter)
                new_pack_filter = std::make_shared<DMFilePackFilterResult>(*pack_filter);

            // This pack is skipped by the skipped_range, do not need to read the rows from disk
            new_pack_filter->pack_res[pack_id] = RSResult::None;
            // When this pack is next to the previous pack, we merge them.
            // Otherwise, we record the previous continuous packs and start a new one.
            if (range_offset + range_rows == prev_offset)
            {
                range_rows += pack_stat.rows;
            }
            else
            {
                // NOTE(review): this branch also runs for the very first skippable pack when it
                // does not start at offset 0; in that case an empty (0, 0) range is recorded.
                // Kept as is to preserve the returned ranges exactly; confirm whether consumers
                // would prefer empty ranges to be dropped.
                skipped_ranges.emplace_back(range_offset, range_rows);
                range_offset = prev_offset;
                range_rows = pack_stat.rows;
            }
        }

        // Reuse the caller's result object when nothing changed for this file.
        if (new_pack_filter)
            new_pack_filter_results.emplace_back(std::move(new_pack_filter));
        else
            new_pack_filter_results.emplace_back(pack_filter);
    }
    // Flush the last pending range, if any.
    if (range_rows > 0)
        skipped_ranges.emplace_back(range_offset, range_rows);

    // Move the locals into the tuple: a braced return of lvalues would deep-copy all three vectors.
    return {std::move(skipped_ranges), std::move(skipped_del_ranges), std::move(new_pack_filter_results)};
}

} // namespace DB::DM
23 changes: 22 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/Delta/DeltaValueSpace.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/File/DMFilePackFilterResult.h>
#include <Storages/DeltaMerge/File/DMFilePackFilter_fwd.h>
Expand Down Expand Up @@ -121,12 +122,32 @@ class DMFilePackFilter
* - NewPackFilterResults: Those packs should be read from disk and go through the
* RowKey filter and MVCC filter
*/
static std::pair<std::vector<Range>, DMFilePackFilterResults> getSkippedRangeAndFilterForBitmap(
static std::pair<std::vector<Range>, DMFilePackFilterResults> getSkippedRangeAndFilterForBitmapStableOnly(
const DMContext & dm_context,
const DMFiles & dmfiles,
const DMFilePackFilterResults & pack_filter_results,
UInt64 start_ts);

/**
* @brief For all the packs in `pack_filter_results`, if all the rows in the pack
* comply with RowKey filter and MVCC filter (by `start_ts`) requirements,
* and are continuously sorted in delta index, or are deleted, then we skip
* reading the packs from disk and return the skipped ranges, skipped delete
* ranges, and new PackFilterResults for building bitmap.
* @return <SkippedRanges, SkippedDelRanges, NewPackFilterResults>
* - SkippedRanges: All the rows in the ranges that comply with the requirements.
* - SkippedDelRanges: All the rows in the ranges that are deleted.
* - NewPackFilterResults: Those packs should be read from disk and go through
* the delta merge, RowKey filter, and MVCC filter.
*/
static std::tuple<std::vector<Range>, std::vector<Range>, DMFilePackFilterResults> getSkippedRangeAndFilterForBitmapNormal(
const DMContext & dm_context,
const DMFiles & dmfiles,
const DMFilePackFilterResults & pack_filter_results,
UInt64 start_ts,
const DeltaIndexIterator & delta_index_begin,
const DeltaIndexIterator & delta_index_end);

private:
DMFilePackFilter(
const DMFilePtr & dmfile_,
Expand Down
Loading