Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
be7f82b
First draft implementation
May 7, 2020
fb34f1c
Queue up the receive request queuest
May 8, 2020
41cec1e
Improved logging
May 8, 2020
9ed2f31
spot bugs check resolution
May 8, 2020
1a3ff03
work in progress
May 8, 2020
5c3cf98
Increment package version after release of com.azure azure-messaging-…
azure-sdk May 8, 2020
1d83193
another implementation for multiple receive using BaseSubscriber
May 9, 2020
d9b69e8
continue fix some issues
May 9, 2020
5a8a9ec
added test
May 9, 2020
0146620
fixing unwanted check
May 9, 2020
71cd8d1
REview comments
May 11, 2020
452d557
Incorporating review comments
May 11, 2020
9b52a98
Removing unused import.
May 11, 2020
bdd41f3
continue work ...
May 12, 2020
d8dd7b6
Merge branch 'master' of github.com:Azure/azure-sdk-for-java into ser…
May 12, 2020
39b9a1a
resolve merge conflict
May 12, 2020
f9cc7d2
Merge branch 'azure-sdk-increment-package-version-servicebus-386162'
May 12, 2020
0985041
Added loginc for timeout
May 12, 2020
d1589a0
timeout and remaining messages to request logic
May 13, 2020
3a4b16f
merge master
May 13, 2020
b67ac9b
Merge branch 'master' of github.com:hemanttanwar/azure-sdk-for-java
May 13, 2020
08c3c87
Chaging WIP logic
May 14, 2020
0361bc1
Changed logic for draining queue
May 16, 2020
77b4001
Changed logic for draining queue
May 16, 2020
d473eb5
formatting changes
May 16, 2020
13ef032
formatting changes
May 16, 2020
2e1f98b
removing unwanted flags.git statuys
May 16, 2020
7a333f9
Added lock in ServiceBusReceiverClient
May 16, 2020
64e6b30
some more optimization
May 16, 2020
c9dcdd0
incorporated review comments
May 20, 2020
a1c24b3
removed unwanted files
May 20, 2020
510e76c
Merge branch 'master' of github.com:Azure/azure-sdk-for-java
May 20, 2020
b9be8f5
Merge master into branch
May 20, 2020
fe0486c
unit test
May 22, 2020
8bea4db
adding unit test
May 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.ReceiveAsyncOptions;
import com.azure.messaging.servicebus.models.ReceiveMode;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

Expand Down Expand Up @@ -41,8 +40,8 @@ public final class ServiceBusReceiverClient implements AutoCloseable {
.setIsAutoCompleteEnabled(false)
.setMaxAutoLockRenewalDuration(Duration.ZERO);

private final AtomicReference<EmitterProcessor<ServiceBusReceivedMessageContext>> messageProcessor =
new AtomicReference<>();
/* To hold each receive work item to be processed.*/
private final AtomicReference<SynchronousMessageSubscriber> synchronousMessageSubscriber = new AtomicReference<>();

/**
* Creates a synchronous receiver given its asynchronous counterpart.
Expand Down Expand Up @@ -479,7 +478,8 @@ public IterableStream<ServiceBusReceivedMessageContext> receive(int maxMessages)
*
* @throws IllegalArgumentException if {@code maxMessages} or {@code maxWaitTime} is zero or a negative value.
*/
public IterableStream<ServiceBusReceivedMessageContext> receive(int maxMessages, Duration maxWaitTime) {
public synchronized IterableStream<ServiceBusReceivedMessageContext> receive(int maxMessages,
Comment thread
hemanttanwar marked this conversation as resolved.
Outdated
Duration maxWaitTime) {
if (maxMessages <= 0) {
throw logger.logExceptionAsError(new IllegalArgumentException(
"'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages));
Expand Down Expand Up @@ -613,9 +613,9 @@ public void setSessionState(String sessionId, byte[] sessionState) {
public void close() {
asyncClient.close();

EmitterProcessor<ServiceBusReceivedMessageContext> processor = messageProcessor.getAndSet(null);
if (processor != null) {
processor.onComplete();
SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.getAndSet(null);
if (messageSubscriber != null && !messageSubscriber.isDisposed()) {
messageSubscriber.dispose();
}
}

Expand All @@ -627,20 +627,20 @@ private void queueWork(int maximumMessageCount, Duration maxWaitTime,
FluxSink<ServiceBusReceivedMessageContext> emitter) {
synchronized (lock) {
Comment thread
conniey marked this conversation as resolved.
Outdated
final long id = idGenerator.getAndIncrement();
EmitterProcessor<ServiceBusReceivedMessageContext> emitterProcessor = messageProcessor.get();

final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime,
emitter);
final SynchronousMessageSubscriber syncSubscriber = new SynchronousMessageSubscriber(work);
logger.info("[{}]: Started synchronous message subscriber.", id);

if (emitterProcessor == null) {
emitterProcessor = this.asyncClient.receive(DEFAULT_RECEIVE_OPTIONS)
.subscribeWith(EmitterProcessor.create(asyncClient.getReceiverOptions().getPrefetchCount(), false));
messageProcessor.set(emitterProcessor);
SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.get();
if (messageSubscriber == null) {
messageSubscriber = asyncClient.receive(DEFAULT_RECEIVE_OPTIONS)
.subscribeWith(new SynchronousMessageSubscriber(asyncClient.getReceiverOptions()
.getPrefetchCount(), work));
synchronousMessageSubscriber.set(messageSubscriber);
Comment thread
hemanttanwar marked this conversation as resolved.
Outdated
} else {
messageSubscriber.queueWork(work);
}

emitterProcessor.subscribe(syncSubscriber);
logger.verbose("[{}] Receive request queued up.", work.getId());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,114 +5,162 @@

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only thing you need are unit tests. The logic in this subscriber is complex and I can see it being hard to debug.

import com.azure.core.util.logging.ClientLogger;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;

import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Subscriber that listens to events and publishes them downstream and publishes events to them in the order received.
*/
class SynchronousMessageSubscriber extends BaseSubscriber<ServiceBusReceivedMessageContext> {
private final ClientLogger logger = new ClientLogger(SynchronousMessageSubscriber.class);
private final Timer timer = new Timer();
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final SynchronousReceiveWork work;

private final long prefetch;
private final AtomicInteger wip = new AtomicInteger();
private final Queue<SynchronousReceiveWork> workQueue = new ConcurrentLinkedQueue<>();
private final SynchronousReceiveWork initialWork;
private volatile Subscription subscription;

SynchronousMessageSubscriber(SynchronousReceiveWork work) {
this.work = Objects.requireNonNull(work, "'work' cannot be null.");

private SynchronousReceiveWork currentWork;
private Disposable timeoutOperation;
private Disposable drainQueueDisposable;

SynchronousMessageSubscriber(long prefetch, SynchronousReceiveWork initWork) {
Comment thread
hemanttanwar marked this conversation as resolved.
Outdated
this.prefetch = prefetch;
this.initialWork = initWork;
}

/**
* On an initial subscription, will take the first work item, and request that amount of work for it.
*
* @param subscription Subscription for upstream.
*/
@Override
protected void hookOnSubscribe(Subscription subscription) {
if (this.subscription == null) {
this.subscription = subscription;
}

logger.info("[{}] Pending: {}, Scheduling receive timeout task '{}'.", work.getId(), work.getNumberOfEvents(),
work.getTimeout());
this.subscription = subscription;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need this variable? if the subscription != null, I'll assume there is an upstream.

Also, if we call hookonSubscribe twice, it'll stomp over the previous subscription. We should guard against this by only setting it when it is null and erroring when someone wants to set it again.

You'll see a setOnce in some of the reactor operations.


subscription.request(work.getNumberOfEvents());
logger.info("[{}] Pending: {}, Scheduling receive timeout task '{}'.", initialWork.getId(),
initialWork.getNumberOfEvents(), initialWork.getTimeout());

timer.schedule(new ReceiveTimeoutTask(work.getId(), this::dispose), work.getTimeout().toMillis());
// This will trigger subscription.request(N) and queue up the work
Comment thread
hemanttanwar marked this conversation as resolved.
Outdated
queueWork(initialWork);
}

/**
* Publishes the event to the current {@link SynchronousReceiveWork}. If that work item is complete, will dispose of
* the subscriber.
*
* @param value Event to publish.
* @param message Event to publish.
*/
@Override
protected void hookOnNext(ServiceBusReceivedMessageContext value) {
work.next(value);
protected void hookOnNext(ServiceBusReceivedMessageContext message) {
Comment thread
hemanttanwar marked this conversation as resolved.
currentWork.next(message);

if (work.isTerminal()) {
logger.info("[{}] Completed. Closing Flux and cancelling subscription.", work.getId());
dispose();
if (currentWork.isTerminal()) {
logger.info("[{}] Completed. Closing Flux and cancelling subscription.", currentWork.getId());
completeCurrentWork(currentWork);
}
}

@Override
protected void hookOnComplete() {
logger.info("[{}] Completed. No events to listen to.", work.getId());
dispose();
private void completeCurrentWork(SynchronousReceiveWork currentWork) {

if (isTerminated()) {
return;
}

currentWork.complete();
logger.verbose("[{}] work completed.", currentWork.getId());

if (timeoutOperation != null && !timeoutOperation.isDisposed()) {
Comment thread
hemanttanwar marked this conversation as resolved.
Outdated
timeoutOperation.dispose();
}
if (drainQueueDisposable != null && !drainQueueDisposable.isDisposed()) {
drainQueueDisposable.dispose();
}

if (wip.decrementAndGet() != 0) {
Comment thread
conniey marked this conversation as resolved.
Outdated
logger.warning("There is another worker in drainLoop. But there should only be 1 worker.");
}

// After current work finished and there more receive requests
if (workQueue.size() > 0) {
Comment thread
hemanttanwar marked this conversation as resolved.
Outdated
drain();
}
}

void queueWork(SynchronousReceiveWork work) {

logger.info("[{}] Pending: {}, Scheduling receive timeout task '{}'.", work.getId(), work.getNumberOfEvents(),
work.getTimeout());
workQueue.add(work);
drain();
}

private void drain() {
// If someone is already in this loop, then we are already clearing the queue.
if (!wip.compareAndSet(0, 1)) {
return;
}
// Drain queue..
Comment thread
hemanttanwar marked this conversation as resolved.
Outdated
drainQueueDisposable = Mono.just(true)
.subscribe(l -> {
drainQueue();
});
}

private void drainQueue() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. There are 4 places in which you set currentWork and these can happen asynchronously. It's possible that the work you're referencing here is one that has been cancelled or timed out. Consider a currentWorkLock that synchronises read-write to this variable and the associated timeout operation.
  2. Looking at other implementations of subscribers and your current implementation, I would put most of the heavy lifting in this method. (ie. checking to see if we have current work, updating it if need be, setting/requesting more work). As it currently is, there are several ways to emit next items and the currentWork is controlled in many places which will be a pain to debug when this fails.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (isTerminated()) {
return;
}
currentWork = workQueue.poll();
if (currentWork == null) {
return;
}

subscription.request(currentWork.getNumberOfEvents());
Comment thread
hemanttanwar marked this conversation as resolved.
Outdated

// timer to complete the current in case of timeout trigger
timeoutOperation = Mono.delay(currentWork.getTimeout())
.subscribe(l -> {
if (!currentWork.isTerminal()) {
completeCurrentWork(currentWork);
}
});
}

/**
* {@inheritDoc}
*/
@Override
protected void hookOnError(Throwable throwable) {
logger.error("[{}] Errors occurred upstream", work.getId(), throwable);
work.error(throwable);
logger.error("[{}] Errors occurred upstream", currentWork.getId(), throwable);
currentWork.error(throwable);
dispose();
}

@Override
protected void hookOnCancel() {
dispose();
}

/**
* {@inheritDoc}
*/
@Override
public void dispose() {
if (isDisposed.getAndSet(true)) {
return;
}

work.complete();
if (currentWork.getError() != null) {
currentWork.error(currentWork.getError());
} else {
currentWork.complete();
}
subscription.cancel();
timer.cancel();
super.dispose();
}

private static final class ReceiveTimeoutTask extends TimerTask {
private final ClientLogger logger = new ClientLogger(ReceiveTimeoutTask.class);
private final long workId;
private final Runnable onDispose;

ReceiveTimeoutTask(long workId, Runnable onDispose) {
this.workId = workId;
this.onDispose = onDispose;
if (timeoutOperation != null && !timeoutOperation.isDisposed()) {
timeoutOperation.dispose();
}
}

@Override
public void run() {
logger.info("[{}] Timeout encountered, disposing of subscriber.", workId);
onDispose.run();
}
private boolean isTerminated() {
return isDisposed.get();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,12 @@ void error(Throwable error) {
this.error = error;
emitter.error(error);
}

/**
* Indicate is an this work encountered an error.
* @return true if an error occured.
*/
Throwable getError() {
Comment thread
hemanttanwar marked this conversation as resolved.
return this.error;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/**
* Integration tests for {@link ServiceBusReceiverClient} from queues or subscriptions.
Expand Down Expand Up @@ -101,11 +102,11 @@ protected void afterTest() {
*/
@MethodSource("messagingEntityWithSessions")
@ParameterizedTest
void receiveByTwoSubscriber(MessagingEntityType entityType, boolean isSessionEnabled) {
void multipleReceiveByOneSubscriber(MessagingEntityType entityType, boolean isSessionEnabled) {
// Arrange
setSenderAndReceiver(entityType, isSessionEnabled);
final int maxMessages = 1;
final int totalReceive = 3;
final int maxMessages = 3;
final int totalReceive = 10;
final Duration shortTimeOut = Duration.ofSeconds(8);

final String messageId = UUID.randomUUID().toString();
Expand Down Expand Up @@ -136,6 +137,64 @@ void receiveByTwoSubscriber(MessagingEntityType entityType, boolean isSessionEna
assertEquals(totalReceive * maxMessages, totalReceivedCount);
}

/**
* Verifies that we can only call receive() multiple times.
*/
@MethodSource("messagingEntityWithSessions")
@ParameterizedTest
void parallelReceiveByOneSubscriber(MessagingEntityType entityType, boolean isSessionEnabled) {
// Arrange
setSenderAndReceiver(entityType, isSessionEnabled);
final int maxMessagesEachReceive = 4;
final int totalReceiver = 10;
final Duration shortTimeOut = Duration.ofSeconds(8);

final String messageId = UUID.randomUUID().toString();
final ServiceBusMessage message = getMessage(messageId, isSessionEnabled);

for (int i = 0; i < totalReceiver * maxMessagesEachReceive; ++i) {
sendMessage(message);
}

// Act & Assert
AtomicInteger totalReceivedMessages = new AtomicInteger();
List<Thread> receiverThreads = new ArrayList<>();
for (int i = 0; i < totalReceiver; ++i) {
int finalI = i;
Thread thread = new Thread(() -> {
IterableStream<ServiceBusReceivedMessageContext> messages1 = receiver.
receive(maxMessagesEachReceive, shortTimeOut);
int receivedMessageCount = 0;
long lastSequenceReceiver = 0;
for (ServiceBusReceivedMessageContext receivedMessage : messages1) {
logger.verbose("Receiver [{}}] Received Sequence Number: ", (finalI + 1), receivedMessage
.getMessage().getSequenceNumber());
assertMessageEquals(receivedMessage, messageId, isSessionEnabled);
receiver.complete(receivedMessage.getMessage());
assertTrue(receivedMessage.getMessage().getSequenceNumber() > lastSequenceReceiver);
lastSequenceReceiver = receivedMessage.getMessage().getSequenceNumber();
messagesPending.decrementAndGet();
++receivedMessageCount;
}
totalReceivedMessages.addAndGet(receivedMessageCount);
assertEquals(maxMessagesEachReceive, receivedMessageCount);
logger.verbose("Receiver [{}}] . Test Completed receivers ", (finalI + 1));
});
receiverThreads.add(thread);
}

receiverThreads.forEach(t -> t.start());

receiverThreads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
fail("Error in receiving messages: " + e.getMessage());
}
});
assertEquals(totalReceiver * maxMessagesEachReceive, totalReceivedMessages.get());
}

/**
* Verifies that we can send and receive two messages.
*/
Expand Down