Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,17 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
}
catch (TrinoException e) {
// listTables throws an exception if it cannot connect to the database
LOG.debug(e, "Failed to get tables for catalog: %s", catalog);
LOG.warn(e, "Failed to get tables for catalog: %s", catalog);
}

for (SchemaTableName name : names) {
Optional<String> comment = Optional.empty();
try {
comment = getComment(session, prefix, name, views, materializedViews);
}
catch (TrinoException e) {
catch (RuntimeException e) {
// getTableHandle may throw an exception (e.g. Cassandra connector doesn't allow case insensitive column names)
LOG.debug(e, "Failed to get metadata for table: %s", name);
LOG.warn(e, "Failed to get metadata for table: %s", name);
}
table.addRow(prefix.getCatalogName(), name.getSchemaName(), name.getTableName(), comment.orElse(null));
}
Expand Down Expand Up @@ -153,7 +153,7 @@ private Optional<String> getComment(
.map(metadata -> metadata.getMetadata().getComment())
.orElseGet(() -> {
// A previously listed table might have been dropped concurrently
LOG.debug("Failed to get metadata for table: %s", name);
LOG.warn("Failed to get metadata for table: %s", name);
return Optional.empty();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.plugin.hive.metastore.cache;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -579,8 +578,7 @@ public void dropColumn(String databaseName, String tableName, String columnName)
}
}

@VisibleForTesting
void invalidateTable(String databaseName, String tableName)
public void invalidateTable(String databaseName, String tableName)
{
invalidateTableCache(databaseName, tableName);
tableNamesCache.invalidate(databaseName);
Expand Down
5 changes: 5 additions & 0 deletions plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@
<artifactId>joda-time</artifactId>
</dependency>

<dependency>
<groupId>net.jodah</groupId>
<artifactId>failsafe</artifactId>
</dependency>

<!-- Iceberg -->
<dependency>
<groupId>org.apache.iceberg</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
catch (UnknownTableTypeException e) {
// ignore table of unknown type
}
catch (RuntimeException e) {
// The table may be in the process of being removed, which can cause all sorts of exceptions. Log, because we're catching broadly.
log.warn(e, "Failed to access metadata of table %s during column listing for %s", table, prefix);
}
}
return columns.buildOrThrow();
}
Expand Down Expand Up @@ -1276,6 +1280,22 @@ public List<SchemaTableName> listMaterializedViews(ConnectorSession session, Opt
return catalog.listMaterializedViews(session, schemaName);
}

@Override
public Map<SchemaTableName, ConnectorMaterializedViewDefinition> getMaterializedViews(ConnectorSession session, Optional<String> schemaName)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this change just go into the default implementation in ConnectorMetadata?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iceberg has a challenge because it keeps state split between the metastore and the filesystem, and cannot rely on observing a consistent state. I would hope most connectors don't have problems like that, or they may be able to control exceptions more precisely (here I catch RuntimeException, which is too broad in the general case).

{
Map<SchemaTableName, ConnectorMaterializedViewDefinition> materializedViews = new HashMap<>();
for (SchemaTableName name : listMaterializedViews(session, schemaName)) {
try {
getMaterializedView(session, name).ifPresent(view -> materializedViews.put(name, view));
}
catch (RuntimeException e) {
// Materialized view can be being removed and this may cause all sorts of exceptions. Log, because we're catching broadly.
log.warn(e, "Failed to access metadata of materialized view %s during listing", name);
}
}
return materializedViews;
}

@Override
public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(ConnectorSession session, SchemaTableName viewName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
import io.trino.plugin.hive.ViewReaderUtil;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.HivePrincipal;
import io.trino.plugin.hive.metastore.PrincipalPrivileges;
import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
import io.trino.plugin.hive.util.HiveUtil;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.spi.TrinoException;
Expand All @@ -47,6 +47,8 @@
import io.trino.spi.connector.ViewNotFoundException;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.type.TypeManager;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
Expand All @@ -58,6 +60,8 @@
import org.apache.iceberg.Transaction;

import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
Expand All @@ -68,6 +72,7 @@
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE;
Expand Down Expand Up @@ -125,7 +130,7 @@ class TrinoHiveCatalog
private static final String PRESTO_VIEW_EXPANDED_TEXT_MARKER = HiveMetadata.PRESTO_VIEW_EXPANDED_TEXT_MARKER;

private final CatalogName catalogName;
private final HiveMetastore metastore;
private final CachingHiveMetastore metastore;
private final HdfsEnvironment hdfsEnvironment;
private final TypeManager typeManager;
private final IcebergTableOperationsProvider tableOperationsProvider;
Expand All @@ -139,7 +144,7 @@ class TrinoHiveCatalog

public TrinoHiveCatalog(
CatalogName catalogName,
HiveMetastore metastore,
CachingHiveMetastore metastore,
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
IcebergTableOperationsProvider tableOperationsProvider,
Expand Down Expand Up @@ -282,13 +287,13 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
listNamespaces(session, namespace)
.stream()
.flatMap(schema -> Stream.concat(
// Get tables with parameter table_type set to "ICEBERG" or "iceberg". This is required because
// Trino uses lowercase value whereas Spark and Flink use uppercase.
// TODO: use one metastore call to pass both the filters: https://github.com/trinodb/trino/issues/7710
metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toLowerCase(Locale.ENGLISH)).stream()
.map(table -> new SchemaTableName(schema, table)),
metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)).stream()
.map(table -> new SchemaTableName(schema, table)))
// Get tables with parameter table_type set to "ICEBERG" or "iceberg". This is required because
// Trino uses lowercase value whereas Spark and Flink use uppercase.
// TODO: use one metastore call to pass both the filters: https://github.com/trinodb/trino/issues/7710
metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toLowerCase(Locale.ENGLISH)).stream()
.map(table -> new SchemaTableName(schema, table)),
metastore.getTablesWithParameter(schema, TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)).stream()
.map(table -> new SchemaTableName(schema, table)))
.distinct()) // distinct() to avoid duplicates for case-insensitive HMS backends
.forEach(tablesListBuilder::add);

Expand Down Expand Up @@ -341,6 +346,15 @@ public void updateTableComment(ConnectorSession session, SchemaTableName schemaT
}
}

@Override
public void updateColumnComment(ConnectorSession session, SchemaTableName schemaTableName, ColumnIdentity columnIdentity, Optional<String> comment)
{
    // Update the column comment in the Hive metastore first, then mirror it into the
    // Iceberg schema's column doc so both metadata stores agree.
    metastore.commentColumn(schemaTableName.getSchemaName(), schemaTableName.getTableName(), columnIdentity.getName(), comment);

    Table table = loadTable(session, schemaTableName);
    table.updateSchema()
            .updateColumnDoc(columnIdentity.getName(), comment.orElse(null))
            .commit();
}

@Override
public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName)
{
Expand Down Expand Up @@ -605,6 +619,22 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName schem

@Override
public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName)
{
try {
return Failsafe.with(new RetryPolicy<>()
.withMaxAttempts(10)
.withBackoff(1, 5_000, ChronoUnit.MILLIS, 4)
.withMaxDuration(Duration.ofSeconds(30))
Comment thread
findepi marked this conversation as resolved.
Outdated
.abortOn(failure -> !(failure instanceof MaterializedViewMayBeBeingRemovedException)))
.get(() -> doGetMaterializedView(session, schemaViewName));
}
catch (MaterializedViewMayBeBeingRemovedException e) {
throwIfUnchecked(e.getCause());
throw new RuntimeException(e.getCause());
}
}

private Optional<ConnectorMaterializedViewDefinition> doGetMaterializedView(ConnectorSession session, SchemaTableName schemaViewName)
{
Optional<io.trino.plugin.hive.metastore.Table> tableOptional = metastore.getTable(schemaViewName.getSchemaName(), schemaViewName.getTableName());
if (tableOptional.isEmpty()) {
Expand All @@ -623,7 +653,20 @@ public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(Connect
IcebergMaterializedViewDefinition definition = decodeMaterializedViewData(materializedView.getViewOriginalText()
.orElseThrow(() -> new TrinoException(HIVE_INVALID_METADATA, "No view original text: " + schemaViewName)));

Table icebergTable = loadTable(session, new SchemaTableName(schemaViewName.getSchemaName(), storageTable));
Table icebergTable;
try {
icebergTable = loadTable(session, new SchemaTableName(schemaViewName.getSchemaName(), storageTable));
}
catch (RuntimeException e) {
// The materialized view could be removed concurrently. This may manifest in a number of ways, e.g.
// - io.trino.spi.connector.TableNotFoundException
// - org.apache.iceberg.exceptions.NotFoundException when accessing manifest file
// - other failures when reading storage table's metadata files
// Retry, as we're catching broadly.
metastore.invalidateTable(schemaViewName.getSchemaName(), schemaViewName.getTableName());
metastore.invalidateTable(schemaViewName.getSchemaName(), storageTable);
throw new MaterializedViewMayBeBeingRemovedException(e);
}
ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder();
properties.put(FILE_FORMAT_PROPERTY, IcebergUtil.getFileFormat(icebergTable));
if (!icebergTable.spec().fields().isEmpty()) {
Expand Down Expand Up @@ -660,12 +703,12 @@ private List<String> listNamespaces(ConnectorSession session, Optional<String> n
return listNamespaces(session);
}

@Override
public void updateColumnComment(ConnectorSession session, SchemaTableName schemaTableName, ColumnIdentity columnIdentity, Optional<String> comment)
private static class MaterializedViewMayBeBeingRemovedException
extends RuntimeException
{
metastore.commentColumn(schemaTableName.getSchemaName(), schemaTableName.getTableName(), columnIdentity.getName(), comment);

Table icebergTable = loadTable(session, schemaTableName);
icebergTable.updateSchema().updateColumnDoc(columnIdentity.getName(), comment.orElse(null)).commit();
public MaterializedViewMayBeBeingRemovedException(Throwable cause)
{
super(requireNonNull(cause, "cause is null"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ protected void refreshFromMetadataLocation(String newLocation)
Tasks.foreach(newLocation)
.retry(20)
.exponentialBackoff(100, 5000, 600000, 4.0)
.stopRetryOn(org.apache.iceberg.exceptions.NotFoundException.class) // qualified name, as this is NOT the io.trino.spi.connector.NotFoundException
.run(metadataLocation -> newMetadata.set(
TableMetadataParser.read(fileIo, io().newInputFile(metadataLocation))));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import io.trino.testing.ResultWithQueryId;
import io.trino.testing.TestingConnectorBehavior;
import io.trino.testing.sql.TestTable;
import io.trino.testng.services.Flaky;
import io.trino.tpch.TpchTable;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
Expand Down Expand Up @@ -283,17 +282,6 @@ public void testShowCreateTable()
")");
}

@Test
@Flaky(
        issue = "https://github.com/trinodb/trino/issues/10976",
        // Due to the nature of the problem, actual failure can vary greatly,
        // so "^" matches any failure message and treats it as the known flake.
        match = "^")
@Override
// Overridden solely to attach the @Flaky retry annotation; delegates to the base-class test unchanged.
public void testSelectInformationSchemaColumns()
{
    super.testSelectInformationSchemaColumns();
}

@Override
protected void checkInformationSchemaViewsForMaterializedView(String schemaName, String viewName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,13 @@ public void testWrittenStats()
throw new SkipException("TODO");
}

@Override
public void testReadMetadataWithRelationsConcurrentModifications()
{
    // TODO Support this test once the Kudu connector can create tables with default partitions
    // NOTE(review): the stated reason looks copy-pasted from other skipped tests in this class —
    // confirm it actually applies to the concurrent-modifications test.
    throw new SkipException("TODO");
}

@Test
@Override
public void testCreateTableAsSelectNegativeDate()
Expand Down
6 changes: 6 additions & 0 deletions plugin/trino-phoenix/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing-services</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-tpch</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.testing.TestingConnectorBehavior;
import io.trino.testing.sql.SqlExecutor;
import io.trino.testing.sql.TestTable;
import io.trino.testng.services.Flaky;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -220,6 +221,30 @@ public void testShowCreateTable()
")");
}

// TODO (https://github.com/trinodb/trino/issues/10904): Test is flaky because tests execute in parallel
// Overridden solely to attach the @Flaky retry annotation (matching the known Phoenix
// "Table undefined" error); delegates to the base-class test unchanged.
@Flaky(issue = "https://github.com/trinodb/trino/issues/10904", match = "\\QERROR 1012 (42M03): Table undefined. tableName=")
@Test
@Override
public void testSelectInformationSchemaColumns()
{
    super.testSelectInformationSchemaColumns();
}

@Override
public void testReadMetadataWithRelationsConcurrentModifications()
{
    try {
        super.testReadMetadataWithRelationsConcurrentModifications();
    }
    catch (Exception expected) {
        // The test failure is timing-dependent and not guaranteed to occur, so a clean pass
        // is accepted. When it does fail, only the known Phoenix "Table undefined" error is
        // tolerated (and reported as a skip); any other failure propagates via assertThat.
        // TODO (https://github.com/trinodb/trino/issues/10904): shouldn't fail
        assertThat(expected)
                .hasMessageContaining("ERROR 1012 (42M03): Table undefined. tableName=");
        throw new SkipException("to be fixed");
    }
}

@Override
public void testCharVarcharComparison()
{
Expand Down
6 changes: 6 additions & 0 deletions plugin/trino-phoenix5/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing-services</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-tpch</artifactId>
Expand Down
Loading