From 1b8355093e03f68ebc210d06d43ba8870be86ea3 Mon Sep 17 00:00:00 2001 From: dxu Date: Thu, 27 Mar 2025 12:11:37 +0100 Subject: [PATCH] feat: add optional Glue Schema configuration to exclude Non-Current Fields --- .../aws/glue/TestGlueCatalogTable.java | 71 +++++++++++++++++++ .../org/apache/iceberg/aws/AwsProperties.java | 18 +++++ .../iceberg/aws/glue/GlueTableOperations.java | 8 ++- .../aws/glue/IcebergToGlueConverter.java | 23 +++++-- .../aws/glue/TestIcebergToGlueConverter.java | 8 +-- 5 files changed, 117 insertions(+), 11 deletions(-) diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java index 50883703bae0..6252b205c387 100644 --- a/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java +++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/TestGlueCatalogTable.java @@ -755,4 +755,75 @@ public void testTableLevelS3Tags() { .containsEntry(S3FileIOProperties.S3_TAG_ICEBERG_TABLE, tableName) .containsEntry(S3FileIOProperties.S3_TAG_ICEBERG_NAMESPACE, namespace); } + + @Test + public void testDisableNonCurrentFields() { + AwsProperties properties = new AwsProperties(); + properties.setGlueNonCurrentFieldsDisabled(true); + // Dedicated catalog: keeps this non-default setting out of the shared glueCatalog fixture + GlueCatalog catalog = new GlueCatalog(); + catalog.initialize( + CATALOG_NAME, + TEST_BUCKET_PATH, + properties, + new S3FileIOProperties(), + GLUE, + LockManagers.defaultLockManager(), + ImmutableMap.of()); + String namespace = getRandomName(); + + NAMESPACES.add(namespace); + catalog.createNamespace(Namespace.of(namespace)); + String tableName = getRandomName(); + catalog.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec); + Table table = catalog.loadTable(TableIdentifier.of(namespace, tableName)); + table + .updateSchema() + .addColumn( + "c2", + Types.StructType.of(Types.NestedField.required(3, "z", Types.IntegerType.get())), + "c2") + .addColumn("c3", Types.StringType.get()) + .addColumn("c4", 
Types.StringType.get()) + .commit(); + table.updateSpec().addField(truncate("c1", 8)).commit(); + table.updateSchema().deleteColumn("c3").renameColumn("c4", "c5").commit(); + + GetTableResponse response = + GLUE.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build()); + List actualColumns = response.table().storageDescriptor().columns(); + + List expectedColumns = + ImmutableList.of( + Column.builder() + .name("c1") + .type("string") + .comment("c1") + .parameters( + ImmutableMap.of( + IcebergToGlueConverter.ICEBERG_FIELD_ID, "1", + IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "false", + IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true")) + .build(), + Column.builder() + .name("c2") + .type("struct") + .comment("c2") + .parameters( + ImmutableMap.of( + IcebergToGlueConverter.ICEBERG_FIELD_ID, "2", + IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true", + IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true")) + .build(), + Column.builder() + .name("c5") + .type("string") + .parameters( + ImmutableMap.of( + IcebergToGlueConverter.ICEBERG_FIELD_ID, "5", + IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true", + IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true")) + .build()); + assertThat(actualColumns).isEqualTo(expectedColumns); + } } diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java index 1a8db990578a..116b56170971 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java +++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java @@ -98,6 +98,11 @@ public class AwsProperties implements Serializable { */ public static final String GLUE_CATALOG_ENDPOINT = "glue.endpoint"; + /** Option to disable including non-current fields in Glue Schema */ + public static final String GLUE_NON_CURRENT_FIELDS_DISABLED = "glue.non-current-fields-disabled"; + + public static final boolean GLUE_NON_CURRENT_FIELDS_DISABLED_DEFAULT = false; + /** 
Configure an alternative endpoint of the DynamoDB service to access. */ public static final String DYNAMODB_ENDPOINT = "dynamodb.endpoint"; @@ -221,6 +226,7 @@ public class AwsProperties implements Serializable { private boolean glueCatalogSkipArchive; private boolean glueCatalogSkipNameValidation; private boolean glueLakeFormationEnabled; + private boolean glueNonCurrentFieldsDisabled; private String dynamoDbTableName; private final String dynamoDbEndpoint; @@ -247,6 +253,7 @@ public AwsProperties() { this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT; this.glueCatalogSkipNameValidation = GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT; this.glueLakeFormationEnabled = GLUE_LAKEFORMATION_ENABLED_DEFAULT; + this.glueNonCurrentFieldsDisabled = GLUE_NON_CURRENT_FIELDS_DISABLED_DEFAULT; this.dynamoDbEndpoint = null; this.dynamoDbTableName = DYNAMODB_TABLE_NAME_DEFAULT; @@ -283,6 +290,9 @@ public AwsProperties(Map properties) { this.glueLakeFormationEnabled = PropertyUtil.propertyAsBoolean( properties, GLUE_LAKEFORMATION_ENABLED, GLUE_LAKEFORMATION_ENABLED_DEFAULT); + this.glueNonCurrentFieldsDisabled = + PropertyUtil.propertyAsBoolean( + properties, GLUE_NON_CURRENT_FIELDS_DISABLED, GLUE_NON_CURRENT_FIELDS_DISABLED_DEFAULT); this.dynamoDbEndpoint = properties.get(DYNAMODB_ENDPOINT); this.dynamoDbTableName = @@ -351,6 +361,14 @@ public void setGlueLakeFormationEnabled(boolean glueLakeFormationEnabled) { this.glueLakeFormationEnabled = glueLakeFormationEnabled; } + public boolean glueNonCurrentFieldsDisabled() { + return glueNonCurrentFieldsDisabled; + } + + public void setGlueNonCurrentFieldsDisabled(boolean glueNonCurrentFieldsDisabled) { + this.glueNonCurrentFieldsDisabled = glueNonCurrentFieldsDisabled; + } + public String dynamoDbTableName() { return dynamoDbTableName; } diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java index 4c63dfdb2a70..e145d590ebea 
100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java @@ -319,7 +319,10 @@ void persistGlueTable( .applyMutation( builder -> IcebergToGlueConverter.setTableInputInformation( - builder, metadata, glueTable)) + builder, + metadata, + glueTable, + awsProperties.glueNonCurrentFieldsDisabled())) .name(tableName) .tableType(GLUE_EXTERNAL_TABLE_TYPE) .parameters(parameters) @@ -341,7 +344,8 @@ void persistGlueTable( TableInput.builder() .applyMutation( builder -> - IcebergToGlueConverter.setTableInputInformation(builder, metadata)) + IcebergToGlueConverter.setTableInputInformation( + builder, metadata, awsProperties.glueNonCurrentFieldsDisabled())) .name(tableName) .tableType(GLUE_EXTERNAL_TABLE_TYPE) .parameters(parameters) diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java b/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java index 56b38a47e968..fbb05d166d4d 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java @@ -218,10 +218,13 @@ static String getTableName(TableIdentifier tableIdentifier, boolean skipNameVali * * @param tableInputBuilder Glue TableInput builder * @param metadata Iceberg table metadata + * @param disableNonCurrentFields if true, only current-schema columns are written to Glue */ static void setTableInputInformation( - TableInput.Builder tableInputBuilder, TableMetadata metadata) { - setTableInputInformation(tableInputBuilder, metadata, null); + TableInput.Builder tableInputBuilder, + TableMetadata metadata, + boolean disableNonCurrentFields) { + setTableInputInformation(tableInputBuilder, metadata, null, disableNonCurrentFields); } /** @@ -241,9 +244,13 @@ static void setTableInputInformation( * @param tableInputBuilder Glue TableInput builder * @param metadata Iceberg table 
metadata * @param existingTable optional existing Glue table, used to preserve column comments + * @param disableNonCurrentFields if true, only current-schema columns are written to Glue */ static void setTableInputInformation( - TableInput.Builder tableInputBuilder, TableMetadata metadata, Table existingTable) { + TableInput.Builder tableInputBuilder, + TableMetadata metadata, + Table existingTable, + boolean disableNonCurrentFields) { try { Map properties = metadata.properties(); StorageDescriptor.Builder storageDescriptor = StorageDescriptor.builder(); @@ -274,7 +281,7 @@ static void setTableInputInformation( existingColumnMap = Collections.emptyMap(); } - List columns = toColumns(metadata, existingColumnMap); + List columns = toColumns(metadata, existingColumnMap, disableNonCurrentFields); tableInputBuilder.storageDescriptor( storageDescriptor.location(metadata.location()).columns(columns).build()); @@ -340,7 +347,9 @@ private static String toTypeString(Type type) { } private static List toColumns( - TableMetadata metadata, Map existingColumnMap) { + TableMetadata metadata, + Map existingColumnMap, + boolean disableNonCurrentFields) { List columns = Lists.newArrayList(); Set addedNames = Sets.newHashSet(); @@ -348,6 +357,10 @@ private static List toColumns( addColumnWithDedupe(columns, addedNames, field, true /* is current */, existingColumnMap); } + if (disableNonCurrentFields) { + return columns; + } + for (Schema schema : metadata.schemas()) { if (schema.schemaId() != metadata.currentSchemaId()) { for (NestedField field : schema.columns()) { diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/TestIcebergToGlueConverter.java b/aws/src/test/java/org/apache/iceberg/aws/glue/TestIcebergToGlueConverter.java index edebfd3420e2..6030b89d6ab2 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/glue/TestIcebergToGlueConverter.java +++ b/aws/src/test/java/org/apache/iceberg/aws/glue/TestIcebergToGlueConverter.java @@ -173,7 +173,7 @@ public void 
testSetTableInputInformation() { PartitionSpec.builderFor(schema).identity("x").withSpecId(1000).build(); TableMetadata tableMetadata = TableMetadata.newTableMetadata(schema, partitionSpec, "s3://test", tableLocationProperties); - IcebergToGlueConverter.setTableInputInformation(actualTableInputBuilder, tableMetadata); + IcebergToGlueConverter.setTableInputInformation(actualTableInputBuilder, tableMetadata, false); TableInput actualTableInput = actualTableInputBuilder.build(); // Expected TableInput @@ -239,7 +239,7 @@ public void testSetTableInputInformationWithRemovedColumns() { Schema newSchema = new Schema(Types.NestedField.required(1, "x", Types.StringType.get(), "comment1")); tableMetadata = tableMetadata.updateSchema(newSchema); - IcebergToGlueConverter.setTableInputInformation(actualTableInputBuilder, tableMetadata); + IcebergToGlueConverter.setTableInputInformation(actualTableInputBuilder, tableMetadata, false); TableInput actualTableInput = actualTableInputBuilder.build(); // Expected TableInput @@ -300,7 +300,7 @@ public void testSetTableDescription() { TableMetadata.newTableMetadata( schema, PartitionSpec.unpartitioned(), "s3://test", tableProperties); - IcebergToGlueConverter.setTableInputInformation(actualTableInputBuilder, tableMetadata); + IcebergToGlueConverter.setTableInputInformation(actualTableInputBuilder, tableMetadata, false); TableInput actualTableInput = actualTableInputBuilder.build(); assertThat(actualTableInput.description()) @@ -335,7 +335,7 @@ public void testSetTableInputInformationWithExistingTable() { .build(); IcebergToGlueConverter.setTableInputInformation( - actualTableInputBuilder, tableMetadata, existingGlueTable); + actualTableInputBuilder, tableMetadata, existingGlueTable, false); TableInput actualTableInput = actualTableInputBuilder.build(); // Expected TableInput