Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,9 @@ jobs:
- ":presto-elasticsearch"
- ":presto-orc"
- ":presto-thrift-connector"
- ":presto-spark-base"
- ":presto-spark-base -P presto-spark-tests-smoke"
- ":presto-spark-base -P presto-spark-tests-all-queries"
- ":presto-spark-base -P presto-spark-tests-spill-queries"
timeout-minutes: 80
steps:
- uses: actions/checkout@v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ private void tryUnspillNext()
}

currentPartition.ifPresent(Partition::release);
currentPartition = Optional.empty();
if (lookupSourceProvider != null) {
// There are no more partitions to process, so clean up everything
lookupSourceProvider.close();
Expand Down Expand Up @@ -510,6 +511,19 @@ public void close()
closed = true;
probe = null;

// In case of early termination (before operator is finished) release partition consumption to avoid a deadlock
if (partitionedConsumption == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a comment here that this code is needed for the case of early termination where the operator is closed before being finished.

partitionedConsumption = lookupSourceFactory.finishProbeOperator(lookupJoinsCount);
addSuccessCallback(partitionedConsumption, consumption -> consumption.beginConsumption().forEachRemaining(Partition::release));
}
currentPartition.ifPresent(Partition::release);
currentPartition = Optional.empty();
if (lookupPartitions != null) {
while (lookupPartitions.hasNext()) {
lookupPartitions.next().release();
}
}

try (Closer closer = Closer.create()) {
// `afterClose` must be run last.
// Closer is documented to mimic try-with-resource, which implies close will happen in reverse order.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ public ListenableFuture<T> load()

public synchronized void release()
{
checkState(loaded.isDone());
pendingReleases--;
checkState(pendingReleases >= 0);
if (pendingReleases == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,129 @@ private void innerJoinWithSpill(boolean probeHashEnabled, List<WhenSpill> whenSp
}
}

/**
 * Verifies that a spilled inner join does not deadlock when some probe drivers
 * terminate early, i.e. are closed before they finish consuming their input.
 *
 * Three join drivers share one join operator factory (total operator count 3):
 * driver 3 is closed before it processes anything, driver 1 is closed after the
 * first match is observed, and driver 2 runs to completion. The test then
 * asserts driver 2 still produced the full, correct join result.
 */
@Test(timeOut = 60000)
public void testInnerJoinWithSpillWithEarlyTermination()
{
    TaskStateMachine taskStateMachine = new TaskStateMachine(new TaskId("query", 0, 0, 0), executor);
    TaskContext taskContext = TestingTaskContext.createTaskContext(executor, scheduledExecutor, TEST_SESSION, taskStateMachine);

    PipelineContext joinPipelineContext = taskContext.addPipelineContext(2, true, true, false);
    DriverContext joinDriverContext1 = joinPipelineContext.addDriverContext();
    DriverContext joinDriverContext2 = joinPipelineContext.addDriverContext();
    DriverContext joinDriverContext3 = joinPipelineContext.addDriverContext();

    // build factory
    RowPagesBuilder buildPages = rowPagesBuilder(ImmutableList.of(VARCHAR, BIGINT))
            .addSequencePage(4, 20, 200)
            .addSequencePage(4, 20, 200)
            .addSequencePage(4, 30, 300)
            .addSequencePage(4, 40, 400);

    // force a yield for every match in LookupJoinOperator, set called to true after first
    AtomicBoolean called = new AtomicBoolean(false);
    InternalJoinFilterFunction filterFunction = new TestInternalJoinFilterFunction(
            (leftPosition, leftPage, rightPosition, rightPage) -> {
                called.set(true);
                return true;
            });

    BuildSideSetup buildSideSetup = setupBuildSide(true, taskContext, Ints.asList(0), buildPages, Optional.of(filterFunction), true, SINGLE_STREAM_SPILLER_FACTORY);
    JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager();

    // probe factory: probe1 has no matching rows, probe2 carries all the matches
    RowPagesBuilder probe1Pages = rowPagesBuilder(true, Ints.asList(0), ImmutableList.of(VARCHAR, BIGINT))
            .row("no_match_1", 123_000L)
            .row("no_match_2", 123_000L);
    RowPagesBuilder probe2Pages = rowPagesBuilder(true, Ints.asList(0), ImmutableList.of(VARCHAR, BIGINT))
            .row("20", 123_000L)
            .row("20", 123_000L)
            .pageBreak()
            .addSequencePage(20, 0, 123_000)
            .addSequencePage(10, 30, 123_000);
    // total operator count of 3 matches the three join operators created below,
    // so finishProbeOperator accounting covers the early-terminated drivers
    OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(lookupSourceFactoryManager, probe2Pages, PARTITIONING_SPILLER_FACTORY, OptionalInt.of(3));

    // build drivers and operators
    instantiateBuildDrivers(buildSideSetup, taskContext);
    List<Driver> buildDrivers = buildSideSetup.getBuildDrivers();
    int buildOperatorCount = buildDrivers.size();
    LookupSourceFactory lookupSourceFactory = lookupSourceFactoryManager.getJoinBridge(Lifespan.taskWide());

    Operator lookupOperator1 = joinOperatorFactory.createOperator(joinDriverContext1);
    Operator lookupOperator2 = joinOperatorFactory.createOperator(joinDriverContext2);
    Operator lookupOperator3 = joinOperatorFactory.createOperator(joinDriverContext3);
    joinOperatorFactory.noMoreOperators();

    // drive the build side until the lookup source is ready
    ListenableFuture<LookupSourceProvider> lookupSourceProvider = lookupSourceFactory.createLookupSourceProvider();
    while (!lookupSourceProvider.isDone()) {
        for (Driver buildDriver : buildDrivers) {
            checkErrors(taskStateMachine);
            buildDriver.process();
        }
    }
    getFutureValue(lookupSourceProvider).close();

    // spill the build side so the join has to unspill partitions during probing
    for (int i = 0; i < buildOperatorCount; i++) {
        revokeMemory(buildSideSetup.getBuildOperators().get(i));
    }

    for (Driver buildDriver : buildDrivers) {
        runDriverInThread(executor, buildDriver);
    }

    ValuesOperatorFactory valuesOperatorFactory1 = new ValuesOperatorFactory(17, new PlanNodeId("values1"), probe1Pages.build());
    ValuesOperatorFactory valuesOperatorFactory2 = new ValuesOperatorFactory(18, new PlanNodeId("values2"), probe2Pages.build());
    // operator id 20: must be unique in this pipeline (was a copy-pasted 18, duplicating values2's id)
    ValuesOperatorFactory valuesOperatorFactory3 = new ValuesOperatorFactory(20, new PlanNodeId("values3"), ImmutableList.of());
    PageBuffer pageBuffer = new PageBuffer(10);
    PageBufferOperatorFactory pageBufferOperatorFactory = new PageBufferOperatorFactory(19, new PlanNodeId("pageBuffer"), pageBuffer);

    Driver joinDriver1 = Driver.createDriver(
            joinDriverContext1,
            valuesOperatorFactory1.createOperator(joinDriverContext1),
            lookupOperator1,
            pageBufferOperatorFactory.createOperator(joinDriverContext1));
    Driver joinDriver2 = Driver.createDriver(
            joinDriverContext2,
            valuesOperatorFactory2.createOperator(joinDriverContext2),
            lookupOperator2,
            pageBufferOperatorFactory.createOperator(joinDriverContext2));
    Driver joinDriver3 = Driver.createDriver(
            joinDriverContext3,
            valuesOperatorFactory3.createOperator(joinDriverContext3),
            lookupOperator3,
            pageBufferOperatorFactory.createOperator(joinDriverContext3));

    // early termination case 1: close a driver before it ever processes
    joinDriver3.close();
    joinDriver3.process();

    // run drivers 1 and 2 until the join filter has seen at least one match
    while (!called.get()) {
        checkErrors(taskStateMachine);
        processRow(joinDriver1, taskStateMachine);
        processRow(joinDriver2, taskStateMachine);
    }
    // early termination case 2: close a driver mid-stream
    joinDriver1.close();
    joinDriver1.process();

    // the surviving driver must still be able to finish (no deadlock on spilled partitions)
    while (!joinDriver2.isFinished()) {
        processRow(joinDriver2, taskStateMachine);
    }
    checkErrors(taskStateMachine);

    List<Page> pages = getPages(pageBuffer);

    MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probe2Pages.getTypesWithoutHash(), buildPages.getTypesWithoutHash()))
            .row("20", 123_000L, "20", 200L)
            .row("20", 123_000L, "20", 200L)
            .row("20", 123_000L, "20", 200L)
            .row("20", 123_000L, "20", 200L)
            .row("30", 123_000L, "30", 300L)
            .row("31", 123_001L, "31", 301L)
            .row("32", 123_002L, "32", 302L)
            .row("33", 123_003L, "33", 303L)
            .build();

    assertEqualsIgnoreOrder(getProperColumns(lookupOperator1, concat(probe2Pages.getTypes(), buildPages.getTypes()), probe2Pages, pages).getMaterializedRows(), expected.getMaterializedRows());
}

private static void processRow(final Driver joinDriver, final TaskStateMachine taskStateMachine)
{
joinDriver.getDriverContext().getYieldSignal().setWithDelay(TimeUnit.SECONDS.toNanos(1), joinDriver.getDriverContext().getYieldExecutor());
Expand Down Expand Up @@ -1325,7 +1448,19 @@ private OperatorFactory probeOuterJoinOperatorFactory(JoinBridgeManager<Partitio
PARTITIONING_SPILLER_FACTORY);
}

private OperatorFactory innerJoinOperatorFactory(JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager, RowPagesBuilder probePages, PartitioningSpillerFactory partitioningSpillerFactory)
/**
 * Convenience overload that builds an inner-join operator factory with a
 * total operator count of 1 (the common single-driver case); delegates to the
 * four-argument overload.
 */
private OperatorFactory innerJoinOperatorFactory(
        JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager,
        RowPagesBuilder probePages,
        PartitioningSpillerFactory partitioningSpillerFactory)
{
    return innerJoinOperatorFactory(lookupSourceFactoryManager, probePages, partitioningSpillerFactory, OptionalInt.of(1));
}

private OperatorFactory innerJoinOperatorFactory(
JoinBridgeManager<PartitionedLookupSourceFactory> lookupSourceFactoryManager,
RowPagesBuilder probePages,
PartitioningSpillerFactory partitioningSpillerFactory,
OptionalInt totalOperatorsCount)
{
return LOOKUP_JOIN_OPERATORS.innerJoin(
0,
Expand All @@ -1335,7 +1470,7 @@ private OperatorFactory innerJoinOperatorFactory(JoinBridgeManager<PartitionedLo
Ints.asList(0),
getHashChannelAsInt(probePages),
Optional.empty(),
OptionalInt.of(1),
totalOperatorsCount,
partitioningSpillerFactory);
}

Expand Down
62 changes: 62 additions & 0 deletions presto-spark-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,66 @@
</plugins>
</build>

<profiles>
<profile>
<id>presto-spark-tests-smoke</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/tests/*.java</include>
</includes>
<excludes>
<exclude>**/TestPrestoSparkAggregations.java</exclude>
<exclude>**/TestPrestoSparkJoinQueries.java</exclude>
<exclude>**/TestPrestoSparkOrderByQueries.java</exclude>
<exclude>**/TestPrestoSparkWindowQueries.java</exclude>
<exclude>**/TestPrestoSparkSpilledAggregations.java</exclude>
<exclude>**/TestPrestoSparkSpilledJoinQueries.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>presto-spark-tests-all-queries</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/TestPrestoSparkAggregations.java</include>
<include>**/TestPrestoSparkJoinQueries.java</include>
<include>**/TestPrestoSparkOrderByQueries.java</include>
<include>**/TestPrestoSparkWindowQueries.java</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>presto-spark-tests-spill-queries</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/TestPrestoSparkSpilledAggregations.java</include>
<include>**/TestPrestoSparkSpilledJoinQueries.java</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -113,6 +115,7 @@ public class PrestoSparkQueryRunner
private static final Logger log = Logger.get(PrestoSparkQueryRunner.class);

private static final int NODE_COUNT = 4;
private static final int TASK_CONCURRENCY = 4;

private static final Map<String, PrestoSparkQueryRunner> instances = new ConcurrentHashMap<>();
private static final SparkContextHolder sparkContextHolder = new SparkContextHolder();
Expand Down Expand Up @@ -150,11 +153,29 @@ public static PrestoSparkQueryRunner createHivePrestoSparkQueryRunner()
return createHivePrestoSparkQueryRunner(getTables());
}

/**
 * Creates a Hive-backed Presto-on-Spark query runner with spill configuration
 * enabled and no additional configuration overrides.
 */
public static PrestoSparkQueryRunner createSpilledHivePrestoSparkQueryRunner(Iterable<TpchTable<?>> tables)
{
    return createSpilledHivePrestoSparkQueryRunner(tables, ImmutableMap.of());
}

/**
 * Creates a Hive-backed Presto-on-Spark query runner for the given TPC-H
 * tables with no additional configuration overrides.
 */
public static PrestoSparkQueryRunner createHivePrestoSparkQueryRunner(Iterable<TpchTable<?>> tables)
{
    return createHivePrestoSparkQueryRunner(tables, ImmutableMap.of());
}

/**
 * Creates a Hive-backed Presto-on-Spark query runner with spilling enabled.
 *
 * Spill-related defaults are installed first and {@code additionalConfigProperties}
 * is applied last, so callers may override any default. A mutable HashMap is used
 * deliberately: an ImmutableMap builder would reject the duplicate keys that
 * such overrides produce.
 */
public static PrestoSparkQueryRunner createSpilledHivePrestoSparkQueryRunner(Iterable<TpchTable<?>> tables, Map<String, String> additionalConfigProperties)
{
    Map<String, String> properties = new HashMap<>();
    properties.put("experimental.spill-enabled", "true");
    properties.put("experimental.join-spill-enabled", "true");
    properties.put("experimental.temp-storage-buffer-size", "1MB");
    // threshold 0.0 — presumably makes memory revoking trigger at any usage so
    // spill paths are exercised even by small test queries; confirm semantics
    properties.put("spark.memory-revoking-threshold", "0.0");
    // spill files go under the JVM temp directory
    properties.put("experimental.spiller-spill-path", Paths.get(System.getProperty("java.io.tmpdir"), "presto", "spills").toString());
    // one spiller thread per (node x task-concurrency) slot
    properties.put("experimental.spiller-threads", Integer.toString(NODE_COUNT * TASK_CONCURRENCY));
    properties.putAll(additionalConfigProperties);
    return createHivePrestoSparkQueryRunner(tables, properties);
}

public static PrestoSparkQueryRunner createHivePrestoSparkQueryRunner(Iterable<TpchTable<?>> tables, Map<String, String> additionalConfigProperties)
{
PrestoSparkQueryRunner queryRunner = new PrestoSparkQueryRunner("hive", additionalConfigProperties);
Expand Down Expand Up @@ -210,6 +231,7 @@ public PrestoSparkQueryRunner(String defaultCatalog, Map<String, String> additio
configProperties.put("query.hash-partition-count", Integer.toString(NODE_COUNT * 2));
configProperties.put("task.writer-count", Integer.toString(2));
configProperties.put("task.partitioned-writer-count", Integer.toString(4));
configProperties.put("task.concurrency", Integer.toString(TASK_CONCURRENCY));
configProperties.putAll(additionalConfigProperties);

PrestoSparkInjectorFactory injectorFactory = new PrestoSparkInjectorFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueries;

public class TestPrestoSparkAbstractTestQueries
public class TestPrestoQueries
extends AbstractTestQueries
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import static com.facebook.presto.spark.PrestoSparkQueryRunner.createHivePrestoSparkQueryRunner;

public class TestPrestoSparkAbstractTestOrderByQueries
public class TestPrestoSparkOrderByQueries
extends AbstractTestOrderByQueries
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@
package com.facebook.presto.spark;

import com.facebook.presto.testing.QueryRunner;
import com.google.common.collect.ImmutableMap;

import java.nio.file.Paths;

import static com.facebook.presto.spark.PrestoSparkQueryRunner.createHivePrestoSparkQueryRunner;
import static com.facebook.presto.spark.PrestoSparkQueryRunner.createSpilledHivePrestoSparkQueryRunner;
import static io.airlift.tpch.TpchTable.getTables;

public class TestPrestoSparkSpilledAggregations
Expand All @@ -27,12 +24,6 @@ public class TestPrestoSparkSpilledAggregations
@Override
protected QueryRunner createQueryRunner()
{
ImmutableMap.Builder<String, String> configProperties = ImmutableMap.builder();
configProperties.put("experimental.spill-enabled", "true");
configProperties.put("experimental.join-spill-enabled", "true");
configProperties.put("experimental.temp-storage-buffer-size", "1MB");
configProperties.put("spark.memory-revoking-threshold", "0.0");
configProperties.put("experimental.spiller-spill-path", Paths.get(System.getProperty("java.io.tmpdir"), "presto", "spills").toString());
return createHivePrestoSparkQueryRunner(getTables(), configProperties.build());
return createSpilledHivePrestoSparkQueryRunner(getTables());
}
}
Loading