@@ -10,163 +10,52 @@ namespace DB
10
10
{
11
11
namespace DM
12
12
{
13
- // / Read `chunks` as blocks.
14
- // / We can use `handle_range` param to filter out rows, and use `filter` to ignore some chunks roughly.
15
- // /
16
- // / Note that `handle_range` param assumes that data in chunks are in order of handle. If not, please use handle range of {MIN, MAX}.
17
- // /
18
- // / For example:
19
- // / size_t skip_rows = 0;
20
- // / while(stream.hasNext())
21
- // / {
22
- // / if(stream.shouldSkipNext())
23
- // / {
24
- // / skip_rows += stream.nextRows();
25
- // / stream.skipNext();
26
- // / continue;
27
- // / }
28
- // / auto block = stream.read();
29
- // / ...
30
- // / }
31
13
class ChunkBlockInputStream final : public IBlockInputStream
32
14
{
33
15
public:
34
16
ChunkBlockInputStream (const Chunks & chunks_,
35
- size_t handle_col_pos_,
36
- const HandleRange & handle_range_,
37
17
const ColumnDefines & read_columns_,
38
18
const PageReader & page_reader_,
39
- const RSOperatorPtr & filter_)
40
- : chunks(chunks_),
41
- handle_col_pos (handle_col_pos_),
42
- handle_range(handle_range_),
43
- read_columns(read_columns_),
44
- page_reader(page_reader_),
45
- filter(filter_)
19
+ const RSOperatorPtr & filter)
20
+ : chunks(chunks_), skip_chunks(chunks.size(), 0 ), read_columns(read_columns_), page_reader(page_reader_)
46
21
{
47
- }
48
-
49
- String getName () const override { return " Chunk" ; }
50
- Block getHeader () const override { return toEmptyBlock (read_columns); }
51
-
52
- Block read () override
53
- {
54
- if (!hasNext ())
55
- return {};
56
- Block tmp;
57
- if (!cur_chunk_data)
58
- // It means user ignore the skipNext() result and insist to read data.
59
- tmp = readCurChunkData ();
60
- else
61
- tmp.swap (cur_chunk_data);
62
-
63
- ++cur_chunk_index;
64
- cur_chunk_skip = false ;
65
-
66
- return tmp;
67
- }
68
-
69
- bool hasNext ()
70
- {
71
- if (cur_chunk_index >= chunks.size ())
72
- return false ;
73
- // Filter out those rows not fit for handle_range.
74
- for (; cur_chunk_index < chunks.size (); ++cur_chunk_index)
75
- {
76
- auto [first, last] = chunks[cur_chunk_index].getHandleFirstLast ();
77
- if (handle_range.intersect (first, last))
78
- break ;
79
- }
80
-
81
- if (cur_chunk_index >= chunks.size ())
82
- return false ;
83
-
84
- if (!cur_chunk_data)
22
+ if (filter)
85
23
{
86
- if (filter )
24
+ for ( size_t i = 0 ; i < chunks. size (); ++i )
87
25
{
88
- auto & chunk = chunks[cur_chunk_index ];
26
+ auto & chunk = chunks[i ];
89
27
RSCheckParam param;
90
28
for (auto & [col_id, meta] : chunk.getMetas ())
91
29
param.indexes .emplace (col_id, RSIndex (meta.type , meta.minmax ));
92
-
93
- cur_chunk_skip = filter->roughCheck (param) == None;
94
- }
95
- if (!cur_chunk_skip)
96
- {
97
- cur_chunk_data = readCurChunkData ();
30
+ skip_chunks[i] = filter->roughCheck (param) == None;
98
31
}
99
32
}
100
-
101
- return true ;
102
- }
103
-
104
- size_t nextRows ()
105
- {
106
- auto & chunk = chunks[cur_chunk_index];
107
- if (isCurChunkCompleted (chunk))
108
- return chunk.getRows ();
109
-
110
- // Otherwise, some rows of current chunk are filtered out by handle_range.
111
-
112
- if (cur_chunk_data)
113
- {
114
- return cur_chunk_data.rows ();
115
- }
116
- else
117
- {
118
- // Current chunk is ignored by `filter`,
119
- // but we still need to get the row count which their handles are included by handle_range.
120
- auto block = readChunk (chunk, {read_columns[handle_col_pos]}, page_reader);
121
- auto offset_limit
122
- = HandleFilter::getPosRangeOfSorted (handle_range, block.getByPosition (handle_col_pos).column , 0 , block.rows ());
123
- return offset_limit.second ;
124
- }
125
33
}
126
34
127
- bool shouldSkipNext () { return cur_chunk_skip; }
35
+ String getName () const override { return " Chunk" ; }
36
+ Block getHeader () const override { return toEmptyBlock (read_columns); }
128
37
129
- void skipNext ()
38
+ Block read () override
130
39
{
131
- ++cur_chunk_index;
132
-
133
- cur_chunk_data = {};
134
- cur_chunk_skip = false ;
40
+ if (!hasNext ())
41
+ return {};
42
+ return readChunk (chunks[cur_chunk_index++], read_columns, page_reader);
135
43
}
136
44
137
- private:
138
- inline bool isCurChunkCompleted (const Chunk & chunk)
139
- {
140
- auto [first, last] = chunk.getHandleFirstLast ();
141
- return handle_range.include (first, last);
142
- }
45
+ bool hasNext () { return cur_chunk_index < chunks.size (); }
46
+ size_t nextRows () { return chunks[cur_chunk_index].getRows (); }
143
47
144
- inline Block readCurChunkData ()
145
- {
146
- auto & chunk = chunks[cur_chunk_index];
147
- if (isCurChunkCompleted (chunk))
148
- {
149
- return readChunk (chunk, read_columns, page_reader);
150
- }
151
- else
152
- {
153
- auto block = readChunk (chunk, read_columns, page_reader);
154
- return HandleFilter::filterSorted (handle_range, std::move (block), handle_col_pos);
155
- }
156
- }
48
+ bool shouldSkipNext () { return skip_chunks[cur_chunk_index]; }
49
+ void skipNext () { ++cur_chunk_index; }
157
50
158
51
private:
159
- Chunks chunks;
160
- size_t handle_col_pos;
161
- HandleRange handle_range;
52
+ Chunks chunks;
53
+ std::vector<UInt8 > skip_chunks;
162
54
163
55
ColumnDefines read_columns;
164
56
PageReader page_reader;
165
- RSOperatorPtr filter;
166
57
167
58
size_t cur_chunk_index = 0 ;
168
- bool cur_chunk_skip = false ;
169
- Block cur_chunk_data;
170
59
};
171
60
172
61
using ChunkBlockInputStreamPtr = std::shared_ptr<ChunkBlockInputStream>;
0 commit comments