From 88c29017f244f82d6f3f390c6723d447e31c07fe Mon Sep 17 00:00:00 2001 From: Matt Deady Date: Fri, 10 Mar 2023 10:32:04 -0500 Subject: [PATCH] Fix BaseFailureRecoveryTest temp table checking flakiness --- .../testing/BaseFailureRecoveryTest.java | 97 ++++++++++++++----- 1 file changed, 73 insertions(+), 24 deletions(-) diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java index 22edf629405a..7404db1d3ca1 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java @@ -13,6 +13,7 @@ */ package io.trino.testing; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.graph.Traverser; @@ -30,10 +31,13 @@ import org.testng.annotations.Test; import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.OptionalInt; +import java.util.Set; import java.util.UUID; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; @@ -346,7 +350,8 @@ protected void testNonSelect(Optional session, Optional setupQu .withSession(session) .withSetupQuery(setupQuery) .withCleanupQuery(cleanupQuery) - .failsDespiteRetries(failure -> failure.hasMessageMatching("This connector does not support query retries")); + .failsDespiteRetries(failure -> failure.hasMessageMatching("This connector does not support query retries")) + .cleansUpTemporaryTables(); return; } @@ -356,7 +361,8 @@ protected void testNonSelect(Optional session, Optional setupQu .withCleanupQuery(cleanupQuery) .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)) .at(boundaryCoordinatorStage()) - .failsAlways(failure -> 
failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE)); + .failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE)) + .cleansUpTemporaryTables(); assertThatQuery(query) .withSession(session) @@ -364,7 +370,8 @@ protected void testNonSelect(Optional session, Optional setupQu .withCleanupQuery(cleanupQuery) .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)) .at(rootStage()) - .failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE)); + .failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE)) + .cleansUpTemporaryTables(); assertThatQuery(query) .withSession(session) @@ -373,7 +380,8 @@ protected void testNonSelect(Optional session, Optional setupQu .experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR)) .at(boundaryDistributedStage()) .failsWithoutRetries(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE)) - .finishesSuccessfully(); + .finishesSuccessfully() + .cleansUpTemporaryTables(); assertThatQuery(query) .withSetupQuery(setupQuery) @@ -381,7 +389,8 @@ protected void testNonSelect(Optional session, Optional setupQu .experiencing(TASK_MANAGEMENT_REQUEST_TIMEOUT) .at(boundaryDistributedStage()) .failsWithoutRetries(failure -> failure.hasMessageContaining("Encountered too many errors talking to a worker node")) - .finishesSuccessfully(); + .finishesSuccessfully() + .cleansUpTemporaryTables(); if (getRetryPolicy() == RetryPolicy.QUERY) { assertThatQuery(query) @@ -391,7 +400,8 @@ protected void testNonSelect(Optional session, Optional setupQu .experiencing(TASK_GET_RESULTS_REQUEST_FAILURE) .at(boundaryDistributedStage()) .failsWithoutRetries(failure -> failure.hasMessageFindingMatch("Error 500 Internal Server Error|Error closing remote buffer, expected 204 got 500")) - .finishesSuccessfully(); + .finishesSuccessfully() + .cleansUpTemporaryTables(); assertThatQuery(query) .withSetupQuery(setupQuery) @@ -399,7 +409,8 @@ protected void 
testNonSelect(Optional session, Optional setupQu .experiencing(TASK_GET_RESULTS_REQUEST_TIMEOUT) .at(boundaryDistributedStage()) .failsWithoutRetries(failure -> failure.hasMessageFindingMatch("Encountered too many errors talking to a worker node|Error closing remote buffer")) - .finishesSuccessfully(); + .finishesSuccessfully() + .cleansUpTemporaryTables(); } } @@ -408,6 +419,45 @@ protected FailureRecoveryAssert assertThatQuery(String query) return new FailureRecoveryAssert(query); } + // Provided as a protected method here in case this is not a one-size-fits-all solution + protected void checkTemporaryTables(Set queryIds) + { + // queryId -> temporary table names + Map> remainingTemporaryTables = new HashMap<>(); + // queryId -> assertion messages + Map> assertionErrorMessages = new HashMap<>(); + for (String queryId : queryIds) { + String temporaryTablePrefix = temporaryTableNamePrefix(queryId); + MaterializedResult temporaryTablesResult = getQueryRunner() + .execute("SHOW TABLES LIKE '%s%%' ESCAPE '\\'".formatted(temporaryTablePrefix.replace("_", "\\_"))); + // Unfortunately, information_schema is not strictly consistent with recently dropped tables, + // and for some connectors, it can return tables that have been recently dropped. 
Therefore, + // we can't rely simply on SHOW TABLES LIKE returning no results - we have to try to query the table + for (MaterializedRow temporaryTableRow : temporaryTablesResult.getMaterializedRows()) { + String temporaryTableName = (String) temporaryTableRow.getField(0); + try { + assertThatThrownBy(() -> getQueryRunner().execute("SELECT 1 FROM %s WHERE 1 = 0".formatted(temporaryTableName))) + .hasMessageContaining("%s does not exist", temporaryTableName); + } + catch (AssertionError e) { + remainingTemporaryTables.computeIfAbsent(queryId, ignored -> new HashSet<>()).add(temporaryTableName); + assertionErrorMessages.computeIfAbsent(queryId, ignored -> new HashSet<>()).add(e.getMessage()); + } + } + } + + assertThat(remainingTemporaryTables.isEmpty()) + .as("There should be no remaining tmp_trino tables that are queryable. They are:\n%s", + remainingTemporaryTables.entrySet().stream() + .map(entry -> "\tFor queryId [%s] (prefix [%s]) remaining tables: [%s]\n\t\tWith errors: [%s]".formatted( + entry.getKey(), + temporaryTableNamePrefix(entry.getKey()), + Joiner.on(",").join(entry.getValue()), + Joiner.on("],\n[").join(assertionErrorMessages.get(entry.getKey())).replace("\n", "\n\t\t\t"))) + .collect(joining("\n"))) + .isTrue(); + } + protected class FailureRecoveryAssert { private final String query; @@ -417,6 +467,7 @@ protected class FailureRecoveryAssert private Optional errorType = Optional.empty(); private Optional setup = Optional.empty(); private Optional cleanup = Optional.empty(); + private Set queryIds = new HashSet<>(); public FailureRecoveryAssert(String query) { @@ -519,16 +570,7 @@ private ExecutionResult execute(Session session, String query, Optional } if (queryId != null) { - String temporaryTablePrefix = temporaryTableNamePrefix(queryId); - MaterializedResult temporaryTablesResult = getQueryRunner() - .execute("SHOW TABLES LIKE '%s%%' ESCAPE '\\'".formatted(temporaryTablePrefix.replace("_", "\\_"))); - 
assertThat(temporaryTablesResult.getRowCount()) - .as("There should be no remaining %s* tables. They are: [%s]", - temporaryTablePrefix, - temporaryTablesResult.getMaterializedRows().stream() - .map(row -> row.getField(0).toString()) - .collect(joining(","))) - .isEqualTo(0); + queryIds.add(queryId); } MaterializedResult result = resultWithQueryId == null ? null : resultWithQueryId.getResult(); @@ -572,22 +614,28 @@ public void isCoordinatorOnly() assertThat(subStages).isEmpty(); } - public void finishesSuccessfully() + public FailureRecoveryAssert cleansUpTemporaryTables() + { + checkTemporaryTables(queryIds); + return this; + } + + public FailureRecoveryAssert finishesSuccessfully() { - finishesSuccessfully(queryId -> {}); + return finishesSuccessfully(queryId -> {}); } - public void finishesSuccessfullyWithoutTaskFailures() + public FailureRecoveryAssert finishesSuccessfullyWithoutTaskFailures() { - finishesSuccessfully(queryId -> {}, false); + return finishesSuccessfully(queryId -> {}, false); } - private void finishesSuccessfully(Consumer queryAssertion) + private FailureRecoveryAssert finishesSuccessfully(Consumer queryAssertion) { - finishesSuccessfully(queryAssertion, true); + return finishesSuccessfully(queryAssertion, true); } - public void finishesSuccessfully(Consumer queryAssertion, boolean expectTaskFailures) + public FailureRecoveryAssert finishesSuccessfully(Consumer queryAssertion, boolean expectTaskFailures) { verifyFailureTypeAndStageSelector(); ExecutionResult expected = executeExpected(); @@ -657,6 +705,7 @@ else if (isExplain) { } queryAssertion.accept(actual.getQueryId()); + return this; } public FailureRecoveryAssert failsAlways(Consumer> failureAssertion)