diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java index c74ed116f6e3..dac2dfa51618 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueHiveMetastore.java @@ -201,7 +201,7 @@ public GlueHiveMetastore( this.columnStatisticsProvider = columnStatisticsProviderFactory.createGlueColumnStatisticsProvider(glueClient, stats); } - private static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, Optional requestHandler, RequestMetricCollector metricsCollector) + public static AWSGlueAsync createAsyncGlueClient(GlueHiveMetastoreConfig config, Optional requestHandler, RequestMetricCollector metricsCollector) { ClientConfiguration clientConfig = new ClientConfiguration() .withMaxConnections(config.getMaxGlueConnections()) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreStats.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreStats.java index 2a505cafa4c8..4ef90f42df8d 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreStats.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/glue/GlueMetastoreStats.java @@ -38,6 +38,8 @@ public class GlueMetastoreStats private final GlueMetastoreApiStats createTable = new GlueMetastoreApiStats(); private final GlueMetastoreApiStats dropTable = new GlueMetastoreApiStats(); private final GlueMetastoreApiStats replaceTable = new GlueMetastoreApiStats(); + private final GlueMetastoreApiStats updateTable = new GlueMetastoreApiStats(); + private final GlueMetastoreApiStats renameTable = new GlueMetastoreApiStats(); private final GlueMetastoreApiStats getPartitionNames = new GlueMetastoreApiStats(); private final 
GlueMetastoreApiStats getPartitions = new GlueMetastoreApiStats(); private final GlueMetastoreApiStats getPartition = new GlueMetastoreApiStats(); @@ -140,6 +142,20 @@ public GlueMetastoreApiStats getReplaceTable() return replaceTable; } + @Managed + @Nested + public GlueMetastoreApiStats getUpdateTable() + { + return updateTable; + } + + @Managed + @Nested + public GlueMetastoreApiStats getRenameTable() + { + return renameTable; + } + @Managed @Nested public GlueMetastoreApiStats getGetPartitionNames() diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 597288ef30c4..f27682ca266f 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -59,6 +59,11 @@ bootstrap + + io.airlift + concurrent + + io.airlift configuration @@ -84,6 +89,16 @@ units + + com.amazonaws + aws-java-sdk-core + + + + com.amazonaws + aws-java-sdk-glue + + com.fasterxml.jackson.core jackson-core @@ -301,4 +316,43 @@ + + + + default + + true + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + **/TestIcebergGlueCatalogConnectorSmokeTest.java + + + + + + + + + test-iceberg-glue + + + + org.apache.maven.plugins + maven-surefire-plugin + + + **/TestIcebergGlueCatalogConnectorSmokeTest.java + + + + + + + diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java index f07b12be2618..c9319e9dcd87 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java @@ -22,6 +22,8 @@ import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import java.util.Optional; + import static io.trino.plugin.hive.HiveCompressionCodec.ZSTD; import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE; import static io.trino.plugin.iceberg.IcebergFileFormat.ORC; @@ -38,6 +40,7 @@ public class IcebergConfig 
private Duration dynamicFilteringWaitTimeout = new Duration(0, SECONDS); private boolean tableStatisticsEnabled = true; private boolean projectionPushdownEnabled = true; + private Optional defaultSchemaLocation = Optional.empty(); public CatalogType getCatalogType() { @@ -167,4 +170,18 @@ public IcebergConfig setProjectionPushdownEnabled(boolean projectionPushdownEnab this.projectionPushdownEnabled = projectionPushdownEnabled; return this; } + + @NotNull + public Optional getDefaultSchemaLocation() + { + return defaultSchemaLocation; + } + + @Config("iceberg.default-schema-location") + @ConfigDescription("The default base location to create a new schema") + public IcebergConfig setDefaultSchemaLocation(String defaultSchemaLocation) + { + this.defaultSchemaLocation = Optional.ofNullable(defaultSchemaLocation); + return this; + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergErrorCode.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergErrorCode.java index 46dcf0acc977..f32fddcfd2fb 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergErrorCode.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergErrorCode.java @@ -36,6 +36,8 @@ public enum IcebergErrorCode ICEBERG_CURSOR_ERROR(9, EXTERNAL), ICEBERG_WRITE_VALIDATION_FAILED(10, INTERNAL_ERROR), ICEBERG_INVALID_SNAPSHOT_ID(11, USER_ERROR), + ICEBERG_CATALOG_ERROR(12, EXTERNAL), + ICEBERG_COMMIT_ERROR(13, EXTERNAL) /**/; private final ErrorCode errorCode; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index fb4cae303f23..0a1f5b0b157d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -26,6 +26,7 @@ import io.trino.plugin.hive.HiveApplyProjectionUtil; import 
io.trino.plugin.hive.HiveApplyProjectionUtil.ProjectedColumnRepresentation; import io.trino.plugin.hive.HiveWrittenPartitions; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.spi.TrinoException; import io.trino.spi.connector.Assignment; import io.trino.spi.connector.CatalogSchemaName; @@ -118,9 +119,9 @@ import static io.trino.plugin.iceberg.PartitionFields.parsePartitionFields; import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields; import static io.trino.plugin.iceberg.TableType.DATA; -import static io.trino.plugin.iceberg.TrinoHiveCatalog.DEPENDS_ON_TABLES; import static io.trino.plugin.iceberg.TypeConverter.toIcebergType; import static io.trino.plugin.iceberg.TypeConverter.toTrinoType; +import static io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog.DEPENDS_ON_TABLES; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.type.BigintType.BIGINT; import static java.util.Collections.singletonList; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java index 47d99fc2ace5..79f129029e23 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg; import io.airlift.json.JsonCodec; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; import io.trino.spi.type.TypeManager; import javax.inject.Inject; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java index c526c962912e..c3aec40b59ce 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java @@ 
-63,7 +63,6 @@ public void configure(Binder binder) configBinder(binder).bindConfig(ParquetReaderConfig.class); configBinder(binder).bindConfig(ParquetWriterConfig.class); - binder.bind(TrinoCatalogFactory.class).in(Scopes.SINGLETON); binder.bind(IcebergMetadataFactory.class).in(Scopes.SINGLETON); jsonCodecBinder(binder).bindJsonCodec(CommitTaskData.class); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index a14b58daea07..4f705468320d 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -18,9 +18,9 @@ import io.airlift.slice.Slice; import io.airlift.slice.SliceUtf8; import io.airlift.slice.Slices; -import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.iceberg.catalog.IcebergTableOperations; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorSession; @@ -105,7 +105,10 @@ import static org.apache.iceberg.LocationProviders.locationsFor; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH; import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL; +import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION; +import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION; import static org.apache.iceberg.types.Type.TypeID.BINARY; import static org.apache.iceberg.types.Type.TypeID.FIXED; @@ -120,10 +123,10 @@ public static boolean isIcebergTable(io.trino.plugin.hive.metastore.Table table) return 
ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(TABLE_TYPE_PROP)); } - public static Table loadIcebergTable(HiveMetastore metastore, IcebergTableOperationsProvider tableOperationsProvider, ConnectorSession session, SchemaTableName table) + public static Table loadIcebergTable(TrinoCatalog catalog, IcebergTableOperationsProvider tableOperationsProvider, ConnectorSession session, SchemaTableName table) { TableOperations operations = tableOperationsProvider.createTableOperations( - metastore, + catalog, session, table.getSchemaName(), table.getTableName(), @@ -133,14 +136,14 @@ public static Table loadIcebergTable(HiveMetastore metastore, IcebergTableOperat } public static Table getIcebergTableWithMetadata( - HiveMetastore metastore, + TrinoCatalog catalog, IcebergTableOperationsProvider tableOperationsProvider, ConnectorSession session, SchemaTableName table, TableMetadata tableMetadata) { IcebergTableOperations operations = tableOperationsProvider.createTableOperations( - metastore, + catalog, session, table.getSchemaName(), table.getTableName(), @@ -229,7 +232,7 @@ public static Optional getTableComment(Table table) return Optional.ofNullable(table.properties().get(TABLE_COMMENT)); } - private static String quotedTableName(SchemaTableName name) + public static String quotedTableName(SchemaTableName name) { return quotedName(name.getSchemaName()) + "." 
+ quotedName(name.getTableName()); } @@ -397,4 +400,14 @@ public static Transaction newCreateTableTransaction(TrinoCatalog catalog, Connec return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, targetPath, propertiesBuilder.build()); } + + public static void validateTableCanBeDropped(Table table, SchemaTableName schemaTableName) + { + // TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861 + if (table.properties().containsKey(OBJECT_STORE_PATH) || + table.properties().containsKey(WRITE_NEW_DATA_LOCATION) || + table.properties().containsKey(WRITE_METADATA_LOCATION)) { + throw new TrinoException(NOT_SUPPORTED, "Table " + schemaTableName + " contains Iceberg path override properties and cannot be dropped from Trino"); + } + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java index d79bafeb537d..bf8459a91fc8 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/InternalIcebergConnectorFactory.java @@ -37,6 +37,7 @@ import io.trino.plugin.hive.gcs.HiveGcsModule; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.s3.HiveS3Module; +import io.trino.plugin.iceberg.catalog.IcebergCatalogModule; import io.trino.spi.NodeManager; import io.trino.spi.PageIndexerFactory; import io.trino.spi.classloader.ThreadContextClassLoader; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RollbackToSnapshotProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RollbackToSnapshotProcedure.java index d649362b7196..55c02b245d43 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RollbackToSnapshotProcedure.java +++ 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/RollbackToSnapshotProcedure.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg; import com.google.common.collect.ImmutableList; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.procedure.Procedure; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java similarity index 71% rename from plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractMetastoreTableOperations.java rename to plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java index 0751124ea550..aa0b1e1e6746 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractMetastoreTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java @@ -14,19 +14,10 @@ package io.trino.plugin.iceberg.catalog; import io.airlift.log.Logger; -import io.trino.plugin.hive.authentication.HiveIdentity; import io.trino.plugin.hive.metastore.Column; -import io.trino.plugin.hive.metastore.HiveMetastore; -import io.trino.plugin.hive.metastore.MetastoreUtil; -import io.trino.plugin.hive.metastore.PrincipalPrivileges; import io.trino.plugin.hive.metastore.StorageFormat; -import io.trino.plugin.hive.metastore.Table; -import io.trino.plugin.iceberg.UnknownTableTypeException; -import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; -import io.trino.spi.connector.TableNotFoundException; -import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.mapred.FileInputFormat; import 
org.apache.hadoop.mapred.FileOutputFormat; @@ -50,30 +41,22 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; -import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; import static io.trino.plugin.hive.HiveType.toHiveType; -import static io.trino.plugin.hive.ViewReaderUtil.isHiveOrPrestoView; -import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView; -import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES; -import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider; -import static io.trino.plugin.iceberg.IcebergUtil.isIcebergTable; import static java.lang.Integer.parseInt; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; -import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; -import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; import static org.apache.iceberg.TableMetadataParser.getFileExtension; import static org.apache.iceberg.TableProperties.METADATA_COMPRESSION; import static org.apache.iceberg.TableProperties.METADATA_COMPRESSION_DEFAULT; import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION; @NotThreadSafe -public abstract class AbstractMetastoreTableOperations +public abstract class AbstractIcebergTableOperations implements IcebergTableOperations { - private static final Logger log = Logger.get(AbstractMetastoreTableOperations.class); + private static final Logger log = Logger.get(AbstractIcebergTableOperations.class); public static final String METADATA_LOCATION = "metadata_location"; public static final String PREVIOUS_METADATA_LOCATION = "previous_metadata_location"; @@ -84,7 +67,6 @@ public abstract class AbstractMetastoreTableOperations FileInputFormat.class.getName(), 
FileOutputFormat.class.getName()); - protected final HiveMetastore metastore; protected final ConnectorSession session; protected final String database; protected final String tableName; @@ -97,9 +79,8 @@ public abstract class AbstractMetastoreTableOperations protected boolean shouldRefresh = true; protected int version = -1; - protected AbstractMetastoreTableOperations( + protected AbstractIcebergTableOperations( FileIO fileIo, - HiveMetastore metastore, ConnectorSession session, String database, String table, @@ -107,7 +88,6 @@ protected AbstractMetastoreTableOperations( Optional location) { this.fileIo = requireNonNull(fileIo, "fileIo is null"); - this.metastore = requireNonNull(metastore, "metastore is null"); this.session = requireNonNull(session, "session is null"); this.database = requireNonNull(database, "database is null"); this.tableName = requireNonNull(table, "table is null"); @@ -141,24 +121,7 @@ public TableMetadata refresh() refreshFromMetadataLocation(null); return currentMetadata; } - - Table table = getTable(); - - if (isPrestoView(table) && isHiveOrPrestoView(table)) { - // this is a Hive view, hence not a table - throw new TableNotFoundException(getSchemaTableName()); - } - if (!isIcebergTable(table)) { - throw new UnknownTableTypeException(getSchemaTableName()); - } - - String metadataLocation = table.getParameters().get(METADATA_LOCATION); - if (metadataLocation == null) { - throw new TrinoException(ICEBERG_INVALID_METADATA, format("Table is missing [%s] property: %s", METADATA_LOCATION, getSchemaTableName())); - } - - refreshFromMetadataLocation(metadataLocation); - + refreshFromMetadataLocation(getRefreshedLocation()); return currentMetadata; } @@ -187,43 +150,9 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata) shouldRefresh = true; } - protected void commitNewTable(TableMetadata metadata) - { - String newMetadataLocation = writeNewMetadata(metadata, version + 1); - - Table table; - try { - Table.Builder builder = 
Table.builder() - .setDatabaseName(database) - .setTableName(tableName) - .setOwner(owner) - .setTableType(TableType.EXTERNAL_TABLE.name()) - .setDataColumns(toHiveColumns(metadata.schema().columns())) - .withStorage(storage -> storage.setLocation(metadata.location())) - .withStorage(storage -> storage.setStorageFormat(STORAGE_FORMAT)) - .setParameter("EXTERNAL", "TRUE") - .setParameter(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE) - .setParameter(METADATA_LOCATION, newMetadataLocation); - String tableComment = metadata.properties().get(TABLE_COMMENT); - if (tableComment != null) { - builder.setParameter(TABLE_COMMENT, tableComment); - } - table = builder.build(); - } - catch (RuntimeException e) { - try { - io().deleteFile(newMetadataLocation); - } - catch (RuntimeException ex) { - e.addSuppressed(ex); - } - throw e; - } + protected abstract String getRefreshedLocation(); - PrincipalPrivileges privileges = owner.map(MetastoreUtil::buildInitialPrivilegeSet).orElse(NO_PRIVILEGES); - HiveIdentity identity = new HiveIdentity(session); - metastore.createTable(identity, table, privileges); - } + protected abstract void commitNewTable(TableMetadata metadata); protected abstract void commitToExistingTable(TableMetadata base, TableMetadata metadata); @@ -258,12 +187,6 @@ public LocationProvider locationProvider() return getLocationProvider(getSchemaTableName(), metadata.location(), metadata.properties()); } - protected Table getTable() - { - return metastore.getTable(new HiveIdentity(session), database, tableName) - .orElseThrow(() -> new TableNotFoundException(getSchemaTableName())); - } - protected SchemaTableName getSchemaTableName() { return new SchemaTableName(database, tableName); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java similarity index 71% rename from 
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergCatalogModule.java rename to plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java index 85c7f093a8a6..13952dfc952f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java @@ -11,26 +11,41 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.iceberg; + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Scopes; import io.airlift.configuration.AbstractConfigurationAwareModule; import io.trino.plugin.hive.metastore.HiveMetastore; -import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; import io.trino.plugin.hive.metastore.cache.CachingHiveMetastoreModule; import io.trino.plugin.hive.metastore.cache.ForCachingHiveMetastore; -import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.CatalogType; +import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider; import io.trino.plugin.iceberg.catalog.file.IcebergFileMetastoreCatalogModule; +import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule; import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule; - -import javax.inject.Inject; +import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalogFactory; import java.util.Optional; import static io.airlift.configuration.ConditionalModule.conditionalModule; +import static io.trino.plugin.iceberg.CatalogType.GLUE; import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE; import static io.trino.plugin.iceberg.CatalogType.TESTING_FILE_METASTORE; import static java.util.Objects.requireNonNull; @@ -52,24 +67,12 @@ protected void setup(Binder binder) binder.bind(HiveMetastore.class).annotatedWith(ForCachingHiveMetastore.class).toInstance(metastore.get()); install(new CachingHiveMetastoreModule()); binder.bind(IcebergTableOperationsProvider.class).to(FileMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON); + binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON); } else { bindCatalogModule(HIVE_METASTORE, new IcebergHiveMetastoreCatalogModule()); bindCatalogModule(TESTING_FILE_METASTORE, new IcebergFileMetastoreCatalogModule()); - // TODO add 
support for Glue metastore - } - - binder.bind(MetastoreValidator.class).asEagerSingleton(); - } - - public static class MetastoreValidator - { - @Inject - public MetastoreValidator(HiveMetastore metastore) - { - if (metastore instanceof CachingHiveMetastore) { - throw new RuntimeException("Hive metastore caching must not be enabled for Iceberg"); - } + bindCatalogModule(GLUE, new IcebergGlueCatalogModule()); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergTableOperationsProvider.java index c4e970cab61b..fdf3c6b64c94 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergTableOperationsProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergTableOperationsProvider.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.iceberg.catalog; -import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.spi.connector.ConnectorSession; import java.util.Optional; @@ -21,7 +20,7 @@ public interface IcebergTableOperationsProvider { IcebergTableOperations createTableOperations( - HiveMetastore hiveMetastore, + TrinoCatalog catalog, ConnectorSession session, String database, String table, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java similarity index 88% rename from plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalog.java rename to plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java index f3364e3912fd..f91d2b9ac56c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java @@ -11,8 +11,23 @@ * See the License for the specific language governing permissions and * 
limitations under the License. */ -package io.trino.plugin.iceberg; +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog; + +import io.trino.plugin.iceberg.UnknownTableTypeException; import io.trino.spi.connector.ConnectorMaterializedViewDefinition; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorViewDefinition; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalogFactory.java new file mode 100644 index 000000000000..1b4760228347 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalogFactory.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog; + +public interface TrinoCatalogFactory +{ + TrinoCatalog create(); +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java index 7098cd1ba976..2335258c8919 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java @@ -17,7 +17,7 @@ import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.PrincipalPrivileges; import io.trino.plugin.hive.metastore.Table; -import io.trino.plugin.iceberg.catalog.AbstractMetastoreTableOperations; +import io.trino.plugin.iceberg.catalog.hms.AbstractMetastoreTableOperations; import io.trino.spi.connector.ConnectorSession; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.exceptions.CommitFailedException; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperationsProvider.java index 7fafc4a58c95..6955c40c6aa5 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperationsProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperationsProvider.java @@ -18,6 +18,8 @@ import io.trino.plugin.iceberg.FileIoProvider; import io.trino.plugin.iceberg.catalog.IcebergTableOperations; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog; import io.trino.spi.connector.ConnectorSession; import javax.inject.Inject; 
@@ -32,14 +34,14 @@ public class FileMetastoreTableOperationsProvider private final FileIoProvider fileIoProvider; @Inject - public FileMetastoreTableOperationsProvider(FileIoProvider fileIoProvider) + public FileMetastoreTableOperationsProvider(HiveMetastore hiveMetastore, FileIoProvider fileIoProvider) { this.fileIoProvider = requireNonNull(fileIoProvider, "fileIoProvider is null"); } @Override public IcebergTableOperations createTableOperations( - HiveMetastore hiveMetastore, + TrinoCatalog catalog, ConnectorSession session, String database, String table, @@ -48,7 +50,7 @@ public IcebergTableOperations createTableOperations( { return new FileMetastoreTableOperations( fileIoProvider.createFileIo(new HdfsContext(session), session.getQueryId()), - hiveMetastore, + ((TrinoHiveCatalog) catalog).getMetastore(), session, database, table, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java index 64cb309ea05e..aba9e1fd6e85 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/IcebergFileMetastoreCatalogModule.java @@ -18,6 +18,9 @@ import io.airlift.configuration.AbstractConfigurationAwareModule; import io.trino.plugin.hive.metastore.file.FileMetastoreModule; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.plugin.iceberg.catalog.hms.MetastoreValidator; +import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalogFactory; public class IcebergFileMetastoreCatalogModule extends AbstractConfigurationAwareModule @@ -27,5 +30,7 @@ protected void setup(Binder binder) { install(new FileMetastoreModule()); 
binder.bind(IcebergTableOperationsProvider.class).to(FileMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON); + binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON); + binder.bind(MetastoreValidator.class).asEagerSingleton(); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueTableOperations.java new file mode 100644 index 000000000000..68d6523154e8 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueTableOperations.java @@ -0,0 +1,236 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
public class GlueTableOperations
        extends AbstractIcebergTableOperations
{
    private static final Logger log = Logger.get(GlueTableOperations.class);

    private final AWSGlueAsync glueClient;
    private final GlueMetastoreStats stats;
    // Nullable: Glue treats a null catalog id as "the caller's own AWS account"
    // (see the matching note in TrinoGlueCatalog) — hence no requireNonNull below.
    private final String catalogId;

    /**
     * Iceberg {@code TableOperations} backed by the AWS Glue Data Catalog.
     * <p>
     * The pointer to the current Iceberg metadata file is stored in the Glue
     * table's {@code METADATA_LOCATION} parameter; commits advance that pointer
     * optimistically (compare current location, then UpdateTable).
     */
    protected GlueTableOperations(
            AWSGlueAsync glueClient,
            GlueMetastoreStats stats,
            String catalogId,
            FileIO fileIo,
            ConnectorSession session,
            String database,
            String table,
            Optional<String> owner,
            Optional<String> location)
    {
        super(fileIo, session, database, table, owner, location);
        this.glueClient = requireNonNull(glueClient, "glueClient is null");
        this.stats = requireNonNull(stats, "stats is null");
        this.catalogId = catalogId;
    }

    /**
     * Reads the current metadata-file location from the Glue table parameters.
     *
     * @throws TableNotFoundException if the Glue entity is a Presto/Hive view
     * @throws UnknownTableTypeException if the entity is not an Iceberg table
     * @throws TrinoException (ICEBERG_INVALID_METADATA) if the required
     *         {@code METADATA_LOCATION} parameter is absent
     */
    @Override
    protected String getRefreshedLocation()
    {
        return stats.getGetTable().call(() -> {
            Table table = getTable();

            if (isPrestoView(table) && isHiveOrPrestoView(table)) {
                // this is a Presto Hive view, hence not a table
                throw new TableNotFoundException(getSchemaTableName());
            }
            if (!isIcebergTable(table)) {
                throw new UnknownTableTypeException(getSchemaTableName());
            }

            String metadataLocation = table.getParameters().get(METADATA_LOCATION);
            if (metadataLocation == null) {
                throw new TrinoException(ICEBERG_INVALID_METADATA, format("Table is missing [%s] property: %s", METADATA_LOCATION, getSchemaTableName()));
            }
            return metadataLocation;
        });
    }

    /**
     * Creates a brand-new Glue table whose parameters point at the freshly
     * written metadata file. On any failure the just-written metadata file is
     * deleted so no orphan is left behind.
     */
    @Override
    protected void commitNewTable(TableMetadata metadata)
    {
        // Write the metadata file first; the Glue entry is only created once it exists.
        String newMetadataLocation = writeNewMetadata(metadata, version + 1);
        Map<String, String> parameters = ImmutableMap.of(
                TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(ENGLISH),
                METADATA_LOCATION, newMetadataLocation);
        TableInput tableInput = new TableInput()
                .withName(tableName)
                .withTableType(EXTERNAL_TABLE.name())
                .withOwner(owner.orElse(null))
                .withParameters(parameters);

        boolean succeeded = false;
        try {
            stats.getCreateTable().call(() -> {
                glueClient.createTable(new CreateTableRequest()
                        .withCatalogId(catalogId)
                        .withDatabaseName(database)
                        .withTableInput(tableInput));
                return null;
            });
            succeeded = true;
        }
        catch (ConcurrentModificationException e) {
            throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Cannot commit %s because Glue detected concurrent update", getSchemaTableName()), e);
        }
        catch (AlreadyExistsException e) {
            throw new TableAlreadyExistsException(getSchemaTableName());
        }
        catch (RuntimeException e) {
            throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Cannot commit %s due to unexpected exception", getSchemaTableName()), e);
        }
        finally {
            // Remove the metadata file we wrote if the Glue create did not succeed.
            cleanupMetadataLocation(!succeeded, newMetadataLocation);
        }

        shouldRefresh = true;
    }

    /**
     * Commits against an existing table with an optimistic check: the Glue
     * entry's METADATA_LOCATION must still equal the location this operation
     * was based on, otherwise the commit fails as concurrent. Note that Glue
     * does not offer a server-side compare-and-swap, so the check-then-update
     * here is only best-effort.
     */
    @Override
    protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
    {
        String newMetadataLocation = writeNewMetadata(metadata, version + 1);
        Map<String, String> parameters = ImmutableMap.of(
                TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(ENGLISH),
                METADATA_LOCATION, newMetadataLocation,
                // Keep a pointer to the previous metadata file for debugging/rollback.
                PREVIOUS_METADATA_LOCATION, currentMetadataLocation);
        TableInput tableInput = new TableInput()
                .withName(tableName)
                .withTableType(EXTERNAL_TABLE.name())
                .withOwner(owner.orElse(null))
                .withParameters(parameters);

        boolean succeeded = false;
        try {
            Table table = getTable();

            checkState(currentMetadataLocation != null, "No current metadata location for existing table");
            String metadataLocation = table.getParameters().get(METADATA_LOCATION);
            if (!currentMetadataLocation.equals(metadataLocation)) {
                // Someone else committed since we refreshed — abort this commit.
                throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s",
                        currentMetadataLocation, metadataLocation, getSchemaTableName());
            }

            // Carry over all descriptive fields from the existing Glue entry so
            // the update only changes the parameters built above.
            TableInput tableInputToUpdate = tableInput
                    .withDescription(table.getDescription())
                    .withTargetTable(table.getTargetTable())
                    .withLastAccessTime(table.getLastAccessTime())
                    .withLastAnalyzedTime(table.getLastAnalyzedTime())
                    .withPartitionKeys(table.getPartitionKeys())
                    .withRetention(table.getRetention())
                    .withStorageDescriptor(table.getStorageDescriptor())
                    .withViewExpandedText(table.getViewExpandedText())
                    .withViewOriginalText(table.getViewOriginalText());

            stats.getUpdateTable().call(() -> {
                glueClient.updateTable(new UpdateTableRequest()
                        .withCatalogId(catalogId)
                        .withDatabaseName(database)
                        .withTableInput(tableInputToUpdate));
                return null;
            });
            succeeded = true;
        }
        catch (ConcurrentModificationException | CommitFailedException e) {
            throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Cannot commit %s because of concurrent update", getSchemaTableName()), e);
        }
        finally {
            cleanupMetadataLocation(!succeeded, newMetadataLocation);
        }

        shouldRefresh = true;
    }

    // True when the Glue entry carries the Presto view marker parameter.
    private boolean isPrestoView(Table table)
    {
        return "true".equals(table.getParameters().get(PRESTO_VIEW_FLAG));
    }

    // True when the Glue table type is VIRTUAL_VIEW (Hive or Presto view).
    private boolean isHiveOrPrestoView(Table table)
    {
        return table.getTableType().equals(TableType.VIRTUAL_VIEW.name());
    }

    // True when the Glue entry's table_type parameter marks it as Iceberg.
    private boolean isIcebergTable(Table table)
    {
        return ICEBERG_TABLE_TYPE_VALUE.equalsIgnoreCase(table.getParameters().get(TABLE_TYPE_PROP));
    }

    /**
     * Fetches the Glue table entity, translating Glue's "not found" into
     * Trino's {@link TableNotFoundException}.
     */
    private Table getTable()
    {
        try {
            return glueClient.getTable(new GetTableRequest()
                    .withCatalogId(catalogId)
                    .withDatabaseName(database)
                    .withName(tableName)).getTable();
        }
        catch (EntityNotFoundException e) {
            throw new TableNotFoundException(getSchemaTableName());
        }
    }

    /**
     * Best-effort deletion of a metadata file written by a failed commit;
     * rethrows so the original commit failure handling still sees the error.
     */
    private void cleanupMetadataLocation(boolean shouldCleanup, String metadataLocation)
    {
        if (shouldCleanup) {
            try {
                io().deleteFile(metadataLocation);
            }
            catch (RuntimeException ex) {
                log.error(ex, "Fail to cleanup metadata file at " + metadataLocation);
                throw ex;
            }
        }
    }
}
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueTableOperationsProvider.java new file mode 100644 index 000000000000..4e2854aa437d --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueTableOperationsProvider.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.glue; + +import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; +import io.trino.plugin.iceberg.FileIoProvider; +import io.trino.plugin.iceberg.catalog.IcebergTableOperations; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.spi.connector.ConnectorSession; + +import javax.inject.Inject; + +import java.util.Optional; + +public class GlueTableOperationsProvider + implements IcebergTableOperationsProvider +{ + private final FileIoProvider fileIoProvider; + + @Inject + public GlueTableOperationsProvider(FileIoProvider fileIoProvider) + { + this.fileIoProvider = fileIoProvider; + } + + @Override + public IcebergTableOperations createTableOperations( + TrinoCatalog catalog, + ConnectorSession session, + String database, + String table, + Optional owner, + Optional location) + { + TrinoGlueCatalog glueCatalog = (TrinoGlueCatalog) catalog; + return new GlueTableOperations( + glueCatalog.getGlueClient(), + glueCatalog.getStats(), + glueCatalog.getCatalogId(), + 
fileIoProvider.createFileIo(new HdfsContext(session), session.getQueryId()), + session, + database, + table, + owner, + location); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/IcebergGlueCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/IcebergGlueCatalogModule.java new file mode 100644 index 000000000000..483d9b59c1e8 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/IcebergGlueCatalogModule.java @@ -0,0 +1,102 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.glue; + +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.Singleton; +import io.airlift.concurrent.BoundedExecutor; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.base.CatalogName; +import io.trino.plugin.hive.HiveConfig; +import io.trino.plugin.hive.metastore.glue.DefaultGlueColumnStatisticsProviderFactory; +import io.trino.plugin.hive.metastore.glue.DisabledGlueColumnStatisticsProviderFactory; +import io.trino.plugin.hive.metastore.glue.ForGlueColumnStatisticsRead; +import io.trino.plugin.hive.metastore.glue.ForGlueColumnStatisticsWrite; +import io.trino.plugin.hive.metastore.glue.ForGlueHiveMetastore; +import io.trino.plugin.hive.metastore.glue.GlueColumnStatisticsProviderFactory; +import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; + +import java.util.concurrent.Executor; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.airlift.configuration.ConditionalModule.conditionalModule; +import static io.airlift.configuration.ConfigBinder.configBinder; +import static java.util.concurrent.Executors.newCachedThreadPool; + +public class IcebergGlueCatalogModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + configBinder(binder).bindConfig(GlueHiveMetastoreConfig.class); + configBinder(binder).bindConfig(HiveConfig.class); + binder.bind(IcebergTableOperationsProvider.class).to(GlueTableOperationsProvider.class).in(Scopes.SINGLETON); + install(conditionalModule( + HiveConfig.class, 
+ HiveConfig::isTableStatisticsEnabled, + getGlueStatisticsModule(DefaultGlueColumnStatisticsProviderFactory.class), + getGlueStatisticsModule(DisabledGlueColumnStatisticsProviderFactory.class))); + binder.bind(TrinoCatalogFactory.class).to(TrinoGlueCatalogFactory.class).in(Scopes.SINGLETON); + } + + private Module getGlueStatisticsModule(Class statisticsPrividerFactoryClass) + { + return internalBinder -> newOptionalBinder(internalBinder, GlueColumnStatisticsProviderFactory.class) + .setDefault() + .to(statisticsPrividerFactoryClass) + .in(Scopes.SINGLETON); + } + + @Provides + @Singleton + @ForGlueHiveMetastore + public Executor createExecutor(CatalogName catalogName, GlueHiveMetastoreConfig hiveConfig) + { + return createExecutor("hive-glue-partitions-%s", hiveConfig.getGetPartitionThreads()); + } + + @Provides + @Singleton + @ForGlueColumnStatisticsRead + public Executor createStatisticsReadExecutor(CatalogName catalogName, GlueHiveMetastoreConfig hiveConfig) + { + return createExecutor("hive-glue-statistics-read-%s", hiveConfig.getReadStatisticsThreads()); + } + + @Provides + @Singleton + @ForGlueColumnStatisticsWrite + public Executor createStatisticsWriteExecutor(CatalogName catalogName, GlueHiveMetastoreConfig hiveConfig) + { + return createExecutor("hive-glue-statistics-write-%s", hiveConfig.getWriteStatisticsThreads()); + } + + private Executor createExecutor(String nameTemplate, int threads) + { + if (threads == 1) { + return directExecutor(); + } + return new BoundedExecutor( + newCachedThreadPool(daemonThreadsNamed(nameTemplate)), + threads); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java new file mode 100644 index 000000000000..c2ab24a07466 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -0,0 +1,500 @@ +/* + * Licensed 
under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.glue; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.glue.AWSGlueAsync; +import com.amazonaws.services.glue.model.AlreadyExistsException; +import com.amazonaws.services.glue.model.CreateDatabaseRequest; +import com.amazonaws.services.glue.model.CreateTableRequest; +import com.amazonaws.services.glue.model.DatabaseInput; +import com.amazonaws.services.glue.model.DeleteDatabaseRequest; +import com.amazonaws.services.glue.model.DeleteTableRequest; +import com.amazonaws.services.glue.model.EntityNotFoundException; +import com.amazonaws.services.glue.model.GetDatabaseRequest; +import com.amazonaws.services.glue.model.GetDatabasesRequest; +import com.amazonaws.services.glue.model.GetDatabasesResult; +import com.amazonaws.services.glue.model.GetTableRequest; +import com.amazonaws.services.glue.model.GetTablesRequest; +import com.amazonaws.services.glue.model.GetTablesResult; +import com.amazonaws.services.glue.model.TableInput; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import io.airlift.log.Logger; +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.SchemaAlreadyExistsException; +import io.trino.plugin.hive.TableAlreadyExistsException; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; +import 
io.trino.plugin.iceberg.catalog.IcebergTableOperations; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorMaterializedViewDefinition; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorViewDefinition; +import io.trino.spi.connector.SchemaNotFoundException; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.security.TrinoPrincipal; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.Transaction; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR; +import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; +import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; +import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName; +import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static java.util.UUID.randomUUID; +import static java.util.stream.Collectors.toMap; +import static org.apache.iceberg.TableMetadata.newTableMetadata; +import static org.apache.iceberg.Transactions.createTableTransaction; + +public class TrinoGlueCatalog + implements TrinoCatalog +{ + 
private static final Logger log = Logger.get(TrinoGlueCatalog.class); + + private final HdfsEnvironment hdfsEnvironment; + private final IcebergTableOperationsProvider tableOperationsProvider; + private final Optional defaultSchemaLocation; + private final boolean isUniqueTableLocation; + private final AWSGlueAsync glueClient; + private final String catalogId; + private final GlueMetastoreStats stats; + + private final Map tableMetadataCache = new ConcurrentHashMap<>(); + + public TrinoGlueCatalog( + HdfsEnvironment hdfsEnvironment, + IcebergTableOperationsProvider tableOperationsProvider, + AWSGlueAsync glueClient, + GlueMetastoreStats stats, + String catalogId, + Optional defaultSchemaLocation, + boolean isUniqueTableLocation) + { + this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); + this.glueClient = requireNonNull(glueClient, "glueClient is null"); + this.stats = requireNonNull(stats, "stats is null"); + this.catalogId = catalogId; // null is a valid catalogId, meaning the current account + this.defaultSchemaLocation = requireNonNull(defaultSchemaLocation, "defaultSchemaLocation is null"); + this.isUniqueTableLocation = isUniqueTableLocation; + } + + public GlueMetastoreStats getStats() + { + return stats; + } + + public String getCatalogId() + { + return catalogId; + } + + public AWSGlueAsync getGlueClient() + { + return glueClient; + } + + @Override + public List listNamespaces(ConnectorSession session) + { + try { + return stats.getGetAllDatabases().call(() -> { + List namespaces = new ArrayList<>(); + String nextToken = null; + + do { + GetDatabasesResult result = glueClient.getDatabases(new GetDatabasesRequest().withCatalogId(catalogId).withNextToken(nextToken)); + nextToken = result.getNextToken(); + result.getDatabaseList().forEach(database -> namespaces.add(database.getName())); + } + while (nextToken != null); + 
+ return namespaces; + }); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } + } + + @Override + public boolean dropNamespace(ConnectorSession session, String namespace) + { + try { + stats.getDropDatabase().call(() -> glueClient.deleteDatabase(new DeleteDatabaseRequest() + .withCatalogId(catalogId).withName(namespace))); + return true; + } + catch (EntityNotFoundException e) { + throw new SchemaNotFoundException(namespace); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } + } + + @Override + public Map loadNamespaceMetadata(ConnectorSession session, String namespace) + { + try { + return stats.getGetDatabase().call(() -> + glueClient.getDatabase(new GetDatabaseRequest().withCatalogId(catalogId).withName(namespace)) + .getDatabase().getParameters().entrySet().stream() + .collect(toMap(Map.Entry::getKey, Map.Entry::getValue))); + } + catch (EntityNotFoundException e) { + throw new SchemaNotFoundException(namespace); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } + } + + @Override + public Optional getNamespacePrincipal(ConnectorSession session, String namespace) + { + return Optional.empty(); + } + + @Override + public void createNamespace(ConnectorSession session, String namespace, Map properties, TrinoPrincipal owner) + { + try { + stats.getCreateDatabase().call(() -> glueClient.createDatabase(new CreateDatabaseRequest() + .withCatalogId(catalogId) + .withDatabaseInput(new DatabaseInput() + .withName(namespace) + .withParameters(properties.entrySet().stream().collect(toMap(Map.Entry::getKey, e -> e.getValue().toString())))))); + } + catch (AlreadyExistsException e) { + throw new SchemaAlreadyExistsException(namespace); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } + } + + @Override + public void setNamespacePrincipal(ConnectorSession session, String namespace, 
TrinoPrincipal principal) + { + } + + @Override + public void renameNamespace(ConnectorSession session, String source, String target) + { + throw new TrinoException(NOT_SUPPORTED, "renameNamespace is not supported by Iceberg Glue catalog"); + } + + @Override + public List listTables(ConnectorSession session, Optional namespace) + { + try { + return stats.getGetAllTables().call(() -> { + List namespaces = namespace.isPresent() ? Lists.newArrayList(namespace.get()) : listNamespaces(session); + + List tableNames = new ArrayList<>(); + String nextToken = null; + + for (String ns : namespaces) { + do { + GetTablesResult result = glueClient.getTables(new GetTablesRequest() + .withCatalogId(catalogId) + .withDatabaseName(ns) + .withNextToken(nextToken)); + result.getTableList().stream() + .map(com.amazonaws.services.glue.model.Table::getName) + .forEach(name -> tableNames.add(new SchemaTableName(ns, name))); + nextToken = result.getNextToken(); + } + while (nextToken != null); + } + return tableNames; + }); + } + catch (EntityNotFoundException e) { + // database does not exist + return ImmutableList.of(); + } + catch (AmazonServiceException e) { + throw new TrinoException(ICEBERG_CATALOG_ERROR, e); + } + } + + @Override + public Table loadTable(ConnectorSession session, SchemaTableName table) + { + TableMetadata metadata = tableMetadataCache.computeIfAbsent( + table, + ignore -> { + TableOperations operations = tableOperationsProvider.createTableOperations( + this, + session, + table.getSchemaName(), + table.getTableName(), + Optional.empty(), + Optional.empty()); + return new BaseTable(operations, quotedTableName(table)).operations().current(); + }); + + IcebergTableOperations operations = tableOperationsProvider.createTableOperations( + this, + session, + table.getSchemaName(), + table.getTableName(), + Optional.empty(), + Optional.empty()); + operations.initializeFromMetadata(metadata); + return new BaseTable(operations, quotedTableName(table)); + } + + @Override + 
public boolean dropTable(ConnectorSession session, SchemaTableName schemaTableName, boolean purgeData) + { + Table table = loadTable(session, schemaTableName); + validateTableCanBeDropped(table, schemaTableName); + try { + stats.getDropTable().call(() -> + glueClient.deleteTable(new DeleteTableRequest() + .withCatalogId(catalogId) + .withDatabaseName(schemaTableName.getSchemaName()) + .withName(schemaTableName.getTableName()))); + } + catch (AmazonServiceException e) { + throw new TrinoException(HIVE_METASTORE_ERROR, e); + } + + Path tableLocation = new Path(table.location()); + if (purgeData) { + try { + hdfsEnvironment.getFileSystem(new HdfsEnvironment.HdfsContext(session), tableLocation).delete(tableLocation, true); + } + catch (Exception e) { + // don't fail if unable to delete path + log.warn(e, "Failed to delete path: " + tableLocation); + } + } + return true; + } + + @Override + public Transaction newCreateTableTransaction(ConnectorSession session, SchemaTableName schemaTableName, Schema schema, PartitionSpec partitionSpec, + String location, Map properties) + { + TableMetadata metadata = newTableMetadata(schema, partitionSpec, location, properties); + TableOperations ops = tableOperationsProvider.createTableOperations( + this, + session, + schemaTableName.getSchemaName(), + schemaTableName.getTableName(), + Optional.of(session.getUser()), + Optional.of(location)); + return createTableTransaction(schemaTableName.toString(), ops, metadata); + } + + @Override + public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) + { + AtomicBoolean newTableCreated = new AtomicBoolean(false); + AtomicBoolean oldTableDeleted = new AtomicBoolean(false); + try { + stats.getRenameTable().call(() -> { + com.amazonaws.services.glue.model.Table table = glueClient.getTable(new GetTableRequest() + .withCatalogId(catalogId) + .withDatabaseName(from.getSchemaName()) + .withName(from.getTableName())) + .getTable(); + + TableInput tableInput = new 
TableInput() + .withName(to.getTableName()) + .withTableType(table.getTableType()) + .withOwner(table.getOwner()) + .withParameters(table.getParameters()) + .withDescription(table.getDescription()) + .withTargetTable(table.getTargetTable()) + .withLastAccessTime(table.getLastAccessTime()) + .withLastAnalyzedTime(table.getLastAnalyzedTime()) + .withPartitionKeys(table.getPartitionKeys()) + .withRetention(table.getRetention()) + .withStorageDescriptor(table.getStorageDescriptor()) + .withViewExpandedText(table.getViewExpandedText()) + .withViewOriginalText(table.getViewOriginalText()); + + glueClient.createTable(new CreateTableRequest() + .withCatalogId(catalogId) + .withDatabaseName(to.getSchemaName()) + .withTableInput(tableInput)); + newTableCreated.set(true); + + glueClient.deleteTable(new DeleteTableRequest() + .withCatalogId(catalogId) + .withDatabaseName(from.getSchemaName()) + .withName(from.getTableName())); + oldTableDeleted.set(true); + return null; + }); + } + catch (EntityNotFoundException e) { + throw new TableNotFoundException(from); + } + catch (AlreadyExistsException e) { + throw new TableAlreadyExistsException(to); + } + finally { + if (newTableCreated.get() && !oldTableDeleted.get()) { + glueClient.deleteTable(new DeleteTableRequest() + .withCatalogId(catalogId) + .withDatabaseName(to.getSchemaName()) + .withName(to.getTableName())); + } + } + } + + @Override + public void updateTableComment(ConnectorSession session, SchemaTableName schemaTableName, Optional comment) + { + Table icebergTable = loadTable(session, schemaTableName); + if (comment.isEmpty()) { + icebergTable.updateProperties().remove(TABLE_COMMENT).commit(); + } + else { + icebergTable.updateProperties().set(TABLE_COMMENT, comment.get()).commit(); + } + } + + @Override + public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName) + { + String dbLocation = stats.getGetDatabase().call(() -> glueClient.getDatabase(new GetDatabaseRequest() + 
.withCatalogId(catalogId).withName(schemaTableName.getSchemaName())) + .getDatabase().getLocationUri()); + + String location; + if (dbLocation == null) { + if (defaultSchemaLocation.isEmpty()) { + throw new TrinoException(HIVE_DATABASE_LOCATION_ERROR, format("Database '%s' location cannot be determined, " + + "please either set 'location' when creating the database, or set 'iceberg.catalog.warehouse' " + + "to allow a default location at '<warehouse>/<database name>.db'", schemaTableName.getSchemaName())); + } + location = format("%s/%s.db/%s", defaultSchemaLocation.get(), schemaTableName.getSchemaName(), schemaTableName.getTableName()); + } + else { + location = format("%s/%s", dbLocation, schemaTableName.getTableName()); + } + + if (isUniqueTableLocation) { + location = location + "-" + randomUUID().toString().replace("-", ""); + } + return location; + } + + @Override + public void setTablePrincipal(ConnectorSession session, SchemaTableName schemaTableName, TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "setTablePrincipal is not supported by Iceberg Glue catalog"); + } + + @Override + public void createView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorViewDefinition definition, boolean replace) + { + throw new TrinoException(NOT_SUPPORTED, "createView is not supported by Iceberg Glue catalog"); + } + + @Override + public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target) + { + throw new TrinoException(NOT_SUPPORTED, "renameView is not supported by Iceberg Glue catalog"); + } + + @Override + public void setViewPrincipal(ConnectorSession session, SchemaTableName schemaViewName, TrinoPrincipal principal) + { + throw new TrinoException(NOT_SUPPORTED, "setViewPrincipal is not supported by Iceberg Glue catalog"); + } + + @Override + public void dropView(ConnectorSession session, SchemaTableName schemaViewName) + { + throw new TrinoException(NOT_SUPPORTED, "dropView is not supported by Iceberg Glue catalog"); + 
} + + @Override + public List listViews(ConnectorSession session, Optional namespace) + { + return ImmutableList.of(); + } + + @Override + public Map getViews(ConnectorSession session, Optional namespace) + { + return ImmutableMap.of(); + } + + @Override + public Optional getView(ConnectorSession session, SchemaTableName viewIdentifier) + { + return Optional.empty(); + } + + @Override + public List listMaterializedViews(ConnectorSession session, Optional namespace) + { + return ImmutableList.of(); + } + + @Override + public void createMaterializedView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorMaterializedViewDefinition definition, + boolean replace, boolean ignoreExisting) + { + throw new TrinoException(NOT_SUPPORTED, "createMaterializedView is not supported by Iceberg Glue catalog"); + } + + @Override + public void dropMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + throw new TrinoException(NOT_SUPPORTED, "dropMaterializedView is not supported by Iceberg Glue catalog"); + } + + @Override + public Optional getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName) + { + return Optional.empty(); + } + + @Override + public void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target) + { + throw new TrinoException(NOT_SUPPORTED, "renameMaterializedView is not supported by Iceberg Glue catalog"); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java new file mode 100644 index 000000000000..0ea2d672e37e --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalogFactory.java @@ -0,0 +1,65 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.glue; + +import com.amazonaws.services.glue.AWSGlueAsync; +import io.trino.plugin.hive.HdfsEnvironment; +import io.trino.plugin.hive.metastore.glue.GlueHiveMetastoreConfig; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreStats; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; + +import javax.inject.Inject; + +import java.util.Optional; + +import static io.trino.plugin.hive.metastore.glue.GlueHiveMetastore.createAsyncGlueClient; +import static java.util.Objects.requireNonNull; + +public class TrinoGlueCatalogFactory + implements TrinoCatalogFactory +{ + private final HdfsEnvironment hdfsEnvironment; + private final IcebergTableOperationsProvider tableOperationsProvider; + private final Optional defaultSchemaLocation; + private final boolean isUniqueTableLocation; + private final AWSGlueAsync glueClient; + private final String catalogId; + private final GlueMetastoreStats stats = new GlueMetastoreStats(); + + @Inject + public TrinoGlueCatalogFactory( + HdfsEnvironment hdfsEnvironment, + IcebergTableOperationsProvider tableOperationsProvider, + GlueHiveMetastoreConfig glueConfig, + IcebergConfig icebergConfig) + { + this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null"); + this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null"); + 
requireNonNull(glueConfig, "glueConfig is null"); + this.glueClient = createAsyncGlueClient(glueConfig, Optional.empty(), stats.newRequestMetricsCollector()); + this.catalogId = glueConfig.getCatalogId().orElse(null); + requireNonNull(icebergConfig, "icebergConfig is null"); + this.defaultSchemaLocation = icebergConfig.getDefaultSchemaLocation(); + this.isUniqueTableLocation = icebergConfig.isUniqueTableLocation(); + } + + @Override + public TrinoCatalog create() + { + return new TrinoGlueCatalog(hdfsEnvironment, tableOperationsProvider, glueClient, stats, catalogId, defaultSchemaLocation, isUniqueTableLocation); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java new file mode 100644 index 000000000000..eb3c7a34d505 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/AbstractMetastoreTableOperations.java @@ -0,0 +1,129 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.hms; + +import io.trino.plugin.hive.authentication.HiveIdentity; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.MetastoreUtil; +import io.trino.plugin.hive.metastore.PrincipalPrivileges; +import io.trino.plugin.hive.metastore.Table; +import io.trino.plugin.iceberg.UnknownTableTypeException; +import io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.TableNotFoundException; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.io.FileIO; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.Optional; + +import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; +import static io.trino.plugin.hive.ViewReaderUtil.isHiveOrPrestoView; +import static io.trino.plugin.hive.ViewReaderUtil.isPrestoView; +import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; +import static io.trino.plugin.iceberg.IcebergUtil.isIcebergTable; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; +import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; + +@NotThreadSafe +public abstract class AbstractMetastoreTableOperations + extends AbstractIcebergTableOperations +{ + protected HiveMetastore metastore; + + public AbstractMetastoreTableOperations( + FileIO fileIo, + HiveMetastore metastore, + ConnectorSession session, + String database, + String table, + Optional owner, + Optional location) + { + super(fileIo, session, database, table, owner, location); + this.metastore = requireNonNull(metastore, "metastore is null"); + } + + @Override + 
protected String getRefreshedLocation() + { + Table table = getTable(); + + if (isPrestoView(table) && isHiveOrPrestoView(table)) { + // this is a Hive view, hence not a table + throw new TableNotFoundException(getSchemaTableName()); + } + if (!isIcebergTable(table)) { + throw new UnknownTableTypeException(getSchemaTableName()); + } + + String metadataLocation = table.getParameters().get(METADATA_LOCATION); + if (metadataLocation == null) { + throw new TrinoException(ICEBERG_INVALID_METADATA, format("Table is missing [%s] property: %s", METADATA_LOCATION, getSchemaTableName())); + } + + return metadataLocation; + } + + @Override + protected void commitNewTable(TableMetadata metadata) + { + String newMetadataLocation = writeNewMetadata(metadata, version + 1); + + Table table; + try { + Table.Builder builder = Table.builder() + .setDatabaseName(database) + .setTableName(tableName) + .setOwner(owner) + .setTableType(TableType.EXTERNAL_TABLE.name()) + .setDataColumns(toHiveColumns(metadata.schema().columns())) + .withStorage(storage -> storage.setLocation(metadata.location())) + .withStorage(storage -> storage.setStorageFormat(STORAGE_FORMAT)) + .setParameter("EXTERNAL", "TRUE") + .setParameter(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE) + .setParameter(METADATA_LOCATION, newMetadataLocation); + String tableComment = metadata.properties().get(TABLE_COMMENT); + if (tableComment != null) { + builder.setParameter(TABLE_COMMENT, tableComment); + } + table = builder.build(); + } + catch (RuntimeException e) { + try { + io().deleteFile(newMetadataLocation); + } + catch (RuntimeException ex) { + e.addSuppressed(ex); + } + throw e; + } + + PrincipalPrivileges privileges = owner.map(MetastoreUtil::buildInitialPrivilegeSet).orElse(NO_PRIVILEGES); + HiveIdentity identity = new HiveIdentity(session); + metastore.createTable(identity, table, privileges); + } + + protected Table getTable() + { + return metastore.getTable(new HiveIdentity(session), database, tableName) + 
.orElseThrow(() -> new TableNotFoundException(getSchemaTableName())); + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java index 32c7f806b13a..ca7493573c69 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java @@ -18,7 +18,6 @@ import io.trino.plugin.hive.metastore.PrincipalPrivileges; import io.trino.plugin.hive.metastore.Table; import io.trino.plugin.hive.metastore.thrift.ThriftMetastore; -import io.trino.plugin.iceberg.catalog.AbstractMetastoreTableOperations; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.TableNotFoundException; import org.apache.iceberg.TableMetadata; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java index 38422ae428ba..76cb147056a2 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperationsProvider.java @@ -14,11 +14,11 @@ package io.trino.plugin.iceberg.catalog.hms; import io.trino.plugin.hive.HdfsEnvironment.HdfsContext; -import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.hive.metastore.thrift.ThriftMetastore; import io.trino.plugin.iceberg.FileIoProvider; import io.trino.plugin.iceberg.catalog.IcebergTableOperations; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; import 
io.trino.spi.connector.ConnectorSession; import javax.inject.Inject; @@ -42,7 +42,7 @@ public HiveMetastoreTableOperationsProvider(FileIoProvider fileIoProvider, Thrif @Override public IcebergTableOperations createTableOperations( - HiveMetastore hiveMetastore, + TrinoCatalog catalog, ConnectorSession session, String database, String table, @@ -51,7 +51,7 @@ public IcebergTableOperations createTableOperations( { return new HiveMetastoreTableOperations( fileIoProvider.createFileIo(new HdfsContext(session), session.getQueryId()), - hiveMetastore, + ((TrinoHiveCatalog) catalog).getMetastore(), thriftMetastore, session, database, diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java index 65ab21e6e990..f9a741c40127 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/IcebergHiveMetastoreCatalogModule.java @@ -18,6 +18,7 @@ import io.airlift.configuration.AbstractConfigurationAwareModule; import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreModule; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; public class IcebergHiveMetastoreCatalogModule extends AbstractConfigurationAwareModule @@ -27,5 +28,7 @@ protected void setup(Binder binder) { install(new ThriftMetastoreModule()); binder.bind(IcebergTableOperationsProvider.class).to(HiveMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON); + binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON); + binder.bind(MetastoreValidator.class).asEagerSingleton(); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/MetastoreValidator.java 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/MetastoreValidator.java new file mode 100644 index 000000000000..1c276e25f634 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/MetastoreValidator.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg.catalog.hms; + +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; + +import javax.inject.Inject; + +public class MetastoreValidator +{ + @Inject + public MetastoreValidator(HiveMetastore metastore) + { + if (metastore instanceof CachingHiveMetastore) { + throw new RuntimeException("Hive metastore caching must not be enabled for Iceberg"); + } + } +} diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java similarity index 97% rename from plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java rename to plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index c36209e1551a..7dafa8c2f8fd 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package io.trino.plugin.iceberg; +package io.trino.plugin.iceberg.catalog.hms; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -32,7 +32,10 @@ import io.trino.plugin.hive.metastore.HivePrincipal; import io.trino.plugin.hive.metastore.PrincipalPrivileges; import io.trino.plugin.hive.util.HiveUtil; +import io.trino.plugin.iceberg.IcebergMaterializedViewDefinition; +import io.trino.plugin.iceberg.IcebergUtil; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.spi.TrinoException; import io.trino.spi.connector.CatalogSchemaTableName; import io.trino.spi.connector.ColumnMetadata; @@ -89,6 +92,7 @@ import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; +import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; import static io.trino.plugin.iceberg.PartitionFields.toPartitionFields; import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY; @@ -102,12 +106,9 @@ import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; import static org.apache.iceberg.TableMetadata.newTableMetadata; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; -import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH; -import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION; -import static org.apache.iceberg.TableProperties.WRITE_NEW_DATA_LOCATION; import static org.apache.iceberg.Transactions.createTableTransaction; -class TrinoHiveCatalog +public class TrinoHiveCatalog implements TrinoCatalog { private static final Logger log = Logger.get(TrinoHiveCatalog.class); @@ -154,6 +155,11 @@ public TrinoHiveCatalog( 
this.isUsingSystemSecurity = isUsingSystemSecurity; } + public HiveMetastore getMetastore() + { + return metastore; + } + @Override public List listNamespaces(ConnectorSession session) { @@ -237,7 +243,7 @@ public Transaction newCreateTableTransaction(ConnectorSession session, SchemaTab { TableMetadata metadata = newTableMetadata(schema, partitionSpec, location, properties); TableOperations ops = tableOperationsProvider.createTableOperations( - metastore, + this, session, schemaTableName.getSchemaName(), schemaTableName.getTableName(), @@ -272,11 +278,7 @@ public boolean dropTable(ConnectorSession session, SchemaTableName schemaTableNa { // TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861 Table table = loadTable(session, schemaTableName); - if (table.properties().containsKey(OBJECT_STORE_PATH) || - table.properties().containsKey(WRITE_NEW_DATA_LOCATION) || - table.properties().containsKey(WRITE_METADATA_LOCATION)) { - throw new TrinoException(NOT_SUPPORTED, "Table " + schemaTableName + " contains Iceberg path override properties and cannot be dropped from Trino"); - } + validateTableCanBeDropped(table, schemaTableName); metastore.dropTable(new HiveIdentity(session), schemaTableName.getSchemaName(), schemaTableName.getTableName(), purgeData); return true; } @@ -292,9 +294,9 @@ public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName { TableMetadata metadata = tableMetadataCache.computeIfAbsent( schemaTableName, - ignore -> ((BaseTable) loadIcebergTable(metastore, tableOperationsProvider, session, schemaTableName)).operations().current()); + ignore -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current()); - return getIcebergTableWithMetadata(metastore, tableOperationsProvider, session, schemaTableName, metadata); + return getIcebergTableWithMetadata(this, tableOperationsProvider, session, schemaTableName, metadata); } @Override diff 
--git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalogFactory.java similarity index 71% rename from plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalogFactory.java rename to plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalogFactory.java index 060898f56f40..b69042743fde 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/TrinoCatalogFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalogFactory.java @@ -11,24 +11,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.iceberg; +package io.trino.plugin.iceberg.catalog.hms; import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HdfsEnvironment; import io.trino.plugin.hive.NodeVersion; import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.IcebergSecurityConfig; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; -import io.trino.spi.TrinoException; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; import io.trino.spi.type.TypeManager; import javax.inject.Inject; import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore; import static io.trino.plugin.iceberg.IcebergSecurityConfig.IcebergSecurity.SYSTEM; -import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.util.Objects.requireNonNull; -public class TrinoCatalogFactory +public class TrinoHiveCatalogFactory + implements TrinoCatalogFactory { private final CatalogName catalogName; private final HiveMetastore metastore; @@ -36,12 +39,11 @@ public class TrinoCatalogFactory private final TypeManager typeManager; 
private final IcebergTableOperationsProvider tableOperationsProvider; private final String trinoVersion; - private final CatalogType catalogType; private final boolean isUniqueTableLocation; private final boolean isUsingSystemSecurity; @Inject - public TrinoCatalogFactory( + public TrinoHiveCatalogFactory( IcebergConfig config, CatalogName catalogName, HiveMetastore metastore, @@ -58,29 +60,21 @@ public TrinoCatalogFactory( this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationProvider is null"); this.trinoVersion = requireNonNull(nodeVersion, "trinoVersion is null").toString(); requireNonNull(config, "config is null"); - this.catalogType = config.getCatalogType(); this.isUniqueTableLocation = config.isUniqueTableLocation(); this.isUsingSystemSecurity = securityConfig.getSecuritySystem() == SYSTEM; } + @Override public TrinoCatalog create() { - switch (catalogType) { - case TESTING_FILE_METASTORE: - case HIVE_METASTORE: - return new TrinoHiveCatalog( - catalogName, - memoizeMetastore(metastore, 1000), - hdfsEnvironment, - typeManager, - tableOperationsProvider, - trinoVersion, - isUniqueTableLocation, - isUsingSystemSecurity); - case GLUE: - // TODO not supported yet - throw new TrinoException(NOT_SUPPORTED, "Unknown Trino Iceberg catalog type"); - } - throw new TrinoException(NOT_SUPPORTED, "Unsupported Trino Iceberg catalog type " + catalogType); + return new TrinoHiveCatalog( + catalogName, + memoizeMetastore(metastore, 1000), + hdfsEnvironment, + typeManager, + tableOperationsProvider, + trinoVersion, + isUniqueTableLocation, + isUsingSystemSecurity); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java index 88eb0b5e6b06..89305c31090a 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java +++ 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/IcebergQueryRunner.java @@ -28,9 +28,12 @@ import java.util.Map; import java.util.Optional; +import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE; +import static io.trino.plugin.iceberg.CatalogType.TESTING_FILE_METASTORE; import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME; import static io.trino.testing.QueryAssertions.copyTpchTables; import static io.trino.testing.TestingSession.testSessionBuilder; +import static java.util.Locale.ENGLISH; public final class IcebergQueryRunner { @@ -86,7 +89,14 @@ public static DistributedQueryRunner createIcebergQueryRunner( queryRunner.installPlugin(new IcebergPlugin()); connectorProperties = new HashMap<>(ImmutableMap.copyOf(connectorProperties)); connectorProperties.putIfAbsent("iceberg.catalog.type", "TESTING_FILE_METASTORE"); - connectorProperties.putIfAbsent("hive.metastore.catalog.dir", dataDir.toString()); + CatalogType catalogType = CatalogType.valueOf(connectorProperties.get("iceberg.catalog.type").toUpperCase(ENGLISH)); + if (catalogType == TESTING_FILE_METASTORE || catalogType == HIVE_METASTORE) { + connectorProperties.putIfAbsent("hive.metastore.catalog.dir", dataDir.toString()); + } + else { + connectorProperties.putIfAbsent("iceberg.default-schema-location", dataDir.toString()); + } + queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", connectorProperties); queryRunner.execute("CREATE SCHEMA tpch"); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index 23bd92ef1daf..199a062c38a8 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -44,7 +44,8 @@ public void testDefaults() .setCatalogType(HIVE_METASTORE) .setDynamicFilteringWaitTimeout(new Duration(0, MINUTES)) 
.setTableStatisticsEnabled(true) - .setProjectionPushdownEnabled(true)); + .setProjectionPushdownEnabled(true) + .setDefaultSchemaLocation(null)); } @Test @@ -60,6 +61,7 @@ public void testExplicitPropertyMappings() .put("iceberg.dynamic-filtering.wait-timeout", "1h") .put("iceberg.table-statistics-enabled", "false") .put("iceberg.projection-pushdown-enabled", "false") + .put("iceberg.default-schema-location", "/tmp") .build(); IcebergConfig expected = new IcebergConfig() @@ -71,7 +73,8 @@ public void testExplicitPropertyMappings() .setCatalogType(GLUE) .setDynamicFilteringWaitTimeout(Duration.valueOf("1h")) .setTableStatisticsEnabled(false) - .setProjectionPushdownEnabled(false); + .setProjectionPushdownEnabled(false) + .setDefaultSchemaLocation("/tmp"); assertFullMapping(properties, expected); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGlueCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGlueCatalogConnectorSmokeTest.java new file mode 100644 index 000000000000..7fd01a5231cc --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGlueCatalogConnectorSmokeTest.java @@ -0,0 +1,114 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableMap; +import io.trino.testing.BaseConnectorSmokeTest; +import io.trino.testing.QueryRunner; +import io.trino.testing.TestingConnectorBehavior; +import org.testng.annotations.Test; + +import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/* + * TestIcebergGlueCatalogConnectorSmokeTest currently uses AWS Default Credential Provider Chain, + * See https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default + * on ways to set your AWS credentials which will be needed to run this test. + */ +public class TestIcebergGlueCatalogConnectorSmokeTest + extends BaseConnectorSmokeTest +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return createIcebergQueryRunner( + ImmutableMap.of(), + ImmutableMap.of( + "iceberg.file-format", "orc", + "iceberg.catalog.type", "glue"), + REQUIRED_TPCH_TABLES); + } + + @Override + protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) + { + switch (connectorBehavior) { + case SUPPORTS_RENAME_SCHEMA: + case SUPPORTS_COMMENT_ON_COLUMN: + case SUPPORTS_TOPN_PUSHDOWN: + case SUPPORTS_CREATE_VIEW: + case SUPPORTS_CREATE_MATERIALIZED_VIEW: + case SUPPORTS_RENAME_MATERIALIZED_VIEW: + case SUPPORTS_RENAME_MATERIALIZED_VIEW_ACROSS_SCHEMAS: + return false; + + case SUPPORTS_DELETE: + return true; + default: + return super.hasBehavior(connectorBehavior); + } + } + + @Test + @Override + public void testRowLevelDelete() + { + // Deletes are covered in AbstractTestIcebergConnectorTest + assertThatThrownBy(super::testRowLevelDelete) + .hasStackTraceContaining("This connector only supports delete where one or more identity-transformed partitions are deleted entirely"); + } + + @Test + @Override + public void testShowCreateTable() + { + 
assertThat((String) computeScalar("SHOW CREATE TABLE region")) + .isEqualTo("" + + "CREATE TABLE iceberg.tpch.region (\n" + + " regionkey bigint,\n" + + " name varchar,\n" + + " comment varchar\n" + + ")\n" + + "WITH (\n" + + " format = 'ORC'\n" + + ")"); + } + + @Test + @Override + public void testView() + { + assertThatThrownBy(super::testView) + .hasStackTraceContaining("createView is not supported by Iceberg Glue catalog"); + } + + @Test + @Override + public void testMaterializedView() + { + assertThatThrownBy(super::testMaterializedView) + .hasStackTraceContaining("createMaterializedView is not supported by Iceberg Glue catalog"); + } + + @Test + @Override + public void testRenameSchema() + { + assertThatThrownBy(super::testRenameSchema) + .hasStackTraceContaining("renameNamespace is not supported by Iceberg Glue catalog"); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java index 4282266ca2a8..70395c11eb0c 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java @@ -67,11 +67,13 @@ public void testGlueMetastore() { ConnectorFactory factory = getConnectorFactory(); - assertThatThrownBy(() -> factory.create( + factory.create( "test", - Map.of("iceberg.catalog.type", "glue"), - new TestingConnectorContext())) - .hasMessageContaining("Explicit bindings are required and HiveMetastore is not explicitly bound"); + Map.of( + "iceberg.catalog.type", "glue", + "hive.metastore.glue.region", "us-east-1"), + new TestingConnectorContext()) + .shutdown(); assertThatThrownBy(() -> factory.create( "test", @@ -105,8 +107,7 @@ public void testRecordingMetastore() "hive.metastore.glue.region", "us-east-2", "hive.metastore-recording-path", "/tmp"), new TestingConnectorContext())) - .hasMessageContaining("Configuration 
property 'hive.metastore-recording-path' was not used") - .hasMessageContaining("Configuration property 'hive.metastore.glue.region' was not used"); + .hasMessageContaining("Configuration property 'hive.metastore-recording-path' was not used"); } @Test diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java index b724e784ee71..4748b3b606f9 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergSplitSource.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.airlift.units.Duration; +import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HdfsConfig; import io.trino.plugin.hive.HdfsConfiguration; import io.trino.plugin.hive.HdfsConfigurationInitializer; @@ -25,7 +26,9 @@ import io.trino.plugin.hive.authentication.NoHdfsAuthentication; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.SchemaTableName; @@ -34,6 +37,7 @@ import io.trino.spi.predicate.Range; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.predicate.ValueSet; +import io.trino.spi.type.TestingTypeManager; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.QueryRunner; import org.apache.iceberg.Table; @@ -57,7 +61,6 @@ import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; import static 
io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore; import static io.trino.plugin.iceberg.IcebergQueryRunner.createIcebergQueryRunner; -import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; import static io.trino.spi.connector.Constraint.alwaysTrue; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.testing.TestingConnectorSession.SESSION; @@ -71,8 +74,7 @@ public class TestIcebergSplitSource extends AbstractTestQueryFramework { private File metastoreDir; - private HiveMetastore metastore; - private IcebergTableOperationsProvider operationsProvider; + private TrinoCatalog catalog; @Override protected QueryRunner createQueryRunner() @@ -84,8 +86,9 @@ protected QueryRunner createQueryRunner() File tempDir = Files.createTempDirectory("test_iceberg_split_source").toFile(); this.metastoreDir = new File(tempDir, "iceberg_data"); - this.metastore = createTestingFileHiveMetastore(metastoreDir); - this.operationsProvider = new FileMetastoreTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment)); + HiveMetastore metastore = createTestingFileHiveMetastore(metastoreDir); + IcebergTableOperationsProvider operationsProvider = new FileMetastoreTableOperationsProvider(metastore, new HdfsFileIoProvider(hdfsEnvironment)); + this.catalog = new TrinoHiveCatalog(new CatalogName("hive"), metastore, hdfsEnvironment, new TestingTypeManager(), operationsProvider, "test", false, false); return createIcebergQueryRunner(ImmutableMap.of(), ImmutableMap.of(), ImmutableList.of(NATION), Optional.of(metastoreDir)); } @@ -112,7 +115,7 @@ public void testIncompleteDynamicFilterTimeout() TupleDomain.all(), ImmutableSet.of(), Optional.empty()); - Table nationTable = loadIcebergTable(metastore, operationsProvider, SESSION, schemaTableName); + Table nationTable = catalog.loadTable(SESSION, schemaTableName); IcebergSplitSource splitSource = new IcebergSplitSource( tableHandle, diff --git 
a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java index de93d56af7f7..deeebfe30de7 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.HdfsConfig; import io.trino.plugin.hive.HdfsConfiguration; import io.trino.plugin.hive.HdfsConfigurationInitializer; @@ -24,8 +25,11 @@ import io.trino.plugin.hive.authentication.NoHdfsAuthentication; import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider; +import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.type.TestingTypeManager; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.QueryRunner; import org.apache.hadoop.fs.FileSystem; @@ -178,8 +182,9 @@ private void writeEqualityDeleteToNationTable(Table icebergTable) private Table updateTableToV2(String tableName) { - IcebergTableOperationsProvider tableOperationsProvider = new FileMetastoreTableOperationsProvider(new HdfsFileIoProvider(hdfsEnvironment)); - BaseTable table = (BaseTable) loadIcebergTable(metastore, tableOperationsProvider, SESSION, new SchemaTableName("tpch", tableName)); + IcebergTableOperationsProvider tableOperationsProvider = new FileMetastoreTableOperationsProvider(metastore, new HdfsFileIoProvider(hdfsEnvironment)); + TrinoCatalog catalog = new TrinoHiveCatalog(new CatalogName("hive"), metastore, 
hdfsEnvironment, new TestingTypeManager(), tableOperationsProvider, "test", false, false); + BaseTable table = (BaseTable) loadIcebergTable(catalog, tableOperationsProvider, SESSION, new SchemaTableName("tpch", tableName)); TableOperations operations = table.operations(); TableMetadata currentMetadata = operations.current();