diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java index 41ffa609540b..60561b0f56bf 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java @@ -43,20 +43,23 @@ public class CompareSchemasVisitor extends SchemaWithPartnerVisitor { private final Schema tableSchema; + private final boolean dropUnusedColumns; - private CompareSchemasVisitor(Schema tableSchema) { + private CompareSchemasVisitor(Schema tableSchema, boolean dropUnusedColumns) { this.tableSchema = tableSchema; + this.dropUnusedColumns = dropUnusedColumns; } public static Result visit(Schema dataSchema, Schema tableSchema) { - return visit(dataSchema, tableSchema, true); + return visit(dataSchema, tableSchema, true, false); } - public static Result visit(Schema dataSchema, Schema tableSchema, boolean caseSensitive) { + public static Result visit( + Schema dataSchema, Schema tableSchema, boolean caseSensitive, boolean dropUnusedColumns) { return visit( dataSchema, -1, - new CompareSchemasVisitor(tableSchema), + new CompareSchemasVisitor(tableSchema, dropUnusedColumns), new PartnerIdByNameAccessors(tableSchema, caseSensitive)); } @@ -70,6 +73,7 @@ public Result schema(Schema dataSchema, Integer tableSchemaId, Result downstream } @Override + @SuppressWarnings("CyclomaticComplexity") public Result struct(Types.StructType struct, Integer tableSchemaId, List fields) { if (tableSchemaId == null) { return Result.SCHEMA_UPDATE_NEEDED; @@ -88,10 +92,10 @@ public Result struct(Types.StructType struct, Integer tableSchemaId, List { private ReadableConfig readableConfig = new Configuration(); private TableCreator tableCreator = TableCreator.DEFAULT; private boolean immediateUpdate = false; + private boolean dropUnusedColumns = false; private int cacheMaximumSize = 100; private long cacheRefreshMs = 1_000; private int inputSchemasPerTableCacheMaximumSize = 10; @@ -314,6 +315,22 @@ public Builder immediateTableUpdate(boolean newImmediateUpdate) { return this; } + /** + * Dropping columns is disabled by default to prevent issues with late or out-of-order data, as + * removed fields cannot be easily restored without data loss. + * + *
<p>
You can opt-in to allow dropping columns. Once a column has been dropped, it is + * technically still possible to write data to that column because Iceberg maintains all past + * table schemas. However, regular queries won't be able to reference the column. If the field + * was to re-appear as part of a new schema, an entirely new column would be added, which apart + * from the name, has nothing in common with the old column, i.e. queries for the new column + * will never return data of the old column. + */ + public Builder dropUnusedColumns(boolean newDropUnusedColumns) { + this.dropUnusedColumns = newDropUnusedColumns; + return this; + } + /** Maximum size of the caches used in Dynamic Sink for table data and serializers. */ public Builder cacheMaxSize(int maxSize) { this.cacheMaximumSize = maxSize; @@ -382,6 +399,7 @@ public DataStreamSink append() { generator, catalogLoader, immediateUpdate, + dropUnusedColumns, cacheMaximumSize, cacheRefreshMs, inputSchemasPerTableCacheMaximumSize, @@ -400,6 +418,7 @@ public DataStreamSink append() { .map( new DynamicTableUpdateOperator( catalogLoader, + dropUnusedColumns, cacheMaximumSize, cacheRefreshMs, inputSchemasPerTableCacheMaximumSize, diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index bc3a25468d84..427aa6ceafba 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -40,6 +40,7 @@ class DynamicRecordProcessor extends ProcessFunction generator; private final CatalogLoader catalogLoader; private final boolean immediateUpdate; + private final boolean dropUnusedColumns; private final int cacheMaximumSize; private final long cacheRefreshMs; private final int inputSchemasPerTableCacheMaximumSize; @@ -56,6 +57,7 @@ class DynamicRecordProcessor extends ProcessFunction generator, CatalogLoader catalogLoader, boolean immediateUpdate, + boolean dropUnusedColumns, int cacheMaximumSize, long cacheRefreshMs, int inputSchemasPerTableCacheMaximumSize, @@ -63,6 +65,7 @@ class DynamicRecordProcessor extends ProcessFunction( @@ -106,7 +109,7 @@ public void collect(DynamicRecord data) { TableMetadataCache.ResolvedSchemaInfo foundSchema = exists - ? tableCache.schema(data.tableIdentifier(), data.schema()) + ? tableCache.schema(data.tableIdentifier(), data.schema(), dropUnusedColumns) : TableMetadataCache.NOT_FOUND; PartitionSpec foundSpec = exists ? 
tableCache.spec(data.tableIdentifier(), data.spec()) : null; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java index 586239b54bca..8f38d4f8be0c 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java @@ -38,6 +38,7 @@ class DynamicTableUpdateOperator extends RichMapFunction { private final CatalogLoader catalogLoader; + private final boolean dropUnusedColumns; private final int cacheMaximumSize; private final long cacheRefreshMs; private final int inputSchemasPerTableCacheMaximumSize; @@ -47,11 +48,13 @@ class DynamicTableUpdateOperator DynamicTableUpdateOperator( CatalogLoader catalogLoader, + boolean dropUnusedColumns, int cacheMaximumSize, long cacheRefreshMs, int inputSchemasPerTableCacheMaximumSize, TableCreator tableCreator) { this.catalogLoader = catalogLoader; + this.dropUnusedColumns = dropUnusedColumns; this.cacheMaximumSize = cacheMaximumSize; this.cacheRefreshMs = cacheRefreshMs; this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize; @@ -66,7 +69,8 @@ public void open(OpenContext openContext) throws Exception { new TableUpdater( new TableMetadataCache( catalog, cacheMaximumSize, cacheRefreshMs, inputSchemasPerTableCacheMaximumSize), - catalog); + catalog, + dropUnusedColumns); } @Override diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java index ee0549997178..e106cf5754b3 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java @@ -21,10 +21,13 @@ import java.util.List; import org.apache.iceberg.Schema; import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.schema.SchemaWithPartnerVisitor; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Visitor class that accumulates the set of changes needed to evolve an existing schema into the @@ -36,30 +39,39 @@ *

 *   <li>Adding new columns
 *   <li>Widening the type of existing columns
 *   <li>Reordering columns
+ *   <li>Dropping columns (when dropUnusedColumns is enabled)
 * </ul>
 *
 * We don't support:
 *
 * <ul>
- *   <li>Dropping columns
 *   <li>Renaming columns
    * - * The reason is that dropping columns would create issues with late / out of order data. Once we - * drop fields, we wouldn't be able to easily add them back later without losing the associated - * data. Renaming columns is not supported because we compare schemas by name, which doesn't allow - * for renaming without additional hints. + * By default, any columns present in the table but absent from the input schema are marked as + * optional to prevent issues caused by late or out-of-order data. If dropUnusedColumns is enabled, + * these columns are removed instead to ensure a strict one-to-one schema alignment. */ public class EvolveSchemaVisitor extends SchemaWithPartnerVisitor { + private static final Logger LOG = LoggerFactory.getLogger(EvolveSchemaVisitor.class); + private final TableIdentifier identifier; private final UpdateSchema api; private final Schema existingSchema; private final Schema targetSchema; - - private EvolveSchemaVisitor(UpdateSchema api, Schema existingSchema, Schema targetSchema) { + private final boolean dropUnusedColumns; + + private EvolveSchemaVisitor( + TableIdentifier identifier, + UpdateSchema api, + Schema existingSchema, + Schema targetSchema, + boolean dropUnusedColumns) { + this.identifier = identifier; this.api = api; this.existingSchema = existingSchema; this.targetSchema = targetSchema; + this.dropUnusedColumns = dropUnusedColumns; } /** @@ -70,12 +82,18 @@ private EvolveSchemaVisitor(UpdateSchema api, Schema existingSchema, Schema targ * @param api an UpdateSchema for adding changes * @param existingSchema an existing schema * @param targetSchema a new schema to compare with the existing + * @param dropUnusedColumns whether to drop columns not present in target schema */ - public static void visit(UpdateSchema api, Schema existingSchema, Schema targetSchema) { + public static void visit( + TableIdentifier identifier, + UpdateSchema api, + Schema existingSchema, + Schema targetSchema, + boolean dropUnusedColumns) { visit( targetSchema, -1, - new EvolveSchemaVisitor(api, existingSchema, targetSchema), + new EvolveSchemaVisitor(identifier, api, existingSchema, targetSchema, dropUnusedColumns), new CompareSchemasVisitor.PartnerIdByNameAccessors(existingSchema)); } @@ -103,11 +121,16 @@ public Boolean struct(Types.StructType struct, Integer partnerId, List after = columnName; } - // Ensure that unused fields are made optional for (Types.NestedField existingField : partnerStruct.fields()) { if (struct.field(existingField.name()) == null) { - if (existingField.isRequired()) { - this.api.makeColumnOptional(this.existingSchema.findColumnName(existingField.fieldId())); + String columnName = this.existingSchema.findColumnName(existingField.fieldId()); + if (dropUnusedColumns) { + LOG.debug("{}: Dropping column: {}", identifier.name(), columnName); + this.api.deleteColumn(columnName); + } else { + if (existingField.isRequired()) { + this.api.makeColumnOptional(columnName); + } } } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java index 2c08a3486e7c..8a8362a41996 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java @@ -89,8 +89,8 @@ String branch(TableIdentifier identifier, String branch) { return branch(identifier, branch, true); } - 
ResolvedSchemaInfo schema(TableIdentifier identifier, Schema input) { - return schema(identifier, input, true); + ResolvedSchemaInfo schema(TableIdentifier identifier, Schema input, boolean dropUnusedColumns) { + return schema(identifier, input, true, dropUnusedColumns); } PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec) { @@ -124,7 +124,7 @@ private String branch(TableIdentifier identifier, String branch, boolean allowRe } private ResolvedSchemaInfo schema( - TableIdentifier identifier, Schema input, boolean allowRefresh) { + TableIdentifier identifier, Schema input, boolean allowRefresh, boolean dropUnusedColumns) { CacheItem cached = tableCache.get(identifier); Schema compatible = null; if (cached != null && cached.tableExists) { @@ -139,7 +139,7 @@ private ResolvedSchemaInfo schema( for (Map.Entry tableSchema : cached.tableSchemas.entrySet()) { CompareSchemasVisitor.Result result = - CompareSchemasVisitor.visit(input, tableSchema.getValue(), true); + CompareSchemasVisitor.visit(input, tableSchema.getValue(), true, dropUnusedColumns); if (result == CompareSchemasVisitor.Result.SAME) { ResolvedSchemaInfo newResult = new ResolvedSchemaInfo( @@ -157,7 +157,7 @@ private ResolvedSchemaInfo schema( if (needsRefresh(cached, allowRefresh)) { refreshTable(identifier); - return schema(identifier, input, false); + return schema(identifier, input, false, dropUnusedColumns); } else if (compatible != null) { ResolvedSchemaInfo newResult = new ResolvedSchemaInfo( diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java index cadfe345980c..d8809efbe541 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -43,10 +43,12 @@ class TableUpdater { private static final Logger LOG = LoggerFactory.getLogger(TableUpdater.class); private final TableMetadataCache cache; private final Catalog catalog; + private final boolean dropUnusedColumns; - TableUpdater(TableMetadataCache cache, Catalog catalog) { + TableUpdater(TableMetadataCache cache, Catalog catalog, boolean dropUnusedColumns) { this.cache = cache; this.catalog = catalog; + this.dropUnusedColumns = dropUnusedColumns; } /** @@ -118,13 +120,15 @@ private void findOrCreateBranch(TableIdentifier identifier, String branch) { private TableMetadataCache.ResolvedSchemaInfo findOrCreateSchema( TableIdentifier identifier, Schema schema) { - TableMetadataCache.ResolvedSchemaInfo fromCache = cache.schema(identifier, schema); + TableMetadataCache.ResolvedSchemaInfo fromCache = + cache.schema(identifier, schema, dropUnusedColumns); if (fromCache.compareResult() != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { return fromCache; } else { Table table = catalog.loadTable(identifier); Schema tableSchema = table.schema(); - CompareSchemasVisitor.Result result = CompareSchemasVisitor.visit(schema, tableSchema, true); + CompareSchemasVisitor.Result result = + CompareSchemasVisitor.visit(schema, tableSchema, true, dropUnusedColumns); switch (result) { case SAME: cache.update(identifier, table); @@ -141,19 +145,20 @@ private TableMetadataCache.ResolvedSchemaInfo findOrCreateSchema( LOG.info( "Triggering schema update for table {} {} to {}", identifier, tableSchema, schema); UpdateSchema updateApi = table.updateSchema(); - EvolveSchemaVisitor.visit(updateApi, tableSchema, schema); + 
EvolveSchemaVisitor.visit(identifier, updateApi, tableSchema, schema, dropUnusedColumns); try { updateApi.commit(); cache.update(identifier, table); TableMetadataCache.ResolvedSchemaInfo comparisonAfterMigration = - cache.schema(identifier, schema); + cache.schema(identifier, schema, dropUnusedColumns); Schema newSchema = comparisonAfterMigration.resolvedTableSchema(); LOG.info("Table {} schema updated from {} to {}", identifier, tableSchema, newSchema); return comparisonAfterMigration; } catch (CommitFailedException e) { cache.invalidate(identifier); - TableMetadataCache.ResolvedSchemaInfo newSchema = cache.schema(identifier, schema); + TableMetadataCache.ResolvedSchemaInfo newSchema = + cache.schema(identifier, schema, dropUnusedColumns); if (newSchema.compareResult() != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { LOG.debug("Table {} schema updated concurrently to {}", identifier, schema); return newSchema; diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java index 385a354889fb..cc8e6898d2ed 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java @@ -33,6 +33,9 @@ class TestCompareSchemasVisitor { + private static final boolean DROP_COLUMNS = true; + private static final boolean PRESERVE_COLUMNS = false; + @Test void testSchema() { assertThat( @@ -226,4 +229,63 @@ void testListChanged() { optional(2, "list1", ListType.ofOptional(3, IntegerType.get()))))) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); } + + @Test + void testDropUnusedColumnsEnabled() { + Schema dataSchema = new Schema(optional(1, "id", IntegerType.get())); + Schema tableSchema = + new Schema( + optional(1, "id", IntegerType.get()), + optional(2, "data", StringType.get()), + optional(3, "extra", StringType.get())); + + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, DROP_COLUMNS)) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + } + + @Test + void testDropUnusedColumnsWithRequiredField() { + Schema dataSchema = new Schema(optional(1, "id", IntegerType.get())); + Schema tableSchema = + new Schema(optional(1, "id", IntegerType.get()), required(2, "data", StringType.get())); + + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, DROP_COLUMNS)) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + } + + @Test + void testDropUnusedColumnsWhenInputHasMoreFields() { + Schema dataSchema = + new Schema( + optional(1, "id", IntegerType.get()), + optional(2, "data", StringType.get()), + optional(3, "extra", StringType.get())); + Schema tableSchema = new Schema(optional(1, "id", IntegerType.get())); + + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, DROP_COLUMNS)) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + } + + @Test + void testDropUnusedColumnsInNestedStruct() { + Schema dataSchema = + new Schema( + optional(1, "id", IntegerType.get()), + optional(2, "struct1", StructType.of(optional(3, "field1", StringType.get())))); + Schema tableSchema = + new Schema( + optional(1, "id", IntegerType.get()), + optional( + 2, + "struct1", + StructType.of( + optional(3, "field1", StringType.get()), + optional(4, "field2", IntegerType.get())))); + + 
assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, DROP_COLUMNS)) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, PRESERVE_COLUMNS)) + .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); + } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index b660d8e285d9..2711df72cab1 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -822,6 +822,55 @@ void testCommitsOnceWhenConcurrentDuplicateCommit(boolean overwriteMode) throws assertThat(totalAddedRecords).isEqualTo(records.size()); } + @Test + void testOptInDropUnusedColumns() throws Exception { + Schema schema1 = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get()), + Types.NestedField.optional(3, "extra", Types.StringType.get())); + + Schema schema2 = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.of(DATABASE, "t1"); + catalog.createTable(tableIdentifier, schema1); + + List rows = + Lists.newArrayList( + // Drop columns + new DynamicIcebergDataImpl(schema2, "t1", "main", PartitionSpec.unpartitioned()), + // Re-add columns + new DynamicIcebergDataImpl(schema1, "t1", "main", PartitionSpec.unpartitioned())); + + DataStream dataStream = + env.fromData(rows, TypeInformation.of(new TypeHint<>() {})); + env.setParallelism(1); + + DynamicIcebergSink.forInput(dataStream) + .generator(new Generator()) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .immediateTableUpdate(true) + .dropUnusedColumns(true) + .append(); + + env.execute("Test Drop Unused Columns"); + + Table table = catalog.loadTable(tableIdentifier); + table.refresh(); + + assertThat(table.schema().columns()).hasSize(2); + assertThat(table.schema().findField("id")).isNotNull(); + assertThat(table.schema().findField("data")).isNotNull(); + assertThat(table.schema().findField("extra")).isNull(); + + List records = Lists.newArrayList(IcebergGenerics.read(table).build()); + assertThat(records).hasSize(2); + } + /** * Represents a concurrent duplicate commit during an ongoing commit operation, which can happen * in production scenarios when using REST catalog. 
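Usage sketch (not part of the patch): with the new builder flag, a job opts in to column dropping the same way the testOptInDropUnusedColumns case above does. Names such as dataStream and Generator are stand-ins for the caller's own setup.

    // Assumed setup: a DataStream<T> of input elements and a
    // DynamicRecordGenerator<T> that maps each element to a DynamicRecord.
    DynamicIcebergSink.forInput(dataStream)
        .generator(new Generator())
        .catalogLoader(CATALOG_EXTENSION.catalogLoader()) // or any CatalogLoader
        .immediateTableUpdate(true)  // apply schema changes on the write path
        .dropUnusedColumns(true)     // opt in: delete table columns absent from input schemas
        .append();
    // With the default dropUnusedColumns(false), EvolveSchemaVisitor keeps the
    // unused columns and only marks required ones optional.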
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java index 5ee766231b9d..5745d54c73ca 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java @@ -60,6 +60,7 @@ void testDynamicTableUpdateOperatorNewTable() throws Exception { DynamicTableUpdateOperator operator = new DynamicTableUpdateOperator( CATALOG_EXTENSION.catalogLoader(), + false, cacheMaximumSize, cacheRefreshMs, inputSchemaCacheMaximumSize, @@ -93,6 +94,7 @@ void testDynamicTableUpdateOperatorSchemaChange() throws Exception { DynamicTableUpdateOperator operator = new DynamicTableUpdateOperator( CATALOG_EXTENSION.catalogLoader(), + false, cacheMaximumSize, cacheRefreshMs, inputSchemaCacheMaximumSize, @@ -120,4 +122,84 @@ void testDynamicTableUpdateOperatorSchemaChange() throws Exception { assertThat(output2).isEqualTo(output); assertThat(catalog.loadTable(table).schema().schemaId()).isEqualTo(output.schema().schemaId()); } + + @Test + void testDynamicTableUpdateOperatorPreserveUnusedColumns() throws Exception { + int cacheMaximumSize = 10; + int cacheRefreshMs = 1000; + int inputSchemaCacheMaximumSize = 10; + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier table = TableIdentifier.of(TABLE); + + DynamicTableUpdateOperator operator = + new DynamicTableUpdateOperator( + CATALOG_EXTENSION.catalogLoader(), + false, // dropUnusedColumns = false (default) + cacheMaximumSize, + cacheRefreshMs, + inputSchemaCacheMaximumSize, + TableCreator.DEFAULT); + operator.open((OpenContext) null); + + catalog.createTable(table, SCHEMA2); + + DynamicRecordInternal input = + new DynamicRecordInternal( + TABLE, + "branch", + SCHEMA1, + GenericRowData.of(1), + PartitionSpec.unpartitioned(), + 42, + false, + Collections.emptySet()); + DynamicRecordInternal output = operator.map(input); + + Schema tableSchema = catalog.loadTable(table).schema(); + assertThat(tableSchema.columns()).hasSize(2); + assertThat(tableSchema.findField("id")).isNotNull(); + assertThat(tableSchema.findField("data")).isNotNull(); + assertThat(tableSchema.findField("data").isOptional()).isTrue(); + assertThat(input).isEqualTo(output); + } + + @Test + void testDynamicTableUpdateOperatorDropUnusedColumns() throws Exception { + int cacheMaximumSize = 10; + int cacheRefreshMs = 1000; + int inputSchemaCacheMaximumSize = 10; + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier table = TableIdentifier.of(TABLE); + + DynamicTableUpdateOperator operator = + new DynamicTableUpdateOperator( + CATALOG_EXTENSION.catalogLoader(), + // Drop unused columns + true, + cacheMaximumSize, + cacheRefreshMs, + inputSchemaCacheMaximumSize, + TableCreator.DEFAULT); + operator.open((OpenContext) null); + + catalog.createTable(table, SCHEMA2); + + DynamicRecordInternal input = + new DynamicRecordInternal( + TABLE, + "branch", + SCHEMA1, + GenericRowData.of(1), + PartitionSpec.unpartitioned(), + 42, + false, + Collections.emptySet()); + DynamicRecordInternal output = operator.map(input); + + Schema tableSchema = catalog.loadTable(table).schema(); + assertThat(tableSchema.columns()).hasSize(1); + assertThat(tableSchema.findField("id")).isNotNull(); + assertThat(tableSchema.findField("data")).isNull(); + assertThat(input).isEqualTo(output); + } 
} diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java index d416e7ec1fc6..027adc4031bd 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.Schema; import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.Types; @@ -48,6 +49,10 @@ public class TestEvolveSchemaVisitor { + private static final TableIdentifier TABLE = TableIdentifier.of("table"); + private static final boolean DROP_COLUMNS = true; + private static final boolean PRESERVE_COLUMNS = false; + private static List primitiveTypes() { return Lists.newArrayList( StringType.get(), @@ -89,7 +94,7 @@ private static Types.NestedField[] primitiveFields( public void testAddTopLevelPrimitives() { Schema targetSchema = new Schema(primitiveFields(0, primitiveTypes())); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, PRESERVE_COLUMNS); assertThat(targetSchema.asStruct()).isEqualTo(updateApi.apply().asStruct()); } @@ -99,12 +104,59 @@ public void testMakeTopLevelPrimitivesOptional() { assertThat(existingSchema.columns().stream().allMatch(Types.NestedField::isRequired)).isTrue(); UpdateSchema updateApi = loadUpdateApi(existingSchema); - EvolveSchemaVisitor.visit(updateApi, existingSchema, new Schema()); + EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, new Schema(), PRESERVE_COLUMNS); Schema newSchema = updateApi.apply(); assertThat(newSchema.asStruct().fields()).hasSize(14); assertThat(newSchema.columns().stream().allMatch(Types.NestedField::isOptional)).isTrue(); } + @Test + public void testDropUnusedColumns() { + Schema existingSchema = + new Schema( + optional(1, "a", StringType.get()), + optional( + 2, + "b", + StructType.of( + optional(4, "nested1", StringType.get()), + optional(5, "nested2", StringType.get()))), + optional(3, "c", IntegerType.get())); + + Schema targetSchema = + new Schema( + optional(1, "a", StringType.get()), + optional(2, "b", StructType.of(optional(5, "nested2", StringType.get())))); + + UpdateSchema updateApi = loadUpdateApi(existingSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, DROP_COLUMNS); + + Schema newSchema = updateApi.apply(); + assertThat(newSchema.sameSchema(targetSchema)).isTrue(); + } + + @Test + public void testPreserveUnusedColumns() { + Schema existingSchema = + new Schema( + optional(1, "a", StringType.get()), + optional( + 2, + "b", + StructType.of( + optional(4, "nested1", StringType.get()), + optional(5, "nested2", StringType.get()))), + optional(3, "c", IntegerType.get())); + + Schema targetSchema = new Schema(optional(1, "a", StringType.get())); + + UpdateSchema updateApi = loadUpdateApi(existingSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, PRESERVE_COLUMNS); + + Schema newSchema = updateApi.apply(); + assertThat(newSchema.sameSchema(existingSchema)).isTrue(); + } + 
@Test public void testIdentifyFieldsByName() { Schema existingSchema = @@ -112,7 +164,7 @@ public void testIdentifyFieldsByName() { UpdateSchema updateApi = loadUpdateApi(existingSchema); Schema newSchema = new Schema(Arrays.asList(Types.NestedField.optional(-1, "myField", Types.LongType.get()))); - EvolveSchemaVisitor.visit(updateApi, existingSchema, newSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, newSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().sameSchema(existingSchema)).isTrue(); } @@ -125,7 +177,7 @@ public void testChangeOrderTopLevelPrimitives() { new Schema( Arrays.asList(optional(2, "b", StringType.get()), optional(1, "a", StringType.get()))); UpdateSchema updateApi = loadUpdateApi(existingSchema); - EvolveSchemaVisitor.visit(updateApi, existingSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -134,7 +186,7 @@ public void testAddTopLevelListOfPrimitives() { for (PrimitiveType primitiveType : primitiveTypes()) { Schema targetSchema = new Schema(optional(1, "aList", ListType.ofOptional(2, primitiveType))); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } } @@ -146,7 +198,7 @@ public void testMakeTopLevelListOfPrimitivesOptional() { new Schema(optional(1, "aList", ListType.ofRequired(2, primitiveType))); Schema targetSchema = new Schema(); UpdateSchema updateApi = loadUpdateApi(existingSchema); - EvolveSchemaVisitor.visit(updateApi, existingSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, PRESERVE_COLUMNS); Schema expectedSchema = new Schema(optional(1, "aList", ListType.ofRequired(2, primitiveType))); assertThat(updateApi.apply().asStruct()).isEqualTo(expectedSchema.asStruct()); @@ -159,7 +211,7 @@ public void testAddTopLevelMapOfPrimitives() { Schema targetSchema = new Schema(optional(1, "aMap", MapType.ofOptional(2, 3, primitiveType, primitiveType))); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } } @@ -171,7 +223,7 @@ public void testAddTopLevelStructOfPrimitives() { new Schema( optional(1, "aStruct", StructType.of(optional(2, "primitive", primitiveType)))); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(updateApi, new Schema(), currentSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), currentSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(currentSchema.asStruct()); } } @@ -184,7 +236,7 @@ public void testAddNestedPrimitive() { new Schema( optional(1, "aStruct", StructType.of(optional(2, "primitive", primitiveType)))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } } @@ -199,7 +251,7 @@ public void testMakeNestedPrimitiveOptional() { new 
Schema( optional(1, "aStruct", StructType.of(optional(2, "primitive", primitiveType)))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } } @@ -210,7 +262,7 @@ public void testAddNestedPrimitives() { Schema targetSchema = new Schema(optional(1, "aStruct", StructType.of(primitiveFields(1, primitiveTypes())))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -240,7 +292,7 @@ public void testAddNestedLists() { ListType.ofOptional( 10, DecimalType.of(11, 20)))))))))))); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -279,7 +331,7 @@ public void testAddNestedStruct() { "aString", StringType.get())))))))))))))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -314,7 +366,7 @@ public void testAddNestedMaps() { 12, 13, StringType.get(), StringType.get())))))))); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -326,7 +378,11 @@ public void testDetectInvalidTopLevelList() { assertThatThrownBy( () -> EvolveSchemaVisitor.visit( - loadUpdateApi(currentSchema), currentSchema, targetSchema)) + TABLE, + loadUpdateApi(currentSchema), + currentSchema, + targetSchema, + PRESERVE_COLUMNS)) .hasMessage("Cannot change column type: aList.element: string -> long") .isInstanceOf(IllegalArgumentException.class); } @@ -343,7 +399,11 @@ public void testDetectInvalidTopLevelMapValue() { assertThatThrownBy( () -> EvolveSchemaVisitor.visit( - loadUpdateApi(currentSchema), currentSchema, targetSchema)) + TABLE, + loadUpdateApi(currentSchema), + currentSchema, + targetSchema, + PRESERVE_COLUMNS)) .hasMessage("Cannot change column type: aMap.value: string -> long") .isInstanceOf(IllegalArgumentException.class); } @@ -358,7 +418,11 @@ public void testDetectInvalidTopLevelMapKey() { assertThatThrownBy( () -> EvolveSchemaVisitor.visit( - loadUpdateApi(currentSchema), currentSchema, targetSchema)) + TABLE, + loadUpdateApi(currentSchema), + currentSchema, + targetSchema, + PRESERVE_COLUMNS)) .hasMessage("Cannot change column type: aMap.key: string -> uuid") .isInstanceOf(IllegalArgumentException.class); } @@ -370,7 +434,7 @@ public void testTypePromoteIntegerToLong() { Schema targetSchema = new Schema(required(1, "aCol", LongType.get())); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, 
updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); Schema applied = updateApi.apply(); assertThat(applied.asStruct().fields()).hasSize(1); assertThat(applied.asStruct().fields().get(0).type()).isEqualTo(LongType.get()); @@ -383,7 +447,7 @@ public void testTypePromoteFloatToDouble() { Schema targetSchema = new Schema(required(1, "aCol", DoubleType.get())); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); Schema applied = updateApi.apply(); assertThat(applied.asStruct().fields()).hasSize(1); assertThat(applied.asStruct().fields().get(0).type()).isEqualTo(DoubleType.get()); @@ -396,7 +460,11 @@ public void testInvalidTypePromoteDoubleToFloat() { assertThatThrownBy( () -> EvolveSchemaVisitor.visit( - loadUpdateApi(currentSchema), currentSchema, targetSchema)) + TABLE, + loadUpdateApi(currentSchema), + currentSchema, + targetSchema, + PRESERVE_COLUMNS)) .hasMessage("Cannot change column type: aCol: double -> float") .isInstanceOf(IllegalArgumentException.class); } @@ -409,7 +477,7 @@ public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() { Schema targetSchema = new Schema(required(1, "aCol", DecimalType.of(22, 1))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -452,7 +520,7 @@ public void testAddPrimitiveToNestedStruct() { optional(6, "time", TimeType.get()))))))))); UpdateSchema updateApi = loadUpdateApi(existingSchema); - EvolveSchemaVisitor.visit(updateApi, existingSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -464,7 +532,11 @@ public void testReplaceListWithPrimitive() { assertThatThrownBy( () -> EvolveSchemaVisitor.visit( - loadUpdateApi(currentSchema), currentSchema, targetSchema)) + TABLE, + loadUpdateApi(currentSchema), + currentSchema, + targetSchema, + PRESERVE_COLUMNS)) .hasMessage("Cannot change column type: aColumn: list -> string") .isInstanceOf(IllegalArgumentException.class); } @@ -501,7 +573,7 @@ public void addNewTopLevelStruct() { optional(7, "d1", StructType.of(optional(8, "d2", StringType.get())))))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -553,7 +625,7 @@ public void testAppendNestedStruct() { StringType.get())))))))))))))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -573,7 +645,7 @@ public void testMakeNestedStructOptional() { optional( 3, "s3", StructType.of(optional(4, "s4", StringType.get())))))))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, 
currentSchema, targetSchema, PRESERVE_COLUMNS); assertThat(getNestedSchemaWithOptionalModifier(true).asStruct()) .isEqualTo(updateApi.apply().asStruct()); } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java index bf5b9f562f9a..7f91d2f8d585 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java @@ -53,17 +53,22 @@ void testCaching() { catalog.createTable(tableIdentifier, SCHEMA); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - Schema schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema(); + Schema schema1 = cache.schema(tableIdentifier, SCHEMA, false).resolvedTableSchema(); assertThat(schema1.sameSchema(SCHEMA)).isTrue(); assertThat( - cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).resolvedTableSchema()) + cache + .schema(tableIdentifier, SerializationUtils.clone(SCHEMA), false) + .resolvedTableSchema()) .isEqualTo(schema1); - assertThat(cache.schema(tableIdentifier, SCHEMA2)).isEqualTo(TableMetadataCache.NOT_FOUND); + assertThat(cache.schema(tableIdentifier, SCHEMA2, false)) + .isEqualTo(TableMetadataCache.NOT_FOUND); - schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema(); + schema1 = cache.schema(tableIdentifier, SCHEMA, false).resolvedTableSchema(); assertThat( - cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).resolvedTableSchema()) + cache + .schema(tableIdentifier, SerializationUtils.clone(SCHEMA), false) + .resolvedTableSchema()) .isEqualTo(schema1); } @@ -73,9 +78,9 @@ void testCacheInvalidationAfterSchemaChange() { TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - TableUpdater tableUpdater = new TableUpdater(cache, catalog); + TableUpdater tableUpdater = new TableUpdater(cache, catalog, false); - Schema schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema(); + Schema schema1 = cache.schema(tableIdentifier, SCHEMA, false).resolvedTableSchema(); assertThat(schema1.sameSchema(SCHEMA)).isTrue(); catalog.dropTable(tableIdentifier); @@ -83,7 +88,7 @@ void testCacheInvalidationAfterSchemaChange() { tableUpdater.update( tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned(), TableCreator.DEFAULT); - Schema schema2 = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema(); + Schema schema2 = cache.schema(tableIdentifier, SCHEMA2, false).resolvedTableSchema(); assertThat(schema2.sameSchema(SCHEMA2)).isTrue(); } @@ -111,11 +116,11 @@ void testNoCacheRefreshingBeforeRefreshIntervalElapses() { cache.update(tableIdentifier, table); // Cache schema - Schema schema = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema(); + Schema schema = cache.schema(tableIdentifier, SCHEMA2, false).resolvedTableSchema(); assertThat(schema.sameSchema(SCHEMA2)).isTrue(); // Cache schema with fewer fields - TableMetadataCache.ResolvedSchemaInfo schemaInfo = cache.schema(tableIdentifier, SCHEMA); + TableMetadataCache.ResolvedSchemaInfo schemaInfo = cache.schema(tableIdentifier, SCHEMA, false); assertThat(schemaInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue(); assertThat(schemaInfo.compareResult()) 
.isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java index 1d4461698746..bcc5d8064517 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java @@ -56,7 +56,7 @@ void testTableCreation(@TempDir Path tempDir) { catalog.createNamespace(Namespace.of("myNamespace")); TableIdentifier tableIdentifier = TableIdentifier.parse("myNamespace.myTable"); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - TableUpdater tableUpdater = new TableUpdater(cache, catalog); + TableUpdater tableUpdater = new TableUpdater(cache, catalog, false); String locationOverride = tempDir.toString() + "/custom-path"; Map tableProperties = Map.of("key", "value"); @@ -69,7 +69,8 @@ void testTableCreation(@TempDir Path tempDir) { assertThat(catalog.tableExists(tableIdentifier)).isTrue(); assertThat(catalog.loadTable(tableIdentifier).properties().get("key")).isEqualTo("value"); assertThat(catalog.loadTable(tableIdentifier).location()).isEqualTo(locationOverride); - TableMetadataCache.ResolvedSchemaInfo cachedSchema = cache.schema(tableIdentifier, SCHEMA); + TableMetadataCache.ResolvedSchemaInfo cachedSchema = + cache.schema(tableIdentifier, SCHEMA, false); assertThat(cachedSchema.resolvedTableSchema().sameSchema(SCHEMA)).isTrue(); } @@ -78,7 +79,7 @@ void testTableAlreadyExists() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - TableUpdater tableUpdater = new TableUpdater(cache, catalog); + TableUpdater tableUpdater = new TableUpdater(cache, catalog, false); // Make the table non-existent in cache cache.exists(tableIdentifier); @@ -98,7 +99,7 @@ void testBranchCreationAndCaching() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - TableUpdater tableUpdater = new TableUpdater(cache, catalog); + TableUpdater tableUpdater = new TableUpdater(cache, catalog, false); catalog.createTable(tableIdentifier, SCHEMA); tableUpdater.update( @@ -116,7 +117,7 @@ void testSpecCreation() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - TableUpdater tableUpdater = new TableUpdater(cache, catalog); + TableUpdater tableUpdater = new TableUpdater(cache, catalog, false); PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("data", 10).build(); tableUpdater.update(tableIdentifier, "main", SCHEMA, spec, TableCreator.DEFAULT); @@ -132,8 +133,8 @@ void testInvalidateOldCacheEntryOnUpdate() { TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - cache.schema(tableIdentifier, SCHEMA); - TableUpdater tableUpdater = new TableUpdater(cache, catalog); + cache.schema(tableIdentifier, SCHEMA, false); + TableUpdater tableUpdater = new TableUpdater(cache, catalog, 
false); Schema updated = tableUpdater @@ -146,7 +147,8 @@ void testInvalidateOldCacheEntryOnUpdate() { .f0 .resolvedTableSchema(); assertThat(updated.sameSchema(SCHEMA2)).isTrue(); - assertThat(cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema().sameSchema(SCHEMA2)) + assertThat( + cache.schema(tableIdentifier, SCHEMA2, false).resolvedTableSchema().sameSchema(SCHEMA2)) .isTrue(); } @@ -156,7 +158,7 @@ void testLastResultInvalidation() { TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - TableUpdater tableUpdater = new TableUpdater(cache, catalog); + TableUpdater tableUpdater = new TableUpdater(cache, catalog, false); // Initialize cache tableUpdater.update( @@ -167,7 +169,7 @@ void testLastResultInvalidation() { catalog.createTable(tableIdentifier, SCHEMA2); // Cache still stores the old information - assertThat(cache.schema(tableIdentifier, SCHEMA2).compareResult()) + assertThat(cache.schema(tableIdentifier, SCHEMA2, false).compareResult()) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); assertThat( @@ -186,4 +188,27 @@ void testLastResultInvalidation() { assertThat(cache.getInternalCache().get(tableIdentifier).inputSchemas()) .doesNotContainKey(SCHEMA2); } + + @Test + void testDropUnusedColumns() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); + + final boolean dropUnusedColumns = true; + TableUpdater tableUpdater = new TableUpdater(cache, catalog, dropUnusedColumns); + + catalog.createTable(tableIdentifier, SCHEMA2); + + Tuple2 result = + tableUpdater.update( + tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned(), TableCreator.DEFAULT); + + assertThat(result.f0.compareResult()).isEqualTo(CompareSchemasVisitor.Result.SAME); + Schema tableSchema = catalog.loadTable(tableIdentifier).schema(); + assertThat(tableSchema.columns()).hasSize(2); + assertThat(tableSchema.findField("id")).isNotNull(); + assertThat(tableSchema.findField("data")).isNotNull(); + assertThat(tableSchema.findField("extra")).isNull(); + } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java index 41ffa609540b..60561b0f56bf 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java @@ -43,20 +43,23 @@ public class CompareSchemasVisitor extends SchemaWithPartnerVisitor { private final Schema tableSchema; + private final boolean dropUnusedColumns; - private CompareSchemasVisitor(Schema tableSchema) { + private CompareSchemasVisitor(Schema tableSchema, boolean dropUnusedColumns) { this.tableSchema = tableSchema; + this.dropUnusedColumns = dropUnusedColumns; } public static Result visit(Schema dataSchema, Schema tableSchema) { - return visit(dataSchema, tableSchema, true); + return visit(dataSchema, tableSchema, true, false); } - public static Result visit(Schema dataSchema, Schema tableSchema, boolean caseSensitive) { + public static Result visit( + Schema dataSchema, Schema tableSchema, boolean caseSensitive, boolean dropUnusedColumns) { return visit( dataSchema, -1, - new 
CompareSchemasVisitor(tableSchema), + new CompareSchemasVisitor(tableSchema, dropUnusedColumns), new PartnerIdByNameAccessors(tableSchema, caseSensitive)); } @@ -70,6 +73,7 @@ public Result schema(Schema dataSchema, Integer tableSchemaId, Result downstream } @Override + @SuppressWarnings("CyclomaticComplexity") public Result struct(Types.StructType struct, Integer tableSchemaId, List fields) { if (tableSchemaId == null) { return Result.SCHEMA_UPDATE_NEEDED; @@ -88,10 +92,10 @@ public Result struct(Types.StructType struct, Integer tableSchemaId, List { private ReadableConfig readableConfig = new Configuration(); private TableCreator tableCreator = TableCreator.DEFAULT; private boolean immediateUpdate = false; + private boolean dropUnusedColumns = false; private int cacheMaximumSize = 100; private long cacheRefreshMs = 1_000; private int inputSchemasPerTableCacheMaximumSize = 10; @@ -314,6 +315,22 @@ public Builder immediateTableUpdate(boolean newImmediateUpdate) { return this; } + /** + * Dropping columns is disabled by default to prevent issues with late or out-of-order data, as + * removed fields cannot be easily restored without data loss. + * + *
<p>
    You can opt-in to allow dropping columns. Once a column has been dropped, it is + * technically still possible to write data to that column because Iceberg maintains all past + * table schemas. However, regular queries won't be able to reference the column. If the field + * was to re-appear as part of a new schema, an entirely new column would be added, which apart + * from the name, has nothing in common with the old column, i.e. queries for the new column + * will never return data of the old column. + */ + public Builder dropUnusedColumns(boolean newDropUnusedColumns) { + this.dropUnusedColumns = newDropUnusedColumns; + return this; + } + /** Maximum size of the caches used in Dynamic Sink for table data and serializers. */ public Builder cacheMaxSize(int maxSize) { this.cacheMaximumSize = maxSize; @@ -382,6 +399,7 @@ public DataStreamSink append() { generator, catalogLoader, immediateUpdate, + dropUnusedColumns, cacheMaximumSize, cacheRefreshMs, inputSchemasPerTableCacheMaximumSize, @@ -400,6 +418,7 @@ public DataStreamSink append() { .map( new DynamicTableUpdateOperator( catalogLoader, + dropUnusedColumns, cacheMaximumSize, cacheRefreshMs, inputSchemasPerTableCacheMaximumSize, diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index bc3a25468d84..427aa6ceafba 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -40,6 +40,7 @@ class DynamicRecordProcessor extends ProcessFunction generator; private final CatalogLoader catalogLoader; private final boolean immediateUpdate; + private final boolean dropUnusedColumns; private final int cacheMaximumSize; private final long cacheRefreshMs; private final int inputSchemasPerTableCacheMaximumSize; @@ -56,6 +57,7 @@ class DynamicRecordProcessor extends ProcessFunction generator, CatalogLoader catalogLoader, boolean immediateUpdate, + boolean dropUnusedColumns, int cacheMaximumSize, long cacheRefreshMs, int inputSchemasPerTableCacheMaximumSize, @@ -63,6 +65,7 @@ class DynamicRecordProcessor extends ProcessFunction( @@ -106,7 +109,7 @@ public void collect(DynamicRecord data) { TableMetadataCache.ResolvedSchemaInfo foundSchema = exists - ? tableCache.schema(data.tableIdentifier(), data.schema()) + ? tableCache.schema(data.tableIdentifier(), data.schema(), dropUnusedColumns) : TableMetadataCache.NOT_FOUND; PartitionSpec foundSpec = exists ? 
tableCache.spec(data.tableIdentifier(), data.spec()) : null; diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java index 586239b54bca..8f38d4f8be0c 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java @@ -38,6 +38,7 @@ class DynamicTableUpdateOperator extends RichMapFunction { private final CatalogLoader catalogLoader; + private final boolean dropUnusedColumns; private final int cacheMaximumSize; private final long cacheRefreshMs; private final int inputSchemasPerTableCacheMaximumSize; @@ -47,11 +48,13 @@ class DynamicTableUpdateOperator DynamicTableUpdateOperator( CatalogLoader catalogLoader, + boolean dropUnusedColumns, int cacheMaximumSize, long cacheRefreshMs, int inputSchemasPerTableCacheMaximumSize, TableCreator tableCreator) { this.catalogLoader = catalogLoader; + this.dropUnusedColumns = dropUnusedColumns; this.cacheMaximumSize = cacheMaximumSize; this.cacheRefreshMs = cacheRefreshMs; this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize; @@ -66,7 +69,8 @@ public void open(OpenContext openContext) throws Exception { new TableUpdater( new TableMetadataCache( catalog, cacheMaximumSize, cacheRefreshMs, inputSchemasPerTableCacheMaximumSize), - catalog); + catalog, + dropUnusedColumns); } @Override diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java index ee0549997178..e106cf5754b3 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java @@ -21,10 +21,13 @@ import java.util.List; import org.apache.iceberg.Schema; import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.schema.SchemaWithPartnerVisitor; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Visitor class that accumulates the set of changes needed to evolve an existing schema into the @@ -36,30 +39,39 @@ *

 *   <li>Adding new columns
 *   <li>Widening the type of existing columns
 *   <li>Reordering columns
+ *   <li>Dropping columns (when dropUnusedColumns is enabled)
 * </ul>
 *
 * We don't support:
 *
 * <ul>
- *   <li>Dropping columns
 *   <li>Renaming columns
    * - * The reason is that dropping columns would create issues with late / out of order data. Once we - * drop fields, we wouldn't be able to easily add them back later without losing the associated - * data. Renaming columns is not supported because we compare schemas by name, which doesn't allow - * for renaming without additional hints. + * By default, any columns present in the table but absent from the input schema are marked as + * optional to prevent issues caused by late or out-of-order data. If dropUnusedColumns is enabled, + * these columns are removed instead to ensure a strict one-to-one schema alignment. */ public class EvolveSchemaVisitor extends SchemaWithPartnerVisitor { + private static final Logger LOG = LoggerFactory.getLogger(EvolveSchemaVisitor.class); + private final TableIdentifier identifier; private final UpdateSchema api; private final Schema existingSchema; private final Schema targetSchema; - - private EvolveSchemaVisitor(UpdateSchema api, Schema existingSchema, Schema targetSchema) { + private final boolean dropUnusedColumns; + + private EvolveSchemaVisitor( + TableIdentifier identifier, + UpdateSchema api, + Schema existingSchema, + Schema targetSchema, + boolean dropUnusedColumns) { + this.identifier = identifier; this.api = api; this.existingSchema = existingSchema; this.targetSchema = targetSchema; + this.dropUnusedColumns = dropUnusedColumns; } /** @@ -70,12 +82,18 @@ private EvolveSchemaVisitor(UpdateSchema api, Schema existingSchema, Schema targ * @param api an UpdateSchema for adding changes * @param existingSchema an existing schema * @param targetSchema a new schema to compare with the existing + * @param dropUnusedColumns whether to drop columns not present in target schema */ - public static void visit(UpdateSchema api, Schema existingSchema, Schema targetSchema) { + public static void visit( + TableIdentifier identifier, + UpdateSchema api, + Schema existingSchema, + Schema targetSchema, + boolean dropUnusedColumns) { visit( targetSchema, -1, - new EvolveSchemaVisitor(api, existingSchema, targetSchema), + new EvolveSchemaVisitor(identifier, api, existingSchema, targetSchema, dropUnusedColumns), new CompareSchemasVisitor.PartnerIdByNameAccessors(existingSchema)); } @@ -103,11 +121,16 @@ public Boolean struct(Types.StructType struct, Integer partnerId, List after = columnName; } - // Ensure that unused fields are made optional for (Types.NestedField existingField : partnerStruct.fields()) { if (struct.field(existingField.name()) == null) { - if (existingField.isRequired()) { - this.api.makeColumnOptional(this.existingSchema.findColumnName(existingField.fieldId())); + String columnName = this.existingSchema.findColumnName(existingField.fieldId()); + if (dropUnusedColumns) { + LOG.debug("{}: Dropping column: {}", identifier.name(), columnName); + this.api.deleteColumn(columnName); + } else { + if (existingField.isRequired()) { + this.api.makeColumnOptional(columnName); + } } } } diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java index 2c08a3486e7c..8a8362a41996 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java @@ -89,8 +89,8 @@ String branch(TableIdentifier identifier, String branch) { return branch(identifier, branch, true); } - ResolvedSchemaInfo 
schema(TableIdentifier identifier, Schema input) { - return schema(identifier, input, true); + ResolvedSchemaInfo schema(TableIdentifier identifier, Schema input, boolean dropUnusedColumns) { + return schema(identifier, input, true, dropUnusedColumns); } PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec) { @@ -124,7 +124,7 @@ private String branch(TableIdentifier identifier, String branch, boolean allowRe } private ResolvedSchemaInfo schema( - TableIdentifier identifier, Schema input, boolean allowRefresh) { + TableIdentifier identifier, Schema input, boolean allowRefresh, boolean dropUnusedColumns) { CacheItem cached = tableCache.get(identifier); Schema compatible = null; if (cached != null && cached.tableExists) { @@ -139,7 +139,7 @@ private ResolvedSchemaInfo schema( for (Map.Entry tableSchema : cached.tableSchemas.entrySet()) { CompareSchemasVisitor.Result result = - CompareSchemasVisitor.visit(input, tableSchema.getValue(), true); + CompareSchemasVisitor.visit(input, tableSchema.getValue(), true, dropUnusedColumns); if (result == CompareSchemasVisitor.Result.SAME) { ResolvedSchemaInfo newResult = new ResolvedSchemaInfo( @@ -157,7 +157,7 @@ private ResolvedSchemaInfo schema( if (needsRefresh(cached, allowRefresh)) { refreshTable(identifier); - return schema(identifier, input, false); + return schema(identifier, input, false, dropUnusedColumns); } else if (compatible != null) { ResolvedSchemaInfo newResult = new ResolvedSchemaInfo( diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java index cadfe345980c..d8809efbe541 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -43,10 +43,12 @@ class TableUpdater { private static final Logger LOG = LoggerFactory.getLogger(TableUpdater.class); private final TableMetadataCache cache; private final Catalog catalog; + private final boolean dropUnusedColumns; - TableUpdater(TableMetadataCache cache, Catalog catalog) { + TableUpdater(TableMetadataCache cache, Catalog catalog, boolean dropUnusedColumns) { this.cache = cache; this.catalog = catalog; + this.dropUnusedColumns = dropUnusedColumns; } /** @@ -118,13 +120,15 @@ private void findOrCreateBranch(TableIdentifier identifier, String branch) { private TableMetadataCache.ResolvedSchemaInfo findOrCreateSchema( TableIdentifier identifier, Schema schema) { - TableMetadataCache.ResolvedSchemaInfo fromCache = cache.schema(identifier, schema); + TableMetadataCache.ResolvedSchemaInfo fromCache = + cache.schema(identifier, schema, dropUnusedColumns); if (fromCache.compareResult() != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { return fromCache; } else { Table table = catalog.loadTable(identifier); Schema tableSchema = table.schema(); - CompareSchemasVisitor.Result result = CompareSchemasVisitor.visit(schema, tableSchema, true); + CompareSchemasVisitor.Result result = + CompareSchemasVisitor.visit(schema, tableSchema, true, dropUnusedColumns); switch (result) { case SAME: cache.update(identifier, table); @@ -141,19 +145,20 @@ private TableMetadataCache.ResolvedSchemaInfo findOrCreateSchema( LOG.info( "Triggering schema update for table {} {} to {}", identifier, tableSchema, schema); UpdateSchema updateApi = table.updateSchema(); - EvolveSchemaVisitor.visit(updateApi, tableSchema, schema); + 
EvolveSchemaVisitor.visit(identifier, updateApi, tableSchema, schema, dropUnusedColumns); try { updateApi.commit(); cache.update(identifier, table); TableMetadataCache.ResolvedSchemaInfo comparisonAfterMigration = - cache.schema(identifier, schema); + cache.schema(identifier, schema, dropUnusedColumns); Schema newSchema = comparisonAfterMigration.resolvedTableSchema(); LOG.info("Table {} schema updated from {} to {}", identifier, tableSchema, newSchema); return comparisonAfterMigration; } catch (CommitFailedException e) { cache.invalidate(identifier); - TableMetadataCache.ResolvedSchemaInfo newSchema = cache.schema(identifier, schema); + TableMetadataCache.ResolvedSchemaInfo newSchema = + cache.schema(identifier, schema, dropUnusedColumns); if (newSchema.compareResult() != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { LOG.debug("Table {} schema updated concurrently to {}", identifier, schema); return newSchema; diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java index 385a354889fb..cc8e6898d2ed 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java @@ -33,6 +33,9 @@ class TestCompareSchemasVisitor { + private static final boolean DROP_COLUMNS = true; + private static final boolean PRESERVE_COLUMNS = false; + @Test void testSchema() { assertThat( @@ -226,4 +229,63 @@ void testListChanged() { optional(2, "list1", ListType.ofOptional(3, IntegerType.get()))))) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); } + + @Test + void testDropUnusedColumnsEnabled() { + Schema dataSchema = new Schema(optional(1, "id", IntegerType.get())); + Schema tableSchema = + new Schema( + optional(1, "id", IntegerType.get()), + optional(2, "data", StringType.get()), + optional(3, "extra", StringType.get())); + + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, DROP_COLUMNS)) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + } + + @Test + void testDropUnusedColumnsWithRequiredField() { + Schema dataSchema = new Schema(optional(1, "id", IntegerType.get())); + Schema tableSchema = + new Schema(optional(1, "id", IntegerType.get()), required(2, "data", StringType.get())); + + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, DROP_COLUMNS)) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + } + + @Test + void testDropUnusedColumnsWhenInputHasMoreFields() { + Schema dataSchema = + new Schema( + optional(1, "id", IntegerType.get()), + optional(2, "data", StringType.get()), + optional(3, "extra", StringType.get())); + Schema tableSchema = new Schema(optional(1, "id", IntegerType.get())); + + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, DROP_COLUMNS)) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + } + + @Test + void testDropUnusedColumnsInNestedStruct() { + Schema dataSchema = + new Schema( + optional(1, "id", IntegerType.get()), + optional(2, "struct1", StructType.of(optional(3, "field1", StringType.get())))); + Schema tableSchema = + new Schema( + optional(1, "id", IntegerType.get()), + optional( + 2, + "struct1", + StructType.of( + optional(3, "field1", StringType.get()), + optional(4, "field2", IntegerType.get())))); + + 
assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, DROP_COLUMNS)) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, PRESERVE_COLUMNS)) + .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); + } } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index b660d8e285d9..2711df72cab1 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -822,6 +822,55 @@ void testCommitsOnceWhenConcurrentDuplicateCommit(boolean overwriteMode) throws assertThat(totalAddedRecords).isEqualTo(records.size()); } + @Test + void testOptInDropUnusedColumns() throws Exception { + Schema schema1 = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get()), + Types.NestedField.optional(3, "extra", Types.StringType.get())); + + Schema schema2 = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.of(DATABASE, "t1"); + catalog.createTable(tableIdentifier, schema1); + + List<DynamicIcebergDataImpl> rows = + Lists.newArrayList( + // Drop the extra column + new DynamicIcebergDataImpl(schema2, "t1", "main", PartitionSpec.unpartitioned()), + // Write with the old schema again; it matches a historical table schema, so the dropped column is not re-added + new DynamicIcebergDataImpl(schema1, "t1", "main", PartitionSpec.unpartitioned())); + + DataStream<DynamicIcebergDataImpl> dataStream = + env.fromData(rows, TypeInformation.of(new TypeHint<>() {})); + env.setParallelism(1); + + DynamicIcebergSink.forInput(dataStream) + .generator(new Generator()) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .immediateTableUpdate(true) + .dropUnusedColumns(true) + .append(); + + env.execute("Test Drop Unused Columns"); + + Table table = catalog.loadTable(tableIdentifier); + table.refresh(); + + assertThat(table.schema().columns()).hasSize(2); + assertThat(table.schema().findField("id")).isNotNull(); + assertThat(table.schema().findField("data")).isNotNull(); + assertThat(table.schema().findField("extra")).isNull(); + + List<Record> records = Lists.newArrayList(IcebergGenerics.read(table).build()); + assertThat(records).hasSize(2); + } + /** * Represents a concurrent duplicate commit during an ongoing commit operation, which can happen * in production scenarios when using REST catalog.
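The sink test above doubles as the end-to-end usage pattern. A minimal opt-in sketch, assuming a pre-existing DataStream named events, a DynamicRecordGenerator implementation named generator, and a CatalogLoader named catalogLoader (placeholder names, not part of this patch):

    DynamicIcebergSink.forInput(events)
        .generator(generator) // converts input elements into DynamicRecords
        .catalogLoader(catalogLoader) // connects the sink to the Iceberg catalog
        .immediateTableUpdate(true) // evolve table schemas as records arrive
        .dropUnusedColumns(true) // opt in: delete table columns absent from the input schema
        .append();

Left at its default of false, dropUnusedColumns keeps unused table columns and merely marks required ones as optional.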
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java index 22655ff99f86..d68dd58c08fc 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java @@ -59,6 +59,7 @@ void testDynamicTableUpdateOperatorNewTable() throws Exception { DynamicTableUpdateOperator operator = new DynamicTableUpdateOperator( CATALOG_EXTENSION.catalogLoader(), + false, // dropUnusedColumns cacheMaximumSize, cacheRefreshMs, inputSchemaCacheMaximumSize, @@ -92,6 +93,7 @@ void testDynamicTableUpdateOperatorSchemaChange() throws Exception { DynamicTableUpdateOperator operator = new DynamicTableUpdateOperator( CATALOG_EXTENSION.catalogLoader(), + false, // dropUnusedColumns cacheMaximumSize, cacheRefreshMs, inputSchemaCacheMaximumSize, @@ -119,4 +121,84 @@ void testDynamicTableUpdateOperatorSchemaChange() throws Exception { assertThat(output2).isEqualTo(output); assertThat(catalog.loadTable(table).schema().schemaId()).isEqualTo(output.schema().schemaId()); } + + @Test + void testDynamicTableUpdateOperatorPreserveUnusedColumns() throws Exception { + int cacheMaximumSize = 10; + int cacheRefreshMs = 1000; + int inputSchemaCacheMaximumSize = 10; + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier table = TableIdentifier.of(TABLE); + + DynamicTableUpdateOperator operator = + new DynamicTableUpdateOperator( + CATALOG_EXTENSION.catalogLoader(), + false, // dropUnusedColumns = false (default) + cacheMaximumSize, + cacheRefreshMs, + inputSchemaCacheMaximumSize, + TableCreator.DEFAULT); + operator.open(null); + + catalog.createTable(table, SCHEMA2); + + DynamicRecordInternal input = + new DynamicRecordInternal( + TABLE, + "branch", + SCHEMA1, + GenericRowData.of(1), + PartitionSpec.unpartitioned(), + 42, + false, + Collections.emptySet()); + DynamicRecordInternal output = operator.map(input); + + Schema tableSchema = catalog.loadTable(table).schema(); + assertThat(tableSchema.columns()).hasSize(2); + assertThat(tableSchema.findField("id")).isNotNull(); + assertThat(tableSchema.findField("data")).isNotNull(); + assertThat(tableSchema.findField("data").isOptional()).isTrue(); + assertThat(input).isEqualTo(output); + } + + @Test + void testDynamicTableUpdateOperatorDropUnusedColumns() throws Exception { + int cacheMaximumSize = 10; + int cacheRefreshMs = 1000; + int inputSchemaCacheMaximumSize = 10; + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier table = TableIdentifier.of(TABLE); + + DynamicTableUpdateOperator operator = + new DynamicTableUpdateOperator( + CATALOG_EXTENSION.catalogLoader(), + // Drop unused columns + true, + cacheMaximumSize, + cacheRefreshMs, + inputSchemaCacheMaximumSize, + TableCreator.DEFAULT); + operator.open(null); + + catalog.createTable(table, SCHEMA2); + + DynamicRecordInternal input = + new DynamicRecordInternal( + TABLE, + "branch", + SCHEMA1, + GenericRowData.of(1), + PartitionSpec.unpartitioned(), + 42, + false, + Collections.emptySet()); + DynamicRecordInternal output = operator.map(input); + + Schema tableSchema = catalog.loadTable(table).schema(); + assertThat(tableSchema.columns()).hasSize(1); + assertThat(tableSchema.findField("id")).isNotNull(); + assertThat(tableSchema.findField("data")).isNull(); + assertThat(input).isEqualTo(output); + } } diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java index d416e7ec1fc6..027adc4031bd 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.Schema; import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.Types; @@ -48,6 +49,10 @@ public class TestEvolveSchemaVisitor { + private static final TableIdentifier TABLE = TableIdentifier.of("table"); + private static final boolean DROP_COLUMNS = true; + private static final boolean PRESERVE_COLUMNS = false; + private static List primitiveTypes() { return Lists.newArrayList( StringType.get(), @@ -89,7 +94,7 @@ private static Types.NestedField[] primitiveFields( public void testAddTopLevelPrimitives() { Schema targetSchema = new Schema(primitiveFields(0, primitiveTypes())); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, PRESERVE_COLUMNS); assertThat(targetSchema.asStruct()).isEqualTo(updateApi.apply().asStruct()); } @@ -99,12 +104,59 @@ public void testMakeTopLevelPrimitivesOptional() { assertThat(existingSchema.columns().stream().allMatch(Types.NestedField::isRequired)).isTrue(); UpdateSchema updateApi = loadUpdateApi(existingSchema); - EvolveSchemaVisitor.visit(updateApi, existingSchema, new Schema()); + EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, new Schema(), PRESERVE_COLUMNS); Schema newSchema = updateApi.apply(); assertThat(newSchema.asStruct().fields()).hasSize(14); assertThat(newSchema.columns().stream().allMatch(Types.NestedField::isOptional)).isTrue(); } + @Test + public void testDropUnusedColumns() { + Schema existingSchema = + new Schema( + optional(1, "a", StringType.get()), + optional( + 2, + "b", + StructType.of( + optional(4, "nested1", StringType.get()), + optional(5, "nested2", StringType.get()))), + optional(3, "c", IntegerType.get())); + + Schema targetSchema = + new Schema( + optional(1, "a", StringType.get()), + optional(2, "b", StructType.of(optional(5, "nested2", StringType.get())))); + + UpdateSchema updateApi = loadUpdateApi(existingSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, DROP_COLUMNS); + + Schema newSchema = updateApi.apply(); + assertThat(newSchema.sameSchema(targetSchema)).isTrue(); + } + + @Test + public void testPreserveUnusedColumns() { + Schema existingSchema = + new Schema( + optional(1, "a", StringType.get()), + optional( + 2, + "b", + StructType.of( + optional(4, "nested1", StringType.get()), + optional(5, "nested2", StringType.get()))), + optional(3, "c", IntegerType.get())); + + Schema targetSchema = new Schema(optional(1, "a", StringType.get())); + + UpdateSchema updateApi = loadUpdateApi(existingSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, PRESERVE_COLUMNS); + + Schema newSchema = updateApi.apply(); + assertThat(newSchema.sameSchema(existingSchema)).isTrue(); + } + @Test public void 
testIdentifyFieldsByName() { Schema existingSchema = @@ -112,7 +164,7 @@ public void testIdentifyFieldsByName() { UpdateSchema updateApi = loadUpdateApi(existingSchema); Schema newSchema = new Schema(Arrays.asList(Types.NestedField.optional(-1, "myField", Types.LongType.get()))); - EvolveSchemaVisitor.visit(updateApi, existingSchema, newSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, newSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().sameSchema(existingSchema)).isTrue(); } @@ -125,7 +177,7 @@ public void testChangeOrderTopLevelPrimitives() { new Schema( Arrays.asList(optional(2, "b", StringType.get()), optional(1, "a", StringType.get()))); UpdateSchema updateApi = loadUpdateApi(existingSchema); - EvolveSchemaVisitor.visit(updateApi, existingSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -134,7 +186,7 @@ public void testAddTopLevelListOfPrimitives() { for (PrimitiveType primitiveType : primitiveTypes()) { Schema targetSchema = new Schema(optional(1, "aList", ListType.ofOptional(2, primitiveType))); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } } @@ -146,7 +198,7 @@ public void testMakeTopLevelListOfPrimitivesOptional() { new Schema(optional(1, "aList", ListType.ofRequired(2, primitiveType))); Schema targetSchema = new Schema(); UpdateSchema updateApi = loadUpdateApi(existingSchema); - EvolveSchemaVisitor.visit(updateApi, existingSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, PRESERVE_COLUMNS); Schema expectedSchema = new Schema(optional(1, "aList", ListType.ofRequired(2, primitiveType))); assertThat(updateApi.apply().asStruct()).isEqualTo(expectedSchema.asStruct()); @@ -159,7 +211,7 @@ public void testAddTopLevelMapOfPrimitives() { Schema targetSchema = new Schema(optional(1, "aMap", MapType.ofOptional(2, 3, primitiveType, primitiveType))); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } } @@ -171,7 +223,7 @@ public void testAddTopLevelStructOfPrimitives() { new Schema( optional(1, "aStruct", StructType.of(optional(2, "primitive", primitiveType)))); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(updateApi, new Schema(), currentSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), currentSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(currentSchema.asStruct()); } } @@ -184,7 +236,7 @@ public void testAddNestedPrimitive() { new Schema( optional(1, "aStruct", StructType.of(optional(2, "primitive", primitiveType)))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } } @@ -199,7 +251,7 @@ public void testMakeNestedPrimitiveOptional() { new Schema( optional(1, 
"aStruct", StructType.of(optional(2, "primitive", primitiveType)))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } } @@ -210,7 +262,7 @@ public void testAddNestedPrimitives() { Schema targetSchema = new Schema(optional(1, "aStruct", StructType.of(primitiveFields(1, primitiveTypes())))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -240,7 +292,7 @@ public void testAddNestedLists() { ListType.ofOptional( 10, DecimalType.of(11, 20)))))))))))); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -279,7 +331,7 @@ public void testAddNestedStruct() { "aString", StringType.get())))))))))))))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -314,7 +366,7 @@ public void testAddNestedMaps() { 12, 13, StringType.get(), StringType.get())))))))); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(updateApi, new Schema(), targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -326,7 +378,11 @@ public void testDetectInvalidTopLevelList() { assertThatThrownBy( () -> EvolveSchemaVisitor.visit( - loadUpdateApi(currentSchema), currentSchema, targetSchema)) + TABLE, + loadUpdateApi(currentSchema), + currentSchema, + targetSchema, + PRESERVE_COLUMNS)) .hasMessage("Cannot change column type: aList.element: string -> long") .isInstanceOf(IllegalArgumentException.class); } @@ -343,7 +399,11 @@ public void testDetectInvalidTopLevelMapValue() { assertThatThrownBy( () -> EvolveSchemaVisitor.visit( - loadUpdateApi(currentSchema), currentSchema, targetSchema)) + TABLE, + loadUpdateApi(currentSchema), + currentSchema, + targetSchema, + PRESERVE_COLUMNS)) .hasMessage("Cannot change column type: aMap.value: string -> long") .isInstanceOf(IllegalArgumentException.class); } @@ -358,7 +418,11 @@ public void testDetectInvalidTopLevelMapKey() { assertThatThrownBy( () -> EvolveSchemaVisitor.visit( - loadUpdateApi(currentSchema), currentSchema, targetSchema)) + TABLE, + loadUpdateApi(currentSchema), + currentSchema, + targetSchema, + PRESERVE_COLUMNS)) .hasMessage("Cannot change column type: aMap.key: string -> uuid") .isInstanceOf(IllegalArgumentException.class); } @@ -370,7 +434,7 @@ public void testTypePromoteIntegerToLong() { Schema targetSchema = new Schema(required(1, "aCol", LongType.get())); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, 
targetSchema, PRESERVE_COLUMNS); Schema applied = updateApi.apply(); assertThat(applied.asStruct().fields()).hasSize(1); assertThat(applied.asStruct().fields().get(0).type()).isEqualTo(LongType.get()); @@ -383,7 +447,7 @@ public void testTypePromoteFloatToDouble() { Schema targetSchema = new Schema(required(1, "aCol", DoubleType.get())); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); Schema applied = updateApi.apply(); assertThat(applied.asStruct().fields()).hasSize(1); assertThat(applied.asStruct().fields().get(0).type()).isEqualTo(DoubleType.get()); @@ -396,7 +460,11 @@ public void testInvalidTypePromoteDoubleToFloat() { assertThatThrownBy( () -> EvolveSchemaVisitor.visit( - loadUpdateApi(currentSchema), currentSchema, targetSchema)) + TABLE, + loadUpdateApi(currentSchema), + currentSchema, + targetSchema, + PRESERVE_COLUMNS)) .hasMessage("Cannot change column type: aCol: double -> float") .isInstanceOf(IllegalArgumentException.class); } @@ -409,7 +477,7 @@ public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() { Schema targetSchema = new Schema(required(1, "aCol", DecimalType.of(22, 1))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -452,7 +520,7 @@ public void testAddPrimitiveToNestedStruct() { optional(6, "time", TimeType.get()))))))))); UpdateSchema updateApi = loadUpdateApi(existingSchema); - EvolveSchemaVisitor.visit(updateApi, existingSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -464,7 +532,11 @@ public void testReplaceListWithPrimitive() { assertThatThrownBy( () -> EvolveSchemaVisitor.visit( - loadUpdateApi(currentSchema), currentSchema, targetSchema)) + TABLE, + loadUpdateApi(currentSchema), + currentSchema, + targetSchema, + PRESERVE_COLUMNS)) .hasMessage("Cannot change column type: aColumn: list -> string") .isInstanceOf(IllegalArgumentException.class); } @@ -501,7 +573,7 @@ public void addNewTopLevelStruct() { optional(7, "d1", StructType.of(optional(8, "d2", StringType.get())))))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -553,7 +625,7 @@ public void testAppendNestedStruct() { StringType.get())))))))))))))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -573,7 +645,7 @@ public void testMakeNestedStructOptional() { optional( 3, "s3", StructType.of(optional(4, "s4", StringType.get())))))))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(updateApi, currentSchema, targetSchema); + EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, 
PRESERVE_COLUMNS); assertThat(getNestedSchemaWithOptionalModifier(true).asStruct()) .isEqualTo(updateApi.apply().asStruct()); } diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java index bf5b9f562f9a..7f91d2f8d585 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java @@ -53,17 +53,22 @@ void testCaching() { catalog.createTable(tableIdentifier, SCHEMA); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - Schema schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema(); + Schema schema1 = cache.schema(tableIdentifier, SCHEMA, false).resolvedTableSchema(); assertThat(schema1.sameSchema(SCHEMA)).isTrue(); assertThat( - cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).resolvedTableSchema()) + cache + .schema(tableIdentifier, SerializationUtils.clone(SCHEMA), false) + .resolvedTableSchema()) .isEqualTo(schema1); - assertThat(cache.schema(tableIdentifier, SCHEMA2)).isEqualTo(TableMetadataCache.NOT_FOUND); + assertThat(cache.schema(tableIdentifier, SCHEMA2, false)) + .isEqualTo(TableMetadataCache.NOT_FOUND); - schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema(); + schema1 = cache.schema(tableIdentifier, SCHEMA, false).resolvedTableSchema(); assertThat( - cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).resolvedTableSchema()) + cache + .schema(tableIdentifier, SerializationUtils.clone(SCHEMA), false) + .resolvedTableSchema()) .isEqualTo(schema1); } @@ -73,9 +78,9 @@ void testCacheInvalidationAfterSchemaChange() { TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - TableUpdater tableUpdater = new TableUpdater(cache, catalog); + TableUpdater tableUpdater = new TableUpdater(cache, catalog, false); - Schema schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema(); + Schema schema1 = cache.schema(tableIdentifier, SCHEMA, false).resolvedTableSchema(); assertThat(schema1.sameSchema(SCHEMA)).isTrue(); catalog.dropTable(tableIdentifier); @@ -83,7 +88,7 @@ void testCacheInvalidationAfterSchemaChange() { tableUpdater.update( tableIdentifier, "main", SCHEMA2, PartitionSpec.unpartitioned(), TableCreator.DEFAULT); - Schema schema2 = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema(); + Schema schema2 = cache.schema(tableIdentifier, SCHEMA2, false).resolvedTableSchema(); assertThat(schema2.sameSchema(SCHEMA2)).isTrue(); } @@ -111,11 +116,11 @@ void testNoCacheRefreshingBeforeRefreshIntervalElapses() { cache.update(tableIdentifier, table); // Cache schema - Schema schema = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema(); + Schema schema = cache.schema(tableIdentifier, SCHEMA2, false).resolvedTableSchema(); assertThat(schema.sameSchema(SCHEMA2)).isTrue(); // Cache schema with fewer fields - TableMetadataCache.ResolvedSchemaInfo schemaInfo = cache.schema(tableIdentifier, SCHEMA); + TableMetadataCache.ResolvedSchemaInfo schemaInfo = cache.schema(tableIdentifier, SCHEMA, false); assertThat(schemaInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue(); assertThat(schemaInfo.compareResult()) 
.isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java index 1d4461698746..bcc5d8064517 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java @@ -56,7 +56,7 @@ void testTableCreation(@TempDir Path tempDir) { catalog.createNamespace(Namespace.of("myNamespace")); TableIdentifier tableIdentifier = TableIdentifier.parse("myNamespace.myTable"); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - TableUpdater tableUpdater = new TableUpdater(cache, catalog); + TableUpdater tableUpdater = new TableUpdater(cache, catalog, false); String locationOverride = tempDir.toString() + "/custom-path"; Map tableProperties = Map.of("key", "value"); @@ -69,7 +69,8 @@ void testTableCreation(@TempDir Path tempDir) { assertThat(catalog.tableExists(tableIdentifier)).isTrue(); assertThat(catalog.loadTable(tableIdentifier).properties().get("key")).isEqualTo("value"); assertThat(catalog.loadTable(tableIdentifier).location()).isEqualTo(locationOverride); - TableMetadataCache.ResolvedSchemaInfo cachedSchema = cache.schema(tableIdentifier, SCHEMA); + TableMetadataCache.ResolvedSchemaInfo cachedSchema = + cache.schema(tableIdentifier, SCHEMA, false); assertThat(cachedSchema.resolvedTableSchema().sameSchema(SCHEMA)).isTrue(); } @@ -78,7 +79,7 @@ void testTableAlreadyExists() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - TableUpdater tableUpdater = new TableUpdater(cache, catalog); + TableUpdater tableUpdater = new TableUpdater(cache, catalog, false); // Make the table non-existent in cache cache.exists(tableIdentifier); @@ -98,7 +99,7 @@ void testBranchCreationAndCaching() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - TableUpdater tableUpdater = new TableUpdater(cache, catalog); + TableUpdater tableUpdater = new TableUpdater(cache, catalog, false); catalog.createTable(tableIdentifier, SCHEMA); tableUpdater.update( @@ -116,7 +117,7 @@ void testSpecCreation() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - TableUpdater tableUpdater = new TableUpdater(cache, catalog); + TableUpdater tableUpdater = new TableUpdater(cache, catalog, false); PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("data", 10).build(); tableUpdater.update(tableIdentifier, "main", SCHEMA, spec, TableCreator.DEFAULT); @@ -132,8 +133,8 @@ void testInvalidateOldCacheEntryOnUpdate() { TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - cache.schema(tableIdentifier, SCHEMA); - TableUpdater tableUpdater = new TableUpdater(cache, catalog); + cache.schema(tableIdentifier, SCHEMA, false); + TableUpdater tableUpdater = new TableUpdater(cache, catalog, false); 
Schema updated = tableUpdater @@ -146,7 +147,8 @@ void testInvalidateOldCacheEntryOnUpdate() { .f0 .resolvedTableSchema(); assertThat(updated.sameSchema(SCHEMA2)).isTrue(); - assertThat(cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema().sameSchema(SCHEMA2)) + assertThat( + cache.schema(tableIdentifier, SCHEMA2, false).resolvedTableSchema().sameSchema(SCHEMA2)) .isTrue(); } @@ -156,7 +158,7 @@ void testLastResultInvalidation() { TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - TableUpdater tableUpdater = new TableUpdater(cache, catalog); + TableUpdater tableUpdater = new TableUpdater(cache, catalog, false); // Initialize cache tableUpdater.update( @@ -167,7 +169,7 @@ void testLastResultInvalidation() { catalog.createTable(tableIdentifier, SCHEMA2); // Cache still stores the old information - assertThat(cache.schema(tableIdentifier, SCHEMA2).compareResult()) + assertThat(cache.schema(tableIdentifier, SCHEMA2, false).compareResult()) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); assertThat( @@ -186,4 +188,27 @@ void testLastResultInvalidation() { assertThat(cache.getInternalCache().get(tableIdentifier).inputSchemas()) .doesNotContainKey(SCHEMA2); } + + @Test + void testDropUnusedColumns() { + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier tableIdentifier = TableIdentifier.parse("myTable"); + TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); + + final boolean dropUnusedColumns = true; + TableUpdater tableUpdater = new TableUpdater(cache, catalog, dropUnusedColumns); + + catalog.createTable(tableIdentifier, SCHEMA2); + + Tuple2 result = + tableUpdater.update( + tableIdentifier, "main", SCHEMA, PartitionSpec.unpartitioned(), TableCreator.DEFAULT); + + assertThat(result.f0.compareResult()).isEqualTo(CompareSchemasVisitor.Result.SAME); + Schema tableSchema = catalog.loadTable(tableIdentifier).schema(); + assertThat(tableSchema.columns()).hasSize(2); + assertThat(tableSchema.findField("id")).isNotNull(); + assertThat(tableSchema.findField("data")).isNotNull(); + assertThat(tableSchema.findField("extra")).isNull(); + } }
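The comparison results asserted throughout these tests can also be reproduced in isolation. A self-contained sketch, assuming the iceberg-flink module and its dependencies are on the classpath; the class name DropUnusedColumnsDemo and the schema contents are illustrative only:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.flink.sink.dynamic.CompareSchemasVisitor;
    import org.apache.iceberg.types.Types;

    public class DropUnusedColumnsDemo {
      public static void main(String[] args) {
        // The input schema is missing the table's "extra" column.
        Schema dataSchema =
            new Schema(Types.NestedField.optional(1, "id", Types.IntegerType.get()));
        Schema tableSchema =
            new Schema(
                Types.NestedField.optional(1, "id", Types.IntegerType.get()),
                Types.NestedField.optional(2, "extra", Types.StringType.get()));

        // Default: the surplus table column is tolerated, prints DATA_CONVERSION_NEEDED
        System.out.println(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, false));
        // Opt-in: the surplus column must be dropped, prints SCHEMA_UPDATE_NEEDED
        System.out.println(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, true));
      }
    }

The SCHEMA_UPDATE_NEEDED result is what ultimately routes a record through TableUpdater and EvolveSchemaVisitor, where the column deletion shown earlier in this patch takes place.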