-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][client] Fix multi-topics consumer could receive old messages after seek #21945
[fix][client] Fix multi-topics consumer could receive old messages after seek #21945
Conversation
This issue should also exist for other |
1e7fa57
to
47237a5
Compare
47237a5
to
2b2a6a8
Compare
### Motivation See apache/pulsar#21945 ### Modifications In C++ client, the multi-topics consumer receives messages by configuring internal consumers with a message listener that adds messages to `incomingMessages_`. So this patch pauses the listeners before seek and resumes them after seek. Add `MultiTopicsConsumerTest.testSeekToNewerPosition` for test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good work! Left one comment.
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
Outdated
Show resolved
Hide resolved
@codelipenghui I have addressed your comments, PTAL again. |
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## master #21945 +/- ##
============================================
+ Coverage 73.59% 73.67% +0.07%
- Complexity 32417 32489 +72
============================================
Files 1861 1863 +2
Lines 138678 138784 +106
Branches 15188 15207 +19
============================================
+ Hits 102060 102245 +185
+ Misses 28715 28651 -64
+ Partials 7903 7888 -15
Flags with carried forward coverage won't be shown. Click here to find out more.
|
### Motivation See apache/pulsar#21945 ### Modifications In C++ client, the multi-topics consumer receives messages by configuring internal consumers with a message listener that adds messages to `incomingMessages_`. So this patch pauses the listeners before seek and resumes them after seek. Add `MultiTopicsConsumerTest.testSeekToNewerPosition` for test.
…ter seek (apache#21945) (cherry picked from commit dc1b107)
…ter seek (apache#21945) (cherry picked from commit dc1b107)
Motivation
The multi-topics consumer supports seeking all internal consumers when accepting a timestamp as the argument. However, the multi-topics consumer could still read from earliest after seeking to a specific position. There are two reasons:
incomingMessages
is not cleared beforeseek(long timestamp)
seekAsync
was called and thenincomingMessages
was clearedincomingMessages
incomingMessages
seekAsync
completed,Consumer#receive
will poll the messages fetched beforeseek
completedModifications
Add a
duringSeek
flag that is set with true when the seek operation starts. Then do not handle messages viamessageReceive
and stopreceiveMessageFromConsumer
ifduringSeek
is true. After the seek option finishes, setduringSeek
back tofalse
and restartreceiveMessageFromConsumer
for all consumers again.Add
testSeekToNewerPosition
to verifyreceive
,receiveAsync
and message listener all work for seeking by timestamp to a newer position on a multi-topics consumer.Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: