diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 42a53dd3a208..27226fd768dd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -219,7 +219,7 @@ jobs: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESSKEY }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRETKEY }} S3_BUCKET: "presto-ci-test" - S3_BUCKET_ENDPOINT: "s3.us-east-2.amazonaws.com" + S3_BUCKET_ENDPOINT: "https://s3.us-east-2.amazonaws.com" run: | if [ "${AWS_ACCESS_KEY_ID}" != "" ]; then source plugin/trino-hive-hadoop2/conf/hive-tests-${{ matrix.config }}.sh && diff --git a/docs/src/main/sphinx/connector/hive.rst b/docs/src/main/sphinx/connector/hive.rst index dbecaf062196..f6dc03322c74 100644 --- a/docs/src/main/sphinx/connector/hive.rst +++ b/docs/src/main/sphinx/connector/hive.rst @@ -594,7 +594,7 @@ Property Name Description * - ``hive.metastore.client.keytab`` - Hive metastore client keytab location. * - ``hive.metastore.thrift.delete-files-on-drop`` - - Actively delete the files for drop table operations, for cases when the + - Actively delete the files for drop table or partition operations, for cases when the metastore does not delete the files. Default is ``false``. .. _hive-glue-metastore: diff --git a/plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh b/plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh index 6ad38ab66de5..57c3c090bf75 100755 --- a/plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh +++ b/plugin/trino-hive-hadoop2/bin/run_hive_s3_tests.sh @@ -92,6 +92,7 @@ set +e -Dhive.hadoop2.metastoreHost=localhost \ -Dhive.hadoop2.metastorePort=9083 \ -Dhive.hadoop2.databaseName=default \ + -Dhive.hadoop2.s3.endpoint="${S3_BUCKET_ENDPOINT}" \ -Dhive.hadoop2.s3.awsAccessKey="${AWS_ACCESS_KEY_ID}" \ -Dhive.hadoop2.s3.awsSecretKey="${AWS_SECRET_ACCESS_KEY}" \ -Dhive.hadoop2.s3.writableBucket="${S3_BUCKET}" \ diff --git a/plugin/trino-hive-hadoop2/pom.xml b/plugin/trino-hive-hadoop2/pom.xml index 5e01a8be231a..49e6599568d6 100644 --- a/plugin/trino-hive-hadoop2/pom.xml +++ b/plugin/trino-hive-hadoop2/pom.xml @@ -82,6 +82,18 @@ runtime + + com.amazonaws + aws-java-sdk-core + runtime + + + + com.amazonaws + aws-java-sdk-s3 + runtime + + com.qubole.rubix rubix-presto-shaded @@ -146,6 +158,12 @@ test + + io.trino + trino-testing-containers + test + + io.airlift testing @@ -180,6 +198,7 @@ **/TestHive.java **/TestHiveAlluxioMetastore.java + **/TestHiveThriftMetastoreWithS3.java **/TestHiveFileSystemS3.java **/TestHiveFileSystemS3SelectPushdown.java **/TestHiveFileSystemS3SelectJsonPushdown.java @@ -221,6 +240,7 @@ maven-surefire-plugin + **/TestHiveThriftMetastoreWithS3.java **/TestHiveFileSystemS3.java **/TestHiveFileSystemS3SelectPushdown.java **/TestHiveFileSystemS3SelectCsvPushdownWithSplits.java diff --git a/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHiveThriftMetastoreWithS3.java b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHiveThriftMetastoreWithS3.java new file mode 100644 index 000000000000..718cd1a9ca31 --- /dev/null +++ b/plugin/trino-hive-hadoop2/src/test/java/io/trino/plugin/hive/TestHiveThriftMetastoreWithS3.java @@ -0,0 +1,207 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.hive; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Resources; +import io.trino.plugin.hive.containers.HiveHadoop; +import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreConfig; +import io.trino.plugin.hive.s3.S3HiveQueryRunner; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Parameters; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.List; + +import static io.trino.testing.sql.TestTable.randomTableSuffix; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestHiveThriftMetastoreWithS3 + extends AbstractTestQueryFramework +{ + private final String s3endpoint; + private final String awsAccessKey; + private final String awsSecretKey; + private final String writableBucket; + private final String schemaName; + private final Path hadoopCoreSiteXmlTempFile; + private final AmazonS3 s3Client; + + @Parameters({ + "hive.hadoop2.s3.endpoint", + "hive.hadoop2.s3.awsAccessKey", + "hive.hadoop2.s3.awsSecretKey", + "hive.hadoop2.s3.writableBucket", + }) + public TestHiveThriftMetastoreWithS3( + String s3endpoint, + String awsAccessKey, + String awsSecretKey, + String writableBucket) + throws IOException + { + this.s3endpoint = requireNonNull(s3endpoint, "s3endpoint is null"); + this.awsAccessKey = requireNonNull(awsAccessKey, "awsAccessKey is null"); + this.awsSecretKey = requireNonNull(awsSecretKey, "awsSecretKey is null"); + this.writableBucket = requireNonNull(writableBucket, "writableBucket is null"); + this.schemaName = "test_thrift_s3_" + randomTableSuffix(); + + String coreSiteXmlContent = Resources.toString(Resources.getResource("s3/hive-core-site.template.xml"), UTF_8) + .replace("%S3_BUCKET_ENDPOINT%", s3endpoint) + .replace("%AWS_ACCESS_KEY_ID%", awsAccessKey) + .replace("%AWS_SECRET_ACCESS_KEY%", awsSecretKey); + + hadoopCoreSiteXmlTempFile = Files.createTempFile("core-site", ".xml", PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--"))); + hadoopCoreSiteXmlTempFile.toFile().deleteOnExit(); + Files.writeString(hadoopCoreSiteXmlTempFile, coreSiteXmlContent); + + s3Client = AmazonS3Client.builder() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(s3endpoint, null)) + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(awsAccessKey, awsSecretKey))) + .build(); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + HiveHadoop hiveHadoop = HiveHadoop.builder() + .withFilesToMount(ImmutableMap.of("/etc/hadoop/conf/core-site.xml", hadoopCoreSiteXmlTempFile.normalize().toAbsolutePath().toString())) + .build(); + hiveHadoop.start(); + + return S3HiveQueryRunner.builder() + .setHiveMetastoreEndpoint(hiveHadoop.getHiveMetastoreEndpoint()) + .setS3Endpoint(s3endpoint) + .setS3AccessKey(awsAccessKey) + .setS3SecretKey(awsSecretKey) + .setBucketName(writableBucket) + .setCreateTpchSchemas(false) + .setThriftMetastoreConfig(new ThriftMetastoreConfig().setDeleteFilesOnDrop(true)) + .setHiveProperties(ImmutableMap.of("hive.allow-register-partition-procedure", "true")) + .build(); + } + + @BeforeClass + public void setUp() + { + String schemaLocation = "s3a://%s/%s".formatted(writableBucket, schemaName); + assertUpdate("CREATE SCHEMA " + schemaName + " WITH (location = '" + schemaLocation + "')"); + } + + @AfterClass(alwaysRun = true) + public void tearDown() + { + assertUpdate("DROP SCHEMA IF EXISTS " + schemaName); + } + + @Test + public void testRecreateTable() + { + String tableName = "test_recreate_table_" + randomTableSuffix(); + String schemaTableName = "%s.%s".formatted(schemaName, tableName); + String tableLocation = "%s/%s".formatted(schemaName, tableName); + + // Creating a new table generates special empty file on S3 (not MinIO) + assertUpdate("CREATE TABLE " + schemaTableName + "(col int)"); + try { + assertUpdate("INSERT INTO " + schemaTableName + " VALUES (1)", 1); + assertThat(getS3ObjectSummaries(tableLocation)).hasSize(2); // directory + file + + // DROP TABLE with Thrift metastore on S3 (not MinIO) leaves some files + // when 'hive.metastore.thrift.delete-files-on-drop' config property is false. + // Then, the subsequent CREATE TABLE throws "Target directory for table 'xxx' already exists" + assertUpdate("DROP TABLE " + schemaTableName); + assertThat(getS3ObjectSummaries(tableLocation)).hasSize(0); + + assertUpdate("CREATE TABLE " + schemaTableName + "(col int)"); + } + finally { + assertUpdate("DROP TABLE IF EXISTS " + schemaTableName); + } + } + + @Test + public void testRecreatePartition() + { + String tableName = "test_recreate_partition_" + randomTableSuffix(); + String schemaTableName = "%s.%s".formatted(schemaName, tableName); + String partitionLocation = "%s/%s/part=1".formatted(schemaName, tableName); + + assertUpdate("CREATE TABLE " + schemaTableName + "(col int, part int) WITH (partitioned_by = ARRAY['part'])"); + try { + // Creating an empty partition generates special empty file on S3 (not MinIO) + assertUpdate("CALL system.create_empty_partition('%s', '%s', ARRAY['part'], ARRAY['1'])".formatted(schemaName, tableName)); + assertUpdate("INSERT INTO " + schemaTableName + " VALUES (1, 1)", 1); + assertQuery("SELECT * FROM " + schemaTableName, "VALUES (1, 1)"); + + assertThat(getS3ObjectSummaries(partitionLocation)).hasSize(2); // directory + file + + // DELETE with Thrift metastore on S3 (not MinIO) leaves some files + // when 'hive.metastore.thrift.delete-files-on-drop' config property is false. + // Then, the subsequent SELECT doesn't return an empty row + assertUpdate("DELETE FROM " + schemaTableName); + assertThat(getS3ObjectSummaries(partitionLocation)).hasSize(0); + + assertUpdate("CALL system.create_empty_partition('%s', '%s', ARRAY['part'], ARRAY['1'])".formatted(schemaName, tableName)); + assertQueryReturnsEmptyResult("SELECT * FROM " + schemaTableName); + } + finally { + assertUpdate("DROP TABLE " + schemaTableName); + } + } + + @Test + public void testUnregisterPartitionNotRemoveData() + { + // Verify unregister_partition procedure doesn't remove physical data even when 'hive.metastore.thrift.delete-files-on-drop' config property is true + String tableName = "test_recreate_partition_" + randomTableSuffix(); + String schemaTableName = "%s.%s".formatted(schemaName, tableName); + + assertUpdate("CREATE TABLE " + schemaTableName + "(col int, part int) WITH (partitioned_by = ARRAY['part'])"); + try { + assertUpdate("INSERT INTO " + schemaTableName + " VALUES (1, 1)", 1); + assertQuery("SELECT * FROM " + schemaTableName, "VALUES (1, 1)"); + + assertUpdate("CALL system.unregister_partition('%s', '%s', ARRAY['part'], ARRAY['1'])".formatted(schemaName, tableName)); + assertQueryReturnsEmptyResult("SELECT * FROM " + schemaTableName); + + assertUpdate("CALL system.register_partition('%s', '%s', ARRAY['part'], ARRAY['1'])".formatted(schemaName, tableName)); + assertQuery("SELECT * FROM " + schemaTableName, "VALUES (1, 1)"); + } + finally { + assertUpdate("DROP TABLE " + schemaTableName); + } + } + + private List getS3ObjectSummaries(String prefix) + { + return s3Client.listObjectsV2(writableBucket, prefix).getObjectSummaries(); + } +} diff --git a/plugin/trino-hive-hadoop2/src/test/resources/s3/hive-core-site.template.xml b/plugin/trino-hive-hadoop2/src/test/resources/s3/hive-core-site.template.xml new file mode 100644 index 000000000000..a3dc6ad47d4b --- /dev/null +++ b/plugin/trino-hive-hadoop2/src/test/resources/s3/hive-core-site.template.xml @@ -0,0 +1,43 @@ + + + + fs.defaultFS + hdfs://hadoop-master:9000 + + + + fs.s3a.endpoint + %S3_BUCKET_ENDPOINT% + + + + fs.s3.awsAccessKeyId + %AWS_ACCESS_KEY_ID% + + + + fs.s3.awsSecretAccessKey + %AWS_SECRET_ACCESS_KEY% + + + + fs.s3a.access.key + %AWS_ACCESS_KEY_ID% + + + + fs.s3a.secret.key + %AWS_SECRET_ACCESS_KEY% + + + + + hadoop.proxyuser.hive.hosts + * + + + + hadoop.proxyuser.hive.groups + * + + diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java index 7ad504ed3250..df85843f7d15 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/thrift/ThriftHiveMetastore.java @@ -1139,7 +1139,12 @@ public void dropPartition(String databaseName, String tableName, List pa .stopOnIllegalExceptions() .run("dropPartition", stats.getDropPartition().wrap(() -> { try (ThriftMetastoreClient client = createMetastoreClient()) { + Partition partition = client.getPartition(databaseName, tableName, parts); client.dropPartition(databaseName, tableName, parts, deleteData); + String partitionLocation = partition.getSd().getLocation(); + if (deleteFilesOnDrop && deleteData && !isNullOrEmpty(partitionLocation) && isManagedTable(client.getTable(databaseName, tableName))) { + deleteDirRecursive(new Path(partitionLocation)); + } } return null; })); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java index 3a12ac6d7090..1f2c9eca834f 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/HiveQueryRunner.java @@ -117,6 +117,7 @@ public static class Builder> private Optional directoryLister = Optional.empty(); private boolean tpcdsCatalogEnabled; private String security = SQL_STANDARD; + private boolean createTpchSchemas = true; private ColumnNaming tpchColumnNaming = SIMPLIFIED; private DecimalTypeMapping tpchDecimalTypeMapping = DOUBLE; @@ -197,6 +198,12 @@ public SELF setSecurity(String security) return self(); } + public SELF setCreateTpchSchemas(boolean createTpchSchemas) + { + this.createTpchSchemas = createTpchSchemas; + return self(); + } + public SELF setTpchColumnNaming(ColumnNaming tpchColumnNaming) { this.tpchColumnNaming = requireNonNull(tpchColumnNaming, "tpchColumnNaming is null"); @@ -255,7 +262,9 @@ public DistributedQueryRunner build() queryRunner.createCatalog(HIVE_CATALOG, "hive", hiveProperties); queryRunner.createCatalog(HIVE_BUCKETED_CATALOG, "hive", hiveBucketedProperties); - populateData(queryRunner, metastore); + if (createTpchSchemas) { + populateData(queryRunner, metastore); + } return queryRunner; } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/S3HiveQueryRunner.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/S3HiveQueryRunner.java index 992ced70ad1b..594ac2da9e1b 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/S3HiveQueryRunner.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/s3/S3HiveQueryRunner.java @@ -21,6 +21,7 @@ import io.trino.plugin.hive.containers.HiveMinioDataLake; import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore; import io.trino.plugin.hive.metastore.thrift.TestingTokenAwareMetastoreClientFactory; +import io.trino.plugin.hive.metastore.thrift.ThriftMetastoreConfig; import io.trino.testing.DistributedQueryRunner; import io.trino.tpch.TpchTable; @@ -85,6 +86,7 @@ public static class Builder { private HostAndPort hiveMetastoreEndpoint; private Duration thriftMetastoreTimeout = TestingTokenAwareMetastoreClientFactory.TIMEOUT; + private ThriftMetastoreConfig thriftMetastoreConfig = new ThriftMetastoreConfig(); private String s3Endpoint; private String s3AccessKey; private String s3SecretKey; @@ -102,6 +104,12 @@ public Builder setThriftMetastoreTimeout(Duration thriftMetastoreTimeout) return this; } + public Builder setThriftMetastoreConfig(ThriftMetastoreConfig thriftMetastoreConfig) + { + this.thriftMetastoreConfig = requireNonNull(thriftMetastoreConfig, "thriftMetastoreConfig is null"); + return this; + } + public Builder setS3Endpoint(String s3Endpoint) { this.s3Endpoint = requireNonNull(s3Endpoint, "s3Endpoint is null"); @@ -145,6 +153,7 @@ public DistributedQueryRunner build() setMetastore(distributedQueryRunner -> new BridgingHiveMetastore( testingThriftHiveMetastoreBuilder() .metastoreClient(hiveMetastoreEndpoint, thriftMetastoreTimeout) + .thriftMetastoreConfig(thriftMetastoreConfig) .build())); setInitialSchemasLocationBase("s3a://" + bucketName); // cannot use s3:// as Hive metastore is not configured to accept it return super.build();