diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PartitionedLookupSourceFactory.java b/presto-main/src/main/java/com/facebook/presto/operator/PartitionedLookupSourceFactory.java index 14205927bfa48..75d863913d102 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/PartitionedLookupSourceFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/PartitionedLookupSourceFactory.java @@ -216,7 +216,7 @@ public void setPartitionSpilledLookupSourceHandle(int partitionIndex, SpilledLoo lock.writeLock().lock(); try { - if (destroyed.isDone()) { + if (partitionsNoLongerNeeded.isDone()) { spilledLookupSourceHandle.dispose(); return; } @@ -306,6 +306,14 @@ public ListenableFuture>> finishPr try { if (!spillingInfo.hasSpilled()) { finishedProbeOperators++; + if (lookupJoinsCount.isPresent()) { + checkState(finishedProbeOperators <= lookupJoinsCount.getAsInt(), "%s probe operators finished out of %s declared", finishedProbeOperators, lookupJoinsCount.getAsInt()); + if (finishedProbeOperators == lookupJoinsCount.getAsInt()) { + // We can dispose partitions now since right outer is not supported with spill and lookupJoinsCount should be absent + freePartitions(); + } + } + return immediateFuture(new PartitionedConsumption<>( 1, emptyList(), @@ -326,7 +334,7 @@ public ListenableFuture>> finishPr finishedProbeOperators++; if (finishedProbeOperators == operatorsCount) { - // We can dispose partitions now since as right outer is not supported with spill + // We can dispose partitions now since right outer is not supported with spill freePartitions(); verify(!partitionedConsumption.isDone()); partitionedConsumption.set(new PartitionedConsumption<>( diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java index 69988e4a21922..8ec7177d76a33 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java @@ -2154,11 +2154,25 @@ private PhysicalOperation createLookupJoin(JoinNode node, // Plan probe PhysicalOperation probeSource = probeNode.accept(this, context); + // Plan build + LocalExecutionPlanContext buildContext = context.createSubContext(); + PhysicalOperation buildSource = buildNode.accept(this, buildContext); + if (buildSource.getPipelineExecutionStrategy() == GROUPED_EXECUTION) { + checkState( + probeSource.getPipelineExecutionStrategy() == GROUPED_EXECUTION, + "Build execution is GROUPED_EXECUTION. Probe execution is expected be GROUPED_EXECUTION, but is UNGROUPED_EXECUTION."); + } + + boolean buildOuter = node.getType() == RIGHT || node.getType() == FULL; + // spill does not work for probe only grouped execution because PartitionedLookupSourceFactory.finishProbe() expects a defined number of probe operators + boolean isProbeOnlyGroupedExecution = probeSource.getPipelineExecutionStrategy() == GROUPED_EXECUTION && buildSource.getPipelineExecutionStrategy() != GROUPED_EXECUTION; + boolean spillEnabled = isSpillEnabled(context.getSession()) && isJoinSpillingEnabled(context.getSession()) && !buildOuter && !isProbeOnlyGroupedExecution; + // Plan build JoinBridgeManager lookupSourceFactory = - createLookupSourceFactory(node, buildNode, buildVariables, buildHashVariable, probeSource, context); + createLookupSourceFactory(node, buildSource, buildContext, buildVariables, buildHashVariable, probeSource, spillEnabled, context); - OperatorFactory operator = createLookupJoin(node, probeSource, probeVariables, probeHashVariable, lookupSourceFactory, context); + OperatorFactory operator = createLookupJoin(node, probeSource, probeVariables, probeHashVariable, lookupSourceFactory, spillEnabled, context); ImmutableMap.Builder outputMappings = ImmutableMap.builder(); List outputVariables = node.getOutputVariables(); @@ -2171,25 +2185,14 @@ private PhysicalOperation createLookupJoin(JoinNode node, private JoinBridgeManager createLookupSourceFactory( JoinNode node, - PlanNode buildNode, + PhysicalOperation buildSource, + LocalExecutionPlanContext buildContext, List buildVariables, Optional buildHashVariable, PhysicalOperation probeSource, + boolean spillEnabled, LocalExecutionPlanContext context) { - // Determine if planning broadcast join - Optional distributionType = node.getDistributionType(); - boolean isBroadcastJoin = distributionType.isPresent() && distributionType.get() == REPLICATED; - - LocalExecutionPlanContext buildContext = context.createSubContext(); - PhysicalOperation buildSource = buildNode.accept(this, buildContext); - - if (buildSource.getPipelineExecutionStrategy() == GROUPED_EXECUTION) { - checkState( - probeSource.getPipelineExecutionStrategy() == GROUPED_EXECUTION, - "Build execution is GROUPED_EXECUTION. Probe execution is expected be GROUPED_EXECUTION, but is UNGROUPED_EXECUTION."); - } - List buildOutputVariables = node.getOutputVariables().stream() .filter(node.getRight().getOutputVariables()::contains) .collect(toImmutableList()); @@ -2198,10 +2201,6 @@ private JoinBridgeManager createLookupSourceFact OptionalInt buildHashChannel = buildHashVariable.map(variableChannelGetter(buildSource)) .map(OptionalInt::of).orElse(OptionalInt.empty()); - boolean spillEnabled = isSpillEnabled(context.getSession()) && isJoinSpillingEnabled(context.getSession()); - boolean buildOuter = node.getType() == RIGHT || node.getType() == FULL; - int partitionCount = buildContext.getDriverInstanceCount().orElse(1); - Optional filterFunctionFactory = node.getFilter() .map(filterExpression -> compileJoinFilterFunction( session.getSqlFunctionProperties(), @@ -2234,6 +2233,8 @@ private JoinBridgeManager createLookupSourceFact ImmutableList buildOutputTypes = buildOutputChannels.stream() .map(buildSource.getTypes()::get) .collect(toImmutableList()); + boolean buildOuter = node.getType() == RIGHT || node.getType() == FULL; + int partitionCount = buildContext.getDriverInstanceCount().orElse(1); JoinBridgeManager lookupSourceFactoryManager = new JoinBridgeManager<>( buildOuter, probeSource.getPipelineExecutionStrategy(), @@ -2255,8 +2256,9 @@ private JoinBridgeManager createLookupSourceFact createDynamicFilter(buildSource, node, context, partitionCount).ifPresent( filter -> factoriesBuilder.add(createDynamicFilterSourceOperatorFactory(filter, node.getId(), buildSource, buildContext))); - // spill does not work for probe only grouped execution because PartitionedLookupSourceFactory.finishProbe() expects a defined number of probe operators - boolean isProbeOnlyGroupedExecution = probeSource.getPipelineExecutionStrategy() == GROUPED_EXECUTION && buildSource.getPipelineExecutionStrategy() != GROUPED_EXECUTION; + // Determine if planning broadcast join + Optional distributionType = node.getDistributionType(); + boolean isBroadcastJoin = distributionType.isPresent() && distributionType.get() == REPLICATED; HashBuilderOperatorFactory hashBuilderOperatorFactory = new HashBuilderOperatorFactory( buildContext.getNextOperatorId(), @@ -2270,7 +2272,7 @@ private JoinBridgeManager createLookupSourceFact searchFunctionFactories, 10_000, pagesIndexFactory, - spillEnabled && !buildOuter && partitionCount > 1 && !isProbeOnlyGroupedExecution, + spillEnabled && partitionCount > 1, singleStreamSpillerFactory, isBroadcastJoin); @@ -2360,6 +2362,7 @@ private OperatorFactory createLookupJoin( List probeVariables, Optional probeHashVariable, JoinBridgeManager lookupSourceFactoryManager, + boolean spillEnabled, LocalExecutionPlanContext context) { List probeTypes = probeSource.getTypes(); @@ -2370,7 +2373,7 @@ private OperatorFactory createLookupJoin( List probeJoinChannels = ImmutableList.copyOf(getChannelsForVariables(probeVariables, probeSource.getLayout())); OptionalInt probeHashChannel = probeHashVariable.map(variableChannelGetter(probeSource)) .map(OptionalInt::of).orElse(OptionalInt.empty()); - OptionalInt totalOperatorsCount = getJoinOperatorsCountForSpill(context, session); + OptionalInt totalOperatorsCount = getJoinOperatorsCountForSpill(context, spillEnabled); switch (node.getType()) { case INNER: @@ -2386,13 +2389,14 @@ private OperatorFactory createLookupJoin( } } - private OptionalInt getJoinOperatorsCountForSpill(LocalExecutionPlanContext context, Session session) + private OptionalInt getJoinOperatorsCountForSpill(LocalExecutionPlanContext context, boolean spillEnabled) { - OptionalInt driverInstanceCount = context.getDriverInstanceCount(); - if (isSpillEnabled(session) && isJoinSpillingEnabled(session)) { + if (spillEnabled) { + OptionalInt driverInstanceCount = context.getDriverInstanceCount(); checkState(driverInstanceCount.isPresent(), "A fixed distribution is required for JOIN when spilling is enabled"); + return driverInstanceCount; } - return driverInstanceCount; + return OptionalInt.empty(); } private Map createJoinSourcesLayout(Map lookupSourceLayout, Map probeSourceLayout) diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestHashJoinOperator.java b/presto-main/src/test/java/com/facebook/presto/operator/TestHashJoinOperator.java index 91243fb99ef38..ea184084c30a8 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestHashJoinOperator.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestHashJoinOperator.java @@ -546,6 +546,41 @@ private static MaterializedResult getProperColumns(Operator joinOperator, List lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager(); + PartitionedLookupSourceFactory lookupSourceFactory = lookupSourceFactoryManager.getJoinBridge(Lifespan.taskWide()); + + // finish probe before any build partition is spilled + lookupSourceFactory.finishProbeOperator(OptionalInt.of(1)); + + // spill build partition after probe is finished + HashBuilderOperator hashBuilderOperator = buildSideSetup.getBuildOperators().get(0); + hashBuilderOperator.startMemoryRevoke().get(); + hashBuilderOperator.finishMemoryRevoke(); + hashBuilderOperator.finish(); + + // hash builder operator should not deadlock waiting for spilled lookup source to be disposed + hashBuilderOperator.isBlocked().get(); + + lookupSourceFactory.destroy(); + assertTrue(hashBuilderOperator.isFinished()); + } + @Test(dataProvider = "hashJoinTestValues") public void testInnerJoinWithNullProbe(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled) { diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java index db385122641ea..ebfba7b0a2eea 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java @@ -153,7 +153,7 @@ public void testParsingError() } @Test - public void selectLargeInterval() + public void testSelectLargeInterval() { MaterializedResult result = computeActual("SELECT INTERVAL '30' DAY"); assertEquals(result.getRowCount(), 1); @@ -165,7 +165,7 @@ public void selectLargeInterval() } @Test - public void emptyJoins() + public void testEmptyJoins() { // Empty predicate assertQuery("select 1 from (select * from orders where 1 = 0) DT join customer on DT.custkey=customer.custkey", @@ -186,7 +186,7 @@ public void emptyJoins() } @Test - public void selectNull() + public void testSelectNull() { assertQuery("SELECT NULL"); } diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestDistributedSpilledQueries.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestDistributedSpilledQueries.java index dcbb4ba87a527..e36ad96ed3110 100644 --- a/presto-tests/src/test/java/com/facebook/presto/tests/TestDistributedSpilledQueries.java +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestDistributedSpilledQueries.java @@ -78,11 +78,4 @@ public void testAssignUniqueId() // TODO: disabled until https://github.com/prestodb/presto/issues/8926 is resolved // due to long running query test created many spill files on disk. } - - @Test(enabled = false) - @Override - public void testCorrelatedNonAggregationScalarSubqueries() - { - // TODO: disable until https://github.com/prestodb/presto/issues/15542 is resolved - } }