47
47
import org .apache .bookkeeper .mledger .impl .AckSetStateUtil ;
48
48
import org .apache .commons .lang3 .mutable .MutableInt ;
49
49
import org .apache .commons .lang3 .tuple .MutablePair ;
50
+ import org .apache .commons .lang3 .tuple .Pair ;
50
51
import org .apache .pulsar .broker .authentication .AuthenticationDataSubscription ;
51
52
import org .apache .pulsar .broker .loadbalance .extensions .data .BrokerLookupData ;
52
53
import org .apache .pulsar .broker .service .persistent .PersistentSubscription ;
@@ -531,14 +532,16 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
531
532
532
533
//this method is for individual ack not carry the transaction
533
534
private CompletableFuture <Long > individualAckNormal (CommandAck ack , Map <String , Long > properties ) {
534
- List <Position > positionsAcked = new ArrayList <>();
535
+ List <Pair < Consumer , Position > > positionsAcked = new ArrayList <>();
535
536
long totalAckCount = 0 ;
536
537
for (int i = 0 ; i < ack .getMessageIdsCount (); i ++) {
537
538
MessageIdData msgId = ack .getMessageIdAt (i );
538
539
Position position ;
539
- long ackedCount = 0 ;
540
- long batchSize = getBatchSize (msgId );
541
- Consumer ackOwnerConsumer = getAckOwnerConsumer (msgId .getLedgerId (), msgId .getEntryId ());
540
+ Pair <Consumer , Long > ackOwnerConsumerAndBatchSize =
541
+ getAckOwnerConsumerAndBatchSize (msgId .getLedgerId (), msgId .getEntryId ());
542
+ Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize .getLeft ();
543
+ long ackedCount ;
544
+ long batchSize = ackOwnerConsumerAndBatchSize .getRight ();
542
545
if (msgId .getAckSetsCount () > 0 ) {
543
546
long [] ackSets = new long [msgId .getAckSetsCount ()];
544
547
for (int j = 0 ; j < msgId .getAckSetsCount (); j ++) {
@@ -557,28 +560,32 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
557
560
} else {
558
561
position = PositionFactory .create (msgId .getLedgerId (), msgId .getEntryId ());
559
562
ackedCount = getAckedCountForMsgIdNoAckSets (batchSize , position , ackOwnerConsumer );
560
- if (checkCanRemovePendingAcksAndHandle (position , msgId )) {
563
+ if (checkCanRemovePendingAcksAndHandle (ackOwnerConsumer , position , msgId )) {
561
564
addAndGetUnAckedMsgs (ackOwnerConsumer , -(int ) ackedCount );
562
565
}
563
566
}
564
567
565
- positionsAcked .add (position );
568
+ positionsAcked .add (Pair . of ( ackOwnerConsumer , position ) );
566
569
567
570
checkAckValidationError (ack , position );
568
571
569
572
totalAckCount += ackedCount ;
570
573
}
571
- subscription .acknowledgeMessage (positionsAcked , AckType .Individual , properties );
574
+ subscription .acknowledgeMessage (positionsAcked .stream ()
575
+ .map (Pair ::getRight )
576
+ .collect (Collectors .toList ()), AckType .Individual , properties );
572
577
CompletableFuture <Long > completableFuture = new CompletableFuture <>();
573
578
completableFuture .complete (totalAckCount );
574
579
if (isTransactionEnabled () && Subscription .isIndividualAckMode (subType )) {
575
- completableFuture .whenComplete ((v , e ) -> positionsAcked .forEach (position -> {
580
+ completableFuture .whenComplete ((v , e ) -> positionsAcked .forEach (positionPair -> {
581
+ Consumer ackOwnerConsumer = positionPair .getLeft ();
582
+ Position position = positionPair .getRight ();
576
583
//check if the position can remove from the consumer pending acks.
577
584
// the bit set is empty in pending ack handle.
578
585
if (AckSetStateUtil .hasAckSet (position )) {
579
586
if (((PersistentSubscription ) subscription )
580
587
.checkIsCanDeleteConsumerPendingAck (position )) {
581
- removePendingAcks (position );
588
+ removePendingAcks (ackOwnerConsumer , position );
582
589
}
583
590
}
584
591
}));
@@ -590,7 +597,7 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
590
597
//this method is for individual ack carry the transaction
591
598
private CompletableFuture <Long > individualAckWithTransaction (CommandAck ack ) {
592
599
// Individual ack
593
- List <MutablePair <Position , Integer >> positionsAcked = new ArrayList <>();
600
+ List <Pair < Consumer , MutablePair <Position , Integer > >> positionsAcked = new ArrayList <>();
594
601
if (!isTransactionEnabled ()) {
595
602
return FutureUtil .failedFuture (
596
603
new BrokerServiceException .NotAllowedException ("Server don't support transaction ack!" ));
@@ -600,20 +607,23 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
600
607
for (int i = 0 ; i < ack .getMessageIdsCount (); i ++) {
601
608
MessageIdData msgId = ack .getMessageIdAt (i );
602
609
Position position = AckSetStateUtil .createPositionWithAckSet (msgId .getLedgerId (), msgId .getEntryId (), null );
610
+ Consumer ackOwnerConsumer = getAckOwnerConsumerAndBatchSize (msgId .getLedgerId (),
611
+ msgId .getEntryId ()).getLeft ();
603
612
// acked count at least one
604
- long ackedCount = 0 ;
605
- long batchSize = 0 ;
613
+ long ackedCount ;
614
+ long batchSize ;
606
615
if (msgId .hasBatchSize ()) {
607
616
batchSize = msgId .getBatchSize ();
608
617
// ack batch messages set ackeCount = batchSize
609
618
ackedCount = msgId .getBatchSize ();
610
- positionsAcked .add (new MutablePair <>(position , msgId .getBatchSize ()));
619
+ positionsAcked .add (Pair . of ( ackOwnerConsumer , new MutablePair <>(position , msgId .getBatchSize () )));
611
620
} else {
612
621
// ack no batch message set ackedCount = 1
622
+ batchSize = 0 ;
613
623
ackedCount = 1 ;
614
- positionsAcked .add (new MutablePair <>(position , (int ) batchSize ));
624
+ positionsAcked .add (Pair . of ( ackOwnerConsumer , new MutablePair <>(position , (int ) batchSize ) ));
615
625
}
616
- Consumer ackOwnerConsumer = getAckOwnerConsumer ( msgId . getLedgerId (), msgId . getEntryId ());
626
+
617
627
if (msgId .getAckSetsCount () > 0 ) {
618
628
long [] ackSets = new long [msgId .getAckSetsCount ()];
619
629
for (int j = 0 ; j < msgId .getAckSetsCount (); j ++) {
@@ -625,47 +635,31 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
625
635
626
636
addAndGetUnAckedMsgs (ackOwnerConsumer , -(int ) ackedCount );
627
637
628
- checkCanRemovePendingAcksAndHandle (position , msgId );
638
+ checkCanRemovePendingAcksAndHandle (ackOwnerConsumer , position , msgId );
629
639
630
640
checkAckValidationError (ack , position );
631
641
632
642
totalAckCount .add (ackedCount );
633
643
}
634
644
635
645
CompletableFuture <Void > completableFuture = transactionIndividualAcknowledge (ack .getTxnidMostBits (),
636
- ack .getTxnidLeastBits (), positionsAcked );
646
+ ack .getTxnidLeastBits (), positionsAcked . stream (). map ( Pair :: getRight ). collect ( Collectors . toList ()) );
637
647
if (Subscription .isIndividualAckMode (subType )) {
638
648
completableFuture .whenComplete ((v , e ) ->
639
- positionsAcked .forEach (positionLongMutablePair -> {
649
+ positionsAcked .forEach (positionPair -> {
650
+ Consumer ackOwnerConsumer = positionPair .getLeft ();
651
+ MutablePair <Position , Integer > positionLongMutablePair = positionPair .getRight ();
640
652
if (AckSetStateUtil .hasAckSet (positionLongMutablePair .getLeft ())) {
641
653
if (((PersistentSubscription ) subscription )
642
654
.checkIsCanDeleteConsumerPendingAck (positionLongMutablePair .left )) {
643
- removePendingAcks (positionLongMutablePair .left );
655
+ removePendingAcks (ackOwnerConsumer , positionLongMutablePair .left );
644
656
}
645
657
}
646
658
}));
647
659
}
648
660
return completableFuture .thenApply (__ -> totalAckCount .sum ());
649
661
}
650
662
651
- private long getBatchSize (MessageIdData msgId ) {
652
- long batchSize = 1 ;
653
- if (Subscription .isIndividualAckMode (subType )) {
654
- LongPair longPair = pendingAcks .get (msgId .getLedgerId (), msgId .getEntryId ());
655
- // Consumer may ack the msg that not belongs to it.
656
- if (longPair == null ) {
657
- Consumer ackOwnerConsumer = getAckOwnerConsumer (msgId .getLedgerId (), msgId .getEntryId ());
658
- longPair = ackOwnerConsumer .getPendingAcks ().get (msgId .getLedgerId (), msgId .getEntryId ());
659
- if (longPair != null ) {
660
- batchSize = longPair .first ;
661
- }
662
- } else {
663
- batchSize = longPair .first ;
664
- }
665
- }
666
- return batchSize ;
667
- }
668
-
669
663
private long getAckedCountForMsgIdNoAckSets (long batchSize , Position position , Consumer consumer ) {
670
664
if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription .isIndividualAckMode (subType )) {
671
665
long [] cursorAckSet = getCursorAckSet (position );
@@ -725,26 +719,39 @@ private void checkAckValidationError(CommandAck ack, Position position) {
725
719
}
726
720
}
727
721
728
- private boolean checkCanRemovePendingAcksAndHandle (Position position , MessageIdData msgId ) {
722
+ private boolean checkCanRemovePendingAcksAndHandle (Consumer ackOwnedConsumer ,
723
+ Position position , MessageIdData msgId ) {
729
724
if (Subscription .isIndividualAckMode (subType ) && msgId .getAckSetsCount () == 0 ) {
730
- return removePendingAcks (position );
725
+ return removePendingAcks (ackOwnedConsumer , position );
731
726
}
732
727
return false ;
733
728
}
734
729
735
- private Consumer getAckOwnerConsumer (long ledgerId , long entryId ) {
736
- Consumer ackOwnerConsumer = this ;
730
+ /**
731
+ * Retrieves the acknowledgment owner consumer and batch size for the specified ledgerId and entryId.
732
+ *
733
+ * @param ledgerId The ID of the ledger.
734
+ * @param entryId The ID of the entry.
735
+ * @return Pair<Consumer, BatchSize>
736
+ */
737
+ private Pair <Consumer , Long > getAckOwnerConsumerAndBatchSize (long ledgerId , long entryId ) {
737
738
if (Subscription .isIndividualAckMode (subType )) {
738
- if (!getPendingAcks ().containsKey (ledgerId , entryId )) {
739
+ LongPair longPair = getPendingAcks ().get (ledgerId , entryId );
740
+ if (longPair != null ) {
741
+ return Pair .of (this , longPair .first );
742
+ } else {
743
+ // If there are more consumers, this step will consume more CPU, and it should be optimized later.
739
744
for (Consumer consumer : subscription .getConsumers ()) {
740
- if (consumer != this && consumer .getPendingAcks ().containsKey (ledgerId , entryId )) {
741
- ackOwnerConsumer = consumer ;
742
- break ;
745
+ if (consumer != this ) {
746
+ longPair = consumer .getPendingAcks ().get (ledgerId , entryId );
747
+ if (longPair != null ) {
748
+ return Pair .of (consumer , longPair .first );
749
+ }
743
750
}
744
751
}
745
752
}
746
753
}
747
- return ackOwnerConsumer ;
754
+ return Pair . of ( this , 1L ) ;
748
755
}
749
756
750
757
private long [] getCursorAckSet (Position position ) {
@@ -1019,44 +1026,24 @@ public int hashCode() {
1019
1026
*
1020
1027
* @param position
1021
1028
*/
1022
- private boolean removePendingAcks (Position position ) {
1023
- Consumer ackOwnedConsumer = null ;
1024
- if (pendingAcks .get (position .getLedgerId (), position .getEntryId ()) == null ) {
1025
- for (Consumer consumer : subscription .getConsumers ()) {
1026
- if (!consumer .equals (this ) && consumer .getPendingAcks ().containsKey (position .getLedgerId (),
1027
- position .getEntryId ())) {
1028
- ackOwnedConsumer = consumer ;
1029
- break ;
1030
- }
1031
- }
1032
- } else {
1033
- ackOwnedConsumer = this ;
1029
+ private boolean removePendingAcks (Consumer ackOwnedConsumer , Position position ) {
1030
+ if (!ackOwnedConsumer .getPendingAcks ().remove (position .getLedgerId (), position .getEntryId ())) {
1031
+ // Message was already removed by the other consumer
1032
+ return false ;
1034
1033
}
1035
-
1036
- // remove pending message from appropriate consumer and unblock unAckMsg-flow if requires
1037
- LongPair ackedPosition = ackOwnedConsumer != null
1038
- ? ackOwnedConsumer .getPendingAcks ().get (position .getLedgerId (), position .getEntryId ())
1039
- : null ;
1040
- if (ackedPosition != null ) {
1041
- if (!ackOwnedConsumer .getPendingAcks ().remove (position .getLedgerId (), position .getEntryId ())) {
1042
- // Message was already removed by the other consumer
1043
- return false ;
1044
- }
1045
- if (log .isDebugEnabled ()) {
1046
- log .debug ("[{}-{}] consumer {} received ack {}" , topicName , subscription , consumerId , position );
1047
- }
1048
- // unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
1049
- // consumer can start again consuming messages
1050
- int unAckedMsgs = UNACKED_MESSAGES_UPDATER .get (ackOwnedConsumer );
1051
- if ((((unAckedMsgs <= getMaxUnackedMessages () / 2 ) && ackOwnedConsumer .blockedConsumerOnUnackedMsgs )
1052
- && ackOwnedConsumer .shouldBlockConsumerOnUnackMsgs ())
1053
- || !shouldBlockConsumerOnUnackMsgs ()) {
1054
- ackOwnedConsumer .blockedConsumerOnUnackedMsgs = false ;
1055
- flowConsumerBlockedPermits (ackOwnedConsumer );
1056
- }
1057
- return true ;
1034
+ if (log .isDebugEnabled ()) {
1035
+ log .debug ("[{}-{}] consumer {} received ack {}" , topicName , subscription , consumerId , position );
1058
1036
}
1059
- return false ;
1037
+ // unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
1038
+ // consumer can start again consuming messages
1039
+ int unAckedMsgs = UNACKED_MESSAGES_UPDATER .get (ackOwnedConsumer );
1040
+ if ((((unAckedMsgs <= getMaxUnackedMessages () / 2 ) && ackOwnedConsumer .blockedConsumerOnUnackedMsgs )
1041
+ && ackOwnedConsumer .shouldBlockConsumerOnUnackMsgs ())
1042
+ || !shouldBlockConsumerOnUnackMsgs ()) {
1043
+ ackOwnedConsumer .blockedConsumerOnUnackedMsgs = false ;
1044
+ flowConsumerBlockedPermits (ackOwnedConsumer );
1045
+ }
1046
+ return true ;
1060
1047
}
1061
1048
1062
1049
public ConcurrentLongLongPairHashMap getPendingAcks () {
0 commit comments