diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index 2908f84b6480..053da85a10ee 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -2768,7 +2768,7 @@ private JoinBridgeManager createLookupSourceFactoryManager( List buildTypes = buildSource.getTypes(); JoinBridgeManager lookupSourceFactoryManager = new JoinBridgeManager<>( buildOuter, - createLookupSourceFactory(buildChannels, buildOuter, partitionCount, buildOutputTypes, buildTypes, session), + createLookupSourceFactory(buildChannels, buildOuter, partitionCount, buildOutputTypes, buildTypes, spillEnabled, session), buildOutputTypes); int operatorId = buildContext.getNextOperatorId(); @@ -2787,7 +2787,7 @@ private JoinBridgeManager createLookupSourceFactoryManager( int taskConcurrency = getTaskConcurrency(session); OperatorFactory hashBuilderOperatorFactory; - if (useSpillingJoinOperator()) { + if (useSpillingJoinOperator(spillEnabled, session)) { hashBuilderOperatorFactory = new HashBuilderOperatorFactory( buildContext.getNextOperatorId(), node.getId(), @@ -2967,7 +2967,7 @@ private OperatorFactory createLookupJoin( outputSingleMatch, waitForBuild, node.getFilter().isPresent(), - useSpillingJoinOperator(), + useSpillingJoinOperator(spillEnabled, session), probeTypes, probeJoinChannels, probeHashChannel, @@ -2982,7 +2982,7 @@ private OperatorFactory createLookupJoin( lookupSourceFactoryManager, outputSingleMatch, node.getFilter().isPresent(), - useSpillingJoinOperator(), + useSpillingJoinOperator(spillEnabled, session), probeTypes, probeJoinChannels, probeHashChannel, @@ -2997,7 +2997,7 @@ private OperatorFactory createLookupJoin( lookupSourceFactoryManager, waitForBuild, node.getFilter().isPresent(), - useSpillingJoinOperator(), + useSpillingJoinOperator(spillEnabled, session), probeTypes, probeJoinChannels, probeHashChannel, @@ -3011,7 +3011,7 @@ private OperatorFactory createLookupJoin( node.getId(), lookupSourceFactoryManager, node.getFilter().isPresent(), - useSpillingJoinOperator(), + useSpillingJoinOperator(spillEnabled, session), probeTypes, probeJoinChannels, probeHashChannel, @@ -3023,11 +3023,6 @@ private OperatorFactory createLookupJoin( throw new UnsupportedOperationException("Unsupported join type: " + node.getType()); } - private boolean useSpillingJoinOperator() - { - return isSpillEnabled(session) || isForceSpillingOperator(session); - } - private Map createJoinSourcesLayout(Map lookupSourceLayout, Map probeSourceLayout) { ImmutableMap.Builder joinSourcesLayout = ImmutableMap.builder(); @@ -3877,16 +3872,16 @@ private OperatorFactory createHashAggregationOperatorFactory( } } - // TODO comment private JoinBridge createLookupSourceFactory( List buildChannels, boolean buildOuter, int partitionCount, ImmutableList buildOutputTypes, List buildTypes, + boolean spillEnabled, Session session) { - if (isSpillEnabled(session) && !isForceSpillingOperator(session)) { + if (useSpillingJoinOperator(spillEnabled, session)) { return new PartitionedLookupSourceFactory( buildTypes, buildOutputTypes, @@ -4298,4 +4293,9 @@ public String toString() .toString(); } } + + private boolean useSpillingJoinOperator(boolean spillEnabled, Session session) + { + return spillEnabled || isForceSpillingOperator(session); + } } diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestJoinQueriesWithForceSpilling.java b/testing/trino-tests/src/test/java/io/trino/tests/TestJoinQueriesWithForceSpilling.java new file mode 100644 index 000000000000..a7543b968c55 --- /dev/null +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestJoinQueriesWithForceSpilling.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests; + +import io.trino.testing.AbstractTestJoinQueries; +import io.trino.testing.QueryRunner; +import io.trino.tests.tpch.TpchQueryRunnerBuilder; + +public class TestJoinQueriesWithForceSpilling + extends AbstractTestJoinQueries +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return TpchQueryRunnerBuilder.builder() + .addExtraProperty("force-spilling-join-operator", "true") + .build(); + } +}