[fix][broker] Fix deadlock while skip non-recoverable ledgers. (apache#21915)

### Motivation
A deadlock can occur when methods of org.apache.bookkeeper.mledger.impl.ManagedCursorImpl are invoked concurrently. The sequence of events is as follows:

1. Thread A calls asyncDelete, which internally calls internalAsyncMarkDelete. This acquires the monitor on pendingMarkDeleteOps through synchronized (pendingMarkDeleteOps).

2. Inside internalAsyncMarkDelete, internalMarkDelete is called which subsequently calls persistPositionToLedger. At the start of persistPositionToLedger, buildIndividualDeletedMessageRanges is invoked, where it tries to acquire a read lock using lock.readLock().lock(). At this point, if the write lock is being held by another thread, Thread A will block waiting for the read lock.

3. Concurrently, Thread B executes skipNonRecoverableLedger which first obtains a write lock using lock.writeLock().lock() and then proceeds to call asyncDelete.

4. At this point, Thread B holds the write lock and is waiting for the monitor on pendingMarkDeleteOps, which Thread A holds, while Thread A is waiting for the read lock, which cannot be granted until Thread B releases the write lock. Neither thread can proceed.

In code, the deadlock appears as follows:

Thread A: synchronized (pendingMarkDeleteOps) -> lock.readLock().lock() (waiting)
Thread B: lock.writeLock().lock() -> synchronized (pendingMarkDeleteOps) (waiting)
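
The lock-order inversion above can be reproduced in isolation. The following is a minimal sketch, not code from ManagedCursorImpl: the class name, the latches, and the helper methods are hypothetical, and Thread A uses tryLock with a timeout (instead of the blocking lock() call in the real code) so that the demo terminates rather than hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class LockOrderInversionDemo {
    // Stand-ins for the two locks named above; not the real ManagedCursorImpl fields.
    private final Object pendingMarkDeleteOps = new Object();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Returns true if Thread A failed to get the read lock within the timeout. */
    boolean readLockTimesOut(long timeoutMillis) {
        CountDownLatch aHoldsMonitor = new CountDownLatch(1);
        CountDownLatch bHoldsWriteLock = new CountDownLatch(1);
        AtomicBoolean timedOut = new AtomicBoolean(false);

        // Thread A: monitor first, then read lock (the asyncDelete path).
        Thread a = new Thread(() -> {
            synchronized (pendingMarkDeleteOps) {
                aHoldsMonitor.countDown();
                await(bHoldsWriteLock);
                try {
                    // The real code calls lock(), which would wait forever here.
                    if (lock.readLock().tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                        lock.readLock().unlock();
                    } else {
                        timedOut.set(true);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "thread-A");

        // Thread B: write lock first, then monitor (the old skipNonRecoverableLedger path).
        Thread b = new Thread(() -> {
            lock.writeLock().lock();
            try {
                bHoldsWriteLock.countDown();
                await(aHoldsMonitor);
                synchronized (pendingMarkDeleteOps) {
                    // Reached only because Thread A gave up and left its synchronized block.
                }
            } finally {
                lock.writeLock().unlock();
            }
        }, "thread-B");

        a.start();
        b.start();
        join(a);
        join(b);
        return timedOut.get();
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        boolean timedOut = new LockOrderInversionDemo().readLockTimesOut(200);
        System.out.println("Thread A timed out waiting for the read lock: " + timedOut);
    }
}
```

With a blocking lock() in place of tryLock, neither thread would ever return, which is the situation described in step 4.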

### Modifications

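Avoid holding the write lock across the whole loop. skipNonRecoverableLedger no longer takes lock.writeLock(); instead of calling asyncDelete once per entry while holding that lock, it passes all positions of the ledger to a single asyncDelete call as a lazily generated sequence.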
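
As an illustration of the pattern used in the change, the sketch below builds a lazy Iterable from LongStream.range, so the caller holds no lock while the positions are produced. It assumes, based on the diff, that asyncDelete accepts an Iterable of positions. The Position class and the positionsOf helper here are hypothetical stand-ins, not the Pulsar types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.LongStream;

final class LazyPositionsSketch {
    // Hypothetical stand-in for the managed-ledger Position type.
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    // Each call to iterator() creates a fresh stream, so the Iterable can be
    // traversed more than once and no list of positions is materialized up front.
    static Iterable<Position> positionsOf(long ledgerId, long entryCount) {
        return () -> LongStream.range(0, entryCount)
                .mapToObj(i -> new Position(ledgerId, i))
                .iterator();
    }

    public static void main(String[] args) {
        List<Position> seen = new ArrayList<>();
        for (Position p : positionsOf(7L, 3L)) {
            seen.add(p);
        }
        System.out.println(seen);
    }
}
```

Because the caller no longer wraps the call in the write lock, any locking happens inside the callee in its own order, which removes the inverted acquisition shown for Thread B.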

Co-authored-by: ruihongzhou <[email protected]>
Co-authored-by: Jiwe Guo <[email protected]>
Co-authored-by: Lari Hotari <[email protected]>
4 people committed Jan 31, 2024
1 parent 6f7b9d9 commit 37fc40c
Showing 1 changed file with 15 additions and 21 deletions.
@@ -59,6 +59,7 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import java.util.stream.LongStream;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.BKException;
@@ -2755,30 +2756,23 @@ public void skipNonRecoverableLedger(final long ledgerId){
         if (ledgerInfo == null) {
             return;
         }
-        lock.writeLock().lock();
         log.warn("[{}] [{}] Since the ledger [{}] is lost and the autoSkipNonRecoverableData is true, this ledger will"
                 + " be auto acknowledge in subscription", ledger.getName(), name, ledgerId);
-        try {
-            for (int i = 0; i < ledgerInfo.getEntries(); i++) {
-                if (!individualDeletedMessages.contains(ledgerId, i)) {
-                    asyncDelete(PositionImpl.get(ledgerId, i), new AsyncCallbacks.DeleteCallback() {
-                        @Override
-                        public void deleteComplete(Object ctx) {
-                            // ignore.
-                        }
+        asyncDelete(() -> LongStream.range(0, ledgerInfo.getEntries())
+                        .mapToObj(i -> (Position) PositionImpl.get(ledgerId, i)).iterator(),
+                new AsyncCallbacks.DeleteCallback() {
+                    @Override
+                    public void deleteComplete(Object ctx) {
+                        // ignore.
+                    }
 
-                        @Override
-                        public void deleteFailed(ManagedLedgerException ex, Object ctx) {
-                            // The method internalMarkDelete already handled the failure operation. We only need to
-                            // make sure the memory state is updated.
-                            // If the broker crashed, the non-recoverable ledger will be detected again.
-                        }
-                    }, null);
-                }
-            }
-        } finally {
-            lock.writeLock().unlock();
-        }
+                    @Override
+                    public void deleteFailed(ManagedLedgerException ex, Object ctx) {
+                        // The method internalMarkDelete already handled the failure operation. We only need to
+                        // make sure the memory state is updated.
+                        // If the broker crashed, the non-recoverable ledger will be detected again.
+                    }
+                }, null);
     }
 
     // //////////////////////////////////////////////////