@@ -20,6 +20,8 @@

import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
@@ -43,26 +45,31 @@ public class CompareSchemasVisitor
extends SchemaWithPartnerVisitor<Integer, CompareSchemasVisitor.Result> {

private final Schema tableSchema;
private final boolean caseSensitive;
private final boolean dropUnusedColumns;

private CompareSchemasVisitor(Schema tableSchema, boolean dropUnusedColumns) {
private CompareSchemasVisitor(
Schema tableSchema, boolean caseSensitive, boolean dropUnusedColumns) {
this.tableSchema = tableSchema;
this.caseSensitive = caseSensitive;
this.dropUnusedColumns = dropUnusedColumns;
}

public static Result visit(Schema dataSchema, Schema tableSchema) {
return visit(dataSchema, tableSchema, true, false);
}

public static Result visit(
Schema dataSchema, Schema tableSchema, boolean caseSensitive, boolean dropUnusedColumns) {
return visit(
dataSchema,
-1,
new CompareSchemasVisitor(tableSchema, dropUnusedColumns),
new CompareSchemasVisitor(tableSchema, caseSensitive, dropUnusedColumns),
new PartnerIdByNameAccessors(tableSchema, caseSensitive));
}

@VisibleForTesting
@Deprecated
public static Result visit(Schema dataSchema, Schema tableSchema) {
return visit(dataSchema, tableSchema, true, false);
}
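A minimal usage sketch of the new four-argument entry point (the schemas, field names, and imports below are invented for illustration; only visit(dataSchema, tableSchema, caseSensitive, dropUnusedColumns) is taken from this change):

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Table columns are upper-cased, the incoming data schema is lower-cased.
Schema tableSchema =
    new Schema(
        Types.NestedField.required(1, "ID", Types.LongType.get()),
        Types.NestedField.optional(2, "Name", Types.StringType.get()));
Schema dataSchema =
    new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()));

// caseSensitive = false lets the differently-cased names match;
// dropUnusedColumns = false keeps table columns the data schema does not carry.
CompareSchemasVisitor.Result result =
    CompareSchemasVisitor.visit(dataSchema, tableSchema, false, false);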

@Override
public Result schema(Schema dataSchema, Integer tableSchemaId, Result downstream) {
if (tableSchemaId == null) {
@@ -92,7 +99,7 @@ public Result struct(Types.StructType struct, Integer tableSchemaId, List<Result
}

for (Types.NestedField tableField : tableSchemaType.asStructType().fields()) {
if (struct.field(tableField.name()) == null
if (getFieldFromStruct(tableField.name(), struct, caseSensitive) == null
&& (tableField.isRequired() || dropUnusedColumns)) {
// If a field from the table schema does not exist in the input schema, then we won't visit
// it. The only choice is to make the table field optional or drop it.
@@ -105,18 +112,23 @@
}

for (int i = 0; i < struct.fields().size(); ++i) {
if (!struct
.fields()
.get(i)
.name()
.equals(tableSchemaType.asStructType().fields().get(i).name())) {
String fieldName = struct.fields().get(i).name();
String tableFieldName = tableSchemaType.asStructType().fields().get(i).name();
if ((caseSensitive && !fieldName.equals(tableFieldName))
|| (!caseSensitive && !fieldName.equalsIgnoreCase(tableFieldName))) {
return Result.DATA_CONVERSION_NEEDED;
}
}

return result;
}

@Nullable
static Types.NestedField getFieldFromStruct(
String fieldName, Types.StructType struct, boolean caseSensitive) {
return caseSensitive ? struct.field(fieldName) : struct.caseInsensitiveField(fieldName);
}

@Override
public Result field(Types.NestedField field, Integer tableSchemaId, Result typeResult) {
if (tableSchemaId == null) {
@@ -191,14 +203,10 @@ public Result primitive(Type.PrimitiveType primitive, Integer tableSchemaId) {

static class PartnerIdByNameAccessors implements PartnerAccessors<Integer> {
private final Schema tableSchema;
private boolean caseSensitive = true;
private boolean caseSensitive;

PartnerIdByNameAccessors(Schema tableSchema) {
PartnerIdByNameAccessors(Schema tableSchema, boolean caseSensitive) {
this.tableSchema = tableSchema;
}

private PartnerIdByNameAccessors(Schema tableSchema, boolean caseSensitive) {
this(tableSchema);
this.caseSensitive = caseSensitive;
}

Expand All @@ -211,8 +219,7 @@ public Integer fieldPartner(Integer tableSchemaFieldId, int fieldId, String name
struct = tableSchema.findField(tableSchemaFieldId).type().asStructType();
}

Types.NestedField field =
caseSensitive ? struct.field(name) : struct.caseInsensitiveField(name);
Types.NestedField field = getFieldFromStruct(name, struct, caseSensitive);
if (field != null) {
return field.fieldId();
}
@@ -191,6 +191,7 @@ public static class Builder<T> {
private int cacheMaximumSize = 100;
private long cacheRefreshMs = 1_000;
private int inputSchemasPerTableCacheMaximumSize = 10;
private boolean caseSensitive = true;

Builder() {}

@@ -353,6 +354,15 @@ public Builder<T> inputSchemasPerTableCacheMaxSize(int inputSchemasPerTableCache
return this;
}

/**
* Set whether schema field name matching should be case-sensitive. The default is to match
* field names case-sensitively.
*/
public Builder<T> caseSensitive(boolean newCaseSensitive) {
this.caseSensitive = newCaseSensitive;
return this;
}
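A hypothetical way to flip the new option from the sink builder; only caseSensitive(boolean) comes from this change, while the enclosing DynamicIcebergSink.forInput(...), generator(...), catalogLoader(...), and append() calls are assumptions about the surrounding API that this hunk does not show:

// All names except caseSensitive(...) are assumptions for illustration.
DynamicIcebergSink.forInput(inputStream)
    .generator(recordGenerator)
    .catalogLoader(catalogLoader)
    .caseSensitive(false) // match incoming field names to table columns ignoring case
    .append();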

private String operatorName(String suffix) {
return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
}
@@ -399,11 +409,12 @@ public DataStreamSink<DynamicRecordInternal> append() {
generator,
catalogLoader,
immediateUpdate,
dropUnusedColumns,
cacheMaximumSize,
cacheRefreshMs,
inputSchemasPerTableCacheMaximumSize,
tableCreator))
tableCreator,
caseSensitive,
dropUnusedColumns))
.uid(prefixIfNotNull(uidPrefix, "-generator"))
.name(operatorName("generator"))
.returns(type);
@@ -418,11 +429,12 @@ public DataStreamSink<DynamicRecordInternal> append() {
.map(
new DynamicTableUpdateOperator(
catalogLoader,
dropUnusedColumns,
cacheMaximumSize,
cacheRefreshMs,
inputSchemasPerTableCacheMaximumSize,
tableCreator))
tableCreator,
caseSensitive,
dropUnusedColumns))
.uid(prefixIfNotNull(uidPrefix, "-updater"))
.name(operatorName("Updater"))
.returns(type)
@@ -45,6 +45,7 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal
private final long cacheRefreshMs;
private final int inputSchemasPerTableCacheMaximumSize;
private final TableCreator tableCreator;
private final boolean caseSensitive;

private transient TableMetadataCache tableCache;
private transient HashKeyGenerator hashKeyGenerator;
@@ -57,19 +58,21 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal
DynamicRecordGenerator<T> generator,
CatalogLoader catalogLoader,
boolean immediateUpdate,
boolean dropUnusedColumns,
int cacheMaximumSize,
long cacheRefreshMs,
int inputSchemasPerTableCacheMaximumSize,
TableCreator tableCreator) {
TableCreator tableCreator,
boolean caseSensitive,
boolean dropUnusedColumns) {
this.generator = generator;
this.catalogLoader = catalogLoader;
this.immediateUpdate = immediateUpdate;
this.dropUnusedColumns = dropUnusedColumns;
this.cacheMaximumSize = cacheMaximumSize;
this.cacheRefreshMs = cacheRefreshMs;
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
this.tableCreator = tableCreator;
this.caseSensitive = caseSensitive;
this.dropUnusedColumns = dropUnusedColumns;
}

@Override
@@ -78,12 +81,17 @@ public void open(OpenContext openContext) throws Exception {
Catalog catalog = catalogLoader.loadCatalog();
this.tableCache =
new TableMetadataCache(
catalog, cacheMaximumSize, cacheRefreshMs, inputSchemasPerTableCacheMaximumSize);
catalog,
cacheMaximumSize,
cacheRefreshMs,
inputSchemasPerTableCacheMaximumSize,
caseSensitive,
dropUnusedColumns);
this.hashKeyGenerator =
new HashKeyGenerator(
cacheMaximumSize, getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks());
if (immediateUpdate) {
updater = new TableUpdater(tableCache, catalog, dropUnusedColumns);
updater = new TableUpdater(tableCache, catalog, caseSensitive, dropUnusedColumns);
} else {
updateStream =
new OutputTag<>(
@@ -109,7 +117,7 @@ public void collect(DynamicRecord data) {

TableMetadataCache.ResolvedSchemaInfo foundSchema =
exists
? tableCache.schema(data.tableIdentifier(), data.schema(), dropUnusedColumns)
? tableCache.schema(data.tableIdentifier(), data.schema())
: TableMetadataCache.NOT_FOUND;

PartitionSpec foundSpec = exists ? tableCache.spec(data.tableIdentifier(), data.spec()) : null;
@@ -43,22 +43,25 @@ class DynamicTableUpdateOperator
private final long cacheRefreshMs;
private final int inputSchemasPerTableCacheMaximumSize;
private final TableCreator tableCreator;
private final boolean caseSensitive;

private transient TableUpdater updater;

DynamicTableUpdateOperator(
CatalogLoader catalogLoader,
boolean dropUnusedColumns,
int cacheMaximumSize,
long cacheRefreshMs,
int inputSchemasPerTableCacheMaximumSize,
TableCreator tableCreator) {
TableCreator tableCreator,
boolean caseSensitive,
boolean dropUnusedColumns) {
this.catalogLoader = catalogLoader;
this.dropUnusedColumns = dropUnusedColumns;
this.cacheMaximumSize = cacheMaximumSize;
this.cacheRefreshMs = cacheRefreshMs;
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
this.tableCreator = tableCreator;
this.caseSensitive = caseSensitive;
this.dropUnusedColumns = dropUnusedColumns;
}

@Override
@@ -68,8 +71,14 @@ public void open(OpenContext openContext) throws Exception {
this.updater =
new TableUpdater(
new TableMetadataCache(
catalog, cacheMaximumSize, cacheRefreshMs, inputSchemasPerTableCacheMaximumSize),
catalog,
cacheMaximumSize,
cacheRefreshMs,
inputSchemasPerTableCacheMaximumSize,
caseSensitive,
dropUnusedColumns),
catalog,
caseSensitive,
dropUnusedColumns);
}

@@ -59,18 +59,21 @@ public class EvolveSchemaVisitor extends SchemaWithPartnerVisitor<Integer, Boole
private final UpdateSchema api;
private final Schema existingSchema;
private final Schema targetSchema;
private final boolean caseSensitive;
private final boolean dropUnusedColumns;

private EvolveSchemaVisitor(
TableIdentifier identifier,
UpdateSchema api,
Schema existingSchema,
Schema targetSchema,
boolean caseSensitive,
boolean dropUnusedColumns) {
this.identifier = identifier;
this.api = api;
this.api = api.caseSensitive(caseSensitive);
this.existingSchema = existingSchema;
this.targetSchema = targetSchema;
this.caseSensitive = caseSensitive;
this.dropUnusedColumns = dropUnusedColumns;
}

@@ -82,19 +85,22 @@ private EvolveSchemaVisitor(
* @param api an UpdateSchema for adding changes
* @param existingSchema an existing schema
* @param targetSchema a new schema to compare with the existing
* @param caseSensitive whether field name matching should be case-sensitive
* @param dropUnusedColumns whether to drop columns not present in target schema
*/
public static void visit(
TableIdentifier identifier,
UpdateSchema api,
Schema existingSchema,
Schema targetSchema,
boolean caseSensitive,
boolean dropUnusedColumns) {
visit(
targetSchema,
-1,
new EvolveSchemaVisitor(identifier, api, existingSchema, targetSchema, dropUnusedColumns),
new CompareSchemasVisitor.PartnerIdByNameAccessors(existingSchema));
new EvolveSchemaVisitor(
identifier, api, existingSchema, targetSchema, caseSensitive, dropUnusedColumns),
new CompareSchemasVisitor.PartnerIdByNameAccessors(existingSchema, caseSensitive));
}
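A hedged sketch of driving this entry point directly (the catalog, identifier, and input schema are placeholders; only the six-argument visit signature is taken from this change):

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

void evolveTo(Catalog catalog, TableIdentifier identifier, Schema inputSchema) {
  Table table = catalog.loadTable(identifier);
  UpdateSchema update = table.updateSchema();
  // Match existing columns ignoring case; keep table columns that inputSchema does not carry.
  EvolveSchemaVisitor.visit(identifier, update, table.schema(), inputSchema, false, false);
  update.commit();
}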

@Override
@@ -107,22 +113,28 @@ public Boolean struct(Types.StructType struct, Integer partnerId, List<Boolean>
Types.StructType partnerStruct = findFieldType(partnerId).asStructType();
String after = null;
for (Types.NestedField targetField : struct.fields()) {
Types.NestedField nestedField = partnerStruct.field(targetField.name());
Types.NestedField nestedField =
CompareSchemasVisitor.getFieldFromStruct(
targetField.name(), partnerStruct, caseSensitive);
final String columnName;
if (nestedField != null) {
updateColumn(nestedField, targetField);
columnName = this.existingSchema.findColumnName(nestedField.fieldId());
} else {
addColumn(partnerId, targetField);
columnName = this.targetSchema.findColumnName(targetField.fieldId());
columnName = targetSchema.findColumnName(targetField.fieldId());
}

setPosition(columnName, after);
after = columnName;
}

for (Types.NestedField existingField : partnerStruct.fields()) {
if (struct.field(existingField.name()) == null) {
Types.NestedField targetField =
caseSensitive
? struct.field(existingField.name())
: struct.caseInsensitiveField(existingField.name());
if (targetField == null) {
String columnName = this.existingSchema.findColumnName(existingField.fieldId());
if (dropUnusedColumns) {
LOG.debug("{}: Dropping column: {}", identifier.name(), columnName);
Expand Down