-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Servicebus track2 sync queue up multiple receive calls #10940
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
hemanttanwar
merged 35 commits into
Azure:master
from
hemanttanwar:servicebus-track2-sync-queueup-multiple-receive-calls
May 22, 2020
Merged
Changes from 9 commits
Commits
Show all changes
35 commits
Select commit
Hold shift + click to select a range
be7f82b
First draft implementation
fb34f1c
Queue up the receive request queuest
41cec1e
Improved logging
9ed2f31
spot bugs check resolution
1a3ff03
work in progress
5c3cf98
Increment package version after release of com.azure azure-messaging-…
azure-sdk 1d83193
another implementation for multiple receive using BaseSubscriber
d9b69e8
continue fix some issues
5a8a9ec
added test
0146620
fixing unwanted check
71cd8d1
REview comments
452d557
Incorporating review comments
9b52a98
Removing unused import.
bdd41f3
continue work ...
d8dd7b6
Merge branch 'master' of github.com:Azure/azure-sdk-for-java into ser…
39b9a1a
resolve merge conflict
f9cc7d2
Merge branch 'azure-sdk-increment-package-version-servicebus-386162'
0985041
Added loginc for timeout
d1589a0
timeout and remaining messages to request logic
3a4b16f
merge master
b67ac9b
Merge branch 'master' of github.com:hemanttanwar/azure-sdk-for-java
08c3c87
Chaging WIP logic
0361bc1
Changed logic for draining queue
77b4001
Changed logic for draining queue
d473eb5
formatting changes
13ef032
formatting changes
2e1f98b
removing unwanted flags.git statuys
7a333f9
Added lock in ServiceBusReceiverClient
64e6b30
some more optimization
c9dcdd0
incorporated review comments
a1c24b3
removed unwanted files
510e76c
Merge branch 'master' of github.com:Azure/azure-sdk-for-java
b9be8f5
Merge master into branch
fe0486c
unit test
8bea4db
adding unit test
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
168 changes: 168 additions & 0 deletions
168
...g-servicebus/src/main/java/com/azure/messaging/servicebus/LongLivedMessageSubscriber.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,168 @@ | ||
| package com.azure.messaging.servicebus; | ||
|
|
||
| import com.azure.core.util.logging.ClientLogger; | ||
| import org.reactivestreams.Subscription; | ||
| import reactor.core.Disposable; | ||
| import reactor.core.publisher.BaseSubscriber; | ||
| import reactor.core.publisher.Mono; | ||
|
|
||
| import java.util.Queue; | ||
| import java.util.concurrent.ConcurrentLinkedQueue; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
|
|
||
| class LongLivedMessageSubscriber extends BaseSubscriber<ServiceBusReceivedMessageContext> { | ||
|
|
||
| private final ClientLogger logger = new ClientLogger(SynchronousMessageSubscriber.class); | ||
|
hemanttanwar marked this conversation as resolved.
Outdated
|
||
| private final AtomicBoolean isDisposed = new AtomicBoolean(false); | ||
|
hemanttanwar marked this conversation as resolved.
Outdated
|
||
| private final long prefetch; | ||
| private final AtomicInteger wip = new AtomicInteger(); | ||
|
|
||
| private volatile Subscription subscription; | ||
|
|
||
| private Queue<SynchronousReceiveWork> workQueue = new ConcurrentLinkedQueue<>(); | ||
| private SynchronousReceiveWork currentWork = null; | ||
| private Disposable timeoutOperation; | ||
| private Disposable drainQueueDisposable; | ||
|
hemanttanwar marked this conversation as resolved.
Outdated
|
||
|
|
||
|
|
||
| LongLivedMessageSubscriber(long prefetch) { | ||
| this.prefetch = prefetch; | ||
|
conniey marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| /** | ||
| * On an initial subscription, will take the first work item, and request that amount of work for it. | ||
| * | ||
| * @param subscription Subscription for upstream. | ||
| */ | ||
| @Override | ||
| protected void hookOnSubscribe(Subscription subscription) { | ||
| if (this.subscription == null) { | ||
|
hemanttanwar marked this conversation as resolved.
Outdated
|
||
| this.subscription = subscription; | ||
|
hemanttanwar marked this conversation as resolved.
Outdated
|
||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Publishes the event to the current {@link SynchronousReceiveWork}. If that work item is complete, will dispose of | ||
| * the subscriber. | ||
| * | ||
| * @param message Event to publish. | ||
| */ | ||
| @Override | ||
| protected void hookOnNext(ServiceBusReceivedMessageContext message) { | ||
|
|
||
| currentWork.next(message); | ||
|
|
||
| logger.verbose("[{}] received message with Sequence Number [{}].", currentWork.getId(), message.getMessage().getSequenceNumber()); | ||
|
|
||
| if (currentWork.isTerminal()) { | ||
| logger.info("[{}] Completed. Closing Flux and cancelling subscription.", currentWork.getId()); | ||
| completeCurrentWork(currentWork); | ||
| } | ||
| } | ||
|
|
||
| private void completeCurrentWork(SynchronousReceiveWork currentWork) { | ||
|
|
||
| if (isTerminated()) { | ||
| return; | ||
| } | ||
|
|
||
| currentWork.complete(); | ||
| logger.verbose("[{}] work completed.", currentWork.getId()); | ||
|
|
||
| if (timeoutOperation != null && !timeoutOperation.isDisposed()) { | ||
| timeoutOperation.dispose(); | ||
| } | ||
| if (drainQueueDisposable != null && !drainQueueDisposable.isDisposed()) { | ||
| drainQueueDisposable.dispose(); | ||
| } | ||
|
|
||
| if (wip.decrementAndGet() != 0) { | ||
| logger.warning("There is another worker in drainLoop. But there should only be 1 worker. Value:"+wip.get()); | ||
| } | ||
|
|
||
| // After current work finished and there more receive requests | ||
| if (workQueue.size() > 0 ) { | ||
| drain(); | ||
| } | ||
| } | ||
|
|
||
| void queueWork(SynchronousReceiveWork work) { | ||
| workQueue.add(work); | ||
| drain(); | ||
| } | ||
|
|
||
| private void drain() { | ||
| // If someone is already in this loop, then we are already clearing the queue. | ||
| if (!wip.compareAndSet(0, 1)) { | ||
| return; | ||
| } | ||
| // Drain queue.. | ||
| drainQueueDisposable = Mono.just(true) | ||
| .subscribe(l -> { | ||
| drainQueue(); | ||
| }); | ||
| } | ||
|
|
||
| private void drainQueue() { | ||
| if (isTerminated()) { | ||
| return; | ||
| } | ||
| currentWork = workQueue.poll(); | ||
| if (currentWork == null) { | ||
| return; | ||
| } | ||
| subscription.request(currentWork.getNumberOfEvents()); | ||
|
|
||
| // timer to complete the current in case of timeout trigger | ||
| timeoutOperation = Mono.delay(currentWork.getTimeout()) | ||
| .subscribe(l -> { | ||
| if (!currentWork.isTerminal()) { | ||
| completeCurrentWork(currentWork); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
|
|
||
| @Override | ||
| protected void hookOnComplete() { | ||
| logger.info("[{}] Completed. No events to listen to.", currentWork.getId()); | ||
| dispose(); | ||
|
hemanttanwar marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| /** | ||
| * {@inheritDoc} | ||
| */ | ||
| @Override | ||
| protected void hookOnError(Throwable throwable) { | ||
| logger.error("[{}] Errors occurred upstream", currentWork.getId(), throwable); | ||
| currentWork.error(throwable); | ||
| dispose(); | ||
| } | ||
|
|
||
| @Override | ||
| protected void hookOnCancel() { | ||
| dispose(); | ||
| } | ||
|
|
||
| /** | ||
| * {@inheritDoc} | ||
| */ | ||
| @Override | ||
| public void dispose() { | ||
| if (isDisposed.getAndSet(true)) { | ||
|
hemanttanwar marked this conversation as resolved.
Outdated
|
||
| return; | ||
| } | ||
|
|
||
| currentWork.complete(); | ||
| subscription.cancel(); | ||
| if (timeoutOperation != null && !timeoutOperation.isDisposed() ) { | ||
|
hemanttanwar marked this conversation as resolved.
Outdated
|
||
| timeoutOperation.dispose(); | ||
| } | ||
| super.dispose(); | ||
| } | ||
|
|
||
| private boolean isTerminated(){ | ||
| return isDisposed.get(); | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.