Skip to content

Commit

Permalink
[fix][ml] Managed ledger should recover after open ledger failed (apa…
Browse files Browse the repository at this point in the history
…che#23368)

(cherry picked from commit 77cb67a)
  • Loading branch information
Demogorgon314 authored and lhotari committed Oct 10, 2024
1 parent e2712b1 commit c80eb40
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ public void initializeFailed(ManagedLedgerException e) {

// Clean the map if initialization fails
ledgers.remove(name, future);
entryCacheManager.removeEntryCache(name);

if (pendingInitializeLedgers.remove(name, pendingLedger)) {
pendingLedger.ledger.asyncClose(new CloseCallback() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.Entry;
Expand Down Expand Up @@ -509,6 +511,35 @@ public void recoverAfterWriteError() throws Exception {
entries.forEach(Entry::release);
}

@Test
public void recoverAfterOpenManagedLedgerFail() throws Exception {
ManagedLedger ledger = factory.open("recoverAfterOpenManagedLedgerFail");
Position position = ledger.addEntry("entry".getBytes());
ledger.close();
bkc.failAfter(0, BKException.Code.BookieHandleNotAvailableException);
try {
factory.open("recoverAfterOpenManagedLedgerFail");
} catch (Exception e) {
// ok
}

ledger = factory.open("recoverAfterOpenManagedLedgerFail");
CompletableFuture<byte[]> future = new CompletableFuture<>();
((ManagedLedgerImpl)ledger).asyncReadEntry((PositionImpl)position, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
future.complete(entry.getData());
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
byte[] bytes = future.get(30, TimeUnit.SECONDS);
assertEquals(new String(bytes), "entry");
}

@Test
public void recoverLongTimeAfterMultipleWriteErrors() throws Exception {
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("recoverLongTimeAfterMultipleWriteErrors");
Expand Down

0 comments on commit c80eb40

Please sign in to comment.