Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -755,4 +755,73 @@ public void testTableLevelS3Tags() {
.containsEntry(S3FileIOProperties.S3_TAG_ICEBERG_TABLE, tableName)
.containsEntry(S3FileIOProperties.S3_TAG_ICEBERG_NAMESPACE, namespace);
}

@Test
public void testDisableNonCurrentFields() {
AwsProperties properties = new AwsProperties();
properties.setGlueNonCurrentFieldsDisabled(true);
glueCatalog.initialize(
CATALOG_NAME,
TEST_BUCKET_PATH,
properties,
new S3FileIOProperties(),
GLUE,
LockManagers.defaultLockManager(),
ImmutableMap.of());
String namespace = getRandomName();

NAMESPACES.add(namespace);
glueCatalog.createNamespace(Namespace.of(namespace));
String tableName = getRandomName();
glueCatalog.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec);
Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
table
.updateSchema()
.addColumn(
"c2",
Types.StructType.of(Types.NestedField.required(3, "z", Types.IntegerType.get())),
"c2")
.addColumn("c3", Types.StringType.get())
.addColumn("c4", Types.StringType.get())
.commit();
table.updateSpec().addField(truncate("c1", 8)).commit();
table.updateSchema().deleteColumn("c3").renameColumn("c4", "c5").commit();

GetTableResponse response =
GLUE.getTable(GetTableRequest.builder().databaseName(namespace).name(tableName).build());
List<Column> actualColumns = response.table().storageDescriptor().columns();

List<Column> expectedColumns =
ImmutableList.of(
Column.builder()
.name("c1")
.type("string")
.comment("c1")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "1",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "false",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build(),
Column.builder()
.name("c2")
.type("struct<z:int>")
.comment("c2")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "2",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build(),
Column.builder()
.name("c5")
.type("string")
.parameters(
ImmutableMap.of(
IcebergToGlueConverter.ICEBERG_FIELD_ID, "5",
IcebergToGlueConverter.ICEBERG_FIELD_OPTIONAL, "true",
IcebergToGlueConverter.ICEBERG_FIELD_CURRENT, "true"))
.build());
assertThat(actualColumns).isEqualTo(expectedColumns);
}
}
18 changes: 18 additions & 0 deletions aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public class AwsProperties implements Serializable {
*/
public static final String GLUE_CATALOG_ENDPOINT = "glue.endpoint";

/** Option to disable including non-current fields in Glue Schema */
public static final String GLUE_NON_CURRENT_FIELDS_DISABLED = "glue.non-current-fields-disabled";

public static final boolean GLUE_NON_CURRENT_FIELDS_DISABLED_DEFAULT = false;

/** Configure an alternative endpoint of the DynamoDB service to access. */
public static final String DYNAMODB_ENDPOINT = "dynamodb.endpoint";

Expand Down Expand Up @@ -221,6 +226,7 @@ public class AwsProperties implements Serializable {
private boolean glueCatalogSkipArchive;
private boolean glueCatalogSkipNameValidation;
private boolean glueLakeFormationEnabled;
private boolean glueNonCurrentFieldsDisabled;

private String dynamoDbTableName;
private final String dynamoDbEndpoint;
Expand All @@ -247,6 +253,7 @@ public AwsProperties() {
this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT;
this.glueCatalogSkipNameValidation = GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT;
this.glueLakeFormationEnabled = GLUE_LAKEFORMATION_ENABLED_DEFAULT;
this.glueNonCurrentFieldsDisabled = GLUE_NON_CURRENT_FIELDS_DISABLED_DEFAULT;

this.dynamoDbEndpoint = null;
this.dynamoDbTableName = DYNAMODB_TABLE_NAME_DEFAULT;
Expand Down Expand Up @@ -283,6 +290,9 @@ public AwsProperties(Map<String, String> properties) {
this.glueLakeFormationEnabled =
PropertyUtil.propertyAsBoolean(
properties, GLUE_LAKEFORMATION_ENABLED, GLUE_LAKEFORMATION_ENABLED_DEFAULT);
this.glueNonCurrentFieldsDisabled =
PropertyUtil.propertyAsBoolean(
properties, GLUE_NON_CURRENT_FIELDS_DISABLED, GLUE_NON_CURRENT_FIELDS_DISABLED_DEFAULT);

this.dynamoDbEndpoint = properties.get(DYNAMODB_ENDPOINT);
this.dynamoDbTableName =
Expand Down Expand Up @@ -351,6 +361,14 @@ public void setGlueLakeFormationEnabled(boolean glueLakeFormationEnabled) {
this.glueLakeFormationEnabled = glueLakeFormationEnabled;
}

public boolean glueNonCurrentFieldsDisabled() {
return glueNonCurrentFieldsDisabled;
}

public void setGlueNonCurrentFieldsDisabled(boolean glueNonCurrentFieldsDisabled) {
this.glueNonCurrentFieldsDisabled = glueNonCurrentFieldsDisabled;
}

public String dynamoDbTableName() {
return dynamoDbTableName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,10 @@ void persistGlueTable(
.applyMutation(
builder ->
IcebergToGlueConverter.setTableInputInformation(
builder, metadata, glueTable))
builder,
metadata,
glueTable,
awsProperties.glueNonCurrentFieldsDisabled()))
.name(tableName)
.tableType(GLUE_EXTERNAL_TABLE_TYPE)
.parameters(parameters)
Expand All @@ -341,7 +344,8 @@ void persistGlueTable(
TableInput.builder()
.applyMutation(
builder ->
IcebergToGlueConverter.setTableInputInformation(builder, metadata))
IcebergToGlueConverter.setTableInputInformation(
builder, metadata, awsProperties.glueNonCurrentFieldsDisabled()))
.name(tableName)
.tableType(GLUE_EXTERNAL_TABLE_TYPE)
.parameters(parameters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,13 @@ static String getTableName(TableIdentifier tableIdentifier, boolean skipNameVali
*
* @param tableInputBuilder Glue TableInput builder
* @param metadata Iceberg table metadata
* @param disableNonCurrentFields option to disable writing non current fields to Glue table
*/
static void setTableInputInformation(
TableInput.Builder tableInputBuilder, TableMetadata metadata) {
setTableInputInformation(tableInputBuilder, metadata, null);
TableInput.Builder tableInputBuilder,
TableMetadata metadata,
Boolean disableNonCurrentFields) {
setTableInputInformation(tableInputBuilder, metadata, null, disableNonCurrentFields);
}

/**
Expand All @@ -241,9 +244,13 @@ static void setTableInputInformation(
* @param tableInputBuilder Glue TableInput builder
* @param metadata Iceberg table metadata
* @param existingTable optional existing Glue table, used to preserve column comments
* @param disableNonCurrentFields option to disable writing non current fields to Glue table
*/
static void setTableInputInformation(
TableInput.Builder tableInputBuilder, TableMetadata metadata, Table existingTable) {
TableInput.Builder tableInputBuilder,
TableMetadata metadata,
Table existingTable,
Boolean disableNonCurrentFields) {
try {
Map<String, String> properties = metadata.properties();
StorageDescriptor.Builder storageDescriptor = StorageDescriptor.builder();
Expand Down Expand Up @@ -274,7 +281,7 @@ static void setTableInputInformation(
existingColumnMap = Collections.emptyMap();
}

List<Column> columns = toColumns(metadata, existingColumnMap);
List<Column> columns = toColumns(metadata, existingColumnMap, disableNonCurrentFields);

tableInputBuilder.storageDescriptor(
storageDescriptor.location(metadata.location()).columns(columns).build());
Expand Down Expand Up @@ -340,14 +347,20 @@ private static String toTypeString(Type type) {
}

private static List<Column> toColumns(
TableMetadata metadata, Map<String, String> existingColumnMap) {
TableMetadata metadata,
Map<String, String> existingColumnMap,
Boolean disableNonCurrentFields) {
List<Column> columns = Lists.newArrayList();
Set<String> addedNames = Sets.newHashSet();

for (NestedField field : metadata.schema().columns()) {
addColumnWithDedupe(columns, addedNames, field, true /* is current */, existingColumnMap);
}

if (disableNonCurrentFields) {
return columns;
}

for (Schema schema : metadata.schemas()) {
if (schema.schemaId() != metadata.currentSchemaId()) {
for (NestedField field : schema.columns()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void testSetTableInputInformation() {
PartitionSpec.builderFor(schema).identity("x").withSpecId(1000).build();
TableMetadata tableMetadata =
TableMetadata.newTableMetadata(schema, partitionSpec, "s3://test", tableLocationProperties);
IcebergToGlueConverter.setTableInputInformation(actualTableInputBuilder, tableMetadata);
IcebergToGlueConverter.setTableInputInformation(actualTableInputBuilder, tableMetadata, false);
TableInput actualTableInput = actualTableInputBuilder.build();

// Expected TableInput
Expand Down Expand Up @@ -239,7 +239,7 @@ public void testSetTableInputInformationWithRemovedColumns() {
Schema newSchema =
new Schema(Types.NestedField.required(1, "x", Types.StringType.get(), "comment1"));
tableMetadata = tableMetadata.updateSchema(newSchema);
IcebergToGlueConverter.setTableInputInformation(actualTableInputBuilder, tableMetadata);
IcebergToGlueConverter.setTableInputInformation(actualTableInputBuilder, tableMetadata, false);
TableInput actualTableInput = actualTableInputBuilder.build();

// Expected TableInput
Expand Down Expand Up @@ -300,7 +300,7 @@ public void testSetTableDescription() {
TableMetadata.newTableMetadata(
schema, PartitionSpec.unpartitioned(), "s3://test", tableProperties);

IcebergToGlueConverter.setTableInputInformation(actualTableInputBuilder, tableMetadata);
IcebergToGlueConverter.setTableInputInformation(actualTableInputBuilder, tableMetadata, false);
TableInput actualTableInput = actualTableInputBuilder.build();

assertThat(actualTableInput.description())
Expand Down Expand Up @@ -335,7 +335,7 @@ public void testSetTableInputInformationWithExistingTable() {
.build();

IcebergToGlueConverter.setTableInputInformation(
actualTableInputBuilder, tableMetadata, existingGlueTable);
actualTableInputBuilder, tableMetadata, existingGlueTable, false);
TableInput actualTableInput = actualTableInputBuilder.build();

// Expected TableInput
Expand Down