Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void setPartitionSpilledLookupSourceHandle(int partitionIndex, SpilledLoo

lock.writeLock().lock();
try {
if (destroyed.isDone()) {
if (partitionsNoLongerNeeded.isDone()) {
spilledLookupSourceHandle.dispose();
return;
}
Expand Down Expand Up @@ -306,6 +306,14 @@ public ListenableFuture<PartitionedConsumption<Supplier<LookupSource>>> finishPr
try {
if (!spillingInfo.hasSpilled()) {
finishedProbeOperators++;
if (lookupJoinsCount.isPresent()) {
checkState(finishedProbeOperators <= lookupJoinsCount.getAsInt(), "%s probe operators finished out of %s declared", finishedProbeOperators, lookupJoinsCount.getAsInt());
if (finishedProbeOperators == lookupJoinsCount.getAsInt()) {
// We can dispose partitions now: lookupJoinsCount is only present when spill is enabled, and right outer join is not supported with spill
freePartitions();
}
}

return immediateFuture(new PartitionedConsumption<>(
1,
emptyList(),
Expand All @@ -326,7 +334,7 @@ public ListenableFuture<PartitionedConsumption<Supplier<LookupSource>>> finishPr

finishedProbeOperators++;
if (finishedProbeOperators == operatorsCount) {
// We can dispose partitions now since as right outer is not supported with spill
// We can dispose partitions now since right outer is not supported with spill
freePartitions();
verify(!partitionedConsumption.isDone());
partitionedConsumption.set(new PartitionedConsumption<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2154,11 +2154,25 @@ private PhysicalOperation createLookupJoin(JoinNode node,
// Plan probe
PhysicalOperation probeSource = probeNode.accept(this, context);

// Plan build
LocalExecutionPlanContext buildContext = context.createSubContext();
PhysicalOperation buildSource = buildNode.accept(this, buildContext);
if (buildSource.getPipelineExecutionStrategy() == GROUPED_EXECUTION) {
checkState(
probeSource.getPipelineExecutionStrategy() == GROUPED_EXECUTION,
"Build execution is GROUPED_EXECUTION. Probe execution is expected be GROUPED_EXECUTION, but is UNGROUPED_EXECUTION.");
}

boolean buildOuter = node.getType() == RIGHT || node.getType() == FULL;
// spill does not work for probe only grouped execution because PartitionedLookupSourceFactory.finishProbe() expects a defined number of probe operators
boolean isProbeOnlyGroupedExecution = probeSource.getPipelineExecutionStrategy() == GROUPED_EXECUTION && buildSource.getPipelineExecutionStrategy() != GROUPED_EXECUTION;
boolean spillEnabled = isSpillEnabled(context.getSession()) && isJoinSpillingEnabled(context.getSession()) && !buildOuter && !isProbeOnlyGroupedExecution;

// Plan build
JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactory =
createLookupSourceFactory(node, buildNode, buildVariables, buildHashVariable, probeSource, context);
createLookupSourceFactory(node, buildSource, buildContext, buildVariables, buildHashVariable, probeSource, spillEnabled, context);

OperatorFactory operator = createLookupJoin(node, probeSource, probeVariables, probeHashVariable, lookupSourceFactory, context);
OperatorFactory operator = createLookupJoin(node, probeSource, probeVariables, probeHashVariable, lookupSourceFactory, spillEnabled, context);

ImmutableMap.Builder<VariableReferenceExpression, Integer> outputMappings = ImmutableMap.builder();
List<VariableReferenceExpression> outputVariables = node.getOutputVariables();
Expand All @@ -2171,25 +2185,14 @@ private PhysicalOperation createLookupJoin(JoinNode node,

private JoinBridgeManager<PartitionedLookupSourceFactory> createLookupSourceFactory(
JoinNode node,
PlanNode buildNode,
PhysicalOperation buildSource,
LocalExecutionPlanContext buildContext,
List<VariableReferenceExpression> buildVariables,
Optional<VariableReferenceExpression> buildHashVariable,
PhysicalOperation probeSource,
boolean spillEnabled,
LocalExecutionPlanContext context)
{
// Determine if planning broadcast join
Optional<JoinNode.DistributionType> distributionType = node.getDistributionType();
boolean isBroadcastJoin = distributionType.isPresent() && distributionType.get() == REPLICATED;

LocalExecutionPlanContext buildContext = context.createSubContext();
PhysicalOperation buildSource = buildNode.accept(this, buildContext);

if (buildSource.getPipelineExecutionStrategy() == GROUPED_EXECUTION) {
checkState(
probeSource.getPipelineExecutionStrategy() == GROUPED_EXECUTION,
"Build execution is GROUPED_EXECUTION. Probe execution is expected be GROUPED_EXECUTION, but is UNGROUPED_EXECUTION.");
}

List<VariableReferenceExpression> buildOutputVariables = node.getOutputVariables().stream()
.filter(node.getRight().getOutputVariables()::contains)
.collect(toImmutableList());
Expand All @@ -2198,10 +2201,6 @@ private JoinBridgeManager<PartitionedLookupSourceFactory> createLookupSourceFact
OptionalInt buildHashChannel = buildHashVariable.map(variableChannelGetter(buildSource))
.map(OptionalInt::of).orElse(OptionalInt.empty());

boolean spillEnabled = isSpillEnabled(context.getSession()) && isJoinSpillingEnabled(context.getSession());
boolean buildOuter = node.getType() == RIGHT || node.getType() == FULL;
int partitionCount = buildContext.getDriverInstanceCount().orElse(1);

Optional<JoinFilterFunctionFactory> filterFunctionFactory = node.getFilter()
.map(filterExpression -> compileJoinFilterFunction(
session.getSqlFunctionProperties(),
Expand Down Expand Up @@ -2234,6 +2233,8 @@ private JoinBridgeManager<PartitionedLookupSourceFactory> createLookupSourceFact
ImmutableList<Type> buildOutputTypes = buildOutputChannels.stream()
.map(buildSource.getTypes()::get)
.collect(toImmutableList());
boolean buildOuter = node.getType() == RIGHT || node.getType() == FULL;
int partitionCount = buildContext.getDriverInstanceCount().orElse(1);
JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = new JoinBridgeManager<>(
buildOuter,
probeSource.getPipelineExecutionStrategy(),
Expand All @@ -2255,8 +2256,9 @@ private JoinBridgeManager<PartitionedLookupSourceFactory> createLookupSourceFact
createDynamicFilter(buildSource, node, context, partitionCount).ifPresent(
filter -> factoriesBuilder.add(createDynamicFilterSourceOperatorFactory(filter, node.getId(), buildSource, buildContext)));

// spill does not work for probe only grouped execution because PartitionedLookupSourceFactory.finishProbe() expects a defined number of probe operators
boolean isProbeOnlyGroupedExecution = probeSource.getPipelineExecutionStrategy() == GROUPED_EXECUTION && buildSource.getPipelineExecutionStrategy() != GROUPED_EXECUTION;
// Determine if planning broadcast join
Optional<JoinNode.DistributionType> distributionType = node.getDistributionType();
boolean isBroadcastJoin = distributionType.isPresent() && distributionType.get() == REPLICATED;

HashBuilderOperatorFactory hashBuilderOperatorFactory = new HashBuilderOperatorFactory(
buildContext.getNextOperatorId(),
Expand All @@ -2270,7 +2272,7 @@ private JoinBridgeManager<PartitionedLookupSourceFactory> createLookupSourceFact
searchFunctionFactories,
10_000,
pagesIndexFactory,
spillEnabled && !buildOuter && partitionCount > 1 && !isProbeOnlyGroupedExecution,
spillEnabled && partitionCount > 1,
singleStreamSpillerFactory,
isBroadcastJoin);

Expand Down Expand Up @@ -2360,6 +2362,7 @@ private OperatorFactory createLookupJoin(
List<VariableReferenceExpression> probeVariables,
Optional<VariableReferenceExpression> probeHashVariable,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactoryManager,
boolean spillEnabled,
LocalExecutionPlanContext context)
{
List<Type> probeTypes = probeSource.getTypes();
Expand All @@ -2370,7 +2373,7 @@ private OperatorFactory createLookupJoin(
List<Integer> probeJoinChannels = ImmutableList.copyOf(getChannelsForVariables(probeVariables, probeSource.getLayout()));
OptionalInt probeHashChannel = probeHashVariable.map(variableChannelGetter(probeSource))
.map(OptionalInt::of).orElse(OptionalInt.empty());
OptionalInt totalOperatorsCount = getJoinOperatorsCountForSpill(context, session);
OptionalInt totalOperatorsCount = getJoinOperatorsCountForSpill(context, spillEnabled);

switch (node.getType()) {
case INNER:
Expand All @@ -2386,13 +2389,14 @@ private OperatorFactory createLookupJoin(
}
}

private OptionalInt getJoinOperatorsCountForSpill(LocalExecutionPlanContext context, Session session)
private OptionalInt getJoinOperatorsCountForSpill(LocalExecutionPlanContext context, boolean spillEnabled)
{
OptionalInt driverInstanceCount = context.getDriverInstanceCount();
if (isSpillEnabled(session) && isJoinSpillingEnabled(session)) {
if (spillEnabled) {
OptionalInt driverInstanceCount = context.getDriverInstanceCount();
checkState(driverInstanceCount.isPresent(), "A fixed distribution is required for JOIN when spilling is enabled");
return driverInstanceCount;
}
return driverInstanceCount;
return OptionalInt.empty();
}

private Map<VariableReferenceExpression, Integer> createJoinSourcesLayout(Map<VariableReferenceExpression, Integer> lookupSourceLayout, Map<VariableReferenceExpression, Integer> probeSourceLayout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,41 @@ private static MaterializedResult getProperColumns(Operator joinOperator, List<T
return OperatorAssertion.toMaterializedResult(joinOperator.getOperatorContext().getSession(), types, actualPages);
}

/**
 * Regression test for spilling a build partition after the probe side has already finished.
 * <p>
 * The probe is reported as finished first; only then is the hash builder's memory revoked and the
 * operator finished. The builder's blocked future must complete, and after the lookup source
 * factory is destroyed the builder must report that it is finished. The TestNG timeout turns a
 * deadlock into a test failure instead of a hang.
 */
@Test(timeOut = 30_000)
public void testBuildGracefulSpill()
throws Exception
{
TaskStateMachine taskStateMachine = new TaskStateMachine(new TaskId("query", 0, 0, 0), executor);
TaskContext taskContext = TestingTaskContext.createTaskContext(executor, scheduledExecutor, TEST_SESSION, taskStateMachine);

// build factory
RowPagesBuilder buildPages = rowPagesBuilder(ImmutableList.of(VARCHAR, BIGINT))
.addSequencePage(4, 20, 200);

DummySpillerFactory buildSpillerFactory = new DummySpillerFactory();

// NOTE(review): the two boolean arguments presumably select a parallel build and enable spilling — confirm against setupBuildSide
BuildSideSetup buildSideSetup = setupBuildSide(true, taskContext, Ints.asList(0), buildPages, Optional.empty(), true, buildSpillerFactory);
instantiateBuildDrivers(buildSideSetup, taskContext);

// Obtain the task-wide lookup source factory shared between the build and probe sides
JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();
PartitionedLookupSourceFactory lookupSourceFactory = lookupSourceFactoryManager.getJoinBridge(Lifespan.taskWide());

// finish probe before any build partition is spilled
// (OptionalInt.of(1) presumably declares a single lookup join operator; the value returned by the call is not used here)
lookupSourceFactory.finishProbeOperator(OptionalInt.of(1));

// spill build partition after probe is finished
HashBuilderOperator hashBuilderOperator = buildSideSetup.getBuildOperators().get(0);
hashBuilderOperator.startMemoryRevoke().get();
hashBuilderOperator.finishMemoryRevoke();
hashBuilderOperator.finish();

// hash builder operator should not deadlock waiting for spilled lookup source to be disposed
// (if it does, this get() never returns and the test fails via the timeout declared above)
hashBuilderOperator.isBlocked().get();

// Only after the factory is destroyed is the builder asserted to be finished
lookupSourceFactory.destroy();
assertTrue(hashBuilderOperator.isFinished());
}

@Test(dataProvider = "hashJoinTestValues")
public void testInnerJoinWithNullProbe(boolean parallelBuild, boolean probeHashEnabled, boolean buildHashEnabled)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void testParsingError()
}

@Test
public void selectLargeInterval()
public void testSelectLargeInterval()
{
MaterializedResult result = computeActual("SELECT INTERVAL '30' DAY");
assertEquals(result.getRowCount(), 1);
Expand All @@ -165,7 +165,7 @@ public void selectLargeInterval()
}

@Test
public void emptyJoins()
public void testEmptyJoins()
{
// Empty predicate
assertQuery("select 1 from (select * from orders where 1 = 0) DT join customer on DT.custkey=customer.custkey",
Expand All @@ -186,7 +186,7 @@ public void emptyJoins()
}

@Test
public void selectNull()
public void testSelectNull()
{
assertQuery("SELECT NULL");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,4 @@ public void testAssignUniqueId()
// TODO: disabled until https://github.com/prestodb/presto/issues/8926 is resolved
// due to long running query test created many spill files on disk.
}

// Disabled override with an intentionally empty body, so the overridden test does not run for this class.
@Test(enabled = false)
@Override
public void testCorrelatedNonAggregationScalarSubqueries()
{
// TODO: disabled until https://github.com/prestodb/presto/issues/15542 is resolved
}
}