Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.testing;

import com.google.common.base.Joiner;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please drop issue from commit message

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.graph.Traverser;
Expand All @@ -30,10 +31,13 @@
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -346,7 +350,8 @@ protected void testNonSelect(Optional<Session> session, Optional<String> setupQu
.withSession(session)
.withSetupQuery(setupQuery)
.withCleanupQuery(cleanupQuery)
.failsDespiteRetries(failure -> failure.hasMessageMatching("This connector does not support query retries"));
.failsDespiteRetries(failure -> failure.hasMessageMatching("This connector does not support query retries"))
.cleansUpTemporaryTables();
return;
}

Expand All @@ -356,15 +361,17 @@ protected void testNonSelect(Optional<Session> session, Optional<String> setupQu
.withCleanupQuery(cleanupQuery)
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
.at(boundaryCoordinatorStage())
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE))
.cleansUpTemporaryTables();

assertThatQuery(query)
.withSession(session)
.withSetupQuery(setupQuery)
.withCleanupQuery(cleanupQuery)
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
.at(rootStage())
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE));
.failsAlways(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE))
.cleansUpTemporaryTables();

assertThatQuery(query)
.withSession(session)
Expand All @@ -373,15 +380,17 @@ protected void testNonSelect(Optional<Session> session, Optional<String> setupQu
.experiencing(TASK_FAILURE, Optional.of(ErrorType.INTERNAL_ERROR))
.at(boundaryDistributedStage())
.failsWithoutRetries(failure -> failure.hasMessageContaining(FAILURE_INJECTION_MESSAGE))
.finishesSuccessfully();
.finishesSuccessfully()
.cleansUpTemporaryTables();

assertThatQuery(query)
.withSetupQuery(setupQuery)
.withCleanupQuery(cleanupQuery)
.experiencing(TASK_MANAGEMENT_REQUEST_TIMEOUT)
.at(boundaryDistributedStage())
.failsWithoutRetries(failure -> failure.hasMessageContaining("Encountered too many errors talking to a worker node"))
.finishesSuccessfully();
.finishesSuccessfully()
.cleansUpTemporaryTables();

if (getRetryPolicy() == RetryPolicy.QUERY) {
assertThatQuery(query)
Expand All @@ -391,15 +400,17 @@ protected void testNonSelect(Optional<Session> session, Optional<String> setupQu
.experiencing(TASK_GET_RESULTS_REQUEST_FAILURE)
.at(boundaryDistributedStage())
.failsWithoutRetries(failure -> failure.hasMessageFindingMatch("Error 500 Internal Server Error|Error closing remote buffer, expected 204 got 500"))
.finishesSuccessfully();
.finishesSuccessfully()
.cleansUpTemporaryTables();

assertThatQuery(query)
.withSetupQuery(setupQuery)
.withCleanupQuery(cleanupQuery)
.experiencing(TASK_GET_RESULTS_REQUEST_TIMEOUT)
.at(boundaryDistributedStage())
.failsWithoutRetries(failure -> failure.hasMessageFindingMatch("Encountered too many errors talking to a worker node|Error closing remote buffer"))
.finishesSuccessfully();
.finishesSuccessfully()
.cleansUpTemporaryTables();
}
}

Expand All @@ -408,6 +419,45 @@ protected FailureRecoveryAssert assertThatQuery(String query)
return new FailureRecoveryAssert(query);
}

// Provided as a protected method here in case this is not a one-sized-fits-all solution
protected void checkTemporaryTables(Set<String> queryIds)
{
// queryId -> temporary table names
Map<String, Set<String>> remainingTemporaryTables = new HashMap<>();
// queryId -> assertion messages
Map<String, Set<String>> assertionErrorMessages = new HashMap<>();
for (String queryId : queryIds) {
String temporaryTablePrefix = temporaryTableNamePrefix(queryId);
MaterializedResult temporaryTablesResult = getQueryRunner()
.execute("SHOW TABLES LIKE '%s%%' ESCAPE '\\'".formatted(temporaryTablePrefix.replace("_", "\\_")));
// Unfortunately, information_schema is not strictly consistent with recently dropped tables,
// and for some connectors, it can return tables that have been recently dropped. Therefore,
// we can't rely simply on SHOW TABLES LIKE returning no results - we have to try to query the table
for (MaterializedRow temporaryTableRow : temporaryTablesResult.getMaterializedRows()) {
String temporaryTableName = (String) temporaryTableRow.getField(0);
try {
assertThatThrownBy(() -> getQueryRunner().execute("SELECT 1 FROM %s WHERE 1 = 0".formatted(temporaryTableName)))
.hasMessageContaining("%s does not exist", temporaryTableName);
}
catch (AssertionError e) {
remainingTemporaryTables.computeIfAbsent(queryId, ignored -> new HashSet<>()).add(temporaryTableName);
assertionErrorMessages.computeIfAbsent(queryId, ignored -> new HashSet<>()).add(e.getMessage());
}
}
}

assertThat(remainingTemporaryTables.isEmpty())
.as("There should be no remaining tmp_trino tables that are queryable. They are:\n%s",
remainingTemporaryTables.entrySet().stream()
.map(entry -> "\tFor queryId [%s] (prefix [%s]) remaining tables: [%s]\n\t\tWith errors: [%s]".formatted(
entry.getKey(),
temporaryTableNamePrefix(entry.getKey()),
Joiner.on(",").join(entry.getValue()),
Joiner.on("],\n[").join(assertionErrorMessages.get(entry.getKey())).replace("\n", "\n\t\t\t")))
.collect(joining("\n")))
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An example message from this:

There should be no remaining tmp_trino tables that are queryable. They are:
	For queryId [20230310_183735_00015_ii5en] (prefix [tmp_trino_31f5781f_]) remaining tables: [tmp_trino_31f5781f_14b3eb70]
		With errors: [
			Expecting code to raise a throwable.]
	For queryId [20230310_183738_00020_ii5en] (prefix [tmp_trino_5ce0903c_]) remaining tables: [tmp_trino_5ce0903c_2f4304bc]
		With errors: [
			Expecting code to raise a throwable.]

.isTrue();
}

protected class FailureRecoveryAssert
{
private final String query;
Expand All @@ -417,6 +467,7 @@ protected class FailureRecoveryAssert
private Optional<ErrorType> errorType = Optional.empty();
private Optional<String> setup = Optional.empty();
private Optional<String> cleanup = Optional.empty();
private Set<String> queryIds = new HashSet<>();

public FailureRecoveryAssert(String query)
{
Expand Down Expand Up @@ -519,16 +570,7 @@ private ExecutionResult execute(Session session, String query, Optional<String>
}

if (queryId != null) {
String temporaryTablePrefix = temporaryTableNamePrefix(queryId);
MaterializedResult temporaryTablesResult = getQueryRunner()
.execute("SHOW TABLES LIKE '%s%%' ESCAPE '\\'".formatted(temporaryTablePrefix.replace("_", "\\_")));
assertThat(temporaryTablesResult.getRowCount())
.as("There should be no remaining %s* tables. They are: [%s]",
temporaryTablePrefix,
temporaryTablesResult.getMaterializedRows().stream()
.map(row -> row.getField(0).toString())
.collect(joining(",")))
.isEqualTo(0);
queryIds.add(queryId);
}

MaterializedResult result = resultWithQueryId == null ? null : resultWithQueryId.getResult();
Expand Down Expand Up @@ -572,22 +614,28 @@ public void isCoordinatorOnly()
assertThat(subStages).isEmpty();
}

public void finishesSuccessfully()
public FailureRecoveryAssert cleansUpTemporaryTables()
{
checkTemporaryTables(queryIds);
return this;
}

public FailureRecoveryAssert finishesSuccessfully()
{
finishesSuccessfully(queryId -> {});
return finishesSuccessfully(queryId -> {});
}

public void finishesSuccessfullyWithoutTaskFailures()
public FailureRecoveryAssert finishesSuccessfullyWithoutTaskFailures()
{
finishesSuccessfully(queryId -> {}, false);
return finishesSuccessfully(queryId -> {}, false);
}

private void finishesSuccessfully(Consumer<QueryId> queryAssertion)
private FailureRecoveryAssert finishesSuccessfully(Consumer<QueryId> queryAssertion)
{
finishesSuccessfully(queryAssertion, true);
return finishesSuccessfully(queryAssertion, true);
}

public void finishesSuccessfully(Consumer<QueryId> queryAssertion, boolean expectTaskFailures)
public FailureRecoveryAssert finishesSuccessfully(Consumer<QueryId> queryAssertion, boolean expectTaskFailures)
{
verifyFailureTypeAndStageSelector();
ExecutionResult expected = executeExpected();
Expand Down Expand Up @@ -657,6 +705,7 @@ else if (isExplain) {
}

queryAssertion.accept(actual.getQueryId());
return this;
}

public FailureRecoveryAssert failsAlways(Consumer<AbstractThrowableAssert<?, ? extends Throwable>> failureAssertion)
Expand Down