diff --git a/conf/broker.conf b/conf/broker.conf
index 0e08ac9b1b337..94fd429f4581a 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -785,6 +785,11 @@ exposePublisherStats=true
statsUpdateFrequencyInSecs=60
statsUpdateInitialDelayInSecs=60
+# Enable expose the precise backlog stats.
+# Set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate.
+# Default is false.
+exposePreciseBacklogInPrometheus=false
+
### --- Schema storage --- ###
# The schema storage implementation used by this broker
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 3f31b422c5ebd..7dff0c3c913ab 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -531,6 +531,11 @@ exposeTopicLevelMetricsInPrometheus=true
# Enable topic level metrics
exposePublisherStats=true
+# Enable expose the precise backlog stats.
+# Set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate.
+# Default is false.
+exposePreciseBacklogInPrometheus=false
+
### --- Deprecated config variables --- ###
# Deprecated. Use configurationStoreServers
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index ae0426935637c..6f12fb2cb20dc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -193,9 +193,10 @@ void asyncGetNthEntry(int n, IndividualDeletedEntries deletedEntries, ReadEntryC
*
*
This method has linear time complexity on the number of ledgers included in the managed ledger.
*
+ * @param isPrecise set to true to get precise backlog count
* @return the number of entries
*/
- long getNumberOfEntriesInBacklog();
+ long getNumberOfEntriesInBacklog(boolean isPrecise);
/**
* This signals that the reader is done with all the entries up to "position" (included). This can potentially
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 158a2ce889a81..cb6f40a38bfc8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -734,12 +734,16 @@ public long getEstimatedSizeSinceMarkDeletePosition() {
}
@Override
- public long getNumberOfEntriesInBacklog() {
+ public long getNumberOfEntriesInBacklog(boolean isPrecise) {
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer {} cursor ml-entries: {} -- deleted-counter: {} other counters: mdPos {} rdPos {}",
ledger.getName(), name, ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger),
messagesConsumedCounter, markDeletePosition, readPosition);
}
+ if (isPrecise) {
+ return getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition())) - 1;
+ }
+
long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter;
if (backlog < 0) {
// In some case the counters get incorrect values, fall back to the precise backlog count
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 85850acb5373c..685b86db170a2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -24,6 +24,7 @@
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@@ -142,6 +143,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final ManagedCursorContainer activeCursors = new ManagedCursorContainer();
// Ever increasing counter of entries added
+ @VisibleForTesting
static final AtomicLongFieldUpdater ENTRIES_ADDED_COUNTER_UPDATER = AtomicLongFieldUpdater
.newUpdater(ManagedLedgerImpl.class, "entriesAddedCounter");
@SuppressWarnings("unused")
@@ -3176,6 +3178,11 @@ public long getOffloadedSize() {
return offloadedSize;
}
+ @VisibleForTesting
+ public void setEntriesAddedCounter(long count) {
+ ENTRIES_ADDED_COUNTER_UPDATER.set(this, count);
+ }
+
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
index 38c67f49f8085..ec1dc74a290bd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java
@@ -263,7 +263,7 @@ public long getNumberOfMessagesInBacklog() {
long count = 0;
for (ManagedCursor cursor : managedLedger.getCursors()) {
- count += cursor.getNumberOfEntriesInBacklog();
+ count += cursor.getNumberOfEntriesInBacklog(false);
}
return count;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index 21f5747919b2f..99e7aec740bed 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -92,7 +92,7 @@ public long getNumberOfEntries() {
}
@Override
- public long getNumberOfEntriesInBacklog() {
+ public long getNumberOfEntriesInBacklog(boolean isPrecise) {
return 0;
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
index aaf7cd1c6f282..dc876e19eb1ab 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorListAckTest.java
@@ -51,24 +51,24 @@ void testMultiPositionDelete() throws Exception {
Position p7 = ledger.addEntry("dummy-entry-7".getBytes(Encoding));
assertEquals(c1.getNumberOfEntries(), 7);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 7);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 7);
c1.delete(Lists.newArrayList(p2, p3, p5, p7));
assertEquals(c1.getNumberOfEntries(), 3);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 3);
assertEquals(c1.getMarkDeletedPosition(), p0);
c1.delete(Lists.newArrayList(p1));
assertEquals(c1.getNumberOfEntries(), 2);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 2);
assertEquals(c1.getMarkDeletedPosition(), p3);
c1.delete(Lists.newArrayList(p4, p6, p7));
assertEquals(c1.getNumberOfEntries(), 0);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 0);
assertEquals(c1.getMarkDeletedPosition(), p7);
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index a02b3650faf2e..7bc0a39851511 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -265,31 +265,31 @@ void testNumberOfEntriesInBacklog() throws Exception {
Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding));
ManagedCursor c5 = ledger.openCursor("c5");
- assertEquals(c1.getNumberOfEntriesInBacklog(), 4);
- assertEquals(c2.getNumberOfEntriesInBacklog(), 3);
- assertEquals(c3.getNumberOfEntriesInBacklog(), 2);
- assertEquals(c4.getNumberOfEntriesInBacklog(), 1);
- assertEquals(c5.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 4);
+ assertEquals(c2.getNumberOfEntriesInBacklog(false), 3);
+ assertEquals(c3.getNumberOfEntriesInBacklog(false), 2);
+ assertEquals(c4.getNumberOfEntriesInBacklog(false), 1);
+ assertEquals(c5.getNumberOfEntriesInBacklog(false), 0);
List entries = c1.readEntries(2);
assertEquals(entries.size(), 2);
entries.forEach(e -> e.release());
assertEquals(c1.getNumberOfEntries(), 2);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 4);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 4);
c1.markDelete(p1);
assertEquals(c1.getNumberOfEntries(), 2);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 3);
c1.delete(p3);
assertEquals(c1.getNumberOfEntries(), 1);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 2);
c1.markDelete(p4);
assertEquals(c1.getNumberOfEntries(), 0);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 0);
}
@Test(timeOut = 20000)
@@ -315,11 +315,11 @@ void testNumberOfEntriesInBacklogWithFallback() throws Exception {
field.setLong(c4, counter);
field.setLong(c5, counter);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 4);
- assertEquals(c2.getNumberOfEntriesInBacklog(), 3);
- assertEquals(c3.getNumberOfEntriesInBacklog(), 2);
- assertEquals(c4.getNumberOfEntriesInBacklog(), 1);
- assertEquals(c5.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 4);
+ assertEquals(c2.getNumberOfEntriesInBacklog(false), 3);
+ assertEquals(c3.getNumberOfEntriesInBacklog(false), 2);
+ assertEquals(c4.getNumberOfEntriesInBacklog(false), 1);
+ assertEquals(c5.getNumberOfEntriesInBacklog(false), 0);
}
@Test(timeOut = 20000)
@@ -811,34 +811,34 @@ void rewind() throws Exception {
log.debug("p4: {}", p4);
assertEquals(c1.getNumberOfEntries(), 4);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 4);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 4);
c1.markDelete(p1);
assertEquals(c1.getNumberOfEntries(), 3);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 3);
List entries = c1.readEntries(10);
assertEquals(entries.size(), 3);
entries.forEach(e -> e.release());
assertEquals(c1.getNumberOfEntries(), 0);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 3);
c1.rewind();
assertEquals(c1.getNumberOfEntries(), 3);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 3);
c1.markDelete(p2);
assertEquals(c1.getNumberOfEntries(), 2);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 2);
entries = c1.readEntries(10);
assertEquals(entries.size(), 2);
entries.forEach(e -> e.release());
assertEquals(c1.getNumberOfEntries(), 0);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 2);
c1.rewind();
assertEquals(c1.getNumberOfEntries(), 2);
c1.markDelete(p4);
assertEquals(c1.getNumberOfEntries(), 0);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 0);
c1.rewind();
assertEquals(c1.getNumberOfEntries(), 0);
ledger.addEntry("dummy-entry-5".getBytes(Encoding));
@@ -1324,26 +1324,26 @@ void testFilteringReadEntries() throws Exception {
/* Position p6 = */ledger.addEntry("entry6".getBytes());
assertEquals(cursor.getNumberOfEntries(), 6);
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 6);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 6);
List entries = cursor.readEntries(3);
assertEquals(entries.size(), 3);
entries.forEach(e -> e.release());
assertEquals(cursor.getNumberOfEntries(), 3);
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 6);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 6);
log.info("Deleting {}", p5);
cursor.delete(p5);
assertEquals(cursor.getNumberOfEntries(), 2);
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 5);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 5);
entries = cursor.readEntries(3);
assertEquals(entries.size(), 2);
entries.forEach(e -> e.release());
assertEquals(cursor.getNumberOfEntries(), 0);
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 5);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 5);
}
@Test(timeOut = 20000)
@@ -1384,21 +1384,21 @@ void testCountingWithDeletedEntries() throws Exception {
Position p8 = ledger.addEntry("entry8".getBytes());
assertEquals(cursor.getNumberOfEntries(), 8);
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 8);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 8);
cursor.delete(p8);
assertEquals(cursor.getNumberOfEntries(), 7);
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 7);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 7);
cursor.delete(p1);
assertEquals(cursor.getNumberOfEntries(), 6);
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 6);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 6);
cursor.delete(p7);
cursor.delete(p6);
cursor.delete(p5);
assertEquals(cursor.getNumberOfEntries(), 3);
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 3);
}
@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
@@ -1504,22 +1504,22 @@ void testClearBacklog(boolean useOpenRangeSet) throws Exception {
ManagedCursor c3 = ledger.openCursor("c3");
ledger.addEntry("dummy-entry-3".getBytes(Encoding));
- assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 3);
assertEquals(c1.getNumberOfEntries(), 3);
assertTrue(c1.hasMoreEntries());
c1.clearBacklog();
c3.clearBacklog();
- assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 0);
assertEquals(c1.getNumberOfEntries(), 0);
assertFalse(c1.hasMoreEntries());
- assertEquals(c2.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(c2.getNumberOfEntriesInBacklog(false), 2);
assertEquals(c2.getNumberOfEntries(), 2);
assertTrue(c2.hasMoreEntries());
- assertEquals(c3.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c3.getNumberOfEntriesInBacklog(false), 0);
assertEquals(c3.getNumberOfEntries(), 0);
assertFalse(c3.hasMoreEntries());
@@ -1530,15 +1530,15 @@ void testClearBacklog(boolean useOpenRangeSet) throws Exception {
c2 = ledger.openCursor("c2");
c3 = ledger.openCursor("c3");
- assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 0);
assertEquals(c1.getNumberOfEntries(), 0);
assertFalse(c1.hasMoreEntries());
- assertEquals(c2.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(c2.getNumberOfEntriesInBacklog(false), 2);
assertEquals(c2.getNumberOfEntries(), 2);
assertTrue(c2.hasMoreEntries());
- assertEquals(c3.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c3.getNumberOfEntriesInBacklog(false), 0);
assertEquals(c3.getNumberOfEntries(), 0);
assertFalse(c3.hasMoreEntries());
factory2.shutdown();
@@ -1555,12 +1555,12 @@ void testRateLimitMarkDelete(boolean useOpenRangeSet) throws Exception {
Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding));
- assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 3);
c1.markDelete(p1);
c1.markDelete(p2);
c1.markDelete(p3);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 0);
// Re-open to recover from storage
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
@@ -1569,7 +1569,7 @@ void testRateLimitMarkDelete(boolean useOpenRangeSet) throws Exception {
c1 = ledger.openCursor("c1");
// Only the 1st mark-delete was persisted
- assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 2);
factory2.shutdown();
}
@@ -1585,57 +1585,57 @@ void deleteSingleMessageTwice(boolean useOpenRangeSet) throws Exception {
Position p3 = ledger.addEntry("entry-3".getBytes(Encoding));
Position p4 = ledger.addEntry("entry-4".getBytes(Encoding));
- assertEquals(c1.getNumberOfEntriesInBacklog(), 4);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 4);
assertEquals(c1.getNumberOfEntries(), 4);
c1.delete(p1);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 3);
assertEquals(c1.getNumberOfEntries(), 3);
assertEquals(c1.getMarkDeletedPosition(), p1);
assertEquals(c1.getReadPosition(), p2);
// Should have not effect since p1 is already deleted
c1.delete(p1);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 3);
assertEquals(c1.getNumberOfEntries(), 3);
assertEquals(c1.getMarkDeletedPosition(), p1);
assertEquals(c1.getReadPosition(), p2);
c1.delete(p2);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 2);
assertEquals(c1.getNumberOfEntries(), 2);
assertEquals(c1.getMarkDeletedPosition(), p2);
assertEquals(c1.getReadPosition(), p3);
// Should have not effect since p2 is already deleted
c1.delete(p2);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 2);
assertEquals(c1.getNumberOfEntries(), 2);
assertEquals(c1.getMarkDeletedPosition(), p2);
assertEquals(c1.getReadPosition(), p3);
c1.delete(p3);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 1);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 1);
assertEquals(c1.getNumberOfEntries(), 1);
assertEquals(c1.getMarkDeletedPosition(), p3);
assertEquals(c1.getReadPosition(), p4);
// Should have not effect since p3 is already deleted
c1.delete(p3);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 1);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 1);
assertEquals(c1.getNumberOfEntries(), 1);
assertEquals(c1.getMarkDeletedPosition(), p3);
assertEquals(c1.getReadPosition(), p4);
c1.delete(p4);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 0);
assertEquals(c1.getNumberOfEntries(), 0);
assertEquals(c1.getMarkDeletedPosition(), p4);
assertEquals(c1.getReadPosition(), p4.getNext());
// Should have not effect since p4 is already deleted
c1.delete(p4);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 0);
assertEquals(c1.getNumberOfEntries(), 0);
assertEquals(c1.getMarkDeletedPosition(), p4);
assertEquals(c1.getReadPosition(), p4.getNext());
@@ -2367,19 +2367,19 @@ void outOfOrderAcks() throws Exception {
positions.add(ledger.addEntry("entry".getBytes()));
}
- assertEquals(c1.getNumberOfEntriesInBacklog(), N);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), N);
c1.delete(positions.get(3));
- assertEquals(c1.getNumberOfEntriesInBacklog(), N - 1);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), N - 1);
c1.delete(positions.get(2));
- assertEquals(c1.getNumberOfEntriesInBacklog(), N - 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), N - 2);
c1.delete(positions.get(1));
- assertEquals(c1.getNumberOfEntriesInBacklog(), N - 3);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), N - 3);
c1.delete(positions.get(0));
- assertEquals(c1.getNumberOfEntriesInBacklog(), N - 4);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), N - 4);
}
@Test(timeOut = 20000)
@@ -2394,17 +2394,17 @@ void randomOrderAcks() throws Exception {
positions.add(ledger.addEntry("entry".getBytes()));
}
- assertEquals(c1.getNumberOfEntriesInBacklog(), N);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), N);
// Randomize the ack sequence
Collections.shuffle(positions);
int toDelete = N;
for (Position p : positions) {
- assertEquals(c1.getNumberOfEntriesInBacklog(), toDelete);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), toDelete);
c1.delete(p);
--toDelete;
- assertEquals(c1.getNumberOfEntriesInBacklog(), toDelete);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), toDelete);
}
}
@@ -2572,7 +2572,7 @@ public void testOutOfOrderDeletePersistenceWithClose() throws Exception {
c1.delete(addedPositions.get(8));
c1.delete(addedPositions.get(9));
- assertEquals(c1.getNumberOfEntriesInBacklog(), 20 - 5);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 20 - 5);
ledger.close();
factory.shutdown();
@@ -2581,7 +2581,7 @@ public void testOutOfOrderDeletePersistenceWithClose() throws Exception {
factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
c1 = ledger.openCursor("c1");
- assertEquals(c1.getNumberOfEntriesInBacklog(), 20 - 5);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 20 - 5);
List entries = c1.readEntries(20);
assertEquals(entries.size(), 20 - 5);
@@ -2617,13 +2617,13 @@ public void testOutOfOrderDeletePersistenceAfterCrash() throws Exception {
c1.delete(addedPositions.get(8));
c1.delete(addedPositions.get(9));
- assertEquals(c1.getNumberOfEntriesInBacklog(), 20 - 5);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 20 - 5);
// Re-Open
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
ledger = factory2.open("my_test_ledger", new ManagedLedgerConfig());
c1 = ledger.openCursor("c1");
- assertEquals(c1.getNumberOfEntriesInBacklog(), 20 - 5);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 20 - 5);
List entries = c1.readEntries(20);
assertEquals(entries.size(), 20 - 5);
@@ -2736,7 +2736,7 @@ public void testOutOfOrderDeletePersistenceIntoLedgerWithClose() throws Exceptio
}
}
- assertEquals(c1.getNumberOfEntriesInBacklog(), totalAddEntries / 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), totalAddEntries / 2);
// Close ledger to persist individual-deleted positions into cursor-ledger
ledger.close();
@@ -2788,7 +2788,7 @@ public void operationFailed(MetaStoreException e) {
ledger = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig);
c1 = (ManagedCursorImpl) ledger.openCursor("c1");
// verify cursor has been recovered
- assertEquals(c1.getNumberOfEntriesInBacklog(), totalAddEntries / 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), totalAddEntries / 2);
// try to read entries which should only read non-deleted positions
List entries = c1.readEntries(totalAddEntries);
@@ -2820,7 +2820,7 @@ public void testOutOfOrderDeletePersistenceIntoZkWithClose() throws Exception {
}
}
- assertEquals(c1.getNumberOfEntriesInBacklog(), totalAddEntries / 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), totalAddEntries / 2);
// Close ledger to persist individual-deleted positions into cursor-ledger
ledger.close();
@@ -2848,7 +2848,7 @@ public void operationFailed(MetaStoreException e) {
ledger = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig);
c1 = (ManagedCursorImpl) ledger.openCursor(cursorName);
// verify cursor has been recovered
- assertEquals(c1.getNumberOfEntriesInBacklog(), totalAddEntries / 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), totalAddEntries / 2);
// try to read entries which should only read non-deleted positions
List entries = c1.readEntries(totalAddEntries);
@@ -3019,7 +3019,7 @@ public void deleteMessagesCheckhMarkDelete() throws Exception {
totalDeletedMessages += 1;
}
}
- assertEquals(c1.getNumberOfEntriesInBacklog(), totalEntries - totalDeletedMessages);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), totalEntries - totalDeletedMessages);
assertEquals(c1.getNumberOfEntries(), totalEntries - totalDeletedMessages);
assertEquals(c1.getMarkDeletedPosition(), positions[0]);
assertEquals(c1.getReadPosition(), positions[1]);
@@ -3033,7 +3033,7 @@ public void deleteMessagesCheckhMarkDelete() throws Exception {
}
}
int markDelete = totalEntries / 2 - 1;
- assertEquals(c1.getNumberOfEntriesInBacklog(), totalEntries - totalDeletedMessages);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), totalEntries - totalDeletedMessages);
assertEquals(c1.getNumberOfEntries(), totalEntries - totalDeletedMessages);
assertEquals(c1.getMarkDeletedPosition(), positions[markDelete]);
assertEquals(c1.getReadPosition(), positions[markDelete + 1]);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
index ea69c63f2c21c..5a9e8d4bafc7b 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java
@@ -124,7 +124,7 @@ public void testBookieFailure() throws Exception {
// Next add should succeed
ledger.addEntry("entry-2".getBytes());
- assertEquals(3, cursor.getNumberOfEntriesInBacklog());
+ assertEquals(3, cursor.getNumberOfEntriesInBacklog(false));
List entries = cursor.readEntries(1);
assertEquals(1, entries.size());
@@ -357,13 +357,13 @@ public void ledgerFencedByAutoReplication() throws Exception {
ledger.addEntry("entry-2".getBytes());
assertEquals(2, c1.getNumberOfEntries());
- assertEquals(2, c1.getNumberOfEntriesInBacklog());
+ assertEquals(2, c1.getNumberOfEntriesInBacklog(false));
PositionImpl p3 = (PositionImpl) ledger.addEntry("entry-3".getBytes());
// Now entry-2 should have been written before entry-3
assertEquals(3, c1.getNumberOfEntries());
- assertEquals(3, c1.getNumberOfEntriesInBacklog());
+ assertEquals(3, c1.getNumberOfEntriesInBacklog(false));
assertTrue(p1.getLedgerId() != p3.getLedgerId());
factory.shutdown();
}
@@ -402,7 +402,7 @@ public void ledgerFencedByFailover() throws Exception {
// Ok
}
- assertEquals(2, c2.getNumberOfEntriesInBacklog());
+ assertEquals(2, c2.getNumberOfEntriesInBacklog(false));
factory1.shutdown();
factory2.shutdown();
}
@@ -459,12 +459,12 @@ void testResetCursorAfterRecovery() throws Exception {
assertEquals(cursor.getMarkDeletedPosition(), p3);
assertEquals(cursor.getReadPosition(), p4);
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 1);
cursor.resetCursor(p2);
assertEquals(cursor.getMarkDeletedPosition(), p1);
assertEquals(cursor.getReadPosition(), p2);
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 3);
factory2.shutdown();
factory.shutdown();
@@ -531,7 +531,7 @@ public void testChangeCrcType() throws Exception {
ledger.addEntry("entry-3".getBytes());
assertEquals(c1.getNumberOfEntries(), 4);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 4);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 4);
List entries = c1.readEntries(4);
assertEquals(entries.size(), 4);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
index 3d47a6f5697d7..2ee6e7d2c9626 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
@@ -366,7 +366,7 @@ public void recoverAfterWriteError() throws Exception {
// With one single error, the write should succeed
ledger.addEntry("entry-1".getBytes());
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 1);
bkc.failNow(BKException.Code.BookieHandleNotAvailableException);
zkc.failNow(Code.CONNECTIONLOSS);
@@ -385,7 +385,7 @@ public void recoverAfterWriteError() throws Exception {
// ok
}
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 1);
// Signal that ManagedLedger has recovered from write error and will be availbe for writes again
ledger.readyToCreateNewLedger();
@@ -393,7 +393,7 @@ public void recoverAfterWriteError() throws Exception {
// Next add should succeed, and the previous write should not appear
ledger.addEntry("entry-4".getBytes());
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 2);
List entries = cursor.readEntries(10);
assertEquals(entries.size(), 2);
@@ -435,7 +435,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
counter.await();
assertNull(ex.get());
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 2);
// Ensure that we are only creating one new ledger
// even when there are multiple (here, 2) add entry failed ops
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index b954f654a9442..cb6da7c41a333 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -175,14 +175,14 @@ public void simple() throws Exception {
assertFalse(cursor.hasMoreEntries());
assertEquals(cursor.getNumberOfEntries(), 0);
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 0);
assertEquals(cursor.readEntries(100), new ArrayList());
ledger.addEntry("dummy-entry-2".getBytes(Encoding));
assertTrue(cursor.hasMoreEntries());
assertEquals(cursor.getNumberOfEntries(), 1);
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 1);
assertEquals(ledger.getNumberOfActiveEntries(), 1);
List entries = cursor.readEntries(100);
@@ -242,7 +242,7 @@ public void acknowledge1() throws Exception {
assertEquals(entries.size(), 2);
assertEquals(cursor.getNumberOfEntries(), 0);
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 2);
assertFalse(cursor.hasMoreEntries());
assertEquals(ledger.getNumberOfEntries(), 2);
@@ -251,7 +251,7 @@ public void acknowledge1() throws Exception {
entries.forEach(e -> e.release());
assertEquals(cursor.getNumberOfEntries(), 0);
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 1);
assertFalse(cursor.hasMoreEntries());
assertEquals(ledger.getNumberOfActiveEntries(), 1);
@@ -266,7 +266,7 @@ public void acknowledge1() throws Exception {
assertEquals(ledger.getTotalSize(), "dummy-entry-1".getBytes(Encoding).length * 2);
assertEquals(cursor.getNumberOfEntries(), 1);
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 1);
assertTrue(cursor.hasMoreEntries());
entries = cursor.readEntries(100);
@@ -2313,7 +2313,7 @@ public void testConsumerSubscriptionInitializePosition() throws Exception{
assertEquals(earliestPositionAndCounter.getLeft().getNext(), p2);
assertEquals(latestPositionAndCounter.getRight().longValue(), totalInsertedEntries);
- assertEquals(earliestPositionAndCounter.getRight().longValue(), totalInsertedEntries - earliestCursor.getNumberOfEntriesInBacklog());
+ assertEquals(earliestPositionAndCounter.getRight().longValue(), totalInsertedEntries - earliestCursor.getNumberOfEntriesInBacklog(false));
ledger.close();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 957e63bfa3e6a..26cad0a02260f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -221,31 +221,31 @@ void testNumberOfEntriesInBacklog() throws Exception {
Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding));
ManagedCursor c5 = ledger.newNonDurableCursor(PositionImpl.latest);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 4);
- assertEquals(c2.getNumberOfEntriesInBacklog(), 3);
- assertEquals(c3.getNumberOfEntriesInBacklog(), 2);
- assertEquals(c4.getNumberOfEntriesInBacklog(), 1);
- assertEquals(c5.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 4);
+ assertEquals(c2.getNumberOfEntriesInBacklog(false), 3);
+ assertEquals(c3.getNumberOfEntriesInBacklog(false), 2);
+ assertEquals(c4.getNumberOfEntriesInBacklog(false), 1);
+ assertEquals(c5.getNumberOfEntriesInBacklog(false), 0);
List entries = c1.readEntries(2);
assertEquals(entries.size(), 2);
entries.forEach(e -> e.release());
assertEquals(c1.getNumberOfEntries(), 2);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 4);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 4);
c1.markDelete(p1);
assertEquals(c1.getNumberOfEntries(), 2);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 3);
c1.delete(p3);
assertEquals(c1.getNumberOfEntries(), 1);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 2);
c1.markDelete(p4);
assertEquals(c1.getNumberOfEntries(), 0);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 0);
}
@Test(timeOut = 20000)
@@ -371,34 +371,34 @@ void rewind() throws Exception {
log.debug("p4: {}", p4);
assertEquals(c1.getNumberOfEntries(), 4);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 4);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 4);
c1.markDelete(p1);
assertEquals(c1.getNumberOfEntries(), 3);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 3);
List entries = c1.readEntries(10);
assertEquals(entries.size(), 3);
entries.forEach(e -> e.release());
assertEquals(c1.getNumberOfEntries(), 0);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 3);
c1.rewind();
assertEquals(c1.getNumberOfEntries(), 3);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 3);
c1.markDelete(p2);
assertEquals(c1.getNumberOfEntries(), 2);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 2);
entries = c1.readEntries(10);
assertEquals(entries.size(), 2);
entries.forEach(e -> e.release());
assertEquals(c1.getNumberOfEntries(), 0);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 2);
c1.rewind();
assertEquals(c1.getNumberOfEntries(), 2);
c1.markDelete(p4);
assertEquals(c1.getNumberOfEntries(), 0);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 0);
c1.rewind();
assertEquals(c1.getNumberOfEntries(), 0);
ledger.addEntry("dummy-entry-5".getBytes(Encoding));
@@ -573,13 +573,13 @@ void subscribeToEarliestPositionWithDeferredDeletion() throws Exception {
assertEquals(c1.getReadPosition(), p1);
assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(3, -1));
assertEquals(c1.getNumberOfEntries(), 6);
- assertEquals(c1.getNumberOfEntriesInBacklog(), 6);
+ assertEquals(c1.getNumberOfEntriesInBacklog(false), 6);
ManagedCursor c2 = ledger.newNonDurableCursor(p1);
assertEquals(c2.getReadPosition(), p2);
assertEquals(c2.getMarkDeletedPosition(), p1);
assertEquals(c2.getNumberOfEntries(), 5);
- assertEquals(c2.getNumberOfEntriesInBacklog(), 5);
+ assertEquals(c2.getNumberOfEntriesInBacklog(false), 5);
}
@Test
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 92b5f7a6b2096..19a4337b4042b 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1370,6 +1370,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Classname of Pluggable JVM GC metrics logger that can log GC specific metrics")
private String jvmGCMetricsLoggerClassName;
+ @FieldContext(
+ category = CATEGORY_METRICS,
+ doc = "Enable expose the precise backlog stats.\n" +
+ " Set false to use published counter and consumed counter to calculate,\n" +
+ " this would be more efficient but may be inaccurate. Default is false."
+ )
+ private boolean exposePreciseBacklogInPrometheus = false;
+
/**** --- Functions --- ****/
@FieldContext(
category = CATEGORY_FUNCTIONS,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4a1021f9aa366..e33f94a3f744d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -808,14 +808,14 @@ protected void internalGetSubscriptions(AsyncResponse asyncResponse, boolean aut
}
}
- protected TopicStats internalGetStats(boolean authoritative) {
+ protected TopicStats internalGetStats(boolean authoritative, boolean getPreciseBacklog) {
validateAdminAndClientPermission();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
validateTopicOwnership(topicName, authoritative);
Topic topic = getTopicReference(topicName);
- return topic.getStats();
+ return topic.getStats(getPreciseBacklog);
}
protected PersistentTopicInternalStats internalGetInternalStats(boolean authoritative) {
@@ -850,7 +850,7 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
}
protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative,
- boolean perPartition) {
+ boolean perPartition, boolean getPreciseBacklog) {
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
if (partitionMetadata.partitions == 0) {
throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
@@ -864,7 +864,7 @@ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean
for (int i = 0; i < partitionMetadata.partitions; i++) {
try {
topicStatsFutureList
- .add(pulsar().getAdminClient().topics().getStatsAsync((topicName.getPartition(i).toString())));
+ .add(pulsar().getAdminClient().topics().getStatsAsync((topicName.getPartition(i).toString()), getPreciseBacklog));
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 12b9622d7d430..4cf6e124ca82f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -95,7 +95,7 @@ public NonPersistentTopicStats getStats(@PathParam("property") String property,
validateTopicName(property, cluster, namespace, encodedTopic);
validateAdminOperationOnTopic(authoritative);
Topic topic = getTopicReference(topicName);
- return ((NonPersistentTopic) topic).getStats();
+ return ((NonPersistentTopic) topic).getStats(false);
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 9944ca36469da..890e18e210cac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -264,7 +264,7 @@ public TopicStats getStats(@PathParam("property") String property, @PathParam("c
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
- return internalGetStats(authoritative);
+ return internalGetStats(authoritative, false);
}
@GET
@@ -305,7 +305,7 @@ public void getPartitionedStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalGetPartitionedStats(asyncResponse, authoritative, perPartition);
+ internalGetPartitionedStats(asyncResponse, authoritative, perPartition, false);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index add815d13c02b..13ac2845f5fda 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -113,11 +113,13 @@ public NonPersistentTopicStats getStats(
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @ApiParam(value = "Is return precise backlog or imprecise backlog")
+ @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) {
validateTopicName(tenant, namespace, encodedTopic);
validateAdminOperationOnTopic(topicName, authoritative);
Topic topic = getTopicReference(topicName);
- return ((NonPersistentTopic) topic).getStats();
+ return ((NonPersistentTopic) topic).getStats(getPreciseBacklog);
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 57dd7e1726236..a9c9b5ea96893 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -46,8 +46,6 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
-import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TopicStats;
@@ -458,9 +456,11 @@ public TopicStats getStats(
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @ApiParam(value = "Is return precise backlog or imprecise backlog")
+ @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) {
validateTopicName(tenant, namespace, encodedTopic);
- return internalGetStats(authoritative);
+ return internalGetStats(authoritative, getPreciseBacklog);
}
@GET
@@ -529,10 +529,12 @@ public void getPartitionedStats(
@ApiParam(value = "Get per partition stats")
@QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
@ApiParam(value = "Is authentication required to perform this operation")
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @ApiParam(value = "Is return precise backlog or imprecise backlog")
+ @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) {
try {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
- internalGetPartitionedStats(asyncResponse, authoritative, perPartition);
+ internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getPreciseBacklog);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 1d15b003d8c0f..e481b16cb653e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -145,7 +145,7 @@ private void dropBacklog(PersistentTopic persistentTopic, BacklogQuota quota) {
}
// Calculate number of messages to be skipped using the current backlog and the skip factor.
- long entriesInBacklog = slowestConsumer.getNumberOfEntriesInBacklog();
+ long entriesInBacklog = slowestConsumer.getNumberOfEntriesInBacklog(false);
int messagesToSkip = (int) (messageSkipFactor * entriesInBacklog);
try {
// If there are no messages to skip, break out of the loop
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index fe9d4d564e437..a35db0c135df5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1407,7 +1407,7 @@ public String generateUniqueProducerName() {
public Map getTopicStats() {
HashMap stats = new HashMap<>();
- forEachTopic(topic -> stats.put(topic.getName(), topic.getStats()));
+ forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(false)));
return stats;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 50a5107c05fd9..bef63fc7f5b0a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -454,7 +454,7 @@ CommandConsumerStatsResponse.Builder createConsumerStatsResponse(Consumer consum
commandConsumerStatsResponseBuilder.setConnectedSince(consumerStats.getConnectedSince());
Subscription subscription = consumer.getSubscription();
- commandConsumerStatsResponseBuilder.setMsgBacklog(subscription.getNumberOfEntriesInBacklog());
+ commandConsumerStatsResponseBuilder.setMsgBacklog(subscription.getNumberOfEntriesInBacklog(false));
commandConsumerStatsResponseBuilder.setMsgRateExpired(subscription.getExpiredMessageRate());
commandConsumerStatsResponseBuilder.setType(subscription.getTypeString());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 18c7c49ae37f5..f7d9687523960 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -53,7 +53,7 @@ default void removeConsumer(Consumer consumer) throws BrokerServiceException {
Dispatcher getDispatcher();
- long getNumberOfEntriesInBacklog();
+ long getNumberOfEntriesInBacklog(boolean getPreciseBacklog);
default long getNumberOfEntriesDelayed() {
return 0;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 26af1c1c5c8bf..6b3685deba927 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -162,7 +162,7 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats
ConcurrentOpenHashMap getReplicators();
- TopicStats getStats();
+ TopicStats getStats(boolean getPreciseBacklog);
PersistentTopicInternalStats getInternalStats();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 652042000b6ac..f653ee52b4953 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -250,7 +250,7 @@ public CompletableFuture peekNthMessage(int messagePosition) {
}
@Override
- public long getNumberOfEntriesInBacklog() {
+ public long getNumberOfEntriesInBacklog(boolean getPreciseBacklog) {
// No-op
return 0;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 8e3b14fa7f43a..27dc4bb3111bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -697,7 +697,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
topicStatsStream.endList();
// Populate subscription specific stats here
- topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog());
+ topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(false));
topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
topicStatsStream.writePair("msgRateOut", subMsgRateOut);
topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
@@ -714,7 +714,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
topicStats.aggMsgRateOut += subMsgRateOut;
topicStats.aggMsgThroughputOut += subMsgThroughputOut;
- nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog();
+ nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog(false);
} catch (Exception e) {
log.error("Got exception when creating consumer stats for subscription {}: {}", subscriptionName,
e.getMessage(), e);
@@ -751,7 +751,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
topicStatsStream.endObject();
}
- public NonPersistentTopicStats getStats() {
+ public NonPersistentTopicStats getStats(boolean getPreciseBacklog) {
NonPersistentTopicStats stats = new NonPersistentTopicStats();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
index 0c3feb31bfd86..5d39c20a2441d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
@@ -89,7 +89,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}
}, null);
- if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog() == 0) {
+ if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
// Notify all consumer that the end of topic was reached
dispatcher.getConsumers().forEach(Consumer::reachedEndOfTopic);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index fee9cfe34522a..08a3a0128669c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -543,7 +543,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
long waitTimeMillis = readFailureBackoff.next();
if (exception instanceof NoMoreEntriesToReadException) {
- if (cursor.getNumberOfEntriesInBacklog() == 0) {
+ if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
// Topic has been terminated and there are no more entries to read
// Notify the consumer only if all the messages were already acknowledged
consumerList.forEach(Consumer::reachedEndOfTopic);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 084d0b41208b8..38399824cebac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -455,7 +455,7 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep
long waitTimeMillis = readFailureBackoff.next();
if (exception instanceof NoMoreEntriesToReadException) {
- if (cursor.getNumberOfEntriesInBacklog() == 0) {
+ if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
// Topic has been terminated and there are no more entries to read
// Notify the consumer only if all the messages were already acknowledged
consumers.forEach(Consumer::reachedEndOfTopic);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index deb274457bf92..1b761d8fb7af6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -99,7 +99,7 @@ public double getMessageExpiryRate() {
private final MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
- long numMessagesExpired = (long) ctx - cursor.getNumberOfEntriesInBacklog();
+ long numMessagesExpired = (long) ctx - cursor.getNumberOfEntriesInBacklog(false);
msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value stats */);
updateRates();
@@ -119,7 +119,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
public void findEntryComplete(Position position, Object ctx) {
if (position != null) {
log.info("[{}][{}] Expiring all messages until position {}", topicName, subName, position);
- cursor.asyncMarkDelete(position, markDeleteCallback, cursor.getNumberOfEntriesInBacklog());
+ cursor.asyncMarkDelete(position, markDeleteCallback, cursor.getNumberOfEntriesInBacklog(false));
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] No messages to expire", topicName, subName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 2bc18ca384709..c4a19c460b529 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -149,7 +149,7 @@ protected Position getReplicatorReadPosition() {
@Override
protected long getNumberOfEntriesInBacklog() {
- return cursor.getNumberOfEntriesInBacklog();
+ return cursor.getNumberOfEntriesInBacklog(false);
}
@Override
@@ -507,7 +507,7 @@ public CompletableFuture clearBacklog() {
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Backlog size before clearing: {}", topicName, localCluster, remoteCluster,
- cursor.getNumberOfEntriesInBacklog());
+ cursor.getNumberOfEntriesInBacklog(false));
}
cursor.asyncClearBacklog(new ClearBacklogCallback() {
@@ -515,7 +515,7 @@ public CompletableFuture clearBacklog() {
public void clearBacklogComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Backlog size after clearing: {}", topicName, localCluster, remoteCluster,
- cursor.getNumberOfEntriesInBacklog());
+ cursor.getNumberOfEntriesInBacklog(false));
}
future.complete(null);
}
@@ -535,7 +535,7 @@ public CompletableFuture skipMessages(int numMessagesToSkip) {
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Skipping {} messages, current backlog {}", topicName, localCluster, remoteCluster,
- numMessagesToSkip, cursor.getNumberOfEntriesInBacklog());
+ numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false));
}
cursor.asyncSkipEntries(numMessagesToSkip, IndividualDeletedEntries.Exclude,
new AsyncCallbacks.SkipEntriesCallback() {
@@ -543,7 +543,7 @@ public CompletableFuture skipMessages(int numMessagesToSkip) {
public void skipEntriesComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{} -> {}] Skipped {} messages, new backlog {}", topicName, localCluster,
- remoteCluster, numMessagesToSkip, cursor.getNumberOfEntriesInBacklog());
+ remoteCluster, numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false));
}
future.complete(null);
}
@@ -605,7 +605,7 @@ public void updateRates() {
}
public ReplicatorStats getStats() {
- stats.replicationBacklog = cursor.getNumberOfEntriesInBacklog();
+ stats.replicationBacklog = cursor.getNumberOfEntriesInBacklog(false);
stats.connected = producer != null && producer.isConnected();
stats.replicationDelayInSeconds = getReplicationDelayInSeconds();
@@ -633,8 +633,8 @@ private long getReplicationDelayInSeconds() {
}
public void expireMessages(int messageTTLInSeconds) {
- if ((cursor.getNumberOfEntriesInBacklog() == 0)
- || (cursor.getNumberOfEntriesInBacklog() < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK
+ if ((cursor.getNumberOfEntriesInBacklog(false) == 0)
+ || (cursor.getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK
&& !topic.isOldestMessageExpired(cursor, messageTTLInSeconds))) {
// don't do anything for almost caught-up connected subscriptions
return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 8b5fe42571b58..1b0ae9f77ca07 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
@@ -391,7 +392,7 @@ public void acknowledgeMessage(List positions, AckType ackType, Map clearBacklog() {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Backlog size before clearing: {}", topicName, subName,
- cursor.getNumberOfEntriesInBacklog());
+ cursor.getNumberOfEntriesInBacklog(false));
}
cursor.asyncClearBacklog(new ClearBacklogCallback() {
@@ -581,7 +582,7 @@ public CompletableFuture clearBacklog() {
public void clearBacklogComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Backlog size after clearing: {}", topicName, subName,
- cursor.getNumberOfEntriesInBacklog());
+ cursor.getNumberOfEntriesInBacklog(false));
}
future.complete(null);
}
@@ -602,7 +603,7 @@ public CompletableFuture skipMessages(int numMessagesToSkip) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Skipping {} messages, current backlog {}", topicName, subName, numMessagesToSkip,
- cursor.getNumberOfEntriesInBacklog());
+ cursor.getNumberOfEntriesInBacklog(false));
}
cursor.asyncSkipEntries(numMessagesToSkip, IndividualDeletedEntries.Exclude,
new AsyncCallbacks.SkipEntriesCallback() {
@@ -610,7 +611,7 @@ public CompletableFuture skipMessages(int numMessagesToSkip) {
public void skipEntriesComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Skipped {} messages, new backlog {}", topicName, subName,
- numMessagesToSkip, cursor.getNumberOfEntriesInBacklog());
+ numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false));
}
future.complete(null);
}
@@ -776,8 +777,8 @@ public void readEntryComplete(Entry entry, Object ctx) {
}
@Override
- public long getNumberOfEntriesInBacklog() {
- return cursor.getNumberOfEntriesInBacklog();
+ public long getNumberOfEntriesInBacklog(boolean getPreciseBacklog) {
+ return cursor.getNumberOfEntriesInBacklog(getPreciseBacklog);
}
@Override
@@ -917,8 +918,8 @@ public List getConsumers() {
@Override
public void expireMessages(int messageTTLInSeconds) {
this.lastExpireTimestamp = System.currentTimeMillis();
- if ((getNumberOfEntriesInBacklog() == 0) || (dispatcher != null && dispatcher.isConsumerConnected()
- && getNumberOfEntriesInBacklog() < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK
+ if ((getNumberOfEntriesInBacklog(false) == 0) || (dispatcher != null && dispatcher.isConsumerConnected()
+ && getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK
&& !topic.isOldestMessageExpired(cursor, messageTTLInSeconds))) {
// don't do anything for almost caught-up connected subscriptions
return;
@@ -934,7 +935,7 @@ public long estimateBacklogSize() {
return cursor.getEstimatedSizeSinceMarkDeletePosition();
}
- public SubscriptionStats getStats() {
+ public SubscriptionStats getStats(Boolean getPreciseBacklog) {
SubscriptionStats subStats = new SubscriptionStats();
subStats.lastExpireTimestamp = lastExpireTimestamp;
subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
@@ -967,7 +968,8 @@ public SubscriptionStats getStats() {
subStats.msgDelayed = d.getNumberOfDelayedMessages();
}
}
- subStats.msgBacklog = getNumberOfEntriesInBacklog();
+ subStats.msgBacklog = getNumberOfEntriesInBacklog(getPreciseBacklog);
+ subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed;
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
subStats.isReplicated = isReplicated();
return subStats;
@@ -1047,7 +1049,7 @@ public void markTopicWithBatchMessagePublished() {
}
void topicTerminated() {
- if (cursor.getNumberOfEntriesInBacklog() == 0) {
+ if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
// notify the consumers if there are consumers connected to this topic.
if (null != dispatcher) {
// Immediately notify the consumer that there are no more available messages
@@ -1197,5 +1199,10 @@ public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapsho
}
}
+ @VisibleForTesting
+ public ManagedCursor getCursor() {
+ return cursor;
+ }
+
private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 463b6e2dcc011..c618606eeea83 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1414,7 +1414,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
topicStatsStream.endList();
// Populate subscription specific stats here
- topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog());
+ topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(false));
topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
topicStatsStream.writePair("msgRateOut", subMsgRateOut);
topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
@@ -1435,7 +1435,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
topicStatsHelper.aggMsgRateOut += subMsgRateOut;
topicStatsHelper.aggMsgThroughputOut += subMsgThroughputOut;
- nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog();
+ nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog(false);
} catch (Exception e) {
log.error("Got exception when creating consumer stats for subscription {}: {}", subscriptionName,
e.getMessage(), e);
@@ -1487,7 +1487,7 @@ public double getLastUpdatedAvgPublishRateInByte() {
return lastUpdatedAvgPublishRateInByte;
}
- public TopicStats getStats() {
+ public TopicStats getStats(boolean getPreciseBacklog) {
TopicStats stats = new TopicStats();
@@ -1510,7 +1510,7 @@ public TopicStats getStats() {
stats.bytesInCounter = getBytesInCounter();
subscriptions.forEach((name, subscription) -> {
- SubscriptionStats subStats = subscription.getStats();
+ SubscriptionStats subStats = subscription.getStats(getPreciseBacklog);
stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
@@ -1620,7 +1620,7 @@ public boolean isActive(InactiveTopicDeleteMode deleteMode) {
}
private boolean hasBacklogs() {
- return subscriptions.values().stream().anyMatch(sub -> sub.getNumberOfEntriesInBacklog() > 0);
+ return subscriptions.values().stream().anyMatch(sub -> sub.getNumberOfEntriesInBacklog(false) > 0);
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index cc5388e8ced02..ea05ed074c09e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -93,6 +93,7 @@ void updateStats(TopicStats stats) {
msgDelayed += as.msgDelayed;
subsStats.blockedSubscriptionOnUnackedMsgs = as.blockedSubscriptionOnUnackedMsgs;
subsStats.msgBacklog += as.msgBacklog;
+ subsStats.msgBacklogNoDelayed += as.msgBacklogNoDelayed;
subsStats.msgDelayed += as.msgDelayed;
subsStats.msgRateRedeliver += as.msgRateRedeliver;
subsStats.unackedMessages += as.unackedMessages;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index 1f3c51302e980..d5b53537dcc9a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -27,6 +27,8 @@ public class AggregatedSubscriptionStats {
public long msgBacklog;
+ public long msgBacklogNoDelayed;
+
public boolean blockedSubscriptionOnUnackedMsgs;
public double msgRateRedeliver;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 19aa043ae823a..a7b35b8c63337 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -61,7 +61,7 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
bundlesMap.forEach((bundle, topicsMap) -> {
topicsMap.forEach((name, topic) -> {
- getTopicStats(topic, topicStats, includeConsumerMetrics);
+ getTopicStats(topic, topicStats, includeConsumerMetrics, pulsar.getConfiguration().isExposePreciseBacklogInPrometheus());
if (includeTopicMetrics) {
topicsCount.add(1);
@@ -82,7 +82,7 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
});
}
- private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics) {
+ private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics, boolean getPreciseBacklog) {
stats.reset();
if (topic instanceof PersistentTopic) {
@@ -104,8 +104,8 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.storageReadRate = mlStats.getReadEntriesRate();
}
- stats.msgInCounter = topic.getStats().msgInCounter;
- stats.bytesInCounter = topic.getStats().bytesInCounter;
+ stats.msgInCounter = topic.getStats(getPreciseBacklog).msgInCounter;
+ stats.bytesInCounter = topic.getStats(getPreciseBacklog).bytesInCounter;
stats.producersCount = 0;
topic.getProducers().values().forEach(producer -> {
@@ -125,12 +125,13 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
topic.getSubscriptions().forEach((name, subscription) -> {
stats.subscriptionsCount++;
- stats.msgBacklog += subscription.getNumberOfEntriesInBacklog();
+ stats.msgBacklog += subscription.getNumberOfEntriesInBacklog(getPreciseBacklog);
AggregatedSubscriptionStats subsStats = stats.subscriptionStats
.computeIfAbsent(name, k -> new AggregatedSubscriptionStats());
- subsStats.msgBacklog = subscription.getNumberOfEntriesInBacklog();
+ subsStats.msgBacklog = subscription.getNumberOfEntriesInBacklog(getPreciseBacklog);
subsStats.msgDelayed = subscription.getNumberOfEntriesDelayed();
+ subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
subscription.getConsumers().forEach(consumer -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 846104441fac3..caf0ce842e2b3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -134,6 +134,7 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
stats.subscriptionStats.forEach((n, subsStats) -> {
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log", subsStats.msgBacklog);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed", subsStats.msgBacklogNoDelayed);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed", subsStats.msgDelayed);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver", subsStats.msgRateRedeliver);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages", subsStats.unackedMessages);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 156ddf5b2a3c5..e250b4a39f88f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -53,12 +53,14 @@
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -1062,4 +1064,185 @@ public void testConsumerStatsLastTimestamp() throws PulsarClientException, Pulsa
consumer.close();
producer.close();
}
+
+ @Test(timeOut = 30000)
+ public void testPreciseBacklog() throws PulsarClientException, PulsarAdminException, InterruptedException {
+ final String topic = "persistent://prop-xyz/ns1/precise-back-log";
+ final String subName = "sub-name";
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+
+ @Cleanup
+ Consumer consumer = client.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscribe();
+
+ @Cleanup
+ Producer producer = client.newProducer()
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ producer.send("message-1".getBytes(StandardCharsets.UTF_8));
+ Message message = consumer.receive();
+ assertNotNull(message);
+
+ // Mock the entries added count. Default is disable the precise backlog, so the backlog is entries added count - consumed count
+ // Since message have not acked, so the backlog is 10
+ PersistentSubscription subscription = (PersistentSubscription)pulsar.getBrokerService().getTopicReference(topic).get().getSubscription(subName);
+ assertNotNull(subscription);
+ ((ManagedLedgerImpl)subscription.getCursor().getManagedLedger()).setEntriesAddedCounter(10L);
+ TopicStats topicStats = admin.topics().getStats(topic);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10);
+
+ topicStats = admin.topics().getStats(topic, true);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 1);
+ consumer.acknowledge(message);
+
+ // wait for ack send
+ Thread.sleep(500);
+
+ // Consumer acks the message, so the precise backlog is 0
+ topicStats = admin.topics().getStats(topic, true);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 0);
+
+ topicStats = admin.topics().getStats(topic);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 9);
+ }
+
+ @Test(timeOut = 30000)
+ public void testBacklogNoDelayed() throws PulsarClientException, PulsarAdminException, InterruptedException {
+ final String topic = "persistent://prop-xyz/ns1/precise-back-log-no-delayed";
+ final String subName = "sub-name";
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+
+ @Cleanup
+ Consumer consumer = client.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ @Cleanup
+ Producer producer = client.newProducer()
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ for (int i = 0; i < 10; i++) {
+ if (i > 4) {
+ producer.newMessage()
+ .value("message-1".getBytes(StandardCharsets.UTF_8))
+ .deliverAfter(10, TimeUnit.SECONDS)
+ .send();
+ } else {
+ producer.send("message-1".getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ TopicStats topicStats = admin.topics().getStats(topic, true);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 5);
+
+ for (int i = 0; i < 5; i++) {
+ consumer.acknowledge(consumer.receive());
+ }
+ // Wait the ack send.
+ Thread.sleep(500);
+ topicStats = admin.topics().getStats(topic, true);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 5);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 0);
+ }
+
+ @Test
+ public void testPreciseBacklogForPartitionedTopic() throws PulsarClientException, PulsarAdminException, InterruptedException {
+ final String topic = "persistent://prop-xyz/ns1/precise-back-log-for-partitioned-topic";
+ admin.topics().createPartitionedTopic(topic, 2);
+ final String subName = "sub-name";
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+
+ @Cleanup
+ Consumer consumer = client.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscribe();
+
+ @Cleanup
+ Producer producer = client.newProducer()
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ producer.send("message-1".getBytes(StandardCharsets.UTF_8));
+ Message message = consumer.receive();
+ assertNotNull(message);
+
+ // Mock the entries added count. Default is disable the precise backlog, so the backlog is entries added count - consumed count
+ // Since message have not acked, so the backlog is 10
+ for (int i = 0; i < 2; i++) {
+ PersistentSubscription subscription = (PersistentSubscription)pulsar.getBrokerService().getTopicReference(topic + "-partition-" + i).get().getSubscription(subName);
+ assertNotNull(subscription);
+ ((ManagedLedgerImpl)subscription.getCursor().getManagedLedger()).setEntriesAddedCounter(10L);
+ }
+
+ TopicStats topicStats = admin.topics().getPartitionedStats(topic, false);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 20);
+
+ topicStats = admin.topics().getPartitionedStats(topic, false, true);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 1);
+ }
+
+ @Test(timeOut = 30000)
+ public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientException, PulsarAdminException, InterruptedException {
+ final String topic = "persistent://prop-xyz/ns1/precise-back-log-no-delayed-partitioned-topic";
+ admin.topics().createPartitionedTopic(topic, 2);
+ final String subName = "sub-name";
+
+ @Cleanup
+ PulsarClient client = PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
+
+ @Cleanup
+ Consumer consumer = client.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ @Cleanup
+ Producer producer = client.newProducer()
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ for (int i = 0; i < 10; i++) {
+ if (i > 4) {
+ producer.newMessage()
+ .value("message-1".getBytes(StandardCharsets.UTF_8))
+ .deliverAfter(10, TimeUnit.SECONDS)
+ .send();
+ } else {
+ producer.send("message-1".getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ TopicStats topicStats = admin.topics().getPartitionedStats(topic, false, true);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 5);
+
+ for (int i = 0; i < 5; i++) {
+ consumer.acknowledge(consumer.receive());
+ }
+ // Wait the ack send.
+ Thread.sleep(500);
+ topicStats = admin.topics().getPartitionedStats(topic, false, true);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 5);
+ assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 0);
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 05b888a785479..bdf9ab8336152 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -121,7 +121,7 @@ public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressio
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
// we expect 2 messages in the backlog since we sent 50 messages with the batch size set to 25. We have set the
// batch time high enough for it to not affect the number of messages in the batch
- assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 2);
+ assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 2);
consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
for (int i = 0; i < numMsgs; i++) {
@@ -170,7 +170,7 @@ public void testSimpleBatchProducerWithFixedBatchBytes(CompressionType compressi
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
// we expect 2 messages in the backlog since we sent 50 messages with the batch size set to 25. We have set the
// batch time high enough for it to not affect the number of messages in the batch
- assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 2);
+ assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 2);
consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
for (int i = 0; i < numMsgs; i++) {
@@ -215,8 +215,8 @@ public void testSimpleBatchProducerWithFixedBatchTime(CompressionType compressio
rolloverPerIntervalStats();
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
LOG.info("Sent {} messages, backlog is {} messages", numMsgs,
- topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog());
- assertTrue(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog() < numMsgs);
+ topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false));
+ assertTrue(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false) < numMsgs);
producer.close();
}
@@ -251,8 +251,8 @@ public void testSimpleBatchProducerWithFixedBatchSizeAndTime(CompressionType com
rolloverPerIntervalStats();
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
LOG.info("Sent {} messages, backlog is {} messages", numMsgs,
- topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog());
- assertTrue(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog() < numMsgs);
+ topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false));
+ assertTrue(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false) < numMsgs);
producer.close();
}
@@ -298,7 +298,7 @@ public void testBatchProducerWithLargeMessage(CompressionType compressionType, B
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
// we expect 3 messages in the backlog since the large message in the middle should
// close out the batch and be sent in a batch of its own
- assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 3);
+ assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 3);
consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
@@ -312,7 +312,7 @@ public void testBatchProducerWithLargeMessage(CompressionType compressionType, B
consumer.acknowledge(msg);
}
Thread.sleep(100);
- assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 0);
+ assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 0);
consumer.close();
producer.close();
}
@@ -354,7 +354,7 @@ public void testSimpleBatchProducerConsumer(CompressionType compressionType, Bat
rolloverPerIntervalStats();
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
- assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), numMsgs / numMsgsInBatch);
+ assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), numMsgs / numMsgsInBatch);
consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
Message lastunackedMsg = null;
@@ -371,7 +371,7 @@ public void testSimpleBatchProducerConsumer(CompressionType compressionType, Bat
consumer.acknowledgeCumulative(lastunackedMsg);
}
Thread.sleep(100);
- assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 0);
+ assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 0);
consumer.close();
producer.close();
}
@@ -403,7 +403,7 @@ public void testSimpleBatchSyncProducerWithFixedBatchSize(BatcherBuilder builder
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
// we expect 10 messages in the backlog since we sent 10 messages with the batch size set to 5.
// However, we are using synchronous send and so each message will go as an individual message
- assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 10);
+ assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 10);
consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
for (int i = 0; i < numMsgs; i++) {
@@ -458,7 +458,7 @@ public void testSimpleBatchProducerConsumer1kMessages(BatcherBuilder builder) th
// allow stats to be updated..
LOG.info("[{}] checking backlog stats..");
rolloverPerIntervalStats();
- assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), numMsgs / numMsgsInBatch);
+ assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), numMsgs / numMsgsInBatch);
consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
Message lastunackedMsg = null;
@@ -473,7 +473,7 @@ public void testSimpleBatchProducerConsumer1kMessages(BatcherBuilder builder) th
consumer.close();
producer.close();
- assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 0);
+ assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 0);
}
// test for ack holes
@@ -507,7 +507,7 @@ public void testOutOfOrderAcksForBatchMessage() throws Exception {
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
rolloverPerIntervalStats();
- assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), numMsgs / numMsgsInBatch);
+ assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), numMsgs / numMsgsInBatch);
consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
Set individualAcks = new HashSet<>();
for (int i = 15; i < 20; i++) {
@@ -528,7 +528,7 @@ public void testOutOfOrderAcksForBatchMessage() throws Exception {
Thread.sleep(1000);
rolloverPerIntervalStats();
Thread.sleep(1000);
- assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 3);
+ assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 3);
} else if (individualAcks.contains(i)) {
consumer.acknowledge(msg);
} else {
@@ -537,12 +537,12 @@ public void testOutOfOrderAcksForBatchMessage() throws Exception {
}
Thread.sleep(1000);
rolloverPerIntervalStats();
- assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 2);
+ assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 2);
if (lastunackedMsg != null) {
consumer.acknowledgeCumulative(lastunackedMsg);
}
Thread.sleep(100);
- assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 0);
+ assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 0);
consumer.close();
producer.close();
}
@@ -580,7 +580,7 @@ public void testNonBatchCumulativeAckAfterBatchPublish(BatcherBuilder builder) t
rolloverPerIntervalStats();
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
- assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 2);
+ assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 2);
consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
Message lastunackedMsg = null;
@@ -594,7 +594,7 @@ public void testNonBatchCumulativeAckAfterBatchPublish(BatcherBuilder builder) t
}
Thread.sleep(100);
rolloverPerIntervalStats();
- assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), 0);
+ assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false), 0);
consumer.close();
producer.close();
noBatchProducer.close();
@@ -637,7 +637,7 @@ public void testBatchAndNonBatchCumulativeAcks(BatcherBuilder builder) throws Ex
rolloverPerIntervalStats();
assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0);
- assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(),
+ assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false),
(numMsgs / 2) / numMsgsInBatch + numMsgs / 2);
consumer = pulsarClient.newConsumer()
.topic(topicName)
@@ -662,7 +662,7 @@ public void testBatchAndNonBatchCumulativeAcks(BatcherBuilder builder) throws Ex
consumer.acknowledgeCumulative(lastunackedMsg);
}
- retryStrategically(t -> topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog() == 0, 100, 100);
+ retryStrategically(t -> topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(false) == 0, 100, 100);
consumer.close();
producer.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 505038bede387..a2f27822d0008 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -153,7 +153,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception {
assertNotNull(topicRef);
rolloverPerIntervalStats();
- stats = topicRef.getStats();
+ stats = topicRef.getStats(false);
subStats = stats.subscriptions.values().iterator().next();
// subscription stats
@@ -171,7 +171,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception {
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
rolloverPerIntervalStats();
- stats = topicRef.getStats();
+ stats = topicRef.getStats(false);
subStats = stats.subscriptions.values().iterator().next();
// publisher stats
@@ -208,7 +208,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception {
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
rolloverPerIntervalStats();
- stats = topicRef.getStats();
+ stats = topicRef.getStats(false);
subStats = stats.subscriptions.values().iterator().next();
assertEquals(subStats.msgBacklog, 0);
@@ -221,13 +221,13 @@ public void testStatsOfStorageSizeWithSubscription() throws Exception {
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);
- assertEquals(topicRef.getStats().storageSize, 0);
+ assertEquals(topicRef.getStats(false).storageSize, 0);
for (int i = 0; i < 10; i++) {
producer.send(new byte[10]);
}
- assertTrue(topicRef.getStats().storageSize > 0);
+ assertTrue(topicRef.getStats(false).storageSize > 0);
}
@Test
@@ -246,7 +246,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
assertNotNull(topicRef);
rolloverPerIntervalStats();
- stats = topicRef.getStats();
+ stats = topicRef.getStats(false);
subStats = stats.subscriptions.values().iterator().next();
// subscription stats
@@ -264,7 +264,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
rolloverPerIntervalStats();
- stats = topicRef.getStats();
+ stats = topicRef.getStats(false);
subStats = stats.subscriptions.values().iterator().next();
// publisher stats
@@ -299,7 +299,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
rolloverPerIntervalStats();
- stats = topicRef.getStats();
+ stats = topicRef.getStats(false);
subStats = stats.subscriptions.values().iterator().next();
assertTrue(subStats.msgRateRedeliver > 0.0);
assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver);
@@ -313,7 +313,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
rolloverPerIntervalStats();
- stats = topicRef.getStats();
+ stats = topicRef.getStats(false);
subStats = stats.subscriptions.values().iterator().next();
assertEquals(subStats.msgBacklog, 0);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 21f3c03fa6097..eccaea6237244 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -187,7 +187,7 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception {
rolloverPerIntervalStats();
- assertEquals(subRef.getNumberOfEntriesInBacklog(), numMsgs);
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
// 3. consumer1 should have all the messages while consumer2 should have no messages
@@ -204,7 +204,7 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception {
// 4. messages deleted on individual acks
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
- assertEquals(subRef.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
for (int i = 0; i < numMsgs; i++) {
String message = "my-message-" + i;
@@ -241,7 +241,7 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception {
rolloverPerIntervalStats();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
- assertEquals(subRef.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
// 8. unsubscribe not allowed if multiple consumers connected
try {
@@ -517,7 +517,7 @@ public void testActiveConsumerFailoverWithDelay() throws Exception {
// wait for all messages to be dequeued
int retry = 20;
for (int i = 0; i < retry; i++) {
- if (receivedMessages.size() >= numMsgs && subRef.getNumberOfEntriesInBacklog() == 0) {
+ if (receivedMessages.size() >= numMsgs && subRef.getNumberOfEntriesInBacklog(false) == 0) {
break;
} else if (i != retry - 1) {
Thread.sleep(100);
@@ -526,7 +526,7 @@ public void testActiveConsumerFailoverWithDelay() throws Exception {
// check if message duplication has occurred
assertEquals(receivedMessages.size(), numMsgs);
- assertEquals(subRef.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
for (int i = 0; i < receivedMessages.size(); i++) {
Assert.assertNotNull(receivedMessages.get(i));
Assert.assertEquals(new String(receivedMessages.get(i).getData()), "my-message-" + i);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index 7ffb00565c63c..495a9973b5322 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -117,7 +117,7 @@ public void testSimpleConsumerEvents() throws Exception {
rolloverPerIntervalStats();
- assertEquals(subRef.getNumberOfEntriesInBacklog(), numMsgs * 2);
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs * 2);
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
// both consumers will together consumer all messages
@@ -141,7 +141,7 @@ public void testSimpleConsumerEvents() throws Exception {
// 3. messages deleted on individual acks
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
- assertEquals(subRef.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
// 4. shared consumer unsubscribe not allowed
try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 38a080f0035e0..42190c7a18024 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -161,7 +161,7 @@ public void testSimpleConsumerEvents() throws Exception {
assertTrue(subRef.getDispatcher().isConsumerConnected());
rolloverPerIntervalStats();
- assertEquals(subRef.getNumberOfEntriesInBacklog(), numMsgs * 2);
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs * 2);
// 2. messages pushed before client receive
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
@@ -179,7 +179,7 @@ public void testSimpleConsumerEvents() throws Exception {
// 4. messages deleted on individual acks
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
- assertEquals(subRef.getNumberOfEntriesInBacklog(), numMsgs);
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
for (int i = 0; i < numMsgs; i++) {
msg = consumer.receive();
@@ -192,7 +192,7 @@ public void testSimpleConsumerEvents() throws Exception {
// 5. messages deleted on cumulative acks
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
- assertEquals(subRef.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
// 6. consumer unsubscribe
consumer.unsubscribe();
@@ -365,7 +365,7 @@ public Void call() throws Exception {
PersistentSubscription subRef = topicRef.getSubscription(subName);
// 1. cumulatively all threads drain the backlog
- assertEquals(subRef.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
// 2. flow control works the same as single consumer single thread
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
@@ -848,13 +848,13 @@ public void testMessageExpiry() throws Exception {
}
rolloverPerIntervalStats();
- assertEquals(subRef.getNumberOfEntriesInBacklog(), numMsgs);
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs));
runMessageExpiryCheck();
// 1. check all messages expired for this unconnected subscription
- assertEquals(subRef.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
// clean-up
producer.close();
@@ -895,17 +895,17 @@ public void testMessageExpiryWithFewExpiredBacklog() throws Exception {
}
rolloverPerIntervalStats();
- assertEquals(subRef.getNumberOfEntriesInBacklog(), numMsgs);
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs));
runMessageExpiryCheck();
- assertEquals(subRef.getNumberOfEntriesInBacklog(), numMsgs);
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs / 2));
runMessageExpiryCheck();
- assertEquals(subRef.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
index bbcb6b194b19b..c9cc07ccaed00 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
@@ -76,19 +76,19 @@ public void testSeek() throws Exception {
}
PersistentSubscription sub = topicRef.getSubscription("my-subscription");
- assertEquals(sub.getNumberOfEntriesInBacklog(), 10);
+ assertEquals(sub.getNumberOfEntriesInBacklog(false), 10);
consumer.seek(MessageId.latest);
- assertEquals(sub.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(sub.getNumberOfEntriesInBacklog(false), 0);
// Wait for consumer to reconnect
Thread.sleep(500);
consumer.seek(MessageId.earliest);
- assertEquals(sub.getNumberOfEntriesInBacklog(), 10);
+ assertEquals(sub.getNumberOfEntriesInBacklog(false), 10);
Thread.sleep(500);
consumer.seek(messageIds.get(5));
- assertEquals(sub.getNumberOfEntriesInBacklog(), 5);
+ assertEquals(sub.getNumberOfEntriesInBacklog(false), 5);
}
@Test
@@ -131,16 +131,16 @@ public void testSeekTime() throws Exception {
producer.send(message.getBytes());
}
- assertEquals(sub.getNumberOfEntriesInBacklog(), 10);
+ assertEquals(sub.getNumberOfEntriesInBacklog(false), 10);
long currentTimestamp = System.currentTimeMillis();
consumer.seek(currentTimestamp);
- assertEquals(sub.getNumberOfEntriesInBacklog(), 1);
+ assertEquals(sub.getNumberOfEntriesInBacklog(false), 1);
// Wait for consumer to reconnect
Thread.sleep(1000);
consumer.seek(currentTimestamp - resetTimeInMillis);
- assertEquals(sub.getNumberOfEntriesInBacklog(), 10);
+ assertEquals(sub.getNumberOfEntriesInBacklog(false), 10);
}
@Test
@@ -176,7 +176,7 @@ public void testSeekTimeOnPartitionedTopic() throws Exception {
long backlogs = 0;
for (PersistentSubscription sub : subs) {
- backlogs += sub.getNumberOfEntriesInBacklog();
+ backlogs += sub.getNumberOfEntriesInBacklog(false);
}
assertEquals(backlogs, 10);
@@ -185,7 +185,7 @@ public void testSeekTimeOnPartitionedTopic() throws Exception {
long currentTimestamp = System.currentTimeMillis();
consumer.seek(currentTimestamp);
for (PersistentSubscription sub : subs) {
- backlogs += sub.getNumberOfEntriesInBacklog();
+ backlogs += sub.getNumberOfEntriesInBacklog(false);
}
assertEquals(backlogs, 2);
@@ -195,7 +195,7 @@ public void testSeekTimeOnPartitionedTopic() throws Exception {
backlogs = 0;
for (PersistentSubscription sub : subs) {
- backlogs += sub.getNumberOfEntriesInBacklog();
+ backlogs += sub.getNumberOfEntriesInBacklog(false);
}
assertEquals(backlogs, 10);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
index bb010a61d05a2..c01173ee2511c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
@@ -54,6 +54,7 @@ public void testSimpleAggregation() throws Exception {
subStats1.msgBacklog = 50;
subStats1.msgRateRedeliver = 1.5;
subStats1.unackedMessages = 2;
+ subStats1.msgBacklogNoDelayed = 30;
topicStats1.subscriptionStats.put(namespace, subStats1);
TopicStats topicStats2 = new TopicStats();
@@ -81,6 +82,7 @@ public void testSimpleAggregation() throws Exception {
subStats2.msgBacklog = 27;
subStats2.msgRateRedeliver = 0.7;
subStats2.unackedMessages = 0;
+ subStats2.msgBacklogNoDelayed = 20;
topicStats2.subscriptionStats.put(namespace, subStats2);
AggregatedNamespaceStats nsStats = new AggregatedNamespaceStats();
@@ -111,6 +113,7 @@ public void testSimpleAggregation() throws Exception {
AggregatedSubscriptionStats nsSubStats = nsStats.subscriptionStats.get(namespace);
assertNotNull(nsSubStats);
assertEquals(nsSubStats.msgBacklog, 77);
+ assertEquals(nsSubStats.msgBacklogNoDelayed, 50);
assertEquals(nsSubStats.msgRateRedeliver, 2.2);
assertEquals(nsSubStats.unackedMessages, 2);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index bb7aa65d2398a..5cb4fc1383dae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -544,7 +544,7 @@ public void testBlockDispatcherStats() throws Exception {
assertNotNull(topicRef);
rolloverPerIntervalStats();
- stats = topicRef.getStats();
+ stats = topicRef.getStats(false);
subStats = stats.subscriptions.values().iterator().next();
// subscription stats
@@ -562,7 +562,7 @@ public void testBlockDispatcherStats() throws Exception {
Thread.sleep(timeWaitToSync);
rolloverPerIntervalStats();
- stats = topicRef.getStats();
+ stats = topicRef.getStats(false);
subStats = stats.subscriptions.values().iterator().next();
assertTrue(subStats.msgBacklog > 0);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 1e46fe86ca41e..9232642c02f9b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -438,7 +438,7 @@ public void testTopicStats() throws Exception {
assertNotNull(topicRef);
rolloverPerIntervalStats(pulsar);
- stats = topicRef.getStats();
+ stats = topicRef.getStats(false);
subStats = stats.getSubscriptions().values().iterator().next();
// subscription stats
@@ -456,7 +456,7 @@ public void testTopicStats() throws Exception {
Thread.sleep(timeWaitToSync);
rolloverPerIntervalStats(pulsar);
- stats = topicRef.getStats();
+ stats = topicRef.getStats(false);
subStats = stats.getSubscriptions().values().iterator().next();
assertTrue(subStats.msgRateOut > 0);
@@ -520,7 +520,7 @@ public void testReplicator() throws Exception {
assertNotNull(replicatorR3);
rolloverPerIntervalStats(replicationPulasr);
- stats = topicRef.getStats();
+ stats = topicRef.getStats(false);
subStats = stats.getSubscriptions().values().iterator().next();
// subscription stats
@@ -591,7 +591,7 @@ public void testReplicator() throws Exception {
Thread.sleep(timeWaitToSync);
rolloverPerIntervalStats(replicationPulasr);
- stats = topicRef.getStats();
+ stats = topicRef.getStats(false);
subStats = stats.getSubscriptions().values().iterator().next();
assertTrue(subStats.msgRateOut > 0);
@@ -812,7 +812,7 @@ public void testMsgDropStat() throws Exception {
NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
pulsar.getBrokerService().updateRates();
- NonPersistentTopicStats stats = topic.getStats();
+ NonPersistentTopicStats stats = topic.getStats(false);
NonPersistentPublisherStats npStats = stats.getPublishers().get(0);
NonPersistentSubscriptionStats sub1Stats = stats.getSubscriptions().get("subscriber-1");
NonPersistentSubscriptionStats sub2Stats = stats.getSubscriptions().get("subscriber-2");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
index 87278a4d0745b..9fcb129a373c8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageParserTest.java
@@ -124,7 +124,7 @@ public void testWithBatches() throws Exception {
producer.send("hello-" + (n - 1));
// Read through raw data
- assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);
+ assertEquals(cursor.getNumberOfEntriesInBacklog(false), 1);
Entry entry = cursor.readEntriesOrWait(1).get(0);
List messages = Lists.newArrayList();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 0d693663b76fc..41e0153583034 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -794,9 +794,9 @@ public void testDefaultBacklogTTL() throws Exception {
topic.get().checkMessageExpiry();
- retryStrategically((test) -> subscription.getNumberOfEntriesInBacklog() == 0, 5, 200);
+ retryStrategically((test) -> subscription.getNumberOfEntriesInBacklog(false) == 0, 5, 200);
- assertEquals(subscription.getNumberOfEntriesInBacklog(), 0);
+ assertEquals(subscription.getNumberOfEntriesInBacklog(false), 0);
}
@Test(timeOut = testTimeout)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index c5ec04120c022..0a004628de6b0 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -565,6 +565,8 @@ List getListInBundle(String namespace, String bundleRange)
*
* @param topic
* topic name
+ * @param getPreciseBacklog
+ * Set to true to get precise backlog, Otherwise get imprecise backlog.
* @return the topic statistics
*
* @throws NotAuthorizedException
@@ -574,7 +576,11 @@ List getListInBundle(String namespace, String bundleRange)
* @throws PulsarAdminException
* Unexpected error
*/
- TopicStats getStats(String topic) throws PulsarAdminException;
+ TopicStats getStats(String topic, boolean getPreciseBacklog) throws PulsarAdminException;
+
+ default TopicStats getStats(String topic) throws PulsarAdminException {
+ return getStats(topic, false);
+ }
/**
* Get the stats for the topic asynchronously. All the rates are computed over a 1 minute window and are relative
@@ -582,11 +588,17 @@ List getListInBundle(String namespace, String bundleRange)
*
* @param topic
* topic name
+ * @param getPreciseBacklog
+ * Set to true to get precise backlog, Otherwise get imprecise backlog.
*
* @return a future that can be used to track when the topic statistics are returned
*
*/
- CompletableFuture getStatsAsync(String topic);
+ CompletableFuture getStatsAsync(String topic, boolean getPreciseBacklog);
+
+ default CompletableFuture getStatsAsync(String topic) {
+ return getStatsAsync(topic, false);
+ }
/**
* Get the internal stats for the topic.
@@ -716,7 +728,11 @@ List getListInBundle(String namespace, String bundleRange)
* Unexpected error
*
*/
- PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition) throws PulsarAdminException;
+ PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition, boolean getPreciseBacklog) throws PulsarAdminException;
+
+ default PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition) throws PulsarAdminException {
+ return getPartitionedStats(topic, perPartition, false);
+ }
/**
* Get the stats for the partitioned topic asynchronously
@@ -727,7 +743,11 @@ List getListInBundle(String namespace, String bundleRange)
* flag to get stats per partition
* @return a future that can be used to track when the partitioned topic statistics are returned
*/
- CompletableFuture getPartitionedStatsAsync(String topic, boolean perPartition);
+ CompletableFuture getPartitionedStatsAsync(String topic, boolean perPartition, boolean getPreciseBacklog);
+
+ default CompletableFuture getPartitionedStatsAsync(String topic, boolean perPartition) {
+ return getPartitionedStatsAsync(topic, perPartition, false);
+ }
/**
* Get the stats for the partitioned topic
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 75c3d59a55d7c..c61359a74f0b5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -437,9 +437,9 @@ public void failed(Throwable throwable) {
}
@Override
- public TopicStats getStats(String topic) throws PulsarAdminException {
+ public TopicStats getStats(String topic, boolean getPreciseBacklog) throws PulsarAdminException {
try {
- return getStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ return getStatsAsync(topic, getPreciseBacklog).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
@@ -451,9 +451,9 @@ public TopicStats getStats(String topic) throws PulsarAdminException {
}
@Override
- public CompletableFuture getStatsAsync(String topic) {
+ public CompletableFuture getStatsAsync(String topic, boolean getPreciseBacklog) {
TopicName tn = validateTopic(topic);
- WebTarget path = topicPath(tn, "stats");
+ WebTarget path = topicPath(tn, "stats").queryParam("getPreciseBacklog", getPreciseBacklog);
final CompletableFuture future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback() {
@@ -542,10 +542,10 @@ public void failed(Throwable throwable) {
}
@Override
- public PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition)
+ public PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition, boolean getPreciseBacklog)
throws PulsarAdminException {
try {
- return getPartitionedStatsAsync(topic, perPartition).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ return getPartitionedStatsAsync(topic, perPartition, getPreciseBacklog).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
@@ -558,10 +558,10 @@ public PartitionedTopicStats getPartitionedStats(String topic, boolean perPartit
@Override
public CompletableFuture getPartitionedStatsAsync(String topic,
- boolean perPartition) {
+ boolean perPartition, boolean getPreciseBacklog) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitioned-stats");
- path = path.queryParam("perPartition", perPartition);
+ path = path.queryParam("perPartition", perPartition).queryParam("getPreciseBacklog", getPreciseBacklog);
final CompletableFuture future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback() {
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 461c55e08cc19..d9bd2f703306e 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -630,7 +630,7 @@ void topics() throws Exception {
verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", "sub1");
cmdTopics.run(split("stats persistent://myprop/clust/ns1/ds1"));
- verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1");
+ verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("stats-internal persistent://myprop/clust/ns1/ds1"));
verify(mockTopics).getInternalStats("persistent://myprop/clust/ns1/ds1");
@@ -639,7 +639,7 @@ void topics() throws Exception {
verify(mockTopics).getInternalInfo("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("partitioned-stats persistent://myprop/clust/ns1/ds1 --per-partition"));
- verify(mockTopics).getPartitionedStats("persistent://myprop/clust/ns1/ds1", true);
+ verify(mockTopics).getPartitionedStats("persistent://myprop/clust/ns1/ds1", true, false);
cmdTopics.run(split("clear-backlog persistent://myprop/clust/ns1/ds1 -s sub1"));
verify(mockTopics).skipAllMessages("persistent://myprop/clust/ns1/ds1", "sub1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index af5aa02e7a2a3..f5954cf38bac4 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -370,10 +370,14 @@ private class GetStats extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
private java.util.List params;
+ @Parameter(names = { "-gpb",
+ "--get-precise-backlog" }, description = "Set true to get precise backlog")
+ private boolean getPreciseBacklog = false;
+
@Override
void run() throws PulsarAdminException {
String topic = validateTopicName(params);
- print(topics.getStats(topic));
+ print(topics.getStats(topic, getPreciseBacklog));
}
}
@@ -412,10 +416,14 @@ private class GetPartitionedStats extends CliCommand {
@Parameter(names = "--per-partition", description = "Get per partition stats")
private boolean perPartition = false;
+ @Parameter(names = { "-gpb",
+ "--get-precise-backlog" }, description = "Set true to get precise backlog")
+ private boolean getPreciseBacklog = false;
+
@Override
void run() throws Exception {
String topic = validateTopicName(params);
- print(topics.getPartitionedStats(topic, perPartition));
+ print(topics.getPartitionedStats(topic, perPartition, getPreciseBacklog));
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index a4c299493be79..7064883a04c2a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -40,6 +40,9 @@ public class SubscriptionStats {
/** Number of messages in the subscription backlog. */
public long msgBacklog;
+ /** Number of messages in the subscription backlog that do not contain the delay messages. */
+ public long msgBacklogNoDelayed;
+
/** Flag to verify if subscription is blocked due to reaching threshold of unacked messages. */
public boolean blockedSubscriptionOnUnackedMsgs;
@@ -85,6 +88,7 @@ public void reset() {
msgThroughputOut = 0;
msgRateRedeliver = 0;
msgBacklog = 0;
+ msgBacklogNoDelayed = 0;
unackedMessages = 0;
msgRateExpired = 0;
lastExpireTimestamp = 0L;
@@ -99,6 +103,7 @@ public SubscriptionStats add(SubscriptionStats stats) {
this.msgThroughputOut += stats.msgThroughputOut;
this.msgRateRedeliver += stats.msgRateRedeliver;
this.msgBacklog += stats.msgBacklog;
+ this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed;
this.unackedMessages += stats.unackedMessages;
this.msgRateExpired += stats.msgRateExpired;
this.isReplicated |= stats.isReplicated;
diff --git a/site2/docs/admin-api-non-partitioned-topics.md b/site2/docs/admin-api-non-partitioned-topics.md
index 61f7409b47938..a8cfe0fc8d183 100644
--- a/site2/docs/admin-api-non-partitioned-topics.md
+++ b/site2/docs/admin-api-non-partitioned-topics.md
@@ -101,3 +101,59 @@ persistent://tenant/namespace/topic2
```java
admin.topics().getList(namespace);
```
+
+### Stats
+
+It shows current statistics of a given topic. Here's an example payload:
+
+The following stats are available:
+
+|Stat|Description|
+|----|-----------|
+|msgRateIn|The sum of all local and replication publishers’ publish rates in messages per second|
+|msgThroughputIn|Same as msgRateIn but in bytes per second instead of messages per second|
+|msgRateOut|The sum of all local and replication consumers’ dispatch rates in messages per second|
+|msgThroughputOut|Same as msgRateOut but in bytes per second instead of messages per second|
+|averageMsgSize|Average message size, in bytes, from this publisher within the last interval|
+|storageSize|The sum of the ledgers’ storage size for this topic|
+|publishers|The list of all local publishers into the topic. There can be anywhere from zero to thousands.|
+|producerId|Internal identifier for this producer on this topic|
+|producerName|Internal identifier for this producer, generated by the client library|
+|address|IP address and source port for the connection of this producer|
+|connectedSince|Timestamp this producer was created or last reconnected|
+|subscriptions|The list of all local subscriptions to the topic|
+|my-subscription|The name of this subscription (client defined)|
+|msgBacklog|The count of messages in backlog for this subscription|
+|msgBacklogNoDelayed|The count of messages in backlog without delayed messages for this subscription|
+|type|This subscription type|
+|msgRateExpired|The rate at which messages were discarded instead of dispatched from this subscription due to TTL|
+|consumers|The list of connected consumers for this subscription|
+|consumerName|Internal identifier for this consumer, generated by the client library|
+|availablePermits|The number of messages this consumer has space for in the client library’s listen queue. A value of 0 means the client library’s queue is full and receive() isn’t being called. A nonzero value means this consumer is ready to be dispatched messages.|
+|replication|This section gives the stats for cross-colo replication of this topic|
+|replicationBacklog|The outbound replication backlog in messages|
+|connected|Whether the outbound replicator is connected|
+|replicationDelayInSeconds|How long the oldest message has been waiting to be sent through the connection, if connected is true|
+|inboundConnection|The IP and port of the broker in the remote cluster’s publisher connection to this broker|
+|inboundConnectedSince|The TCP connection being used to publish messages to the remote cluster. If there are no local publishers connected, this connection is automatically closed after a minute.|
+
+#### pulsar-admin
+
+The stats for the topic and its connected producers and consumers can be fetched by using the
+[`stats`](reference-pulsar-admin.md#stats) command, specifying the topic by name:
+
+```shell
+$ pulsar-admin topics stats \
+ persistent://test-tenant/namespace/topic \
+ --get-precise-backlog
+```
+
+#### REST API
+
+{@inject: endpoint|GET|/admin/v2/persistent/:tenant/:namespace/:topic/stats|operation/getStats}
+
+#### Java
+
+```java
+admin.topics().getStats(persistentTopic, false /* is precise backlog */);
+```
diff --git a/site2/docs/admin-api-partitioned-topics.md b/site2/docs/admin-api-partitioned-topics.md
index a6507e073b759..d413425c282f1 100644
--- a/site2/docs/admin-api-partitioned-topics.md
+++ b/site2/docs/admin-api-partitioned-topics.md
@@ -254,6 +254,7 @@ The following stats are available:
|subscriptions|The list of all local subscriptions to the topic|
|my-subscription|The name of this subscription (client defined)|
|msgBacklog|The count of messages in backlog for this subscription|
+|msgBacklogNoDelayed|The count of messages in backlog without delayed messages for this subscription|
|type|This subscription type|
|msgRateExpired|The rate at which messages were discarded instead of dispatched from this subscription due to TTL|
|consumers|The list of connected consumers for this subscription|
@@ -274,7 +275,7 @@ The stats for the partitioned topic and its connected producers and consumers ca
```shell
$ pulsar-admin topics partitioned-stats \
persistent://test-tenant/namespace/topic \
- --per-partition
+ --per-partition
```
#### REST API
@@ -284,7 +285,7 @@ $ pulsar-admin topics partitioned-stats \
#### Java
```java
-admin.persistentTopics().getStats(persistentTopic);
+admin.topics().getPartitionedStats(persistentTopic, true /* per partition */, false /* is precise backlog */);
```
### Internal stats
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index 3440f7a23915f..f08656f9bc9c5 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -162,6 +162,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|brokerClientAuthenticationPlugin| Authentication settings of the broker itself. Used when the broker connects to other brokers, either in same or other clusters ||
|brokerClientAuthenticationParameters|||
|athenzDomainNames| Supported Athenz provider domain names(comma separated) for authentication ||
+|exposePreciseBacklogInPrometheus| Enable expose the precise backlog stats, set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate. |false|
|bookkeeperClientAuthenticationPlugin| Authentication plugin to use when connecting to bookies ||
|bookkeeperClientAuthenticationParametersName| BookKeeper auth plugin implementatation specifics parameters name and values ||
|bookkeeperClientAuthenticationParameters|||
@@ -349,6 +350,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
|brokerClientAuthenticationPlugin| The authentication settings of the broker itself. Used when the broker connects to other brokers either in the same cluster or from other clusters. ||
|brokerClientAuthenticationParameters| The parameters that go along with the plugin specified using brokerClientAuthenticationPlugin. ||
|athenzDomainNames| Supported Athenz authentication provider domain names as a comma-separated list. ||
+|exposePreciseBacklogInPrometheus| Enable expose the precise backlog stats, set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate. |false|
|bookkeeperClientAuthenticationPlugin| Authentication plugin to be used when connecting to bookies (BookKeeper servers). ||
|bookkeeperClientAuthenticationParametersName| BookKeeper authentication plugin implementation parameters and values. ||
|bookkeeperClientAuthenticationParameters| Parameters associated with the bookkeeperClientAuthenticationParametersName ||