diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/DefaultHiveTableRedirectionsProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/DefaultHiveTableRedirectionsProvider.java new file mode 100644 index 00000000000..d767e7f726f --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/DefaultHiveTableRedirectionsProvider.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive; + +import io.trino.plugin.hive.metastore.Table; +import io.trino.spi.connector.CatalogSchemaTableName; +import io.trino.spi.connector.ConnectorSession; + +import java.util.Optional; + +import static io.trino.plugin.hive.HiveSessionProperties.getIcebergCatalogName; +import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable; + +public class DefaultHiveTableRedirectionsProvider + implements HiveTableRedirectionsProvider +{ + @Override + public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session, Table table) + { + Optional<String> targetCatalogName = getIcebergCatalogName(session); + if (targetCatalogName.isEmpty()) { + return Optional.empty(); + } + if (isIcebergTable(table)) { + return targetCatalogName.map(catalog -> new CatalogSchemaTableName(catalog, table.getSchemaTableName())); + } + return Optional.empty(); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java index 634ff350ad8..34f17235e81 
100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java @@ -153,6 +153,8 @@ public class HiveConfig private boolean optimizeSymlinkListing = true; private boolean legacyHiveViewTranslation; + private Optional<String> icebergCatalogName = Optional.empty(); + private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE); private boolean sizeBasedSplitWeightsEnabled = true; @@ -1092,6 +1094,19 @@ public boolean isLegacyHiveViewTranslation() return this.legacyHiveViewTranslation; } + public Optional<String> getIcebergCatalogName() + { + return icebergCatalogName; + } + + @Config("hive.iceberg-catalog-name") + @ConfigDescription("The catalog to redirect iceberg tables to") + public HiveConfig setIcebergCatalogName(String icebergCatalogName) + { + this.icebergCatalogName = Optional.ofNullable(icebergCatalogName); + return this; + } + @Config("hive.size-based-split-weights-enabled") public HiveConfig setSizeBasedSplitWeightsEnabled(boolean sizeBasedSplitWeightsEnabled) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java index d5ebfb27b47..b95f39292cb 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java @@ -91,7 +91,7 @@ public void configure(Binder binder) newOptionalBinder(binder, TransactionalMetadataFactory.class) .setDefault().to(HiveMetadataFactory.class).in(Scopes.SINGLETON); newOptionalBinder(binder, HiveTableRedirectionsProvider.class) - .setDefault().toInstance(HiveTableRedirectionsProvider.NO_REDIRECTIONS); + .setDefault().to(DefaultHiveTableRedirectionsProvider.class); binder.bind(HiveTransactionManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).to(HiveSplitManager.class).in(Scopes.SINGLETON); 
newExporter(binder).export(ConnectorSplitManager.class).as(generator -> generator.generatedNameOf(HiveSplitManager.class)); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java index f057624c375..58441105f9f 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java @@ -31,6 +31,7 @@ import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; @@ -111,6 +112,7 @@ public final class HiveSessionProperties private static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout"; private static final String OPTIMIZE_SYMLINK_LISTING = "optimize_symlink_listing"; private static final String LEGACY_HIVE_VIEW_TRANSLATION = "legacy_hive_view_translation"; + private static final String ICEBERG_CATALOG_NAME = "iceberg_catalog_name"; public static final String SIZE_BASED_SPLIT_WEIGHTS_ENABLED = "size_based_split_weights_enabled"; public static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight"; public static final String NON_TRANSACTIONAL_OPTIMIZE_ENABLED = "non_transactional_optimize_enabled"; @@ -463,6 +465,11 @@ public HiveSessionProperties( "Use legacy Hive view translation mechanism", hiveConfig.isLegacyHiveViewTranslation(), false), + stringProperty( + ICEBERG_CATALOG_NAME, + "Catalog to redirect to when an Iceberg table is referenced", + hiveConfig.getIcebergCatalogName().orElse(null), + false), booleanProperty( SIZE_BASED_SPLIT_WEIGHTS_ENABLED, "Enable estimating split weights based on size in bytes", @@ -790,6 +797,11 @@ public static boolean isLegacyHiveViewTranslation(ConnectorSession session) return session.getProperty(LEGACY_HIVE_VIEW_TRANSLATION, Boolean.class); } + public static Optional<String> 
getIcebergCatalogName(ConnectorSession session) + { + return Optional.ofNullable(session.getProperty(ICEBERG_CATALOG_NAME, String.class)); + } + public static boolean isSizeBasedSplitWeightsEnabled(ConnectorSession session) { return session.getProperty(SIZE_BASED_SPLIT_WEIGHTS_ENABLED, Boolean.class); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java index 3e7ebed6802..0642de4608a 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java @@ -39,7 +39,6 @@ import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SortOrder; -import io.trino.spi.session.PropertyMetadata; import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; import org.apache.hadoop.conf.Configuration; @@ -62,6 +61,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.OptionalInt; import java.util.Properties; @@ -72,6 +72,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.Maps.immutableEntry; import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_PARTITION_READ_ONLY; @@ -256,8 +257,12 @@ public HiveWriterFactory( requireNonNull(hiveSessionProperties, "hiveSessionProperties is null"); this.sessionProperties = hiveSessionProperties.getSessionProperties().stream() - .collect(toImmutableMap(PropertyMetadata::getName, - entry -> session.getProperty(entry.getName(), 
entry.getJavaType()).toString())); + .map(propertyMetadata -> immutableEntry( + propertyMetadata.getName(), + session.getProperty(propertyMetadata.getName(), propertyMetadata.getJavaType()))) + // The session properties collected here are used for events only. Filter out nulls to avoid problems with downstream consumers + .filter(entry -> entry.getValue() != null) + .collect(toImmutableMap(Entry::getKey, entry -> entry.getValue().toString())); Configuration conf = hdfsEnvironment.getConfiguration(new HdfsContext(session), writePath); configureCompression(conf, getCompressionCodec(session)); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java index b99c79533c0..75866f6a499 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java @@ -105,6 +105,7 @@ public void testDefaults() .setTimestampPrecision(HiveTimestampPrecision.DEFAULT_PRECISION) .setOptimizeSymlinkListing(true) .setLegacyHiveViewTranslation(false) + .setIcebergCatalogName(null) .setSizeBasedSplitWeightsEnabled(true) .setMinimumAssignedSplitWeight(0.05)); } @@ -182,6 +183,7 @@ public void testExplicitPropertyMappings() .put("hive.timestamp-precision", "NANOSECONDS") .put("hive.optimize-symlink-listing", "false") .put("hive.legacy-hive-view-translation", "true") + .put("hive.iceberg-catalog-name", "iceberg") .put("hive.size-based-split-weights-enabled", "false") .put("hive.minimum-assigned-split-weight", "1.0") .build(); @@ -256,6 +258,7 @@ public void testExplicitPropertyMappings() .setTimestampPrecision(HiveTimestampPrecision.NANOSECONDS) .setOptimizeSymlinkListing(false) .setLegacyHiveViewTranslation(true) + .setIcebergCatalogName("iceberg") .setSizeBasedSplitWeightsEnabled(false) .setMinimumAssignedSplitWeight(1.0); diff --git 
a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeHiveIcebergRedirections.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeHiveIcebergRedirections.java new file mode 100644 index 00000000000..fe693af795d --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeHiveIcebergRedirections.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.tests.product.launcher.env.environment; + +import com.google.common.collect.ImmutableList; +import io.trino.tests.product.launcher.docker.DockerFiles; +import io.trino.tests.product.launcher.docker.DockerFiles.ResourceProvider; +import io.trino.tests.product.launcher.env.Environment; +import io.trino.tests.product.launcher.env.EnvironmentProvider; +import io.trino.tests.product.launcher.env.common.Hadoop; +import io.trino.tests.product.launcher.env.common.Standard; +import io.trino.tests.product.launcher.env.common.TestsEnvironment; + +import javax.inject.Inject; + +import static io.trino.tests.product.launcher.env.EnvironmentContainers.COORDINATOR; +import static io.trino.tests.product.launcher.env.common.Hadoop.CONTAINER_PRESTO_HIVE_PROPERTIES; +import static io.trino.tests.product.launcher.env.common.Hadoop.CONTAINER_PRESTO_ICEBERG_PROPERTIES; +import static org.testcontainers.utility.MountableFile.forHostPath; + +@TestsEnvironment +public class EnvSinglenodeHiveIcebergRedirections + extends EnvironmentProvider +{ + private final ResourceProvider configDir; + + @Inject + public EnvSinglenodeHiveIcebergRedirections(DockerFiles dockerFiles, Standard standard, Hadoop hadoop) + { + super(ImmutableList.of(standard, hadoop)); + configDir = dockerFiles.getDockerFilesHostDirectory("conf/environment/singlenode-hive-iceberg-redirections"); + } + + @Override + public void extendEnvironment(Environment.Builder builder) + { + builder.configureContainer(COORDINATOR, container -> container + .withCopyFileToContainer(forHostPath(configDir.getPath("hive.properties")), CONTAINER_PRESTO_HIVE_PROPERTIES) + .withCopyFileToContainer(forHostPath(configDir.getPath("iceberg.properties")), CONTAINER_PRESTO_ICEBERG_PROPERTIES)); + } +} diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java 
b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java index c5a5eb804f0..e0a7faf3792 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import io.trino.tests.product.launcher.env.EnvironmentConfig; import io.trino.tests.product.launcher.env.EnvironmentDefaults; +import io.trino.tests.product.launcher.env.environment.EnvSinglenodeHiveIcebergRedirections; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeKerberosHdfsImpersonationCrossRealm; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeMysql; import io.trino.tests.product.launcher.env.environment.EnvSinglenodePostgresql; @@ -46,6 +47,7 @@ public List<SuiteTestRun> getTestRuns(EnvironmentConfig config) testOnEnvironment(EnvSinglenodeSqlserver.class).withGroups("sqlserver").build(), testOnEnvironment(EnvSinglenodeSparkHive.class).withGroups("hive_spark").build(), testOnEnvironment(EnvSinglenodeSparkIceberg.class).withGroups("iceberg").withExcludedGroups("storage_formats").build(), + testOnEnvironment(EnvSinglenodeHiveIcebergRedirections.class).withGroups("hive_iceberg_redirections").build(), testOnEnvironment(EnvSinglenodeKerberosHdfsImpersonationCrossRealm.class).withGroups("storage_formats", "cli", "hdfs_impersonation").build(), testOnEnvironment(EnvTwoMixedHives.class).withGroups("two_hives").build(), testOnEnvironment(EnvTwoKerberosHives.class).withGroups("two_hives").build()); diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hive-iceberg-redirections/hive.properties 
b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hive-iceberg-redirections/hive.properties new file mode 100644 index 00000000000..91fb859f871 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hive-iceberg-redirections/hive.properties @@ -0,0 +1,10 @@ +connector.name=hive-hadoop2 +hive.metastore.uri=thrift://hadoop-master:9083 +hive.config.resources=/docker/presto-product-tests/conf/presto/etc/hive-default-fs-site.xml +hive.allow-add-column=true +hive.allow-drop-column=true +hive.allow-rename-column=true +hive.allow-comment-table=true +hive.allow-drop-table=true +hive.allow-rename-table=true +hive.iceberg-catalog-name=iceberg diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hive-iceberg-redirections/iceberg.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hive-iceberg-redirections/iceberg.properties new file mode 100644 index 00000000000..6230d550a4a --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hive-iceberg-redirections/iceberg.properties @@ -0,0 +1,3 @@ +connector.name=iceberg +hive.metastore.uri=thrift://hadoop-master:9083 +hive.config.resources=/docker/presto-product-tests/conf/presto/etc/hive-default-fs-site.xml diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java index b2b5fad460c..6b5dd6417d2 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java @@ -47,6 +47,7 @@ public final class TestGroups public static final String HIVE_VIEWS = 
"hive_views"; public static final String HIVE_VIEW_COMPATIBILITY = "hive_view_compatibility"; public static final String HIVE_CACHING = "hive_caching"; + public static final String HIVE_ICEBERG_REDIRECTIONS = "hive_iceberg_redirections"; public static final String AUTHORIZATION = "authorization"; public static final String HIVE_COERCION = "hive_coercion"; public static final String CASSANDRA = "cassandra"; diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveRedirectionToIceberg.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveRedirectionToIceberg.java new file mode 100644 index 00000000000..da79644c723 --- /dev/null +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveRedirectionToIceberg.java @@ -0,0 +1,492 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.tests.product.hive; + +import io.trino.tempto.BeforeTestWithContext; +import io.trino.tempto.ProductTest; +import io.trino.tempto.assertions.QueryAssert; +import io.trino.tempto.query.QueryResult; +import org.assertj.core.api.AbstractStringAssert; +import org.assertj.core.api.Assertions; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.tempto.assertions.QueryAssert.Row.row; +import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure; +import static io.trino.tempto.assertions.QueryAssert.assertThat; +import static io.trino.tempto.query.QueryExecutor.param; +import static io.trino.tests.product.TestGroups.HIVE_ICEBERG_REDIRECTIONS; +import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix; +import static io.trino.tests.product.utils.QueryExecutors.onTrino; +import static java.lang.String.format; +import static java.sql.JDBCType.VARCHAR; + +public class TestHiveRedirectionToIceberg + extends ProductTest +{ + @BeforeTestWithContext + public void createAdditionalSchema() + { + onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS hive.nondefaultschema"); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirect() + { + String tableName = "redirect_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." 
+ tableName; + + createIcebergTable(icebergTableName, false); + + assertResultsEqual( + onTrino().executeQuery("TABLE " + icebergTableName), + onTrino().executeQuery("TABLE " + hiveTableName)); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectWithNonDefaultSchema() + { + String tableName = "redirect_non_default_schema_" + randomTableSuffix(); + String hiveTableName = "hive.nondefaultschema." + tableName; + String icebergTableName = "iceberg.nondefaultschema." + tableName; + + createIcebergTable(icebergTableName, false); + + assertResultsEqual( + onTrino().executeQuery("TABLE " + icebergTableName), + onTrino().executeQuery("TABLE " + hiveTableName)); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectToNonexistentCatalog() + { + String tableName = "redirect_to_nonexistent_iceberg_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, false); + + // sanity check + assertResultsEqual( + onTrino().executeQuery("TABLE " + icebergTableName), + onTrino().executeQuery("TABLE " + hiveTableName)); + + onTrino().executeQuery("SET SESSION hive.iceberg_catalog_name = 'someweirdcatalog'"); + + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + hiveTableName)) + .hasMessageMatching(".*Table 'hive.default.redirect_to_nonexistent_iceberg_.*' redirected to 'someweirdcatalog.default.redirect_to_nonexistent_iceberg_.*', but the target catalog 'someweirdcatalog' does not exist"); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + // Note: this tests engine more than connectors. Still good scenario to test. 
+ @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectWithDefaultSchemaInSession() + { + String tableName = "redirect_with_use_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, false); + + onTrino().executeQuery("USE iceberg.default"); + assertResultsEqual( + onTrino().executeQuery("TABLE " + tableName), // unqualified + onTrino().executeQuery("TABLE " + hiveTableName)); + + onTrino().executeQuery("USE hive.default"); + assertResultsEqual( + onTrino().executeQuery("TABLE " + icebergTableName), + onTrino().executeQuery("TABLE " + tableName)); // unqualified + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectPartitionsToUnpartitioned() + { + String tableName = "iceberg_unpartitioned_table_" + randomTableSuffix(); + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, false); + + assertThat(onTrino().executeQuery("" + + "SELECT record_count, data.nationkey.min, data.nationkey.max, data.name.min, data.name.max " + + "FROM hive.default.\"" + tableName + "$partitions\"")) + .containsOnly(row(25L, 0L, 24L, "ALGERIA", "VIETNAM")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectPartitionsToPartitioned() + { + String tableName = "iceberg_partitioned_table_" + randomTableSuffix(); + String icebergTableName = "iceberg.default." 
+ tableName; + + createIcebergTable(icebergTableName, true); + + assertThat(onTrino().executeQuery("" + + "SELECT partition.regionkey, record_count, data.nationkey.min, data.nationkey.max, data.name.min, data.name.max " + + "FROM hive.default.\"" + tableName + "$partitions\"")) + .containsOnly( + row(0L, 5L, 0L, 16L, "ALGERIA", "MOZAMBIQUE"), + row(1L, 5L, 1L, 24L, "ARGENTINA", "UNITED STATES"), + row(2L, 5L, 8L, 21L, "CHINA", "VIETNAM"), + row(3L, 5L, 6L, 23L, "FRANCE", "UNITED KINGDOM"), + row(4L, 5L, 4L, 20L, "EGYPT", "SAUDI ARABIA")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}, dataProvider = "schemaAndPartitioning") + public void testInsert(String schema, boolean partitioned) + { + String tableName = "iceberg_insert_" + randomTableSuffix(); + String hiveTableName = "hive." + schema + "." + tableName; + String icebergTableName = "iceberg." + schema + "." + tableName; + + createIcebergTable(icebergTableName, partitioned, false); + + onTrino().executeQuery("INSERT INTO " + hiveTableName + " VALUES (42, 'some name', 12, 'some comment')"); + + assertThat(onTrino().executeQuery("TABLE " + hiveTableName)) + .containsOnly(row(42L, "some name", 12L, "some comment")); + assertThat(onTrino().executeQuery("TABLE " + icebergTableName)) + .containsOnly(row(42L, "some name", 12L, "some comment")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @DataProvider + public static Object[][] schemaAndPartitioning() + { + return new Object[][] { + {"default", false}, + {"default", true}, + // Note: this tests engine more than connectors. Still good scenario to test. + {"nondefaultschema", false}, + {"nondefaultschema", true}, + }; + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testDelete() + { + String tableName = "iceberg_insert_" + randomTableSuffix(); + String hiveTableName = "hive.default." 
+ tableName; + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, true); + + onTrino().executeQuery("DELETE FROM " + hiveTableName + " WHERE regionkey = 1"); + + assertResultsEqual( + onTrino().executeQuery("TABLE " + icebergTableName), + onTrino().executeQuery("SELECT nationkey, name, regionkey, comment FROM tpch.tiny.nation WHERE regionkey != 1")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testUpdate() + { + String tableName = "iceberg_insert_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, true); + + assertQueryFailure(() -> onTrino().executeQuery("UPDATE " + hiveTableName + " SET nationkey = nationkey + 100 WHERE regionkey = 1")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): This connector does not support updates"); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testDropTable() + { + String tableName = "hive_drop_iceberg_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, false); + onTrino().executeQuery("DROP TABLE " + hiveTableName); + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): line 1:1: Table '" + icebergTableName + "' does not exist"); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testDescribe() + { + String tableName = "iceberg_describe_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." 
+ tableName; + + createIcebergTable(icebergTableName, true); + + assertResultsEqual( + onTrino().executeQuery("DESCRIBE " + icebergTableName), + onTrino().executeQuery("DESCRIBE " + hiveTableName)); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testShowCreateTable() + { + String tableName = "iceberg_show_create_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, true); + + assertThat(onTrino().executeQuery("SHOW CREATE TABLE " + hiveTableName)) + .containsOnly(row("CREATE TABLE " + hiveTableName + " (\n" + + " nationkey bigint,\n" + + " name varchar,\n" + + " regionkey bigint,\n" + + " comment varchar\n" + + ")\n" + + "WITH (\n" + + " format = 'ORC',\n" + + " partitioning = ARRAY['regionkey']\n" + // 'partitioning' comes from Iceberg + ")")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testShowStats() + { + String tableName = "iceberg_show_create_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." 
+ tableName; + + createIcebergTable(icebergTableName, true); + + assertThat(onTrino().executeQuery("SHOW STATS FOR " + hiveTableName)) + .containsOnly( + row("nationkey", null, null, 0d, null, "0", "24"), + row("name", null, null, 0d, null, null, null), + row("regionkey", null, null, 0d, null, "0", "4"), + row("comment", null, null, 0d, null, null, null), + row(null, null, null, null, 25d, null, null)); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testAlterTableRename() + { + String tableName = "iceberg_rename_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, false); + + onTrino().executeQuery("ALTER TABLE " + hiveTableName + " RENAME TO " + tableName + "_new"); + + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + hiveTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): line 1:1: Table '" + hiveTableName + "' does not exist"); + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): line 1:1: Table '" + icebergTableName + "' does not exist"); + + assertResultsEqual( + onTrino().executeQuery("TABLE " + icebergTableName + "_new"), + onTrino().executeQuery("TABLE " + hiveTableName + "_new")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName + "_new"); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testAlterTableAddColumn() + { + String tableName = "iceberg_alter_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." 
+ tableName; + + createIcebergTable(icebergTableName, false); + + onTrino().executeQuery("ALTER TABLE " + hiveTableName + " ADD COLUMN some_new_column double"); + + // TODO: ALTER TABLE succeeded, but new column was not added + Assertions.assertThat(onTrino().executeQuery("DESCRIBE " + icebergTableName).column(1)) + .containsOnly("nationkey", "name", "regionkey", "comment"); + + assertResultsEqual( + onTrino().executeQuery("TABLE " + icebergTableName), + onTrino().executeQuery("SELECT * /*, NULL*/ FROM tpch.tiny.nation")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testCommentTable() + { + String tableName = "iceberg_comment_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, false); + + assertTableComment("hive", "default", tableName).isNull(); + assertTableComment("iceberg", "default", tableName).isNull(); + + onTrino().executeQuery("COMMENT ON TABLE " + hiveTableName + " IS 'This is my table, there are many like it but this one is mine'"); + + // TODO: COMMENT ON TABLE succeeded, but comment was not preserved + assertTableComment("hive", "default", tableName).isNull(); + assertTableComment("iceberg", "default", tableName).isNull(); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testInformationSchemaColumns() + { + // use dedicated schema so that we control the number and shape of tables + String schemaName = "redirect_information_schema_" + randomTableSuffix(); + onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS hive." + schemaName); + + String tableName = "redirect_information_schema_table_" + randomTableSuffix(); + String icebergTableName = "iceberg." + schemaName + "." 
+ tableName; + + createIcebergTable(icebergTableName, false); + + // via redirection with table filter + assertThat(onTrino().executeQuery( + format("SELECT * FROM hive.information_schema.columns WHERE table_schema = '%s' AND table_name = '%s'", schemaName, tableName))) + .containsOnly( + row("hive", schemaName, tableName, "nationkey", 1, null, "YES", "bigint"), + row("hive", schemaName, tableName, "name", 2, null, "YES", "varchar"), + row("hive", schemaName, tableName, "regionkey", 3, null, "YES", "bigint"), + row("hive", schemaName, tableName, "comment", 4, null, "YES", "varchar")); + + // test via redirection with just schema filter + assertThat(onTrino().executeQuery( + format("SELECT * FROM hive.information_schema.columns WHERE table_schema = '%s'", schemaName))) + .containsOnly( + row("hive", schemaName, tableName, "nationkey", 1, null, "YES", "bigint"), + row("hive", schemaName, tableName, "name", 2, null, "YES", "varchar"), + row("hive", schemaName, tableName, "regionkey", 3, null, "YES", "bigint"), + row("hive", schemaName, tableName, "comment", 4, null, "YES", "varchar")); + + // sanity check that getting columns info without redirection produces matching result + assertThat(onTrino().executeQuery( + format("SELECT * FROM iceberg.information_schema.columns WHERE table_schema = '%s' AND table_name = '%s'", schemaName, tableName))) + .containsOnly( + row("iceberg", schemaName, tableName, "nationkey", 1, null, "YES", "bigint"), + row("iceberg", schemaName, tableName, "name", 2, null, "YES", "varchar"), + row("iceberg", schemaName, tableName, "regionkey", 3, null, "YES", "bigint"), + row("iceberg", schemaName, tableName, "comment", 4, null, "YES", "varchar")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + onTrino().executeQuery("DROP SCHEMA hive." 
+ schemaName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testSystemJdbcColumns() + { + // use dedicated schema so that we control the number and shape of tables + String schemaName = "redirect_system_jdbc_columns_" + randomTableSuffix(); + onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS hive." + schemaName); + + String tableName = "redirect_system_jdbc_columns_table_" + randomTableSuffix(); + String icebergTableName = "iceberg." + schemaName + "." + tableName; + + createIcebergTable(icebergTableName, false); + + // via redirection with table filter + assertThat(onTrino().executeQuery( + format("SELECT table_cat, table_schem, table_name, column_name FROM system.jdbc.columns WHERE table_cat = 'hive' AND table_schem = '%s' AND table_name = '%s'", schemaName, tableName))) + .containsOnly( + row("hive", schemaName, tableName, "nationkey"), + row("hive", schemaName, tableName, "name"), + row("hive", schemaName, tableName, "regionkey"), + row("hive", schemaName, tableName, "comment")); + + // test via redirection with just schema filter + // (schema filter only — no table_name predicate in this query) + assertThat(onTrino().executeQuery( + format("SELECT table_cat, table_schem, table_name, column_name FROM system.jdbc.columns WHERE table_cat = 'hive' AND table_schem = '%s'", schemaName))) + .containsOnly( + row("hive", schemaName, tableName, "nationkey"), + row("hive", schemaName, tableName, "name"), + row("hive", schemaName, tableName, "regionkey"), + row("hive", schemaName, tableName, "comment")); + + // sanity check that getting columns info without redirection produces matching result + assertThat(onTrino().executeQuery( + format("SELECT table_cat, table_schem, table_name, column_name FROM system.jdbc.columns WHERE table_cat = 'iceberg' AND table_schem = '%s' AND table_name = '%s'", schemaName, tableName))) + .containsOnly( + row("iceberg", schemaName, tableName, "nationkey"), + row("iceberg", schemaName, tableName, "name"), + row("iceberg", 
schemaName, tableName, "regionkey"), + row("iceberg", schemaName, tableName, "comment")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + onTrino().executeQuery("DROP SCHEMA hive." + schemaName); + } + + private static void createIcebergTable(String tableName, boolean partitioned) + { + createIcebergTable(tableName, partitioned, true); + } + + private static void createIcebergTable(String tableName, boolean partitioned, boolean withData) + { + onTrino().executeQuery( + "CREATE TABLE " + tableName + " " + + (partitioned ? "WITH (partitioning = ARRAY['regionkey']) " : "") + + " AS " + + "SELECT * FROM tpch.tiny.nation " + + (withData ? "WITH DATA" : "WITH NO DATA")); + } + + private static AbstractStringAssert assertTableComment(String catalog, String schema, String tableName) + { + QueryResult queryResult = readTableComment(catalog, schema, tableName); + return Assertions.assertThat((String) getOnlyElement(getOnlyElement(queryResult.rows()))); + } + + private static QueryResult readTableComment(String catalog, String schema, String tableName) + { + return onTrino().executeQuery( + "SELECT comment FROM system.metadata.table_comments WHERE catalog_name = ? AND schema_name = ? 
AND table_name = ?", + param(VARCHAR, catalog), + param(VARCHAR, schema), + param(VARCHAR, tableName)); + } + + private static void assertResultsEqual(QueryResult first, QueryResult second) + { + assertThat(first).containsOnly(second.rows().stream() + .map(QueryAssert.Row::new) + .collect(toImmutableList())); + + // just for symmetry + assertThat(second).containsOnly(first.rows().stream() + .map(QueryAssert.Row::new) + .collect(toImmutableList())); + } +} diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergRedirectionToHive.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergRedirectionToHive.java new file mode 100644 index 00000000000..4b20a28c547 --- /dev/null +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergRedirectionToHive.java @@ -0,0 +1,466 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.tests.product.iceberg; + +import io.trino.tempto.BeforeTestWithContext; +import io.trino.tempto.ProductTest; +import io.trino.tempto.assertions.QueryAssert; +import io.trino.tempto.query.QueryResult; +import org.assertj.core.api.AbstractStringAssert; +import org.assertj.core.api.Assertions; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.tempto.assertions.QueryAssert.Row.row; +import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure; +import static io.trino.tempto.assertions.QueryAssert.assertThat; +import static io.trino.tempto.query.QueryExecutor.param; +import static io.trino.tests.product.TestGroups.HIVE_ICEBERG_REDIRECTIONS; +import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix; +import static io.trino.tests.product.utils.QueryExecutors.onTrino; +import static java.lang.String.format; +import static java.sql.JDBCType.VARCHAR; + +public class TestIcebergRedirectionToHive + extends ProductTest +{ + @BeforeTestWithContext + public void createAdditionalSchema() + { + onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS iceberg.nondefaultschema"); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirect() + { + String tableName = "redirect_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, false); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." 
+ tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectWithNonDefaultSchema() + { + String tableName = "redirect_non_default_schema_" + randomTableSuffix(); + String hiveTableName = "hive.nondefaultschema." + tableName; + String icebergTableName = "iceberg.nondefaultschema." + tableName; + + createHiveTable(hiveTableName, false); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: nondefaultschema." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectToNonexistentCatalog() + { + String tableName = "redirect_to_nonexistent_hive_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, false); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("SET SESSION iceberg.hive_catalog_name = 'someweirdcatalog'")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): line 1:1: Session property 'iceberg.hive_catalog_name' does not exist"); + + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + // Note: this tests engine more than connectors. Still good scenario to test. + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectWithDefaultSchemaInSession() + { + String tableName = "redirect_with_use_" + randomTableSuffix(); + String hiveTableName = "hive.default." 
+ tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, false); + + onTrino().executeQuery("USE iceberg.default"); + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + tableName)) // unqualified + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); +// assertResultsEqual( +// onTrino().executeQuery("TABLE " + tableName), // unqualified +// onTrino().executeQuery("TABLE " + hiveTableName)); + + onTrino().executeQuery("USE hive.default"); + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); +// assertResultsEqual( +// onTrino().executeQuery("TABLE " + icebergTableName), +// onTrino().executeQuery("TABLE " + tableName)); // unqualified + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectPartitionsToUnpartitioned() + { + String tableName = "hive_unpartitioned_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + + createHiveTable(hiveTableName, false); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("TABLE iceberg.default.\"" + tableName + "$partitions\"")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectPartitionsToPartitioned() + { + String tableName = "hive_partitioned_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." 
+ tableName; + + createHiveTable(hiveTableName, true); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("TABLE iceberg.default.\"" + tableName + "$partitions\"")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}, dataProvider = "schemaAndPartitioning") + public void testInsert(String schema, boolean partitioned) + { + String tableName = "hive_insert_" + randomTableSuffix(); + String hiveTableName = "hive." + schema + "." + tableName; + String icebergTableName = "iceberg." + schema + "." + tableName; + + createHiveTable(hiveTableName, partitioned, false); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO " + icebergTableName + " VALUES (6, false, -17), (7, true, 1)")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: " + schema + "." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @DataProvider + public static Object[][] schemaAndPartitioning() + { + return new Object[][] { + {"default", false}, + {"default", true}, + // Note: this tests engine more than connectors. Still good scenario to test. + {"nondefaultschema", false}, + {"nondefaultschema", true}, + }; + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testDelete() + { + String tableName = "hive_delete_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." 
+ tableName; + + createHiveTable(hiveTableName, true); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM " + icebergTableName + " WHERE regionkey = 1")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testUpdate() + { + String tableName = "hive_update_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, true); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("UPDATE " + icebergTableName + " SET nationkey = nationkey + 100 WHERE regionkey = 1")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testDropTable() + { + String tableName = "iceberg_drop_hive_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, false); + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("DROP TABLE " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + // TODO should fail + onTrino().executeQuery("TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testDescribe() + { + String tableName = "hive_describe_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." 
+ tableName; + + createHiveTable(hiveTableName, true); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("DESCRIBE " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testShowCreateTable() + { + String tableName = "hive_show_create_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, true); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("SHOW CREATE TABLE " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testShowStats() + { + // table-name prefix reflects this test (was copy-pasted from testShowCreateTable) + String tableName = "hive_show_stats_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, true); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("SHOW STATS FOR " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testAlterTableRename() + { + String tableName = "hive_rename_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." 
+ tableName; + + createHiveTable(hiveTableName, false); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE " + icebergTableName + " RENAME TO a_new_name")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testAlterTableAddColumn() + { + String tableName = "hive_alter_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, false); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE " + icebergTableName + " ADD COLUMN some_new_column double")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testCommentTable() + { + String tableName = "hive_comment_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, false); + + assertTableComment("hive", "default", tableName).isNull(); + // TODO: support redirects from Iceberg to Hive + assertThat(readTableComment("iceberg", "default", tableName)).hasNoRows(); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON TABLE " + icebergTableName + " IS 'This is my table, there are many like it but this one is mine'")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." 
+ tableName); + + assertTableComment("hive", "default", tableName).isNull(); + // TODO: support redirects from Iceberg to Hive + assertThat(readTableComment("iceberg", "default", tableName)).hasNoRows(); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testInformationSchemaColumns() + { + // use dedicated schema so that we control the number and shape of tables + String schemaName = "redirect_information_schema_" + randomTableSuffix(); + onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS hive." + schemaName); + + String tableName = "redirect_information_schema_table_" + randomTableSuffix(); + String hiveTableName = "hive." + schemaName + "." + tableName; + + createHiveTable(hiveTableName, false); + + // via redirection with table filter + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery( + format("SELECT * FROM iceberg.information_schema.columns WHERE table_schema = '%s' AND table_name='%s'", schemaName, tableName))) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: " + schemaName + "." 
+ tableName); + + // test via redirection with just schema filter + assertThat(onTrino().executeQuery( + format("SELECT * FROM iceberg.information_schema.columns WHERE table_schema = '%s'", schemaName))) + .containsOnly( + // TODO report table's columns via redirect to Hive +// row("iceberg", schemaName, tableName, "nationkey", 1, null, "YES", "bigint"), +// row("iceberg", schemaName, tableName, "name", 2, null, "YES", "varchar(25)"), +// row("iceberg", schemaName, tableName, "comment", 3, null, "YES", "varchar(152)"), +// row("iceberg", schemaName, tableName, "regionkey", 4, null, "YES", "bigint") + /**/); + + // sanity check that getting columns info without redirection produces matching result + assertThat(onTrino().executeQuery( + format("SELECT * FROM hive.information_schema.columns WHERE table_schema = '%s' AND table_name='%s'", schemaName, tableName))) + .containsOnly( + row("hive", schemaName, tableName, "nationkey", 1, null, "YES", "bigint"), + row("hive", schemaName, tableName, "name", 2, null, "YES", "varchar(25)"), + row("hive", schemaName, tableName, "comment", 3, null, "YES", "varchar(152)"), + row("hive", schemaName, tableName, "regionkey", 4, null, "YES", "bigint")); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + onTrino().executeQuery("DROP SCHEMA hive." + schemaName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testSystemJdbcColumns() + { + // use dedicated schema so that we control the number and shape of tables + String schemaName = "redirect_system_jdbc_columns_" + randomTableSuffix(); + onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS hive." + schemaName); + + String tableName = "redirect_system_jdbc_columns_table_" + randomTableSuffix(); + String hiveTableName = "hive." + schemaName + "." 
+ tableName; + + createHiveTable(hiveTableName, false); + + // via redirection with table filter + assertThat(onTrino().executeQuery( + format("SELECT table_cat, table_schem, table_name, column_name FROM system.jdbc.columns WHERE table_cat = 'iceberg' AND table_schem = '%s' AND table_name = '%s'", schemaName, tableName))) + .containsOnly( + // TODO report table's columns via redirect to Hive +// row("iceberg", schemaName, tableName, "nationkey"), +// row("iceberg", schemaName, tableName, "name"), +// row("iceberg", schemaName, tableName, "comment"), +// row("iceberg", schemaName, tableName, "regionkey") + /**/); + + // test via redirection with just schema filter + assertThat(onTrino().executeQuery( + format("SELECT table_cat, table_schem, table_name, column_name FROM system.jdbc.columns WHERE table_cat = 'iceberg' AND table_schem = '%s'", schemaName))) + .containsOnly( + // TODO report table's columns via redirect to Hive +// row("iceberg", schemaName, tableName, "nationkey"), +// row("iceberg", schemaName, tableName, "name"), +// row("iceberg", schemaName, tableName, "comment"), +// row("iceberg", schemaName, tableName, "regionkey") + /**/); + + // sanity check that getting columns info without redirection produces matching result + assertThat(onTrino().executeQuery( + format("SELECT table_cat, table_schem, table_name, column_name FROM system.jdbc.columns WHERE table_cat = 'hive' AND table_schem = '%s' AND table_name = '%s'", schemaName, tableName))) + .containsOnly( + row("hive", schemaName, tableName, "nationkey"), + row("hive", schemaName, tableName, "name"), + row("hive", schemaName, tableName, "comment"), + row("hive", schemaName, tableName, "regionkey")); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + onTrino().executeQuery("DROP SCHEMA hive." 
+ schemaName); + } + + private static void createHiveTable(String tableName, boolean partitioned) + { + createHiveTable(tableName, partitioned, true); + } + + private static void createHiveTable(String tableName, boolean partitioned, boolean withData) + { + onTrino().executeQuery( + "CREATE TABLE " + tableName + " " + + (partitioned ? "WITH (partitioned_by = ARRAY['regionkey']) " : "") + + " AS " + + "SELECT nationkey, name, comment, regionkey FROM tpch.tiny.nation " + + (withData ? "WITH DATA" : "WITH NO DATA")); + } + + private static AbstractStringAssert assertTableComment(String catalog, String schema, String tableName) + { + QueryResult queryResult = readTableComment(catalog, schema, tableName); + return Assertions.assertThat((String) getOnlyElement(getOnlyElement(queryResult.rows()))); + } + + private static QueryResult readTableComment(String catalog, String schema, String tableName) + { + return onTrino().executeQuery( + "SELECT comment FROM system.metadata.table_comments WHERE catalog_name = ? AND schema_name = ? AND table_name = ?", + param(VARCHAR, catalog), + param(VARCHAR, schema), + param(VARCHAR, tableName)); + } + + private static void assertResultsEqual(QueryResult first, QueryResult second) + { + assertThat(first).containsOnly(second.rows().stream() + .map(QueryAssert.Row::new) + .collect(toImmutableList())); + + // just for symmetry + assertThat(second).containsOnly(first.rows().stream() + .map(QueryAssert.Row::new) + .collect(toImmutableList())); + } +}