-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Servicebus track2 sync queue up multiple receive calls #10940
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
be7f82b
fb34f1c
41cec1e
9ed2f31
1a3ff03
5c3cf98
1d83193
d9b69e8
5a8a9ec
0146620
71cd8d1
452d557
9b52a98
bdd41f3
d8dd7b6
39b9a1a
f9cc7d2
0985041
d1589a0
3a4b16f
b67ac9b
08c3c87
0361bc1
77b4001
d473eb5
13ef032
2e1f98b
7a333f9
64e6b30
c9dcdd0
a1c24b3
510e76c
b9be8f5
fe0486c
8bea4db
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,114 +5,203 @@ | |
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only thing you need are unit tests. The logic in this subscriber is complex and I can see it being hard to debug. |
||
| import com.azure.core.util.logging.ClientLogger; | ||
| import org.reactivestreams.Subscription; | ||
| import reactor.core.Disposable; | ||
| import reactor.core.publisher.BaseSubscriber; | ||
| import reactor.core.publisher.Mono; | ||
|
|
||
| import java.util.Objects; | ||
| import java.util.Timer; | ||
| import java.util.TimerTask; | ||
| import java.util.Queue; | ||
| import java.util.concurrent.ConcurrentLinkedQueue; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| /** | ||
| * Subscriber that listens to events and publishes them downstream and publishes events to them in the order received. | ||
| */ | ||
| class SynchronousMessageSubscriber extends BaseSubscriber<ServiceBusReceivedMessageContext> { | ||
| private final ClientLogger logger = new ClientLogger(SynchronousMessageSubscriber.class); | ||
| private final Timer timer = new Timer(); | ||
| private final AtomicBoolean isDisposed = new AtomicBoolean(); | ||
| private final SynchronousReceiveWork work; | ||
| private final long prefetch; | ||
| private final AtomicInteger wip = new AtomicInteger(); | ||
| private final Queue<SynchronousReceiveWork> workQueue = new ConcurrentLinkedQueue<>(); | ||
| private final Queue<ServiceBusReceivedMessageContext> bufferMessages = new ConcurrentLinkedQueue<>(); | ||
| private final SynchronousReceiveWork initialWork; | ||
| private final AtomicLong remaining = new AtomicLong(); | ||
|
|
||
| private volatile Subscription subscription; | ||
|
|
||
| SynchronousMessageSubscriber(SynchronousReceiveWork work) { | ||
| this.work = Objects.requireNonNull(work, "'work' cannot be null."); | ||
| private SynchronousReceiveWork currentWork; | ||
| private Disposable timeoutOperation; | ||
|
|
||
| SynchronousMessageSubscriber(long prefetch, SynchronousReceiveWork initialWork) { | ||
|
hemanttanwar marked this conversation as resolved.
|
||
| this.prefetch = prefetch; | ||
| this.initialWork = initialWork; | ||
| } | ||
|
|
||
| /** | ||
| * On an initial subscription, will take the first work item, and request that amount of work for it. | ||
| * | ||
| * @param subscription Subscription for upstream. | ||
| */ | ||
| @Override | ||
| protected void hookOnSubscribe(Subscription subscription) { | ||
| if (this.subscription == null) { | ||
| this.subscription = subscription; | ||
| } | ||
| this.subscription = subscription; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you need this variable? if the subscription != null, I'll assume there is an upstream. Also, if we call hookonSubscribe twice, it'll stomp over the previous subscription. We should guard against this by only setting it when it is null and erroring when someone wants to set it again. You'll see a setOnce in some of the reactor operations. |
||
|
|
||
| logger.info("[{}] Pending: {}, Scheduling receive timeout task '{}'.", work.getId(), work.getNumberOfEvents(), | ||
| work.getTimeout()); | ||
|
|
||
| subscription.request(work.getNumberOfEvents()); | ||
| logger.info("[{}] onSubscribe Pending: {}, Scheduling receive timeout task '{}'.", initialWork.getId(), | ||
|
hemanttanwar marked this conversation as resolved.
Outdated
|
||
| initialWork.getNumberOfEvents(), initialWork.getTimeout()); | ||
|
|
||
| timer.schedule(new ReceiveTimeoutTask(work.getId(), this::dispose), work.getTimeout().toMillis()); | ||
| // This will trigger subscription.request(N) and queue up the work | ||
|
hemanttanwar marked this conversation as resolved.
Outdated
|
||
| queueWork(initialWork); | ||
| } | ||
|
|
||
| /** | ||
| * Publishes the event to the current {@link SynchronousReceiveWork}. If that work item is complete, will dispose of | ||
| * the subscriber. | ||
| * | ||
| * @param value Event to publish. | ||
| * @param message Event to publish. | ||
| */ | ||
| @Override | ||
| protected void hookOnNext(ServiceBusReceivedMessageContext value) { | ||
| work.next(value); | ||
| protected void hookOnNext(ServiceBusReceivedMessageContext message) { | ||
|
hemanttanwar marked this conversation as resolved.
|
||
| if (currentWork == null) { | ||
| //Boundary condition(timeout cases), buffer the received message for future requests. | ||
|
hemanttanwar marked this conversation as resolved.
Outdated
|
||
| bufferMessages.add(message); | ||
|
conniey marked this conversation as resolved.
Outdated
|
||
| return; | ||
| } | ||
| currentWork.next(message); | ||
| remaining.decrementAndGet(); | ||
|
|
||
| if (currentWork.isTerminal()) { | ||
| currentWork.complete(); | ||
| if (timeoutOperation != null && !timeoutOperation.isDisposed()) { | ||
| timeoutOperation.dispose(); | ||
| } | ||
|
|
||
| // Now see if there is more queued up work | ||
| currentWork = workQueue.poll(); | ||
| if (currentWork != null) { | ||
|
|
||
| logger.verbose("[{}] Picking up next receive request.", currentWork.getId()); | ||
|
|
||
| // timer to complete the current in case of timeout trigger | ||
| timeoutOperation = getTimeoutOperation(); | ||
|
|
||
| requestCredits(currentWork.getNumberOfEvents()); | ||
| } else { | ||
| if (wip.decrementAndGet() != 0) { | ||
| logger.warning("There is another worker in drainLoop. But there should only be 1 worker."); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if (work.isTerminal()) { | ||
| logger.info("[{}] Completed. Closing Flux and cancelling subscription.", work.getId()); | ||
| dispose(); | ||
| /** | ||
| * | ||
| * @param requested credits for current {@link SynchronousReceiveWork}. | ||
| */ | ||
| private void requestCredits(long requested) { | ||
|
hemanttanwar marked this conversation as resolved.
Outdated
|
||
| long creditToAdd = requested - (remaining.get() + bufferMessages.size()); | ||
| if (creditToAdd > 0) { | ||
| remaining.addAndGet(creditToAdd); | ||
| subscription.request(creditToAdd); | ||
| } else { | ||
| logger.verbose("[{}] No need to request credit. ", currentWork.getId()); | ||
| } | ||
|
|
||
| } | ||
|
|
||
| @Override | ||
| protected void hookOnComplete() { | ||
| logger.info("[{}] Completed. No events to listen to.", work.getId()); | ||
| dispose(); | ||
| void queueWork(SynchronousReceiveWork work) { | ||
|
|
||
| logger.info("[{}] Pending: {}, Scheduling receive timeout task '{}'.", work.getId(), work.getNumberOfEvents(), | ||
| work.getTimeout()); | ||
| workQueue.add(work); | ||
| drain(); | ||
| } | ||
|
|
||
| private void drain() { | ||
| if (workQueue.size() == 0) { | ||
| return; | ||
| } | ||
|
|
||
| // If someone is already in this loop, then we are already clearing the queue. | ||
| if (!wip.compareAndSet(0, 1)) { | ||
| return; | ||
| } | ||
|
|
||
| // Drain queue.. | ||
|
hemanttanwar marked this conversation as resolved.
Outdated
|
||
| drainQueue(); | ||
|
|
||
| } | ||
|
|
||
|
|
||
| private void drainQueue() { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can look at any of the operators for inspiration. |
||
| if (isTerminated()) { | ||
| return; | ||
| } | ||
| currentWork = workQueue.poll(); | ||
| if (currentWork == null) { | ||
| return; | ||
| } | ||
| long sentFromBuffer = 0; | ||
| if (bufferMessages.size() > 0) { | ||
| // If we already have messages in buffer, we should send it first | ||
|
|
||
| while (!bufferMessages.isEmpty() || sentFromBuffer < currentWork.getNumberOfEvents()) { | ||
| currentWork.next(bufferMessages.poll()); | ||
| remaining.decrementAndGet(); | ||
| ++sentFromBuffer; | ||
| } | ||
| if (sentFromBuffer == currentWork.getNumberOfEvents()) { | ||
| currentWork.complete(); | ||
| logger.verbose("[{}] Sent [{}] messages from buffer.", currentWork.getId(), sentFromBuffer); | ||
| drainQueue(); | ||
| } | ||
| } | ||
| // timer to complete the current in case of timeout trigger | ||
| timeoutOperation = getTimeoutOperation(); | ||
|
|
||
| requestCredits(currentWork.getNumberOfEvents() - sentFromBuffer); | ||
| } | ||
|
|
||
| private Disposable getTimeoutOperation() { | ||
| return Mono.delay(currentWork.getTimeout()) | ||
| .subscribe(l -> { | ||
| if (currentWork != null && !currentWork.isTerminal()) { | ||
| logger.verbose("[{}] Timeout triggered.", currentWork.getId()); | ||
| currentWork.complete(); | ||
|
hemanttanwar marked this conversation as resolved.
Outdated
|
||
| } | ||
| if (wip.decrementAndGet() != 0) { | ||
|
hemanttanwar marked this conversation as resolved.
Outdated
|
||
| logger.warning("There is another worker in drainLoop. But there should only be 1 worker."); | ||
| } | ||
| //any work which needs to be processed. | ||
| drain(); | ||
| }); | ||
| } | ||
| /** | ||
| * {@inheritDoc} | ||
| */ | ||
| @Override | ||
| protected void hookOnError(Throwable throwable) { | ||
| logger.error("[{}] Errors occurred upstream", work.getId(), throwable); | ||
| work.error(throwable); | ||
| logger.error("[{}] Errors occurred upstream", currentWork.getId(), throwable); | ||
| currentWork.error(throwable); | ||
| dispose(); | ||
| } | ||
|
|
||
| @Override | ||
| protected void hookOnCancel() { | ||
| dispose(); | ||
| } | ||
|
|
||
| /** | ||
| * {@inheritDoc} | ||
| */ | ||
| @Override | ||
| public void dispose() { | ||
| if (isDisposed.getAndSet(true)) { | ||
| return; | ||
| } | ||
|
|
||
| work.complete(); | ||
| subscription.cancel(); | ||
| timer.cancel(); | ||
| super.dispose(); | ||
| } | ||
| if (currentWork != null) { | ||
| currentWork.complete(); | ||
| } | ||
|
|
||
| private static final class ReceiveTimeoutTask extends TimerTask { | ||
| private final ClientLogger logger = new ClientLogger(ReceiveTimeoutTask.class); | ||
| private final long workId; | ||
| private final Runnable onDispose; | ||
| subscription.cancel(); | ||
|
|
||
| ReceiveTimeoutTask(long workId, Runnable onDispose) { | ||
| this.workId = workId; | ||
| this.onDispose = onDispose; | ||
| if (timeoutOperation != null && !timeoutOperation.isDisposed()) { | ||
| timeoutOperation.dispose(); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void run() { | ||
| logger.info("[{}] Timeout encountered, disposing of subscriber.", workId); | ||
| onDispose.run(); | ||
| } | ||
| private boolean isTerminated() { | ||
| return isDisposed.get(); | ||
| } | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.