diff --git a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java index a71804c0406f..d8faaf8b438c 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java @@ -19,7 +19,12 @@ package org.apache.iceberg.aws; +import java.util.List; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.Delete; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; @@ -29,6 +34,8 @@ public class AwsIntegTestUtil { + private static final Logger LOG = LoggerFactory.getLogger(AwsIntegTestUtil.class); + private AwsIntegTestUtil() { } @@ -55,4 +62,15 @@ public static void cleanS3Bucket(S3Client s3, String bucketName, String prefix) } } } + + public static void cleanGlueCatalog(GlueClient glue, List namespaces) { + for (String namespace : namespaces) { + try { + // delete db also delete tables + glue.deleteDatabase(DeleteDatabaseRequest.builder().name(namespace).build()); + } catch (Exception e) { + LOG.error("Cannot delete namespace {}", namespace, e); + } + } + } } diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogNamespaceTest.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogNamespaceTest.java new file mode 100644 index 000000000000..9dc3ce583978 --- /dev/null +++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogNamespaceTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Test; +import software.amazon.awssdk.services.glue.model.Database; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; + +public class GlueCatalogNamespaceTest extends GlueTestBase { + + @Test + public void testCreateNamespace() { + String namespace = getRandomName(); + namespaces.add(namespace); + AssertHelpers.assertThrows("namespace does not exist before create", + EntityNotFoundException.class, + "not found", + () -> glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build())); + glueCatalog.createNamespace(Namespace.of(namespace)); + Database database = glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()).database(); + Assert.assertEquals("namespace must equal database name", namespace, database.name()); + } + + @Test + public void testCreateDuplicate() { + String namespace = createNamespace(); + AssertHelpers.assertThrows("should not create namespace with the same name", + AlreadyExistsException.class, + "it already exists in Glue", + () -> glueCatalog.createNamespace(Namespace.of(namespace))); + } + + @Test + public void testCreateBadName() { + List invalidNamespaces = Lists.newArrayList( + Namespace.of("db-1"), + Namespace.of("db", "db2") + ); + + for (Namespace namespace : invalidNamespaces) { + AssertHelpers.assertThrows("should not create namespace with invalid or nested names", + ValidationException.class, + "Cannot convert namespace", + () -> glueCatalog.createNamespace(namespace)); + } + } + + @Test + public void testNamespaceExists() { + String namespace = createNamespace(); + Assert.assertTrue(glueCatalog.namespaceExists(Namespace.of(namespace))); + } + + @Test + public void testListNamespace() { + String namespace = createNamespace(); + List namespaceList = glueCatalog.listNamespaces(); + Assert.assertTrue(namespaceList.size() > 0); + Assert.assertTrue(namespaceList.contains(Namespace.of(namespace))); + namespaceList = glueCatalog.listNamespaces(Namespace.of(namespace)); + Assert.assertTrue(namespaceList.isEmpty()); + } + + @Test + public void testNamespaceProperties() { + String namespace = createNamespace(); + // set properties + Map properties = Maps.newHashMap(); + properties.put("key", "val"); + properties.put("key2", "val2"); + glueCatalog.setProperties(Namespace.of(namespace), properties); + Database database = glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()).database(); + Assert.assertTrue(database.parameters().containsKey("key")); + Assert.assertEquals("val", database.parameters().get("key")); + Assert.assertTrue(database.parameters().containsKey("key2")); + Assert.assertEquals("val2", database.parameters().get("key2")); + // remove properties + glueCatalog.removeProperties(Namespace.of(namespace), Sets.newHashSet("key")); + database = glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()).database(); + Assert.assertFalse(database.parameters().containsKey("key")); + Assert.assertTrue(database.parameters().containsKey("key2")); + Assert.assertEquals("val2", database.parameters().get("key2")); + // add back + properties = Maps.newHashMap(); + properties.put("key", "val"); + glueCatalog.setProperties(Namespace.of(namespace), properties); + database = glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()).database(); + Assert.assertTrue(database.parameters().containsKey("key")); + Assert.assertEquals("val", database.parameters().get("key")); + Assert.assertTrue(database.parameters().containsKey("key2")); + Assert.assertEquals("val2", database.parameters().get("key2")); + } + + @Test + public void testDropNamespace() { + String namespace = createNamespace(); + glueCatalog.dropNamespace(Namespace.of(namespace)); + AssertHelpers.assertThrows("namespace should not exist after deletion", + EntityNotFoundException.class, + "not found", + () -> glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build())); + } + + @Test + public void testDropNamespaceNonEmpty() { + String namespace = createNamespace(); + createTable(namespace); + AssertHelpers.assertThrows("namespace should not be dropped when still has table", + NamespaceNotEmptyException.class, + "it is not empty", + () -> glueCatalog.dropNamespace(Namespace.of(namespace))); + } + +} diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogTableTest.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogTableTest.java new file mode 100644 index 000000000000..3de670396a49 --- /dev/null +++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogTableTest.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Test; +import software.amazon.awssdk.services.glue.model.CreateTableRequest; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; +import software.amazon.awssdk.services.glue.model.GetTableVersionsRequest; +import software.amazon.awssdk.services.glue.model.TableInput; +import software.amazon.awssdk.services.glue.model.UpdateTableRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.S3Object; + +public class GlueCatalogTableTest extends GlueTestBase { + + @Test + public void testCreateTable() { + String namespace = createNamespace(); + String tableName = getRandomName(); + glueCatalog.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec); + // verify table exists in Glue + GetTableResponse response = glue.getTable(GetTableRequest.builder() + .databaseName(namespace).name(tableName).build()); + Assert.assertEquals(namespace, response.table().databaseName()); + Assert.assertEquals(tableName, response.table().name()); + Assert.assertEquals(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH), + response.table().parameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)); + Assert.assertTrue(response.table().parameters().containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP)); + // verify metadata file exists in S3 + String metaLocation = response.table().parameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP); + String key = metaLocation.split(testBucketName, -1)[1].substring(1); + s3.headObject(HeadObjectRequest.builder().bucket(testBucketName).key(key).build()); + Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName)); + Assert.assertEquals(partitionSpec, table.spec()); + Assert.assertEquals(schema.toString(), table.schema().toString()); + } + + @Test + public void testCreateTableDuplicate() { + String namespace = createNamespace(); + String tableName = createTable(namespace); + AssertHelpers.assertThrows("should not create table with the same name", + AlreadyExistsException.class, + "Table already exists", + () -> glueCatalog.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec)); + } + + @Test + public void testCreateTableBadName() { + String namespace = createNamespace(); + AssertHelpers.assertThrows("should not create table with bad name", + IllegalArgumentException.class, + "Invalid table identifier", + () -> glueCatalog.createTable(TableIdentifier.of(namespace, "table-1"), schema, partitionSpec)); + } + + @Test + public void testListTables() { + String namespace = createNamespace(); + Assert.assertTrue("list namespace should have nothing before table creation", + glueCatalog.listTables(Namespace.of(namespace)).isEmpty()); + String tableName = createTable(namespace); + List tables = glueCatalog.listTables(Namespace.of(namespace)); + Assert.assertEquals(1, tables.size()); + Assert.assertEquals(TableIdentifier.of(namespace, tableName), tables.get(0)); + } + + @Test + public void testTableExists() { + String namespace = createNamespace(); + String tableName = createTable(namespace); + Assert.assertTrue(glueCatalog.tableExists(TableIdentifier.of(namespace, tableName))); + } + + @Test + public void testUpdateTable() { + String namespace = createNamespace(); + String tableName = getRandomName(); + // current should be null + TableOperations ops = glueCatalog.newTableOps(TableIdentifier.of(namespace, tableName)); + TableMetadata current = ops.current(); + Assert.assertNull(current); + // create table, refresh should update + createTable(namespace, tableName); + current = ops.refresh(); + Assert.assertEquals(schema.toString(), current.schema().toString()); + Assert.assertEquals(partitionSpec, current.spec()); + Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName)); + Assert.assertTrue("initial table history should be empty", table.history().isEmpty()); + // commit new version, should create a new snapshot + table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName)); + DataFile dataFile = DataFiles.builder(partitionSpec) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + table.newAppend().appendFile(dataFile).commit(); + table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName)); + Assert.assertEquals("commit should create a new table version", 1, table.history().size()); + } + + @Test + public void testRenameTable() { + String namespace = createNamespace(); + String tableName = createTable(namespace); + Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName)); + // rename table + String newTableName = tableName + "_2"; + glueCatalog.renameTable(TableIdentifier.of(namespace, tableName), TableIdentifier.of(namespace, newTableName)); + Table renamedTable = glueCatalog.loadTable(TableIdentifier.of(namespace, newTableName)); + Assert.assertEquals(table.location(), renamedTable.location()); + Assert.assertEquals(table.schema().toString(), renamedTable.schema().toString()); + Assert.assertEquals(table.spec(), renamedTable.spec()); + Assert.assertEquals(table.currentSnapshot(), renamedTable.currentSnapshot()); + } + + @Test + public void testRenameTable_failToCreateNewTable() { + String namespace = createNamespace(); + String tableName = createTable(namespace); + TableIdentifier id = TableIdentifier.of(namespace, tableName); + Table table = glueCatalog.loadTable(id); + // create a new table in Glue, so that rename to that table will fail + String newTableName = tableName + "_2"; + glue.createTable(CreateTableRequest.builder() + .databaseName(namespace) + .tableInput(TableInput.builder().name(newTableName).build()) + .build()); + AssertHelpers.assertThrows("should fail to rename to an existing table", + software.amazon.awssdk.services.glue.model.AlreadyExistsException.class, + "Table already exists", + () -> glueCatalog.renameTable( + TableIdentifier.of(namespace, tableName), TableIdentifier.of(namespace, newTableName))); + // old table can still be read with same metadata + Table oldTable = glueCatalog.loadTable(id); + Assert.assertEquals(table.location(), oldTable.location()); + Assert.assertEquals(table.schema().toString(), oldTable.schema().toString()); + Assert.assertEquals(table.spec(), oldTable.spec()); + Assert.assertEquals(table.currentSnapshot(), oldTable.currentSnapshot()); + } + + @Test + public void testRenameTable_failToDeleteOldTable() { + String namespace = createNamespace(); + String tableName = createTable(namespace); + TableIdentifier id = TableIdentifier.of(namespace, tableName); + Table table = glueCatalog.loadTable(id); + // delete the old table metadata, so that drop old table will fail + String newTableName = tableName + "_2"; + glue.updateTable(UpdateTableRequest.builder() + .databaseName(namespace) + .tableInput(TableInput.builder().name(tableName).parameters(Maps.newHashMap()).build()) + .build()); + AssertHelpers.assertThrows("should fail to rename", + ValidationException.class, + "Input Glue table is not an iceberg table", + () -> glueCatalog.renameTable( + TableIdentifier.of(namespace, tableName), TableIdentifier.of(namespace, newTableName))); + AssertHelpers.assertThrows("renamed table should be deleted", + EntityNotFoundException.class, + "not found", + () -> glue.getTable(GetTableRequest.builder().databaseName(namespace).name(newTableName).build())); + } + + @Test + public void testDeleteTableWithoutPurge() { + String namespace = createNamespace(); + String tableName = createTable(namespace); + glueCatalog.dropTable(TableIdentifier.of(namespace, tableName), false); + AssertHelpers.assertThrows("should not have table", + NoSuchTableException.class, + "Table does not exist", + () -> glueCatalog.loadTable(TableIdentifier.of(namespace, tableName))); + String warehouseLocation = glueCatalog.defaultWarehouseLocation(TableIdentifier.of(namespace, tableName)); + String prefix = warehouseLocation.split(testBucketName + "/", -1)[1]; + ListObjectsV2Response response = s3.listObjectsV2(ListObjectsV2Request.builder() + .bucket(testBucketName).prefix(prefix + "/metadata/").build()); + Assert.assertTrue(response.hasContents()); + boolean hasMetaFile = false; + for (S3Object s3Object : response.contents()) { + if (s3Object.key().contains(".json")) { + hasMetaFile = true; + break; + } + } + Assert.assertTrue("metadata json file exists after delete without purge", hasMetaFile); + } + + @Test + public void testDeleteTableWithPurge() { + String namespace = createNamespace(); + String tableName = createTable(namespace); + glueCatalog.dropTable(TableIdentifier.of(namespace, tableName)); + AssertHelpers.assertThrows("should not have table", + NoSuchTableException.class, + "Table does not exist", + () -> glueCatalog.loadTable(TableIdentifier.of(namespace, tableName))); + String warehouseLocation = glueCatalog.defaultWarehouseLocation(TableIdentifier.of(namespace, tableName)); + String prefix = warehouseLocation.split(testBucketName + "/", -1)[1]; + ListObjectsV2Response response = s3.listObjectsV2(ListObjectsV2Request.builder() + .bucket(testBucketName).prefix(prefix).build()); + if (response.hasContents()) { + // might have directory markers left + for (S3Object s3Object : response.contents()) { + Optional size = s3Object.getValueForField("Size", Long.class); + Assert.assertTrue(size.isPresent()); + Assert.assertEquals(0L, (long) size.get()); + } + } + } + + @Test + public void testCommitTableSkipArchive() { + // create ns + String namespace = getRandomName(); + namespaces.add(namespace); + glueCatalog.createNamespace(Namespace.of(namespace)); + // create table and commit without skip + Schema schema = new Schema(Types.NestedField.required(1, "c1", Types.StringType.get(), "c1")); + PartitionSpec partitionSpec = PartitionSpec.builderFor(schema).build(); + String tableName = getRandomName(); + glueCatalog.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec); + Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName)); + DataFile dataFile = DataFiles.builder(partitionSpec) + .withPath("/path/to/data-a.parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + table.newAppend().appendFile(dataFile).commit(); + Assert.assertEquals(2, glue.getTableVersions(GetTableVersionsRequest.builder() + .databaseName(namespace).tableName(tableName).build()).tableVersions().size()); + // create table and commit with skip + tableName = getRandomName(); + glueCatalogWithSkip.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec); + table = glueCatalogWithSkip.loadTable(TableIdentifier.of(namespace, tableName)); + table.newAppend().appendFile(dataFile).commit(); + Assert.assertEquals("skipArchive should not create new version", + 1, glue.getTableVersions(GetTableVersionsRequest.builder() + .databaseName(namespace).tableName(tableName).build()).tableVersions().size()); + } +} diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java new file mode 100644 index 000000000000..b787cb8f4247 --- /dev/null +++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.List; +import java.util.UUID; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.aws.AwsClientUtil; +import org.apache.iceberg.aws.AwsIntegTestUtil; +import org.apache.iceberg.aws.AwsProperties; +import org.apache.iceberg.aws.s3.S3FileIO; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.s3.S3Client; + +@SuppressWarnings({"VisibilityModifier", "HideUtilityClassConstructor"}) +public class GlueTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(GlueTestBase.class); + + // the integration test requires the following env variables + static final String testBucketName = AwsIntegTestUtil.testBucketName(); + + static final String catalogName = "glue"; + static final String testPathPrefix = getRandomName(); + static final List namespaces = Lists.newArrayList(); + + // aws clients + static final GlueClient glue = AwsClientUtil.defaultGlueClient(); + static final S3Client s3 = AwsClientUtil.defaultS3Client(); + + // iceberg + static GlueCatalog glueCatalog; + static GlueCatalog glueCatalogWithSkip; + + static Schema schema = new Schema(Types.NestedField.required(1, "c1", Types.StringType.get(), "c1")); + static PartitionSpec partitionSpec = PartitionSpec.builderFor(schema).build(); + + @BeforeClass + public static void beforeClass() { + String testBucketPath = "s3://" + testBucketName + "/" + testPathPrefix; + S3FileIO fileIO = new S3FileIO(); + glueCatalog = new GlueCatalog(glue); + glueCatalog.initialize(catalogName, testBucketPath, new AwsProperties(), fileIO); + AwsProperties properties = new AwsProperties(); + properties.setGlueCatalogSkipArchive(true); + glueCatalogWithSkip = new GlueCatalog(glue); + glueCatalogWithSkip.initialize(catalogName, testBucketPath, properties, fileIO); + } + + @AfterClass + public static void afterClass() { + AwsIntegTestUtil.cleanGlueCatalog(glue, namespaces); + AwsIntegTestUtil.cleanS3Bucket(s3, testBucketName, testPathPrefix); + } + + public static String getRandomName() { + return UUID.randomUUID().toString().replace("-", ""); + } + + public static String createNamespace() { + String namespace = getRandomName(); + namespaces.add(namespace); + glueCatalog.createNamespace(Namespace.of(namespace)); + return namespace; + } + + public static String createTable(String namespace) { + String tableName = getRandomName(); + return createTable(namespace, tableName); + } + + public static String createTable(String namespace, String tableName) { + glueCatalog.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec); + return tableName; + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java index e75b3f00d261..71c7ed0db494 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java @@ -20,6 +20,7 @@ package org.apache.iceberg.aws; import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.services.glue.GlueClient; import software.amazon.awssdk.services.kms.KmsClient; import software.amazon.awssdk.services.s3.S3Client; @@ -47,4 +48,10 @@ public static KmsClient defaultKmsClient() { .httpClient(UrlConnectionHttpClient.create()) .build(); } + + public static GlueClient defaultGlueClient() { + return GlueClient.builder() + .httpClient(UrlConnectionHttpClient.create()) + .build(); + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java index b9c33df2601b..358a8c990734 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.PropertyUtil; public class AwsProperties { @@ -65,14 +66,36 @@ public class AwsProperties { */ public static final String S3FILEIO_SSE_MD5 = "s3fileio.sse.md5"; + /** + * The ID of the Glue Data Catalog where the tables reside. + * If none is provided, Glue automatically uses the caller's AWS account ID by default. + * For more details, see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html + */ + public static final String GLUE_CATALOG_ID = "gluecatalog.id"; + + /** + * If Glue should skip archiving an old table version when creating a new version in a commit. + * By default Glue archives all old table versions after an UpdateTable call, + * but Glue has a default max number of archived table versions (can be increased). + * So for streaming use case with lots of commits, it is recommended to set this value to true. + */ + public static final String GLUE_CATALOG_SKIP_ARCHIVE = "gluecatalog.skip-archive"; + public static final boolean GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT = false; + private String s3FileIoSseType; private String s3FileIoSseKey; private String s3FileIoSseMd5; + private String glueCatalogId; + private boolean glueCatalogSkipArchive; + public AwsProperties() { this.s3FileIoSseType = S3FILEIO_SSE_TYPE_NONE; this.s3FileIoSseKey = null; this.s3FileIoSseMd5 = null; + + this.glueCatalogId = null; + this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT; } public AwsProperties(Map properties) { @@ -84,6 +107,10 @@ public AwsProperties(Map properties) { Preconditions.checkNotNull(s3FileIoSseKey, "Cannot initialize SSE-C S3FileIO with null encryption key"); Preconditions.checkNotNull(s3FileIoSseMd5, "Cannot initialize SSE-C S3FileIO with null encryption key MD5"); } + + this.glueCatalogId = properties.get(GLUE_CATALOG_ID); + this.glueCatalogSkipArchive = PropertyUtil.propertyAsBoolean(properties, + AwsProperties.GLUE_CATALOG_SKIP_ARCHIVE, AwsProperties.GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT); } public String s3FileIoSseType() { @@ -109,4 +136,20 @@ public String s3FileIoSseMd5() { public void setS3FileIoSseMd5(String sseMd5) { this.s3FileIoSseMd5 = sseMd5; } + + public String glueCatalogId() { + return glueCatalogId; + } + + public void setGlueCatalogId(String id) { + this.glueCatalogId = id; + } + + public boolean glueCatalogSkipArchive() { + return glueCatalogSkipArchive; + } + + public void setGlueCatalogSkipArchive(boolean skipArchive) { + this.glueCatalogSkipArchive = skipArchive; + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java new file mode 100644 index 000000000000..86ce858a825d --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.aws.AwsClientUtil; +import org.apache.iceberg.aws.AwsProperties; +import org.apache.iceberg.aws.s3.S3FileIO; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.CreateTableRequest; +import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest; +import software.amazon.awssdk.services.glue.model.DeleteTableRequest; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; +import software.amazon.awssdk.services.glue.model.GetDatabaseResponse; +import software.amazon.awssdk.services.glue.model.GetDatabasesRequest; +import software.amazon.awssdk.services.glue.model.GetDatabasesResponse; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; +import software.amazon.awssdk.services.glue.model.GetTablesRequest; +import software.amazon.awssdk.services.glue.model.GetTablesResponse; +import software.amazon.awssdk.services.glue.model.InvalidInputException; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; +import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest; + +public class GlueCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + private final GlueClient glue; + private Configuration hadoopConf; + private String catalogName; + private String warehousePath; + private AwsProperties awsProperties; + private FileIO fileIO; + + /** + * No-arg constructor to load the catalog dynamically. + *

+ * Only the AWS Glue client is initialized. + * Other fields must be initialized by calling {@link GlueCatalog#initialize(String, Map)} later. + */ + public GlueCatalog() { + this(AwsClientUtil.defaultGlueClient()); + } + + @VisibleForTesting + GlueCatalog(GlueClient glue) { + this.glue = glue; + } + + @Override + public void initialize(String name, Map properties) { + String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL); + initialize( + name, + properties.get(CatalogProperties.WAREHOUSE_LOCATION), + new AwsProperties(properties), + fileIOImpl == null ? new S3FileIO() : CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf)); + } + + @VisibleForTesting + void initialize(String name, String path, AwsProperties properties, FileIO io) { + this.catalogName = name; + this.awsProperties = properties; + this.warehousePath = cleanWarehousePath(path); + this.fileIO = io; + } + + private String cleanWarehousePath(String path) { + Preconditions.checkArgument(path != null && path.length() > 0, + "Cannot initialize GlueCatalog because warehousePath must not be null"); + int len = path.length(); + if (path.charAt(len - 1) == '/') { + return path.substring(0, len - 1); + } else { + return path; + } + } + + @Override + protected TableOperations newTableOps(TableIdentifier tableIdentifier) { + return new GlueTableOperations(glue, catalogName, awsProperties, fileIO, tableIdentifier); + } + + /** + * This method produces the same result as using a HiveCatalog. + * If databaseUri exists for the Glue database URI, the default location is databaseUri/tableName. + * If not, the default location is warehousePath/databaseName.db/tableName + * @param tableIdentifier table id + * @return default warehouse path + */ + @Override + protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) { + // check if value is set in database + GetDatabaseResponse response = glue.getDatabase(GetDatabaseRequest.builder() + .name(IcebergToGlueConverter.getDatabaseName(tableIdentifier)) + .build()); + String dbLocationUri = response.database().locationUri(); + if (dbLocationUri != null) { + return String.format("%s/%s", dbLocationUri, tableIdentifier.name()); + } + + return String.format( + "%s/%s.db/%s", + warehousePath, + IcebergToGlueConverter.getDatabaseName(tableIdentifier), + tableIdentifier.name()); + } + + @Override + public List listTables(Namespace namespace) { + namespaceExists(namespace); + // should be safe to list all before returning the list, instead of dynamically load the list. + String nextToken = null; + List results = Lists.newArrayList(); + do { + GetTablesResponse response = glue.getTables(GetTablesRequest.builder() + .catalogId(awsProperties.glueCatalogId()) + .databaseName(IcebergToGlueConverter.toDatabaseName(namespace)) + .nextToken(nextToken) + .build()); + nextToken = response.nextToken(); + if (response.hasTableList()) { + results.addAll(response.tableList().stream() + .map(GlueToIcebergConverter::toTableId) + .collect(Collectors.toList())); + } + } while (nextToken != null); + + LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results); + return results; + } + + @Override + public boolean dropTable(TableIdentifier identifier, boolean purge) { + try { + TableOperations ops = newTableOps(identifier); + TableMetadata lastMetadata = ops.current(); + glue.deleteTable(DeleteTableRequest.builder() + .catalogId(awsProperties.glueCatalogId()) + .databaseName(IcebergToGlueConverter.getDatabaseName(identifier)) + .name(identifier.name()) + .build()); + LOG.info("Successfully dropped table {} from Glue", identifier); + if (purge && lastMetadata != null) { + CatalogUtil.dropTableData(ops.io(), lastMetadata); + LOG.info("Glue table {} data purged", identifier); + } + LOG.info("Dropped table: {}", identifier); + return true; + } catch (EntityNotFoundException e) { + LOG.error("Cannot drop table {} because table not found or not accessible", identifier, e); + return false; + } catch (Exception e) { + LOG.error("Cannot complete drop table operation for {} due to unexpected exception", identifier, e); + throw e; + } + } + + /** + * Rename table in Glue is a drop table and create table. + * @param from identifier of the table to rename + * @param to new table name + */ + @Override + public void renameTable(TableIdentifier from, TableIdentifier to) { + // check new namespace exists + if (!namespaceExists(to.namespace())) { + throw new NoSuchNamespaceException("Cannot rename %s to %s because namespace %s does not exist", + from, to, to.namespace()); + } + // keep metadata + Table fromTable = null; + String fromTableDbName = IcebergToGlueConverter.getDatabaseName(from); + String fromTableName = IcebergToGlueConverter.getTableName(from); + String toTableDbName = IcebergToGlueConverter.getDatabaseName(to); + String toTableName = IcebergToGlueConverter.getTableName(to); + try { + GetTableResponse response = glue.getTable(GetTableRequest.builder() + .catalogId(awsProperties.glueCatalogId()) + .databaseName(fromTableDbName) + .name(fromTableName) + .build()); + fromTable = response.table(); + } catch (EntityNotFoundException e) { + throw new NoSuchTableException(e, "Cannot rename %s because the table does not exist in Glue", from); + } + + // use the same Glue info to create the new table, pointing to the old metadata + TableInput.Builder tableInputBuilder = TableInput.builder() + .owner(fromTable.owner()) + .tableType(fromTable.tableType()) + .parameters(fromTable.parameters()); + + glue.createTable(CreateTableRequest.builder() + .catalogId(awsProperties.glueCatalogId()) + .databaseName(toTableDbName) + .tableInput(tableInputBuilder.name(toTableName).build()) + .build()); + LOG.info("created rename destination table {}", to); + + try { + dropTable(from, false); + } catch (Exception e) { + // rollback, delete renamed table + LOG.error("Fail to drop old table {} after renaming to {}, rollback to use the old table", from, to, e); + glue.deleteTable(DeleteTableRequest.builder() + .catalogId(awsProperties.glueCatalogId()) + .databaseName(toTableDbName) + .name(toTableName) + .build()); + throw e; + } + + LOG.info("Successfully renamed table from {} to {}", from, to); + } + + @Override + public void createNamespace(Namespace namespace, Map metadata) { + try { + glue.createDatabase(CreateDatabaseRequest.builder() + .catalogId(awsProperties.glueCatalogId()) + .databaseInput(IcebergToGlueConverter.toDatabaseInput(namespace, metadata)) + .build()); + LOG.info("Created namespace: {}", namespace); + } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) { + throw new AlreadyExistsException("Cannot create namespace %s because it already exists in Glue", namespace); + } + } + + @Override + public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + if (!namespace.isEmpty()) { + // if it is not a list all op, just check if the namespace exists and return empty. + if (namespaceExists(namespace)) { + return Lists.newArrayList(); + } + throw new NoSuchNamespaceException( + "Glue does not support nested namespace, cannot list namespaces under %s", namespace); + } + + // should be safe to list all before returning the list, instead of dynamically load the list. + String nextToken = null; + List results = Lists.newArrayList(); + do { + GetDatabasesResponse response = glue.getDatabases(GetDatabasesRequest.builder() + .catalogId(awsProperties.glueCatalogId()) + .nextToken(nextToken) + .build()); + nextToken = response.nextToken(); + if (response.hasDatabaseList()) { + results.addAll(response.databaseList().stream() + .map(GlueToIcebergConverter::toNamespace) + .collect(Collectors.toList())); + } + } while (nextToken != null); + + LOG.debug("Listing namespace {} returned namespaces: {}", namespace, results); + return results; + } + + @Override + public Map loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { + String databaseName = IcebergToGlueConverter.toDatabaseName(namespace); + try { + GetDatabaseResponse response = glue.getDatabase(GetDatabaseRequest.builder() + .catalogId(awsProperties.glueCatalogId()) + .name(databaseName) + .build()); + Map result = response.database().parameters(); + LOG.debug("Loaded metadata for namespace {} found {}", namespace, result); + return result; + } catch (InvalidInputException e) { + throw new NoSuchNamespaceException("invalid input for namespace %s, error message: %s", + namespace, e.getMessage()); + } catch (EntityNotFoundException e) { + throw new NoSuchNamespaceException("fail to find Glue database for namespace %s, error message: %s", + databaseName, e.getMessage()); + } + } + + @Override + public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { + namespaceExists(namespace); + List tableIdentifiers = listTables(namespace); + if (tableIdentifiers != null && !tableIdentifiers.isEmpty()) { + throw new NamespaceNotEmptyException("Cannot drop namespace %s because it is not empty. " + + "The following tables still exist under the namespace: %s", namespace, tableIdentifiers); + } + + glue.deleteDatabase(DeleteDatabaseRequest.builder() + .catalogId(awsProperties.glueCatalogId()) + .name(IcebergToGlueConverter.toDatabaseName(namespace)) + .build()); + LOG.info("Dropped namespace: {}", namespace); + // Always successful, otherwise exception is thrown + return true; + } + + @Override + public boolean setProperties(Namespace namespace, Map properties) throws NoSuchNamespaceException { + Map newProperties = Maps.newHashMap(); + newProperties.putAll(loadNamespaceMetadata(namespace)); + newProperties.putAll(properties); + glue.updateDatabase(UpdateDatabaseRequest.builder() + .catalogId(awsProperties.glueCatalogId()) + .name(IcebergToGlueConverter.toDatabaseName(namespace)) + .databaseInput(IcebergToGlueConverter.toDatabaseInput(namespace, newProperties)) + .build()); + LOG.debug("Successfully set properties {} for {}", properties.keySet(), namespace); + // Always successful, otherwise exception is thrown + return true; + } + + @Override + public boolean removeProperties(Namespace namespace, Set properties) throws NoSuchNamespaceException { + Map metadata = Maps.newHashMap(loadNamespaceMetadata(namespace)); + for (String property : properties) { + metadata.remove(property); + } + + glue.updateDatabase(UpdateDatabaseRequest.builder() + .catalogId(awsProperties.glueCatalogId()) + .name(IcebergToGlueConverter.toDatabaseName(namespace)) + .databaseInput(IcebergToGlueConverter.toDatabaseInput(namespace, metadata)) + .build()); + LOG.debug("Successfully removed properties {} from {}", properties, namespace); + // Always successful, otherwise exception is thrown + return true; + } + + @Override + protected boolean isValidIdentifier(TableIdentifier tableIdentifier) { + return IcebergToGlueConverter.isValidNamespace(tableIdentifier.namespace()) && + IcebergToGlueConverter.isValidTableName(tableIdentifier.name()); + } + + @Override + public String name() { + return catalogName; + } + + @Override + public void close() throws IOException { + glue.close(); + } + + @Override + public void setConf(Configuration conf) { + this.hadoopConf = conf; + } + + @Override + public Configuration getConf() { + return hadoopConf; + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java new file mode 100644 index 000000000000..92f0d557b66b --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.aws.AwsProperties; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.ConcurrentModificationException; +import software.amazon.awssdk.services.glue.model.CreateTableRequest; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; +import software.amazon.awssdk.services.glue.model.UpdateTableRequest; + +class GlueTableOperations extends BaseMetastoreTableOperations { + + private static final Logger LOG = LoggerFactory.getLogger(GlueTableOperations.class); + + // same as org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE + // more details: https://docs.aws.amazon.com/glue/latest/webapi/API_TableInput.html + private static final String GLUE_EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE"; + + private final GlueClient glue; + private final AwsProperties awsProperties; + private final String databaseName; + private final String tableName; + private final String fullTableName; + private final FileIO fileIO; + + GlueTableOperations(GlueClient glue, String catalogName, AwsProperties awsProperties, + FileIO fileIO, TableIdentifier tableIdentifier) { + this.glue = glue; + this.awsProperties = awsProperties; + this.databaseName = IcebergToGlueConverter.getDatabaseName(tableIdentifier); + this.tableName = IcebergToGlueConverter.getTableName(tableIdentifier); + this.fullTableName = String.format("%s.%s.%s", catalogName, databaseName, tableName); + this.fileIO = fileIO; + } + + @Override + public FileIO io() { + return fileIO; + } + + @Override + protected String tableName() { + return fullTableName; + } + + @Override + protected void doRefresh() { + String metadataLocation = null; + Table table = getGlueTable(); + if (table != null) { + GlueToIcebergConverter.validateTable(table, tableName()); + metadataLocation = table.parameters().get(METADATA_LOCATION_PROP); + } else { + if (currentMetadataLocation() != null) { + throw new NoSuchTableException("Cannot find Glue table %s after refresh, " + + "maybe another process deleted it or revoked your access permission", tableName()); + } + } + + refreshFromMetadataLocation(metadataLocation); + } + + @Override + protected void doCommit(TableMetadata base, TableMetadata metadata) { + String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1); + boolean exceptionThrown = true; + Table glueTable = getGlueTable(); + checkMetadataLocation(glueTable, base); + Map properties = prepareProperties(glueTable, newMetadataLocation); + try { + persistGlueTable(glueTable, properties); + exceptionThrown = false; + } catch (ConcurrentModificationException e) { + throw new CommitFailedException(e, "Cannot commit %s because Glue detected concurrent update", tableName()); + } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) { + throw new AlreadyExistsException(e, + "Cannot commit %s because its Glue table already exists when trying to create one", tableName()); + } catch (SdkException e) { + throw new CommitFailedException(e, "Cannot commit %s because unexpected exception contacting AWS", tableName()); + } finally { + if (exceptionThrown) { + io().deleteFile(newMetadataLocation); + } + } + } + + private void checkMetadataLocation(Table glueTable, TableMetadata base) { + String glueMetadataLocation = glueTable != null ? glueTable.parameters().get(METADATA_LOCATION_PROP) : null; + String baseMetadataLocation = base != null ? base.metadataFileLocation() : null; + if (!Objects.equals(baseMetadataLocation, glueMetadataLocation)) { + throw new CommitFailedException( + "Cannot commit %s because base metadata location '%s' is not same as the current Glue location '%s'", + tableName(), baseMetadataLocation, glueMetadataLocation); + } + } + + private Table getGlueTable() { + try { + GetTableResponse response = glue.getTable(GetTableRequest.builder() + .catalogId(awsProperties.glueCatalogId()) + .databaseName(databaseName) + .name(tableName) + .build()); + return response.table(); + } catch (EntityNotFoundException e) { + return null; + } + } + + private Map prepareProperties(Table glueTable, String newMetadataLocation) { + Map properties = glueTable != null ? Maps.newHashMap(glueTable.parameters()) : Maps.newHashMap(); + properties.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH)); + properties.put(METADATA_LOCATION_PROP, newMetadataLocation); + if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) { + properties.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation()); + } + + return properties; + } + + private void persistGlueTable(Table glueTable, Map parameters) { + if (glueTable != null) { + LOG.debug("Committing existing Glue table: {}", tableName()); + glue.updateTable(UpdateTableRequest.builder() + .catalogId(awsProperties.glueCatalogId()) + .databaseName(databaseName) + .skipArchive(awsProperties.glueCatalogSkipArchive()) + .tableInput(TableInput.builder() + .name(tableName) + .parameters(parameters) + .build()) + .build()); + } else { + LOG.debug("Committing new Glue table: {}", tableName()); + glue.createTable(CreateTableRequest.builder() + .catalogId(awsProperties.glueCatalogId()) + .databaseName(databaseName) + .tableInput(TableInput.builder() + .name(tableName) + .tableType(GLUE_EXTERNAL_TABLE_TYPE) + .parameters(parameters) + .build()) + .build()); + } + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueToIcebergConverter.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueToIcebergConverter.java new file mode 100644 index 000000000000..5078ba34d609 --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueToIcebergConverter.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ValidationException; +import software.amazon.awssdk.services.glue.model.Database; +import software.amazon.awssdk.services.glue.model.Table; + +class GlueToIcebergConverter { + + private GlueToIcebergConverter() { + } + + static Namespace toNamespace(Database database) { + return Namespace.of(database.name()); + } + + static TableIdentifier toTableId(Table table) { + return TableIdentifier.of(table.databaseName(), table.name()); + } + + /** + * Validate the Glue table is Iceberg table by checking its parameters + * @param table glue table + * @param fullName full table name for logging + */ + static void validateTable(Table table, String fullName) { + String tableType = table.parameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP); + ValidationException.check(tableType != null && tableType.equalsIgnoreCase( + BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE), + "Input Glue table is not an iceberg table: %s (type=%s)", fullName, tableType); + } +} diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java b/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java new file mode 100644 index 000000000000..ecb94b97f52c --- /dev/null +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.Map; +import java.util.regex.Pattern; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.exceptions.ValidationException; +import software.amazon.awssdk.services.glue.model.DatabaseInput; + +class IcebergToGlueConverter { + + private IcebergToGlueConverter() { + } + + private static final Pattern GLUE_DB_PATTERN = Pattern.compile("^[a-z0-9_]{1,252}$"); + private static final Pattern GLUE_TABLE_PATTERN = Pattern.compile("^[a-z0-9_]{1,255}$"); + + /** + * A Glue database name cannot be longer than 252 characters. + * The only acceptable characters are lowercase letters, numbers, and the underscore character. + * More details: https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html + * @param namespace namespace + * @return if namespace can be accepted by Glue + */ + static boolean isValidNamespace(Namespace namespace) { + if (namespace.levels().length != 1) { + return false; + } + String dbName = namespace.level(0); + return dbName != null && GLUE_DB_PATTERN.matcher(dbName).find(); + } + + /** + * Validate if an Iceberg namespace is valid in Glue + * @param namespace namespace + * @throws NoSuchNamespaceException if namespace is not valid in Glue + */ + static void validateNamespace(Namespace namespace) { + ValidationException.check(isValidNamespace(namespace), "Cannot convert namespace %s to Glue database name, " + + "because it must be 1-252 chars of lowercase letters, numbers, underscore", namespace); + } + + /** + * Validate and convert Iceberg namespace to Glue database name + * @param namespace Iceberg namespace + * @return database name + */ + static String toDatabaseName(Namespace namespace) { + validateNamespace(namespace); + return namespace.level(0); + } + + /** + * Validate and get Glue database name from Iceberg TableIdentifier + * @param tableIdentifier Iceberg table identifier + * @return database name + */ + static String getDatabaseName(TableIdentifier tableIdentifier) { + return toDatabaseName(tableIdentifier.namespace()); + } + + /** + * Validate and convert Iceberg name to Glue DatabaseInput + * @param namespace Iceberg namespace + * @param metadata metadata map + * @return Glue DatabaseInput + */ + static DatabaseInput toDatabaseInput(Namespace namespace, Map metadata) { + return DatabaseInput.builder() + .name(toDatabaseName(namespace)) + .parameters(metadata) + .build(); + } + + /** + * A Glue table name cannot be longer than 255 characters. + * The only acceptable characters are lowercase letters, numbers, and the underscore character. + * More details: https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html + * @param tableName table name + * @return if a table name can be accepted by Glue + */ + static boolean isValidTableName(String tableName) { + return tableName != null && GLUE_TABLE_PATTERN.matcher(tableName).find(); + } + + /** + * Validate if a table name is valid in Glue + * @param tableName table name + * @throws NoSuchTableException if table name not valid in Glue + */ + static void validateTableName(String tableName) { + ValidationException.check(isValidTableName(tableName), "Cannot use %s as Glue table name, " + + "because it must be 1-255 chars of lowercase letters, numbers, underscore", tableName); + } + + /** + * Validate and get Glue table name from Iceberg TableIdentifier + * @param tableIdentifier table identifier + * @return table name + */ + static String getTableName(TableIdentifier tableIdentifier) { + validateTableName(tableIdentifier.name()); + return tableIdentifier.name(); + } + + /** + * Validate Iceberg TableIdentifier is valid in Glue + * @param tableIdentifier Iceberg table identifier + */ + static void validateTableIdentifier(TableIdentifier tableIdentifier) { + validateNamespace(tableIdentifier.namespace()); + validateTableName(tableIdentifier.name()); + } +} diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java new file mode 100644 index 000000000000..a80b168f77bb --- /dev/null +++ b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.aws.AwsProperties; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse; +import software.amazon.awssdk.services.glue.model.Database; +import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest; +import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse; +import software.amazon.awssdk.services.glue.model.DeleteTableRequest; +import software.amazon.awssdk.services.glue.model.DeleteTableResponse; +import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; +import software.amazon.awssdk.services.glue.model.GetDatabaseResponse; +import software.amazon.awssdk.services.glue.model.GetDatabasesRequest; +import software.amazon.awssdk.services.glue.model.GetDatabasesResponse; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; +import software.amazon.awssdk.services.glue.model.GetTablesRequest; +import software.amazon.awssdk.services.glue.model.GetTablesResponse; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse; + +public class GlueCatalogTest { + + private static final String WAREHOUSE_PATH = "s3://bucket"; + private static final String CATALOG_NAME = "glue"; + private GlueClient glue; + private GlueCatalog glueCatalog; + + @Before + public void before() { + glue = Mockito.mock(GlueClient.class); + glueCatalog = new GlueCatalog(glue); + glueCatalog.initialize(CATALOG_NAME, WAREHOUSE_PATH, new AwsProperties(), null); + } + + @Test + public void constructor_emptyWarehousePath() { + AssertHelpers.assertThrows("warehouse path cannot be null", + IllegalArgumentException.class, + "Cannot initialize GlueCatalog because warehousePath must not be null", + () -> { + GlueCatalog catalog = new GlueCatalog(glue); + catalog.initialize(CATALOG_NAME, null, new AwsProperties(), null); + }); + } + + @Test + public void constructor_warehousePathWithEndSlash() { + GlueCatalog catalogWithSlash = new GlueCatalog(glue); + catalogWithSlash.initialize(CATALOG_NAME, WAREHOUSE_PATH + "/", new AwsProperties(), null); + Mockito.doReturn(GetDatabaseResponse.builder() + .database(Database.builder().name("db").build()).build()) + .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class)); + String location = catalogWithSlash.defaultWarehouseLocation(TableIdentifier.of("db", "table")); + Assert.assertEquals(WAREHOUSE_PATH + "/db.db/table", location); + } + + @Test + public void defaultWarehouseLocation_noDbUri() { + Mockito.doReturn(GetDatabaseResponse.builder() + .database(Database.builder().name("db").build()).build()) + .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class)); + String location = glueCatalog.defaultWarehouseLocation(TableIdentifier.of("db", "table")); + Assert.assertEquals(WAREHOUSE_PATH + "/db.db/table", location); + } + + @Test + public void defaultWarehouseLocation_dbUri() { + Mockito.doReturn(GetDatabaseResponse.builder() + .database(Database.builder().name("db").locationUri("s3://bucket2/db").build()).build()) + .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class)); + String location = glueCatalog.defaultWarehouseLocation(TableIdentifier.of("db", "table")); + Assert.assertEquals("s3://bucket2/db/table", location); + } + + @Test + public void listTables() { + Mockito.doReturn(GetDatabaseResponse.builder() + .database(Database.builder().name("db1").build()).build()) + .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class)); + Mockito.doReturn(GetTablesResponse.builder() + .tableList( + Table.builder().databaseName("db1").name("t1").build(), + Table.builder().databaseName("db1").name("t2").build() + ).build()) + .when(glue).getTables(Mockito.any(GetTablesRequest.class)); + Assert.assertEquals( + Lists.newArrayList( + TableIdentifier.of("db1", "t1"), + TableIdentifier.of("db1", "t2") + ), + glueCatalog.listTables(Namespace.of("db1")) + ); + } + + @Test + public void listTables_pagination() { + AtomicInteger counter = new AtomicInteger(10); + Mockito.doReturn(GetDatabaseResponse.builder() + .database(Database.builder().name("db1").build()).build()) + .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class)); + Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + if (counter.decrementAndGet() > 0) { + return GetTablesResponse.builder() + .tableList( + Table.builder().databaseName("db1").name( + UUID.randomUUID().toString().replace("-", "")).build() + ) + .nextToken("token") + .build(); + } else { + return GetTablesResponse.builder() + .tableList(Table.builder().databaseName("db1").name("tb1").build()) + .build(); + } + } + }).when(glue).getTables(Mockito.any(GetTablesRequest.class)); + Assert.assertEquals(10, glueCatalog.listTables(Namespace.of("db1")).size()); + } + + @Test + public void dropTable() { + Map properties = new HashMap<>(); + properties.put(BaseMetastoreTableOperations.TABLE_TYPE_PROP, + BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE); + Mockito.doReturn(GetTableResponse.builder() + .table(Table.builder().databaseName("db1").name("t1").parameters(properties).build()).build()) + .when(glue).getTable(Mockito.any(GetTableRequest.class)); + Mockito.doReturn(GetDatabaseResponse.builder() + .database(Database.builder().name("db1").build()).build()) + .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class)); + Mockito.doReturn(DeleteTableResponse.builder().build()) + .when(glue).deleteTable(Mockito.any(DeleteTableRequest.class)); + glueCatalog.dropTable(TableIdentifier.of("db1", "t1")); + } + + @Test + public void renameTable() { + AtomicInteger counter = new AtomicInteger(1); + Map properties = new HashMap<>(); + properties.put(BaseMetastoreTableOperations.TABLE_TYPE_PROP, + BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE); + Mockito.doReturn(GetTableResponse.builder() + .table(Table.builder().databaseName("db1").name("t1").parameters(properties).build()).build()) + .when(glue).getTable(Mockito.any(GetTableRequest.class)); + Mockito.doReturn(GetTablesResponse.builder().build()) + .when(glue).getTables(Mockito.any(GetTablesRequest.class)); + Mockito.doReturn(GetDatabaseResponse.builder() + .database(Database.builder().name("db1").build()).build()) + .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class)); + Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + counter.decrementAndGet(); + return DeleteTableResponse.builder().build(); + } + }).when(glue).deleteTable(Mockito.any(DeleteTableRequest.class)); + glueCatalog.dropTable(TableIdentifier.of("db1", "t1")); + Assert.assertEquals(0, counter.get()); + } + + @Test + public void createNamespace() { + Mockito.doReturn(CreateDatabaseResponse.builder().build()) + .when(glue).createDatabase(Mockito.any(CreateDatabaseRequest.class)); + glueCatalog.createNamespace(Namespace.of("db")); + } + + @Test + public void createNamespace_badName() { + Mockito.doReturn(CreateDatabaseResponse.builder().build()) + .when(glue).createDatabase(Mockito.any(CreateDatabaseRequest.class)); + List invalidNamespaces = Lists.newArrayList( + Namespace.of("db-1"), + Namespace.of("db", "db2") + ); + + for (Namespace namespace : invalidNamespaces) { + AssertHelpers.assertThrows("should not create namespace with invalid or nested names", + ValidationException.class, + "Cannot convert namespace", + () -> glueCatalog.createNamespace(namespace)); + } + } + + @Test + public void listNamespaces_all() { + Mockito.doReturn(GetDatabasesResponse.builder() + .databaseList( + Database.builder().name("db1").build(), + Database.builder().name("db2").build() + ).build()) + .when(glue).getDatabases(Mockito.any(GetDatabasesRequest.class)); + Assert.assertEquals( + Lists.newArrayList( + Namespace.of("db1"), + Namespace.of("db2") + ), + glueCatalog.listNamespaces() + ); + } + + @Test + public void listNamespaces_pagination() { + AtomicInteger counter = new AtomicInteger(10); + Mockito.doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + if (counter.decrementAndGet() > 0) { + return GetDatabasesResponse.builder() + .databaseList( + Database.builder().name(UUID.randomUUID().toString().replace("-", "")).build() + ) + .nextToken("token") + .build(); + } else { + return GetDatabasesResponse.builder() + .databaseList(Database.builder().name("db").build()) + .build(); + } + } + }).when(glue).getDatabases(Mockito.any(GetDatabasesRequest.class)); + Assert.assertEquals(10, glueCatalog.listNamespaces().size()); + } + + @Test + public void listNamespaces_self() { + Mockito.doReturn(GetDatabaseResponse.builder() + .database(Database.builder().name("db1").build()).build()) + .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class)); + Assert.assertEquals( + "list self should return empty list", + Lists.newArrayList(), + glueCatalog.listNamespaces(Namespace.of("db1")) + ); + } + + @Test + public void listNamespaces_selfInvalid() { + AssertHelpers.assertThrows("table name invalid", + ValidationException.class, + "Cannot convert namespace", + () -> glueCatalog.listNamespaces(Namespace.of("db-1"))); + } + + @Test + public void loadNamespaceMetadata() { + Map parameters = new HashMap<>(); + parameters.put("key", "val"); + Mockito.doReturn(GetDatabaseResponse.builder() + .database(Database.builder().name("db1") + .parameters(parameters) + .build()).build()) + .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class)); + Assert.assertEquals(parameters, glueCatalog.loadNamespaceMetadata(Namespace.of("db1"))); + } + + @Test + public void dropNamespace() { + Mockito.doReturn(GetTablesResponse.builder().build()) + .when(glue).getTables(Mockito.any(GetTablesRequest.class)); + Mockito.doReturn(GetDatabaseResponse.builder() + .database(Database.builder().name("db1").build()).build()) + .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class)); + Mockito.doReturn(DeleteDatabaseResponse.builder().build()) + .when(glue).deleteDatabase(Mockito.any(DeleteDatabaseRequest.class)); + glueCatalog.dropNamespace(Namespace.of("db1")); + } + + @Test + public void dropNamespace_notEmpty() { + Mockito.doReturn(GetTablesResponse.builder() + .tableList( + Table.builder().databaseName("db1").name("t1").build(), + Table.builder().databaseName("db1").name("t2").build() + ).build()) + .when(glue).getTables(Mockito.any(GetTablesRequest.class)); + Mockito.doReturn(GetDatabaseResponse.builder() + .database(Database.builder().name("db1").build()).build()) + .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class)); + Mockito.doReturn(DeleteDatabaseResponse.builder().build()) + .when(glue).deleteDatabase(Mockito.any(DeleteDatabaseRequest.class)); + AssertHelpers.assertThrows("namespace should not be dropped when still has table", + NamespaceNotEmptyException.class, + "Cannot drop namespace", + () -> glueCatalog.dropNamespace(Namespace.of("db1"))); + } + + @Test + public void setProperties() { + Map parameters = new HashMap<>(); + parameters.put("key", "val"); + Mockito.doReturn(GetDatabaseResponse.builder() + .database(Database.builder().name("db1") + .parameters(parameters) + .build()).build()) + .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class)); + Mockito.doReturn(UpdateDatabaseResponse.builder().build()) + .when(glue).updateDatabase(Mockito.any(UpdateDatabaseRequest.class)); + glueCatalog.setProperties(Namespace.of("db1"), parameters); + } + + @Test + public void removeProperties() { + Map parameters = new HashMap<>(); + parameters.put("key", "val"); + Mockito.doReturn(GetDatabaseResponse.builder() + .database(Database.builder().name("db1") + .parameters(parameters) + .build()).build()) + .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class)); + Mockito.doReturn(UpdateDatabaseResponse.builder().build()) + .when(glue).updateDatabase(Mockito.any(UpdateDatabaseRequest.class)); + glueCatalog.removeProperties(Namespace.of("db1"), Sets.newHashSet("key")); + } +} diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/GlueToIcebergConverterTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueToIcebergConverterTest.java new file mode 100644 index 000000000000..88a9eee2942b --- /dev/null +++ b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueToIcebergConverterTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.HashMap; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.BaseMetastoreTableOperations; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.ValidationException; +import org.junit.Assert; +import org.junit.Test; +import software.amazon.awssdk.services.glue.model.Database; +import software.amazon.awssdk.services.glue.model.Table; + +public class GlueToIcebergConverterTest { + + @Test + public void toNamespace() { + Database database = Database.builder() + .name("db") + .build(); + Namespace namespace = Namespace.of("db"); + Assert.assertEquals(namespace, GlueToIcebergConverter.toNamespace(database)); + } + + @Test + public void toTableId() { + Table table = Table.builder() + .databaseName("db") + .name("name") + .build(); + TableIdentifier icebergId = TableIdentifier.of("db", "name"); + Assert.assertEquals(icebergId, GlueToIcebergConverter.toTableId(table)); + } + + @Test + public void validateTable() { + Map properties = new HashMap<>(); + properties.put(BaseMetastoreTableOperations.TABLE_TYPE_PROP, + BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE); + Table table = Table.builder() + .parameters(properties) + .build(); + GlueToIcebergConverter.validateTable(table, "name"); + } + + @Test + public void validateTable_icebergPropertyNotFound() { + Map properties = new HashMap<>(); + Table table = Table.builder() + .parameters(properties) + .build(); + AssertHelpers.assertThrows("Iceberg property not found", + ValidationException.class, + "Input Glue table is not an iceberg table", + () -> GlueToIcebergConverter.validateTable(table, "name") + ); + } + + @Test + public void validateTable_icebergPropertyValueWrong() { + Map properties = new HashMap<>(); + properties.put(BaseMetastoreTableOperations.TABLE_TYPE_PROP, "other"); + Table table = Table.builder() + .parameters(properties) + .build(); + AssertHelpers.assertThrows("Iceberg property value wrong", + ValidationException.class, + "Input Glue table is not an iceberg table", + () -> GlueToIcebergConverter.validateTable(table, "name") + ); + } +} diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/IcebergToGlueConverterTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/IcebergToGlueConverterTest.java new file mode 100644 index 000000000000..d4c20b7d0f8a --- /dev/null +++ b/aws/src/test/java/org/apache/iceberg/aws/glue/IcebergToGlueConverterTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.aws.glue; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Test; +import software.amazon.awssdk.services.glue.model.DatabaseInput; + +public class IcebergToGlueConverterTest { + + @Test + public void toDatabaseName() { + Assert.assertEquals("db", IcebergToGlueConverter.toDatabaseName(Namespace.of("db"))); + } + + @Test + public void toDatabaseName_fail() { + List badNames = Lists.newArrayList( + Namespace.of("db", "a"), + Namespace.of("db-1"), + Namespace.empty(), + Namespace.of(""), + Namespace.of(new String(new char[600]).replace("\0", "a"))); + for (Namespace name : badNames) { + AssertHelpers.assertThrows("bad namespace name", + ValidationException.class, + "Cannot convert namespace", + () -> IcebergToGlueConverter.toDatabaseName(name) + ); + } + } + + @Test + public void toDatabaseInput() { + Map param = new HashMap<>(); + DatabaseInput input = DatabaseInput.builder() + .name("db") + .parameters(param) + .build(); + Namespace namespace = Namespace.of("db"); + Assert.assertEquals(input, IcebergToGlueConverter.toDatabaseInput(namespace, param)); + } +} diff --git a/build.gradle b/build.gradle index 7df7c9f079b3..a385685f6b7d 100644 --- a/build.gradle +++ b/build.gradle @@ -252,7 +252,16 @@ project(':iceberg-aws') { compile 'software.amazon.awssdk:url-connection-client' compile 'software.amazon.awssdk:s3' compile 'software.amazon.awssdk:kms' + compile 'software.amazon.awssdk:glue' + compileOnly("org.apache.hadoop:hadoop-common") { + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'org.slf4j', module: 'slf4j-log4j12' + exclude group: 'javax.servlet', module: 'servlet-api' + exclude group: 'com.google.code.gson', module: 'gson' + } + + testCompile project(path: ':iceberg-api', configuration: 'testArtifacts') testCompile("com.adobe.testing:s3mock-junit4") { exclude module: "spring-boot-starter-logging" exclude module: "logback-classic"