diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 398575461d5bf..8ba800ff88130 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -449,7 +449,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { }); return future; }).thenAccept(ml -> callback.openLedgerComplete(ml, ctx)).exceptionally(exception -> { - callback.openLedgerFailed((ManagedLedgerException) exception.getCause(), ctx); + callback.openLedgerFailed(ManagedLedgerException + .getManagedLedgerException(FutureUtil.unwrapCompletionException(exception)), ctx); return null; }); } @@ -475,7 +476,8 @@ public void asyncOpenReadOnlyManagedLedger(String managedLedgerName, callback.openReadOnlyManagedLedgerComplete(roManagedLedger, ctx); }).exceptionally(e -> { log.error("[{}] Failed to initialize Read-only managed ledger", managedLedgerName, e); - callback.openReadOnlyManagedLedgerFailed((ManagedLedgerException) e.getCause(), ctx); + callback.openReadOnlyManagedLedgerFailed(ManagedLedgerException + .getManagedLedgerException(FutureUtil.unwrapCompletionException(e)), ctx); return null; }); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 209bf57b24f0f..14d424dc7eacd 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -988,7 +988,8 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP if (uninitializedCursors.containsKey(cursorName)) { uninitializedCursors.get(cursorName).thenAccept(cursor -> callback.openCursorComplete(cursor, ctx)) .exceptionally(ex -> { - callback.openCursorFailed((ManagedLedgerException) ex, ctx); + callback.openCursorFailed(ManagedLedgerException + .getManagedLedgerException(FutureUtil.unwrapCompletionException(ex)), ctx); return null; }); return; @@ -2975,9 +2976,8 @@ public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) { truncateFuture.whenComplete((ignore, exc) -> { if (exc != null) { log.error("[{}] Error truncating ledger for deletion", name, exc); - callback.deleteLedgerFailed(exc instanceof ManagedLedgerException - ? (ManagedLedgerException) exc : new ManagedLedgerException(exc), - ctx); + callback.deleteLedgerFailed(ManagedLedgerException.getManagedLedgerException( + FutureUtil.unwrapCompletionException(exc)), ctx); } else { asyncDeleteInternal(callback, ctx); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 7926545647e0d..7a520d879b782 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -4246,15 +4246,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon decrementPendingWriteOpsAndCheck(); }) .exceptionally(throwable -> { - throwable = throwable.getCause(); + throwable = FutureUtil.unwrapCompletionException(throwable); if (throwable instanceof NotAllowedException) { publishContext.completed((NotAllowedException) throwable, -1, -1); decrementPendingWriteOpsAndCheck(); - return null; - } else if (!(throwable instanceof ManagedLedgerException)) { - throwable = new ManagedLedgerException(throwable); + } else { + addFailed(ManagedLedgerException.getManagedLedgerException(throwable), publishContext); } - addFailed((ManagedLedgerException) throwable, publishContext); return null; }); break; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java index aae332acfcbbc..5023180e0b979 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java @@ -102,13 +102,7 @@ public static void asyncReadCompactedEntries(TopicCompactionService topicCompact }); }).exceptionally((exception) -> { exception = FutureUtil.unwrapCompletionException(exception); - ManagedLedgerException managedLedgerException; - if (exception instanceof ManagedLedgerException) { - managedLedgerException = (ManagedLedgerException) exception; - } else { - managedLedgerException = new ManagedLedgerException(exception); - } - callback.readEntriesFailed(managedLedgerException, readEntriesCtx); + callback.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(exception), readEntriesCtx); return null; }); }