@@ -43,20 +43,23 @@ public class CompareSchemasVisitor
extends SchemaWithPartnerVisitor<Integer, CompareSchemasVisitor.Result> {

private final Schema tableSchema;
private final boolean dropUnusedColumns;

private CompareSchemasVisitor(Schema tableSchema) {
private CompareSchemasVisitor(Schema tableSchema, boolean dropUnusedColumns) {
this.tableSchema = tableSchema;
this.dropUnusedColumns = dropUnusedColumns;
}

public static Result visit(Schema dataSchema, Schema tableSchema) {
return visit(dataSchema, tableSchema, true);
return visit(dataSchema, tableSchema, true, false);
}

public static Result visit(Schema dataSchema, Schema tableSchema, boolean caseSensitive) {
public static Result visit(
Schema dataSchema, Schema tableSchema, boolean caseSensitive, boolean dropUnusedColumns) {
return visit(
dataSchema,
-1,
new CompareSchemasVisitor(tableSchema),
new CompareSchemasVisitor(tableSchema, dropUnusedColumns),
new PartnerIdByNameAccessors(tableSchema, caseSensitive));
}

@@ -70,6 +73,7 @@ public Result schema(Schema dataSchema, Integer tableSchemaId, Result downstream
}

@Override
@SuppressWarnings("CyclomaticComplexity")
public Result struct(Types.StructType struct, Integer tableSchemaId, List<Result> fields) {
if (tableSchemaId == null) {
return Result.SCHEMA_UPDATE_NEEDED;
@@ -88,10 +92,10 @@ public Result struct(Types.StructType struct, Integer tableSchemaId, List<Result
}

for (Types.NestedField tableField : tableSchemaType.asStructType().fields()) {
if (tableField.isRequired() && struct.field(tableField.name()) == null) {
if (struct.field(tableField.name()) == null
&& (tableField.isRequired() || dropUnusedColumns)) {
// If a field from the table schema does not exist in the input schema, then we won't visit
// it and check for required/optional compatibility. The only choice is to make the table
// field optional.
// it. The only choice is to make the table field optional or drop it.
return Result.SCHEMA_UPDATE_NEEDED;
}
}
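The change above means that, with dropUnusedColumns enabled, a table column missing from the incoming record schema now reports SCHEMA_UPDATE_NEEDED even when it is optional. A minimal sketch of the difference, using made-up schemas and assuming the visitor lives in org.apache.iceberg.flink.sink.dynamic:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.sink.dynamic.CompareSchemasVisitor;
import org.apache.iceberg.types.Types;

public class CompareExample {
  public static void main(String[] args) {
    // Hypothetical table and input schemas; "legacy_col" exists only in the table.
    Schema tableSchema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "legacy_col", Types.StringType.get()));
    Schema dataSchema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

    // Default behavior: a missing optional table column does not force a schema update.
    CompareSchemasVisitor.Result lenient =
        CompareSchemasVisitor.visit(dataSchema, tableSchema, true, false);

    // With dropUnusedColumns=true, the missing column triggers SCHEMA_UPDATE_NEEDED,
    // which later allows EvolveSchemaVisitor to delete it.
    CompareSchemasVisitor.Result strict =
        CompareSchemasVisitor.visit(dataSchema, tableSchema, true, true);

    System.out.println(lenient + " -> " + strict);
  }
}
```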
@@ -187,6 +187,7 @@ public static class Builder<T> {
private ReadableConfig readableConfig = new Configuration();
private TableCreator tableCreator = TableCreator.DEFAULT;
private boolean immediateUpdate = false;
private boolean dropUnusedColumns = false;
private int cacheMaximumSize = 100;
private long cacheRefreshMs = 1_000;
private int inputSchemasPerTableCacheMaximumSize = 10;
@@ -314,6 +315,22 @@ public Builder<T> immediateTableUpdate(boolean newImmediateUpdate) {
return this;
}

/**
* Dropping columns is disabled by default to prevent issues with late or out-of-order data, as
* removed fields cannot be easily restored without data loss.
*
* <p>You can opt in to allow dropping columns. Once a column has been dropped, it is
* technically still possible to write data to that column because Iceberg maintains all past
* table schemas. However, regular queries won't be able to reference the column. If the field
* were to reappear as part of a new schema, an entirely new column would be added which, apart
* from the name, has nothing in common with the old column, i.e. queries for the new column
* will never return data of the old column.
*/
public Builder<T> dropUnusedColumns(boolean newDropUnusedColumns) {
this.dropUnusedColumns = newDropUnusedColumns;
return this;
}

/** Maximum size of the caches used in Dynamic Sink for table data and serializers. */
public Builder<T> cacheMaxSize(int maxSize) {
this.cacheMaximumSize = maxSize;
@@ -382,6 +399,7 @@ public DataStreamSink<DynamicRecordInternal> append() {
generator,
catalogLoader,
immediateUpdate,
dropUnusedColumns,
cacheMaximumSize,
cacheRefreshMs,
inputSchemasPerTableCacheMaximumSize,
@@ -400,6 +418,7 @@ public DataStreamSink<DynamicRecordInternal> append() {
.map(
new DynamicTableUpdateOperator(
catalogLoader,
dropUnusedColumns,
cacheMaximumSize,
cacheRefreshMs,
inputSchemasPerTableCacheMaximumSize,
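For context, a hedged sketch of how the new builder option would be wired into a job. Only immediateTableUpdate, cacheMaxSize, dropUnusedColumns, and append appear in this diff; the forInput entry point and the generator/catalogLoader calls are assumptions about the surrounding DynamicIcebergSink API, not part of this PR.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.sink.dynamic.DynamicIcebergSink;
import org.apache.iceberg.flink.sink.dynamic.DynamicRecordGenerator;

public class DynamicSinkWiring {
  // Attaches the dynamic sink to an input stream with column dropping opted in.
  static <T> void addSink(
      DataStream<T> input, DynamicRecordGenerator<T> generator, CatalogLoader catalogLoader) {
    DynamicIcebergSink.forInput(input)   // assumed builder entry point
        .generator(generator)            // assumed: converts T into DynamicRecord
        .catalogLoader(catalogLoader)
        .immediateTableUpdate(true)      // evolve schemas inline instead of via the side output
        .cacheMaxSize(100)
        .dropUnusedColumns(true)         // default is false; see the Javadoc above for trade-offs
        .append();
  }
}
```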
@@ -40,6 +40,7 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal
private final DynamicRecordGenerator<T> generator;
private final CatalogLoader catalogLoader;
private final boolean immediateUpdate;
private final boolean dropUnusedColumns;
private final int cacheMaximumSize;
private final long cacheRefreshMs;
private final int inputSchemasPerTableCacheMaximumSize;
@@ -56,13 +57,15 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T, DynamicRecordInternal
DynamicRecordGenerator<T> generator,
CatalogLoader catalogLoader,
boolean immediateUpdate,
boolean dropUnusedColumns,
int cacheMaximumSize,
long cacheRefreshMs,
int inputSchemasPerTableCacheMaximumSize,
TableCreator tableCreator) {
this.generator = generator;
this.catalogLoader = catalogLoader;
this.immediateUpdate = immediateUpdate;
this.dropUnusedColumns = dropUnusedColumns;
this.cacheMaximumSize = cacheMaximumSize;
this.cacheRefreshMs = cacheRefreshMs;
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
@@ -80,7 +83,7 @@ public void open(OpenContext openContext) throws Exception {
new HashKeyGenerator(
cacheMaximumSize, getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks());
if (immediateUpdate) {
updater = new TableUpdater(tableCache, catalog);
updater = new TableUpdater(tableCache, catalog, dropUnusedColumns);
} else {
updateStream =
new OutputTag<>(
@@ -106,7 +109,7 @@ public void collect(DynamicRecord data) {

TableMetadataCache.ResolvedSchemaInfo foundSchema =
exists
? tableCache.schema(data.tableIdentifier(), data.schema())
? tableCache.schema(data.tableIdentifier(), data.schema(), dropUnusedColumns)
: TableMetadataCache.NOT_FOUND;

PartitionSpec foundSpec = exists ? tableCache.spec(data.tableIdentifier(), data.spec()) : null;
@@ -38,6 +38,7 @@
class DynamicTableUpdateOperator
extends RichMapFunction<DynamicRecordInternal, DynamicRecordInternal> {
private final CatalogLoader catalogLoader;
private final boolean dropUnusedColumns;
private final int cacheMaximumSize;
private final long cacheRefreshMs;
private final int inputSchemasPerTableCacheMaximumSize;
@@ -47,11 +48,13 @@ class DynamicTableUpdateOperator

DynamicTableUpdateOperator(
CatalogLoader catalogLoader,
boolean dropUnusedColumns,
int cacheMaximumSize,
long cacheRefreshMs,
int inputSchemasPerTableCacheMaximumSize,
TableCreator tableCreator) {
this.catalogLoader = catalogLoader;
this.dropUnusedColumns = dropUnusedColumns;
this.cacheMaximumSize = cacheMaximumSize;
this.cacheRefreshMs = cacheRefreshMs;
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
@@ -66,7 +69,8 @@ public void open(OpenContext openContext) throws Exception {
new TableUpdater(
new TableMetadataCache(
catalog, cacheMaximumSize, cacheRefreshMs, inputSchemasPerTableCacheMaximumSize),
catalog);
catalog,
dropUnusedColumns);
}

@Override
@@ -21,10 +21,13 @@
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Visitor class that accumulates the set of changes needed to evolve an existing schema into the
@@ -36,30 +39,39 @@
* <li>Adding new columns
* <li>Widening the type of existing columns
* <li>Reordering columns
* <li>Dropping columns (when dropUnusedColumns is enabled)
* </ul>
*
* We don't support:
*
* <ul>
* <li>Dropping columns
* <li>Renaming columns
* </ul>
*
* The reason is that dropping columns would create issues with late / out of order data. Once we
* drop fields, we wouldn't be able to easily add them back later without losing the associated
* data. Renaming columns is not supported because we compare schemas by name, which doesn't allow
* for renaming without additional hints.
* By default, any columns present in the table but absent from the input schema are marked as
* optional to prevent issues caused by late or out-of-order data. If dropUnusedColumns is enabled,
* these columns are removed instead to ensure a strict one-to-one schema alignment.
*/
public class EvolveSchemaVisitor extends SchemaWithPartnerVisitor<Integer, Boolean> {

private static final Logger LOG = LoggerFactory.getLogger(EvolveSchemaVisitor.class);
private final TableIdentifier identifier;
private final UpdateSchema api;
private final Schema existingSchema;
private final Schema targetSchema;

private EvolveSchemaVisitor(UpdateSchema api, Schema existingSchema, Schema targetSchema) {
private final boolean dropUnusedColumns;

private EvolveSchemaVisitor(
TableIdentifier identifier,
UpdateSchema api,
Schema existingSchema,
Schema targetSchema,
boolean dropUnusedColumns) {
this.identifier = identifier;
this.api = api;
this.existingSchema = existingSchema;
this.targetSchema = targetSchema;
this.dropUnusedColumns = dropUnusedColumns;
}

/**
@@ -70,12 +82,18 @@ private EvolveSchemaVisitor(UpdateSchema api, Schema existingSchema, Schema targ
* @param api an UpdateSchema for adding changes
* @param existingSchema an existing schema
* @param targetSchema a new schema to compare with the existing
* @param dropUnusedColumns whether to drop columns not present in target schema
*/
public static void visit(UpdateSchema api, Schema existingSchema, Schema targetSchema) {
public static void visit(
TableIdentifier identifier,
UpdateSchema api,
Schema existingSchema,
Schema targetSchema,
boolean dropUnusedColumns) {
visit(
targetSchema,
-1,
new EvolveSchemaVisitor(api, existingSchema, targetSchema),
new EvolveSchemaVisitor(identifier, api, existingSchema, targetSchema, dropUnusedColumns),
new CompareSchemasVisitor.PartnerIdByNameAccessors(existingSchema));
}

@@ -103,11 +121,16 @@ public Boolean struct(Types.StructType struct, Integer partnerId, List<Boolean>
after = columnName;
}

// Drop unused fields when enabled; otherwise ensure they are made optional
for (Types.NestedField existingField : partnerStruct.fields()) {
if (struct.field(existingField.name()) == null) {
if (existingField.isRequired()) {
this.api.makeColumnOptional(this.existingSchema.findColumnName(existingField.fieldId()));
String columnName = this.existingSchema.findColumnName(existingField.fieldId());
if (dropUnusedColumns) {
LOG.debug("{}: Dropping column: {}", identifier.name(), columnName);
this.api.deleteColumn(columnName);
} else {
if (existingField.isRequired()) {
this.api.makeColumnOptional(columnName);
}
}
}
}
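Mirroring how TableUpdater (below) drives this visitor, here is a minimal standalone sketch of the evolution step. The catalog, identifier, and target schema are placeholders, and the visitor's package path is assumed; the visit signature and the optional-vs-drop behavior come from this diff.

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.sink.dynamic.EvolveSchemaVisitor;

public class EvolveExample {
  static void evolveTo(Catalog catalog, TableIdentifier identifier, Schema targetSchema) {
    Table table = catalog.loadTable(identifier);
    UpdateSchema updateApi = table.updateSchema();

    // With dropUnusedColumns=false, table columns absent from targetSchema are made optional;
    // with true, they are removed via updateApi.deleteColumn(...), as in the hunk above.
    EvolveSchemaVisitor.visit(identifier, updateApi, table.schema(), targetSchema, true);
    updateApi.commit();
  }
}
```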
@@ -89,8 +89,8 @@ String branch(TableIdentifier identifier, String branch) {
return branch(identifier, branch, true);
}

ResolvedSchemaInfo schema(TableIdentifier identifier, Schema input) {
return schema(identifier, input, true);
ResolvedSchemaInfo schema(TableIdentifier identifier, Schema input, boolean dropUnusedColumns) {
return schema(identifier, input, true, dropUnusedColumns);
}

PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec) {
@@ -124,7 +124,7 @@ private String branch(TableIdentifier identifier, String branch, boolean allowRe
}

private ResolvedSchemaInfo schema(
TableIdentifier identifier, Schema input, boolean allowRefresh) {
TableIdentifier identifier, Schema input, boolean allowRefresh, boolean dropUnusedColumns) {
CacheItem cached = tableCache.get(identifier);
Schema compatible = null;
if (cached != null && cached.tableExists) {
@@ -139,7 +139,7 @@ private ResolvedSchemaInfo schema(

for (Map.Entry<Integer, Schema> tableSchema : cached.tableSchemas.entrySet()) {
CompareSchemasVisitor.Result result =
CompareSchemasVisitor.visit(input, tableSchema.getValue(), true);
CompareSchemasVisitor.visit(input, tableSchema.getValue(), true, dropUnusedColumns);
if (result == CompareSchemasVisitor.Result.SAME) {
ResolvedSchemaInfo newResult =
new ResolvedSchemaInfo(
@@ -157,7 +157,7 @@ private ResolvedSchemaInfo schema(

if (needsRefresh(cached, allowRefresh)) {
refreshTable(identifier);
return schema(identifier, input, false);
return schema(identifier, input, false, dropUnusedColumns);
} else if (compatible != null) {
ResolvedSchemaInfo newResult =
new ResolvedSchemaInfo(
@@ -43,10 +43,12 @@ class TableUpdater {
private static final Logger LOG = LoggerFactory.getLogger(TableUpdater.class);
private final TableMetadataCache cache;
private final Catalog catalog;
private final boolean dropUnusedColumns;

TableUpdater(TableMetadataCache cache, Catalog catalog) {
TableUpdater(TableMetadataCache cache, Catalog catalog, boolean dropUnusedColumns) {
this.cache = cache;
this.catalog = catalog;
this.dropUnusedColumns = dropUnusedColumns;
}

/**
@@ -118,13 +120,15 @@ private void findOrCreateBranch(TableIdentifier identifier, String branch) {

private TableMetadataCache.ResolvedSchemaInfo findOrCreateSchema(
TableIdentifier identifier, Schema schema) {
TableMetadataCache.ResolvedSchemaInfo fromCache = cache.schema(identifier, schema);
TableMetadataCache.ResolvedSchemaInfo fromCache =
cache.schema(identifier, schema, dropUnusedColumns);
if (fromCache.compareResult() != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) {
return fromCache;
} else {
Table table = catalog.loadTable(identifier);
Schema tableSchema = table.schema();
CompareSchemasVisitor.Result result = CompareSchemasVisitor.visit(schema, tableSchema, true);
CompareSchemasVisitor.Result result =
CompareSchemasVisitor.visit(schema, tableSchema, true, dropUnusedColumns);
switch (result) {
case SAME:
cache.update(identifier, table);
@@ -141,19 +145,20 @@ private TableMetadataCache.ResolvedSchemaInfo findOrCreateSchema(
LOG.info(
"Triggering schema update for table {} {} to {}", identifier, tableSchema, schema);
UpdateSchema updateApi = table.updateSchema();
EvolveSchemaVisitor.visit(updateApi, tableSchema, schema);
EvolveSchemaVisitor.visit(identifier, updateApi, tableSchema, schema, dropUnusedColumns);

try {
updateApi.commit();
cache.update(identifier, table);
TableMetadataCache.ResolvedSchemaInfo comparisonAfterMigration =
cache.schema(identifier, schema);
cache.schema(identifier, schema, dropUnusedColumns);
Schema newSchema = comparisonAfterMigration.resolvedTableSchema();
LOG.info("Table {} schema updated from {} to {}", identifier, tableSchema, newSchema);
return comparisonAfterMigration;
} catch (CommitFailedException e) {
cache.invalidate(identifier);
TableMetadataCache.ResolvedSchemaInfo newSchema = cache.schema(identifier, schema);
TableMetadataCache.ResolvedSchemaInfo newSchema =
cache.schema(identifier, schema, dropUnusedColumns);
if (newSchema.compareResult() != CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) {
LOG.debug("Table {} schema updated concurrently to {}", identifier, schema);
return newSchema;