diff --git a/core/trino-main/src/main/java/io/trino/connector/system/TableCommentSystemTable.java b/core/trino-main/src/main/java/io/trino/connector/system/TableCommentSystemTable.java index 16e867079d98..fd7f59694e88 100644 --- a/core/trino-main/src/main/java/io/trino/connector/system/TableCommentSystemTable.java +++ b/core/trino-main/src/main/java/io/trino/connector/system/TableCommentSystemTable.java @@ -113,7 +113,7 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect } catch (TrinoException e) { // listTables throws an exception if cannot connect the database - LOG.debug(e, "Failed to get tables for catalog: %s", catalog); + LOG.warn(e, "Failed to get tables for catalog: %s", catalog); } for (SchemaTableName name : names) { @@ -121,9 +121,9 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect try { comment = getComment(session, prefix, name, views, materializedViews); } - catch (TrinoException e) { + catch (RuntimeException e) { // getTableHandle may throw an exception (e.g. Cassandra connector doesn't allow case insensitive column names) - LOG.debug(e, "Failed to get metadata for table: %s", name); + LOG.warn(e, "Failed to get metadata for table: %s", name); } table.addRow(prefix.getCatalogName(), name.getSchemaName(), name.getTableName(), comment.orElse(null)); } @@ -153,7 +153,7 @@ private Optional getComment( .map(metadata -> metadata.getMetadata().getComment()) .orElseGet(() -> { // A previously listed table might have been dropped concurrently - LOG.debug("Failed to get metadata for table: %s", name); + LOG.warn("Failed to get metadata for table: %s", name); return Optional.empty(); }); } diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java index fdb59a6e8aa6..0d9d56686299 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.hive.metastore.cache; -import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; @@ -579,8 +578,7 @@ public void dropColumn(String databaseName, String tableName, String columnName) } } - @VisibleForTesting - void invalidateTable(String databaseName, String tableName) + public void invalidateTable(String databaseName, String tableName) { invalidateTableCache(databaseName, tableName); tableNamesCache.invalidate(databaseName); diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index c98357752bdb..d6f44ecebb76 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -125,6 +125,11 @@ joda-time + + net.jodah + failsafe + + org.apache.iceberg diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 38cfc5ff6331..c45bc685f949 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -408,6 +408,10 @@ public Map> listTableColumns(ConnectorSess catch (UnknownTableTypeException e) { // ignore table of unknown type } + catch (RuntimeException e) { + // Table can be being removed and this may cause all sorts of exceptions. Log, because we're catching broadly. + log.warn(e, "Failed to access metadata of table %s during column listing for %s", table, prefix); + } } return columns.buildOrThrow(); } @@ -1276,6 +1280,22 @@ public List listMaterializedViews(ConnectorSession session, Opt return catalog.listMaterializedViews(session, schemaName); } + @Override + public Map getMaterializedViews(ConnectorSession session, Optional schemaName) + { + Map materializedViews = new HashMap<>(); + for (SchemaTableName name : listMaterializedViews(session, schemaName)) { + try { + getMaterializedView(session, name).ifPresent(view -> materializedViews.put(name, view)); + } + catch (RuntimeException e) { + // Materialized view can be being removed and this may cause all sorts of exceptions. Log, because we're catching broadly. + log.warn(e, "Failed to access metadata of materialized view %s during listing", name); + } + } + return materializedViews; + } + @Override public Optional getMaterializedView(ConnectorSession session, SchemaTableName viewName) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java index 5c12795c865c..dfb417e97791 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java @@ -28,9 +28,9 @@ import io.trino.plugin.hive.ViewReaderUtil; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Database; -import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.HivePrincipal; import io.trino.plugin.hive.metastore.PrincipalPrivileges; +import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; import io.trino.plugin.hive.util.HiveUtil; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; import io.trino.spi.TrinoException; @@ -47,6 +47,8 @@ import io.trino.spi.connector.ViewNotFoundException; import io.trino.spi.security.TrinoPrincipal; import io.trino.spi.type.TypeManager; +import net.jodah.failsafe.Failsafe; +import net.jodah.failsafe.RetryPolicy; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.BaseTable; @@ -58,6 +60,8 @@ import org.apache.iceberg.Transaction; import java.io.IOException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -68,6 +72,7 @@ import java.util.stream.Stream; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE; @@ -125,7 +130,7 @@ class TrinoHiveCatalog private static final String PRESTO_VIEW_EXPANDED_TEXT_MARKER = HiveMetadata.PRESTO_VIEW_EXPANDED_TEXT_MARKER; private final CatalogName catalogName; - private final HiveMetastore metastore; + private final CachingHiveMetastore metastore; private final HdfsEnvironment hdfsEnvironment; private final TypeManager typeManager; private final IcebergTableOperationsProvider tableOperationsProvider; @@ -139,7 +144,7 @@ class TrinoHiveCatalog public TrinoHiveCatalog( CatalogName catalogName, - HiveMetastore metastore, + CachingHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager, IcebergTableOperationsProvider tableOperationsProvider, @@ -282,13 +287,13 @@ public List listTables(ConnectorSession session, Optional Stream.concat( - // Get tables with parameter table_type set to "ICEBERG" or "iceberg". This is required because - // Trino uses lowercase value whereas Spark and Flink use uppercase. - // TODO: use one metastore call to pass both the filters: https://github.com/trinodb/trino/issues/7710 - metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toLowerCase(Locale.ENGLISH)).stream() - .map(table -> new SchemaTableName(schema, table)), - metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)).stream() - .map(table -> new SchemaTableName(schema, table))) + // Get tables with parameter table_type set to "ICEBERG" or "iceberg". This is required because + // Trino uses lowercase value whereas Spark and Flink use uppercase. + // TODO: use one metastore call to pass both the filters: https://github.com/trinodb/trino/issues/7710 + metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toLowerCase(Locale.ENGLISH)).stream() + .map(table -> new SchemaTableName(schema, table)), + metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)).stream() + .map(table -> new SchemaTableName(schema, table))) .distinct()) // distinct() to avoid duplicates for case-insensitive HMS backends .forEach(tablesListBuilder::add); @@ -341,6 +346,15 @@ public void updateTableComment(ConnectorSession session, SchemaTableName schemaT } } + @Override + public void updateColumnComment(ConnectorSession session, SchemaTableName schemaTableName, ColumnIdentity columnIdentity, Optional comment) + { + metastore.commentColumn(schemaTableName.getSchemaName(), schemaTableName.getTableName(), columnIdentity.getName(), comment); + + Table icebergTable = loadTable(session, schemaTableName); + icebergTable.updateSchema().updateColumnDoc(columnIdentity.getName(), comment.orElse(null)).commit(); + } + @Override public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName) { @@ -605,6 +619,22 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName schem @Override public Optional getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + try { + return Failsafe.with(new RetryPolicy<>() + .withMaxAttempts(10) + .withBackoff(1, 5_000, ChronoUnit.MILLIS, 4) + .withMaxDuration(Duration.ofSeconds(30)) + .abortOn(failure -> !(failure instanceof MaterializedViewMayBeBeingRemovedException))) + .get(() -> doGetMaterializedView(session, schemaViewName)); + } + catch (MaterializedViewMayBeBeingRemovedException e) { + throwIfUnchecked(e.getCause()); + throw new RuntimeException(e.getCause()); + } + } + + private Optional doGetMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) { Optional tableOptional = metastore.getTable(schemaViewName.getSchemaName(), schemaViewName.getTableName()); if (tableOptional.isEmpty()) { @@ -623,7 +653,20 @@ public Optional getMaterializedView(Connect IcebergMaterializedViewDefinition definition = decodeMaterializedViewData(materializedView.getViewOriginalText() .orElseThrow(() -> new TrinoException(HIVE_INVALID_METADATA, "No view original text: " + schemaViewName))); - Table icebergTable = loadTable(session, new SchemaTableName(schemaViewName.getSchemaName(), storageTable)); + Table icebergTable; + try { + icebergTable = loadTable(session, new SchemaTableName(schemaViewName.getSchemaName(), storageTable)); + } + catch (RuntimeException e) { + // The materialized view could be removed concurrently. This may manifest in a number of ways, e.g. + // - io.trino.spi.connector.TableNotFoundException + // - org.apache.iceberg.exceptions.NotFoundException when accessing manifest file + // - other failures when reading storage table's metadata files + // Retry, as we're catching broadly. + metastore.invalidateTable(schemaViewName.getSchemaName(), schemaViewName.getTableName()); + metastore.invalidateTable(schemaViewName.getSchemaName(), storageTable); + throw new MaterializedViewMayBeBeingRemovedException(e); + } ImmutableMap.Builder properties = ImmutableMap.builder(); properties.put(FILE_FORMAT_PROPERTY, IcebergUtil.getFileFormat(icebergTable)); if (!icebergTable.spec().fields().isEmpty()) { @@ -660,12 +703,12 @@ private List listNamespaces(ConnectorSession session, Optional n return listNamespaces(session); } - @Override - public void updateColumnComment(ConnectorSession session, SchemaTableName schemaTableName, ColumnIdentity columnIdentity, Optional comment) + private static class MaterializedViewMayBeBeingRemovedException + extends RuntimeException { - metastore.commentColumn(schemaTableName.getSchemaName(), schemaTableName.getTableName(), columnIdentity.getName(), comment); - - Table icebergTable = loadTable(session, schemaTableName); - icebergTable.updateSchema().updateColumnDoc(columnIdentity.getName(), comment.orElse(null)).commit(); + public MaterializedViewMayBeBeingRemovedException(Throwable cause) + { + super(requireNonNull(cause, "cause is null")); + } } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractMetastoreTableOperations.java index 1272898042fc..5ef7fbdfdb8d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractMetastoreTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractMetastoreTableOperations.java @@ -279,6 +279,7 @@ protected void refreshFromMetadataLocation(String newLocation) Tasks.foreach(newLocation) .retry(20) .exponentialBackoff(100, 5000, 600000, 4.0) + .stopRetryOn(org.apache.iceberg.exceptions.NotFoundException.class) // qualified name, as this is NOT the io.trino.spi.connector.NotFoundException .run(metadataLocation -> newMetadata.set( TableMetadataParser.read(fileIo, io().newInputFile(metadataLocation)))); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index c711a7b010d6..ced10b207eb6 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -42,7 +42,6 @@ import io.trino.testing.ResultWithQueryId; import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.TestTable; -import io.trino.testng.services.Flaky; import io.trino.tpch.TpchTable; import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; @@ -283,17 +282,6 @@ public void testShowCreateTable() ")"); } - @Test - @Flaky( - issue = "https://github.com/trinodb/trino/issues/10976", - // Due to the nature of the problem, actual failure can vary greatly - match = "^") - @Override - public void testSelectInformationSchemaColumns() - { - super.testSelectInformationSchemaColumns(); - } - @Override protected void checkInformationSchemaViewsForMaterializedView(String schemaName, String viewName) { diff --git a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduConnectorTest.java b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduConnectorTest.java index 5a4dba6a74a8..3dac125c2f29 100644 --- a/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduConnectorTest.java +++ b/plugin/trino-kudu/src/test/java/io/trino/plugin/kudu/AbstractKuduConnectorTest.java @@ -355,6 +355,13 @@ public void testWrittenStats() throw new SkipException("TODO"); } + @Override + public void testReadMetadataWithRelationsConcurrentModifications() + { + // TODO Support these test once kudu connector can create tables with default partitions + throw new SkipException("TODO"); + } + @Test @Override public void testCreateTableAsSelectNegativeDate() diff --git a/plugin/trino-phoenix/pom.xml b/plugin/trino-phoenix/pom.xml index 8e428774ba3c..104c5a002e56 100644 --- a/plugin/trino-phoenix/pom.xml +++ b/plugin/trino-phoenix/pom.xml @@ -185,6 +185,12 @@ + + io.trino + trino-testing-services + test + + io.trino trino-tpch diff --git a/plugin/trino-phoenix/src/test/java/io/trino/plugin/phoenix/TestPhoenixConnectorTest.java b/plugin/trino-phoenix/src/test/java/io/trino/plugin/phoenix/TestPhoenixConnectorTest.java index f71e24f68014..def39f9ce8c0 100644 --- a/plugin/trino-phoenix/src/test/java/io/trino/plugin/phoenix/TestPhoenixConnectorTest.java +++ b/plugin/trino-phoenix/src/test/java/io/trino/plugin/phoenix/TestPhoenixConnectorTest.java @@ -22,6 +22,7 @@ import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.SqlExecutor; import io.trino.testing.sql.TestTable; +import io.trino.testng.services.Flaky; import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.Test; @@ -220,6 +221,30 @@ public void testShowCreateTable() ")"); } + // TODO (https://github.com/trinodb/trino/issues/10904): Test is flaky because tests execute in parallel + @Flaky(issue = "https://github.com/trinodb/trino/issues/10904", match = "\\QERROR 1012 (42M03): Table undefined. tableName=") + @Test + @Override + public void testSelectInformationSchemaColumns() + { + super.testSelectInformationSchemaColumns(); + } + + @Override + public void testReadMetadataWithRelationsConcurrentModifications() + { + try { + super.testReadMetadataWithRelationsConcurrentModifications(); + } + catch (Exception expected) { + // The test failure is not guaranteed + // TODO (https://github.com/trinodb/trino/issues/10904): shouldn't fail + assertThat(expected) + .hasMessageContaining("ERROR 1012 (42M03): Table undefined. tableName="); + throw new SkipException("to be fixed"); + } + } + @Override public void testCharVarcharComparison() { diff --git a/plugin/trino-phoenix5/pom.xml b/plugin/trino-phoenix5/pom.xml index ebdcf6864cf6..0077151c6a13 100644 --- a/plugin/trino-phoenix5/pom.xml +++ b/plugin/trino-phoenix5/pom.xml @@ -190,6 +190,12 @@ + + io.trino + trino-testing-services + test + + io.trino trino-tpch diff --git a/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixConnectorTest.java b/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixConnectorTest.java index cb1ab18824fa..b88005198421 100644 --- a/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixConnectorTest.java +++ b/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixConnectorTest.java @@ -24,6 +24,7 @@ import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.SqlExecutor; import io.trino.testing.sql.TestTable; +import io.trino.testng.services.Flaky; import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.Test; @@ -238,6 +239,30 @@ public void testShowCreateTable() ")"); } + // TODO (https://github.com/trinodb/trino/issues/10904): Test is flaky because tests execute in parallel + @Flaky(issue = "https://github.com/trinodb/trino/issues/10904", match = "\\QERROR 1012 (42M03): Table undefined. tableName=") + @Test + @Override + public void testSelectInformationSchemaColumns() + { + super.testSelectInformationSchemaColumns(); + } + + @Override + public void testReadMetadataWithRelationsConcurrentModifications() + { + try { + super.testReadMetadataWithRelationsConcurrentModifications(); + } + catch (Exception expected) { + // The test failure is not guaranteed + // TODO (https://github.com/trinodb/trino/issues/10904): shouldn't fail + assertThat(expected) + .hasMessageContaining("ERROR 1012 (42M03): Table undefined. tableName="); + throw new SkipException("to be fixed"); + } + } + @Override public void testCharVarcharComparison() { diff --git a/plugin/trino-sqlserver/pom.xml b/plugin/trino-sqlserver/pom.xml index 6cdd6b42231f..a41364daa744 100644 --- a/plugin/trino-sqlserver/pom.xml +++ b/plugin/trino-sqlserver/pom.xml @@ -140,6 +140,12 @@ test + + io.trino + trino-testing-services + test + + io.trino trino-tpch diff --git a/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/BaseSqlServerConnectorTest.java b/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/BaseSqlServerConnectorTest.java index 29717dc9e172..dcb7cf107222 100644 --- a/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/BaseSqlServerConnectorTest.java +++ b/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/BaseSqlServerConnectorTest.java @@ -25,6 +25,8 @@ import io.trino.sql.planner.plan.FilterNode; import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.TestTable; +import io.trino.testng.services.Flaky; +import org.testng.SkipException; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -135,6 +137,35 @@ public void testReadFromView() onRemoteDatabase().execute("DROP VIEW IF EXISTS test_view"); } + // TODO (https://github.com/trinodb/trino/issues/10846): Test is expected to be flaky because tests execute in parallel + @Flaky(issue = "https://github.com/trinodb/trino/issues/10846", match = "was deadlocked on lock resources with another process and has been chosen as the deadlock victim") + @Test + @Override + public void testSelectInformationSchemaColumns() + { + super.testSelectInformationSchemaColumns(); + } + + @Test + @Override + public void testReadMetadataWithRelationsConcurrentModifications() + { + try { + super.testReadMetadataWithRelationsConcurrentModifications(); + } + catch (Exception expected) { + // The test failure is not guaranteed + // TODO (https://github.com/trinodb/trino/issues/10846): shouldn't fail + assertThat(expected) + .hasMessageMatching("(?s).*(" + + "No task completed before timeout|" + + "was deadlocked on lock resources with another process and has been chosen as the deadlock victim|" + + // E.g. system.metadata.table_comments can return empty results, when underlying metadata list tables call fails + "Expecting actual not to be empty).*"); + throw new SkipException("to be fixed"); + } + } + @Test public void testColumnComment() throws Exception diff --git a/testing/trino-testing-services/src/main/java/io/trino/testng/services/LogTestDurationListener.java b/testing/trino-testing-services/src/main/java/io/trino/testng/services/LogTestDurationListener.java index 31a81d3e880b..8ababbb0d2d1 100644 --- a/testing/trino-testing-services/src/main/java/io/trino/testng/services/LogTestDurationListener.java +++ b/testing/trino-testing-services/src/main/java/io/trino/testng/services/LogTestDurationListener.java @@ -260,6 +260,7 @@ private static String getName(ITestClass testClass) private static String getName(IInvokedMethod method) { - return format("%s::%s", method.getTestMethod().getTestClass().getName(), method.getTestMethod().getMethodName()); + // See ProgressLoggingListener.formatTestName + return format("%s.%s", method.getTestMethod().getTestClass().getName(), method.getTestMethod().getMethodName()); } } diff --git a/testing/trino-testing-services/src/main/java/io/trino/testng/services/ProgressLoggingListener.java b/testing/trino-testing-services/src/main/java/io/trino/testng/services/ProgressLoggingListener.java index 65190de9ad36..0602d9478721 100644 --- a/testing/trino-testing-services/src/main/java/io/trino/testng/services/ProgressLoggingListener.java +++ b/testing/trino-testing-services/src/main/java/io/trino/testng/services/ProgressLoggingListener.java @@ -108,6 +108,7 @@ public void onFinish(ITestContext context) private String formatTestName(ITestResult testCase) { + // See LogTestDurationListener.getName return format("%s.%s%s", testCase.getTestClass().getName(), testCase.getName(), formatTestParameters(testCase)); } diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index c35eb3ab1ebc..9b11b948dda8 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -13,6 +13,7 @@ */ package io.trino.testing; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.UncheckedTimeoutException; @@ -34,11 +35,19 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; import java.util.List; import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.regex.Matcher; @@ -47,6 +56,7 @@ import java.util.stream.Stream; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verifyNotNull; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; @@ -105,6 +115,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; /** * Generic test for connectors. @@ -1199,6 +1210,184 @@ protected void checkInformationSchemaViewsForMaterializedView(String schemaName, .containsAll("VALUES '" + viewName + "'"); } + /** + * Test that reading table, column metadata, like {@code SHOW TABLES} or reading from {@code information_schema.views} + * does not fail when relations are concurrently created or dropped. + */ + @Test(timeOut = 180_000) + public void testReadMetadataWithRelationsConcurrentModifications() + throws Exception + { + if (!hasBehavior(SUPPORTS_CREATE_TABLE) && !hasBehavior(SUPPORTS_CREATE_VIEW) && !hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW)) { + throw new SkipException("Cannot test"); + } + + int readIterations = 5; + // generous timeout as this is a generic test; typically should be faster + int testTimeoutSeconds = 150; + + testReadMetadataWithRelationsConcurrentModifications(readIterations, testTimeoutSeconds); + } + + protected void testReadMetadataWithRelationsConcurrentModifications(int readIterations, int testTimeoutSeconds) + throws Exception + { + Stopwatch testWatch = Stopwatch.createStarted(); + + int readerTasksCount = 6 + + (hasBehavior(SUPPORTS_CREATE_VIEW) ? 1 : 0) + + (hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW) ? 1 : 0); + AtomicInteger incompleteReadTasks = new AtomicInteger(readerTasksCount); + List> readerTasks = new ArrayList<>(); + readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SHOW TABLES")); + readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM information_schema.tables WHERE table_schema = CURRENT_SCHEMA")); + readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM information_schema.columns WHERE table_schema = CURRENT_SCHEMA")); + readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM system.jdbc.tables WHERE table_cat = CURRENT_CATALOG AND table_schem = CURRENT_SCHEMA")); + readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM system.jdbc.columns WHERE table_cat = CURRENT_CATALOG AND table_schem = CURRENT_SCHEMA")); + readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM system.metadata.table_comments WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA")); + if (hasBehavior(SUPPORTS_CREATE_VIEW)) { + readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM information_schema.views WHERE table_schema = CURRENT_SCHEMA")); + } + if (hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW)) { + readerTasks.add(queryRepeatedly(readIterations, incompleteReadTasks, "SELECT * FROM system.metadata.materialized_views WHERE catalog_name = CURRENT_CATALOG AND schema_name = CURRENT_SCHEMA")); + } + assertEquals(readerTasks.size(), readerTasksCount); + + int writeTasksCount = 1 + + (hasBehavior(SUPPORTS_CREATE_VIEW) ? 1 : 0) + + (hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW) ? 1 : 0); + writeTasksCount = 2 * writeTasksCount; // writes are scheduled twice + CountDownLatch writeTasksInitialized = new CountDownLatch(writeTasksCount); + Runnable writeInitialized = writeTasksInitialized::countDown; + Supplier done = () -> incompleteReadTasks.get() == 0; + List> writeTasks = new ArrayList<>(); + writeTasks.add(createDropRepeatedly(writeInitialized, done, "concur_table", "CREATE TABLE %s(a integer)", "DROP TABLE %s")); + if (hasBehavior(SUPPORTS_CREATE_VIEW)) { + writeTasks.add(createDropRepeatedly(writeInitialized, done, "concur_view", "CREATE VIEW %s AS SELECT 1 a", "DROP VIEW %s")); + } + if (hasBehavior(SUPPORTS_CREATE_MATERIALIZED_VIEW)) { + writeTasks.add(createDropRepeatedly(writeInitialized, done, "concur_mview", "CREATE MATERIALIZED VIEW %s AS SELECT 1 a", "DROP MATERIALIZED VIEW %s")); + } + assertEquals(writeTasks.size() * 2, writeTasksCount); + + ExecutorService executor = newFixedThreadPool(readerTasksCount + writeTasksCount); + try { + CompletionService completionService = new ExecutorCompletionService<>(executor); + submitTasks(writeTasks, completionService); + submitTasks(writeTasks, completionService); // twice to increase chances of catching problems + if (!writeTasksInitialized.await(testTimeoutSeconds, SECONDS)) { + Future someFailure = completionService.poll(); + if (someFailure != null) { + someFailure.get(); // non-blocking + } + fail("Setup failed"); + } + submitTasks(readerTasks, completionService); + for (int i = 0; i < readerTasksCount + writeTasksCount; i++) { + long remainingTimeSeconds = testTimeoutSeconds - testWatch.elapsed(SECONDS); + Future future = completionService.poll(remainingTimeSeconds, SECONDS); + verifyNotNull(future, "Task did not completed before timeout; completed tasks: %s, current poll timeout: %s s", i, remainingTimeSeconds); + future.get(); // non-blocking + } + } + finally { + executor.shutdownNow(); + } + assertTrue(executor.awaitTermination(10, SECONDS)); + } + + /** + * Run {@code sql} query at least {@code minIterations} times and keep running until other tasks complete. + * {@code incompleteReadTasks} is used for orchestrating end of execution. + */ + protected Callable queryRepeatedly(int minIterations, AtomicInteger incompleteReadTasks, @Language("SQL") String sql) + { + return new Callable<>() + { + @Override + public Void call() + { + boolean alwaysEmpty = true; + for (int i = 0; i < minIterations; i++) { + MaterializedResult result = computeActual(sql); + alwaysEmpty &= result.getRowCount() == 0; + } + if (alwaysEmpty) { + fail(format("The results of [%s] are always empty after %s iterations, this may indicate test misconfiguration or broken connector behavior", sql, minIterations)); + } + assertThat(incompleteReadTasks.decrementAndGet()).as("incompleteReadTasks").isGreaterThanOrEqualTo(0); + // Keep running so that faster test queries have same length of exposure in wall time + while (incompleteReadTasks.get() != 0) { + computeActual(sql); + } + return null; + } + + @Override + public String toString() + { + return format("Query(%s)", sql); + } + }; + } + + protected Callable createDropRepeatedly(Runnable initReady, Supplier done, String namePrefix, String createTemplate, String dropTemplate) + { + return new Callable<>() + { + @Override + public Void call() + { + int objectsToKeep = 3; + Deque liveObjects = new ArrayDeque<>(objectsToKeep); + for (int i = 0; i < objectsToKeep; i++) { + String name = namePrefix + "_" + randomTableSuffix(); + assertUpdate(format(createTemplate, name)); + liveObjects.addLast(name); + } + initReady.run(); + while (!done.get()) { + assertUpdate(format(dropTemplate, liveObjects.removeFirst())); + String name = namePrefix + "_" + randomTableSuffix(); + assertUpdate(format(createTemplate, name)); + liveObjects.addLast(name); + } + while (!liveObjects.isEmpty()) { + assertUpdate(format(dropTemplate, liveObjects.removeFirst())); + } + return null; + } + + @Override + public String toString() + { + return format("Repeat (%s) and (%s)", createTemplate, dropTemplate); + } + }; + } + + protected void submitTasks(List> callables, CompletionService completionService) + { + for (Callable callable : callables) { + String taskDescription = callable.toString(); + completionService.submit(new Callable() + { + @Override + public T call() + throws Exception + { + try { + return callable.call(); + } + catch (Throwable e) { + e.addSuppressed(new Exception("Task: " + taskDescription)); + throw e; + } + } + }); + } + } + @Test public void testExplainAnalyze() {