From 8feb6220973bf3dfe50445d9e1d056b048b0160f Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Fri, 3 Dec 2021 17:42:02 +0100 Subject: [PATCH] Add support to redirect table reads from Hive to Iceberg MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Hive Connector redirects Iceberg table access to the configured Iceberg catalog. This change adds implementation of `HiveTableRedirectionsProvider`, plugging in Hive->Iceberg redirects and leveraging existing framework. This is based on work from several authors, mentioned at the end of commit message. As tests show, not all the statements support redirects properly and this needs to be followed up upon. Co-authored-by: Xingyuan Lin Co-authored-by: Pratham Desai Co-authored-by: Sasha Sheikin Co-authored-by: Michał Ślizak Co-authored-by: Łukasz Osipiuk --- .../DefaultHiveTableRedirectionsProvider.java | 40 ++ .../java/io/trino/plugin/hive/HiveConfig.java | 15 + .../java/io/trino/plugin/hive/HiveModule.java | 2 +- .../plugin/hive/HiveSessionProperties.java | 12 + .../trino/plugin/hive/HiveWriterFactory.java | 11 +- .../io/trino/plugin/hive/TestHiveConfig.java | 3 + .../EnvSinglenodeHiveIcebergRedirections.java | 52 ++ .../suite/suites/Suite7NonGeneric.java | 2 + .../hive.properties | 10 + .../iceberg.properties | 3 + .../io/trino/tests/product/TestGroups.java | 1 + .../hive/TestHiveRedirectionToIceberg.java | 492 ++++++++++++++++++ .../iceberg/TestIcebergRedirectionToHive.java | 466 +++++++++++++++++ 13 files changed, 1105 insertions(+), 4 deletions(-) create mode 100644 plugin/trino-hive/src/main/java/io/trino/plugin/hive/DefaultHiveTableRedirectionsProvider.java create mode 100644 testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeHiveIcebergRedirections.java create mode 100644 
testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hive-iceberg-redirections/hive.properties create mode 100644 testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hive-iceberg-redirections/iceberg.properties create mode 100644 testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveRedirectionToIceberg.java create mode 100644 testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergRedirectionToHive.java diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/DefaultHiveTableRedirectionsProvider.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/DefaultHiveTableRedirectionsProvider.java new file mode 100644 index 000000000000..d767e7f726ff --- /dev/null +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/DefaultHiveTableRedirectionsProvider.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive; + +import io.trino.plugin.hive.metastore.Table; +import io.trino.spi.connector.CatalogSchemaTableName; +import io.trino.spi.connector.ConnectorSession; + +import java.util.Optional; + +import static io.trino.plugin.hive.HiveSessionProperties.getIcebergCatalogName; +import static io.trino.plugin.hive.util.HiveUtil.isIcebergTable; + +public class DefaultHiveTableRedirectionsProvider + implements HiveTableRedirectionsProvider +{ + @Override + public Optional redirectTable(ConnectorSession session, Table table) + { + Optional targetCatalogName = getIcebergCatalogName(session); + if (targetCatalogName.isEmpty()) { + return Optional.empty(); + } + if (isIcebergTable(table)) { + return targetCatalogName.map(catalog -> new CatalogSchemaTableName(catalog, table.getSchemaTableName())); + } + return Optional.empty(); + } +} diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java index 634ff350ad84..34f17235e817 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java @@ -153,6 +153,8 @@ public class HiveConfig private boolean optimizeSymlinkListing = true; private boolean legacyHiveViewTranslation; + private Optional icebergCatalogName = Optional.empty(); + private DataSize targetMaxFileSize = DataSize.of(1, GIGABYTE); private boolean sizeBasedSplitWeightsEnabled = true; @@ -1092,6 +1094,19 @@ public boolean isLegacyHiveViewTranslation() return this.legacyHiveViewTranslation; } + public Optional getIcebergCatalogName() + { + return icebergCatalogName; + } + + @Config("hive.iceberg-catalog-name") + @ConfigDescription("The catalog to redirect iceberg tables to") + public HiveConfig setIcebergCatalogName(String icebergCatalogName) + { + this.icebergCatalogName = Optional.ofNullable(icebergCatalogName); + return this; + } + 
@Config("hive.size-based-split-weights-enabled") public HiveConfig setSizeBasedSplitWeightsEnabled(boolean sizeBasedSplitWeightsEnabled) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java index d5ebfb27b477..b95f39292cb7 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveModule.java @@ -91,7 +91,7 @@ public void configure(Binder binder) newOptionalBinder(binder, TransactionalMetadataFactory.class) .setDefault().to(HiveMetadataFactory.class).in(Scopes.SINGLETON); newOptionalBinder(binder, HiveTableRedirectionsProvider.class) - .setDefault().toInstance(HiveTableRedirectionsProvider.NO_REDIRECTIONS); + .setDefault().to(DefaultHiveTableRedirectionsProvider.class); binder.bind(HiveTransactionManager.class).in(Scopes.SINGLETON); binder.bind(ConnectorSplitManager.class).to(HiveSplitManager.class).in(Scopes.SINGLETON); newExporter(binder).export(ConnectorSplitManager.class).as(generator -> generator.generatedNameOf(HiveSplitManager.class)); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java index f057624c3753..58441105f9f7 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSessionProperties.java @@ -31,6 +31,7 @@ import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; @@ -111,6 +112,7 @@ public final class HiveSessionProperties private static final String DYNAMIC_FILTERING_WAIT_TIMEOUT = "dynamic_filtering_wait_timeout"; private static final String OPTIMIZE_SYMLINK_LISTING = "optimize_symlink_listing"; private static final String LEGACY_HIVE_VIEW_TRANSLATION = 
"legacy_hive_view_translation"; + private static final String ICEBERG_CATALOG_NAME = "iceberg_catalog_name"; public static final String SIZE_BASED_SPLIT_WEIGHTS_ENABLED = "size_based_split_weights_enabled"; public static final String MINIMUM_ASSIGNED_SPLIT_WEIGHT = "minimum_assigned_split_weight"; public static final String NON_TRANSACTIONAL_OPTIMIZE_ENABLED = "non_transactional_optimize_enabled"; @@ -463,6 +465,11 @@ public HiveSessionProperties( "Use legacy Hive view translation mechanism", hiveConfig.isLegacyHiveViewTranslation(), false), + stringProperty( + ICEBERG_CATALOG_NAME, + "Catalog to redirect to when an Iceberg table is referenced", + hiveConfig.getIcebergCatalogName().orElse(null), + false), booleanProperty( SIZE_BASED_SPLIT_WEIGHTS_ENABLED, "Enable estimating split weights based on size in bytes", @@ -790,6 +797,11 @@ public static boolean isLegacyHiveViewTranslation(ConnectorSession session) return session.getProperty(LEGACY_HIVE_VIEW_TRANSLATION, Boolean.class); } + public static Optional getIcebergCatalogName(ConnectorSession session) + { + return Optional.ofNullable(session.getProperty(ICEBERG_CATALOG_NAME, String.class)); + } + public static boolean isSizeBasedSplitWeightsEnabled(ConnectorSession session) { return session.getProperty(SIZE_BASED_SPLIT_WEIGHTS_ENABLED, Boolean.class); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java index 3e7ebed6802a..0642de4608ae 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveWriterFactory.java @@ -39,7 +39,6 @@ import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SortOrder; -import io.trino.spi.session.PropertyMetadata; import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; import org.apache.hadoop.conf.Configuration; 
@@ -62,6 +61,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.OptionalInt; import java.util.Properties; @@ -72,6 +72,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.Maps.immutableEntry; import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_PARTITION_READ_ONLY; @@ -256,8 +257,12 @@ public HiveWriterFactory( requireNonNull(hiveSessionProperties, "hiveSessionProperties is null"); this.sessionProperties = hiveSessionProperties.getSessionProperties().stream() - .collect(toImmutableMap(PropertyMetadata::getName, - entry -> session.getProperty(entry.getName(), entry.getJavaType()).toString())); + .map(propertyMetadata -> immutableEntry( + propertyMetadata.getName(), + session.getProperty(propertyMetadata.getName(), propertyMetadata.getJavaType()))) + // The session properties collected here are used for events only. 
Filter out nulls to avoid problems with downstream consumers + .filter(entry -> entry.getValue() != null) + .collect(toImmutableMap(Entry::getKey, entry -> entry.getValue().toString())); Configuration conf = hdfsEnvironment.getConfiguration(new HdfsContext(session), writePath); configureCompression(conf, getCompressionCodec(session)); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java index b99c79533c01..75866f6a4998 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java @@ -105,6 +105,7 @@ public void testDefaults() .setTimestampPrecision(HiveTimestampPrecision.DEFAULT_PRECISION) .setOptimizeSymlinkListing(true) .setLegacyHiveViewTranslation(false) + .setIcebergCatalogName(null) .setSizeBasedSplitWeightsEnabled(true) .setMinimumAssignedSplitWeight(0.05)); } @@ -182,6 +183,7 @@ public void testExplicitPropertyMappings() .put("hive.timestamp-precision", "NANOSECONDS") .put("hive.optimize-symlink-listing", "false") .put("hive.legacy-hive-view-translation", "true") + .put("hive.iceberg-catalog-name", "iceberg") .put("hive.size-based-split-weights-enabled", "false") .put("hive.minimum-assigned-split-weight", "1.0") .build(); @@ -256,6 +258,7 @@ public void testExplicitPropertyMappings() .setTimestampPrecision(HiveTimestampPrecision.NANOSECONDS) .setOptimizeSymlinkListing(false) .setLegacyHiveViewTranslation(true) + .setIcebergCatalogName("iceberg") .setSizeBasedSplitWeightsEnabled(false) .setMinimumAssignedSplitWeight(1.0); diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeHiveIcebergRedirections.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeHiveIcebergRedirections.java new file mode 100644 index 
000000000000..fe693af795d8 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeHiveIcebergRedirections.java @@ -0,0 +1,52 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.launcher.env.environment; + +import com.google.common.collect.ImmutableList; +import io.trino.tests.product.launcher.docker.DockerFiles; +import io.trino.tests.product.launcher.docker.DockerFiles.ResourceProvider; +import io.trino.tests.product.launcher.env.Environment; +import io.trino.tests.product.launcher.env.EnvironmentProvider; +import io.trino.tests.product.launcher.env.common.Hadoop; +import io.trino.tests.product.launcher.env.common.Standard; +import io.trino.tests.product.launcher.env.common.TestsEnvironment; + +import javax.inject.Inject; + +import static io.trino.tests.product.launcher.env.EnvironmentContainers.COORDINATOR; +import static io.trino.tests.product.launcher.env.common.Hadoop.CONTAINER_PRESTO_HIVE_PROPERTIES; +import static io.trino.tests.product.launcher.env.common.Hadoop.CONTAINER_PRESTO_ICEBERG_PROPERTIES; +import static org.testcontainers.utility.MountableFile.forHostPath; + +@TestsEnvironment +public class EnvSinglenodeHiveIcebergRedirections + extends EnvironmentProvider +{ + private final ResourceProvider configDir; + + @Inject + public EnvSinglenodeHiveIcebergRedirections(DockerFiles dockerFiles, Standard standard, Hadoop hadoop) + { + 
super(ImmutableList.of(standard, hadoop)); + configDir = dockerFiles.getDockerFilesHostDirectory("conf/environment/singlenode-hive-iceberg-redirections"); + } + + @Override + public void extendEnvironment(Environment.Builder builder) + { + builder.configureContainer(COORDINATOR, container -> container + .withCopyFileToContainer(forHostPath(configDir.getPath("hive.properties")), CONTAINER_PRESTO_HIVE_PROPERTIES) + .withCopyFileToContainer(forHostPath(configDir.getPath("iceberg.properties")), CONTAINER_PRESTO_ICEBERG_PROPERTIES)); + } +} diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java index c5a5eb804f04..e0a7faf3792f 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/Suite7NonGeneric.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import io.trino.tests.product.launcher.env.EnvironmentConfig; import io.trino.tests.product.launcher.env.EnvironmentDefaults; +import io.trino.tests.product.launcher.env.environment.EnvSinglenodeHiveIcebergRedirections; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeKerberosHdfsImpersonationCrossRealm; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeMysql; import io.trino.tests.product.launcher.env.environment.EnvSinglenodePostgresql; @@ -46,6 +47,7 @@ public List getTestRuns(EnvironmentConfig config) testOnEnvironment(EnvSinglenodeSqlserver.class).withGroups("sqlserver").build(), testOnEnvironment(EnvSinglenodeSparkHive.class).withGroups("hive_spark").build(), testOnEnvironment(EnvSinglenodeSparkIceberg.class).withGroups("iceberg").withExcludedGroups("storage_formats").build(), + 
testOnEnvironment(EnvSinglenodeHiveIcebergRedirections.class).withGroups("hive_iceberg_redirections").build(), testOnEnvironment(EnvSinglenodeKerberosHdfsImpersonationCrossRealm.class).withGroups("storage_formats", "cli", "hdfs_impersonation").build(), testOnEnvironment(EnvTwoMixedHives.class).withGroups("two_hives").build(), testOnEnvironment(EnvTwoKerberosHives.class).withGroups("two_hives").build()); diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hive-iceberg-redirections/hive.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hive-iceberg-redirections/hive.properties new file mode 100644 index 000000000000..91fb859f871e --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hive-iceberg-redirections/hive.properties @@ -0,0 +1,10 @@ +connector.name=hive-hadoop2 +hive.metastore.uri=thrift://hadoop-master:9083 +hive.config.resources=/docker/presto-product-tests/conf/presto/etc/hive-default-fs-site.xml +hive.allow-add-column=true +hive.allow-drop-column=true +hive.allow-rename-column=true +hive.allow-comment-table=true +hive.allow-drop-table=true +hive.allow-rename-table=true +hive.iceberg-catalog-name=iceberg diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hive-iceberg-redirections/iceberg.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hive-iceberg-redirections/iceberg.properties new file mode 100644 index 000000000000..6230d550a4ac --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-hive-iceberg-redirections/iceberg.properties @@ -0,0 +1,3 @@ +connector.name=iceberg 
+hive.metastore.uri=thrift://hadoop-master:9083 +hive.config.resources=/docker/presto-product-tests/conf/presto/etc/hive-default-fs-site.xml diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java index b2b5fad460c4..6b5dd6417d22 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java @@ -47,6 +47,7 @@ public final class TestGroups public static final String HIVE_VIEWS = "hive_views"; public static final String HIVE_VIEW_COMPATIBILITY = "hive_view_compatibility"; public static final String HIVE_CACHING = "hive_caching"; + public static final String HIVE_ICEBERG_REDIRECTIONS = "hive_iceberg_redirections"; public static final String AUTHORIZATION = "authorization"; public static final String HIVE_COERCION = "hive_coercion"; public static final String CASSANDRA = "cassandra"; diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveRedirectionToIceberg.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveRedirectionToIceberg.java new file mode 100644 index 000000000000..da79644c723b --- /dev/null +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestHiveRedirectionToIceberg.java @@ -0,0 +1,492 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.hive; + +import io.trino.tempto.BeforeTestWithContext; +import io.trino.tempto.ProductTest; +import io.trino.tempto.assertions.QueryAssert; +import io.trino.tempto.query.QueryResult; +import org.assertj.core.api.AbstractStringAssert; +import org.assertj.core.api.Assertions; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.tempto.assertions.QueryAssert.Row.row; +import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure; +import static io.trino.tempto.assertions.QueryAssert.assertThat; +import static io.trino.tempto.query.QueryExecutor.param; +import static io.trino.tests.product.TestGroups.HIVE_ICEBERG_REDIRECTIONS; +import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix; +import static io.trino.tests.product.utils.QueryExecutors.onTrino; +import static java.lang.String.format; +import static java.sql.JDBCType.VARCHAR; + +public class TestHiveRedirectionToIceberg + extends ProductTest +{ + @BeforeTestWithContext + public void createAdditionalSchema() + { + onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS hive.nondefaultschema"); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirect() + { + String tableName = "redirect_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." 
+ tableName; + + createIcebergTable(icebergTableName, false); + + assertResultsEqual( + onTrino().executeQuery("TABLE " + icebergTableName), + onTrino().executeQuery("TABLE " + hiveTableName)); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectWithNonDefaultSchema() + { + String tableName = "redirect_non_default_schema_" + randomTableSuffix(); + String hiveTableName = "hive.nondefaultschema." + tableName; + String icebergTableName = "iceberg.nondefaultschema." + tableName; + + createIcebergTable(icebergTableName, false); + + assertResultsEqual( + onTrino().executeQuery("TABLE " + icebergTableName), + onTrino().executeQuery("TABLE " + hiveTableName)); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectToNonexistentCatalog() + { + String tableName = "redirect_to_nonexistent_iceberg_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, false); + + // sanity check + assertResultsEqual( + onTrino().executeQuery("TABLE " + icebergTableName), + onTrino().executeQuery("TABLE " + hiveTableName)); + + onTrino().executeQuery("SET SESSION hive.iceberg_catalog_name = 'someweirdcatalog'"); + + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + hiveTableName)) + .hasMessageMatching(".*Table 'hive.default.redirect_to_nonexistent_iceberg_.*' redirected to 'someweirdcatalog.default.redirect_to_nonexistent_iceberg_.*', but the target catalog 'someweirdcatalog' does not exist"); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + // Note: this tests engine more than connectors. Still good scenario to test. 
+ @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectWithDefaultSchemaInSession() + { + String tableName = "redirect_with_use_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, false); + + onTrino().executeQuery("USE iceberg.default"); + assertResultsEqual( + onTrino().executeQuery("TABLE " + tableName), // unqualified + onTrino().executeQuery("TABLE " + hiveTableName)); + + onTrino().executeQuery("USE hive.default"); + assertResultsEqual( + onTrino().executeQuery("TABLE " + icebergTableName), + onTrino().executeQuery("TABLE " + tableName)); // unqualified + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectPartitionsToUnpartitioned() + { + String tableName = "iceberg_unpartitioned_table_" + randomTableSuffix(); + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, false); + + assertThat(onTrino().executeQuery("" + + "SELECT record_count, data.nationkey.min, data.nationkey.max, data.name.min, data.name.max " + + "FROM hive.default.\"" + tableName + "$partitions\"")) + .containsOnly(row(25L, 0L, 24L, "ALGERIA", "VIETNAM")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectPartitionsToPartitioned() + { + String tableName = "iceberg_partitioned_table_" + randomTableSuffix(); + String icebergTableName = "iceberg.default." 
+ tableName; + + createIcebergTable(icebergTableName, true); + + assertThat(onTrino().executeQuery("" + + "SELECT partition.regionkey, record_count, data.nationkey.min, data.nationkey.max, data.name.min, data.name.max " + + "FROM hive.default.\"" + tableName + "$partitions\"")) + .containsOnly( + row(0L, 5L, 0L, 16L, "ALGERIA", "MOZAMBIQUE"), + row(1L, 5L, 1L, 24L, "ARGENTINA", "UNITED STATES"), + row(2L, 5L, 8L, 21L, "CHINA", "VIETNAM"), + row(3L, 5L, 6L, 23L, "FRANCE", "UNITED KINGDOM"), + row(4L, 5L, 4L, 20L, "EGYPT", "SAUDI ARABIA")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}, dataProvider = "schemaAndPartitioning") + public void testInsert(String schema, boolean partitioned) + { + String tableName = "iceberg_insert_" + randomTableSuffix(); + String hiveTableName = "hive." + schema + "." + tableName; + String icebergTableName = "iceberg." + schema + "." + tableName; + + createIcebergTable(icebergTableName, partitioned, false); + + onTrino().executeQuery("INSERT INTO " + hiveTableName + " VALUES (42, 'some name', 12, 'some comment')"); + + assertThat(onTrino().executeQuery("TABLE " + hiveTableName)) + .containsOnly(row(42L, "some name", 12L, "some comment")); + assertThat(onTrino().executeQuery("TABLE " + icebergTableName)) + .containsOnly(row(42L, "some name", 12L, "some comment")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @DataProvider + public static Object[][] schemaAndPartitioning() + { + return new Object[][] { + {"default", false}, + {"default", true}, + // Note: this tests engine more than connectors. Still good scenario to test. + {"nondefaultschema", false}, + {"nondefaultschema", true}, + }; + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testDelete() + { + String tableName = "iceberg_insert_" + randomTableSuffix(); + String hiveTableName = "hive.default." 
+ tableName; + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, true); + + onTrino().executeQuery("DELETE FROM " + hiveTableName + " WHERE regionkey = 1"); + + assertResultsEqual( + onTrino().executeQuery("TABLE " + icebergTableName), + onTrino().executeQuery("SELECT nationkey, name, regionkey, comment FROM tpch.tiny.nation WHERE regionkey != 1")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testUpdate() + { + String tableName = "iceberg_insert_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, true); + + assertQueryFailure(() -> onTrino().executeQuery("UPDATE " + hiveTableName + " SET nationkey = nationkey + 100 WHERE regionkey = 1")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): This connector does not support updates"); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testDropTable() + { + String tableName = "hive_drop_iceberg_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, false); + onTrino().executeQuery("DROP TABLE " + hiveTableName); + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): line 1:1: Table '" + icebergTableName + "' does not exist"); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testDescribe() + { + String tableName = "iceberg_describe_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." 
+ tableName; + + createIcebergTable(icebergTableName, true); + + assertResultsEqual( + onTrino().executeQuery("DESCRIBE " + icebergTableName), + onTrino().executeQuery("DESCRIBE " + hiveTableName)); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testShowCreateTable() + { + String tableName = "iceberg_show_create_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, true); + + assertThat(onTrino().executeQuery("SHOW CREATE TABLE " + hiveTableName)) + .containsOnly(row("CREATE TABLE " + hiveTableName + " (\n" + + " nationkey bigint,\n" + + " name varchar,\n" + + " regionkey bigint,\n" + + " comment varchar\n" + + ")\n" + + "WITH (\n" + + " format = 'ORC',\n" + + " partitioning = ARRAY['regionkey']\n" + // 'partitioning' comes from Iceberg + ")")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testShowStats() + { + String tableName = "iceberg_show_create_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." 
+ tableName; + + createIcebergTable(icebergTableName, true); + + assertThat(onTrino().executeQuery("SHOW STATS FOR " + hiveTableName)) + .containsOnly( + row("nationkey", null, null, 0d, null, "0", "24"), + row("name", null, null, 0d, null, null, null), + row("regionkey", null, null, 0d, null, "0", "4"), + row("comment", null, null, 0d, null, null, null), + row(null, null, null, null, 25d, null, null)); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testAlterTableRename() + { + String tableName = "iceberg_rename_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, false); + + onTrino().executeQuery("ALTER TABLE " + hiveTableName + " RENAME TO " + tableName + "_new"); + + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + hiveTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): line 1:1: Table '" + hiveTableName + "' does not exist"); + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): line 1:1: Table '" + icebergTableName + "' does not exist"); + + assertResultsEqual( + onTrino().executeQuery("TABLE " + icebergTableName + "_new"), + onTrino().executeQuery("TABLE " + hiveTableName + "_new")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName + "_new"); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testAlterTableAddColumn() + { + String tableName = "iceberg_alter_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." 
+ tableName; + + createIcebergTable(icebergTableName, false); + + onTrino().executeQuery("ALTER TABLE " + hiveTableName + " ADD COLUMN some_new_column double"); + + // TODO: ALTER TABLE succeeded, but new column was not added + Assertions.assertThat(onTrino().executeQuery("DESCRIBE " + icebergTableName).column(1)) + .containsOnly("nationkey", "name", "regionkey", "comment"); + + assertResultsEqual( + onTrino().executeQuery("TABLE " + icebergTableName), + onTrino().executeQuery("SELECT * /*, NULL*/ FROM tpch.tiny.nation")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testCommentTable() + { + String tableName = "iceberg_comment_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createIcebergTable(icebergTableName, false); + + assertTableComment("hive", "default", tableName).isNull(); + assertTableComment("iceberg", "default", tableName).isNull(); + + onTrino().executeQuery("COMMENT ON TABLE " + hiveTableName + " IS 'This is my table, there are many like it but this one is mine'"); + + // TODO: COMMENT ON TABLE succeeded, but comment was not preserved + assertTableComment("hive", "default", tableName).isNull(); + assertTableComment("iceberg", "default", tableName).isNull(); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testInformationSchemaColumns() + { + // use dedicated schema so that we control the number and shape of tables + String schemaName = "redirect_information_schema_" + randomTableSuffix(); + onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS hive." + schemaName); + + String tableName = "redirect_information_schema_table_" + randomTableSuffix(); + String icebergTableName = "iceberg." + schemaName + "." 
+ tableName; + + createIcebergTable(icebergTableName, false); + + // via redirection with table filter + assertThat(onTrino().executeQuery( + format("SELECT * FROM hive.information_schema.columns WHERE table_schema = '%s' AND table_name = '%s'", schemaName, tableName))) + .containsOnly( + row("hive", schemaName, tableName, "nationkey", 1, null, "YES", "bigint"), + row("hive", schemaName, tableName, "name", 2, null, "YES", "varchar"), + row("hive", schemaName, tableName, "regionkey", 3, null, "YES", "bigint"), + row("hive", schemaName, tableName, "comment", 4, null, "YES", "varchar")); + + // test via redirection with just schema filter + assertThat(onTrino().executeQuery( + format("SELECT * FROM hive.information_schema.columns WHERE table_schema = '%s'", schemaName))) + .containsOnly( + row("hive", schemaName, tableName, "nationkey", 1, null, "YES", "bigint"), + row("hive", schemaName, tableName, "name", 2, null, "YES", "varchar"), + row("hive", schemaName, tableName, "regionkey", 3, null, "YES", "bigint"), + row("hive", schemaName, tableName, "comment", 4, null, "YES", "varchar")); + + // sanity check that getting columns info without redirection produces matching result + assertThat(onTrino().executeQuery( + format("SELECT * FROM iceberg.information_schema.columns WHERE table_schema = '%s' AND table_name = '%s'", schemaName, tableName))) + .containsOnly( + row("iceberg", schemaName, tableName, "nationkey", 1, null, "YES", "bigint"), + row("iceberg", schemaName, tableName, "name", 2, null, "YES", "varchar"), + row("iceberg", schemaName, tableName, "regionkey", 3, null, "YES", "bigint"), + row("iceberg", schemaName, tableName, "comment", 4, null, "YES", "varchar")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + onTrino().executeQuery("DROP SCHEMA hive." 
+ schemaName);
+ }
+
+ @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS})
+ public void testSystemJdbcColumns()
+ {
+ // use dedicated schema so that we control the number and shape of tables
+ String schemaName = "redirect_system_jdbc_columns_" + randomTableSuffix();
+ onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS hive." + schemaName);
+
+ String tableName = "redirect_system_jdbc_columns_table_" + randomTableSuffix();
+ String icebergTableName = "iceberg." + schemaName + "." + tableName;
+
+ createIcebergTable(icebergTableName, false);
+
+ // via redirection with table filter
+ assertThat(onTrino().executeQuery(
+ format("SELECT table_cat, table_schem, table_name, column_name FROM system.jdbc.columns WHERE table_cat = 'hive' AND table_schem = '%s' AND table_name = '%s'", schemaName, tableName)))
+ .containsOnly(
+ row("hive", schemaName, tableName, "nationkey"),
+ row("hive", schemaName, tableName, "name"),
+ row("hive", schemaName, tableName, "regionkey"),
+ row("hive", schemaName, tableName, "comment"));
+
+ // test via redirection with just schema filter
+ // (no table filter; all tables in the schema are listed)
+ assertThat(onTrino().executeQuery(
+ format("SELECT table_cat, table_schem, table_name, column_name FROM system.jdbc.columns WHERE table_cat = 'hive' AND table_schem = '%s'", schemaName)))
+ .containsOnly(
+ row("hive", schemaName, tableName, "nationkey"),
+ row("hive", schemaName, tableName, "name"),
+ row("hive", schemaName, tableName, "regionkey"),
+ row("hive", schemaName, tableName, "comment"));
+
+ // sanity check that getting columns info without redirection produces matching result
+ assertThat(onTrino().executeQuery(
+ format("SELECT table_cat, table_schem, table_name, column_name FROM system.jdbc.columns WHERE table_cat = 'iceberg' AND table_schem = '%s' AND table_name = '%s'", schemaName, tableName)))
+ .containsOnly(
+ row("iceberg", schemaName, tableName, "nationkey"),
+ row("iceberg", schemaName, tableName, "name"),
+ row("iceberg",
schemaName, tableName, "regionkey"), + row("iceberg", schemaName, tableName, "comment")); + + onTrino().executeQuery("DROP TABLE " + icebergTableName); + onTrino().executeQuery("DROP SCHEMA hive." + schemaName); + } + + private static void createIcebergTable(String tableName, boolean partitioned) + { + createIcebergTable(tableName, partitioned, true); + } + + private static void createIcebergTable(String tableName, boolean partitioned, boolean withData) + { + onTrino().executeQuery( + "CREATE TABLE " + tableName + " " + + (partitioned ? "WITH (partitioning = ARRAY['regionkey']) " : "") + + " AS " + + "SELECT * FROM tpch.tiny.nation " + + (withData ? "WITH DATA" : "WITH NO DATA")); + } + + private static AbstractStringAssert assertTableComment(String catalog, String schema, String tableName) + { + QueryResult queryResult = readTableComment(catalog, schema, tableName); + return Assertions.assertThat((String) getOnlyElement(getOnlyElement(queryResult.rows()))); + } + + private static QueryResult readTableComment(String catalog, String schema, String tableName) + { + return onTrino().executeQuery( + "SELECT comment FROM system.metadata.table_comments WHERE catalog_name = ? AND schema_name = ? 
AND table_name = ?", + param(VARCHAR, catalog), + param(VARCHAR, schema), + param(VARCHAR, tableName)); + } + + private static void assertResultsEqual(QueryResult first, QueryResult second) + { + assertThat(first).containsOnly(second.rows().stream() + .map(QueryAssert.Row::new) + .collect(toImmutableList())); + + // just for symmetry + assertThat(second).containsOnly(first.rows().stream() + .map(QueryAssert.Row::new) + .collect(toImmutableList())); + } +} diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergRedirectionToHive.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergRedirectionToHive.java new file mode 100644 index 000000000000..4b20a28c5472 --- /dev/null +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergRedirectionToHive.java @@ -0,0 +1,466 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.tests.product.iceberg; + +import io.trino.tempto.BeforeTestWithContext; +import io.trino.tempto.ProductTest; +import io.trino.tempto.assertions.QueryAssert; +import io.trino.tempto.query.QueryResult; +import org.assertj.core.api.AbstractStringAssert; +import org.assertj.core.api.Assertions; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.tempto.assertions.QueryAssert.Row.row; +import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure; +import static io.trino.tempto.assertions.QueryAssert.assertThat; +import static io.trino.tempto.query.QueryExecutor.param; +import static io.trino.tests.product.TestGroups.HIVE_ICEBERG_REDIRECTIONS; +import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix; +import static io.trino.tests.product.utils.QueryExecutors.onTrino; +import static java.lang.String.format; +import static java.sql.JDBCType.VARCHAR; + +public class TestIcebergRedirectionToHive + extends ProductTest +{ + @BeforeTestWithContext + public void createAdditionalSchema() + { + onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS iceberg.nondefaultschema"); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirect() + { + String tableName = "redirect_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, false); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." 
+ tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectWithNonDefaultSchema() + { + String tableName = "redirect_non_default_schema_" + randomTableSuffix(); + String hiveTableName = "hive.nondefaultschema." + tableName; + String icebergTableName = "iceberg.nondefaultschema." + tableName; + + createHiveTable(hiveTableName, false); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: nondefaultschema." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectToNonexistentCatalog() + { + String tableName = "redirect_to_nonexistent_hive_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, false); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("SET SESSION iceberg.hive_catalog_name = 'someweirdcatalog'")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): line 1:1: Session property 'iceberg.hive_catalog_name' does not exist"); + + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + // Note: this tests engine more than connectors. Still good scenario to test. + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectWithDefaultSchemaInSession() + { + String tableName = "redirect_with_use_" + randomTableSuffix(); + String hiveTableName = "hive.default." 
+ tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, false); + + onTrino().executeQuery("USE iceberg.default"); + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + tableName)) // unqualified + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); +// assertResultsEqual( +// onTrino().executeQuery("TABLE " + tableName), // unqualified +// onTrino().executeQuery("TABLE " + hiveTableName)); + + onTrino().executeQuery("USE hive.default"); + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("TABLE " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); +// assertResultsEqual( +// onTrino().executeQuery("TABLE " + icebergTableName), +// onTrino().executeQuery("TABLE " + tableName)); // unqualified + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectPartitionsToUnpartitioned() + { + String tableName = "hive_unpartitioned_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + + createHiveTable(hiveTableName, false); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("TABLE iceberg.default.\"" + tableName + "$partitions\"")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testRedirectPartitionsToPartitioned() + { + String tableName = "hive_partitioned_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." 
+ tableName; + + createHiveTable(hiveTableName, true); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("TABLE iceberg.default.\"" + tableName + "$partitions\"")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}, dataProvider = "schemaAndPartitioning") + public void testInsert(String schema, boolean partitioned) + { + String tableName = "hive_insert_" + randomTableSuffix(); + String hiveTableName = "hive." + schema + "." + tableName; + String icebergTableName = "iceberg." + schema + "." + tableName; + + createHiveTable(hiveTableName, partitioned, false); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("INSERT INTO " + icebergTableName + " VALUES (6, false, -17), (7, true, 1)")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: " + schema + "." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @DataProvider + public static Object[][] schemaAndPartitioning() + { + return new Object[][] { + {"default", false}, + {"default", true}, + // Note: this tests engine more than connectors. Still good scenario to test. + {"nondefaultschema", false}, + {"nondefaultschema", true}, + }; + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testDelete() + { + String tableName = "hive_delete_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." 
+ tableName; + + createHiveTable(hiveTableName, true); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("DELETE FROM " + icebergTableName + " WHERE regionkey = 1")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testUpdate() + { + String tableName = "hive_update_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, true); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("UPDATE " + icebergTableName + " SET nationkey = nationkey + 100 WHERE regionkey = 1")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testDropTable() + { + String tableName = "iceberg_drop_hive_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, false); + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("DROP TABLE " + icebergTableName)) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + // TODO should fail + onTrino().executeQuery("TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testDescribe() + { + String tableName = "hive_describe_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." 
+ tableName;
+
+ createHiveTable(hiveTableName, true);
+
+ // TODO: support redirects from Iceberg to Hive
+ assertQueryFailure(() -> onTrino().executeQuery("DESCRIBE " + icebergTableName))
+ .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName);
+
+ onTrino().executeQuery("DROP TABLE " + hiveTableName);
+ }
+
+ @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS})
+ public void testShowCreateTable()
+ {
+ String tableName = "hive_show_create_table_" + randomTableSuffix();
+ String hiveTableName = "hive.default." + tableName;
+ String icebergTableName = "iceberg.default." + tableName;
+
+ createHiveTable(hiveTableName, true);
+
+ // TODO: support redirects from Iceberg to Hive
+ assertQueryFailure(() -> onTrino().executeQuery("SHOW CREATE TABLE " + icebergTableName))
+ .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName);
+
+ onTrino().executeQuery("DROP TABLE " + hiveTableName);
+ }
+
+ @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS})
+ public void testShowStats()
+ {
+ String tableName = "hive_show_stats_" + randomTableSuffix();
+ String hiveTableName = "hive.default." + tableName;
+ String icebergTableName = "iceberg.default." + tableName;
+
+ createHiveTable(hiveTableName, true);
+
+ // TODO: support redirects from Iceberg to Hive
+ assertQueryFailure(() -> onTrino().executeQuery("SHOW STATS FOR " + icebergTableName))
+ .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName);
+
+ onTrino().executeQuery("DROP TABLE " + hiveTableName);
+ }
+
+ @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS})
+ public void testAlterTableRename()
+ {
+ String tableName = "hive_rename_table_" + randomTableSuffix();
+ String hiveTableName = "hive.default." + tableName;
+ String icebergTableName = "iceberg.default."
+ tableName; + + createHiveTable(hiveTableName, false); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE " + icebergTableName + " RENAME TO a_new_name")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testAlterTableAddColumn() + { + String tableName = "hive_alter_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, false); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("ALTER TABLE " + icebergTableName + " ADD COLUMN some_new_column double")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." + tableName); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testCommentTable() + { + String tableName = "hive_comment_table_" + randomTableSuffix(); + String hiveTableName = "hive.default." + tableName; + String icebergTableName = "iceberg.default." + tableName; + + createHiveTable(hiveTableName, false); + + assertTableComment("hive", "default", tableName).isNull(); + // TODO: support redirects from Iceberg to Hive + assertThat(readTableComment("iceberg", "default", tableName)).hasNoRows(); + + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery("COMMENT ON TABLE " + icebergTableName + " IS 'This is my table, there are many like it but this one is mine'")) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: default." 
+ tableName); + + assertTableComment("hive", "default", tableName).isNull(); + // TODO: support redirects from Iceberg to Hive + assertThat(readTableComment("iceberg", "default", tableName)).hasNoRows(); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testInformationSchemaColumns() + { + // use dedicated schema so that we control the number and shape of tables + String schemaName = "redirect_information_schema_" + randomTableSuffix(); + onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS hive." + schemaName); + + String tableName = "redirect_information_schema_table_" + randomTableSuffix(); + String hiveTableName = "hive." + schemaName + "." + tableName; + + createHiveTable(hiveTableName, false); + + // via redirection with table filter + // TODO: support redirects from Iceberg to Hive + assertQueryFailure(() -> onTrino().executeQuery( + format("SELECT * FROM iceberg.information_schema.columns WHERE table_schema = '%s' AND table_name='%s'", schemaName, tableName))) + .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Not an Iceberg table: " + schemaName + "." 
+ tableName); + + // test via redirection with just schema filter + assertThat(onTrino().executeQuery( + format("SELECT * FROM iceberg.information_schema.columns WHERE table_schema = '%s'", schemaName))) + .containsOnly( + // TODO report table's columns via redirect to Hive +// row("iceberg", schemaName, tableName, "nationkey", 1, null, "YES", "bigint"), +// row("iceberg", schemaName, tableName, "name", 2, null, "YES", "varchar(25)"), +// row("iceberg", schemaName, tableName, "comment", 3, null, "YES", "varchar(152)"), +// row("iceberg", schemaName, tableName, "regionkey", 4, null, "YES", "bigint") + /**/); + + // sanity check that getting columns info without redirection produces matching result + assertThat(onTrino().executeQuery( + format("SELECT * FROM hive.information_schema.columns WHERE table_schema = '%s' AND table_name='%s'", schemaName, tableName))) + .containsOnly( + row("hive", schemaName, tableName, "nationkey", 1, null, "YES", "bigint"), + row("hive", schemaName, tableName, "name", 2, null, "YES", "varchar(25)"), + row("hive", schemaName, tableName, "comment", 3, null, "YES", "varchar(152)"), + row("hive", schemaName, tableName, "regionkey", 4, null, "YES", "bigint")); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + onTrino().executeQuery("DROP SCHEMA hive." + schemaName); + } + + @Test(groups = {HIVE_ICEBERG_REDIRECTIONS, PROFILE_SPECIFIC_TESTS}) + public void testSystemJdbcColumns() + { + // use dedicated schema so that we control the number and shape of tables + String schemaName = "redirect_system_jdbc_columns_" + randomTableSuffix(); + onTrino().executeQuery("CREATE SCHEMA IF NOT EXISTS hive." + schemaName); + + String tableName = "redirect_system_jdbc_columns_table_" + randomTableSuffix(); + String hiveTableName = "hive." + schemaName + "." 
+ tableName; + + createHiveTable(hiveTableName, false); + + // via redirection with table filter + assertThat(onTrino().executeQuery( + format("SELECT table_cat, table_schem, table_name, column_name FROM system.jdbc.columns WHERE table_cat = 'iceberg' AND table_schem = '%s' AND table_name = '%s'", schemaName, tableName))) + .containsOnly( + // TODO report table's columns via redirect to Hive +// row("iceberg", schemaName, tableName, "nationkey"), +// row("iceberg", schemaName, tableName, "name"), +// row("iceberg", schemaName, tableName, "comment"), +// row("iceberg", schemaName, tableName, "regionkey") + /**/); + + // test via redirection with just schema filter + assertThat(onTrino().executeQuery( + format("SELECT table_cat, table_schem, table_name, column_name FROM system.jdbc.columns WHERE table_cat = 'iceberg' AND table_schem = '%s'", schemaName))) + .containsOnly( + // TODO report table's columns via redirect to Hive +// row("iceberg", schemaName, tableName, "nationkey"), +// row("iceberg", schemaName, tableName, "name"), +// row("iceberg", schemaName, tableName, "comment"), +// row("iceberg", schemaName, tableName, "regionkey") + /**/); + + // sanity check that getting columns info without redirection produces matching result + assertThat(onTrino().executeQuery( + format("SELECT table_cat, table_schem, table_name, column_name FROM system.jdbc.columns WHERE table_cat = 'hive' AND table_schem = '%s' AND table_name = '%s'", schemaName, tableName))) + .containsOnly( + row("hive", schemaName, tableName, "nationkey"), + row("hive", schemaName, tableName, "name"), + row("hive", schemaName, tableName, "comment"), + row("hive", schemaName, tableName, "regionkey")); + + onTrino().executeQuery("DROP TABLE " + hiveTableName); + onTrino().executeQuery("DROP SCHEMA hive." 
+ schemaName); + } + + private static void createHiveTable(String tableName, boolean partitioned) + { + createHiveTable(tableName, partitioned, true); + } + + private static void createHiveTable(String tableName, boolean partitioned, boolean withData) + { + onTrino().executeQuery( + "CREATE TABLE " + tableName + " " + + (partitioned ? "WITH (partitioned_by = ARRAY['regionkey']) " : "") + + " AS " + + "SELECT nationkey, name, comment, regionkey FROM tpch.tiny.nation " + + (withData ? "WITH DATA" : "WITH NO DATA")); + } + + private static AbstractStringAssert assertTableComment(String catalog, String schema, String tableName) + { + QueryResult queryResult = readTableComment(catalog, schema, tableName); + return Assertions.assertThat((String) getOnlyElement(getOnlyElement(queryResult.rows()))); + } + + private static QueryResult readTableComment(String catalog, String schema, String tableName) + { + return onTrino().executeQuery( + "SELECT comment FROM system.metadata.table_comments WHERE catalog_name = ? AND schema_name = ? AND table_name = ?", + param(VARCHAR, catalog), + param(VARCHAR, schema), + param(VARCHAR, tableName)); + } + + private static void assertResultsEqual(QueryResult first, QueryResult second) + { + assertThat(first).containsOnly(second.rows().stream() + .map(QueryAssert.Row::new) + .collect(toImmutableList())); + + // just for symmetry + assertThat(second).containsOnly(first.rows().stream() + .map(QueryAssert.Row::new) + .collect(toImmutableList())); + } +}