Flink: Ensure DynamicCommitter idempotence in the presence of failures #14182
Conversation
Previously, the DynamicCommitter could commit duplicate WriteResults when recovering from failures, leading to incorrect data in tables. This change introduces tracking of the maximum committed WriteResult index per checkpoint to ensure idempotent behavior during recovery scenarios.

Key changes:

- Added MAX_WRITE_RESULT_INDEX snapshot property to track committed WriteResults
- Modified commit logic to skip already committed WriteResults within a checkpoint
- Optimized atomic commits by batching append-only WriteResults into single transactions
- Updated tests to verify idempotent behavior with simulated failures
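The skip-on-recovery logic described above can be modeled with a small self-contained sketch (plain Java with illustrative names, not the actual DynamicCommitter code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the skip logic described above: the committer persists
// the index of the last committed WriteResult per checkpoint (standing in for
// the MAX_WRITE_RESULT_INDEX snapshot property) and skips entries at or below
// that index when Flink re-attempts the committables after a failure.
public class WriteResultSkipSketch {

  /** Returns the suffix of writeResults that still needs committing. */
  public static <T> List<T> pendingAfterRecovery(List<T> writeResults, int maxCommittedIndex) {
    List<T> pending = new ArrayList<>();
    for (int i = 0; i < writeResults.size(); i++) {
      if (i > maxCommittedIndex) { // indices <= maxCommittedIndex were already committed
        pending.add(writeResults.get(i));
      }
    }
    return pending;
  }
}
```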
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java
I've revised the approach here quite a bit, which is now also reflected in the PR description.
@mxm Even with this change, the data loss will still occur for WriteResults with delete files in the scenario described in #14090. For example, consider this case in `commitDeltaTxn`:

```java
private void commitDeltaTxn(...) {
  // ...
  RowDelta rowDelta = null;
  long checkpointId = -1;
  for (Map.Entry<Long, List<WriteResult>> e : pendingResults.entrySet()) {
    // ...
    if (rowDelta != null
        && writeResults.stream().anyMatch(writeResult -> writeResult.deleteFiles().length > 0)) {
      // ...
      // aiborodin: The data loss occurs if we fail after the first iteration.
      // Flink will re-attempt all dynamic committables from the last checkpoint and skip the
      // remaining committables on line 146, because we already committed the first committable
      // with the current checkpoint ID.
      commitOperation(
          table, branch, rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, checkpointId);
      rowDelta = null;
    }
    // ...
  }
}
```

The complete solution is to aggregate all WriteResults for a (checkpoint, table, branch) triplet, which I implemented in #14092.

More generally, combining WriteResults across checkpoints for appends is a different story. It is valid to do this in the DynamicCommitter because it is the only logical place in the code that has the context across multiple checkpoints.

I'm happy to discuss this online in Slack or over a Zoom call to clarify this. @pvary, would you be interested in joining as well?
I have to politely disagree with you here. We commit in two cases:
Since the smallest unit at which we take a table snapshot is a Flink checkpoint, we will always be able to recover the commit state by looking up the highest committed checkpoint id from the table summary, which is kept per table/branch.

This is precisely what this change does. There is an additional optimization to also combine multiple commits, but I think it makes the code hard to review. I'm going to revert that part and only commit each checkpoint individually. This makes the code easier to reason about. Also, the situation where we have WriteResults from multiple Flink checkpoints is very rare.

For the sake of simplicity, I will revert the change to combine WriteResults from multiple Flink checkpoints.
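As a rough standalone model of that recovery lookup (the property name follows the convention used by Iceberg's Flink committers, but treat this as a simplified sketch, not the real code):

```java
import java.util.Map;

// Simplified model: the snapshot summary for a table/branch records the
// highest committed Flink checkpoint id. On restore, a pending checkpoint at
// or below the recorded id has already been committed and must be skipped.
public class CheckpointRecoverySketch {

  // Assumed property key, mirroring the Flink sink's snapshot summary entry.
  static final String MAX_COMMITTED = "flink.max-committed-checkpoint-id";

  public static boolean alreadyCommitted(Map<String, String> snapshotSummary, long checkpointId) {
    String max = snapshotSummary.get(MAX_COMMITTED);
    return max != null && checkpointId <= Long.parseLong(max);
  }
}
```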
… combine append-only WriteResults from multiple checkpoints
@aiborodin I've pushed the simplification. We can also discuss on Slack if there are still open questions.
Force-pushed from 2cc812c to e67c082.
```java
// Old: one commit per WriteResult, inside the loop
for (List<WriteResult> writeResults : pendingResults.values()) {
  for (WriteResult result : writeResults) {
    Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
    commitOperation(
        table,
        branch,
        dynamicOverwrite,
        summary,
        "dynamic partition overwrite",
        newFlinkJobId,
        operatorId,
        e.getKey());
  }
}
```

```java
// New: a single commit for all pending results
commitOperation(
    table,
    branch,
    dynamicOverwrite,
    summary,
    "dynamic partition overwrite",
    newFlinkJobId,
    operatorId,
    pendingResults.lastKey());
```
Should this be:

```java
for (List<WriteResult> writeResults : pendingResults.values()) {
  ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
  for (WriteResult result : writeResults) {
    Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
  }
  commitOperation(
      table,
      branch,
      dynamicOverwrite,
      summary,
      "dynamic partition overwrite",
      newFlinkJobId,
      operatorId,
      pendingResults.lastKey());
}
```
We still need to commit the checkpoints one-by-one. What if the replace happened for the same partition? With the proposed method we will have duplicated data
It could be, but IMHO correctness isn't affected. This is a left-over from the previous commit where we would still combine as many WriteResults as possible into a single table snapshot. Since replace partitions is append-only, I figured we could keep this optimization. However, for the sake of consistency with non-replacing writes, we could also go with your suggestion.
If I understand correctly, overwrite mode should only be enabled in batch jobs, as it is very hard to make any claims about the results in a streaming job.

Also, the current implementation is problematic as well: we could have multiple data files for a given partition, and then they will replace each other, and only the last one wins 😢

Also, if checkpointing is turned on, then we will have the same issue as mentioned above, just with a bit more data. The 2nd checkpoint might delete data from the 1st checkpoint, because it is replacing the same partition.

So this means that replace partitions only works if checkpointing is turned off (or you are lucky 😄)
So essentially, it doesn't matter which solution we choose 😄
That's probably a topic for another day, as both the IcebergSink and the older FlinkSink have this issue. The implementation in this PR is consistent, though, with how the IcebergSink works, see line 224 of `iceberg/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java` at e67c082:

```java
for (WriteResult result : pendingResults.values()) {
```
As discussed offline. We can keep your proposed solution here, and we could file another PR to throw an exception or log an error if checkpointing is on and the overwrite flag is used.
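A minimal sketch of the guard suggested for that follow-up PR (all names are made up for illustration, not an existing Iceberg API; whether to throw or merely log is left open):

```java
// Hypothetical validation: reject or flag the overwrite flag when Flink
// checkpointing is enabled, since later checkpoints replacing the same
// partition can delete earlier checkpoints' data.
public class OverwriteGuardSketch {

  /** Returns "OK" or an error message describing the unsafe combination. */
  public static String validate(boolean overwriteEnabled, boolean checkpointingEnabled) {
    if (overwriteEnabled && checkpointingEnabled) {
      return "ERROR: overwrite mode with checkpointing enabled can drop data from earlier checkpoints";
    }
    return "OK";
  }
}
```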
@aiborodin: This seems like a reasonable optimization to me. OTOH the correctness issue could be solved with a lightweight change (don't commit every WriteResult one-by-one, but first aggregate on the Committer side). This could go into 1.10.1, and then the optimization you have suggested could go into 1.11.0. WDYT?
@pvary I am okay with that. I also considered aggregating WriteResults in the DynamicCommitter, which is what this PR ultimately achieves, but decided in favour of the approach in #14092, which makes more sense architecturally. I would appreciate it if we could continue reviewing #14092 to get it in as well.
@pvary @mxm It seems there is some communication (and decisions being made) outside of GitHub. I want to be part of those discussions and contribute to them, especially given that
Merged to main.
We don't decide things offline. We always try to bring the results of the discussions back to the dev list or to the GitHub PR, so others can raise their voice too. We do the "offline" discussions on Slack. I will try to reach out to you there too.
@aiborodin: Do you have a Slack user? Are you part of the Iceberg Slack community?
@pvary I am not, how can I join?
Flink: Ensure DynamicCommitter idempotence in the presence of failures (#14461)

* Flink: Ensure DynamicCommitter Idempotence in the presence of failures (#14182) (cherry picked from commit 3860284)
* Flink: Backport #14182: Ensure DynamicCommitter Idempotence in the presence of failures (#14213) (cherry picked from commit 441597e)

Co-authored-by: Maximilian Michels <[email protected]>
Previously, the DynamicCommitter could commit duplicate WriteResults when recovering from failures, leading to incorrect data in tables. This change gets rid of the unnecessary commits for each WriteResult, which caused the problems in the first place, and instead commits all the WriteResults for each Flink checkpoint / table / branch together.
The WriteResults for each checkpoint / table / branch can safely be committed together, even in the presence of delete files. Since we now commit once per Flink checkpoint and table / branch, we can use the already existing max Flink checkpoint id in the snapshot table properties for the table / branch to detect whether we have already committed.
We intentionally chose not to combine WriteResults across Flink checkpoints, which would be possible when a checkpoint contains only append-only data. While technically feasible, it is a premature optimization that complicates the implementation and maintainability to solve a very rare edge case: committing multiple pending checkpoints. Multiple pending checkpoints can only occur with a very low checkpoint interval or when concurrent checkpointing is enabled. Even in such a situation, it is preferable to keep the commits separate, as this makes it easier to reason about the applied changes.
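The commit-once-per-checkpoint behaviour described above can be sketched as follows (illustrative names and types, not the actual DynamicCommitter code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;

// Illustrative model of the final behaviour: pending results are keyed by
// checkpoint id, checkpoints at or below the max committed checkpoint id
// recorded in the table are skipped on recovery, and each remaining
// checkpoint is committed as one table snapshot.
public class PerCheckpointCommitSketch {

  /** Returns the checkpoint ids that were newly committed. */
  public static List<Long> commitPending(
      SortedMap<Long, List<String>> pendingResults, long maxCommittedCheckpointId) {
    List<Long> committed = new ArrayList<>();
    for (Map.Entry<Long, List<String>> e : pendingResults.entrySet()) {
      if (e.getKey() <= maxCommittedCheckpointId) {
        continue; // already in the table from before the failure; must not re-commit
      }
      // all WriteResults of this checkpoint/table/branch go into a single snapshot
      committed.add(e.getKey());
    }
    return committed;
  }
}
```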