-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-18934] Idle stream does not advance watermark in connected stream #15771
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 77caa97 (Sat Aug 28 12:12:16 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
...ng-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
Outdated
Show resolved
Hide resolved
...ng-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
Outdated
Show resolved
Hide resolved
61748f5 to
b979ed5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much for your contribution. The code definitively feels much cleaner with the additional abstraction of CombinedWatermark.
However, I'd probably not expose the inner class at all and get rid of the List ancestor. You now have a weird data structure that is unnecessarily mutable in many ways.
I'm also not sure if the tests are sufficient: I'm missing tests that explicitly test the behavior of 2- and N-ary operators. Most tests have been traditionally on task level but that is probably not the best place anymore.
Please add ticket and component to all commits (none of them are hotfixes).
flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermark.java
Outdated
Show resolved
Hide resolved
flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermark.java
Outdated
Show resolved
Hide resolved
flink-core/src/main/java/org/apache/flink/api/common/eventtime/CombinedWatermark.java
Show resolved
Hide resolved
| combinedWatermark.add(new CombinedWatermark.PartialWatermark()); | ||
| combinedWatermark.add(new CombinedWatermark.PartialWatermark()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't that depend on the number of inputs? Can't we create that in ctor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can add an additional ctor. (I was actually thinking about it) However, we still need the add method for the WatermarkOutputMultiplexer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, haven't thought about the dynamic cases (actually Kafka is not really dynamic but I can see that other sources are). Still the comment stands that I wouldn't expose PartialWatermark.
You could have a similar method to register in CombinedWatermark that returns the index of the newly added partial. The index is then used in private final Map<String, Integer> watermarkIndexPerOutputId; This index can then be used to update the CombinedWatermark via index.
This approach makes more sense if you can easily have all side-effects inside the CombinedWatermark#updateWatermark(int index, Watermark watermark). So it avoids the case, where you update the partial and then tell the combined watermark: "hey something changed, please check".
However, the current way may still be valid, if you explicitly want to have the information drift and only update at specific times for performance reasons. I'm currently not in the position to assess that properly.
Note that I don't like the current way because you don't have proper invariants (and I like having them) because of the information drift.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So there are two things here. My comment in this thread referred just to the add method, not exposing the PartialWatermark. Sure, if the issue was just about the add method, I would not expose the PartialWatermark.
The second problem is the combination of PartialWatermark and updateCombinedWatermark. Actually, you wrote the exact reason why we need it:
However, the current way may still be valid if you explicitly want to have the information drift and only update at specific times for performance reasons. I'm currently not in the position to assess that properly.
That's exactly what happens in the WatermarkOutputMultiplexer. It exposes two access patterns:
- getImmediateOutput
- getDeferredOutput
One immediately updates the combined watermark, and the other one does not. AFAIK the deferred one is used for periodic watermarks. Bear in mind that the PeriodicWatermark has the default scope. It is exposed only to the org.apache.flink.api.common.eventtime package for it to be used by WatermarkOutputMultiplexer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay I'd still think that you could also hide that within #updateWatermark(..., boolean deferred) but the benefit of encapsulation gets even smaller (if it even still exists). So leave as is.
...ming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
Outdated
Show resolved
Hide resolved
...ng-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
Outdated
Show resolved
Hide resolved
...ng-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
Outdated
Show resolved
Hide resolved
...eaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
Show resolved
Hide resolved
|
|
||
| private <X> void pushToRecordWriter(StreamRecord<X> record) { | ||
| serializationDelegate.setInstance(record); | ||
| if (announcedStatus.isIdle()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to add a comment here to explain why this can happen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment is good in respect to explaining the invariant (you could also give a link to the docs). However, it's not helping the reader to understand why this case is happening.
...k-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
Outdated
Show resolved
Hide resolved
9aaebb2 to
37dcf10
Compare
|
I tried addressing your comments. I am still not 100% sure about the short ACTIVE/IDLE cycle or should we rather let records be generated but halt watermarks forwarding. Do you mind taking another look @AHeise ? |
| // StreamStatus.IDLE requires that no records nor watermarks travel through the branch | ||
| // in order to keep the older behaviour that records could've been generated down the | ||
| // pipeline even though the sources were idle we go through a short ACTIVE/IDLE loop | ||
| if (announcedStatus.isIdle()) { | ||
| writeStreamStatus(StreamStatus.ACTIVE); | ||
| } | ||
|
|
||
| serializationDelegate.setInstance(record); | ||
| try { | ||
| recordWriter.emit(serializationDelegate); | ||
| } catch (Exception e) { | ||
| throw new RuntimeException(e.getMessage(), e); | ||
| } | ||
|
|
||
| if (announcedStatus.isIdle()) { | ||
| writeStreamStatus(StreamStatus.IDLE); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we factor out some reusable pattern?
try (announcedStatus.ensureActive(this::writeStreamStatus)) {
serializationDelegate.setInstance(record);
try {
recordWriter.emit(serializationDelegate);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice one! Will do.
Looks already quite good. As discussed offline, I think that the ACTIVE/IDLE cycle is a good start and we should just optimize some operators to make them cycling in batches (asyncIO). To that end, could you try to factor out some small reusable pattern? I gave an idea on the respective lines. Other comments:
|
AHeise
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM now ;)
…ected stream Watermark in the two and multi input operators is computed in operators. So far operators were unaware of the StreamStatus, therefore even if a whole input was IDLE it could still block increasing the Watermark. This commit makes operators aware of the StreamStatus. The contract of the StreamStatus is that if a stream is IDLE it should not emit records nor watermarks.
…ider The StreamStatus traverses the whole DAG and it's state should be kept on the operator level. Given those assumptions the maintainer & provider are no longer necessary.
What is the purpose of the change
Make two and multi-input operators respect stream idleness.
Brief change log
CombinedWatermarkclassStreamStatusMaintainerandStreamStatusProviderinterfacesTwo open questions:
RecordWriterOutputor do we want to find a better place for it.Verifying this change
Adjusted tests that were verifying the idleness, e.g.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation