Skip to content

Commit

Permalink
Don't increment unacked messages for the consumer with Exclusive/Fail…
Browse files Browse the repository at this point in the history
…over subscription mode. (apache#6558)

Fixes apache#6552

### Motivation

apache#6552 is introduced by apache#5929, so this PR stop increase unacked messages for the consumer with Exclusive/Failover subscription mode.
(cherry picked from commit 2449696)
  • Loading branch information
codelipenghui authored and jiazhai committed May 17, 2020
1 parent 668595f commit eb68db8
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,9 @@ public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes ba
}

private void incrementUnackedMessages(int ackedMessages) {
if (addAndGetUnAckedMsgs(this, ackedMessages) >= maxUnackedMessages && shouldBlockConsumerOnUnackMsgs()) {
if (Subscription.isIndividualAckMode(subType)
&& addAndGetUnAckedMsgs(this, ackedMessages) >= maxUnackedMessages
&& maxUnackedMessages > 0) {
blockedConsumerOnUnackedMsgs = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,41 @@ public void testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer() throws Pulsar
Assert.assertEquals(stats.subscriptions.entrySet().iterator().next().getValue().consumers.get(0).unackedMessages, 0);
}

@Test
public void testAckStatsOnPartitionedTopicForExclusiveSubscription() throws PulsarAdminException, PulsarClientException, InterruptedException {
final String topic = "persistent://my-property/my-ns/testAckStatsOnPartitionedTopicForExclusiveSubscription";
admin.topics().createPartitionedTopic(topic, 3);
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("sub")
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();

final int messages = 10;
for (int i = 0; i < messages; i++) {
producer.send(("message-" + i).getBytes());
}

int received = 0;
for (int i = 0; i < messages; i++) {
consumer.acknowledge(consumer.receive());
received++;
}
Assert.assertEquals(messages, received);

// wait acknowledge send
Thread.sleep(2000);

for (int i = 0; i < 3; i++) {
TopicStats stats = admin.topics().getStats(topic + "-partition-" + i);
Assert.assertEquals(stats.subscriptions.size(), 1);
Assert.assertEquals(stats.subscriptions.entrySet().iterator().next().getValue().consumers.size(), 1);
Assert.assertEquals(stats.subscriptions.entrySet().iterator().next().getValue().consumers.get(0).unackedMessages, 0);
}
}

}

0 comments on commit eb68db8

Please sign in to comment.