Skip to content

Commit 3641c0f

Browse files
Lloyd-Pottigerti-chi-bot[bot]JinheLinJaySon-Huang
authored
Storages: Optimize vector search in scenarios with updates (pingcap#316)
Signed-off-by: Lloyd-Pottiger <[email protected]> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> Co-authored-by: jinhelin <[email protected]> Co-authored-by: JaySon <[email protected]>
1 parent 9e44d4e commit 3641c0f

File tree

84 files changed

+3497
-1395
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+3497
-1395
lines changed

Diff for: dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp

-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
#include <Common/Exception.h>
1616
#include <DataStreams/PushingToViewsBlockOutputStream.h>
17-
#include <DataStreams/SquashingBlockInputStream.h>
1817
#include <Interpreters/Context.h>
1918
#include <Interpreters/InterpreterSelectQuery.h>
2019
#include <Storages/IStorage.h>

Diff for: dbms/src/DataStreams/SquashingBlockInputStream.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
namespace DB
1818
{
19+
1920
SquashingBlockInputStream::SquashingBlockInputStream(
2021
const BlockInputStreamPtr & src,
2122
size_t min_block_size_rows,
@@ -28,7 +29,7 @@ SquashingBlockInputStream::SquashingBlockInputStream(
2829
}
2930

3031

31-
Block SquashingBlockInputStream::readImpl()
32+
Block SquashingBlockInputStream::read()
3233
{
3334
if (all_read)
3435
return {};

Diff for: dbms/src/DataStreams/SquashingBlockInputStream.h

+3-4
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414

1515
#pragma once
1616

17-
#include <DataStreams/IProfilingBlockInputStream.h>
17+
#include <DataStreams/IBlockInputStream.h>
1818
#include <DataStreams/SquashingTransform.h>
1919

2020

2121
namespace DB
2222
{
2323
/** Merging consecutive blocks of stream to specified minimum size.
2424
*/
25-
class SquashingBlockInputStream : public IProfilingBlockInputStream
25+
class SquashingBlockInputStream : public IBlockInputStream
2626
{
2727
static constexpr auto NAME = "Squashing";
2828

@@ -37,8 +37,7 @@ class SquashingBlockInputStream : public IProfilingBlockInputStream
3737

3838
Block getHeader() const override { return children.at(0)->getHeader(); }
3939

40-
protected:
41-
Block readImpl() override;
40+
Block read() override;
4241

4342
private:
4443
const LoggerPtr log;

Diff for: dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h

+6-3
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,15 @@ class BitmapFilterView
4646
return BitmapFilterView(std::make_shared<BitmapFilter>(size, default_value), 0, size);
4747
}
4848

49-
inline bool get(UInt32 n) const
49+
BitmapFilterView createSubView(UInt32 offset, UInt32 size) const
5050
{
51-
RUNTIME_CHECK(n < filter_size);
52-
return filter->get(filter_offset + n);
51+
RUNTIME_CHECK(offset + size <= filter_size, offset, size, filter_size);
52+
return BitmapFilterView(filter, filter_offset + offset, size);
5353
}
5454

55+
// Caller should ensure n in [0, size).
56+
inline bool get(UInt32 n) const { return filter->get(filter_offset + n); }
57+
5558
inline bool operator[](UInt32 n) const { return get(n); }
5659

5760
inline UInt32 size() const { return filter_size; }

Diff for: dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h

+9-10
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ class ColumnFile
6565
: id(++MAX_COLUMN_FILE_ID)
6666
{}
6767

68+
public:
6869
virtual ~ColumnFile() = default;
6970

70-
public:
7171
enum Type : UInt32
7272
{
7373
DELETE_RANGE = 1,
@@ -96,8 +96,8 @@ class ColumnFile
9696
UInt64 getId() const { return id; }
9797

9898
virtual size_t getRows() const { return 0; }
99-
virtual size_t getBytes() const { return 0; };
100-
virtual size_t getDeletes() const { return 0; };
99+
virtual size_t getBytes() const { return 0; }
100+
virtual size_t getDeletes() const { return 0; }
101101

102102
virtual Type getType() const = 0;
103103

@@ -106,11 +106,11 @@ class ColumnFile
106106
/// Is a ColumnFileTiny or not.
107107
bool isTinyFile() const { return getType() == Type::TINY_FILE; }
108108
/// Is a ColumnFileDeleteRange or not.
109-
bool isDeleteRange() const { return getType() == Type::DELETE_RANGE; };
109+
bool isDeleteRange() const { return getType() == Type::DELETE_RANGE; }
110110
/// Is a ColumnFileBig or not.
111-
bool isBigFile() const { return getType() == Type::BIG_FILE; };
111+
bool isBigFile() const { return getType() == Type::BIG_FILE; }
112112
/// Is a ColumnFilePersisted or not
113-
bool isPersisted() const { return getType() != Type::INMEMORY_FILE; };
113+
bool isPersisted() const { return getType() != Type::INMEMORY_FILE; }
114114

115115
/**
116116
* Whether this column file SEEMS TO BE flushed from another.
@@ -132,7 +132,8 @@ class ColumnFile
132132
virtual ColumnFileReaderPtr getReader(
133133
const DMContext & context,
134134
const IColumnFileDataProviderPtr & data_provider,
135-
const ColumnDefinesPtr & col_defs) const
135+
const ColumnDefinesPtr & col_defs,
136+
ReadTag read_tag) const
136137
= 0;
137138

138139
/// Note: Only ColumnFileInMemory can be appendable. Other ColumnFiles (i.e. ColumnFilePersisted) have
@@ -179,9 +180,7 @@ class ColumnFileReader
179180
virtual size_t skipNextBlock() { throw Exception("Unsupported operation", ErrorCodes::LOGICAL_ERROR); }
180181

181182
/// Create a new reader from current reader with different columns to read.
182-
virtual ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & col_defs) = 0;
183-
184-
virtual void setReadTag(ReadTag /*read_tag*/) {}
183+
virtual ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & col_defs, ReadTag read_tag) = 0;
185184
};
186185

187186
std::pair<size_t, size_t> copyColumnsData(

Diff for: dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp

+5-11
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,10 @@ void ColumnFileBig::removeData(WriteBatches & wbs) const
6666
ColumnFileReaderPtr ColumnFileBig::getReader(
6767
const DMContext & dm_context,
6868
const IColumnFileDataProviderPtr &,
69-
const ColumnDefinesPtr & col_defs) const
69+
const ColumnDefinesPtr & col_defs,
70+
ReadTag read_tag) const
7071
{
71-
return std::make_shared<ColumnFileBigReader>(dm_context, *this, col_defs);
72+
return std::make_shared<ColumnFileBigReader>(dm_context, *this, col_defs, read_tag);
7273
}
7374

7475
void ColumnFileBig::serializeMetadata(WriteBuffer & buf, bool /*save_schema*/) const
@@ -379,17 +380,10 @@ size_t ColumnFileBigReader::skipNextBlock()
379380
}
380381
}
381382

382-
ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
383+
ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag)
383384
{
384385
// Currently we don't reuse the cache data.
385-
return std::make_shared<ColumnFileBigReader>(dm_context, column_file, new_col_defs);
386-
}
387-
388-
void ColumnFileBigReader::setReadTag(ReadTag read_tag_)
389-
{
390-
// `read_tag` should be set before `file_stream` is initialized.
391-
RUNTIME_CHECK(file_stream == nullptr);
392-
read_tag = read_tag_;
386+
return std::make_shared<ColumnFileBigReader>(dm_context, column_file, new_col_defs, read_tag);
393387
}
394388

395389
} // namespace DB::DM

Diff for: dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h

+7-6
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ class ColumnFileBig : public ColumnFilePersisted
8181
ColumnFileReaderPtr getReader(
8282
const DMContext & dm_context,
8383
const IColumnFileDataProviderPtr & data_provider,
84-
const ColumnDefinesPtr & col_defs) const override;
84+
const ColumnDefinesPtr & col_defs,
85+
ReadTag) const override;
8586

8687
void serializeMetadata(WriteBuffer & buf, bool save_schema) const override;
8788
void serializeMetadata(dtpb::ColumnFilePersisted * cf_pb, bool save_schema) const override;
@@ -145,7 +146,7 @@ class ColumnFileBigReader : public ColumnFileReader
145146
Block cur_block;
146147
Columns cur_block_data; // The references to columns in cur_block, for faster access.
147148

148-
ReadTag read_tag = ReadTag::Internal;
149+
ReadTag read_tag;
149150

150151
private:
151152
void initStream();
@@ -164,10 +165,12 @@ class ColumnFileBigReader : public ColumnFileReader
164165
ColumnFileBigReader(
165166
const DMContext & dm_context_,
166167
const ColumnFileBig & column_file_,
167-
const ColumnDefinesPtr & col_defs_)
168+
const ColumnDefinesPtr & col_defs_,
169+
ReadTag read_tag_)
168170
: dm_context(dm_context_)
169171
, column_file(column_file_)
170172
, col_defs(col_defs_)
173+
, read_tag(read_tag_)
171174
{
172175
if (col_defs_->size() == 1)
173176
{
@@ -199,9 +202,7 @@ class ColumnFileBigReader : public ColumnFileReader
199202

200203
size_t skipNextBlock() override;
201204

202-
ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs) override;
203-
204-
void setReadTag(ReadTag read_tag_) override;
205+
ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag) override;
205206
};
206207

207208
} // namespace DB::DM

Diff for: dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileDeleteRange.cpp

+7-11
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,17 @@
1616
#include <Storages/DeltaMerge/DMContext.h>
1717

1818

19-
namespace DB
20-
{
21-
namespace DM
19+
namespace DB::DM
2220
{
21+
2322
ColumnFileReaderPtr ColumnFileDeleteRange::getReader(
2423
const DMContext &,
2524
const IColumnFileDataProviderPtr &,
26-
const ColumnDefinesPtr &) const
25+
const ColumnDefinesPtr &,
26+
ReadTag) const
2727
{
28-
return std::make_shared<ColumnFileEmptyReader>();
28+
// ColumnFileDeleteRange is not readable.
29+
return nullptr;
2930
}
3031

3132
void ColumnFileDeleteRange::serializeMetadata(WriteBuffer & buf, bool /*save_schema*/) const
@@ -50,9 +51,4 @@ ColumnFilePersistedPtr ColumnFileDeleteRange::deserializeMetadata(const dtpb::Co
5051
return std::make_shared<ColumnFileDeleteRange>(RowKeyRange::deserialize(dr_pb.range()));
5152
}
5253

53-
ColumnFileReaderPtr ColumnFileEmptyReader::createNewReader(const ColumnDefinesPtr &)
54-
{
55-
return std::make_shared<ColumnFileEmptyReader>();
56-
}
57-
} // namespace DM
58-
} // namespace DB
54+
} // namespace DB::DM

Diff for: dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileDeleteRange.h

+8-12
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@
1818
#include <Storages/DeltaMerge/Remote/Serializer_fwd.h>
1919

2020

21-
namespace DB
22-
{
23-
namespace DM
21+
namespace DB::DM
2422
{
23+
2524
class ColumnFileDeleteRange;
2625
using ColumnFileDeleteRangePtr = std::shared_ptr<ColumnFileDeleteRange>;
2726

@@ -42,8 +41,11 @@ class ColumnFileDeleteRange : public ColumnFilePersisted
4241
{}
4342
ColumnFileDeleteRange(const ColumnFileDeleteRange &) = default;
4443

45-
ColumnFileReaderPtr getReader(const DMContext &, const IColumnFileDataProviderPtr &, const ColumnDefinesPtr &)
46-
const override;
44+
ColumnFileReaderPtr getReader(
45+
const DMContext &,
46+
const IColumnFileDataProviderPtr &,
47+
const ColumnDefinesPtr &,
48+
ReadTag) const override;
4749

4850
const auto & getDeleteRange() { return delete_range; }
4951

@@ -74,10 +76,4 @@ class ColumnFileDeleteRange : public ColumnFilePersisted
7476
String toString() const override { return "{delete_range:" + delete_range.toString() + "}"; }
7577
};
7678

77-
class ColumnFileEmptyReader : public ColumnFileReader
78-
{
79-
public:
80-
ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr &) override;
81-
};
82-
} // namespace DM
83-
} // namespace DB
79+
} // namespace DB::DM

Diff for: dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ void ColumnFileInMemory::fillColumns(const ColumnDefines & col_defs, size_t col_
5959
ColumnFileReaderPtr ColumnFileInMemory::getReader(
6060
const DMContext &,
6161
const IColumnFileDataProviderPtr &,
62-
const ColumnDefinesPtr & col_defs) const
62+
const ColumnDefinesPtr & col_defs,
63+
ReadTag) const
6364
{
6465
return std::make_shared<ColumnFileInMemoryReader>(*this, col_defs);
6566
}
@@ -155,7 +156,7 @@ size_t ColumnFileInMemoryReader::skipNextBlock()
155156
return memory_file.getRows();
156157
}
157158

158-
ColumnFileReaderPtr ColumnFileInMemoryReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
159+
ColumnFileReaderPtr ColumnFileInMemoryReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag)
159160
{
160161
// Reuse the cache data.
161162
return std::make_shared<ColumnFileInMemoryReader>(memory_file, new_col_defs, cols_data_cache);

Diff for: dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h

+4-3
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class ColumnFileInMemory : public ColumnFile
6060
Type getType() const override { return Type::INMEMORY_FILE; }
6161

6262
size_t getRows() const override { return rows; }
63-
size_t getBytes() const override { return bytes; };
63+
size_t getBytes() const override { return bytes; }
6464

6565
CachePtr getCache() { return cache; }
6666

@@ -72,7 +72,8 @@ class ColumnFileInMemory : public ColumnFile
7272
ColumnFileReaderPtr getReader(
7373
const DMContext & context,
7474
const IColumnFileDataProviderPtr & data_provider,
75-
const ColumnDefinesPtr & col_defs) const override;
75+
const ColumnDefinesPtr & col_defs,
76+
ReadTag) const override;
7677

7778
bool isAppendable() const override { return !disable_append; }
7879
void disableAppend() override { disable_append = true; }
@@ -133,7 +134,7 @@ class ColumnFileInMemoryReader : public ColumnFileReader
133134

134135
size_t skipNextBlock() override;
135136

136-
ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs) override;
137+
ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag) override;
137138
};
138139

139140
} // namespace DM

0 commit comments

Comments
 (0)