Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.ReceiveAsyncOptions;
import com.azure.messaging.servicebus.models.ReceiveMode;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

Expand All @@ -17,6 +18,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* A <b>synchronous</b> receiver responsible for receiving {@link ServiceBusReceivedMessage} from a specific queue or
Expand All @@ -38,6 +40,9 @@ public final class ServiceBusReceiverClient implements AutoCloseable {
.setIsAutoCompleteEnabled(false)
.setMaxAutoLockRenewalDuration(Duration.ZERO);

private final AtomicReference<EmitterProcessor<ServiceBusReceivedMessageContext>> messageProcessor =
new AtomicReference<>();

/**
* Creates a synchronous receiver given its asynchronous counterpart.
*
Expand Down Expand Up @@ -606,20 +611,28 @@ public void setSessionState(String sessionId, byte[] sessionState) {
@Override
public void close() {
asyncClient.close();

if (messageProcessor.get() != null) {
messageProcessor.get().onComplete();
}
}

/**
* Given an {@code emitter}, queues that work in {@link SynchronousMessageSubscriber}. If the synchronous job has
* not been created, will initialise it.
* Given an {@code emitter}, creates a {@link SynchronousMessageSubscriber} to receive messages from Service Bus.
*/
private void queueWork(int maximumMessageCount, Duration maxWaitTime,
private synchronized void queueWork(int maximumMessageCount, Duration maxWaitTime,
FluxSink<ServiceBusReceivedMessageContext> emitter) {
final long id = idGenerator.getAndIncrement();
final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime,
emitter);
final SynchronousMessageSubscriber syncSubscriber = new SynchronousMessageSubscriber(work);

logger.info("[{}]: Started synchronous message subscriber.", id);
asyncClient.receive(DEFAULT_RECEIVE_OPTIONS).subscribeWith(syncSubscriber);

if (messageProcessor.get() == null) {
messageProcessor.set(this.asyncClient.receive(DEFAULT_RECEIVE_OPTIONS)
.subscribeWith(EmitterProcessor.create(false)));
}

messageProcessor.get().subscribe(syncSubscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
Expand Down Expand Up @@ -71,7 +70,7 @@ protected void afterTest() {
if (pending > 0) {
try {
IterableStream<ServiceBusReceivedMessageContext> removedMessage = receiveAndDeleteReceiver.receive(
pending + BUFFER_MESSAGES_TO_REMOVE, Duration.ofSeconds(15));
pending, Duration.ofSeconds(15));

removedMessage.stream().forEach(context -> {
ServiceBusReceivedMessage message = context.getMessage();
Expand All @@ -98,24 +97,43 @@ protected void afterTest() {
}

/**
* Verifies that we can only call receive() once only.
* Verifies that we can only call receive() multiple times.
*/
@MethodSource("messagingEntityWithSessions")
@ParameterizedTest
void receiveByTwoSubscriber(MessagingEntityType entityType, boolean isSessionEnabled) {
// Arrange
setSenderAndReceiver(entityType, isSessionEnabled);
final int maxMessages = 1;
final Duration shortTimeOut = Duration.ofSeconds(5);
final int totalReceive = 3;
final Duration shortTimeOut = Duration.ofSeconds(8);

// Act & Assert
final IterableStream<ServiceBusReceivedMessageContext> messages = receiver.receive(maxMessages, shortTimeOut);
final String messageId = UUID.randomUUID().toString();
final ServiceBusMessage message = getMessage(messageId, isSessionEnabled);

final long receivedMessages = messages.stream().count();
assertEquals(0L, receivedMessages);
for (int i = 0; i < totalReceive * maxMessages; ++i) {
sendMessage(message);
}

// Act & Assert
IterableStream<ServiceBusReceivedMessageContext> messages;

int receivedMessageCount;
int totalReceivedCount = 0;
for (int i = 0; i < totalReceive; ++i) {
messages = receiver.receive(maxMessages, shortTimeOut);
receivedMessageCount = 0;
for (ServiceBusReceivedMessageContext receivedMessage : messages) {
assertMessageEquals(receivedMessage, messageId, isSessionEnabled);
receiver.complete(receivedMessage.getMessage());
messagesPending.decrementAndGet();
++receivedMessageCount;
}
assertEquals(maxMessages, receivedMessageCount);
totalReceivedCount += receivedMessageCount;
}

// Second time user try to receive, it should throw exception.
assertThrows(IllegalStateException.class, () -> receiver.receive(maxMessages, shortTimeOut));
assertEquals(totalReceive * maxMessages, totalReceivedCount);
}

/**
Expand Down Expand Up @@ -580,10 +598,8 @@ private void setSenderAndReceiver(MessagingEntityType entityType, boolean isSess
.buildClient();
} else {
receiver = getReceiverBuilder(false, entityType).buildClient();

receiveAndDeleteReceiver = getSessionReceiverBuilder(false, entityType,
Function.identity(),
builder -> builder.sessionId(sessionId).receiveMode(ReceiveMode.RECEIVE_AND_DELETE))
receiveAndDeleteReceiver = getReceiverBuilder(false, entityType).
receiveMode(ReceiveMode.RECEIVE_AND_DELETE)
.buildClient();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,9 @@ void receiveMessagesWithUserSpecifiedTimeout() {
final int maxMessages = 10;
final int numberToEmit = 5;
final Duration receiveTimeout = Duration.ofSeconds(2);
final AtomicInteger emittedMessages = new AtomicInteger();
Flux<ServiceBusReceivedMessageContext> messageSink = Flux.create(sink -> {
sink.onRequest(e -> {
final AtomicInteger emittedMessages = new AtomicInteger();
if (emittedMessages.get() >= numberToEmit) {
logger.info("Cannot emit more. Reached max already. Emitted: {}. Max: {}",
emittedMessages.get(), numberToEmit);
Expand Down Expand Up @@ -475,9 +475,10 @@ void receiveMessagesTimeout() {
// Arrange
final int maxMessages = 10;
final int numberToEmit = 5;

final AtomicInteger emittedMessages = new AtomicInteger();
Flux<ServiceBusReceivedMessageContext> messageSink = Flux.create(sink -> {
sink.onRequest(e -> {
final AtomicInteger emittedMessages = new AtomicInteger();
if (emittedMessages.get() >= numberToEmit) {
logger.info("Cannot emit more. Reached max already. Emitted: {}. Max: {}",
emittedMessages.get(), numberToEmit);
Expand Down