Skip to content

Commit

Permalink
[fix][broker] type cast on exceptions in exceptionally can lead to lo…
Browse files Browse the repository at this point in the history
…st calls (#23117)
  • Loading branch information
shibd authored Aug 2, 2024
1 parent f02ce6c commit 12588a8
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
});
return future;
}).thenAccept(ml -> callback.openLedgerComplete(ml, ctx)).exceptionally(exception -> {
callback.openLedgerFailed((ManagedLedgerException) exception.getCause(), ctx);
callback.openLedgerFailed(ManagedLedgerException
.getManagedLedgerException(FutureUtil.unwrapCompletionException(exception)), ctx);
return null;
});
}
Expand All @@ -475,7 +476,8 @@ public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
callback.openReadOnlyManagedLedgerComplete(roManagedLedger, ctx);
}).exceptionally(e -> {
log.error("[{}] Failed to initialize Read-only managed ledger", managedLedgerName, e);
callback.openReadOnlyManagedLedgerFailed((ManagedLedgerException) e.getCause(), ctx);
callback.openReadOnlyManagedLedgerFailed(ManagedLedgerException
.getManagedLedgerException(FutureUtil.unwrapCompletionException(e)), ctx);
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,8 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP
if (uninitializedCursors.containsKey(cursorName)) {
uninitializedCursors.get(cursorName).thenAccept(cursor -> callback.openCursorComplete(cursor, ctx))
.exceptionally(ex -> {
callback.openCursorFailed((ManagedLedgerException) ex, ctx);
callback.openCursorFailed(ManagedLedgerException
.getManagedLedgerException(FutureUtil.unwrapCompletionException(ex)), ctx);
return null;
});
return;
Expand Down Expand Up @@ -2975,9 +2976,8 @@ public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) {
truncateFuture.whenComplete((ignore, exc) -> {
if (exc != null) {
log.error("[{}] Error truncating ledger for deletion", name, exc);
callback.deleteLedgerFailed(exc instanceof ManagedLedgerException
? (ManagedLedgerException) exc : new ManagedLedgerException(exc),
ctx);
callback.deleteLedgerFailed(ManagedLedgerException.getManagedLedgerException(
FutureUtil.unwrapCompletionException(exc)), ctx);
} else {
asyncDeleteInternal(callback, ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4246,15 +4246,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
decrementPendingWriteOpsAndCheck();
})
.exceptionally(throwable -> {
throwable = throwable.getCause();
throwable = FutureUtil.unwrapCompletionException(throwable);
if (throwable instanceof NotAllowedException) {
publishContext.completed((NotAllowedException) throwable, -1, -1);
decrementPendingWriteOpsAndCheck();
return null;
} else if (!(throwable instanceof ManagedLedgerException)) {
throwable = new ManagedLedgerException(throwable);
} else {
addFailed(ManagedLedgerException.getManagedLedgerException(throwable), publishContext);
}
addFailed((ManagedLedgerException) throwable, publishContext);
return null;
});
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,7 @@ public static void asyncReadCompactedEntries(TopicCompactionService topicCompact
});
}).exceptionally((exception) -> {
exception = FutureUtil.unwrapCompletionException(exception);
ManagedLedgerException managedLedgerException;
if (exception instanceof ManagedLedgerException) {
managedLedgerException = (ManagedLedgerException) exception;
} else {
managedLedgerException = new ManagedLedgerException(exception);
}
callback.readEntriesFailed(managedLedgerException, readEntriesCtx);
callback.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(exception), readEntriesCtx);
return null;
});
}
Expand Down

0 comments on commit 12588a8

Please sign in to comment.