diff --git a/testing/trino-tests/src/test/java/io/trino/memory/TestMemoryManager.java b/testing/trino-tests/src/test/java/io/trino/memory/TestMemoryManager.java index 5587df7de6b8..f3cd57a459d2 100644 --- a/testing/trino-tests/src/test/java/io/trino/memory/TestMemoryManager.java +++ b/testing/trino-tests/src/test/java/io/trino/memory/TestMemoryManager.java @@ -15,11 +15,13 @@ import com.google.common.collect.ImmutableMap; import io.trino.Session; +import io.trino.plugin.blackhole.BlackHolePlugin; import io.trino.server.BasicQueryInfo; import io.trino.server.BasicQueryStats; import io.trino.server.testing.TestingTrinoServer; import io.trino.spi.QueryId; import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; import io.trino.tests.tpch.TpchQueryRunnerBuilder; import org.intellij.lang.annotations.Language; @@ -31,21 +33,28 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import static com.google.common.base.Preconditions.checkState; import static io.trino.SystemSessionProperties.RESOURCE_OVERCOMMIT; +import static io.trino.execution.QueryState.FAILED; import static io.trino.execution.QueryState.FINISHED; import static io.trino.operator.BlockedReason.WAITING_FOR_MEMORY; import static io.trino.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY; import static io.trino.testing.TestingSession.testSessionBuilder; +import static io.trino.testing.assertions.Assert.assertEventually; import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; // run single threaded to avoid creating multiple query runners at once @@ -100,7 +109,7 @@ public void testResourceOverCommit() } } - @Test(timeOut = 240_000, expectedExceptions = ExecutionException.class, expectedExceptionsMessageRegExp = ".*Query killed because the cluster is out of memory. Please try again in a few minutes.") + @Test(timeOut = 240_000) public void testOutOfMemoryKiller() throws Exception { @@ -110,6 +119,12 @@ public void testOutOfMemoryKiller() .buildOrThrow(); try (DistributedQueryRunner queryRunner = createQueryRunner(TINY_SESSION, properties)) { + queryRunner.installPlugin(new BlackHolePlugin()); + queryRunner.createCatalog("blackhole", "blackhole"); + queryRunner.execute("" + + "CREATE TABLE blackhole.default.take_30s(dummy varchar(10)) " + + "WITH (split_count=1, pages_per_split=30, rows_per_page=1, page_processing_delay='1s')"); + // Reserve all the memory QueryId fakeQueryId = new QueryId("fake"); for (TestingTrinoServer server : queryRunner.getServers()) { @@ -117,11 +132,18 @@ public void testOutOfMemoryKiller() assertTrue(memoryPool.tryReserve(fakeQueryId, "test", memoryPool.getMaxBytes())); } - List> queryFutures = new ArrayList<>(); - for (int i = 0; i < 2; i++) { - queryFutures.add(executor.submit(() -> queryRunner.execute("SELECT COUNT(*), clerk FROM orders GROUP BY clerk"))); + int queries = 2; + CompletionService completionService = new ExecutorCompletionService<>(executor); + for (int i = 0; i < queries; i++) { + completionService.submit(() -> queryRunner.execute("" + + "SELECT COUNT(*), clerk " + + "FROM (SELECT clerk FROM orders UNION ALL SELECT dummy FROM blackhole.default.take_30s)" + + "GROUP BY clerk")); } + // Wait for queries to start + assertEventually(() -> assertThat(queryRunner.getCoordinator().getQueryManager().getQueries()).hasSize(1 + queries)); + // Wait for one of the queries to die waitForQueryToBeKilled(queryRunner); @@ -133,9 +155,13 @@ public void testOutOfMemoryKiller() assertTrue(pool.getFreeBytes() > 0); } - for (Future query : queryFutures) { - query.get(); - } + assertThatThrownBy(() -> { + for (int i = 0; i < queries; i++) { + completionService.take().get(); + } + }) + .isInstanceOf(ExecutionException.class) + .hasMessageMatching(".*Query killed because the cluster is out of memory. Please try again in a few minutes."); } } @@ -143,13 +169,18 @@ private void waitForQueryToBeKilled(DistributedQueryRunner queryRunner) throws InterruptedException { while (true) { + boolean hasRunningQuery = false; for (BasicQueryInfo info : queryRunner.getCoordinator().getQueryManager().getQueries()) { - if (info.getState().isDone()) { - assertNotNull(info.getErrorCode()); + if (info.getState() == FAILED) { assertEquals(info.getErrorCode(), CLUSTER_OUT_OF_MEMORY.toErrorCode()); return; } + assertNull(info.getErrorCode(), "errorCode unexpectedly present for " + info); + if (!info.getState().isDone()) { + hasRunningQuery = true; + } } + checkState(hasRunningQuery, "All queries already completed without failure"); MILLISECONDS.sleep(10); } }