Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions src/main/java/com/rabbitmq/jms/client/RMQMessageProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.message.RMQBytesMessage;
import com.rabbitmq.jms.client.message.RMQTextMessage;
import com.rabbitmq.jms.util.RMQJMSException;
import java.util.Map;
Expand Down Expand Up @@ -373,6 +372,7 @@ private void sendAMQPMessage(RMQDestination destination, RMQMessage msg, Message
bob.contentType("application/octet-stream");
bob.deliveryMode(RMQMessage.rmqDeliveryMode(deliveryMode));
bob.priority(priority);
bob.correlationId(msg.getJMSCorrelationID());
bob.expiration(rmqExpiration(timeToLive));
Map<String, Object> messageHeaders = msg.toAmqpHeaders();
if (this.keepTextMessageType && msg instanceof RMQTextMessage) {
Expand All @@ -382,7 +382,7 @@ private void sendAMQPMessage(RMQDestination destination, RMQMessage msg, Message
String targetAmqpExchangeName = session.delayMessage(destination, messageHeaders, deliveryDelay);
bob.headers(messageHeaders);

maybeSetReplyToPropertyToDirectReplyTo(bob, msg);
setReplyToProperty(bob, msg);

bob = amqpPropertiesCustomiser.apply(bob, msg);

Expand All @@ -409,12 +409,13 @@ protected void sendJMSMessage(RMQDestination destination, RMQMessage msg, Messag
bob.contentType("application/octet-stream");
bob.deliveryMode(RMQMessage.rmqDeliveryMode(deliveryMode));
bob.priority(priority);
bob.correlationId(msg.getJMSCorrelationID());
bob.expiration(rmqExpiration(timeToLive));
Map<String, Object> headers = msg.toHeaders();
String targetAmqpExchangeName = session.delayMessage(destination, headers, deliveryDelay);
bob.headers(headers);

maybeSetReplyToPropertyToDirectReplyTo(bob, msg);
setReplyToProperty(bob, msg);

byte[] data = msg.toByteArray();

Expand All @@ -427,11 +428,16 @@ protected void sendJMSMessage(RMQDestination destination, RMQMessage msg, Messag
}

/**
* Set AMQP reply-to property to direct-reply-to if necessary.
* Set AMQP reply-to property to reply-to if necessary.
* <p>
* <p>
* Set the <code>reply-to</code> property to <code>amq.rabbitmq.reply-to</code>
* if the <code>JMSReplyTo</code> header is set to a destination with that
* name.
* name and does not have a routing key (which indicates this this is a forwarded
* reply to destination).
* <p>
* Set the <code>reply-to</code> property to the amq routing key, if the routing key
* if not null.
* <p>
* For outbound RPC request.
*
Expand All @@ -440,11 +446,13 @@ protected void sendJMSMessage(RMQDestination destination, RMQMessage msg, Messag
* @throws JMSException
* @since 1.11.0
*/
private static void maybeSetReplyToPropertyToDirectReplyTo(AMQP.BasicProperties.Builder builder, RMQMessage msg) throws JMSException {
private static void setReplyToProperty(AMQP.BasicProperties.Builder builder, RMQMessage msg) throws JMSException {
if (msg.getJMSReplyTo() != null && msg.getJMSReplyTo() instanceof RMQDestination) {
RMQDestination replyTo = (RMQDestination) msg.getJMSReplyTo();
if (DIRECT_REPLY_TO.equals(replyTo.getDestinationName())) {
if (DIRECT_REPLY_TO.equals(replyTo.getDestinationName()) && replyTo.getAmqpRoutingKey() == null) {
builder.replyTo(DIRECT_REPLY_TO);
} else {
builder.replyTo(replyTo.getAmqpRoutingKey());
}
}
}
Expand Down
238 changes: 236 additions & 2 deletions src/test/java/com/rabbitmq/jms/client/RMQMessageProducerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,27 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2017-2022 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2017-2023 VMware, Inc. or its affiliates. All rights reserved.
package com.rabbitmq.jms.client;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.jms.admin.RMQDestination;
import com.rabbitmq.jms.client.message.RMQBytesMessage;
import com.rabbitmq.jms.client.message.RMQTextMessage;
import javax.jms.CompletionListener;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;

import java.io.IOException;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
Expand All @@ -32,10 +34,12 @@ public class RMQMessageProducerTest {

RMQSession session;
RMQDestination destination;
Channel channel;

@BeforeEach public void init() {
session = mock(RMQSession.class);
destination = mock(RMQDestination.class);
channel = mock(Channel.class);
}

@Test public void preferProducerPropertyNoMessagePropertySpecified() throws Exception {
Expand Down Expand Up @@ -151,6 +155,236 @@ public class RMQMessageProducerTest {
verify(channel).basicPublish(eq("x"), anyString(), any(AMQP.BasicProperties.class), any(byte[].class));
}

@SuppressWarnings("unchecked")
@Test
@DisplayName("RMQMessageProducer::send should ensure that an AMQP message has the correlation id on the rabbit message")
public void sendAMQPMessageWithCorrelationIdCopy() throws JMSException, IOException {

try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) {
RMQTextMessage message = new RMQTextMessage();
message.setText("Test message");
message.setJMSCorrelationID("TESTID");

when(session.getChannel()).thenReturn(channel);
when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch");

when(destination.isAmqp()).thenReturn(true);
when(destination.isAmqpWritable()).thenReturn(true);
when(destination.getAmqpRoutingKey()).thenReturn("key");

producer.send(message);

ArgumentCaptor<com.rabbitmq.client.AMQP.BasicProperties> propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class);

verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes()));

assertEquals("TESTID", propCapture.getValue().getCorrelationId());
}
}

@SuppressWarnings("unchecked")
@Test
@DisplayName("RMQMessageProducer::send should ensure that a JMS message has the correlation id on the rabbit message")
public void sendJMSMessageWithCorrelationIdCopy() throws JMSException, IOException {

try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) {
RMQMessage message = mock(RMQMessage.class);

when(message.toByteArray()).thenReturn("Test message".getBytes());
when(message.getJMSCorrelationID()).thenReturn("TESTID");

when(session.getChannel()).thenReturn(channel);
when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch");

when(destination.isAmqp()).thenReturn(false);
when(destination.isAmqpWritable()).thenReturn(true);
when(destination.getAmqpRoutingKey()).thenReturn("key");

producer.send(message);

ArgumentCaptor<com.rabbitmq.client.AMQP.BasicProperties> propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class);

verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes()));

assertEquals("TESTID", propCapture.getValue().getCorrelationId());
}
}

@SuppressWarnings("unchecked")
@Test
@DisplayName("RMQMessageProducer::send should ensure that an AMQP message with a direct reply to is handled correctly")
public void sendAMQPMessageWithDirectReplyTo() throws JMSException, IOException {

try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) {
RMQTextMessage message = new RMQTextMessage();
message.setText("Test message");
message.setJMSCorrelationID("TESTID");
message.setJMSReplyTo(new RMQDestination("amq.rabbitmq.reply-to", "", "amq.rabbitmq.reply-to", "amq.rabbitmq.reply-to"));

when(session.getChannel()).thenReturn(channel);
when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch");

when(destination.isAmqp()).thenReturn(true);
when(destination.isAmqpWritable()).thenReturn(true);
when(destination.getAmqpRoutingKey()).thenReturn("key");

producer.send(message);

ArgumentCaptor<com.rabbitmq.client.AMQP.BasicProperties> propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class);

verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes()));

assertEquals("amq.rabbitmq.reply-to", propCapture.getValue().getReplyTo());
}
}

@SuppressWarnings("unchecked")
@Test
@DisplayName("RMQMessageProducer::send should ensure that an AMQP message with a forwarded direct reply to is handled correctly")
public void sendAMQPMessageWithDirectReplyToForwardedId() throws JMSException, IOException {

try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) {
RMQTextMessage message = new RMQTextMessage();
message.setText("Test message");
message.setJMSCorrelationID("TESTID");
message.setJMSReplyTo(new RMQDestination("amq.rabbitmq.reply-to", "", "amq.rabbitmq.reply-to-replyToID", "amq.rabbitmq.reply-to"));

when(session.getChannel()).thenReturn(channel);
when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch");

when(destination.isAmqp()).thenReturn(true);
when(destination.isAmqpWritable()).thenReturn(true);
when(destination.getAmqpRoutingKey()).thenReturn("key");

producer.send(message);

ArgumentCaptor<com.rabbitmq.client.AMQP.BasicProperties> propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class);

verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes()));

assertEquals("amq.rabbitmq.reply-to-replyToID", propCapture.getValue().getReplyTo());
}
}

@SuppressWarnings("unchecked")
@Test
@DisplayName("RMQMessageProducer::send should ensure that an AMQP message with a non-direct reply to is handled correctly")
public void sendAMQPMessageWithGenericReplyTo() throws JMSException, IOException {

try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) {
RMQTextMessage message = new RMQTextMessage();
message.setText("Test message");
message.setJMSCorrelationID("TESTID");
message.setJMSReplyTo(new RMQDestination("other-replyto", "exch", "other-replyto", "other-replyto"));

when(session.getChannel()).thenReturn(channel);
when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch");

when(destination.isAmqp()).thenReturn(true);
when(destination.isAmqpWritable()).thenReturn(true);
when(destination.getAmqpRoutingKey()).thenReturn("key");

producer.send(message);

ArgumentCaptor<com.rabbitmq.client.AMQP.BasicProperties> propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class);

verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes()));

assertEquals("other-replyto", propCapture.getValue().getReplyTo());
}
}

@SuppressWarnings("unchecked")
@Test
@DisplayName("RMQMessageProducer::send should ensure that a JMS message with a direct reply to is handled correctly")
public void sendJMSMessageWithDirectReplyTo() throws JMSException, IOException {

try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) {
RMQMessage message = mock(RMQMessage.class);

when(message.toByteArray()).thenReturn("Test message".getBytes());
when(message.getJMSCorrelationID()).thenReturn("TESTID");
when(message.getJMSReplyTo()).thenReturn(new RMQDestination("amq.rabbitmq.reply-to", "", "amq.rabbitmq.reply-to", "amq.rabbitmq.reply-to"));


when(session.getChannel()).thenReturn(channel);
when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch");

when(destination.isAmqp()).thenReturn(false);
when(destination.isAmqpWritable()).thenReturn(true);
when(destination.getAmqpRoutingKey()).thenReturn("key");

producer.send(message);

ArgumentCaptor<com.rabbitmq.client.AMQP.BasicProperties> propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class);

verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes()));

assertEquals("amq.rabbitmq.reply-to", propCapture.getValue().getReplyTo());
}
}

@SuppressWarnings("unchecked")
@Test
@DisplayName("RMQMessageProducer::send should ensure that a JMS message with a forwarded direct reply to is handled correctly")
public void sendJMSMessageWithDirectReplyToWithForwardedId() throws JMSException, IOException {

try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) {
RMQMessage message = mock(RMQMessage.class);

when(message.toByteArray()).thenReturn("Test message".getBytes());
when(message.getJMSCorrelationID()).thenReturn("TESTID");
when(message.getJMSReplyTo()).thenReturn(new RMQDestination("amq.rabbitmq.reply-to", "", "amq.rabbitmq.reply-to-forwarded-id", "amq.rabbitmq.reply-to"));


when(session.getChannel()).thenReturn(channel);
when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch");

when(destination.isAmqp()).thenReturn(false);
when(destination.isAmqpWritable()).thenReturn(true);
when(destination.getAmqpRoutingKey()).thenReturn("key");

producer.send(message);

ArgumentCaptor<com.rabbitmq.client.AMQP.BasicProperties> propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class);

verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes()));

assertEquals("amq.rabbitmq.reply-to-forwarded-id", propCapture.getValue().getReplyTo());
}
}

@SuppressWarnings("unchecked")
@Test
@DisplayName("RMQMessageProducer::send should ensure that a JMS message with a non-direct reply to is handled correctly")
public void sendJMSMessageWithGenericReplyTo() throws JMSException, IOException {

try (RMQMessageProducer producer = new RMQMessageProducer(session, destination)) {
RMQMessage message = mock(RMQMessage.class);

when(message.toByteArray()).thenReturn("Test message".getBytes());
when(message.getJMSCorrelationID()).thenReturn("TESTID");
when(message.getJMSReplyTo()).thenReturn(new RMQDestination("other-reply-to", "", "other-reply-to", "other-reply-to"));


when(session.getChannel()).thenReturn(channel);
when(session.delayMessage(eq(destination), any(Map.class), any(Long.class))).thenReturn("targetExch");

when(destination.isAmqp()).thenReturn(false);
when(destination.isAmqpWritable()).thenReturn(true);
when(destination.getAmqpRoutingKey()).thenReturn("key");

producer.send(message);

ArgumentCaptor<com.rabbitmq.client.AMQP.BasicProperties> propCapture = ArgumentCaptor.forClass(com.rabbitmq.client.AMQP.BasicProperties.class);

verify(channel).basicPublish(eq("targetExch"), eq("key"), propCapture.capture(), eq("Test message".getBytes()));

assertEquals("other-reply-to", propCapture.getValue().getReplyTo());
}
}


static class StubRMQMessageProducer extends RMQMessageProducer {

RMQMessage message;
Expand Down
Loading