Skip to content

Commit

Permalink
Improve backlogSize stats in the topic. (apache#6700)
Browse files Browse the repository at this point in the history
### Motivation

When all subscriptions have no backlogs, but the backlog size of the topic stats is not 0. So this PR improves the backlog size calculation of the managed ledger.

### Modifications

If all entries are consumed, return the ledger size as the consumed size.

### Verifying this change

A new unit test added.
  • Loading branch information
codelipenghui authored Apr 12, 2020
1 parent 854716f commit d72e383
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -989,8 +989,12 @@ private long consumedLedgerSize(long ledgerSize, long ledgerEntries, long consum
if (ledgerEntries <= 0) {
return 0;
}
long averageSize = ledgerSize / ledgerEntries;
return consumedEntries >= 0 ? (consumedEntries + 1) * averageSize : 0;
if (ledgerEntries == (consumedEntries + 1)) {
return ledgerSize;
} else {
long averageSize = ledgerSize / ledgerEntries;
return consumedEntries >= 0 ? (consumedEntries + 1) * averageSize : 0;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2424,4 +2424,31 @@ public void testTopicStatsLastExpireTimestampForSubscription() throws PulsarAdmi

Assert.assertTrue(admin.topics().getStats(topic).subscriptions.values().iterator().next().lastExpireTimestamp > 0L);
}

@Test
public void testBacklogSizeShouldBeZeroWhenConsumerAckedAllMessages() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testBacklogSizeShouldBeZeroWhenConsumerAckedAllMessages";
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscribe();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();

final int messages = 33;
for (int i = 0; i < messages; i++) {
producer.send(new byte[1024 * i * 5]);
}

for (int i = 0; i < messages; i++) {
consumer.acknowledgeCumulative(consumer.receive());
}

// Wait ack send
Thread.sleep(1000);

TopicStats topicStats = admin.topics().getStats(topic);
assertEquals(topicStats.backlogSize, 0);
}
}

0 comments on commit d72e383

Please sign in to comment.