17 changes: 13 additions & 4 deletions docs/docs/flink-writes.md
@@ -497,18 +497,26 @@ The dynamic sink tries to match the schema provided in `DynamicRecord` with the

The dynamic sink maintains an LRU cache for both table metadata and incoming schemas, with eviction based on size and time constraints. When a `DynamicRecord` contains a schema that is incompatible with the current table schema, a schema update is triggered. This update can occur either immediately or via a centralized executor, depending on the `immediateTableUpdate` configuration. While centralized updates reduce load on the catalog, they may introduce backpressure on the sink.
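
For illustration, here is a minimal sketch of how these settings might be wired into the sink builder. It is not part of this change; the `DynamicIcebergSink.forInput(...)` entry point and the `generator(...)`/`catalogLoader(...)` calls are assumed from the existing dynamic sink examples, while `immediateTableUpdate(...)` and `cacheMaxSize(...)` are builder options from this change.

```java
// Sketch only -- the builder entry point and the generator/catalogLoader setters are
// assumed from the existing dynamic sink examples; adapt to your pipeline.
DynamicIcebergSink.forInput(dataStream)
    .generator(recordGenerator)      // assumed: converts input elements into DynamicRecords
    .catalogLoader(catalogLoader)    // assumed: connects the sink to the catalog
    .immediateTableUpdate(true)      // apply schema updates in the writer instead of the centralized executor
    .cacheMaxSize(100)               // bound for the table metadata / serializer LRU caches
    .append();
```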

Supported schema updates:
#### Supported schema updates

- Adding new columns
- Widening existing column types (e.g., Integer → Long, Float → Double)
- Making required columns optional
- Dropping columns (disabled by default)

Unsupported schema updates:
Dropping columns is disabled by default to prevent issues with late or out-of-order data, as removed fields cannot be easily restored without data loss.

You can opt in to allow dropping columns (see the configuration options below). Once a column has been dropped, it is
technically still possible to write data to that column because Iceberg maintains all past table schemas. However,
regular queries won't be able to reference the column. If the field were to reappear as part of a new schema, an
entirely new column would be added which, apart from the name, has nothing in common with the old column, i.e. queries
for the new column will never return data of the old column.
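
As a hedged sketch (not part of the original text), opting in might look like the following; `dropUnusedColumns(boolean)` is the option added in this change, and the remaining builder calls are assumed as in the example above.

```java
// Opt in to dropping table columns that no longer appear in the incoming schema.
// Caveat: once dropped, a re-appearing field becomes a brand-new column (see above).
DynamicIcebergSink.forInput(dataStream)
    .generator(recordGenerator)      // assumed
    .catalogLoader(catalogLoader)    // assumed
    .dropUnusedColumns(true)         // default is false
    .append();
```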

#### Unsupported schema updates

- Dropping columns
- Renaming columns

Dropping columns is avoided to prevent issues with late or out-of-order data, as removed fields cannot be easily restored without data loss. Renaming is unsupported because schema comparison is name-based, and renames would require additional metadata or hints to resolve.
Renaming is unsupported because schema comparison is name-based, and renames would require additional metadata or hints to resolve.

### Caching

@@ -537,6 +545,7 @@ The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are
| `set(String property, String value)` | Set any Iceberg write property (e.g., `"write.format"`, `"write.upsert.enabled"`). Check out all the options here: [write-options](flink-configuration.md#write-options) |
| `setAll(Map<String, String> properties)` | Set multiple properties at once |
| `tableCreator(TableCreator creator)` | When `DynamicIcebergSink` creates new Iceberg tables, allows overriding how tables are created, e.g. setting custom table properties and the table location based on the table name. |
| `dropUnusedColumns(boolean enabled)` | When enabled, drops all columns from the current table schema that are not contained in the input schema (see the caveats on dropping columns above). |
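
For completeness, a hedged sketch combining write properties with the new option; `set`, `setAll`, and `dropUnusedColumns` are the builder methods listed above, while the entry point, the generator/catalog calls, and the property values are illustrative assumptions.

```java
// Illustrative only: the property keys shown are standard Iceberg write options.
Map<String, String> writeProps =
    Map.of(
        "write.upsert.enabled", "true",
        "write.parquet.compression-codec", "zstd");

DynamicIcebergSink.forInput(dataStream)
    .generator(recordGenerator)      // assumed
    .catalogLoader(catalogLoader)    // assumed
    .setAll(writeProps)              // set several write properties at once
    .dropUnusedColumns(true)         // opt in to dropping unused columns
    .append();
```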

### Notes

@@ -43,20 +43,23 @@ public class CompareSchemasVisitor
extends SchemaWithPartnerVisitor<Integer, CompareSchemasVisitor.Result> {

private final Schema tableSchema;
private final boolean dropUnusedColumns;

private CompareSchemasVisitor(Schema tableSchema) {
private CompareSchemasVisitor(Schema tableSchema, boolean dropUnusedColumns) {
this.tableSchema = tableSchema;
this.dropUnusedColumns = dropUnusedColumns;
}

public static Result visit(Schema dataSchema, Schema tableSchema) {
return visit(dataSchema, tableSchema, true);
return visit(dataSchema, tableSchema, true, false);
}

public static Result visit(Schema dataSchema, Schema tableSchema, boolean caseSensitive) {
public static Result visit(
Schema dataSchema, Schema tableSchema, boolean caseSensitive, boolean dropUnusedColumns) {
return visit(
dataSchema,
-1,
new CompareSchemasVisitor(tableSchema),
new CompareSchemasVisitor(tableSchema, dropUnusedColumns),
new PartnerIdByNameAccessors(tableSchema, caseSensitive));
}

@@ -70,6 +73,7 @@ public Result schema(Schema dataSchema, Integer tableSchemaId, Result downstream
}

@Override
@SuppressWarnings("CyclomaticComplexity")
public Result struct(Types.StructType struct, Integer tableSchemaId, List<Result> fields) {
if (tableSchemaId == null) {
return Result.SCHEMA_UPDATE_NEEDED;
@@ -88,10 +92,10 @@ public Result struct(Types.StructType struct, Integer tableSchemaId, List<Result
}

for (Types.NestedField tableField : tableSchemaType.asStructType().fields()) {
if (tableField.isRequired() && struct.field(tableField.name()) == null) {
if (struct.field(tableField.name()) == null
&& (tableField.isRequired() || dropUnusedColumns)) {
// If a field from the table schema does not exist in the input schema, then we won't visit
// it and check for required/optional compatibility. The only choice is to make the table
// field optional.
// it. The only choice is to make the table field optional or drop it.
return Result.SCHEMA_UPDATE_NEEDED;
}
}
@@ -187,6 +187,7 @@ public static class Builder<T> {
private ReadableConfig readableConfig = new Configuration();
private TableCreator tableCreator = TableCreator.DEFAULT;
private boolean immediateUpdate = false;
private boolean dropUnusedColumns = false;
private int cacheMaximumSize = 100;
private long cacheRefreshMs = 1_000;
private int inputSchemasPerTableCacheMaximumSize = 10;
@@ -314,6 +315,22 @@ public Builder<T> immediateTableUpdate(boolean newImmediateUpdate) {
return this;
}

/**
* Dropping columns is disabled by default to prevent issues with late or out-of-order data, as
* removed fields cannot be easily restored without data loss.
*
* <p>You can opt in to allow dropping columns. Once a column has been dropped, it is
* technically still possible to write data to that column because Iceberg maintains all past
* table schemas. However, regular queries won't be able to reference the column. If the field
* were to reappear as part of a new schema, an entirely new column would be added which, apart
* from the name, has nothing in common with the old column, i.e. queries for the new column
* will never return data of the old column.
*/
public Builder<T> dropUnusedColumns(boolean newDropUnusedColumns) {
this.dropUnusedColumns = newDropUnusedColumns;
return this;
}

/** Maximum size of the caches used in Dynamic Sink for table data and serializers. */
public Builder<T> cacheMaxSize(int maxSize) {
this.cacheMaximumSize = maxSize;
@@ -382,6 +399,7 @@ public DataStreamSink<DynamicRecordInternal> append() {
generator,
catalogLoader,
immediateUpdate,
dropUnusedColumns,
cacheMaximumSize,
cacheRefreshMs,
inputSchemasPerTableCacheMaximumSize,
@@ -400,6 +418,7 @@ public DataStreamSink<DynamicRecordInternal> append() {
.map(
new DynamicTableUpdateOperator(
catalogLoader,
dropUnusedColumns,
cacheMaximumSize,
cacheRefreshMs,
inputSchemasPerTableCacheMaximumSize,
@@ -40,6 +40,7 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal
private final DynamicRecordGenerator<T> generator;
private final CatalogLoader catalogLoader;
private final boolean immediateUpdate;
private final boolean dropUnusedColumns;
private final int cacheMaximumSize;
private final long cacheRefreshMs;
private final int inputSchemasPerTableCacheMaximumSize;
@@ -56,13 +57,15 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal
DynamicRecordGenerator<T> generator,
CatalogLoader catalogLoader,
boolean immediateUpdate,
boolean dropUnusedColumns,
int cacheMaximumSize,
long cacheRefreshMs,
int inputSchemasPerTableCacheMaximumSize,
TableCreator tableCreator) {
this.generator = generator;
this.catalogLoader = catalogLoader;
this.immediateUpdate = immediateUpdate;
this.dropUnusedColumns = dropUnusedColumns;
this.cacheMaximumSize = cacheMaximumSize;
this.cacheRefreshMs = cacheRefreshMs;
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
@@ -80,7 +83,7 @@ public void open(OpenContext openContext) throws Exception {
new HashKeyGenerator(
cacheMaximumSize, getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks());
if (immediateUpdate) {
updater = new TableUpdater(tableCache, catalog);
updater = new TableUpdater(tableCache, catalog, dropUnusedColumns);
} else {
updateStream =
new OutputTag<>(
@@ -106,7 +109,7 @@ public void collect(DynamicRecord data) {

TableMetadataCache.ResolvedSchemaInfo foundSchema =
exists
? tableCache.schema(data.tableIdentifier(), data.schema())
? tableCache.schema(data.tableIdentifier(), data.schema(), dropUnusedColumns)
: TableMetadataCache.NOT_FOUND;

PartitionSpec foundSpec = exists ? tableCache.spec(data.tableIdentifier(), data.spec()) : null;
@@ -38,6 +38,7 @@
class DynamicTableUpdateOperator
extends RichMapFunction<DynamicRecordInternal, DynamicRecordInternal> {
private final CatalogLoader catalogLoader;
private final boolean dropUnusedColumns;
private final int cacheMaximumSize;
private final long cacheRefreshMs;
private final int inputSchemasPerTableCacheMaximumSize;
@@ -47,11 +48,13 @@ class DynamicTableUpdateOperator

DynamicTableUpdateOperator(
CatalogLoader catalogLoader,
boolean dropUnusedColumns,
int cacheMaximumSize,
long cacheRefreshMs,
int inputSchemasPerTableCacheMaximumSize,
TableCreator tableCreator) {
this.catalogLoader = catalogLoader;
this.dropUnusedColumns = dropUnusedColumns;
this.cacheMaximumSize = cacheMaximumSize;
this.cacheRefreshMs = cacheRefreshMs;
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
@@ -66,7 +69,8 @@ public void open(OpenContext openContext) throws Exception {
new TableUpdater(
new TableMetadataCache(
catalog, cacheMaximumSize, cacheRefreshMs, inputSchemasPerTableCacheMaximumSize),
catalog);
catalog,
dropUnusedColumns);
}

@Override
@@ -21,10 +21,13 @@
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Visitor class that accumulates the set of changes needed to evolve an existing schema into the
@@ -36,30 +39,39 @@
* <li>Adding new columns
<li>Widening the type of existing columns
* <li>Reordering columns
* <li>Dropping columns (when dropUnusedColumns is enabled)
* </ul>
*
* We don't support:
*
* <ul>
* <li>Dropping columns
* <li>Renaming columns
* </ul>
*
* The reason is that dropping columns would create issues with late / out of order data. Once we
* drop fields, we wouldn't be able to easily add them back later without losing the associated
* data. Renaming columns is not supported because we compare schemas by name, which doesn't allow
* for renaming without additional hints.
* By default, any columns present in the table but absent from the input schema are marked as
* optional to prevent issues caused by late or out-of-order data. If dropUnusedColumns is enabled,
* these columns are removed instead to ensure a strict one-to-one schema alignment.
*/
public class EvolveSchemaVisitor extends SchemaWithPartnerVisitor<Integer, Boolean> {

private static final Logger LOG = LoggerFactory.getLogger(EvolveSchemaVisitor.class);
private final TableIdentifier identifier;
private final UpdateSchema api;
private final Schema existingSchema;
private final Schema targetSchema;

private EvolveSchemaVisitor(UpdateSchema api, Schema existingSchema, Schema targetSchema) {
private final boolean dropUnusedColumns;

private EvolveSchemaVisitor(
TableIdentifier identifier,
UpdateSchema api,
Schema existingSchema,
Schema targetSchema,
boolean dropUnusedColumns) {
this.identifier = identifier;
this.api = api;
this.existingSchema = existingSchema;
this.targetSchema = targetSchema;
this.dropUnusedColumns = dropUnusedColumns;
}

/**
@@ -70,12 +82,18 @@ private EvolveSchemaVisitor(UpdateSchema api, Schema existingSchema, Schema targ
* @param api an UpdateSchema for adding changes
* @param existingSchema an existing schema
* @param targetSchema a new schema to compare with the existing
* @param dropUnusedColumns whether to drop columns not present in target schema
*/
public static void visit(UpdateSchema api, Schema existingSchema, Schema targetSchema) {
public static void visit(
TableIdentifier identifier,
UpdateSchema api,
Schema existingSchema,
Schema targetSchema,
boolean dropUnusedColumns) {
visit(
targetSchema,
-1,
new EvolveSchemaVisitor(api, existingSchema, targetSchema),
new EvolveSchemaVisitor(identifier, api, existingSchema, targetSchema, dropUnusedColumns),
new CompareSchemasVisitor.PartnerIdByNameAccessors(existingSchema));
}

@@ -103,11 +121,16 @@ public Boolean struct(Types.StructType struct, Integer partnerId, List<Boolean>
after = columnName;
}

// Unused fields are made optional, or dropped when dropUnusedColumns is enabled
for (Types.NestedField existingField : partnerStruct.fields()) {
if (struct.field(existingField.name()) == null) {
if (existingField.isRequired()) {
this.api.makeColumnOptional(this.existingSchema.findColumnName(existingField.fieldId()));
String columnName = this.existingSchema.findColumnName(existingField.fieldId());
if (dropUnusedColumns) {
LOG.debug("{}: Dropping column: {}", identifier.name(), columnName);
this.api.deleteColumn(columnName);
} else {
if (existingField.isRequired()) {
this.api.makeColumnOptional(columnName);
}
}
}
}
@@ -89,8 +89,8 @@ String branch(TableIdentifier identifier, String branch) {
return branch(identifier, branch, true);
}

ResolvedSchemaInfo schema(TableIdentifier identifier, Schema input) {
return schema(identifier, input, true);
ResolvedSchemaInfo schema(TableIdentifier identifier, Schema input, boolean dropUnusedColumns) {
return schema(identifier, input, true, dropUnusedColumns);
}

PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec) {
@@ -124,7 +124,7 @@ private String branch(TableIdentifier identifier, String branch, boolean allowRe
}

private ResolvedSchemaInfo schema(
TableIdentifier identifier, Schema input, boolean allowRefresh) {
TableIdentifier identifier, Schema input, boolean allowRefresh, boolean dropUnusedColumns) {
CacheItem cached = tableCache.get(identifier);
Schema compatible = null;
if (cached != null && cached.tableExists) {
@@ -139,7 +139,7 @@ private ResolvedSchemaInfo schema(

for (Map.Entry<Integer, Schema> tableSchema : cached.tableSchemas.entrySet()) {
CompareSchemasVisitor.Result result =
CompareSchemasVisitor.visit(input, tableSchema.getValue(), true);
CompareSchemasVisitor.visit(input, tableSchema.getValue(), true, dropUnusedColumns);
if (result == CompareSchemasVisitor.Result.SAME) {
ResolvedSchemaInfo newResult =
new ResolvedSchemaInfo(
@@ -157,7 +157,7 @@ private ResolvedSchemaInfo schema(

if (needsRefresh(cached, allowRefresh)) {
refreshTable(identifier);
return schema(identifier, input, false);
return schema(identifier, input, false, dropUnusedColumns);
} else if (compatible != null) {
ResolvedSchemaInfo newResult =
new ResolvedSchemaInfo(