Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
be7f82b
First draft implementation
May 7, 2020
fb34f1c
Queue up the receive requests
May 8, 2020
41cec1e
Improved logging
May 8, 2020
9ed2f31
spot bugs check resolution
May 8, 2020
1a3ff03
work in progress
May 8, 2020
5c3cf98
Increment package version after release of com.azure azure-messaging-…
azure-sdk May 8, 2020
1d83193
another implementation for multiple receive using BaseSubscriber
May 9, 2020
d9b69e8
continue fix some issues
May 9, 2020
5a8a9ec
added test
May 9, 2020
0146620
fixing unwanted check
May 9, 2020
71cd8d1
Review comments
May 11, 2020
452d557
Incorporating review comments
May 11, 2020
9b52a98
Removing unused import.
May 11, 2020
bdd41f3
continue work ...
May 12, 2020
d8dd7b6
Merge branch 'master' of github.com:Azure/azure-sdk-for-java into ser…
May 12, 2020
39b9a1a
resolve merge conflict
May 12, 2020
f9cc7d2
Merge branch 'azure-sdk-increment-package-version-servicebus-386162'
May 12, 2020
0985041
Added logic for timeout
May 12, 2020
d1589a0
timeout and remaining messages to request logic
May 13, 2020
3a4b16f
merge master
May 13, 2020
b67ac9b
Merge branch 'master' of github.com:hemanttanwar/azure-sdk-for-java
May 13, 2020
08c3c87
Changing WIP logic
May 14, 2020
0361bc1
Changed logic for draining queue
May 16, 2020
77b4001
Changed logic for draining queue
May 16, 2020
d473eb5
formatting changes
May 16, 2020
13ef032
formatting changes
May 16, 2020
2e1f98b
Removing unwanted flags
May 16, 2020
7a333f9
Added lock in ServiceBusReceiverClient
May 16, 2020
64e6b30
some more optimization
May 16, 2020
c9dcdd0
incorporated review comments
May 20, 2020
a1c24b3
removed unwanted files
May 20, 2020
510e76c
Merge branch 'master' of github.com:Azure/azure-sdk-for-java
May 20, 2020
b9be8f5
Merge master into branch
May 20, 2020
fe0486c
unit test
May 22, 2020
8bea4db
adding unit test
May 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.ReceiveMode;
import reactor.core.publisher.EmitterProcessor;
import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusReceiverClientBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

Expand All @@ -35,10 +35,9 @@ public final class ServiceBusReceiverClient implements AutoCloseable {
private final AtomicInteger idGenerator = new AtomicInteger();
private final ServiceBusReceiverAsyncClient asyncClient;
private final Duration operationTimeout;
private final Object lock = new Object();

private final AtomicReference<EmitterProcessor<ServiceBusReceivedMessageContext>> messageProcessor =
new AtomicReference<>();
/* To hold each receive work item to be processed.*/
private final AtomicReference<SynchronousMessageSubscriber> synchronousMessageSubscriber = new AtomicReference<>();

/**
* Creates a synchronous receiver given its asynchronous counterpart.
Expand Down Expand Up @@ -467,15 +466,18 @@ public IterableStream<ServiceBusReceivedMessageContext> receive(int maxMessages)
}

/**
* Receives an iterable stream of {@link ServiceBusReceivedMessage messages} from the Service Bus entity.
* Receives an iterable stream of {@link ServiceBusReceivedMessage messages} from the Service Bus entity. The
* default receive mode is {@link ReceiveMode#PEEK_LOCK } unless it is changed during creation of
* {@link ServiceBusReceiverClient} using {@link ServiceBusReceiverClientBuilder#receiveMode(ReceiveMode)}.
*
* @param maxMessages The maximum number of messages to receive.
* @param maxWaitTime The time the client waits for receiving a message before it times out.
* @return An {@link IterableStream} of at most {@code maxMessages} messages from the Service Bus entity.
*
* @throws IllegalArgumentException if {@code maxMessages} or {@code maxWaitTime} is zero or a negative value.
*/
public IterableStream<ServiceBusReceivedMessageContext> receive(int maxMessages, Duration maxWaitTime) {
public IterableStream<ServiceBusReceivedMessageContext> receive(int maxMessages,
Duration maxWaitTime) {
if (maxMessages <= 0) {
throw logger.logExceptionAsError(new IllegalArgumentException(
"'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages));
Expand Down Expand Up @@ -609,9 +611,9 @@ public void setSessionState(String sessionId, byte[] sessionState) {
public void close() {
asyncClient.close();

EmitterProcessor<ServiceBusReceivedMessageContext> processor = messageProcessor.getAndSet(null);
if (processor != null) {
processor.onComplete();
SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.getAndSet(null);
if (messageSubscriber != null && !messageSubscriber.isDisposed()) {
messageSubscriber.dispose();
}
}

Expand All @@ -621,22 +623,24 @@ public void close() {
*/
private void queueWork(int maximumMessageCount, Duration maxWaitTime,
FluxSink<ServiceBusReceivedMessageContext> emitter) {
synchronized (lock) {
final long id = idGenerator.getAndIncrement();
EmitterProcessor<ServiceBusReceivedMessageContext> emitterProcessor = messageProcessor.get();

final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime,
emitter);
final SynchronousMessageSubscriber syncSubscriber = new SynchronousMessageSubscriber(work);
logger.info("[{}]: Started synchronous message subscriber.", id);

if (emitterProcessor == null) {
emitterProcessor = this.asyncClient.receive()
.subscribeWith(EmitterProcessor.create(asyncClient.getReceiverOptions().getPrefetchCount(), false));
messageProcessor.set(emitterProcessor);
final long id = idGenerator.getAndIncrement();
final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime, emitter);

SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.get();
if (messageSubscriber == null) {
long prefetch = asyncClient.getReceiverOptions().getPrefetchCount();
SynchronousMessageSubscriber newSubscriber = new SynchronousMessageSubscriber(prefetch, work);

if (!synchronousMessageSubscriber.compareAndSet(null, newSubscriber)) {
newSubscriber.dispose();
SynchronousMessageSubscriber existing = synchronousMessageSubscriber.get();
existing.queueWork(work);
} else {
asyncClient.receive().subscribeWith(newSubscriber);
}

emitterProcessor.subscribe(syncSubscriber);
} else {
messageSubscriber.queueWork(work);
}
logger.verbose("[{}] Receive request queued up.", work.getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,114 +5,250 @@

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only thing you need are unit tests. The logic in this subscriber is complex and I can see it being hard to debug.

import com.azure.core.util.logging.ClientLogger;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
* Subscriber that listens to events and publishes them downstream and publishes events to them in the order received.
*/
class SynchronousMessageSubscriber extends BaseSubscriber<ServiceBusReceivedMessageContext> {
private final ClientLogger logger = new ClientLogger(SynchronousMessageSubscriber.class);
private final Timer timer = new Timer();
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final SynchronousReceiveWork work;
private final AtomicInteger wip = new AtomicInteger();
private final Queue<SynchronousReceiveWork> workQueue = new ConcurrentLinkedQueue<>();
private final Queue<ServiceBusReceivedMessageContext> bufferMessages = new ConcurrentLinkedQueue<>();
private final AtomicLong remaining = new AtomicLong();

private final long requested;
private final Object currentWorkLock = new Object();

private Disposable currentTimeoutOperation;
private SynchronousReceiveWork currentWork;
private boolean subscriberInitialized;

private volatile Subscription subscription;

SynchronousMessageSubscriber(SynchronousReceiveWork work) {
this.work = Objects.requireNonNull(work, "'work' cannot be null.");
private static final AtomicReferenceFieldUpdater<SynchronousMessageSubscriber, Subscription> UPSTREAM =
AtomicReferenceFieldUpdater.newUpdater(SynchronousMessageSubscriber.class, Subscription.class,
"subscription");


SynchronousMessageSubscriber(long prefetch, SynchronousReceiveWork initialWork) {
this.workQueue.add(initialWork);
requested = initialWork.getNumberOfEvents() > prefetch ? initialWork.getNumberOfEvents() : prefetch;
}

/**
* On an initial subscription, will take the first work item, and request that amount of work for it.
*
* @param subscription Subscription for upstream.
*/
@Override
protected void hookOnSubscribe(Subscription subscription) {
if (this.subscription == null) {

if (Operators.setOnce(UPSTREAM, this, subscription)) {
this.subscription = subscription;
remaining.addAndGet(requested);
subscription.request(requested);
subscriberInitialized = true;
drain();
} else {
logger.error("Already subscribed once.");
}
}

/**
 * Buffers the received message and triggers the drain loop so any queued
 * {@link SynchronousReceiveWork} can consume it.
 *
 * @param message Event to publish.
 */
@Override
protected void hookOnNext(ServiceBusReceivedMessageContext message) {
    bufferMessages.add(message);
    drain();
}

/**
* Queue the work to be picked up by drain loop.
* @param work to be queued.
*/
void queueWork(SynchronousReceiveWork work) {

logger.info("[{}] Pending: {}, Scheduling receive timeout task '{}'.", work.getId(), work.getNumberOfEvents(),
work.getTimeout());
workQueue.add(work);

subscription.request(work.getNumberOfEvents());

timer.schedule(new ReceiveTimeoutTask(work.getId(), this::dispose), work.getTimeout().toMillis());
// Do not drain if another thread want to queue the work before we have subscriber
if (subscriberInitialized) {
drain();
}
}

/**
* Publishes the event to the current {@link SynchronousReceiveWork}. If that work item is complete, will dispose of
* the subscriber.
*
* @param value Event to publish.
* Drain the work, only one thread can be in this loop at a time.
*/
@Override
protected void hookOnNext(ServiceBusReceivedMessageContext value) {
work.next(value);
private void drain() {
// If someone is already in this loop, then we are already clearing the queue.
if (!wip.compareAndSet(0, 1)) {
return;
}

if (work.isTerminal()) {
logger.info("[{}] Completed. Closing Flux and cancelling subscription.", work.getId());
dispose();
try {
drainQueue();
} finally {
final int decremented = wip.decrementAndGet();
if (decremented != 0) {
logger.warning("There should be 0, but was: {}", decremented);
}
}
}

@Override
protected void hookOnComplete() {
logger.info("[{}] Completed. No events to listen to.", work.getId());
dispose();
/***
* Drain the queue using a lock on current work in progress.
*/
private void drainQueue() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. There are 4 places in which you set currentWork and these can happen asynchronously. It's possible that the work you're referencing here is one that has been cancelled or timed out. Consider a currentWorkLock that synchronises read-write to this variable and the associated timeout operation.
  2. Looking at other implementations of subscribers and your current implementation, I would put most of the heavy lifting in this method. (ie. checking to see if we have current work, updating it if need be, setting/requesting more work). As it currently is, there are several ways to emit next items and the currentWork is controlled in many places which will be a pain to debug when this fails.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (isTerminated()) {
return;
}

// Acquiring the lock
synchronized (currentWorkLock) {

// Making sure current work not become terminal since last drain queue cycle
if (currentWork != null && currentWork.isTerminal()) {
workQueue.remove(currentWork);
if (currentTimeoutOperation != null && !currentTimeoutOperation.isDisposed()) {
currentTimeoutOperation.dispose();
}
currentTimeoutOperation = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any benefit to setting this to null again (and in a few places)? Once a subscription is disposed calling dispose again is a no-op.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having a current work is always tied to a timeout operation. Checking currentWork != null should be enough.

Copy link
Contributor Author

@hemanttanwar hemanttanwar May 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might pick currentWork more than once from workQueue. currentTimeoutOperation == null indicates that we are picking it up for the first time.

We do not need to process currentWork if it is picked up a second time and there are no bufferMessages to send to it.

while ((currentWork = workQueue.peek()) != null && (currentTimeoutOperation == null || bufferMessages.size() > 0 )) {

The timeout operation does not remove currentWork from the queue, so currentWork needs to be picked up again; thus we need to treat this case differently, go into the loop, and remove this timed-out work. Setting currentTimeoutOperation = null also indicates this case.

}

// We should process a work when
// 1. it is first time getting picked up
// 2. or more messages have arrived while we were in drain loop.
// We might not have all the message in bufferMessages needed for workQueue, Thus we will only remove work
// from queue when we have delivered all the messages to currentWork.

while ((currentWork = workQueue.peek()) != null
&& (!currentWork.isProcessingStarted() || bufferMessages.size() > 0)) {

// Additional check for safety, but normally this work should never be terminal
if (currentWork.isTerminal()) {
// This work already finished by either timeout or no more messages to send, process next work.
workQueue.remove(currentWork);
if (currentTimeoutOperation != null && !currentTimeoutOperation.isDisposed()) {
currentTimeoutOperation.dispose();
}
continue;
}

if (!currentWork.isProcessingStarted()) {
// timer to complete the currentWork in case of timeout trigger
currentTimeoutOperation = getTimeoutOperation(currentWork);
currentWork.startedProcessing();
}

// Send messages to currentWork from buffer
while (bufferMessages.size() > 0 && !currentWork.isTerminal()) {
currentWork.next(bufferMessages.poll());
remaining.decrementAndGet();
}

// if we have delivered all the messages to currentWork, we will complete it.
if (currentWork.isTerminal()) {
if (currentWork.getError() == null) {
currentWork.complete();
}
// Now remove from queue since it is complete
workQueue.remove(currentWork);
if (currentTimeoutOperation != null && !currentTimeoutOperation.isDisposed()) {
currentTimeoutOperation.dispose();
}
logger.verbose("The work [{}] is complete.", currentWork.getId());
} else {
// Since this work is not complete, find out how much we should request from upstream
long creditToAdd = currentWork.getRemaining() - (remaining.get() + bufferMessages.size());
if (creditToAdd > 0) {
remaining.addAndGet(creditToAdd);
subscription.request(creditToAdd);
logger.verbose("Requesting [{}] from upstream for work [{}].", creditToAdd,
currentWork.getId());
}
}
}
}
}

/**
* @param work on which timeout thread need to start.
*
* @return {@link Disposable} for the timeout operation.
*/
private Disposable getTimeoutOperation(SynchronousReceiveWork work) {
    Duration timeout = work.getTimeout();
    // Fires once after the work's timeout elapses; no periodic re-scheduling.
    return Mono.delay(timeout).thenReturn(work)
        .subscribe(l -> {
            synchronized (currentWorkLock) {
                // Only time out the work if it is still the active one; a work
                // item that already completed or was replaced must not be touched.
                if (currentWork == work) {
                    work.timeout();
                }
            }
        });
}

/**
* {@inheritDoc}
*/
@Override
protected void hookOnError(Throwable throwable) {
logger.error("[{}] Errors occurred upstream", work.getId(), throwable);
work.error(throwable);
logger.error("[{}] Errors occurred upstream", currentWork.getId(), throwable);
synchronized (currentWorkLock) {
currentWork.error(throwable);
}
dispose();
}

@Override
protected void hookOnCancel() {
    // Treat upstream cancellation the same as disposing of this subscriber.
    dispose();
}

/**
* {@inheritDoc}
*/
@Override
public void dispose() {
if (isDisposed.getAndSet(true)) {
return;
}

work.complete();
synchronized (currentWorkLock) {
if (currentWork != null) {
currentWork.complete();
}
if (currentTimeoutOperation != null && !currentTimeoutOperation.isDisposed()) {
currentTimeoutOperation.dispose();
}
currentTimeoutOperation = null;
}

subscription.cancel();
timer.cancel();
super.dispose();
}

private static final class ReceiveTimeoutTask extends TimerTask {
private final ClientLogger logger = new ClientLogger(ReceiveTimeoutTask.class);
private final long workId;
private final Runnable onDispose;
// True once dispose() has run (isDisposed is set exactly once there).
private boolean isTerminated() {
    return isDisposed.get();
}

ReceiveTimeoutTask(long workId, Runnable onDispose) {
this.workId = workId;
this.onDispose = onDispose;
}
// Number of receive work items still pending.
// NOTE(review): package-private accessor, presumably for unit tests — confirm.
int getWorkQueueSize() {
    return this.workQueue.size();
}

@Override
public void run() {
logger.info("[{}] Timeout encountered, disposing of subscriber.", workId);
onDispose.run();
}
// Initial upstream demand computed in the constructor: the larger of the
// prefetch count and the first work item's message count.
long getRequested() {
    return this.requested;
}
}

// Set after the first successful hookOnSubscribe; guards premature draining.
boolean isSubscriberInitialized() {
    return this.subscriberInitialized;
}
}
Loading