Commit 6c94ed5

[FLASH-541] Read ranges indicated by mvcc_query_info (pingcap#267)

* Read ranges indicated by mvcc_query_info
* Optimization: Don't do handle range filter inside Segment

1 parent dae2348 commit 6c94ed5

12 files changed: +380 -234 lines
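The gist of the change: each region's key range is translated into a handle range for the table being queried and carried in the per-region query info, so the storage layer can read only those ranges instead of filtering rows after the fact. Below is a minimal sketch of the data carried per region; HandleRange, RegionQueryInfo, MvccQueryInfo, and collectRanges are simplified stand-ins for illustration, not the actual TiFlash definitions.

    // Sketch only: simplified stand-ins mirroring the fields used in the
    // InterpreterSelectQuery.cpp diff below; none of these types are the real ones.
    #include <utility>
    #include <vector>

    using HandleID    = long long;
    using HandleRange = std::pair<HandleID, HandleID>; // simplified [start, end) range

    struct RegionQueryInfo
    {
        unsigned long long version      = 0; // region epoch version
        unsigned long long conf_version = 0; // region epoch conf version
        HandleRange        range_in_table;   // this region's handle range within the table
    };

    struct MvccQueryInfo
    {
        std::vector<RegionQueryInfo> regions_query_info;
    };

    // A reader can take the per-region ranges and scan only those handles,
    // rather than scanning everything and filtering afterwards.
    std::vector<HandleRange> collectRanges(const MvccQueryInfo & info)
    {
        std::vector<HandleRange> ranges;
        ranges.reserve(info.regions_query_info.size());
        for (const auto & region : info.regions_query_info)
            ranges.push_back(region.range_in_table);
        return ranges;
    }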

dbms/src/Interpreters/InterpreterSelectQuery.cpp

+9
@@ -37,6 +37,7 @@
 #include <Storages/Transaction/SchemaSyncer.h>
 #include <Storages/Transaction/TiKVRange.h>
 #include <Storages/Transaction/TMTContext.h>
+#include <Storages/Transaction/RegionRangeKeys.h>
 
 #include <Storages/IStorage.h>
 #include <Storages/StorageMergeTree.h>
@@ -793,6 +794,14 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline
             const auto & epoch = region.region_epoch();
             info.version = epoch.version();
             info.conf_version = epoch.conf_ver();
+            if (const auto & managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage))
+            {
+                // Extract the handle range according to current table
+                TiKVKey start_key = RecordKVFormat::encodeAsTiKVKey(region.start_key());
+                TiKVKey end_key = RecordKVFormat::encodeAsTiKVKey(region.end_key());
+                RegionRangeKeys region_range(std::move(start_key), std::move(end_key));
+                info.range_in_table = region_range.getHandleRangeByTable(managed_storage->getTableInfo().id);
+            }
             query_info.mvcc_query_info->regions_query_info.push_back(info);
         }

dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h

+18 -129
@@ -10,163 +10,52 @@ namespace DB
 {
 namespace DM
 {
-/// Read `chunks` as blocks.
-/// We can use `handle_range` param to filter out rows, and use `filter` to ignore some chunks roughly.
-///
-/// Note that `handle_range` param assumes that data in chunks are in order of handle. If not, please use handle range of {MIN, MAX}.
-///
-/// For example:
-///     size_t skip_rows = 0;
-///     while(stream.hasNext())
-///     {
-///         if(stream.shouldSkipNext())
-///         {
-///             skip_rows += stream.nextRows();
-///             stream.skipNext();
-///             continue;
-///         }
-///         auto block = stream.read();
-///         ...
-///     }
 class ChunkBlockInputStream final : public IBlockInputStream
 {
 public:
     ChunkBlockInputStream(const Chunks & chunks_,
-                          size_t handle_col_pos_,
-                          const HandleRange & handle_range_,
                           const ColumnDefines & read_columns_,
                           const PageReader & page_reader_,
-                          const RSOperatorPtr & filter_)
-        : chunks(chunks_),
-          handle_col_pos(handle_col_pos_),
-          handle_range(handle_range_),
-          read_columns(read_columns_),
-          page_reader(page_reader_),
-          filter(filter_)
+                          const RSOperatorPtr & filter)
+        : chunks(chunks_), skip_chunks(chunks.size(), 0), read_columns(read_columns_), page_reader(page_reader_)
     {
-    }
-
-    String getName() const override { return "Chunk"; }
-    Block getHeader() const override { return toEmptyBlock(read_columns); }
-
-    Block read() override
-    {
-        if (!hasNext())
-            return {};
-        Block tmp;
-        if (!cur_chunk_data)
-            // It means user ignore the skipNext() result and insist to read data.
-            tmp = readCurChunkData();
-        else
-            tmp.swap(cur_chunk_data);
-
-        ++cur_chunk_index;
-        cur_chunk_skip = false;
-
-        return tmp;
-    }
-
-    bool hasNext()
-    {
-        if (cur_chunk_index >= chunks.size())
-            return false;
-        // Filter out those rows not fit for handle_range.
-        for (; cur_chunk_index < chunks.size(); ++cur_chunk_index)
-        {
-            auto [first, last] = chunks[cur_chunk_index].getHandleFirstLast();
-            if (handle_range.intersect(first, last))
-                break;
-        }
-
-        if (cur_chunk_index >= chunks.size())
-            return false;
-
-        if (!cur_chunk_data)
+        if (filter)
         {
-            if (filter)
+            for (size_t i = 0; i < chunks.size(); ++i)
             {
-                auto & chunk = chunks[cur_chunk_index];
+                auto & chunk = chunks[i];
                 RSCheckParam param;
                 for (auto & [col_id, meta] : chunk.getMetas())
                     param.indexes.emplace(col_id, RSIndex(meta.type, meta.minmax));
-
-                cur_chunk_skip = filter->roughCheck(param) == None;
-            }
-            if (!cur_chunk_skip)
-            {
-                cur_chunk_data = readCurChunkData();
+                skip_chunks[i] = filter->roughCheck(param) == None;
             }
         }
-
-        return true;
-    }
-
-    size_t nextRows()
-    {
-        auto & chunk = chunks[cur_chunk_index];
-        if (isCurChunkCompleted(chunk))
-            return chunk.getRows();
-
-        // Otherwise, some rows of current chunk are filtered out by handle_range.
-
-        if (cur_chunk_data)
-        {
-            return cur_chunk_data.rows();
-        }
-        else
-        {
-            // Current chunk is ignored by `filter`,
-            // but we still need to get the row count which their handles are included by handle_range.
-            auto block = readChunk(chunk, {read_columns[handle_col_pos]}, page_reader);
-            auto offset_limit
-                = HandleFilter::getPosRangeOfSorted(handle_range, block.getByPosition(handle_col_pos).column, 0, block.rows());
-            return offset_limit.second;
-        }
     }
 
-    bool shouldSkipNext() { return cur_chunk_skip; }
+    String getName() const override { return "Chunk"; }
+    Block getHeader() const override { return toEmptyBlock(read_columns); }
 
-    void skipNext()
+    Block read() override
    {
-        ++cur_chunk_index;
-
-        cur_chunk_data = {};
-        cur_chunk_skip = false;
+        if (!hasNext())
+            return {};
+        return readChunk(chunks[cur_chunk_index++], read_columns, page_reader);
     }
 
-private:
-    inline bool isCurChunkCompleted(const Chunk & chunk)
-    {
-        auto [first, last] = chunk.getHandleFirstLast();
-        return handle_range.include(first, last);
-    }
+    bool hasNext() { return cur_chunk_index < chunks.size(); }
+    size_t nextRows() { return chunks[cur_chunk_index].getRows(); }
 
-    inline Block readCurChunkData()
-    {
-        auto & chunk = chunks[cur_chunk_index];
-        if (isCurChunkCompleted(chunk))
-        {
-            return readChunk(chunk, read_columns, page_reader);
-        }
-        else
-        {
-            auto block = readChunk(chunk, read_columns, page_reader);
-            return HandleFilter::filterSorted(handle_range, std::move(block), handle_col_pos);
-        }
-    }
+    bool shouldSkipNext() { return skip_chunks[cur_chunk_index]; }
+    void skipNext() { ++cur_chunk_index; }
 
 private:
-    Chunks chunks;
-    size_t handle_col_pos;
-    HandleRange handle_range;
+    Chunks chunks;
+    std::vector<UInt8> skip_chunks;
 
     ColumnDefines read_columns;
     PageReader page_reader;
-    RSOperatorPtr filter;
 
     size_t cur_chunk_index = 0;
-    bool cur_chunk_skip = false;
-    Block cur_chunk_data;
 };
 
 using ChunkBlockInputStreamPtr = std::shared_ptr<ChunkBlockInputStream>;
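The usage pattern from the removed header comment still applies to the slimmed-down stream; only the handle-range handling is gone. A sketch of the read loop against the new interface, assuming `stream` is an already-constructed ChunkBlockInputStream (construction as in DiskValueSpace::getInputStream below):

    // Chunks rejected by the rough filter (skip_chunks[i] set in the constructor)
    // are skipped without reading their pages; everything else is read whole.
    size_t skip_rows = 0;
    while (stream.hasNext())
    {
        if (stream.shouldSkipNext())
        {
            skip_rows += stream.nextRows(); // row count of the chunk being skipped
            stream.skipNext();
            continue;
        }
        Block block = stream.read();
        // ... consume block ...
    }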

dbms/src/Storages/DeltaMerge/DeltaMerge.h

+5 -21
@@ -28,9 +28,6 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
     using DeltaValueSpacePtr = std::shared_ptr<DeltaValueSpace>;
     using SharedLock = std::shared_lock<std::shared_mutex>;
 
-    size_t handle_column_pos;
-    HandleRange handle_range;
-
     ChunkBlockInputStreamPtr stable_input_stream;
     ChunkBlockInputStream * stable_input_stream_raw_ptr;
 
@@ -59,16 +56,12 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
     bool delta_done = false;
 
 public:
-    DeltaMergeBlockInputStream(size_t handle_column_pos_,
-                               const HandleRange & handle_range_,
-                               const ChunkBlockInputStreamPtr & stable_input_stream_,
+    DeltaMergeBlockInputStream(const ChunkBlockInputStreamPtr & stable_input_stream_,
                                const DeltaValueSpacePtr & delta_value_space_,
                                IndexIterator index_begin,
                                IndexIterator index_end,
                                size_t max_block_size_)
-        : handle_column_pos(handle_column_pos_),
-          handle_range(handle_range_),
-          stable_input_stream(stable_input_stream_),
+        : stable_input_stream(stable_input_stream_),
           stable_input_stream_raw_ptr(stable_input_stream.get()),
           delta_value_space(delta_value_space_),
           entry_it(index_begin),
@@ -125,13 +118,7 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
             if (limit == max_block_size)
                 continue;
 
-            Block block = header.cloneWithColumns(std::move(columns));
-
-            Block res = HandleFilter::filterSorted(handle_range, std::move(block), handle_column_pos);
-            if (!res || !res.rows())
-                continue;
-            else
-                return res;
+            return header.cloneWithColumns(std::move(columns));
         }
         return {};
     }
@@ -160,11 +147,8 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
                 writeDeleteFromDelta(1);
                 break;
            case DT_INS:
-                if (handle_range.check(delta_value_space->getHandle(tuple_id)))
-                {
-                    writeInsertFromDelta(output_columns, tuple_id);
-                    --output_write_limit;
-                }
+                writeInsertFromDelta(output_columns, tuple_id);
+                --output_write_limit;
                 break;
            default:
                throw Exception("Entry type " + DTTypeString(entry_it.getType()) + " is not supported, is end: "
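With handle_column_pos and handle_range dropped from the constructor, building the merge stream becomes simpler; range filtering is now the caller's job (see DMHandleFilterBlockInputStream below). A construction sketch, assuming the surrounding variables (stable_input_stream, delta_value_space, the index iterators, and max_block_size) are prepared by the caller:

    // Sketch: the parameter list matches the new constructor in the diff above;
    // the variables passed in are assumed to exist at the call site.
    auto delta_merge_stream = std::make_shared<DeltaMergeBlockInputStream>(
        stable_input_stream, // ChunkBlockInputStreamPtr over the stable chunks
        delta_value_space,   // DeltaValueSpacePtr holding delta data
        index_begin,         // IndexIterator: start of delta index entries
        index_end,           // IndexIterator: end of delta index entries
        max_block_size);     // size_t: maximum rows per returned block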

dbms/src/Storages/DeltaMerge/DiskValueSpace.cpp

+1 -1
@@ -630,7 +630,7 @@ DiskValueSpacePtr DiskValueSpace::doFlushCache(const OpContext & context, WriteB
 
 ChunkBlockInputStreamPtr DiskValueSpace::getInputStream(const ColumnDefines & read_columns, const PageReader & page_reader) const
 {
-    return std::make_shared<ChunkBlockInputStream>(chunks, 0, HandleRange::newAll(), read_columns, page_reader, RSOperatorPtr());
+    return std::make_shared<ChunkBlockInputStream>(chunks, read_columns, page_reader, RSOperatorPtr());
 }
 
 size_t DiskValueSpace::num_rows() const

dbms/src/Storages/DeltaMerge/HandleFilter.h

+3 -4
@@ -83,14 +83,14 @@ inline Block filterUnsorted(const HandleRange & handle_range, Block && block, si
 }
 } // namespace HandleFilter
 
+template <bool is_block_sorted>
 class DMHandleFilterBlockInputStream : public IProfilingBlockInputStream
 {
 public:
     DMHandleFilterBlockInputStream(const BlockInputStreamPtr & input,
                                    HandleRange handle_range_,
-                                   size_t handle_col_pos_,
-                                   bool is_block_sorted_)
-        : handle_range(handle_range_), handle_col_pos(handle_col_pos_), is_block_sorted(is_block_sorted_)
+                                   size_t handle_col_pos_)
+        : handle_range(handle_range_), handle_col_pos(handle_col_pos_)
     {
         children.push_back(input);
     }
@@ -120,7 +120,6 @@ class DMHandleFilterBlockInputStream : public IProfilingBlockInputStream
 private:
     HandleRange handle_range;
     size_t handle_col_pos;
-    bool is_block_sorted;
 };
 
 } // namespace DM
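Since is_block_sorted is now a compile-time template parameter, callers pick the sorted or unsorted variant when wrapping a stream. A sketch of clipping a stream to a handle range after this change, assuming `input` is a BlockInputStreamPtr whose blocks are sorted by handle, `handle_range` comes from a region's range_in_table, and the handle column sits at position 0:

    // Sketch: wrap the upstream stream so only rows inside handle_range pass through.
    // The <true> instantiation is for blocks already sorted by handle.
    auto filtered_stream = std::make_shared<DMHandleFilterBlockInputStream<true>>(
        input,        // upstream BlockInputStreamPtr
        handle_range, // HandleRange to keep
        0);           // handle_col_pos: position of the handle column in each block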
