Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support direct reply-to feature #82

Merged
merged 1 commit into from
Jan 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/main/java/com/github/fridujo/rabbitmq/mock/MockChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.rabbitmq.client.impl.AMQImpl;

public class MockChannel implements Channel {
public static final String DIRECT_REPLY_TO_QUEUE = "amq.rabbitmq.reply-to";
private static final Logger LOGGER = LoggerFactory.getLogger(MockChannel.class);
private final int channelNumber;
private final MockNode node;
Expand All @@ -48,6 +49,7 @@ public class MockChannel implements Channel {
22);
private final Set<ConfirmListener> confirmListeners = new HashSet<>();

private final String directReplyToQueue;
private String lastGeneratedQueueName;
private Transaction transaction;
private boolean confirmMode = false;
Expand All @@ -58,6 +60,12 @@ public MockChannel(int channelNumber, MockNode node, MockConnection mockConnecti
this.node = node;
this.mockConnection = mockConnection;
this.metricsCollectorWrapper = metricsCollectorWrapper;

this.directReplyToQueue =
node
.queueDeclare(generateIfEmpty(""),false, true, true, Collections.emptyMap(), this)
.getQueue();

metricsCollectorWrapper.newChannel(this);
}

Expand Down Expand Up @@ -173,6 +181,9 @@ public void basicPublish(String exchange, String routingKey, boolean mandatory,

@Override
public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) {
if (props != null && DIRECT_REPLY_TO_QUEUE.equals(props.getReplyTo())) {
props = props.builder().replyTo(directReplyToQueue).build();
}
getTransactionOrNode().basicPublish(exchange, routingKey, mandatory, immediate, nullToEmpty(props), body);
metricsCollectorWrapper.basicPublish(this);
if (confirmMode) {
Expand Down Expand Up @@ -305,6 +316,9 @@ public void queueDeclareNoWait(String queue, boolean durable, boolean exclusive,

@Override
public AMQP.Queue.DeclareOk queueDeclarePassive(String queueName) throws IOException {
if (DIRECT_REPLY_TO_QUEUE.equals(queueName)) {
return new AMQImpl.Queue.DeclareOk(queueName, 0, 0);
}
String definitiveQueueName = lastGeneratedIfEmpty(queueName);
Optional<MockQueue> queue = node.getQueue(definitiveQueueName);
if (!queue.isPresent()) {
Expand Down Expand Up @@ -360,6 +374,14 @@ public AMQP.Queue.PurgeOk queuePurge(String queue) {

@Override
public GetResponse basicGet(String queue, boolean autoAck) {
if (DIRECT_REPLY_TO_QUEUE.equals(queue)) {
queue = directReplyToQueue;

if (!autoAck) {
throw new IllegalStateException("direct reply-to requires autoAck");
}
}

GetResponse getResponse = node.basicGet(lastGeneratedIfEmpty(queue), autoAck, this::nextDeliveryTag);
if (getResponse != null) {
metricsCollectorWrapper.consumedMessage(this, getResponse.getEnvelope().getDeliveryTag(), autoAck);
Expand Down Expand Up @@ -467,6 +489,13 @@ public String basicConsume(String queue, boolean autoAck, String consumerTag, De

@Override
public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) {
if (DIRECT_REPLY_TO_QUEUE.equals(queue)) {
queue = directReplyToQueue;

if (!autoAck) {
throw new IllegalStateException("direct reply-to requires autoAck");
}
}
String serverConsumerTag = node.basicConsume(lastGeneratedIfEmpty(queue), autoAck, consumerTag, noLocal, exclusive, nullToEmpty(arguments), callback, this::nextDeliveryTag, mockConnection);
metricsCollectorWrapper.basicConsume(this, serverConsumerTag, autoAck);
return serverConsumerTag;
Expand Down
114 changes: 114 additions & 0 deletions src/test/java/com/github/fridujo/rabbitmq/mock/ChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -25,6 +27,9 @@
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class ChannelTest {

@Test
Expand Down Expand Up @@ -633,4 +638,113 @@ void commit_or_rollback_can_be_called_multiple_times_after_a_single_select() thr
}
}
}

@Test
void directReplyTo_basicPublish_basicGet() throws IOException, TimeoutException {
try (Connection conn = new MockConnectionFactory().newConnection()) {
try (Channel channel = conn.createChannel()) {

String queue = channel.queueDeclare().getQueue();

channel.basicPublish("", queue, new AMQP.BasicProperties.Builder().replyTo("amq.rabbitmq.reply-to").build(), "ping".getBytes());
assertThat(channel.messageCount(queue)).isEqualTo(1);

final GetResponse basicGet = channel.basicGet(queue, true);
final String replyTo = basicGet.getProps().getReplyTo();
assertThat(replyTo).startsWith("amq.gen-");

channel.basicPublish("", replyTo, null, "pong".getBytes());

final GetResponse reply = channel.basicGet("amq.rabbitmq.reply-to", true);
assertThat(new String(reply.getBody())).isEqualTo("pong");
}
}
}

@Test
void directReplyTo_basicPublish_basicConsume() throws IOException, TimeoutException, InterruptedException {
final String replyTo;
try (Connection conn = new MockConnectionFactory().newConnection()) {
try (Channel channel = conn.createChannel()) {

String queue = channel.queueDeclare().getQueue();

channel.basicPublish("", queue, new AMQP.BasicProperties.Builder().replyTo("amq.rabbitmq.reply-to").build(), "ping".getBytes());
assertThat(channel.messageCount(queue)).isEqualTo(1);

final GetResponse basicGet = channel.basicGet(queue, true);
replyTo = basicGet.getProps().getReplyTo();
assertThat(replyTo).startsWith("amq.gen-");

channel.basicPublish("", replyTo, null, "pong".getBytes());

CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean cancelled = new AtomicBoolean();
AtomicReference<String> reply = new AtomicReference<>();
String consumerTag = channel.basicConsume("amq.rabbitmq.reply-to", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) {
assertThat(reply.compareAndSet(null, new String(body))).isTrue();
latch.countDown();
}

@Override
public void handleCancelOk(String consumerTag) {
cancelled.set(true);
}
});

latch.await(1, TimeUnit.SECONDS);
channel.basicCancel(consumerTag);

assertThat(cancelled).isTrue();
assertThat(reply.get()).isEqualTo("pong");
}
}

// assert that internal queue is removed
try (Connection conn = new MockConnectionFactory().newConnection()) {
try (Channel channel = conn.createChannel()) {
assertThatThrownBy(() -> channel.queueDeclarePassive(replyTo)).isInstanceOf(IOException.class);
}
}
}

@Test
void directReplyTo_basicConsume_noAutoAck() {
assertThatThrownBy(() -> {
try (Connection conn = new MockConnectionFactory().newConnection()) {
try (Channel channel = conn.createChannel()) {
channel.basicConsume("amq.rabbitmq.reply-to", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) {
fail("not implemented");
}

@Override
public void handleCancelOk(String consumerTag) {
fail("not implemented");
}
});
}
}
}).isInstanceOf(IllegalStateException.class);
}

@Test
void directReplyTo_basicGet_noAutoAck() {
assertThatThrownBy(() -> {
try (Connection conn = new MockConnectionFactory().newConnection()) {
try (Channel channel = conn.createChannel()) {
channel.basicGet("amq.rabbitmq.reply-to", false);
}
}
}).isInstanceOf(IllegalStateException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
Expand Down Expand Up @@ -80,7 +82,35 @@ void basic_consume_case() {
}
}
}

@Test
void reply_direct_to() throws ExecutionException, InterruptedException {
String messageBody = "Hello world!";
try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AmqpConfiguration.class)) {
RabbitTemplate rabbitTemplate = queueAndExchangeSetup(context);

// using AsyncRabbitTemplate to avoid automatic fallback to temporary queue
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);

Receiver receiver = new Receiver();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(context.getBean(ConnectionFactory.class));
container.setQueueNames(QUEUE_NAME);
container.setMessageListener(new MessageListenerAdapter(receiver, "receiveMessageAndReply"));
try {
container.start();
asyncRabbitTemplate.start();

AsyncRabbitTemplate.RabbitConverterFuture<Object> result = asyncRabbitTemplate.convertSendAndReceive(EXCHANGE_NAME, "test.key2", messageBody);

assertThat(result.get()).isEqualTo(new StringBuilder(messageBody).reverse().toString());
assertThat(receiver.getMessages()).containsExactly(messageBody);
} finally {
container.stop();
asyncRabbitTemplate.stop();
}
}
}
private RabbitTemplate queueAndExchangeSetup(BeanFactory context) {
RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);

Expand Down Expand Up @@ -148,6 +178,11 @@ static class Receiver {
public void receiveMessage(String message) {
this.messages.add(message);
}

public String receiveMessageAndReply(String message) {
this.messages.add(message);
return new StringBuilder(message).reverse().toString();
}

List<String> getMessages() {
return messages;
Expand Down