-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue #14400
[PIP-74] Dependency of consumer client memory limit: support dynamic limit of consumer receiver queue #14400
Conversation
/pulsarbot run-failure-checks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me. Left one question.
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this PR changed the behaviour of the original Consumer#receiverQueueSize
,
it looks seem will auto-scale the queue size
, maybe we can change the receiverQueueSize
to initialReceiverQueueSize
and add a doc to explain it.
I'm not sure about this, Please let me know what you think. :)
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to handle the ZeroQueueConsumerImpl
better since that case is not eligible for dynamic queue resizing, at least based on the current class design.
@mattisonchao - good point about the names. However, I think I have a different perspective on what the final names should be.
We currently set the initial queue size with a method named receiverQueueSize
on the consumer builder and then we update it with a method named setMaxReceiverQueueSize
on the consumer. I assume that this difference comes from the fact that ConsumerBase
has a protected variable named maxReceiverQueueSize
. I think that our API should be consistent when possible, and since receiverQueueSize
has been on the builder since 2018, I'd prefer to update the new setter method to setReceiverQueueSize
. That might mean we need to update the protected variable named maxReceiverQueueSize
. What is your perspective @Jason918?
(I am requesting changes because of the ZeroQueueConsumerImpl
issue and not because of the naming discussion.)
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
Outdated
Show resolved
Hide resolved
Yes, it's exactly what PIP-74 want to do.
It's actually would be the max value of auto-scaled receiver queue size. |
Good catch.
Good point. Previously, I use |
835b199
to
56fa5c5
Compare
56fa5c5
to
371048d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for the explanations @Jason918, I didn't understand the feature at first--I thought the dynamic part was going to be exposed to the end user.
…limit of consumer receiver queue (apache#14400) ### Motivation This is part of the work for [PIP 74](https://github.com/apache/pulsar/wiki/PIP-74%3A-Pulsar-client-memory-limits) We need dynamic update `currentReceiverQueue` to control client memory. ### Modifications Add getter and setter method for `ConsumerBase#maxReceiverQueueSize`. - For `ConsumerImpl`, we need update availablePermits together. - For `MultiTopicsConsumerImpl`, we need update inner consumers together and trigger paused consumers.
Motivation
This is part of the work for PIP 74
We need dynamic update
currentReceiverQueue
to control client memory.Modifications
Add getter and setter method for
ConsumerBase#maxReceiverQueueSize
.ConsumerImpl
, we need update availablePermits together.MultiTopicsConsumerImpl
, we need update inner consumers together and trigger paused consumers.Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
Check the box below and label this PR (if you have committer privilege).
Need to update docs?
no-need-doc