Skip to content

Commit

Permalink
support direct reply-to feature
Browse files Browse the repository at this point in the history
  • Loading branch information
yfunikov authored and ledoyen committed Jan 15, 2020
1 parent 4bf9919 commit 357c570
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 0 deletions.
29 changes: 29 additions & 0 deletions src/main/java/com/github/fridujo/rabbitmq/mock/MockChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.rabbitmq.client.impl.AMQImpl;

public class MockChannel implements Channel {
public static final String DIRECT_REPLY_TO_QUEUE = "amq.rabbitmq.reply-to";
private static final Logger LOGGER = LoggerFactory.getLogger(MockChannel.class);
private final int channelNumber;
private final MockNode node;
Expand All @@ -48,6 +49,7 @@ public class MockChannel implements Channel {
22);
private final Set<ConfirmListener> confirmListeners = new HashSet<>();

private final String directReplyToQueue;
private String lastGeneratedQueueName;
private Transaction transaction;
private boolean confirmMode = false;
Expand All @@ -58,6 +60,12 @@ public MockChannel(int channelNumber, MockNode node, MockConnection mockConnecti
this.node = node;
this.mockConnection = mockConnection;
this.metricsCollectorWrapper = metricsCollectorWrapper;

this.directReplyToQueue =
node
.queueDeclare(generateIfEmpty(""),false, true, true, Collections.emptyMap(), this)
.getQueue();

metricsCollectorWrapper.newChannel(this);
}

Expand Down Expand Up @@ -173,6 +181,9 @@ public void basicPublish(String exchange, String routingKey, boolean mandatory,

@Override
public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) {
if (props != null && DIRECT_REPLY_TO_QUEUE.equals(props.getReplyTo())) {
props = props.builder().replyTo(directReplyToQueue).build();
}
getTransactionOrNode().basicPublish(exchange, routingKey, mandatory, immediate, nullToEmpty(props), body);
metricsCollectorWrapper.basicPublish(this);
if (confirmMode) {
Expand Down Expand Up @@ -305,6 +316,9 @@ public void queueDeclareNoWait(String queue, boolean durable, boolean exclusive,

@Override
public AMQP.Queue.DeclareOk queueDeclarePassive(String queueName) throws IOException {
if (DIRECT_REPLY_TO_QUEUE.equals(queueName)) {
return new AMQImpl.Queue.DeclareOk(queueName, 0, 0);
}
String definitiveQueueName = lastGeneratedIfEmpty(queueName);
Optional<MockQueue> queue = node.getQueue(definitiveQueueName);
if (!queue.isPresent()) {
Expand Down Expand Up @@ -360,6 +374,14 @@ public AMQP.Queue.PurgeOk queuePurge(String queue) {

@Override
public GetResponse basicGet(String queue, boolean autoAck) {
if (DIRECT_REPLY_TO_QUEUE.equals(queue)) {
queue = directReplyToQueue;

if (!autoAck) {
throw new IllegalStateException("direct reply-to requires autoAck");
}
}

GetResponse getResponse = node.basicGet(lastGeneratedIfEmpty(queue), autoAck, this::nextDeliveryTag);
if (getResponse != null) {
metricsCollectorWrapper.consumedMessage(this, getResponse.getEnvelope().getDeliveryTag(), autoAck);
Expand Down Expand Up @@ -467,6 +489,13 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, De

@Override
public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) {
if (DIRECT_REPLY_TO_QUEUE.equals(queue)) {
queue = directReplyToQueue;

if (!autoAck) {
throw new IllegalStateException("direct reply-to requires autoAck");
}
}
String serverConsumerTag = node.basicConsume(lastGeneratedIfEmpty(queue), autoAck, consumerTag, noLocal, exclusive, nullToEmpty(arguments), callback, this::nextDeliveryTag, mockConnection);
metricsCollectorWrapper.basicConsume(this, serverConsumerTag, autoAck);
return serverConsumerTag;
Expand Down
114 changes: 114 additions & 0 deletions src/test/java/com/github/fridujo/rabbitmq/mock/ChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -25,6 +27,9 @@
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class ChannelTest {

@Test
Expand Down Expand Up @@ -633,4 +638,113 @@ void commit_or_rollback_can_be_called_multiple_times_after_a_single_select() thr
}
}
}

@Test
void directReplyTo_basicPublish_basicGet() throws IOException, TimeoutException {
try (Connection conn = new MockConnectionFactory().newConnection()) {
try (Channel channel = conn.createChannel()) {

String queue = channel.queueDeclare().getQueue();

channel.basicPublish("", queue, new AMQP.BasicProperties.Builder().replyTo("amq.rabbitmq.reply-to").build(), "ping".getBytes());
assertThat(channel.messageCount(queue)).isEqualTo(1);

final GetResponse basicGet = channel.basicGet(queue, true);
final String replyTo = basicGet.getProps().getReplyTo();
assertThat(replyTo).startsWith("amq.gen-");

channel.basicPublish("", replyTo, null, "pong".getBytes());

final GetResponse reply = channel.basicGet("amq.rabbitmq.reply-to", true);
assertThat(new String(reply.getBody())).isEqualTo("pong");
}
}
}

@Test
void directReplyTo_basicPublish_basicConsume() throws IOException, TimeoutException, InterruptedException {
final String replyTo;
try (Connection conn = new MockConnectionFactory().newConnection()) {
try (Channel channel = conn.createChannel()) {

String queue = channel.queueDeclare().getQueue();

channel.basicPublish("", queue, new AMQP.BasicProperties.Builder().replyTo("amq.rabbitmq.reply-to").build(), "ping".getBytes());
assertThat(channel.messageCount(queue)).isEqualTo(1);

final GetResponse basicGet = channel.basicGet(queue, true);
replyTo = basicGet.getProps().getReplyTo();
assertThat(replyTo).startsWith("amq.gen-");

channel.basicPublish("", replyTo, null, "pong".getBytes());

CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean cancelled = new AtomicBoolean();
AtomicReference<String> reply = new AtomicReference<>();
String consumerTag = channel.basicConsume("amq.rabbitmq.reply-to", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) {
assertThat(reply.compareAndSet(null, new String(body))).isTrue();
latch.countDown();
}

@Override
public void handleCancelOk(String consumerTag) {
cancelled.set(true);
}
});

latch.await(1, TimeUnit.SECONDS);
channel.basicCancel(consumerTag);

assertThat(cancelled).isTrue();
assertThat(reply.get()).isEqualTo("pong");
}
}

// assert that internal queue is removed
try (Connection conn = new MockConnectionFactory().newConnection()) {
try (Channel channel = conn.createChannel()) {
assertThatThrownBy(() -> channel.queueDeclarePassive(replyTo)).isInstanceOf(IOException.class);
}
}
}

@Test
void directReplyTo_basicConsume_noAutoAck() {
assertThatThrownBy(() -> {
try (Connection conn = new MockConnectionFactory().newConnection()) {
try (Channel channel = conn.createChannel()) {
channel.basicConsume("amq.rabbitmq.reply-to", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) {
fail("not implemented");
}

@Override
public void handleCancelOk(String consumerTag) {
fail("not implemented");
}
});
}
}
}).isInstanceOf(IllegalStateException.class);
}

@Test
void directReplyTo_basicGet_noAutoAck() {
assertThatThrownBy(() -> {
try (Connection conn = new MockConnectionFactory().newConnection()) {
try (Channel channel = conn.createChannel()) {
channel.basicGet("amq.rabbitmq.reply-to", false);
}
}
}).isInstanceOf(IllegalStateException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
Expand Down Expand Up @@ -80,7 +82,35 @@ void basic_consume_case() {
}
}
}

@Test
void reply_direct_to() throws ExecutionException, InterruptedException {
String messageBody = "Hello world!";
try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AmqpConfiguration.class)) {
RabbitTemplate rabbitTemplate = queueAndExchangeSetup(context);

// using AsyncRabbitTemplate to avoid automatic fallback to temporary queue
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);

Receiver receiver = new Receiver();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(context.getBean(ConnectionFactory.class));
container.setQueueNames(QUEUE_NAME);
container.setMessageListener(new MessageListenerAdapter(receiver, "receiveMessageAndReply"));
try {
container.start();
asyncRabbitTemplate.start();

AsyncRabbitTemplate.RabbitConverterFuture<Object> result = asyncRabbitTemplate.convertSendAndReceive(EXCHANGE_NAME, "test.key2", messageBody);

assertThat(result.get()).isEqualTo(new StringBuilder(messageBody).reverse().toString());
assertThat(receiver.getMessages()).containsExactly(messageBody);
} finally {
container.stop();
asyncRabbitTemplate.stop();
}
}
}
private RabbitTemplate queueAndExchangeSetup(BeanFactory context) {
RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);

Expand Down Expand Up @@ -148,6 +178,11 @@ static class Receiver {
public void receiveMessage(String message) {
this.messages.add(message);
}

public String receiveMessageAndReply(String message) {
this.messages.add(message);
return new StringBuilder(message).reverse().toString();
}

List<String> getMessages() {
return messages;
Expand Down

0 comments on commit 357c570

Please sign in to comment.