Skip to content

Commit

Permalink
support direct reply-to feature
Browse files Browse the repository at this point in the history
  • Loading branch information
yfunikov committed Jan 5, 2020
1 parent abaec24 commit c0064e9
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ public MockChannel(int channelNumber, MockNode node, MockConnection mockConnecti
.queueDeclare(generateIfEmpty(""),false, true, true, Collections.emptyMap(), this)
.getQueue();

node.queueBind(directReplyToQueue, "", directReplyToQueue, null);

metricsCollectorWrapper.newChannel(this);
}

Expand Down
54 changes: 23 additions & 31 deletions src/test/java/com/github/fridujo/rabbitmq/mock/ChannelTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
package com.github.fridujo.rabbitmq.mock;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
Expand All @@ -11,24 +27,9 @@
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;

class ChannelTest {

@Test
Expand Down Expand Up @@ -707,15 +708,15 @@ public void handleCancelOk(String consumerTag) {
// assert that internal queue is removed
try (Connection conn = new MockConnectionFactory().newConnection()) {
try (Channel channel = conn.createChannel()) {
assertThrows(IOException.class, () -> channel.queueDeclarePassive(replyTo));
assertThatThrownBy(() -> channel.queueDeclarePassive(replyTo)).isInstanceOf(IOException.class);
}
}

}

@Test
void directReplyTo_basicConsume_noAutoAck() {
assertThrows(IllegalStateException.class, () -> {
assertThatThrownBy(() -> {
try (Connection conn = new MockConnectionFactory().newConnection()) {
try (Channel channel = conn.createChannel()) {
channel.basicConsume("amq.rabbitmq.reply-to", false, new DefaultConsumer(channel) {
Expand All @@ -725,36 +726,27 @@ public void handleDelivery(String consumerTag,
AMQP.BasicProperties properties,
byte[] body) {

fail();
fail("not implemented");
}

@Override
public void handleCancelOk(String consumerTag) {
fail();
fail("not implemented");
}
});
}
}
});
}).isInstanceOf(IllegalStateException.class);
}

@Test
void directReplyTo_basicGet_noAutoAck() {
assertThrows(IllegalStateException.class, () -> {
assertThatThrownBy(() -> {
try (Connection conn = new MockConnectionFactory().newConnection()) {
try (Channel channel = conn.createChannel()) {
channel.basicGet("amq.rabbitmq.reply-to", false);
}
}
});
}

@Test
void directReplyTo_replyTo_queue_exists() throws IOException, TimeoutException {
try (Connection conn = new MockConnectionFactory().newConnection()) {
try (Channel channel = conn.createChannel()) {
assertThat(channel.queueDeclarePassive("amq.rabbitmq.reply-to")).isNotNull();
}
}
}).isInstanceOf(IllegalStateException.class);
}
}

0 comments on commit c0064e9

Please sign in to comment.