diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ViewReaderUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ViewReaderUtil.java index 07cff19974cd..7e9597aac98e 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ViewReaderUtil.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ViewReaderUtil.java @@ -119,6 +119,7 @@ private static CoralTableRedirectionResolver coralTableRedirectionResolver( }); } + public static final String ICEBERG_MATERIALIZED_VIEW_COMMENT = "Presto Materialized View"; public static final String PRESTO_VIEW_FLAG = "presto_view"; static final String VIEW_PREFIX = "/* Presto View: "; static final String VIEW_SUFFIX = " */"; @@ -145,6 +146,11 @@ public static boolean isHiveOrPrestoView(String tableType) return tableType.equals(TableType.VIRTUAL_VIEW.name()); } + public static boolean isTrinoMaterializedView(String tableType, Map tableParameters) + { + return isHiveOrPrestoView(tableType) && isPrestoView(tableParameters) && ICEBERG_MATERIALIZED_VIEW_COMMENT.equalsIgnoreCase(tableParameters.get(TABLE_COMMENT)); + } + public static boolean canDecodeView(Table table) { // we can decode Hive or Presto view diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index d48a53ff5161..96f43a8ce4c3 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -416,6 +416,7 @@ **/TestTrinoGlueCatalogTest.java **/TestSharedGlueMetastore.java **/TestIcebergGlueCatalogAccessOperations.java + **/TestIcebergGlueCatalogMaterializedViewTest.java @@ -455,6 +456,7 @@ **/TestTrinoGlueCatalogTest.java **/TestSharedGlueMetastore.java **/TestIcebergGlueCatalogAccessOperations.java + **/TestIcebergGlueCatalogMaterializedViewTest.java diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java index 97ab4492f84d..a890c5e8b608 100644 --- 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java @@ -14,15 +14,25 @@ package io.trino.plugin.iceberg.catalog; import com.google.common.collect.ImmutableMap; +import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HdfsEnvironment; import io.trino.plugin.hive.HiveMetadata; import io.trino.plugin.hive.HiveViewNotSupportedException; import io.trino.plugin.hive.ViewReaderUtil; import io.trino.plugin.iceberg.ColumnIdentity; +import io.trino.plugin.iceberg.IcebergMaterializedViewDefinition; +import io.trino.plugin.iceberg.IcebergUtil; import io.trino.spi.TrinoException; +import io.trino.spi.connector.CatalogSchemaTableName; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorMaterializedViewDefinition; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.type.TypeManager; +import net.jodah.failsafe.Failsafe; +import net.jodah.failsafe.RetryPolicy; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.PartitionSpec; @@ -33,20 +43,33 @@ import org.apache.iceberg.Transaction; import java.io.IOException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Throwables.throwIfUnchecked; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE; import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; +import static 
io.trino.plugin.hive.ViewReaderUtil.ICEBERG_MATERIALIZED_VIEW_COMMENT; import static io.trino.plugin.hive.ViewReaderUtil.PRESTO_VIEW_FLAG; import static io.trino.plugin.hive.ViewReaderUtil.isHiveOrPrestoView; import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR; +import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.decodeMaterializedViewData; +import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY; +import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY; +import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields; import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; import static org.apache.iceberg.TableMetadata.newTableMetadata; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; import static org.apache.iceberg.Transactions.createTableTransaction; public abstract class AbstractTrinoCatalog @@ -60,15 +83,21 @@ public abstract class AbstractTrinoCatalog protected static final String PRESTO_QUERY_ID_NAME = HiveMetadata.PRESTO_QUERY_ID_NAME; protected static final String PRESTO_VIEW_EXPANDED_TEXT_MARKER = HiveMetadata.PRESTO_VIEW_EXPANDED_TEXT_MARKER; + private final CatalogName catalogName; + private final TypeManager typeManager; protected final IcebergTableOperationsProvider tableOperationsProvider; private final String trinoVersion; private final boolean useUniqueTableLocation; protected AbstractTrinoCatalog( + CatalogName catalogName, + TypeManager typeManager, IcebergTableOperationsProvider tableOperationsProvider, String trinoVersion, boolean useUniqueTableLocation) { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); 
this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null"); this.useUniqueTableLocation = useUniqueTableLocation; @@ -113,6 +142,25 @@ public Map getViews(ConnectorSession s return views.buildOrThrow(); } + @Override + public Optional getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + try { + return Failsafe.with(new RetryPolicy<>() + .withMaxAttempts(10) + .withBackoff(1, 5_000, ChronoUnit.MILLIS, 4) + .withMaxDuration(Duration.ofSeconds(30)) + .abortOn(failure -> !(failure instanceof MaterializedViewMayBeBeingRemovedException))) + .get(() -> doGetMaterializedView(session, schemaViewName)); + } + catch (MaterializedViewMayBeBeingRemovedException e) { + throwIfUnchecked(e.getCause()); + throw new RuntimeException(e.getCause()); + } + } + + protected abstract Optional doGetMaterializedView(ConnectorSession session, SchemaTableName schemaViewName); + protected Transaction newCreateTableTransaction( ConnectorSession session, SchemaTableName schemaTableName, @@ -206,4 +254,71 @@ protected Map createViewProperties(ConnectorSession session) .put(TABLE_COMMENT, PRESTO_VIEW_COMMENT) .buildOrThrow(); } + + protected SchemaTableName createMaterializedViewStorageTable(ConnectorSession session, SchemaTableName viewName, ConnectorMaterializedViewDefinition definition) + { + // Generate a storage table name and create a storage table. The properties in the definition are table properties for the + // storage table as indicated in the materialized view definition. 
+ String storageTableName = "st_" + randomUUID().toString().replace("-", ""); + Map storageTableProperties = new HashMap<>(definition.getProperties()); + storageTableProperties.putIfAbsent(FILE_FORMAT_PROPERTY, DEFAULT_FILE_FORMAT_DEFAULT); + + SchemaTableName storageTable = new SchemaTableName(viewName.getSchemaName(), storageTableName); + List columns = definition.getColumns().stream() + .map(column -> new ColumnMetadata(column.getName(), typeManager.getType(column.getType()))) + .collect(toImmutableList()); + + ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, storageTableProperties, Optional.empty()); + Transaction transaction = IcebergUtil.newCreateTableTransaction(this, tableMetadata, session); + transaction.newAppend().commit(); + transaction.commitTransaction(); + return storageTable; + } + + protected ConnectorMaterializedViewDefinition getMaterializedViewDefinition( + SchemaTableName viewName, + Table icebergTable, + Optional owner, + String viewOriginalText, + String storageTableName) + { + ImmutableMap.Builder properties = ImmutableMap.builder(); + properties.put(FILE_FORMAT_PROPERTY, IcebergUtil.getFileFormat(icebergTable)); + if (!icebergTable.spec().fields().isEmpty()) { + properties.put(PARTITIONING_PROPERTY, toPartitionFields(icebergTable.spec())); + } + + IcebergMaterializedViewDefinition definition = decodeMaterializedViewData(viewOriginalText); + return new ConnectorMaterializedViewDefinition( + definition.getOriginalSql(), + Optional.of(new CatalogSchemaTableName(catalogName.toString(), new SchemaTableName(viewName.getSchemaName(), storageTableName))), + definition.getCatalog(), + definition.getSchema(), + definition.getColumns().stream() + .map(column -> new ConnectorMaterializedViewDefinition.Column(column.getName(), column.getType())) + .collect(toImmutableList()), + definition.getComment(), + owner, + properties.buildOrThrow()); + } + + protected Map createMaterializedViewProperties(ConnectorSession 
session, String storageTableName) + { + return ImmutableMap.builder() + .put(PRESTO_QUERY_ID_NAME, session.getQueryId()) + .put(STORAGE_TABLE, storageTableName) + .put(PRESTO_VIEW_FLAG, "true") + .put(TRINO_CREATED_BY, TRINO_CREATED_BY_VALUE) + .put(TABLE_COMMENT, ICEBERG_MATERIALIZED_VIEW_COMMENT) + .buildOrThrow(); + } + + protected static class MaterializedViewMayBeBeingRemovedException + extends RuntimeException + { + public MaterializedViewMayBeBeingRemovedException(Throwable cause) + { + super(requireNonNull(cause, "cause is null")); + } + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java index 805e1d3d1bb5..efc9f83cc116 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java @@ -108,14 +108,14 @@ Transaction newCreateTableTransaction( void createMaterializedView( ConnectorSession session, - SchemaTableName schemaViewName, + SchemaTableName viewName, ConnectorMaterializedViewDefinition definition, boolean replace, boolean ignoreExisting); - void dropMaterializedView(ConnectorSession session, SchemaTableName schemaViewName); + void dropMaterializedView(ConnectorSession session, SchemaTableName viewName); - Optional getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName); + Optional getMaterializedView(ConnectorSession session, SchemaTableName viewName); void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergUtil.java index 9725260f9a27..96a187858314 100644 --- 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergUtil.java @@ -21,6 +21,7 @@ import java.util.Optional; import static io.trino.plugin.hive.HiveMetadata.PRESTO_VIEW_EXPANDED_TEXT_MARKER; +import static io.trino.plugin.hive.ViewReaderUtil.ICEBERG_MATERIALIZED_VIEW_COMMENT; import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE; import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW; @@ -48,4 +49,15 @@ public static TableInput getViewTableInput(String viewName, String viewOriginalT .withOwner(owner) .withParameters(parameters); } + + public static TableInput getMaterializedViewTableInput(String viewName, String viewOriginalText, String owner, Map parameters) + { + return new TableInput() + .withName(viewName) + .withTableType(VIRTUAL_VIEW.name()) + .withViewOriginalText(viewOriginalText) + .withViewExpandedText(ICEBERG_MATERIALIZED_VIEW_COMMENT) + .withOwner(owner) + .withParameters(parameters); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 0d8da9c2ee5c..e14fb4e5196f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -33,6 +33,8 @@ import com.amazonaws.services.glue.model.UpdateTableRequest; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.airlift.log.Logger; +import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HdfsEnvironment; import io.trino.plugin.hive.SchemaAlreadyExistsException; import io.trino.plugin.hive.ViewAlreadyExistsException; @@ -44,12 +46,14 @@ import 
io.trino.spi.connector.ConnectorMaterializedViewDefinition; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.MaterializedViewNotFoundException; import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.connector.ViewNotFoundException; import io.trino.spi.security.PrincipalType; import io.trino.spi.security.TrinoPrincipal; +import io.trino.spi.type.TypeManager; import net.jodah.failsafe.Failsafe; import net.jodah.failsafe.RetryPolicy; import org.apache.hadoop.fs.Path; @@ -69,22 +73,31 @@ import java.util.stream.Stream; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; +import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE; import static io.trino.plugin.hive.ViewReaderUtil.encodeViewData; import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView; +import static io.trino.plugin.hive.ViewReaderUtil.isTrinoMaterializedView; import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults; import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; +import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.encodeMaterializedViewData; +import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.fromConnectorMaterializedViewDefinition; import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; import static 
io.trino.plugin.iceberg.IcebergSessionProperties.getHiveCatalogName; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName; import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; +import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getMaterializedViewTableInput; import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getTableInput; import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getViewTableInput; +import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.spi.StandardErrorCode.UNSUPPORTED_TABLE_TYPE; import static io.trino.spi.connector.SchemaTableName.schemaTableName; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -96,6 +109,8 @@ public class TrinoGlueCatalog extends AbstractTrinoCatalog { + private static final Logger LOG = Logger.get(TrinoGlueCatalog.class); + private final HdfsEnvironment hdfsEnvironment; private final Optional defaultSchemaLocation; private final AWSGlueAsync glueClient; @@ -104,7 +119,9 @@ public class TrinoGlueCatalog private final Map tableMetadataCache = new ConcurrentHashMap<>(); public TrinoGlueCatalog( + CatalogName catalogName, HdfsEnvironment hdfsEnvironment, + TypeManager typeManager, IcebergTableOperationsProvider tableOperationsProvider, String trinoVersion, AWSGlueAsync glueClient, @@ -112,7 +129,7 @@ public TrinoGlueCatalog( Optional defaultSchemaLocation, boolean useUniqueTableLocation) { - super(tableOperationsProvider, trinoVersion, useUniqueTableLocation); + super(catalogName, typeManager, tableOperationsProvider, trinoVersion, useUniqueTableLocation); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.glueClient = requireNonNull(glueClient, "glueClient is null"); this.stats = requireNonNull(stats, "stats 
is null"); @@ -363,6 +380,14 @@ private Optional getTable(SchemaTableNa } } + private void createTable(String schemaName, TableInput tableInput) + { + stats.getCreateTable().call(() -> + glueClient.createTable(new CreateTableRequest() + .withDatabaseName(schemaName) + .withTableInput(tableInput))); + } + private void deleteTable(String schema, String table) { stats.getDeleteTable().call(() -> @@ -556,31 +581,193 @@ public Optional getView(ConnectorSession session, Schem @Override public List listMaterializedViews(ConnectorSession session, Optional namespace) { - return ImmutableList.of(); + try { + List namespaces = namespace.map(List::of).orElseGet(() -> listNamespaces(session)); + return namespaces.stream() + .flatMap(glueNamespace -> { + try { + return getPaginatedResults( + glueClient::getTables, + new GetTablesRequest().withDatabaseName(glueNamespace), + GetTablesRequest::setNextToken, + GetTablesResult::getNextToken, + stats.getGetTables()) + .map(GetTablesResult::getTableList) + .flatMap(List::stream) + .filter(table -> isTrinoMaterializedView(table.getTableType(), table.getParameters())) + .map(table -> new SchemaTableName(glueNamespace, table.getName())); + } + catch (EntityNotFoundException e) { + // Namespace may have been deleted + return Stream.empty(); + } + }) + .collect(toImmutableList()); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } } @Override - public void createMaterializedView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorMaterializedViewDefinition definition, boolean replace, boolean ignoreExisting) + public void createMaterializedView( + ConnectorSession session, + SchemaTableName viewName, + ConnectorMaterializedViewDefinition definition, + boolean replace, + boolean ignoreExisting) { - throw new TrinoException(NOT_SUPPORTED, "createMaterializedView is not supported for Iceberg Glue catalogs"); + Optional existing = getTable(viewName); + + if (existing.isPresent()) { 
+ if (!isTrinoMaterializedView(existing.get().getTableType(), existing.get().getParameters())) { + throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Existing table is not a Materialized View: " + viewName); + } + if (!replace) { + if (ignoreExisting) { + return; + } + throw new TrinoException(ALREADY_EXISTS, "Materialized view already exists: " + viewName); + } + } + + // Create the storage table + SchemaTableName storageTable = createMaterializedViewStorageTable(session, viewName, definition); + // Create a view indicating the storage table + TableInput materializedViewTableInput = getMaterializedViewTableInput( + viewName.getTableName(), + encodeMaterializedViewData(fromConnectorMaterializedViewDefinition(definition)), + session.getUser(), + createMaterializedViewProperties(session, storageTable.getTableName())); + + if (existing.isPresent()) { + try { + stats.getUpdateTable().call(() -> + glueClient.updateTable(new UpdateTableRequest() + .withDatabaseName(viewName.getSchemaName()) + .withTableInput(materializedViewTableInput))); + } + catch (RuntimeException e) { + try { + // Update failed, clean up new storage table + dropTable(session, storageTable); + } + catch (RuntimeException suppressed) { + LOG.warn(suppressed, "Failed to drop new storage table '%s' for materialized view '%s'", storageTable, viewName); + if (e != suppressed) { + e.addSuppressed(suppressed); + } + } + throw e; + } + dropStorageTable(session, existing.get()); + } + else { + createTable(viewName.getSchemaName(), materializedViewTableInput); + } } @Override - public void dropMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + public void dropMaterializedView(ConnectorSession session, SchemaTableName viewName) { - throw new TrinoException(NOT_SUPPORTED, "dropMaterializedView is not supported for Iceberg Glue catalogs"); + com.amazonaws.services.glue.model.Table view = getTable(viewName) + .orElseThrow(() -> new MaterializedViewNotFoundException(viewName)); + + if 
(!isTrinoMaterializedView(view.getTableType(), view.getParameters())) { + throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Not a Materialized View: " + view.getDatabaseName() + "." + view.getName()); + } + dropStorageTable(session, view); + deleteTable(view.getDatabaseName(), view.getName()); + } + + private void dropStorageTable(ConnectorSession session, com.amazonaws.services.glue.model.Table view) + { + String storageTableName = view.getParameters().get(STORAGE_TABLE); + if (storageTableName != null) { + try { + dropTable(session, new SchemaTableName(view.getDatabaseName(), storageTableName)); + } + catch (TrinoException e) { + LOG.warn(e, "Failed to drop storage table '%s' for materialized view '%s'", storageTableName, view.getName()); + } + } } @Override - public Optional getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + protected Optional doGetMaterializedView(ConnectorSession session, SchemaTableName viewName) { - return Optional.empty(); + Optional maybeTable = getTable(viewName); + if (maybeTable.isEmpty()) { + return Optional.empty(); + } + + com.amazonaws.services.glue.model.Table table = maybeTable.get(); + Map tableParameters = table.getParameters(); + if (!isTrinoMaterializedView(table.getTableType(), tableParameters)) { + return Optional.empty(); + } + + String storageTableName = table.getParameters().get(STORAGE_TABLE); + checkState(storageTableName != null, "Storage table missing in definition of materialized view " + viewName); + + Table icebergTable; + try { + icebergTable = loadTable(session, new SchemaTableName(viewName.getSchemaName(), storageTableName)); + } + catch (RuntimeException e) { + // The materialized view could be removed concurrently. This may manifest in a number of ways, e.g. 
+ // - io.trino.spi.connector.TableNotFoundException + // - org.apache.iceberg.exceptions.NotFoundException when accessing manifest file + // - other failures when reading storage table's metadata files + // Retry, as we're catching broadly. + throw new MaterializedViewMayBeBeingRemovedException(e); + } + + String viewOriginalText = table.getViewOriginalText(); + if (viewOriginalText == null) { + throw new TrinoException(ICEBERG_BAD_DATA, "Materialized view did not have original text " + viewName); + } + return Optional.of(getMaterializedViewDefinition( + viewName, + icebergTable, + Optional.ofNullable(table.getOwner()), + viewOriginalText, + storageTableName)); } @Override public void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target) { - throw new TrinoException(NOT_SUPPORTED, "renameMaterializedView is not supported for Iceberg Glue catalogs"); + boolean newTableCreated = false; + try { + Optional table = getTable(source); + if (table.isEmpty()) { + throw new TableNotFoundException(source); + } + com.amazonaws.services.glue.model.Table glueTable = table.get(); + if (!isTrinoMaterializedView(glueTable.getTableType(), glueTable.getParameters())) { + throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Not a Materialized View: " + source); + } + TableInput tableInput = getMaterializedViewTableInput(target.getTableName(), glueTable.getViewOriginalText(), glueTable.getOwner(), glueTable.getParameters()); + CreateTableRequest createTableRequest = new CreateTableRequest() + .withDatabaseName(target.getSchemaName()) + .withTableInput(tableInput); + stats.getCreateTable().call(() -> glueClient.createTable(createTableRequest)); + newTableCreated = true; + deleteTable(source.getSchemaName(), source.getTableName()); + } + catch (RuntimeException e) { + if (newTableCreated) { + try { + deleteTable(target.getSchemaName(), target.getTableName()); + } + catch (RuntimeException cleanupException) { + if (!cleanupException.equals(e)) { + 
e.addSuppressed(cleanupException); + } + } + } + throw e; + } } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java index 57b2235bb801..2c9c6758b26a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java @@ -15,6 +15,7 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.glue.AWSGlueAsync; +import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HdfsEnvironment; import io.trino.plugin.hive.NodeVersion; import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig; @@ -24,6 +25,7 @@ import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; import io.trino.spi.security.ConnectorIdentity; +import io.trino.spi.type.TypeManager; import org.weakref.jmx.Flatten; import org.weakref.jmx.Managed; @@ -37,7 +39,9 @@ public class TrinoGlueCatalogFactory implements TrinoCatalogFactory { + private final CatalogName catalogName; private final HdfsEnvironment hdfsEnvironment; + private final TypeManager typeManager; private final IcebergTableOperationsProvider tableOperationsProvider; private final String trinoVersion; private final Optional defaultSchemaLocation; @@ -47,7 +51,9 @@ public class TrinoGlueCatalogFactory @Inject public TrinoGlueCatalogFactory( + CatalogName catalogName, HdfsEnvironment hdfsEnvironment, + TypeManager typeManager, IcebergTableOperationsProvider tableOperationsProvider, NodeVersion nodeVersion, GlueHiveMetastoreConfig glueConfig, @@ -55,7 +61,9 @@ public TrinoGlueCatalogFactory( IcebergConfig icebergConfig, GlueMetastoreStats stats) { + this.catalogName = requireNonNull(catalogName, "catalogName is null"); 
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString(); requireNonNull(glueConfig, "glueConfig is null"); @@ -77,6 +85,15 @@ public GlueMetastoreStats getStats() @Override public TrinoCatalog create(ConnectorIdentity identity) { - return new TrinoGlueCatalog(hdfsEnvironment, tableOperationsProvider, trinoVersion, glueClient, stats, defaultSchemaLocation, isUniqueTableLocation); + return new TrinoGlueCatalog( + catalogName, + hdfsEnvironment, + typeManager, + tableOperationsProvider, + trinoVersion, + glueClient, + stats, + defaultSchemaLocation, + isUniqueTableLocation); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index 1fa156896c47..3b3dc7875e84 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -14,7 +14,6 @@ package io.trino.plugin.iceberg.catalog.hms; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.airlift.log.Logger; import io.trino.plugin.base.CatalogName; @@ -31,16 +30,12 @@ import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; import io.trino.plugin.hive.util.HiveUtil; import io.trino.plugin.iceberg.ColumnIdentity; -import io.trino.plugin.iceberg.IcebergMaterializedViewDefinition; -import io.trino.plugin.iceberg.IcebergUtil; import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog; import 
io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; import io.trino.spi.TrinoException; import io.trino.spi.connector.CatalogSchemaTableName; -import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorMaterializedViewDefinition; import io.trino.spi.connector.ConnectorSession; -import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.MaterializedViewNotFoundException; import io.trino.spi.connector.SchemaNotFoundException; @@ -49,8 +44,6 @@ import io.trino.spi.connector.ViewNotFoundException; import io.trino.spi.security.TrinoPrincipal; import io.trino.spi.type.TypeManager; -import net.jodah.failsafe.Failsafe; -import net.jodah.failsafe.RetryPolicy; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.BaseTable; @@ -61,9 +54,6 @@ import org.apache.iceberg.Transaction; import java.io.IOException; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -72,55 +62,47 @@ import java.util.stream.Stream; import static com.google.common.base.Preconditions.checkState; -import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; import static io.trino.plugin.hive.HiveMetadata.STORAGE_TABLE; import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; import static io.trino.plugin.hive.HiveType.HIVE_STRING; -import static io.trino.plugin.hive.ViewReaderUtil.PRESTO_VIEW_FLAG; +import static io.trino.plugin.hive.ViewReaderUtil.ICEBERG_MATERIALIZED_VIEW_COMMENT; import static io.trino.plugin.hive.ViewReaderUtil.encodeViewData; import static io.trino.plugin.hive.ViewReaderUtil.isHiveOrPrestoView; import static 
io.trino.plugin.hive.ViewReaderUtil.isPrestoView; +import static io.trino.plugin.hive.ViewReaderUtil.isTrinoMaterializedView; import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet; import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES; import static io.trino.plugin.hive.metastore.StorageFormat.VIEW_STORAGE_FORMAT; import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema; import static io.trino.plugin.hive.util.HiveWriteUtils.getTableDefaultLocation; -import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.decodeMaterializedViewData; import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.encodeMaterializedViewData; import static io.trino.plugin.iceberg.IcebergMaterializedViewDefinition.fromConnectorMaterializedViewDefinition; import static io.trino.plugin.iceberg.IcebergSchemaProperties.getSchemaLocation; import static io.trino.plugin.iceberg.IcebergSessionProperties.getHiveCatalogName; -import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY; -import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; import static io.trino.plugin.iceberg.IcebergUtil.isIcebergTable; import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; -import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields; import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY; +import static io.trino.spi.StandardErrorCode.UNSUPPORTED_TABLE_TYPE; import static io.trino.spi.connector.SchemaTableName.schemaTableName; import static java.util.Objects.requireNonNull; -import static 
java.util.UUID.randomUUID; import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW; import static org.apache.iceberg.CatalogUtil.dropTableData; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; public class TrinoHiveCatalog extends AbstractTrinoCatalog { private static final Logger log = Logger.get(TrinoHiveCatalog.class); - private static final String ICEBERG_MATERIALIZED_VIEW_COMMENT = "Presto Materialized View"; public static final String DEPENDS_ON_TABLES = "dependsOnTables"; - private final CatalogName catalogName; private final CachingHiveMetastore metastore; private final HdfsEnvironment hdfsEnvironment; - private final TypeManager typeManager; private final boolean isUsingSystemSecurity; private final boolean deleteSchemaLocationsFallback; @@ -137,11 +119,9 @@ public TrinoHiveCatalog( boolean isUsingSystemSecurity, boolean deleteSchemaLocationsFallback) { - super(tableOperationsProvider, trinoVersion, useUniqueTableLocation); - this.catalogName = requireNonNull(catalogName, "catalogName is null"); + super(catalogName, typeManager, tableOperationsProvider, trinoVersion, useUniqueTableLocation); this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); - this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.isUsingSystemSecurity = isUsingSystemSecurity; this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback; } @@ -452,49 +432,36 @@ public List listMaterializedViews(ConnectorSession session, Opt } @Override - public void createMaterializedView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorMaterializedViewDefinition definition, - boolean replace, boolean ignoreExisting) + public void createMaterializedView( + ConnectorSession session, + SchemaTableName viewName, + ConnectorMaterializedViewDefinition definition, + boolean replace, + boolean ignoreExisting) { - Optional existing 
= metastore.getTable(schemaViewName.getSchemaName(), schemaViewName.getTableName()); + Optional existing = metastore.getTable(viewName.getSchemaName(), viewName.getTableName()); - // It's a create command where the materialized view already exists and 'if not exists' clause is not specified - if (!replace && existing.isPresent()) { - if (ignoreExisting) { - return; + if (existing.isPresent()) { + if (!isTrinoMaterializedView(existing.get().getTableType(), existing.get().getParameters())) { + throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Existing table is not a Materialized View: " + viewName); + } + if (!replace) { + if (ignoreExisting) { + return; + } + throw new TrinoException(ALREADY_EXISTS, "Materialized view already exists: " + viewName); } - throw new TrinoException(ALREADY_EXISTS, "Materialized view already exists: " + schemaViewName); } - // Generate a storage table name and create a storage table. The properties in the definition are table properties for the - // storage table as indicated in the materialized view definition. 
- String storageTableName = "st_" + randomUUID().toString().replace("-", ""); - Map storageTableProperties = new HashMap<>(definition.getProperties()); - storageTableProperties.putIfAbsent(FILE_FORMAT_PROPERTY, DEFAULT_FILE_FORMAT_DEFAULT); - - SchemaTableName storageTable = new SchemaTableName(schemaViewName.getSchemaName(), storageTableName); - List columns = definition.getColumns().stream() - .map(column -> new ColumnMetadata(column.getName(), typeManager.getType(column.getType()))) - .collect(toImmutableList()); - - ConnectorTableMetadata tableMetadata = new ConnectorTableMetadata(storageTable, columns, storageTableProperties, Optional.empty()); - Transaction transaction = IcebergUtil.newCreateTableTransaction(this, tableMetadata, session); - transaction.newAppend().commit(); - transaction.commitTransaction(); + SchemaTableName storageTable = createMaterializedViewStorageTable(session, viewName, definition); // Create a view indicating the storage table - Map viewProperties = ImmutableMap.builder() - .put(PRESTO_QUERY_ID_NAME, session.getQueryId()) - .put(STORAGE_TABLE, storageTableName) - .put(PRESTO_VIEW_FLAG, "true") - .put(TRINO_CREATED_BY, TRINO_CREATED_BY_VALUE) - .put(TABLE_COMMENT, ICEBERG_MATERIALIZED_VIEW_COMMENT) - .buildOrThrow(); - + Map viewProperties = createMaterializedViewProperties(session, storageTable.getTableName()); Column dummyColumn = new Column("dummy", HIVE_STRING, Optional.empty()); io.trino.plugin.hive.metastore.Table.Builder tableBuilder = io.trino.plugin.hive.metastore.Table.builder() - .setDatabaseName(schemaViewName.getSchemaName()) - .setTableName(schemaViewName.getTableName()) + .setDatabaseName(viewName.getSchemaName()) + .setTableName(viewName.getTableName()) .setOwner(isUsingSystemSecurity ? 
Optional.empty() : Optional.of(session.getUser())) .setTableType(VIRTUAL_VIEW.name()) .setDataColumns(ImmutableList.of(dummyColumn)) @@ -504,17 +471,17 @@ public void createMaterializedView(ConnectorSession session, SchemaTableName sch .withStorage(storage -> storage.setLocation("")) .setViewOriginalText(Optional.of( encodeMaterializedViewData(fromConnectorMaterializedViewDefinition(definition)))) - .setViewExpandedText(Optional.of("/* Presto Materialized View */")); + .setViewExpandedText(Optional.of("/* " + ICEBERG_MATERIALIZED_VIEW_COMMENT + " */")); io.trino.plugin.hive.metastore.Table table = tableBuilder.build(); PrincipalPrivileges principalPrivileges = isUsingSystemSecurity ? NO_PRIVILEGES : buildInitialPrivilegeSet(session.getUser()); if (existing.isPresent() && replace) { // drop the current storage table String oldStorageTable = existing.get().getParameters().get(STORAGE_TABLE); if (oldStorageTable != null) { - metastore.dropTable(schemaViewName.getSchemaName(), oldStorageTable, true); + metastore.dropTable(viewName.getSchemaName(), oldStorageTable, true); } // Replace the existing view definition - metastore.replaceTable(schemaViewName.getSchemaName(), schemaViewName.getTableName(), table, principalPrivileges); + metastore.replaceTable(viewName.getSchemaName(), viewName.getTableName(), table, principalPrivileges); return; } // create the view definition @@ -522,62 +489,47 @@ public void createMaterializedView(ConnectorSession session, SchemaTableName sch } @Override - public void dropMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + public void dropMaterializedView(ConnectorSession session, SchemaTableName viewName) { - io.trino.plugin.hive.metastore.Table view = metastore.getTable(schemaViewName.getSchemaName(), schemaViewName.getTableName()) - .orElseThrow(() -> new MaterializedViewNotFoundException(schemaViewName)); + io.trino.plugin.hive.metastore.Table view = metastore.getTable(viewName.getSchemaName(), 
viewName.getTableName()) + .orElseThrow(() -> new MaterializedViewNotFoundException(viewName)); + + if (!isTrinoMaterializedView(view.getTableType(), view.getParameters())) { + throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Not a Materialized View: " + viewName); + } String storageTableName = view.getParameters().get(STORAGE_TABLE); if (storageTableName != null) { try { - metastore.dropTable(schemaViewName.getSchemaName(), storageTableName, true); + metastore.dropTable(viewName.getSchemaName(), storageTableName, true); } catch (TrinoException e) { - log.warn(e, "Failed to drop storage table '%s' for materialized view '%s'", storageTableName, schemaViewName); + log.warn(e, "Failed to drop storage table '%s' for materialized view '%s'", storageTableName, viewName); } } - metastore.dropTable(schemaViewName.getSchemaName(), schemaViewName.getTableName(), true); + metastore.dropTable(viewName.getSchemaName(), viewName.getTableName(), true); } @Override - public Optional getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + protected Optional doGetMaterializedView(ConnectorSession session, SchemaTableName viewName) { - try { - return Failsafe.with(new RetryPolicy<>() - .withMaxAttempts(10) - .withBackoff(1, 5_000, ChronoUnit.MILLIS, 4) - .withMaxDuration(Duration.ofSeconds(30)) - .abortOn(failure -> !(failure instanceof MaterializedViewMayBeBeingRemovedException))) - .get(() -> doGetMaterializedView(session, schemaViewName)); - } - catch (MaterializedViewMayBeBeingRemovedException e) { - throwIfUnchecked(e.getCause()); - throw new RuntimeException(e.getCause()); - } - } - - private Optional doGetMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) - { - Optional tableOptional = metastore.getTable(schemaViewName.getSchemaName(), schemaViewName.getTableName()); + Optional tableOptional = metastore.getTable(viewName.getSchemaName(), viewName.getTableName()); if (tableOptional.isEmpty()) { return Optional.empty(); } 
io.trino.plugin.hive.metastore.Table table = tableOptional.get(); - if (!isPrestoView(table) || !isHiveOrPrestoView(table) || !table.getParameters().containsKey(STORAGE_TABLE)) { + if (!isTrinoMaterializedView(table.getTableType(), table.getParameters())) { return Optional.empty(); } io.trino.plugin.hive.metastore.Table materializedView = tableOptional.get(); String storageTable = materializedView.getParameters().get(STORAGE_TABLE); - checkState(storageTable != null, "Storage table missing in definition of materialized view " + schemaViewName); - - IcebergMaterializedViewDefinition definition = decodeMaterializedViewData(materializedView.getViewOriginalText() - .orElseThrow(() -> new TrinoException(HIVE_INVALID_METADATA, "No view original text: " + schemaViewName))); + checkState(storageTable != null, "Storage table missing in definition of materialized view " + viewName); Table icebergTable; try { - icebergTable = loadTable(session, new SchemaTableName(schemaViewName.getSchemaName(), storageTable)); + icebergTable = loadTable(session, new SchemaTableName(viewName.getSchemaName(), storageTable)); } catch (RuntimeException e) { // The materialized view could be removed concurrently. This may manifest in a number of ways, e.g. @@ -585,27 +537,17 @@ private Optional doGetMaterializedView(Conn // - org.apache.iceberg.exceptions.NotFoundException when accessing manifest file // - other failures when reading storage table's metadata files // Retry, as we're catching broadly. 
- metastore.invalidateTable(schemaViewName.getSchemaName(), schemaViewName.getTableName()); - metastore.invalidateTable(schemaViewName.getSchemaName(), storageTable); + metastore.invalidateTable(viewName.getSchemaName(), viewName.getTableName()); + metastore.invalidateTable(viewName.getSchemaName(), storageTable); throw new MaterializedViewMayBeBeingRemovedException(e); } - ImmutableMap.Builder properties = ImmutableMap.builder(); - properties.put(FILE_FORMAT_PROPERTY, IcebergUtil.getFileFormat(icebergTable)); - if (!icebergTable.spec().fields().isEmpty()) { - properties.put(PARTITIONING_PROPERTY, toPartitionFields(icebergTable.spec())); - } - - return Optional.of(new ConnectorMaterializedViewDefinition( - definition.getOriginalSql(), - Optional.of(new CatalogSchemaTableName(catalogName.toString(), new SchemaTableName(schemaViewName.getSchemaName(), storageTable))), - definition.getCatalog(), - definition.getSchema(), - definition.getColumns().stream() - .map(column -> new ConnectorMaterializedViewDefinition.Column(column.getName(), column.getType())) - .collect(toImmutableList()), - definition.getComment(), - materializedView.getOwner(), - properties.buildOrThrow())); + return Optional.of(getMaterializedViewDefinition( + viewName, + icebergTable, + table.getOwner(), + materializedView.getViewOriginalText() + .orElseThrow(() -> new TrinoException(HIVE_INVALID_METADATA, "No view original text: " + viewName)), + storageTable)); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMaterializedViews.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java similarity index 88% rename from plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMaterializedViews.java rename to plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java index d572d2bbebfc..280364e264d2 100644 --- 
a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMaterializedViews.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java @@ -20,7 +20,6 @@ import io.trino.spi.connector.SchemaTableName; import io.trino.sql.tree.ExplainType; import io.trino.testing.AbstractTestQueryFramework; -import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; import io.trino.testing.MaterializedRow; import io.trino.transaction.TransactionId; @@ -33,7 +32,6 @@ import java.util.Set; import static com.google.common.collect.ImmutableSet.toImmutableSet; -import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner; import static io.trino.testing.MaterializedResult.DEFAULT_PRECISION; import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.DELETE_TABLE; import static io.trino.testing.TestingAccessControlManager.TestingPrivilegeType.DROP_MATERIALIZED_VIEW; @@ -50,15 +48,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -public class TestIcebergMaterializedViews +public abstract class BaseIcebergMaterializedViewTest extends AbstractTestQueryFramework { - @Override - protected DistributedQueryRunner createQueryRunner() - throws Exception - { - return createIcebergQueryRunner(); - } + protected abstract String getSchemaName(); @BeforeClass public void setUp() @@ -77,7 +70,7 @@ public void setUp() public void testShowTables() { assertUpdate("CREATE MATERIALIZED VIEW materialized_view_show_tables_test AS SELECT * FROM base_table1"); - SchemaTableName storageTableName = getStorageTable("iceberg", "tpch", "materialized_view_show_tables_test"); + SchemaTableName storageTableName = getStorageTable("iceberg", "materialized_view_show_tables_test"); Set expectedTables = ImmutableSet.of("base_table1", "base_table2", "materialized_view_show_tables_test", 
storageTableName.getTableName()); Set actualTables = computeActual("SHOW TABLES").getOnlyColumnAsSet().stream() @@ -111,7 +104,7 @@ public void testMaterializedViewsMetadata() "VALUES ('%s', '%s', '%s')", catalogName, schemaName, - getStorageTable(catalogName, schemaName, materializedViewName))); + getStorageTable(catalogName, materializedViewName))); // test freshness update assertQuery( @@ -151,6 +144,7 @@ public void testCreateWithDuplicateSourceTableSucceeds() assertUpdate("REFRESH MATERIALIZED VIEW materialized_view_with_duplicate_source", 12); assertQuery("SELECT count(*) FROM materialized_view_with_duplicate_source", "VALUES 12"); + assertUpdate("DROP MATERIALIZED VIEW materialized_view_with_duplicate_source"); } @Test @@ -162,7 +156,7 @@ public void testShowCreate() assertQuery("SELECT COUNT(*) FROM materialized_view_with_property", "VALUES 6"); assertThat(computeActual("SHOW CREATE MATERIALIZED VIEW materialized_view_with_property").getOnlyValue()) .isEqualTo( - "CREATE MATERIALIZED VIEW iceberg.tpch.materialized_view_with_property\n" + + "CREATE MATERIALIZED VIEW iceberg." + getSchemaName() + ".materialized_view_with_property\n" + "WITH (\n" + " format = 'ORC',\n" + " partitioning = ARRAY['_date']\n" + @@ -189,17 +183,18 @@ public void testSessionCatalogSchema() .setCatalog("tpch") .setSchema("tiny") .build(); - assertUpdate(session, "CREATE MATERIALIZED VIEW iceberg.tpch.materialized_view_session_test AS SELECT * FROM nation"); - assertQuery(session, "SELECT COUNT(*) FROM iceberg.tpch.materialized_view_session_test", "VALUES 25"); - assertUpdate(session, "DROP MATERIALIZED VIEW iceberg.tpch.materialized_view_session_test"); + String qualifiedMaterializedViewName = "iceberg." 
+ getSchemaName() + ".materialized_view_session_test"; + assertUpdate(session, "CREATE MATERIALIZED VIEW " + qualifiedMaterializedViewName + " AS SELECT * FROM nation"); + assertQuery(session, "SELECT COUNT(*) FROM " + qualifiedMaterializedViewName, "VALUES 25"); + assertUpdate(session, "DROP MATERIALIZED VIEW " + qualifiedMaterializedViewName); session = Session.builder(getSession()) .setCatalog(Optional.empty()) .setSchema(Optional.empty()) .build(); - assertUpdate(session, "CREATE MATERIALIZED VIEW iceberg.tpch.materialized_view_session_test AS SELECT * FROM iceberg.tpch.base_table1"); - assertQuery(session, "SELECT COUNT(*) FROM iceberg.tpch.materialized_view_session_test", "VALUES 6"); - assertUpdate(session, "DROP MATERIALIZED VIEW iceberg.tpch.materialized_view_session_test"); + assertUpdate(session, "CREATE MATERIALIZED VIEW " + qualifiedMaterializedViewName + " AS SELECT * FROM iceberg." + getSchemaName() + ".base_table1"); + assertQuery(session, "SELECT COUNT(*) FROM " + qualifiedMaterializedViewName, "VALUES 6"); + assertUpdate(session, "DROP MATERIALIZED VIEW " + qualifiedMaterializedViewName); } @Test @@ -207,7 +202,7 @@ public void testDropIfExists() { assertQueryFails( "DROP MATERIALIZED VIEW non_existing_materialized_view", - "line 1:1: Materialized view 'iceberg.tpch.non_existing_materialized_view' does not exist"); + "line 1:1: Materialized view 'iceberg." 
+ getSchemaName() + ".non_existing_materialized_view' does not exist"); assertUpdate("DROP MATERIALIZED VIEW IF EXISTS non_existing_materialized_view"); } @@ -249,7 +244,7 @@ public void testRefreshDenyPermission() public void testRefreshAllowedWithRestrictedStorageTable() { assertUpdate("CREATE MATERIALIZED VIEW materialized_view_refresh AS SELECT * FROM base_table1"); - SchemaTableName storageTable = getStorageTable("iceberg", "tpch", "materialized_view_refresh"); + SchemaTableName storageTable = getStorageTable("iceberg", "materialized_view_refresh"); assertAccessAllowed( "REFRESH MATERIALIZED VIEW materialized_view_refresh", @@ -426,11 +421,12 @@ public void testSqlFeatures() plan = getExplainPlan("SELECT * from materialized_view_subquery", ExplainType.Type.IO); assertThat(plan).doesNotContain("base_table1"); + String qualifiedMaterializedViewName = "iceberg." + getSchemaName() + ".materialized_view_window"; assertQueryFails("show create view materialized_view_window", - "line 1:1: Relation 'iceberg.tpch.materialized_view_window' is a materialized view, not a view"); + "line 1:1: Relation '" + qualifiedMaterializedViewName + "' is a materialized view, not a view"); assertThat(computeScalar("show create materialized view materialized_view_window")) - .isEqualTo("CREATE MATERIALIZED VIEW iceberg.tpch.materialized_view_window\n" + + .isEqualTo("CREATE MATERIALIZED VIEW " + qualifiedMaterializedViewName + "\n" + "WITH (\n" + " format = 'ORC',\n" + " partitioning = ARRAY['_date']\n" + @@ -477,6 +473,35 @@ public void testReplace() assertUpdate("DROP MATERIALIZED VIEW materialized_view_replace"); } + @Test + public void testCreateMaterializedViewWhenTableExists() + { + assertUpdate("CREATE TABLE test_create_materialized_view_when_table_exists (a INT, b INT)"); + assertThatThrownBy(() -> query("CREATE OR REPLACE MATERIALIZED VIEW test_create_materialized_view_when_table_exists AS SELECT sum(1) AS num_rows FROM base_table2")) + .hasMessage("Existing table is not a 
Materialized View: " + getSchemaName() + ".test_create_materialized_view_when_table_exists"); + assertThatThrownBy(() -> query("CREATE MATERIALIZED VIEW IF NOT EXISTS test_create_materialized_view_when_table_exists AS SELECT sum(1) AS num_rows FROM base_table2")) + .hasMessage("Existing table is not a Materialized View: " + getSchemaName() + ".test_create_materialized_view_when_table_exists"); + assertUpdate("DROP TABLE test_create_materialized_view_when_table_exists"); + } + + @Test + public void testDropMaterializedViewCannotDropTable() + { + assertUpdate("CREATE TABLE test_drop_materialized_view_cannot_drop_table (a INT, b INT)"); + assertThatThrownBy(() -> query("DROP MATERIALIZED VIEW test_drop_materialized_view_cannot_drop_table")) + .hasMessageContaining("Materialized view 'iceberg." + getSchemaName() + ".test_drop_materialized_view_cannot_drop_table' does not exist, but a table with that name exists"); + assertUpdate("DROP TABLE test_drop_materialized_view_cannot_drop_table"); + } + + @Test + public void testRenameMaterializedViewCannotRenameTable() + { + assertUpdate("CREATE TABLE test_rename_materialized_view_cannot_rename_table (a INT, b INT)"); + assertThatThrownBy(() -> query("ALTER MATERIALIZED VIEW test_rename_materialized_view_cannot_rename_table RENAME TO new_materialized_view_name")) + .hasMessageContaining("Materialized View 'iceberg." 
+ getSchemaName() + ".test_rename_materialized_view_cannot_rename_table' does not exist, but a table with that name exists"); + assertUpdate("DROP TABLE test_rename_materialized_view_cannot_rename_table"); + } + @Test public void testNestedMaterializedViews() { @@ -511,13 +536,13 @@ public void testNestedMaterializedViews() assertUpdate("DROP MATERIALIZED VIEW materialized_view_level2"); } - private SchemaTableName getStorageTable(String catalogName, String schemaName, String objectName) + private SchemaTableName getStorageTable(String catalogName, String objectName) { TransactionManager transactionManager = getQueryRunner().getTransactionManager(); TransactionId transactionId = transactionManager.beginTransaction(false); Session session = getSession().beginTransactionId(transactionId, transactionManager, getQueryRunner().getAccessControl()); Optional materializedView = getQueryRunner().getMetadata() - .getMaterializedView(session, new QualifiedObjectName(catalogName, schemaName, objectName)); + .getMaterializedView(session, new QualifiedObjectName(catalogName, getSchemaName(), objectName)); assertThat(materializedView).isPresent(); return materializedView.get().getStorageTable().get().getSchemaTableName(); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMaterializedViewTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMaterializedViewTest.java new file mode 100644 index 000000000000..66a1bbc3fa15 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMaterializedViewTest.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import io.trino.testing.DistributedQueryRunner; + +import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner; + +public class TestIcebergMaterializedViewTest + extends BaseIcebergMaterializedViewTest +{ + @Override + protected DistributedQueryRunner createQueryRunner() + throws Exception + { + return createIcebergQueryRunner(); + } + + @Override + protected String getSchemaName() + { + return "tpch"; + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java index ebc818f148fe..0147055f3f57 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java @@ -160,7 +160,7 @@ public void testSelect() assertGlueMetastoreApiInvocations("SELECT * FROM test_select_from", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 2) + .addCopies(GET_TABLE, 3) .build()); } finally { @@ -176,7 +176,7 @@ public void testSelectWithFilter() assertGlueMetastoreApiInvocations("SELECT * FROM test_select_from_where WHERE age = 2", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 2) + .addCopies(GET_TABLE, 3) .build()); } finally { @@ -193,7 +193,7 @@ public void testSelectFromView() assertGlueMetastoreApiInvocations("SELECT * 
FROM test_select_view_view", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 3) + .addCopies(GET_TABLE, 5) .build()); } finally { @@ -211,7 +211,7 @@ public void testSelectFromViewWithFilter() assertGlueMetastoreApiInvocations("SELECT * FROM test_select_view_where_view WHERE age = 2", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 3) + .addCopies(GET_TABLE, 5) .build()); } finally { @@ -283,7 +283,7 @@ public void testJoin() assertGlueMetastoreApiInvocations("SELECT name, age FROM test_join_t1 JOIN test_join_t2 ON test_join_t2.id = test_join_t1.id", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 4) + .addCopies(GET_TABLE, 6) .build()); } finally { @@ -300,7 +300,7 @@ public void testSelfJoin() assertGlueMetastoreApiInvocations("SELECT child.age, parent.age FROM test_self_join_table child JOIN test_self_join_table parent ON child.parent = parent.id", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 3) + .addCopies(GET_TABLE, 5) .build()); } finally { @@ -316,7 +316,7 @@ public void testExplainSelect() assertGlueMetastoreApiInvocations("EXPLAIN SELECT * FROM test_explain", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 2) + .addCopies(GET_TABLE, 3) .build()); } finally { @@ -332,7 +332,7 @@ public void testShowStatsForTable() assertGlueMetastoreApiInvocations("SHOW STATS FOR test_show_stats", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 2) + .addCopies(GET_TABLE, 3) .build()); } finally { @@ -348,7 +348,7 @@ public void testShowStatsForTableWithFilter() assertGlueMetastoreApiInvocations("SHOW STATS FOR (SELECT * FROM test_show_stats_with_filter where age >= 2)", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 2) + .addCopies(GET_TABLE, 3) .build()); } finally { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java index 
f0a8510e8330..40085361001a 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java @@ -124,14 +124,6 @@ public void testShowCreateTable() schemaPath())); } - @Test - @Override - public void testMaterializedView() - { - assertThatThrownBy(super::testMaterializedView) - .hasStackTraceContaining("createMaterializedView is not supported for Iceberg Glue catalogs"); - } - @Test @Override public void testRenameSchema() diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogMaterializedViewTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogMaterializedViewTest.java new file mode 100644 index 000000000000..40713292932b --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogMaterializedViewTest.java @@ -0,0 +1,92 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.glue; + +import com.amazonaws.services.glue.AWSGlueAsync; +import com.amazonaws.services.glue.AWSGlueAsyncClientBuilder; +import com.amazonaws.services.glue.model.BatchDeleteTableRequest; +import com.amazonaws.services.glue.model.DeleteDatabaseRequest; +import com.amazonaws.services.glue.model.GetTablesRequest; +import com.amazonaws.services.glue.model.GetTablesResult; +import com.amazonaws.services.glue.model.Table; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreApiStats; +import io.trino.plugin.iceberg.BaseIcebergMaterializedViewTest; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.plugin.iceberg.SchemaInitializer; +import io.trino.testing.QueryRunner; +import org.testng.annotations.AfterClass; + +import java.io.File; +import java.nio.file.Files; +import java.util.Collection; +import java.util.Set; + +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults; +import static io.trino.testing.sql.TestTable.randomTableSuffix; + +public class TestIcebergGlueCatalogMaterializedViewTest + extends BaseIcebergMaterializedViewTest +{ + private final String schemaName = "test_iceberg_materialized_view_" + randomTableSuffix(); + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + File tempDir = Files.createTempDirectory("test_iceberg").toFile(); + tempDir.deleteOnExit(); + + return IcebergQueryRunner.builder() + .setIcebergProperties( + ImmutableMap.of( + "iceberg.catalog.type", "glue", + "hive.metastore.glue.default-warehouse-dir", tempDir.getAbsolutePath())) + .setSchemaInitializer( + SchemaInitializer.builder() + .withClonedTpchTables(ImmutableList.of()) + .withSchemaName(schemaName) + .build()) + .build(); + } + + @Override + protected String getSchemaName() + { + return 
schemaName; + } + + @AfterClass(alwaysRun = true) + public void cleanup() + { + AWSGlueAsync glueClient = AWSGlueAsyncClientBuilder.defaultClient(); + Set tableNames = getPaginatedResults( + glueClient::getTables, + new GetTablesRequest().withDatabaseName(schemaName), + GetTablesRequest::setNextToken, + GetTablesResult::getNextToken, + new GlueMetastoreApiStats()) + .map(GetTablesResult::getTableList) + .flatMap(Collection::stream) + .map(Table::getName) + .collect(toImmutableSet()); + glueClient.batchDeleteTable(new BatchDeleteTableRequest() + .withDatabaseName(schemaName) + .withTablesToDelete(tableNames)); + glueClient.deleteDatabase(new DeleteDatabaseRequest() + .withName(schemaName)); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalogTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalogTest.java index f2431402ccc1..5d00c3ba492f 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalogTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalogTest.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.airlift.log.Logger; +import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HdfsConfig; import io.trino.plugin.hive.HdfsConfigurationInitializer; import io.trino.plugin.hive.HdfsEnvironment; @@ -31,6 +32,7 @@ import io.trino.spi.connector.SchemaTableName; import io.trino.spi.security.PrincipalType; import io.trino.spi.security.TrinoPrincipal; +import io.trino.spi.type.TestingTypeManager; import org.testng.annotations.Test; import java.io.File; @@ -59,7 +61,9 @@ protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations) new HdfsConfig(), new NoHdfsAuthentication()); return new TrinoGlueCatalog( + new CatalogName("catalog_name"), hdfsEnvironment, + new 
TestingTypeManager(), new GlueIcebergTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment), new GlueMetastoreStats(), new GlueHiveMetastoreConfig(), DefaultAWSCredentialsProviderChain.getInstance()), "test", AWSGlueAsyncClientBuilder.defaultClient(), @@ -83,7 +87,9 @@ public void testDefaultLocation() new HdfsConfig(), new NoHdfsAuthentication()); TrinoCatalog catalogWithDefaultLocation = new TrinoGlueCatalog( + new CatalogName("catalog_name"), hdfsEnvironment, + new TestingTypeManager(), new GlueIcebergTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment), new GlueMetastoreStats(), new GlueHiveMetastoreConfig(), DefaultAWSCredentialsProviderChain.getInstance()), "test", AWSGlueAsyncClientBuilder.defaultClient(),