-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Add ServiceBus Session Receiver Client #16690
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 19 commits
aee1663
094f9ed
b4aeb73
b7d7ab1
d9af703
f59e84f
8b063c3
a9a7a0e
2c2c7ae
464d533
27580ca
30266e1
4eb8093
60d0b52
600f60f
22308dd
925d7b8
b08e95d
c86c5fc
ef3d9d5
c13f280
e717790
110fcb7
d514793
36c8c95
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -624,11 +624,9 @@ public ServiceBusSenderClient buildClient() { | |
| @ServiceClientBuilder(serviceClients = {ServiceBusReceiverClient.class, ServiceBusReceiverAsyncClient.class}) | ||
| public final class ServiceBusSessionReceiverClientBuilder { | ||
| private boolean enableAutoComplete = true; | ||
| private Integer maxConcurrentSessions = null; | ||
| private int prefetchCount = DEFAULT_PREFETCH_COUNT; | ||
| private String queueName; | ||
| private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK; | ||
| private String sessionId; | ||
| private String subscriptionName; | ||
| private String topicName; | ||
| private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION; | ||
|
|
@@ -666,24 +664,6 @@ public ServiceBusSessionReceiverClientBuilder maxAutoLockRenewDuration(Duration | |
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Enables session processing roll-over by processing at most {@code maxConcurrentSessions}. | ||
| * | ||
| * @param maxConcurrentSessions Maximum number of concurrent sessions to process at any given time. | ||
| * | ||
| * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object. | ||
| * @throws IllegalArgumentException if {@code maxConcurrentSessions} is less than 1. | ||
| */ | ||
| public ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSessions) { | ||
| if (maxConcurrentSessions < 1) { | ||
| throw logger.logExceptionAsError(new IllegalArgumentException( | ||
| "maxConcurrentSessions cannot be less than 1.")); | ||
| } | ||
|
|
||
| this.maxConcurrentSessions = maxConcurrentSessions; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the prefetch count of the receiver. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link | ||
| * ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1. | ||
|
|
@@ -728,18 +708,6 @@ public ServiceBusSessionReceiverClientBuilder receiveMode(ReceiveMode receiveMod | |
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the session id. | ||
| * | ||
| * @param sessionId session id. | ||
| * | ||
| * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object. | ||
| */ | ||
| public ServiceBusSessionReceiverClientBuilder sessionId(String sessionId) { | ||
| this.sessionId = sessionId; | ||
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the name of the subscription in the topic to listen to. <b>{@link #topicName(String)} must also be set. | ||
| * </b> | ||
|
|
@@ -771,7 +739,7 @@ public ServiceBusSessionReceiverClientBuilder topicName(String topicName) { | |
| * Creates an <b>asynchronous</b>, <b>session-aware</b> Service Bus receiver responsible for reading {@link | ||
| * ServiceBusMessage messages} from a specific queue or topic. | ||
| * | ||
| * @return An new {@link ServiceBusReceiverAsyncClient} that receives messages from a queue or topic. | ||
| * @return An new {@link ServiceBusSessionReceiverAsyncClient} that receives messages from a queue or topic. | ||
| * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String) | ||
| * topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link | ||
| * #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in | ||
|
|
@@ -780,7 +748,7 @@ public ServiceBusSessionReceiverClientBuilder topicName(String topicName) { | |
| * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String) | ||
| * queueName()} or {@link #topicName(String) topicName()}, respectively. | ||
| */ | ||
| public ServiceBusReceiverAsyncClient buildAsyncClient() { | ||
| public ServiceBusSessionReceiverAsyncClient buildAsyncClient() { | ||
| return buildAsyncClient(true); | ||
| } | ||
|
|
||
|
|
@@ -797,11 +765,12 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() { | |
| * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String) | ||
| * queueName()} or {@link #topicName(String) topicName()}, respectively. | ||
| */ | ||
| public ServiceBusReceiverClient buildClient() { | ||
| return new ServiceBusReceiverClient(buildAsyncClient(false), retryOptions.getTryTimeout()); | ||
| public ServiceBusSessionReceiverClient buildClient() { | ||
| return new ServiceBusSessionReceiverClient(buildAsyncClient(false), | ||
| retryOptions.getTryTimeout()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This timeout is for a single attempt, right? If the user has set retry policy to 3 attempts, will this not wait for the 2nd attempt after the first attempt fails due to timeout? |
||
| } | ||
|
|
||
| private ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) { | ||
| private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) { | ||
| final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName, | ||
| queueName); | ||
| final String entityPath = getEntityPath(logger, entityType, queueName, topicName, subscriptionName, | ||
|
|
@@ -822,34 +791,11 @@ private ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAll | |
|
|
||
| final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer); | ||
| final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount, | ||
| maxAutoLockRenewDuration, enableAutoComplete, sessionId, isRollingSessionReceiver(), | ||
| maxConcurrentSessions); | ||
|
|
||
| final ServiceBusSessionManager sessionManager = new ServiceBusSessionManager(entityPath, entityType, | ||
| connectionProcessor, tracerProvider, messageSerializer, receiverOptions); | ||
|
|
||
| return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath, | ||
| entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT, | ||
| tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, sessionManager); | ||
| } | ||
|
|
||
| /** | ||
| * This is a rolling session receiver only if maxConcurrentSessions is > 0 AND sessionId is null or empty. If | ||
| * there is a sessionId, this is going to be a single, named session receiver. | ||
| * | ||
| * @return {@code true} if this is an unnamed rolling session receiver; {@code false} otherwise. | ||
| */ | ||
| private boolean isRollingSessionReceiver() { | ||
| if (maxConcurrentSessions == null) { | ||
| return false; | ||
| } | ||
|
|
||
| if (maxConcurrentSessions < 1) { | ||
| throw logger.logExceptionAsError( | ||
| new IllegalArgumentException("Maximum number of concurrent sessions must be positive.")); | ||
| } | ||
| maxAutoLockRenewDuration, enableAutoComplete, null, null); | ||
|
|
||
| return CoreUtils.isNullOrEmpty(sessionId); | ||
|
Comment on lines
-825
to
-852
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All of this will be required for the session processor. Keep this code and make this available only for the processor.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added maxConcurrentSessions back at package level but keep isRollingSessionReceiver deleted from the builder because ReceiverOptions.isRollingSessionReceiver() can be inferred from it's own properties. |
||
| return new ServiceBusSessionReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), | ||
| entityPath, entityType, receiverOptions, connectionProcessor, tracerProvider, messageSerializer, | ||
| ServiceBusClientBuilder.this::onClientClose); | ||
| } | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.