Skip to content

Commit

Permalink
[improve][broker] Optimize PersistentTopic.getLastDispatchablePosition (
Browse files Browse the repository at this point in the history
  • Loading branch information
dao-jun authored and nikhil-ctds committed Jun 17, 2024
1 parent 5c834f2 commit 0337bf6
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ protected TopicStatsHelper initialValue() {

private volatile CloseFutures closeFutures;

// The last position that can be dispatched to consumers
private volatile Position lastDispatchablePosition;

/***
* We use 2 futures to prevent a new closing if there is an in-progress deletion or closing. We make Pulsar return
* the in-progress one when it is called the second time.
Expand Down Expand Up @@ -3476,18 +3479,57 @@ public Position getLastPosition() {

@Override
public CompletableFuture<Position> getLastDispatchablePosition() {
return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
if (Markers.isServerOnlyMarker(md)) {
return false;
} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
// Filter-out transaction aborted messages.
TxnID txnID = new TxnID(md.getTxnidMostBits(), md.getTxnidLeastBits());
return !isTxnAborted(txnID, (PositionImpl) entry.getPosition());
}
return true;
}, getMaxReadPosition());
if (lastDispatchablePosition != null) {
return CompletableFuture.completedFuture(lastDispatchablePosition);
}
return ManagedLedgerImplUtils
.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
if (Markers.isServerOnlyMarker(md)) {
return false;
} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
// Filter-out transaction aborted messages.
TxnID txnID = new TxnID(md.getTxnidMostBits(), md.getTxnidLeastBits());
return !isTxnAborted(txnID, (PositionImpl) entry.getPosition());
}
return true;
}, getMaxReadPosition())
.thenApply(position -> {
// Update lastDispatchablePosition to the given position
updateLastDispatchablePosition(position);
return position;
});
}

/**
* Update lastDispatchablePosition if the given position is greater than the lastDispatchablePosition.
*
* @param position
*/
public synchronized void updateLastDispatchablePosition(Position position) {
// Update lastDispatchablePosition to null if the position is null, fallback to
// ManagedLedgerImplUtils#asyncGetLastValidPosition
if (position == null) {
lastDispatchablePosition = null;
return;
}

PositionImpl position0 = (PositionImpl) position;
// If the position is greater than the maxReadPosition, ignore
if (position0.compareTo(getMaxReadPosition()) > 0) {
return;
}
// If the lastDispatchablePosition is null, set it to the position
if (lastDispatchablePosition == null) {
lastDispatchablePosition = position;
return;
}
// If the position is greater than the lastDispatchablePosition, update it
PositionImpl lastDispatchablePosition0 = (PositionImpl) lastDispatchablePosition;
if (position0.compareTo(lastDispatchablePosition0) > 0) {
lastDispatchablePosition = position;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,11 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {

@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
if (!isMarkerMessage && maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
if (!isMarkerMessage) {
updateLastDispatchablePosition(position);
if (maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
}
}
}

Expand Down Expand Up @@ -436,4 +439,11 @@ public long getCommittedTxnCount() {
.filter(txnBuffer -> txnBuffer.status.equals(TxnStatus.COMMITTED))
.count();
}

// ThreadSafe
private void updateLastDispatchablePosition(Position position) {
if (topic instanceof PersistentTopic t) {
t.updateLastDispatchablePosition(position);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ private void handleTransactionMessage(TxnID txnId, Position position) {
}
}

// ThreadSafe
private void updateLastDispatchablePosition(Position position) {
topic.updateLastDispatchablePosition(position);
}

@Override
public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(TxnID txnID, long startSequenceId) {
return null;
Expand Down Expand Up @@ -459,6 +464,8 @@ void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
} else {
updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
}
// Update the last dispatchable position to null if there is a TXN finished.
updateLastDispatchablePosition(null);
}

/**
Expand Down Expand Up @@ -523,6 +530,10 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean i
}
}
}
// If the message is a normal message, update the last dispatchable position.
if (!isMarkerMessage) {
updateLastDispatchablePosition(position);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,11 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {

@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
if (!isMarkerMessage && maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
if (!isMarkerMessage) {
updateLastDispatchablePosition(position);
if (maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
}
}
}

Expand Down Expand Up @@ -148,4 +151,11 @@ public long getAbortedTxnCount() {
public long getCommittedTxnCount() {
return 0;
}

// ThreadSafe
private void updateLastDispatchablePosition(Position position) {
if (topic instanceof PersistentTopic t) {
t.updateLastDispatchablePosition(position);
}
}
}

0 comments on commit 0337bf6

Please sign in to comment.