From b314a5f4f1cb4b7f62a676648baddbd507d67a72 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 3 Jun 2019 10:42:44 -0400 Subject: [PATCH] Add AmqpHeaders.CHANNEL in error hander Resolves https://github.com/spring-projects/spring-amqp/issues/1012 Make the channel available in `RabbitListenerErrorHander` for when MANUAL ack mode is being used. **cherry-pick to 2.1.x** --- .../adapter/MessagingMessageListenerAdapter.java | 5 +++++ .../annotation/EnableRabbitIntegrationTests.java | 5 +++++ src/reference/asciidoc/amqp.adoc | 15 +++++++++++++++ 3 files changed, 25 insertions(+) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java index 34cf2fd291..aeb982b6d9 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java @@ -26,6 +26,7 @@ import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler; import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; import org.springframework.amqp.support.AmqpHeaderMapper; +import org.springframework.amqp.support.AmqpHeaders; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.MessagingMessageConverter; @@ -33,6 +34,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.remoting.support.RemoteInvocationResult; import org.springframework.util.Assert; @@ -142,6 +144,9 @@ public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel catch (ListenerExecutionFailedException e) { if (this.errorHandler != null) { try { + message = MessageBuilder.fromMessage(message) + .setHeader(AmqpHeaders.CHANNEL, channel) + .build(); Object errorResult = this.errorHandler.handleError(amqpMessage, message, e); if (errorResult != null) { handleResult(new InvocationResult(errorResult, null, null), amqpMessage, channel, message); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java index a6611db47e..63ffd50758 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java @@ -765,6 +765,8 @@ public void listenerErrorHandlerException() { catch (Exception e) { assertThat(e.getCause().getMessage()).isEqualTo("from error handler"); assertThat(e.getCause().getCause().getMessage()).isEqualTo("return this"); + EnableRabbitConfig config = this.context.getBean(EnableRabbitConfig.class); + assertThat(config.errorHandlerChannel).isNotNull(); } } @@ -1351,6 +1353,8 @@ public static class EnableRabbitConfig { private final CountDownLatch noListenerLatch = new CountDownLatch(1); + private volatile Channel errorHandlerChannel; + @Bean public ConnectionNameStrategy cns() { return new SimplePropertyValueConnectionNameStrategy("spring.application.name"); @@ -1552,6 +1556,7 @@ public RabbitListenerErrorHandler alwaysBARHandler() { @Bean public RabbitListenerErrorHandler throwANewException() { return (m, sm, e) -> { + this.errorHandlerChannel = sm.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class); throw new RuntimeException("from error handler", e.getCause()); }; } diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index f81221ffb3..fa200c14cc 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -2756,6 +2756,21 @@ If you use JSON, consider using an `errorHandler` to return some other Jackson-f IMPORTANT: In version 2.1, this interface moved from package `o.s.amqp.rabbit.listener` to `o.s.amqp.rabbit.listener.api`. +Starting with version 2.1.7, the `Channel` is available in a messaging message header; this allows you to ack or nack the failed messasge when using `AcknowledgeMode.MANUAL`: + +==== +[source, java] +---- +public Object handleError(Message amqpMessage, org.springframework.messaging.Message message, + ListenerExecutionFailedException exception) { + ... + message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class) + .basicReject(message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), + true); + } +---- +==== + ====== Container Management Containers created for annotations are not registered with the application context.