From e8bea25bc0a3a2f7ce603a1615e7af343ce229df Mon Sep 17 00:00:00 2001 From: Alex Jo Date: Mon, 11 Apr 2022 12:34:43 -0400 Subject: [PATCH] Add cleanup mechanism for Iceberg Glue tests After one day any schemas with the 'ci_iceberg_integration_test' property will be deleted. --- plugin/trino-iceberg/pom.xml | 2 + .../catalog/glue/TrinoGlueCatalog.java | 3 + .../plugin/iceberg/BaseTrinoCatalogTest.java | 17 ++-- .../iceberg/catalog/glue/TestGlueCleanup.java | 89 +++++++++++++++++++ ...estIcebergGlueCatalogAccessOperations.java | 1 + ...tIcebergGlueCatalogConnectorSmokeTest.java | 78 +++++++++++++++- .../catalog/glue/TestSharedGlueMetastore.java | 1 + .../glue/TestTrinoGlueCatalogTest.java | 22 ++++- 8 files changed, 205 insertions(+), 8 deletions(-) create mode 100644 plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestGlueCleanup.java diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index d48a53ff5161..b373b0a1cdfb 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -416,6 +416,7 @@ **/TestTrinoGlueCatalogTest.java **/TestSharedGlueMetastore.java **/TestIcebergGlueCatalogAccessOperations.java + **/TestGlueCleanup.java @@ -455,6 +456,7 @@ **/TestTrinoGlueCatalogTest.java **/TestSharedGlueMetastore.java **/TestIcebergGlueCatalogAccessOperations.java + **/TestGlueCleanup.java diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 0d8da9c2ee5c..a512f15ac34b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -70,6 +70,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_METASTORE_ERROR; import static io.trino.plugin.hive.ViewReaderUtil.encodeViewData; @@ -206,6 +207,8 @@ public void createNamespace(ConnectorSession session, String namespace, Map properties) { DatabaseInput databaseInput = new DatabaseInput().withName(namespace); + databaseInput.setParameters(properties.entrySet().stream() + .collect(toImmutableMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())))); Object location = properties.get(LOCATION_PROPERTY); if (location != null) { databaseInput.setLocationUri((String) location); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseTrinoCatalogTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseTrinoCatalogTest.java index 4c8f949c2500..b72890c342f5 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseTrinoCatalogTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseTrinoCatalogTest.java @@ -54,7 +54,7 @@ public void testCreateNamespaceWithLocation() TrinoCatalog catalog = createTrinoCatalog(false); String namespace = "test_create_namespace_with_location_" + randomTableSuffix(); - catalog.createNamespace(SESSION, namespace, ImmutableMap.of(LOCATION_PROPERTY, "/a/path/"), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); + createNamespace(catalog, namespace, ImmutableMap.of(LOCATION_PROPERTY, "/a/path/"), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); assertThat(catalog.listNamespaces(SESSION)).contains(namespace); assertEquals(catalog.loadNamespaceMetadata(SESSION, namespace), ImmutableMap.of(LOCATION_PROPERTY, "/a/path/")); assertEquals(catalog.defaultTableLocation(SESSION, new SchemaTableName(namespace, "table")), "/a/path/table"); @@ -74,7 +74,7 @@ public void testCreateTable() String table = "tableName"; SchemaTableName schemaTableName = new SchemaTableName(namespace, table); try { - catalog.createNamespace(SESSION, namespace, ImmutableMap.of(), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); + createNamespace(catalog, namespace, ImmutableMap.of(), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); catalog.newCreateTableTransaction( SESSION, schemaTableName, @@ -122,8 +122,8 @@ public void testRenameTable() String table = "tableName"; SchemaTableName sourceSchemaTableName = new SchemaTableName(namespace, table); try { - catalog.createNamespace(SESSION, namespace, ImmutableMap.of(), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); - catalog.createNamespace(SESSION, targetNamespace, ImmutableMap.of(), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); + createNamespace(catalog, namespace, ImmutableMap.of(), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); + createNamespace(catalog, targetNamespace, ImmutableMap.of(), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); catalog.newCreateTableTransaction( SESSION, sourceSchemaTableName, @@ -171,7 +171,7 @@ public void testUseUniqueTableLocations() String namespace = "test_unique_table_locations_" + randomTableSuffix(); String table = "tableName"; SchemaTableName schemaTableName = new SchemaTableName(namespace, table); - catalog.createNamespace(SESSION, namespace, ImmutableMap.of(LOCATION_PROPERTY, tmpDirectory.toString()), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); + createNamespace(catalog, namespace, ImmutableMap.of(LOCATION_PROPERTY, tmpDirectory.toString()), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); try { String location1 = catalog.defaultTableLocation(SESSION, schemaTableName); String location2 = catalog.defaultTableLocation(SESSION, schemaTableName); @@ -214,7 +214,7 @@ public void testView() false); try { - catalog.createNamespace(SESSION, namespace, ImmutableMap.of(), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); + createNamespace(catalog, namespace, ImmutableMap.of(), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); catalog.createView(SESSION, schemaTableName, viewDefinition, false); assertThat(catalog.listTables(SESSION, Optional.of(namespace))).contains(schemaTableName); @@ -248,6 +248,11 @@ public void testView() } } + protected void createNamespace(TrinoCatalog catalog, String namespace, Map properties, TrinoPrincipal owner) + { + catalog.createNamespace(SESSION, namespace, properties, owner); + } + private void assertViewDefinition(ConnectorViewDefinition actualView, ConnectorViewDefinition expectedView) { assertEquals(actualView.getOriginalSql(), expectedView.getOriginalSql()); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestGlueCleanup.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestGlueCleanup.java new file mode 100644 index 000000000000..68eb0a8ed0d0 --- /dev/null +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestGlueCleanup.java @@ -0,0 +1,89 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.iceberg.catalog.glue; + +import com.amazonaws.services.glue.AWSGlueAsync; +import com.amazonaws.services.glue.AWSGlueAsyncClientBuilder; +import com.amazonaws.services.glue.model.Database; +import com.amazonaws.services.glue.model.DatabaseInput; +import com.amazonaws.services.glue.model.DeleteDatabaseRequest; +import com.amazonaws.services.glue.model.GetDatabaseRequest; +import com.amazonaws.services.glue.model.GetDatabasesRequest; +import com.amazonaws.services.glue.model.GetDatabasesResult; +import com.amazonaws.services.glue.model.UpdateDatabaseRequest; +import com.google.common.collect.ImmutableMap; +import io.airlift.log.Logger; +import io.trino.plugin.hive.metastore.glue.GlueMetastoreApiStats; +import org.testng.annotations.Test; + +import java.time.Duration; +import java.util.Collection; +import java.util.Date; + +import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults; + +/** + * Test class with methods to clean up existing Glue tables and schemas from test which were not finished properly. + */ +public class TestGlueCleanup +{ + // All tests creating Glue schemas should add this key/value pair to the schema properties. + public static final String GLUE_SCHEMA_PROPERTY_KEY = "ci_iceberg_integration_test"; + public static final String GLUE_SCHEMA_PROPERTY_VALUE = "true"; + + private static final Logger LOG = Logger.get(TestGlueCleanup.class); + + public static void setSchemaFlag(String schemaName) + { + AWSGlueAsync glueClient = AWSGlueAsyncClientBuilder.defaultClient(); + Database database = glueClient.getDatabase(new GetDatabaseRequest().withName(schemaName)).getDatabase(); + ImmutableMap.Builder schemaParameters = ImmutableMap.builder(); + if (database.getParameters() != null) { + schemaParameters.putAll(database.getParameters()); + } + schemaParameters.put(GLUE_SCHEMA_PROPERTY_KEY, GLUE_SCHEMA_PROPERTY_VALUE); + + DatabaseInput databaseInput = new DatabaseInput() + .withName(schemaName) + .withDescription(database.getDescription()) + .withLocationUri(database.getLocationUri()) + .withParameters(schemaParameters.buildOrThrow()) + .withCreateTableDefaultPermissions(database.getCreateTableDefaultPermissions()) + .withTargetDatabase(database.getTargetDatabase()); + glueClient.updateDatabase(new UpdateDatabaseRequest() + .withName(schemaName) + .withDatabaseInput(databaseInput)); + } + + @Test + public void cleanupOrphanedSchemas() + { + AWSGlueAsync glueClient = AWSGlueAsyncClientBuilder.defaultClient(); + long createdAtCutoff = System.currentTimeMillis() - Duration.ofDays(1).toMillis(); + getPaginatedResults( + glueClient::getDatabases, + new GetDatabasesRequest(), + GetDatabasesRequest::setNextToken, + GetDatabasesResult::getNextToken, + new GlueMetastoreApiStats()) + .map(GetDatabasesResult::getDatabaseList) + .flatMap(Collection::stream) + .filter(database -> database.getParameters() != null && GLUE_SCHEMA_PROPERTY_VALUE.equals(database.getParameters().get(GLUE_SCHEMA_PROPERTY_KEY))) + .filter(database -> database.getCreateTime().before(new Date(createdAtCutoff))) + .forEach(database -> { + LOG.warn("Deleting old Glue database " + database); + glueClient.deleteDatabase(new DeleteDatabaseRequest().withName(database.getName())); + }); + } +} diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java index ebc818f148fe..28781a6577b8 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogAccessOperations.java @@ -109,6 +109,7 @@ protected QueryRunner createQueryRunner() "hive.metastore.glue.default-warehouse-dir", tmp.getAbsolutePath())); queryRunner.execute("CREATE SCHEMA " + testSchema); + TestGlueCleanup.setSchemaFlag(testSchema); glueStats = verifyNotNull(glueStatsReference.get(), "glueStatsReference not set"); return queryRunner; diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java index f0a8510e8330..729afbe50f73 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestIcebergGlueCatalogConnectorSmokeTest.java @@ -26,6 +26,7 @@ import io.trino.plugin.iceberg.SchemaInitializer; import io.trino.testing.QueryRunner; import org.apache.iceberg.FileFormat; +import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.Parameters; import org.testng.annotations.Test; @@ -34,6 +35,11 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.hive.metastore.glue.AwsSdkUtil.getPaginatedResults; +import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_SCHEMA; +import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_CREATE_TABLE; +import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_INSERT; +import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_RENAME_TABLE; +import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_RENAME_TABLE_ACROSS_SCHEMAS; import static io.trino.testing.sql.TestTable.randomTableSuffix; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -63,7 +69,7 @@ public TestIcebergGlueCatalogConnectorSmokeTest(String bucketName) protected QueryRunner createQueryRunner() throws Exception { - return IcebergQueryRunner.builder() + QueryRunner queryRunner = IcebergQueryRunner.builder() .setIcebergProperties( ImmutableMap.of( "iceberg.catalog.type", "glue", @@ -74,6 +80,8 @@ protected QueryRunner createQueryRunner() .withSchemaName(schemaName) .build()) .build(); + TestGlueCleanup.setSchemaFlag(schemaName); + return queryRunner; } @AfterClass(alwaysRun = true) @@ -132,6 +140,25 @@ public void testMaterializedView() .hasStackTraceContaining("createMaterializedView is not supported for Iceberg Glue catalogs"); } + // Overridden to set the schema flag for proper cleanup + @Test + @Override + public void testCreateSchema() + { + String schemaName = "test_schema_create_" + randomTableSuffix(); + if (!hasBehavior(SUPPORTS_CREATE_SCHEMA)) { + assertQueryFails(createSchemaSql(schemaName), "This connector does not support creating schemas"); + return; + } + + assertUpdate(createSchemaSql(schemaName)); + TestGlueCleanup.setSchemaFlag(schemaName); + assertThat(query("SHOW SCHEMAS")) + .skippingTypesCheck() + .containsAll(format("VALUES '%s', '%s'", getSession().getSchema().orElseThrow(), schemaName)); + assertUpdate("DROP SCHEMA " + schemaName); + } + @Test @Override public void testRenameSchema() @@ -140,6 +167,55 @@ public void testRenameSchema() .hasStackTraceContaining("renameNamespace is not supported for Iceberg Glue catalogs"); } + // Overridden to set the schema flag for proper cleanup + @Test + @Override + public void testRenameTableAcrossSchemas() + { + if (!hasBehavior(SUPPORTS_RENAME_TABLE_ACROSS_SCHEMAS)) { + if (!hasBehavior(SUPPORTS_RENAME_TABLE)) { + throw new SkipException("Skipping since rename table is not supported at all"); + } + assertQueryFails("ALTER TABLE nation RENAME TO other_schema.yyyy", "This connector does not support renaming tables across schemas"); + return; + } + + if (!hasBehavior(SUPPORTS_CREATE_SCHEMA)) { + throw new AssertionError("Cannot test ALTER TABLE RENAME across schemas without CREATE SCHEMA, the test needs to be implemented in a connector-specific way"); + } + + if (!hasBehavior(SUPPORTS_CREATE_TABLE)) { + throw new AssertionError("Cannot test ALTER TABLE RENAME across schemas without CREATE TABLE, the test needs to be implemented in a connector-specific way"); + } + + String oldTable = "test_rename_old_" + randomTableSuffix(); + assertUpdate("CREATE TABLE " + oldTable + " (a bigint, b double)"); + + String schemaName = "test_schema_" + randomTableSuffix(); + assertUpdate(createSchemaSql(schemaName)); + TestGlueCleanup.setSchemaFlag(schemaName); + + String newTable = schemaName + ".test_rename_new_" + randomTableSuffix(); + assertUpdate("ALTER TABLE " + oldTable + " RENAME TO " + newTable); + + assertThat(query("SHOW TABLES LIKE '" + oldTable + "'")) + .returnsEmptyResult(); + assertThat(query("SELECT a, b FROM " + newTable)) + .returnsEmptyResult(); + + if (hasBehavior(SUPPORTS_INSERT)) { + assertUpdate("INSERT INTO " + newTable + " (a, b) VALUES (42, -38.5)", 1); + assertThat(query("SELECT CAST(a AS bigint), b FROM " + newTable)) + .matches("VALUES (BIGINT '42', -385e-1)"); + } + + assertUpdate("DROP TABLE " + newTable); + assertThat(query("SHOW TABLES LIKE '" + newTable + "'")) + .returnsEmptyResult(); + + assertUpdate("DROP SCHEMA " + schemaName); + } + private String schemaPath() { return format("s3://%s/%s", bucketName, schemaName); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestSharedGlueMetastore.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestSharedGlueMetastore.java index da6188ec3cd3..030f4f44411d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestSharedGlueMetastore.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestSharedGlueMetastore.java @@ -119,6 +119,7 @@ protected QueryRunner createQueryRunner() ImmutableMap.of("hive.iceberg-catalog-name", "iceberg")); queryRunner.execute("CREATE SCHEMA " + schema + " WITH (location = '" + dataDirectory.toString() + "')"); + TestGlueCleanup.setSchemaFlag(schema); copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, icebergSession, ImmutableList.of(TpchTable.NATION)); copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, hiveSession, ImmutableList.of(TpchTable.REGION)); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalogTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalogTest.java index f2431402ccc1..e9509451f1e4 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalogTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalogTest.java @@ -37,8 +37,11 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Map; import java.util.Optional; +import static io.trino.plugin.iceberg.catalog.glue.TestGlueCleanup.GLUE_SCHEMA_PROPERTY_KEY; +import static io.trino.plugin.iceberg.catalog.glue.TestGlueCleanup.GLUE_SCHEMA_PROPERTY_VALUE; import static io.trino.testing.TestingConnectorSession.SESSION; import static io.trino.testing.sql.TestTable.randomTableSuffix; import static org.testng.Assert.assertEquals; @@ -68,6 +71,19 @@ protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations) useUniqueTableLocations); } + @Override + protected void createNamespace(TrinoCatalog catalog, String namespace, Map properties, TrinoPrincipal owner) + { + super.createNamespace( + catalog, + namespace, + ImmutableMap.builder() + .putAll(properties) + .put(GLUE_SCHEMA_PROPERTY_KEY, GLUE_SCHEMA_PROPERTY_VALUE) + .buildOrThrow(), + owner); + } + @Test public void testDefaultLocation() throws IOException @@ -94,7 +110,11 @@ public void testDefaultLocation() String namespace = "test_default_location_" + randomTableSuffix(); String table = "tableName"; SchemaTableName schemaTableName = new SchemaTableName(namespace, table); - catalogWithDefaultLocation.createNamespace(SESSION, namespace, ImmutableMap.of(), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); + createNamespace( + catalogWithDefaultLocation, + namespace, + ImmutableMap.of(), + new TrinoPrincipal(PrincipalType.USER, SESSION.getUser())); try { File expectedSchemaDirectory = new File(tmpDirectory.toFile(), namespace + ".db"); File expectedTableDirectory = new File(expectedSchemaDirectory, schemaTableName.getTableName());