From aee166331fc10f91ff86edab8f8a56e511b79d68 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Wed, 21 Oct 2020 22:38:38 -0700 Subject: [PATCH 01/23] Add ServiceBus Session Receiver Client --- .../messaging/servicebus/ReceiverOptions.java | 7 +- .../servicebus/ServiceBusClientBuilder.java | 77 ++--------- .../ServiceBusReceiverAsyncClient.java | 33 +++-- .../ServiceBusSessionReceiverAsyncClient.java | 124 ++++++++++++++++++ .../ServiceBusSessionReceiverClient.java | 68 ++++++++++ .../servicebus/UnnamedSessionManager.java | 30 ++++- .../azure-messaging-servicebus.properties | 1 + .../messaging/servicebus/ReadmeSamples.java | 8 +- .../ReceiveMultipleSessionsAsyncSample.java | 4 +- .../ReceiveNamedSessionAsyncSample.java | 15 ++- .../servicebus/ReceiveNamedSessionSample.java | 7 +- .../ReceiveSingleSessionAsyncSample.java | 9 +- .../SendAndReceiveSessionMessageSample.java | 9 +- ...ReceiverAsyncClientJavaDocCodeSamples.java | 18 +-- ...BusReceiverAsyncClientIntegrationTest.java | 7 +- .../ServiceBusReceiverAsyncClientTest.java | 2 +- ...rviceBusReceiverClientIntegrationTest.java | 6 +- ...ceBusSenderAsyncClientIntegrationTest.java | 3 +- .../UnnamedSessionManagerIntegrationTest.java | 9 +- .../servicebus/UnnamedSessionManagerTest.java | 6 +- 20 files changed, 301 insertions(+), 142 deletions(-) create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java index 94a9364fd151..9ac21725048a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java @@ -12,7 +12,6 @@ class ReceiverOptions { private final ReceiveMode receiveMode; private final int prefetchCount; private final String sessionId; - private final boolean isRollingSessionReceiver; private final Integer maxConcurrentSessions; private final boolean isSessionReceiver; @@ -20,17 +19,15 @@ class ReceiverOptions { this.receiveMode = receiveMode; this.prefetchCount = prefetchCount; this.sessionId = null; - this.isRollingSessionReceiver = false; this.maxConcurrentSessions = null; this.isSessionReceiver = false; } ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, - String sessionId, boolean isRollingSessionReceiver, Integer maxConcurrentSessions) { + String sessionId, Integer maxConcurrentSessions) { this.receiveMode = receiveMode; this.prefetchCount = prefetchCount; this.sessionId = sessionId; - this.isRollingSessionReceiver = isRollingSessionReceiver; this.maxConcurrentSessions = maxConcurrentSessions; this.isSessionReceiver = true; } @@ -78,7 +75,7 @@ boolean isSessionReceiver() { * false} otherwise. */ public boolean isRollingSessionReceiver() { - return isRollingSessionReceiver; + return maxConcurrentSessions != null && maxConcurrentSessions > 0 && sessionId == null; } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index b847c3d746ce..d8df4eed6c56 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -621,35 +621,15 @@ public ServiceBusSenderClient buildClient() { @ServiceClientBuilder(serviceClients = {ServiceBusReceiverClient.class, ServiceBusReceiverAsyncClient.class}) public final class ServiceBusSessionReceiverClientBuilder { - private Integer maxConcurrentSessions = null; private int prefetchCount = DEFAULT_PREFETCH_COUNT; private String queueName; private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK; - private String sessionId; private String subscriptionName; private String topicName; private ServiceBusSessionReceiverClientBuilder() { } - /** - * Enables session processing roll-over by processing at most {@code maxConcurrentSessions}. - * - * @param maxConcurrentSessions Maximum number of concurrent sessions to process at any given time. - * - * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object. - * @throws IllegalArgumentException if {@code maxConcurrentSessions} is less than 1. - */ - public ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSessions) { - if (maxConcurrentSessions < 1) { - throw logger.logExceptionAsError(new IllegalArgumentException( - "maxConcurrentSessions cannot be less than 1.")); - } - - this.maxConcurrentSessions = maxConcurrentSessions; - return this; - } - /** * Sets the prefetch count of the receiver. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link * ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1. @@ -691,18 +671,6 @@ public ServiceBusSessionReceiverClientBuilder receiveMode(ReceiveMode receiveMod return this; } - /** - * Sets the session id. - * - * @param sessionId session id. - * - * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object. - */ - public ServiceBusSessionReceiverClientBuilder sessionId(String sessionId) { - this.sessionId = sessionId; - return this; - } - /** * Sets the name of the subscription in the topic to listen to. {@link #topicName(String)} must also be set. * @@ -743,7 +711,7 @@ public ServiceBusSessionReceiverClientBuilder topicName(String topicName) { * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String) * queueName()} or {@link #topicName(String) topicName()}, respectively. */ - public ServiceBusReceiverAsyncClient buildAsyncClient() { + public ServiceBusSessionReceiverAsyncClient buildAsyncClient() { final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName, queueName); final String entityPath = getEntityPath(logger, entityType, queueName, topicName, subscriptionName, @@ -753,28 +721,18 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() { final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer); final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount, - sessionId, isRollingSessionReceiver(), maxConcurrentSessions); - - if (CoreUtils.isNullOrEmpty(sessionId)) { - final UnnamedSessionManager sessionManager = new UnnamedSessionManager(entityPath, entityType, - connectionProcessor, connectionProcessor.getRetryOptions().getTryTimeout(), tracerProvider, - messageSerializer, receiverOptions); + null, null); - return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath, - entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT, - tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, sessionManager); - } else { - return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath, - entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT, - tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose); - } + return new ServiceBusSessionReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), + entityPath, entityType, receiverOptions, connectionProcessor, tracerProvider, messageSerializer, + ServiceBusClientBuilder.this::onClientClose); } /** * Creates a synchronous, session-aware Service Bus receiver responsible for reading * {@link ServiceBusMessage messages} from a specific queue or topic. * - * @return An new {@link ServiceBusReceiverClient} that receives messages from a queue or topic. + * @return An new {@link ServiceBusSessionReceiverClient} that receives messages from a queue or topic. * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String) * topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link * #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in @@ -783,27 +741,8 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() { * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String) * queueName()} or {@link #topicName(String) topicName()}, respectively. */ - public ServiceBusReceiverClient buildClient() { - return new ServiceBusReceiverClient(buildAsyncClient(), retryOptions.getTryTimeout()); - } - - /** - * This is a rolling session receiver only if maxConcurrentSessions is > 0 AND sessionId is null or empty. If - * there is a sessionId, this is going to be a single, named session receiver. - * - * @return {@code true} if this is an unnamed rolling session receiver; {@code false} otherwise. - */ - private boolean isRollingSessionReceiver() { - if (maxConcurrentSessions == null) { - return false; - } - - if (maxConcurrentSessions < 1) { - throw logger.logExceptionAsError( - new IllegalArgumentException("Maximum number of concurrent sessions must be positive.")); - } - - return CoreUtils.isNullOrEmpty(sessionId); + public ServiceBusSessionReceiverClient buildClient() { + return new ServiceBusSessionReceiverClient(this.buildAsyncClient(), retryOptions.getTryTimeout()); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 6360dfc7e5aa..a0652889b45d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -62,22 +62,26 @@ * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.receiveWithReceiveAndDeleteMode} * *
Receive messages from a specific session
- *To fetch messages from a specific session, set {@link ServiceBusSessionReceiverClientBuilder#sessionId(String)}. + *
To fetch messages from a specific session, switch to {@link ServiceBusSessionReceiverClientBuilder} and + * build the session receiver client. Use {@link ServiceBusSessionReceiverAsyncClient#acceptSession(String)} to create a + * session-bound {@link ServiceBusReceiverAsyncClient}. *
* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId} * - *Process messages from multiple sessions
- *To process messages from multiple sessions, set - * {@link ServiceBusSessionReceiverClientBuilder#maxConcurrentSessions(int)}. This will process in parallel at most - * {@code maxConcurrentSessions}. In addition, when all the messages in a session have been consumed, it will find the - * next available session to process.
- * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#multiplesessions} - * *Process messages from the first available session
*To process messages from the first available session, switch to {@link ServiceBusSessionReceiverClientBuilder} and - * build the receiver client. It will find the first available session to process messages from.
+ * build the session receiver client. Use {@link ServiceBusSessionReceiverAsyncClient#acceptNextSession()} to + * find the first available session to process messages from. * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#singlesession} * + *Process messages from multiple sessions
+ *To process messages from multiple sessions, switch to {@link ServiceBusSessionReceiverClientBuilder} and + * build the session receiver client, then use {@link ServiceBusSessionReceiverAsyncClient#getReceiverClient(int)} + * to create an receiver client that processes events from multiple sessions in parallel. + * In addition, when all the messages in a session have been consumed, it will find the + * next available session to process.
+ * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#multiplesessions} + * *Rate limiting consumption of messages from Service Bus resource
*For message receivers that need to limit the number of messages they receive at a given time, they can use * {@link BaseSubscriber#request(long)}.
@@ -1097,4 +1101,15 @@ private String getLinkName(String sessionId) { return existing != null ? existing.getLinkName() : null; } } + + private MonoTo process messages from the first available session, switch to {@link ServiceBusSessionReceiverClientBuilder} and * build the session receiver client. Use {@link ServiceBusSessionReceiverAsyncClient#acceptNextSession()} to * find the first available session to process messages from.
- * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#singlesession} - * - *Process messages from multiple sessions
- *To process messages from multiple sessions, switch to {@link ServiceBusSessionReceiverClientBuilder} and - * build the session receiver client, then use {@link ServiceBusSessionReceiverAsyncClient#getReceiverClient(int)} - * to create an receiver client that processes events from multiple sessions in parallel. - * In addition, when all the messages in a session have been consumed, it will find the - * next available session to process.
- * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#multiplesessions} + * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#nextsession} * *Rate limiting consumption of messages from Service Bus resource
*For message receivers that need to limit the number of messages they receive at a given time, they can use
@@ -393,6 +385,11 @@ public Mono
+ * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#nextsession}
+ *
+ * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId}
+ * Accept next available session
+ *
+ *
*/
-public class ServiceBusSessionReceiverAsyncClient implements AutoCloseable {
+public final class ServiceBusSessionReceiverAsyncClient implements AutoCloseable {
private final String fullyQualifiedNamespace;
private final String entityPath;
private final MessagingEntityType entityType;
@@ -50,75 +61,62 @@ public class ServiceBusSessionReceiverAsyncClient implements AutoCloseable {
this.tracerProvider = Objects.requireNonNull(tracerProvider, "'tracerProvider' cannot be null.");
this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
this.onClientClose = Objects.requireNonNull(onClientClose, "'onClientClose' cannot be null.");
- this.unNamedSessionManager = new ServiceBusSessionManager(entityPath, entityType,
- connectionProcessor, tracerProvider,
- messageSerializer, receiverOptions);
+ this.unNamedSessionManager = new ServiceBusSessionManager(entityPath, entityType, connectionProcessor,
+ tracerProvider, messageSerializer, receiverOptions);
}
/**
- * Create a link for the next available session and use the link to create a {@link ServiceBusReceiverAsyncClient}
- * to receive messages from that session.
+ * Acquires a session lock for the next available session and create a {@link ServiceBusReceiverAsyncClient}
+ * to receive messages from the session.
+ *
+ * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId} + *
* @param sessionId The session Id. * @return A {@link ServiceBusReceiverAsyncClient} that is tied to the specified session. - * @throws IllegalArgumentException if {@code sessionId} is null or empty. + * @throws NullPointerException if {@code sessionId} is null. + * @throws IllegalArgumentException if {@code sessionId} is empty. + * @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled. + * @throws com.azure.core.amqp.exception.AmqpException if the session has been locked by another session receiver. */ public Mono+ * {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession} + *
+ *+ * {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId} + *
+ *Accept next available session
+ * {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession} + * + * @return A {@link ServiceBusReceiverClient} that is tied to the available session. + * @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled. + */ + public ServiceBusReceiverClient acceptNextSession() { + return sessionAsyncClient.acceptNextSession() + .map(asyncClient -> new ServiceBusReceiverClient(asyncClient)) + .block(); } /** * Create a link for the next available session and use the link to create a {@link ServiceBusReceiverClient} * to receive messages from that session. + * @param timeout the call is cancelled after this duration. * @return A {@link ServiceBusReceiverClient} that is tied to the available session. + * @throws IllegalStateException if the operation times out. */ - public ServiceBusReceiverClient acceptNextSession() { + public ServiceBusReceiverClient acceptNextSession(Duration timeout) { + Objects.requireNonNull(timeout, "'timeout' can not be null."); return sessionAsyncClient.acceptNextSession() - .map(asyncClient -> new ServiceBusReceiverClient(asyncClient, operationTimeout)) - .block(operationTimeout); + .map(asyncClient -> new ServiceBusReceiverClient(asyncClient)) + .block(timeout); } /** - * Create a link for the "sessionId" and use the link to create a {@link ServiceBusReceiverClient} + * Acquires a session lock for {@code sessionId} and create a {@link ServiceBusReceiverClient} * to receive messages from the session. + *+ * {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId} + *
* @param sessionId The session Id. * @return A {@link ServiceBusReceiverClient} that is tied to the specified session. - * @throws IllegalArgumentException if {@code sessionId} is null or empty. + * @throws NullPointerException if {@code sessionId} is null. + * @throws IllegalArgumentException if {@code sessionId} is empty. + * @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled. + * @throws com.azure.core.amqp.exception.AmqpException if the session has been locked by another session receiver. */ public ServiceBusReceiverClient acceptSession(String sessionId) { return sessionAsyncClient.acceptSession(sessionId) - .map(asyncClient -> new ServiceBusReceiverClient(asyncClient, operationTimeout)) - .block(operationTimeout); + .map(asyncClient -> new ServiceBusReceiverClient(asyncClient)) + .block(); } /** - * Create a {@link ServiceBusReceiverClient} that processes at most {@code maxConcurrentSessions} sessions. + * Acquires a session lock for {@code sessionId} and create a {@link ServiceBusReceiverClient} + * to receive messages from the session. * - * @param maxConcurrentSessions Maximum number of concurrent sessions to process at any given time. + * @param sessionId The session Id. + * @param timeout the call is cancelled after this duration. * - * @return The {@link ServiceBusReceiverClient} object that will be used to receive messages. - * @throws IllegalArgumentException if {@code maxConcurrentSessions} is less than 1. + * @return A {@link ServiceBusReceiverClient} that is tied to the specified session. + * @throws NullPointerException if {@code sessionId} is null. + * @throws IllegalArgumentException if {@code sessionId} is empty. + * @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled. + * @throws com.azure.core.amqp.exception.AmqpException if the session has been locked by another session receiver. */ - public ServiceBusReceiverClient getReceiverClient(int maxConcurrentSessions) { - return new ServiceBusReceiverClient(sessionAsyncClient.getReceiverClient(maxConcurrentSessions), - operationTimeout); + public ServiceBusReceiverClient acceptSession(String sessionId, Duration timeout) { + Objects.requireNonNull(timeout, "'timeout' can not be null."); + return sessionAsyncClient.acceptSession(sessionId) + .map(asyncClient -> new ServiceBusReceiverClient(asyncClient)) + .block(timeout); } @Override diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java deleted file mode 100644 index b7b5c0ee3702..000000000000 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.messaging.servicebus; - -import reactor.core.Disposable; -import reactor.core.publisher.Mono; - -import java.security.SecureRandom; -import java.util.concurrent.TimeUnit; - -/** - * Sample demonstrates how to receive and process multiple sessions. In the sample, at most 3 sessions are processed - * concurrently. When there are no more messages in a session, the receiver finds the next available session to - * process. - */ -public class ReceiveMultipleSessionsAsyncSample { - private static final SecureRandom RANDOM = new SecureRandom(); - - /** - * Main method to invoke this demo on how to receive messages from multiple sessions in an Azure Service Bus Queue. - * - * @param args Unused arguments to the program. - * - * @throws InterruptedException If the program is unable to sleep while waiting for the operations to complete. - */ - public static void main(String[] args) throws InterruptedException { - - // The connection string value can be obtained by: - // 1. Going to your Service Bus namespace in Azure Portal. - // 2. Go to "Shared access policies" - // 3. Copy the connection string for the "RootManageSharedAccessKey" policy. - String connectionString = "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" - + "SharedAccessKey={key}"; - - // Create a receiver. - // "<* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId} @@ -67,13 +69,15 @@ public final class ServiceBusSessionReceiverAsyncClient implements AutoCloseable /** * Acquires a session lock for the next available session and create a {@link ServiceBusReceiverAsyncClient} - * to receive messages from the session. - *
Accept next available session
+ * to receive messages from the session. It will wait until a session is available if no one is available + * immediately. + ** {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#nextsession} *
* @return A {@link ServiceBusReceiverAsyncClient} that is tied to the available session. * @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled. */ + @ServiceMethod(returns = ReturnType.SINGLE) public Mono* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId} *
@@ -100,10 +105,15 @@ public Mono* {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId} @@ -26,7 +29,7 @@ * * */ -public class ServiceBusSessionReceiverClient implements AutoCloseable { +public final class ServiceBusSessionReceiverClient implements AutoCloseable { private final ServiceBusSessionReceiverAsyncClient sessionAsyncClient; private final Duration operationTimeout; @@ -37,14 +40,18 @@ public class ServiceBusSessionReceiverClient implements AutoCloseable { /** * Acquires a session lock for the next available session and create a {@link ServiceBusReceiverClient} - * to receive messages from the session. It will wait if no session is available. - *
Accept next available session
+ * to receive messages from the session. It will wait until a session is available if no one is available + * immediately. + ** {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession} *
* @return A {@link ServiceBusReceiverClient} that is tied to the available session. * @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled. - * @throws IllegalStateException if the operation times out. + * @throws IllegalStateException if the operation times out. The timeout duration is the tryTimeout + * of when you build this client with the + * {@link ServiceBusClientBuilder#retryOptions(com.azure.core.amqp.AmqpRetryOptions)}. */ + @ServiceMethod(returns = ReturnType.SINGLE) public ServiceBusReceiverClient acceptNextSession() { return sessionAsyncClient.acceptNextSession() .map(asyncClient -> new ServiceBusReceiverClient(asyncClient, operationTimeout)) @@ -53,7 +60,8 @@ public ServiceBusReceiverClient acceptNextSession() { /** * Acquires a session lock for {@code sessionId} and create a {@link ServiceBusReceiverClient} - * to receive messages from the session. + * to receive messages from the session. If the session is already locked by another client, an + * {@link com.azure.core.amqp.exception.AmqpException} is thrown immediately. ** {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId} *
@@ -63,8 +71,11 @@ public ServiceBusReceiverClient acceptNextSession() { * @throws IllegalArgumentException if {@code sessionId} is empty. * @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled. * @throws com.azure.core.amqp.exception.AmqpException if the session has been locked by another session receiver. - * @throws IllegalStateException if the operation times out. + * @throws IllegalStateException if the operation times out. The timeout duration is the tryTimeout + * of when you build this client with the + * {@link ServiceBusClientBuilder#retryOptions(com.azure.core.amqp.AmqpRetryOptions)}. */ + @ServiceMethod(returns = ReturnType.SINGLE) public ServiceBusReceiverClient acceptSession(String sessionId) { return sessionAsyncClient.acceptSession(sessionId) .map(asyncClient -> new ServiceBusReceiverClient(asyncClient, operationTimeout)) From 4eb8093ac1d21eb7f87130becfdffa3e472ff03f Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Wed, 28 Oct 2020 23:02:09 -0700 Subject: [PATCH 11/23] Remove INVALID_LOCK_TOKEN_STRING --- .../com/azure/messaging/servicebus/implementation/Messages.java | 1 - .../src/main/resources/azure-messaging-servicebus.properties | 1 - 2 files changed, 2 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/Messages.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/Messages.java index 49671a2fdb02..5c7ca158388f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/Messages.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/Messages.java @@ -20,7 +20,6 @@ public class Messages { public static final String CLASS_NOT_A_SUPPORTED_TYPE = getMessage("CLASS_NOT_A_SUPPORTED_TYPE"); public static final String INVALID_OPERATION_DISPOSED_RECEIVER = getMessage("INVALID_OPERATION_DISPOSED_RECEIVER"); - public static final String INVALID_LOCK_TOKEN_STRING = getMessage("INVALID_LOCK_TOKEN_STRING"); public static final String MESSAGE_NOT_OF_TYPE = getMessage("MESSAGE_NOT_OF_TYPE"); public static final String REQUEST_VALUE_NOT_VALID = getMessage("REQUEST_VALUE_NOT_VALID"); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/resources/azure-messaging-servicebus.properties b/sdk/servicebus/azure-messaging-servicebus/src/main/resources/azure-messaging-servicebus.properties index 291f6ee5831a..0600b337ca0e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/resources/azure-messaging-servicebus.properties +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/resources/azure-messaging-servicebus.properties @@ -5,4 +5,3 @@ MESSAGE_NOT_OF_TYPE=Message body type is not of type Data, but type: %s. Not set REQUEST_VALUE_NOT_VALID=Back pressure request value not valid. It must be between {} and {}. INVALID_OPERATION_DISPOSED_RECEIVER=Cannot perform operation '%s' on a disposed receiver. INVALID_LOCK_TOKEN_STRING=Invalid lock token '%s'. -SESSION_BOUND_RECEIVER=This receiver client accepts session %s. It can't be used for another session %s. From 60d0b5282beb89005ff11ad4796c8cd73a4a36f7 Mon Sep 17 00:00:00 2001 From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com> Date: Wed, 28 Oct 2020 23:35:47 -0700 Subject: [PATCH 12/23] Fix unnamesession test --- .../messaging/servicebus/ServiceBusSessionManagerTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java index 0d73c526c2ac..833b78e2b561 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java @@ -347,6 +347,7 @@ void multipleSessions() { void multipleReceiveUnnamedSession() { // Arrange final int expectedLinksCreated = 2; + final Callable* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId} *
@@ -105,9 +105,7 @@ public Mono* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#nextsession} - *
** {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId} - *
** {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#nextsession} - *
+ *
* @return A {@link ServiceBusReceiverAsyncClient} that is tied to the available session.
* @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled.
*/
@@ -101,7 +99,7 @@ public Mono
* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId}
- *
* {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession}
- *
* {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId}
- *
* {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession}
- *
* @return A {@link ServiceBusReceiverClient} that is tied to the available session.
* @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled.
* @throws IllegalStateException if the operation times out. The timeout duration is the tryTimeout
@@ -66,7 +64,7 @@ public ServiceBusReceiverClient acceptNextSession() {
* {@link com.azure.core.amqp.exception.AmqpException} is thrown immediately.
*
* {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId}
- *
* @param sessionId The session Id.
* @return A {@link ServiceBusReceiverClient} that is tied to the specified session.
* @throws NullPointerException if {@code sessionId} is null.
From e7177900c0688f105b99540bffe42d0282495828 Mon Sep 17 00:00:00 2001
From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com>
Date: Thu, 29 Oct 2020 16:31:29 -0700
Subject: [PATCH 20/23] Restore some session related code but hide with package
level
---
.../servicebus/ServiceBusClientBuilder.java | 66 ++++++++
.../servicebus/ServiceBusReceiverClient.java | 141 +++++++++++++++---
2 files changed, 187 insertions(+), 20 deletions(-)
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java
index ce079b18c61f..98b47ad619e0 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java
@@ -754,6 +754,72 @@ public ServiceBusSessionReceiverClientBuilder topicName(String topicName) {
return this;
}
+ /**
+ * Creates an asynchronous, session-aware Service Bus receiver responsible for reading {@link
+ * ServiceBusMessage messages} from a specific queue or topic.
+ *
+ * @return An new {@link ServiceBusReceiverAsyncClient} that receives messages from a queue or topic.
+ * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
+ * topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
+ * #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
+ * {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
+ * #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
+ * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
+ * queueName()} or {@link #topicName(String) topicName()}, respectively.
+ */
+ ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() {
+ return buildAsyncClientForProcessor(true);
+ }
+
+ /**
+ * Creates a synchronous, session-aware Service Bus receiver responsible for reading {@link
+ * ServiceBusMessage messages} from a specific queue or topic.
+ *
+ * @return An new {@link ServiceBusReceiverClient} that receives messages from a queue or topic.
+ * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String)
+ * topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link
+ * #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in
+ * {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link
+ * #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not.
+ * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String)
+ * queueName()} or {@link #topicName(String) topicName()}, respectively.
+ */
+ ServiceBusReceiverClient buildClientForProcessor() {
+ return new ServiceBusReceiverClient(buildAsyncClientForProcessor(false), retryOptions.getTryTimeout());
+ }
+
+ private ServiceBusReceiverAsyncClient buildAsyncClientForProcessor(boolean isAutoCompleteAllowed) {
+ final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName,
+ queueName);
+ final String entityPath = getEntityPath(logger, entityType, queueName, topicName, subscriptionName,
+ SubQueue.NONE);
+
+ if (!isAutoCompleteAllowed && enableAutoComplete) {
+ logger.warning(
+ "'enableAutoComplete' is not supported in synchronous client except through callback receive.");
+ enableAutoComplete = false;
+ } else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
+ throw logger.logExceptionAsError(new IllegalStateException(
+ "'enableAutoComplete' is not valid for RECEIVE_AND_DELETE mode."));
+ }
+
+ if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
+ maxAutoLockRenewDuration = Duration.ZERO;
+ }
+
+ final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
+ final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
+ maxAutoLockRenewDuration, enableAutoComplete, null,
+ maxConcurrentSessions);
+
+ final ServiceBusSessionManager sessionManager = new ServiceBusSessionManager(entityPath, entityType,
+ connectionProcessor, tracerProvider, messageSerializer, receiverOptions);
+
+ return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
+ entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
+ tracerProvider, messageSerializer, ServiceBusClientBuilder.this::onClientClose, sessionManager);
+ }
+
/**
* Creates an asynchronous, session-aware Service Bus receiver responsible for reading {@link
* ServiceBusMessage messages} from a specific queue or topic.
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java
index 6ef4cd224f51..66acf0c326d5 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java
@@ -216,7 +216,7 @@ public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions opti
* @throws IllegalStateException if the receiver is a non-session receiver.
*/
public byte[] getSessionState() {
- return asyncClient.getSessionState().block(operationTimeout);
+ return this.getSessionState(asyncClient.getReceiverOptions().getSessionId());
}
/**
@@ -228,9 +228,22 @@ public byte[] getSessionState() {
* @see Message browsing
*/
public ServiceBusReceivedMessage peekMessage() {
- return asyncClient.peekMessage().block(operationTimeout);
+ return this.peekMessage(asyncClient.getReceiverOptions().getSessionId());
}
+ /**
+ * Reads the next active message without changing the state of the receiver or the message source. The first call to
+ * {@code peek()} fetches the first active message for this receiver. Each subsequent call fetches the subsequent
+ * message in the entity.
+ *
+ * @param sessionId Session id of the message to peek from. {@code null} if there is no session.
+ *
+ * @return A peeked {@link ServiceBusReceivedMessage}.
+ * @see Message browsing
+ */
+ ServiceBusReceivedMessage peekMessage(String sessionId) {
+ return asyncClient.peekMessage(sessionId).block(operationTimeout);
+ }
/**
* Starting from the given sequence number, reads next the active message without changing the state of the receiver
* or the message source.
@@ -241,7 +254,21 @@ public ServiceBusReceivedMessage peekMessage() {
* @see Message browsing
*/
public ServiceBusReceivedMessage peekMessageAt(long sequenceNumber) {
- return asyncClient.peekMessageAt(sequenceNumber).block(operationTimeout);
+ return this.peekMessageAt(sequenceNumber, asyncClient.getReceiverOptions().getSessionId());
+ }
+
+ /**
+ * Starting from the given sequence number, reads next the active message without changing the state of the receiver
+ * or the message source.
+ *
+ * @param sequenceNumber The sequence number from where to read the message.
+ * @param sessionId Session id of the message to peek from. {@code null} if there is no session.
+ *
+ * @return A peeked {@link ServiceBusReceivedMessage}.
+ * @see Message browsing
+ */
+ ServiceBusReceivedMessage peekMessageAt(long sequenceNumber, String sessionId) {
+ return asyncClient.peekMessageAt(sequenceNumber, sessionId).block(operationTimeout);
}
/**
@@ -254,12 +281,26 @@ public ServiceBusReceivedMessage peekMessageAt(long sequenceNumber) {
* @see Message browsing
*/
public IterableStream
+ *
* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#nextsession}
*
+ *
* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId}
*
* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId}
- *
+ *
* @param sessionId The session Id.
* @return A {@link ServiceBusReceiverAsyncClient} that is tied to the specified session.
* @throws NullPointerException if {@code sessionId} is null.
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java
index c515acf0fc58..a8994c59091a 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java
@@ -16,13 +16,13 @@
*
+ *
* {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession}
*
+ *
* {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId}
*
+ *
* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#nextsession}
- *
+ *
* @return A {@link ServiceBusReceiverAsyncClient} that is tied to the available session.
* @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled.
*/
@@ -97,9 +92,9 @@ public Mono
+ *
* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId}
- *
+ *
* @param sessionId The session Id.
* @return A {@link ServiceBusReceiverAsyncClient} that is tied to the specified session.
* @throws NullPointerException if {@code sessionId} is null.
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java
index a8994c59091a..57078cfc28ee 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java
@@ -13,19 +13,13 @@
/**
* This session receiver client is used to acquire session locks from a queue or topic and create
* {@link ServiceBusReceiverClient} instances that are tied to the locked sessions.
- *
+ *
* {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession}
- *
+ *
* @return A {@link ServiceBusReceiverClient} that is tied to the available session.
* @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled.
* @throws IllegalStateException if the operation times out. The timeout duration is the tryTimeout
@@ -62,9 +56,9 @@ public ServiceBusReceiverClient acceptNextSession() {
* Acquires a session lock for {@code sessionId} and create a {@link ServiceBusReceiverClient}
* to receive messages from the session. If the session is already locked by another client, an
* {@link com.azure.core.amqp.exception.AmqpException} is thrown immediately.
- *
+ *
* {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId}
- *
+ *
* @param sessionId The session Id.
* @return A {@link ServiceBusReceiverClient} that is tied to the specified session.
* @throws NullPointerException if {@code sessionId} is null.
*
@@ -99,7 +99,7 @@ public Mono
*
From 36c8c95a2d0c929c73c7604b24fc3db176e97db2 Mon Sep 17 00:00:00 2001
From: Yijun Xie <48257664+YijunXieMS@users.noreply.github.com>
Date: Fri, 30 Oct 2020 01:32:40 -0700
Subject: [PATCH 23/23] Try fix javadoc ci problem
---
.../ServiceBusSessionReceiverAsyncClient.java | 25 ++++++++-----------
.../ServiceBusSessionReceiverClient.java | 24 +++++++-----------
2 files changed, 19 insertions(+), 30 deletions(-)
diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
index 2ee274d1b43e..3594b8aca09e 100644
--- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
+++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java
@@ -22,19 +22,14 @@
/**
* This session receiver client is used to acquire session locks from a queue or topic and create
* {@link ServiceBusReceiverAsyncClient} instances that are tied to the locked sessions.
- *
- *
+ * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#nextsession}
+ *
+ * Use {@link #acceptNextSession()} to acquire the lock of the next available session without specifying the session id.
+ *
+ * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId}
*
*/
@ServiceClient(builder = ServiceBusClientBuilder.class, isAsync = true)
@@ -71,9 +66,9 @@ public final class ServiceBusSessionReceiverAsyncClient implements AutoCloseable
* Acquires a session lock for the next available session and create a {@link ServiceBusReceiverAsyncClient}
* to receive messages from the session. It will wait until a session is available if no one is available
* immediately.
- *
- *
+ * Use {@link #acceptNextSession()} to acquire the lock of the next available session without specifying the session id.
+ *
+ * {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId}
*
*/
@ServiceClient(builder = ServiceBusClientBuilder.class)
@@ -42,9 +36,9 @@ public final class ServiceBusSessionReceiverClient implements AutoCloseable {
* Acquires a session lock for the next available session and create a {@link ServiceBusReceiverClient}
* to receive messages from the session. It will wait until a session is available if no one is available
* immediately.
- *