-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker] Fix dispatch duplicated messages with Exclusive
mode.
#17237
[fix][broker] Fix dispatch duplicated messages with Exclusive
mode.
#17237
Conversation
...ache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
Show resolved
Hide resolved
...va/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
Outdated
Show resolved
Hide resolved
I will add more information later |
...er/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
Outdated
Show resolved
Hide resolved
From this log, we can know |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
great catch!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change looks good to me.
...er/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
Outdated
Show resolved
Hide resolved
...er/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
Show resolved
Hide resolved
...er/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
Outdated
Show resolved
Hide resolved
@mattisonchao Do you know which PR introduced this issue? |
This is very interesting. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great catch. Do we have an idea of what is causing the early calls to the readMoreEntries
method? Adding synchronize will ensure correctness, but it might also increase contention and slow down the dispatcher.
@@ -332,12 +332,17 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl | |||
} | |||
|
|||
@Override | |||
protected void readMoreEntries(Consumer consumer) { | |||
protected synchronized void readMoreEntries(Consumer consumer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like we might not want to acquire the lock before the consumer
null check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified the scope of the lock
// consumer can be null when all consumers are disconnected from broker. | ||
// so skip reading more entries if currently there is no active consumer. | ||
if (null == consumer) { | ||
return; | ||
} | ||
if (havePendingRead) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see several parts of the code that call this readMoreEntries
method after verifying that havePendingRead
is true
. I haven't yet verified the details of each call, but it seems like we should clean up those guards if we are moving the logic into this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should move the havePendingRead
check to the readMoreEntries
method to avoid judgement everywhere.
Except for some judgments that have logical implications, I have changed them all.
Please take a look.
Pair<Integer, Long> calculateResult = calculateToRead(consumer); | ||
int messagesToRead = calculateResult.getLeft(); | ||
long bytesToRead = calculateResult.getRight(); | ||
synchronized (this) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move lock here to avoid invoking useless calculateToRead
many times.
+ " havePendingRead {}", | ||
topic.getName(), currentConsumer, havePendingRead); | ||
} | ||
if (isRescheduleReadInProgress.compareAndSet(false, true)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have to avoid reScheduleRead
many times, see #16241
After the test, I think the problem was caused by we probably invoking So, we may read the same position many times. Even though I fix the |
Hi @eolivelli
When I wanted to fix the flaky test "SubscriptionMessageDispatchThrottlingTest", I found that one time we received more messages than we sent. So, I added |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
good catch !
Exclusive
mode.Exclusive
mode.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
// If this assertion failed, please alert we may have some regression cause message dispatch was duplicated. | ||
Assert.assertEquals(totalReceived.get(), numProducedMessages, 10); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is flaky. since the comment is "please alert", I'm alerting. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was running it locally.
java.lang.AssertionError:
Expected :30.0
Actual :44.0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will try to verify it. Maybe I lost something. very thanks to you!
We meet same issue here on flink-connector-pulsar.
|
…apache#17237) (cherry picked from commit 0517423) (cherry picked from commit 390a4ed)
Motivation
When invoking
readMoreEntries
we have to check if thehavePendingRead
state istrue
to avoid reading the same position many times in the race condition.You can add
invocationCount
on theTest
annotation to call the new test multiple times to understand the problem.The relative logs:
Modifications
synchronized
keyword andhavePendingRead
state to avoid read same position many times.havePendingRead
check.Verifying this change
Documentation
Check the box below or label this PR directly.
Need to update docs?
doc-required
(Your PR needs to update docs and you will update later)
doc-not-needed
(Please explain why)
doc
(Your PR contains doc changes)
doc-complete
(Docs have been already added)