diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 563bc090759a..70262794a3ee 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -999,6 +999,13 @@ private ServiceBusAsyncConsumer getOrCreateConsumer(ReceiveAsyncOptions options) } } + /** + * + * @return receiver options set by user; + */ + ReceiverOptions getReceiverOptions() { + return receiverOptions; + } /** * Renews the message lock, and updates its value in the container. */ diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index f92f35180abf..ef7c67dc9d2d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -9,6 +9,7 @@ import com.azure.messaging.servicebus.models.DeadLetterOptions; import com.azure.messaging.servicebus.models.ReceiveAsyncOptions; import com.azure.messaging.servicebus.models.ReceiveMode; +import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; @@ -17,6 +18,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** * A synchronous receiver responsible for receiving {@link ServiceBusReceivedMessage} from a specific queue or @@ -34,10 +36,14 @@ public final class ServiceBusReceiverClient implements AutoCloseable { private final AtomicInteger idGenerator = new AtomicInteger(); private final ServiceBusReceiverAsyncClient asyncClient; private final Duration operationTimeout; + private final Object lock = new Object(); private static final ReceiveAsyncOptions DEFAULT_RECEIVE_OPTIONS = new ReceiveAsyncOptions() .setIsAutoCompleteEnabled(false) .setMaxAutoLockRenewalDuration(Duration.ZERO); + private final AtomicReference> messageProcessor = + new AtomicReference<>(); + /** * Creates a synchronous receiver given its asynchronous counterpart. * @@ -606,20 +612,35 @@ public void setSessionState(String sessionId, byte[] sessionState) { @Override public void close() { asyncClient.close(); + + EmitterProcessor processor = messageProcessor.getAndSet(null); + if (processor != null) { + processor.onComplete(); + } } /** - * Given an {@code emitter}, queues that work in {@link SynchronousMessageSubscriber}. If the synchronous job has - * not been created, will initialise it. + * Given an {@code emitter}, creates a {@link SynchronousMessageSubscriber} to receive messages from Service Bus + * entity. */ private void queueWork(int maximumMessageCount, Duration maxWaitTime, FluxSink emitter) { - final long id = idGenerator.getAndIncrement(); - final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime, - emitter); - final SynchronousMessageSubscriber syncSubscriber = new SynchronousMessageSubscriber(work); - - logger.info("[{}]: Started synchronous message subscriber.", id); - asyncClient.receive(DEFAULT_RECEIVE_OPTIONS).subscribeWith(syncSubscriber); + synchronized (lock) { + final long id = idGenerator.getAndIncrement(); + EmitterProcessor emitterProcessor = messageProcessor.get(); + + final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime, + emitter); + final SynchronousMessageSubscriber syncSubscriber = new SynchronousMessageSubscriber(work); + logger.info("[{}]: Started synchronous message subscriber.", id); + + if (emitterProcessor == null) { + emitterProcessor = this.asyncClient.receive(DEFAULT_RECEIVE_OPTIONS) + .subscribeWith(EmitterProcessor.create(asyncClient.getReceiverOptions().getPrefetchCount(), false)); + messageProcessor.set(emitterProcessor); + } + + emitterProcessor.subscribe(syncSubscriber); + } } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java index 256c128f9587..e69d9f2794c4 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java @@ -27,7 +27,6 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -71,7 +70,7 @@ protected void afterTest() { if (pending > 0) { try { IterableStream removedMessage = receiveAndDeleteReceiver.receive( - pending + BUFFER_MESSAGES_TO_REMOVE, Duration.ofSeconds(15)); + pending, Duration.ofSeconds(15)); removedMessage.stream().forEach(context -> { ServiceBusReceivedMessage message = context.getMessage(); @@ -98,7 +97,7 @@ protected void afterTest() { } /** - * Verifies that we can only call receive() once only. + * Verifies that we can only call receive() multiple times. */ @MethodSource("messagingEntityWithSessions") @ParameterizedTest @@ -106,16 +105,35 @@ void receiveByTwoSubscriber(MessagingEntityType entityType, boolean isSessionEna // Arrange setSenderAndReceiver(entityType, isSessionEnabled); final int maxMessages = 1; - final Duration shortTimeOut = Duration.ofSeconds(5); + final int totalReceive = 3; + final Duration shortTimeOut = Duration.ofSeconds(8); - // Act & Assert - final IterableStream messages = receiver.receive(maxMessages, shortTimeOut); + final String messageId = UUID.randomUUID().toString(); + final ServiceBusMessage message = getMessage(messageId, isSessionEnabled); - final long receivedMessages = messages.stream().count(); - assertEquals(0L, receivedMessages); + for (int i = 0; i < totalReceive * maxMessages; ++i) { + sendMessage(message); + } + + // Act & Assert + IterableStream messages; + + int receivedMessageCount; + int totalReceivedCount = 0; + for (int i = 0; i < totalReceive; ++i) { + messages = receiver.receive(maxMessages, shortTimeOut); + receivedMessageCount = 0; + for (ServiceBusReceivedMessageContext receivedMessage : messages) { + assertMessageEquals(receivedMessage, messageId, isSessionEnabled); + receiver.complete(receivedMessage.getMessage()); + messagesPending.decrementAndGet(); + ++receivedMessageCount; + } + assertEquals(maxMessages, receivedMessageCount); + totalReceivedCount += receivedMessageCount; + } - // Second time user try to receive, it should throw exception. - assertThrows(IllegalStateException.class, () -> receiver.receive(maxMessages, shortTimeOut)); + assertEquals(totalReceive * maxMessages, totalReceivedCount); } /** @@ -580,10 +598,8 @@ private void setSenderAndReceiver(MessagingEntityType entityType, boolean isSess .buildClient(); } else { receiver = getReceiverBuilder(false, entityType).buildClient(); - - receiveAndDeleteReceiver = getSessionReceiverBuilder(false, entityType, - Function.identity(), - builder -> builder.sessionId(sessionId).receiveMode(ReceiveMode.RECEIVE_AND_DELETE)) + receiveAndDeleteReceiver = getReceiverBuilder(false, entityType). + receiveMode(ReceiveMode.RECEIVE_AND_DELETE) .buildClient(); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java index 635f5e6d91d3..6747d6a08cf4 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java @@ -7,6 +7,7 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.models.DeadLetterOptions; import com.azure.messaging.servicebus.models.ReceiveAsyncOptions; +import com.azure.messaging.servicebus.models.ReceiveMode; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -64,6 +65,7 @@ void setup() { when(asyncClient.getEntityPath()).thenReturn(ENTITY_PATH); when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE); + when(asyncClient.getReceiverOptions()).thenReturn(new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1)); when(messageLockToken.getLockToken()).thenReturn(LOCK_TOKEN); @@ -382,9 +384,9 @@ void receiveMessagesWithUserSpecifiedTimeout() { final int maxMessages = 10; final int numberToEmit = 5; final Duration receiveTimeout = Duration.ofSeconds(2); + final AtomicInteger emittedMessages = new AtomicInteger(); Flux messageSink = Flux.create(sink -> { sink.onRequest(e -> { - final AtomicInteger emittedMessages = new AtomicInteger(); if (emittedMessages.get() >= numberToEmit) { logger.info("Cannot emit more. Reached max already. Emitted: {}. Max: {}", emittedMessages.get(), numberToEmit); @@ -475,9 +477,10 @@ void receiveMessagesTimeout() { // Arrange final int maxMessages = 10; final int numberToEmit = 5; + + final AtomicInteger emittedMessages = new AtomicInteger(); Flux messageSink = Flux.create(sink -> { sink.onRequest(e -> { - final AtomicInteger emittedMessages = new AtomicInteger(); if (emittedMessages.get() >= numberToEmit) { logger.info("Cannot emit more. Reached max already. Emitted: {}. Max: {}", emittedMessages.get(), numberToEmit);