From 7ecf8dbeaf4422a2a9fd8d6b038e0e4605c195d2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lisen=20=E6=9D=A8?=
Date: Thu, 28 Mar 2024 14:48:28 +0800
Subject: [PATCH] separate different streaming join
---
src/Common/HashMapsTemplate.h | 236 ++++++-----
src/Core/BlockWithShard.h | 17 -
src/Core/DataBlockWithShard.h | 23 ++
src/Core/LightChunk.h | 21 +-
src/Interpreters/ExpressionAnalyzer.cpp | 2 +-
src/Interpreters/Streaming/AsofHashJoin.cpp | 85 ++++
src/Interpreters/Streaming/AsofHashJoin.h | 32 ++
.../Streaming/BidirectionalAllHashJoin.h | 17 +
.../BidirectionalChangelogHashJoin.h | 17 +
.../Streaming/BidirectionalRangeHashJoin.h | 17 +
.../Streaming/ChangelogHashJoin.h | 17 +
.../Streaming/ConcurrentHashJoin.cpp | 242 ++---------
.../Streaming/ConcurrentHashJoin.h | 37 +-
.../Streaming/DynamicEnrichmentHashJoin.h | 377 ++++++++++++++++++
src/Interpreters/Streaming/HashJoin.cpp | 135 ++++++-
src/Interpreters/Streaming/HashJoin.h | 37 +-
src/Interpreters/Streaming/IHashJoin.h | 37 +-
src/Interpreters/Streaming/LatestHashJoin.h | 17 +
src/Interpreters/Streaming/joinData.h | 52 +++
.../tests/gtest_streaming_hash_join.cpp | 55 +--
.../Streaming/Join/AsofJoinTransform.h | 24 ++
.../Join/AsofJoinTransformWithAlignment.h | 23 ++
.../Join/BidirectionalAllJoinTransform.h | 24 ++
...directionalAllJoinTransformWithAlignment.h | 23 ++
.../BidirectionalChangelogJoinTransform.h | 24 ++
...ionalChangelogJoinTransformWithAlignment.h | 23 ++
.../Join/BidirectionalRangeJoinTransform.h | 24 ++
...rectionalRangeJoinTransformWithAlignment.h | 126 ++++++
.../Streaming/Join/ChangelogJoinTransform.h | 24 ++
.../ChangelogJoinTransformWithAlignment.h | 23 ++
.../Streaming/{ => Join}/JoinTransform.cpp | 38 +-
.../Streaming/{ => Join}/JoinTransform.h | 2 +-
.../{ => Join}/JoinTransformWithAlignment.cpp | 76 +---
.../{ => Join}/JoinTransformWithAlignment.h | 6 +-
.../Streaming/Join/LatestJoinTransform.h | 24 ++
.../Join/LatestJoinTransformWithAlignment.h | 23 ++
src/QueryPipeline/QueryPipelineBuilder.cpp | 74 +++-
src/Storages/ExternalStream/Kafka/KafkaSink.h | 2 +-
src/Storages/Streaming/StreamSink.h | 2 +-
39 files changed, 1537 insertions(+), 521 deletions(-)
delete mode 100644 src/Core/BlockWithShard.h
create mode 100644 src/Core/DataBlockWithShard.h
create mode 100644 src/Interpreters/Streaming/AsofHashJoin.cpp
create mode 100644 src/Interpreters/Streaming/AsofHashJoin.h
create mode 100644 src/Interpreters/Streaming/BidirectionalAllHashJoin.h
create mode 100644 src/Interpreters/Streaming/BidirectionalChangelogHashJoin.h
create mode 100644 src/Interpreters/Streaming/BidirectionalRangeHashJoin.h
create mode 100644 src/Interpreters/Streaming/ChangelogHashJoin.h
create mode 100644 src/Interpreters/Streaming/DynamicEnrichmentHashJoin.h
create mode 100644 src/Interpreters/Streaming/LatestHashJoin.h
create mode 100644 src/Processors/Transforms/Streaming/Join/AsofJoinTransform.h
create mode 100644 src/Processors/Transforms/Streaming/Join/AsofJoinTransformWithAlignment.h
create mode 100644 src/Processors/Transforms/Streaming/Join/BidirectionalAllJoinTransform.h
create mode 100644 src/Processors/Transforms/Streaming/Join/BidirectionalAllJoinTransformWithAlignment.h
create mode 100644 src/Processors/Transforms/Streaming/Join/BidirectionalChangelogJoinTransform.h
create mode 100644 src/Processors/Transforms/Streaming/Join/BidirectionalChangelogJoinTransformWithAlignment.h
create mode 100644 src/Processors/Transforms/Streaming/Join/BidirectionalRangeJoinTransform.h
create mode 100644 src/Processors/Transforms/Streaming/Join/BidirectionalRangeJoinTransformWithAlignment.h
create mode 100644 src/Processors/Transforms/Streaming/Join/ChangelogJoinTransform.h
create mode 100644 src/Processors/Transforms/Streaming/Join/ChangelogJoinTransformWithAlignment.h
rename src/Processors/Transforms/Streaming/{ => Join}/JoinTransform.cpp (86%)
rename src/Processors/Transforms/Streaming/{ => Join}/JoinTransform.h (98%)
rename src/Processors/Transforms/Streaming/{ => Join}/JoinTransformWithAlignment.cpp (85%)
rename src/Processors/Transforms/Streaming/{ => Join}/JoinTransformWithAlignment.h (95%)
create mode 100644 src/Processors/Transforms/Streaming/Join/LatestJoinTransform.h
create mode 100644 src/Processors/Transforms/Streaming/Join/LatestJoinTransformWithAlignment.h
diff --git a/src/Common/HashMapsTemplate.h b/src/Common/HashMapsTemplate.h
index 09e1a031935..38f3cc4b9c4 100644
--- a/src/Common/HashMapsTemplate.h
+++ b/src/Common/HashMapsTemplate.h
@@ -12,6 +12,123 @@ namespace DB
class WriteBuffer;
class ReadBuffer;
+/// Different types of keys for maps.
+#define APPLY_FOR_HASH_KEY_VARIANTS(M) \
+ M(key8) \
+ M(key16) \
+ M(key32) \
+ M(key64) \
+ M(key_string) \
+ M(key_fixed_string) \
+ M(keys128) \
+ M(keys256) \
+ M(hashed)
+
+enum class HashType
+{
+#define M(NAME) NAME,
+ APPLY_FOR_HASH_KEY_VARIANTS(M)
+#undef M
+};
+
+template
+using FindResultImpl = ColumnsHashing::columns_hashing_impl::FindResultImpl;
+
+/// Dummy key getter, always find nothing, used for JOIN ON NULL
+template
+class KeyGetterEmpty
+{
+public:
+ struct MappedType
+ {
+ using mapped_type = Mapped;
+ };
+
+ using FindResult = ColumnsHashing::columns_hashing_impl::FindResultImpl;
+
+ KeyGetterEmpty() = default;
+
+ FindResult findKey(MappedType, size_t, const Arena &) { return FindResult(); }
+};
+
+template
+struct KeyGetterForTypeImpl;
+
+template
+struct KeyGetterForTypeImpl
+{
+ using Type = ColumnsHashing::HashMethodOneNumber;
+};
+
+template
+struct KeyGetterForTypeImpl
+{
+ using Type = ColumnsHashing::HashMethodOneNumber;
+};
+
+template
+struct KeyGetterForTypeImpl
+{
+ using Type = ColumnsHashing::HashMethodOneNumber;
+};
+
+template
+struct KeyGetterForTypeImpl
+{
+ using Type = ColumnsHashing::HashMethodOneNumber;
+};
+
+template
+struct KeyGetterForTypeImpl
+{
+ using Type = ColumnsHashing::HashMethodString;
+};
+
+template
+struct KeyGetterForTypeImpl
+{
+ using Type = ColumnsHashing::HashMethodFixedString;
+};
+
+template
+struct KeyGetterForTypeImpl
+{
+ using Type = ColumnsHashing::HashMethodKeysFixed;
+};
+
+template
+struct KeyGetterForTypeImpl
+{
+ using Type = ColumnsHashing::HashMethodKeysFixed;
+};
+
+template
+struct KeyGetterForTypeImpl
+{
+ using Type = ColumnsHashing::HashMethodHashed;
+};
+
+template
+struct KeyGetterForType
+{
+ using Value = typename Data::value_type;
+ using Mapped_t = typename Data::mapped_type;
+ using Mapped = std::conditional_t, const Mapped_t, Mapped_t>;
+ using Type = typename KeyGetterForTypeImpl::Type;
+};
+
+template
+requires (std::is_invocable_v)
+void insertIntoHashMap(
+ Map & map, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, size_t rows, Arena & pool, MappedHandler && mapped_handler)
+{
+ KeyGetter key_getter(key_columns, key_sizes, nullptr);
+ for (size_t i = 0; i < rows; ++i)
+ {
+ auto emplace_result = key_getter.emplaceKey(map, i, pool);
+ mapped_handler(emplace_result.getMapped(), emplace_result.isInserted(), i);
+ }
+}
template
void serializeHashMap(const Map & map, MappedSerializer && mapped_serializer, WriteBuffer & wb)
@@ -83,25 +200,6 @@ void deserializeTwoLevelHashMap(Map & map, MappedDeserializer && mapped_deserial
/// HashMapsTemplate is a taken from HashJoin class and make it standalone
/// and could be shared among different components
-/// Different types of keys for maps.
-#define APPLY_FOR_HASH_KEY_VARIANTS(M) \
- M(key8) \
- M(key16) \
- M(key32) \
- M(key64) \
- M(key_string) \
- M(key_fixed_string) \
- M(keys128) \
- M(keys256) \
- M(hashed)
-
-enum class HashType
-{
-#define M(NAME) NAME,
- APPLY_FOR_HASH_KEY_VARIANTS(M)
-#undef M
-};
-
template
struct ConservativeHashTableGrowerWithPrecalculation : public HashTableGrowerWithPrecalculation
{
@@ -139,6 +237,21 @@ struct HashMapsTemplate
type = which;
}
+ template
+ void insert(const ColumnRawPtrs & key_columns, const Sizes & key_sizes, size_t rows, Arena & pool, MappedHandler && mapped_handler)
+ {
+ switch (type)
+ {
+#define M(NAME) \
+ case HashType::NAME: \
+ using KeyGetter = typename KeyGetterForType>::Type; \
+ insertIntoHashMap(*NAME, key_columns, key_sizes, rows, pool, std::move(mapped_handler)); \
+ break;
+ APPLY_FOR_HASH_KEY_VARIANTS(M)
+#undef M
+ }
+ }
+
size_t getTotalRowCount() const
{
switch (type)
@@ -219,89 +332,4 @@ struct HashMapsTemplate
HashType type;
};
-template
-using FindResultImpl = ColumnsHashing::columns_hashing_impl::FindResultImpl;
-
-/// Dummy key getter, always find nothing, used for JOIN ON NULL
-template
-class KeyGetterEmpty
-{
-public:
- struct MappedType
- {
- using mapped_type = Mapped;
- };
-
- using FindResult = ColumnsHashing::columns_hashing_impl::FindResultImpl;
-
- KeyGetterEmpty() = default;
-
- FindResult findKey(MappedType, size_t, const Arena &) { return FindResult(); }
-};
-
-template
-struct KeyGetterForTypeImpl;
-
-template
-struct KeyGetterForTypeImpl
-{
- using Type = ColumnsHashing::HashMethodOneNumber;
-};
-
-template
-struct KeyGetterForTypeImpl
-{
- using Type = ColumnsHashing::HashMethodOneNumber;
-};
-
-template
-struct KeyGetterForTypeImpl
-{
- using Type = ColumnsHashing::HashMethodOneNumber;
-};
-
-template
-struct KeyGetterForTypeImpl
-{
- using Type = ColumnsHashing::HashMethodOneNumber;
-};
-
-template
-struct KeyGetterForTypeImpl
-{
- using Type = ColumnsHashing::HashMethodString;
-};
-
-template
-struct KeyGetterForTypeImpl
-{
- using Type = ColumnsHashing::HashMethodFixedString;
-};
-
-template
-struct KeyGetterForTypeImpl
-{
- using Type = ColumnsHashing::HashMethodKeysFixed;
-};
-
-template
-struct KeyGetterForTypeImpl
-{
- using Type = ColumnsHashing::HashMethodKeysFixed;
-};
-
-template
-struct KeyGetterForTypeImpl
-{
- using Type = ColumnsHashing::HashMethodHashed;
-};
-
-template
-struct KeyGetterForType
-{
- using Value = typename Data::value_type;
- using Mapped_t = typename Data::mapped_type;
- using Mapped = std::conditional_t, const Mapped_t, Mapped_t>;
- using Type = typename KeyGetterForTypeImpl::Type;
-};
}
diff --git a/src/Core/BlockWithShard.h b/src/Core/BlockWithShard.h
deleted file mode 100644
index c920f47a3d4..00000000000
--- a/src/Core/BlockWithShard.h
+++ /dev/null
@@ -1,17 +0,0 @@
-#pragma once
-
-#include
-
-namespace DB
-{
-struct BlockWithShard
-{
- Block block;
- int32_t shard;
-
- BlockWithShard(Block && block_, int32_t shard_) : block(std::move(block_)), shard(shard_) { }
-};
-
-using BlocksWithShard = std::vector;
-}
-
diff --git a/src/Core/DataBlockWithShard.h b/src/Core/DataBlockWithShard.h
new file mode 100644
index 00000000000..28051e23de7
--- /dev/null
+++ b/src/Core/DataBlockWithShard.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include
+#include
+
+namespace DB
+{
+template
+struct DataBlockWithShard
+{
+ DataBlock block;
+ int32_t shard;
+
+ DataBlockWithShard(DataBlock && block_, int32_t shard_) : block(std::move(block_)), shard(shard_) { }
+};
+
+using BlockWithShard = DataBlockWithShard;
+using BlocksWithShard = std::vector;
+
+using LightChunkWithShard = DataBlockWithShard;
+using LightChunksWithShard = std::vector;
+}
+
diff --git a/src/Core/LightChunk.h b/src/Core/LightChunk.h
index 4f3ef515b3a..5766bf45a51 100644
--- a/src/Core/LightChunk.h
+++ b/src/Core/LightChunk.h
@@ -27,6 +27,9 @@ struct LightChunk
void concat(const LightChunk & other)
{
auto added_rows = other.rows();
+ if (added_rows <= 0)
+ return;
+
assert(columns() == other.columns());
for (size_t c = 0; auto & col : data)
{
@@ -35,9 +38,21 @@ struct LightChunk
}
}
+ LightChunk cloneEmpty() const
+ {
+ LightChunk res;
+ res.data.reserve(data.size());
+
+ for (const auto & elem : data)
+ res.data.emplace_back(elem->cloneEmpty());
+
+ return res;
+ }
+
size_t rows() const noexcept { return data.empty() ? 0 : data[0]->size(); }
size_t columns() const noexcept { return data.size(); }
+ Columns & getColumns() noexcept { return data; }
const Columns & getColumns() const noexcept { return data; }
Columns detachColumns() noexcept { return std::move(data); }
@@ -88,7 +103,9 @@ struct LightChunkWithTimestamp
LightChunkWithTimestamp() = default;
LightChunkWithTimestamp(Columns && data_) : chunk(std::move(data_)) { }
LightChunkWithTimestamp(Chunk && chunk_, Int64 min_ts, Int64 max_ts)
- : chunk(std::move(chunk_)), min_timestamp(min_ts), max_timestamp(max_ts) { }
+ : chunk(std::move(chunk_)), min_timestamp(min_ts), max_timestamp(max_ts)
+ {
+ }
LightChunkWithTimestamp(const Block & block)
: chunk(block), min_timestamp(block.minTimestamp()), max_timestamp(block.maxTimestamp()) { }
@@ -122,4 +139,4 @@ struct LightChunkWithTimestamp
Int64 maxTimestamp() const noexcept { return max_timestamp; }
};
-}
\ No newline at end of file
+}
diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp
index 5eba751ac7e..5889774bfc6 100644
--- a/src/Interpreters/ExpressionAnalyzer.cpp
+++ b/src/Interpreters/ExpressionAnalyzer.cpp
@@ -2394,7 +2394,7 @@ std::shared_ptr SelectQueryExpressionAnalyzer::chooseJoinAlgorithmStreami
return std::make_shared(
analyzed_join, max_threads, std::move(left_join_stream_desc), std::move(right_join_stream_desc));
else
- return std::make_shared(analyzed_join, std::move(left_join_stream_desc), std::move(right_join_stream_desc));
+ return Streaming::HashJoin::create(analyzed_join, std::move(left_join_stream_desc), std::move(right_join_stream_desc));
}
/// proton : ends
diff --git a/src/Interpreters/Streaming/AsofHashJoin.cpp b/src/Interpreters/Streaming/AsofHashJoin.cpp
new file mode 100644
index 00000000000..ac50cd21388
--- /dev/null
+++ b/src/Interpreters/Streaming/AsofHashJoin.cpp
@@ -0,0 +1,85 @@
+#pragma once
+
+#include
+
+namespace DB
+{
+namespace Streaming
+{
+AsofHashJoin::AsofHashJoin(
+ std::shared_ptr table_join_,
+ JoinStreamDescriptionPtr left_join_stream_desc_,
+ JoinStreamDescriptionPtr right_join_stream_desc_)
+ : HashJoin(table_join_, left_join_stream_desc_, right_join_stream_desc_)
+ , asof_type(*table_join->getAsofType())
+ , asof_inequality(table_join->getAsofInequality())
+{
+ /// last key is asof column, not use as a hash key
+ hash_key_sizes = key_sizes;
+ hash_key_sizes.back();
+}
+
+void AsofHashJoin::joinLeftBlock(Block & block)
+{
+ doJoinBlockWithHashTable(block, hash_blocks);
+}
+
+void AsofHashJoin::insertRightBlock(Block block)
+{
+ /// FIXME, there are quite some block copies
+ /// FIXME, all_key_columns shall hold shared_ptr to columns instead of raw ptr
+ /// then we can update `source_block` in place
+ /// key columns are from source `block`
+ ColumnRawPtrMap all_key_columns = JoinCommon::materializeColumnsInplaceMap(block, table_join->getAllNames(JoinTableSide::Right));
+
+ /// We have copy of source `block` to `block_to_save` after prepare, so `block_to_save` is good to get moved to the buffered stream data
+ Block block_to_save = prepareBlockToSave(block, right_data.buffered_data->sample_block);
+
+ /// FIXME, multiple disjuncts OR clause
+ ColumnRawPtrs key_columns;
+ const Names & key_names = table_join->getClauses().front().key_names_right;
+ key_columns.reserve(key_names.size());
+ for (const auto & name : key_names)
+ key_columns.push_back(all_key_columns[name]);
+
+ auto asof_column = key_columns.back();
+ key_columns.pop_back();
+
+ /// Add `block_to_save` to target stream data
+ /// Note `block_to_save` may be empty for cases in which the query doesn't care other non-key columns.
+ /// For example, SELECT count() FROM stream_a JOIN stream_b ON i=ii;
+ auto rows = block_to_save.rows();
+ auto start_row = right_buffered_hash_data->blocks.pushBackOrConcat(std::move(block_to_save));
+ auto row_ref_handler = [&](AsofRowRef & row_ref, bool inserted, size_t original_row, size_t row) {
+ AsofRowRef * row_ref_ptr = &row_ref;
+ if (inserted)
+ row_ref_ptr = new (row_ref_ptr) AsofRowRef(asof_type);
+
+ row_ref_ptr->insert(
+ asof_type,
+ asof_column,
+ &(right_buffered_hash_data->blocks),
+ original_row,
+ row,
+ asof_inequality,
+ rightJoinStreamDescription()->keep_versions);
+ };
+
+ right_buffered_hash_data->map.insert(std::move(block_to_save), key_columns, hash_key_sizes, rows, right_buffered_hash_data->pool, std::move(row_ref_handler));
+
+ checkLimits();
+}
+
+void AsofHashJoin::checkLimits() const
+{
+ auto current_total_bytes = right_buffered_hash_data->totalBufferedBytes();
+ if (current_total_bytes >= join_max_cached_bytes)
+ throw Exception(
+ ErrorCodes::SET_SIZE_LIMIT_EXCEEDED,
+ "Streaming asof join's memory reaches max size: {}, current total: {}, right: {}",
+ join_max_cached_bytes,
+ current_total_bytes,
+ buffered_hash_data->getMetricsString());
+}
+}
+}
diff --git a/src/Interpreters/Streaming/AsofHashJoin.h b/src/Interpreters/Streaming/AsofHashJoin.h
new file mode 100644
index 00000000000..4f0c5907708
--- /dev/null
+++ b/src/Interpreters/Streaming/AsofHashJoin.h
@@ -0,0 +1,32 @@
+#pragma once
+
+#include
+
+namespace DB
+{
+namespace Streaming
+{
+class AsofHashJoin final : public HashJoin
+{
+public:
+ AsofHashJoin(
+ std::shared_ptr table_join_,
+ JoinStreamDescriptionPtr left_join_stream_desc_,
+ JoinStreamDescriptionPtr right_join_stream_desc_);
+
+ HashJoinType type() const override { return HashJoinType::Asof; }
+
+ void joinLeftBlock(Block & block) override;
+ void insertRightBlock(Block block) override;
+
+private:
+ TypeIndex asof_type;
+ ASOFJoinInequality asof_inequality;
+
+ using DataBlock = LightChunkWithTimestamp;
+ using BufferedAsofHashData = BufferedHashData>;
+ SERDE std::unique_ptr right_buffered_hash_data;
+};
+
+}
+}
diff --git a/src/Interpreters/Streaming/BidirectionalAllHashJoin.h b/src/Interpreters/Streaming/BidirectionalAllHashJoin.h
new file mode 100644
index 00000000000..abdadf7757c
--- /dev/null
+++ b/src/Interpreters/Streaming/BidirectionalAllHashJoin.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include
+
+namespace DB
+{
+namespace Streaming
+{
+class BidirectionalAllHashJoin final : public HashJoin
+{
+public:
+ using HashJoin::HashJoin;
+ HashJoinType type() const override { return HashJoinType::BidirectionalAll; }
+};
+
+}
+}
diff --git a/src/Interpreters/Streaming/BidirectionalChangelogHashJoin.h b/src/Interpreters/Streaming/BidirectionalChangelogHashJoin.h
new file mode 100644
index 00000000000..e991a6f9f28
--- /dev/null
+++ b/src/Interpreters/Streaming/BidirectionalChangelogHashJoin.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include
+
+namespace DB
+{
+namespace Streaming
+{
+class BidirectionalChangelogHashJoin final : public HashJoin
+{
+public:
+ using HashJoin::HashJoin;
+ HashJoinType type() const override { return HashJoinType::BidirectionalChangelog; }
+};
+
+}
+}
diff --git a/src/Interpreters/Streaming/BidirectionalRangeHashJoin.h b/src/Interpreters/Streaming/BidirectionalRangeHashJoin.h
new file mode 100644
index 00000000000..455bd175ddb
--- /dev/null
+++ b/src/Interpreters/Streaming/BidirectionalRangeHashJoin.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include
+
+namespace DB
+{
+namespace Streaming
+{
+class BidirectionalRangeHashJoin final : public HashJoin
+{
+public:
+ using HashJoin::HashJoin;
+ HashJoinType type() const override { return HashJoinType::BidirectionalRange; }
+};
+
+}
+}
diff --git a/src/Interpreters/Streaming/ChangelogHashJoin.h b/src/Interpreters/Streaming/ChangelogHashJoin.h
new file mode 100644
index 00000000000..ad40d4dccad
--- /dev/null
+++ b/src/Interpreters/Streaming/ChangelogHashJoin.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include
+
+namespace DB
+{
+namespace Streaming
+{
+class ChangelogHashJoin final : public HashJoin
+{
+public:
+ using HashJoin::HashJoin;
+ HashJoinType type() const override { return HashJoinType::Changelog; }
+};
+
+}
+}
diff --git a/src/Interpreters/Streaming/ConcurrentHashJoin.cpp b/src/Interpreters/Streaming/ConcurrentHashJoin.cpp
index dd6827ba0d8..ccc59e21358 100644
--- a/src/Interpreters/Streaming/ConcurrentHashJoin.cpp
+++ b/src/Interpreters/Streaming/ConcurrentHashJoin.cpp
@@ -49,7 +49,7 @@ ConcurrentHashJoin::ConcurrentHashJoin(
for (size_t i = 0; i < slots; ++i)
{
auto inner_hash_join = std::make_shared();
- inner_hash_join->data = std::make_unique(table_join, left_join_stream_desc, right_join_stream_desc);
+ inner_hash_join->data = Streaming::HashJoin::create(table_join, left_join_stream_desc, right_join_stream_desc);
hash_joins.emplace_back(std::move(inner_hash_join));
}
}
@@ -74,7 +74,7 @@ void ConcurrentHashJoin::rescale(size_t slots_)
for (; slots < new_slots; ++slots)
{
auto inner_hash_join = std::make_shared();
- inner_hash_join->data = std::make_unique(table_join, left_join_stream_desc, right_join_stream_desc);
+ inner_hash_join->data = Streaming::HashJoin::create(table_join, left_join_stream_desc, right_join_stream_desc);
hash_joins.emplace_back(std::move(inner_hash_join));
}
}
@@ -97,184 +97,24 @@ void ConcurrentHashJoin::transformHeader(Block & header)
hash_joins[0]->data->transformHeader(header);
}
-void ConcurrentHashJoin::insertRightBlock(Block right_block)
-{
- auto dispatched_blocks = dispatchBlock(right_key_column_positions, std::move(right_block));
-
- size_t blocks_left = dispatched_blocks.size();
-
- while (blocks_left > 0)
- {
- if (is_cancelled)
- break;
-
- /// insert blocks into corresponding HashJoin instances
- for (auto & dispatched_block : dispatched_blocks)
- {
- if (dispatched_block.block)
- {
- if (dispatched_block.block.rows() > 0)
- {
- auto & hash_join = hash_joins[dispatched_block.shard];
-
- /// if current hash_join is already processed by another thread, skip it and try later
- std::unique_lock lock(hash_join->mutex, std::try_to_lock);
- if (!lock.owns_lock())
- continue;
-
- hash_join->data->insertRightBlock(std::move(dispatched_block.block));
- }
- else
- dispatched_block.block = {};
-
- assert(!dispatched_block.block);
- blocks_left--;
- }
- }
- }
-}
-
-void ConcurrentHashJoin::joinLeftBlock(Block & left_block)
-{
- auto dispatched_blocks = dispatchBlock(right_key_column_positions, std::move(left_block));
- Blocks joined_blocks;
- joined_blocks.reserve(dispatched_blocks.size());
-
- left_block = {};
-
- size_t blocks_left = dispatched_blocks.size();
- while (blocks_left > 0)
- {
- if (is_cancelled)
- break;
-
- for (auto & dispatched_block : dispatched_blocks)
- {
- if (dispatched_block.block)
- {
- if (dispatched_block.block.rows() > 0)
- {
- auto & hash_join = hash_joins[dispatched_block.shard];
-
- /// if current hash_join is already processed by another thread, skip it and try later
- std::unique_lock lock(hash_join->mutex, std::try_to_lock);
- if (!lock.owns_lock())
- continue;
-
- hash_join->data->joinLeftBlock(dispatched_block.block);
-
- if (dispatched_block.block.rows() > 0)
- joined_blocks.emplace_back(std::move(dispatched_block.block));
- else
- dispatched_block.block = {};
- }
- else
- dispatched_block.block = {};
-
- assert(!dispatched_block.block);
- blocks_left--;
- }
- }
- }
-
- left_block = concatenateBlocks(joined_blocks);
-}
-
template
-Block ConcurrentHashJoin::insertBlockAndJoin(Block & block)
+std::pair ConcurrentHashJoin::doInsertDataBlockAndJoin(LightChunk && chunk)
{
- const std::vector * key_column_positions;
- if constexpr (is_left_block)
- key_column_positions = &left_key_column_positions;
- else
- key_column_positions = &right_key_column_positions;
-
- auto dispatched_blocks = dispatchBlock(*key_column_positions, std::move(block));
-
- size_t blocks_left = dispatched_blocks.size();
-
- Blocks retracted_blocks;
- if (emitChangeLog())
- retracted_blocks.reserve(blocks_left);
-
- Blocks joined_blocks;
- joined_blocks.reserve(blocks_left);
-
- while (blocks_left > 0)
- {
- if (is_cancelled)
- break;
-
- /// insert blocks into corresponding HashJoin instances
- for (auto & dispatched_block : dispatched_blocks)
- {
- if (dispatched_block.block)
- {
- if (dispatched_block.block.rows() > 0)
- {
- auto & hash_join = hash_joins[dispatched_block.shard];
-
- /// if current hash_join is already processed by another thread, skip it and try later
- std::unique_lock lock(hash_join->mutex, std::try_to_lock);
- if (!lock.owns_lock())
- continue;
-
- if constexpr (is_left_block)
- {
- auto retracted_block = hash_join->data->insertLeftBlockAndJoin(dispatched_block.block);
- if (retracted_block.rows() > 0)
- retracted_blocks.emplace_back(std::move(retracted_block));
- }
- else
- {
- auto retracted_block = hash_join->data->insertRightBlockAndJoin(dispatched_block.block);
- if (retracted_block.rows() > 0)
- retracted_blocks.emplace_back(std::move(retracted_block));
- }
-
- if (dispatched_block.block.rows() > 0)
- joined_blocks.emplace_back(std::move(dispatched_block.block));
- else
- dispatched_block.block = {};
- }
- else
- dispatched_block.block = {};
-
- assert(!dispatched_block.block);
- blocks_left--;
- }
- }
- }
-
- block = concatenateBlocks(joined_blocks);
- return concatenateBlocks(retracted_blocks);
-}
-
-Block ConcurrentHashJoin::insertLeftBlockAndJoin(Block & left_block)
-{
- return insertBlockAndJoin(left_block);
-}
-
-Block ConcurrentHashJoin::insertRightBlockAndJoin(Block & right_block)
-{
- return insertBlockAndJoin(right_block);
-}
+ auto rows = chunk.rows();
+ if (rows <= 0)
+ return {};
-template
-std::vector ConcurrentHashJoin::insertBlockToRangeBucketAndJoin(Block block)
-{
- const std::vector * key_column_positions;
+ /// FIXME: Optmize hash join Block to LightChunk.
+ LightChunksWithShard dispatched_blocks;
if constexpr (is_left_block)
- key_column_positions = &left_key_column_positions;
+ dispatched_blocks = dispatchBlock(left_key_column_positions, std::move(chunk));
else
- key_column_positions = &right_key_column_positions;
-
- auto dispatched_blocks = dispatchBlock(*key_column_positions, std::move(block));
+ dispatched_blocks = dispatchBlock(right_key_column_positions, std::move(chunk));
size_t blocks_left = dispatched_blocks.size();
- std::vector joined_results;
- joined_results.reserve(blocks_left);
+ std::pair joined_result;
+ auto & [retracted_data, joined_data] = joined_result;
while (blocks_left > 0)
{
@@ -297,48 +137,37 @@ std::vector ConcurrentHashJoin::insertBlockToRangeBucketAndJoin(Block blo
if constexpr (is_left_block)
{
- auto joined_blocks = hash_join->data->insertLeftBlockToRangeBucketsAndJoin(std::move(dispatched_block.block));
- for (auto & joined_block : joined_blocks)
- joined_results.emplace_back(std::move(joined_block));
+ auto res = hash_join->data->insertLeftDataBlockAndJoin(std::move(dispatched_block.block));
+ retracted_data.concat(std::move(res.first));
+ joined_data.concat(std::move(res.second));
}
else
{
- auto joined_blocks = hash_join->data->insertRightBlockToRangeBucketsAndJoin(std::move(dispatched_block.block));
- for (auto & joined_block : joined_blocks)
- joined_results.emplace_back(std::move(joined_block));
+ auto res = hash_join->data->insertRightDataBlockAndJoin(std::move(dispatched_block.block));
+ retracted_data.concat(std::move(res.first));
+ joined_data.concat(std::move(res.second));
}
}
- else
- dispatched_block.block = {};
- assert(!dispatched_block.block);
+ dispatched_block.block = {};
blocks_left--;
}
}
}
- return joined_results;
-}
-
-std::vector ConcurrentHashJoin::insertLeftBlockToRangeBucketsAndJoin(Block left_block)
-{
- return insertBlockToRangeBucketAndJoin(std::move(left_block));
-}
-
-std::vector ConcurrentHashJoin::insertRightBlockToRangeBucketsAndJoin(Block right_block)
-{
- return insertBlockToRangeBucketAndJoin(std::move(right_block));
+ return joined_result;
}
bool ConcurrentHashJoin::addJoinedBlock(const Block & right_block, bool /*check_limits*/)
{
- insertRightBlock(right_block); /// We have a block copy here
+ insertRightDataBlockAndJoin(right_block);
return true;
}
void ConcurrentHashJoin::joinBlock(Block & block, std::shared_ptr & /*not_processed*/)
{
- joinLeftBlock(block);
+ auto joined_result = insertLeftDataBlockAndJoin(block);
+ block = left_join_stream_desc->input_header.cloneWithColumns(joined_result.second.detachColumns());
}
void ConcurrentHashJoin::checkTypesOfKeys(const Block & block) const
@@ -416,50 +245,51 @@ static ALWAYS_INLINE IColumn::Selector hashToSelector(const WeakHash32 & hash, s
return selector;
}
-IColumn::Selector ConcurrentHashJoin::selectDispatchBlock(const std::vector & key_column_positions, const Block & from_block)
+IColumn::Selector ConcurrentHashJoin::selectDispatchBlock(const std::vector & key_column_positions, const LightChunk & chunk)
{
- size_t num_rows = from_block.rows();
+ size_t num_rows = chunk.rows();
size_t num_shards = hash_joins.size();
WeakHash32 hash(num_rows);
for (const auto & key_column_position : key_column_positions)
{
- const auto & key_col = from_block.getByPosition(key_column_position).column->convertToFullColumnIfConst();
+ const auto & key_col = chunk.data[key_column_position]->convertToFullColumnIfConst();
const auto & key_col_no_lc = recursiveRemoveLowCardinality(recursiveRemoveSparse(key_col));
key_col_no_lc->updateWeakHash32(hash);
}
return hashToSelector(hash, num_shards);
}
-BlocksWithShard ConcurrentHashJoin::dispatchBlock(const std::vector & key_column_positions, Block && from_block)
+LightChunksWithShard ConcurrentHashJoin::dispatchBlock(const std::vector & key_column_positions, LightChunk && chunk)
{
size_t num_shards = hash_joins.size();
- size_t num_cols = from_block.columns();
+ size_t num_cols = chunk.columns();
- IColumn::Selector selector = selectDispatchBlock(key_column_positions, from_block);
+ IColumn::Selector selector = selectDispatchBlock(key_column_positions, chunk);
/// Optimized for 1 row block
if (selector.size() == 1)
- return BlocksWithShard{{std::move(from_block), static_cast(selector[0])}};
+ return LightChunksWithShard{{std::move(chunk), static_cast(selector[0])}};
std::vector> dispatched_columns;
dispatched_columns.reserve(num_cols);
+ const auto & columns = chunk.getColumns();
for (size_t i = 0; i < num_cols; ++i)
- dispatched_columns.emplace_back(from_block.getByPosition(i).column->scatter(num_shards, selector));
+ dispatched_columns.emplace_back(columns[i]->scatter(num_shards, selector));
- BlocksWithShard result;
+ LightChunksWithShard result;
result.reserve(num_shards);
for (size_t shard = 0; shard < num_shards; ++shard)
{
if (dispatched_columns[0][shard])
{
- result.emplace_back(from_block.cloneEmpty(), static_cast(shard));
- auto & current_block = result.back().block;
+ result.emplace_back(chunk.cloneEmpty(), static_cast(shard));
+ auto & current_block_columns = result.back().block.getColumns();
/// if dispatched column is not null at `shard`
for (size_t col_pos = 0; col_pos < num_cols; ++col_pos)
- current_block.getByPosition(col_pos).column = std::move(dispatched_columns[col_pos][shard]);
+ current_block_columns[col_pos] = std::move(dispatched_columns[col_pos][shard]);
}
}
diff --git a/src/Interpreters/Streaming/ConcurrentHashJoin.h b/src/Interpreters/Streaming/ConcurrentHashJoin.h
index 56aa3dfe6d9..7a69c060c37 100644
--- a/src/Interpreters/Streaming/ConcurrentHashJoin.h
+++ b/src/Interpreters/Streaming/ConcurrentHashJoin.h
@@ -1,6 +1,6 @@
#pragma once
-#include
+#include
#include
#include
@@ -30,19 +30,17 @@ class ConcurrentHashJoin final : public IHashJoin
void transformHeader(Block & header) override;
- /// For non-bidirectional hash join
- void insertRightBlock(Block right_block) override;
- void joinLeftBlock(Block & left_block) override;
-
- /// For bidirectional hash join
- /// There are 2 blocks returned : joined block via parameter and retracted block via returned-value if there is
- Block insertLeftBlockAndJoin(Block & left_block) override;
- Block insertRightBlockAndJoin(Block & right_block) override;
-
- /// For bidirectional range hash join, there may be multiple joined blocks
- std::vector insertLeftBlockToRangeBucketsAndJoin(Block left_block) override;
- std::vector insertRightBlockToRangeBucketsAndJoin(Block right_block) override;
+ /// \returns
+ std::pair insertLeftDataBlockAndJoin(LightChunk && chunk) override
+ {
+ return doInsertDataBlockAndJoin(std::move(chunk));
+ }
+ std::pair insertRightDataBlockAndJoin(LightChunk && chunk) override
+ {
+ return doInsertDataBlockAndJoin(std::move(chunk));
+ }
+ HashJoinType type() const override { return hash_joins[0]->data->type(); }
bool emitChangeLog() const override { return hash_joins[0]->data->emitChangeLog(); }
bool bidirectionalHashJoin() const override { return hash_joins[0]->data->bidirectionalHashJoin(); }
bool rangeBidirectionalHashJoin() const override { return hash_joins[0]->data->rangeBidirectionalHashJoin(); }
@@ -82,6 +80,8 @@ class ConcurrentHashJoin final : public IHashJoin
return hash_joins[0]->data->rightJoinStreamDescription();
}
+ const Block & getOutputHeader() const override { return hash_joins[0]->data->getOutputHeader(); }
+
void serialize(WriteBuffer &, VersionType) const override;
void deserialize(ReadBuffer &, VersionType) override;
@@ -89,13 +89,10 @@ class ConcurrentHashJoin final : public IHashJoin
private:
template
- Block insertBlockAndJoin(Block & block);
-
- template
- std::vector insertBlockToRangeBucketAndJoin(Block block);
+ std::pair doInsertDataBlockAndJoin(LightChunk && chunk);
- IColumn::Selector selectDispatchBlock(const std::vector & key_column_positions, const Block & from_block);
- BlocksWithShard dispatchBlock(const std::vector & key_column_positions, Block && from_block);
+ IColumn::Selector selectDispatchBlock(const std::vector & key_column_positions, const LightChunk & from_block);
+ LightChunksWithShard dispatchBlock(const std::vector & key_column_positions, LightChunk && from_block);
void doSerialize(WriteBuffer &) const;
void doDeserialize(ReadBuffer &);
@@ -104,7 +101,7 @@ class ConcurrentHashJoin final : public IHashJoin
struct InternalHashJoin
{
std::mutex mutex;
- std::unique_ptr data;
+ std::shared_ptr data;
};
std::shared_ptr table_join;
diff --git a/src/Interpreters/Streaming/DynamicEnrichmentHashJoin.h b/src/Interpreters/Streaming/DynamicEnrichmentHashJoin.h
new file mode 100644
index 00000000000..1cfea4309fd
--- /dev/null
+++ b/src/Interpreters/Streaming/DynamicEnrichmentHashJoin.h
@@ -0,0 +1,377 @@
+#pragma once
+
+#include
+
+namespace DB
+{
+namespace Streaming
+{
+class DynamicEnrichmentHashJoin final : public HashJoin
+{
+public:
+ using HashJoin:HashJoin;
+
+ ~DynamicEnrichmentHashJoin() noexcept override { }
+
+ /// \returns
+ std::pair insertLeftAndJoin(LightChunk && chunk) override;
+ std::pair insertRightDataBlockAndJoin(LightChunk && chunk) override;
+
+ UInt64 keepVersions() const { return right_data.join_stream_desc->keep_versions; }
+
+ const TableJoin & getTableJoin() const override { return *table_join; }
+
+ void checkTypesOfKeys(const Block & block) const override;
+
+ bool isFilled() const override { return false; }
+
+ /** For RIGHT and FULL JOINs.
+ * A stream that will contain default values from left table, joined with rows from right table, that was not joined before.
+ * Use only after all calls to joinBlock was done.
+ * left_sample_block is passed without account of 'use_nulls' setting (columns will be converted to Nullable inside).
+ */
+ std::shared_ptr
+ getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
+
+ /// Number of keys in all built JOIN maps.
+ size_t getTotalRowCount() const final;
+ /// Sum size in bytes of all buffers, used for JOIN maps and for all memory pools.
+ size_t getTotalByteCount() const final;
+
+ bool alwaysReturnsEmptySet() const final;
+
+ void getKeyColumnPositions(
+ std::vector & left_key_column_positions,
+ std::vector & right_key_column_positions,
+ bool include_asof_key_column) const override
+ {
+ auto calc_key_positions = [](const auto & key_column_names, const auto & header, auto & key_column_positions) {
+ key_column_positions.reserve(key_column_names.size());
+ for (const auto & name : key_column_names)
+ key_column_positions.push_back(header.getPositionByName(name));
+ };
+
+ calc_key_positions(table_join->getOnlyClause().key_names_left, left_data.join_stream_desc->input_header, left_key_column_positions);
+ calc_key_positions(
+ table_join->getOnlyClause().key_names_right, right_data.join_stream_desc->input_header, right_key_column_positions);
+
+ if (!include_asof_key_column && (streaming_strictness == Strictness::Range || streaming_strictness == Strictness::Asof))
+ {
+ left_key_column_positions.pop_back();
+ right_key_column_positions.pop_back();
+ }
+
+ assert(!left_key_column_positions.empty());
+ assert(!right_key_column_positions.empty());
+ }
+
+ size_t dataBlockSize() const noexcept { return table_join->dataBlockSize(); }
+ JoinKind getKind() const { return kind; }
+ JoinStrictness getStrictness() const { return strictness; }
+ Kind getStreamingKind() const { return streaming_kind; }
+ Strictness getStreamingStrictness() const { return streaming_strictness; }
+ const std::optional & getAsofType() const { return asof_type; }
+ ASOFJoinInequality getAsofInequality() const { return asof_inequality; }
+ bool anyTakeLastRow() const { return any_take_last_row; }
+
+ const ColumnWithTypeAndName & rightAsofKeyColumn() const { return right_data.asof_key_column; }
+ const ColumnWithTypeAndName & leftAsofKeyColumn() const { return left_data.asof_key_column; }
+
+ const Block & getOutputHeader() const { return output_header; }
+
+ void serialize(WriteBuffer & wb) const override;
+ void deserialize(ReadBuffer & rb) override;
+
+ void cancel() override { }
+
+ using RefListMultiple = RowRefListMultiple;
+ using RefListMultipleRef = RowRefListMultipleRef;
+ using RefListMultipleRefPtr = RowRefListMultipleRefPtr;
+
+ using MapsOne = HashMapsTemplate>;
+ using MapsMultiple = HashMapsTemplate>;
+ using MapsAll = HashMapsTemplate>;
+ using MapsAsof = HashMapsTemplate>;
+ using MapsRangeAsof = HashMapsTemplate>;
+ using MapsVariant = std::variant;
+
+ HashMapSizes sizesOfMapsVariant(const MapsVariant & maps_variant) const;
+ HashType getHashMethodType() const { return hash_method_type; }
+
+ /// bool isUsed(size_t off) const { return used_flags.getUsedSafe(off); }
+ /// bool isUsed(const Block * block_ptr, size_t row_idx) const { return used_flags.getUsedSafe(block_ptr, row_idx); }
+
+ friend struct BufferedStreamData;
+
+ /// For changelog emit
+ struct JoinResults
+ {
+ JoinResults(const Block & header_) : sample_block(header_), blocks(metrics), maps(std::make_unique()) { }
+
+ String joinMetricsString(const HashJoin * join) const;
+
+ mutable std::mutex mutex;
+
+ Block sample_block;
+
+ SERDE CachedBlockMetrics metrics;
+ SERDE JoinDataBlockList blocks;
+
+ /// Arena pool to hold the (string) keys
+ Arena pool;
+
+ /// Building hash map for joined blocks, then we can find previous
+ /// join blocks quickly by using joined keys
+ SERDE std::unique_ptr maps;
+ };
+
+ JoinStreamDescriptionPtr leftJoinStreamDescription() const noexcept override { return left_data.join_stream_desc; }
+ JoinStreamDescriptionPtr rightJoinStreamDescription() const noexcept override { return right_data.join_stream_desc; }
+
+private:
+ /// For non-bidirectional hash join
+ void insertRightBlock(Block right_block) override;
+ void joinLeftBlock(Block & left_block) override;
+
+ /// For bidirectional hash join
+ /// There are 2 blocks returned : joined block via parameter and retracted block via returned-value if there is
+ Block insertLeftBlockAndJoin(Block & left_block) override;
+ Block insertRightBlockAndJoin(Block & right_block) override;
+
+ /// For bidirectional range hash join, there may be multiple joined blocks
+ std::vector insertLeftBlockToRangeBucketsAndJoin(Block left_block) override;
+ std::vector insertRightBlockToRangeBucketsAndJoin(Block right_block) override;
+
+ void checkJoinSemantic() const;
+ void init();
+ void initBufferedData();
+ void initHashMaps(std::vector & all_maps);
+ void dataMapInit(MapsVariant &);
+
+ /// For versioned kv / changelog kv
+ void initLeftPrimaryKeyHashTable();
+ void initRightPrimaryKeyHashTable();
+ void reviseJoinStrictness();
+
+ void initLeftBlockStructure();
+ void initRightBlockStructure();
+ struct JoinData;
+ void initBlockStructure(JoinData & join_data, const Block & table_keys, const Block & sample_block_with_columns_to_add) const;
+
+ void chooseHashMethod();
+
+ void validateAsofJoinKey();
+
+ void checkLimits() const;
+
+ const Block & savedLeftBlockSample() const { return left_data.buffered_data->sample_block; }
+ const Block & savedRightBlockSample() const { return right_data.buffered_data->sample_block; }
+
+ /// Modify left or right block (update structure according to sample block) to save it in block list
+ template
+ Block prepareBlock(const Block & block) const;
+
+ /// Remove columns which are not needed to be projected
+ static Block prepareBlockToSave(const Block & block, const Block & sample_block);
+
+ /// For range bidirectional hash join
+ template
+ std::vector insertBlockToRangeBucketsAndJoin(Block block);
+
+ template
+ void doInsertBlock(Block block, HashBlocksPtr target_hash_blocks, std::vector row_refs = {});
+
+ /// For bidirectional hash join
+ /// Return retracted block if needs emit changelog, otherwise empty block
+ template
+ Block joinBlockWithHashTable(Block & block, HashBlocksPtr target_hash_blocks);
+
+ template
+ void doJoinBlockWithHashTable(Block & block, HashBlocksPtr target_hash_blocks);
+
+ /// Join left block with right hash table or join right block with left hash table
+ template
+ void joinBlockImpl(Block & block, const Block & block_with_columns_to_add, const std::vector & maps_) const;
+
+ /// `retract` does
+ /// 1) Add result_block to JoinResults
+ /// 2) Retract previous joined block. changelog emit
+ Block retract(const Block & result_block);
+
+ /// When left stream joins right stream, watermark calculation is more complicated.
+ /// Firstly, each stream has its own watermark and progresses separately.
+ /// Secondly, `combined_watermark` is calculated periodically according the watermarks in the left and right streams
+ void calculateWatermark();
+
+ /// For versioned-kv / changelog-kv join
+ /// Check if this ia row with a new primary key or an existing primary key
+ /// For the later, erase the element in the target join linked list
+ std::vector eraseOrAppendForPartialPrimaryKeyJoin(const Block & block);
+
+ template
+ std::vector eraseOrAppendForPartialPrimaryKeyJoin(Map & map, ColumnRawPtrs && primary_key_columns);
+
+ /// If the left / right_block is a retraction block : rows in `_tp_delta` column all have `-1`
+ /// We erase the previous key / values from hash table
+ /// Use for append JOIN changelog_kv / versioned_kv case
+ /// @return true if keys are erased, otherwise false
+ template
+ void eraseExistingKeys(Block & block, JoinData & join_data);
+ inline bool isRetractBlock(const Block & block, const JoinStreamDescription & join_stream_desc);
+
+ template
+ void doEraseExistingKeys(
+ Map & map,
+ const DB::Block & right_block,
+ ColumnRawPtrs && key_columns,
+ const Sizes & key_size,
+ const std::vector & skip_columns,
+ bool delete_key);
+
+ /// For bidirectional join, the input is a retract block
+ template
+ std::optional eraseExistingKeysAndRetractJoin(Block & left_block);
+
+ template
+ void transformToOutputBlock(Block & joined_block) const;
+
+private:
+ /// Only SERDE the clauses of join
+ SERDE std::shared_ptr table_join;
+ JoinKind kind;
+ JoinStrictness strictness;
+
+ SERDE Kind streaming_kind;
+ SERDE Strictness streaming_strictness;
+
+ bool any_take_last_row; /// Overwrite existing values when encountering the same key again
+ /// Only SERDE for ASOF JOIN or RANGE JOIN
+ SERDE std::optional asof_type;
+ SERDE ASOFJoinInequality asof_inequality;
+
+ /// Cache data members which avoid re-computation for every join
+ std::vector> cond_column_names;
+
+ SERDE std::vector key_sizes;
+
+ /// versioned-kv and changelog-kv both needs define a primary key
+ /// If join on partial primary key columns (or no primary key column is expressed in join on clause),
+ /// init this data structure
+ struct PrimaryKeyHashTable
+ {
+ PrimaryKeyHashTable(HashType hash_method_type_, Sizes && key_size_);
+
+ HashType hash_method_type;
+
+ Sizes key_size;
+
+ /// For key allocation
+ Arena pool;
+
+ /// Hash maps variants indexed by primary key columns
+ SERDE HashMapsTemplate map;
+ };
+ using PrimaryKeyHashTablePtr = std::shared_ptr;
+
+ struct JoinData
+ {
+ explicit JoinData(JoinStreamDescriptionPtr join_stream_desc_) : join_stream_desc(std::move(join_stream_desc_))
+ {
+ assert(join_stream_desc);
+ }
+
+ JoinStreamDescriptionPtr join_stream_desc;
+
+ ColumnWithTypeAndName asof_key_column;
+
+ /// Only for kv join. Used to maintain all unique keys
+ PrimaryKeyHashTablePtr primary_key_hash_table;
+
+ SERDE BufferedStreamDataPtr buffered_data;
+
+ /// Block with columns from the (left or) right-side table except key columns.
+ Block sample_block_with_columns_to_add;
+ /// Block with key columns in the same order they appear in the (left or) right-side table (duplicates appear once).
+ Block table_keys;
+ /// Block with key columns (left or) right-side table keys that are needed in result (would be attached after joined columns).
+ Block required_keys;
+ /// Left table column names that are sources for required_right_keys columns
+ std::vector required_keys_sources;
+
+ bool validated_join_key_types = false;
+ };
+
+ friend void serialize(const JoinData & join_data, WriteBuffer & wb);
+ friend void deserialize(JoinData & join_data, ReadBuffer & rb);
+
+ /// Note: when left block joins right hashtable, use `right_data`
+ SERDE JoinData right_data;
+ /// Note: when right block joins left hashtable, use `left_data`
+ SERDE JoinData left_data;
+
+ /// Right table data. StorageJoin shares it between many Join objects.
+ /// Flags that indicate that particular row already used in join.
+ /// Flag is stored for every record in hash map.
+ /// Number of this flags equals to hashtable buffer size (plus one for zero value).
+ /// Changes in hash table broke correspondence,
+ /// mutable JoinStuff::JoinUsedFlags used_flags;
+
+ SERDE HashType hash_method_type;
+
+ /// Only SERDE when emit_changelog is true
+ SERDE std::optional join_results;
+
+ bool retract_push_down = false;
+ bool emit_changelog = false;
+ bool bidirectional_hash_join = true;
+ bool range_bidirectional_hash_join = true;
+
+ /// Delta column in right-left-join
+ /// `rlj` -> right-left-join
+ std::optional left_delta_column_position_rlj;
+ std::optional right_delta_column_position_rlj;
+
+ /// `lrj` -> left-right-join
+ std::optional left_delta_column_position_lrj;
+ std::optional right_delta_column_position_lrj;
+
+ UInt64 join_max_cached_bytes = 0;
+
+ Block output_header;
+
+ /// Combined timestamp watermark progression of left stream and right stream
+ SERDE std::atomic_int64_t combined_watermark = 0;
+
+ struct JoinGlobalMetrics
+ {
+ size_t total_join = 0;
+ size_t left_block_and_right_range_bucket_no_intersection_skip = 0;
+ size_t right_block_and_left_range_bucket_no_intersection_skip = 0;
+
+ std::string string() const
+ {
+ return fmt::format(
+ "total_join={} "
+ "left_block_and_right_range_bucket_no_intersection_skip={} right_block_and_left_range_bucket_no_intersection_skip={}",
+ total_join,
+ left_block_and_right_range_bucket_no_intersection_skip,
+ right_block_and_left_range_bucket_no_intersection_skip);
+ }
+ };
+ friend void serialize(const JoinGlobalMetrics & join_metrics, WriteBuffer & wb);
+ friend void deserialize(JoinGlobalMetrics & join_metrics, ReadBuffer & rb);
+
+ SERDE JoinGlobalMetrics join_metrics;
+
+ Poco::Logger * logger;
+};
+
+struct HashJoinMapsVariants
+{
+ /// \return Number of keys in the map
+ HashMapSizes sizes(const HashJoin * join) const;
+ std::vector map_variants;
+};
+
+}
+}
diff --git a/src/Interpreters/Streaming/HashJoin.cpp b/src/Interpreters/Streaming/HashJoin.cpp
index 0e11f730df0..b28f47d18ba 100644
--- a/src/Interpreters/Streaming/HashJoin.cpp
+++ b/src/Interpreters/Streaming/HashJoin.cpp
@@ -13,8 +13,14 @@
#include
#include
#include
+#include
+#include
+#include
+#include
#include
+#include
#include
+#include
#include
#include
#include
@@ -585,13 +591,7 @@ struct Inserter
}
static ALWAYS_INLINE void insertMultiple(
- const HashJoin & join,
- Map & map,
- KeyGetter & key_getter,
- JoinDataBlockList * blocks,
- size_t original_row,
- size_t row,
- Arena & pool)
+ const HashJoin & join, Map & map, KeyGetter & key_getter, JoinDataBlockList * blocks, size_t original_row, size_t row, Arena & pool)
{
auto emplace_result = key_getter.emplaceKey(map, original_row, pool);
auto * mapped = &emplace_result.getMapped();
@@ -823,6 +823,68 @@ void HashJoin::validate(const JoinCombinationType & join_combination)
magic_enum::enum_name(std::get<3>(join_combination)));
}
+HashJoinPtr HashJoin::create(
+ std::shared_ptr table_join_, JoinStreamDescriptionPtr left_join_stream_desc, JoinStreamDescriptionPtr right_join_stream_desc)
+{
+ if (!table_join_->oneDisjunct())
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Stream to Stream join only supports only one disjunct join clause");
+
+ /// For streaming join, we don't support inline JOIN ON predicate like `left.value > 10` in the following query example
+ /// since stream query will need buffer more additional non-joined data in-memory
+ /// SELECT * FROM left INNER JOIN right ON left.key = right.key AND left.value > 10 and right.value > 80;
+ /// User shall use WHERE predicate like
+ /// SELECT * FROM (SELECT * FROM left WHERE left.value > 10) as left INNER JOIN right ON left.key = right.key
+ for (const auto & on_expr : table_join_->getClauses())
+ {
+ const auto & on_expr_cond_column_names = on_expr.condColumnNames();
+ if (!on_expr_cond_column_names.first.empty() || !on_expr_cond_column_names.second.empty())
+ throw Exception(
+ ErrorCodes::NOT_IMPLEMENTED, "Streaming join doesn't support predicates in JOIN ON clause. Use WHERE predicate instead");
+ }
+
+ auto join_kind = table_join_->kind();
+ auto join_strictness = table_join_->strictness();
+ validate(
+ {left_join_stream_desc->data_stream_semantic.toStorageSemantic(),
+ join_kind,
+ join_strictness,
+ right_join_stream_desc->data_stream_semantic.toStorageSemantic()});
+
+ [[maybe_unused]] auto streaming_kind = Streaming::toJoinKind(join_kind);
+ auto streaming_strictness = Streaming::toJoinStrictness(join_strictness, table_join_->isRangeJoin());
+
+ /// We don't even care the left stream semantic since if right stream is versioned-kv or changelog-kv
+ /// we will use `Multiple` list to hold the values since users may use partial primary key to join
+ if (isChangelogDataStream(right_join_stream_desc->data_stream_semantic))
+ streaming_strictness = Streaming::Strictness::Multiple;
+
+ /// Choose join algorithm
+ switch (streaming_strictness)
+ {
+ case Strictness::Asof:
+ return std::make_shared(
+ std::move(table_join_), std::move(left_join_stream_desc), std::move(right_join_stream_desc));
+ case Strictness::Latest:
+ return std::make_shared(
+ std::move(table_join_), std::move(left_join_stream_desc), std::move(right_join_stream_desc));
+ case Strictness::All:
+ return std::make_shared(
+ std::move(table_join_), std::move(left_join_stream_desc), std::move(right_join_stream_desc));
+ case Strictness::Range:
+ return std::make_shared(
+ std::move(table_join_), std::move(left_join_stream_desc), std::move(right_join_stream_desc));
+ case Strictness::Multiple: {
+ if (isChangelogDataStream(left_join_stream_desc->data_stream_semantic))
+ return std::make_shared(
+ std::move(table_join_), std::move(left_join_stream_desc), std::move(right_join_stream_desc));
+ else
+ return std::make_shared(
+ std::move(table_join_), std::move(left_join_stream_desc), std::move(right_join_stream_desc));
+ }
+ }
+ UNREACHABLE();
+}
+
HashJoin::HashJoin(
std::shared_ptr table_join_,
JoinStreamDescriptionPtr left_join_stream_desc_,
@@ -1311,12 +1373,62 @@ Block HashJoin::prepareBlock(const Block & block) const
return prepareBlockToSave(block, savedRightBlockSample());
}
-bool HashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/)
+std::pair HashJoin::insertLeftDataBlockAndJoin(LightChunk && chunk)
+{
+ if (rangeBidirectionalHashJoin())
+ {
+ auto block = left_data.join_stream_desc->input_header.cloneWithColumns(chunk.detachColumns());
+ auto joined_block = concatenateBlocks(insertLeftBlockToRangeBucketsAndJoin(std::move(block)));
+ return {LightChunk{}, std::move(joined_block)};
+ }
+ else if (bidirectionalHashJoin())
+ {
+ auto block = left_data.join_stream_desc->input_header.cloneWithColumns(chunk.detachColumns());
+ auto retracted_block = insertLeftBlockAndJoin(block);
+ return {std::move(retracted_block), std::move(block)};
+ }
+ else
+ {
+ auto block = left_data.join_stream_desc->input_header.cloneWithColumns(chunk.detachColumns());
+ joinLeftBlock(block);
+ return {LightChunk{}, std::move(block)};
+ }
+}
+
+std::pair HashJoin::insertRightDataBlockAndJoin(LightChunk && chunk)
{
- doInsertBlock(block, right_data.buffered_data->getCurrentHashBlocksPtr()); /// Copy the block
+ if (rangeBidirectionalHashJoin())
+ {
+ auto block = right_data.join_stream_desc->input_header.cloneWithColumns(chunk.detachColumns());
+ auto joined_block = concatenateBlocks(insertRightBlockToRangeBucketsAndJoin(std::move(block)));
+ return {LightChunk{}, std::move(joined_block)};
+ }
+ else if (bidirectionalHashJoin())
+ {
+ auto block = right_data.join_stream_desc->input_header.cloneWithColumns(chunk.detachColumns());
+ auto retracted_block = insertRightBlockAndJoin(block);
+ return {std::move(retracted_block), std::move(block)};
+ }
+ else
+ {
+ auto block = right_data.join_stream_desc->input_header.cloneWithColumns(chunk.detachColumns());
+ insertRightBlock(std::move(block));
+ return {};
+ }
+}
+
+bool HashJoin::addJoinedBlock(const Block & right_block, bool /*check_limits*/)
+{
+ insertRightDataBlockAndJoin(right_block);
return true;
}
+void HashJoin::joinBlock(Block & block, std::shared_ptr & /*not_processed*/)
+{
+ auto joined_result = insertLeftDataBlockAndJoin(block);
+ block = left_data.join_stream_desc->input_header.cloneWithColumns(joined_result.second.detachColumns());
+}
+
template
void HashJoin::doInsertBlock(Block block, HashBlocksPtr target_hash_blocks)
{
@@ -1575,11 +1687,6 @@ void HashJoin::checkTypesOfKeys(const Block & block) const
JoinCommon::checkTypesOfKeys(block, onexpr.key_names_left, right_data.table_keys, onexpr.key_names_right);
}
-void HashJoin::joinBlock(Block & block, ExtraBlockPtr & /*not_processed*/)
-{
- joinLeftBlock(block);
-}
-
template
void HashJoin::doJoinBlockWithHashTable(Block & block, HashBlocksPtr target_hash_blocks)
{
diff --git a/src/Interpreters/Streaming/HashJoin.h b/src/Interpreters/Streaming/HashJoin.h
index 299d91cfa2b..b484fa8d9c2 100644
--- a/src/Interpreters/Streaming/HashJoin.h
+++ b/src/Interpreters/Streaming/HashJoin.h
@@ -91,7 +91,7 @@ namespace Streaming
struct HashJoinMapsVariants;
-class HashJoin final : public IHashJoin
+class HashJoin : public IHashJoin
{
public:
/// - bool
@@ -102,6 +102,11 @@ class HashJoin final : public IHashJoin
static const SupportMatrix support_matrix;
static void validate(const JoinCombinationType & join_combination);
+ static HashJoinPtr create(
+ std::shared_ptr table_join_,
+ JoinStreamDescriptionPtr left_join_stream_desc_,
+ JoinStreamDescriptionPtr right_join_stream_desc_);
+
public:
HashJoin(
std::shared_ptr table_join_,
@@ -116,18 +121,9 @@ class HashJoin final : public IHashJoin
void transformHeader(Block & header) override;
- /// For non-bidirectional hash join
- void insertRightBlock(Block right_block) override;
- void joinLeftBlock(Block & left_block) override;
-
- /// For bidirectional hash join
- /// There are 2 blocks returned : joined block via parameter and retracted block via returned-value if there is
- Block insertLeftBlockAndJoin(Block & left_block) override;
- Block insertRightBlockAndJoin(Block & right_block) override;
-
- /// For bidirectional range hash join, there may be multiple joined blocks
- std::vector insertLeftBlockToRangeBucketsAndJoin(Block left_block) override;
- std::vector insertRightBlockToRangeBucketsAndJoin(Block right_block) override;
+ /// \returns
+ std::pair insertLeftDataBlockAndJoin(LightChunk && chunk) override;
+ std::pair insertRightDataBlockAndJoin(LightChunk && chunk) override;
/// "Legacy API", use insertRightBlock()
bool addJoinedBlock(const Block & block, bool check_limits) override;
@@ -203,7 +199,7 @@ class HashJoin final : public IHashJoin
const ColumnWithTypeAndName & rightAsofKeyColumn() const { return right_data.asof_key_column; }
const ColumnWithTypeAndName & leftAsofKeyColumn() const { return left_data.asof_key_column; }
- const Block & getOutputHeader() const { return output_header; }
+ const Block & getOutputHeader() const override { return output_header; }
void serialize(WriteBuffer & wb, VersionType version) const override;
void deserialize(ReadBuffer & rb, VersionType version) override;
@@ -258,6 +254,19 @@ class HashJoin final : public IHashJoin
JoinStreamDescriptionPtr rightJoinStreamDescription() const noexcept override { return right_data.join_stream_desc; }
private:
+ /// For non-bidirectional hash join
+ virtual void insertRightBlock(Block right_block);
+ virtual void joinLeftBlock(Block & left_block);
+
+ /// For bidirectional hash join
+ /// There are 2 blocks returned : joined block via parameter and retracted block via returned-value if there is
+ Block insertLeftBlockAndJoin(Block & left_block);
+ Block insertRightBlockAndJoin(Block & right_block);
+
+ /// For bidirectional range hash join, there may be multiple joined blocks
+ std::vector insertLeftBlockToRangeBucketsAndJoin(Block left_block);
+ std::vector insertRightBlockToRangeBucketsAndJoin(Block right_block);
+
void checkJoinSemantic() const;
void init();
void initBufferedData();
diff --git a/src/Interpreters/Streaming/IHashJoin.h b/src/Interpreters/Streaming/IHashJoin.h
index b27d9e6de79..0b6562c0314 100644
--- a/src/Interpreters/Streaming/IHashJoin.h
+++ b/src/Interpreters/Streaming/IHashJoin.h
@@ -5,8 +5,24 @@
namespace DB
{
+struct LightChunk;
+using LightChunks = std::vector;
+
namespace Streaming
{
+enum class HashJoinType : uint8_t
+{
+ /// Dynamic enrichment hash join
+ Asof = 1, /// append-only (left/inner) asof join append-only
+ Latest = 2, /// append-only (left/inner) latest join append-only
+ Changelog = 3, /// append-only (left/inner) all join changelog, for exmaple sources: `versioned_kv, changelog_kv and changelog(...)`
+
+ /// Bidirectional hash join
+ BidirectionalAll = 4, /// append-only inner all join append-only
+ BidirectionalRange = 5, /// append-only inner all join append-only with on clause `date_diff_within(...)
+ BidirectionalChangelog = 6, /// changelog inner all join changelog, for exmaple sources: `versioned_kv, changelog_kv and changelog(...)`
+};
+
class IHashJoin : public IJoin
{
public:
@@ -14,19 +30,10 @@ class IHashJoin : public IJoin
virtual void transformHeader(Block & header) = 0;
- /// For non-bidirectional hash join
- virtual void insertRightBlock(Block right_block) = 0;
- virtual void joinLeftBlock(Block & left_block) = 0;
-
- /// For bidirectional hash join
- /// There are 2 blocks returned : joined block via parameter and retracted block via returned-value if there is
- virtual Block insertLeftBlockAndJoin(Block & left_block) = 0;
- virtual Block insertRightBlockAndJoin(Block & right_block) = 0;
-
- /// For bidirectional range hash join, there may be multiple joined blocks
- virtual std::vector insertLeftBlockToRangeBucketsAndJoin(Block left_block) = 0;
- virtual std::vector insertRightBlockToRangeBucketsAndJoin(Block right_block) = 0;
+ virtual LightChunks insertLeftDataBlockAndJoin(LightChunk && chunk) = 0;
+ virtual LightChunks insertRightDataBlockAndJoin(LightChunk && chunk) = 0;
+ virtual HashJoinType type() const = 0;
virtual bool emitChangeLog() const = 0;
virtual bool bidirectionalHashJoin() const = 0;
virtual bool rangeBidirectionalHashJoin() const = 0;
@@ -46,8 +53,10 @@ class IHashJoin : public IJoin
virtual JoinStreamDescriptionPtr leftJoinStreamDescription() const noexcept = 0;
virtual JoinStreamDescriptionPtr rightJoinStreamDescription() const noexcept = 0;
- virtual void serialize(WriteBuffer & wb, VersionType version) const = 0;
- virtual void deserialize(ReadBuffer & rb, VersionType version) = 0;
+ virtual const Block & getOutputHeader() const = 0;
+
+ virtual void serialize(WriteBuffer &, VersionType) const = 0;
+ virtual void deserialize(ReadBuffer &, VersionType) = 0;
virtual void cancel() = 0;
};
diff --git a/src/Interpreters/Streaming/LatestHashJoin.h b/src/Interpreters/Streaming/LatestHashJoin.h
new file mode 100644
index 00000000000..091e7c4c716
--- /dev/null
+++ b/src/Interpreters/Streaming/LatestHashJoin.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include
+
+namespace DB
+{
+namespace Streaming
+{
+class LatestHashJoin final : public HashJoin
+{
+public:
+ using HashJoin::HashJoin;
+ HashJoinType type() const override { return HashJoinType::Latest; }
+};
+
+}
+}
diff --git a/src/Interpreters/Streaming/joinData.h b/src/Interpreters/Streaming/joinData.h
index abcb0facb6c..4f68c92e4fb 100644
--- a/src/Interpreters/Streaming/joinData.h
+++ b/src/Interpreters/Streaming/joinData.h
@@ -15,6 +15,7 @@
#include
#include
#include
+#include