-
Notifications
You must be signed in to change notification settings - Fork 586
Add ConfirmationChannel for async publisher confirmations
#1824
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Add ConfirmationChannel for async publisher confirmations
#1824
Conversation
|
Hey @acogoluegnes! Here's a proposal for implementing "automatic" publisher confirmation tracking in a similar manner to the .NET client. As you're the expert here, please suggest better names, a better implementation, etc, as I'm just barely familiar enough with Java and this project to implement this feature with the help of a genie. I did, of course, review the code. If you like the way this is going, I thought I'd also add rate-throttling in a similar manner to the .NET client, i.e. as the outstanding confirmation window closes, increase delay between publishes to (hopefully) allow the broker to catch up. |
6a31a46 to
714370e
Compare
|
Thanks for this contributon @lukebakken. I think it is on the right track. I added some comments in the code. Just a few more remarks:
|
714370e to
44c3aca
Compare
c64f7ba to
5520ef4
Compare
|
@lukebakken I have sent you an external contributor invite for this repo. |
5520ef4 to
dd2fb5e
Compare
ConfirmationChannel for async publisher confirmations
|
@michaelklishin @acogoluegnes I took this comment to heart and re-implemented this feature as a |
041b932 to
b0761bf
Compare
The new `ConfirmationChannel` API introduced in rabbitmq/rabbitmq-java-client#1824 provides asynchronous publisher confirmation tracking with a `CompletableFuture`-based API, rate limiting, and message correlation support. This change adds `PublisherConfirmsAsync.java` to demonstrate the `ConfirmationChannel` API. The tutorial shows how to create a `ConfirmationChannel` wrapper with rate limiting, publish messages asynchronously with correlation context, and wait for all confirmations using `CompletableFuture.allOf()`. Depends on rabbitmq/rabbitmq-java-client#1824
b0761bf to
49a0ff5
Compare
c002375 to
8112046
Compare
|
Alrighty gang, all set I think. Small tutorial here - rabbitmq/rabbitmq-tutorials#707 |
Traditional publisher confirms in the Java client require manual tracking of sequence numbers and correlation of Basic.Return messages. This makes per-message error handling complex and provides no built-in async pattern, backpressure mechanism, or message correlation support. This change introduces `ConfirmationChannel`, a wrapper that provides automatic publisher confirmation tracking with a `CompletableFuture`-based API, optional throttling, and generic context parameter for message correlation. The implementation uses listener-based integration with existing `Channel` instances, requiring no modifications to `ChannelN`. New API components: - `ConfirmationChannel` interface - Extends `Channel` and adds `basicPublishAsync()` methods that return `CompletableFuture<T>` - `ConfirmationChannelN` implementation - Wraps any `Channel` instance and tracks confirmations via return/confirm/shutdown listeners - `PublishException` - Exception thrown when message is nack'd or returned, with sequence number, routing details, and user context The wrapper maintains independent sequence numbers using `AtomicLong` and stores confirmation state in a `ConcurrentHashMap`. Each entry holds the future, rate limiter permit, and user-provided context. Messages include an `x-seq-no` header for correlating Basic.Return responses. Rate limiting is optional via `RateLimiter` parameter. The `ThrottlingRateLimiter` implementation uses progressive delays (0-1000ms) based on capacity usage, applying backpressure when available permits fall below a threshold (default 50%). The `basicPublish()` and `waitForConfirms()` methods throw `UnsupportedOperationException` on `ConfirmationChannel` to prevent mixing synchronous and asynchronous patterns. All other `Channel` methods delegate to the wrapped instance. Tests include 9 unit tests for `ThrottlingRateLimiter` and 24 integration tests for publisher confirmation tracking with context parameter verification, rate limiting scenarios, and error handling.
8112046 to
9a31acc
Compare
|
@lukebakken since this is a feature by most definitions, we now would have to go through a special approval process on our end :( |
Traditional publisher confirms in the Java client require manual
tracking of sequence numbers and correlation of Basic.Return messages.
This makes per-message error handling complex and provides no built-in
async pattern, backpressure mechanism, or message correlation support.
This change introduces
ConfirmationChannel, a wrapper that providesautomatic publisher confirmation tracking with a
CompletableFuture-based API, optional throttling, and generic contextparameter for message correlation. The implementation uses
listener-based integration with existing
Channelinstances, requiringno modifications to
ChannelN.New API components:
ConfirmationChannelinterface - ExtendsChanneland addsbasicPublishAsync()methods that returnCompletableFuture<T>ConfirmationChannelNimplementation - Wraps anyChannelinstanceand tracks confirmations via return/confirm/shutdown listeners
PublishException- Exception thrown when message is nack'd orreturned, with sequence number, routing details, and user context
The wrapper maintains independent sequence numbers using
AtomicLongand stores confirmation state in a
ConcurrentHashMap. Each entry holdsthe future, rate limiter permit, and user-provided context. Messages
include an
x-seq-noheader for correlating Basic.Return responses.Rate limiting is optional via
RateLimiterparameter. TheThrottlingRateLimiterimplementation uses progressive delays(0-1000ms) based on capacity usage, applying backpressure when available
permits fall below a threshold (default 50%).
The
basicPublish()andwaitForConfirms()methods throwUnsupportedOperationExceptiononConfirmationChannelto preventmixing synchronous and asynchronous patterns. All other
Channelmethods delegate to the wrapped instance.