diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 908237302c77d..8c6c52ee0f6c5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -297,7 +297,9 @@ jobs: - ":presto-elasticsearch" - ":presto-orc" - ":presto-thrift-connector" - - ":presto-spark-base" + - ":presto-spark-base -P presto-spark-tests-smoke" + - ":presto-spark-base -P presto-spark-tests-all-queries" + - ":presto-spark-base -P presto-spark-tests-spill-queries" timeout-minutes: 80 steps: - uses: actions/checkout@v2 diff --git a/presto-main/src/main/java/com/facebook/presto/operator/LookupJoinOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/LookupJoinOperator.java index d78a609e62e16..d1c0299e03a8b 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/LookupJoinOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/LookupJoinOperator.java @@ -381,6 +381,7 @@ private void tryUnspillNext() } currentPartition.ifPresent(Partition::release); + currentPartition = Optional.empty(); if (lookupSourceProvider != null) { // There are no more partitions to process, so clean up everything lookupSourceProvider.close(); @@ -510,6 +511,19 @@ public void close() closed = true; probe = null; + // In case of early termination (before operator is finished) release partition consumption to avoid a deadlock + if (partitionedConsumption == null) { + partitionedConsumption = lookupSourceFactory.finishProbeOperator(lookupJoinsCount); + addSuccessCallback(partitionedConsumption, consumption -> consumption.beginConsumption().forEachRemaining(Partition::release)); + } + currentPartition.ifPresent(Partition::release); + currentPartition = Optional.empty(); + if (lookupPartitions != null) { + while (lookupPartitions.hasNext()) { + lookupPartitions.next().release(); + } + } + try (Closer closer = Closer.create()) { // `afterClose` must be run last. // Closer is documented to mimic try-with-resource, which implies close will happen in reverse order. diff --git a/presto-main/src/main/java/com/facebook/presto/operator/PartitionedConsumption.java b/presto-main/src/main/java/com/facebook/presto/operator/PartitionedConsumption.java index d677970e5090e..49b6423300e56 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/PartitionedConsumption.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/PartitionedConsumption.java @@ -152,7 +152,6 @@ public ListenableFuture load() public synchronized void release() { - checkState(loaded.isDone()); pendingReleases--; checkState(pendingReleases >= 0); if (pendingReleases == 0) { diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestHashJoinOperator.java b/presto-main/src/test/java/com/facebook/presto/operator/TestHashJoinOperator.java index ea184084c30a8..43ac7dff1395d 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestHashJoinOperator.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestHashJoinOperator.java @@ -488,6 +488,129 @@ private void innerJoinWithSpill(boolean probeHashEnabled, List whenSp } } + @Test(timeOut = 60000) + public void testInnerJoinWithSpillWithEarlyTermination() + { + TaskStateMachine taskStateMachine = new TaskStateMachine(new TaskId("query", 0, 0, 0), executor); + TaskContext taskContext = TestingTaskContext.createTaskContext(executor, scheduledExecutor, TEST_SESSION, taskStateMachine); + + PipelineContext joinPipelineContext = taskContext.addPipelineContext(2, true, true, false); + DriverContext joinDriverContext1 = joinPipelineContext.addDriverContext(); + DriverContext joinDriverContext2 = joinPipelineContext.addDriverContext(); + DriverContext joinDriverContext3 = joinPipelineContext.addDriverContext(); + + // build factory + RowPagesBuilder buildPages = rowPagesBuilder(ImmutableList.of(VARCHAR, BIGINT)) + .addSequencePage(4, 20, 200) + .addSequencePage(4, 20, 200) + .addSequencePage(4, 30, 300) + .addSequencePage(4, 40, 400); + + // force a yield for every match in LookupJoinOperator, set called to true after first + AtomicBoolean called = new AtomicBoolean(false); + InternalJoinFilterFunction filterFunction = new TestInternalJoinFilterFunction( + (leftPosition, leftPage, rightPosition, rightPage) -> { + called.set(true); + return true; + }); + + BuildSideSetup buildSideSetup = setupBuildSide(true, taskContext, Ints.asList(0), buildPages, Optional.of(filterFunction), true, SINGLE_STREAM_SPILLER_FACTORY); + JoinBridgeManager lookupSourceFactoryManager = buildSideSetup.getLookupSourceFactoryManager(); + + // probe factory + RowPagesBuilder probe1Pages = rowPagesBuilder(true, Ints.asList(0), ImmutableList.of(VARCHAR, BIGINT)) + .row("no_match_1", 123_000L) + .row("no_match_2", 123_000L); + RowPagesBuilder probe2Pages = rowPagesBuilder(true, Ints.asList(0), ImmutableList.of(VARCHAR, BIGINT)) + .row("20", 123_000L) + .row("20", 123_000L) + .pageBreak() + .addSequencePage(20, 0, 123_000) + .addSequencePage(10, 30, 123_000); + OperatorFactory joinOperatorFactory = innerJoinOperatorFactory(lookupSourceFactoryManager, probe2Pages, PARTITIONING_SPILLER_FACTORY, OptionalInt.of(3)); + + // build drivers and operators + instantiateBuildDrivers(buildSideSetup, taskContext); + List buildDrivers = buildSideSetup.getBuildDrivers(); + int buildOperatorCount = buildDrivers.size(); + LookupSourceFactory lookupSourceFactory = lookupSourceFactoryManager.getJoinBridge(Lifespan.taskWide()); + + Operator lookupOperator1 = joinOperatorFactory.createOperator(joinDriverContext1); + Operator lookupOperator2 = joinOperatorFactory.createOperator(joinDriverContext2); + Operator lookupOperator3 = joinOperatorFactory.createOperator(joinDriverContext3); + joinOperatorFactory.noMoreOperators(); + + ListenableFuture lookupSourceProvider = lookupSourceFactory.createLookupSourceProvider(); + while (!lookupSourceProvider.isDone()) { + for (Driver buildDriver : buildDrivers) { + checkErrors(taskStateMachine); + buildDriver.process(); + } + } + getFutureValue(lookupSourceProvider).close(); + + for (int i = 0; i < buildOperatorCount; i++) { + revokeMemory(buildSideSetup.getBuildOperators().get(i)); + } + + for (Driver buildDriver : buildDrivers) { + runDriverInThread(executor, buildDriver); + } + + ValuesOperatorFactory valuesOperatorFactory1 = new ValuesOperatorFactory(17, new PlanNodeId("values1"), probe1Pages.build()); + ValuesOperatorFactory valuesOperatorFactory2 = new ValuesOperatorFactory(18, new PlanNodeId("values2"), probe2Pages.build()); + ValuesOperatorFactory valuesOperatorFactory3 = new ValuesOperatorFactory(18, new PlanNodeId("values3"), ImmutableList.of()); + PageBuffer pageBuffer = new PageBuffer(10); + PageBufferOperatorFactory pageBufferOperatorFactory = new PageBufferOperatorFactory(19, new PlanNodeId("pageBuffer"), pageBuffer); + + Driver joinDriver1 = Driver.createDriver( + joinDriverContext1, + valuesOperatorFactory1.createOperator(joinDriverContext1), + lookupOperator1, + pageBufferOperatorFactory.createOperator(joinDriverContext1)); + Driver joinDriver2 = Driver.createDriver( + joinDriverContext2, + valuesOperatorFactory2.createOperator(joinDriverContext2), + lookupOperator2, + pageBufferOperatorFactory.createOperator(joinDriverContext2)); + Driver joinDriver3 = Driver.createDriver( + joinDriverContext3, + valuesOperatorFactory3.createOperator(joinDriverContext3), + lookupOperator3, + pageBufferOperatorFactory.createOperator(joinDriverContext3)); + + joinDriver3.close(); + joinDriver3.process(); + + while (!called.get()) { + checkErrors(taskStateMachine); + processRow(joinDriver1, taskStateMachine); + processRow(joinDriver2, taskStateMachine); + } + joinDriver1.close(); + joinDriver1.process(); + + while (!joinDriver2.isFinished()) { + processRow(joinDriver2, taskStateMachine); + } + checkErrors(taskStateMachine); + + List pages = getPages(pageBuffer); + + MaterializedResult expected = MaterializedResult.resultBuilder(taskContext.getSession(), concat(probe2Pages.getTypesWithoutHash(), buildPages.getTypesWithoutHash())) + .row("20", 123_000L, "20", 200L) + .row("20", 123_000L, "20", 200L) + .row("20", 123_000L, "20", 200L) + .row("20", 123_000L, "20", 200L) + .row("30", 123_000L, "30", 300L) + .row("31", 123_001L, "31", 301L) + .row("32", 123_002L, "32", 302L) + .row("33", 123_003L, "33", 303L) + .build(); + + assertEqualsIgnoreOrder(getProperColumns(lookupOperator1, concat(probe2Pages.getTypes(), buildPages.getTypes()), probe2Pages, pages).getMaterializedRows(), expected.getMaterializedRows()); + } + private static void processRow(final Driver joinDriver, final TaskStateMachine taskStateMachine) { joinDriver.getDriverContext().getYieldSignal().setWithDelay(TimeUnit.SECONDS.toNanos(1), joinDriver.getDriverContext().getYieldExecutor()); @@ -1325,7 +1448,19 @@ private OperatorFactory probeOuterJoinOperatorFactory(JoinBridgeManager lookupSourceFactoryManager, RowPagesBuilder probePages, PartitioningSpillerFactory partitioningSpillerFactory) + private OperatorFactory innerJoinOperatorFactory( + JoinBridgeManager lookupSourceFactoryManager, + RowPagesBuilder probePages, + PartitioningSpillerFactory partitioningSpillerFactory) + { + return innerJoinOperatorFactory(lookupSourceFactoryManager, probePages, partitioningSpillerFactory, OptionalInt.of(1)); + } + + private OperatorFactory innerJoinOperatorFactory( + JoinBridgeManager lookupSourceFactoryManager, + RowPagesBuilder probePages, + PartitioningSpillerFactory partitioningSpillerFactory, + OptionalInt totalOperatorsCount) { return LOOKUP_JOIN_OPERATORS.innerJoin( 0, @@ -1335,7 +1470,7 @@ private OperatorFactory innerJoinOperatorFactory(JoinBridgeManager + + + presto-spark-tests-smoke + + + + org.apache.maven.plugins + maven-surefire-plugin + + + **/tests/*.java + + + **/TestPrestoSparkAggregations.java + **/TestPrestoSparkJoinQueries.java + **/TestPrestoSparkOrderByQueries.java + **/TestPrestoSparkWindowQueries.java + **/TestPrestoSparkSpilledAggregations.java + **/TestPrestoSparkSpilledJoinQueries.java + + + + + + + + presto-spark-tests-all-queries + + + + org.apache.maven.plugins + maven-surefire-plugin + + + **/TestPrestoSparkAggregations.java + **/TestPrestoSparkJoinQueries.java + **/TestPrestoSparkOrderByQueries.java + **/TestPrestoSparkWindowQueries.java + + + + + + + + presto-spark-tests-spill-queries + + + + org.apache.maven.plugins + maven-surefire-plugin + + + **/TestPrestoSparkSpilledAggregations.java + **/TestPrestoSparkSpilledJoinQueries.java + + + + + + + diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java index 05cd7fa8d16b1..3e0e91083ada0 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/PrestoSparkQueryRunner.java @@ -70,6 +70,8 @@ import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; +import java.nio.file.Paths; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -113,6 +115,7 @@ public class PrestoSparkQueryRunner private static final Logger log = Logger.get(PrestoSparkQueryRunner.class); private static final int NODE_COUNT = 4; + private static final int TASK_CONCURRENCY = 4; private static final Map instances = new ConcurrentHashMap<>(); private static final SparkContextHolder sparkContextHolder = new SparkContextHolder(); @@ -150,11 +153,29 @@ public static PrestoSparkQueryRunner createHivePrestoSparkQueryRunner() return createHivePrestoSparkQueryRunner(getTables()); } + public static PrestoSparkQueryRunner createSpilledHivePrestoSparkQueryRunner(Iterable> tables) + { + return createSpilledHivePrestoSparkQueryRunner(tables, ImmutableMap.of()); + } + public static PrestoSparkQueryRunner createHivePrestoSparkQueryRunner(Iterable> tables) { return createHivePrestoSparkQueryRunner(tables, ImmutableMap.of()); } + public static PrestoSparkQueryRunner createSpilledHivePrestoSparkQueryRunner(Iterable> tables, Map additionalConfigProperties) + { + Map properties = new HashMap<>(); + properties.put("experimental.spill-enabled", "true"); + properties.put("experimental.join-spill-enabled", "true"); + properties.put("experimental.temp-storage-buffer-size", "1MB"); + properties.put("spark.memory-revoking-threshold", "0.0"); + properties.put("experimental.spiller-spill-path", Paths.get(System.getProperty("java.io.tmpdir"), "presto", "spills").toString()); + properties.put("experimental.spiller-threads", Integer.toString(NODE_COUNT * TASK_CONCURRENCY)); + properties.putAll(additionalConfigProperties); + return createHivePrestoSparkQueryRunner(tables, properties); + } + public static PrestoSparkQueryRunner createHivePrestoSparkQueryRunner(Iterable> tables, Map additionalConfigProperties) { PrestoSparkQueryRunner queryRunner = new PrestoSparkQueryRunner("hive", additionalConfigProperties); @@ -210,6 +231,7 @@ public PrestoSparkQueryRunner(String defaultCatalog, Map additio configProperties.put("query.hash-partition-count", Integer.toString(NODE_COUNT * 2)); configProperties.put("task.writer-count", Integer.toString(2)); configProperties.put("task.partitioned-writer-count", Integer.toString(4)); + configProperties.put("task.concurrency", Integer.toString(TASK_CONCURRENCY)); configProperties.putAll(additionalConfigProperties); PrestoSparkInjectorFactory injectorFactory = new PrestoSparkInjectorFactory( diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkAbstractTestQueries.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoQueries.java similarity index 98% rename from presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkAbstractTestQueries.java rename to presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoQueries.java index 3bfcc7b23f728..b716ac715f195 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkAbstractTestQueries.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoQueries.java @@ -16,7 +16,7 @@ import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.tests.AbstractTestQueries; -public class TestPrestoSparkAbstractTestQueries +public class TestPrestoQueries extends AbstractTestQueries { @Override diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkAbstractTestOrderByQueries.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkOrderByQueries.java similarity index 94% rename from presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkAbstractTestOrderByQueries.java rename to presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkOrderByQueries.java index 7b92b48f1b34b..47e0f1fea5f9a 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkAbstractTestOrderByQueries.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkOrderByQueries.java @@ -18,7 +18,7 @@ import static com.facebook.presto.spark.PrestoSparkQueryRunner.createHivePrestoSparkQueryRunner; -public class TestPrestoSparkAbstractTestOrderByQueries +public class TestPrestoSparkOrderByQueries extends AbstractTestOrderByQueries { @Override diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkSpilledAggregations.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkSpilledAggregations.java index 1bc29f387cb68..19461e9a42f5b 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkSpilledAggregations.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkSpilledAggregations.java @@ -14,11 +14,8 @@ package com.facebook.presto.spark; import com.facebook.presto.testing.QueryRunner; -import com.google.common.collect.ImmutableMap; -import java.nio.file.Paths; - -import static com.facebook.presto.spark.PrestoSparkQueryRunner.createHivePrestoSparkQueryRunner; +import static com.facebook.presto.spark.PrestoSparkQueryRunner.createSpilledHivePrestoSparkQueryRunner; import static io.airlift.tpch.TpchTable.getTables; public class TestPrestoSparkSpilledAggregations @@ -27,12 +24,6 @@ public class TestPrestoSparkSpilledAggregations @Override protected QueryRunner createQueryRunner() { - ImmutableMap.Builder configProperties = ImmutableMap.builder(); - configProperties.put("experimental.spill-enabled", "true"); - configProperties.put("experimental.join-spill-enabled", "true"); - configProperties.put("experimental.temp-storage-buffer-size", "1MB"); - configProperties.put("spark.memory-revoking-threshold", "0.0"); - configProperties.put("experimental.spiller-spill-path", Paths.get(System.getProperty("java.io.tmpdir"), "presto", "spills").toString()); - return createHivePrestoSparkQueryRunner(getTables(), configProperties.build()); + return createSpilledHivePrestoSparkQueryRunner(getTables()); } } diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkSpilledJoinQueries.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkSpilledJoinQueries.java index 50fabfe0055b0..f500b932ee3bc 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkSpilledJoinQueries.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkSpilledJoinQueries.java @@ -14,12 +14,8 @@ package com.facebook.presto.spark; import com.facebook.presto.testing.QueryRunner; -import com.google.common.collect.ImmutableMap; -import org.testng.annotations.Test; -import java.nio.file.Paths; - -import static com.facebook.presto.spark.PrestoSparkQueryRunner.createHivePrestoSparkQueryRunner; +import static com.facebook.presto.spark.PrestoSparkQueryRunner.createSpilledHivePrestoSparkQueryRunner; import static io.airlift.tpch.TpchTable.getTables; public class TestPrestoSparkSpilledJoinQueries @@ -28,25 +24,6 @@ public class TestPrestoSparkSpilledJoinQueries @Override protected QueryRunner createQueryRunner() { - ImmutableMap.Builder configProperties = ImmutableMap.builder(); - configProperties.put("experimental.spill-enabled", "true"); - configProperties.put("experimental.join-spill-enabled", "true"); - configProperties.put("task.concurrency", "2"); - configProperties.put("experimental.temp-storage-buffer-size", "1MB"); - configProperties.put("spark.memory-revoking-threshold", "0.0"); - configProperties.put("experimental.spiller-spill-path", Paths.get(System.getProperty("java.io.tmpdir"), "presto", "spills").toString()); - return createHivePrestoSparkQueryRunner(getTables(), configProperties.build()); - } - - // Presto on Spark execution triggers test hanging easily for some unknown reason - // Given this is a known flaky test https://github.com/prestodb/presto/issues/13859, disable it for now - @Test(enabled = false) - @Override - public void testLimitWithJoin() - { - // Join with limit triggers test hanging consistently in Presto on Spark environment - // It is related to limit cause because query always succeeds without limit clause, - // likely related to early termination of unSpilled partitions - // Decreasing task_concurrency and hash_partition_count makes this query succeed + return createSpilledHivePrestoSparkQueryRunner(getTables()); } } diff --git a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkAbstractTestWindowQueries.java b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkWindowQueries.java similarity index 94% rename from presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkAbstractTestWindowQueries.java rename to presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkWindowQueries.java index 4ca8971017913..49183adfb141e 100644 --- a/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkAbstractTestWindowQueries.java +++ b/presto-spark-base/src/test/java/com/facebook/presto/spark/TestPrestoSparkWindowQueries.java @@ -18,7 +18,7 @@ import static com.facebook.presto.spark.PrestoSparkQueryRunner.createHivePrestoSparkQueryRunner; -public class TestPrestoSparkAbstractTestWindowQueries +public class TestPrestoSparkWindowQueries extends AbstractTestWindowQueries { @Override