From 34ea8a343e6048ae9881526437353198c9c745fb Mon Sep 17 00:00:00 2001 From: Andy Lehane Date: Thu, 27 Apr 2023 11:00:49 +0100 Subject: [PATCH 1/3] Bug Fix To Handle Forwarded Direct Reply To and Non Direct Reply To Q's Reason for update - we have a system that uses IBMMQ via JMS and are converting some operations to use RabbitMQ, rather than IBMMQ. To be able to maintain backwards compatibility and remain technology agnostic, we're using the Rabbit-JMS-Client. rather than than the native rabbitmq client. This update is a bug fix for allowing the rabbitmq-jms-client to correctly respond to handle the following use cases: - Replying correctly to a message containing a direct reply to queue by ensuring the rabbit correlation id is set (as well as the JMS Correllation id). - Forwarding on a message containing a direct reply to queue to a further consumer where a future consumer will reply to the initial client. - Handling a reply to queue that's not a "direct reply to" or a forwarded "direct reply to" queue. Change Details RMQMessageProducer: - Replying to a direct reply to - this has been acheived by ensuring that the JMSCorrelationId is set as the rabbit mq correllation id, as well as being set as the jms property form of the correlation id - Rename maybeSetReplyToPropertyToDirectReplyTo to setReplyToProperty to allow any reply to queue to be sent onto the consumer, not just direct reply to initialisations. This allows the rabbit-jms-client to handle forwarded direct-reply-to and non-direct reply to destinations. RMQMessage: - Rename maybeSetupDirectReplyTo to setupJMSReplyTo and update to be able to handle non-direct reply to queues. This code assumes that the reply to queue will be hosted on the same exchange as the messaging being sent I'm not sure if this is a valid assumption or whether this should potentially allow the use of: - - Use the same exchange - / - Allow an exchange name to be set for the reply to queue. --- .../com/rabbitmq/jms/client/RMQMessage.java | 56 +++-- .../jms/client/RMQMessageProducer.java | 22 +- .../jms/client/RMQMessageProducerTest.java | 231 +++++++++++++++++- .../rabbitmq/jms/client/RMQMessageTest.java | 117 ++++++++- 4 files changed, 393 insertions(+), 33 deletions(-) diff --git a/src/main/java/com/rabbitmq/jms/client/RMQMessage.java b/src/main/java/com/rabbitmq/jms/client/RMQMessage.java index 547d5013..2c910f96 100644 --- a/src/main/java/com/rabbitmq/jms/client/RMQMessage.java +++ b/src/main/java/com/rabbitmq/jms/client/RMQMessage.java @@ -849,7 +849,7 @@ static RMQMessage convertJmsMessage(RMQSession session, GetResponse response, Re // JMSProperties already set message.setReadonly(true); // Set readOnly - mandatory for received messages - maybeSetupDirectReplyTo(message, response.getProps().getReplyTo()); + setupJMSReplyTo(message, response.getProps().getReplyTo()); receivingContextConsumer.accept(new ReceivingContext(message)); return message; @@ -869,7 +869,7 @@ private static RMQMessage convertAmqpMessage(RMQSession session, RMQDestination message.setJMSPropertiesFromAmqpProperties(props); message.setReadonly(true); // Set readOnly - mandatory for received messages - maybeSetupDirectReplyTo(message, response.getProps().getReplyTo()); + setupJMSReplyTo(message, response.getProps().getReplyTo()); receivingContextConsumer.accept(new ReceivingContext(message)); return message; @@ -899,23 +899,33 @@ private static RMQMessage handleJmsRedeliveredAndDeliveryCount(GetResponse respo } /** - * Properly assign JMSReplyTo header when using direct reply to. + * Properly assign JMSReplyTo header when using reply to. *

* On a received request message, the AMQP reply-to property is * set to a specific amq.rabbitmq.reply-to.ID value. * We must use this value for the JMS reply to destination if * we want to send the response back to the destination the sender * is waiting. + *

+ * If we are not using a direct reply to, then assume that the + * reply to queue is hosted on the same exchange as the message + * being sent. * * @param message * @param replyTo * @throws JMSException * @since 1.11.0 */ - private static void maybeSetupDirectReplyTo(RMQMessage message, String replyTo) throws JMSException { - if (replyTo != null && replyTo.startsWith(DIRECT_REPLY_TO)) { - RMQDestination replyToDestination = new RMQDestination(DIRECT_REPLY_TO, "", replyTo, replyTo); - message.setJMSReplyTo(replyToDestination); + private static void setupJMSReplyTo(RMQMessage message, String replyTo) throws JMSException { + if (replyTo != null) { + if (replyTo.startsWith(DIRECT_REPLY_TO)) { + RMQDestination replyToDestination = new RMQDestination(DIRECT_REPLY_TO, "", replyTo, replyTo); + message.setJMSReplyTo(replyToDestination); + } else { + // If we're not a direct reply-to, assume we're replying on the same exhange as the initial request. + RMQDestination replyToDestination = new RMQDestination(replyTo, ((RMQDestination) message.getJMSDestination()).getAmqpExchangeName(), replyTo, replyTo); + message.setJMSReplyTo(replyToDestination); + } } } @@ -1242,24 +1252,24 @@ void generateInternalID() { this.rmqProperties.put(JMS_MESSAGE_ID, "ID:" + this.internalMessageID); } - /** - * Utility method used to be able to write primitives and objects to a data - * stream without keeping track of order and type. - *

- * This also allows any Object to be written. - *

- *

- * The purpose of this method is to optimise the writing of a primitive that - * is represented as an object by only writing the type and the primitive - * value to the stream. - *

- * - * @param s the primitive to be written - * @param out the stream to write the primitive to. - * @throws IOException if an I/O error occurs + /** + * Utility method used to be able to write primitives and objects to a data + * stream without keeping track of order and type. + *

+ * This also allows any Object to be written. + *

+ *

+ * The purpose of this method is to optimise the writing of a primitive that + * is represented as an object by only writing the type and the primitive + * value to the stream. + *

+ * + * @param s the primitive to be written + * @param out the stream to write the primitive to. + * @throws IOException if an I/O error occurs * @throws MessageFormatException if message cannot be parsed * - */ + */ protected static void writePrimitive(Object s, ObjectOutput out) throws IOException, MessageFormatException { writePrimitive(s, out, false); } diff --git a/src/main/java/com/rabbitmq/jms/client/RMQMessageProducer.java b/src/main/java/com/rabbitmq/jms/client/RMQMessageProducer.java index ae4f4a0e..61da93e8 100644 --- a/src/main/java/com/rabbitmq/jms/client/RMQMessageProducer.java +++ b/src/main/java/com/rabbitmq/jms/client/RMQMessageProducer.java @@ -8,7 +8,6 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.jms.admin.RMQDestination; -import com.rabbitmq.jms.client.message.RMQBytesMessage; import com.rabbitmq.jms.client.message.RMQTextMessage; import com.rabbitmq.jms.util.RMQJMSException; import java.util.Map; @@ -373,6 +372,7 @@ private void sendAMQPMessage(RMQDestination destination, RMQMessage msg, Message bob.contentType("application/octet-stream"); bob.deliveryMode(RMQMessage.rmqDeliveryMode(deliveryMode)); bob.priority(priority); + bob.correlationId(msg.getJMSCorrelationID()); bob.expiration(rmqExpiration(timeToLive)); Map messageHeaders = msg.toAmqpHeaders(); if (this.keepTextMessageType && msg instanceof RMQTextMessage) { @@ -382,7 +382,7 @@ private void sendAMQPMessage(RMQDestination destination, RMQMessage msg, Message String targetAmqpExchangeName = session.delayMessage(destination, messageHeaders, deliveryDelay); bob.headers(messageHeaders); - maybeSetReplyToPropertyToDirectReplyTo(bob, msg); + setReplyToProperty(bob, msg); bob = amqpPropertiesCustomiser.apply(bob, msg); @@ -409,12 +409,13 @@ protected void sendJMSMessage(RMQDestination destination, RMQMessage msg, Messag bob.contentType("application/octet-stream"); bob.deliveryMode(RMQMessage.rmqDeliveryMode(deliveryMode)); bob.priority(priority); + bob.correlationId(msg.getJMSCorrelationID()); bob.expiration(rmqExpiration(timeToLive)); Map headers = msg.toHeaders(); String targetAmqpExchangeName = session.delayMessage(destination, headers, deliveryDelay); bob.headers(headers); - maybeSetReplyToPropertyToDirectReplyTo(bob, msg); + setReplyToProperty(bob, msg); byte[] data = msg.toByteArray(); @@ -427,11 +428,16 @@ protected void sendJMSMessage(RMQDestination destination, RMQMessage msg, Messag } /** - * Set AMQP reply-to property to direct-reply-to if necessary. + * Set AMQP reply-to property to reply-to if necessary. + *

*

* Set the reply-to property to amq.rabbitmq.reply-to * if the JMSReplyTo header is set to a destination with that - * name. + * name and does not have a routing key (which indicates this this is a forwarded + * reply to destination). + *

+ * Set the reply-to property to the amq routing key, if the routing key + * if not null. *

* For outbound RPC request. * @@ -440,11 +446,13 @@ protected void sendJMSMessage(RMQDestination destination, RMQMessage msg, Messag * @throws JMSException * @since 1.11.0 */ - private static void maybeSetReplyToPropertyToDirectReplyTo(AMQP.BasicProperties.Builder builder, RMQMessage msg) throws JMSException { + private static void setReplyToProperty(AMQP.BasicProperties.Builder builder, RMQMessage msg) throws JMSException { if (msg.getJMSReplyTo() != null && msg.getJMSReplyTo() instanceof RMQDestination) { RMQDestination replyTo = (RMQDestination) msg.getJMSReplyTo(); - if (DIRECT_REPLY_TO.equals(replyTo.getDestinationName())) { + if (DIRECT_REPLY_TO.equals(replyTo.getDestinationName()) && replyTo.getAmqpRoutingKey() == null) { builder.replyTo(DIRECT_REPLY_TO); + } else { + builder.replyTo(replyTo.getAmqpRoutingKey()); } } } diff --git a/src/test/java/com/rabbitmq/jms/client/RMQMessageProducerTest.java b/src/test/java/com/rabbitmq/jms/client/RMQMessageProducerTest.java index ec01b754..53ef0c27 100644 --- a/src/test/java/com/rabbitmq/jms/client/RMQMessageProducerTest.java +++ b/src/test/java/com/rabbitmq/jms/client/RMQMessageProducerTest.java @@ -2,18 +2,19 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. // -// Copyright (c) 2017-2022 VMware, Inc. or its affiliates. All rights reserved. +// Copyright (c) 2017-2023 VMware, Inc. or its affiliates. All rights reserved. package com.rabbitmq.jms.client; import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.jms.admin.RMQDestination; import com.rabbitmq.jms.client.message.RMQBytesMessage; import com.rabbitmq.jms.client.message.RMQTextMessage; import javax.jms.CompletionListener; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import javax.jms.DeliveryMode; @@ -21,6 +22,7 @@ import javax.jms.Message; import java.io.IOException; +import java.util.Map; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; @@ -32,10 +34,12 @@ public class RMQMessageProducerTest { RMQSession session; RMQDestination destination; + Channel channel; @BeforeEach public void init() { session = mock(RMQSession.class); destination = mock(RMQDestination.class); + channel = mock(Channel.class); } @Test public void preferProducerPropertyNoMessagePropertySpecified() throws Exception { @@ -151,6 +155,229 @@ public class RMQMessageProducerTest { verify(channel).basicPublish(eq("x"), anyString(), any(AMQP.BasicProperties.class), any(byte[].class)); } + @Test + @DisplayName("RMQMessageProducer::send should ensure that an AMQP message has the correlation id on the rabbit message") + public void sendAMQPMessageWithCorrelationIdCopy() throws JMSException, IOException { + + try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) { + RMQTextMessage message = new RMQTextMessage(); + message.setText("Test message"); + message.setJMSCorrelationID("TESTID"); + + when(session.getChannel()).thenReturn(channel); + when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch"); + + when(destination.isAmqp()).thenReturn(true); + when(destination.isAmqpWritable()).thenReturn(true); + when(destination.getAmqpRoutingKey()).thenReturn("key"); + + producer.send(message); + + ArgumentCaptor propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class); + + verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes())); + + assertEquals("TESTID", propCapture.getValue().getCorrelationId()); + } + } + + @Test + @DisplayName("RMQMessageProducer::send should ensure that a JMS message has the correlation id on the rabbit message") + public void sendJMSMessageWithCorrelationIdCopy() throws JMSException, IOException { + + try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) { + RMQMessage message = mock(RMQMessage.class); + + when(message.toByteArray()).thenReturn("Test message".getBytes()); + when(message.getJMSCorrelationID()).thenReturn("TESTID"); + + when(session.getChannel()).thenReturn(channel); + when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch"); + + when(destination.isAmqp()).thenReturn(false); + when(destination.isAmqpWritable()).thenReturn(true); + when(destination.getAmqpRoutingKey()).thenReturn("key"); + + producer.send(message); + + ArgumentCaptor propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class); + + verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes())); + + assertEquals("TESTID", propCapture.getValue().getCorrelationId()); + } + } + + @Test + @DisplayName("RMQMessageProducer::send should ensure that an AMQP message with a direct reply to is handled correctly") + public void sendAMQPMessageWithDirectReplyTo() throws JMSException, IOException { + + try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) { + RMQTextMessage message = new RMQTextMessage(); + message.setText("Test message"); + message.setJMSCorrelationID("TESTID"); + message.setJMSReplyTo(new RMQDestination("amq.rabbitmq.reply-to", "", "amq.rabbitmq.reply-to", "amq.rabbitmq.reply-to")); + + when(session.getChannel()).thenReturn(channel); + when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch"); + + when(destination.isAmqp()).thenReturn(true); + when(destination.isAmqpWritable()).thenReturn(true); + when(destination.getAmqpRoutingKey()).thenReturn("key"); + + producer.send(message); + + ArgumentCaptor propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class); + + verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes())); + + assertEquals("amq.rabbitmq.reply-to", propCapture.getValue().getReplyTo()); + } + } + + @Test + @DisplayName("RMQMessageProducer::send should ensure that an AMQP message with a forwarded direct reply to is handled correctly") + public void sendAMQPMessageWithDirectReplyToForwardedId() throws JMSException, IOException { + + try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) { + RMQTextMessage message = new RMQTextMessage(); + message.setText("Test message"); + message.setJMSCorrelationID("TESTID"); + message.setJMSReplyTo(new RMQDestination("amq.rabbitmq.reply-to", "", "amq.rabbitmq.reply-to-replyToID", "amq.rabbitmq.reply-to")); + + when(session.getChannel()).thenReturn(channel); + when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch"); + + when(destination.isAmqp()).thenReturn(true); + when(destination.isAmqpWritable()).thenReturn(true); + when(destination.getAmqpRoutingKey()).thenReturn("key"); + + producer.send(message); + + ArgumentCaptor propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class); + + verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes())); + + assertEquals("amq.rabbitmq.reply-to-replyToID", propCapture.getValue().getReplyTo()); + } + } + + @Test + @DisplayName("RMQMessageProducer::send should ensure that an AMQP message with a non-direct reply to is handled correctly") + public void sendAMQPMessageWithGenericReplyTo() throws JMSException, IOException { + + try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) { + RMQTextMessage message = new RMQTextMessage(); + message.setText("Test message"); + message.setJMSCorrelationID("TESTID"); + message.setJMSReplyTo(new RMQDestination("other-replyto", "exch", "other-replyto", "other-replyto")); + + when(session.getChannel()).thenReturn(channel); + when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch"); + + when(destination.isAmqp()).thenReturn(true); + when(destination.isAmqpWritable()).thenReturn(true); + when(destination.getAmqpRoutingKey()).thenReturn("key"); + + producer.send(message); + + ArgumentCaptor propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class); + + verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes())); + + assertEquals("other-replyto", propCapture.getValue().getReplyTo()); + } + } + + @Test + @DisplayName("RMQMessageProducer::send should ensure that a JMS message with a direct reply to is handled correctly") + public void sendJMSMessageWithDirectReplyTo() throws JMSException, IOException { + + try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) { + RMQMessage message = mock(RMQMessage.class); + + when(message.toByteArray()).thenReturn("Test message".getBytes()); + when(message.getJMSCorrelationID()).thenReturn("TESTID"); + when(message.getJMSReplyTo()).thenReturn(new RMQDestination("amq.rabbitmq.reply-to", "", "amq.rabbitmq.reply-to", "amq.rabbitmq.reply-to")); + + + when(session.getChannel()).thenReturn(channel); + when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch"); + + when(destination.isAmqp()).thenReturn(false); + when(destination.isAmqpWritable()).thenReturn(true); + when(destination.getAmqpRoutingKey()).thenReturn("key"); + + producer.send(message); + + ArgumentCaptor propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class); + + verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes())); + + assertEquals("amq.rabbitmq.reply-to", propCapture.getValue().getReplyTo()); + } + } + + @Test + @DisplayName("RMQMessageProducer::send should ensure that a JMS message with a forwarded direct reply to is handled correctly") + public void sendJMSMessageWithDirectReplyToWithForwardedId() throws JMSException, IOException { + + try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) { + RMQMessage message = mock(RMQMessage.class); + + when(message.toByteArray()).thenReturn("Test message".getBytes()); + when(message.getJMSCorrelationID()).thenReturn("TESTID"); + when(message.getJMSReplyTo()).thenReturn(new RMQDestination("amq.rabbitmq.reply-to", "", "amq.rabbitmq.reply-to-forwarded-id", "amq.rabbitmq.reply-to")); + + + when(session.getChannel()).thenReturn(channel); + when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch"); + + when(destination.isAmqp()).thenReturn(false); + when(destination.isAmqpWritable()).thenReturn(true); + when(destination.getAmqpRoutingKey()).thenReturn("key"); + + producer.send(message); + + ArgumentCaptor propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class); + + verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes())); + + assertEquals("amq.rabbitmq.reply-to-forwarded-id", propCapture.getValue().getReplyTo()); + } + } + + + @Test + @DisplayName("RMQMessageProducer::send should ensure that a JMS message with a non-direct reply to is handled correctly") + public void sendJMSMessageWithGenericReplyTo() throws JMSException, IOException { + + try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) { + RMQMessage message = mock(RMQMessage.class); + + when(message.toByteArray()).thenReturn("Test message".getBytes()); + when(message.getJMSCorrelationID()).thenReturn("TESTID"); + when(message.getJMSReplyTo()).thenReturn(new RMQDestination("other-reply-to", "", "other-reply-to", "other-reply-to")); + + + when(session.getChannel()).thenReturn(channel); + when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch"); + + when(destination.isAmqp()).thenReturn(false); + when(destination.isAmqpWritable()).thenReturn(true); + when(destination.getAmqpRoutingKey()).thenReturn("key"); + + producer.send(message); + + ArgumentCaptor propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class); + + verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes())); + + assertEquals("other-reply-to", propCapture.getValue().getReplyTo()); + } + } + + static class StubRMQMessageProducer extends RMQMessageProducer { RMQMessage message; diff --git a/src/test/java/com/rabbitmq/jms/client/RMQMessageTest.java b/src/test/java/com/rabbitmq/jms/client/RMQMessageTest.java index ab182e99..84e5950c 100644 --- a/src/test/java/com/rabbitmq/jms/client/RMQMessageTest.java +++ b/src/test/java/com/rabbitmq/jms/client/RMQMessageTest.java @@ -5,7 +5,17 @@ // Copyright (c) 2014-2022 VMware, Inc. or its affiliates. All rights reserved. package com.rabbitmq.jms.client; +import com.rabbitmq.client.AMQP.BasicProperties; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.GetResponse; +import com.rabbitmq.jms.admin.RMQDestination; import com.rabbitmq.jms.client.message.RMQTextMessage; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -15,9 +25,23 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; class RMQMessageTest { + RMQSession session; + GetResponse getResponse; + ReceivingContextConsumer consumer; + + @BeforeEach public void init() { + session = mock(RMQSession.class); + getResponse = mock(GetResponse.class); + consumer = mock(ReceivingContextConsumer.class); + } + + @Test @DisplayName("RMQMessage::toAmqpHeaders should convert all properties to amqp headers") void convertsAllTypesToAmqpHeaders() throws JMSException, IOException { @@ -47,4 +71,95 @@ void convertsAllTypesToAmqpHeaders() throws JMSException, IOException { ); } -} \ No newline at end of file + + @Test + @DisplayName("RMQMessage::convertMessage - amqp message - ensure JMS reply to is null with no replyto") + void convertAMQPMessageWithNoReplyTo() throws JMSException { + + BasicProperties props = mock(BasicProperties.class); + Envelope envelope = mock(Envelope.class); + + when(getResponse.getProps()).thenReturn(props); + when(getResponse.getEnvelope()).thenReturn(envelope); + when(envelope.isRedeliver()).thenReturn(false); + + RMQDestination destination = new RMQDestination("dest", "exch", "key", "queue"); + destination.setAmqp(true); + + RMQMessage result = RMQMessage.convertMessage(session, destination, getResponse, consumer); + + assertNull(result.getJMSReplyTo()); + } + + @Test + @DisplayName("RMQMessage::convertMessage - amqp message - ensure JMS reply to is set to direct reply to") + void convertAMQPMessageWithDirectReplyTo() throws JMSException { + + BasicProperties props = mock(BasicProperties.class); + Envelope envelope = mock(Envelope.class); + + when(getResponse.getProps()).thenReturn(props); + when(getResponse.getEnvelope()).thenReturn(envelope); + when(envelope.isRedeliver()).thenReturn(false); + + when(props.getReplyTo()).thenReturn("amq.rabbitmq.reply-to"); + + RMQDestination destination = new RMQDestination("dest", "exch", "key", "queue"); + destination.setAmqp(true); + + RMQMessage result = RMQMessage.convertMessage(session, destination, getResponse, consumer); + + assertNotNull(result.getJMSReplyTo()); + + RMQDestination expected = new RMQDestination("amq.rabbitmq.reply-to", "", "amq.rabbitmq.reply-to", "amq.rabbitmq.reply-to"); + assertEquals(expected, result.getJMSReplyTo()); + } + + @Test + @DisplayName("RMQMessage::convertMessage - amqp message - ensure JMS reply to is set to the forwarded direct reply to") + void convertAMQPMessageWithForwardedDirectReplyTo() throws JMSException { + + BasicProperties props = mock(BasicProperties.class); + Envelope envelope = mock(Envelope.class); + + when(getResponse.getProps()).thenReturn(props); + when(getResponse.getEnvelope()).thenReturn(envelope); + when(envelope.isRedeliver()).thenReturn(false); + + when(props.getReplyTo()).thenReturn("amq.rabbitmq.reply-to-forwarded-id"); + + RMQDestination destination = new RMQDestination("dest", "exch", "key", "queue"); + destination.setAmqp(true); + + RMQMessage result = RMQMessage.convertMessage(session, destination, getResponse, consumer); + + assertNotNull(result.getJMSReplyTo()); + + RMQDestination expected = new RMQDestination("amq.rabbitmq.reply-to", "", "amq.rabbitmq.reply-to-forwarded-id", "amq.rabbitmq.reply-to-forwarded-id"); + assertEquals(expected, result.getJMSReplyTo()); + } + + @Test + @DisplayName("RMQMessage::convertMessage - amqp message - ensure JMS reply to is set to the non direct reply to") + void convertAMQPMessageWithNonDirectReplyTo() throws JMSException { + + BasicProperties props = mock(BasicProperties.class); + Envelope envelope = mock(Envelope.class); + + when(getResponse.getProps()).thenReturn(props); + when(getResponse.getEnvelope()).thenReturn(envelope); + when(envelope.isRedeliver()).thenReturn(false); + + when(props.getReplyTo()).thenReturn("non-direct-replyto"); + + RMQDestination destination = new RMQDestination("dest", "exch", "key", "queue"); + destination.setAmqp(true); + + RMQMessage result = RMQMessage.convertMessage(session, destination, getResponse, consumer); + + assertNotNull(result.getJMSReplyTo()); + + RMQDestination expected = new RMQDestination("non-direct-replyto", "exch", "non-direct-replyto", "non-direct-replyto"); + assertEquals(expected, result.getJMSReplyTo()); + } +} From 14aed30f6b4265d12cf7c56b8651f47d58a57934 Mon Sep 17 00:00:00 2001 From: Andy Lehane Date: Thu, 27 Apr 2023 13:13:52 +0100 Subject: [PATCH 2/3] Rollback RMQMessage Changes The RMQMessage.maybeSetupDirectReplyTo changes are not required. --- .../com/rabbitmq/jms/client/RMQMessage.java | 56 ++++++++----------- .../rabbitmq/jms/client/RMQMessageTest.java | 5 +- 2 files changed, 24 insertions(+), 37 deletions(-) diff --git a/src/main/java/com/rabbitmq/jms/client/RMQMessage.java b/src/main/java/com/rabbitmq/jms/client/RMQMessage.java index 2c910f96..547d5013 100644 --- a/src/main/java/com/rabbitmq/jms/client/RMQMessage.java +++ b/src/main/java/com/rabbitmq/jms/client/RMQMessage.java @@ -849,7 +849,7 @@ static RMQMessage convertJmsMessage(RMQSession session, GetResponse response, Re // JMSProperties already set message.setReadonly(true); // Set readOnly - mandatory for received messages - setupJMSReplyTo(message, response.getProps().getReplyTo()); + maybeSetupDirectReplyTo(message, response.getProps().getReplyTo()); receivingContextConsumer.accept(new ReceivingContext(message)); return message; @@ -869,7 +869,7 @@ private static RMQMessage convertAmqpMessage(RMQSession session, RMQDestination message.setJMSPropertiesFromAmqpProperties(props); message.setReadonly(true); // Set readOnly - mandatory for received messages - setupJMSReplyTo(message, response.getProps().getReplyTo()); + maybeSetupDirectReplyTo(message, response.getProps().getReplyTo()); receivingContextConsumer.accept(new ReceivingContext(message)); return message; @@ -899,33 +899,23 @@ private static RMQMessage handleJmsRedeliveredAndDeliveryCount(GetResponse respo } /** - * Properly assign JMSReplyTo header when using reply to. + * Properly assign JMSReplyTo header when using direct reply to. *

* On a received request message, the AMQP reply-to property is * set to a specific amq.rabbitmq.reply-to.ID value. * We must use this value for the JMS reply to destination if * we want to send the response back to the destination the sender * is waiting. - *

- * If we are not using a direct reply to, then assume that the - * reply to queue is hosted on the same exchange as the message - * being sent. * * @param message * @param replyTo * @throws JMSException * @since 1.11.0 */ - private static void setupJMSReplyTo(RMQMessage message, String replyTo) throws JMSException { - if (replyTo != null) { - if (replyTo.startsWith(DIRECT_REPLY_TO)) { - RMQDestination replyToDestination = new RMQDestination(DIRECT_REPLY_TO, "", replyTo, replyTo); - message.setJMSReplyTo(replyToDestination); - } else { - // If we're not a direct reply-to, assume we're replying on the same exhange as the initial request. - RMQDestination replyToDestination = new RMQDestination(replyTo, ((RMQDestination) message.getJMSDestination()).getAmqpExchangeName(), replyTo, replyTo); - message.setJMSReplyTo(replyToDestination); - } + private static void maybeSetupDirectReplyTo(RMQMessage message, String replyTo) throws JMSException { + if (replyTo != null && replyTo.startsWith(DIRECT_REPLY_TO)) { + RMQDestination replyToDestination = new RMQDestination(DIRECT_REPLY_TO, "", replyTo, replyTo); + message.setJMSReplyTo(replyToDestination); } } @@ -1252,24 +1242,24 @@ void generateInternalID() { this.rmqProperties.put(JMS_MESSAGE_ID, "ID:" + this.internalMessageID); } - /** - * Utility method used to be able to write primitives and objects to a data - * stream without keeping track of order and type. - *

- * This also allows any Object to be written. - *

- *

- * The purpose of this method is to optimise the writing of a primitive that - * is represented as an object by only writing the type and the primitive - * value to the stream. - *

- * - * @param s the primitive to be written - * @param out the stream to write the primitive to. - * @throws IOException if an I/O error occurs + /** + * Utility method used to be able to write primitives and objects to a data + * stream without keeping track of order and type. + *

+ * This also allows any Object to be written. + *

+ *

+ * The purpose of this method is to optimise the writing of a primitive that + * is represented as an object by only writing the type and the primitive + * value to the stream. + *

+ * + * @param s the primitive to be written + * @param out the stream to write the primitive to. + * @throws IOException if an I/O error occurs * @throws MessageFormatException if message cannot be parsed * - */ + */ protected static void writePrimitive(Object s, ObjectOutput out) throws IOException, MessageFormatException { writePrimitive(s, out, false); } diff --git a/src/test/java/com/rabbitmq/jms/client/RMQMessageTest.java b/src/test/java/com/rabbitmq/jms/client/RMQMessageTest.java index 84e5950c..f58a49d8 100644 --- a/src/test/java/com/rabbitmq/jms/client/RMQMessageTest.java +++ b/src/test/java/com/rabbitmq/jms/client/RMQMessageTest.java @@ -157,9 +157,6 @@ void convertAMQPMessageWithNonDirectReplyTo() throws JMSException { RMQMessage result = RMQMessage.convertMessage(session, destination, getResponse, consumer); - assertNotNull(result.getJMSReplyTo()); - - RMQDestination expected = new RMQDestination("non-direct-replyto", "exch", "non-direct-replyto", "non-direct-replyto"); - assertEquals(expected, result.getJMSReplyTo()); + assertNull(result.getJMSReplyTo()); } } From fcfbad4694f451de625510088bbbf8a6ad9c9d9c Mon Sep 17 00:00:00 2001 From: Andy Lehane Date: Thu, 27 Apr 2023 15:42:03 +0100 Subject: [PATCH 3/3] Tidy unchecked warnings in RMQMessageProducerTest --- .../com/rabbitmq/jms/client/RMQMessageProducerTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/rabbitmq/jms/client/RMQMessageProducerTest.java b/src/test/java/com/rabbitmq/jms/client/RMQMessageProducerTest.java index 53ef0c27..6c0aeae0 100644 --- a/src/test/java/com/rabbitmq/jms/client/RMQMessageProducerTest.java +++ b/src/test/java/com/rabbitmq/jms/client/RMQMessageProducerTest.java @@ -155,6 +155,7 @@ public class RMQMessageProducerTest { verify(channel).basicPublish(eq("x"), anyString(), any(AMQP.BasicProperties.class), any(byte[].class)); } + @SuppressWarnings("unchecked") @Test @DisplayName("RMQMessageProducer::send should ensure that an AMQP message has the correlation id on the rabbit message") public void sendAMQPMessageWithCorrelationIdCopy() throws JMSException, IOException { @@ -181,6 +182,7 @@ public void sendAMQPMessageWithCorrelationIdCopy() throws JMSException, IOExcept } } + @SuppressWarnings("unchecked") @Test @DisplayName("RMQMessageProducer::send should ensure that a JMS message has the correlation id on the rabbit message") public void sendJMSMessageWithCorrelationIdCopy() throws JMSException, IOException { @@ -208,6 +210,7 @@ public void sendJMSMessageWithCorrelationIdCopy() throws JMSException, IOExcepti } } + @SuppressWarnings("unchecked") @Test @DisplayName("RMQMessageProducer::send should ensure that an AMQP message with a direct reply to is handled correctly") public void sendAMQPMessageWithDirectReplyTo() throws JMSException, IOException { @@ -235,6 +238,7 @@ public void sendAMQPMessageWithDirectReplyTo() throws JMSException, IOException } } + @SuppressWarnings("unchecked") @Test @DisplayName("RMQMessageProducer::send should ensure that an AMQP message with a forwarded direct reply to is handled correctly") public void sendAMQPMessageWithDirectReplyToForwardedId() throws JMSException, IOException { @@ -262,6 +266,7 @@ public void sendAMQPMessageWithDirectReplyToForwardedId() throws JMSException, I } } + @SuppressWarnings("unchecked") @Test @DisplayName("RMQMessageProducer::send should ensure that an AMQP message with a non-direct reply to is handled correctly") public void sendAMQPMessageWithGenericReplyTo() throws JMSException, IOException { @@ -289,6 +294,7 @@ public void sendAMQPMessageWithGenericReplyTo() throws JMSException, IOException } } + @SuppressWarnings("unchecked") @Test @DisplayName("RMQMessageProducer::send should ensure that a JMS message with a direct reply to is handled correctly") public void sendJMSMessageWithDirectReplyTo() throws JMSException, IOException { @@ -318,6 +324,7 @@ public void sendJMSMessageWithDirectReplyTo() throws JMSException, IOException { } } + @SuppressWarnings("unchecked") @Test @DisplayName("RMQMessageProducer::send should ensure that a JMS message with a forwarded direct reply to is handled correctly") public void sendJMSMessageWithDirectReplyToWithForwardedId() throws JMSException, IOException { @@ -347,7 +354,7 @@ public void sendJMSMessageWithDirectReplyToWithForwardedId() throws JMSException } } - + @SuppressWarnings("unchecked") @Test @DisplayName("RMQMessageProducer::send should ensure that a JMS message with a non-direct reply to is handled correctly") public void sendJMSMessageWithGenericReplyTo() throws JMSException, IOException {