-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][txn] fix ack with txn compute ackedCount error #17016
[fix][txn] fix ack with txn compute ackedCount error #17016
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can also change get batchSize
in the method individualAckNormal
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work! Left some comments.
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
Outdated
Show resolved
Hide resolved
long ackedCount = 0; | ||
long batchSize = getBatchSize(msgId); | ||
long batchSize = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default value may be 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if not batch message, the batchSize can not be able to 1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, If not batch message, the batch size should be 0, right?
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
Outdated
Show resolved
Hide resolved
@congbobo184 Please provide a correct documentation label for your PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When sendMessages
, we put the batch size into pendingAcks
, so in general, we may get the batch size from it when process ack...
@Technoboy- transaction ack command will carry the batchSize, if the batchSize is not in pendingAcks(the message redeliver hasn't been sent to any consumer), we will not get the correct batch size. So I think in Transaction ack, we uniformly use command carried batch size is ok |
@Technoboy- Ping |
1 similar comment
@Technoboy- Ping |
…nsaction_ack_reduce_unack_message_count # Conflicts: # pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
Co-authored-by: congbobo184 <[email protected]>
Co-authored-by: congbobo184 <[email protected]> (cherry picked from commit 176b0d6)
Co-authored-by: congbobo184 <[email protected]> (cherry picked from commit 176b0d6) (cherry picked from commit a0eb84e)
Co-authored-by: congbobo184 <[email protected]> (cherry picked from commit 176b0d6)
Co-authored-by: congbobo184 <[email protected]> (cherry picked from commit 176b0d6)
issue:
reproduce:
Motivation
get correct batch size to compute the consumer unackedCount
Modification
use the correct batch size from the
messageIdDate
Verifying this change
add the test
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation
Does this pull request introduce a new feature? (yes)
If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
If a feature is not applicable for documentation, explain why?
If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
doc-not-needed