Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class GlueTestBase {
// iceberg
static GlueCatalog glueCatalog;
static GlueCatalog glueCatalogWithSkipNameValidation;
static GlueCatalog glueCatalogWithForceRegisterTable;

static Schema schema =
new Schema(Types.NestedField.required(1, "c1", Types.StringType.get(), "c1"));
Expand Down Expand Up @@ -95,6 +96,17 @@ public static void beforeClass() {
null,
fileIO,
ImmutableMap.of());
glueCatalogWithForceRegisterTable = new GlueCatalog();
AwsProperties propertiesForceRegisterTable = new AwsProperties();
propertiesForceRegisterTable.setGlueCatalogForceRegisterTable(true);
glueCatalogWithForceRegisterTable.initialize(
catalogName,
testBucketPath,
propertiesForceRegisterTable,
glue,
LockManagers.defaultLockManager(),
fileIO,
ImmutableMap.of());
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand Down Expand Up @@ -269,14 +269,6 @@ public void testRenameTableFailsToDeleteOldTable() {
.databaseName(namespace)
.tableInput(TableInput.builder().name(tableName).parameters(Maps.newHashMap()).build())
.build());
AssertHelpers.assertThrows(
"should fail to rename",
ValidationException.class,
"Input Glue table is not an iceberg table",
() ->
glueCatalog.renameTable(
TableIdentifier.of(namespace, tableName),
TableIdentifier.of(namespace, newTableName)));
Comment on lines -272 to -279
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this assertion removed? It looks like it's for rename

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assertion was removed due to the logic change in renameTable that allows rename table to use the previous table's Iceberg Properties (metadata location) the related integration test would always fail at this assert.

AssertHelpers.assertThrows(
"renamed table should be deleted",
EntityNotFoundException.class,
Expand Down Expand Up @@ -575,11 +567,60 @@ public void testRegisterTable() {
String expectedMetadataLocation =
((BaseTable) table).operations().current().metadataFileLocation();
Assertions.assertThat(metadataLocation).isEqualTo(expectedMetadataLocation);

Assertions.assertThat(glueCatalog.loadTable(identifier)).isNotNull();
Assertions.assertThat(glueCatalog.dropTable(identifier, true)).isTrue();
Assertions.assertThat(glueCatalog.dropNamespace(Namespace.of(namespace))).isTrue();
}

@Test
public void testRegisterTableForceRegister() {
String namespace = createNamespace();
String tableName = getRandomName();
createTable(namespace, tableName);
TableIdentifier identifier = TableIdentifier.of(namespace, tableName);
Table table = glueCatalogWithForceRegisterTable.loadTable(identifier);
String metadataLocation = ((BaseTable) table).operations().current().metadataFileLocation();
Assertions.assertThat(glueCatalogWithForceRegisterTable.dropTable(identifier, false)).isTrue();
Table registeredTable =
glueCatalogWithForceRegisterTable.registerTable(identifier, metadataLocation);
Assertions.assertThat(registeredTable).isNotNull();
String expectedMetadataLocation =
((BaseTable) table).operations().current().metadataFileLocation();
Assertions.assertThat(metadataLocation).isEqualTo(expectedMetadataLocation);

GetTableResponse response =
glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
String actualMetadataLocationGlue =
response.table().parameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);

Assert.assertEquals(
"Glue Catalog Register Table should not submit a new commit",
expectedMetadataLocation,
actualMetadataLocationGlue);

Assertions.assertThat(glueCatalogWithForceRegisterTable.loadTable(identifier)).isNotNull();
Assertions.assertThat(glueCatalogWithForceRegisterTable.dropTable(identifier, true)).isTrue();
Assertions.assertThat(glueCatalog.dropNamespace(Namespace.of(namespace))).isTrue();
}

@Test
public void testRegisterTableNamespaceNotFound() {
String namespace = createNamespace();
String tableName = getRandomName();
createTable(namespace, tableName);
Table table =
glueCatalogWithForceRegisterTable.loadTable(TableIdentifier.of(namespace, tableName));
String metadataLocation = ((BaseTable) table).operations().current().metadataFileLocation();
AssertHelpers.assertThrows(
"Should fail to register to an unknown namespace",
NoSuchNamespaceException.class,
"not found in Glue",
() ->
glueCatalogWithForceRegisterTable.registerTable(
TableIdentifier.of(getRandomName(), getRandomName()), metadataLocation));
}

@Test
public void testRegisterTableAlreadyExists() {
String namespace = createNamespace();
Expand All @@ -594,6 +635,55 @@ public void testRegisterTableAlreadyExists() {
Assertions.assertThat(glueCatalog.dropNamespace(Namespace.of(namespace))).isTrue();
}

@Test
public void testRegisterTableAlreadyExistsForceRegister() {
String namespace = createNamespace();
String tableName = getRandomName();
createTable(namespace, tableName);
TableIdentifier identifier = TableIdentifier.of(namespace, tableName);
Table table = glueCatalogWithForceRegisterTable.loadTable(identifier);
String metadataLocation = ((BaseTable) table).operations().current().metadataFileLocation();
Assertions.assertThat(glueCatalogWithForceRegisterTable.dropTable(identifier, false)).isTrue();
Table registeredTable =
glueCatalogWithForceRegisterTable.registerTable(identifier, metadataLocation);
Assertions.assertThat(registeredTable).isNotNull();

GetTableResponse response =
glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
Assert.assertEquals(
"external table type is set after register",
"EXTERNAL_TABLE",
response.table().tableType());
String actualMetadataLocation =
response.table().parameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
Assert.assertEquals(
"metadata location should be updated with registerTable call",
metadataLocation,
actualMetadataLocation);

// commit new transaction, should create a new metadata file
DataFile dataFile =
DataFiles.builder(partitionSpec)
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withRecordCount(1)
.build();
table.newAppend().appendFile(dataFile).commit();

metadataLocation = ((BaseTable) table).operations().current().metadataFileLocation();
// update metadata location
glueCatalogWithForceRegisterTable.registerTable(identifier, metadataLocation);
response =
glue.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
String updatedMetadataLocation =
response.table().parameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
Assert.assertEquals(
"metadata location should be updated with registerTable call",
metadataLocation,
updatedMetadataLocation);
Assert.assertEquals("Table Version should be updated", "2", response.table().versionId());
}

@Test
public void testTableLevelS3Tags() {
String testBucketPath = "s3://" + testBucketName + "/" + testPathPrefix;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public GlueClient glue() {
return GlueClient.builder()
.applyMutation(this::applyAssumeRoleConfigurations)
.applyMutation(awsProperties::applyHttpClientConfigurations)
.applyMutation(awsProperties::applyGlueEndpointConfigurations)
.build();
}

Expand Down
41 changes: 41 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,18 @@ public class AwsProperties implements Serializable {
*/
public static final String GLUE_CATALOG_ENDPOINT = "glue.endpoint";

/**
* If set, Glue will always update the catalog table if the table already exists in glue catalog.
* By default, Glue catalog will only be able to create new table and will throw
* AlreadyExistsException when register an existing table name.
*/
public static final String GLUE_CATALOG_FORCE_REGISTER_TABLE = "glue.force-register-table";

public static final boolean GLUE_CATALOG_FORCE_REGISTER_TABLE_DEFAULT = false;

/** Configure the Glue Catalog S3 FileIO Region to allow cross region s3 access */
public static final String GLUE_CATALOG_FILE_IO_REGION = "glue.catalog-file-io-region";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we already have client.region which sets the region for all services. I am going to be introducing a change in the near future that will allow setting per-region for the default AWS Client Factory (and we can extend it to the Assume Role client factory as well), so we probably want a more general per-service naming scheme. Based on how existing parameters are formatted where glue. and s3. are already established prefixes, most likely something like: glue.region, s3.region, kms.region, etc...


/**
* Number of threads to use for uploading parts to S3 (shared pool across all output streams),
* default to {@link Runtime#availableProcessors()}
Expand Down Expand Up @@ -911,6 +923,8 @@ public class AwsProperties implements Serializable {
private boolean glueCatalogSkipArchive;
private boolean glueCatalogSkipNameValidation;
private boolean glueLakeFormationEnabled;
private boolean glueCatalogForceRegisterTable;
private String glueCatalogFileIORegion;

private String dynamoDbTableName;
private String dynamoDbEndpoint;
Expand Down Expand Up @@ -970,6 +984,8 @@ public AwsProperties() {
this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT;
this.glueCatalogSkipNameValidation = GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT;
this.glueLakeFormationEnabled = GLUE_LAKEFORMATION_ENABLED_DEFAULT;
this.glueCatalogForceRegisterTable = GLUE_CATALOG_FORCE_REGISTER_TABLE_DEFAULT;
this.glueCatalogFileIORegion = null;

this.dynamoDbEndpoint = null;
this.dynamoDbTableName = DYNAMODB_TABLE_NAME_DEFAULT;
Expand Down Expand Up @@ -1030,6 +1046,13 @@ public AwsProperties(Map<String, String> properties) {
this.glueLakeFormationEnabled =
PropertyUtil.propertyAsBoolean(
properties, GLUE_LAKEFORMATION_ENABLED, GLUE_LAKEFORMATION_ENABLED_DEFAULT);
this.glueCatalogForceRegisterTable =
PropertyUtil.propertyAsBoolean(
properties,
GLUE_CATALOG_FORCE_REGISTER_TABLE,
GLUE_CATALOG_FORCE_REGISTER_TABLE_DEFAULT);
this.glueCatalogFileIORegion = properties.get(GLUE_CATALOG_FILE_IO_REGION);

this.s3FileIoMultipartUploadThreads =
PropertyUtil.propertyAsInt(
properties,
Expand Down Expand Up @@ -1252,6 +1275,24 @@ public void setGlueLakeFormationEnabled(boolean glueLakeFormationEnabled) {
this.glueLakeFormationEnabled = glueLakeFormationEnabled;
}


public boolean glueCatalogForceRegisterTable() {
return glueCatalogForceRegisterTable;
}

public void setGlueCatalogForceRegisterTable(boolean glueCatalogForceRegisterTable) {
this.glueCatalogForceRegisterTable = glueCatalogForceRegisterTable;
}

public String getGlueCatalogFileIORegion() {
return glueCatalogFileIORegion;
}

public void setGlueCatalogFileIORegion(String glueCatalogFileIORegion) {
this.glueCatalogFileIORegion = glueCatalogFileIORegion;
}


/**
* @deprecated will be removed in 1.4.0, use {@link org.apache.iceberg.aws.s3.S3FileIOProperties}
* instead
Expand Down
84 changes: 83 additions & 1 deletion aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -31,7 +33,9 @@
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.LockManager;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.aws.AssumeRoleAwsClientFactory;
import org.apache.iceberg.aws.AwsClientFactories;
import org.apache.iceberg.aws.AwsClientFactory;
import org.apache.iceberg.aws.AwsProperties;
Expand All @@ -49,6 +53,7 @@
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -78,6 +83,7 @@
import software.amazon.awssdk.services.glue.model.Table;
import software.amazon.awssdk.services.glue.model.TableInput;
import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
import software.amazon.awssdk.services.glue.model.UpdateTableRequest;

public class GlueCatalog extends BaseMetastoreCatalog
implements Closeable, SupportsNamespaces, Configurable<Configuration> {
Expand Down Expand Up @@ -111,7 +117,8 @@ public GlueCatalog() {}

@Override
public void initialize(String name, Map<String, String> properties) {
this.catalogProperties = ImmutableMap.copyOf(properties);
this.catalogProperties = new HashMap<>();
catalogProperties.putAll(properties);
Comment on lines -114 to +121
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making CatalogProperties non-immutable in order to effectively side-load information into the AWS client factory post-initialization is a dangerous precedent to set. In addition, it doesn't seem to actually accomplish anything besides allowing the GlueCatalog to suddenly switch regions post-initialization, which is likely to introduce some dangerous side effects.

AwsClientFactory awsClientFactory;
FileIO catalogFileIO;
if (PropertyUtil.propertyAsBoolean(
Expand Down Expand Up @@ -437,6 +444,81 @@ public void renameTable(TableIdentifier from, TableIdentifier to) {
LOG.info("Successfully renamed table from {} to {}", from, to);
}

@Override
public org.apache.iceberg.Table registerTable(
TableIdentifier identifier, String metadataFileLocation) {
Preconditions.checkArgument(
isValidIdentifier(identifier), "Table identifier to register is invalid: " + identifier);
Preconditions.checkArgument(
metadataFileLocation != null && !metadataFileLocation.isEmpty(),
"Cannot register an empty metadata file location as a table");

// keep the original behavior when force-register-table flag is off
if (!awsProperties.glueCatalogForceRegisterTable()) {
return super.registerTable(identifier, metadataFileLocation);
}

String factoryImpl =
PropertyUtil.propertyAsString(catalogProperties, AwsProperties.CLIENT_FACTORY, null);
if (factoryImpl != null && factoryImpl.equals(AssumeRoleAwsClientFactory.class.getName())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It hurts extensibility to make logic specific to a particular implementation class. If, for example, a customer needs to extend AssumeRoleAwsClientFactory to add some functionality for their own use, this logic will break as the class name will no longer match.

// overwrite client assume_role_region for file IO to make cross region call
String catalogFileIORegion = awsProperties.getGlueCatalogFileIORegion();
if (catalogFileIORegion != null) {
catalogProperties.put(AwsProperties.CLIENT_ASSUME_ROLE_REGION, catalogFileIORegion);
}
Comment on lines +465 to +468
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't this logic and the associated parameter be removed and replaced with just setting AwsProperties.CLIENT_ASSUME_ROLE_REGION directly? This change is not in any way scoped to this call or AWS service, it effectively creates a case where what region the GlueCatalog is calling can suddenly change post-initialization after the first time registerTable is called.

}

TableOperations ops = newTableOps(identifier);
InputFile metadataFile = ops.io().newInputFile(metadataFileLocation);
TableMetadata metadata = TableMetadataParser.read(ops.io(), metadataFile);

Map<String, String> tableParameters =
ImmutableMap.of(
BaseMetastoreTableOperations.TABLE_TYPE_PROP,
BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toLowerCase(Locale.ENGLISH),
BaseMetastoreTableOperations.METADATA_LOCATION_PROP,
metadataFileLocation);

String databaseName =
IcebergToGlueConverter.getDatabaseName(
identifier, awsProperties.glueCatalogSkipNameValidation());
String tableName =
IcebergToGlueConverter.getTableName(
identifier, awsProperties.glueCatalogSkipNameValidation());

TableInput tableInput =
TableInput.builder()
.applyMutation(
builder ->
IcebergToGlueConverter.setTableInputInformation(
builder, metadata, tableParameters))
.name(tableName)
.tableType(GlueTableOperations.GLUE_EXTERNAL_TABLE_TYPE)
.parameters(tableParameters)
.build();

try {
glue.createTable(
CreateTableRequest.builder().databaseName(databaseName).tableInput(tableInput).build());
} catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) {
GetTableResponse response =
glue.getTable(
GetTableRequest.builder().databaseName(databaseName).name(tableName).build());
String versionId = response.table().versionId();
glue.updateTable(
UpdateTableRequest.builder()
.databaseName(databaseName)
.tableInput(tableInput)
.versionId(versionId)
.build());
} catch (EntityNotFoundException e) {
Comment on lines +471 to +514
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this logic is a combination of a fork of the logic in BaseMetastoreCatalog.registerTable and GlueTableOperations.persistGlueTable. As GlueTableOperations also has access to AwsProperties, I would recommend refactoring the logic in GlueTableOperations so that persistGlueTable can conditionally fall back to its update mode as that improves code reuse.

If the concern is the creation of an extra metadata file, it looks like GlueTableOperations already checks whether it needs to during the writeNewMetadataIfRequired function and considering tableMetadata is always set for registerTable, it will never choose to write a new metadata file anyways.

throw new NoSuchNamespaceException(
e, "Namespace %s is not found in Glue", identifier.namespace());
Comment on lines +515 to +516
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception handling seems off, can't EntityNotFoundException also be thrown when the table is not found?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exception handling is meant for catching exceptions for glue.createTable calls only, for any exception during the getTable & updateTable calls we should have it throw exceptions as expected, thoughts?
Also I think the EntityNotFoundException won't be thrown for table not found in the case of Table AlreadyExistsException.

}

return loadTable(identifier);
}

@Override
public void createNamespace(Namespace namespace, Map<String, String> metadata) {
try {
Expand Down
Loading