Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,6 @@ public void testDeleteWithLike()
.hasStackTraceContaining("TrinoException: " + MODIFYING_ROWS_MESSAGE);
}

@Test
@Override
public void testReadMetadataWithRelationsConcurrentModifications()
{
// Under concurrently, H2 sometimes returns null table name in DatabaseMetaData.getTables's ResultSet
// See https://github.com/trinodb/trino/issues/16658 for more information
abort("Skipped due to H2 problems");
}

@Test
public void testUnknownTypeAsIgnored()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,14 +571,6 @@ public void testShowCreateTable()
")");
}

@Test
@Override
public void testReadMetadataWithRelationsConcurrentModifications()
{
// TODO: Enable this test after fixing "Task did not completed before timeout" (https://github.com/trinodb/trino/issues/14230)
abort("Test fails with a timeout sometimes and is flaky");
}

@Test
public void testSkipUnsupportedType()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,30 +731,6 @@ public void testWrittenStats()
// TODO Kudu connector supports CTAS and inserts, but the test would fail
}

@Test
@Override
public void testReadMetadataWithRelationsConcurrentModifications()
{
try {
super.testReadMetadataWithRelationsConcurrentModifications();
}
catch (Exception expected) {
// The test failure is not guaranteed
// TODO (https://github.com/trinodb/trino/issues/12974): shouldn't fail
assertThat(expected)
.hasMessageMatching(".* table .* was deleted: Table deleted at .* UTC");
abort("to be fixed");
}
}

@Override
protected String createTableSqlTemplateForConcurrentModifications()
{
// TODO Remove this overriding method once kudu connector can create tables with default partitions
return "CREATE TABLE %s(a integer WITH (primary_key=true)) " +
"WITH (partition_by_hash_columns = ARRAY['a'], partition_by_hash_buckets = 2)";
}

@Test
@Override
public void testCreateTableAsSelectNegativeDate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,13 +602,6 @@ public void testDecimalAvgPushdownFoShortDecimalScale()
}
}

@Test
@Override
public void testReadMetadataWithRelationsConcurrentModifications()
{
abort("Test fails with a timeout sometimes and is flaky");
}

@Test
@Override
public void testInsertRowConcurrently()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import static java.util.stream.Collectors.joining;
import static java.util.stream.IntStream.range;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assumptions.abort;

public abstract class BaseSqlServerConnectorTest
extends BaseJdbcConnectorTest
Expand Down Expand Up @@ -134,26 +133,6 @@ public void testSelectInformationSchemaColumns()
super.testSelectInformationSchemaColumns();
}

@Test
@Override
public void testReadMetadataWithRelationsConcurrentModifications()
{
try {
super.testReadMetadataWithRelationsConcurrentModifications();
}
catch (Exception expected) {
// The test failure is not guaranteed
assertThat(expected)
.hasMessageMatching("(?s).*(" +
"No task completed before timeout|" +
"was deadlocked on lock resources with another process and has been chosen as the deadlock victim|" +
"Lock request time out period exceeded|" +
// E.g. system.metadata.table_comments can return empty results, when underlying metadata list tables call fails
"Expecting actual not to be empty).*");
abort("to be fixed");
}
}

@Override
protected void verifyAddNotNullColumnToNonEmptyTableFailurePermissible(Throwable e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.testing;

import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -57,24 +56,16 @@
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -1966,200 +1957,6 @@ public void testViewAndMaterializedViewTogether()
assertUpdate("DROP MATERIALIZED VIEW " + materializedViewName);
}

/**
* Test that reading table, column metadata, like {@code SHOW TABLES} or reading from {@code information_schema.views}
* does not fail when relations are concurrently created or dropped.
*/
@Test
@Timeout(180)
public void testReadMetadataWithRelationsConcurrentModifications()
throws Exception
{
if (!hasBehavior(SUPPORTS_CREATE_TABLE) && !hasBehavior(SUPPORTS_CREATE_VIEW) && !hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW)) {
abort("Cannot test");
}

int readIterations = 5;
// generous timeout as this is a generic test; typically should be faster
int testTimeoutSeconds = 150;

testReadMetadataWithRelationsConcurrentModifications(readIterations, testTimeoutSeconds);
}

protected void testReadMetadataWithRelationsConcurrentModifications(int readIterations, int testTimeoutSeconds)
throws Exception
{
Stopwatch testWatch = Stopwatch.createStarted();

int readerTasksCount = 6
+ (hasBehavior(SUPPORTS_CREATE_VIEW) ? 1 : 0)
+ (hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW) ? 1 : 0);
AtomicInteger incompleteReadTasks = new AtomicInteger(readerTasksCount);
List<Callable<Void>> readerTasks = new ArrayList<>();
readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SHOW TABLES"));
readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM information_schema.tables WHERE table_schema = CURRENT_SCHEMA"));
readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA"));
readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM system.jdbc.tables WHERE table_cat = CURRENT_CATALOG AND table_schem = CURRENT_SCHEMA"));
readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM system.jdbc.columns WHERE table_cat = CURRENT_CATALOG AND table_schem = CURRENT_SCHEMA"));
readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM system.metadata.table_comments WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA"));
if (hasBehavior(SUPPORTS_CREATE_VIEW)) {
readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM information_schema.views WHERE table_schema = CURRENT_SCHEMA"));
}
if (hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW)) {
readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM system.metadata.materialized_views WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA"));
}
assertThat(readerTasks.size()).isEqualTo(readerTasksCount);

int writeTasksCount = 1
+ (hasBehavior(SUPPORTS_CREATE_VIEW) ? 1 : 0)
+ (hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW) ? 1 : 0);
writeTasksCount = 2 * writeTasksCount; // writes are scheduled twice
CountDownLatch writeTasksInitialized = new CountDownLatch(writeTasksCount);
Runnable writeInitialized = writeTasksInitialized::countDown;
AtomicBoolean aborted = new AtomicBoolean();
Supplier<Boolean> done = () -> aborted.get() || incompleteReadTasks.get() == 0;
List<Callable<Void>> writeTasks = new ArrayList<>();
writeTasks.add(createDropRepeatedly(writeInitialized, done, "concur_table", createTableSqlTemplateForConcurrentModifications(), "DROP TABLE %s"));
if (hasBehavior(SUPPORTS_CREATE_VIEW)) {
writeTasks.add(createDropRepeatedly(writeInitialized, done, "concur_view", "CREATE VIEW %s AS SELECT 1 a", "DROP VIEW %s"));
}
if (hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW)) {
writeTasks.add(createDropRepeatedly(writeInitialized, done, "concur_mview", "CREATE MATERIALIZED VIEW %s AS SELECT 1 a", "DROP MATERIALIZED VIEW %s"));
}
assertThat(writeTasks.size() * 2).isEqualTo(writeTasksCount);

ExecutorService executor = newFixedThreadPool(readerTasksCount + writeTasksCount);
try {
CompletionService<Void> completionService = new ExecutorCompletionService<>(executor);
submitTasks(writeTasks, completionService);
submitTasks(writeTasks, completionService); // twice to increase chances of catching problems
if (!writeTasksInitialized.await(testTimeoutSeconds, SECONDS)) {
Future<Void> someFailure = completionService.poll();
if (someFailure != null) {
someFailure.get(); // non-blocking
}
fail("Setup failed");
}
submitTasks(readerTasks, completionService);
for (int i = 0; i < readerTasksCount + writeTasksCount; i++) {
long remainingTimeSeconds = testTimeoutSeconds - testWatch.elapsed(SECONDS);
Future<Void> future = completionService.poll(remainingTimeSeconds, SECONDS);
verifyNotNull(future, "Task did not completed before timeout; completed tasks: %s, current poll timeout: %s s", i, remainingTimeSeconds);
future.get(); // non-blocking
}
}
catch (Throwable failure) {
aborted.set(true);
executor.shutdownNow();
if (!executor.awaitTermination(10, SECONDS)) {
throw new AssertionError("Test threads did not complete. Leaving test threads behind may violate AbstractTestQueryFramework.checkQueryInfosFinal", failure);
}
throw failure;
}
finally {
executor.shutdownNow();
}
assertThat(executor.awaitTermination(10, SECONDS)).isTrue();
}

@Language("SQL")
protected String createTableSqlTemplateForConcurrentModifications()
{
return "CREATE TABLE %s(a integer)";
}

/**
* Run {@code sql} query at least {@code minIterations} times and keep running until other tasks complete.
* {@code incompleteReadTasks} is used for orchestrating end of execution.
*/
protected Callable<Void> queryRepeatedly(int minIterations, AtomicInteger incompleteReadTasks, @Language("SQL") String sql)
{
return new Callable<>()
{
@Override
public Void call()
{
boolean alwaysEmpty = true;
for (int i = 0; i < minIterations; i++) {
MaterializedResult result = computeActual(sql);
alwaysEmpty &= result.getRowCount() == 0;
}
if (alwaysEmpty) {
fail(format("The results of [%s] are always empty after %s iterations, this may indicate test misconfiguration or broken connector behavior", sql, minIterations));
}
assertThat(incompleteReadTasks.decrementAndGet()).as("incompleteReadTasks").isGreaterThanOrEqualTo(0);
// Keep running so that faster test queries have same length of exposure in wall time
while (incompleteReadTasks.get() != 0) {
computeActual(sql);
}
return null;
}

@Override
public String toString()
{
return format("Query(%s)", sql);
}
};
}

protected Callable<Void> createDropRepeatedly(Runnable initReady, Supplier<Boolean> done, String namePrefix, String createTemplate, String dropTemplate)
{
return new Callable<>()
{
@Override
public Void call()
{
int objectsToKeep = 3;
Deque<String> liveObjects = new ArrayDeque<>(objectsToKeep);
for (int i = 0; i < objectsToKeep; i++) {
String name = namePrefix + "_" + randomNameSuffix();
assertUpdate(format(createTemplate, name));
liveObjects.addLast(name);
}
initReady.run();
while (!done.get()) {
assertUpdate(format(dropTemplate, liveObjects.removeFirst()));
String name = namePrefix + "_" + randomNameSuffix();
assertUpdate(format(createTemplate, name));
liveObjects.addLast(name);
}
while (!liveObjects.isEmpty()) {
assertUpdate(format(dropTemplate, liveObjects.removeFirst()));
}
return null;
}

@Override
public String toString()
{
return format("Repeat (%s) and (%s)", createTemplate, dropTemplate);
}
};
}

protected <T> void submitTasks(List<Callable<T>> callables, CompletionService<T> completionService)
{
for (Callable<T> callable : callables) {
String taskDescription = callable.toString();
completionService.submit(new Callable<T>()
{
@Override
public T call()
throws Exception
{
try {
return callable.call();
}
catch (Throwable e) {
e.addSuppressed(new Exception("Task: " + taskDescription));
throw e;
}
}
});
}
}

@Test
public void testExplainAnalyze()
{
Expand Down