From 12588a8f30f05198e7e82879bb3351290bc15888 Mon Sep 17 00:00:00 2001
From: Baodi Shi <baodi@apache.org>
Date: Fri, 2 Aug 2024 14:46:31 +0800
Subject: [PATCH] [fix][broker] type cast on exceptions in exceptionally can
 lead to lost calls (#23117)

---
 .../bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java | 6 ++++--
 .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 8 ++++----
 .../pulsar/broker/service/persistent/PersistentTopic.java | 8 +++-----
 .../org/apache/pulsar/compaction/CompactedTopicUtils.java | 8 +-------
 4 files changed, 12 insertions(+), 18 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 398575461d5bf..8ba800ff88130 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -449,7 +449,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
                     });
             return future;
         }).thenAccept(ml -> callback.openLedgerComplete(ml, ctx)).exceptionally(exception -> {
-            callback.openLedgerFailed((ManagedLedgerException) exception.getCause(), ctx);
+            callback.openLedgerFailed(ManagedLedgerException
+                    .getManagedLedgerException(FutureUtil.unwrapCompletionException(exception)), ctx);
             return null;
         });
     }
@@ -475,7 +476,8 @@ public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
                     callback.openReadOnlyManagedLedgerComplete(roManagedLedger, ctx);
                 }).exceptionally(e -> {
                     log.error("[{}] Failed to initialize Read-only managed ledger", managedLedgerName, e);
-                    callback.openReadOnlyManagedLedgerFailed((ManagedLedgerException) e.getCause(), ctx);
+                    callback.openReadOnlyManagedLedgerFailed(ManagedLedgerException
+                            .getManagedLedgerException(FutureUtil.unwrapCompletionException(e)), ctx);
                     return null;
                 });
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 209bf57b24f0f..14d424dc7eacd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -988,7 +988,8 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP
         if (uninitializedCursors.containsKey(cursorName)) {
             uninitializedCursors.get(cursorName).thenAccept(cursor -> callback.openCursorComplete(cursor, ctx))
                     .exceptionally(ex -> {
-                callback.openCursorFailed((ManagedLedgerException) ex, ctx);
+                callback.openCursorFailed(ManagedLedgerException
+                        .getManagedLedgerException(FutureUtil.unwrapCompletionException(ex)), ctx);
                 return null;
             });
             return;
@@ -2975,9 +2976,8 @@ public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) {
         truncateFuture.whenComplete((ignore, exc) -> {
             if (exc != null) {
                 log.error("[{}] Error truncating ledger for deletion", name, exc);
-                callback.deleteLedgerFailed(exc instanceof ManagedLedgerException
-                        ? (ManagedLedgerException) exc : new ManagedLedgerException(exc),
-                        ctx);
+                callback.deleteLedgerFailed(ManagedLedgerException.getManagedLedgerException(
+                        FutureUtil.unwrapCompletionException(exc)), ctx);
             } else {
                 asyncDeleteInternal(callback, ctx);
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7926545647e0d..7a520d879b782 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -4246,15 +4246,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
                             decrementPendingWriteOpsAndCheck();
                         })
                         .exceptionally(throwable -> {
-                            throwable = throwable.getCause();
+                            throwable = FutureUtil.unwrapCompletionException(throwable);
                             if (throwable instanceof NotAllowedException) {
                               publishContext.completed((NotAllowedException) throwable, -1, -1);
                               decrementPendingWriteOpsAndCheck();
-                              return null;
-                            } else if (!(throwable instanceof ManagedLedgerException)) {
-                                throwable = new ManagedLedgerException(throwable);
+                            } else {
+                                addFailed(ManagedLedgerException.getManagedLedgerException(throwable), publishContext);
                             }
-                            addFailed((ManagedLedgerException) throwable, publishContext);
                             return null;
                         });
                 break;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
index aae332acfcbbc..5023180e0b979 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java
@@ -102,13 +102,7 @@ public static void asyncReadCompactedEntries(TopicCompactionService topicCompact
                     });
         }).exceptionally((exception) -> {
             exception = FutureUtil.unwrapCompletionException(exception);
-            ManagedLedgerException managedLedgerException;
-            if (exception instanceof ManagedLedgerException) {
-                managedLedgerException = (ManagedLedgerException) exception;
-            } else {
-                managedLedgerException = new ManagedLedgerException(exception);
-            }
-            callback.readEntriesFailed(managedLedgerException, readEntriesCtx);
+            callback.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(exception), readEntriesCtx);
             return null;
         });
     }