-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Add ServiceBus Session Receiver Client #16690
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Show resolved
Hide resolved
...saging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java
Show resolved
Hide resolved
...g-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
|
|
||
| @Override | ||
| public void close() { | ||
| this.onClientClose.run(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this idempotent? If we call close multiple times, what happens.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This calls the ServiceBusClientBuilder's close() method.
...servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java
Show resolved
Hide resolved
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Show resolved
Hide resolved
...ng-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java
Show resolved
Hide resolved
...g-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
...g-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
...servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java
Outdated
Show resolved
Hide resolved
...servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java
Outdated
Show resolved
Hide resolved
...servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java
Outdated
Show resolved
Hide resolved
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Show resolved
Hide resolved
|
This pull request is protected by Check Enforcer. What is Check Enforcer?Check Enforcer helps ensure all pull requests are covered by at least one check-run (typically an Azure Pipeline). When all check-runs associated with this pull request pass then Check Enforcer itself will pass. Why am I getting this message?You are getting this message because Check Enforcer did not detect any check-runs being associated with this pull request within five minutes. This may indicate that your pull request is not covered by any pipelines and so Check Enforcer is correctly blocking the pull request being merged. What should I do now?If the check-enforcer check-run is not passing and all other check-runs associated with this PR are passing (excluding license-cla) then you could try telling Check Enforcer to evaluate your pull request again. You can do this by adding a comment to this pull request as follows: What if I am onboarding a new service?Often, new services do not have validation pipelines associated with them, in order to bootstrap pipelines for a new service, you can issue the following command as a pull request comment: |
...g-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
...saging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java
Outdated
Show resolved
Hide resolved
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
...g-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Show resolved
Hide resolved
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Show resolved
Hide resolved
...servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java
Outdated
Show resolved
Hide resolved
| * @return A {@link ServiceBusReceiverClient} that is tied to the available session. | ||
| * @throws IllegalStateException if the operation times out. | ||
| */ | ||
| public ServiceBusReceiverClient acceptNextSession(Duration timeout) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if timeout is Duration.ZERO? Also, if the timeout is larger than the retry timeout what happens? I don't think any code change is required but we just need to document both these cases.
...cebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
Outdated
Show resolved
Hide resolved
...ssaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java
Show resolved
Hide resolved
| maxAutoLockRenewDuration, enableAutoComplete, sessionId, isRollingSessionReceiver(), | ||
| maxConcurrentSessions); | ||
|
|
||
| final ServiceBusSessionManager sessionManager = new ServiceBusSessionManager(entityPath, entityType, | ||
| connectionProcessor, tracerProvider, messageSerializer, receiverOptions); | ||
|
|
||
| return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath, | ||
| entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT, | ||
| tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, sessionManager); | ||
| } | ||
|
|
||
| /** | ||
| * This is a rolling session receiver only if maxConcurrentSessions is > 0 AND sessionId is null or empty. If | ||
| * there is a sessionId, this is going to be a single, named session receiver. | ||
| * | ||
| * @return {@code true} if this is an unnamed rolling session receiver; {@code false} otherwise. | ||
| */ | ||
| private boolean isRollingSessionReceiver() { | ||
| if (maxConcurrentSessions == null) { | ||
| return false; | ||
| } | ||
|
|
||
| if (maxConcurrentSessions < 1) { | ||
| throw logger.logExceptionAsError( | ||
| new IllegalArgumentException("Maximum number of concurrent sessions must be positive.")); | ||
| } | ||
| maxAutoLockRenewDuration, enableAutoComplete, null, null); | ||
|
|
||
| return CoreUtils.isNullOrEmpty(sessionId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All of this will be required for the session processor. Keep this code and make this available only for the processor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added maxConcurrentSessions back at package level but keep isRollingSessionReceiver deleted from the builder because ReceiverOptions.isRollingSessionReceiver() can be inferred from it's own properties.
...saging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java
Show resolved
Hide resolved
| ServiceBusReceiverClient buildClientForProcessor() { | ||
| return new ServiceBusReceiverClient(buildAsyncClientForProcessor(false), retryOptions.getTryTimeout()); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Processor will never build a sync client. So, this method can be removed.
| } | ||
|
|
||
| private ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) { | ||
| private ServiceBusReceiverAsyncClient buildAsyncClientForProcessor(boolean isAutoCompleteAllowed) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the async client built for processor will always have autoComplete enabled, the boolean param is redundant and can be removed.
| return false; | ||
| public ServiceBusSessionReceiverClient buildClient() { | ||
| return new ServiceBusSessionReceiverClient(buildAsyncClient(false), | ||
| retryOptions.getTryTimeout()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This timeout is for a single attempt, right? If the user has set retry policy to 3 attempts, will this not wait for the 2nd attempt after the first attempt fails due to timeout?
Add
ServiceBusSessionReceiverAsyncClientandServiceBusSessionReceiverClientto accept a specific session or a next available session.Closes #15330