Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.ReceiveAsyncOptions;
import com.azure.messaging.servicebus.models.ReceiveMode;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

Expand All @@ -17,6 +19,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* A <b>synchronous</b> receiver responsible for receiving {@link ServiceBusReceivedMessage} from a specific queue or
Expand All @@ -38,6 +41,9 @@ public final class ServiceBusReceiverClient implements AutoCloseable {
.setIsAutoCompleteEnabled(false)
.setMaxAutoLockRenewalDuration(Duration.ZERO);

private final AtomicReference<EmitterProcessor<ServiceBusReceivedMessageContext>> messageProcessor =
new AtomicReference<>();
private final AtomicReference<Disposable> messageProcessorSubscription = new AtomicReference<>();
/**
* Creates a synchronous receiver given its asynchronous counterpart.
*
Expand Down Expand Up @@ -622,20 +628,56 @@ public void setSessionState(String sessionId, byte[] sessionState) {
@Override
public void close() {
asyncClient.close();

if (messageProcessor.get() != null) {
messageProcessor.get().dispose();
}

Disposable activeSubscription = messageProcessorSubscription.get();
if (activeSubscription != null) {
activeSubscription.dispose();
}
}

/**
* Given an {@code emitter}, queues that work in {@link SynchronousMessageSubscriber}. If the synchronous job has
* not been created, will initialise it.
* Given an {@code emitter}, creates a {@link EmitterProcessor} to receive messages from Service Bus. If the
* message processor has not been created, will initialise it.
*/
private void queueWork(int maximumMessageCount, Duration maxWaitTime,
FluxSink<ServiceBusReceivedMessageContext> emitter) {
final long id = idGenerator.getAndIncrement();
final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime,
emitter);
final SynchronousMessageSubscriber syncSubscriber = new SynchronousMessageSubscriber(work);

logger.info("[{}]: Started synchronous message subscriber.", id);
asyncClient.receive(DEFAULT_RECEIVE_OPTIONS).subscribeWith(syncSubscriber);
if (messageProcessor.get() != null && messageProcessor.get().isDisposed()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't thread safe, in the time that you check for null, someone else could have set it. Or if it is null, and then you create it, it could possibly overwrite an existing EmitterProcessor that was set in this time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should insure against it messageProcessor.compareAndSet(null, newProcessor)

logger.error("[{}]: Can not receive messaged because client is closed.", asyncClient.getEntityPath());
return;
}

if (messageProcessor.get() == null) {
EmitterProcessor<ServiceBusReceivedMessageContext> processor = asyncClient.receive(DEFAULT_RECEIVE_OPTIONS)
.subscribeWith(EmitterProcessor.create(false));

if (!messageProcessor.compareAndSet(null, processor)) {
processor.dispose();
}

logger.info("[{}]: Started ContinuesMessageSubscriber message subscriber for entity.",
asyncClient.getEntityPath());
}

Disposable newSubscription = messageProcessor.get()
.take(maximumMessageCount)
.timeout(maxWaitTime)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeout() is not the right operator here. The maxWaitTime param specifies the maximum time to wait before receiving all messages upto maximumMessageCount. timeout() operator will reset the timer after each message is received. Also, if there is a timeout, it will throw an exception which is not what should happen. Instead, it should return all messages until the timeout occurred.

.doOnNext(messageContext -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are for side effects.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doOnNext, doOnError, doOnComplete are for side effects like logging or monitoring. It shouldn't be used as part of the main chain of operations.

emitter.next(messageContext);
})
.doOnError(throwable -> {
emitter.error(throwable);
})
.doOnComplete(emitter::complete)
.subscribe();

Disposable oldSubscription = messageProcessorSubscription.getAndSet(newSubscription);
if (oldSubscription != null) {
oldSubscription.dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
Expand Down Expand Up @@ -70,7 +69,7 @@ protected void afterTest() {
if (pending > 0) {
try {
IterableStream<ServiceBusReceivedMessageContext> removedMessage = receiveAndDeleteReceiver.receive(
pending + BUFFER_MESSAGES_TO_REMOVE, Duration.ofSeconds(15));
pending, Duration.ofSeconds(15));

removedMessage.stream().forEach(context -> {
ServiceBusReceivedMessage message = context.getMessage();
Expand All @@ -97,24 +96,43 @@ protected void afterTest() {
}

/**
* Verifies that we can only call receive() once only.
* Verifies that we can only call receive() multiple times.
*/
@MethodSource("messagingEntityWithSessions")
@ParameterizedTest
void receiveByTwoSubscriber(MessagingEntityType entityType, boolean isSessionEnabled) {
// Arrange
setSenderAndReceiver(entityType, isSessionEnabled);
final int maxMessages = 1;
final int maxMessages = 5;
final int totalReceive = 3;
final Duration shortTimeOut = Duration.ofSeconds(5);

// Act & Assert
final IterableStream<ServiceBusReceivedMessageContext> messages = receiver.receive(maxMessages, shortTimeOut);
final String messageId = UUID.randomUUID().toString();
final ServiceBusMessage message = getMessage(messageId, isSessionEnabled);

final long receivedMessages = messages.stream().count();
assertEquals(0L, receivedMessages);
for (int i = 0; i < totalReceive * maxMessages; ++i) {
sendMessage(message);
}

// Act & Assert
IterableStream<ServiceBusReceivedMessageContext> messages;

int receivedMessageCount;
int totalReceivedCount = 0;
for (int i = 0; i < totalReceive; ++i) {
messages = receiver.receive(maxMessages, shortTimeOut);
receivedMessageCount = 0;
for (ServiceBusReceivedMessageContext receivedMessage : messages) {
assertMessageEquals(receivedMessage, messageId, isSessionEnabled);
receiver.complete(receivedMessage.getMessage());
messagesPending.decrementAndGet();
++receivedMessageCount;
}
assertEquals(maxMessages, receivedMessageCount);
totalReceivedCount += receivedMessageCount;
}

// Second time user try to receive, it should throw exception.
assertThrows(IllegalStateException.class, () -> receiver.receive(maxMessages, shortTimeOut));
assertEquals(totalReceive * maxMessages, totalReceivedCount);
}

/**
Expand Down