diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java index 60561b0f56bf..cb4fc62c01b8 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java @@ -20,6 +20,8 @@ import java.util.List; import java.util.Map; +import javax.annotation.Nullable; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.schema.SchemaWithPartnerVisitor; @@ -43,26 +45,31 @@ public class CompareSchemasVisitor extends SchemaWithPartnerVisitor { private final Schema tableSchema; + private final boolean caseSensitive; private final boolean dropUnusedColumns; - private CompareSchemasVisitor(Schema tableSchema, boolean dropUnusedColumns) { + private CompareSchemasVisitor( + Schema tableSchema, boolean caseSensitive, boolean dropUnusedColumns) { this.tableSchema = tableSchema; + this.caseSensitive = caseSensitive; this.dropUnusedColumns = dropUnusedColumns; } - public static Result visit(Schema dataSchema, Schema tableSchema) { - return visit(dataSchema, tableSchema, true, false); - } - public static Result visit( Schema dataSchema, Schema tableSchema, boolean caseSensitive, boolean dropUnusedColumns) { return visit( dataSchema, -1, - new CompareSchemasVisitor(tableSchema, dropUnusedColumns), + new CompareSchemasVisitor(tableSchema, caseSensitive, dropUnusedColumns), new PartnerIdByNameAccessors(tableSchema, caseSensitive)); } + @VisibleForTesting + @Deprecated + public static Result visit(Schema dataSchema, Schema tableSchema) { + return visit(dataSchema, tableSchema, true, false); + } + @Override public Result schema(Schema dataSchema, Integer tableSchemaId, Result downstream) { if (tableSchemaId == null) { @@ -92,7 +99,7 @@ public Result struct(Types.StructType struct, Integer tableSchemaId, List { private final Schema tableSchema; - private boolean caseSensitive = true; + private boolean caseSensitive; - PartnerIdByNameAccessors(Schema tableSchema) { + PartnerIdByNameAccessors(Schema tableSchema, boolean caseSensitive) { this.tableSchema = tableSchema; - } - - private PartnerIdByNameAccessors(Schema tableSchema, boolean caseSensitive) { - this(tableSchema); this.caseSensitive = caseSensitive; } @@ -211,8 +219,7 @@ public Integer fieldPartner(Integer tableSchemaFieldId, int fieldId, String name struct = tableSchema.findField(tableSchemaFieldId).type().asStructType(); } - Types.NestedField field = - caseSensitive ? 
struct.field(name) : struct.caseInsensitiveField(name); + Types.NestedField field = getFieldFromStruct(name, struct, caseSensitive); if (field != null) { return field.fieldId(); } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java index e1bc8deb9d45..61b1f84a43b8 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java @@ -191,6 +191,7 @@ public static class Builder { private int cacheMaximumSize = 100; private long cacheRefreshMs = 1_000; private int inputSchemasPerTableCacheMaximumSize = 10; + private boolean caseSensitive = true; Builder() {} @@ -353,6 +354,15 @@ public Builder inputSchemasPerTableCacheMaxSize(int inputSchemasPerTableCache return this; } + /** + * Set whether schema field name matching should be case-sensitive. The default is to match the + * field names case-sensitive. + */ + public Builder caseSensitive(boolean newCaseSensitive) { + this.caseSensitive = newCaseSensitive; + return this; + } + private String operatorName(String suffix) { return uidPrefix != null ? uidPrefix + "-" + suffix : suffix; } @@ -399,11 +409,12 @@ public DataStreamSink append() { generator, catalogLoader, immediateUpdate, - dropUnusedColumns, cacheMaximumSize, cacheRefreshMs, inputSchemasPerTableCacheMaximumSize, - tableCreator)) + tableCreator, + caseSensitive, + dropUnusedColumns)) .uid(prefixIfNotNull(uidPrefix, "-generator")) .name(operatorName("generator")) .returns(type); @@ -418,11 +429,12 @@ public DataStreamSink append() { .map( new DynamicTableUpdateOperator( catalogLoader, - dropUnusedColumns, cacheMaximumSize, cacheRefreshMs, inputSchemasPerTableCacheMaximumSize, - tableCreator)) + tableCreator, + caseSensitive, + dropUnusedColumns)) .uid(prefixIfNotNull(uidPrefix, "-updater")) .name(operatorName("Updater")) .returns(type) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java index 427aa6ceafba..07dfad2780f7 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java @@ -45,6 +45,7 @@ class DynamicRecordProcessor extends ProcessFunction extends ProcessFunction generator, CatalogLoader catalogLoader, boolean immediateUpdate, - boolean dropUnusedColumns, int cacheMaximumSize, long cacheRefreshMs, int inputSchemasPerTableCacheMaximumSize, - TableCreator tableCreator) { + TableCreator tableCreator, + boolean caseSensitive, + boolean dropUnusedColumns) { this.generator = generator; this.catalogLoader = catalogLoader; this.immediateUpdate = immediateUpdate; - this.dropUnusedColumns = dropUnusedColumns; this.cacheMaximumSize = cacheMaximumSize; this.cacheRefreshMs = cacheRefreshMs; this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize; this.tableCreator = tableCreator; + this.caseSensitive = caseSensitive; + this.dropUnusedColumns = dropUnusedColumns; } @Override @@ -78,12 +81,17 @@ public void open(OpenContext openContext) throws Exception { Catalog catalog = catalogLoader.loadCatalog(); this.tableCache = new TableMetadataCache( - catalog, cacheMaximumSize, 
cacheRefreshMs, inputSchemasPerTableCacheMaximumSize); + catalog, + cacheMaximumSize, + cacheRefreshMs, + inputSchemasPerTableCacheMaximumSize, + caseSensitive, + dropUnusedColumns); this.hashKeyGenerator = new HashKeyGenerator( cacheMaximumSize, getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks()); if (immediateUpdate) { - updater = new TableUpdater(tableCache, catalog, dropUnusedColumns); + updater = new TableUpdater(tableCache, catalog, caseSensitive, dropUnusedColumns); } else { updateStream = new OutputTag<>( @@ -109,7 +117,7 @@ public void collect(DynamicRecord data) { TableMetadataCache.ResolvedSchemaInfo foundSchema = exists - ? tableCache.schema(data.tableIdentifier(), data.schema(), dropUnusedColumns) + ? tableCache.schema(data.tableIdentifier(), data.schema()) : TableMetadataCache.NOT_FOUND; PartitionSpec foundSpec = exists ? tableCache.spec(data.tableIdentifier(), data.spec()) : null; diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java index 8f38d4f8be0c..456f20adf59f 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java @@ -43,22 +43,25 @@ class DynamicTableUpdateOperator private final long cacheRefreshMs; private final int inputSchemasPerTableCacheMaximumSize; private final TableCreator tableCreator; + private final boolean caseSensitive; private transient TableUpdater updater; DynamicTableUpdateOperator( CatalogLoader catalogLoader, - boolean dropUnusedColumns, int cacheMaximumSize, long cacheRefreshMs, int inputSchemasPerTableCacheMaximumSize, - TableCreator tableCreator) { + TableCreator tableCreator, + boolean caseSensitive, + boolean dropUnusedColumns) { this.catalogLoader = catalogLoader; - this.dropUnusedColumns = dropUnusedColumns; this.cacheMaximumSize = cacheMaximumSize; this.cacheRefreshMs = cacheRefreshMs; this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize; this.tableCreator = tableCreator; + this.caseSensitive = caseSensitive; + this.dropUnusedColumns = dropUnusedColumns; } @Override @@ -68,8 +71,14 @@ public void open(OpenContext openContext) throws Exception { this.updater = new TableUpdater( new TableMetadataCache( - catalog, cacheMaximumSize, cacheRefreshMs, inputSchemasPerTableCacheMaximumSize), + catalog, + cacheMaximumSize, + cacheRefreshMs, + inputSchemasPerTableCacheMaximumSize, + caseSensitive, + dropUnusedColumns), catalog, + caseSensitive, dropUnusedColumns); } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java index e106cf5754b3..d9747d201e3d 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java @@ -59,6 +59,7 @@ public class EvolveSchemaVisitor extends SchemaWithPartnerVisitor Types.StructType partnerStruct = findFieldType(partnerId).asStructType(); String after = null; for (Types.NestedField targetField : struct.fields()) { - Types.NestedField nestedField = partnerStruct.field(targetField.name()); + Types.NestedField nestedField = + CompareSchemasVisitor.getFieldFromStruct( + 
targetField.name(), partnerStruct, caseSensitive); final String columnName; if (nestedField != null) { updateColumn(nestedField, targetField); columnName = this.existingSchema.findColumnName(nestedField.fieldId()); } else { addColumn(partnerId, targetField); - columnName = this.targetSchema.findColumnName(targetField.fieldId()); + columnName = targetSchema.findColumnName(targetField.fieldId()); } setPosition(columnName, after); @@ -122,7 +130,11 @@ public Boolean struct(Types.StructType struct, Integer partnerId, List } for (Types.NestedField existingField : partnerStruct.fields()) { - if (struct.field(existingField.name()) == null) { + Types.NestedField targetField = + caseSensitive + ? struct.field(existingField.name()) + : struct.caseInsensitiveField(existingField.name()); + if (targetField == null) { String columnName = this.existingSchema.findColumnName(existingField.fieldId()); if (dropUnusedColumns) { LOG.debug("{}: Dropping column: {}", identifier.name(), columnName); diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java index 3be8bbcd9123..fdefc01402ac 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java @@ -55,10 +55,24 @@ class TableMetadataCache { private final Clock cacheRefreshClock; private final int inputSchemasPerTableCacheMaximumSize; private final Map tableCache; + private final boolean caseSensitive; + private final boolean dropUnusedColumns; TableMetadataCache( - Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) { - this(catalog, maximumSize, refreshMs, inputSchemasPerTableCacheMaximumSize, Clock.systemUTC()); + Catalog catalog, + int maximumSize, + long refreshMs, + int inputSchemasPerTableCacheMaximumSize, + boolean caseSensitive, + boolean dropUnusedColumns) { + this( + catalog, + maximumSize, + refreshMs, + inputSchemasPerTableCacheMaximumSize, + caseSensitive, + dropUnusedColumns, + Clock.systemUTC()); } @VisibleForTesting @@ -67,12 +81,16 @@ class TableMetadataCache { int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize, + boolean caseSensitive, + boolean dropUnusedColumns, Clock cacheRefreshClock) { this.catalog = catalog; this.refreshMs = refreshMs; - this.cacheRefreshClock = cacheRefreshClock; this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize; this.tableCache = new LRUCache<>(maximumSize); + this.caseSensitive = caseSensitive; + this.dropUnusedColumns = dropUnusedColumns; + this.cacheRefreshClock = cacheRefreshClock; } Tuple2 exists(TableIdentifier identifier) { @@ -90,8 +108,8 @@ String branch(TableIdentifier identifier, String branch) { return branch(identifier, branch, true); } - ResolvedSchemaInfo schema(TableIdentifier identifier, Schema input, boolean dropUnusedColumns) { - return schema(identifier, input, true, dropUnusedColumns); + ResolvedSchemaInfo schema(TableIdentifier identifier, Schema input) { + return schema(identifier, input, true); } PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec) { @@ -125,7 +143,7 @@ private String branch(TableIdentifier identifier, String branch, boolean allowRe } private ResolvedSchemaInfo schema( - TableIdentifier identifier, Schema input, boolean allowRefresh, boolean dropUnusedColumns) { + TableIdentifier 
identifier, Schema input, boolean allowRefresh) { CacheItem cached = tableCache.get(identifier); Schema compatible = null; if (cached != null && cached.tableExists) { @@ -140,7 +158,8 @@ private ResolvedSchemaInfo schema( for (Map.Entry tableSchema : cached.tableSchemas.entrySet()) { CompareSchemasVisitor.Result result = - CompareSchemasVisitor.visit(input, tableSchema.getValue(), true, dropUnusedColumns); + CompareSchemasVisitor.visit( + input, tableSchema.getValue(), caseSensitive, dropUnusedColumns); if (result == CompareSchemasVisitor.Result.SAME) { ResolvedSchemaInfo newResult = new ResolvedSchemaInfo( @@ -158,7 +177,7 @@ private ResolvedSchemaInfo schema( if (needsRefresh(identifier, cached, allowRefresh)) { refreshTable(identifier); - return schema(identifier, input, false, dropUnusedColumns); + return schema(identifier, input, false); } else if (compatible != null) { ResolvedSchemaInfo newResult = new ResolvedSchemaInfo( diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java index d8809efbe541..b0bdad8ed1e1 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java @@ -43,11 +43,14 @@ class TableUpdater { private static final Logger LOG = LoggerFactory.getLogger(TableUpdater.class); private final TableMetadataCache cache; private final Catalog catalog; + private final boolean caseSensitive; private final boolean dropUnusedColumns; - TableUpdater(TableMetadataCache cache, Catalog catalog, boolean dropUnusedColumns) { + TableUpdater( + TableMetadataCache cache, Catalog catalog, boolean caseSensitive, boolean dropUnusedColumns) { this.cache = cache; this.catalog = catalog; + this.caseSensitive = caseSensitive; this.dropUnusedColumns = dropUnusedColumns; } @@ -120,15 +123,14 @@ private void findOrCreateBranch(TableIdentifier identifier, String branch) { private TableMetadataCache.ResolvedSchemaInfo findOrCreateSchema( TableIdentifier identifier, Schema schema) { - TableMetadataCache.ResolvedSchemaInfo fromCache = - cache.schema(identifier, schema, dropUnusedColumns); + TableMetadataCache.ResolvedSchemaInfo fromCache = cache.schema(identifier, schema); if (fromCache.compareResult() != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { return fromCache; } else { Table table = catalog.loadTable(identifier); Schema tableSchema = table.schema(); CompareSchemasVisitor.Result result = - CompareSchemasVisitor.visit(schema, tableSchema, true, dropUnusedColumns); + CompareSchemasVisitor.visit(schema, tableSchema, caseSensitive, dropUnusedColumns); switch (result) { case SAME: cache.update(identifier, table); @@ -145,20 +147,20 @@ private TableMetadataCache.ResolvedSchemaInfo findOrCreateSchema( LOG.info( "Triggering schema update for table {} {} to {}", identifier, tableSchema, schema); UpdateSchema updateApi = table.updateSchema(); - EvolveSchemaVisitor.visit(identifier, updateApi, tableSchema, schema, dropUnusedColumns); + EvolveSchemaVisitor.visit( + identifier, updateApi, tableSchema, schema, caseSensitive, dropUnusedColumns); try { updateApi.commit(); cache.update(identifier, table); TableMetadataCache.ResolvedSchemaInfo comparisonAfterMigration = - cache.schema(identifier, schema, dropUnusedColumns); + cache.schema(identifier, schema); Schema newSchema = comparisonAfterMigration.resolvedTableSchema(); LOG.info("Table {} 
schema updated from {} to {}", identifier, tableSchema, newSchema); return comparisonAfterMigration; } catch (CommitFailedException e) { cache.invalidate(identifier); - TableMetadataCache.ResolvedSchemaInfo newSchema = - cache.schema(identifier, schema, dropUnusedColumns); + TableMetadataCache.ResolvedSchemaInfo newSchema = cache.schema(identifier, schema); if (newSchema.compareResult() != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) { LOG.debug("Table {} schema updated concurrently to {}", identifier, schema); return newSchema; diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java index cc8e6898d2ed..9e4d600f9325 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java @@ -33,6 +33,9 @@ class TestCompareSchemasVisitor { + private static final boolean CASE_SENSITIVE = true; + private static final boolean CASE_INSENSITIVE = false; + private static final boolean DROP_COLUMNS = true; private static final boolean PRESERVE_COLUMNS = false; @@ -47,7 +50,9 @@ void testSchema() { new Schema( optional(1, "id", IntegerType.get(), "comment"), optional(2, "data", StringType.get()), - optional(3, "extra", StringType.get())))) + optional(3, "extra", StringType.get())), + CASE_SENSITIVE, + PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SAME); } @@ -62,7 +67,9 @@ void testSchemaDifferentId() { new Schema( optional(1, "id", IntegerType.get()), optional(2, "data", StringType.get()), - optional(3, "extra", StringType.get())))) + optional(3, "extra", StringType.get())), + CASE_SENSITIVE, + PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SAME); } @@ -75,7 +82,9 @@ void testSchemaDifferent() { optional(1, "data", StringType.get()), optional(2, "extra", StringType.get())), new Schema( - optional(0, "id", IntegerType.get()), optional(1, "data", StringType.get())))) + optional(0, "id", IntegerType.get()), optional(1, "data", StringType.get())), + CASE_SENSITIVE, + PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); } @@ -88,7 +97,9 @@ void testSchemaWithMoreColumns() { new Schema( optional(0, "id", IntegerType.get()), optional(1, "data", StringType.get()), - optional(2, "extra", StringType.get())))) + optional(2, "extra", StringType.get())), + CASE_SENSITIVE, + PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); } @@ -99,7 +110,9 @@ void testDifferentType() { new Schema( optional(1, "id", LongType.get()), optional(2, "extra", StringType.get())), new Schema( - optional(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())))) + optional(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())), + CASE_SENSITIVE, + PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); } @@ -110,7 +123,9 @@ void testCompatibleType() { new Schema( optional(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())), new Schema( - optional(1, "id", LongType.get()), optional(2, "extra", StringType.get())))) + optional(1, "id", LongType.get()), optional(2, "extra", StringType.get())), + CASE_SENSITIVE, + PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); } @@ -120,9 +135,11 @@ void testRequiredChangeForMatchingField() { new 
Schema(optional(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())); Schema tableSchema = new Schema(required(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())); - assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema)) + assertThat( + CompareSchemasVisitor.visit(dataSchema, tableSchema, CASE_SENSITIVE, PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); - assertThat(CompareSchemasVisitor.visit(tableSchema, dataSchema)) + assertThat( + CompareSchemasVisitor.visit(tableSchema, dataSchema, CASE_SENSITIVE, PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SAME); } @@ -131,9 +148,11 @@ void testRequiredChangeForNonMatchingField() { Schema dataSchema = new Schema(optional(1, "id", IntegerType.get())); Schema tableSchema = new Schema(optional(1, "id", IntegerType.get()), required(2, "extra", StringType.get())); - assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema)) + assertThat( + CompareSchemasVisitor.visit(dataSchema, tableSchema, CASE_SENSITIVE, PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); - assertThat(CompareSchemasVisitor.visit(tableSchema, dataSchema)) + assertThat( + CompareSchemasVisitor.visit(tableSchema, dataSchema, CASE_SENSITIVE, PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); } @@ -142,7 +161,8 @@ void testNoRequiredChangeForNonMatchingField() { Schema dataSchema = new Schema(required(1, "id", IntegerType.get())); Schema tableSchema = new Schema(required(1, "id", IntegerType.get()), optional(2, "extra", StringType.get())); - assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema)) + assertThat( + CompareSchemasVisitor.visit(dataSchema, tableSchema, CASE_SENSITIVE, PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); } @@ -155,8 +175,9 @@ void testStructDifferentId() { optional(2, "struct1", StructType.of(optional(3, "extra", IntegerType.get())))), new Schema( optional(0, "id", IntegerType.get()), - optional( - 1, "struct1", StructType.of(optional(2, "extra", IntegerType.get())))))) + optional(1, "struct1", StructType.of(optional(2, "extra", IntegerType.get())))), + CASE_SENSITIVE, + PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SAME); } @@ -169,8 +190,9 @@ void testStructChanged() { optional(1, "struct1", StructType.of(optional(2, "extra", LongType.get())))), new Schema( optional(1, "id", IntegerType.get()), - optional( - 2, "struct1", StructType.of(optional(3, "extra", IntegerType.get())))))) + optional(2, "struct1", StructType.of(optional(3, "extra", IntegerType.get())))), + CASE_SENSITIVE, + PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); } @@ -185,7 +207,9 @@ void testMapDifferentId() { new Schema( optional(0, "id", IntegerType.get()), optional( - 1, "map1", MapType.ofOptional(2, 3, IntegerType.get(), StringType.get()))))) + 1, "map1", MapType.ofOptional(2, 3, IntegerType.get(), StringType.get()))), + CASE_SENSITIVE, + PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SAME); } @@ -200,7 +224,9 @@ void testMapChanged() { new Schema( optional(1, "id", IntegerType.get()), optional( - 2, "map1", MapType.ofOptional(3, 4, IntegerType.get(), StringType.get()))))) + 2, "map1", MapType.ofOptional(3, 4, IntegerType.get(), StringType.get()))), + CASE_SENSITIVE, + PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); } @@ -213,7 +239,9 @@ void testListDifferentId() { optional(2, "list1", 
ListType.ofOptional(3, IntegerType.get()))), new Schema( optional(0, "id", IntegerType.get()), - optional(1, "list1", ListType.ofOptional(2, IntegerType.get()))))) + optional(1, "list1", ListType.ofOptional(2, IntegerType.get()))), + CASE_SENSITIVE, + PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SAME); } @@ -226,10 +254,76 @@ void testListChanged() { optional(1, "list1", ListType.ofOptional(2, LongType.get()))), new Schema( optional(1, "id", IntegerType.get()), - optional(2, "list1", ListType.ofOptional(3, IntegerType.get()))))) + optional(2, "list1", ListType.ofOptional(3, IntegerType.get()))), + CASE_SENSITIVE, + PRESERVE_COLUMNS)) + .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); + } + + @Test + void testCaseInsensitiveFieldMatching() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(1, "ID", IntegerType.get()), + optional(2, "Data", StringType.get()), + optional(3, "EXTRA", StringType.get())), + new Schema( + optional(1, "id", IntegerType.get()), + optional(2, "data", StringType.get()), + optional(3, "extra", StringType.get())), + CASE_INSENSITIVE, + PRESERVE_COLUMNS)) + .isEqualTo(CompareSchemasVisitor.Result.SAME); + } + + @Test + void testCaseSensitiveFieldMatchingDefault() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(1, "ID", IntegerType.get()), + optional(2, "Data", StringType.get()), + optional(3, "EXTRA", StringType.get())), + new Schema( + optional(1, "id", IntegerType.get()), + optional(2, "data", StringType.get()), + optional(3, "extra", StringType.get())), + CASE_SENSITIVE, + DROP_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); } + @Test + void testCaseInsensitiveNestedStruct() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(1, "ID", IntegerType.get()), + optional(2, "STRUCT1", StructType.of(optional(3, "NESTED", StringType.get())))), + new Schema( + optional(1, "id", IntegerType.get()), + optional(2, "struct1", StructType.of(optional(3, "nested", StringType.get())))), + CASE_INSENSITIVE, + PRESERVE_COLUMNS)) + .isEqualTo(CompareSchemasVisitor.Result.SAME); + } + + @Test + void testCaseInsensitiveWithMoreColumns() { + assertThat( + CompareSchemasVisitor.visit( + new Schema( + optional(0, "ID", IntegerType.get()), optional(1, "DATA", StringType.get())), + new Schema( + optional(0, "id", IntegerType.get()), + optional(1, "data", StringType.get()), + optional(2, "extra", StringType.get())), + CASE_INSENSITIVE, + PRESERVE_COLUMNS)) + .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); + } + @Test void testDropUnusedColumnsEnabled() { Schema dataSchema = new Schema(optional(1, "id", IntegerType.get())); @@ -239,7 +333,7 @@ void testDropUnusedColumnsEnabled() { optional(2, "data", StringType.get()), optional(3, "extra", StringType.get())); - assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, DROP_COLUMNS)) + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, CASE_SENSITIVE, DROP_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); } @@ -249,7 +343,7 @@ void testDropUnusedColumnsWithRequiredField() { Schema tableSchema = new Schema(optional(1, "id", IntegerType.get()), required(2, "data", StringType.get())); - assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, DROP_COLUMNS)) + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, CASE_SENSITIVE, DROP_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); } @@ -262,7 +356,7 @@ void 
testDropUnusedColumnsWhenInputHasMoreFields() { optional(3, "extra", StringType.get())); Schema tableSchema = new Schema(optional(1, "id", IntegerType.get())); - assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, DROP_COLUMNS)) + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, CASE_SENSITIVE, DROP_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); } @@ -282,10 +376,11 @@ void testDropUnusedColumnsInNestedStruct() { optional(3, "field1", StringType.get()), optional(4, "field2", IntegerType.get())))); - assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, DROP_COLUMNS)) + assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, CASE_SENSITIVE, DROP_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED); - assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true, PRESERVE_COLUMNS)) + assertThat( + CompareSchemasVisitor.visit(dataSchema, tableSchema, CASE_SENSITIVE, PRESERVE_COLUMNS)) .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED); } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java index 0c07bc946189..d9602e12eb2e 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java @@ -977,6 +977,91 @@ void testOptInDropUnusedColumns() throws Exception { assertThat(records).hasSize(2); } + @Test + void testCaseInsensitiveSchemaMatching() throws Exception { + Schema lowerCaseSchema = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())); + + Schema upperCaseSchema = + new Schema( + Types.NestedField.optional(1, "ID", Types.IntegerType.get()), + Types.NestedField.optional(2, "DATA", Types.StringType.get())); + + Schema mixedCaseSchema = + new Schema( + Types.NestedField.optional(1, "Id", Types.IntegerType.get()), + Types.NestedField.optional(2, "Data", Types.StringType.get())); + + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + lowerCaseSchema, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + upperCaseSchema, "t1", "main", PartitionSpec.unpartitioned()), + new DynamicIcebergDataImpl( + mixedCaseSchema, "t1", "main", PartitionSpec.unpartitioned())); + + DataStream dataStream = + env.fromData(rows, TypeInformation.of(new TypeHint<>() {})); + env.setParallelism(2); + + DynamicIcebergSink.forInput(dataStream) + .generator(new Generator()) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .writeParallelism(2) + .immediateTableUpdate(true) + .caseSensitive(false) + .append(); + + env.execute("Test Case Insensitive Iceberg DataStream"); + + verifyResults(rows); + } + + @Test + void testCaseSensitiveSchemaMatchingCreatesNewFields() throws Exception { + Schema lowerCaseSchema = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())); + + Schema upperCaseSchema = + new Schema( + Types.NestedField.optional(1, "ID", Types.IntegerType.get()), + Types.NestedField.optional(2, "DATA", Types.StringType.get())); + + List rows = + Lists.newArrayList( + new DynamicIcebergDataImpl( + lowerCaseSchema, "t1", "main", PartitionSpec.unpartitioned()), + new 
DynamicIcebergDataImpl( + upperCaseSchema, "t1", "main", PartitionSpec.unpartitioned())); + + DataStream dataStream = + env.fromData(rows, TypeInformation.of(new TypeHint<>() {})); + env.setParallelism(2); + + DynamicIcebergSink.forInput(dataStream) + .generator(new Generator()) + .catalogLoader(CATALOG_EXTENSION.catalogLoader()) + .writeParallelism(2) + .immediateTableUpdate(true) + .caseSensitive(true) + .append(); + + env.execute("Test Case Sensitive Iceberg DataStream"); + + Table table = CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of(DATABASE, "t1")); + Schema resultSchema = table.schema(); + assertThat(resultSchema.columns()).hasSize(4); + assertThat(resultSchema.findField("id")).isNotNull(); + assertThat(resultSchema.findField("ID")).isNotNull(); + assertThat(resultSchema.findField("data")).isNotNull(); + assertThat(resultSchema.findField("DATA")).isNotNull(); + } + /** * Represents a concurrent duplicate commit during an ongoing commit operation, which can happen * in production scenarios when using REST catalog. diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java index d68dd58c08fc..1c8e6df8591d 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java @@ -32,9 +32,17 @@ import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; class TestDynamicTableUpdateOperator { + private static final boolean CASE_SENSITIVE = true; + private static final boolean CASE_INSENSITIVE = false; + + private static final boolean DROP_COLUMNS = true; + private static final boolean PRESERVE_COLUMNS = false; + @RegisterExtension private static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension(DATABASE, TABLE); @@ -59,11 +67,12 @@ void testDynamicTableUpdateOperatorNewTable() throws Exception { DynamicTableUpdateOperator operator = new DynamicTableUpdateOperator( CATALOG_EXTENSION.catalogLoader(), - false, cacheMaximumSize, cacheRefreshMs, inputSchemaCacheMaximumSize, - TableCreator.DEFAULT); + TableCreator.DEFAULT, + CASE_SENSITIVE, + PRESERVE_COLUMNS); operator.open(null); DynamicRecordInternal input = @@ -93,11 +102,12 @@ void testDynamicTableUpdateOperatorSchemaChange() throws Exception { DynamicTableUpdateOperator operator = new DynamicTableUpdateOperator( CATALOG_EXTENSION.catalogLoader(), - false, cacheMaximumSize, cacheRefreshMs, inputSchemaCacheMaximumSize, - TableCreator.DEFAULT); + TableCreator.DEFAULT, + CASE_SENSITIVE, + PRESERVE_COLUMNS); operator.open(null); catalog.createTable(table, SCHEMA1); @@ -122,6 +132,59 @@ void testDynamicTableUpdateOperatorSchemaChange() throws Exception { assertThat(catalog.loadTable(table).schema().schemaId()).isEqualTo(output.schema().schemaId()); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCaseInSensitivity(boolean caseSensitive) throws Exception { + int cacheMaximumSize = 10; + int cacheRefreshMs = 1000; + int inputSchemaCacheMaximumSize = 10; + Catalog catalog = CATALOG_EXTENSION.catalog(); + TableIdentifier table = TableIdentifier.of(TABLE); + + Schema initialSchema = new 
Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + Schema caseSensitiveSchema = + new Schema(Types.NestedField.required(1, "Id", Types.IntegerType.get())); + + DynamicTableUpdateOperator operator = + new DynamicTableUpdateOperator( + CATALOG_EXTENSION.catalogLoader(), + cacheMaximumSize, + cacheRefreshMs, + inputSchemaCacheMaximumSize, + TableCreator.DEFAULT, + caseSensitive, + PRESERVE_COLUMNS); + operator.open(null); + + catalog.createTable(table, initialSchema); + DynamicRecordInternal input = + new DynamicRecordInternal( + TABLE, + "branch", + caseSensitiveSchema, + GenericRowData.of(1, "test"), + PartitionSpec.unpartitioned(), + 42, + false, + Collections.emptySet()); + DynamicRecordInternal output = operator.map(input); + + if (caseSensitive) { + // Schema changes due to case sensitivity + Schema expectedSchema = + new Schema( + Types.NestedField.optional(2, "Id", Types.IntegerType.get()), + Types.NestedField.optional(1, "id", Types.IntegerType.get())); + Schema tableSchema = catalog.loadTable(table).schema(); + assertThat(tableSchema.sameSchema(expectedSchema)).isTrue(); + assertThat(output.schema().sameSchema(expectedSchema)).isTrue(); + } else { + // No schema change due to case insensitivity + assertThat(catalog.loadTable(table).schema().sameSchema(initialSchema)).isTrue(); + assertThat(output.schema().sameSchema(initialSchema)).isTrue(); + } + } + @Test void testDynamicTableUpdateOperatorPreserveUnusedColumns() throws Exception { int cacheMaximumSize = 10; @@ -133,11 +196,12 @@ void testDynamicTableUpdateOperatorPreserveUnusedColumns() throws Exception { DynamicTableUpdateOperator operator = new DynamicTableUpdateOperator( CATALOG_EXTENSION.catalogLoader(), - false, // dropUnusedColumns = false (default) cacheMaximumSize, cacheRefreshMs, inputSchemaCacheMaximumSize, - TableCreator.DEFAULT); + TableCreator.DEFAULT, + CASE_SENSITIVE, + PRESERVE_COLUMNS); operator.open(null); catalog.createTable(table, SCHEMA2); @@ -173,12 +237,12 @@ void testDynamicTableUpdateOperatorDropUnusedColumns() throws Exception { DynamicTableUpdateOperator operator = new DynamicTableUpdateOperator( CATALOG_EXTENSION.catalogLoader(), - // Drop unused columns - true, cacheMaximumSize, cacheRefreshMs, inputSchemaCacheMaximumSize, - TableCreator.DEFAULT); + TableCreator.DEFAULT, + CASE_INSENSITIVE, + DROP_COLUMNS); operator.open(null); catalog.createTable(table, SCHEMA2); diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java index 027adc4031bd..d2da73c66973 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java @@ -50,6 +50,10 @@ public class TestEvolveSchemaVisitor { private static final TableIdentifier TABLE = TableIdentifier.of("table"); + + private static final boolean CASE_SENSITIVE = true; + private static final boolean CASE_INSENSITIVE = false; + private static final boolean DROP_COLUMNS = true; private static final boolean PRESERVE_COLUMNS = false; @@ -94,7 +98,8 @@ private static Types.NestedField[] primitiveFields( public void testAddTopLevelPrimitives() { Schema targetSchema = new Schema(primitiveFields(0, primitiveTypes())); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, 
PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, new Schema(), targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(targetSchema.asStruct()).isEqualTo(updateApi.apply().asStruct()); } @@ -104,7 +109,8 @@ public void testMakeTopLevelPrimitivesOptional() { assertThat(existingSchema.columns().stream().allMatch(Types.NestedField::isRequired)).isTrue(); UpdateSchema updateApi = loadUpdateApi(existingSchema); - EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, new Schema(), PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, existingSchema, new Schema(), CASE_SENSITIVE, PRESERVE_COLUMNS); Schema newSchema = updateApi.apply(); assertThat(newSchema.asStruct().fields()).hasSize(14); assertThat(newSchema.columns().stream().allMatch(Types.NestedField::isOptional)).isTrue(); @@ -129,7 +135,8 @@ public void testDropUnusedColumns() { optional(2, "b", StructType.of(optional(5, "nested2", StringType.get())))); UpdateSchema updateApi = loadUpdateApi(existingSchema); - EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, DROP_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE, DROP_COLUMNS); Schema newSchema = updateApi.apply(); assertThat(newSchema.sameSchema(targetSchema)).isTrue(); @@ -151,7 +158,8 @@ public void testPreserveUnusedColumns() { Schema targetSchema = new Schema(optional(1, "a", StringType.get())); UpdateSchema updateApi = loadUpdateApi(existingSchema); - EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); Schema newSchema = updateApi.apply(); assertThat(newSchema.sameSchema(existingSchema)).isTrue(); @@ -164,7 +172,8 @@ public void testIdentifyFieldsByName() { UpdateSchema updateApi = loadUpdateApi(existingSchema); Schema newSchema = new Schema(Arrays.asList(Types.NestedField.optional(-1, "myField", Types.LongType.get()))); - EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, newSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, existingSchema, newSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(updateApi.apply().sameSchema(existingSchema)).isTrue(); } @@ -177,7 +186,8 @@ public void testChangeOrderTopLevelPrimitives() { new Schema( Arrays.asList(optional(2, "b", StringType.get()), optional(1, "a", StringType.get()))); UpdateSchema updateApi = loadUpdateApi(existingSchema); - EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -186,7 +196,8 @@ public void testAddTopLevelListOfPrimitives() { for (PrimitiveType primitiveType : primitiveTypes()) { Schema targetSchema = new Schema(optional(1, "aList", ListType.ofOptional(2, primitiveType))); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, new Schema(), targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } } @@ -198,7 +209,8 @@ public void testMakeTopLevelListOfPrimitivesOptional() { new Schema(optional(1, "aList", ListType.ofRequired(2, primitiveType))); Schema targetSchema = new 
Schema(); UpdateSchema updateApi = loadUpdateApi(existingSchema); - EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); Schema expectedSchema = new Schema(optional(1, "aList", ListType.ofRequired(2, primitiveType))); assertThat(updateApi.apply().asStruct()).isEqualTo(expectedSchema.asStruct()); @@ -211,7 +223,8 @@ public void testAddTopLevelMapOfPrimitives() { Schema targetSchema = new Schema(optional(1, "aMap", MapType.ofOptional(2, 3, primitiveType, primitiveType))); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, new Schema(), targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } } @@ -223,7 +236,8 @@ public void testAddTopLevelStructOfPrimitives() { new Schema( optional(1, "aStruct", StructType.of(optional(2, "primitive", primitiveType)))); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), currentSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, new Schema(), currentSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(currentSchema.asStruct()); } } @@ -236,7 +250,8 @@ public void testAddNestedPrimitive() { new Schema( optional(1, "aStruct", StructType.of(optional(2, "primitive", primitiveType)))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } } @@ -251,7 +266,8 @@ public void testMakeNestedPrimitiveOptional() { new Schema( optional(1, "aStruct", StructType.of(optional(2, "primitive", primitiveType)))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } } @@ -262,7 +278,8 @@ public void testAddNestedPrimitives() { Schema targetSchema = new Schema(optional(1, "aStruct", StructType.of(primitiveFields(1, primitiveTypes())))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -292,7 +309,8 @@ public void testAddNestedLists() { ListType.ofOptional( 10, DecimalType.of(11, 20)))))))))))); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, new Schema(), targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -331,7 +349,8 @@ public void testAddNestedStruct() { "aString", StringType.get())))))))))))))); UpdateSchema 
updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -366,7 +385,8 @@ public void testAddNestedMaps() { 12, 13, StringType.get(), StringType.get())))))))); UpdateSchema updateApi = loadUpdateApi(new Schema()); - EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, new Schema(), targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -382,6 +402,7 @@ public void testDetectInvalidTopLevelList() { loadUpdateApi(currentSchema), currentSchema, targetSchema, + true, PRESERVE_COLUMNS)) .hasMessage("Cannot change column type: aList.element: string -> long") .isInstanceOf(IllegalArgumentException.class); @@ -403,6 +424,7 @@ public void testDetectInvalidTopLevelMapValue() { loadUpdateApi(currentSchema), currentSchema, targetSchema, + true, PRESERVE_COLUMNS)) .hasMessage("Cannot change column type: aMap.value: string -> long") .isInstanceOf(IllegalArgumentException.class); @@ -422,6 +444,7 @@ public void testDetectInvalidTopLevelMapKey() { loadUpdateApi(currentSchema), currentSchema, targetSchema, + true, PRESERVE_COLUMNS)) .hasMessage("Cannot change column type: aMap.key: string -> uuid") .isInstanceOf(IllegalArgumentException.class); @@ -434,7 +457,8 @@ public void testTypePromoteIntegerToLong() { Schema targetSchema = new Schema(required(1, "aCol", LongType.get())); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); Schema applied = updateApi.apply(); assertThat(applied.asStruct().fields()).hasSize(1); assertThat(applied.asStruct().fields().get(0).type()).isEqualTo(LongType.get()); @@ -447,7 +471,8 @@ public void testTypePromoteFloatToDouble() { Schema targetSchema = new Schema(required(1, "aCol", DoubleType.get())); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); Schema applied = updateApi.apply(); assertThat(applied.asStruct().fields()).hasSize(1); assertThat(applied.asStruct().fields().get(0).type()).isEqualTo(DoubleType.get()); @@ -464,6 +489,7 @@ public void testInvalidTypePromoteDoubleToFloat() { loadUpdateApi(currentSchema), currentSchema, targetSchema, + true, PRESERVE_COLUMNS)) .hasMessage("Cannot change column type: aCol: double -> float") .isInstanceOf(IllegalArgumentException.class); @@ -477,7 +503,8 @@ public void testTypePromoteDecimalToFixedScaleWithWiderPrecision() { Schema targetSchema = new Schema(required(1, "aCol", DecimalType.of(22, 1))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -520,7 +547,8 @@ public void 
testAddPrimitiveToNestedStruct() { optional(6, "time", TimeType.get()))))))))); UpdateSchema updateApi = loadUpdateApi(existingSchema); - EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -536,6 +564,7 @@ public void testReplaceListWithPrimitive() { loadUpdateApi(currentSchema), currentSchema, targetSchema, + true, PRESERVE_COLUMNS)) .hasMessage("Cannot change column type: aColumn: list -> string") .isInstanceOf(IllegalArgumentException.class); @@ -573,7 +602,8 @@ public void addNewTopLevelStruct() { optional(7, "d1", StructType.of(optional(8, "d2", StringType.get())))))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -625,7 +655,8 @@ public void testAppendNestedStruct() { StringType.get())))))))))))))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct()); } @@ -645,7 +676,8 @@ public void testMakeNestedStructOptional() { optional( 3, "s3", StructType.of(optional(4, "s4", StringType.get())))))))); UpdateSchema updateApi = loadUpdateApi(currentSchema); - EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema, PRESERVE_COLUMNS); + EvolveSchemaVisitor.visit( + TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(getNestedSchemaWithOptionalModifier(true).asStruct()) .isEqualTo(updateApi.apply().asStruct()); } @@ -682,6 +714,82 @@ private static Schema getNestedSchemaWithOptionalModifier(boolean nestedIsOption 9, "s4", StringType.get())))))))))))))); } + @Test + public void testCaseInsensitiveAddField() { + Schema existingSchema = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get())); + Schema targetSchema = + new Schema( + Types.NestedField.optional(1, "ID", Types.IntegerType.get()), + Types.NestedField.optional(2, "NAME", Types.StringType.get()), + Types.NestedField.optional(3, "AGE", Types.IntegerType.get())); + + UpdateSchema updateApi = loadUpdateApi(existingSchema); + EvolveSchemaVisitor.visit( + TABLE, updateApi, existingSchema, targetSchema, CASE_INSENSITIVE, PRESERVE_COLUMNS); + Schema result = updateApi.apply(); + assertThat(result.columns()).hasSize(3); + assertThat(result.findField("AGE")).isNotNull(); + } + + @Test + public void testCaseInsensitiveMakeFieldOptional() { + Schema existingSchema = + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "name", Types.StringType.get())); + Schema targetSchema = new Schema(Types.NestedField.optional(1, "ID", Types.IntegerType.get())); + + UpdateSchema updateApi = loadUpdateApi(existingSchema); + EvolveSchemaVisitor.visit( + TABLE, updateApi, existingSchema, targetSchema, CASE_INSENSITIVE, PRESERVE_COLUMNS); + Schema result = 
updateApi.apply(); + assertThat(result.findField("name").isOptional()).isTrue(); + } + + @Test + public void testCaseInsensitiveNestedStructField() { + Schema existingSchema = + new Schema( + optional(1, "struct1", StructType.of(optional(2, "field1", Types.StringType.get())))); + Schema targetSchema = + new Schema( + optional( + 1, + "STRUCT1", + StructType.of( + optional(2, "FIELD1", Types.StringType.get()), + optional(3, "FIELD2", Types.IntegerType.get())))); + + UpdateSchema updateApi = loadUpdateApi(existingSchema); + EvolveSchemaVisitor.visit( + TABLE, updateApi, existingSchema, targetSchema, CASE_INSENSITIVE, PRESERVE_COLUMNS); + Schema result = updateApi.apply(); + Types.StructType struct = result.findField("struct1").type().asStructType(); + assertThat(struct.fields()).hasSize(2); + assertThat(struct.field("FIELD2")).isNotNull(); + } + + @Test + public void testCaseSensitiveDoesNotMatch() { + Schema existingSchema = + new Schema(Types.NestedField.optional(1, "id", Types.IntegerType.get())); + Schema targetSchema = + new Schema( + Types.NestedField.optional(1, "ID", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get())); + + UpdateSchema updateApi = loadUpdateApi(existingSchema); + EvolveSchemaVisitor.visit( + TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE, PRESERVE_COLUMNS); + Schema result = updateApi.apply(); + assertThat(result.columns()).hasSize(3); + assertThat(result.findField("ID")).isNotNull(); + assertThat(result.findField("id")).isNotNull(); + } + private static UpdateSchema loadUpdateApi(Schema schema) { try { Constructor constructor = diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java index d696059902f0..8a17c707f84a 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java @@ -36,6 +36,12 @@ public class TestTableMetadataCache extends TestFlinkIcebergSinkBase { + private static final boolean CASE_SENSITIVE = true; + private static final boolean CASE_INSENSITIVE = false; + + private static final boolean DROP_COLUMNS = true; + private static final boolean PRESERVE_COLUMNS = false; + static final Schema SCHEMA = new Schema( Types.NestedField.optional(1, "id", Types.IntegerType.get()), @@ -47,29 +53,35 @@ public class TestTableMetadataCache extends TestFlinkIcebergSinkBase { Types.NestedField.optional(2, "data", Types.StringType.get()), Types.NestedField.optional(3, "extra", Types.StringType.get())); + static final Schema SCHEMA_UPPERCASE = + new Schema( + Types.NestedField.optional(1, "ID", Types.IntegerType.get()), + Types.NestedField.optional(2, "DATA", Types.StringType.get())); + + static final Schema SCHEMA_MIXEDCASE = + new Schema( + Types.NestedField.optional(1, "Id", Types.IntegerType.get()), + Types.NestedField.optional(2, "Data", Types.StringType.get())); + @Test void testCaching() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); + TableMetadataCache cache = + new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_SENSITIVE, PRESERVE_COLUMNS); - Schema schema1 = cache.schema(tableIdentifier, 
SCHEMA, false).resolvedTableSchema(); + Schema schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema(); assertThat(schema1.sameSchema(SCHEMA)).isTrue(); assertThat( - cache - .schema(tableIdentifier, SerializationUtils.clone(SCHEMA), false) - .resolvedTableSchema()) + cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).resolvedTableSchema()) .isEqualTo(schema1); - assertThat(cache.schema(tableIdentifier, SCHEMA2, false)) - .isEqualTo(TableMetadataCache.NOT_FOUND); + assertThat(cache.schema(tableIdentifier, SCHEMA2)).isEqualTo(TableMetadataCache.NOT_FOUND); - schema1 = cache.schema(tableIdentifier, SCHEMA, false).resolvedTableSchema(); + schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema(); assertThat( - cache - .schema(tableIdentifier, SerializationUtils.clone(SCHEMA), false) - .resolvedTableSchema()) + cache.schema(tableIdentifier, SerializationUtils.clone(SCHEMA)).resolvedTableSchema()) .isEqualTo(schema1); } @@ -78,10 +90,11 @@ void testCacheInvalidationAfterSchemaChange() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10); - TableUpdater tableUpdater = new TableUpdater(cache, catalog, false); + TableMetadataCache cache = + new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_SENSITIVE, PRESERVE_COLUMNS); + TableUpdater tableUpdater = new TableUpdater(cache, catalog, CASE_SENSITIVE, PRESERVE_COLUMNS); - Schema schema1 = cache.schema(tableIdentifier, SCHEMA, false).resolvedTableSchema(); + Schema schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema(); assertThat(schema1.sameSchema(SCHEMA)).isTrue(); catalog.dropTable(tableIdentifier); @@ -93,7 +106,7 @@ void testCacheInvalidationAfterSchemaChange() { PartitionSpec.unpartitioned(), TableCreator.DEFAULT); - Schema schema2 = cache.schema(tableIdentifier, SCHEMA2, false).resolvedTableSchema(); + Schema schema2 = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema(); assertThat(schema2.sameSchema(SCHEMA2)).isTrue(); } @@ -102,7 +115,8 @@ void testCachingDisabled() { Catalog catalog = CATALOG_EXTENSION.catalog(); TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable"); catalog.createTable(tableIdentifier, SCHEMA); - TableMetadataCache cache = new TableMetadataCache(catalog, 0, Long.MAX_VALUE, 10); + TableMetadataCache cache = + new TableMetadataCache(catalog, 0, Long.MAX_VALUE, 10, CASE_SENSITIVE, PRESERVE_COLUMNS); assertThat(cache.getInternalCache()).isEmpty(); } @@ -117,15 +131,21 @@ void testNoCacheRefreshingBeforeRefreshIntervalElapses() { // Init cache TableMetadataCache cache = new TableMetadataCache( - catalog, 10, 100L, 10, Clock.fixed(Instant.now(), ZoneId.systemDefault())); + catalog, + 10, + 100L, + 10, + CASE_INSENSITIVE, + PRESERVE_COLUMNS, + Clock.fixed(Instant.now(), ZoneId.systemDefault())); cache.update(tableIdentifier, table); // Cache schema - Schema schema = cache.schema(tableIdentifier, SCHEMA2, false).resolvedTableSchema(); + Schema schema = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema(); assertThat(schema.sameSchema(SCHEMA2)).isTrue(); // Cache schema with fewer fields - TableMetadataCache.ResolvedSchemaInfo schemaInfo = cache.schema(tableIdentifier, SCHEMA, false); + TableMetadataCache.ResolvedSchemaInfo schemaInfo = cache.schema(tableIdentifier, SCHEMA); 
     assertThat(schemaInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue();
     assertThat(schemaInfo.compareResult())
         .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
@@ -140,9 +160,10 @@ void testNoSuchNamespaceExceptionHandling() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.of("nonexistent_namespace", "myTable");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_SENSITIVE, PRESERVE_COLUMNS);
 
-    TableMetadataCache.ResolvedSchemaInfo result = cache.schema(tableIdentifier, SCHEMA, false);
+    TableMetadataCache.ResolvedSchemaInfo result = cache.schema(tableIdentifier, SCHEMA);
     assertThat(result).isEqualTo(TableMetadataCache.NOT_FOUND);
 
     assertThat(cache.getInternalCache().get(tableIdentifier)).isNotNull();
@@ -152,11 +173,48 @@ void testNoSuchTableExceptionHandling() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("default.nonexistent_table");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_SENSITIVE, PRESERVE_COLUMNS);
 
-    TableMetadataCache.ResolvedSchemaInfo result = cache.schema(tableIdentifier, SCHEMA, false);
+    TableMetadataCache.ResolvedSchemaInfo result = cache.schema(tableIdentifier, SCHEMA);
     assertThat(result).isEqualTo(TableMetadataCache.NOT_FOUND);
 
     assertThat(cache.getInternalCache().get(tableIdentifier)).isNotNull();
   }
+
+  @Test
+  void testCaseInsensitiveCaching() {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
+    catalog.createTable(tableIdentifier, SCHEMA);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_INSENSITIVE, PRESERVE_COLUMNS);
+
+    Schema schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema();
+    assertThat(schema1.sameSchema(SCHEMA)).isTrue();
+
+    Schema schemaUpperCase = cache.schema(tableIdentifier, SCHEMA_UPPERCASE).resolvedTableSchema();
+    assertThat(schemaUpperCase).isEqualTo(schema1);
+
+    Schema schemaMixedCase = cache.schema(tableIdentifier, SCHEMA_MIXEDCASE).resolvedTableSchema();
+    assertThat(schemaMixedCase).isEqualTo(schema1);
+  }
+
+  @Test
+  void testCaseSensitiveCachingDoesNotMatch() {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
+    catalog.createTable(tableIdentifier, SCHEMA);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_SENSITIVE, PRESERVE_COLUMNS);
+
+    Schema schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema();
+    assertThat(schema1.sameSchema(SCHEMA)).isTrue();
+
+    assertThat(cache.schema(tableIdentifier, SCHEMA_UPPERCASE))
+        .isEqualTo(TableMetadataCache.NOT_FOUND);
+
+    assertThat(cache.schema(tableIdentifier, SCHEMA_MIXEDCASE))
+        .isEqualTo(TableMetadataCache.NOT_FOUND);
+  }
 }
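As the cache tests above suggest, a name mismatch under case-sensitive matching surfaces as `TableMetadataCache.NOT_FOUND`, and the caller is expected to fall back to `TableUpdater`. A sketch of that fallback using only signatures visible in this patch; `resolveOrEvolve` is a hypothetical helper, not part of the change:

```java
// Hypothetical caller-side flow: try the cache first, evolve/create the table on a miss.
Schema resolveOrEvolve(
    TableMetadataCache cache,
    TableUpdater tableUpdater,
    TableIdentifier tableIdentifier,
    Schema inputSchema) {
  TableMetadataCache.ResolvedSchemaInfo info = cache.schema(tableIdentifier, inputSchema);
  if (info == TableMetadataCache.NOT_FOUND) {
    // Cache miss, e.g. a case-sensitive field name mismatch: update the table, then reuse
    // the resolved schema info carried in the first tuple element (as the tests do via .f0).
    info =
        tableUpdater
            .update(
                tableIdentifier,
                SnapshotRef.MAIN_BRANCH,
                inputSchema,
                PartitionSpec.unpartitioned(),
                TableCreator.DEFAULT)
            .f0;
  }
  return info.resolvedTableSchema();
}
```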
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
index c0b376d30e60..bdc825b44f2a 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
@@ -36,9 +36,17 @@
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 public class TestTableUpdater extends TestFlinkIcebergSinkBase {
+  private static final boolean CASE_SENSITIVE = true;
+  private static final boolean CASE_INSENSITIVE = false;
+
+  private static final boolean DROP_COLUMNS = true;
+  private static final boolean PRESERVE_COLUMNS = false;
+
   static final Schema SCHEMA =
       new Schema(
           Types.NestedField.optional(1, "id", Types.IntegerType.get()),
@@ -57,8 +65,9 @@ void testTableCreation(@TempDir Path tempDir) {
     catalog.initialize("catalog", Map.of());
     catalog.createNamespace(Namespace.of("myNamespace"));
     TableIdentifier tableIdentifier = TableIdentifier.parse("myNamespace.myTable");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_SENSITIVE, PRESERVE_COLUMNS);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, CASE_SENSITIVE, PRESERVE_COLUMNS);
 
     String locationOverride = tempDir.toString() + "/custom-path";
     Map tableProperties = Map.of("key", "value");
@@ -75,8 +84,7 @@ void testTableCreation(@TempDir Path tempDir) {
     assertThat(catalog.tableExists(tableIdentifier)).isTrue();
     assertThat(catalog.loadTable(tableIdentifier).properties().get("key")).isEqualTo("value");
     assertThat(catalog.loadTable(tableIdentifier).location()).isEqualTo(locationOverride);
-    TableMetadataCache.ResolvedSchemaInfo cachedSchema =
-        cache.schema(tableIdentifier, SCHEMA, false);
+    TableMetadataCache.ResolvedSchemaInfo cachedSchema = cache.schema(tableIdentifier, SCHEMA);
     assertThat(cachedSchema.resolvedTableSchema().sameSchema(SCHEMA)).isTrue();
   }
 
@@ -84,8 +92,9 @@ void testTableAlreadyExists() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("myTable");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_SENSITIVE, PRESERVE_COLUMNS);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, CASE_SENSITIVE, PRESERVE_COLUMNS);
 
     // Make the table non-existent in cache
     cache.exists(tableIdentifier);
@@ -108,8 +117,9 @@ void testBranchCreationAndCaching() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("myTable");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_SENSITIVE, PRESERVE_COLUMNS);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, CASE_SENSITIVE, PRESERVE_COLUMNS);
     catalog.createTable(tableIdentifier, SCHEMA);
 
     tableUpdater.update(
@@ -126,8 +136,9 @@ void testSpecCreation() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("myTable");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_SENSITIVE, PRESERVE_COLUMNS);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, CASE_SENSITIVE, PRESERVE_COLUMNS);
 
     PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("data", 10).build();
     tableUpdater.update(
@@ -143,9 +154,10 @@ void testInvalidateOldCacheEntryOnUpdate() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
     catalog.createTable(tableIdentifier, SCHEMA);
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10);
-    cache.schema(tableIdentifier, SCHEMA, false);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_SENSITIVE, PRESERVE_COLUMNS);
+    cache.schema(tableIdentifier, SCHEMA);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, CASE_SENSITIVE, PRESERVE_COLUMNS);
 
     Schema updated =
         tableUpdater
@@ -158,8 +170,7 @@ void testInvalidateOldCacheEntryOnUpdate() {
             .f0
             .resolvedTableSchema();
     assertThat(updated.sameSchema(SCHEMA2)).isTrue();
-    assertThat(
-        cache.schema(tableIdentifier, SCHEMA2, false).resolvedTableSchema().sameSchema(SCHEMA2))
+    assertThat(cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema().sameSchema(SCHEMA2))
         .isTrue();
   }
 
@@ -168,8 +179,9 @@ void testLastResultInvalidation() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
     catalog.createTable(tableIdentifier, SCHEMA);
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_SENSITIVE, PRESERVE_COLUMNS);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, CASE_SENSITIVE, PRESERVE_COLUMNS);
 
     // Initialize cache
     tableUpdater.update(
@@ -184,7 +196,7 @@ void testLastResultInvalidation() {
     catalog.createTable(tableIdentifier, SCHEMA2);
 
     // Cache still stores the old information
-    assertThat(cache.schema(tableIdentifier, SCHEMA2, false).compareResult())
+    assertThat(cache.schema(tableIdentifier, SCHEMA2).compareResult())
         .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
 
     assertThat(
@@ -204,14 +216,59 @@ void testLastResultInvalidation() {
         .doesNotContainKey(SCHEMA2);
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCaseSensitivity(boolean caseSensitive) {
+    Catalog catalog = CATALOG_EXTENSION.catalog();
+    TableIdentifier tableIdentifier = TableIdentifier.parse("myTable");
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, caseSensitive, DROP_COLUMNS);
+
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, caseSensitive, DROP_COLUMNS);
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "data", Types.StringType.get()),
+            Types.NestedField.optional(3, "extra", Types.StringType.get()));
+
+    catalog.createTable(tableIdentifier, schema);
+
+    Schema schemaWithUpperCase =
+        new Schema(
+            Types.NestedField.optional(1, "Id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "Data", Types.StringType.get()),
+            Types.NestedField.optional(3, "Extra", Types.StringType.get()));
+
+    Tuple2 result =
+        tableUpdater.update(
+            tableIdentifier,
+            SnapshotRef.MAIN_BRANCH,
+            schemaWithUpperCase,
+            PartitionSpec.unpartitioned(),
+            TableCreator.DEFAULT);
+
+    assertThat(result.f0.compareResult()).isEqualTo(CompareSchemasVisitor.Result.SAME);
+
+    Schema tableSchema = catalog.loadTable(tableIdentifier).schema();
+    if (caseSensitive) {
+      assertThat(tableSchema.columns()).hasSize(3);
+      assertThat(tableSchema.findField("Id")).isNotNull();
+      assertThat(tableSchema.findField("Data")).isNotNull();
+      assertThat(tableSchema.findField("Extra")).isNotNull();
+    } else {
+      assertThat(tableSchema.sameSchema(schema)).isTrue();
+    }
+  }
+
   @Test
   void testDropUnusedColumns() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     TableIdentifier tableIdentifier = TableIdentifier.parse("myTable");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_SENSITIVE, DROP_COLUMNS);
 
-    final boolean dropUnusedColumns = true;
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, dropUnusedColumns);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, CASE_SENSITIVE, DROP_COLUMNS);
 
     catalog.createTable(tableIdentifier, SCHEMA2);
 
@@ -236,8 +293,9 @@ void testNamespaceAndTableCreation() {
     Catalog catalog = CATALOG_EXTENSION.catalog();
     SupportsNamespaces namespaceCatalog = (SupportsNamespaces) catalog;
     TableIdentifier tableIdentifier = TableIdentifier.of("new_namespace", "myTable");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_SENSITIVE, PRESERVE_COLUMNS);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, CASE_SENSITIVE, PRESERVE_COLUMNS);
 
     assertThat(namespaceCatalog.namespaceExists(Namespace.of("new_namespace"))).isFalse();
     assertThat(catalog.tableExists(tableIdentifier)).isFalse();
@@ -265,8 +323,9 @@ void testTableCreationWithExistingNamespace() {
     namespaceCatalog.createNamespace(namespace);
 
     TableIdentifier tableIdentifier = TableIdentifier.of("existing_namespace", "myTable");
-    TableMetadataCache cache = new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10);
-    TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+    TableMetadataCache cache =
+        new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, CASE_SENSITIVE, PRESERVE_COLUMNS);
+    TableUpdater tableUpdater = new TableUpdater(cache, catalog, CASE_SENSITIVE, PRESERVE_COLUMNS);
 
     assertThat(namespaceCatalog.namespaceExists(namespace)).isTrue();
     assertThat(catalog.tableExists(tableIdentifier)).isFalse();
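For completeness, the tests above exercise all three comparison outcomes surfaced through the cache entry (`SAME`, `DATA_CONVERSION_NEEDED`, `SCHEMA_UPDATE_NEEDED`). A sketch of how a caller might branch on them; the real dispatch lives in the sink operators and is not part of this patch:

```java
// Hypothetical dispatch helper; `schemaInfo` is assumed to come from cache.schema(...)
// as in the tests above.
void handle(TableMetadataCache.ResolvedSchemaInfo schemaInfo) {
  switch (schemaInfo.compareResult()) {
    case SAME:
      // Input lines up with the table schema under the configured case sensitivity; write as-is.
      break;
    case DATA_CONVERSION_NEEDED:
      // Field names resolve, but records must be converted to the resolved table schema.
      break;
    case SCHEMA_UPDATE_NEEDED:
      // The table schema has to be evolved first, e.g. via TableUpdater#update.
      break;
  }
}
```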