Skip to content

Commit

Permalink
Enable get precise backlog and backlog without delayed messages. (apa…
Browse files Browse the repository at this point in the history
…che#6310)

Fixes apache#6045 apache#6281 

### Motivation

Enable get precise backlog and backlog without delayed messages.

### Verifying this change

Added new unit tests for the change.
  • Loading branch information
codelipenghui authored Feb 16, 2020
1 parent 91dfa1a commit df15210
Show file tree
Hide file tree
Showing 57 changed files with 577 additions and 253 deletions.
5 changes: 5 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,11 @@ exposePublisherStats=true
statsUpdateFrequencyInSecs=60
statsUpdateInitialDelayInSecs=60

# Enable expose the precise backlog stats.
# Set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate.
# Default is false.
exposePreciseBacklogInPrometheus=false

### --- Schema storage --- ###
# The schema storage implementation used by this broker
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory
Expand Down
5 changes: 5 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,11 @@ exposeTopicLevelMetricsInPrometheus=true
# Enable topic level metrics
exposePublisherStats=true

# Enable expose the precise backlog stats.
# Set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate.
# Default is false.
exposePreciseBacklogInPrometheus=false

### --- Deprecated config variables --- ###

# Deprecated. Use configurationStoreServers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,10 @@ void asyncGetNthEntry(int n, IndividualDeletedEntries deletedEntries, ReadEntryC
*
* <p/>This method has linear time complexity on the number of ledgers included in the managed ledger.
*
* @param isPrecise set to true to get precise backlog count
* @return the number of entries
*/
long getNumberOfEntriesInBacklog();
long getNumberOfEntriesInBacklog(boolean isPrecise);

/**
* This signals that the reader is done with all the entries up to "position" (included). This can potentially
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,12 +734,16 @@ public long getEstimatedSizeSinceMarkDeletePosition() {
}

@Override
public long getNumberOfEntriesInBacklog() {
public long getNumberOfEntriesInBacklog(boolean isPrecise) {
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer {} cursor ml-entries: {} -- deleted-counter: {} other counters: mdPos {} rdPos {}",
ledger.getName(), name, ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger),
messagesConsumedCounter, markDeletePosition, readPosition);
}
if (isPrecise) {
return getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition())) - 1;
}

long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter;
if (backlog < 0) {
// In some case the counters get incorrect values, fall back to the precise backlog count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -142,6 +143,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final ManagedCursorContainer activeCursors = new ManagedCursorContainer();

// Ever increasing counter of entries added
@VisibleForTesting
static final AtomicLongFieldUpdater<ManagedLedgerImpl> ENTRIES_ADDED_COUNTER_UPDATER = AtomicLongFieldUpdater
.newUpdater(ManagedLedgerImpl.class, "entriesAddedCounter");
@SuppressWarnings("unused")
Expand Down Expand Up @@ -3176,6 +3178,11 @@ public long getOffloadedSize() {
return offloadedSize;
}

@VisibleForTesting
public void setEntriesAddedCounter(long count) {
ENTRIES_ADDED_COUNTER_UPDATER.set(this, count);
}

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public long getNumberOfMessagesInBacklog() {
long count = 0;

for (ManagedCursor cursor : managedLedger.getCursors()) {
count += cursor.getNumberOfEntriesInBacklog();
count += cursor.getNumberOfEntriesInBacklog(false);
}

return count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public long getNumberOfEntries() {
}

@Override
public long getNumberOfEntriesInBacklog() {
public long getNumberOfEntriesInBacklog(boolean isPrecise) {
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,24 @@ void testMultiPositionDelete() throws Exception {
Position p7 = ledger.addEntry("dummy-entry-7".getBytes(Encoding));

assertEquals(c1.getNumberOfEntries(), 7);
assertEquals(c1.getNumberOfEntriesInBacklog(), 7);
assertEquals(c1.getNumberOfEntriesInBacklog(false), 7);

c1.delete(Lists.newArrayList(p2, p3, p5, p7));

assertEquals(c1.getNumberOfEntries(), 3);
assertEquals(c1.getNumberOfEntriesInBacklog(), 3);
assertEquals(c1.getNumberOfEntriesInBacklog(false), 3);
assertEquals(c1.getMarkDeletedPosition(), p0);

c1.delete(Lists.newArrayList(p1));

assertEquals(c1.getNumberOfEntries(), 2);
assertEquals(c1.getNumberOfEntriesInBacklog(), 2);
assertEquals(c1.getNumberOfEntriesInBacklog(false), 2);
assertEquals(c1.getMarkDeletedPosition(), p3);

c1.delete(Lists.newArrayList(p4, p6, p7));

assertEquals(c1.getNumberOfEntries(), 0);
assertEquals(c1.getNumberOfEntriesInBacklog(), 0);
assertEquals(c1.getNumberOfEntriesInBacklog(false), 0);
assertEquals(c1.getMarkDeletedPosition(), p7);
}

Expand Down
Loading

0 comments on commit df15210

Please sign in to comment.