diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c130406c81e7..1d1a9dbbd48c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -219,6 +219,10 @@ jobs: if [ "${AWS_ACCESS_KEY_ID}" != "" ]; then source plugin/trino-hive-hadoop2/conf/hive-tests-${{ matrix.config }}.sh && plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh + if [ "${{ matrix.config }}" == "config-hdp3" ]; then + # JsonSerde class needed for the S3 Select JSON tests is only available on hdp3. + plugin/trino-hive-hadoop2/bin/run_hive_s3_select_json_tests.sh + fi fi - name: Run Hive Glue Tests env: diff --git a/plugin/trino-hive-hadoop2/bin/run_hive_s3_select_json_tests.sh b/plugin/trino-hive-hadoop2/bin/run_hive_s3_select_json_tests.sh new file mode 100755 index 000000000000..2c663932b608 --- /dev/null +++ b/plugin/trino-hive-hadoop2/bin/run_hive_s3_select_json_tests.sh @@ -0,0 +1,58 @@ +#!/usr/bin/env bash + +# Similar to run_hive_s3_tests.sh, but has only Amazon S3 Select JSON tests. This is in a separate file as the JsonSerDe +# class is only available in Hadoop 3.1 version, and so we would only test JSON pushdown against the 3.1 version. + +set -euo pipefail -x + +. 
"${BASH_SOURCE%/*}/common.sh" + +abort_if_not_gib_impacted + +check_vars S3_BUCKET S3_BUCKET_ENDPOINT \ + AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY + +cleanup_hadoop_docker_containers +start_hadoop_docker_containers + +test_directory="$(date '+%Y%m%d-%H%M%S')-$(uuidgen | sha1sum | cut -b 1-6)-s3select-json" + +# insert AWS credentials +deploy_core_site_xml core-site.xml.s3-template \ + AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY S3_BUCKET_ENDPOINT + +# create test tables +# can't use create_test_tables because the first table is created with different commands +table_path="s3a://${S3_BUCKET}/${test_directory}/trino_s3select_test_external_fs_json/" +exec_in_hadoop_master_container hadoop fs -mkdir -p "${table_path}" +exec_in_hadoop_master_container /docker/files/hadoop-put.sh /docker/files/test_table.json{,.gz,.bz2} "${table_path}" +exec_in_hadoop_master_container sudo -Eu hive beeline -u jdbc:hive2://localhost:10000/default -n hive -e " + CREATE EXTERNAL TABLE trino_s3select_test_external_fs_json(col_1 bigint, col_2 bigint) + ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' + LOCATION '${table_path}'" + +stop_unnecessary_hadoop_services + +# restart hive-metastore to apply S3 changes in core-site.xml +docker exec "$(hadoop_master_container)" supervisorctl restart hive-metastore +retry check_hadoop + +# run product tests +pushd "${PROJECT_ROOT}" +set +e +./mvnw ${MAVEN_TEST:--B} -pl :trino-hive-hadoop2 test -P test-hive-hadoop2-s3-select-json \ + -DHADOOP_USER_NAME=hive \ + -Dhive.hadoop2.metastoreHost=localhost \ + -Dhive.hadoop2.metastorePort=9083 \ + -Dhive.hadoop2.databaseName=default \ + -Dhive.hadoop2.s3.awsAccessKey="${AWS_ACCESS_KEY_ID}" \ + -Dhive.hadoop2.s3.awsSecretKey="${AWS_SECRET_ACCESS_KEY}" \ + -Dhive.hadoop2.s3.writableBucket="${S3_BUCKET}" \ + -Dhive.hadoop2.s3.testDirectory="${test_directory}" +EXIT_CODE=$? 
+set -e +popd + +cleanup_hadoop_docker_containers + +exit "${EXIT_CODE}" diff --git a/plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh b/plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh index 9f952137d8af..2caa78bd3678 100755 --- a/plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh +++ b/plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh @@ -10,9 +10,6 @@ check_vars S3_BUCKET S3_BUCKET_ENDPOINT \ AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY cleanup_hadoop_docker_containers - -# Use Hadoop version 3.1 for S3 tests as the JSON SerDe class is not available in lower versions. -export HADOOP_BASE_IMAGE="ghcr.io/trinodb/testing/hdp3.1-hive" start_hadoop_docker_containers test_directory="$(date '+%Y%m%d-%H%M%S')-$(uuidgen | sha1sum | cut -b 1-6)" @@ -69,14 +66,6 @@ exec_in_hadoop_master_container /usr/bin/hive -e " STORED AS TEXTFILE LOCATION '${table_path}'" -table_path="s3a://${S3_BUCKET}/${test_directory}/trino_s3select_test_external_fs_json/" -exec_in_hadoop_master_container hadoop fs -mkdir -p "${table_path}" -exec_in_hadoop_master_container hadoop fs -put -f /docker/files/test_table.json{,.gz,.bz2} "${table_path}" -exec_in_hadoop_master_container /usr/bin/hive -e " - CREATE EXTERNAL TABLE trino_s3select_test_external_fs_json(col_1 bigint, col_2 bigint) - ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' - LOCATION '${table_path}'" - stop_unnecessary_hadoop_services # restart hive-metastore to apply S3 changes in core-site.xml diff --git a/plugin/trino-hive-hadoop2/pom.xml b/plugin/trino-hive-hadoop2/pom.xml index 1443c69b9d73..f29d5df2c979 100644 --- a/plugin/trino-hive-hadoop2/pom.xml +++ b/plugin/trino-hive-hadoop2/pom.xml @@ -34,18 +34,48 @@ + + io.trino + trino-hadoop-toolkit + runtime + + io.trino trino-hdfs runtime + + io.trino + trino-plugin-toolkit + runtime + + io.trino.hadoop hadoop-apache runtime + + io.airlift + concurrent + runtime + + + + io.airlift + json + runtime + + + + io.airlift + stats + runtime + + com.qubole.rubix rubix-presto-shaded 
@@ -146,6 +176,7 @@ **/TestHiveAlluxioMetastore.java **/TestHiveFileSystemS3.java **/TestHiveFileSystemS3SelectPushdown.java + **/TestHiveFileSystemS3SelectJsonPushdown.java **/TestHiveFileSystemWasb.java **/TestHiveFileSystemAbfsAccessKey.java **/TestHiveFileSystemAbfsOAuth.java @@ -190,6 +221,22 @@ + + test-hive-hadoop2-s3-select-json + + + + org.apache.maven.plugins + maven-surefire-plugin + + + **/TestHiveFileSystemS3SelectJsonPushdown.java + + + + + + test-hive-hadoop2-wasb diff --git a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectJsonPushdown.java b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectJsonPushdown.java new file mode 100644 index 000000000000..328a8acc0d15 --- /dev/null +++ b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectJsonPushdown.java @@ -0,0 +1,248 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.hive.s3select; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.net.HostAndPort; +import io.airlift.concurrent.BoundedExecutor; +import io.airlift.json.JsonCodec; +import io.airlift.stats.CounterStat; +import io.trino.hdfs.ConfigurationInitializer; +import io.trino.hdfs.DynamicHdfsConfiguration; +import io.trino.hdfs.HdfsConfig; +import io.trino.hdfs.HdfsConfiguration; +import io.trino.hdfs.HdfsConfigurationInitializer; +import io.trino.hdfs.HdfsEnvironment; +import io.trino.hdfs.authentication.NoHdfsAuthentication; +import io.trino.plugin.base.CatalogName; +import io.trino.plugin.hive.AbstractTestHiveFileSystem; +import io.trino.plugin.hive.DefaultHiveMaterializedViewMetadataFactory; +import io.trino.plugin.hive.GenericHiveRecordCursorProvider; +import io.trino.plugin.hive.HiveConfig; +import io.trino.plugin.hive.HiveLocationService; +import io.trino.plugin.hive.HiveMetadataFactory; +import io.trino.plugin.hive.HivePageSourceProvider; +import io.trino.plugin.hive.HivePartitionManager; +import io.trino.plugin.hive.HiveSplitManager; +import io.trino.plugin.hive.HiveTransactionManager; +import io.trino.plugin.hive.LocationService; +import io.trino.plugin.hive.NamenodeStats; +import io.trino.plugin.hive.NodeVersion; +import io.trino.plugin.hive.NoneHiveRedirectionsProvider; +import io.trino.plugin.hive.PartitionUpdate; +import io.trino.plugin.hive.PartitionsSystemTableProvider; +import io.trino.plugin.hive.PropertiesSystemTableProvider; +import io.trino.plugin.hive.aws.athena.PartitionProjectionService; +import io.trino.plugin.hive.fs.FileSystemDirectoryLister; +import io.trino.plugin.hive.metastore.HiveMetastoreConfig; +import io.trino.plugin.hive.metastore.HiveMetastoreFactory; +import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore; +import io.trino.plugin.hive.s3.HiveS3Config; +import 
io.trino.plugin.hive.s3.TrinoS3ConfigurationInitializer; +import io.trino.plugin.hive.security.SqlStandardAccessControlMetadata; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ConnectorPageSourceProvider; +import io.trino.spi.connector.ConnectorSplitManager; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.type.TestingTypeManager; +import io.trino.testing.MaterializedResult; +import org.apache.hadoop.fs.Path; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Parameters; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR; +import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn; +import static io.trino.plugin.hive.HiveFileSystemTestUtils.filterTable; +import static io.trino.plugin.hive.HiveFileSystemTestUtils.newSession; +import static io.trino.plugin.hive.HiveFileSystemTestUtils.readTable; +import static io.trino.plugin.hive.HiveTestUtils.getDefaultHivePageSourceFactories; +import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveRecordCursorProviders; +import static io.trino.plugin.hive.HiveType.HIVE_INT; +import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder; +import static io.trino.spi.connector.MetadataProvider.NOOP_METADATA_PROVIDER; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.testing.QueryAssertions.assertEqualsIgnoreOrder; +import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER; +import static 
java.lang.String.format; +import static java.util.concurrent.Executors.newCachedThreadPool; +import static java.util.concurrent.Executors.newScheduledThreadPool; +import static org.testng.util.Strings.isNullOrEmpty; + +public class TestHiveFileSystemS3SelectJsonPushdown +{ + private SchemaTableName tableJson; + + private HdfsEnvironment hdfsEnvironment; + private LocationService locationService; + private AbstractTestHiveFileSystem.TestingHiveMetastore metastoreClient; + private HiveMetadataFactory metadataFactory; + private HiveTransactionManager transactionManager; + private ConnectorSplitManager splitManager; + private ConnectorPageSourceProvider pageSourceProvider; + + private ExecutorService executor; + private HiveConfig config; + private ScheduledExecutorService heartbeatService; + + @Parameters({ + "hive.hadoop2.metastoreHost", + "hive.hadoop2.metastorePort", + "hive.hadoop2.databaseName", + "hive.hadoop2.s3.awsAccessKey", + "hive.hadoop2.s3.awsSecretKey", + "hive.hadoop2.s3.writableBucket", + "hive.hadoop2.s3.testDirectory", + }) + @BeforeClass + public void setup(String host, int port, String databaseName, String awsAccessKey, String awsSecretKey, String writableBucket, String testDirectory) + { + checkArgument(!isNullOrEmpty(host), "Expected non empty host"); + checkArgument(!isNullOrEmpty(databaseName), "Expected non empty databaseName"); + checkArgument(!isNullOrEmpty(awsAccessKey), "Expected non empty awsAccessKey"); + checkArgument(!isNullOrEmpty(awsSecretKey), "Expected non empty awsSecretKey"); + checkArgument(!isNullOrEmpty(writableBucket), "Expected non empty writableBucket"); + checkArgument(!isNullOrEmpty(testDirectory), "Expected non empty testDirectory"); + + executor = newCachedThreadPool(daemonThreadsNamed("s3select-json-%s")); + heartbeatService = newScheduledThreadPool(1); + + ConfigurationInitializer s3Config = new TrinoS3ConfigurationInitializer(new HiveS3Config() + .setS3AwsAccessKey(awsAccessKey) + .setS3AwsSecretKey(awsSecretKey)); + 
HdfsConfigurationInitializer initializer = new HdfsConfigurationInitializer(new HdfsConfig(), ImmutableSet.of(s3Config)); + HdfsConfiguration hdfsConfiguration = new DynamicHdfsConfiguration(initializer, ImmutableSet.of()); + + config = new HiveConfig().setS3SelectPushdownEnabled(true); + HivePartitionManager hivePartitionManager = new HivePartitionManager(config); + + hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, new HdfsConfig(), new NoHdfsAuthentication()); + locationService = new HiveLocationService(hdfsEnvironment); + JsonCodec partitionUpdateCodec = JsonCodec.jsonCodec(PartitionUpdate.class); + + metastoreClient = new AbstractTestHiveFileSystem.TestingHiveMetastore( + new BridgingHiveMetastore( + testingThriftHiveMetastoreBuilder() + .metastoreClient(HostAndPort.fromParts(host, port)) + .hiveConfig(config) + .hdfsEnvironment(hdfsEnvironment) + .build()), + new Path(format("s3a://%s/%s/", writableBucket, testDirectory)), + hdfsEnvironment); + metadataFactory = new HiveMetadataFactory( + new CatalogName("hive"), + config, + new HiveMetastoreConfig(), + HiveMetastoreFactory.ofInstance(metastoreClient), + hdfsEnvironment, + hivePartitionManager, + newDirectExecutorService(), + heartbeatService, + TESTING_TYPE_MANAGER, + NOOP_METADATA_PROVIDER, + locationService, + partitionUpdateCodec, + new NodeVersion("test_version"), + new NoneHiveRedirectionsProvider(), + ImmutableSet.of( + new PartitionsSystemTableProvider(hivePartitionManager, TESTING_TYPE_MANAGER), + new PropertiesSystemTableProvider()), + new DefaultHiveMaterializedViewMetadataFactory(), + SqlStandardAccessControlMetadata::new, + new FileSystemDirectoryLister(), + new PartitionProjectionService(config, ImmutableMap.of(), new TestingTypeManager())); + transactionManager = new HiveTransactionManager(metadataFactory); + + splitManager = new HiveSplitManager( + transactionManager, + hivePartitionManager, + new NamenodeStats(), + hdfsEnvironment, + new BoundedExecutor(executor, 
config.getMaxSplitIteratorThreads()), + new CounterStat(), + config.getMaxOutstandingSplits(), + config.getMaxOutstandingSplitsSize(), + config.getMinPartitionBatchSize(), + config.getMaxPartitionBatchSize(), + config.getMaxInitialSplits(), + config.getSplitLoaderConcurrency(), + config.getMaxSplitsPerSecond(), + config.getRecursiveDirWalkerEnabled(), + TESTING_TYPE_MANAGER); + + pageSourceProvider = new HivePageSourceProvider( + TESTING_TYPE_MANAGER, + hdfsEnvironment, + config, + getDefaultHivePageSourceFactories(hdfsEnvironment, config), + getDefaultHiveRecordCursorProviders(config, hdfsEnvironment), + new GenericHiveRecordCursorProvider(hdfsEnvironment, config), + Optional.empty()); + + tableJson = new SchemaTableName(databaseName, "trino_s3select_test_external_fs_json"); + } + + @Test + public void testGetRecordsJson() + throws Exception + { + assertEqualsIgnoreOrder( + readTable(tableJson, transactionManager, config, pageSourceProvider, splitManager), + MaterializedResult.resultBuilder(newSession(config), BIGINT, BIGINT) + .row(2L, 4L).row(5L, 6L) // test_table.json + .row(7L, 23L).row(28L, 22L).row(13L, 10L) // test_table.json.gz + .row(1L, 19L).row(6L, 3L).row(24L, 22L).row(100L, 77L) // test_table.json.bz2 + .build()); + } + + @Test + public void testFilterRecordsJson() + throws Exception + { + List projectedColumns = ImmutableList.of( + createBaseColumn("col_1", 0, HIVE_INT, BIGINT, REGULAR, Optional.empty())); + + assertEqualsIgnoreOrder( + filterTable(tableJson, projectedColumns, transactionManager, config, pageSourceProvider, splitManager), + MaterializedResult.resultBuilder(newSession(config), BIGINT) + .row(2L).row(5L) // test_table.json + .row(7L).row(28L).row(13L) // test_table.json.gz + .row(1L).row(6L).row(24L).row(100L) // test_table.json.bz2 + .build()); + } + + @AfterClass(alwaysRun = true) + public void tearDown() + { + if (executor != null) { + executor.shutdownNow(); + executor = null; + } + if (heartbeatService != null) { + 
heartbeatService.shutdownNow(); + heartbeatService = null; + } + } +} diff --git a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectPushdown.java b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectPushdown.java index 46572b1c1d75..1b5a8c27a455 100644 --- a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectPushdown.java +++ b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/s3select/TestHiveFileSystemS3SelectPushdown.java @@ -36,7 +36,6 @@ public class TestHiveFileSystemS3SelectPushdown { protected SchemaTableName tableWithPipeDelimiter; protected SchemaTableName tableWithCommaDelimiter; - protected SchemaTableName tableJson; @Parameters({ "hive.hadoop2.metastoreHost", @@ -53,7 +52,6 @@ public void setup(String host, int port, String databaseName, String awsAccessKe super.setup(host, port, databaseName, awsAccessKey, awsSecretKey, writableBucket, testDirectory, true); tableWithPipeDelimiter = new SchemaTableName(database, "trino_s3select_test_external_fs_with_pipe_delimiter"); tableWithCommaDelimiter = new SchemaTableName(database, "trino_s3select_test_external_fs_with_comma_delimiter"); - tableJson = new SchemaTableName(database, "trino_s3select_test_external_fs_json"); } @Test @@ -113,33 +111,4 @@ public void testFilterRecordsWithCommaDelimiter() .row(11L).row(1L).row(21L).row(0L) // test_table_with_comma_delimiter.csv.bz2 .build()); } - - @Test - public void testGetRecordsJson() - throws Exception - { - assertEqualsIgnoreOrder( - readTable(tableJson), - MaterializedResult.resultBuilder(newSession(), BIGINT, BIGINT) - .row(2L, 4L).row(5L, 6L) // test_table.json - .row(7L, 23L).row(28L, 22L).row(13L, 10L) // test_table.json.gz - .row(1L, 19L).row(6L, 3L).row(24L, 22L).row(100L, 77L) // test_table.json.bz2 - .build()); - } - - @Test - public void testFilterRecordsJson() - throws Exception - { - List projectedColumns 
= ImmutableList.of( - createBaseColumn("col_1", 0, HIVE_INT, BIGINT, REGULAR, Optional.empty())); - - assertEqualsIgnoreOrder( - filterTable(tableJson, projectedColumns), - MaterializedResult.resultBuilder(newSession(), BIGINT) - .row(2L).row(5L) // test_table.json - .row(7L).row(28L).row(13L) // test_table.json.gz - .row(1L).row(6L).row(24L).row(100L) // test_table.json.bz2 - .build()); - } } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java index 9939382d9a1f..4f0864935287 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java @@ -30,7 +30,6 @@ import io.trino.hdfs.authentication.NoHdfsAuthentication; import io.trino.operator.GroupByHashPageIndexerFactory; import io.trino.plugin.base.CatalogName; -import io.trino.plugin.hive.AbstractTestHive.HiveTransaction; import io.trino.plugin.hive.AbstractTestHive.Transaction; import io.trino.plugin.hive.aws.athena.PartitionProjectionService; import io.trino.plugin.hive.fs.FileSystemDirectoryLister; @@ -66,11 +65,9 @@ import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.type.TestingTypeManager; -import io.trino.spi.type.Type; import io.trino.spi.type.TypeOperators; import io.trino.sql.gen.JoinCompiler; import io.trino.testing.MaterializedResult; -import io.trino.testing.MaterializedRow; import io.trino.testing.TestingNodeManager; import io.trino.type.BlockTypeOperators; import org.apache.hadoop.fs.FileSystem; @@ -88,9 +85,7 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; -import java.util.stream.IntStream; -import static com.google.common.base.Preconditions.checkArgument; import static 
com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; @@ -105,7 +100,6 @@ import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveFileWriterFactories; import static io.trino.plugin.hive.HiveTestUtils.getDefaultHivePageSourceFactories; import static io.trino.plugin.hive.HiveTestUtils.getDefaultHiveRecordCursorProviders; -import static io.trino.plugin.hive.HiveTestUtils.getHiveSession; import static io.trino.plugin.hive.HiveTestUtils.getHiveSessionProperties; import static io.trino.plugin.hive.HiveTestUtils.getTypes; import static io.trino.plugin.hive.HiveType.HIVE_LONG; @@ -268,12 +262,22 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec protected ConnectorSession newSession() { - return getHiveSession(config); + return HiveFileSystemTestUtils.newSession(config); } protected Transaction newTransaction() { - return new HiveTransaction(transactionManager); + return HiveFileSystemTestUtils.newTransaction(transactionManager); + } + + protected MaterializedResult readTable(SchemaTableName tableName) throws IOException + { + return HiveFileSystemTestUtils.readTable(tableName, transactionManager, config, pageSourceProvider, splitManager); + } + + protected MaterializedResult filterTable(SchemaTableName tableName, List projectedColumns) throws IOException + { + return HiveFileSystemTestUtils.filterTable(tableName, projectedColumns, transactionManager, config, pageSourceProvider, splitManager); } @Test @@ -565,89 +569,12 @@ private void dropTable(SchemaTableName table) } } - protected MaterializedResult readTable(SchemaTableName tableName) - throws IOException - { - try (Transaction transaction = newTransaction()) { - ConnectorMetadata metadata = transaction.getMetadata(); - ConnectorSession session = newSession(); - - ConnectorTableHandle table = getTableHandle(metadata, 
tableName); - List columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, table).values()); - - metadata.beginQuery(session); - ConnectorSplitSource splitSource = getSplits(splitManager, transaction, session, table); - - List allTypes = getTypes(columnHandles); - List dataTypes = getTypes(columnHandles.stream() - .filter(columnHandle -> !((HiveColumnHandle) columnHandle).isHidden()) - .collect(toImmutableList())); - MaterializedResult.Builder result = MaterializedResult.resultBuilder(session, dataTypes); - - List splits = getAllSplits(splitSource); - for (ConnectorSplit split : splits) { - try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, table, columnHandles, DynamicFilter.EMPTY)) { - MaterializedResult pageSourceResult = materializeSourceDataStream(session, pageSource, allTypes); - for (MaterializedRow row : pageSourceResult.getMaterializedRows()) { - Object[] dataValues = IntStream.range(0, row.getFieldCount()) - .filter(channel -> !((HiveColumnHandle) columnHandles.get(channel)).isHidden()) - .mapToObj(row::getField) - .toArray(); - result.row(dataValues); - } - } - } - - metadata.cleanupQuery(session); - return result.build(); - } - } - - protected MaterializedResult filterTable(SchemaTableName tableName, List columnHandles) - throws IOException - { - try (Transaction transaction = newTransaction()) { - ConnectorMetadata metadata = transaction.getMetadata(); - ConnectorSession session = newSession(); - - ConnectorTableHandle table = getTableHandle(metadata, tableName); - - metadata.beginQuery(session); - ConnectorSplitSource splitSource = getSplits(splitManager, transaction, session, table); - - List allTypes = getTypes(columnHandles); - List dataTypes = getTypes(columnHandles.stream() - .filter(columnHandle -> !((HiveColumnHandle) columnHandle).isHidden()) - .collect(toImmutableList())); - MaterializedResult.Builder result = MaterializedResult.resultBuilder(session, 
dataTypes); - - List splits = getAllSplits(splitSource); - for (ConnectorSplit split : splits) { - try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, table, columnHandles, DynamicFilter.EMPTY)) { - MaterializedResult pageSourceResult = materializeSourceDataStream(session, pageSource, allTypes); - for (MaterializedRow row : pageSourceResult.getMaterializedRows()) { - Object[] dataValues = IntStream.range(0, row.getFieldCount()) - .filter(channel -> !((HiveColumnHandle) columnHandles.get(channel)).isHidden()) - .mapToObj(row::getField) - .toArray(); - result.row(dataValues); - } - } - } - - metadata.cleanupQuery(session); - return result.build(); - } - } - private ConnectorTableHandle getTableHandle(ConnectorMetadata metadata, SchemaTableName tableName) { - ConnectorTableHandle handle = metadata.getTableHandle(newSession(), tableName); - checkArgument(handle != null, "table not found: %s", tableName); - return handle; + return HiveFileSystemTestUtils.getTableHandle(metadata, tableName, newSession()); } - protected static class TestingHiveMetastore + public static class TestingHiveMetastore extends ForwardingHiveMetastore { private final Path basePath; diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveFileSystemTestUtils.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveFileSystemTestUtils.java new file mode 100644 index 000000000000..474268cc3438 --- /dev/null +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveFileSystemTestUtils.java @@ -0,0 +1,145 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive; + +import com.google.common.collect.ImmutableList; +import io.trino.spi.connector.ColumnHandle; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.connector.ConnectorPageSourceProvider; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorSplit; +import io.trino.spi.connector.ConnectorSplitManager; +import io.trino.spi.connector.ConnectorSplitSource; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.DynamicFilter; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.type.Type; +import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedRow; + +import java.io.IOException; +import java.util.List; +import java.util.stream.IntStream; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.hive.AbstractTestHive.getAllSplits; +import static io.trino.plugin.hive.AbstractTestHive.getSplits; +import static io.trino.plugin.hive.HiveTestUtils.getHiveSession; +import static io.trino.plugin.hive.HiveTestUtils.getTypes; +import static io.trino.testing.MaterializedResult.materializeSourceDataStream; + +public class HiveFileSystemTestUtils +{ + private HiveFileSystemTestUtils() {} + + public static MaterializedResult readTable(SchemaTableName tableName, HiveTransactionManager transactionManager, + HiveConfig config, 
ConnectorPageSourceProvider pageSourceProvider, + ConnectorSplitManager splitManager) + throws IOException + { + try (AbstractTestHive.Transaction transaction = newTransaction(transactionManager)) { + ConnectorMetadata metadata = transaction.getMetadata(); + ConnectorSession session = newSession(config); + + ConnectorTableHandle table = getTableHandle(metadata, tableName, session); + List columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, table).values()); + + metadata.beginQuery(session); + ConnectorSplitSource splitSource = getSplits(splitManager, transaction, session, table); + + List allTypes = getTypes(columnHandles); + List dataTypes = getTypes(columnHandles.stream() + .filter(columnHandle -> !((HiveColumnHandle) columnHandle).isHidden()) + .collect(toImmutableList())); + MaterializedResult.Builder result = MaterializedResult.resultBuilder(session, dataTypes); + + List splits = getAllSplits(splitSource); + for (ConnectorSplit split : splits) { + try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, table, columnHandles, DynamicFilter.EMPTY)) { + MaterializedResult pageSourceResult = materializeSourceDataStream(session, pageSource, allTypes); + for (MaterializedRow row : pageSourceResult.getMaterializedRows()) { + Object[] dataValues = IntStream.range(0, row.getFieldCount()) + .filter(channel -> !((HiveColumnHandle) columnHandles.get(channel)).isHidden()) + .mapToObj(row::getField) + .toArray(); + result.row(dataValues); + } + } + } + + metadata.cleanupQuery(session); + return result.build(); + } + } + + public static ConnectorTableHandle getTableHandle(ConnectorMetadata metadata, SchemaTableName tableName, ConnectorSession session) + { + ConnectorTableHandle handle = metadata.getTableHandle(session, tableName); + checkArgument(handle != null, "table not found: %s", tableName); + return handle; + } + + public static ConnectorSession newSession(HiveConfig config) + { + 
return getHiveSession(config); + } + + public static AbstractTestHive.Transaction newTransaction(HiveTransactionManager transactionManager) + { + return new AbstractTestHive.HiveTransaction(transactionManager); + } + + public static MaterializedResult filterTable(SchemaTableName tableName, List projectedColumns, + HiveTransactionManager transactionManager, + HiveConfig config, ConnectorPageSourceProvider pageSourceProvider, + ConnectorSplitManager splitManager) + throws IOException + { + try (AbstractTestHive.Transaction transaction = newTransaction(transactionManager)) { + ConnectorMetadata metadata = transaction.getMetadata(); + ConnectorSession session = newSession(config); + + ConnectorTableHandle table = getTableHandle(metadata, tableName, session); + + metadata.beginQuery(session); + ConnectorSplitSource splitSource = getSplits(splitManager, transaction, session, table); + + List allTypes = getTypes(projectedColumns); + List dataTypes = getTypes(projectedColumns.stream() + .filter(columnHandle -> !((HiveColumnHandle) columnHandle).isHidden()) + .collect(toImmutableList())); + MaterializedResult.Builder result = MaterializedResult.resultBuilder(session, dataTypes); + + List splits = getAllSplits(splitSource); + for (ConnectorSplit split : splits) { + try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), + session, split, table, projectedColumns, DynamicFilter.EMPTY)) { + MaterializedResult pageSourceResult = materializeSourceDataStream(session, pageSource, allTypes); + for (MaterializedRow row : pageSourceResult.getMaterializedRows()) { + Object[] dataValues = IntStream.range(0, row.getFieldCount()) + .filter(channel -> !((HiveColumnHandle) projectedColumns.get(channel)).isHidden()) + .mapToObj(row::getField) + .toArray(); + result.row(dataValues); + } + } + } + + metadata.cleanupQuery(session); + return result.build(); + } + } +}