Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support append-only left all join append-only #739

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/Interpreters/Streaming/HashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ size_t insertFromBlockImpl(
const HashJoin::SupportMatrix HashJoin::support_matrix = {
/// <left_stroage_semantic, join_kind, join_strictness, right_storage_semantic> - supported
/// Append ...
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::Append}, true},
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::ChangelogKV}, true},
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::VersionedKV}, true},
{{StorageSemantic::Append, JoinKind::Left, JoinStrictness::All, StorageSemantic::Changelog}, true},
Expand Down Expand Up @@ -927,15 +928,15 @@ void HashJoin::init()

bidirectional_hash_join = !data_enrichment_join;

/// append-only inner join append-only on ... and date_diff_within(10s)
/// append-only inner/left join append-only on ... and date_diff_within(10s)
/// In case when emitChangeLog()
if (streaming_strictness == Strictness::Range
&& (left_data.join_stream_desc->data_stream_semantic != DataStreamSemantic::Append
|| right_data.join_stream_desc->data_stream_semantic != DataStreamSemantic::Append
|| streaming_kind != Kind::Inner))
|| (streaming_kind != Kind::Inner && streaming_kind != Kind::Left)))
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Only inner range join is supported and the left and right stream must be append-only streams in range join");
"Only inner/left range join is supported and the left and right stream must be append-only streams in range join");

range_bidirectional_hash_join = bidirectional_hash_join && (streaming_strictness == Strictness::Range);

Expand Down
114 changes: 113 additions & 1 deletion src/Interpreters/Streaming/tests/gtest_streaming_hash_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ TEST(StreamingHashJoin, SimpleJoinTests)
context);

/// Additional range between
if (Streaming::isAppendStorage(left_data_stream_semantic) && kind == JoinKind::Inner && strictness == JoinStrictness::All
if (Streaming::isAppendStorage(left_data_stream_semantic) && (kind == JoinKind::Inner || kind == JoinKind::Left) && strictness == JoinStrictness::All
&& Streaming::isAppendStorage(right_data_stream_semantic))
{
commonTest(
Expand All @@ -826,6 +826,118 @@ TEST(StreamingHashJoin, SimpleJoinTests)
}
}

TEST(StreamingHashJoin, AppendLeftAllJoinAppend)
{
auto context = getContext().context;
Block left_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);
Block right_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);

/// stream(t1) left all join stream(t2) on t1.col_1 = t2.col_1
commonTest(
"left",
"all",
/*on_clause*/ "t1.col_1 = t2.col_1",
left_header,
Streaming::StorageSemantic::Append,
/*left_primary_key_column_indexes*/ std::nullopt,
right_header,
Streaming::StorageSemantic::Append,
/*right_primary_key_column_indexes*/ std::nullopt,
/*to_join_steps*/
{
{
/*to join pos*/ ToJoinStep::RIGHT,
/*to join block*/ prepareBlockByHeader(right_header, "(1, '2023-1-1 00:00:00')", context),
/*expected join results*/ ExpectedJoinResults{},
},
{
/*to join pos*/ ToJoinStep::LEFT,
/*to join block*/ prepareBlockByHeader(left_header, "(1, '2023-1-1 00:00:00')(2, '2023-1-1 00:00:00')", context),
/*expected join results*/
ExpectedJoinResults{
/// output header: col_1, col_2, t2.col_2
.values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:00')"
"(2, '2023-1-1 00:00:00', '1970-1-1 00:00:00')",
},
},
},
context);
}

TEST(StreamingHashJoin, AppendLeftRangeJoinAppend)
{
auto context = getContext().context;
Block left_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);
Block right_header = prepareBlock(/*types*/ {"int", "datetime64(3, 'UTC')"}, /*no data*/ "", context);

commonTest(
"left",
"all",
/*on_clause*/ "t1.col_1 = t2.col_1 and date_diff_within(2s, t1.col_2, t2.col_2)",
left_header,
Streaming::StorageSemantic::Append,
/*left_primary_key_column_indexes*/ std::nullopt,
right_header,
Streaming::StorageSemantic::Append,
/*right_primary_key_column_indexes*/ std::nullopt,
/*to_join_steps*/
{
{
/*to join pos*/ ToJoinStep::RIGHT,
/*to join block*/ prepareBlockByHeader(right_header, "(1, '2023-1-1 00:00:00')(1, '2023-1-1 00:00:01')", context),
/*expected join results*/ ExpectedJoinResults{},
},
{
/*to join pos*/ ToJoinStep::LEFT,
/*to join block*/ prepareBlockByHeader(left_header, "(1, '2023-1-1 00:00:00')(2, '2023-1-1 00:00:00')", context),
/*expected join results*/
ExpectedJoinResults{
/// output header: col_1, col_2, t2.col_2
.values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:00')(1, '2023-1-1 00:00:00', '2023-1-1 00:00:01')"
"(2, '2023-1-1 00:00:00', '1970-1-1 00:00:00')",
},
},
{
/*to join pos*/ ToJoinStep::RIGHT,
/*to join block*/ prepareBlockByHeader(right_header, "(1, '2023-1-1 00:00:02')(1, '2023-1-1 00:00:03')(2, '2023-1-1 00:00:02')(2, '2023-1-1 00:00:03')", context),
/*expected join results*/
ExpectedJoinResults{
/// output header: col_1, col_2, t2.col_2
.values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:02')"
"(2, '2023-1-1 00:00:00', '2023-1-1 00:00:02')",
},
},
{
/*to join pos*/ ToJoinStep::LEFT,
/*to join block*/ prepareBlockByHeader(left_header, "(1, '2023-1-1 00:00:00')(1, '2023-1-1 00:00:02')", context),
/*expected join results*/
ExpectedJoinResults{
/// output header: col_1, col_2, t2.col_2
.values = "(1, '2023-1-1 00:00:00', '2023-1-1 00:00:00')"
"(1, '2023-1-1 00:00:00', '2023-1-1 00:00:01')"
"(1, '2023-1-1 00:00:00', '2023-1-1 00:00:02')"
"(1, '2023-1-1 00:00:02', '2023-1-1 00:00:00')"
"(1, '2023-1-1 00:00:02', '2023-1-1 00:00:01')"
"(1, '2023-1-1 00:00:02', '2023-1-1 00:00:02')"
"(1, '2023-1-1 00:00:02', '2023-1-1 00:00:03')",
},
},
{
/*to join pos*/ ToJoinStep::LEFT,
/*to join block*/ prepareBlockByHeader(left_header, "(2, '2023-1-1 00:00:00')(2, '2023-1-1 00:00:02')(3, '2023-1-1 00:00:03')", context),
/*expected join results*/
ExpectedJoinResults{
/// output header: col_1, col_2, t2.col_2
.values = "(2, '2023-1-1 00:00:00', '2023-1-1 00:00:02')"
"(2, '2023-1-1 00:00:02', '2023-1-1 00:00:02')"
"(2, '2023-1-1 00:00:02', '2023-1-1 00:00:03')"
"(3, '2023-1-1 00:00:03', '1970-1-1 00:00:00')",
},
},
},
context);
}

TEST(StreamingHashJoin, AppendLeftAsofJoinAppend)
{
auto context = getContext().context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,19 @@
{"client":"python", "query_type": "stream", "query_id":"1056", "wait":1, "terminate":"manual", "query":"select i, k, j, kk from test10_append_left_stream left join test10_append_right_stream on k == kk;"},
{"client":"python", "query_type": "table", "depends_on":"1056", "wait":1, "query": "insert into test10_append_right_stream (j, kk) values (1, 'a') (1, 'b');"},
{"client":"python", "query_type": "table", "wait":1, "query": "insert into test10_append_left_stream (i, k) values (2, 'a');"},
{"client":"python", "query_type": "table", "wait":1, "query": "insert into test10_append_left_stream (i, k) values (2, 'b');"},
{"client":"python", "query_type": "table", "kill":"1056", "kill_wait":3, "wait":1, "query": "insert into test10_append_left_stream (i, k) values (2, 'c');"}
{"client":"python", "query_type": "table", "query": "insert into test10_append_left_stream (i, k) values (2, 'b');"},
{"client":"python", "query_type": "table", "kill":"1056", "kill_wait":3, "query": "insert into test10_append_left_stream (i, k) values (2, 'c');"}
]
}
],
"expected_results": [
{
"query_id":"1056",
"expected_results": "error_code:48"
"expected_results":[
[2, "a", 1, "a"],
[2, "b", 1, "b"],
[2, "c", 0, ""]
]
}
]
},
Expand Down
Loading