Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.ReceiveAsyncOptions;
import com.azure.messaging.servicebus.models.ReceiveMode;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

Expand All @@ -17,6 +19,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
* A <b>synchronous</b> receiver responsible for receiving {@link ServiceBusReceivedMessage} from a specific queue or
Expand All @@ -38,6 +41,9 @@ public final class ServiceBusReceiverClient implements AutoCloseable {
.setIsAutoCompleteEnabled(false)
.setMaxAutoLockRenewalDuration(Duration.ZERO);

private final AtomicReference<EmitterProcessor<ServiceBusReceivedMessageContext>> messageProcessor =
new AtomicReference<>();
private final AtomicReference<Disposable> messageProcessorSubscription = new AtomicReference<>();
/**
* Creates a synchronous receiver given its asynchronous counterpart.
*
Expand Down Expand Up @@ -503,7 +509,6 @@ public IterableStream<ServiceBusReceivedMessageContext> receive(int maxMessages,

final Flux<ServiceBusReceivedMessageContext> messages = Flux.create(emitter -> queueWork(maxMessages,
maxWaitTime, emitter));

return new IterableStream<>(messages);
}

Expand Down Expand Up @@ -622,20 +627,61 @@ public void setSessionState(String sessionId, byte[] sessionState) {
@Override
public void close() {
asyncClient.close();

if (messageProcessor.get() != null && !messageProcessor.get().isDisposed()) {
messageProcessor.get().dispose();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Between isDisposed() check and dispose() the processor might have changed state. This is not an atomic operation, if that's what you were trying to achieve using the AtomicReference.


Disposable activeSubscription = messageProcessorSubscription.get();
if (activeSubscription != null && !activeSubscription.isDisposed()) {
activeSubscription.dispose();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, this is not an atomic operation.

}

/**
* Given an {@code emitter}, queues that work in {@link SynchronousMessageSubscriber}. If the synchronous job has
* not been created, will initialise it.
* Given an {@code emitter}, creates a {@link EmitterProcessor} to receive messages from Service Bus. If the
* message processor has not been created, will initialise it.
*/
private void queueWork(int maximumMessageCount, Duration maxWaitTime,
FluxSink<ServiceBusReceivedMessageContext> emitter) {
final long id = idGenerator.getAndIncrement();
final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime,
emitter);
final SynchronousMessageSubscriber syncSubscriber = new SynchronousMessageSubscriber(work);

logger.info("[{}]: Started synchronous message subscriber.", id);
asyncClient.receive(DEFAULT_RECEIVE_OPTIONS).subscribeWith(syncSubscriber);
if (messageProcessor.get() != null && messageProcessor.get().isDisposed()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't thread safe, in the time that you check for null, someone else could have set it. Or if it is null, and then you create it, it could possibly overwrite an existing EmitterProcessor that was set in this time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should insure against it messageProcessor.compareAndSet(null, newProcessor)

logger.error("[{}]: Can not receive messaged because client is closed.", asyncClient.getEntityPath());
return;
}

if (messageProcessor.get() == null) {
logger.info("[{}]: Creating EmitterProcessor message processor for entity.", asyncClient.getEntityPath());

EmitterProcessor<ServiceBusReceivedMessageContext> newProcessor = asyncClient.receive(DEFAULT_RECEIVE_OPTIONS)
.subscribeWith(EmitterProcessor.create(false));

// if some other thread have come in between, we will dispose new processor
if (!messageProcessor.compareAndSet(null, newProcessor)) {
newProcessor.dispose();
}

logger.info("[{}]: Started EmitterProcessor message processor for entity.",
asyncClient.getEntityPath());
}

Disposable newSubscription = messageProcessor.get()
.take(maximumMessageCount)
.timeout(maxWaitTime)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeout() is not the right operator here. The maxWaitTime param specifies the maximum time to wait before receiving all messages upto maximumMessageCount. timeout() operator will reset the timer after each message is received. Also, if there is a timeout, it will throw an exception which is not what should happen. Instead, it should return all messages until the timeout occurred.

.map(message -> {
emitter.next(message);
return message;
})
.subscribe(message -> {},
error -> {
logger.error("Error occurred while receiving messages.", error);
emitter.error(error);
},
() -> emitter.complete());

Disposable oldSubscription = messageProcessorSubscription.getAndSet(newSubscription);
if (oldSubscription != null && !oldSubscription.isDisposed()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the existing subscription is still waiting for work? You're going to cancel it?

oldSubscription.dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
Expand Down Expand Up @@ -70,7 +69,7 @@ protected void afterTest() {
if (pending > 0) {
try {
IterableStream<ServiceBusReceivedMessageContext> removedMessage = receiveAndDeleteReceiver.receive(
pending + BUFFER_MESSAGES_TO_REMOVE, Duration.ofSeconds(15));
pending, Duration.ofSeconds(15));

removedMessage.stream().forEach(context -> {
ServiceBusReceivedMessage message = context.getMessage();
Expand All @@ -97,24 +96,43 @@ protected void afterTest() {
}

/**
* Verifies that we can only call receive() once only.
* Verifies that we can only call receive() multiple times.
*/
@MethodSource("messagingEntityWithSessions")
@ParameterizedTest
void receiveByTwoSubscriber(MessagingEntityType entityType, boolean isSessionEnabled) {
// Arrange
setSenderAndReceiver(entityType, isSessionEnabled);
final int maxMessages = 1;
final int maxMessages = 5;
final int totalReceive = 3;
final Duration shortTimeOut = Duration.ofSeconds(5);

// Act & Assert
final IterableStream<ServiceBusReceivedMessageContext> messages = receiver.receive(maxMessages, shortTimeOut);
final String messageId = UUID.randomUUID().toString();
final ServiceBusMessage message = getMessage(messageId, isSessionEnabled);

final long receivedMessages = messages.stream().count();
assertEquals(0L, receivedMessages);
for (int i = 0; i < totalReceive * maxMessages; ++i) {
sendMessage(message);
}

// Act & Assert
IterableStream<ServiceBusReceivedMessageContext> messages;

int receivedMessageCount;
int totalReceivedCount = 0;
for (int i = 0; i < totalReceive; ++i) {
messages = receiver.receive(maxMessages, shortTimeOut);
receivedMessageCount = 0;
for (ServiceBusReceivedMessageContext receivedMessage : messages) {
assertMessageEquals(receivedMessage, messageId, isSessionEnabled);
receiver.complete(receivedMessage.getMessage());
messagesPending.decrementAndGet();
++receivedMessageCount;
}
assertEquals(maxMessages, receivedMessageCount);
totalReceivedCount += receivedMessageCount;
}

// Second time user try to receive, it should throw exception.
assertThrows(IllegalStateException.class, () -> receiver.receive(maxMessages, shortTimeOut));
assertEquals(totalReceive * maxMessages, totalReceivedCount);
}

/**
Expand Down