diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ViewReaderUtil.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ViewReaderUtil.java index 76d2df723946..07cff19974cd 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ViewReaderUtil.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/ViewReaderUtil.java @@ -166,6 +166,11 @@ public static class PrestoViewReader { @Override public ConnectorViewDefinition decodeViewData(String viewData, Table table, CatalogName catalogName) + { + return decodeViewData(viewData); + } + + public static ConnectorViewDefinition decodeViewData(String viewData) { checkCondition(viewData.startsWith(VIEW_PREFIX), HIVE_INVALID_VIEW_DATA, "View data missing prefix: %s", viewData); checkCondition(viewData.endsWith(VIEW_SUFFIX), HIVE_INVALID_VIEW_DATA, "View data missing suffix: %s", viewData); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java index 23a35e5ef5d2..97ab4492f84d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java @@ -13,10 +13,15 @@ */ package io.trino.plugin.iceberg.catalog; +import com.google.common.collect.ImmutableMap; import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.HiveMetadata; +import io.trino.plugin.hive.HiveViewNotSupportedException; +import io.trino.plugin.hive.ViewReaderUtil; import io.trino.plugin.iceberg.ColumnIdentity; import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.SchemaTableName; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -31,8 +36,13 @@ import java.util.Map; import java.util.Optional; +import static com.google.common.base.Preconditions.checkArgument; import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; +import static io.trino.plugin.hive.ViewReaderUtil.PRESTO_VIEW_FLAG; +import static io.trino.plugin.hive.ViewReaderUtil.isHiveOrPrestoView; +import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR; +import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; @@ -42,14 +52,25 @@ public abstract class AbstractTrinoCatalog implements TrinoCatalog { + // Be compatible with views defined by the Hive connector, which can be useful under certain conditions. + protected static final String TRINO_CREATED_BY = HiveMetadata.TRINO_CREATED_BY; + protected static final String TRINO_CREATED_BY_VALUE = "Trino Iceberg connector"; + protected static final String PRESTO_VIEW_COMMENT = HiveMetadata.PRESTO_VIEW_COMMENT; + protected static final String PRESTO_VERSION_NAME = HiveMetadata.PRESTO_VERSION_NAME; + protected static final String PRESTO_QUERY_ID_NAME = HiveMetadata.PRESTO_QUERY_ID_NAME; + protected static final String PRESTO_VIEW_EXPANDED_TEXT_MARKER = HiveMetadata.PRESTO_VIEW_EXPANDED_TEXT_MARKER; + protected final IcebergTableOperationsProvider tableOperationsProvider; + private final String trinoVersion; private final boolean useUniqueTableLocation; protected AbstractTrinoCatalog( IcebergTableOperationsProvider tableOperationsProvider, + String trinoVersion, boolean useUniqueTableLocation) { this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); + this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null"); this.useUniqueTableLocation = useUniqueTableLocation; } @@ -72,6 +93,26 @@ public void updateColumnComment(ConnectorSession session, SchemaTableName schema icebergTable.updateSchema().updateColumnDoc(columnIdentity.getName(), comment.orElse(null)).commit(); } + @Override + public Map getViews(ConnectorSession session, Optional namespace) + { + ImmutableMap.Builder views = ImmutableMap.builder(); + for (SchemaTableName name : listViews(session, namespace)) { + try { + getView(session, name).ifPresent(view -> views.put(name, view)); + } + catch (TrinoException e) { + if (e.getErrorCode().equals(TABLE_NOT_FOUND.toErrorCode())) { + // Ignore view that was dropped during query execution (race condition) + } + else { + throw e; + } + } + } + return views.buildOrThrow(); + } + protected Transaction newCreateTableTransaction( ConnectorSession session, SchemaTableName schemaTableName, @@ -115,4 +156,54 @@ protected void deleteTableDirectory( throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, format("Failed to delete directory %s of the table %s", tableLocation, schemaTableName), e); } } + + protected Optional getView( + SchemaTableName viewName, + Optional viewOriginalText, + String tableType, + Map tableParameters, + Optional tableOwner) + { + if (!isView(tableType, tableParameters)) { + // Filter out Tables and Materialized Views + return Optional.empty(); + } + + if (!isPrestoView(tableParameters)) { + // Hive views are not compatible + throw new HiveViewNotSupportedException(viewName); + } + + checkArgument(viewOriginalText.isPresent(), "viewOriginalText must be present"); + ConnectorViewDefinition definition = ViewReaderUtil.PrestoViewReader.decodeViewData(viewOriginalText.get()); + // use owner from table metadata if it exists + if (tableOwner.isPresent() && !definition.isRunAsInvoker()) { + definition = new ConnectorViewDefinition( + definition.getOriginalSql(), + definition.getCatalog(), + definition.getSchema(), + definition.getColumns(), + definition.getComment(), + tableOwner, + false); + } + return Optional.of(definition); + } + + private static boolean isView(String tableType, Map tableParameters) + + { + return isHiveOrPrestoView(tableType) && PRESTO_VIEW_COMMENT.equals(tableParameters.get(TABLE_COMMENT)); + } + + protected Map createViewProperties(ConnectorSession session) + { + return ImmutableMap.builder() + .put(PRESTO_VIEW_FLAG, "true") // Ensures compatibility with views created by the Hive connector + .put(TRINO_CREATED_BY, TRINO_CREATED_BY_VALUE) + .put(PRESTO_VERSION_NAME, trinoVersion) + .put(PRESTO_QUERY_ID_NAME, session.getQueryId()) + .put(TABLE_COMMENT, PRESTO_VIEW_COMMENT) + .buildOrThrow(); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java index bd36f2be8f3c..514ab45de430 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java @@ -101,7 +101,7 @@ Transaction newCreateTableTransaction( Map getViews(ConnectorSession session, Optional namespace); - Optional getView(ConnectorSession session, SchemaTableName viewIdentifier); + Optional getView(ConnectorSession session, SchemaTableName viewName); List listMaterializedViews(ConnectorSession session, Optional namespace); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergUtil.java index 579be5e949fb..9725260f9a27 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergUtil.java @@ -15,10 +15,14 @@ import com.amazonaws.services.glue.model.TableInput; +import javax.annotation.Nullable; + import java.util.Map; import java.util.Optional; +import static io.trino.plugin.hive.HiveMetadata.PRESTO_VIEW_EXPANDED_TEXT_MARKER; import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE; +import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW; public final class GlueIcebergUtil { @@ -33,4 +37,15 @@ public static TableInput getTableInput(String tableName, Optional owner, // Iceberg does not distinguish managed and external tables, all tables are treated the same and marked as EXTERNAL .withTableType(EXTERNAL_TABLE.name()); } + + public static TableInput getViewTableInput(String viewName, String viewOriginalText, @Nullable String owner, Map parameters) + { + return new TableInput() + .withName(viewName) + .withTableType(VIRTUAL_VIEW.name()) + .withViewOriginalText(viewOriginalText) + .withViewExpandedText(PRESTO_VIEW_EXPANDED_TEXT_MARKER) + .withOwner(owner) + .withParameters(parameters); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 0dd21eac0794..0a1b26ebf8ed 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -30,10 +30,12 @@ import com.amazonaws.services.glue.model.GetTablesRequest; import com.amazonaws.services.glue.model.GetTablesResult; import com.amazonaws.services.glue.model.TableInput; +import com.amazonaws.services.glue.model.UpdateTableRequest; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.trino.plugin.hive.HdfsEnvironment; import io.trino.plugin.hive.SchemaAlreadyExistsException; +import io.trino.plugin.hive.ViewAlreadyExistsException; import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; @@ -43,8 +45,12 @@ import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.SchemaNotFoundException; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.connector.ViewNotFoundException; import io.trino.spi.security.PrincipalType; import io.trino.spi.security.TrinoPrincipal; +import net.jodah.failsafe.Failsafe; +import net.jodah.failsafe.RetryPolicy; import org.apache.hadoop.fs.Path; import org.apache.iceberg.BaseTable; import org.apache.iceberg.PartitionSpec; @@ -54,6 +60,7 @@ import org.apache.iceberg.TableOperations; import org.apache.iceberg.Transaction; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; @@ -64,6 +71,8 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; +import static io.trino.plugin.hive.ViewReaderUtil.encodeViewData; +import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView; import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; @@ -71,6 +80,7 @@ import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName; import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getTableInput; +import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getViewTableInput; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -89,12 +99,13 @@ public class TrinoGlueCatalog public TrinoGlueCatalog( HdfsEnvironment hdfsEnvironment, IcebergTableOperationsProvider tableOperationsProvider, + String trinoVersion, AWSGlueAsync glueClient, GlueMetastoreStats stats, Optional defaultSchemaLocation, boolean useUniqueTableLocation) { - super(tableOperationsProvider, useUniqueTableLocation); + super(tableOperationsProvider, trinoVersion, useUniqueTableLocation); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.glueClient = requireNonNull(glueClient, "glueClient is null"); this.stats = requireNonNull(stats, "stats is null"); @@ -211,12 +222,14 @@ public void renameNamespace(ConnectorSession session, String source, String targ @Override public List listTables(ConnectorSession session, Optional namespace) { + ImmutableList.Builder tables = ImmutableList.builder(); try { List namespaces = namespace.map(List::of).orElseGet(() -> listNamespaces(session)); - return namespaces.stream() - .flatMap(glueNamespace -> { - try { - return getPaginatedResults( + for (String glueNamespace : namespaces) { + try { + // Add all tables from a namespace together, in case it is removed while fetching paginated results + tables.addAll( + getPaginatedResults( glueClient::getTables, new GetTablesRequest().withDatabaseName(glueNamespace), GetTablesRequest::setNextToken, @@ -224,18 +237,18 @@ public List listTables(ConnectorSession session, Optional new SchemaTableName(glueNamespace, table.getName())); - } - catch (EntityNotFoundException e) { - // Namespace may have been deleted - return Stream.empty(); - } - }) - .collect(toImmutableList()); + .map(table -> new SchemaTableName(glueNamespace, table.getName())) + .collect(toImmutableList())); + } + catch (EntityNotFoundException e) { + // Namespace may have been deleted + } + } } catch (AmazonServiceException e) { throw new TrinoException(ICEBERG_CATALOG_ERROR, e); } + return tables.build(); } @Override @@ -301,11 +314,11 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa { boolean newTableCreated = false; try { - GetTableRequest getTableRequest = new GetTableRequest() - .withDatabaseName(from.getSchemaName()) - .withName(from.getTableName()); - com.amazonaws.services.glue.model.Table table = stats.getGetTable().call(() -> glueClient.getTable(getTableRequest).getTable()); - TableInput tableInput = getTableInput(to.getTableName(), Optional.ofNullable(table.getOwner()), table.getParameters()); + Optional table = getTable(from); + if (table.isEmpty()) { + throw new TableNotFoundException(from); + } + TableInput tableInput = getTableInput(to.getTableName(), Optional.ofNullable(table.get().getOwner()), table.get().getParameters()); CreateTableRequest createTableRequest = new CreateTableRequest() .withDatabaseName(to.getSchemaName()) .withTableInput(tableInput); @@ -328,6 +341,21 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa } } + private Optional getTable(SchemaTableName schemaTableName) + { + try { + return Optional.of( + stats.getGetTable().call(() -> + glueClient.getTable(new GetTableRequest() + .withDatabaseName(schemaTableName.getSchemaName()) + .withName(schemaTableName.getTableName())) + .getTable())); + } + catch (EntityNotFoundException e) { + return Optional.empty(); + } + } + private void deleteTable(String schema, String table) { stats.getDeleteTable().call(() -> @@ -377,13 +405,77 @@ public void setTablePrincipal(ConnectorSession session, SchemaTableName schemaTa @Override public void createView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorViewDefinition definition, boolean replace) { - throw new TrinoException(NOT_SUPPORTED, "createView is not supported for Iceberg Glue catalogs"); + // If a view is created between listing the existing view and calling createTable, retry + TableInput viewTableInput = getViewTableInput(schemaViewName.getTableName(), encodeViewData(definition), session.getUser(), createViewProperties(session)); + Failsafe.with(new RetryPolicy<>() + .withMaxRetries(3) + .withDelay(Duration.ofMillis(100)) + .abortIf(throwable -> !replace || throwable instanceof ViewAlreadyExistsException)) + .run(() -> doCreateView(schemaViewName, viewTableInput, replace)); + } + + private void doCreateView(SchemaTableName schemaViewName, TableInput viewTableInput, boolean replace) + { + Optional existing = getTable(schemaViewName); + if (existing.isPresent()) { + if (!replace || !isPrestoView(existing.get().getParameters())) { + // TODO: ViewAlreadyExists is misleading if the name is used by a table https://github.com/trinodb/trino/issues/10037 + throw new ViewAlreadyExistsException(schemaViewName); + } + + stats.getUpdateTable().call(() -> + glueClient.updateTable(new UpdateTableRequest() + .withDatabaseName(schemaViewName.getSchemaName()) + .withTableInput(viewTableInput))); + return; + } + + try { + stats.getCreateTable().call(() -> + glueClient.createTable(new CreateTableRequest() + .withDatabaseName(schemaViewName.getSchemaName()) + .withTableInput(viewTableInput))); + } + catch (AlreadyExistsException e) { + throw new ViewAlreadyExistsException(schemaViewName); + } } @Override public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target) { - throw new TrinoException(NOT_SUPPORTED, "renameView is not supported for Iceberg Glue catalogs"); + boolean newTableCreated = false; + try { + Optional existingView = getTable(source); + if (existingView.isEmpty()) { + throw new TableNotFoundException(source); + } + + TableInput viewTableInput = getViewTableInput( + target.getTableName(), + existingView.get().getViewOriginalText(), + existingView.get().getOwner(), + createViewProperties(session)); + CreateTableRequest createTableRequest = new CreateTableRequest() + .withDatabaseName(target.getSchemaName()) + .withTableInput(viewTableInput); + stats.getCreateTable().call(() -> glueClient.createTable(createTableRequest)); + newTableCreated = true; + deleteTable(source.getSchemaName(), source.getTableName()); + } + catch (Exception e) { + if (newTableCreated) { + try { + deleteTable(target.getSchemaName(), target.getTableName()); + } + catch (Exception cleanupException) { + if (!cleanupException.equals(e)) { + e.addSuppressed(cleanupException); + } + } + } + throw e; + } } @Override @@ -395,25 +487,63 @@ public void setViewPrincipal(ConnectorSession session, SchemaTableName schemaVie @Override public void dropView(ConnectorSession session, SchemaTableName schemaViewName) { - throw new TrinoException(NOT_SUPPORTED, "dropView is not supported for Iceberg Glue catalogs"); - } + if (getView(session, schemaViewName).isEmpty()) { + throw new ViewNotFoundException(schemaViewName); + } - @Override - public List listViews(ConnectorSession session, Optional namespace) - { - return ImmutableList.of(); + try { + deleteTable(schemaViewName.getSchemaName(), schemaViewName.getTableName()); + } + catch (AmazonServiceException e) { + throw new TrinoException(HIVE_METASTORE_ERROR, e); + } } @Override - public Map getViews(ConnectorSession session, Optional namespace) + public List listViews(ConnectorSession session, Optional namespace) { - return ImmutableMap.of(); + try { + List namespaces = namespace.map(List::of).orElseGet(() -> listNamespaces(session)); + return namespaces.stream() + .flatMap(glueNamespace -> { + try { + return getPaginatedResults( + glueClient::getTables, + new GetTablesRequest().withDatabaseName(glueNamespace), + GetTablesRequest::setNextToken, + GetTablesResult::getNextToken, + stats.getGetTables()) + .map(GetTablesResult::getTableList) + .flatMap(List::stream) + .filter(table -> isPrestoView(table.getParameters())) + .map(table -> new SchemaTableName(glueNamespace, table.getName())); + } + catch (EntityNotFoundException e) { + // Namespace may have been deleted + return Stream.empty(); + } + }) + .collect(toImmutableList()); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } } @Override - public Optional getView(ConnectorSession session, SchemaTableName viewIdentifier) + public Optional getView(ConnectorSession session, SchemaTableName viewName) { - return Optional.empty(); + Optional table = getTable(viewName); + if (table.isEmpty()) { + return Optional.empty(); + } + com.amazonaws.services.glue.model.Table viewDefinition = table.get(); + return getView( + viewName, + Optional.ofNullable(viewDefinition.getViewOriginalText()), + viewDefinition.getTableType(), + viewDefinition.getParameters(), + Optional.ofNullable(viewDefinition.getOwner())); } @Override diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java index 3e8c750a5bb2..08fc89f67cde 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java @@ -16,6 +16,7 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.glue.AWSGlueAsync; import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.NodeVersion; import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig; import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; import io.trino.plugin.iceberg.IcebergConfig; @@ -39,6 +40,7 @@ public class TrinoGlueCatalogFactory { private final HdfsEnvironment hdfsEnvironment; private final IcebergTableOperationsProvider tableOperationsProvider; + private final String trinoVersion; private final Optional defaultSchemaLocation; private final AWSGlueAsync glueClient; private final boolean isUniqueTableLocation; @@ -48,6 +50,7 @@ public class TrinoGlueCatalogFactory public TrinoGlueCatalogFactory( HdfsEnvironment hdfsEnvironment, IcebergTableOperationsProvider tableOperationsProvider, + NodeVersion nodeVersion, GlueHiveMetastoreConfig glueConfig, AWSCredentialsProvider credentialsProvider, IcebergConfig icebergConfig, @@ -55,6 +58,7 @@ public TrinoGlueCatalogFactory( { this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); + this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString(); requireNonNull(glueConfig, "glueConfig is null"); checkArgument(glueConfig.getCatalogId().isEmpty(), "catalogId configuration is not supported"); this.defaultSchemaLocation = glueConfig.getDefaultWarehouseDir(); @@ -75,6 +79,6 @@ public GlueMetastoreStats getStats() @Override public TrinoCatalog create(ConnectorIdentity identity) { - return new TrinoGlueCatalog(hdfsEnvironment, tableOperationsProvider, glueClient, stats, defaultSchemaLocation, isUniqueTableLocation); + return new TrinoGlueCatalog(hdfsEnvironment, tableOperationsProvider, trinoVersion, glueClient, stats, defaultSchemaLocation, isUniqueTableLocation); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index 3f6165627971..1bcfad257912 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -20,12 +20,9 @@ import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HdfsEnvironment; import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; -import io.trino.plugin.hive.HiveMetadata; import io.trino.plugin.hive.HiveSchemaProperties; -import io.trino.plugin.hive.HiveViewNotSupportedException; import io.trino.plugin.hive.TableAlreadyExistsException; import io.trino.plugin.hive.ViewAlreadyExistsException; -import io.trino.plugin.hive.ViewReaderUtil; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Database; import io.trino.plugin.hive.metastore.HiveMetastore; @@ -105,7 +102,6 @@ import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY; -import static io.trino.spi.StandardErrorCode.TABLE_NOT_FOUND; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW; @@ -121,24 +117,14 @@ public class TrinoHiveCatalog private static final String ICEBERG_MATERIALIZED_VIEW_COMMENT = "Presto Materialized View"; public static final String DEPENDS_ON_TABLES = "dependsOnTables"; - // Be compatible with views defined by the Hive connector, which can be useful under certain conditions. - private static final String TRINO_CREATED_BY = HiveMetadata.TRINO_CREATED_BY; - private static final String TRINO_CREATED_BY_VALUE = "Trino Iceberg connector"; - private static final String PRESTO_VIEW_COMMENT = HiveMetadata.PRESTO_VIEW_COMMENT; - private static final String PRESTO_VERSION_NAME = HiveMetadata.PRESTO_VERSION_NAME; - private static final String PRESTO_QUERY_ID_NAME = HiveMetadata.PRESTO_QUERY_ID_NAME; - private static final String PRESTO_VIEW_EXPANDED_TEXT_MARKER = HiveMetadata.PRESTO_VIEW_EXPANDED_TEXT_MARKER; - private final CatalogName catalogName; private final CachingHiveMetastore metastore; private final HdfsEnvironment hdfsEnvironment; private final TypeManager typeManager; - private final String trinoVersion; private final boolean isUsingSystemSecurity; private final boolean deleteSchemaLocationsFallback; private final Map tableMetadataCache = new ConcurrentHashMap<>(); - private final ViewReaderUtil.PrestoViewReader viewReader = new ViewReaderUtil.PrestoViewReader(); public TrinoHiveCatalog( CatalogName catalogName, @@ -151,12 +137,11 @@ public TrinoHiveCatalog( boolean isUsingSystemSecurity, boolean deleteSchemaLocationsFallback) { - super(tableOperationsProvider, useUniqueTableLocation); + super(tableOperationsProvider, trinoVersion, useUniqueTableLocation); this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.metastore = requireNonNull(metastore, "metastore is null"); this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null"); - this.trinoVersion = requireNonNull(trinoVersion, "trinoVersion is null"); this.isUsingSystemSecurity = isUsingSystemSecurity; this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback; } @@ -377,14 +362,6 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName, definition = definition.withoutOwner(); } - Map properties = ImmutableMap.builder() - .put(PRESTO_VIEW_FLAG, "true") - .put(TRINO_CREATED_BY, TRINO_CREATED_BY_VALUE) - .put(PRESTO_VERSION_NAME, trinoVersion) - .put(PRESTO_QUERY_ID_NAME, session.getQueryId()) - .put(TABLE_COMMENT, PRESTO_VIEW_COMMENT) - .buildOrThrow(); - io.trino.plugin.hive.metastore.Table.Builder tableBuilder = io.trino.plugin.hive.metastore.Table.builder() .setDatabaseName(schemaViewName.getSchemaName()) .setTableName(schemaViewName.getTableName()) @@ -392,7 +369,7 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName, .setTableType(org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW.name()) .setDataColumns(ImmutableList.of(new Column("dummy", HIVE_STRING, Optional.empty()))) .setPartitionColumns(ImmutableList.of()) - .setParameters(properties) + .setParameters(createViewProperties(session)) .setViewOriginalText(Optional.of(encodeViewData(definition))) .setViewExpandedText(Optional.of(PRESTO_VIEW_EXPANDED_TEXT_MARKER)); @@ -465,54 +442,18 @@ private Stream listViews(String schema) } @Override - public Map getViews(ConnectorSession session, Optional namespace) - { - ImmutableMap.Builder views = ImmutableMap.builder(); - for (SchemaTableName name : listViews(session, namespace)) { - try { - getView(session, name).ifPresent(view -> views.put(name, view)); - } - catch (TrinoException e) { - if (e.getErrorCode().equals(TABLE_NOT_FOUND.toErrorCode())) { - // Ignore view that was dropped during query execution (race condition) - } - else { - throw e; - } - } - } - return views.buildOrThrow(); - } - - @Override - public Optional getView(ConnectorSession session, SchemaTableName viewIdentifier) + public Optional getView(ConnectorSession session, SchemaTableName viewName) { - if (isHiveSystemSchema(viewIdentifier.getSchemaName())) { + if (isHiveSystemSchema(viewName.getSchemaName())) { return Optional.empty(); } - return metastore.getTable(viewIdentifier.getSchemaName(), viewIdentifier.getTableName()) - .filter(table -> HiveMetadata.PRESTO_VIEW_COMMENT.equals(table.getParameters().get(TABLE_COMMENT))) // filter out materialized views - .filter(ViewReaderUtil::canDecodeView) - .map(view -> { - if (!isPrestoView(view)) { - throw new HiveViewNotSupportedException(viewIdentifier); - } - - ConnectorViewDefinition definition = viewReader - .decodeViewData(view.getViewOriginalText().get(), view, catalogName); - // use owner from table metadata if it exists - if (view.getOwner().isPresent() && !definition.isRunAsInvoker()) { - definition = new ConnectorViewDefinition( - definition.getOriginalSql(), - definition.getCatalog(), - definition.getSchema(), - definition.getColumns(), - definition.getComment(), - view.getOwner(), - false); - } - return definition; - }); + return metastore.getTable(viewName.getSchemaName(), viewName.getTableName()) + .flatMap(view -> getView( + viewName, + view.getViewOriginalText(), + view.getTableType(), + view.getParameters(), + view.getOwner())); } @Override diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseTrinoCatalogTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseTrinoCatalogTest.java index 0e3d41e3f1ab..4c8f949c2500 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseTrinoCatalogTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseTrinoCatalogTest.java @@ -13,12 +13,15 @@ */ package io.trino.plugin.iceberg; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.log.Logger; import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.spi.connector.ConnectorViewDefinition; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.security.PrincipalType; import io.trino.spi.security.TrinoPrincipal; +import io.trino.spi.type.VarcharType; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -28,6 +31,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Map; import java.util.Optional; import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; @@ -185,4 +189,81 @@ public void testUseUniqueTableLocations() } } } + + @Test + public void testView() + throws IOException + { + TrinoCatalog catalog = createTrinoCatalog(false); + Path tmpDirectory = Files.createTempDirectory("iceberg_catalog_test_create_view_"); + tmpDirectory.toFile().deleteOnExit(); + + String namespace = "test_create_view_" + randomTableSuffix(); + String viewName = "viewName"; + String renamedViewName = "renamedViewName"; + SchemaTableName schemaTableName = new SchemaTableName(namespace, viewName); + SchemaTableName renamedSchemaTableName = new SchemaTableName(namespace, renamedViewName); + ConnectorViewDefinition viewDefinition = new ConnectorViewDefinition( + "SELECT name FROM local.tiny.nation", + Optional.empty(), + Optional.empty(), + ImmutableList.of( + new ConnectorViewDefinition.ViewColumn("name", VarcharType.createVarcharType(25).getTypeId())), + Optional.empty(), + Optional.of(SESSION.getUser()), + false); + + try { + catalog.createNamespace(SESSION, namespace, ImmutableMap.of(), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); + catalog.createView(SESSION, schemaTableName, viewDefinition, false); + + assertThat(catalog.listTables(SESSION, Optional.of(namespace))).contains(schemaTableName); + assertThat(catalog.listViews(SESSION, Optional.of(namespace))).contains(schemaTableName); + + Map views = catalog.getViews(SESSION, Optional.of(schemaTableName.getSchemaName())); + assertEquals(views.size(), 1); + assertViewDefinition(views.get(schemaTableName), viewDefinition); + assertViewDefinition(catalog.getView(SESSION, schemaTableName).orElseThrow(), viewDefinition); + + catalog.renameView(SESSION, schemaTableName, renamedSchemaTableName); + assertThat(catalog.listTables(SESSION, Optional.of(namespace))).doesNotContain(schemaTableName); + assertThat(catalog.listViews(SESSION, Optional.of(namespace))).doesNotContain(schemaTableName); + views = catalog.getViews(SESSION, Optional.of(schemaTableName.getSchemaName())); + assertEquals(views.size(), 1); + assertViewDefinition(views.get(renamedSchemaTableName), viewDefinition); + assertViewDefinition(catalog.getView(SESSION, renamedSchemaTableName).orElseThrow(), viewDefinition); + assertThat(catalog.getView(SESSION, schemaTableName)).isEmpty(); + + catalog.dropView(SESSION, renamedSchemaTableName); + assertThat(catalog.listTables(SESSION, Optional.of(namespace))) + .doesNotContain(renamedSchemaTableName); + } + finally { + try { + catalog.dropNamespace(SESSION, namespace); + } + catch (Exception e) { + LOG.warn("Failed to clean up namespace: " + namespace); + } + } + } + + private void assertViewDefinition(ConnectorViewDefinition actualView, ConnectorViewDefinition expectedView) + { + assertEquals(actualView.getOriginalSql(), expectedView.getOriginalSql()); + assertEquals(actualView.getCatalog(), expectedView.getCatalog()); + assertEquals(actualView.getSchema(), expectedView.getSchema()); + assertEquals(actualView.getColumns().size(), expectedView.getColumns().size()); + for (int i = 0; i < actualView.getColumns().size(); i++) { + assertViewColumnDefinition(actualView.getColumns().get(i), expectedView.getColumns().get(i)); + } + assertEquals(actualView.getOwner(), expectedView.getOwner()); + assertEquals(actualView.isRunAsInvoker(), expectedView.isRunAsInvoker()); + } + + private void assertViewColumnDefinition(ConnectorViewDefinition.ViewColumn actualViewColumn, ConnectorViewDefinition.ViewColumn expectedViewColumn) + { + assertEquals(actualViewColumn.getName(), expectedViewColumn.getName()); + assertEquals(actualViewColumn.getType(), expectedViewColumn.getType()); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGlueCatalogAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGlueCatalogAccessOperations.java index 5b5ff20a481c..28a91386224a 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGlueCatalogAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGlueCatalogAccessOperations.java @@ -159,7 +159,7 @@ public void testSelect() assertGlueMetastoreApiInvocations("SELECT * FROM test_select_from", ImmutableMultiset.builder() - .add(GET_TABLE) + .addCopies(GET_TABLE, 2) .build()); } finally { @@ -175,7 +175,7 @@ public void testSelectWithFilter() assertGlueMetastoreApiInvocations("SELECT * FROM test_select_from_where WHERE age = 2", ImmutableMultiset.builder() - .add(GET_TABLE) + .addCopies(GET_TABLE, 2) .build()); } finally { @@ -183,7 +183,7 @@ public void testSelectWithFilter() } } - @Test(enabled = false) + @Test public void testSelectFromView() { try { @@ -192,7 +192,7 @@ public void testSelectFromView() assertGlueMetastoreApiInvocations("SELECT * FROM test_select_view_view", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 2) + .addCopies(GET_TABLE, 3) .build()); } finally { @@ -201,7 +201,7 @@ public void testSelectFromView() } } - @Test(enabled = false) + @Test public void testSelectFromViewWithFilter() { try { @@ -210,12 +210,12 @@ public void testSelectFromViewWithFilter() assertGlueMetastoreApiInvocations("SELECT * FROM test_select_view_where_view WHERE age = 2", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 2) + .addCopies(GET_TABLE, 3) .build()); } finally { + getQueryRunner().execute("DROP TABLE IF EXISTS test_select_view_where_table"); getQueryRunner().execute("DROP VIEW IF EXISTS test_select_view_where_view"); - getQueryRunner().execute("DROP TABLE IF EXISTS test_select_view_where_view"); } } @@ -282,7 +282,7 @@ public void testJoin() assertGlueMetastoreApiInvocations("SELECT name, age FROM test_join_t1 JOIN test_join_t2 ON test_join_t2.id = test_join_t1.id", ImmutableMultiset.builder() - .addCopies(GET_TABLE, 2) + .addCopies(GET_TABLE, 4) .build()); } finally { @@ -299,7 +299,7 @@ public void testSelfJoin() assertGlueMetastoreApiInvocations("SELECT child.age, parent.age FROM test_self_join_table child JOIN test_self_join_table parent ON child.parent = parent.id", ImmutableMultiset.builder() - .add(GET_TABLE) + .addCopies(GET_TABLE, 3) .build()); } finally { @@ -315,7 +315,7 @@ public void testExplainSelect() assertGlueMetastoreApiInvocations("EXPLAIN SELECT * FROM test_explain", ImmutableMultiset.builder() - .add(GET_TABLE) + .addCopies(GET_TABLE, 2) .build()); } finally { @@ -331,7 +331,7 @@ public void testShowStatsForTable() assertGlueMetastoreApiInvocations("SHOW STATS FOR test_show_stats", ImmutableMultiset.builder() - .add(GET_TABLE) + .addCopies(GET_TABLE, 2) .build()); } finally { @@ -347,7 +347,7 @@ public void testShowStatsForTableWithFilter() assertGlueMetastoreApiInvocations("SHOW STATS FOR (SELECT * FROM test_show_stats_with_filter where age >= 2)", ImmutableMultiset.builder() - .add(GET_TABLE) + .addCopies(GET_TABLE, 2) .build()); } finally { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGlueCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGlueCatalogConnectorSmokeTest.java index 49d4cdd0204a..e95114e6942b 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGlueCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGlueCatalogConnectorSmokeTest.java @@ -121,14 +121,6 @@ public void testShowCreateTable() schemaPath())); } - @Test - @Override - public void testView() - { - assertThatThrownBy(super::testView) - .hasStackTraceContaining("createView is not supported for Iceberg Glue catalogs"); - } - @Test @Override public void testMaterializedView() diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestTrinoGlueCatalogTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestTrinoGlueCatalogTest.java index 9bc4c4f4ad31..d13fb6146602 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestTrinoGlueCatalogTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestTrinoGlueCatalogTest.java @@ -61,6 +61,7 @@ protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations) return new TrinoGlueCatalog( hdfsEnvironment, new GlueIcebergTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment), new GlueMetastoreStats(), new GlueHiveMetastoreConfig(), DefaultAWSCredentialsProviderChain.getInstance()), + "test", AWSGlueAsyncClientBuilder.defaultClient(), new GlueMetastoreStats(), Optional.empty(), @@ -84,6 +85,7 @@ public void testDefaultLocation() TrinoCatalog catalogWithDefaultLocation = new TrinoGlueCatalog( hdfsEnvironment, new GlueIcebergTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment), new GlueMetastoreStats(), new GlueHiveMetastoreConfig(), DefaultAWSCredentialsProviderChain.getInstance()), + "test", AWSGlueAsyncClientBuilder.defaultClient(), new GlueMetastoreStats(), Optional.of(tmpDirectory.toAbsolutePath().toString()),