diff --git a/docs/src/main/sphinx/connector/iceberg.rst b/docs/src/main/sphinx/connector/iceberg.rst index 3309f7f3e040..4cb0a5c6bd6b 100644 --- a/docs/src/main/sphinx/connector/iceberg.rst +++ b/docs/src/main/sphinx/connector/iceberg.rst @@ -158,6 +158,9 @@ is used. materialized view definition. When the ``storage_schema`` materialized view property is specified, it takes precedence over this catalog property. - Empty + * - ``iceberg.register-table-procedure.enabled`` + - Allow users to call the ``register_table`` procedure + - ``false`` ORC format configuration ^^^^^^^^^^^^^^^^^^^^^^^^ @@ -748,6 +751,25 @@ and rename operations, including in nested structures. Table partitioning can also be changed and the connector can still query data created before the partitioning change. +Register table +-------------- +The connector can register existing Iceberg tables with the catalog. + +An SQL procedure ``system.register_table`` allows the caller to register an existing Iceberg +table in the metastore, using its existing metadata and data files:: + + CALL iceberg.system.register_table(schema_name => 'testdb', table_name => 'customer_orders', table_location => 'hdfs://hadoop-master:9000/user/hive/warehouse/customer_orders-581fad8517934af6be1857a903559d44') + +In addition, you can provide a file name to register a table +with specific metadata. This may be used to register the table with +some specific table state, or may be necessary if the connector cannot +automatically figure out the metadata version to use:: + + CALL iceberg.system.register_table(schema_name => 'testdb', table_name => 'customer_orders', table_location => 'hdfs://hadoop-master:9000/user/hive/warehouse/customer_orders-581fad8517934af6be1857a903559d44', metadata_file_name => '00003-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json') + +To prevent unauthorized users from accessing data, this procedure is disabled by default. 
+The procedure is enabled only when ``iceberg.register-table-procedure.enabled`` is set to ``true``. + Migrating existing tables ------------------------- diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java index 77779cd9bc2c..c4c8f64adb9c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java @@ -56,6 +56,7 @@ public class IcebergConfig private boolean tableStatisticsEnabled = true; private boolean extendedStatisticsEnabled; private boolean projectionPushdownEnabled = true; + private boolean registerTableProcedureEnabled; private Optional hiveCatalogName = Optional.empty(); private int formatVersion = FORMAT_VERSION_SUPPORT_MAX; private Duration expireSnapshotsMinRetention = new Duration(7, DAYS); @@ -210,6 +211,19 @@ public IcebergConfig setProjectionPushdownEnabled(boolean projectionPushdownEnab return this; } + public boolean isRegisterTableProcedureEnabled() + { + return registerTableProcedureEnabled; + } + + @Config("iceberg.register-table-procedure.enabled") + @ConfigDescription("Allow users to call the register_table procedure") + public IcebergConfig setRegisterTableProcedureEnabled(boolean registerTableProcedureEnabled) + { + this.registerTableProcedureEnabled = registerTableProcedureEnabled; + return this; + } + public Optional getHiveCatalogName() { return hiveCatalogName; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java index def2527db491..eba276a63a40 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java @@ -28,6 +28,7 @@ import 
io.trino.plugin.iceberg.procedure.DropExtendedStatsTableProcedure; import io.trino.plugin.iceberg.procedure.ExpireSnapshotsTableProcedure; import io.trino.plugin.iceberg.procedure.OptimizeTableProcedure; +import io.trino.plugin.iceberg.procedure.RegisterTableProcedure; import io.trino.plugin.iceberg.procedure.RemoveOrphanFilesTableProcedure; import io.trino.spi.connector.ConnectorNodePartitioningProvider; import io.trino.spi.connector.ConnectorPageSinkProvider; @@ -80,6 +81,7 @@ public void configure(Binder binder) Multibinder procedures = newSetBinder(binder, Procedure.class); procedures.addBinding().toProvider(RollbackToSnapshotProcedure.class).in(Scopes.SINGLETON); + procedures.addBinding().toProvider(RegisterTableProcedure.class).in(Scopes.SINGLETON); Multibinder tableProcedures = newSetBinder(binder, TableProcedureMetadata.class); tableProcedures.addBinding().toProvider(OptimizeTableProcedure.class).in(Scopes.SINGLETON); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java index ebe65349d25d..7e65b86e56e6 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import io.airlift.log.Logger; import io.airlift.slice.Slice; import io.airlift.slice.SliceUtf8; import io.airlift.slice.Slices; @@ -73,6 +74,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.OptionalInt; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -87,6 +89,7 @@ import static io.airlift.slice.Slices.utf8Slice; import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; import static 
io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_BAD_DATA; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE; import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_COLUMNS_KEY; import static io.trino.plugin.iceberg.IcebergMetadata.ORC_BLOOM_FILTER_FPP_KEY; @@ -128,6 +131,7 @@ import static java.lang.Double.parseDouble; import static java.lang.Float.floatToRawIntBits; import static java.lang.Float.parseFloat; +import static java.lang.Integer.parseInt; import static java.lang.Long.parseLong; import static java.lang.String.format; import static java.util.Comparator.comparing; @@ -147,6 +151,9 @@ public final class IcebergUtil { + private static final Logger log = Logger.get(IcebergUtil.class); + + public static final String METADATA_FOLDER_NAME = "metadata"; public static final String METADATA_FILE_EXTENSION = ".metadata.json"; private static final Pattern SIMPLE_NAME = Pattern.compile("[a-z][a-z0-9]*"); @@ -626,6 +633,23 @@ private static void validateOrcBloomFilterColumns(ConnectorTableMetadata tableMe } } + public static OptionalInt parseVersion(String metadataLocation) + throws TrinoException + { + int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0 + int versionEnd = metadataLocation.indexOf('-', versionStart); + if (versionStart == 0 || versionEnd == -1) { + throw new TrinoException(ICEBERG_BAD_DATA, "Invalid metadata location: " + metadataLocation); + } + try { + return OptionalInt.of(parseInt(metadataLocation.substring(versionStart, versionEnd))); + } + catch (NumberFormatException e) { + log.warn(e, "Unable to parse version from metadata location: %s", metadataLocation); + return OptionalInt.empty(); + } + } + public static String fixBrokenMetadataLocation(String location) { // Version 393-394 stored metadata location with double slash 
https://github.com/trinodb/trino/commit/e95fdcc7d1ec110b10977d17458e06fc4e6f217d#diff-9bbb7c0b6168f0e6b4732136f9a97f820aa082b04efb5609b6138afc118831d7R46 diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java index b18e0450ddef..8430af799340 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.iceberg.catalog; -import io.airlift.log.Logger; import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.StorageFormat; import io.trino.spi.connector.ConnectorSession; @@ -42,9 +41,10 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.hive.HiveType.toHiveType; +import static io.trino.plugin.iceberg.IcebergUtil.METADATA_FOLDER_NAME; import static io.trino.plugin.iceberg.IcebergUtil.fixBrokenMetadataLocation; import static io.trino.plugin.iceberg.IcebergUtil.getLocationProvider; -import static java.lang.Integer.parseInt; +import static io.trino.plugin.iceberg.IcebergUtil.parseVersion; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; @@ -58,10 +58,6 @@ public abstract class AbstractIcebergTableOperations implements IcebergTableOperations { - private static final Logger log = Logger.get(AbstractIcebergTableOperations.class); - - protected static final String METADATA_FOLDER_NAME = "metadata"; - public static final StorageFormat ICEBERG_METASTORE_STORAGE_FORMAT = StorageFormat.create( LazySimpleSerDe.class.getName(), FileInputFormat.class.getName(), @@ -102,7 +98,7 @@ public void 
initializeFromMetadata(TableMetadata tableMetadata) currentMetadata = tableMetadata; currentMetadataLocation = tableMetadata.metadataFileLocation(); shouldRefresh = false; - version = parseVersion(currentMetadataLocation); + version = parseVersion(currentMetadataLocation).orElse(-1); } @Override @@ -232,7 +228,7 @@ protected void refreshFromMetadataLocation(String newLocation) currentMetadata = newMetadata.get(); currentMetadataLocation = newLocation; - version = parseVersion(newLocation); + version = parseVersion(newLocation).orElse(-1); shouldRefresh = false; } @@ -251,19 +247,6 @@ protected static String metadataFileLocation(TableMetadata metadata, String file return format("%s/%s/%s", stripTrailingSlash(metadata.location()), METADATA_FOLDER_NAME, filename); } - protected static int parseVersion(String metadataLocation) - { - int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0 - int versionEnd = metadataLocation.indexOf('-', versionStart); - try { - return parseInt(metadataLocation.substring(versionStart, versionEnd)); - } - catch (NumberFormatException | IndexOutOfBoundsException e) { - log.warn(e, "Unable to parse version from metadata location: %s", metadataLocation); - return -1; - } - } - protected static List toHiveColumns(List columns) { return columns.stream() diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java index 8a6ab955d2dd..a171eceb888f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java @@ -73,6 +73,8 @@ Transaction newCreateTableTransaction( String location, Map properties); + void registerTable(ConnectorSession session, SchemaTableName tableName, String tableLocation, String metadataLocation); + void dropTable(ConnectorSession session, 
SchemaTableName schemaTableName); void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 107bfa6fa446..7c2f8271db72 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -110,7 +110,9 @@ import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW; +import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP; +import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; import static org.apache.iceberg.CatalogUtil.dropTableData; public class TrinoGlueCatalog @@ -370,6 +372,17 @@ public Transaction newCreateTableTransaction( Optional.of(session.getUser())); } + @Override + public void registerTable(ConnectorSession session, SchemaTableName schemaTableName, String tableLocation, String metadataLocation) + throws TrinoException + { + TableInput tableInput = getTableInput(schemaTableName.getTableName(), Optional.of(session.getUser()), ImmutableMap.builder() + .put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(ENGLISH)) + .put(METADATA_LOCATION_PROP, metadataLocation) + .buildOrThrow()); + createTable(schemaTableName.getSchemaName(), tableInput); + } + @Override public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index a7f925f7b62a..eaf6e969b543 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -24,6 +24,7 @@ import io.trino.plugin.hive.metastore.Column; import io.trino.plugin.hive.metastore.Database; import io.trino.plugin.hive.metastore.HivePrincipal; +import io.trino.plugin.hive.metastore.MetastoreUtil; import io.trino.plugin.hive.metastore.PrincipalPrivileges; import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore; import io.trino.plugin.hive.util.HiveUtil; @@ -42,6 +43,7 @@ import io.trino.spi.connector.ViewNotFoundException; import io.trino.spi.security.TrinoPrincipal; import io.trino.spi.type.TypeManager; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.iceberg.BaseTable; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -83,6 +85,7 @@ import static io.trino.plugin.iceberg.IcebergUtil.isIcebergTable; import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable; import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped; +import static io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.ICEBERG_METASTORE_STORAGE_FORMAT; import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; @@ -94,6 +97,9 @@ import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static org.apache.hadoop.hive.metastore.TableType.VIRTUAL_VIEW; +import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE; +import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP; +import static 
org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP; import static org.apache.iceberg.CatalogUtil.dropTableData; public class TrinoHiveCatalog @@ -261,6 +267,29 @@ public Transaction newCreateTableTransaction( isUsingSystemSecurity ? Optional.empty() : Optional.of(session.getUser())); } + @Override + public void registerTable(ConnectorSession session, SchemaTableName schemaTableName, String tableLocation, String metadataLocation) + throws TrinoException + { + Optional owner = isUsingSystemSecurity ? Optional.empty() : Optional.of(session.getUser()); + + io.trino.plugin.hive.metastore.Table.Builder builder = io.trino.plugin.hive.metastore.Table.builder() + .setDatabaseName(schemaTableName.getSchemaName()) + .setTableName(schemaTableName.getTableName()) + .setOwner(owner) + // Table needs to be EXTERNAL, otherwise table rename in HMS would rename table directory and break table contents. + .setTableType(TableType.EXTERNAL_TABLE.name()) + .withStorage(storage -> storage.setLocation(tableLocation)) + .withStorage(storage -> storage.setStorageFormat(ICEBERG_METASTORE_STORAGE_FORMAT)) + // This is a must-have property for the EXTERNAL_TABLE table type + .setParameter("EXTERNAL", "TRUE") + .setParameter(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(ENGLISH)) + .setParameter(METADATA_LOCATION_PROP, metadataLocation); + + PrincipalPrivileges privileges = owner.map(MetastoreUtil::buildInitialPrivilegeSet).orElse(NO_PRIVILEGES); + metastore.createTable(builder.build(), privileges); + } + @Override public List listTables(ConnectorSession session, Optional namespace) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/RegisterTableProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/RegisterTableProcedure.java new file mode 100644 index 000000000000..b914531567d7 --- /dev/null +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/RegisterTableProcedure.java @@ -0,0 +1,229 
@@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.procedure; + +import com.google.common.collect.ImmutableList; +import io.trino.filesystem.FileEntry; +import io.trino.filesystem.FileIterator; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.plugin.iceberg.IcebergConfig; +import io.trino.plugin.iceberg.catalog.TrinoCatalog; +import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.spi.TrinoException; +import io.trino.spi.classloader.ThreadContextClassLoader; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.procedure.Procedure; +import org.apache.iceberg.TableMetadataParser; + +import javax.inject.Inject; +import javax.inject.Provider; + +import java.io.IOException; +import java.lang.invoke.MethodHandle; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.OptionalInt; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.plugin.base.util.Procedures.checkProcedureArgument; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR; +import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; +import static io.trino.plugin.iceberg.IcebergUtil.METADATA_FILE_EXTENSION; +import static io.trino.plugin.iceberg.IcebergUtil.METADATA_FOLDER_NAME; 
+import static io.trino.plugin.iceberg.IcebergUtil.parseVersion; +import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR; +import static io.trino.spi.StandardErrorCode.PERMISSION_DENIED; +import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_FOUND; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.lang.String.format; +import static java.lang.invoke.MethodHandles.lookup; +import static java.util.Objects.requireNonNull; +import static org.apache.iceberg.util.LocationUtil.stripTrailingSlash; + +public class RegisterTableProcedure + implements Provider +{ + private static final MethodHandle REGISTER_TABLE; + + private static final String PROCEDURE_NAME = "register_table"; + private static final String SYSTEM_SCHEMA = "system"; + + private static final String SCHEMA_NAME = "SCHEMA_NAME"; + private static final String TABLE_NAME = "TABLE_NAME"; + private static final String TABLE_LOCATION = "TABLE_LOCATION"; + private static final String METADATA_FILE_NAME = "METADATA_FILE_NAME"; + + static { + try { + REGISTER_TABLE = lookup().unreflect(RegisterTableProcedure.class.getMethod("registerTable", ConnectorSession.class, String.class, String.class, String.class, String.class)); + } + catch (ReflectiveOperationException e) { + throw new AssertionError(e); + } + } + + private final TrinoCatalogFactory catalogFactory; + private final TrinoFileSystemFactory fileSystemFactory; + private final ClassLoader classLoader; + private final boolean registerTableProcedureEnabled; + + @Inject + public RegisterTableProcedure(TrinoCatalogFactory catalogFactory, TrinoFileSystemFactory fileSystemFactory, IcebergConfig icebergConfig) + { + this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null"); + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); + // this class is loaded by PluginClassLoader, and we need its reference to be stored + this.classLoader = getClass().getClassLoader(); + 
this.registerTableProcedureEnabled = requireNonNull(icebergConfig, "icebergConfig is null").isRegisterTableProcedureEnabled(); + } + + @Override + public Procedure get() + { + return new Procedure( + SYSTEM_SCHEMA, + PROCEDURE_NAME, + ImmutableList.of( + new Procedure.Argument(SCHEMA_NAME, VARCHAR), + new Procedure.Argument(TABLE_NAME, VARCHAR), + new Procedure.Argument(TABLE_LOCATION, VARCHAR), + new Procedure.Argument(METADATA_FILE_NAME, VARCHAR, false, null)), + REGISTER_TABLE.bindTo(this)); + } + + public void registerTable( + ConnectorSession clientSession, + String schemaName, + String tableName, + String tableLocation, + String metadataFileName) + { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + doRegisterTable( + clientSession, + schemaName, + tableName, + tableLocation, + Optional.ofNullable(metadataFileName)); + } + } + + private void doRegisterTable( + ConnectorSession clientSession, + String schemaName, + String tableName, + String tableLocation, + Optional metadataFileName) + { + if (!registerTableProcedureEnabled) { + throw new TrinoException(PERMISSION_DENIED, "register_table procedure is disabled"); + } + checkProcedureArgument(schemaName != null && !schemaName.isEmpty(), "schema_name cannot be null or empty"); + checkProcedureArgument(tableName != null && !tableName.isEmpty(), "table_name cannot be null or empty"); + checkProcedureArgument(tableLocation != null && !tableLocation.isEmpty(), "table_location cannot be null or empty"); + metadataFileName.ifPresent(RegisterTableProcedure::validateMetadataFileName); + + SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName); + TrinoCatalog catalog = catalogFactory.create(clientSession.getIdentity()); + if (!catalog.namespaceExists(clientSession, schemaTableName.getSchemaName())) { + throw new TrinoException(SCHEMA_NOT_FOUND, format("Schema '%s' does not exist", schemaTableName.getSchemaName())); + } + + TrinoFileSystem fileSystem = 
fileSystemFactory.create(clientSession); + String metadataLocation = getMetadataLocation(fileSystem, tableLocation, metadataFileName); + validateLocation(fileSystem, metadataLocation); + try { + // Try to read the metadata file. Invalid metadata file will throw the exception. + TableMetadataParser.read(fileSystem.toFileIo(), metadataLocation); + } + catch (RuntimeException e) { + throw new TrinoException(ICEBERG_INVALID_METADATA, metadataLocation + " is not a valid metadata file", e); + } + + catalog.registerTable(clientSession, schemaTableName, tableLocation, metadataLocation); + } + + private static void validateMetadataFileName(String fileName) + { + String metadataFileName = fileName.trim(); + checkProcedureArgument(!metadataFileName.isEmpty(), "metadata_file_name cannot be empty when provided as an argument"); + checkProcedureArgument(!metadataFileName.contains("/"), "%s is not a valid metadata file", metadataFileName); + } + + /** + * Get the latest metadata file location present in location if metadataFileName is not provided, otherwise + * form the metadata file location using location and metadataFileName + */ + private static String getMetadataLocation(TrinoFileSystem fileSystem, String location, Optional metadataFileName) + { + return metadataFileName + .map(fileName -> format("%s/%s/%s", stripTrailingSlash(location), METADATA_FOLDER_NAME, fileName)) + .orElseGet(() -> getLatestMetadataLocation(fileSystem, location)); + } + + public static String getLatestMetadataLocation(TrinoFileSystem fileSystem, String location) + { + List latestMetadataLocations = new ArrayList<>(); + String metadataDirectoryLocation = format("%s/%s", stripTrailingSlash(location), METADATA_FOLDER_NAME); + try { + int latestMetadataVersion = -1; + FileIterator fileIterator = fileSystem.listFiles(metadataDirectoryLocation); + while (fileIterator.hasNext()) { + FileEntry fileEntry = fileIterator.next(); + if (fileEntry.path().contains(METADATA_FILE_EXTENSION)) { + OptionalInt version = 
parseVersion(fileEntry.path()); + if (version.isPresent()) { + int versionNumber = version.getAsInt(); + if (versionNumber > latestMetadataVersion) { + latestMetadataVersion = versionNumber; + latestMetadataLocations.clear(); + latestMetadataLocations.add(fileEntry.path()); + } + else if (versionNumber == latestMetadataVersion) { + latestMetadataLocations.add(fileEntry.path()); + } + } + } + } + if (latestMetadataLocations.isEmpty()) { + throw new TrinoException(ICEBERG_INVALID_METADATA, "No versioned metadata file exists at location: " + metadataDirectoryLocation); + } + if (latestMetadataLocations.size() > 1) { + throw new TrinoException(ICEBERG_INVALID_METADATA, format( + "More than one latest metadata file found at location: %s, latest metadata files are %s", + metadataDirectoryLocation, + latestMetadataLocations)); + } + } + catch (IOException e) { + throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed checking table's location: " + location, e); + } + return getOnlyElement(latestMetadataLocations); + } + + private static void validateLocation(TrinoFileSystem fileSystem, String location) + { + try { + if (!fileSystem.newInputFile(location).exists()) { + throw new TrinoException(GENERIC_USER_ERROR, format("Location %s does not exist", location)); + } + } + catch (IOException e) { + throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, format("Invalid location: %s", location), e); + } + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java index 4fe0c24ab673..023643c044d7 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorSmokeTest.java @@ -28,6 +28,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static 
io.airlift.concurrent.MoreFutures.getFutureValue; +import static io.trino.testing.sql.TestTable.randomTableSuffix; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.concurrent.Executors.newFixedThreadPool; @@ -142,4 +143,182 @@ public void testDeleteRowsConcurrently() executor.awaitTermination(10, SECONDS); } } + + @Test + public void testRegisterTableWithTableLocation() + { + String tableName = "test_register_table_with_table_location_" + randomTableSuffix(); + + assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean)", tableName)); + assertUpdate(format("INSERT INTO %s values(1, 'INDIA', true)", tableName), 1); + assertUpdate(format("INSERT INTO %s values(2, 'USA', false)", tableName), 1); + + String tableLocation = getTableLocation(tableName); + // Drop table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(tableName); + + assertUpdate("CALL system.register_table (CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "')"); + + assertThat(query(format("SELECT * FROM %s", tableName))) + .matches("VALUES " + + "ROW(INT '1', VARCHAR 'INDIA', BOOLEAN 'true'), " + + "ROW(INT '2', VARCHAR 'USA', BOOLEAN 'false')"); + assertUpdate(format("DROP TABLE %s", tableName)); + } + + @Test + public void testRegisterTableWithComments() + { + String tableName = "test_register_table_with_comments_" + randomTableSuffix(); + + assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean)", tableName)); + assertUpdate(format("INSERT INTO %s values(1, 'INDIA', true)", tableName), 1); + assertUpdate(format("COMMENT ON TABLE %s is 'my-table-comment'", tableName)); + assertUpdate(format("COMMENT ON COLUMN %s.a is 'a-comment'", tableName)); + assertUpdate(format("COMMENT ON COLUMN %s.b is 'b-comment'", tableName)); + assertUpdate(format("COMMENT ON COLUMN %s.c is 'c-comment'", tableName)); + + String tableLocation = getTableLocation(tableName); + // 
Drop table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(tableName); + + assertUpdate("CALL system.register_table (CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "')"); + + assertThat(getTableComment(tableName)).isEqualTo("my-table-comment"); + assertThat(getColumnComment(tableName, "a")).isEqualTo("a-comment"); + assertThat(getColumnComment(tableName, "b")).isEqualTo("b-comment"); + assertThat(getColumnComment(tableName, "c")).isEqualTo("c-comment"); + assertUpdate(format("DROP TABLE %s", tableName)); + } + + @Test + public void testRegisterTableWithShowCreateTable() + { + String tableName = "test_register_table_with_show_create_table_" + randomTableSuffix(); + + assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean)", tableName)); + assertUpdate(format("INSERT INTO %s values(1, 'INDIA', true)", tableName), 1); + + String tableLocation = getTableLocation(tableName); + String showCreateTableOld = (String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue(); + // Drop table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(tableName); + + assertUpdate("CALL system.register_table (CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "')"); + String showCreateTableNew = (String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue(); + + assertThat(showCreateTableOld).isEqualTo(showCreateTableNew); + assertUpdate(format("DROP TABLE %s", tableName)); + } + + @Test + public void testRegisterTableWithReInsert() + { + String tableName = "test_register_table_with_re_insert_" + randomTableSuffix(); + + assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean)", tableName)); + assertUpdate(format("INSERT INTO %s values(1, 'INDIA', true)", tableName), 1); + assertUpdate(format("INSERT INTO %s values(2, 'USA', false)", tableName), 1); + + String tableLocation = getTableLocation(tableName); + // Drop 
table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(tableName); + + assertUpdate("CALL system.register_table (CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "')"); + assertUpdate(format("INSERT INTO %s values(3, 'POLAND', true)", tableName), 1); + + assertThat(query(format("SELECT * FROM %s", tableName))) + .matches("VALUES " + + "ROW(INT '1', VARCHAR 'INDIA', BOOLEAN 'true'), " + + "ROW(INT '2', VARCHAR 'USA', BOOLEAN 'false'), " + + "ROW(INT '3', VARCHAR 'POLAND', BOOLEAN 'true')"); + assertUpdate(format("DROP TABLE %s", tableName)); + } + + @Test + public void testRegisterTableWithDroppedTable() + { + String tableName = "test_register_table_with_dropped_table_" + randomTableSuffix(); + + assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean)", tableName)); + assertUpdate(format("INSERT INTO %s values(1, 'INDIA', true)", tableName), 1); + + String tableLocation = getTableLocation(tableName); + String tableNameNew = tableName + "_new"; + // Drop table to verify register_table call fails when no metadata can be found (table doesn't exist) + assertUpdate(format("DROP TABLE %s", tableName)); + + assertQueryFails(format("CALL system.register_table (CURRENT_SCHEMA, '%s', '%s')", tableNameNew, tableLocation), + ".*No versioned metadata file exists at location.*"); + } + + @Test + public void testRegisterTableWithDifferentTableName() + { + String tableName = "test_register_table_with_different_table_name_" + randomTableSuffix(); + + assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean)", tableName)); + assertUpdate(format("INSERT INTO %s values(1, 'INDIA', true)", tableName), 1); + assertUpdate(format("INSERT INTO %s values(2, 'USA', false)", tableName), 1); + + String tableLocation = getTableLocation(tableName); + String tableNameNew = tableName + "_new"; + // Drop table from glue metastore and use the same table name to register again with the metadata + 
dropTableFromMetastore(tableName); + + assertUpdate(format("CALL system.register_table (CURRENT_SCHEMA, '%s', '%s')", tableNameNew, tableLocation)); + assertUpdate(format("INSERT INTO %s values(3, 'POLAND', true)", tableNameNew), 1); + + assertThat(query(format("SELECT * FROM %s", tableNameNew))) + .matches("VALUES " + + "ROW(INT '1', VARCHAR 'INDIA', BOOLEAN 'true'), " + + "ROW(INT '2', VARCHAR 'USA', BOOLEAN 'false'), " + + "ROW(INT '3', VARCHAR 'POLAND', BOOLEAN 'true')"); + assertUpdate(format("DROP TABLE %s", tableNameNew)); + } + + @Test + public void testRegisterTableWithMetadataFile() + { + String tableName = "test_register_table_with_metadata_file_" + randomTableSuffix(); + + assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean)", tableName)); + assertUpdate(format("INSERT INTO %s values(1, 'INDIA', true)", tableName), 1); + assertUpdate(format("INSERT INTO %s values(2, 'USA', false)", tableName), 1); + + String tableLocation = getTableLocation(tableName); + String metadataLocation = getMetadataLocation(tableName); + String metadataFileName = metadataLocation.substring(metadataLocation.lastIndexOf("/") + 1); + // Drop table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(tableName); + + assertUpdate("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "', '" + metadataFileName + "')"); + assertUpdate(format("INSERT INTO %s values(3, 'POLAND', true)", tableName), 1); + + assertThat(query(format("SELECT * FROM %s", tableName))) + .matches("VALUES " + + "ROW(INT '1', VARCHAR 'INDIA', BOOLEAN 'true'), " + + "ROW(INT '2', VARCHAR 'USA', BOOLEAN 'false'), " + + "ROW(INT '3', VARCHAR 'POLAND', BOOLEAN 'true')"); + assertUpdate(format("DROP TABLE %s", tableName)); + } + + private String getTableLocation(String tableName) + { + return (String) computeScalar("SELECT DISTINCT regexp_replace(\"$path\", '/[^/]*/[^/]*$', '') FROM " + tableName); + } + + 
protected String getTableComment(String tableName) + { + return (String) computeScalar("SELECT comment FROM system.metadata.table_comments WHERE catalog_name = 'iceberg' AND schema_name = '" + getSession().getSchema().orElseThrow() + "' AND table_name = '" + tableName + "'"); + } + + protected String getColumnComment(String tableName, String columnName) + { + return (String) computeScalar("SELECT comment FROM information_schema.columns WHERE table_schema = '" + getSession().getSchema().orElseThrow() + "' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'"); + } + + protected abstract void dropTableFromMetastore(String tableName); + + protected abstract String getMetadataLocation(String tableName); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java index 421044eb472b..98221a8f2748 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java @@ -17,6 +17,8 @@ import io.minio.messages.Event; import io.trino.Session; import io.trino.plugin.hive.containers.HiveMinioDataLake; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore; import io.trino.testing.QueryRunner; import org.apache.iceberg.FileFormat; import org.intellij.lang.annotations.Language; @@ -29,6 +31,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder; import static io.trino.plugin.hive.containers.HiveMinioDataLake.MINIO_ACCESS_KEY; import static 
io.trino.plugin.hive.containers.HiveMinioDataLake.MINIO_SECRET_KEY; import static io.trino.testing.sql.TestTable.randomTableSuffix; @@ -70,6 +73,7 @@ protected QueryRunner createQueryRunner() .put("hive.s3.endpoint", "http://" + hiveMinioDataLake.getMinio().getMinioApiEndpoint()) .put("hive.s3.path-style-access", "true") .put("hive.s3.streaming.part-size", "5MB") + .put("iceberg.register-table-procedure.enabled", "true") .buildOrThrow()) .setSchemaInitializer( SchemaInitializer.builder() @@ -206,4 +210,27 @@ private List getSnapshotIds(String tableName) .map(Long.class::cast) .collect(toImmutableList()); } + + @Override + protected void dropTableFromMetastore(String tableName) + { + HiveMetastore metastore = new BridgingHiveMetastore( + testingThriftHiveMetastoreBuilder() + .metastoreClient(hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint()) + .build()); + metastore.dropTable(schemaName, tableName, false); + assertThat(metastore.getTable(schemaName, tableName)).isEmpty(); + } + + @Override + protected String getMetadataLocation(String tableName) + { + HiveMetastore metastore = new BridgingHiveMetastore( + testingThriftHiveMetastoreBuilder() + .metastoreClient(hiveMinioDataLake.getHiveHadoop().getHiveMetastoreEndpoint()) + .build()); + return metastore + .getTable(schemaName, tableName).orElseThrow() + .getParameters().get("metadata_location"); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index d1f9f122d17f..99cf4877c104 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -58,7 +58,8 @@ public void testDefaults() .setDeleteSchemaLocationsFallback(false) .setTargetMaxFileSize(DataSize.of(1, GIGABYTE)) .setMinimumAssignedSplitWeight(0.05) - .setMaterializedViewsStorageSchema(null)); + 
.setMaterializedViewsStorageSchema(null) + .setRegisterTableProcedureEnabled(false)); } @Test @@ -83,6 +84,7 @@ public void testExplicitPropertyMappings() .put("iceberg.target-max-file-size", "1MB") .put("iceberg.minimum-assigned-split-weight", "0.01") .put("iceberg.materialized-views.storage-schema", "mv_storage_schema") + .put("iceberg.register-table-procedure.enabled", "true") .buildOrThrow(); IcebergConfig expected = new IcebergConfig() @@ -103,7 +105,8 @@ public void testExplicitPropertyMappings() .setDeleteSchemaLocationsFallback(true) .setTargetMaxFileSize(DataSize.of(1, MEGABYTE)) .setMinimumAssignedSplitWeight(0.01) - .setMaterializedViewsStorageSchema("mv_storage_schema"); + .setMaterializedViewsStorageSchema("mv_storage_schema") + .setRegisterTableProcedureEnabled(true); assertFullMapping(properties, expected); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConnectorSmokeTest.java index 95a6660d3b71..8d9dfea143c3 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConnectorSmokeTest.java @@ -13,15 +13,29 @@ */ package io.trino.plugin.iceberg; +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.hive.metastore.HiveMetastore; import io.trino.testing.QueryRunner; +import org.testng.annotations.AfterClass; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore; import static org.apache.iceberg.FileFormat.ORC; +import static org.assertj.core.api.Assertions.assertThat; // Redundant over TestIcebergOrcConnectorTest, but 
exists to exercise BaseConnectorSmokeTest // Some features like materialized views may be supported by Iceberg only. public class TestIcebergConnectorSmokeTest extends BaseIcebergConnectorSmokeTest { + private HiveMetastore metastore; + private File metastoreDir; + public TestIcebergConnectorSmokeTest() { super(ORC); @@ -31,8 +45,35 @@ public TestIcebergConnectorSmokeTest() protected QueryRunner createQueryRunner() throws Exception { + this.metastoreDir = Files.createTempDirectory("test_iceberg_table_smoke_test").toFile(); + this.metastoreDir.deleteOnExit(); + this.metastore = createTestingFileHiveMetastore(metastoreDir); return IcebergQueryRunner.builder() .setInitialTables(REQUIRED_TPCH_TABLES) + .setMetastoreDirectory(metastoreDir) + .setIcebergProperties(ImmutableMap.of("iceberg.register-table-procedure.enabled", "true")) .build(); } + + @AfterClass(alwaysRun = true) + public void tearDown() + throws IOException + { + deleteRecursively(metastoreDir.toPath(), ALLOW_INSECURE); + } + + @Override + protected void dropTableFromMetastore(String tableName) + { + metastore.dropTable(getSession().getSchema().orElseThrow(), tableName, false); + assertThat(metastore.getTable(getSession().getSchema().orElseThrow(), tableName)).as("Table in metastore should be dropped").isEmpty(); + } + + @Override + protected String getMetadataLocation(String tableName) + { + return metastore + .getTable(getSession().getSchema().orElseThrow(), tableName).orElseThrow() + .getParameters().get("metadata_location"); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergDisabledRegisterTableProcedure.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergDisabledRegisterTableProcedure.java new file mode 100644 index 000000000000..ecdc95bdc158 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergDisabledRegisterTableProcedure.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 
(the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import org.testng.annotations.Test; + +public class TestIcebergDisabledRegisterTableProcedure + extends AbstractTestQueryFramework + +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return IcebergQueryRunner.builder() + .build(); + } + + @Test + public void testDisabledRegisterTableProcedure() + { + assertQueryFails("CALL iceberg.system.register_table (CURRENT_SCHEMA, 'test_table', '/var/test/location/test_table/')", + "register_table procedure is disabled"); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGcsConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGcsConnectorSmokeTest.java index 5f52aaee1c27..9b0844b8470e 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGcsConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergGcsConnectorSmokeTest.java @@ -20,6 +20,8 @@ import io.trino.plugin.hive.containers.HiveHadoop; import io.trino.plugin.hive.gcs.GoogleGcsConfigurationInitializer; import io.trino.plugin.hive.gcs.HiveGcsConfig; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore; import io.trino.testing.QueryRunner; import io.trino.testing.TestingConnectorBehavior; 
import org.apache.hadoop.conf.Configuration; @@ -40,13 +42,14 @@ import java.nio.file.attribute.PosixFilePermissions; import java.util.Base64; +import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder; import static io.trino.plugin.hive.containers.HiveHadoop.HIVE3_IMAGE; -import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_RENAME_SCHEMA; import static io.trino.testing.sql.TestTable.randomTableSuffix; import static java.lang.String.format; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; import static org.apache.iceberg.FileFormat.ORC; +import static org.assertj.core.api.Assertions.assertThat; public class TestIcebergGcsConnectorSmokeTest extends BaseIcebergConnectorSmokeTest @@ -140,6 +143,7 @@ protected QueryRunner createQueryRunner() .put("hive.gcs.json-key-file-path", gcpCredentialsFile.toAbsolutePath().toString()) .put("hive.metastore.uri", "thrift://" + hiveHadoop.getHiveMetastoreEndpoint()) .put("iceberg.file-format", format.name()) + .put("iceberg.register-table-procedure.enabled", "true") .buildOrThrow()) .setSchemaInitializer( SchemaInitializer.builder() @@ -170,4 +174,27 @@ public void testRenameSchema() format("ALTER SCHEMA %s RENAME TO %s", schemaName, schemaName + randomTableSuffix()), "Hive metastore does not support renaming schemas"); } + + @Override + protected void dropTableFromMetastore(String tableName) + { + HiveMetastore metastore = new BridgingHiveMetastore( + testingThriftHiveMetastoreBuilder() + .metastoreClient(hiveHadoop.getHiveMetastoreEndpoint()) + .build()); + metastore.dropTable(schema, tableName, false); + assertThat(metastore.getTable(schema, tableName)).isEmpty(); + } + + @Override + protected String getMetadataLocation(String tableName) + { + HiveMetastore metastore = new BridgingHiveMetastore( + testingThriftHiveMetastoreBuilder() + .metastoreClient(hiveHadoop.getHiveMetastoreEndpoint()) + .build()); + return metastore 
+ .getTable(schema, tableName).orElseThrow() + .getParameters().get("metadata_location"); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergRegisterTableProcedure.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergRegisterTableProcedure.java new file mode 100644 index 000000000000..c732bb71517c --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergRegisterTableProcedure.java @@ -0,0 +1,417 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.iceberg; + +import com.google.common.collect.ImmutableMap; +import io.trino.filesystem.FileEntry; +import io.trino.filesystem.FileIterator; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.hdfs.HdfsFileSystemFactory; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import io.trino.testing.TestingConnectorSession; +import org.testng.annotations.AfterClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.stream.Stream; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore; +import static io.trino.plugin.iceberg.IcebergUtil.METADATA_FOLDER_NAME; +import static io.trino.plugin.iceberg.procedure.RegisterTableProcedure.getLatestMetadataLocation; +import static io.trino.testing.sql.TestTable.randomTableSuffix; +import static java.lang.String.format; +import static java.util.Locale.ENGLISH; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestIcebergRegisterTableProcedure + extends AbstractTestQueryFramework +{ + private HiveMetastore metastore; + private File metastoreDir; + private TrinoFileSystem fileSystem; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + metastoreDir = Files.createTempDirectory("test_iceberg_register_table").toFile(); + metastoreDir.deleteOnExit(); + metastore = createTestingFileHiveMetastore(metastoreDir); + fileSystem = new HdfsFileSystemFactory(HDFS_ENVIRONMENT).create(TestingConnectorSession.SESSION); + return 
IcebergQueryRunner.builder() + .setMetastoreDirectory(metastoreDir) + .setIcebergProperties(ImmutableMap.of("iceberg.register-table-procedure.enabled", "true")) + .build(); + } + + @AfterClass(alwaysRun = true) + public void tearDown() + throws IOException + { + deleteRecursively(metastoreDir.toPath(), ALLOW_INSECURE); + } + + @DataProvider + public static Object[][] fileFormats() + { + return Stream.of(IcebergFileFormat.values()) + .map(icebergFileFormat -> new Object[] {icebergFileFormat}) + .toArray(Object[][]::new); + } + + @Test(dataProvider = "fileFormats") + public void testRegisterTableWithTableLocation(IcebergFileFormat icebergFileFormat) + { + String tableName = "test_register_table_with_table_location_" + icebergFileFormat.name().toLowerCase(ENGLISH) + "_" + randomTableSuffix(); + + assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean) with (format = '%s')", tableName, icebergFileFormat)); + assertUpdate(format("INSERT INTO %s values(1, 'INDIA', true)", tableName), 1); + assertUpdate(format("INSERT INTO %s values(2, 'USA', false)", tableName), 1); + + String tableLocation = getTableLocation(tableName); + // Drop table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(tableName); + + assertUpdate("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "')"); + + assertThat(query(format("SELECT * FROM %s", tableName))) + .matches("VALUES " + + "ROW(INT '1', VARCHAR 'INDIA', BOOLEAN 'true'), " + + "ROW(INT '2', VARCHAR 'USA', BOOLEAN 'false')"); + assertUpdate(format("DROP TABLE %s", tableName)); + } + + @Test(dataProvider = "fileFormats") + public void testRegisterTableWithComments(IcebergFileFormat icebergFileFormat) + { + String tableName = "test_register_table_with_comments_" + icebergFileFormat.name().toLowerCase(ENGLISH) + "_" + randomTableSuffix(); + + assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean) with (format = '%s')", 
tableName, icebergFileFormat)); + assertUpdate(format("INSERT INTO %s values(1, 'INDIA', true)", tableName), 1); + assertUpdate(format("INSERT INTO %s values(2, 'USA', false)", tableName), 1); + assertUpdate(format("COMMENT ON TABLE %s is 'my-table-comment'", tableName)); + assertUpdate(format("COMMENT ON COLUMN %s.a is 'a-comment'", tableName)); + assertUpdate(format("COMMENT ON COLUMN %s.b is 'b-comment'", tableName)); + assertUpdate(format("COMMENT ON COLUMN %s.c is 'c-comment'", tableName)); + + String tableLocation = getTableLocation(tableName); + // Drop table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(tableName); + + assertUpdate("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "')"); + + assertThat(getTableComment(tableName)).isEqualTo("my-table-comment"); + assertThat(getColumnComment(tableName, "a")).isEqualTo("a-comment"); + assertThat(getColumnComment(tableName, "b")).isEqualTo("b-comment"); + assertThat(getColumnComment(tableName, "c")).isEqualTo("c-comment"); + assertUpdate(format("DROP TABLE %s", tableName)); + } + + @Test(dataProvider = "fileFormats") + public void testRegisterTableWithShowCreateTable(IcebergFileFormat icebergFileFormat) + { + String tableName = "test_register_table_with_show_create_table_" + icebergFileFormat.name().toLowerCase(ENGLISH) + "_" + randomTableSuffix(); + + assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean) with (format = '%s')", tableName, icebergFileFormat)); + assertUpdate(format("INSERT INTO %s values(1, 'INDIA', true)", tableName), 1); + + String tableLocation = getTableLocation(tableName); + String showCreateTableOld = (String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue(); + // Drop table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(tableName); + + assertUpdate("CALL system.register_table (CURRENT_SCHEMA, '" 
+ tableName + "', '" + tableLocation + "')"); + String showCreateTableNew = (String) computeActual("SHOW CREATE TABLE " + tableName).getOnlyValue(); + + assertThat(showCreateTableOld).isEqualTo(showCreateTableNew); + assertUpdate(format("DROP TABLE %s", tableName)); + } + + @Test(dataProvider = "fileFormats") + public void testRegisterTableWithReInsert(IcebergFileFormat icebergFileFormat) + { + String tableName = "test_register_table_with_re_insert_" + icebergFileFormat.name().toLowerCase(ENGLISH) + "_" + randomTableSuffix(); + + assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean) with (format = '%s')", tableName, icebergFileFormat)); + assertUpdate(format("INSERT INTO %s values(1, 'INDIA', true)", tableName), 1); + assertUpdate(format("INSERT INTO %s values(2, 'USA', false)", tableName), 1); + + String tableLocation = getTableLocation(tableName); + // Drop table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(tableName); + + assertUpdate("CALL system.register_table (CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "')"); + assertUpdate(format("INSERT INTO %s values(3, 'POLAND', true)", tableName), 1); + + assertThat(query(format("SELECT * FROM %s", tableName))) + .matches("VALUES " + + "ROW(INT '1', VARCHAR 'INDIA', BOOLEAN 'true'), " + + "ROW(INT '2', VARCHAR 'USA', BOOLEAN 'false'), " + + "ROW(INT '3', VARCHAR 'POLAND', BOOLEAN 'true')"); + assertUpdate(format("DROP TABLE %s", tableName)); + } + + @Test(dataProvider = "fileFormats") + public void testRegisterTableWithDroppedTable(IcebergFileFormat icebergFileFormat) + { + String tableName = "test_register_table_with_dropped_table_" + icebergFileFormat.name().toLowerCase(ENGLISH) + "_" + randomTableSuffix(); + + assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean) with (format = '%s')", tableName, icebergFileFormat)); + assertUpdate(format("INSERT INTO %s values(1, 'INDIA', true)", tableName), 1); + 
assertUpdate(format("INSERT INTO %s values(2, 'USA', false)", tableName), 1); + + String tableLocation = getTableLocation(tableName); + String tableNameNew = tableName + "_new"; + // Drop table to verify register_table call fails when no metadata can be found (table doesn't exist) + assertUpdate(format("DROP TABLE %s", tableName)); + + assertQueryFails("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableNameNew + "', '" + tableLocation + "')", + ".*No versioned metadata file exists at location.*"); + } + + @Test(dataProvider = "fileFormats") + public void testRegisterTableWithDifferentTableName(IcebergFileFormat icebergFileFormat) + { + String tableName = "test_register_table_with_different_table_name_old_" + icebergFileFormat.name().toLowerCase(ENGLISH) + "_" + randomTableSuffix(); + + assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean) with (format = '%s')", tableName, icebergFileFormat)); + assertUpdate(format("INSERT INTO %s values(1, 'INDIA', true)", tableName), 1); + assertUpdate(format("INSERT INTO %s values(2, 'USA', false)", tableName), 1); + + String tableLocation = getTableLocation(tableName); + String tableNameNew = tableName + "_new"; + // Drop table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(tableName); + + assertUpdate("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableNameNew + "', '" + tableLocation + "')"); + assertUpdate(format("INSERT INTO %s values(3, 'POLAND', true)", tableNameNew), 1); + + assertThat(query(format("SELECT * FROM %s", tableNameNew))) + .matches("VALUES " + + "ROW(INT '1', VARCHAR 'INDIA', BOOLEAN 'true'), " + + "ROW(INT '2', VARCHAR 'USA', BOOLEAN 'false'), " + + "ROW(INT '3', VARCHAR 'POLAND', BOOLEAN 'true')"); + assertUpdate(format("DROP TABLE %s", tableNameNew)); + } + + @Test(dataProvider = "fileFormats") + public void testRegisterTableWithMetadataFile(IcebergFileFormat icebergFileFormat) + { + String tableName = 
"test_register_table_with_metadata_file_" + icebergFileFormat.name().toLowerCase(ENGLISH) + "_" + randomTableSuffix(); + + assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean) with (format = '%s')", tableName, icebergFileFormat)); + assertUpdate(format("INSERT INTO %s values(1, 'INDIA', true)", tableName), 1); + assertUpdate(format("INSERT INTO %s values(2, 'USA', false)", tableName), 1); + + String tableLocation = getTableLocation(tableName); + String metadataLocation = getLatestMetadataLocation(fileSystem, tableLocation); + String metadataFileName = metadataLocation.substring(metadataLocation.lastIndexOf("/") + 1); + // Drop table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(tableName); + + assertUpdate("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "', '" + metadataFileName + "')"); + assertUpdate(format("INSERT INTO %s values(3, 'POLAND', true)", tableName), 1); + + assertThat(query(format("SELECT * FROM %s", tableName))) + .matches("VALUES " + + "ROW(INT '1', VARCHAR 'INDIA', BOOLEAN 'true'), " + + "ROW(INT '2', VARCHAR 'USA', BOOLEAN 'false'), " + + "ROW(INT '3', VARCHAR 'POLAND', BOOLEAN 'true')"); + assertUpdate(format("DROP TABLE %s", tableName)); + } + + @Test + public void testRegisterTableWithNoMetadataFile() + throws IOException + { + IcebergFileFormat icebergFileFormat = IcebergFileFormat.ORC; + String tableName = "test_register_table_with_no_metadata_file_" + icebergFileFormat.name().toLowerCase(ENGLISH) + "_" + randomTableSuffix(); + + assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean) with (format = '%s')", tableName, icebergFileFormat)); + assertUpdate(format("INSERT INTO %s values(1, 'INDIA', true)", tableName), 1); + assertUpdate(format("INSERT INTO %s values(2, 'USA', false)", tableName), 1); + + String tableLocation = getTableLocation(tableName); + String tableNameNew = tableName + "_new"; + // Delete 
files under metadata directory to verify register_table call fails + deleteRecursively(Path.of(tableLocation, METADATA_FOLDER_NAME), ALLOW_INSECURE); + + assertQueryFails("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableNameNew + "', '" + tableLocation + "')", + ".*No versioned metadata file exists at location.*"); + dropTableFromMetastore(tableName); + deleteRecursively(Path.of(tableLocation), ALLOW_INSECURE); + } + + @Test + public void testRegisterTableWithInvalidMetadataFile() + throws IOException + { + String tableName = "test_register_table_with_invalid_metadata_file_" + randomTableSuffix(); + + assertUpdate(format("CREATE TABLE %s (a int, b varchar, c boolean)", tableName)); + assertUpdate(format("INSERT INTO %s values(1, 'INDIA', true)", tableName), 1); + assertUpdate(format("INSERT INTO %s values(2, 'USA', false)", tableName), 1); + + String tableLocation = getTableLocation(tableName); + String tableNameNew = tableName + "_new"; + String metadataDirectoryLocation = format("%s/%s", tableLocation, METADATA_FOLDER_NAME); + FileIterator fileIterator = fileSystem.listFiles(metadataDirectoryLocation); + // Find one invalid metadata file inside metadata folder + String invalidMetadataFileName = "invalid-default.avro"; + while (fileIterator.hasNext()) { + FileEntry fileEntry = fileIterator.next(); + if (fileEntry.path().endsWith(".avro")) { + String file = fileEntry.path(); + invalidMetadataFileName = file.substring(file.lastIndexOf("/") + 1); + break; + } + } + + assertQueryFails("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableNameNew + "', '" + tableLocation + "', '" + invalidMetadataFileName + "')", + ".*is not a valid metadata file.*"); + assertUpdate(format("DROP TABLE %s", tableName)); + } + + @Test + public void testRegisterTableWithNonExistingTableLocation() + { + String tableName = "test_register_table_with_non_existing_table_location_" + randomTableSuffix(); + String tableLocation = 
"/test/iceberg/hive/warehouse/orders_5-581fad8517934af6be1857a903559d44"; + assertQueryFails("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "')", + ".*No versioned metadata file exists at location.*"); + } + + @Test + public void testRegisterTableWithNonExistingMetadataFile() + { + String tableName = "test_register_table_with_non_existing_metadata_file_" + randomTableSuffix(); + String nonExistingMetadataFileName = "00003-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json"; + String tableLocation = "/test/iceberg/hive/warehouse/orders_5-581fad8517934af6be1857a903559d44"; + assertQueryFails("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "', '" + nonExistingMetadataFileName + "')", + ".*Location (.*) does not exist.*"); + } + + @Test + public void testRegisterTableWithNonExistingSchema() + { + String tableLocation = "/test/iceberg/hive/orders_5-581fad8517934af6be1857a903559d44"; + assertQueryFails("CALL iceberg.system.register_table ('invalid_schema', 'test_table', '" + tableLocation + "')", + ".*Schema '(.*)' does not exist.*"); + } + + @Test + public void testRegisterTableWithExistingTable() + { + String tableName = "test_register_table_with_existing_table_" + randomTableSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " (a int, b varchar, c boolean)"); + assertUpdate("INSERT INTO " + tableName + " values(1, 'INDIA', true)", 1); + String tableLocation = getTableLocation(tableName); + + assertQueryFails("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "')", + ".*Table already exists.*"); + assertUpdate(format("DROP TABLE %s", tableName)); + } + + @Test + public void testRegisterTableWithInvalidURIScheme() + { + String tableName = "test_register_table_with_invalid_uri_scheme_" + randomTableSuffix(); + String nonExistedMetadataFileName = "00003-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json"; + String tableLocation = 
"invalid://hadoop-master:9000/test/iceberg/hive/orders_5-581fad8517934af6be1857a903559d44"; + assertQueryFails("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "', '" + nonExistedMetadataFileName + "')", + ".*Invalid location:.*"); + assertQueryFails("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "')", + ".*Failed checking table's location:.*"); + } + + @Test + public void testRegisterTableWithInvalidParameter() + { + String tableName = "test_register_table_with_invalid_parameter_" + randomTableSuffix(); + String tableLocation = "/test/iceberg/hive/table1/"; + + assertQueryFails(format("CALL iceberg.system.register_table (CURRENT_SCHEMA, '%s')", tableName), + ".*'TABLE_LOCATION' is missing.*"); + assertQueryFails("CALL iceberg.system.register_table (CURRENT_SCHEMA)", + ".*'TABLE_NAME' is missing.*"); + assertQueryFails("CALL iceberg.system.register_table ()", + ".*'SCHEMA_NAME' is missing.*"); + + assertQueryFails("CALL iceberg.system.register_table (null, null, null)", + ".*schema_name cannot be null or empty.*"); + assertQueryFails("CALL iceberg.system.register_table (CURRENT_SCHEMA, null, null)", + ".*table_name cannot be null or empty.*"); + assertQueryFails("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableName + "', null)", + ".*table_location cannot be null or empty.*"); + + assertQueryFails("CALL iceberg.system.register_table ('', '" + tableName + "', '" + tableLocation + "')", + ".*schema_name cannot be null or empty.*"); + assertQueryFails("CALL iceberg.system.register_table (CURRENT_SCHEMA, '', '" + tableLocation + "')", + ".*table_name cannot be null or empty.*"); + assertQueryFails("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableName + "', '')", + ".*table_location cannot be null or empty.*"); + assertQueryFails("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "', '')", + 
".*metadata_file_name cannot be empty when provided as an argument.*"); + } + + @Test + public void testRegisterTableWithInvalidMetadataFileName() + { + String tableName = "test_register_table_with_invalid_metadata_file_name_" + randomTableSuffix(); + String tableLocation = "/test/iceberg/hive"; + + String[] invalidMetadataFileNames = { + "/", + "../", + "../../", + "../../somefile.metadata.json", + }; + + for (String invalidMetadataFileName : invalidMetadataFileNames) { + assertQueryFails("CALL iceberg.system.register_table (CURRENT_SCHEMA, '" + tableName + "', '" + tableLocation + "', '" + invalidMetadataFileName + "')", + ".*is not a valid metadata file.*"); + } + } + + private String getTableLocation(String tableName) + { + return (String) computeScalar("SELECT DISTINCT regexp_replace(\"$path\", '/[^/]*/[^/]*$', '') FROM " + tableName); + } + + private void dropTableFromMetastore(String tableName) + { + metastore.dropTable(getSession().getSchema().orElseThrow(), tableName, false); + assertThat(metastore.getTable(getSession().getSchema().orElseThrow(), tableName)).as("Table in metastore should be dropped").isEmpty(); + } + + private String getTableComment(String tableName) + { + return (String) computeScalar("SELECT comment FROM system.metadata.table_comments WHERE catalog_name = 'iceberg' AND schema_name = '" + getSession().getSchema().orElseThrow() + "' AND table_name = '" + tableName + "'"); + } + + private String getColumnComment(String tableName, String columnName) + { + return (String) computeScalar("SELECT comment FROM information_schema.columns WHERE table_schema = '" + getSession().getSchema().orElseThrow() + "' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'"); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergUtil.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergUtil.java new file mode 100644 index 000000000000..b7afd8563ae7 --- /dev/null +++ 
b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergUtil.java @@ -0,0 +1,47 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import org.testng.annotations.Test; + +import java.util.OptionalInt; + +import static io.trino.plugin.iceberg.IcebergUtil.parseVersion; +import static io.trino.testing.assertions.Assert.assertEquals; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class TestIcebergUtil +{ + @Test + public void testParseVersion() + { + assertEquals(parseVersion("hdfs://hadoop-master:9000/user/hive/warehouse/orders_5-581fad8517934af6be1857a903559d44/metadata/00000-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json"), OptionalInt.of(0)); + assertEquals(parseVersion("hdfs://hadoop-master:9000/user/hive/warehouse/orders_5-581fad8517934af6be1857a903559d44/metadata/99999-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json"), OptionalInt.of(99999)); + assertEquals(parseVersion("s3://krvikash-test/test_iceberg_util/orders_93p93eniuw-30fa27a68c734c2bafac881e905351a9/metadata/00010-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json"), OptionalInt.of(10)); + assertEquals(parseVersion("/var/test/test_iceberg_util/orders_93p93eniuw-30fa27a68c734c2bafac881e905351a9/metadata/00011-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json"), OptionalInt.of(11)); + + assertThatThrownBy(() -> parseVersion("00010-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json")) + 
.hasMessageMatching(".*Invalid metadata location: .*"); + assertThatThrownBy(() -> parseVersion("hdfs://hadoop-master:9000/user/hive/warehouse/orders_5_581fad8517934af6be1857a903559d44")) + .hasMessageMatching(".*Invalid metadata location: .*"); + assertThatThrownBy(() -> parseVersion("hdfs://hadoop-master:9000/user/hive/warehouse/orders_5-581fad8517934af6be1857a903559d44/metadata")) + .hasMessageMatching(".*Invalid metadata location:.*"); + assertThatThrownBy(() -> parseVersion("s3://krvikash-test/test_iceberg_util/orders_93p93eniuw-30fa27a68c734c2bafac881e905351a9/metadata/00010_409702ba_4735_4645_8f14_09537cc0b2c8.metadata.json")) + .hasMessageMatching(".*Invalid metadata location:.*"); + assertThatThrownBy(() -> parseVersion("orders_5_581fad8517934af6be1857a903559d44")).hasMessageMatching(".*Invalid metadata location:.*"); + + assertEquals(parseVersion("hdfs://hadoop-master:9000/user/hive/warehouse/orders_5-581fad8517934af6be1857a903559d44/metadata/00003_409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json"), OptionalInt.empty()); + assertEquals(parseVersion("/var/test/test_iceberg_util/orders_93p93eniuw-30fa27a68c734c2bafac881e905351a9/metadata/-00010-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json"), OptionalInt.empty()); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java index 87ff37c5d499..f1c3bb85ae0e 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java @@ -13,6 +13,10 @@ */ package io.trino.plugin.iceberg.catalog.glue; +import com.amazonaws.services.glue.AWSGlueAsync; +import com.amazonaws.services.glue.AWSGlueAsyncClientBuilder; +import 
com.amazonaws.services.glue.model.DeleteTableRequest; +import com.amazonaws.services.glue.model.GetTableRequest; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.DeleteObjectsRequest; @@ -51,6 +55,7 @@ public class TestIcebergGlueCatalogConnectorSmokeTest { private final String bucketName; private final String schemaName; + private final AWSGlueAsync glueClient; @Parameters("s3.bucket") public TestIcebergGlueCatalogConnectorSmokeTest(String bucketName) @@ -58,6 +63,7 @@ public TestIcebergGlueCatalogConnectorSmokeTest(String bucketName) super(FileFormat.PARQUET); this.bucketName = requireNonNull(bucketName, "bucketName is null"); this.schemaName = "test_iceberg_smoke_" + randomTableSuffix(); + glueClient = AWSGlueAsyncClientBuilder.defaultClient(); } @Override @@ -68,7 +74,8 @@ protected QueryRunner createQueryRunner() .setIcebergProperties( ImmutableMap.of( "iceberg.catalog.type", "glue", - "hive.metastore.glue.default-warehouse-dir", schemaPath())) + "hive.metastore.glue.default-warehouse-dir", schemaPath(), + "iceberg.register-table-procedure.enabled", "true")) .setSchemaInitializer( SchemaInitializer.builder() .withClonedTpchTables(REQUIRED_TPCH_TABLES) @@ -184,14 +191,30 @@ public void testCommentViewColumn() } } - private String getTableComment(String tableName) + @Override + protected void dropTableFromMetastore(String tableName) { - return (String) computeScalar("SELECT comment FROM system.metadata.table_comments WHERE catalog_name = 'iceberg' AND schema_name = '" + schemaName + "' AND table_name = '" + tableName + "'"); + DeleteTableRequest deleteTableRequest = new DeleteTableRequest() + .withDatabaseName(schemaName) + .withName(tableName); + glueClient.deleteTable(deleteTableRequest); + GetTableRequest getTableRequest = new GetTableRequest() + .withDatabaseName(schemaName) + .withName(tableName); + assertThatThrownBy(() -> glueClient.getTable(getTableRequest)) + 
.as("Table in metastore should not exist") + .hasMessageMatching(".*Table (.*) not found.*"); } - protected String getColumnComment(String tableName, String columnName) + @Override + protected String getMetadataLocation(String tableName) { - return (String) computeScalar("SELECT comment FROM information_schema.columns WHERE table_schema = '" + getSession().getSchema().orElseThrow() + "' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'"); + GetTableRequest getTableRequest = new GetTableRequest() + .withDatabaseName(schemaName) + .withName(tableName); + return glueClient.getTable(getTableRequest) + .getTable() + .getParameters().get("metadata_location"); } private String schemaPath() diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/common/hadoop/iceberg.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/common/hadoop/iceberg.properties index 7ec547c05f48..87969a44c64e 100644 --- a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/common/hadoop/iceberg.properties +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/common/hadoop/iceberg.properties @@ -2,3 +2,4 @@ connector.name=iceberg hive.metastore.uri=thrift://hadoop-master:9083 hive.config.resources=/docker/presto-product-tests/conf/presto/etc/hive-default-fs-site.xml iceberg.file-format=PARQUET +iceberg.register-table-procedure.enabled=true diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg/iceberg.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg/iceberg.properties index 7ce5303954a3..d474be71dcd4 100644 --- 
a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg/iceberg.properties +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg/iceberg.properties @@ -1,2 +1,3 @@ connector.name=iceberg hive.metastore.uri=thrift://hadoop-master:9083 +iceberg.register-table-procedure.enabled=true diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java index 556040f57371..c733ea778d1b 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java @@ -17,12 +17,16 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; import io.airlift.concurrent.MoreFutures; +import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreClient; +import io.trino.tempto.BeforeTestWithContext; import io.trino.tempto.ProductTest; import io.trino.tempto.hadoop.hdfs.HdfsClient; import io.trino.tempto.query.QueryExecutionException; import io.trino.tempto.query.QueryExecutor; import io.trino.tempto.query.QueryResult; import io.trino.tests.product.hive.Engine; +import io.trino.tests.product.hive.TestHiveMetastoreClientFactory; +import org.apache.thrift.TException; import org.assertj.core.api.Assertions; import org.testng.SkipException; import org.testng.annotations.DataProvider; @@ -63,6 +67,7 @@ import static io.trino.tests.product.iceberg.TestIcebergSparkCompatibility.CreateMode.CREATE_TABLE_WITH_NO_DATA_AND_INSERT; import static io.trino.tests.product.iceberg.TestIcebergSparkCompatibility.StorageFormat.AVRO; import static 
io.trino.tests.product.iceberg.util.IcebergTestUtils.getTableLocation; +import static io.trino.tests.product.iceberg.util.IcebergTestUtils.stripNamenodeURI; import static io.trino.tests.product.utils.QueryExecutors.onSpark; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -70,6 +75,7 @@ import static java.util.Locale.ENGLISH; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toUnmodifiableSet; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -81,6 +87,16 @@ public class TestIcebergSparkCompatibility { @Inject private HdfsClient hdfsClient; + @Inject + private TestHiveMetastoreClientFactory testHiveMetastoreClientFactory; + private ThriftMetastoreClient metastoreClient; + + @BeforeTestWithContext + public void setup() + throws TException + { + metastoreClient = testHiveMetastoreClientFactory.createMetastoreClient(); + } // see spark-defaults.conf private static final String SPARK_CATALOG = "iceberg_test"; @@ -2122,7 +2138,7 @@ public void testMetadataCompressionCodecGzip() assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)).containsOnly(row(1), row(2)); // Verify that all metadata file is compressed as Gzip - String tableLocation = getTableLocation(trinoTableName); + String tableLocation = stripNamenodeURI(getTableLocation(trinoTableName)); List metadataFiles = hdfsClient.listDirectory(tableLocation + "/metadata").stream() .filter(file -> file.endsWith("metadata.json")) .collect(toImmutableList()); @@ -2202,6 +2218,193 @@ public void testTrinoAnalyzeWithNonLowercaseColumnName() onSpark().executeQuery("DROP TABLE " + sparkTableName); } + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + public void testRegisterTableWithTableLocation(StorageFormat storageFormat) + throws TException + { + String 
baseTableName = "test_register_table_with_table_location_" + storageFormat.name().toLowerCase(ENGLISH) + "_" + randomTableSuffix(); + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + + onSpark().executeQuery(format("CREATE TABLE %s (a INT, b STRING, c BOOLEAN) USING ICEBERG TBLPROPERTIES ('write.format.default' = '%s')", sparkTableName, storageFormat)); + onSpark().executeQuery(format("INSERT INTO %s values(1, 'INDIA', true)", sparkTableName)); + onTrino().executeQuery(format("INSERT INTO %s values(2, 'USA', false)", trinoTableName)); + + List expected = List.of(row(1, "INDIA", true), row(2, "USA", false)); + String tableLocation = getTableLocation(trinoTableName); + // Drop table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(baseTableName); + + onTrino().executeQuery(format("CALL iceberg.system.register_table ('%s', '%s', '%s')", TEST_SCHEMA_NAME, baseTableName, tableLocation)); + + assertThat(onTrino().executeQuery(format("SELECT * FROM %s", trinoTableName))).containsOnly(expected); + assertThat(onSpark().executeQuery(format("SELECT * FROM %s", sparkTableName))).containsOnly(expected); + onTrino().executeQuery(format("DROP TABLE %s", trinoTableName)); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + public void testRegisterTableWithComments(StorageFormat storageFormat) + throws TException + { + String baseTableName = "test_register_table_with_comments_" + storageFormat.name().toLowerCase(ENGLISH) + "_" + randomTableSuffix(); + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + + onTrino().executeQuery(format("CREATE TABLE %s (a int, b varchar, c boolean) with (format = '%s')", trinoTableName, storageFormat)); + onSpark().executeQuery(format("INSERT INTO %s values(1, 'INDIA', true)", sparkTableName)); + 
onTrino().executeQuery(format("INSERT INTO %s values(2, 'USA', false)", trinoTableName)); + onTrino().executeQuery(format("COMMENT ON TABLE %s is 'my-table-comment'", trinoTableName)); + onTrino().executeQuery(format("COMMENT ON COLUMN %s.a is 'a-comment'", trinoTableName)); + onTrino().executeQuery(format("COMMENT ON COLUMN %s.b is 'b-comment'", trinoTableName)); + onTrino().executeQuery(format("COMMENT ON COLUMN %s.c is 'c-comment'", trinoTableName)); + + String tableLocation = getTableLocation(trinoTableName); + // Drop table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(baseTableName); + + onTrino().executeQuery(format("CALL iceberg.system.register_table ('%s', '%s', '%s')", TEST_SCHEMA_NAME, baseTableName, tableLocation)); + + Assertions.assertThat(getTableComment(baseTableName)).isEqualTo("my-table-comment"); + Assertions.assertThat(getColumnComment(baseTableName, "a")).isEqualTo("a-comment"); + Assertions.assertThat(getColumnComment(baseTableName, "b")).isEqualTo("b-comment"); + Assertions.assertThat(getColumnComment(baseTableName, "c")).isEqualTo("c-comment"); + onTrino().executeQuery(format("DROP TABLE %s", trinoTableName)); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + public void testRegisterTableWithShowCreateTable(StorageFormat storageFormat) + throws TException + { + String baseTableName = "test_register_table_with_show_create_table_" + storageFormat.name().toLowerCase(ENGLISH) + "_" + randomTableSuffix(); + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + + onSpark().executeQuery(format("CREATE TABLE %s (a INT, b STRING, c BOOLEAN) USING ICEBERG TBLPROPERTIES ('write.format.default' = '%s')", sparkTableName, storageFormat)); + onSpark().executeQuery(format("INSERT INTO %s values(1, 'INDIA', true)", sparkTableName)); + onTrino().executeQuery(format("INSERT INTO %s values(2, 
'USA', false)", trinoTableName)); + + QueryResult expectedDescribeTable = onSpark().executeQuery("DESCRIBE TABLE EXTENDED " + sparkTableName); + List expectedDescribeTableRows = expectedDescribeTable.rows().stream().map(columns -> row(columns.toArray())).collect(toImmutableList()); + + QueryResult expectedShowCreateTable = onTrino().executeQuery("SHOW CREATE TABLE " + trinoTableName); + List expectedShowCreateTableRows = expectedShowCreateTable.rows().stream().map(columns -> row(columns.toArray())).collect(toImmutableList()); + + String tableLocation = getTableLocation(trinoTableName); + // Drop table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(baseTableName); + + onTrino().executeQuery(format("CALL iceberg.system.register_table ('%s', '%s', '%s')", TEST_SCHEMA_NAME, baseTableName, tableLocation)); + + QueryResult actualDescribeTable = onSpark().executeQuery("DESCRIBE TABLE EXTENDED " + sparkTableName); + QueryResult actualShowCreateTable = onTrino().executeQuery("SHOW CREATE TABLE " + trinoTableName); + + assertThat(actualDescribeTable).hasColumns(expectedDescribeTable.getColumnTypes()).containsExactlyInOrder(expectedDescribeTableRows); + assertThat(actualShowCreateTable).hasColumns(expectedShowCreateTable.getColumnTypes()).containsExactlyInOrder(expectedShowCreateTableRows); + onTrino().executeQuery(format("DROP TABLE %s", trinoTableName)); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + public void testRegisterTableWithReInsert(StorageFormat storageFormat) + throws TException + { + String baseTableName = "test_register_table_with_re_insert_" + storageFormat.name().toLowerCase(ENGLISH) + "_" + randomTableSuffix(); + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + + onTrino().executeQuery(format("CREATE TABLE %s (a int, b varchar, c boolean) with (format = '%s')", trinoTableName, 
storageFormat)); + onSpark().executeQuery(format("INSERT INTO %s values(1, 'INDIA', true)", sparkTableName)); + onTrino().executeQuery(format("INSERT INTO %s values(2, 'USA', false)", trinoTableName)); + + String tableLocation = getTableLocation(trinoTableName); + // Drop table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(baseTableName); + + onTrino().executeQuery(format("CALL iceberg.system.register_table ('%s', '%s', '%s')", TEST_SCHEMA_NAME, baseTableName, tableLocation)); + onSpark().executeQuery(format("INSERT INTO %s values(3, 'POLAND', true)", sparkTableName)); + + List expected = List.of(row(1, "INDIA", true), row(2, "USA", false), row(3, "POLAND", true)); + + assertThat(onTrino().executeQuery(format("SELECT * FROM %s", trinoTableName))).containsOnly(expected); + assertThat(onSpark().executeQuery(format("SELECT * FROM %s", sparkTableName))).containsOnly(expected); + onTrino().executeQuery(format("DROP TABLE %s", trinoTableName)); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + public void testRegisterTableWithDroppedTable(StorageFormat storageFormat) + { + String baseTableName = "test_register_table_with_dropped_table_" + storageFormat.name().toLowerCase(ENGLISH) + "_" + randomTableSuffix(); + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + + onSpark().executeQuery(format("CREATE TABLE %s (a INT, b STRING, c BOOLEAN) USING ICEBERG TBLPROPERTIES ('write.format.default' = '%s')", sparkTableName, storageFormat)); + onSpark().executeQuery(format("INSERT INTO %s values(1, 'INDIA', true)", sparkTableName)); + onTrino().executeQuery(format("INSERT INTO %s values(2, 'USA', false)", trinoTableName)); + + String tableLocation = getTableLocation(trinoTableName); + String baseTableNameNew = baseTableName + "_new"; + + // Drop table to verify register_table call fails when no metadata can be found 
(table doesn't exist) + onTrino().executeQuery(format("DROP TABLE %s", trinoTableName)); + + assertQueryFailure(() -> onTrino().executeQuery(format("CALL iceberg.system.register_table ('%s', '%s', '%s')", TEST_SCHEMA_NAME, baseTableNameNew, tableLocation))) + .hasMessageMatching(".*No versioned metadata file exists at location.*"); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + public void testRegisterTableWithDifferentTableName(StorageFormat storageFormat) + throws TException + { + String baseTableName = "test_register_table_with_different_table_name_" + storageFormat.name().toLowerCase(ENGLISH) + "_" + randomTableSuffix(); + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + + onSpark().executeQuery(format("CREATE TABLE %s (a INT, b STRING, c BOOLEAN) USING ICEBERG TBLPROPERTIES ('write.format.default' = '%s')", sparkTableName, storageFormat)); + onSpark().executeQuery(format("INSERT INTO %s values(1, 'INDIA', true)", sparkTableName)); + onTrino().executeQuery(format("INSERT INTO %s values(2, 'USA', false)", trinoTableName)); + + String tableLocation = getTableLocation(trinoTableName); + String baseTableNameNew = baseTableName + "_new"; + String trinoTableNameNew = trinoTableName(baseTableNameNew); + String sparkTableNameNew = sparkTableName(baseTableNameNew); + // Drop table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(baseTableName); + + onTrino().executeQuery(format("CALL iceberg.system.register_table ('%s', '%s', '%s')", TEST_SCHEMA_NAME, baseTableNameNew, tableLocation)); + onSpark().executeQuery(format("INSERT INTO %s values(3, 'POLAND', true)", sparkTableNameNew)); + List expected = List.of(row(1, "INDIA", true), row(2, "USA", false), row(3, "POLAND", true)); + + assertThat(onTrino().executeQuery(format("SELECT * FROM %s", trinoTableNameNew))).containsOnly(expected); + 
assertThat(onSpark().executeQuery(format("SELECT * FROM %s", sparkTableNameNew))).containsOnly(expected); + onTrino().executeQuery(format("DROP TABLE %s", trinoTableNameNew)); + } + + @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormats") + public void testRegisterTableWithMetadataFile(StorageFormat storageFormat) + throws TException + { + String baseTableName = "test_register_table_with_metadata_file_" + storageFormat.name().toLowerCase(ENGLISH) + "_" + randomTableSuffix(); + String trinoTableName = trinoTableName(baseTableName); + String sparkTableName = sparkTableName(baseTableName); + + onSpark().executeQuery(format("CREATE TABLE %s (a INT, b STRING, c BOOLEAN) USING ICEBERG TBLPROPERTIES ('write.format.default' = '%s')", sparkTableName, storageFormat)); + onSpark().executeQuery(format("INSERT INTO %s values(1, 'INDIA', true)", sparkTableName)); + onTrino().executeQuery(format("INSERT INTO %s values(2, 'USA', false)", trinoTableName)); + + String tableLocation = getTableLocation(trinoTableName); + String metadataLocation = metastoreClient.getTable(TEST_SCHEMA_NAME, baseTableName).getParameters().get("metadata_location"); + String metadataFileName = metadataLocation.substring(metadataLocation.lastIndexOf("/") + 1); + // Drop table from hive metastore and use the same table name to register again with the metadata + dropTableFromMetastore(baseTableName); + + onTrino().executeQuery(format("CALL iceberg.system.register_table ('%s', '%s', '%s', '%s')", TEST_SCHEMA_NAME, baseTableName, tableLocation, metadataFileName)); + onTrino().executeQuery(format("INSERT INTO %s values(3, 'POLAND', true)", trinoTableName)); + List expected = List.of(row(1, "INDIA", true), row(2, "USA", false), row(3, "POLAND", true)); + + assertThat(onTrino().executeQuery(format("SELECT * FROM %s", trinoTableName))).containsOnly(expected); + assertThat(onSpark().executeQuery(format("SELECT * FROM %s", sparkTableName))).containsOnly(expected); + 
onTrino().executeQuery(format("DROP TABLE %s", trinoTableName)); + } + private int calculateMetadataFilesForPartitionedTable(String tableName) { String dataFilePath = (String) onTrino().executeQuery(format("SELECT file_path FROM iceberg.default.\"%s$files\" limit 1", tableName)).getOnlyValue(); @@ -2211,4 +2414,21 @@ private int calculateMetadataFilesForPartitionedTable(String tableName) String metadataFolderPath = tableFolderPath + "/metadata"; return hdfsClient.listDirectory(URI.create(metadataFolderPath).getPath()).size(); } + + private void dropTableFromMetastore(String tableName) + throws TException + { + metastoreClient.dropTable(TEST_SCHEMA_NAME, tableName, false); + assertThatThrownBy(() -> metastoreClient.getTable(TEST_SCHEMA_NAME, tableName)).hasMessageContaining("table not found"); + } + + private String getTableComment(String tableName) + { + return (String) onTrino().executeQuery("SELECT comment FROM system.metadata.table_comments WHERE catalog_name = '" + TRINO_CATALOG + "' AND schema_name = '" + TEST_SCHEMA_NAME + "' AND table_name = '" + tableName + "'").getOnlyValue(); + } + + private String getColumnComment(String tableName, String columnName) + { + return (String) onTrino().executeQuery("SELECT comment FROM " + TRINO_CATALOG + ".information_schema.columns WHERE table_schema = '" + TEST_SCHEMA_NAME + "' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'").getOnlyValue(); + } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkDropTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkDropTableCompatibility.java index a11c5599ef49..74b30f058199 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkDropTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkDropTableCompatibility.java @@ -34,6 +34,7 @@ 
import static io.trino.tests.product.hive.Engine.TRINO; import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix; import static io.trino.tests.product.iceberg.util.IcebergTestUtils.getTableLocation; +import static io.trino.tests.product.iceberg.util.IcebergTestUtils.stripNamenodeURI; import static io.trino.tests.product.utils.QueryExecutors.onSpark; import static io.trino.tests.product.utils.QueryExecutors.onTrino; import static java.lang.String.format; @@ -74,7 +75,7 @@ public void testCleanupOnDropTable(Engine tableCreatorEngine, Engine tableDroppe tableCreatorEngine.queryExecutor().executeQuery("CREATE TABLE " + tableName + "(col0 INT, col1 INT)"); onTrino().executeQuery("INSERT INTO " + tableName + " VALUES (1, 2)"); - String tableDirectory = getTableLocation(tableName); + String tableDirectory = stripNamenodeURI(getTableLocation(tableName)); assertFileExistence(tableDirectory, true, "The table directory exists after creating the table"); List dataFilePaths = getDataFilePaths(tableName); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/util/IcebergTestUtils.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/util/IcebergTestUtils.java index 26161294c060..83501e8d05c0 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/util/IcebergTestUtils.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/util/IcebergTestUtils.java @@ -13,10 +13,8 @@ */ package io.trino.tests.product.iceberg.util; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import java.net.URI; -import static com.google.common.base.Verify.verify; import static io.trino.tests.product.utils.QueryExecutors.onTrino; public final class IcebergTestUtils @@ -25,13 +23,11 @@ private IcebergTestUtils() {} public static String getTableLocation(String tableName) { - Pattern locationPattern = Pattern.compile(".*location = 
'hdfs://hadoop-master:9000(.*?)'.*", Pattern.DOTALL); - Matcher m = locationPattern.matcher((String) onTrino().executeQuery("SHOW CREATE TABLE " + tableName).getOnlyValue()); - if (m.find()) { - String location = m.group(1); - verify(!m.find(), "Unexpected second match"); - return location; - } - throw new IllegalStateException("Location not found in SHOW CREATE TABLE result"); + return (String) onTrino().executeQuery("SELECT DISTINCT regexp_replace(\"$path\", '/[^/]*/[^/]*$', '') FROM " + tableName).getOnlyValue(); + } + + public static String stripNamenodeURI(String location) + { + return URI.create(location).getPath(); } } diff --git a/testing/trino-server-dev/etc/catalog/iceberg.properties b/testing/trino-server-dev/etc/catalog/iceberg.properties index 440a9a4272f5..812fe34206fc 100644 --- a/testing/trino-server-dev/etc/catalog/iceberg.properties +++ b/testing/trino-server-dev/etc/catalog/iceberg.properties @@ -14,3 +14,4 @@ hive.hdfs.socks-proxy=localhost:1180 # Fail-fast in development hive.metastore.thrift.client.max-retry-time=1s +iceberg.register-table-procedure.enabled=true