Skip to content

Commit 6ea230e

Browse files
authored
SyncClient: Bug fix - Multiple Receive on same client (#10734)
* SyncReceiverClient: Should be able to call receive() multiple times on same receiver client.
1 parent b0ed22d commit 6ea230e

File tree

4 files changed

+72
-25
lines changed

4 files changed

+72
-25
lines changed

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -999,6 +999,13 @@ private ServiceBusAsyncConsumer getOrCreateConsumer(ReceiveAsyncOptions options)
999999
}
10001000
}
10011001

1002+
/**
1003+
*
1004+
* @return receiver options set by user;
1005+
*/
1006+
ReceiverOptions getReceiverOptions() {
1007+
return receiverOptions;
1008+
}
10021009
/**
10031010
* Renews the message lock, and updates its value in the container.
10041011
*/

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.azure.messaging.servicebus.models.DeadLetterOptions;
1010
import com.azure.messaging.servicebus.models.ReceiveAsyncOptions;
1111
import com.azure.messaging.servicebus.models.ReceiveMode;
12+
import reactor.core.publisher.EmitterProcessor;
1213
import reactor.core.publisher.Flux;
1314
import reactor.core.publisher.FluxSink;
1415

@@ -17,6 +18,7 @@
1718
import java.util.Map;
1819
import java.util.Objects;
1920
import java.util.concurrent.atomic.AtomicInteger;
21+
import java.util.concurrent.atomic.AtomicReference;
2022

2123
/**
2224
* A <b>synchronous</b> receiver responsible for receiving {@link ServiceBusReceivedMessage} from a specific queue or
@@ -34,10 +36,14 @@ public final class ServiceBusReceiverClient implements AutoCloseable {
3436
private final AtomicInteger idGenerator = new AtomicInteger();
3537
private final ServiceBusReceiverAsyncClient asyncClient;
3638
private final Duration operationTimeout;
39+
private final Object lock = new Object();
3740
private static final ReceiveAsyncOptions DEFAULT_RECEIVE_OPTIONS = new ReceiveAsyncOptions()
3841
.setIsAutoCompleteEnabled(false)
3942
.setMaxAutoLockRenewalDuration(Duration.ZERO);
4043

44+
private final AtomicReference<EmitterProcessor<ServiceBusReceivedMessageContext>> messageProcessor =
45+
new AtomicReference<>();
46+
4147
/**
4248
* Creates a synchronous receiver given its asynchronous counterpart.
4349
*
@@ -606,20 +612,35 @@ public void setSessionState(String sessionId, byte[] sessionState) {
606612
@Override
607613
public void close() {
608614
asyncClient.close();
615+
616+
EmitterProcessor<ServiceBusReceivedMessageContext> processor = messageProcessor.getAndSet(null);
617+
if (processor != null) {
618+
processor.onComplete();
619+
}
609620
}
610621

611622
/**
612-
* Given an {@code emitter}, queues that work in {@link SynchronousMessageSubscriber}. If the synchronous job has
613-
* not been created, will initialise it.
623+
* Given an {@code emitter}, creates a {@link SynchronousMessageSubscriber} to receive messages from Service Bus
624+
* entity.
614625
*/
615626
private void queueWork(int maximumMessageCount, Duration maxWaitTime,
616627
FluxSink<ServiceBusReceivedMessageContext> emitter) {
617-
final long id = idGenerator.getAndIncrement();
618-
final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime,
619-
emitter);
620-
final SynchronousMessageSubscriber syncSubscriber = new SynchronousMessageSubscriber(work);
621-
622-
logger.info("[{}]: Started synchronous message subscriber.", id);
623-
asyncClient.receive(DEFAULT_RECEIVE_OPTIONS).subscribeWith(syncSubscriber);
628+
synchronized (lock) {
629+
final long id = idGenerator.getAndIncrement();
630+
EmitterProcessor<ServiceBusReceivedMessageContext> emitterProcessor = messageProcessor.get();
631+
632+
final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime,
633+
emitter);
634+
final SynchronousMessageSubscriber syncSubscriber = new SynchronousMessageSubscriber(work);
635+
logger.info("[{}]: Started synchronous message subscriber.", id);
636+
637+
if (emitterProcessor == null) {
638+
emitterProcessor = this.asyncClient.receive(DEFAULT_RECEIVE_OPTIONS)
639+
.subscribeWith(EmitterProcessor.create(asyncClient.getReceiverOptions().getPrefetchCount(), false));
640+
messageProcessor.set(emitterProcessor);
641+
}
642+
643+
emitterProcessor.subscribe(syncSubscriber);
644+
}
624645
}
625646
}

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
2828
import static org.junit.jupiter.api.Assertions.assertEquals;
2929
import static org.junit.jupiter.api.Assertions.assertNotNull;
30-
import static org.junit.jupiter.api.Assertions.assertThrows;
3130
import static org.junit.jupiter.api.Assertions.assertTrue;
3231

3332
/**
@@ -71,7 +70,7 @@ protected void afterTest() {
7170
if (pending > 0) {
7271
try {
7372
IterableStream<ServiceBusReceivedMessageContext> removedMessage = receiveAndDeleteReceiver.receive(
74-
pending + BUFFER_MESSAGES_TO_REMOVE, Duration.ofSeconds(15));
73+
pending, Duration.ofSeconds(15));
7574

7675
removedMessage.stream().forEach(context -> {
7776
ServiceBusReceivedMessage message = context.getMessage();
@@ -98,24 +97,43 @@ protected void afterTest() {
9897
}
9998

10099
/**
101-
* Verifies that we can only call receive() once only.
100+
* Verifies that we can only call receive() multiple times.
102101
*/
103102
@MethodSource("messagingEntityWithSessions")
104103
@ParameterizedTest
105104
void receiveByTwoSubscriber(MessagingEntityType entityType, boolean isSessionEnabled) {
106105
// Arrange
107106
setSenderAndReceiver(entityType, isSessionEnabled);
108107
final int maxMessages = 1;
109-
final Duration shortTimeOut = Duration.ofSeconds(5);
108+
final int totalReceive = 3;
109+
final Duration shortTimeOut = Duration.ofSeconds(8);
110110

111-
// Act & Assert
112-
final IterableStream<ServiceBusReceivedMessageContext> messages = receiver.receive(maxMessages, shortTimeOut);
111+
final String messageId = UUID.randomUUID().toString();
112+
final ServiceBusMessage message = getMessage(messageId, isSessionEnabled);
113113

114-
final long receivedMessages = messages.stream().count();
115-
assertEquals(0L, receivedMessages);
114+
for (int i = 0; i < totalReceive * maxMessages; ++i) {
115+
sendMessage(message);
116+
}
117+
118+
// Act & Assert
119+
IterableStream<ServiceBusReceivedMessageContext> messages;
120+
121+
int receivedMessageCount;
122+
int totalReceivedCount = 0;
123+
for (int i = 0; i < totalReceive; ++i) {
124+
messages = receiver.receive(maxMessages, shortTimeOut);
125+
receivedMessageCount = 0;
126+
for (ServiceBusReceivedMessageContext receivedMessage : messages) {
127+
assertMessageEquals(receivedMessage, messageId, isSessionEnabled);
128+
receiver.complete(receivedMessage.getMessage());
129+
messagesPending.decrementAndGet();
130+
++receivedMessageCount;
131+
}
132+
assertEquals(maxMessages, receivedMessageCount);
133+
totalReceivedCount += receivedMessageCount;
134+
}
116135

117-
// Second time user try to receive, it should throw exception.
118-
assertThrows(IllegalStateException.class, () -> receiver.receive(maxMessages, shortTimeOut));
136+
assertEquals(totalReceive * maxMessages, totalReceivedCount);
119137
}
120138

121139
/**
@@ -580,10 +598,8 @@ private void setSenderAndReceiver(MessagingEntityType entityType, boolean isSess
580598
.buildClient();
581599
} else {
582600
receiver = getReceiverBuilder(false, entityType).buildClient();
583-
584-
receiveAndDeleteReceiver = getSessionReceiverBuilder(false, entityType,
585-
Function.identity(),
586-
builder -> builder.sessionId(sessionId).receiveMode(ReceiveMode.RECEIVE_AND_DELETE))
601+
receiveAndDeleteReceiver = getReceiverBuilder(false, entityType).
602+
receiveMode(ReceiveMode.RECEIVE_AND_DELETE)
587603
.buildClient();
588604
}
589605
}

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.azure.core.util.logging.ClientLogger;
88
import com.azure.messaging.servicebus.models.DeadLetterOptions;
99
import com.azure.messaging.servicebus.models.ReceiveAsyncOptions;
10+
import com.azure.messaging.servicebus.models.ReceiveMode;
1011
import org.junit.jupiter.api.AfterEach;
1112
import org.junit.jupiter.api.BeforeEach;
1213
import org.junit.jupiter.api.Test;
@@ -64,6 +65,7 @@ void setup() {
6465

6566
when(asyncClient.getEntityPath()).thenReturn(ENTITY_PATH);
6667
when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE);
68+
when(asyncClient.getReceiverOptions()).thenReturn(new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1));
6769

6870
when(messageLockToken.getLockToken()).thenReturn(LOCK_TOKEN);
6971

@@ -382,9 +384,9 @@ void receiveMessagesWithUserSpecifiedTimeout() {
382384
final int maxMessages = 10;
383385
final int numberToEmit = 5;
384386
final Duration receiveTimeout = Duration.ofSeconds(2);
387+
final AtomicInteger emittedMessages = new AtomicInteger();
385388
Flux<ServiceBusReceivedMessageContext> messageSink = Flux.create(sink -> {
386389
sink.onRequest(e -> {
387-
final AtomicInteger emittedMessages = new AtomicInteger();
388390
if (emittedMessages.get() >= numberToEmit) {
389391
logger.info("Cannot emit more. Reached max already. Emitted: {}. Max: {}",
390392
emittedMessages.get(), numberToEmit);
@@ -475,9 +477,10 @@ void receiveMessagesTimeout() {
475477
// Arrange
476478
final int maxMessages = 10;
477479
final int numberToEmit = 5;
480+
481+
final AtomicInteger emittedMessages = new AtomicInteger();
478482
Flux<ServiceBusReceivedMessageContext> messageSink = Flux.create(sink -> {
479483
sink.onRequest(e -> {
480-
final AtomicInteger emittedMessages = new AtomicInteger();
481484
if (emittedMessages.get() >= numberToEmit) {
482485
logger.info("Cannot emit more. Reached max already. Emitted: {}. Max: {}",
483486
emittedMessages.get(), numberToEmit);

0 commit comments

Comments
 (0)