From 66b2a8b58116ca022c3902ef1e5f17944bd60b9d Mon Sep 17 00:00:00 2001 From: skrzypo987 Date: Fri, 8 Jul 2022 09:46:07 +0300 Subject: [PATCH 1/3] Fix faulty flag The `force-spilling-join-operator` config flag and the corresponding session property might have not been working properly and produce ClassCastException. --- .../sql/planner/LocalExecutionPlanner.java | 22 ++++++------- .../TestJoinQueriesWithForceSpilling.java | 31 +++++++++++++++++++ 2 files changed, 42 insertions(+), 11 deletions(-) create mode 100644 testing/trino-tests/src/test/java/io/trino/tests/TestJoinQueriesWithForceSpilling.java diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index 2908f84b6480..231f1f808b25 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -2787,7 +2787,7 @@ private JoinBridgeManager createLookupSourceFactoryManager( int taskConcurrency = getTaskConcurrency(session); OperatorFactory hashBuilderOperatorFactory; - if (useSpillingJoinOperator()) { + if (useSpillingJoinOperator(session)) { hashBuilderOperatorFactory = new HashBuilderOperatorFactory( buildContext.getNextOperatorId(), node.getId(), @@ -2967,7 +2967,7 @@ private OperatorFactory createLookupJoin( outputSingleMatch, waitForBuild, node.getFilter().isPresent(), - useSpillingJoinOperator(), + useSpillingJoinOperator(session), probeTypes, probeJoinChannels, probeHashChannel, @@ -2982,7 +2982,7 @@ private OperatorFactory createLookupJoin( lookupSourceFactoryManager, outputSingleMatch, node.getFilter().isPresent(), - useSpillingJoinOperator(), + useSpillingJoinOperator(session), probeTypes, probeJoinChannels, probeHashChannel, @@ -2997,7 +2997,7 @@ private OperatorFactory createLookupJoin( lookupSourceFactoryManager, waitForBuild, node.getFilter().isPresent(), - useSpillingJoinOperator(), + useSpillingJoinOperator(session), probeTypes, probeJoinChannels, probeHashChannel, @@ -3011,7 +3011,7 @@ private OperatorFactory createLookupJoin( node.getId(), lookupSourceFactoryManager, node.getFilter().isPresent(), - useSpillingJoinOperator(), + useSpillingJoinOperator(session), probeTypes, probeJoinChannels, probeHashChannel, @@ -3023,11 +3023,6 @@ private OperatorFactory createLookupJoin( throw new UnsupportedOperationException("Unsupported join type: " + node.getType()); } - private boolean useSpillingJoinOperator() - { - return isSpillEnabled(session) || isForceSpillingOperator(session); - } - private Map createJoinSourcesLayout(Map lookupSourceLayout, Map probeSourceLayout) { ImmutableMap.Builder joinSourcesLayout = ImmutableMap.builder(); @@ -3886,7 +3881,7 @@ private JoinBridge createLookupSourceFactory( List buildTypes, Session session) { - if (isSpillEnabled(session) && !isForceSpillingOperator(session)) { + if (useSpillingJoinOperator(session)) { return new PartitionedLookupSourceFactory( buildTypes, buildOutputTypes, @@ -4298,4 +4293,9 @@ public String toString() .toString(); } } + + private boolean useSpillingJoinOperator(Session session) + { + return isSpillEnabled(session) || isForceSpillingOperator(session); + } } diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestJoinQueriesWithForceSpilling.java b/testing/trino-tests/src/test/java/io/trino/tests/TestJoinQueriesWithForceSpilling.java new file mode 100644 index 000000000000..a7543b968c55 --- /dev/null +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestJoinQueriesWithForceSpilling.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests; + +import io.trino.testing.AbstractTestJoinQueries; +import io.trino.testing.QueryRunner; +import io.trino.tests.tpch.TpchQueryRunnerBuilder; + +public class TestJoinQueriesWithForceSpilling + extends AbstractTestJoinQueries +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return TpchQueryRunnerBuilder.builder() + .addExtraProperty("force-spilling-join-operator", "true") + .build(); + } +} From 21b113c3fc5200158c3b7e9b4cac6bfbbbb0689c Mon Sep 17 00:00:00 2001 From: skrzypo987 Date: Mon, 11 Jul 2022 11:39:40 +0300 Subject: [PATCH 2/3] Use more specific flag Use `spillEnabled` local variable instead of session property. That way the new non-spilling join operator will be used in the case of spilling enabled by session property but disabled because of different reasons. --- .../sql/planner/LocalExecutionPlanner.java | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index 231f1f808b25..e4677d3fe2f3 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -2768,7 +2768,7 @@ private JoinBridgeManager createLookupSourceFactoryManager( List buildTypes = buildSource.getTypes(); JoinBridgeManager lookupSourceFactoryManager = new JoinBridgeManager<>( buildOuter, - createLookupSourceFactory(buildChannels, buildOuter, partitionCount, buildOutputTypes, buildTypes, session), + createLookupSourceFactory(buildChannels, buildOuter, partitionCount, buildOutputTypes, buildTypes, spillEnabled, session), buildOutputTypes); int operatorId = buildContext.getNextOperatorId(); @@ -2787,7 +2787,7 @@ private JoinBridgeManager createLookupSourceFactoryManager( int taskConcurrency = getTaskConcurrency(session); OperatorFactory hashBuilderOperatorFactory; - if (useSpillingJoinOperator(session)) { + if (useSpillingJoinOperator(spillEnabled, session)) { hashBuilderOperatorFactory = new HashBuilderOperatorFactory( buildContext.getNextOperatorId(), node.getId(), @@ -2967,7 +2967,7 @@ private OperatorFactory createLookupJoin( outputSingleMatch, waitForBuild, node.getFilter().isPresent(), - useSpillingJoinOperator(session), + useSpillingJoinOperator(spillEnabled, session), probeTypes, probeJoinChannels, probeHashChannel, @@ -2982,7 +2982,7 @@ private OperatorFactory createLookupJoin( lookupSourceFactoryManager, outputSingleMatch, node.getFilter().isPresent(), - useSpillingJoinOperator(session), + useSpillingJoinOperator(spillEnabled, session), probeTypes, probeJoinChannels, probeHashChannel, @@ -2997,7 +2997,7 @@ private OperatorFactory createLookupJoin( lookupSourceFactoryManager, waitForBuild, node.getFilter().isPresent(), - useSpillingJoinOperator(session), + useSpillingJoinOperator(spillEnabled, session), probeTypes, probeJoinChannels, probeHashChannel, @@ -3011,7 +3011,7 @@ private OperatorFactory createLookupJoin( node.getId(), lookupSourceFactoryManager, node.getFilter().isPresent(), - useSpillingJoinOperator(session), + useSpillingJoinOperator(spillEnabled, session), probeTypes, probeJoinChannels, probeHashChannel, @@ -3879,9 +3879,10 @@ private JoinBridge createLookupSourceFactory( int partitionCount, ImmutableList buildOutputTypes, List buildTypes, + boolean spillEnabled, Session session) { - if (useSpillingJoinOperator(session)) { + if (useSpillingJoinOperator(spillEnabled, session)) { return new PartitionedLookupSourceFactory( buildTypes, buildOutputTypes, @@ -4294,8 +4295,8 @@ public String toString() } } - private boolean useSpillingJoinOperator(Session session) + private boolean useSpillingJoinOperator(boolean spillEnabled, Session session) { - return isSpillEnabled(session) || isForceSpillingOperator(session); + return spillEnabled || isForceSpillingOperator(session); } } From 11a85a3ea12089fbcf0a4be77605505500ce9c09 Mon Sep 17 00:00:00 2001 From: skrzypo987 Date: Mon, 11 Jul 2022 11:40:13 +0300 Subject: [PATCH 3/3] Fix unnecessary TODO --- .../main/java/io/trino/sql/planner/LocalExecutionPlanner.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index e4677d3fe2f3..053da85a10ee 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -3872,7 +3872,6 @@ private OperatorFactory createHashAggregationOperatorFactory( } } - // TODO comment private JoinBridge createLookupSourceFactory( List buildChannels, boolean buildOuter,