Commit bf350c6

flowbehappy authored and JaySon-Huang committed
[FLASH-426] Eliminate data copy of Segment split/merge (#258)
* Use ref pages to do fast segment split.
* Segment merge is done by simply merging the chunks' meta.
* Improve the logic of checking delta merge, split and merge after DeltaMergeStore#write or deleteRange.
* Optimize split/merge: update metadata only, instead of moving data.
* Fix getting valid normal page ids.
* Use ref pages for both sides when splitting.
* Use `std::optional<PageEntry>` instead of `PageEntry *` for PageEntries::find (see the signature sketch after the file stats below).
* Fix bug in PageEntriesForDelta::merge.
* Fix bug: PageStorage GC could not find a normal page.
* Fix bug: shrink the read range while doing DeltaMerge.
* Add a test case for multiple splits and DeltaMerge.
* Fix data error after ref split.
1 parent de7ced3 commit bf350c6

26 files changed: +714, -309 lines changed
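
One of the bullets above switches PageEntries::find from returning PageEntry * to std::optional<PageEntry>; the PageStorage files are not part of the excerpt below. The following is a minimal, self-contained sketch of what that kind of signature change looks like, with hypothetical stand-ins for PageId and PageEntry rather than the real PageStorage types:

#include <cstdint>
#include <optional>
#include <unordered_map>

// Hypothetical stand-ins; the real PageId/PageEntry live in the PageStorage headers.
using PageId = uint64_t;
struct PageEntry
{
    uint64_t file_id = 0;
    uint64_t offset = 0;
};

struct PageEntriesSketch
{
    std::unordered_map<PageId, PageEntry> entries;

    // Returning std::optional makes the "not found" case explicit and avoids
    // handing callers a raw pointer into the container.
    std::optional<PageEntry> find(PageId page_id) const
    {
        auto it = entries.find(page_id);
        if (it == entries.end())
            return std::nullopt;
        return it->second;
    }
};

A caller can then write `if (auto entry = page_entries.find(id)) use(*entry);` instead of checking a raw pointer against nullptr.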

dbms/src/Storages/DeltaMerge/Chunk.cpp

+35 -1

@@ -9,6 +9,7 @@ namespace DB
 {
 namespace DM
 {
+
 void Chunk::serialize(WriteBuffer & buf) const
 {
     writeIntBinary(handle_start, buf);
@@ -71,6 +72,39 @@ Chunk Chunk::deserialize(ReadBuffer & buf)
     return chunk;
 }
 
+Chunk createRefChunk(const Chunk & chunk, const GenPageId & gen_data_page_id, WriteBatch & wb)
+{
+    if (chunk.isDeleteRange())
+        return Chunk(chunk.getDeleteRange());
+
+    auto [handle_first, handle_end] = chunk.getHandleFirstLast();
+    Chunk ref_chunk(handle_first, handle_end);
+    for (auto && [col_id, col_meta] : chunk.getMetas())
+    {
+        ColumnMeta m;
+
+        m.col_id = col_id;
+        m.page_id = gen_data_page_id();
+        m.rows = col_meta.rows;
+        m.bytes = col_meta.bytes;
+        m.type = col_meta.type;
+        m.minmax = col_meta.minmax;
+
+        wb.putRefPage(m.page_id, col_meta.page_id);
+        ref_chunk.insert(m);
+    }
+    return ref_chunk;
+}
+
+Chunks createRefChunks(const Chunks & chunks, const GenPageId & gen_data_page_id, WriteBatch & wb)
+{
+    Chunks ref_chunks;
+    ref_chunks.reserve(chunks.size());
+    for (auto & chunk : chunks)
+        ref_chunks.push_back(createRefChunk(chunk, gen_data_page_id, wb));
+    return ref_chunks;
+}
+
 void serializeChunks(
     WriteBuffer & buf, Chunks::const_iterator begin, Chunks ::const_iterator end, const Chunk * extra1, const Chunk * extra2)
 {
@@ -241,7 +275,7 @@ void readChunkData(MutableColumns & columns,
                         ErrorCodes::NOT_IMPLEMENTED);
     }
 
-    // Read from disk according as chunk meta
+    // Read from disk according to chunk meta
     MutableColumnPtr disk_col = disk_meta.type->createColumn();
     deserializeColumn(*disk_col, disk_meta, page, rows_offset + rows_limit);
 

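createRefChunk/createRefChunks above are the building blocks for the no-copy split described in the commit message ("use ref pages for both sides when splitting"). Below is a rough sketch of how a split path could use them; the storage object and its newDataPageId/write calls are illustrative assumptions, not the actual Segment or DeltaMergeStore code:

// Hedged sketch only, not the real split implementation.
WriteBatch wb;
GenPageId gen_page_id = [&] { return storage.newDataPageId(); }; // hypothetical id allocator

// Both halves of the split reference the same underlying column pages,
// so the split writes only chunk metadata and ref-page entries; no column data is copied.
Chunks left_chunks = createRefChunks(chunks, gen_page_id, wb);
Chunks right_chunks = createRefChunks(chunks, gen_page_id, wb);

storage.write(wb); // hypothetical; persists the ref-page entries

Rows that fall outside each half's handle range can then be dropped at read time, which is presumably what the handle_range filtering added to ChunkBlockInputStream below is for.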
dbms/src/Storages/DeltaMerge/Chunk.h

+3

@@ -115,6 +115,9 @@ class Chunk
 using Chunks = std::vector<Chunk>;
 using GenPageId = std::function<PageId()>;
 
+Chunk createRefChunk(const Chunk & chunk, const GenPageId & gen_data_page_id, WriteBatch & wb);
+Chunks createRefChunks(const Chunks & chunks, const GenPageId & gen_data_page_id, WriteBatch & wb);
+
 void serializeChunks(WriteBuffer & buf,
                      Chunks::const_iterator begin,
                      Chunks ::const_iterator end,

dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h

+136 -27

@@ -4,35 +4,46 @@
 
 #include <Storages/DeltaMerge/Chunk.h>
 #include <Storages/DeltaMerge/Filter/RSOperator.h>
+#include <Storages/DeltaMerge/HandleFilter.h>
 
 namespace DB
 {
 namespace DM
 {
-/// Read `chunks` as blocks according to `read_columns`
+/// Read `chunks` as blocks.
+/// We can use `handle_range` param to filter out rows, and use `filter` to ignore some chunks roughly.
+///
+/// Note that `handle_range` param assumes that data in chunks are in order of handle. If not, please use handle range of {MIN, MAX}.
+///
+/// For example:
+///     size_t skip_rows = 0;
+///     while(stream.hasNext())
+///     {
+///         if(stream.shouldSkipNext())
+///         {
+///             skip_rows += stream.nextRows();
+///             stream.skipNext();
+///             continue;
+///         }
+///         auto block = stream.read();
+///         ...
+///     }
 class ChunkBlockInputStream final : public IBlockInputStream
 {
 public:
     ChunkBlockInputStream(const Chunks & chunks_,
-                          const RSOperatorPtr & filter,
+                          size_t handle_col_pos_,
+                          const HandleRange & handle_range_,
                           const ColumnDefines & read_columns_,
-                          const PageReader & page_reader_)
-        : chunks(chunks_), skip_chunks(chunks.size()), read_columns(read_columns_), page_reader(page_reader_)
+                          const PageReader & page_reader_,
+                          const RSOperatorPtr & filter_)
+        : chunks(chunks_),
+          handle_col_pos(handle_col_pos_),
+          handle_range(handle_range_),
+          read_columns(read_columns_),
+          page_reader(page_reader_),
+          filter(filter_)
     {
-        for (size_t i = 0; i < chunks.size(); ++i)
-        {
-            if (!filter)
-            {
-                skip_chunks[i] = 0;
-                continue;
-            }
-            auto & chunk = chunks[i];
-            RSCheckParam param;
-            for (auto & [col_id, meta] : chunk.getMetas())
-                param.indexes.emplace(col_id, RSIndex(meta.type, meta.minmax));
-
-            skip_chunks[i] = filter->roughCheck(param) == None;
-        }
     }
 
     String getName() const override { return "Chunk"; }
@@ -42,22 +53,120 @@ class ChunkBlockInputStream final : public IBlockInputStream
     {
         if (!hasNext())
            return {};
-        return readChunk(chunks[chunk_index++], read_columns, page_reader);
+        Block tmp;
+        if (!cur_chunk_data)
+            // It means user ignore the skipNext() result and insist to read data.
+            tmp = readCurChunkData();
+        else
+            tmp.swap(cur_chunk_data);
+
+        ++cur_chunk_index;
+        cur_chunk_skip = false;
+
+        return tmp;
+    }
+
+    bool hasNext()
+    {
+        if (cur_chunk_index >= chunks.size())
+            return false;
+        // Filter out those rows not fit for handle_range.
+        for (; cur_chunk_index < chunks.size(); ++cur_chunk_index)
+        {
+            auto [first, last] = chunks[cur_chunk_index].getHandleFirstLast();
+            if (handle_range.intersect(first, last))
+                break;
+        }
+
+        if (cur_chunk_index >= chunks.size())
+            return false;
+
+        if (!cur_chunk_data)
+        {
+            if (filter)
+            {
+                auto & chunk = chunks[cur_chunk_index];
+                RSCheckParam param;
+                for (auto & [col_id, meta] : chunk.getMetas())
+                    param.indexes.emplace(col_id, RSIndex(meta.type, meta.minmax));
+
+                cur_chunk_skip = filter->roughCheck(param) == None;
+            }
+            if (!cur_chunk_skip)
+            {
+                cur_chunk_data = readCurChunkData();
+            }
+        }
+
+        return true;
     }
 
-    bool hasNext() { return chunk_index < chunks.size(); }
-    size_t nextRows() { return chunks[chunk_index].getRows(); }
-    bool shouldSkipNext() { return skip_chunks[chunk_index]; }
-    void skipNext() { ++chunk_index; }
+    size_t nextRows()
+    {
+        auto & chunk = chunks[cur_chunk_index];
+        if (isCurChunkCompleted(chunk))
+            return chunk.getRows();
+
+        // Otherwise, some rows of current chunk are filtered out by handle_range.
+
+        if (cur_chunk_data)
+        {
+            return cur_chunk_data.rows();
+        }
+        else
+        {
+            // Current chunk is ignored by `filter`,
+            // but we still need to get the row count which their handles are included by handle_range.
+            auto block = readChunk(chunk, {read_columns[handle_col_pos]}, page_reader);
+            auto offset_limit
+                = HandleFilter::getPosRangeOfSorted(handle_range, block.getByPosition(handle_col_pos).column, 0, block.rows());
+            return offset_limit.second;
+        }
+    }
+
+    bool shouldSkipNext() { return cur_chunk_skip; }
+
+    void skipNext()
+    {
+        ++cur_chunk_index;
+
+        cur_chunk_data = {};
+        cur_chunk_skip = false;
+    }
 
 private:
-    Chunks chunks;
-    std::vector<UInt8> skip_chunks;
+    inline bool isCurChunkCompleted(const Chunk & chunk)
+    {
+        auto [first, last] = chunk.getHandleFirstLast();
+        return handle_range.include(first, last);
+    }
+
+    inline Block readCurChunkData()
+    {
+        auto & chunk = chunks[cur_chunk_index];
+        if (isCurChunkCompleted(chunk))
+        {
+            return readChunk(chunk, read_columns, page_reader);
+        }
+        else
+        {
+            auto block = readChunk(chunk, read_columns, page_reader);
+            return HandleFilter::filterSorted(handle_range, std::move(block), handle_col_pos);
+        }
+    }
+
+private:
+    Chunks chunks;
+    size_t handle_col_pos;
+    HandleRange handle_range;
 
-    size_t chunk_index = 0;
     ColumnDefines read_columns;
     PageReader page_reader;
-    Block header;
+    RSOperatorPtr filter;
+
+    size_t cur_chunk_index = 0;
+    bool cur_chunk_skip = false;
+    Block cur_chunk_data;
 };
 
 using ChunkBlockInputStreamPtr = std::shared_ptr<ChunkBlockInputStream>;

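The class doc comment above already sketches the skip/read loop; the snippet below repeats it together with the new constructor argument order so the two pieces line up. All inputs (chunks, handle_col_pos, handle_range, read_columns, page_reader, rs_filter) are assumed to be prepared by the caller; this is an illustration, not code from the actual read path:

ChunkBlockInputStream stream(chunks,
                             handle_col_pos, // position of the handle column in read_columns
                             handle_range,   // rows outside this range are filtered out
                             read_columns,
                             page_reader,
                             rs_filter);     // may be null, in which case no chunk is roughly skipped

size_t skip_rows = 0;
while (stream.hasNext())
{
    if (stream.shouldSkipNext())
    {
        // Chunk rejected by the rough-set filter; count its in-range rows and move on.
        skip_rows += stream.nextRows();
        stream.skipNext();
        continue;
    }
    Block block = stream.read();
    // ... consume block ...
}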
dbms/src/Storages/DeltaMerge/DMVersionFilterBlockInputStream.h

+2 -1

@@ -11,7 +11,8 @@ namespace DM
 
 /// Use the latest rows. For rows with the same handle, only take the rows with biggest version and version <= version_limit.
 static constexpr int DM_VERSION_FILTER_MODE_MVCC = 0;
-/// Remove the outdated rows. For rows with the same handle, take all rows with version >= version_limit. And if all of them are smaller than version_limit, then take the biggest one, if it is not deleted.
+/// Remove the outdated rows. For rows with the same handle, take all rows with version >= version_limit.
+/// And if all of them are smaller than version_limit, then take the biggest one, if it is not deleted.
 static constexpr int DM_VERSION_FILTER_MODE_COMPACT = 1;
 
 template <int MODE>

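A small worked example of the two modes as the comments above describe them (the rows are invented for illustration, and the real filter may have handling the comments do not spell out). With version_limit = 100:

Rows for handle h1: versions 80, 120, 130, none deleted
    MVCC mode:    only the row with the biggest version <= 100  -> version 80
    COMPACT mode: every row with version >= 100                 -> versions 120 and 130

Rows for handle h2: versions 60 and 90, where version 90 is a delete
    COMPACT mode: no row reaches version_limit, so take the biggest one only if it
                  is not deleted -> version 90 is a delete, so nothing is kept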
dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h

+1

@@ -131,5 +131,6 @@ static_assert(static_cast<Int64>(static_cast<UInt64>(MAX_INT64)) == MAX_INT64, "
 
 static constexpr UInt64 DEL_RANGE_POS_MARK = (1ULL << 63);
 
+
 } // namespace DM
 } // namespace DB

dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h

+10 -4

@@ -13,8 +13,8 @@
 #include <Interpreters/sortBlock.h>
 #include <Storages/ColumnsDescription.h>
 #include <Storages/DeltaMerge/DeltaMergeDefines.h>
-#include <Storages/Transaction/TiDB.h>
 #include <Storages/DeltaMerge/Filter/RSOperator.h>
+#include <Storages/Transaction/TiDB.h>
 
 namespace DB
 {
@@ -123,7 +123,7 @@ inline PaddedPODArray<T> const * getColumnVectorDataPtr(const Block & block, siz
     return toColumnVectorDataPtr<T>(block.getByPosition(pos).column);
 }
 
-inline void addColumnToBlock(Block & block, ColId col_id, const String &col_name, const DataTypePtr & col_type, const ColumnPtr & col)
+inline void addColumnToBlock(Block & block, ColId col_id, const String & col_name, const DataTypePtr & col_type, const ColumnPtr & col)
 {
     ColumnWithTypeAndName column(col, col_type, col_name, col_id);
     block.insert(std::move(column));
@@ -209,15 +209,21 @@ inline void appendIntoHandleColumn(ColumnVector<Handle>::Container & handle_colu
         auto & data_vector = typeid_cast<const ColumnVector<Int32> &>(*data).getData();
         APPEND(32, 0xFFFFFFFF, data_vector)
     }
-    else if (checkDataType<DataTypeInt64>(type_ptr) || checkDataType<DataTypeDateTime>(type_ptr))
+    else if (checkDataType<DataTypeInt64>(type_ptr))
     {
         auto & data_vector = typeid_cast<const ColumnVector<Int64> &>(*data).getData();
         for (size_t i = 0; i < size; ++i)
            handle_column[i] |= data_vector[i];
     }
+    else if (checkDataType<DataTypeDateTime>(type_ptr))
+    {
+        auto & data_vector = typeid_cast<const ColumnVector<typename DataTypeDateTime::FieldType> &>(*data).getData();
+        for (size_t i = 0; i < size; ++i)
+            handle_column[i] |= data_vector[i];
+    }
     else if (checkDataType<DataTypeDate>(type_ptr))
     {
-        auto & data_vector = typeid_cast<const ColumnVector<UInt32> &>(*data).getData();
+        auto & data_vector = typeid_cast<const ColumnVector<typename DataTypeDate::FieldType> &>(*data).getData();
         APPEND(32, 0xFFFFFFFF, data_vector)
     }
     else

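The appendIntoHandleColumn hunk above splits DateTime out of the Int64 branch and switches Date/DateTime to ColumnVector<typename ...::FieldType>. A plausible reading, stated here as an assumption since the commit message does not explain it: the concrete column behind a Date or DateTime column is a ColumnVector of that data type's own FieldType, so hard-coding Int64 or UInt32 can pick the wrong concrete type for the typeid_cast. A generic sketch of the FieldType-driven cast, using a hypothetical helper name:

// Hypothetical helper, not part of the diff: fetch the raw data vector of a column
// whose concrete type is dictated by the data type's own FieldType.
template <typename DataType>
const auto & vectorDataOf(const IColumn & column)
{
    using Field = typename DataType::FieldType;
    return typeid_cast<const ColumnVector<Field> &>(column).getData();
}
// e.g. vectorDataOf<DataTypeDateTime>(*data) instead of a hard-coded ColumnVector<Int64> cast.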