-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Flink: Refactor WriteResult aggregation in DynamicIcebergSink #14810
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
428def3 to
2a3c522
Compare
| We currently keep a List of commit requests per checkpoint instead of a single CommitRequest<DynamicCommittable> | ||
| to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream | ||
| DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's talk about @mxm and @Guosmilesmile about the timeline here.
It would be good to have a specific version mentioned here about the removal of this extra list. When could we say that the state will not contain any old DynamicCommittable objects, and how could the users ensure that everything is ready for an upgrade.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see the following timeline for the upgrade and the state migration:
- We release this fix in the next Iceberg patch version - 1.10.1. It allows users to pick up the new change quickly and migrate their state to a single committable per table/branch/checkpoint.
- We remove the support of lists in the next minor release: 1.11.0.
This timeframe gives users enough time for a smooth migration. I updated the comment to reflect this.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the old Sink we had the following guarantees:
- You were able to upgrade between Iceberg minor versions without any issue
- Flink version upgrade needed an application stop-start. In this case the committable state was cleaned (notifyCheckpointComplete was run), and these incompatibilities were not an issue.
With the new SinkV2 API, I'm not sure we have any guarantees for cleaning up the committables from the state, but if this is so, then we might need to keep these things for a longer period.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pvary We can keep lists of committables for a longer period. Removing them from DynamicCommitter is a separate step, independent from this change. The current change is fully backward compatible.
Although I'd prefer to remove lists of committables early, in the next minor release, 1.11, because it creates unnecessary complications and pollutes the code, making it harder to reason about.
Additionally, we will still provide users with a smooth migration path. Users can upgrade to patch version 1.10.1 to resolve their current state and then upgrade to the latest minor version: 1.11+. We can always detect multiple committables in the DynamicCommitter state and ask users to upgrade to the previous patch version first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, that we can discuss the removal separately. The comment contained the timeline, that is why I asked your thoughts here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it strictly necessary to merge this change to 1.10.1? The original bug which motivated this change has already been fixed in 1.10.1. The changes here are great, but I think they can wait until 1.11.0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Releasing this change in the next patch release (eg 1.10.1) would allow us to address the following points:
- Fix Flink committable metrics: currently
flink.taskmanager.job.task.operator.successfulCommittablesand similar report incorrect values due to multiple committables coming into the committer, but only the aggregated one getting committed. - Avoid writing extra manifests: sink would only write one temporary manifest per table, branch, partition spec, while currently, it writes a new temporary manifest for each unique WriteTarget (table, branch, schemaId, specId, upsertMode, equalityFields).
It would also allow us to simplify the code a lot, as we could remove lists in the next minor release (1.11).
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
Outdated
Show resolved
Hide resolved
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
Outdated
Show resolved
Hide resolved
.../flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultSerializer.java
Show resolved
Hide resolved
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java
Show resolved
Hide resolved
2a3c522 to
a56fb66
Compare
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/WriteTarget.java
Show resolved
Hide resolved
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
Outdated
Show resolved
Hide resolved
.../flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
Outdated
Show resolved
Hide resolved
a56fb66 to
2925a0e
Compare
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
Show resolved
Hide resolved
47f7a8b to
e9276da
Compare
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableKey.java
Outdated
Show resolved
Hide resolved
.../flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommittableSerializer.java
Outdated
Show resolved
Hide resolved
| We currently keep a List of commit requests per checkpoint instead of a single CommitRequest<DynamicCommittable> | ||
| to process the Flink state from previous releases, which had multiple commit requests due to a bug in the upstream | ||
| DynamicWriteResultAggregator. We should replace this with a single commit request in the next major release. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it strictly necessary to merge this change to 1.10.1? The original bug which motivated this change has already been fixed in 1.10.1. The changes here are great, but I think they can wait until 1.11.0.
17ed528 to
20c4f25
Compare
DynamicWriteResultAggregator currently produces multiple committables per (table, branch, checkpoint), which get aggregated in the downstream committer. Refactor the commit aggregator to output only one committable per triplet. Clean up DynamicCommitter to remove assumptions of multiple commit requests per table, branch, and checkpoint. This requires serializing the aggregated WriteResult using multiple temporary manifest files for each unique partition spec because the Iceberg manifest writer requires a single spec per manifest file. We can improve this later by refactoring serialization in the following changes. Change-Id: I6d96c376ad9f3f04f864aef05966d5d0862ef051
20c4f25 to
bfae861
Compare
This PR re-opens #14312.