-
Notifications
You must be signed in to change notification settings - Fork 3k
Flink: Dynamic Sink: Add support for dropping columns #14728
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Before I go into the details of the PR, could you please help me understand what to expect in the interim period, when the sink receives records with and without the dropped column? |
New records will write with the new schema with any columns not part of the input schema dropped. Old records will continue to write with the old schema, which still exists. If there are any previously unseen schemas which include removed columns, those columns will be re-added as new columns. This is a catch which users will have to accept. That's why the feature is opt-in and disabled by default. |
|
Essentially, there’s a race condition between adding and dropping columns. For example, if a user does the following:
If these actions occur within a short time frame and the streams are skewed, the table could end up with either:
Afterward, querying the table with the “old” schema becomes difficult. Additionally, users cannot revert the table to any previously created schema using DynamicSink. This behavior is consistent with the current implementation, but with column-dropping support, users might expect this capability. @Guosmilesmile: Would these restrictions impact your use cases? |
|
IMHO this is fine if the user opts in. We deliberately chose not to allow dropping columns because of this race condition; it's important that this remains the default setting. I agree that we should add more documentation around the semantics of removing columns. |
|
|
Cool! Seems like an ok feature. |
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
Outdated
Show resolved
Hide resolved
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java
Outdated
Show resolved
Hide resolved
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
Outdated
Show resolved
Hide resolved
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
Outdated
Show resolved
Hide resolved
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java
Outdated
Show resolved
Hide resolved
|
Rebased to fix merge conflicts in CI after #14406. |
This commit adds support for strict 1:1 schema synchronization by allowing columns to be dropped from the table when they are not present in the input schema. This is controlled via a new dropUnusedColumns parameter in DynamicIcebergSink. The default behavior (dropUnusedColumns=false) remains unchanged.
.../v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java
Show resolved
Hide resolved
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java
Show resolved
Hide resolved
|
Please update the documentation, and highlight the caveats, like what happens when the column is added back with the same name, and what happens when multiple schema changes happen simultaneously |
| * By default, columns that exist in the table but not in the input schema are made optional to | ||
| * avoid issues with late/out-of-order data. When dropUnusedColumns is enabled, these columns are | ||
| * dropped instead for 1:1 schema syncing. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe something like this:
By default, any columns present in the table but absent from the input schema are made as optional to prevent issues caused by late or out-of-order data. If dropUnusedColumns is enabled, these columns are removed instead to ensure a strict one-to-one schema alignment.
|
Merged to main. |
* Backport: Flink: Dynamic Sink: Add support for dropping columns (#14728)
This commit adds support for strict 1:1 schema synchronization by allowing columns to be dropped from the table when they are not present in the input schema. This is controlled via a new dropUnusedColumns parameter in DynamicIcebergSink.
The default behavior (dropUnusedColumns=false) remains unchanged.