diff --git a/src/Core/BlockWithShard.h b/src/Core/BlockWithShard.h deleted file mode 100644 index c920f47a3d4..00000000000 --- a/src/Core/BlockWithShard.h +++ /dev/null @@ -1,17 +0,0 @@ -#pragma once - -#include - -namespace DB -{ -struct BlockWithShard -{ - Block block; - int32_t shard; - - BlockWithShard(Block && block_, int32_t shard_) : block(std::move(block_)), shard(shard_) { } -}; - -using BlocksWithShard = std::vector; -} - diff --git a/src/Core/DataBlockWithShard.h b/src/Core/DataBlockWithShard.h new file mode 100644 index 00000000000..28051e23de7 --- /dev/null +++ b/src/Core/DataBlockWithShard.h @@ -0,0 +1,23 @@ +#pragma once + +#include +#include + +namespace DB +{ +template +struct DataBlockWithShard +{ + DataBlock block; + int32_t shard; + + DataBlockWithShard(DataBlock && block_, int32_t shard_) : block(std::move(block_)), shard(shard_) { } +}; + +using BlockWithShard = DataBlockWithShard; +using BlocksWithShard = std::vector; + +using LightChunkWithShard = DataBlockWithShard; +using LightChunksWithShard = std::vector; +} + diff --git a/src/Core/LightChunk.h b/src/Core/LightChunk.h index fe428e7185b..68451a85c9c 100644 --- a/src/Core/LightChunk.h +++ b/src/Core/LightChunk.h @@ -27,6 +27,9 @@ struct LightChunk void concat(const LightChunk & other) { auto added_rows = other.rows(); + if (added_rows <= 0) + return; + assert(columns() == other.columns()); for (size_t c = 0; auto & col : data) { @@ -35,9 +38,21 @@ struct LightChunk } } + LightChunk cloneEmpty() const + { + LightChunk res; + res.data.reserve(data.size()); + + for (const auto & elem : data) + res.data.emplace_back(elem->cloneEmpty()); + + return res; + } + size_t rows() const noexcept { return data.empty() ? 0 : data[0]->size(); } size_t columns() const noexcept { return data.size(); } + Columns & getColumns() noexcept { return data; } const Columns & getColumns() const noexcept { return data; } Columns detachColumns() noexcept { return std::move(data); } @@ -88,7 +103,9 @@ struct LightChunkWithTimestamp LightChunkWithTimestamp() = default; LightChunkWithTimestamp(Columns && data_) : chunk(std::move(data_)) { } LightChunkWithTimestamp(Chunk && chunk_, Int64 min_ts, Int64 max_ts) - : chunk(std::move(chunk_)), min_timestamp(min_ts), max_timestamp(max_ts) { } + : chunk(std::move(chunk_)), min_timestamp(min_ts), max_timestamp(max_ts) + { + } LightChunkWithTimestamp(const Block & block) : chunk(block), min_timestamp(block.minTimestamp()), max_timestamp(block.maxTimestamp()) { } @@ -122,4 +139,4 @@ struct LightChunkWithTimestamp Int64 maxTimestamp() const noexcept { return max_timestamp; } }; -} \ No newline at end of file +} diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 38dce5f78bd..90b38cba48f 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -2331,7 +2331,7 @@ std::shared_ptr SelectQueryExpressionAnalyzer::chooseJoinAlgorithmStreami return std::make_shared( analyzed_join, max_threads, std::move(left_join_stream_desc), std::move(right_join_stream_desc)); else - return std::make_shared(analyzed_join, std::move(left_join_stream_desc), std::move(right_join_stream_desc)); + return Streaming::HashJoin::create(analyzed_join, std::move(left_join_stream_desc), std::move(right_join_stream_desc)); } /// proton : ends diff --git a/src/Interpreters/Streaming/AsofHashJoin.h b/src/Interpreters/Streaming/AsofHashJoin.h new file mode 100644 index 00000000000..a1ea2df6d25 --- /dev/null +++ b/src/Interpreters/Streaming/AsofHashJoin.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +namespace DB +{ +namespace Streaming +{ +class AsofHashJoin final : public HashJoin +{ +public: + using HashJoin::HashJoin; + HashJoinType type() const override { return HashJoinType::Asof; } +}; + +} +} diff --git a/src/Interpreters/Streaming/BidirectionalAllHashJoin.h b/src/Interpreters/Streaming/BidirectionalAllHashJoin.h new file mode 100644 index 00000000000..abdadf7757c --- /dev/null +++ b/src/Interpreters/Streaming/BidirectionalAllHashJoin.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +namespace DB +{ +namespace Streaming +{ +class BidirectionalAllHashJoin final : public HashJoin +{ +public: + using HashJoin::HashJoin; + HashJoinType type() const override { return HashJoinType::BidirectionalAll; } +}; + +} +} diff --git a/src/Interpreters/Streaming/BidirectionalChangelogHashJoin.h b/src/Interpreters/Streaming/BidirectionalChangelogHashJoin.h new file mode 100644 index 00000000000..e991a6f9f28 --- /dev/null +++ b/src/Interpreters/Streaming/BidirectionalChangelogHashJoin.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +namespace DB +{ +namespace Streaming +{ +class BidirectionalChangelogHashJoin final : public HashJoin +{ +public: + using HashJoin::HashJoin; + HashJoinType type() const override { return HashJoinType::BidirectionalChangelog; } +}; + +} +} diff --git a/src/Interpreters/Streaming/BidirectionalRangeHashJoin.h b/src/Interpreters/Streaming/BidirectionalRangeHashJoin.h new file mode 100644 index 00000000000..455bd175ddb --- /dev/null +++ b/src/Interpreters/Streaming/BidirectionalRangeHashJoin.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +namespace DB +{ +namespace Streaming +{ +class BidirectionalRangeHashJoin final : public HashJoin +{ +public: + using HashJoin::HashJoin; + HashJoinType type() const override { return HashJoinType::BidirectionalRange; } +}; + +} +} diff --git a/src/Interpreters/Streaming/ChangelogHashJoin.h b/src/Interpreters/Streaming/ChangelogHashJoin.h new file mode 100644 index 00000000000..ad40d4dccad --- /dev/null +++ b/src/Interpreters/Streaming/ChangelogHashJoin.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +namespace DB +{ +namespace Streaming +{ +class ChangelogHashJoin final : public HashJoin +{ +public: + using HashJoin::HashJoin; + HashJoinType type() const override { return HashJoinType::Changelog; } +}; + +} +} diff --git a/src/Interpreters/Streaming/ConcurrentHashJoin.cpp b/src/Interpreters/Streaming/ConcurrentHashJoin.cpp index 4eab6ce31f0..da4ed1152ac 100644 --- a/src/Interpreters/Streaming/ConcurrentHashJoin.cpp +++ b/src/Interpreters/Streaming/ConcurrentHashJoin.cpp @@ -47,7 +47,7 @@ ConcurrentHashJoin::ConcurrentHashJoin( for (size_t i = 0; i < slots; ++i) { auto inner_hash_join = std::make_shared(); - inner_hash_join->data = std::make_unique(table_join, left_join_stream_desc, right_join_stream_desc); + inner_hash_join->data = Streaming::HashJoin::create(table_join, left_join_stream_desc, right_join_stream_desc); hash_joins.emplace_back(std::move(inner_hash_join)); } } @@ -72,7 +72,7 @@ void ConcurrentHashJoin::rescale(size_t slots_) for (; slots < new_slots; ++slots) { auto inner_hash_join = std::make_shared(); - inner_hash_join->data = std::make_unique(table_join, left_join_stream_desc, right_join_stream_desc); + inner_hash_join->data = Streaming::HashJoin::create(table_join, left_join_stream_desc, right_join_stream_desc); hash_joins.emplace_back(std::move(inner_hash_join)); } } @@ -95,184 +95,24 @@ void ConcurrentHashJoin::transformHeader(Block & header) hash_joins[0]->data->transformHeader(header); } -void ConcurrentHashJoin::insertRightBlock(Block right_block) -{ - auto dispatched_blocks = dispatchBlock(right_key_column_positions, std::move(right_block)); - - size_t blocks_left = dispatched_blocks.size(); - - while (blocks_left > 0) - { - if (is_cancelled) - break; - - /// insert blocks into corresponding HashJoin instances - for (auto & dispatched_block : dispatched_blocks) - { - if (dispatched_block.block) - { - if (dispatched_block.block.rows() > 0) - { - auto & hash_join = hash_joins[dispatched_block.shard]; - - /// if current hash_join is already processed by another thread, skip it and try later - std::unique_lock lock(hash_join->mutex, std::try_to_lock); - if (!lock.owns_lock()) - continue; - - hash_join->data->insertRightBlock(std::move(dispatched_block.block)); - } - else - dispatched_block.block = {}; - - assert(!dispatched_block.block); - blocks_left--; - } - } - } -} - -void ConcurrentHashJoin::joinLeftBlock(Block & left_block) -{ - auto dispatched_blocks = dispatchBlock(right_key_column_positions, std::move(left_block)); - Blocks joined_blocks; - joined_blocks.reserve(dispatched_blocks.size()); - - left_block = {}; - - size_t blocks_left = dispatched_blocks.size(); - while (blocks_left > 0) - { - if (is_cancelled) - break; - - for (auto & dispatched_block : dispatched_blocks) - { - if (dispatched_block.block) - { - if (dispatched_block.block.rows() > 0) - { - auto & hash_join = hash_joins[dispatched_block.shard]; - - /// if current hash_join is already processed by another thread, skip it and try later - std::unique_lock lock(hash_join->mutex, std::try_to_lock); - if (!lock.owns_lock()) - continue; - - hash_join->data->joinLeftBlock(dispatched_block.block); - - if (dispatched_block.block.rows() > 0) - joined_blocks.emplace_back(std::move(dispatched_block.block)); - else - dispatched_block.block = {}; - } - else - dispatched_block.block = {}; - - assert(!dispatched_block.block); - blocks_left--; - } - } - } - - left_block = concatenateBlocks(joined_blocks); -} - template -Block ConcurrentHashJoin::insertBlockAndJoin(Block & block) +std::pair ConcurrentHashJoin::doInsertDataBlockAndJoin(LightChunk && chunk) { - const std::vector * key_column_positions; - if constexpr (is_left_block) - key_column_positions = &left_key_column_positions; - else - key_column_positions = &right_key_column_positions; - - auto dispatched_blocks = dispatchBlock(*key_column_positions, std::move(block)); - - size_t blocks_left = dispatched_blocks.size(); - - Blocks retracted_blocks; - if (emitChangeLog()) - retracted_blocks.reserve(blocks_left); - - Blocks joined_blocks; - joined_blocks.reserve(blocks_left); - - while (blocks_left > 0) - { - if (is_cancelled) - break; - - /// insert blocks into corresponding HashJoin instances - for (auto & dispatched_block : dispatched_blocks) - { - if (dispatched_block.block) - { - if (dispatched_block.block.rows() > 0) - { - auto & hash_join = hash_joins[dispatched_block.shard]; - - /// if current hash_join is already processed by another thread, skip it and try later - std::unique_lock lock(hash_join->mutex, std::try_to_lock); - if (!lock.owns_lock()) - continue; - - if constexpr (is_left_block) - { - auto retracted_block = hash_join->data->insertLeftBlockAndJoin(dispatched_block.block); - if (retracted_block.rows() > 0) - retracted_blocks.emplace_back(std::move(retracted_block)); - } - else - { - auto retracted_block = hash_join->data->insertRightBlockAndJoin(dispatched_block.block); - if (retracted_block.rows() > 0) - retracted_blocks.emplace_back(std::move(retracted_block)); - } - - if (dispatched_block.block.rows() > 0) - joined_blocks.emplace_back(std::move(dispatched_block.block)); - else - dispatched_block.block = {}; - } - else - dispatched_block.block = {}; - - assert(!dispatched_block.block); - blocks_left--; - } - } - } - - block = concatenateBlocks(joined_blocks); - return concatenateBlocks(retracted_blocks); -} - -Block ConcurrentHashJoin::insertLeftBlockAndJoin(Block & left_block) -{ - return insertBlockAndJoin(left_block); -} - -Block ConcurrentHashJoin::insertRightBlockAndJoin(Block & right_block) -{ - return insertBlockAndJoin(right_block); -} + auto rows = chunk.rows(); + if (rows <= 0) + return {}; -template -std::vector ConcurrentHashJoin::insertBlockToRangeBucketAndJoin(Block block) -{ - const std::vector * key_column_positions; + /// FIXME: Optmize hash join Block to LightChunk. + LightChunksWithShard dispatched_blocks; if constexpr (is_left_block) - key_column_positions = &left_key_column_positions; + dispatched_blocks = dispatchBlock(left_key_column_positions, std::move(chunk)); else - key_column_positions = &right_key_column_positions; - - auto dispatched_blocks = dispatchBlock(*key_column_positions, std::move(block)); + dispatched_blocks = dispatchBlock(right_key_column_positions, std::move(chunk)); size_t blocks_left = dispatched_blocks.size(); - std::vector joined_results; - joined_results.reserve(blocks_left); + std::pair joined_result; + auto & [retracted_data, joined_data] = joined_result; while (blocks_left > 0) { @@ -295,48 +135,37 @@ std::vector ConcurrentHashJoin::insertBlockToRangeBucketAndJoin(Block blo if constexpr (is_left_block) { - auto joined_blocks = hash_join->data->insertLeftBlockToRangeBucketsAndJoin(std::move(dispatched_block.block)); - for (auto & joined_block : joined_blocks) - joined_results.emplace_back(std::move(joined_block)); + auto res = hash_join->data->insertLeftDataBlockAndJoin(std::move(dispatched_block.block)); + retracted_data.concat(std::move(res.first)); + joined_data.concat(std::move(res.second)); } else { - auto joined_blocks = hash_join->data->insertRightBlockToRangeBucketsAndJoin(std::move(dispatched_block.block)); - for (auto & joined_block : joined_blocks) - joined_results.emplace_back(std::move(joined_block)); + auto res = hash_join->data->insertRightDataBlockAndJoin(std::move(dispatched_block.block)); + retracted_data.concat(std::move(res.first)); + joined_data.concat(std::move(res.second)); } } - else - dispatched_block.block = {}; - assert(!dispatched_block.block); + dispatched_block.block = {}; blocks_left--; } } } - return joined_results; -} - -std::vector ConcurrentHashJoin::insertLeftBlockToRangeBucketsAndJoin(Block left_block) -{ - return insertBlockToRangeBucketAndJoin(std::move(left_block)); -} - -std::vector ConcurrentHashJoin::insertRightBlockToRangeBucketsAndJoin(Block right_block) -{ - return insertBlockToRangeBucketAndJoin(std::move(right_block)); + return joined_result; } bool ConcurrentHashJoin::addJoinedBlock(const Block & right_block, bool /*check_limits*/) { - insertRightBlock(right_block); /// We have a block copy here + insertRightDataBlockAndJoin(right_block); return true; } void ConcurrentHashJoin::joinBlock(Block & block, std::shared_ptr & /*not_processed*/) { - joinLeftBlock(block); + auto joined_result = insertLeftDataBlockAndJoin(block); + block = left_join_stream_desc->input_header.cloneWithColumns(joined_result.second.detachColumns()); } void ConcurrentHashJoin::checkTypesOfKeys(const Block & block) const @@ -403,50 +232,51 @@ static ALWAYS_INLINE IColumn::Selector hashToSelector(const WeakHash32 & hash, s return selector; } -IColumn::Selector ConcurrentHashJoin::selectDispatchBlock(const std::vector & key_column_positions, const Block & from_block) +IColumn::Selector ConcurrentHashJoin::selectDispatchBlock(const std::vector & key_column_positions, const LightChunk & chunk) { - size_t num_rows = from_block.rows(); + size_t num_rows = chunk.rows(); size_t num_shards = hash_joins.size(); WeakHash32 hash(num_rows); for (const auto & key_column_position : key_column_positions) { - const auto & key_col = from_block.getByPosition(key_column_position).column->convertToFullColumnIfConst(); + const auto & key_col = chunk.data[key_column_position]->convertToFullColumnIfConst(); const auto & key_col_no_lc = recursiveRemoveLowCardinality(recursiveRemoveSparse(key_col)); key_col_no_lc->updateWeakHash32(hash); } return hashToSelector(hash, num_shards); } -BlocksWithShard ConcurrentHashJoin::dispatchBlock(const std::vector & key_column_positions, Block && from_block) +LightChunksWithShard ConcurrentHashJoin::dispatchBlock(const std::vector & key_column_positions, LightChunk && chunk) { size_t num_shards = hash_joins.size(); - size_t num_cols = from_block.columns(); + size_t num_cols = chunk.columns(); - IColumn::Selector selector = selectDispatchBlock(key_column_positions, from_block); + IColumn::Selector selector = selectDispatchBlock(key_column_positions, chunk); /// Optimized for 1 row block if (selector.size() == 1) - return BlocksWithShard{{std::move(from_block), static_cast(selector[0])}}; + return LightChunksWithShard{{std::move(chunk), static_cast(selector[0])}}; std::vector> dispatched_columns; dispatched_columns.reserve(num_cols); + const auto & columns = chunk.getColumns(); for (size_t i = 0; i < num_cols; ++i) - dispatched_columns.emplace_back(from_block.getByPosition(i).column->scatter(num_shards, selector)); + dispatched_columns.emplace_back(columns[i]->scatter(num_shards, selector)); - BlocksWithShard result; + LightChunksWithShard result; result.reserve(num_shards); for (size_t shard = 0; shard < num_shards; ++shard) { if (dispatched_columns[0][shard]) { - result.emplace_back(from_block.cloneEmpty(), static_cast(shard)); - auto & current_block = result.back().block; + result.emplace_back(chunk.cloneEmpty(), static_cast(shard)); + auto & current_block_columns = result.back().block.getColumns(); /// if dispatched column is not null at `shard` for (size_t col_pos = 0; col_pos < num_cols; ++col_pos) - current_block.getByPosition(col_pos).column = std::move(dispatched_columns[col_pos][shard]); + current_block_columns[col_pos] = std::move(dispatched_columns[col_pos][shard]); } } diff --git a/src/Interpreters/Streaming/ConcurrentHashJoin.h b/src/Interpreters/Streaming/ConcurrentHashJoin.h index 3371b7e2b9c..58ec81add4f 100644 --- a/src/Interpreters/Streaming/ConcurrentHashJoin.h +++ b/src/Interpreters/Streaming/ConcurrentHashJoin.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include @@ -30,19 +30,17 @@ class ConcurrentHashJoin final : public IHashJoin void transformHeader(Block & header) override; - /// For non-bidirectional hash join - void insertRightBlock(Block right_block) override; - void joinLeftBlock(Block & left_block) override; - - /// For bidirectional hash join - /// There are 2 blocks returned : joined block via parameter and retracted block via returned-value if there is - Block insertLeftBlockAndJoin(Block & left_block) override; - Block insertRightBlockAndJoin(Block & right_block) override; - - /// For bidirectional range hash join, there may be multiple joined blocks - std::vector insertLeftBlockToRangeBucketsAndJoin(Block left_block) override; - std::vector insertRightBlockToRangeBucketsAndJoin(Block right_block) override; + /// \returns + std::pair insertLeftDataBlockAndJoin(LightChunk && chunk) override + { + return doInsertDataBlockAndJoin(std::move(chunk)); + } + std::pair insertRightDataBlockAndJoin(LightChunk && chunk) override + { + return doInsertDataBlockAndJoin(std::move(chunk)); + } + HashJoinType type() const override { return hash_joins[0]->data->type(); } bool emitChangeLog() const override { return hash_joins[0]->data->emitChangeLog(); } bool bidirectionalHashJoin() const override { return hash_joins[0]->data->bidirectionalHashJoin(); } bool rangeBidirectionalHashJoin() const override { return hash_joins[0]->data->rangeBidirectionalHashJoin(); } @@ -81,6 +79,8 @@ class ConcurrentHashJoin final : public IHashJoin return hash_joins[0]->data->rightJoinStreamDescription(); } + const Block & getOutputHeader() const override { return hash_joins[0]->data->getOutputHeader(); } + void serialize(WriteBuffer &) const override; void deserialize(ReadBuffer &) override; @@ -88,13 +88,10 @@ class ConcurrentHashJoin final : public IHashJoin private: template - Block insertBlockAndJoin(Block & block); - - template - std::vector insertBlockToRangeBucketAndJoin(Block block); + std::pair doInsertDataBlockAndJoin(LightChunk && chunk); - IColumn::Selector selectDispatchBlock(const std::vector & key_column_positions, const Block & from_block); - BlocksWithShard dispatchBlock(const std::vector & key_column_positions, Block && from_block); + IColumn::Selector selectDispatchBlock(const std::vector & key_column_positions, const LightChunk & from_block); + LightChunksWithShard dispatchBlock(const std::vector & key_column_positions, LightChunk && from_block); void doSerialize(WriteBuffer &) const; void doDeserialize(ReadBuffer &); @@ -103,7 +100,7 @@ class ConcurrentHashJoin final : public IHashJoin struct InternalHashJoin { std::mutex mutex; - std::unique_ptr data; + std::shared_ptr data; }; std::shared_ptr table_join; diff --git a/src/Interpreters/Streaming/DynamicEnrichmentHashJoin.h b/src/Interpreters/Streaming/DynamicEnrichmentHashJoin.h new file mode 100644 index 00000000000..6474e0a8430 --- /dev/null +++ b/src/Interpreters/Streaming/DynamicEnrichmentHashJoin.h @@ -0,0 +1,377 @@ +#pragma once + +#include + +namespace DB +{ +namespace Streaming +{ +class DynamicEnrichmentHashJoin final : public HashJoin +{ +public: + using HashJoin:HashJoin; + + ~DynamicEnrichmentHashJoin() noexcept override { } + + /// \returns + std::pair insertLeftDataBlockAndJoin(LightChunk && chunk) override; + std::pair insertRightDataBlockAndJoin(LightChunk && chunk) override; + + UInt64 keepVersions() const { return right_data.join_stream_desc->keep_versions; } + + const TableJoin & getTableJoin() const override { return *table_join; } + + void checkTypesOfKeys(const Block & block) const override; + + bool isFilled() const override { return false; } + + /** For RIGHT and FULL JOINs. + * A stream that will contain default values from left table, joined with rows from right table, that was not joined before. + * Use only after all calls to joinBlock was done. + * left_sample_block is passed without account of 'use_nulls' setting (columns will be converted to Nullable inside). + */ + std::shared_ptr + getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override; + + /// Number of keys in all built JOIN maps. + size_t getTotalRowCount() const final; + /// Sum size in bytes of all buffers, used for JOIN maps and for all memory pools. + size_t getTotalByteCount() const final; + + bool alwaysReturnsEmptySet() const final; + + void getKeyColumnPositions( + std::vector & left_key_column_positions, + std::vector & right_key_column_positions, + bool include_asof_key_column) const override + { + auto calc_key_positions = [](const auto & key_column_names, const auto & header, auto & key_column_positions) { + key_column_positions.reserve(key_column_names.size()); + for (const auto & name : key_column_names) + key_column_positions.push_back(header.getPositionByName(name)); + }; + + calc_key_positions(table_join->getOnlyClause().key_names_left, left_data.join_stream_desc->input_header, left_key_column_positions); + calc_key_positions( + table_join->getOnlyClause().key_names_right, right_data.join_stream_desc->input_header, right_key_column_positions); + + if (!include_asof_key_column && (streaming_strictness == Strictness::Range || streaming_strictness == Strictness::Asof)) + { + left_key_column_positions.pop_back(); + right_key_column_positions.pop_back(); + } + + assert(!left_key_column_positions.empty()); + assert(!right_key_column_positions.empty()); + } + + size_t dataBlockSize() const noexcept { return table_join->dataBlockSize(); } + JoinKind getKind() const { return kind; } + JoinStrictness getStrictness() const { return strictness; } + Kind getStreamingKind() const { return streaming_kind; } + Strictness getStreamingStrictness() const { return streaming_strictness; } + const std::optional & getAsofType() const { return asof_type; } + ASOFJoinInequality getAsofInequality() const { return asof_inequality; } + bool anyTakeLastRow() const { return any_take_last_row; } + + const ColumnWithTypeAndName & rightAsofKeyColumn() const { return right_data.asof_key_column; } + const ColumnWithTypeAndName & leftAsofKeyColumn() const { return left_data.asof_key_column; } + + const Block & getOutputHeader() const { return output_header; } + + void serialize(WriteBuffer & wb) const override; + void deserialize(ReadBuffer & rb) override; + + void cancel() override { } + + using RefListMultiple = RowRefListMultiple; + using RefListMultipleRef = RowRefListMultipleRef; + using RefListMultipleRefPtr = RowRefListMultipleRefPtr; + + using MapsOne = HashMapsTemplate>; + using MapsMultiple = HashMapsTemplate>; + using MapsAll = HashMapsTemplate>; + using MapsAsof = HashMapsTemplate>; + using MapsRangeAsof = HashMapsTemplate>; + using MapsVariant = std::variant; + + HashMapSizes sizesOfMapsVariant(const MapsVariant & maps_variant) const; + HashType getHashMethodType() const { return hash_method_type; } + + /// bool isUsed(size_t off) const { return used_flags.getUsedSafe(off); } + /// bool isUsed(const Block * block_ptr, size_t row_idx) const { return used_flags.getUsedSafe(block_ptr, row_idx); } + + friend struct BufferedStreamData; + + /// For changelog emit + struct JoinResults + { + JoinResults(const Block & header_) : sample_block(header_), blocks(metrics), maps(std::make_unique()) { } + + String joinMetricsString(const HashJoin * join) const; + + mutable std::mutex mutex; + + Block sample_block; + + SERDE CachedBlockMetrics metrics; + SERDE JoinDataBlockList blocks; + + /// Arena pool to hold the (string) keys + Arena pool; + + /// Building hash map for joined blocks, then we can find previous + /// join blocks quickly by using joined keys + SERDE std::unique_ptr maps; + }; + + JoinStreamDescriptionPtr leftJoinStreamDescription() const noexcept override { return left_data.join_stream_desc; } + JoinStreamDescriptionPtr rightJoinStreamDescription() const noexcept override { return right_data.join_stream_desc; } + +private: + /// For non-bidirectional hash join + void insertRightBlock(Block right_block) override; + void joinLeftBlock(Block & left_block) override; + + /// For bidirectional hash join + /// There are 2 blocks returned : joined block via parameter and retracted block via returned-value if there is + Block insertLeftBlockAndJoin(Block & left_block) override; + Block insertRightBlockAndJoin(Block & right_block) override; + + /// For bidirectional range hash join, there may be multiple joined blocks + std::vector insertLeftBlockToRangeBucketsAndJoin(Block left_block) override; + std::vector insertRightBlockToRangeBucketsAndJoin(Block right_block) override; + + void checkJoinSemantic() const; + void init(); + void initBufferedData(); + void initHashMaps(std::vector & all_maps); + void dataMapInit(MapsVariant &); + + /// For versioned kv / changelog kv + void initLeftPrimaryKeyHashTable(); + void initRightPrimaryKeyHashTable(); + void reviseJoinStrictness(); + + void initLeftBlockStructure(); + void initRightBlockStructure(); + struct JoinData; + void initBlockStructure(JoinData & join_data, const Block & table_keys, const Block & sample_block_with_columns_to_add) const; + + void chooseHashMethod(); + + void validateAsofJoinKey(); + + void checkLimits() const; + + const Block & savedLeftBlockSample() const { return left_data.buffered_data->sample_block; } + const Block & savedRightBlockSample() const { return right_data.buffered_data->sample_block; } + + /// Modify left or right block (update structure according to sample block) to save it in block list + template + Block prepareBlock(const Block & block) const; + + /// Remove columns which are not needed to be projected + static Block prepareBlockToSave(const Block & block, const Block & sample_block); + + /// For range bidirectional hash join + template + std::vector insertBlockToRangeBucketsAndJoin(Block block); + + template + void doInsertBlock(Block block, HashBlocksPtr target_hash_blocks, std::vector row_refs = {}); + + /// For bidirectional hash join + /// Return retracted block if needs emit changelog, otherwise empty block + template + Block joinBlockWithHashTable(Block & block, HashBlocksPtr target_hash_blocks); + + template + void doJoinBlockWithHashTable(Block & block, HashBlocksPtr target_hash_blocks); + + /// Join left block with right hash table or join right block with left hash table + template + void joinBlockImpl(Block & block, const Block & block_with_columns_to_add, const std::vector & maps_) const; + + /// `retract` does + /// 1) Add result_block to JoinResults + /// 2) Retract previous joined block. changelog emit + Block retract(const Block & result_block); + + /// When left stream joins right stream, watermark calculation is more complicated. + /// Firstly, each stream has its own watermark and progresses separately. + /// Secondly, `combined_watermark` is calculated periodically according the watermarks in the left and right streams + void calculateWatermark(); + + /// For versioned-kv / changelog-kv join + /// Check if this ia row with a new primary key or an existing primary key + /// For the later, erase the element in the target join linked list + std::vector eraseOrAppendForPartialPrimaryKeyJoin(const Block & block); + + template + std::vector eraseOrAppendForPartialPrimaryKeyJoin(Map & map, ColumnRawPtrs && primary_key_columns); + + /// If the left / right_block is a retraction block : rows in `_tp_delta` column all have `-1` + /// We erase the previous key / values from hash table + /// Use for append JOIN changelog_kv / versioned_kv case + /// @return true if keys are erased, otherwise false + template + void eraseExistingKeys(Block & block, JoinData & join_data); + inline bool isRetractBlock(const Block & block, const JoinStreamDescription & join_stream_desc); + + template + void doEraseExistingKeys( + Map & map, + const DB::Block & right_block, + ColumnRawPtrs && key_columns, + const Sizes & key_size, + const std::vector & skip_columns, + bool delete_key); + + /// For bidirectional join, the input is a retract block + template + std::optional eraseExistingKeysAndRetractJoin(Block & left_block); + + template + void transformToOutputBlock(Block & joined_block) const; + +private: + /// Only SERDE the clauses of join + SERDE std::shared_ptr table_join; + JoinKind kind; + JoinStrictness strictness; + + SERDE Kind streaming_kind; + SERDE Strictness streaming_strictness; + + bool any_take_last_row; /// Overwrite existing values when encountering the same key again + /// Only SERDE for ASOF JOIN or RANGE JOIN + SERDE std::optional asof_type; + SERDE ASOFJoinInequality asof_inequality; + + /// Cache data members which avoid re-computation for every join + std::vector> cond_column_names; + + SERDE std::vector key_sizes; + + /// versioned-kv and changelog-kv both needs define a primary key + /// If join on partial primary key columns (or no primary key column is expressed in join on clause), + /// init this data structure + struct PrimaryKeyHashTable + { + PrimaryKeyHashTable(HashType hash_method_type_, Sizes && key_size_); + + HashType hash_method_type; + + Sizes key_size; + + /// For key allocation + Arena pool; + + /// Hash maps variants indexed by primary key columns + SERDE HashMapsTemplate map; + }; + using PrimaryKeyHashTablePtr = std::shared_ptr; + + struct JoinData + { + explicit JoinData(JoinStreamDescriptionPtr join_stream_desc_) : join_stream_desc(std::move(join_stream_desc_)) + { + assert(join_stream_desc); + } + + JoinStreamDescriptionPtr join_stream_desc; + + ColumnWithTypeAndName asof_key_column; + + /// Only for kv join. Used to maintain all unique keys + PrimaryKeyHashTablePtr primary_key_hash_table; + + SERDE BufferedStreamDataPtr buffered_data; + + /// Block with columns from the (left or) right-side table except key columns. + Block sample_block_with_columns_to_add; + /// Block with key columns in the same order they appear in the (left or) right-side table (duplicates appear once). + Block table_keys; + /// Block with key columns (left or) right-side table keys that are needed in result (would be attached after joined columns). + Block required_keys; + /// Left table column names that are sources for required_right_keys columns + std::vector required_keys_sources; + + bool validated_join_key_types = false; + }; + + friend void serialize(const JoinData & join_data, WriteBuffer & wb); + friend void deserialize(JoinData & join_data, ReadBuffer & rb); + + /// Note: when left block joins right hashtable, use `right_data` + SERDE JoinData right_data; + /// Note: when right block joins left hashtable, use `left_data` + SERDE JoinData left_data; + + /// Right table data. StorageJoin shares it between many Join objects. + /// Flags that indicate that particular row already used in join. + /// Flag is stored for every record in hash map. + /// Number of this flags equals to hashtable buffer size (plus one for zero value). + /// Changes in hash table broke correspondence, + /// mutable JoinStuff::JoinUsedFlags used_flags; + + SERDE HashType hash_method_type; + + /// Only SERDE when emit_changelog is true + SERDE std::optional join_results; + + bool retract_push_down = false; + bool emit_changelog = false; + bool bidirectional_hash_join = true; + bool range_bidirectional_hash_join = true; + + /// Delta column in right-left-join + /// `rlj` -> right-left-join + std::optional left_delta_column_position_rlj; + std::optional right_delta_column_position_rlj; + + /// `lrj` -> left-right-join + std::optional left_delta_column_position_lrj; + std::optional right_delta_column_position_lrj; + + UInt64 join_max_cached_bytes = 0; + + Block output_header; + + /// Combined timestamp watermark progression of left stream and right stream + SERDE std::atomic_int64_t combined_watermark = 0; + + struct JoinGlobalMetrics + { + size_t total_join = 0; + size_t left_block_and_right_range_bucket_no_intersection_skip = 0; + size_t right_block_and_left_range_bucket_no_intersection_skip = 0; + + std::string string() const + { + return fmt::format( + "total_join={} " + "left_block_and_right_range_bucket_no_intersection_skip={} right_block_and_left_range_bucket_no_intersection_skip={}", + total_join, + left_block_and_right_range_bucket_no_intersection_skip, + right_block_and_left_range_bucket_no_intersection_skip); + } + }; + friend void serialize(const JoinGlobalMetrics & join_metrics, WriteBuffer & wb); + friend void deserialize(JoinGlobalMetrics & join_metrics, ReadBuffer & rb); + + SERDE JoinGlobalMetrics join_metrics; + + Poco::Logger * logger; +}; + +struct HashJoinMapsVariants +{ + /// \return Number of keys in the map + HashMapSizes sizes(const HashJoin * join) const; + std::vector map_variants; +}; + +} +} diff --git a/src/Interpreters/Streaming/HashJoin.cpp b/src/Interpreters/Streaming/HashJoin.cpp index 2e15d8e2bf0..a1f05c45834 100644 --- a/src/Interpreters/Streaming/HashJoin.cpp +++ b/src/Interpreters/Streaming/HashJoin.cpp @@ -14,8 +14,14 @@ #include #include #include +#include +#include +#include +#include #include +#include #include +#include #include #include #include @@ -586,13 +592,7 @@ struct Inserter } static ALWAYS_INLINE void insertMultiple( - const HashJoin & join, - Map & map, - KeyGetter & key_getter, - JoinDataBlockList * blocks, - size_t original_row, - size_t row, - Arena & pool) + const HashJoin & join, Map & map, KeyGetter & key_getter, JoinDataBlockList * blocks, size_t original_row, size_t row, Arena & pool) { auto emplace_result = key_getter.emplaceKey(map, original_row, pool); auto * mapped = &emplace_result.getMapped(); @@ -824,6 +824,68 @@ void HashJoin::validate(const JoinCombinationType & join_combination) magic_enum::enum_name(std::get<3>(join_combination))); } +HashJoinPtr HashJoin::create( + std::shared_ptr table_join_, JoinStreamDescriptionPtr left_join_stream_desc, JoinStreamDescriptionPtr right_join_stream_desc) +{ + if (!table_join_->oneDisjunct()) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Stream to Stream join only supports only one disjunct join clause"); + + /// For streaming join, we don't support inline JOIN ON predicate like `left.value > 10` in the following query example + /// since stream query will need buffer more additional non-joined data in-memory + /// SELECT * FROM left INNER JOIN right ON left.key = right.key AND left.value > 10 and right.value > 80; + /// User shall use WHERE predicate like + /// SELECT * FROM (SELECT * FROM left WHERE left.value > 10) as left INNER JOIN right ON left.key = right.key + for (const auto & on_expr : table_join_->getClauses()) + { + const auto & on_expr_cond_column_names = on_expr.condColumnNames(); + if (!on_expr_cond_column_names.first.empty() || !on_expr_cond_column_names.second.empty()) + throw Exception( + ErrorCodes::NOT_IMPLEMENTED, "Streaming join doesn't support predicates in JOIN ON clause. Use WHERE predicate instead"); + } + + auto join_kind = table_join_->kind(); + auto join_strictness = table_join_->strictness(); + validate( + {left_join_stream_desc->data_stream_semantic.toStorageSemantic(), + join_kind, + join_strictness, + right_join_stream_desc->data_stream_semantic.toStorageSemantic()}); + + [[maybe_unused]] auto streaming_kind = Streaming::toJoinKind(join_kind); + auto streaming_strictness = Streaming::toJoinStrictness(join_strictness, table_join_->isRangeJoin()); + + /// We don't even care the left stream semantic since if right stream is versioned-kv or changelog-kv + /// we will use `Multiple` list to hold the values since users may use partial primary key to join + if (isChangelogDataStream(right_join_stream_desc->data_stream_semantic)) + streaming_strictness = Streaming::Strictness::Multiple; + + /// Choose join algorithm + switch (streaming_strictness) + { + case Strictness::Asof: + return std::make_shared( + std::move(table_join_), std::move(left_join_stream_desc), std::move(right_join_stream_desc)); + case Strictness::Latest: + return std::make_shared( + std::move(table_join_), std::move(left_join_stream_desc), std::move(right_join_stream_desc)); + case Strictness::All: + return std::make_shared( + std::move(table_join_), std::move(left_join_stream_desc), std::move(right_join_stream_desc)); + case Strictness::Range: + return std::make_shared( + std::move(table_join_), std::move(left_join_stream_desc), std::move(right_join_stream_desc)); + case Strictness::Multiple: { + if (isChangelogDataStream(left_join_stream_desc->data_stream_semantic)) + return std::make_shared( + std::move(table_join_), std::move(left_join_stream_desc), std::move(right_join_stream_desc)); + else + return std::make_shared( + std::move(table_join_), std::move(left_join_stream_desc), std::move(right_join_stream_desc)); + } + } + UNREACHABLE(); +} + HashJoin::HashJoin( std::shared_ptr table_join_, JoinStreamDescriptionPtr left_join_stream_desc_, @@ -1319,12 +1381,62 @@ Block HashJoin::prepareBlock(const Block & block) const return prepareBlockToSave(block, savedRightBlockSample()); } -bool HashJoin::addJoinedBlock(const Block & block, bool /*check_limits*/) +std::pair HashJoin::insertLeftDataBlockAndJoin(LightChunk && chunk) +{ + if (rangeBidirectionalHashJoin()) + { + auto block = left_data.join_stream_desc->input_header.cloneWithColumns(chunk.detachColumns()); + auto joined_block = concatenateBlocks(insertLeftBlockToRangeBucketsAndJoin(std::move(block))); + return {LightChunk{}, std::move(joined_block)}; + } + else if (bidirectionalHashJoin()) + { + auto block = left_data.join_stream_desc->input_header.cloneWithColumns(chunk.detachColumns()); + auto retracted_block = insertLeftBlockAndJoin(block); + return {std::move(retracted_block), std::move(block)}; + } + else + { + auto block = left_data.join_stream_desc->input_header.cloneWithColumns(chunk.detachColumns()); + joinLeftBlock(block); + return {LightChunk{}, std::move(block)}; + } +} + +std::pair HashJoin::insertRightDataBlockAndJoin(LightChunk && chunk) { - doInsertBlock(block, right_data.buffered_data->getCurrentHashBlocksPtr()); /// Copy the block + if (rangeBidirectionalHashJoin()) + { + auto block = right_data.join_stream_desc->input_header.cloneWithColumns(chunk.detachColumns()); + auto joined_block = concatenateBlocks(insertRightBlockToRangeBucketsAndJoin(std::move(block))); + return {LightChunk{}, std::move(joined_block)}; + } + else if (bidirectionalHashJoin()) + { + auto block = right_data.join_stream_desc->input_header.cloneWithColumns(chunk.detachColumns()); + auto retracted_block = insertRightBlockAndJoin(block); + return {std::move(retracted_block), std::move(block)}; + } + else + { + auto block = right_data.join_stream_desc->input_header.cloneWithColumns(chunk.detachColumns()); + insertRightBlock(std::move(block)); + return {}; + } +} + +bool HashJoin::addJoinedBlock(const Block & right_block, bool /*check_limits*/) +{ + insertRightDataBlockAndJoin(right_block); return true; } +void HashJoin::joinBlock(Block & block, std::shared_ptr & /*not_processed*/) +{ + auto joined_result = insertLeftDataBlockAndJoin(block); + block = left_data.join_stream_desc->input_header.cloneWithColumns(joined_result.second.detachColumns()); +} + template void HashJoin::doInsertBlock(Block block, HashBlocksPtr target_hash_blocks) { @@ -1583,11 +1695,6 @@ void HashJoin::checkTypesOfKeys(const Block & block) const JoinCommon::checkTypesOfKeys(block, onexpr.key_names_left, right_data.table_keys, onexpr.key_names_right); } -void HashJoin::joinBlock(Block & block, ExtraBlockPtr & /*not_processed*/) -{ - joinLeftBlock(block); -} - template void HashJoin::doJoinBlockWithHashTable(Block & block, HashBlocksPtr target_hash_blocks) { diff --git a/src/Interpreters/Streaming/HashJoin.h b/src/Interpreters/Streaming/HashJoin.h index a7e5071bfdc..ebba1936489 100644 --- a/src/Interpreters/Streaming/HashJoin.h +++ b/src/Interpreters/Streaming/HashJoin.h @@ -91,7 +91,7 @@ namespace Streaming struct HashJoinMapsVariants; -class HashJoin final : public IHashJoin +class HashJoin : public IHashJoin { public: /// - bool @@ -102,6 +102,11 @@ class HashJoin final : public IHashJoin static const SupportMatrix support_matrix; static void validate(const JoinCombinationType & join_combination); + static HashJoinPtr create( + std::shared_ptr table_join_, + JoinStreamDescriptionPtr left_join_stream_desc_, + JoinStreamDescriptionPtr right_join_stream_desc_); + public: HashJoin( std::shared_ptr table_join_, @@ -116,18 +121,9 @@ class HashJoin final : public IHashJoin void transformHeader(Block & header) override; - /// For non-bidirectional hash join - void insertRightBlock(Block right_block) override; - void joinLeftBlock(Block & left_block) override; - - /// For bidirectional hash join - /// There are 2 blocks returned : joined block via parameter and retracted block via returned-value if there is - Block insertLeftBlockAndJoin(Block & left_block) override; - Block insertRightBlockAndJoin(Block & right_block) override; - - /// For bidirectional range hash join, there may be multiple joined blocks - std::vector insertLeftBlockToRangeBucketsAndJoin(Block left_block) override; - std::vector insertRightBlockToRangeBucketsAndJoin(Block right_block) override; + /// \returns + std::pair insertLeftDataBlockAndJoin(LightChunk && chunk) override; + std::pair insertRightDataBlockAndJoin(LightChunk && chunk) override; /// "Legacy API", use insertRightBlock() bool addJoinedBlock(const Block & block, bool check_limits) override; @@ -201,7 +197,7 @@ class HashJoin final : public IHashJoin const ColumnWithTypeAndName & rightAsofKeyColumn() const { return right_data.asof_key_column; } const ColumnWithTypeAndName & leftAsofKeyColumn() const { return left_data.asof_key_column; } - const Block & getOutputHeader() const { return output_header; } + const Block & getOutputHeader() const override { return output_header; } void serialize(WriteBuffer & wb) const override; void deserialize(ReadBuffer & rb) override; @@ -253,6 +249,19 @@ class HashJoin final : public IHashJoin JoinStreamDescriptionPtr rightJoinStreamDescription() const noexcept override { return right_data.join_stream_desc; } private: + /// For non-bidirectional hash join + void insertRightBlock(Block right_block); + void joinLeftBlock(Block & left_block); + + /// For bidirectional hash join + /// There are 2 blocks returned : joined block via parameter and retracted block via returned-value if there is + Block insertLeftBlockAndJoin(Block & left_block); + Block insertRightBlockAndJoin(Block & right_block); + + /// For bidirectional range hash join, there may be multiple joined blocks + std::vector insertLeftBlockToRangeBucketsAndJoin(Block left_block); + std::vector insertRightBlockToRangeBucketsAndJoin(Block right_block); + void checkJoinSemantic() const; void init(); void initBufferedData(); diff --git a/src/Interpreters/Streaming/IHashJoin.h b/src/Interpreters/Streaming/IHashJoin.h index 95725ff6155..543685d1d67 100644 --- a/src/Interpreters/Streaming/IHashJoin.h +++ b/src/Interpreters/Streaming/IHashJoin.h @@ -5,8 +5,23 @@ namespace DB { +struct LightChunk; + namespace Streaming { +enum class HashJoinType : uint8_t +{ + /// Dynamic enrichment hash join + Asof = 1, /// append-only (left/inner) asof join append-only + Latest = 2, /// append-only (left/inner) latest join append-only + Changelog = 3, /// append-only (left/inner) all join changelog, for exmaple sources: `versioned_kv, changelog_kv and changelog(...)` + + /// Bidirectional hash join + BidirectionalAll = 4, /// append-only inner all join append-only + BidirectionalRange = 5, /// append-only inner all join append-only with on clause `date_diff_within(...) + BidirectionalChangelog = 6, /// changelog inner all join changelog, for exmaple sources: `versioned_kv, changelog_kv and changelog(...)` +}; + class IHashJoin : public IJoin { public: @@ -14,19 +29,11 @@ class IHashJoin : public IJoin virtual void transformHeader(Block & header) = 0; - /// For non-bidirectional hash join - virtual void insertRightBlock(Block right_block) = 0; - virtual void joinLeftBlock(Block & left_block) = 0; - - /// For bidirectional hash join - /// There are 2 blocks returned : joined block via parameter and retracted block via returned-value if there is - virtual Block insertLeftBlockAndJoin(Block & left_block) = 0; - virtual Block insertRightBlockAndJoin(Block & right_block) = 0; - - /// For bidirectional range hash join, there may be multiple joined blocks - virtual std::vector insertLeftBlockToRangeBucketsAndJoin(Block left_block) = 0; - virtual std::vector insertRightBlockToRangeBucketsAndJoin(Block right_block) = 0; + /// \returns + virtual std::pair insertLeftDataBlockAndJoin(LightChunk && chunk) = 0; + virtual std::pair insertRightDataBlockAndJoin(LightChunk && chunk) = 0; + virtual HashJoinType type() const = 0; virtual bool emitChangeLog() const = 0; virtual bool bidirectionalHashJoin() const = 0; virtual bool rangeBidirectionalHashJoin() const = 0; @@ -44,6 +51,8 @@ class IHashJoin : public IJoin virtual JoinStreamDescriptionPtr leftJoinStreamDescription() const noexcept = 0; virtual JoinStreamDescriptionPtr rightJoinStreamDescription() const noexcept = 0; + virtual const Block & getOutputHeader() const = 0; + virtual void serialize(WriteBuffer & wb) const = 0; virtual void deserialize(ReadBuffer & rb) = 0; diff --git a/src/Interpreters/Streaming/LatestHashJoin.h b/src/Interpreters/Streaming/LatestHashJoin.h new file mode 100644 index 00000000000..091e7c4c716 --- /dev/null +++ b/src/Interpreters/Streaming/LatestHashJoin.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +namespace DB +{ +namespace Streaming +{ +class LatestHashJoin final : public HashJoin +{ +public: + using HashJoin::HashJoin; + HashJoinType type() const override { return HashJoinType::Latest; } +}; + +} +} diff --git a/src/Interpreters/Streaming/tests/gtest_streaming_hash_join.cpp b/src/Interpreters/Streaming/tests/gtest_streaming_hash_join.cpp index e4dd766e701..e90601bb9e0 100644 --- a/src/Interpreters/Streaming/tests/gtest_streaming_hash_join.cpp +++ b/src/Interpreters/Streaming/tests/gtest_streaming_hash_join.cpp @@ -234,7 +234,7 @@ std::shared_ptr initTableJoin( return table_join; } -std::shared_ptr initHashJoin( +Streaming::HashJoinPtr initHashJoin( std::shared_ptr table_join, const Block & left_header, const Block & right_header, UInt64 keep_versions, ContextPtr context) { const auto & tables = table_join->getTablesWithColumns(); @@ -246,7 +246,7 @@ std::shared_ptr initHashJoin( tables[1], right_header, tables[1].output_data_stream_semantic, keep_versions, 0, 0); right_join_stream_desc->calculateColumnPositions(table_join->strictness()); - auto join = std::make_shared(table_join, std::move(left_join_stream_desc), std::move(right_join_stream_desc)); + auto join = Streaming::HashJoin::create(table_join, std::move(left_join_stream_desc), std::move(right_join_stream_desc)); auto output_header = Streaming::JoinTransform::transformHeader(left_header.cloneEmpty(), std::dynamic_pointer_cast(join)); @@ -255,7 +255,7 @@ std::shared_ptr initHashJoin( return join; } -void serdeAndCheck(const Streaming::HashJoin & join, Streaming::HashJoin & recovered_join, std::string_view msg) +void serdeAndCheck(const Streaming::IHashJoin & join, Streaming::IHashJoin & recovered_join, std::string_view msg) { WriteBufferFromOwnString wb; join.serialize(wb); @@ -487,42 +487,25 @@ void commonTest( }; auto join = initHashJoin(table_join, convert_left_block(left_header), convert_right_block(right_header), keep_versions, context); + auto output_header = join->getOutputHeader(); - auto do_join_step = [&](Streaming::HashJoin & join_, ToJoinStep to_join_step) -> JoinResults { + auto do_join_step = [&](Streaming::IHashJoin & join_, ToJoinStep to_join_step) -> JoinResults { auto & [pos_, block_, _] = to_join_step; - if (join_.rangeBidirectionalHashJoin()) - { - if (pos_ == ToJoinStep::LEFT) - return {concatenateBlocks(join_.insertLeftBlockToRangeBucketsAndJoin(convert_left_block(block_))), Block{}}; - else - return {concatenateBlocks(join_.insertRightBlockToRangeBucketsAndJoin(convert_right_block(block_))), Block{}}; - } - else if (join_.bidirectionalHashJoin()) - { - if (pos_ == ToJoinStep::LEFT) - { - auto retracted_block = join_.insertLeftBlockAndJoin(convert_left_block(block_)); - return {std::move(block_), retracted_block}; - } - else - { - auto retracted_block = join_.insertRightBlockAndJoin(convert_right_block(block_)); - return {std::move(block_), std::move(retracted_block)}; - } - } + std::pair join_result; + if (pos_ == ToJoinStep::LEFT) + join_result = join_.insertLeftDataBlockAndJoin(convert_left_block(block_)); else - { - if (pos_ == ToJoinStep::LEFT) - { - join_.joinLeftBlock(convert_left_block(block_)); - return {std::move(block_), Block{}}; - } - else - { - join_.insertRightBlock(convert_right_block(block_)); - return {Block{}, Block{}}; - } - } + join_result = join_.insertRightDataBlockAndJoin(convert_right_block(block_)); + + auto & [retracted_block, block] = join_result; + JoinResults results; + if (join_result.second.rows() > 0) + results.block = output_header.cloneWithColumns(block.detachColumns()); + + if (retracted_block.rows() > 0) + results.retracted_block = output_header.cloneWithColumns(retracted_block.detachColumns()); + + return results; }; /// Serde and check initiailized hash join diff --git a/src/Processors/Transforms/Streaming/AsofJoinTransform.h b/src/Processors/Transforms/Streaming/AsofJoinTransform.h new file mode 100644 index 00000000000..abc82fc5f8c --- /dev/null +++ b/src/Processors/Transforms/Streaming/AsofJoinTransform.h @@ -0,0 +1,24 @@ +#pragma once + +#include + +namespace DB +{ +namespace Streaming +{ +/// Streaming join rows from left stream to right stream +/// It has 2 inputs, the first one is left stream and the second one is right stream. +/// These 2 input streams will be pulled concurrently +/// left stream -> ... -> +/// \ +/// AsofJoinTransform +/// / +/// right stream -> ... -> +class AsofJoinTransform final : public JoinTransform +{ +public: + using JoinTransform::JoinTransform; + String getName() const override { return "StreamingAsofJoinTransform"; } +}; +} +} diff --git a/src/Processors/Transforms/Streaming/AsofJoinTransformWithAlignment.h b/src/Processors/Transforms/Streaming/AsofJoinTransformWithAlignment.h new file mode 100644 index 00000000000..5e5a7b69e12 --- /dev/null +++ b/src/Processors/Transforms/Streaming/AsofJoinTransformWithAlignment.h @@ -0,0 +1,23 @@ +#pragma once + +#include + +namespace DB::Streaming +{ + +/// Streaming join rows from left stream to right stream +/// It has 2 inputs, the first one is left stream and the second one is right stream. +/// These 2 input streams will be pulled concurrently and have watermark / timestamp +/// alignment for temporal join scenarios. +/// left stream -> ... -> +/// \ +/// AsofJoinTransformWithAlignment +/// / +/// right stream -> ... -> +class AsofJoinTransformWithAlignment final : public JoinTransformWithAlignment +{ +public: + using JoinTransformWithAlignment::JoinTransformWithAlignment; + String getName() const override { return "StreamingAsofJoinTransformWithAlignment"; } +}; +} diff --git a/src/Processors/Transforms/Streaming/BidirectionalAllJoinTransform.h b/src/Processors/Transforms/Streaming/BidirectionalAllJoinTransform.h new file mode 100644 index 00000000000..d5636df938d --- /dev/null +++ b/src/Processors/Transforms/Streaming/BidirectionalAllJoinTransform.h @@ -0,0 +1,24 @@ +#pragma once + +#include + +namespace DB +{ +namespace Streaming +{ +/// Streaming join rows from left stream to right stream +/// It has 2 inputs, the first one is left stream and the second one is right stream. +/// These 2 input streams will be pulled concurrently +/// left stream -> ... -> +/// \ +/// BidirectionalAllJoinTransform +/// / +/// right stream -> ... -> +class BidirectionalAllJoinTransform final : public JoinTransform +{ +public: + using JoinTransform::JoinTransform; + String getName() const override { return "StreamingBidirectionalAllJoinTransform"; } +}; +} +} diff --git a/src/Processors/Transforms/Streaming/BidirectionalAllJoinTransformWithAlignment.h b/src/Processors/Transforms/Streaming/BidirectionalAllJoinTransformWithAlignment.h new file mode 100644 index 00000000000..383117d78ca --- /dev/null +++ b/src/Processors/Transforms/Streaming/BidirectionalAllJoinTransformWithAlignment.h @@ -0,0 +1,23 @@ +#pragma once + +#include + +namespace DB::Streaming +{ + +/// Streaming join rows from left stream to right stream +/// It has 2 inputs, the first one is left stream and the second one is right stream. +/// These 2 input streams will be pulled concurrently and have watermark / timestamp +/// alignment for temporal join scenarios. +/// left stream -> ... -> +/// \ +/// BidirectionalAllJoinTransformWithAlignment +/// / +/// right stream -> ... -> +class BidirectionalAllJoinTransformWithAlignment final : public JoinTransformWithAlignment +{ +public: + using JoinTransformWithAlignment::JoinTransformWithAlignment; + String getName() const override { return "StreamingBidirectionalAllJoinTransformWithAlignment"; } +}; +} diff --git a/src/Processors/Transforms/Streaming/BidirectionalChangelogJoinTransform.h b/src/Processors/Transforms/Streaming/BidirectionalChangelogJoinTransform.h new file mode 100644 index 00000000000..84a5ae9aa10 --- /dev/null +++ b/src/Processors/Transforms/Streaming/BidirectionalChangelogJoinTransform.h @@ -0,0 +1,24 @@ +#pragma once + +#include + +namespace DB +{ +namespace Streaming +{ +/// Streaming join rows from left stream to right stream +/// It has 2 inputs, the first one is left stream and the second one is right stream. +/// These 2 input streams will be pulled concurrently +/// left stream -> ... -> +/// \ +/// BidirectionalChangelogJoinTransform +/// / +/// right stream -> ... -> +class BidirectionalChangelogJoinTransform final : public JoinTransform +{ +public: + using JoinTransform::JoinTransform; + String getName() const override { return "StreamingBidirectionalChangelogJoinTransform"; } +}; +} +} diff --git a/src/Processors/Transforms/Streaming/BidirectionalChangelogJoinTransformWithAlignment.h b/src/Processors/Transforms/Streaming/BidirectionalChangelogJoinTransformWithAlignment.h new file mode 100644 index 00000000000..f617ae89c5d --- /dev/null +++ b/src/Processors/Transforms/Streaming/BidirectionalChangelogJoinTransformWithAlignment.h @@ -0,0 +1,23 @@ +#pragma once + +#include + +namespace DB::Streaming +{ + +/// Streaming join rows from left stream to right stream +/// It has 2 inputs, the first one is left stream and the second one is right stream. +/// These 2 input streams will be pulled concurrently and have watermark / timestamp +/// alignment for temporal join scenarios. +/// left stream -> ... -> +/// \ +/// BidirectionalChangelogJoinTransformWithAlignment +/// / +/// right stream -> ... -> +class BidirectionalChangelogJoinTransformWithAlignment final : public JoinTransformWithAlignment +{ +public: + using JoinTransformWithAlignment::JoinTransformWithAlignment; + String getName() const override { return "StreamingBidirectionalChangelogJoinTransformWithAlignment"; } +}; +} diff --git a/src/Processors/Transforms/Streaming/BidirectionalRangeJoinTransform.h b/src/Processors/Transforms/Streaming/BidirectionalRangeJoinTransform.h new file mode 100644 index 00000000000..57d34009139 --- /dev/null +++ b/src/Processors/Transforms/Streaming/BidirectionalRangeJoinTransform.h @@ -0,0 +1,24 @@ +#pragma once + +#include + +namespace DB +{ +namespace Streaming +{ +/// Streaming join rows from left stream to right stream +/// It has 2 inputs, the first one is left stream and the second one is right stream. +/// These 2 input streams will be pulled concurrently +/// left stream -> ... -> +/// \ +/// BidirectionalRangeJoinTransform +/// / +/// right stream -> ... -> +class BidirectionalRangeJoinTransform final : public JoinTransform +{ +public: + using JoinTransform::JoinTransform; + String getName() const override { return "StreamingBidirectionalRangeJoinTransform"; } +}; +} +} diff --git a/src/Processors/Transforms/Streaming/BidirectionalRangeJoinTransformWithAlignment.h b/src/Processors/Transforms/Streaming/BidirectionalRangeJoinTransformWithAlignment.h new file mode 100644 index 00000000000..cbb2c7ec4df --- /dev/null +++ b/src/Processors/Transforms/Streaming/BidirectionalRangeJoinTransformWithAlignment.h @@ -0,0 +1,123 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace DB::Streaming +{ + +/// Streaming join rows from left stream to right stream +/// It has 2 inputs, the first one is left stream and the second one is right stream. +/// These 2 input streams will be pulled concurrently and have watermark / timestamp +/// alignment for temporal join scenarios. +/// left stream -> ... -> +/// \ +/// BidirectionalRangeJoinTransformWithAlignment +/// / +/// right stream -> ... -> +class BidirectionalRangeJoinTransformWithAlignment final : public IProcessor +{ +public: + BidirectionalRangeJoinTransformWithAlignment( + Block left_input_header, Block right_input_header, Block output_header, HashJoinPtr join_, UInt64 join_max_cached_bytes_); + + String getName() const override { return "StreamingBidirectionalRangeJoinTransformWithAlignment"; } + Status prepare() override; + void work() override; + + void checkpoint(CheckpointContextPtr ckpt_ctx) override; + void recover(CheckpointContextPtr ckpt_ctx) override; + + static Block transformHeader(Block header, const HashJoinPtr & join); + +private: + struct InputPortWithData + { + explicit InputPortWithData(InputPort * input_port_) : input_port(input_port_) { } + + void add(Chunk && chunk); + + bool hasCompleteChunks() const noexcept { return !input_chunks.empty() && !(input_chunks.size() == 1 && required_update_processing); } + + bool isFull() const noexcept { return need_buffer_data_to_align && hasCompleteChunks(); } + + void serialize(WriteBuffer & wb) const; + void deserialize(ReadBuffer & rb); + + InputPort * input_port; + + /// Input state + /// NOTE: Assume the input chunk is time-ordered + SERDE std::list input_chunks; + /// For join transform, we keep track watermark by itself + SERDE Int64 watermark = INVALID_WATERMARK; + NO_SERDE Int64 last_data_ts = 0; + NO_SERDE CheckpointContextPtr ckpt_ctx = nullptr; + NO_SERDE bool muted = false; + NO_SERDE bool required_update_processing = false; + + /// Input description + std::optional watermark_column_position; + DataTypePtr watermark_column_type; + bool need_buffer_data_to_align; + }; + + Status prepareLeftInput(); + Status prepareRightInput(); + + template + void processInputData(LightChunk & chunk); + + bool isInputInQuiesce(const InputPortWithData & input_with_data) const noexcept + { + return DB::MonotonicMilliseconds::now() - input_with_data.last_data_ts >= quiesce_threshold_ms; + } + + void muteLeftInput() noexcept + { + left_input.muted = true; + ++stats.left_input_muted; + } + + void muteRightInput() noexcept + { + right_input.muted = true; + ++stats.right_input_muted; + } + + static void unmuteInput(InputPortWithData & input_with_data) noexcept { input_with_data.muted = false; } + + void onCancel() override; + +private: + SERDE HashJoinPtr join; + + Chunk output_header_chunk; + Int64 latency_threshold; + Int64 quiesce_threshold_ms; + + SERDE InputPortWithData left_input; + SERDE InputPortWithData right_input; + bool need_propagate_heartbeat = false; + + /// We always push output_chunks first, so we can assume no output_chunks when received request checkpoint + NO_SERDE std::list output_chunks; + + struct AlignmentStats + { + UInt64 left_input_muted = 0; + UInt64 right_input_muted = 0; + UInt64 left_quiesce_joins = 0; + UInt64 right_quiesce_joins = 0; + }; + AlignmentStats stats; + + Int64 last_stats_log_ts; + Poco::Logger * log; +}; +} diff --git a/src/Processors/Transforms/Streaming/ChangelogJoinTransform.h b/src/Processors/Transforms/Streaming/ChangelogJoinTransform.h new file mode 100644 index 00000000000..8a3915e2a70 --- /dev/null +++ b/src/Processors/Transforms/Streaming/ChangelogJoinTransform.h @@ -0,0 +1,24 @@ +#pragma once + +#include + +namespace DB +{ +namespace Streaming +{ +/// Streaming join rows from left stream to right stream +/// It has 2 inputs, the first one is left stream and the second one is right stream. +/// These 2 input streams will be pulled concurrently +/// left stream -> ... -> +/// \ +/// ChangelogJoinTransform +/// / +/// right stream -> ... -> +class ChangelogJoinTransform final : public JoinTransform +{ +public: + using JoinTransform::JoinTransform; + String getName() const override { return "StreamingChangelogJoinTransform"; } +}; +} +} diff --git a/src/Processors/Transforms/Streaming/ChangelogJoinTransformWithAlignment.h b/src/Processors/Transforms/Streaming/ChangelogJoinTransformWithAlignment.h new file mode 100644 index 00000000000..54608f9bccc --- /dev/null +++ b/src/Processors/Transforms/Streaming/ChangelogJoinTransformWithAlignment.h @@ -0,0 +1,23 @@ +#pragma once + +#include + +namespace DB::Streaming +{ + +/// Streaming join rows from left stream to right stream +/// It has 2 inputs, the first one is left stream and the second one is right stream. +/// These 2 input streams will be pulled concurrently and have watermark / timestamp +/// alignment for temporal join scenarios. +/// left stream -> ... -> +/// \ +/// ChangelogJoinTransformWithAlignment +/// / +/// right stream -> ... -> +class ChangelogJoinTransformWithAlignment final : public JoinTransformWithAlignment +{ +public: + using JoinTransformWithAlignment::JoinTransformWithAlignment; + String getName() const override { return "StreamingChangelogJoinTransformWithAlignment"; } +}; +} diff --git a/src/Processors/Transforms/Streaming/JoinTransform.cpp b/src/Processors/Transforms/Streaming/JoinTransform.cpp index c2e2754681b..37ff8b86dd3 100644 --- a/src/Processors/Transforms/Streaming/JoinTransform.cpp +++ b/src/Processors/Transforms/Streaming/JoinTransform.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -240,19 +241,18 @@ inline void JoinTransform::doJoin(Chunks chunks) { /// First insert right block to update the build-side hash table if (chunks[1].hasRows()) - join->insertRightBlock(input_ports_with_data[1].input_port->getHeader().cloneWithColumns(chunks[1].detachColumns())); + join->insertRightDataBlockAndJoin(std::move(chunks[1])); /// Then use left block to join the right updated hash table /// Please note in this mode, right stream data only changes won't trigger join since left stream data is not buffered if (chunks[0].hasRows()) { - auto joined_block = input_ports_with_data[0].input_port->getHeader().cloneWithColumns(chunks[0].detachColumns()); - join->joinLeftBlock(joined_block); + auto [_, joined_block] = join->insertLeftDataBlockAndJoin(std::move(chunks[0])); if (auto rows = joined_block.rows(); rows > 0) { std::scoped_lock lock(mutex); - output_chunks.emplace_back(joined_block.getColumns(), rows); + output_chunks.emplace_back(joined_block.detachColumns(), rows); } } } @@ -260,16 +260,15 @@ inline void JoinTransform::doJoin(Chunks chunks) inline void JoinTransform::joinBidirectionally(Chunks chunks) { - std::array join_funcs - = {&Streaming::IHashJoin::insertLeftBlockAndJoin, &Streaming::IHashJoin::insertRightBlockAndJoin}; + std::array join_funcs + = {&Streaming::IHashJoin::insertLeftDataBlockAndJoin, &Streaming::IHashJoin::insertRightDataBlockAndJoin}; for (size_t i = 0; i < chunks.size(); ++i) { if (!chunks[i].hasRows()) continue; - auto block = input_ports_with_data[i].input_port->getHeader().cloneWithColumns(chunks[i].detachColumns()); - auto retracted_block = std::invoke(join_funcs[i], join.get(), block); + auto [retracted_block, block] = std::invoke(join_funcs[i], join.get(), LightChunk{std::move(chunks[i])}); { std::scoped_lock lock(mutex); @@ -280,32 +279,31 @@ inline void JoinTransform::joinBidirectionally(Chunks chunks) /// Don't watermark this block. We can concat retracted / result blocks or use avoid watermarking auto chunk_ctx = ChunkContext::create(); chunk_ctx->setRetractedDataFlag(); - output_chunks.emplace_back(retracted_block.getColumns(), retracted_block_rows, nullptr, std::move(chunk_ctx)); + output_chunks.emplace_back(retracted_block.detachColumns(), retracted_block_rows, nullptr, std::move(chunk_ctx)); } - if (block.rows()) - output_chunks.emplace_back(block.getColumns(), block.rows()); + if (auto rows = block.rows(); rows > 0) + output_chunks.emplace_back(block.detachColumns(), rows); } } } inline void JoinTransform::rangeJoinBidirectionally(Chunks chunks) { - std::array join_funcs - = {&Streaming::IHashJoin::insertLeftBlockToRangeBucketsAndJoin, &Streaming::IHashJoin::insertRightBlockToRangeBucketsAndJoin}; + std::array join_funcs + = {&Streaming::IHashJoin::insertLeftDataBlockAndJoin, &Streaming::IHashJoin::insertRightDataBlockAndJoin}; for (size_t i = 0; i < chunks.size(); ++i) { if (!chunks[i].hasRows()) continue; - auto block = input_ports_with_data[i].input_port->getHeader().cloneWithColumns(chunks[i].detachColumns()); - auto joined_blocks = std::invoke(join_funcs[i], join.get(), block); - - std::scoped_lock lock(mutex); - - for (size_t j = 0; j < joined_blocks.size(); ++j) - output_chunks.emplace_back(joined_blocks[j].getColumns(), joined_blocks[j].rows()); + auto [_, joined_block] = std::invoke(join_funcs[i], join.get(), LightChunk{std::move(chunks[i])}); + if (auto rows = joined_block.rows(); rows > 0) + { + std::scoped_lock lock(mutex); + output_chunks.emplace_back(joined_block.detachColumns(), rows); + } } } diff --git a/src/Processors/Transforms/Streaming/JoinTransform.h b/src/Processors/Transforms/Streaming/JoinTransform.h index a1a12282d15..e0f230ee5b3 100644 --- a/src/Processors/Transforms/Streaming/JoinTransform.h +++ b/src/Processors/Transforms/Streaming/JoinTransform.h @@ -18,7 +18,7 @@ namespace Streaming /// JoinTransform /// / /// right stream -> ... -> -class JoinTransform final : public IProcessor +class JoinTransform : public IProcessor { public: JoinTransform( diff --git a/src/Processors/Transforms/Streaming/JoinTransformWithAlignment.cpp b/src/Processors/Transforms/Streaming/JoinTransformWithAlignment.cpp index ee1c620d9fb..7a20f86cc61 100644 --- a/src/Processors/Transforms/Streaming/JoinTransformWithAlignment.cpp +++ b/src/Processors/Transforms/Streaming/JoinTransformWithAlignment.cpp @@ -262,7 +262,7 @@ void JoinTransformWithAlignment::work() auto & chunk = right_input.input_chunks.front(); assert(chunk.rows()); - processRightInputData(chunk.chunk); + processInputData(chunk.chunk); right_input.input_chunks.pop_front(); } while (right_input.hasCompleteChunks()); @@ -294,7 +294,7 @@ void JoinTransformWithAlignment::work() auto & chunk = left_input.input_chunks.front(); assert(chunk.rows()); - processLeftInputData(chunk.chunk); + processInputData(chunk.chunk); left_input.input_chunks.pop_front(); } while (left_input.hasCompleteChunks()); @@ -370,70 +370,26 @@ void JoinTransformWithAlignment::work() } } -void JoinTransformWithAlignment::processLeftInputData(LightChunk & chunk) +template +void JoinTransformWithAlignment::processInputData(LightChunk & chunk) { - /// FIXME: Provide a unified interface for different joins. - if (join->rangeBidirectionalHashJoin()) - { - auto block = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns()); - auto joined_blocks = join->insertLeftBlockToRangeBucketsAndJoin(block); - for (size_t j = 0; j < joined_blocks.size(); ++j) - output_chunks.emplace_back(joined_blocks[j].getColumns(), joined_blocks[j].rows()); - } - else if (join->bidirectionalHashJoin()) - { - auto block = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns()); - auto retracted_block = join->insertLeftBlockAndJoin(block); - if (auto retracted_block_rows = retracted_block.rows(); retracted_block_rows > 0) - { - /// Don't watermark this block. We can concat retracted / result blocks or use avoid watermarking - auto chunk_ctx = ChunkContext::create(); - chunk_ctx->setRetractedDataFlag(); - output_chunks.emplace_back(retracted_block.getColumns(), retracted_block_rows, nullptr, std::move(chunk_ctx)); - } - - if (block.rows()) - output_chunks.emplace_back(block.getColumns(), block.rows()); - } + std::pair result; + if constexpr (is_left_input) + result = join->insertLeftDataBlockAndJoin(std::move(chunk)); else - { - auto joined_block = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns()); - join->joinLeftBlock(joined_block); + result = join->insertRightDataBlockAndJoin(std::move(chunk)); - if (auto rows = joined_block.rows(); rows > 0) - output_chunks.emplace_back(joined_block.getColumns(), rows); - } -} - -void JoinTransformWithAlignment::processRightInputData(LightChunk & chunk) -{ - /// FIXME: Provide a unified interface for different joins. - if (join->rangeBidirectionalHashJoin()) + auto & [retracted_block, joined_block] = result; + if (auto retracted_block_rows = retracted_block.rows(); retracted_block_rows > 0) { - auto block = inputs.back().getHeader().cloneWithColumns(chunk.detachColumns()); - auto joined_blocks = join->insertRightBlockToRangeBucketsAndJoin(block); - for (size_t j = 0; j < joined_blocks.size(); ++j) - output_chunks.emplace_back(joined_blocks[j].getColumns(), joined_blocks[j].rows()); + /// Don't watermark this block. We can concat retracted / result blocks or use avoid watermarking + auto chunk_ctx = ChunkContext::create(); + chunk_ctx->setRetractedDataFlag(); + output_chunks.emplace_back(retracted_block.detachColumns(), retracted_block_rows, nullptr, std::move(chunk_ctx)); } - else if (join->bidirectionalHashJoin()) - { - auto block = inputs.back().getHeader().cloneWithColumns(chunk.detachColumns()); - auto retracted_block = join->insertRightBlockAndJoin(block); - if (auto retracted_block_rows = retracted_block.rows(); retracted_block_rows > 0) - { - /// Don't watermark this block. We can concat retracted / result blocks or use avoid watermarking - auto chunk_ctx = ChunkContext::create(); - chunk_ctx->setRetractedDataFlag(); - output_chunks.emplace_back(retracted_block.getColumns(), retracted_block_rows, nullptr, std::move(chunk_ctx)); - } - if (block.rows()) - output_chunks.emplace_back(block.getColumns(), block.rows()); - } - else - { - join->insertRightBlock(inputs.back().getHeader().cloneWithColumns(chunk.detachColumns())); - } + if (auto rows = joined_block.rows(); rows > 0) + output_chunks.emplace_back(joined_block.detachColumns(), rows); } void JoinTransformWithAlignment::checkpoint(CheckpointContextPtr ckpt_ctx) diff --git a/src/Processors/Transforms/Streaming/JoinTransformWithAlignment.h b/src/Processors/Transforms/Streaming/JoinTransformWithAlignment.h index 811cc8f3a21..a45c19984f1 100644 --- a/src/Processors/Transforms/Streaming/JoinTransformWithAlignment.h +++ b/src/Processors/Transforms/Streaming/JoinTransformWithAlignment.h @@ -20,7 +20,7 @@ namespace DB::Streaming /// JoinTransformWithAlignment /// / /// right stream -> ... -> -class JoinTransformWithAlignment final : public IProcessor +class JoinTransformWithAlignment : public IProcessor { public: JoinTransformWithAlignment( @@ -70,8 +70,8 @@ class JoinTransformWithAlignment final : public IProcessor Status prepareLeftInput(); Status prepareRightInput(); - void processLeftInputData(LightChunk & chunk); - void processRightInputData(LightChunk & chunk); + template + void processInputData(LightChunk & chunk); bool isInputInQuiesce(const InputPortWithData & input_with_data) const noexcept { diff --git a/src/Processors/Transforms/Streaming/LatestJoinTransform.h b/src/Processors/Transforms/Streaming/LatestJoinTransform.h new file mode 100644 index 00000000000..643480faa7f --- /dev/null +++ b/src/Processors/Transforms/Streaming/LatestJoinTransform.h @@ -0,0 +1,24 @@ +#pragma once + +#include + +namespace DB +{ +namespace Streaming +{ +/// Streaming join rows from left stream to right stream +/// It has 2 inputs, the first one is left stream and the second one is right stream. +/// These 2 input streams will be pulled concurrently +/// left stream -> ... -> +/// \ +/// LatestJoinTransform +/// / +/// right stream -> ... -> +class LatestJoinTransform final : public JoinTransform +{ +public: + using JoinTransform::JoinTransform; + String getName() const override { return "StreamingLatestJoinTransform"; } +}; +} +} diff --git a/src/Processors/Transforms/Streaming/LatestJoinTransformWithAlignment.h b/src/Processors/Transforms/Streaming/LatestJoinTransformWithAlignment.h new file mode 100644 index 00000000000..36643e72309 --- /dev/null +++ b/src/Processors/Transforms/Streaming/LatestJoinTransformWithAlignment.h @@ -0,0 +1,23 @@ +#pragma once + +#include + +namespace DB::Streaming +{ + +/// Streaming join rows from left stream to right stream +/// It has 2 inputs, the first one is left stream and the second one is right stream. +/// These 2 input streams will be pulled concurrently and have watermark / timestamp +/// alignment for temporal join scenarios. +/// left stream -> ... -> +/// \ +/// LatestJoinTransformWithAlignment +/// / +/// right stream -> ... -> +class LatestJoinTransformWithAlignment final : public JoinTransformWithAlignment +{ +public: + using JoinTransformWithAlignment::JoinTransformWithAlignment; + String getName() const override { return "StreamingLatestJoinTransformWithAlignment"; } +}; +} diff --git a/src/QueryPipeline/QueryPipelineBuilder.cpp b/src/QueryPipeline/QueryPipelineBuilder.cpp index 8960a2f65c4..a3b2820a458 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.cpp +++ b/src/QueryPipeline/QueryPipelineBuilder.cpp @@ -20,9 +20,18 @@ /// proton : starts #include -#include -#include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include /// proton : ends namespace DB @@ -709,14 +718,65 @@ std::unique_ptr QueryPipelineBuilder::joinPipelinesStreami auto hash_join = std::dynamic_pointer_cast(join); if (hash_join->getTableJoin().requiredJoinAlignment()) { - joining = std::make_shared( - left->getHeader(), right->getHeader(), out_header, std::move(hash_join), join_max_cached_bytes); + switch (hash_join->type()) + { + case Streaming::HashJoinType::Asof: + joining = std::make_shared( + left->getHeader(), right->getHeader(), out_header, std::move(hash_join), join_max_cached_bytes); + break; + case Streaming::HashJoinType::Latest: + joining = std::make_shared( + left->getHeader(), right->getHeader(), out_header, std::move(hash_join), join_max_cached_bytes); + break; + case Streaming::HashJoinType::Changelog: + joining = std::make_shared( + left->getHeader(), right->getHeader(), out_header, std::move(hash_join), join_max_cached_bytes); + break; + case Streaming::HashJoinType::BidirectionalAll: + joining = std::make_shared( + left->getHeader(), right->getHeader(), out_header, std::move(hash_join), join_max_cached_bytes); + break; + case Streaming::HashJoinType::BidirectionalRange: + joining = std::make_shared( + left->getHeader(), right->getHeader(), out_header, std::move(hash_join), join_max_cached_bytes); + break; + case Streaming::HashJoinType::BidirectionalChangelog: + joining = std::make_shared( + left->getHeader(), right->getHeader(), out_header, std::move(hash_join), join_max_cached_bytes); + break; + } } else { - joining = std::make_shared( - left->getHeader(), right->getHeader(), out_header, std::move(hash_join), max_block_size, join_max_cached_bytes); + switch (hash_join->type()) + { + case Streaming::HashJoinType::Asof: + joining = std::make_shared( + left->getHeader(), right->getHeader(), out_header, std::move(hash_join), max_block_size, join_max_cached_bytes); + break; + case Streaming::HashJoinType::Latest: + joining = std::make_shared( + left->getHeader(), right->getHeader(), out_header, std::move(hash_join), max_block_size, join_max_cached_bytes); + break; + case Streaming::HashJoinType::Changelog: + joining = std::make_shared( + left->getHeader(), right->getHeader(), out_header, std::move(hash_join), max_block_size, join_max_cached_bytes); + break; + case Streaming::HashJoinType::BidirectionalAll: + joining = std::make_shared( + left->getHeader(), right->getHeader(), out_header, std::move(hash_join), max_block_size, join_max_cached_bytes); + break; + case Streaming::HashJoinType::BidirectionalRange: + joining = std::make_shared( + left->getHeader(), right->getHeader(), out_header, std::move(hash_join), max_block_size, join_max_cached_bytes); + break; + case Streaming::HashJoinType::BidirectionalChangelog: + joining = std::make_shared( + left->getHeader(), right->getHeader(), out_header, std::move(hash_join), max_block_size, join_max_cached_bytes); + break; + } } + assert(joining); connect(**lit, joining->getInputs().front()); connect(**rit, joining->getInputs().back()); diff --git a/src/Storages/ExternalStream/Kafka/KafkaSink.h b/src/Storages/ExternalStream/Kafka/KafkaSink.h index dbc037520e6..c52245c5fe5 100644 --- a/src/Storages/ExternalStream/Kafka/KafkaSink.h +++ b/src/Storages/ExternalStream/Kafka/KafkaSink.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include #include diff --git a/src/Storages/Streaming/StreamSink.h b/src/Storages/Streaming/StreamSink.h index 5a46c1c38fa..fef5502a879 100644 --- a/src/Storages/Streaming/StreamSink.h +++ b/src/Storages/Streaming/StreamSink.h @@ -6,7 +6,7 @@ #include #include #include -#include +#include namespace DB {