diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestDistributedQueries.java b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestDistributedQueries.java index c09861354c551..06113a84d9ef4 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestDistributedQueries.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestDistributedQueries.java @@ -26,12 +26,14 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.UncheckedTimeoutException; import io.airlift.testing.Assertions; import io.airlift.units.Duration; import org.intellij.lang.annotations.Language; import org.testng.annotations.Test; import java.util.Optional; +import java.util.function.Supplier; import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_MEMORY; import static com.facebook.presto.connector.informationSchema.InformationSchemaMetadata.INFORMATION_SCHEMA; @@ -58,9 +60,11 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static io.airlift.units.Duration.nanosSince; import static java.lang.String.format; +import static java.lang.Thread.currentThread; import static java.util.Collections.nCopies; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -749,7 +753,11 @@ public void testQueryLoggingCount() ImmutableList.of()), new Duration(1, MINUTES)); - long beforeCompletedQueriesCount = queryManager.getStats().getCompletedQueries().getTotalCount(); + // We cannot simply get the number of completed queries as soon as all the queries are completed, because this counter may not be up-to-date at that point. + // The completed queries counter is updated in a final query info listener, which is called eventually. + // Therefore, here we wait until the value of this counter gets stable. + + long beforeCompletedQueriesCount = waitUntilStable(() -> queryManager.getStats().getCompletedQueries().getTotalCount(), new Duration(5, SECONDS)); long beforeSubmittedQueriesCount = queryManager.getStats().getSubmittedQueries().getTotalCount(); assertUpdate("CREATE TABLE test_query_logging_count AS SELECT 1 foo_1, 2 foo_2_4", 1); assertQuery("SELECT foo_1, foo_2_4 FROM test_query_logging_count", "SELECT 1, 2"); @@ -764,10 +772,25 @@ public void testQueryLoggingCount() }); } + private T waitUntilStable(Supplier computation, Duration timeout) + { + T lastValue = computation.get(); + long start = System.nanoTime(); + while (!currentThread().isInterrupted() && nanosSince(start).compareTo(timeout) < 0) { + sleepUninterruptibly(100, MILLISECONDS); + T currentValue = computation.get(); + if (currentValue.equals(lastValue)) { + return currentValue; + } + lastValue = currentValue; + } + throw new UncheckedTimeoutException(); + } + private static void assertUntilTimeout(Runnable assertion, Duration timeout) { long start = System.nanoTime(); - while (!Thread.currentThread().isInterrupted()) { + while (!currentThread().isInterrupted()) { try { assertion.run(); return; @@ -968,7 +991,7 @@ public void testComplexCast() .build(); // This is optimized using CAST(null AS interval day to second) which may be problematic to deserialize on worker assertQuery(session, "WITH t(a, b) AS (VALUES (1, INTERVAL '1' SECOND)) " + - "SELECT count(DISTINCT a), CAST(max(b) AS VARCHAR) FROM t", + "SELECT count(DISTINCT a), CAST(max(b) AS VARCHAR) FROM t", "VALUES (1, '0 00:00:01.000')"); } }