Contributor

@AHeise AHeise commented Apr 9, 2025

What is the purpose of the change

So far, we used a special value for the final checkpoint on endInput. However, as shown in the description of this ticket, final doesn't mean final. Hence, multiple committables with EOI could be created at different times.

With this commit, we stop using a special value for such committables and instead try to guess the checkpoint id of the next checkpoint. There are various factors that influence the checkpoint id but we can mostly ignore them all because we just need to pick a checkpoint id that is

  • higher than all checkpoint ids of the previous, successful checkpoints of this attempt
  • higher than the checkpoint id of the restored checkpoint
  • lower than any future checkpoint id.

Hence, we just remember the last observed checkpoint id (initialized with max(0, restored id)) and use last id + 1 for endInput. Naturally, multiple endInput calls happening across restarts will result in unique checkpoint ids. Note that aborted checkpoints before endInput may result in diverging checkpoint ids across subtasks. However, each of these ids satisfies the above requirements, and any id of endInput1 will be smaller than any id of endInput2. Thus, diverging checkpoint ids will not impact correctness at all.
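
The bookkeeping described above can be sketched in a few lines. This is only an illustration under stated assumptions, not the code from this PR: the class `EndInputCheckpointIdTracker` and its method names are hypothetical, and a negative restored id is assumed to mean "nothing was restored".

```java
/** Hypothetical helper; it models only the id bookkeeping described in the PR text. */
final class EndInputCheckpointIdTracker {
    private long lastObservedCheckpointId;

    /** Assumption: a negative restoredCheckpointId means there was no restored checkpoint. */
    EndInputCheckpointIdTracker(long restoredCheckpointId) {
        // Initialized with max(0, restored id), as stated in the description.
        this.lastObservedCheckpointId = Math.max(0L, restoredCheckpointId);
    }

    /** Remember the id of the latest checkpoint this subtask has observed. */
    void onCheckpointObserved(long checkpointId) {
        lastObservedCheckpointId = checkpointId;
    }

    /** The id to attach to committables created on endInput: last id + 1. */
    long checkpointIdForEndInput() {
        return lastObservedCheckpointId + 1;
    }
}
```

In this sketch, a subtask restored from checkpoint 5 would use 6 on endInput, and 8 if it had observed checkpoint 7 in the meantime. Both values are above every earlier successful checkpoint id seen by that subtask, which is the property the description relies on.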

Brief change log

  • Clarify contract of endInput
  • Infer checkpoint id on endInput

Verifying this change

Covered by existing tests. No new tests since it removes special case handling.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@AHeise AHeise force-pushed the FLINK-37605-fix-eoi-sink branch from a2cf486 to e54f829 Compare April 9, 2025 13:58

long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId;
private void commitAndEmitCheckpoints(long checkpointId)
        throws IOException, InterruptedException {
    lastCompletedCheckpointId = checkpointId;
Contributor

nit: probably a basic question, but shouldn't we update the lastCompletedCheckpointId variable after we have completed the checkpoint, which I assume happens in the subsequent for loop? I was expecting the lastCompletedCheckpointId to be updated after the checkpointing loop in case there was an error during the checkpointing.

Contributor Author

In general, transient state is lost on error. So whether we update before or after the loop doesn't matter because the exception will lead to a fail-over and everything is recalculated on recovery. Since everything is called from the main task thread (mailbox thread), there is no interleaving possible of this call and another call like endInput.

Now in this specific case, lastCompletedCheckpointId refers to the completed checkpoint id of Flink as a whole. Since this value is primarily set through notifyCheckpointCompleted, the checkpoint is already completed before the start of the method. So I'd like to keep it as the first statement because it's easier to read than if it's done at the end of the method.

Contributor

Thanks for the explanation @AHeise - that makes sense

@AHeise AHeise force-pushed the FLINK-37605-fix-eoi-sink branch from e54f829 to 023b6b8 Compare April 13, 2025 09:22
Contributor

@fapaul fapaul left a comment

We discussed the new method to infer the checkpoint id offline and it seems solid (at least less brittle than using the special EOI marker).

I didn't fully understand why the refactoring was needed for this PR but I'll leave that up to you.

}

public void setRestoredCheckpointId(long restoredCheckpointId) {
    this.restoredCheckpointId = restoredCheckpointId;
Contributor

Looks unrelated to this commit

Contributor Author

Yes, I'll move the last commit.

AHeise added 3 commits April 14, 2025 14:12
With the removal of SinkV1, all adapter tests have also been testing V2. We can remove the adapter tests and simplify test hierarchy.
Remove factory methods and InspectableSink because we don't need the abstraction anymore. Make test setup and assertions more explicit by using sink builder directly in tests.

Remove unused methods.
So far, we used a special value for the final checkpoint on endInput. However, as shown in the description of this ticket, final doesn't mean final. Hence, multiple committables with EOI could be created at different times.

With this commit, we stop using a special value for such committables and instead try to guess the checkpoint id of the next checkpoint. There are various factors that influence the checkpoint id but we can mostly ignore them all because we just need to pick a checkpoint id that is
- higher than all checkpoint ids of the previous, successful checkpoints of this attempt
- higher than the checkpoint id of the restored checkpoint
- lower than any future checkpoint id.

Hence, we just remember the last observed checkpoint id (initialized with max(0, restored id)) and use last id + 1 for endInput. Naturally, multiple endInput calls happening across restarts will result in unique checkpoint ids. Note that aborted checkpoints before endInput may result in diverging checkpoint ids across subtasks. However, each of these ids satisfies the above requirements, and any id of endInput1 will be smaller than any id of endInput2. Thus, diverging checkpoint ids will not impact correctness at all.
@AHeise AHeise force-pushed the FLINK-37605-fix-eoi-sink branch from 023b6b8 to 941e510 Compare April 14, 2025 12:31
@AHeise AHeise merged commit 9302545 into apache:master Apr 14, 2025
mxm added a commit to mxm/flink that referenced this pull request Sep 18, 2025
…es in batch mode

In apache#26433, we removed the EOI marker in the form of Long.MAX_VALUE as the checkpoint id. Since
streaming pipelines can continue to checkpoint even after their respective operators have been shut
down, it is not safe to use a constant as this can lead to duplicate commits.

However, in batch pipelines we only have one commit on job shutdown. Using any checkpoint id should
suffice in this scenario. Any pending committables should be processed by the CommitterOperator when
the operator shuts down. No further checkpoints will take place.

There are various connectors which rely on this behavior. I don't see any drawbacks from keeping
this behavior for batch pipelines.
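
One way to picture the distinction drawn in this commit message is the sketch below. It is an assumption-laden illustration, not the actual follow-up change: the class `EndInputIdChooser`, the method name, and the `batchMode` flag are hypothetical. Only the use of Long.MAX_VALUE as the EOI marker and the last id + 1 rule come from the text above.

```java
/** Hypothetical illustration of choosing the checkpoint id used on endInput. */
final class EndInputIdChooser {
    /** The EOI marker mentioned in the commit message. */
    static final long EOI = Long.MAX_VALUE;

    /**
     * Batch: there is a single commit on shutdown, so a constant id is enough.
     * Streaming: checkpoints may continue afterwards, so use last id + 1 instead.
     */
    static long endInputCheckpointId(boolean batchMode, long lastObservedCheckpointId) {
        return batchMode ? EOI : lastObservedCheckpointId + 1;
    }
}
```

The constant is only safe in the batch branch because, per the message above, no further checkpoints take place there.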
mxm added a commit that referenced this pull request Sep 18, 2025
…es in batch mode

mxm added a commit to mxm/flink that referenced this pull request Sep 19, 2025
…es in batch mode

mxm added a commit to mxm/flink that referenced this pull request Sep 19, 2025
…es in batch mode

Revert "[FLINK-38370] Ensure CommitterOperator commits all pending committables in batch mode"

This reverts commit 00cf26396763c89c38bb52c87e5e214cfad7cfbc.
mxm added a commit that referenced this pull request Sep 19, 2025
…ing committables in batch mode (#27013)

mxm added a commit that referenced this pull request Sep 19, 2025
…ng committables in batch mode (#27014)

mxm added a commit that referenced this pull request Sep 22, 2025
…es in batch mode (#27016)

mxm added a commit that referenced this pull request Sep 22, 2025
…es in batch mode (#27015)
