Flink: Add support for Flink 2.1.0 #13714
Conversation
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
.../flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
Do we want to wait a bit with this PR? Maybe we should be a bit slower, as we are having a major Flink version upgrade here.
Agree with Peter. Let's not merge this until Iceberg 1.10 is released for the supported Flink versions 1.19, 1.20, and 2.0.
Fully agree. It makes sense to defer merging the PR until the Iceberg 1.10.0 release branch has been cut. We already added Flink 2.0 and removed Flink 1.18 for the upcoming release.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This will be merged once Iceberg 1.10.0 has been released.
Rebased against the latest master after the 1.10.0 release branch has been cut.
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
pvary left a comment:
+1 pending tests
```java
@Override
public Variant getVariant(int pos) {
```
great that Flink 2.1 supports variant.
As a separate follow-up, can we add a unit test to cover the variant type support?
Yes, will do!
```java
@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
  if (checkpointId == lastCheckpointId) {
```
Is this change required due to a 2.1 behavior change? If not, it might be better to separate it out so that it's easier for people to see this change.
Yes, this change is required for Flink 2.1; the runtime logic changed. finish() can no longer use Long.MAX_VALUE as the checkpoint id. Previously, finish() called prepareSnapshotPreBarrier(Long.MAX_VALUE); since the checkpoint id needs to be valid, we now use prepareSnapshotPreBarrier(lastCheckpointId + 1). The checkpoint id is restored from state on recovery.
When a checkpoint is triggered before finish() gets called by the runtime, which can happen on shutdown, prepareSnapshotPreBarrier will already have been called. We want to avoid sending out the WriteResults twice; we don't even have the WriteResults available anymore after the first call.
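To make the described flow concrete, here is a minimal sketch of the guard plus the finish() call. Only prepareSnapshotPreBarrier/finish mirror the real operator; the class name and the flush() helper are simplified assumptions for illustration, not the actual IcebergWriteAggregator code.

```java
import java.io.IOException;

// Minimal sketch of the behavior described above; flush() and the class name
// are assumptions, not the actual IcebergWriteAggregator implementation.
class CheckpointIdGuardSketch {
  private long lastCheckpointId = 0L;

  public void prepareSnapshotPreBarrier(long checkpointId) throws IOException {
    if (checkpointId == lastCheckpointId) {
      // A checkpoint already flushed this id (it can complete right before
      // finish() on shutdown); the WriteResults were already emitted, so do
      // not send them downstream a second time.
      return;
    }
    flush(checkpointId); // write the staging manifest and emit the WriteResults
    lastCheckpointId = checkpointId;
  }

  public void finish() throws IOException {
    // Flink 2.1 no longer accepts Long.MAX_VALUE here; use the next valid id.
    prepareSnapshotPreBarrier(lastCheckpointId + 1);
  }

  private void flush(long checkpointId) throws IOException {
    // placeholder for writing the manifest and emitting WriteResults downstream
  }
}
```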
What about batch execution? I think we added the finish mainly for batch execution. Now, batch execution can commit to Iceberg with checkpointId as 1, which is a valid checkpoint id. I don't see a correctness problem with that, but it does feel weird when inspecting the table snapshot summary.
prepareSnapshotPreBarrier is necessary for flushing any records which should be part of the next snapshot. The method is called before the barrier is sent downstream, which ensures the records are processed before the snapshot is taken. If we did not send those records downstream, they would have to be saved as part of the checkpointed state and processed after the checkpoint finishes.
In the context of the Iceberg sink, flushing state in prepareSnapshotPreBarrier means that the data files written will be committed to the Iceberg table as part of the Flink checkpoint. If we did not do that, those files would be part of the next snapshot, which would mean that the table changes aren't visible until the next Flink checkpoint.
> What about batch execution? I think we added the prepareSnapshotPreBarrier mainly for batch execution. Now, batch execution can commit to Iceberg with checkpointId as 1, which is a valid checkpoint id. I don't see a correctness problem with that, but it does feel weird when inspecting the table snapshot summary.
It is also useful in streaming mode, to make the table changes visible faster, but it is 100% required for batch to ensure all data gets committed to the table on shutdown. Showing checkpoint id 1 might surprise users who normally saw Long.MAX_VALUE, but I don't see a big issue with that, since it reflects what's actually happening in Flink.
Sorry, there is a typo in my previous comment. I meant the finish method was added mainly for batch execution. I am wondering if we can call prepareSnapshotPreBarrier(Long.MAX_VALUE) in the finish method only for batch execution. For streaming execution, it is not necessary to flush in the finish method; streaming execution only flushes during the checkpoint's prepareSnapshotPreBarrier.
> Showing checkpoint id 1 might surprise users who normally saw Long.MAX_VALUE, but I don't see a big issue with that, since it reflects what's actually happening in Flink.
It's a behavior change. We will need to at least call it out in the release notes.
I think what we can do is write Long.MAX_VALUE to the manifest for batch pipelines and thereby keep the current behavior. The runtime can still use a different checkpoint id for batch.
It is less about the temp/staging manifest file that the aggregator wrote. It is more about the checkpointId put in the snapshot summary, which will still be 1 for batch execution.
I wouldn't want to have a different checkpointId for the staging manifest file name, which is how the IcebergWriteAggregator#writeToManifest actually uses the checkpointId value.
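For illustration, here is a minimal sketch of the idea floated above: keep Long.MAX_VALUE only in the snapshot summary for batch jobs while the runtime keeps its own valid checkpoint id. The class and helper method are hypothetical, not the actual committer code; "flink.max-committed-checkpoint-id" is the summary property the Flink sink writes.

```java
import java.util.List;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

// Hypothetical helper, not the actual committer code: commit data files while
// controlling the checkpoint id that ends up in the snapshot summary,
// independent of the id the runtime used internally.
class SnapshotSummarySketch {
  static void commit(Table table, List<DataFile> files, long runtimeCheckpointId, boolean batch) {
    // Batch keeps the pre-2.1 appearance in the table metadata; streaming
    // records the real checkpoint id.
    long reported = batch ? Long.MAX_VALUE : runtimeCheckpointId;

    AppendFiles append = table.newAppend();
    files.forEach(append::appendFile);
    append.set("flink.max-committed-checkpoint-id", String.valueOf(reported));
    append.commit();
  }
}
```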
I am looking at the linked PR: https://github.com/apache/flink/pull/26433/files
It seems to me that it is incorrect for the CommitterOperator to validate the checkpoint id for the endInput method.
As the code comment suggested, all committables should be committed here.
// There will be no final checkpoint, all committables should be committed here
It should have called commitAndEmitCheckpoints(Long.MAX_VALUE).
If the IcebergWriteAggregator.lastCheckpointId has an initial value larger than 0 for batch execution, this PR would fail: with its current logic after Flink PR 26433, the Flink CommitterOperator only allows checkpointId 1 to commit.
If users have picked up the Flink change in versions like 1.19, 1.20, or 2.0, they can hit silent data loss (incorrectly skipped committables) when using the v2 sink in batch execution mode or in streaming execution with checkpointing disabled.
I think you're right that the patch releases (Flink 1.19.3, 1.20.2, 2.0.1) should not have this behavioral change.
I've created a thread on the mailing list: https://lists.apache.org/thread/ymsgmrf7mmw3qkwtp34b32fc9nhx9oy0 and an associated JIRA: https://issues.apache.org/jira/browse/FLINK-38370
The crux is that finish() gets called in both batch and streaming pipelines. Streaming pipelines are theoretically affected just like batch pipelines, but in practice they are not, because they have full checkpoint support. In the tests, we use a special bounded source for streaming which triggers a checkpoint before shutting down, which always ensures that the checkpoint goes through before finish() gets called. Any subsequent checkpoints in streaming, which still happen even when operators are finished, do not flush anything, so the issue is not apparent. This behavior is fine in streaming because a proper shutdown of a streaming job via stop-with-savepoint would also ensure a final checkpoint gets created before the finish() method gets called. A simple cancel without a savepoint would discard the state written after the last checkpoint, which is expected. So the issue is batch-only.
For Flink 2.1.0, I think the changed behavior is fine. At this point, we're mainly concerned about a checkpoint id of 1 for batch pipelines. It may look unfamiliar, but I think it makes perfect sense: there is exactly one checkpoint for batch pipelines. I would suggest going ahead and merging this PR, because it is unlikely we will be able to change the Flink behavior for Flink 2.1.0.
Talked to Max offline. We will merge this PR for now. Once Flink 2.1.1 is released with the fix, we can update the checkpoint id code for batch execution.
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
Force-pushed from ae9e952 to d9605b8.
singhpk234 left a comment:
Thank you for the change @mxm
If you get some time, can you please also add an entry here: https://github.com/apache/iceberg/blob/main/site/docs/multi-engine-support.md#apache-flink
Thanks for the reminder @singhpk234! The issue with the supported versions page is that it's not versioned like our docs. It goes live immediately after merging. I've tried to reflect the state as best as I can here: #14152
Flink 2.1.0 has been released: https://flink.apache.org/2025/07/31/apache-flink-2.1.0-ushers-in-a-new-era-of-unified-real-time-data--ai-with-comprehensive-upgrades/
Most notably, a Variant data type has been added. I have not added support for the Variant data type yet; I think it is best to do that separately. For now, I've ported the existing code to Flink 2.1.0, with the biggest change being that Flink no longer allows using Long.MAX_VALUE as the checkpoint id when flushing state on shutdown via prepareSnapshotPreBarrier(checkpointId). The checkpoint id needs to increase by 1, and any messages for higher checkpoint ids are ignored. The correct checkpoint id needs to be restored from state when the job is recovered.
The following commits have been added to preserve the Git history as well as possible, similarly to how previous Flink upgrades have been done. Since we manage a maximum of three releases, I'm removing support for Flink 1.19.
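As a rough illustration of the restore-from-state part described above, here is a minimal sketch assuming the last used checkpoint id is kept in operator list state; the class and the state name are illustrative assumptions, not the actual IcebergWriteAggregator implementation.

```java
import java.util.Collections;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;

// Illustrative only: carry the last used checkpoint id across restarts so
// prepareSnapshotPreBarrier keeps seeing strictly increasing ids after recovery.
abstract class CheckpointIdRestoreSketch<T> extends AbstractStreamOperator<T> {
  private transient ListState<Long> checkpointIdState;
  private long lastCheckpointId = 0L;

  @Override
  public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    checkpointIdState =
        context
            .getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("last-checkpoint-id", Long.class));
    for (Long restored : checkpointIdState.get()) {
      // pick up the id written by the previous run
      lastCheckpointId = Math.max(lastCheckpointId, restored);
    }
  }

  @Override
  public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    // keep exactly the latest id in state
    checkpointIdState.update(Collections.singletonList(lastCheckpointId));
  }
}
```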