Skip to content

Commit

Permalink
Feature/issue 634 support left join for versioned kv streams (#647)
Browse files Browse the repository at this point in the history
  • Loading branch information
yl-lisen authored Apr 21, 2024
1 parent fa69ae5 commit c959021
Show file tree
Hide file tree
Showing 13 changed files with 642 additions and 253 deletions.
11 changes: 11 additions & 0 deletions src/Core/Block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,17 @@ int Block::compareAt(size_t lhs_row, size_t rhs_row, const Block & rhs_block, co

return 0;
}

/// Move the column of every entry out of this block and return them in positional order.
/// After the columns have been moved out, `clear()` is called on the block.
Columns Block::detachColumns()
{
    Columns detached;
    detached.reserve(data.size());

    /// Steal each column pointer instead of copying it
    for (auto & elem : data)
        detached.emplace_back(std::move(elem.column));

    clear();
    return detached;
}
/// proton: ends

}
2 changes: 2 additions & 0 deletions src/Core/Block.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ class Block

void renameColumn(String new_name, size_t column_pos);

Columns detachColumns();

/// Deep clone, use cautiously. Most of the time, we don't need deepClone
Block deepClone() const;

Expand Down
295 changes: 217 additions & 78 deletions src/Interpreters/Streaming/HashJoin.cpp

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/Interpreters/Streaming/HashJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ class HashJoin final : public IHashJoin
std::vector<Block> insertBlockToRangeBucketsAndJoin(Block block);

template <bool is_left_block>
void doInsertBlock(Block block, HashBlocksPtr target_hash_blocks);
void doInsertBlock(Block block, HashBlocksPtr target_hash_blocks, IColumn::Filter * new_keys_filter = nullptr);

/// For bidirectional hash join
/// Return retracted block if needs emit changelog, otherwise empty block
Expand Down
342 changes: 293 additions & 49 deletions src/Interpreters/Streaming/tests/gtest_streaming_hash_join.cpp

Large diffs are not rendered by default.

47 changes: 21 additions & 26 deletions src/Processors/Transforms/Streaming/JoinTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ JoinTransform::JoinTransform(

IProcessor::Status JoinTransform::prepare()
{
std::scoped_lock lock(mutex);

auto & output = outputs.front();

/// Check can output.
Expand Down Expand Up @@ -125,8 +123,6 @@ void JoinTransform::work()
Chunks chunks;
{
/// Move out the input chunks
std::scoped_lock lock(mutex);

assert(input_ports_with_data[0].input_chunk || input_ports_with_data[1].input_chunk);

for (size_t i = 0; i < input_ports_with_data.size(); ++i)
Expand Down Expand Up @@ -188,7 +184,6 @@ void JoinTransform::work()
/// We only do this piggy-back once, on the last output chunk if there is one
if (has_watermark)
{
std::scoped_lock lock(mutex);
if (!output_chunks.empty())
setupWatermark(output_chunks.back(), local_watermark);
else
Expand All @@ -200,7 +195,6 @@ void JoinTransform::work()
checkpoint(requested_ckpt);

/// Propagate request checkpoint
std::scoped_lock lock(mutex);
assert(!output_chunks.empty());
output_chunks.back().setCheckpointContext(std::move(requested_ckpt));
}
Expand Down Expand Up @@ -257,10 +251,7 @@ inline void JoinTransform::doJoin(Chunks chunks)
join->joinLeftBlock(joined_block);

if (auto rows = joined_block.rows(); rows > 0)
{
std::scoped_lock lock(mutex);
output_chunks.emplace_back(joined_block.getColumns(), rows);
}
}
}
}
Expand All @@ -278,21 +269,23 @@ inline void JoinTransform::joinBidirectionally(Chunks chunks)
auto block = input_ports_with_data[i].input_port->getHeader().cloneWithColumns(chunks[i].detachColumns());
auto retracted_block = std::invoke(join_funcs[i], join.get(), block);

/// First emit retracted block
auto retracted_block_rows = retracted_block.rows();
if (retracted_block_rows)
{
std::scoped_lock lock(mutex);
/// First emit retracted block
auto retracted_block_rows = retracted_block.rows();
if (retracted_block_rows)
{
/// Don't watermark this block. We can concat retracted / result blocks or use avoid watermarking
auto chunk_ctx = ChunkContext::create();
chunk_ctx->setConsecutiveDataFlag();
output_chunks.emplace_back(retracted_block.getColumns(), retracted_block_rows, nullptr, std::move(chunk_ctx));
}

if (block.rows())
output_chunks.emplace_back(block.getColumns(), block.rows());
/// Don't watermark the retracted chunk: we want the retracted chunk and the result chunk that follows it
/// to be processed consecutively. For example, we want to avoid emitting a result right after the retracted
/// chunk is processed but before the following result chunk is processed, since that would produce a
/// transient intermediate result which we usually don't want.
/// To have the retracted chunk / result chunk processed consecutively, we can either concat them into one bigger
/// chunk or use the `consecutive` flag, which is what we do here.
auto chunk_ctx = ChunkContext::create();
chunk_ctx->setConsecutiveDataFlag();
output_chunks.emplace_back(retracted_block.getColumns(), retracted_block_rows, nullptr, std::move(chunk_ctx));
}

if (block.rows())
output_chunks.emplace_back(block.getColumns(), block.rows());
}
}

Expand All @@ -309,10 +302,12 @@ inline void JoinTransform::rangeJoinBidirectionally(Chunks chunks)
auto block = input_ports_with_data[i].input_port->getHeader().cloneWithColumns(chunks[i].detachColumns());
auto joined_blocks = std::invoke(join_funcs[i], join.get(), block);

std::scoped_lock lock(mutex);

for (size_t j = 0; j < joined_blocks.size(); ++j)
output_chunks.emplace_back(joined_blocks[j].getColumns(), joined_blocks[j].rows());
for (auto & joined_block : joined_blocks)
{
auto rows = joined_block.rows();
if (rows)
output_chunks.emplace_back(joined_block.detachColumns(), rows);
}
}
}

Expand Down
2 changes: 0 additions & 2 deletions src/Processors/Transforms/Streaming/JoinTransform.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ class JoinTransform final : public IProcessor

Poco::Logger * logger;

mutable std::mutex mutex;

/// When a checkpoint request is received, it is always an empty chunk with a checkpoint context
NO_SERDE std::array<InputPortWithData, 2> input_ports_with_data;
/// We always push output_chunks first, so we can assume output_chunks is empty when a checkpoint request is received
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
{"client":"python", "query_type": "table","wait":1, "query":"drop stream if exists test10_changelog_kv_stream"},
{"client":"python", "query_type": "table", "wait":2,"exist":"test10_append_left_stream","exist_wait":1, "query":"create stream if not exists test10_append_left_stream(i int, k string)"},
{"client":"python", "query_type": "table", "wait":2,"exist":"test10_changelog_kv_stream","exist_wait":1, "query":"create stream if not exists test10_changelog_kv_stream(ii int, kk string) primary key kk settings mode='changelog_kv'"},
{"client":"python", "query_type": "stream", "query_id":"1051", "wait":1, "terminate":"manual", "query":"select i, k, ii, kk, _tp_delta from test10_append_left_stream inner join test10_changelog_kv_stream on i == ii"},
{"client":"python", "query_type": "stream", "query_id":"1051", "wait":1, "terminate":"manual", "query":"select i, k, ii, kk from test10_append_left_stream inner join test10_changelog_kv_stream on i == ii"},
{"client":"python", "query_type": "table", "depends_on":"1051", "wait":1, "query": "insert into test10_changelog_kv_stream (ii, kk) values (1, 'k1')"},
{"client":"python", "query_type": "table", "wait":1, "query": "insert into test10_append_left_stream (i, k) values (1, 'k1') (2, 'k2')"},
{"client":"python", "query_type": "table", "wait":1, "query": "insert into test10_changelog_kv_stream (ii, kk) values (1, 'k2')"},
Expand All @@ -63,8 +63,8 @@
{
"query_id":"1051",
"expected_results":[
[1, "k1", 1, "k1", 1],
[1, "k1", 1, "k2", 1]
[1, "k1", 1, "k1"],
[1, "k1", 1, "k2"]
]
}
]
Expand Down
Loading

0 comments on commit c959021

Please sign in to comment.