Skip to content

Commit

Permalink
[improve][broker] Optimize PersistentTopic.getLastDispatchablePosition (
Browse files Browse the repository at this point in the history
  • Loading branch information
dao-jun authored and lhotari committed Jun 13, 2024
1 parent dae7d8b commit 912ae3c
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ protected TopicStatsHelper initialValue() {

private volatile CloseFutures closeFutures;

// The last position that can be dispatched to consumers
private volatile Position lastDispatchablePosition;

/***
* We use 2 futures to prevent a new closing if there is an in-progress deletion or closing. We make Pulsar return
* the in-progress one when it is called the second time.
Expand Down Expand Up @@ -3459,18 +3462,57 @@ public Position getLastPosition() {

@Override
public CompletableFuture<Position> getLastDispatchablePosition() {
return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
if (Markers.isServerOnlyMarker(md)) {
return false;
} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
// Filter-out transaction aborted messages.
TxnID txnID = new TxnID(md.getTxnidMostBits(), md.getTxnidLeastBits());
return !isTxnAborted(txnID, (PositionImpl) entry.getPosition());
}
return true;
}, getMaxReadPosition());
if (lastDispatchablePosition != null) {
return CompletableFuture.completedFuture(lastDispatchablePosition);
}
return ManagedLedgerImplUtils
.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
if (Markers.isServerOnlyMarker(md)) {
return false;
} else if (md.hasTxnidMostBits() && md.hasTxnidLeastBits()) {
// Filter-out transaction aborted messages.
TxnID txnID = new TxnID(md.getTxnidMostBits(), md.getTxnidLeastBits());
return !isTxnAborted(txnID, (PositionImpl) entry.getPosition());
}
return true;
}, getMaxReadPosition())
.thenApply(position -> {
// Update lastDispatchablePosition to the given position
updateLastDispatchablePosition(position);
return position;
});
}

/**
* Update lastDispatchablePosition if the given position is greater than the lastDispatchablePosition.
*
* @param position
*/
public synchronized void updateLastDispatchablePosition(Position position) {
// Update lastDispatchablePosition to null if the position is null, fallback to
// ManagedLedgerImplUtils#asyncGetLastValidPosition
if (position == null) {
lastDispatchablePosition = null;
return;
}

PositionImpl position0 = (PositionImpl) position;
// If the position is greater than the maxReadPosition, ignore
if (position0.compareTo(getMaxReadPosition()) > 0) {
return;
}
// If the lastDispatchablePosition is null, set it to the position
if (lastDispatchablePosition == null) {
lastDispatchablePosition = position;
return;
}
// If the position is greater than the lastDispatchablePosition, update it
PositionImpl lastDispatchablePosition0 = (PositionImpl) lastDispatchablePosition;
if (position0.compareTo(lastDispatchablePosition0) > 0) {
lastDispatchablePosition = position;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,11 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {

@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
if (!isMarkerMessage && maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
if (!isMarkerMessage) {
updateLastDispatchablePosition(position);
if (maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
}
}
}

Expand Down Expand Up @@ -424,4 +427,11 @@ public long getCommittedTxnCount() {
.filter(txnBuffer -> txnBuffer.status.equals(TxnStatus.COMMITTED))
.count();
}

// ThreadSafe
private void updateLastDispatchablePosition(Position position) {
if (topic instanceof PersistentTopic t) {
t.updateLastDispatchablePosition(position);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ private void handleTransactionMessage(TxnID txnId, Position position) {
}
}

// ThreadSafe
private void updateLastDispatchablePosition(Position position) {
topic.updateLastDispatchablePosition(position);
}

@Override
public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(TxnID txnID, long startSequenceId) {
return null;
Expand Down Expand Up @@ -455,6 +460,8 @@ void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
} else {
updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
}
// Update the last dispatchable position to null if there is a TXN finished.
updateLastDispatchablePosition(null);
}

/**
Expand Down Expand Up @@ -519,6 +526,10 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean i
}
}
}
// If the message is a normal message, update the last dispatchable position.
if (!isMarkerMessage) {
updateLastDispatchablePosition(position);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,11 @@ public boolean isTxnAborted(TxnID txnID, PositionImpl readPosition) {

@Override
public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
if (!isMarkerMessage && maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
if (!isMarkerMessage) {
updateLastDispatchablePosition(position);
if (maxReadPositionCallBack != null) {
maxReadPositionCallBack.maxReadPositionMovedForward(null, position);
}
}
}

Expand Down Expand Up @@ -136,4 +139,11 @@ public long getAbortedTxnCount() {
public long getCommittedTxnCount() {
return 0;
}

// ThreadSafe
private void updateLastDispatchablePosition(Position position) {
if (topic instanceof PersistentTopic t) {
t.updateLastDispatchablePosition(position);
}
}
}

0 comments on commit 912ae3c

Please sign in to comment.