diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dc6013fc1e5e..8d1b07064a06 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -424,7 +424,7 @@ jobs: strategy: fail-fast: false matrix: ${{ fromJson(needs.build-test-matrix.outputs.matrix) }} - timeout-minutes: 60 + timeout-minutes: 120 steps: - uses: actions/checkout@v2 with: diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 330c2861a92f..e6de37deb417 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -419,6 +419,7 @@ **/TestIceberg*FailureRecoveryTest.java **/TestIcebergGlueCatalogConnectorSmokeTest.java + **/TestIcebergGlueConnectorTest.java **/TestTrinoGlueCatalogTest.java **/TestSharedGlueMetastore.java **/TestIcebergGlueCatalogAccessOperations.java @@ -459,6 +460,7 @@ **/TestIcebergGlueCatalogConnectorSmokeTest.java + **/TestIcebergGlueConnectorTest.java **/TestTrinoGlueCatalogTest.java **/TestSharedGlueMetastore.java **/TestIcebergGlueCatalogAccessOperations.java diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index ec0ba4e0c6b3..bf2c0523fad4 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -243,6 +243,12 @@ public Optional getSchemaOwner(ConnectorSession session, Catalog return catalog.getNamespacePrincipal(session, schemaName.getSchemaName()); } + @Override + public SchemaTableName getSchemaTableName(ConnectorSession session, ConnectorTableHandle table) + { + return ((IcebergTableHandle) table).getSchemaTableName(); + } + @Override public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index a3590bcbbafa..b681eda8d8e1 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -16,6 +16,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.airlift.log.Level; +import io.airlift.log.Logging; import io.airlift.units.DataSize; import io.trino.Session; import io.trino.metadata.Metadata; @@ -40,7 +42,7 @@ import io.trino.testing.sql.TestTable; import io.trino.tpch.TpchTable; import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileStream; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; @@ -57,7 +59,6 @@ import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.util.List; import java.util.Locale; import java.util.Map; @@ -125,6 +126,8 @@ public abstract class BaseIcebergConnectorTest protected BaseIcebergConnectorTest(IcebergFileFormat format) { this.format = requireNonNull(format, "format is null"); + Logging logging = Logging.initialize(); + logging.setLevel("org.apache.iceberg.BaseTableScan", Level.WARN); // To suppress huge "Scanning table ... filter" message } @Override @@ -213,9 +216,9 @@ public void testDescribeTable() @Test public void testShowCreateTable() { - File tempDir = getDistributedQueryRunner().getCoordinator().getBaseDataDir().toFile(); + String schemaName = getSession().getSchema().orElseThrow(); assertThat(computeActual("SHOW CREATE TABLE orders").getOnlyValue()) - .isEqualTo("CREATE TABLE iceberg.tpch.orders (\n" + + .isEqualTo("CREATE TABLE iceberg." + schemaName + ".orders (\n" + " orderkey bigint,\n" + " custkey bigint,\n" + " orderstatus varchar,\n" + @@ -229,7 +232,7 @@ public void testShowCreateTable() "WITH (\n" + " format = '" + format.name() + "',\n" + " format_version = 2,\n" + - " location = '" + tempDir + "/iceberg_data/tpch/orders'\n" + + " location = '" + getSchemaLocation() + "/orders'\n" + ")"); } @@ -854,10 +857,11 @@ public void testCreatePartitionedTableAs() @Test public void testTableComments() { + String schemaName = getSession().getSchema().orElseThrow(); File tempDir = getDistributedQueryRunner().getCoordinator().getBaseDataDir().toFile(); String tempDirPath = tempDir.toURI().toASCIIString() + randomTableSuffix(); String createTableTemplate = "" + - "CREATE TABLE iceberg.tpch.test_table_comments (\n" + + "CREATE TABLE iceberg." + schemaName + ".test_table_comments (\n" + " _x bigint\n" + ")\n" + "COMMENT '%s'\n" + @@ -867,7 +871,7 @@ public void testTableComments() format(" location = '%s'\n", tempDirPath) + ")"; String createTableWithoutComment = "" + - "CREATE TABLE iceberg.tpch.test_table_comments (\n" + + "CREATE TABLE iceberg." + schemaName + ".test_table_comments (\n" + " _x bigint\n" + ")\n" + "WITH (\n" + @@ -884,12 +888,12 @@ public void testTableComments() assertUpdate("COMMENT ON TABLE test_table_comments IS NULL"); assertEquals(computeScalar("SHOW CREATE TABLE test_table_comments"), createTableWithoutComment); - dropTable("iceberg.tpch.test_table_comments"); + dropTable("test_table_comments"); assertUpdate(createTableWithoutComment); assertEquals(computeScalar("SHOW CREATE TABLE test_table_comments"), createTableWithoutComment); - dropTable("iceberg.tpch.test_table_comments"); + dropTable("test_table_comments"); } @Test @@ -904,10 +908,10 @@ public void testRollbackSnapshot() assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (456, CAST(654 AS BIGINT))", 1); assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT)), (456, CAST(654 AS BIGINT))"); - assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterFirstInsertId)); + assertUpdate(format("CALL system.rollback_to_snapshot('%s', 'test_rollback', %s)", getSession().getSchema().orElseThrow(), afterFirstInsertId)); assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (123, CAST(987 AS BIGINT))"); - assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterCreateTableId)); + assertUpdate(format("CALL system.rollback_to_snapshot('%s', 'test_rollback', %s)", getSession().getSchema().orElseThrow(), afterCreateTableId)); assertEquals((long) computeActual("SELECT COUNT(*) FROM test_rollback").getOnlyValue(), 0); assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (789, CAST(987 AS BIGINT))", 1); @@ -916,7 +920,7 @@ public void testRollbackSnapshot() // extra insert which should be dropped on rollback assertUpdate("INSERT INTO test_rollback (col0, col1) VALUES (999, CAST(999 AS BIGINT))", 1); - assertUpdate(format("CALL system.rollback_to_snapshot('tpch', 'test_rollback', %s)", afterSecondInsertId)); + assertUpdate(format("CALL system.rollback_to_snapshot('%s', 'test_rollback', %s)", getSession().getSchema().orElseThrow(), afterSecondInsertId)); assertQuery("SELECT * FROM test_rollback ORDER BY col0", "VALUES (789, CAST(987 AS BIGINT))"); dropTable("test_rollback"); @@ -1016,8 +1020,8 @@ public void testCreateTableLike() private void testCreateTableLikeForFormat(IcebergFileFormat otherFormat) { - File tempDir = getDistributedQueryRunner().getCoordinator().getBaseDataDir().toFile(); - String tempDirPath = tempDir.toURI().toASCIIString() + randomTableSuffix(); + String tempDir = getBaseDirectory(); + String tempDirPath = tempDir + randomTableSuffix(); // LIKE source INCLUDING PROPERTIES copies all the properties of the source table, including the `location`. // For this reason the source and the copied table will share the same directory. @@ -1036,12 +1040,11 @@ private void testCreateTableLikeForFormat(IcebergFileFormat otherFormat) assertUpdate("CREATE TABLE test_create_table_like_copy1 (LIKE test_create_table_like_original)"); assertEquals(getTablePropertiesString("test_create_table_like_copy1"), "WITH (\n" + - format(" format = '%s',\n format_version = 2,\n location = '%s'\n)", format, tempDir + "/iceberg_data/tpch/test_create_table_like_copy1")); + format(" format = '%s',\n format_version = 2,\n location = '%s/test_create_table_like_copy1'\n)", format, getSchemaLocation())); assertUpdate("CREATE TABLE test_create_table_like_copy2 (LIKE test_create_table_like_original EXCLUDING PROPERTIES)"); assertEquals(getTablePropertiesString("test_create_table_like_copy2"), "WITH (\n" + - format(" format = '%s',\n format_version = 2,\n location = '%s'\n)", format, tempDir + "/iceberg_data/tpch/test_create_table_like_copy2")); - dropTable("test_create_table_like_copy2"); + format(" format = '%s',\n format_version = 2,\n location = '%s/test_create_table_like_copy2'\n)", format, getSchemaLocation())); assertUpdate("CREATE TABLE test_create_table_like_copy3 (LIKE test_create_table_like_original INCLUDING PROPERTIES)"); assertEquals(getTablePropertiesString("test_create_table_like_copy3"), "WITH (\n" + @@ -1865,13 +1868,13 @@ public void testMultipleColumnTableStatistics() @Test public void testPartitionedTableStatistics() { - assertUpdate("CREATE TABLE iceberg.tpch.test_partitioned_table_statistics (col1 REAL, col2 BIGINT) WITH (partitioning = ARRAY['col2'])"); + assertUpdate("CREATE TABLE test_partitioned_table_statistics (col1 REAL, col2 BIGINT) WITH (partitioning = ARRAY['col2'])"); String insertStart = "INSERT INTO test_partitioned_table_statistics"; assertUpdate(insertStart + " VALUES (-10, -1)", 1); assertUpdate(insertStart + " VALUES (100, 10)", 1); - MaterializedResult result = computeActual("SHOW STATS FOR iceberg.tpch.test_partitioned_table_statistics"); + MaterializedResult result = computeActual("SHOW STATS FOR test_partitioned_table_statistics"); assertEquals(result.getRowCount(), 3); MaterializedRow row0 = result.getMaterializedRows().get(0); @@ -1897,7 +1900,7 @@ public void testPartitionedTableStatistics() .mapToObj(i -> "(NULL, 10)") .collect(joining(", ")), 5); - result = computeActual("SHOW STATS FOR iceberg.tpch.test_partitioned_table_statistics"); + result = computeActual("SHOW STATS FOR test_partitioned_table_statistics"); assertEquals(result.getRowCount(), 3); row0 = result.getMaterializedRows().get(0); assertEquals(row0.getField(0), "col1"); @@ -1918,7 +1921,7 @@ public void testPartitionedTableStatistics() .mapToObj(i -> "(100, NULL)") .collect(joining(", ")), 5); - result = computeActual("SHOW STATS FOR iceberg.tpch.test_partitioned_table_statistics"); + result = computeActual("SHOW STATS FOR test_partitioned_table_statistics"); row0 = result.getMaterializedRows().get(0); assertEquals(row0.getField(0), "col1"); assertEquals(row0.getField(3), 5.0 / 17.0); @@ -1934,13 +1937,13 @@ public void testPartitionedTableStatistics() row2 = result.getMaterializedRows().get(2); assertEquals(row2.getField(4), 17.0); - dropTable("iceberg.tpch.test_partitioned_table_statistics"); + dropTable("test_partitioned_table_statistics"); } @Test public void testPredicatePushdown() { - QualifiedObjectName tableName = new QualifiedObjectName("iceberg", "tpch", "test_predicate"); + QualifiedObjectName tableName = new QualifiedObjectName("iceberg", getSession().getSchema().orElseThrow(), "test_predicate"); assertUpdate(format("CREATE TABLE %s (col1 BIGINT, col2 BIGINT, col3 BIGINT) WITH (partitioning = ARRAY['col2', 'col3'])", tableName)); assertUpdate(format("INSERT INTO %s VALUES (1, 10, 100)", tableName), 1L); assertUpdate(format("INSERT INTO %s VALUES (2, 20, 200)", tableName), 1L); @@ -2274,12 +2277,21 @@ public void testFileSizeInManifest() Long fileSizeInBytes = (Long) row.getField(2); totalRecordCount += recordCount; - assertThat(fileSizeInBytes).isEqualTo(Files.size(Paths.get(path))); + + assertThat(fileSizeInBytes).isEqualTo(getFileSize(new org.apache.hadoop.fs.Path(path))); } // Verify sum(record_count) to make sure we have all the files. assertThat(totalRecordCount).isEqualTo(2); } + private long getFileSize(org.apache.hadoop.fs.Path path) + throws IOException + { + HdfsEnvironment.HdfsContext context = new HdfsContext(getSession().toConnectorSession()); + FileSystem fileSystem = HDFS_ENVIRONMENT.getFileSystem(context, path); + return fileSystem.getFileStatus(path).getLen(); + } + @Test public void testIncorrectIcebergFileSizes() throws Exception @@ -2296,7 +2308,12 @@ public void testIncorrectIcebergFileSizes() // Read manifest file Schema schema; GenericData.Record entry = null; - try (DataFileReader dataFileReader = new DataFileReader<>(new File(manifestFile), new GenericDatumReader<>())) { + + HdfsEnvironment.HdfsContext context = new HdfsContext(getSession().toConnectorSession()); + org.apache.hadoop.fs.Path manifestFilePath = new org.apache.hadoop.fs.Path(manifestFile); + FileSystem fs = HDFS_ENVIRONMENT.getFileSystem(context, manifestFilePath); + + try (DataFileStream dataFileReader = new DataFileStream<>(fs.open(manifestFilePath), new GenericDatumReader<>())) { schema = dataFileReader.getSchema(); int recordCount = 0; while (dataFileReader.hasNext()) { @@ -2305,7 +2322,6 @@ public void testIncorrectIcebergFileSizes() } assertEquals(recordCount, 1); } - // Alter data file entry to store incorrect file size GenericData.Record dataFile = (GenericData.Record) entry.get("data_file"); long alteredValue = 50L; @@ -2313,10 +2329,6 @@ public void testIncorrectIcebergFileSizes() dataFile.put("file_size_in_bytes", alteredValue); // Replace the file through HDFS client. This is required for correct checksums. - HdfsEnvironment.HdfsContext context = new HdfsContext(getSession().toConnectorSession()); - org.apache.hadoop.fs.Path manifestFilePath = new org.apache.hadoop.fs.Path(manifestFile); - FileSystem fs = HDFS_ENVIRONMENT.getFileSystem(context, manifestFilePath); - // Write altered metadata try (OutputStream out = fs.create(manifestFilePath); DataFileWriter dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<>(schema))) { @@ -3131,7 +3143,7 @@ private List getActiveFiles(String tableName) .collect(toImmutableList()); } - private List getAllDataFilesFromTableDirectory(String tableName) + protected List getAllDataFilesFromTableDirectory(String tableName) throws IOException { String schema = getSession().getSchema().orElseThrow(); @@ -3150,7 +3162,7 @@ public void testOptimizeParameterValidation() { assertQueryFails( "ALTER TABLE no_such_table_exists EXECUTE OPTIMIZE", - "\\Qline 1:1: Table 'iceberg.tpch.no_such_table_exists' does not exist"); + format("\\Qline 1:1: Table 'iceberg.%s.no_such_table_exists' does not exist", getSession().getSchema().orElseThrow())); assertQueryFails( "ALTER TABLE nation EXECUTE OPTIMIZE (file_size_threshold => '33')", "\\QUnable to set catalog 'iceberg' table procedure 'OPTIMIZE' property 'file_size_threshold' to ['33']: size is not a valid data size string: 33"); @@ -3288,14 +3300,14 @@ public void testExplainExpireSnapshotOutput() assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1); assertExplain("EXPLAIN ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')", - "SimpleTableExecute\\[iceberg:schemaTableName:tpch.test_expiring_snapshots.*\\{retentionThreshold:0\\.00s}.*"); + "SimpleTableExecute\\[iceberg:schemaTableName:.*test_expiring_snapshots.*\\{retentionThreshold:0\\.00s}.*"); } @Test public void testExpireSnapshotsParameterValidation() { assertQueryFails( - "ALTER TABLE no_such_table_exists EXECUTE EXPIRE_SNAPSHOTS", + "ALTER TABLE tpch.no_such_table_exists EXECUTE EXPIRE_SNAPSHOTS", "\\Qline 1:1: Table 'iceberg.tpch.no_such_table_exists' does not exist"); assertQueryFails( "ALTER TABLE nation EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '33')", @@ -3412,14 +3424,14 @@ public void testExplainDeleteOrphanFilesOutput() assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1); assertExplain("EXPLAIN ALTER TABLE " + tableName + " EXECUTE DELETE_ORPHAN_FILES (retention_threshold => '0s')", - "SimpleTableExecute\\[iceberg:schemaTableName:tpch.test_delete_orphan_files.*\\{retentionThreshold=0\\.00s}.*"); + "SimpleTableExecute\\[iceberg:schemaTableName:.*test_delete_orphan_files.*\\{retentionThreshold=0\\.00s}.*"); } @Test public void testDeleteOrphanFilesParameterValidation() { assertQueryFails( - "ALTER TABLE no_such_table_exists EXECUTE DELETE_ORPHAN_FILES", + "ALTER TABLE tpch.no_such_table_exists EXECUTE DELETE_ORPHAN_FILES", "\\Qline 1:1: Table 'iceberg.tpch.no_such_table_exists' does not exist"); assertQueryFails( "ALTER TABLE nation EXECUTE DELETE_ORPHAN_FILES (retention_threshold => '33')", @@ -3534,4 +3546,14 @@ private Path getIcebergTablePath(String tableName, String suffix) String schema = getSession().getSchema().orElseThrow(); return getDistributedQueryRunner().getCoordinator().getBaseDataDir().resolve("iceberg_data").resolve(schema).resolve(tableName).resolve(suffix); } + + protected String getSchemaLocation() + { + return getBaseDirectory() + "/iceberg_data/tpch"; + } + + protected String getBaseDirectory() + { + return getDistributedQueryRunner().getCoordinator().getBaseDataDir().toFile().getPath(); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/S3Util.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/S3Util.java new file mode 100644 index 000000000000..042860161bfd --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/S3Util.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreApiStats; + +import java.util.List; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults; + +public final class S3Util +{ + private S3Util() {} + + public static void deleteObjects(String bucketName, String prefix) + { + AmazonS3 s3 = AmazonS3ClientBuilder.standard().build(); + + ListObjectsV2Request listObjectsRequest = new ListObjectsV2Request() + .withBucketName(bucketName) + .withPrefix(prefix); + List keysToDelete = getPaginatedResults( + s3::listObjectsV2, + listObjectsRequest, + ListObjectsV2Request::setContinuationToken, + ListObjectsV2Result::getNextContinuationToken, + new GlueMetastoreApiStats()) + .map(ListObjectsV2Result::getObjectSummaries) + .flatMap(objectSummaries -> objectSummaries.stream().map(S3ObjectSummary::getKey)) + .map(DeleteObjectsRequest.KeyVersion::new) + .collect(toImmutableList()); + + if (!keysToDelete.isEmpty()) { + s3.deleteObjects(new DeleteObjectsRequest(bucketName).withKeys(keysToDelete)); + } + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java index ce16bc95edcb..4d4d044aa7d6 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java @@ -13,14 +13,7 @@ */ package io.trino.plugin.iceberg.catalog.glue; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.ListObjectsV2Request; -import com.amazonaws.services.s3.model.ListObjectsV2Result; -import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.collect.ImmutableMap; -import io.trino.plugin.hive.metastore.glue.GlueMetastoreApiStats; import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest; import io.trino.plugin.iceberg.IcebergQueryRunner; import io.trino.plugin.iceberg.SchemaInitializer; @@ -30,10 +23,7 @@ import org.testng.annotations.Parameters; import org.testng.annotations.Test; -import java.util.List; - -import static com.google.common.collect.ImmutableList.toImmutableList; -import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults; +import static io.trino.plugin.iceberg.S3Util.deleteObjects; import static io.trino.testing.sql.TestTable.randomTableSuffix; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -56,7 +46,7 @@ public TestIcebergGlueCatalogConnectorSmokeTest(String bucketName) { super(FileFormat.PARQUET); this.bucketName = requireNonNull(bucketName, "bucketName is null"); - this.schemaName = "iceberg_smoke_test_" + randomTableSuffix(); + this.schemaName = "test_iceberg_smoke_" + randomTableSuffix(); } @Override @@ -84,25 +74,7 @@ public void cleanup() getQueryRunner().execute("DROP SCHEMA IF EXISTS " + schemaName); // DROP TABLES should clean up any files, but clear the directory manually to be safe - AmazonS3 s3 = AmazonS3ClientBuilder.standard().build(); - - ListObjectsV2Request listObjectsRequest = new ListObjectsV2Request() - .withBucketName(bucketName) - .withPrefix(schemaPath()); - List keysToDelete = getPaginatedResults( - s3::listObjectsV2, - listObjectsRequest, - ListObjectsV2Request::setContinuationToken, - ListObjectsV2Result::getNextContinuationToken, - new GlueMetastoreApiStats()) - .map(ListObjectsV2Result::getObjectSummaries) - .flatMap(objectSummaries -> objectSummaries.stream().map(S3ObjectSummary::getKey)) - .map(DeleteObjectsRequest.KeyVersion::new) - .collect(toImmutableList()); - - if (!keysToDelete.isEmpty()) { - s3.deleteObjects(new DeleteObjectsRequest(bucketName).withKeys(keysToDelete)); - } + deleteObjects(bucketName, schemaPath()); } @Test diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueConnectorTest.java new file mode 100644 index 000000000000..3275a4ff907c --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueConnectorTest.java @@ -0,0 +1,301 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.glue; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.airlift.log.Logger; +import io.trino.Session; +import io.trino.plugin.iceberg.BaseIcebergConnectorTest; +import io.trino.plugin.iceberg.IcebergQueryRunner; +import io.trino.plugin.iceberg.SchemaInitializer; +import io.trino.testing.MaterializedRow; +import io.trino.testing.QueryRunner; +import io.trino.tpch.TpchTable; +import org.intellij.lang.annotations.Language; +import org.testng.SkipException; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Parameters; +import org.testng.annotations.Test; + +import java.util.List; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.iceberg.IcebergFileFormat.ORC; +import static io.trino.plugin.iceberg.S3Util.deleteObjects; +import static io.trino.testing.sql.TestTable.randomTableSuffix; +import static io.trino.tpch.TpchTable.LINE_ITEM; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/* + * TestIcebergGlueConnectorTest currently uses AWS Default Credential Provider Chain, + * See https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default + * on ways to set your AWS credentials which will be needed to run this test. + */ +public class TestIcebergGlueConnectorTest + extends BaseIcebergConnectorTest +{ + private static final Logger log = Logger.get(TestIcebergGlueConnectorTest.class); + private final String bucketName; + private final String schemaName; + + @Parameters("s3.bucket") + public TestIcebergGlueConnectorTest(String bucketName) + { + super(ORC); + this.bucketName = requireNonNull(bucketName, "bucketName is null"); + this.schemaName = "test_iceberg_connector_" + randomTableSuffix(); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return IcebergQueryRunner.builder() + .setIcebergProperties( + ImmutableMap.of( + "iceberg.catalog.type", "glue", + "iceberg.file-format", ORC.name(), + "hive.metastore.glue.default-warehouse-dir", getBaseDirectory())) + .setSchemaInitializer( + SchemaInitializer.builder() + .withClonedTpchTables(ImmutableList.>builder() + .addAll(REQUIRED_TPCH_TABLES) + .add(LINE_ITEM) + .build()) + .withSchemaName(schemaName) + .build()) + .build(); + } + + @AfterClass(alwaysRun = true) + public void cleanup() + { + for (MaterializedRow table : computeActual("SHOW TABLES").getMaterializedRows()) { + try { + getQueryRunner().execute("DROP TABLE " + table.getField(0)); + } + catch (Exception e) { + log.error(e, "Failed to drop table '%s'", table.getField(0)); + } + } + + try { + getQueryRunner().execute("DROP SCHEMA IF EXISTS " + schemaName); + } + catch (Exception e) { + log.error(e, "Failed to drop schema '%s'", schemaName); + } + + // DROP TABLES should clean up any files, but clear the directory manually to be safe + deleteObjects(bucketName, getBaseDirectory()); + } + + @Override + public void testInformationSchemaFiltering() + { + // Add schema name to WHERE condition because finding a table from all schemas in Glue is too slow + assertQuery( + "SELECT table_name FROM information_schema.tables WHERE table_schema = '" + schemaName + "' AND table_name = 'orders' LIMIT 1", + "SELECT 'orders' table_name"); + assertQuery( + "SELECT table_name FROM information_schema.columns WHERE data_type = 'bigint' AND table_schema = '" + schemaName + "' AND table_name = 'customer' AND column_name = 'custkey' LIMIT 1", + "SELECT 'customer' table_name"); + } + + @Override + public void testSelectInformationSchemaColumns() + { + // Add schema name to WHERE condition and skip below query because finding a table from all schemas in Glue is too slow + // SELECT DISTINCT table_name FROM information_schema.columns WHERE table_schema = 'information_schema' OR rand() = 42 ORDER BY 1 + + String catalog = getSession().getCatalog().orElseThrow(); + String schema = getSession().getSchema().orElseThrow(); + String schemaPattern = schema.replaceAll(".$", "_"); + + @Language("SQL") String ordersTableWithColumns = "VALUES " + + "('orders', 'orderkey'), " + + "('orders', 'custkey'), " + + "('orders', 'orderstatus'), " + + "('orders', 'totalprice'), " + + "('orders', 'orderdate'), " + + "('orders', 'orderpriority'), " + + "('orders', 'clerk'), " + + "('orders', 'shippriority'), " + + "('orders', 'comment')"; + + assertQuery("SELECT table_schema FROM information_schema.columns WHERE table_schema = '" + schema + "' GROUP BY table_schema", "VALUES '" + schema + "'"); + assertQuery("SELECT table_name FROM information_schema.columns WHERE table_schema = '" + schema + "' AND table_name = 'orders' GROUP BY table_name", "VALUES 'orders'"); + assertQuery("SELECT table_name, column_name FROM information_schema.columns WHERE table_schema = '" + schema + "' AND table_name = 'orders'", ordersTableWithColumns); + assertQuery("SELECT table_name, column_name FROM information_schema.columns WHERE table_schema = '" + schema + "' AND table_name LIKE '%rders'", ordersTableWithColumns); + assertQuery("SELECT table_name, column_name FROM information_schema.columns WHERE table_schema LIKE '" + schemaPattern + "' AND table_name LIKE '_rder_'", ordersTableWithColumns); + assertThat(query( + "SELECT table_name, column_name FROM information_schema.columns " + + "WHERE table_catalog = '" + catalog + "' AND table_schema = '" + schema + "' AND table_name LIKE '%orders%'")) + .skippingTypesCheck() + .containsAll(ordersTableWithColumns); + + assertQuerySucceeds("SELECT * FROM information_schema.columns WHERE table_schema = '" + schema + "'"); + assertQuery("SELECT DISTINCT table_name, column_name FROM information_schema.columns WHERE table_schema = '" + schema + "' AND table_name LIKE '_rders'", ordersTableWithColumns); + assertQuerySucceeds("SELECT * FROM information_schema.columns WHERE table_catalog = '" + catalog + "' AND table_schema = '" + schema + "'"); + assertQuery("SELECT table_name, column_name FROM information_schema.columns WHERE table_catalog = '" + catalog + "' AND table_schema = '" + schema + "' AND table_name LIKE '_rders'", ordersTableWithColumns); + assertQuerySucceeds("SELECT * FROM information_schema.columns WHERE table_catalog = '" + catalog + "' AND table_schema = '" + schema + "' AND table_name LIKE '%'"); + assertQuery("SELECT column_name FROM information_schema.columns WHERE table_catalog = 'something_else' AND table_schema = '" + schema + "'", "SELECT '' WHERE false"); + } + + @Override + public void testShowCreateSchema() + { + String schemaName = getSession().getSchema().orElseThrow(); + assertThat(computeActual("SHOW CREATE SCHEMA " + schemaName).getOnlyValue().toString()) + .matches("CREATE SCHEMA iceberg." + schemaName); + } + + @Override + public void testRenameSchema() + { + assertThatThrownBy(super::testRenameSchema) + .hasStackTraceContaining("renameNamespace is not supported for Iceberg Glue catalogs"); + } + + @Override + public void testMaterializedView() + { + throw new SkipException("TODO Glue catalog expects 'VIEW' in information_schema.tables for materialized views, but BaseConnectorTest expects 'BASE TABLE'"); + } + + @Override + public void testCreateTableSchemaNotFound() + { + // TODO: Fix Glue catalog to throw SchemaNotFoundException + assertThatThrownBy(super::testCreateTableSchemaNotFound) + .hasMessageMatching("(?s).*Database .* not found.*"); + } + + @Override + public void testCreateTableAsSelectSchemaNotFound() + { + // TODO: Fix Glue catalog to throw SchemaNotFoundException + assertThatThrownBy(super::testCreateTableAsSelectSchemaNotFound) + .hasMessageMatching("(?s).*Database .* not found.*"); + } + + @Test(dataProvider = "repartitioningDataProvider") + @Override + public void testRepartitionDataOnCtas(Session session, String partitioning, int expectedFiles) + { + throw new SkipException("TODO Disable temporarily because the test causes OOM"); + } + + @Test(dataProvider = "repartitioningDataProvider") + @Override + public void testRepartitionDataOnInsert(Session session, String partitioning, int expectedFiles) + { + throw new SkipException("TODO Disable temporarily because the test causes OOM"); + } + + @Override + public void testDeleteOrphanFiles() + { + throw new SkipException("TODO Enable after fixing testDeleteOrphanFiles to handle S3"); + } + + @Override + public void testExpireSnapshots() + { + throw new SkipException("TODO Enable after fixing testDeleteOrphanFiles to handle S3"); + } + + @Override + public void testIfDeleteOrphanFilesCleansUnnecessaryDataFilesInPartitionedTable() + { + throw new SkipException("TODO Enable after fixing testDeleteOrphanFiles to handle S3"); + } + + @Override + public void testIfDeleteOrphanFilesCleansUnnecessaryMetadataFilesInPartitionedTable() + { + throw new SkipException("TODO Enable after fixing testDeleteOrphanFiles to handle S3"); + } + + @Override + public void testShowSchemasLikeWithEscape() + { + assertThatThrownBy(super::testShowSchemasLikeWithEscape) + .hasMessageContaining("not to be equal to"); + throw new SkipException("TODO Enable this test"); + } + + @Override + public void testLocalDynamicFilteringWithSelectiveBuildSizeJoin() + { + throw new SkipException("TODO Needs investigation. Failed with NoSuchElementException"); + } + + @Override + protected boolean supportsIcebergFileStatistics(String typeName) + { + return !(typeName.equalsIgnoreCase("tinyint")) && + !(typeName.equalsIgnoreCase("smallint")) && + !(typeName.equalsIgnoreCase("char(3)")) && + !(typeName.equalsIgnoreCase("uuid")) && + !(typeName.equalsIgnoreCase("varbinary")); + } + + @Override + protected boolean supportsRowGroupStatistics(String typeName) + { + return !typeName.equalsIgnoreCase("varbinary"); + } + + @Override + protected Session withSmallRowGroups(Session session) + { + return Session.builder(session) + .setCatalogSessionProperty("iceberg", "orc_writer_max_stripe_rows", "10") + .build(); + } + + @Override + protected List getAllDataFilesFromTableDirectory(String tableName) + { + ListObjectsV2Request request = new ListObjectsV2Request(); + request.withBucketName(bucketName); + request.withPrefix(format("%s/%s.db/%s/data", schemaName, schemaName, tableName)); + + AmazonS3 s3 = AmazonS3ClientBuilder.standard().build(); + return s3.listObjectsV2(request).getObjectSummaries().stream() + .map(object -> format("s3://%s/%s", bucketName, object.getKey())) + .filter(path -> !path.matches("\\..*\\.crc")) + .collect(toImmutableList()); + } + + @Override + protected String getSchemaLocation() + { + return format("%s/%s.db", getBaseDirectory(), schemaName); + } + + @Override + protected String getBaseDirectory() + { + return format("s3://%s/%s", bucketName, schemaName); + } +} diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueries.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueries.java index fc7c86e16d67..eb5b9ec62f4a 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueries.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueries.java @@ -309,7 +309,7 @@ public void testShowSchemasLikeWithEscape() "This test expects at least one schema without underscore in it's name. Satisfy this assumption or override the test."); assertThat(result) .isSubsetOf(allSchemas) - .isNotEqualTo(allSchemas); + .isNotEqualTo(allSchemas); // Iceberg Glue assertThat(result).contains("information_schema").allMatch(schemaName -> ((String) schemaName).contains("_")); }); } diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index ec18a1fbe108..8c55f5255758 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -655,8 +655,10 @@ public void testView() "WHERE table_schema = '" + schemaName + "' and table_name = '" + testView + "'", "VALUES ('" + testView + "', 'VIEW')"); - // system.jdbc.tables without filter - assertThat(query("SELECT table_schem, table_name, table_type FROM system.jdbc.tables")) + // system.jdbc.tables without table_name filter + assertThat(query( + "SELECT table_schem, table_name, table_type FROM system.jdbc.tables " + + "WHERE table_schem = '" + schemaName + "'")) .skippingTypesCheck() .containsAll("VALUES ('" + schemaName + "', '" + testView + "', 'VIEW')"); @@ -705,8 +707,10 @@ public void testView() .skippingTypesCheck() .containsAll("VALUES '" + testView + "'"); - // system.jdbc.columns without filter - assertThat(query("SELECT table_schem, table_name, column_name FROM system.jdbc.columns")) + // system.jdbc.columns without table_name filter + assertThat(query( + "SELECT table_schem, table_name, column_name FROM system.jdbc.columns " + + "WHERE table_schem = '" + schemaName + "'")) .skippingTypesCheck() .containsAll( "SELECT * FROM (VALUES ('" + schemaName + "', '" + testView + "')) " + @@ -726,7 +730,7 @@ public void testView() assertThat(query( "SELECT table_schem, table_name, column_name " + "FROM system.jdbc.columns " + - "WHERE table_name LIKE '%" + testView + "%'")) + "WHERE table_schem = '" + schemaName + "' AND table_name LIKE '%" + testView + "%'")) .skippingTypesCheck() .containsAll( "SELECT * FROM (VALUES ('" + schemaName + "', '" + testView + "')) " + @@ -900,7 +904,7 @@ public void testMaterializedView() assertUpdate("DROP MATERIALIZED VIEW " + viewWithComment); // test filtering materialized views in system metadata table - assertThat(query(listMaterializedViewsSql("catalog_name = '" + view.getCatalogName() + "'"))) + assertThat(query(listMaterializedViewsSql("catalog_name = '" + view.getCatalogName() + "' AND schema_name IN ('" + view.getSchemaName() + "', '" + otherView.getSchemaName() + "')"))) .skippingTypesCheck() .containsAll(getTestingMaterializedViewsResultRows(view, otherView)); @@ -925,7 +929,7 @@ public void testMaterializedView() .containsAll(getTestingMaterializedViewsResultRow(view, "")); assertThat(query( - listMaterializedViewsSql("name LIKE '%" + view.getObjectName() + "%'"))) + listMaterializedViewsSql("schema_name = '" + view.getSchemaName() + "' AND name LIKE '%" + view.getObjectName() + "%'"))) .skippingTypesCheck() .containsAll(getTestingMaterializedViewsResultRow(view, "")); @@ -938,9 +942,9 @@ public void testMaterializedView() assertUpdate("DROP MATERIALIZED VIEW " + view); assertUpdate("DROP MATERIALIZED VIEW " + otherView); - assertQueryReturnsEmptyResult(listMaterializedViewsSql("name = '" + view.getObjectName() + "'")); - assertQueryReturnsEmptyResult(listMaterializedViewsSql("name = '" + otherView.getObjectName() + "'")); - assertQueryReturnsEmptyResult(listMaterializedViewsSql("name = '" + viewWithComment.getObjectName() + "'")); + assertQueryReturnsEmptyResult(listMaterializedViewsSql("schema_name = '" + view.getSchemaName() + "' AND name = '" + view.getObjectName() + "'")); + assertQueryReturnsEmptyResult(listMaterializedViewsSql("schema_name = '" + otherView.getSchemaName() + "' AND name = '" + otherView.getObjectName() + "'")); + assertQueryReturnsEmptyResult(listMaterializedViewsSql("schema_name = '" + viewWithComment.getSchemaName() + "' AND name = '" + viewWithComment.getObjectName() + "'")); } @Test @@ -1132,9 +1136,9 @@ public void testRenameMaterializedView() assertUpdate(session, "ALTER MATERIALIZED VIEW " + originalMaterializedView + " RENAME TO " + renamedMaterializedView); assertTestingMaterializedViewQuery(schema, renamedMaterializedView); // verify new name in the system.metadata.materialized_views - assertQuery(session, "SELECT catalog_name, schema_name FROM system.metadata.materialized_views WHERE name = '" + renamedMaterializedView + "'", + assertQuery(session, "SELECT catalog_name, schema_name FROM system.metadata.materialized_views WHERE schema_name = '" + originalMaterializedView.getSchemaName() + "' AND name = '" + renamedMaterializedView + "'", format("VALUES ('%s', '%s')", originalMaterializedView.getCatalogName(), originalMaterializedView.getSchemaName())); - assertQueryReturnsEmptyResult(session, listMaterializedViewsSql("name = '" + originalMaterializedView.getObjectName() + "'")); + assertQueryReturnsEmptyResult(session, listMaterializedViewsSql("schema_name = '" + originalMaterializedView.getSchemaName() + "' AND name = '" + originalMaterializedView.getObjectName() + "'")); // rename with IF EXISTS on existing materialized view String testExistsMaterializedViewName = "test_materialized_view_rename_exists_" + randomTableSuffix(); @@ -1168,8 +1172,8 @@ public void testRenameMaterializedView() // rename with IF EXISTS on NOT existing materialized view assertUpdate(session, "ALTER TABLE IF EXISTS " + originalMaterializedView + " RENAME TO " + renamedMaterializedView); - assertQueryReturnsEmptyResult(session, listMaterializedViewsSql("name = '" + originalMaterializedView.getObjectName() + "'")); - assertQueryReturnsEmptyResult(session, listMaterializedViewsSql("name = '" + renamedMaterializedView + "'")); + assertQueryReturnsEmptyResult(session, listMaterializedViewsSql("schema_name = '" + schema + "' AND name = '" + originalMaterializedView.getObjectName() + "'")); + assertQueryReturnsEmptyResult(session, listMaterializedViewsSql("schema_name = '" + schema + "' AND name = '" + renamedMaterializedView + "'")); } private void assertTestingMaterializedViewQuery(String schema, String materializedViewName)