diff --git a/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml b/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml index d2d6a48257c4..350114d7c9ad 100755 --- a/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml +++ b/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml @@ -167,6 +167,10 @@ the main ServiceBusClientBuilder. --> + + + + diff --git a/sdk/servicebus/azure-messaging-servicebus/README.md b/sdk/servicebus/azure-messaging-servicebus/README.md index 0d360fbd8c0f..cbbdb6ac30dc 100644 --- a/sdk/servicebus/azure-messaging-servicebus/README.md +++ b/sdk/servicebus/azure-messaging-servicebus/README.md @@ -57,7 +57,7 @@ Both the asynchronous and synchronous Service Bus sender and receiver clients ar `ServiceBusClientBuilder`. The snippets below create a synchronous Service Bus sender and an asynchronous receiver, respectively. - + ```java ServiceBusSenderClient sender = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") @@ -66,7 +66,7 @@ ServiceBusSenderClient sender = new ServiceBusClientBuilder() .buildClient(); ``` - + ```java ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") @@ -102,7 +102,7 @@ refer to [the associated documentation][aad_authorization]. Use the returned token credential to authenticate the client: - + ```java TokenCredential credential = new DefaultAzureCredentialBuilder() .build(); @@ -152,7 +152,7 @@ a topic. The snippet below creates a synchronous [`ServiceBusSenderClient`][ServiceBusSenderClient] to publish a message to a queue. - + ```java ServiceBusSenderClient sender = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") @@ -187,7 +187,7 @@ queue or topic/subscriber. The snippet below creates a [`ServiceBusReceiverClient`][ServiceBusReceiverClient] to receive messages from a topic subscription. - + ```java ServiceBusReceiverClient receiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") @@ -214,7 +214,7 @@ receiver.close(); The asynchronous [`ServiceBusReceiverAsyncClient`][ServiceBusReceiverAsyncClient] continuously fetches messages until the `subscription` is disposed. - + ```java ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") @@ -247,7 +247,7 @@ When a message is received, it can be settled using any of the `complete()`, `ab overloads. The sample below completes a received message from synchronous [`ServiceBusReceiverClient`][ServiceBusReceiverClient]. - + ```java // This fetches a batch of 10 messages or until the default operation timeout has elapsed. receiver.receiveMessages(10).forEach(context -> { @@ -287,7 +287,7 @@ Create a [`ServiceBusSenderClient`][ServiceBusSenderClient] for a session enable `ServiceBusMessage.setSessionId(String)` on a `ServiceBusMessage` will publish the message to that session. If the session does not exist, it is created. - + ```java // Setting sessionId publishes that message to a specific session, in this case, "greeting". ServiceBusMessage message = new ServiceBusMessage("Hello world") @@ -300,25 +300,26 @@ sender.sendMessage(message); Receivers can fetch messages from a specific session or the first available, unlocked session. - + ```java // Creates a session-enabled receiver that gets messages from the session "greetings". -ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() +ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .sessionReceiver() .queueName("<< QUEUE NAME >>") - .sessionId("greetings") .buildAsyncClient(); +Mono receiverAsyncClient = sessionReceiver.acceptSession("greetings"); ``` - + ```java // Creates a session-enabled receiver that gets messages from the first available session. -ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() +ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .sessionReceiver() .queueName("<< QUEUE NAME >>") .buildAsyncClient(); +Mono receiverAsyncClient = sessionReceiver.acceptNextSession(); ``` ### Create a dead-letter queue Receiver @@ -328,7 +329,7 @@ The dead-letter queue doesn't need to be explicitly created and can't be deleted of the main entity. For session enabled or non-session queue or topic subscriptions, the dead-letter receiver can be created the same way as shown below. Learn more about dead-letter queue [here][dead-letter-queue]. - + ```java ServiceBusReceiverClient receiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java index bc388dfc4232..a3748b714d98 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java @@ -3,7 +3,6 @@ package com.azure.messaging.servicebus; -import com.azure.core.util.CoreUtils; import com.azure.messaging.servicebus.models.ReceiveMode; import java.time.Duration; @@ -16,28 +15,22 @@ class ReceiverOptions { private final int prefetchCount; private final boolean enableAutoComplete; private final String sessionId; - private final boolean isRollingSessionReceiver; private final Integer maxConcurrentSessions; - private final boolean isSessionReceiver; private final Duration maxLockRenewDuration; ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration, boolean enableAutoComplete) { - this(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, false, null); + this(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, null); } ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration, - boolean enableAutoComplete, String sessionId, boolean isRollingSessionReceiver, - Integer maxConcurrentSessions) { - + boolean enableAutoComplete, String sessionId, Integer maxConcurrentSessions) { this.receiveMode = receiveMode; this.prefetchCount = prefetchCount; this.enableAutoComplete = enableAutoComplete; this.sessionId = sessionId; - this.isRollingSessionReceiver = isRollingSessionReceiver; this.maxConcurrentSessions = maxConcurrentSessions; this.maxLockRenewDuration = maxLockRenewDuration; - this.isSessionReceiver = !CoreUtils.isNullOrEmpty(sessionId) || isRollingSessionReceiver; } /** @@ -90,7 +83,7 @@ boolean isAutoLockRenewEnabled() { * @return true if it is a session-aware receiver; false otherwise. */ boolean isSessionReceiver() { - return isSessionReceiver; + return sessionId != null || maxConcurrentSessions != null; } /** @@ -100,7 +93,7 @@ boolean isSessionReceiver() { * false} otherwise. */ public boolean isRollingSessionReceiver() { - return isRollingSessionReceiver; + return maxConcurrentSessions != null && maxConcurrentSessions > 0 && sessionId == null; } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index 5a97bf81f991..98b47ad619e0 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -628,7 +628,6 @@ public final class ServiceBusSessionReceiverClientBuilder { private int prefetchCount = DEFAULT_PREFETCH_COUNT; private String queueName; private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK; - private String sessionId; private String subscriptionName; private String topicName; private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION; @@ -674,7 +673,7 @@ public ServiceBusSessionReceiverClientBuilder maxAutoLockRenewDuration(Duration * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object. * @throws IllegalArgumentException if {@code maxConcurrentSessions} is less than 1. */ - public ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSessions) { + ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSessions) { if (maxConcurrentSessions < 1) { throw logger.logExceptionAsError(new IllegalArgumentException( "maxConcurrentSessions cannot be less than 1.")); @@ -728,18 +727,6 @@ public ServiceBusSessionReceiverClientBuilder receiveMode(ReceiveMode receiveMod return this; } - /** - * Sets the session id. - * - * @param sessionId session id. - * - * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object. - */ - public ServiceBusSessionReceiverClientBuilder sessionId(String sessionId) { - this.sessionId = sessionId; - return this; - } - /** * Sets the name of the subscription in the topic to listen to. {@link #topicName(String)} must also be set. * @@ -780,8 +767,8 @@ public ServiceBusSessionReceiverClientBuilder topicName(String topicName) { * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String) * queueName()} or {@link #topicName(String) topicName()}, respectively. */ - public ServiceBusReceiverAsyncClient buildAsyncClient() { - return buildAsyncClient(true); + ServiceBusReceiverAsyncClient buildAsyncClientForProcessor() { + return buildAsyncClientForProcessor(true); } /** @@ -797,11 +784,11 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() { * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String) * queueName()} or {@link #topicName(String) topicName()}, respectively. */ - public ServiceBusReceiverClient buildClient() { - return new ServiceBusReceiverClient(buildAsyncClient(false), retryOptions.getTryTimeout()); + ServiceBusReceiverClient buildClientForProcessor() { + return new ServiceBusReceiverClient(buildAsyncClientForProcessor(false), retryOptions.getTryTimeout()); } - private ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) { + private ServiceBusReceiverAsyncClient buildAsyncClientForProcessor(boolean isAutoCompleteAllowed) { final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName, queueName); final String entityPath = getEntityPath(logger, entityType, queueName, topicName, subscriptionName, @@ -822,7 +809,7 @@ private ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAll final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer); final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount, - maxAutoLockRenewDuration, enableAutoComplete, sessionId, isRollingSessionReceiver(), + maxAutoLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions); final ServiceBusSessionManager sessionManager = new ServiceBusSessionManager(entityPath, entityType, @@ -834,22 +821,66 @@ maxAutoLockRenewDuration, enableAutoComplete, sessionId, isRollingSessionReceive } /** - * This is a rolling session receiver only if maxConcurrentSessions is > 0 AND sessionId is null or empty. If - * there is a sessionId, this is going to be a single, named session receiver. + * Creates an asynchronous, session-aware Service Bus receiver responsible for reading {@link + * ServiceBusMessage messages} from a specific queue or topic. + * + * @return An new {@link ServiceBusSessionReceiverAsyncClient} that receives messages from a queue or topic. + * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String) + * topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link + * #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in + * {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link + * #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not. + * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String) + * queueName()} or {@link #topicName(String) topicName()}, respectively. + */ + public ServiceBusSessionReceiverAsyncClient buildAsyncClient() { + return buildAsyncClient(true); + } + + /** + * Creates a synchronous, session-aware Service Bus receiver responsible for reading {@link + * ServiceBusMessage messages} from a specific queue or topic. * - * @return {@code true} if this is an unnamed rolling session receiver; {@code false} otherwise. + * @return An new {@link ServiceBusReceiverClient} that receives messages from a queue or topic. + * @throws IllegalStateException if {@link #queueName(String) queueName} or {@link #topicName(String) + * topicName} are not set or, both of these fields are set. It is also thrown if the Service Bus {@link + * #connectionString(String) connectionString} contains an {@code EntityPath} that does not match one set in + * {@link #queueName(String) queueName} or {@link #topicName(String) topicName}. Lastly, if a {@link + * #topicName(String) topicName} is set, but {@link #subscriptionName(String) subscriptionName} is not. + * @throws IllegalArgumentException Queue or topic name are not set via {@link #queueName(String) + * queueName()} or {@link #topicName(String) topicName()}, respectively. */ - private boolean isRollingSessionReceiver() { - if (maxConcurrentSessions == null) { - return false; + public ServiceBusSessionReceiverClient buildClient() { + return new ServiceBusSessionReceiverClient(buildAsyncClient(false), + retryOptions.getTryTimeout()); + } + + private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) { + final MessagingEntityType entityType = validateEntityPaths(logger, connectionStringEntityName, topicName, + queueName); + final String entityPath = getEntityPath(logger, entityType, queueName, topicName, subscriptionName, + SubQueue.NONE); + + if (!isAutoCompleteAllowed && enableAutoComplete) { + logger.warning( + "'enableAutoComplete' is not supported in synchronous client except through callback receive."); + enableAutoComplete = false; + } else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) { + throw logger.logExceptionAsError(new IllegalStateException( + "'enableAutoComplete' is not valid for RECEIVE_AND_DELETE mode.")); } - if (maxConcurrentSessions < 1) { - throw logger.logExceptionAsError( - new IllegalArgumentException("Maximum number of concurrent sessions must be positive.")); + if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) { + maxAutoLockRenewDuration = Duration.ZERO; } - return CoreUtils.isNullOrEmpty(sessionId); + final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer); + final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount, + maxAutoLockRenewDuration, enableAutoComplete, null, maxConcurrentSessions); + + return new ServiceBusSessionReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), + entityPath, entityType, receiverOptions, connectionProcessor, tracerProvider, messageSerializer, + ServiceBusClientBuilder.this::onClientClose); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 6151a7387dc2..d7daa1471044 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -63,21 +63,17 @@ * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.receiveWithReceiveAndDeleteMode} * *

Receive messages from a specific session

- *

To fetch messages from a specific session, set {@link ServiceBusSessionReceiverClientBuilder#sessionId(String)}. + *

To fetch messages from a specific session, switch to {@link ServiceBusSessionReceiverClientBuilder} and + * build the session receiver client. Use {@link ServiceBusSessionReceiverAsyncClient#acceptSession(String)} to create a + * session-bound {@link ServiceBusReceiverAsyncClient}. *

* {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId} * - *

Process messages from multiple sessions

- *

To process messages from multiple sessions, set - * {@link ServiceBusSessionReceiverClientBuilder#maxConcurrentSessions(int)}. This will process in parallel at most - * {@code maxConcurrentSessions}. In addition, when all the messages in a session have been consumed, it will find the - * next available session to process.

- * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#multiplesessions} - * *

Process messages from the first available session

*

To process messages from the first available session, switch to {@link ServiceBusSessionReceiverClientBuilder} and - * build the receiver client. It will find the first available session to process messages from.

- * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#singlesession} + * build the session receiver client. Use {@link ServiceBusSessionReceiverAsyncClient#acceptNextSession()} to + * find the first available session to process messages from.

+ * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#nextsession} * *

Rate limiting consumption of messages from Service Bus resource

*

For message receivers that need to limit the number of messages they receive at a given time, they can use @@ -374,28 +370,13 @@ public Mono deadLetter(ServiceBusReceivedMessage message, DeadLetterOption } /** - * Gets the state of a session given its identifier. - * - * @param sessionId Identifier of session to get. + * Gets the state of the session if this receiver is a session receiver. * * @return The session state or an empty Mono if there is no state set for the session. * @throws IllegalStateException if the receiver is a non-session receiver. */ - public Mono getSessionState(String sessionId) { - if (isDisposed.get()) { - return monoError(logger, new IllegalStateException( - String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "getSessionState"))); - } else if (!receiverOptions.isSessionReceiver()) { - return monoError(logger, new IllegalStateException("Cannot get session state on a non-session receiver.")); - } - - if (sessionManager != null) { - return sessionManager.getSessionState(sessionId); - } else { - return connectionProcessor - .flatMap(connection -> connection.getManagementNode(entityPath, entityType)) - .flatMap(channel -> channel.getSessionState(sessionId, getLinkName(sessionId))); - } + public Mono getSessionState() { + return getSessionState(receiverOptions.getSessionId()); } /** @@ -421,12 +402,11 @@ public Mono peekMessage() { * @throws IllegalStateException if the receiver is disposed. * @see Message browsing */ - public Mono peekMessage(String sessionId) { + Mono peekMessage(String sessionId) { if (isDisposed.get()) { return monoError(logger, new IllegalStateException( String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "peek"))); } - return connectionProcessor .flatMap(connection -> connection.getManagementNode(entityPath, entityType)) .flatMap(channel -> { @@ -467,12 +447,11 @@ public Mono peekMessageAt(long sequenceNumber) { * @return A peeked {@link ServiceBusReceivedMessage}. * @see Message browsing */ - public Mono peekMessageAt(long sequenceNumber, String sessionId) { + Mono peekMessageAt(long sequenceNumber, String sessionId) { if (isDisposed.get()) { return monoError(logger, new IllegalStateException( String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "peekAt"))); } - return connectionProcessor .flatMap(connection -> connection.getManagementNode(entityPath, entityType)) .flatMap(node -> node.peek(sequenceNumber, sessionId, getLinkName(sessionId))); @@ -501,7 +480,7 @@ public Flux peekMessages(int maxMessages) { * @throws IllegalArgumentException if {@code maxMessages} is not a positive integer. * @see Message browsing */ - public Flux peekMessages(int maxMessages, String sessionId) { + Flux peekMessages(int maxMessages, String sessionId) { if (isDisposed.get()) { return fluxError(logger, new IllegalStateException( String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "peekBatch"))); @@ -564,7 +543,7 @@ public Flux peekMessagesAt(int maxMessages, long sequ * @throws IllegalArgumentException if {@code maxMessages} is not a positive integer. * @see Message browsing */ - public Flux peekMessagesAt(int maxMessages, long sequenceNumber, String sessionId) { + Flux peekMessagesAt(int maxMessages, long sequenceNumber, String sessionId) { if (isDisposed.get()) { return fluxError(logger, new IllegalStateException( String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "peekBatchAt"))); @@ -637,7 +616,7 @@ public Mono receiveDeferredMessage(long sequenceNumbe * * @return A deferred message with the matching {@code sequenceNumber}. */ - public Mono receiveDeferredMessage(long sequenceNumber, String sessionId) { + Mono receiveDeferredMessage(long sequenceNumber, String sessionId) { return connectionProcessor .flatMap(connection -> connection.getManagementNode(entityPath, entityType)) .flatMap(node -> node.receiveDeferredMessages(receiverOptions.getReceiveMode(), @@ -677,13 +656,12 @@ public Flux receiveDeferredMessages(Iterable se * * @return An {@link IterableStream} of deferred {@link ServiceBusReceivedMessage messages}. */ - public Flux receiveDeferredMessages(Iterable sequenceNumbers, + Flux receiveDeferredMessages(Iterable sequenceNumbers, String sessionId) { if (isDisposed.get()) { return fluxError(logger, new IllegalStateException( String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "receiveDeferredMessageBatch"))); } - return connectionProcessor .flatMap(connection -> connection.getManagementNode(entityPath, entityType)) .flatMapMany(node -> node.receiveDeferredMessages(receiverOptions.getReceiveMode(), @@ -794,34 +772,18 @@ public Mono renewMessageLock(ServiceBusReceivedMessage message, Duration m } /** - * Renews the session lock. - * - * @param sessionId Identifier of session to get. + * Renews the session lock if this receiver is a session receiver. * * @return The next expiration time for the session lock. * @throws IllegalStateException if the receiver is a non-session receiver. */ - public Mono renewSessionLock(String sessionId) { - if (isDisposed.get()) { - return monoError(logger, new IllegalStateException( - String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "renewSessionLock"))); - } else if (!receiverOptions.isSessionReceiver()) { - return monoError(logger, new IllegalStateException("Cannot renew session lock on a non-session receiver.")); - } - - final String linkName = sessionManager != null - ? sessionManager.getLinkName(sessionId) - : null; - - return connectionProcessor - .flatMap(connection -> connection.getManagementNode(entityPath, entityType)) - .flatMap(channel -> channel.renewSessionLock(sessionId, linkName)); + public Mono renewSessionLock() { + return renewSessionLock(receiverOptions.getSessionId()); } /** - * Starts the auto lock renewal for a session id. + * Starts the auto lock renewal for the session this receiver works for. * - * @param sessionId Id for the session to renew. * @param maxLockRenewalDuration Maximum duration to keep renewing the session lock. * * @return A lock renewal operation for the message. @@ -829,55 +791,20 @@ public Mono renewSessionLock(String sessionId) { * @throws IllegalArgumentException if {@code sessionId} is an empty string. * @throws IllegalStateException if the receiver is a non-session receiver or the receiver is disposed. */ - public Mono renewSessionLock(String sessionId, Duration maxLockRenewalDuration) { - if (isDisposed.get()) { - return monoError(logger, new IllegalStateException( - String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "getAutoRenewSessionLock"))); - } else if (!receiverOptions.isSessionReceiver()) { - return monoError(logger, new IllegalStateException( - "Cannot renew session lock on a non-session receiver.")); - } else if (maxLockRenewalDuration == null) { - return monoError(logger, new NullPointerException("'maxLockRenewalDuration' cannot be null.")); - } else if (maxLockRenewalDuration.isNegative()) { - return monoError(logger, new IllegalArgumentException( - "'maxLockRenewalDuration' cannot be negative.")); - } else if (Objects.isNull(sessionId)) { - return monoError(logger, new NullPointerException("'sessionId' cannot be null.")); - } else if (sessionId.isEmpty()) { - return monoError(logger, new IllegalArgumentException("'sessionId' cannot be empty.")); - } - - final LockRenewalOperation operation = new LockRenewalOperation(sessionId, maxLockRenewalDuration, true, - this::renewSessionLock); - - renewalContainer.addOrUpdate(sessionId, OffsetDateTime.now().plus(maxLockRenewalDuration), operation); - return operation.getCompletionOperation(); + public Mono renewSessionLock(Duration maxLockRenewalDuration) { + return this.renewSessionLock(receiverOptions.getSessionId(), maxLockRenewalDuration); } /** - * Sets the state of a session given its identifier. + * Sets the state of the session this receiver works for. * - * @param sessionId Identifier of session to get. * @param sessionState State to set on the session. * * @return A Mono that completes when the session is set * @throws IllegalStateException if the receiver is a non-session receiver. */ - public Mono setSessionState(String sessionId, byte[] sessionState) { - if (isDisposed.get()) { - return monoError(logger, new IllegalStateException( - String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "setSessionState"))); - } else if (!receiverOptions.isSessionReceiver()) { - return monoError(logger, new IllegalStateException("Cannot set session state on a non-session receiver.")); - } - - final String linkName = sessionManager != null - ? sessionManager.getLinkName(sessionId) - : null; - - return connectionProcessor - .flatMap(connection -> connection.getManagementNode(entityPath, entityType)) - .flatMap(channel -> channel.setSessionState(sessionId, sessionState, linkName)); + public Mono setSessionState(byte[] sessionState) { + return this.setSessionState(receiverOptions.getSessionId(), sessionState); } /** @@ -1136,4 +1063,76 @@ private String getLinkName(String sessionId) { return existing != null ? existing.getLinkName() : null; } } + + Mono renewSessionLock(String sessionId) { + if (isDisposed.get()) { + return monoError(logger, new IllegalStateException( + String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "renewSessionLock"))); + } else if (!receiverOptions.isSessionReceiver()) { + return monoError(logger, new IllegalStateException("Cannot renew session lock on a non-session receiver.")); + } + final String linkName = sessionManager != null + ? sessionManager.getLinkName(sessionId) + : null; + + return connectionProcessor + .flatMap(connection -> connection.getManagementNode(entityPath, entityType)) + .flatMap(channel -> channel.renewSessionLock(sessionId, linkName)); + } + + Mono renewSessionLock(String sessionId, Duration maxLockRenewalDuration) { + if (isDisposed.get()) { + return monoError(logger, new IllegalStateException( + String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "getAutoRenewSessionLock"))); + } else if (!receiverOptions.isSessionReceiver()) { + return monoError(logger, new IllegalStateException( + "Cannot renew session lock on a non-session receiver.")); + } else if (maxLockRenewalDuration == null) { + return monoError(logger, new NullPointerException("'maxLockRenewalDuration' cannot be null.")); + } else if (maxLockRenewalDuration.isNegative()) { + return monoError(logger, new IllegalArgumentException( + "'maxLockRenewalDuration' cannot be negative.")); + } else if (Objects.isNull(sessionId)) { + return monoError(logger, new NullPointerException("'sessionId' cannot be null.")); + } else if (sessionId.isEmpty()) { + return monoError(logger, new IllegalArgumentException("'sessionId' cannot be empty.")); + } + final LockRenewalOperation operation = new LockRenewalOperation(sessionId, + maxLockRenewalDuration, true, this::renewSessionLock); + + renewalContainer.addOrUpdate(sessionId, OffsetDateTime.now().plus(maxLockRenewalDuration), operation); + return operation.getCompletionOperation(); + } + + Mono setSessionState(String sessionId, byte[] sessionState) { + if (isDisposed.get()) { + return monoError(logger, new IllegalStateException( + String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "setSessionState"))); + } else if (!receiverOptions.isSessionReceiver()) { + return monoError(logger, new IllegalStateException("Cannot set session state on a non-session receiver.")); + } + final String linkName = sessionManager != null + ? sessionManager.getLinkName(sessionId) + : null; + + return connectionProcessor + .flatMap(connection -> connection.getManagementNode(entityPath, entityType)) + .flatMap(channel -> channel.setSessionState(sessionId, sessionState, linkName)); + } + + Mono getSessionState(String sessionId) { + if (isDisposed.get()) { + return monoError(logger, new IllegalStateException( + String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "getSessionState"))); + } else if (!receiverOptions.isSessionReceiver()) { + return monoError(logger, new IllegalStateException("Cannot get session state on a non-session receiver.")); + } + if (sessionManager != null) { + return sessionManager.getSessionState(sessionId); + } else { + return connectionProcessor + .flatMap(connection -> connection.getManagementNode(entityPath, entityType)) + .flatMap(channel -> channel.getSessionState(sessionId, getLinkName(sessionId))); + } + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index 38c550a96a15..66acf0c326d5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -210,15 +210,13 @@ public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions opti } /** - * Gets the state of a session given its identifier. - * - * @param sessionId Identifier of session to get. + * Gets the state of the session if this receiver is a session receiver. * * @return The session state or null if there is no state set for the session. * @throws IllegalStateException if the receiver is a non-session receiver. */ - public byte[] getSessionState(String sessionId) { - return asyncClient.getSessionState(sessionId).block(operationTimeout); + public byte[] getSessionState() { + return this.getSessionState(asyncClient.getReceiverOptions().getSessionId()); } /** @@ -230,7 +228,7 @@ public byte[] getSessionState(String sessionId) { * @see Message browsing */ public ServiceBusReceivedMessage peekMessage() { - return asyncClient.peekMessage().block(operationTimeout); + return this.peekMessage(asyncClient.getReceiverOptions().getSessionId()); } /** @@ -243,10 +241,9 @@ public ServiceBusReceivedMessage peekMessage() { * @return A peeked {@link ServiceBusReceivedMessage}. * @see Message browsing */ - public ServiceBusReceivedMessage peekMessage(String sessionId) { + ServiceBusReceivedMessage peekMessage(String sessionId) { return asyncClient.peekMessage(sessionId).block(operationTimeout); } - /** * Starting from the given sequence number, reads next the active message without changing the state of the receiver * or the message source. @@ -257,7 +254,7 @@ public ServiceBusReceivedMessage peekMessage(String sessionId) { * @see Message browsing */ public ServiceBusReceivedMessage peekMessageAt(long sequenceNumber) { - return asyncClient.peekMessageAt(sequenceNumber).block(operationTimeout); + return this.peekMessageAt(sequenceNumber, asyncClient.getReceiverOptions().getSessionId()); } /** @@ -270,7 +267,7 @@ public ServiceBusReceivedMessage peekMessageAt(long sequenceNumber) { * @return A peeked {@link ServiceBusReceivedMessage}. * @see Message browsing */ - public ServiceBusReceivedMessage peekMessageAt(long sequenceNumber, String sessionId) { + ServiceBusReceivedMessage peekMessageAt(long sequenceNumber, String sessionId) { return asyncClient.peekMessageAt(sequenceNumber, sessionId).block(operationTimeout); } @@ -284,18 +281,7 @@ public ServiceBusReceivedMessage peekMessageAt(long sequenceNumber, String sessi * @see Message browsing */ public IterableStream peekMessages(int maxMessages) { - if (maxMessages <= 0) { - throw logger.logExceptionAsError(new IllegalArgumentException( - "'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages)); - } - - final Flux messages = asyncClient.peekMessages(maxMessages) - .timeout(operationTimeout); - - // Subscribe so we can kick off this operation. - messages.subscribe(); - - return new IterableStream<>(messages); + return this.peekMessages(maxMessages, asyncClient.getReceiverOptions().getSessionId()); } /** @@ -308,7 +294,7 @@ public IterableStream peekMessages(int maxMessages) { * @throws IllegalArgumentException if {@code maxMessages} is not a positive integer. * @see Message browsing */ - public IterableStream peekMessages(int maxMessages, String sessionId) { + IterableStream peekMessages(int maxMessages, String sessionId) { if (maxMessages <= 0) { throw logger.logExceptionAsError(new IllegalArgumentException( "'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages)); @@ -335,18 +321,7 @@ public IterableStream peekMessages(int maxMessages, S * @see Message browsing */ public IterableStream peekMessagesAt(int maxMessages, long sequenceNumber) { - if (maxMessages <= 0) { - throw logger.logExceptionAsError(new IllegalArgumentException( - "'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages)); - } - - final Flux messages = asyncClient.peekMessagesAt(maxMessages, sequenceNumber) - .timeout(operationTimeout); - - // Subscribe so we can kick off this operation. - messages.subscribe(); - - return new IterableStream<>(messages); + return this.peekMessagesAt(maxMessages, sequenceNumber, asyncClient.getReceiverOptions().getSessionId()); } /** @@ -361,7 +336,7 @@ public IterableStream peekMessagesAt(int maxMessages, * @throws IllegalArgumentException if {@code maxMessages} is not a positive integer. * @see Message browsing */ - public IterableStream peekMessagesAt(int maxMessages, long sequenceNumber, + IterableStream peekMessagesAt(int maxMessages, long sequenceNumber, String sessionId) { if (maxMessages <= 0) { throw logger.logExceptionAsError(new IllegalArgumentException( @@ -431,7 +406,7 @@ public IterableStream receiveMessages(int maxM * @return A deferred message with the matching {@code sequenceNumber}. */ public ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber) { - return asyncClient.receiveDeferredMessage(sequenceNumber).block(operationTimeout); + return this.receiveDeferredMessage(sequenceNumber, asyncClient.getReceiverOptions().getSessionId()); } /** @@ -444,7 +419,7 @@ public ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber) { * * @return A deferred message with the matching {@code sequenceNumber}. */ - public ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber, String sessionId) { + ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber, String sessionId) { return asyncClient.receiveDeferredMessage(sequenceNumber, sessionId).block(operationTimeout); } @@ -457,13 +432,7 @@ public ServiceBusReceivedMessage receiveDeferredMessage(long sequenceNumber, Str * @return An {@link IterableStream} of deferred {@link ServiceBusReceivedMessage messages}. */ public IterableStream receiveDeferredMessageBatch(Iterable sequenceNumbers) { - final Flux messages = asyncClient.receiveDeferredMessages(sequenceNumbers) - .timeout(operationTimeout); - - // Subscribe so we can kick off this operation. - messages.subscribe(); - - return new IterableStream<>(messages); + return this.receiveDeferredMessageBatch(sequenceNumbers, asyncClient.getReceiverOptions().getSessionId()); } /** @@ -475,7 +444,7 @@ public IterableStream receiveDeferredMessageBatch(Ite * * @return An {@link IterableStream} of deferred {@link ServiceBusReceivedMessage messages}. */ - public IterableStream receiveDeferredMessageBatch(Iterable sequenceNumbers, + IterableStream receiveDeferredMessageBatch(Iterable sequenceNumbers, String sessionId) { final Flux messages = asyncClient.receiveDeferredMessages(sequenceNumbers, sessionId).timeout(operationTimeout); @@ -529,23 +498,20 @@ public void renewMessageLock(ServiceBusReceivedMessage message, Duration maxLock } /** - * Sets the state of a session given its identifier. - * - * @param sessionId Identifier of session to get. + * Sets the state of the session if this receiver is a session receiver. * * @return The next expiration time for the session lock. * @throws NullPointerException if {@code sessionId} is null. * @throws IllegalArgumentException if {@code sessionId} is an empty string. * @throws IllegalStateException if the receiver is a non-session receiver. */ - public OffsetDateTime renewSessionLock(String sessionId) { - return asyncClient.renewSessionLock(sessionId).block(operationTimeout); + public OffsetDateTime renewSessionLock() { + return this.renewSessionLock(asyncClient.getReceiverOptions().getSessionId()); } /** - * Starts the auto lock renewal for a session id. + * Starts the auto lock renewal for the session that this receiver works for. * - * @param sessionId Id for the session to renew. * @param maxLockRenewalDuration Maximum duration to keep renewing the session. * @param onError A function to call when an error occurs during lock renewal. * @@ -553,27 +519,19 @@ public OffsetDateTime renewSessionLock(String sessionId) { * @throws IllegalArgumentException if {@code sessionId} is an empty string. * @throws IllegalStateException if the receiver is a non-session receiver or the receiver is disposed. */ - public void renewSessionLock(String sessionId, Duration maxLockRenewalDuration, Consumer onError) { - final Consumer throwableConsumer = onError != null - ? onError - : error -> logger.warning("Exception occurred while renewing session: '{}'.", sessionId, error); - - asyncClient.renewSessionLock(sessionId, maxLockRenewalDuration).subscribe( - v -> logger.verbose("Completed renewing session: '{}'", sessionId), - throwableConsumer, - () -> logger.verbose("Auto session lock renewal operation completed: {}", sessionId)); + public void renewSessionLock(Duration maxLockRenewalDuration, Consumer onError) { + this.renewSessionLock(asyncClient.getReceiverOptions().getSessionId(), maxLockRenewalDuration, onError); } /** - * Sets the state of a session given its identifier. + * Sets the state of the session if this receiver is a session receiver. * - * @param sessionId Identifier of session to get. * @param sessionState State to set on the session. * * @throws IllegalStateException if the receiver is a non-session receiver. */ - public void setSessionState(String sessionId, byte[] sessionState) { - asyncClient.setSessionState(sessionId, sessionState).block(operationTimeout); + public void setSessionState(byte[] sessionState) { + this.setSessionState(asyncClient.getReceiverOptions().getSessionId(), sessionState); } /** @@ -651,4 +609,27 @@ private void queueWork(int maximumMessageCount, Duration maxWaitTime, } logger.verbose("[{}] Receive request queued up.", work.getId()); } + + OffsetDateTime renewSessionLock(String sessionId) { + return asyncClient.renewSessionLock(sessionId).block(operationTimeout); + } + + void renewSessionLock(String sessionId, Duration maxLockRenewalDuration, Consumer onError) { + final Consumer throwableConsumer = onError != null + ? onError + : error -> logger.warning("Exception occurred while renewing session: '{}'.", sessionId, error); + + asyncClient.renewSessionLock(maxLockRenewalDuration).subscribe( + v -> logger.verbose("Completed renewing session: '{}'", sessionId), + throwableConsumer, + () -> logger.verbose("Auto session lock renewal operation completed: {}", sessionId)); + } + + void setSessionState(String sessionId, byte[] sessionState) { + asyncClient.setSessionState(sessionId, sessionState).block(operationTimeout); + } + + byte[] getSessionState(String sessionId) { + return asyncClient.getSessionState(sessionId).block(operationTimeout); + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java index 11a80b5cbab8..26bb783d7b00 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java @@ -55,6 +55,7 @@ class ServiceBusSessionManager implements AutoCloseable { private final String entityPath; private final MessagingEntityType entityType; private final ReceiverOptions receiverOptions; + private final ServiceBusReceiveLink receiveLink; private final ServiceBusConnectionProcessor connectionProcessor; private final Duration operationTimeout; private final TracerProvider tracerProvider; @@ -77,7 +78,7 @@ class ServiceBusSessionManager implements AutoCloseable { ServiceBusSessionManager(String entityPath, MessagingEntityType entityType, ServiceBusConnectionProcessor connectionProcessor, TracerProvider tracerProvider, - MessageSerializer messageSerializer, ReceiverOptions receiverOptions) { + MessageSerializer messageSerializer, ReceiverOptions receiverOptions, ServiceBusReceiveLink receiveLink) { this.entityPath = entityPath; this.entityType = entityType; this.receiverOptions = receiverOptions; @@ -103,6 +104,14 @@ class ServiceBusSessionManager implements AutoCloseable { this.processor = EmitterProcessor.create(numberOfSchedulers, false); this.sessionReceiveSink = processor.sink(); + this.receiveLink = receiveLink; + } + + ServiceBusSessionManager(String entityPath, MessagingEntityType entityType, + ServiceBusConnectionProcessor connectionProcessor, TracerProvider tracerProvider, + MessageSerializer messageSerializer, ReceiverOptions receiverOptions) { + this(entityPath, entityType, connectionProcessor, tracerProvider, + messageSerializer, receiverOptions, null); } /** @@ -247,7 +256,10 @@ private Mono createSessionReceiveLink() { * @return A Mono that completes when an unnamed session becomes available. * @throws AmqpException if the session manager is already disposed. */ - private Mono getActiveLink() { + Mono getActiveLink() { + if (this.receiveLink != null) { + return Mono.just(this.receiveLink); + } return Mono.defer(() -> createSessionReceiveLink() .flatMap(link -> link.getEndpointStates() .takeUntil(e -> e == AmqpEndpointState.ACTIVE) @@ -282,7 +294,7 @@ private Mono getActiveLink() { */ private Flux getSession(Scheduler scheduler, boolean disposeOnIdle) { return getActiveLink().flatMap(link -> link.getSessionId() - .map(linkName -> sessionReceivers.compute(linkName, (key, existing) -> { + .map(sessionId -> sessionReceivers.compute(sessionId, (key, existing) -> { if (existing != null) { return existing; } @@ -290,7 +302,7 @@ private Flux getSession(Scheduler scheduler, b receiverOptions.getPrefetchCount(), disposeOnIdle, scheduler, this::renewSessionLock, maxSessionLockRenewDuration); }))) - .flatMapMany(session -> session.receive().doFinally(signalType -> { + .flatMapMany(sessionReceiver -> sessionReceiver.receive().doFinally(signalType -> { logger.verbose("Adding scheduler back to pool."); availableSchedulers.push(scheduler); if (receiverOptions.isRollingSessionReceiver()) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java new file mode 100644 index 000000000000..3594b8aca09e --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClient.java @@ -0,0 +1,129 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.amqp.implementation.TracerProvider; +import com.azure.core.annotation.ReturnType; +import com.azure.core.annotation.ServiceClient; +import com.azure.core.annotation.ServiceMethod; +import com.azure.core.util.CoreUtils; +import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.servicebus.implementation.MessagingEntityType; +import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor; +import com.azure.messaging.servicebus.implementation.ServiceBusConstants; +import reactor.core.publisher.Mono; + +import java.util.Objects; + +import static com.azure.core.util.FluxUtil.monoError; + +/** + * This session receiver client is used to acquire session locks from a queue or topic and create + * {@link ServiceBusReceiverAsyncClient} instances that are tied to the locked sessions. + * + * Use {@link #acceptSession(String)} to acquire the lock of a session if you know the session id. + * + * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#nextsession} + * + * Use {@link #acceptNextSession()} to acquire the lock of the next available session without specifying the session id. + * + * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId} + * + */ +@ServiceClient(builder = ServiceBusClientBuilder.class, isAsync = true) +public final class ServiceBusSessionReceiverAsyncClient implements AutoCloseable { + private final String fullyQualifiedNamespace; + private final String entityPath; + private final MessagingEntityType entityType; + private final ReceiverOptions receiverOptions; + private final ServiceBusConnectionProcessor connectionProcessor; + private final TracerProvider tracerProvider; + private final MessageSerializer messageSerializer; + private final Runnable onClientClose; + private final ServiceBusSessionManager unNamedSessionManager; // for acceptNextSession() + private final ClientLogger logger = new ClientLogger(ServiceBusSessionReceiverAsyncClient.class); + + ServiceBusSessionReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath, + MessagingEntityType entityType, ReceiverOptions receiverOptions, + ServiceBusConnectionProcessor connectionProcessor, TracerProvider tracerProvider, + MessageSerializer messageSerializer, Runnable onClientClose) { + this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, + "'fullyQualifiedNamespace' cannot be null."); + this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null."); + this.entityType = Objects.requireNonNull(entityType, "'entityType' cannot be null."); + this.receiverOptions = Objects.requireNonNull(receiverOptions, "'receiveOptions cannot be null.'"); + this.connectionProcessor = Objects.requireNonNull(connectionProcessor, "'connectionProcessor' cannot be null."); + this.tracerProvider = Objects.requireNonNull(tracerProvider, "'tracerProvider' cannot be null."); + this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null."); + this.onClientClose = Objects.requireNonNull(onClientClose, "'onClientClose' cannot be null."); + this.unNamedSessionManager = new ServiceBusSessionManager(entityPath, entityType, connectionProcessor, + tracerProvider, messageSerializer, receiverOptions); + } + + /** + * Acquires a session lock for the next available session and create a {@link ServiceBusReceiverAsyncClient} + * to receive messages from the session. It will wait until a session is available if no one is available + * immediately. + * + * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#nextsession} + * + * @return A {@link ServiceBusReceiverAsyncClient} that is tied to the available session. + * @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Mono acceptNextSession() { + return unNamedSessionManager.getActiveLink().flatMap(receiveLink -> receiveLink.getSessionId() + .map(sessionId -> { + final ReceiverOptions newReceiverOptions = new ReceiverOptions(receiverOptions.getReceiveMode(), + receiverOptions.getPrefetchCount(), receiverOptions.getMaxLockRenewDuration(), + receiverOptions.isAutoLockRenewEnabled(), sessionId, null); + final ServiceBusSessionManager sessionSpecificManager = new ServiceBusSessionManager(entityPath, + entityType, connectionProcessor, tracerProvider, messageSerializer, newReceiverOptions, + receiveLink); + return new ServiceBusReceiverAsyncClient(fullyQualifiedNamespace, entityPath, + entityType, newReceiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT, + tracerProvider, messageSerializer, () -> { }, sessionSpecificManager); + })); + } + + /** + * Acquires a session lock for {@code sessionId} and create a {@link ServiceBusReceiverAsyncClient} + * to receive messages from the session. If the session is already locked by another client, an + * {@link com.azure.core.amqp.exception.AmqpException} is thrown. + * + * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId} + * + * @param sessionId The session Id. + * @return A {@link ServiceBusReceiverAsyncClient} that is tied to the specified session. + * @throws NullPointerException if {@code sessionId} is null. + * @throws IllegalArgumentException if {@code sessionId} is empty. + * @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled. + * @throws com.azure.core.amqp.exception.AmqpException if the lock cannot be acquired. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public Mono acceptSession(String sessionId) { + if (sessionId == null) { + return monoError(logger, new NullPointerException("'sessionId' cannot be null")); + } + if (CoreUtils.isNullOrEmpty(sessionId)) { + return monoError(logger, new IllegalArgumentException("'sessionId' cannot be empty")); + } + final ReceiverOptions newReceiverOptions = new ReceiverOptions(receiverOptions.getReceiveMode(), + receiverOptions.getPrefetchCount(), receiverOptions.getMaxLockRenewDuration(), + receiverOptions.isAutoLockRenewEnabled(), sessionId, null); + final ServiceBusSessionManager sessionSpecificManager = new ServiceBusSessionManager(entityPath, entityType, + connectionProcessor, tracerProvider, messageSerializer, newReceiverOptions); + + return sessionSpecificManager.getActiveLink().map(receiveLink -> new ServiceBusReceiverAsyncClient( + fullyQualifiedNamespace, entityPath, entityType, newReceiverOptions, connectionProcessor, + ServiceBusConstants.OPERATION_TIMEOUT, tracerProvider, messageSerializer, () -> { }, + sessionSpecificManager)); + } + + @Override + public void close() { + this.onClientClose.run(); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java new file mode 100644 index 000000000000..57078cfc28ee --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClient.java @@ -0,0 +1,83 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.core.annotation.ReturnType; +import com.azure.core.annotation.ServiceClient; +import com.azure.core.annotation.ServiceMethod; + +import java.time.Duration; +import java.util.Objects; + +/** + * This session receiver client is used to acquire session locks from a queue or topic and create + * {@link ServiceBusReceiverClient} instances that are tied to the locked sessions. + * Use {@link #acceptSession(String)} to acquire the lock of a session if you know the session id. + * + * {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession} + * + * Use {@link #acceptNextSession()} to acquire the lock of the next available session without specifying the session id. + * + * {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId} + * + */ +@ServiceClient(builder = ServiceBusClientBuilder.class) +public final class ServiceBusSessionReceiverClient implements AutoCloseable { + private final ServiceBusSessionReceiverAsyncClient sessionAsyncClient; + private final Duration operationTimeout; + + ServiceBusSessionReceiverClient(ServiceBusSessionReceiverAsyncClient asyncClient, Duration operationTimeout) { + this.sessionAsyncClient = Objects.requireNonNull(asyncClient, "'asyncClient' cannot be null."); + this.operationTimeout = operationTimeout; + } + + /** + * Acquires a session lock for the next available session and create a {@link ServiceBusReceiverClient} + * to receive messages from the session. It will wait until a session is available if no one is available + * immediately. + * + * {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession} + * + * @return A {@link ServiceBusReceiverClient} that is tied to the available session. + * @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled. + * @throws IllegalStateException if the operation times out. The timeout duration is the tryTimeout + * of when you build this client with the + * {@link ServiceBusClientBuilder#retryOptions(com.azure.core.amqp.AmqpRetryOptions)}. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public ServiceBusReceiverClient acceptNextSession() { + return sessionAsyncClient.acceptNextSession() + .map(asyncClient -> new ServiceBusReceiverClient(asyncClient, operationTimeout)) + .block(operationTimeout); + } + + /** + * Acquires a session lock for {@code sessionId} and create a {@link ServiceBusReceiverClient} + * to receive messages from the session. If the session is already locked by another client, an + * {@link com.azure.core.amqp.exception.AmqpException} is thrown immediately. + * + * {@codesnippet com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId} + * + * @param sessionId The session Id. + * @return A {@link ServiceBusReceiverClient} that is tied to the specified session. + * @throws NullPointerException if {@code sessionId} is null. + * @throws IllegalArgumentException if {@code sessionId} is empty. + * @throws UnsupportedOperationException if the queue or topic subscription is not session-enabled. + * @throws com.azure.core.amqp.exception.AmqpException if the lock cannot be acquired. + * @throws IllegalStateException if the operation times out. The timeout duration is the tryTimeout + * of when you build this client with the + * {@link ServiceBusClientBuilder#retryOptions(com.azure.core.amqp.AmqpRetryOptions)}. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public ServiceBusReceiverClient acceptSession(String sessionId) { + return sessionAsyncClient.acceptSession(sessionId) + .map(asyncClient -> new ServiceBusReceiverClient(asyncClient, operationTimeout)) + .block(operationTimeout); + } + + @Override + public void close() { + sessionAsyncClient.close(); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/Messages.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/Messages.java index 49671a2fdb02..5c7ca158388f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/Messages.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/Messages.java @@ -20,7 +20,6 @@ public class Messages { public static final String CLASS_NOT_A_SUPPORTED_TYPE = getMessage("CLASS_NOT_A_SUPPORTED_TYPE"); public static final String INVALID_OPERATION_DISPOSED_RECEIVER = getMessage("INVALID_OPERATION_DISPOSED_RECEIVER"); - public static final String INVALID_LOCK_TOKEN_STRING = getMessage("INVALID_LOCK_TOKEN_STRING"); public static final String MESSAGE_NOT_OF_TYPE = getMessage("MESSAGE_NOT_OF_TYPE"); public static final String REQUEST_VALUE_NOT_VALID = getMessage("REQUEST_VALUE_NOT_VALID"); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java index 9dc191505b66..40433c6e00a1 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReadmeSamples.java @@ -9,6 +9,7 @@ import com.azure.messaging.servicebus.models.ReceiveMode; import com.azure.messaging.servicebus.models.SubQueue; import reactor.core.Disposable; +import reactor.core.publisher.Mono; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -173,12 +174,12 @@ public void createSessionMessage() { */ public void namedSessionReceiver() { // Creates a session-enabled receiver that gets messages from the session "greetings". - ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .sessionReceiver() .queueName("<< QUEUE NAME >>") - .sessionId("greetings") .buildAsyncClient(); + Mono receiverAsyncClient = sessionReceiver.acceptSession("greetings"); } /** @@ -186,11 +187,12 @@ public void namedSessionReceiver() { */ public void unnamedSessionReceiver() { // Creates a session-enabled receiver that gets messages from the first available session. - ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder() .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>") .sessionReceiver() .queueName("<< QUEUE NAME >>") .buildAsyncClient(); + Mono receiverAsyncClient = sessionReceiver.acceptNextSession(); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java deleted file mode 100644 index 87f06985bbe1..000000000000 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.messaging.servicebus; - -import reactor.core.Disposable; -import reactor.core.publisher.Mono; - -import java.security.SecureRandom; -import java.util.concurrent.TimeUnit; - -/** - * Sample demonstrates how to receive and process multiple sessions. In the sample, at most 3 sessions are processed - * concurrently. When there are no more messages in a session, the receiver finds the next available session to - * process. - */ -public class ReceiveMultipleSessionsAsyncSample { - private static final SecureRandom RANDOM = new SecureRandom(); - - /** - * Main method to invoke this demo on how to receive messages from multiple sessions in an Azure Service Bus Queue. - * - * @param args Unused arguments to the program. - * - * @throws InterruptedException If the program is unable to sleep while waiting for the operations to complete. - */ - public static void main(String[] args) throws InterruptedException { - - // The connection string value can be obtained by: - // 1. Going to your Service Bus namespace in Azure Portal. - // 2. Go to "Shared access policies" - // 3. Copy the connection string for the "RootManageSharedAccessKey" policy. - String connectionString = "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" - + "SharedAccessKey={key}"; - - // Create a receiver. - // "<>" will be the name of the Service Bus session-enabled queue instance you created inside the - // Service Bus namespace. - ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() - .connectionString(connectionString) - .sessionReceiver() - .maxConcurrentSessions(3) - .queueName("<>") - .buildAsyncClient(); - - Disposable subscription = receiver.receiveMessages() - .flatMap(context -> { - if (context.hasError()) { - System.out.printf("An error occurred in session %s. Error: %s%n", - context.getSessionId(), context.getThrowable()); - - return Mono.empty(); - } - - System.out.println("Processing message from session: " + context.getSessionId()); - - // Change the `messageProcessed` according to you business logic and if you are able to process the - // message successfully. In this case, we randomly get a boolean to determine if processing was - // successful or not. - boolean messageProcessed = RANDOM.nextBoolean(); - if (messageProcessed) { - return receiver.complete(context.getMessage()); - } else { - return receiver.abandon(context.getMessage()); - } - }).subscribe(); - - // Subscribe is not a blocking call so we sleep here so the program does not end. - TimeUnit.SECONDS.sleep(60); - - // Disposing of the subscription will cancel the receive() operation. - subscription.dispose(); - - // Close the receiver. - receiver.close(); - } -} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java index 0c764c0774c8..cec638475786 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java @@ -33,15 +33,15 @@ public static void main(String[] args) throws InterruptedException { // Create a receiver. // "<>" will be the name of the Service Bus session-enabled queue instance you created inside the // Service Bus namespace. - ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder() .connectionString(connectionString) .sessionReceiver() - .sessionId("greetings") .receiveMode(ReceiveMode.PEEK_LOCK) .queueName("<>") .buildAsyncClient(); - Disposable subscription = receiver.receiveMessages() + Mono receiverMono = sessionReceiver.acceptSession("greetings"); + Disposable subscription = receiverMono.flatMapMany(receiver -> receiver.receiveMessages() .flatMap(context -> { if (context.hasError()) { System.out.printf("An error occurred in session %s. Error: %s%n", @@ -52,18 +52,19 @@ public static void main(String[] args) throws InterruptedException { System.out.println("Processing message from session: " + context.getSessionId()); // Process message then complete it. - return receiver.complete(context.getMessage()); - }) + //return receiver.complete(context.getMessage()); + return Mono.empty(); + })) .subscribe(aVoid -> { }, error -> System.err.println("Error occurred: " + error)); // Subscribe is not a blocking call so we sleep here so the program does not end. - TimeUnit.SECONDS.sleep(60); + TimeUnit.SECONDS.sleep(10); // Disposing of the subscription will cancel the receive() operation. subscription.dispose(); // Close the receiver. - receiver.close(); + sessionReceiver.close(); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionSample.java index 7e71405c23bf..4d2a824c8782 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionSample.java @@ -39,13 +39,12 @@ public static void main(String[] args) { // Create a receiver. // "<>" will be the name of the Service Bus session-enabled queue instance you created inside the // Service Bus namespace. - ServiceBusReceiverClient receiver = new ServiceBusClientBuilder() + ServiceBusSessionReceiverClient sessionReceiver = new ServiceBusClientBuilder() .connectionString(connectionString) .sessionReceiver() - .sessionId("greetings") .queueName("<>") .buildClient(); - + ServiceBusReceiverClient receiver = sessionReceiver.acceptSession("greetings"); while (isRunning.get()) { IterableStream messages = receiver.receiveMessages(10, Duration.ofSeconds(30)); @@ -64,7 +63,7 @@ public static void main(String[] args) { } // Close the receiver. - receiver.close(); + sessionReceiver.close(); } private static boolean processMessage(ServiceBusReceivedMessage message) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java index c7e178ca2327..fa9d8ef9cc6e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java @@ -32,7 +32,7 @@ public static void main(String[] args) throws InterruptedException { // Create a receiver. // "<>" will be the name of the Service Bus topic you created inside the Service Bus namespace. // "<>" will be the name of the session-enabled subscription. - ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder() .connectionString(connectionString) .sessionReceiver() .receiveMode(ReceiveMode.PEEK_LOCK) @@ -40,7 +40,8 @@ public static void main(String[] args) throws InterruptedException { .subscriptionName("<>") .buildAsyncClient(); - Disposable subscription = receiver.receiveMessages() + Mono receiverMono = sessionReceiver.acceptNextSession(); + Disposable subscription = receiverMono.flatMapMany(receiver -> receiver.receiveMessages() .flatMap(context -> { if (context.hasError()) { System.out.printf("An error occurred in session %s. Error: %s%n", @@ -52,7 +53,7 @@ public static void main(String[] args) throws InterruptedException { // Process message return receiver.complete(context.getMessage()); - }).subscribe(aVoid -> { + })).subscribe(aVoid -> { }, error -> System.err.println("Error occurred: " + error)); // Subscribe is not a blocking call so we sleep here so the program does not end. @@ -62,6 +63,6 @@ public static void main(String[] args) throws InterruptedException { subscription.dispose(); // Close the receiver. - receiver.close(); + sessionReceiver.close(); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/SendAndReceiveSessionMessageSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/SendAndReceiveSessionMessageSample.java index 23c579d46280..cfabe68fa679 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/SendAndReceiveSessionMessageSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/SendAndReceiveSessionMessageSample.java @@ -50,10 +50,9 @@ public static void main(String[] args) throws InterruptedException { .buildAsyncClient(); // Instantiate a client that will be used to receive messages from the session. - ServiceBusReceiverAsyncClient receiver = builder.sessionReceiver() + ServiceBusSessionReceiverAsyncClient sessionReceiver = builder.sessionReceiver() .receiveMode(ReceiveMode.PEEK_LOCK) .queueName(queueName) - .sessionId(sessionId) .buildAsyncClient(); List messages = Arrays.asList( @@ -82,7 +81,7 @@ public static void main(String[] args) throws InterruptedException { () -> System.out.println("Batch send complete.")); // After sending that message, we receive the messages for that sessionId. - receiver.receiveMessages().flatMap(context -> { + sessionReceiver.acceptSession(sessionId).flatMapMany(receiver -> receiver.receiveMessages().flatMap(context -> { ServiceBusReceivedMessage message = context.getMessage(); System.out.println("Received Message Id: " + message.getMessageId()); @@ -90,13 +89,13 @@ public static void main(String[] args) throws InterruptedException { System.out.println("Received Message: " + new String(message.getBody())); return receiver.complete(message); - }).subscribe(); + })).subscribe(); // subscribe() is not a blocking call. We sleep here so the program does not end before the send is complete. TimeUnit.SECONDS.sleep(10); // Close the sender and receiver. sender.close(); - receiver.close(); + sessionReceiver.close(); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientJavaDocCodeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientJavaDocCodeSamples.java index ceef613e1d9f..2d4a63b83d71 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientJavaDocCodeSamples.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientJavaDocCodeSamples.java @@ -145,52 +145,34 @@ public void receiveAll() { * Demonstrates how to create a session receiver for a single, first available session. */ public void sessionReceiverSingleInstantiation() { - // BEGIN: com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#singlesession - ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + // BEGIN: com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#nextsession + ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder() .connectionString("Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" + "SharedAccessKey={key};EntityPath={eh-name}") .sessionReceiver() .queueName("<< QUEUE NAME >>") .buildAsyncClient(); - // END: com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#singlesession + Mono receiverMono = sessionReceiver.acceptNextSession(); + // END: com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#nextsession - receiver.close(); + sessionReceiver.close(); } /** - * Demonstrates how to create a session receiver for a specific session. + * Demonstrates how to create a session receiver for a single know session id. */ - public void sessionReceiverNamedInstantiation() { + public void sessionReceiverSessionIdInstantiation() { // BEGIN: com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId - ServiceBusReceiverAsyncClient consumer = new ServiceBusClientBuilder() + ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder() .connectionString("Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" + "SharedAccessKey={key};EntityPath={eh-name}") .sessionReceiver() - .topicName("<< TOPIC NAME >>") - .subscriptionName("<< SUBSCRIPTION NAME >>") - .sessionId("<< my-session-id >>") + .queueName("<< QUEUE NAME >>") .buildAsyncClient(); + Mono receiverMono = sessionReceiver.acceptSession("<< my-session-id >>"); // END: com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId - consumer.close(); - } - - /** - * Demonstrates how to create a session receiver for processing multiple sessions. - */ - public void sessionReceiverMultipleInstantiation() { - // BEGIN: com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#multiplesessions - ServiceBusReceiverAsyncClient consumer = new ServiceBusClientBuilder() - .connectionString("Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" - + "SharedAccessKey={key};EntityPath={eh-name}") - .sessionReceiver() - .topicName("<< TOPIC NAME >>") - .subscriptionName("<< SUBSCRIPTION NAME >>") - .maxConcurrentSessions(3) - .buildAsyncClient(); - // END: com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#multiplesessions - - consumer.close(); + sessionReceiver.close(); } public void createTransaction() { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverClientJavaDocCodeSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverClientJavaDocCodeSample.java index 57da95ac8ff8..9732f361355f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverClientJavaDocCodeSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverClientJavaDocCodeSample.java @@ -24,4 +24,37 @@ public void instantiate() { receiver.close(); } + + /** + * Demonstrates how to create a session receiver for a single, first available session. + */ + public void sessionReceiverSingleInstantiation() { + // BEGIN: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession + ServiceBusSessionReceiverClient sessionReceiver = new ServiceBusClientBuilder() + .connectionString("Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" + + "SharedAccessKey={key};EntityPath={eh-name}") + .sessionReceiver() + .queueName("<< QUEUE NAME >>") + .buildClient(); + ServiceBusReceiverClient receiver = sessionReceiver.acceptNextSession(); + // END: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#nextsession + sessionReceiver.close(); + } + + /** + * Demonstrates how to create a session receiver for a single know session id. + */ + public void sessionReceiverSessionIdInstantiation() { + // BEGIN: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId + ServiceBusSessionReceiverClient sessionReceiver = new ServiceBusClientBuilder() + .connectionString("Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" + + "SharedAccessKey={key};EntityPath={eh-name}") + .sessionReceiver() + .queueName("<< QUEUE NAME >>") + .buildClient(); + ServiceBusReceiverClient receiver = sessionReceiver.acceptSession("<< my-session-id >>"); + // END: com.azure.messaging.servicebus.servicebusreceiverclient.instantiation#sessionId + + sessionReceiver.close(); + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumerTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumerTest.java index 4515af758819..98bc1ec05372 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumerTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusAsyncConsumerTest.java @@ -116,8 +116,7 @@ void receiveNoAutoComplete() { final Duration maxAutoLockRenewDuration = Duration.ofSeconds(0); final OffsetDateTime lockedUntil = OffsetDateTime.now().plusSeconds(3); final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.RECEIVE_AND_DELETE, prefetch, - maxAutoLockRenewDuration, false, "sessionId", false, - 1); + maxAutoLockRenewDuration, false, "sessionId", null); final ServiceBusAsyncConsumer consumer = new ServiceBusAsyncConsumer(LINK_NAME, linkProcessor, serializer, receiverOptions); @@ -164,8 +163,7 @@ void canDispose() { final OffsetDateTime lockedUntil = OffsetDateTime.now().plusSeconds(3); final String lockToken = UUID.randomUUID().toString(); final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.RECEIVE_AND_DELETE, prefetch, - maxAutoLockRenewDuration, false, "sessionId", false, - 1); + maxAutoLockRenewDuration, false, "sessionId", null); final ServiceBusAsyncConsumer consumer = new ServiceBusAsyncConsumer(LINK_NAME, linkProcessor, serializer, receiverOptions); @@ -205,8 +203,7 @@ void onError() { final OffsetDateTime lockedUntil = OffsetDateTime.now().plusSeconds(3); final String lockToken = UUID.randomUUID().toString(); final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.RECEIVE_AND_DELETE, prefetch, - maxAutoLockRenewDuration, false, "sessionId", false, - 1); + maxAutoLockRenewDuration, false, "sessionId", null); final ServiceBusAsyncConsumer consumer = new ServiceBusAsyncConsumer(LINK_NAME, linkProcessor, serializer, receiverOptions); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java index e107af34a79e..756e008f63aa 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java @@ -68,6 +68,7 @@ class ServiceBusReceiverAsyncClientIntegrationTest extends IntegrationTestBase { * Receiver used to clean up resources in {@link #afterTest()}. */ private ServiceBusReceiverAsyncClient receiveAndDeleteReceiver; + private Mono receiveAndDeleteReceiverMono; ServiceBusReceiverAsyncClientIntegrationTest() { super(new ClientLogger(ServiceBusReceiverAsyncClientIntegrationTest.class)); @@ -85,12 +86,19 @@ protected void afterTest() { final int pendingDeferred = messagesDeferredPending.size(); if (pending < 1 && pendingDeferred < 1) { dispose(receiver, sender, receiveAndDeleteReceiver); + if (receiveAndDeleteReceiverMono != null) { + dispose(receiveAndDeleteReceiverMono.block()); + } return; } // In the case that this test failed... we're going to drain the queue or subscription. try { + dispose(receiver, sender); if (pending > 0) { + if (receiveAndDeleteReceiverMono != null) { + receiveAndDeleteReceiver = receiveAndDeleteReceiverMono.block(); + } receiveAndDeleteReceiver.receiveMessages() .map(message -> { logger.info("Message received: {}", message.getMessage().getSequenceNumber()); @@ -118,7 +126,7 @@ protected void afterTest() { } catch (Exception e) { logger.warning("Error occurred when draining for deferred messages.", e); } finally { - dispose(receiver, sender, receiveAndDeleteReceiver); + dispose(receiveAndDeleteReceiver); } } @@ -312,19 +320,14 @@ void receiveTwoMessagesAutoComplete(MessagingEntityType entityType, boolean isSe if (isSessionEnabled) { assertNotNull(sessionId, "'sessionId' should have been set."); this.receiver = getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection) - .sessionId(sessionId) - .buildAsyncClient(); - this.receiveAndDeleteReceiver = getSessionReceiverBuilder(useCredentials, entityType, entityIndex, + .buildAsyncClient().acceptSession(sessionId).block(); + this.receiveAndDeleteReceiverMono = getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection) - .sessionId(sessionId) .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) - .buildAsyncClient(); + .buildAsyncClient().acceptSession(sessionId); } else { this.receiver = getReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection) .buildAsyncClient(); - this.receiveAndDeleteReceiver = getReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection) - .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) - .buildAsyncClient(); } final String messageId = UUID.randomUUID().toString(); @@ -360,13 +363,11 @@ void receiveMessageAutoComplete(MessagingEntityType entityType, boolean isSessio if (isSessionEnabled) { assertNotNull(sessionId, "'sessionId' should have been set."); this.receiver = getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection) - .sessionId(sessionId) - .buildAsyncClient(); - this.receiveAndDeleteReceiver = getSessionReceiverBuilder(useCredentials, entityType, entityIndex, + .buildAsyncClient().acceptSession(sessionId).block(); + this.receiveAndDeleteReceiverMono = getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection) - .sessionId(sessionId) .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) - .buildAsyncClient(); + .buildAsyncClient().acceptSession(sessionId); } else { this.receiver = getReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection) .buildAsyncClient(); @@ -941,12 +942,12 @@ void setAndGetSessionState(MessagingEntityType entityType) { logger.info("SessionId: {}. LockToken: {}. LockedUntil: {}. Message received.", m.getSessionId(), m.getMessage().getLockToken(), m.getMessage().getLockedUntil()); receivedMessage.set(m.getMessage()); - return receiver.setSessionState(sessionId, sessionState); + return receiver.setSessionState(sessionState); })) .expectComplete() .verify(); - StepVerifier.create(receiver.getSessionState(sessionId)) + StepVerifier.create(receiver.getSessionState()) .assertNext(state -> { logger.info("State received: {}", new String(state, UTF_8)); assertArrayEquals(sessionState, state); @@ -1265,14 +1266,12 @@ private void setSenderAndReceiver(MessagingEntityType entityType, int entityInde assertNotNull(sessionId, "'sessionId' should have been set."); this.receiver = getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection) .disableAutoComplete() - .sessionId(sessionId) - .buildAsyncClient(); - this.receiveAndDeleteReceiver = getSessionReceiverBuilder(useCredentials, entityType, entityIndex, + .buildAsyncClient().acceptSession(sessionId).block(); + this.receiveAndDeleteReceiverMono = getSessionReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection) .disableAutoComplete() - .sessionId(sessionId) .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) - .buildAsyncClient(); + .buildAsyncClient().acceptSession(sessionId); } else { this.receiver = getReceiverBuilder(useCredentials, entityType, entityIndex, shareConnection) .disableAutoComplete() diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java index 8948e8349c0d..bfdd9d0d7f64 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java @@ -178,7 +178,7 @@ CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP, new AmqpRe sessionReceiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH, null, false, "Some-Session", - false, null), + null), connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); } @@ -939,7 +939,7 @@ void autoCompleteMessageSessionReceiver() { final List messages = getMessages(); final String lockToken = UUID.randomUUID().toString(); final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH, null, - true, "Some-Session", false, null); + true, "Some-Session", null); final ServiceBusReceiverAsyncClient sessionReceiver2 = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, receiverOptions, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java index 4cef38641f7e..daac1f6d6c42 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java @@ -17,6 +17,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; +import reactor.core.publisher.Mono; import java.time.Duration; import java.time.OffsetDateTime; @@ -54,6 +55,7 @@ class ServiceBusReceiverClientIntegrationTest extends IntegrationTestBase { * Receiver used to clean up resources in {@link #afterTest()}. */ private ServiceBusReceiverClient receiveAndDeleteReceiver; + private Mono receiveAndDeleteReceiverMono; protected ServiceBusReceiverClientIntegrationTest() { super(new ClientLogger(ServiceBusReceiverClientIntegrationTest.class)); @@ -69,11 +71,18 @@ protected void afterTest() { final int pending = messagesPending.get(); if (pending < 1 && messagesDeferred.get().size() < 1) { dispose(receiver, sender, receiveAndDeleteReceiver); + if (receiveAndDeleteReceiverMono != null) { + dispose(receiveAndDeleteReceiverMono.block()); + } return; } // In the case that this test failed... we're going to drain the queue or subscription. if (pending > 0) { + dispose(receiver, sender); + if (receiveAndDeleteReceiverMono != null) { + receiveAndDeleteReceiver = receiveAndDeleteReceiverMono.block(); + } try { IterableStream removedMessage = receiveAndDeleteReceiver.receiveMessages( pending, Duration.ofSeconds(15)); @@ -99,7 +108,7 @@ protected void afterTest() { } } - dispose(receiver, sender, receiveAndDeleteReceiver); + dispose(receiveAndDeleteReceiver); } /** @@ -732,11 +741,8 @@ void receiveAndDefer(MessagingEntityType entityType, boolean isSessionEnabled) { // cleanup final ServiceBusReceivedMessage deferred; - if (isSessionEnabled) { - deferred = receiver.receiveDeferredMessage(receivedMessage.getSequenceNumber(), sessionId); - } else { - deferred = receiver.receiveDeferredMessage(receivedMessage.getSequenceNumber()); - } + deferred = receiver.receiveDeferredMessage(receivedMessage.getSequenceNumber()); + receiver.complete(deferred); messagesPending.addAndGet(-maxMessages); } @@ -817,13 +823,11 @@ private void setSenderAndReceiver(MessagingEntityType entityType, int entityInde if (isSessionEnabled) { assertNotNull(sessionId, "'sessionId' should have been set."); this.receiver = getSessionReceiverBuilder(false, entityType, entityIndex, sharedConnection) - .sessionId(sessionId) - .buildClient(); - this.receiveAndDeleteReceiver = getSessionReceiverBuilder(false, entityType, entityIndex, + .buildClient().acceptSession(sessionId); + this.receiveAndDeleteReceiverMono = Mono.fromCallable(() -> getSessionReceiverBuilder(false, entityType, entityIndex, sharedConnection) - .sessionId(sessionId) .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) - .buildClient(); + .buildClient().acceptSession(sessionId)); } else { this.receiver = getReceiverBuilder(false, entityType, entityIndex, sharedConnection) .buildClient(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java index b026ff44c458..c975b62fa26e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java @@ -40,7 +40,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -55,6 +54,7 @@ class ServiceBusReceiverClientTest { private static final String NAMESPACE = "test-namespace"; private static final String ENTITY_PATH = "test-entity-path"; private static final String LOCK_TOKEN = UUID.randomUUID().toString(); + private static final String SESSION_ID = "test-session-id"; private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(5); @@ -72,6 +72,8 @@ class ServiceBusReceiverClientTest { private ServiceBusReceivedMessage message; @Mock private Consumer onErrorConsumer; + @Mock + private ReceiverOptions sessionReceiverOptions; @BeforeEach void setup() { @@ -79,7 +81,7 @@ void setup() { when(asyncClient.getEntityPath()).thenReturn(ENTITY_PATH); when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE); when(asyncClient.getReceiverOptions()).thenReturn(new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, null, false)); - + when(sessionReceiverOptions.getSessionId()).thenReturn(SESSION_ID); client = new ServiceBusReceiverClient(asyncClient, OPERATION_TIMEOUT); } @@ -221,13 +223,13 @@ void autoRenewSessionLock() { fail("On error should not have been invoked."); return null; }).when(onErrorConsumer).accept(any()); - when(asyncClient.renewSessionLock(LOCK_TOKEN, maxDuration)).thenReturn(publisher.mono()); + when(asyncClient.renewSessionLock(maxDuration)).thenReturn(publisher.mono()); // Act - client.renewSessionLock(LOCK_TOKEN, maxDuration, onErrorConsumer); + client.renewSessionLock(maxDuration, onErrorConsumer); // Assert - verify(asyncClient).renewSessionLock(LOCK_TOKEN, maxDuration); + verify(asyncClient).renewSessionLock(maxDuration); } /** @@ -240,15 +242,15 @@ void autoRenewSessionLockFails() { final TestPublisher publisher = TestPublisher.create(); final Throwable testError = new IllegalAccessException("Some exception"); - when(asyncClient.renewSessionLock(LOCK_TOKEN, maxDuration)).thenReturn(publisher.mono()); + when(asyncClient.renewSessionLock(maxDuration)).thenReturn(publisher.mono()); - client.renewSessionLock(LOCK_TOKEN, maxDuration, onErrorConsumer); + client.renewSessionLock(maxDuration, onErrorConsumer); // Act publisher.error(testError); // Assert - verify(asyncClient).renewSessionLock(LOCK_TOKEN, maxDuration); + verify(asyncClient).renewSessionLock(maxDuration); verify(onErrorConsumer).accept(testError); } @@ -262,15 +264,15 @@ void autoRenewSessionLockFailsNull() { final TestPublisher publisher = TestPublisher.create(); final Throwable testError = new IllegalAccessException("Some exception"); - when(asyncClient.renewSessionLock(LOCK_TOKEN, maxDuration)).thenReturn(publisher.mono()); + when(asyncClient.renewSessionLock(maxDuration)).thenReturn(publisher.mono()); - client.renewSessionLock(LOCK_TOKEN, maxDuration, null); + client.renewSessionLock(maxDuration, null); // Act publisher.error(testError); // Assert - verify(asyncClient).renewSessionLock(LOCK_TOKEN, maxDuration); + verify(asyncClient).renewSessionLock(maxDuration); verify(onErrorConsumer, never()).accept(testError); } @@ -392,12 +394,12 @@ void deadLetterMessageWithOptions() { @Test void getSessionState() { // Arrange - final String sessionId = "a-session-id"; final byte[] contents = new byte[]{10, 111, 23}; - when(asyncClient.getSessionState(sessionId)).thenReturn(Mono.just(contents)); + when(asyncClient.getReceiverOptions()).thenReturn(sessionReceiverOptions); + when(asyncClient.getSessionState(SESSION_ID)).thenReturn(Mono.just(contents)); // Act - final byte[] actual = client.getSessionState(sessionId); + final byte[] actual = client.getSessionState(); // Assert assertEquals(contents, actual); @@ -406,11 +408,11 @@ void getSessionState() { @Test void getSessionStateNull() { // Arrange - final String sessionId = "a-session-id"; - when(asyncClient.getSessionState(sessionId)).thenReturn(Mono.empty()); + when(asyncClient.getReceiverOptions()).thenReturn(sessionReceiverOptions); + when(asyncClient.getSessionState(SESSION_ID)).thenReturn(Mono.empty()); // Act - final byte[] actual = client.getSessionState(sessionId); + final byte[] actual = client.getSessionState(); // Assert assertNull(actual); @@ -420,7 +422,8 @@ void getSessionStateNull() { void peekMessage() { // Arrange final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); - when(asyncClient.peekMessage()).thenReturn(Mono.just(message)); + when(asyncClient.getReceiverOptions()).thenReturn(sessionReceiverOptions); + when(asyncClient.peekMessage(SESSION_ID)).thenReturn(Mono.just(message)); // Act final ServiceBusReceivedMessage actual = client.peekMessage(); @@ -432,7 +435,8 @@ void peekMessage() { @Test void peekMessageEmptyEntity() { // Arrange - when(asyncClient.peekMessage()).thenReturn(Mono.empty()); + when(asyncClient.getReceiverOptions()).thenReturn(sessionReceiverOptions); + when(asyncClient.peekMessage(SESSION_ID)).thenReturn(Mono.empty()); // Act final ServiceBusReceivedMessage actual = client.peekMessage(); @@ -446,7 +450,8 @@ void peekMessageFromSequence() { // Arrange final long sequenceNumber = 154; final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); - when(asyncClient.peekMessageAt(sequenceNumber)).thenReturn(Mono.just(message)); + when(asyncClient.getReceiverOptions()).thenReturn(sessionReceiverOptions); + when(asyncClient.peekMessageAt(sequenceNumber, SESSION_ID)).thenReturn(Mono.just(message)); // Act final ServiceBusReceivedMessage actual = client.peekMessageAt(sequenceNumber); @@ -462,7 +467,8 @@ void peekMessageFromSequence() { void peekMessagesEmptyEntity() { // Arrange final int maxMessages = 10; - when(asyncClient.peekMessages(maxMessages)).thenReturn(Flux.empty()); + when(asyncClient.getReceiverOptions()).thenReturn(sessionReceiverOptions); + when(asyncClient.peekMessages(maxMessages, SESSION_ID)).thenReturn(Flux.empty()); // Act final IterableStream actual = client.peekMessages(maxMessages); @@ -505,7 +511,9 @@ void peekMessagesMax() { }); }); - when(asyncClient.peekMessages(maxMessages)).thenReturn(messages); + when(asyncClient.getReceiverOptions()).thenReturn(sessionReceiverOptions); + when(asyncClient.peekMessages(maxMessages, SESSION_ID)).thenReturn(messages); + // Act final IterableStream actual = client.peekMessages(maxMessages); @@ -549,7 +557,8 @@ void peekMessagesLessThan() { }); }); - when(asyncClient.peekMessages(maxMessages)).thenReturn(messages); + when(asyncClient.getReceiverOptions()).thenReturn(sessionReceiverOptions); + when(asyncClient.peekMessages(maxMessages, SESSION_ID)).thenReturn(messages); // Act final IterableStream actual = client.peekMessages(maxMessages); @@ -576,7 +585,8 @@ void peekMessagesMaxSequenceNumber() { sink.complete(); })); - when(asyncClient.peekMessagesAt(maxMessages, sequenceNumber)).thenReturn(messages); + when(asyncClient.getReceiverOptions()).thenReturn(sessionReceiverOptions); + when(asyncClient.peekMessagesAt(maxMessages, sequenceNumber, SESSION_ID)).thenReturn(messages); // Act final IterableStream actual = client.peekMessagesAt(maxMessages, sequenceNumber); @@ -758,7 +768,8 @@ void receiveDeferredMessage() { // Arrange final long sequenceNumber = 231412; final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); - when(asyncClient.receiveDeferredMessage(anyLong())).thenReturn(Mono.just(message)); + when(asyncClient.getReceiverOptions()).thenReturn(sessionReceiverOptions); + when(asyncClient.receiveDeferredMessage(sequenceNumber, SESSION_ID)).thenReturn(Mono.just(message)); // Act final ServiceBusReceivedMessage actual = client.receiveDeferredMessage(sequenceNumber); @@ -766,7 +777,7 @@ void receiveDeferredMessage() { // Assert assertEquals(message, actual); - verify(asyncClient).receiveDeferredMessage(sequenceNumber); + verify(asyncClient).receiveDeferredMessage(sequenceNumber, SESSION_ID); } @Test @@ -776,7 +787,8 @@ void receiveDeferredMessageBatch() { final long sequenceNumber2 = 13124; final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); final ServiceBusReceivedMessage message2 = mock(ServiceBusReceivedMessage.class); - when(asyncClient.receiveDeferredMessages(any())).thenReturn(Flux.just(message, message2)); + when(asyncClient.getReceiverOptions()).thenReturn(sessionReceiverOptions); + when(asyncClient.receiveDeferredMessages(any(), eq(SESSION_ID))).thenReturn(Flux.just(message, message2)); List collection = Arrays.asList(sequenceNumber, sequenceNumber2); // Act @@ -809,10 +821,12 @@ void renewSessionLock() { // Arrange final String sessionId = "a-session-id"; final OffsetDateTime response = Instant.ofEpochSecond(1585259339).atOffset(ZoneOffset.UTC); - when(asyncClient.renewSessionLock(sessionId)).thenReturn(Mono.just(response)); + when(asyncClient.getReceiverOptions()).thenReturn(sessionReceiverOptions); + when(asyncClient.renewSessionLock(SESSION_ID)).thenReturn(Mono.just(response)); + // Act - final OffsetDateTime actual = client.renewSessionLock(sessionId); + final OffsetDateTime actual = client.renewSessionLock(); // Assert assertEquals(response, actual); @@ -821,14 +835,14 @@ void renewSessionLock() { @Test void setSessionState() { // Arrange - final String sessionId = "a-session-id"; final byte[] contents = new byte[]{10, 111, 23}; - when(asyncClient.setSessionState(sessionId, contents)).thenReturn(Mono.empty()); + when(asyncClient.getReceiverOptions()).thenReturn(sessionReceiverOptions); + when(asyncClient.setSessionState(SESSION_ID, contents)).thenReturn(Mono.empty()); // Act - client.setSessionState(sessionId, contents); + client.setSessionState(contents); // Assert - verify(asyncClient).setSessionState(sessionId, contents); + verify(asyncClient).setSessionState(SESSION_ID, contents); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerIntegrationTest.java index fe6687796376..68b32b6ab095 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerIntegrationTest.java @@ -8,7 +8,6 @@ import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder; import com.azure.messaging.servicebus.implementation.MessagingEntityType; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import reactor.core.Disposable; @@ -20,14 +19,12 @@ import java.time.Duration; import java.time.OffsetDateTime; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.IntStream; import static com.azure.messaging.servicebus.TestUtils.getServiceBusMessage; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -103,69 +100,6 @@ void singleUnnamedSession(MessagingEntityType entityType) { } } - /** - * Verifies that we can roll over to a next session. - */ - @Test - void multipleSessions() { - // Arrange - final int entityIndex = TestUtils.USE_CASE_MULTIPLE_SESSION; - final String messageId = "singleUnnamedSession"; - final String now = OffsetDateTime.now().toString(); - final List sessionIds = IntStream.range(0, 3) - .mapToObj(number -> String.join("-", String.valueOf(number), "singleUnnamedSession", now)) - .collect(Collectors.toList()); - - logger.info("------ Session ids ------"); - for (int i = 0; i < sessionIds.size(); i++) { - logger.info("[{}]: {}", i, sessionIds.get(i)); - } - - final String contents = "Some-contents"; - final int numberToSend = 3; - final int maxMessages = numberToSend * sessionIds.size(); - final int maxConcurrency = 2; - final Set set = new HashSet<>(); - - setSenderAndReceiver(MessagingEntityType.SUBSCRIPTION, entityIndex, - builder -> builder.maxConcurrentSessions(maxConcurrency)); - - final Disposable subscription = Flux.interval(Duration.ofMillis(500)) - .take(maxMessages) - .flatMap(index -> { - final int i = (int) (index % sessionIds.size()); - final String id = sessionIds.get(i); - final ServiceBusMessage message = getServiceBusMessage(contents, messageId) - .setSessionId(id); - messagesPending.incrementAndGet(); - return sender.sendMessage(message).thenReturn( - String.format("sessionId[%s] sent[%s] Message sent.", id, index)); - }).subscribe( - message -> logger.info(message), - error -> logger.error("Error encountered.", error), - () -> logger.info("Finished sending.")); - - // Act & Assert - try { - StepVerifier.create(receiver.receiveMessages()) - .assertNext(context -> assertFromSession(sessionIds, set, maxConcurrency, messageId, contents, context)) - .assertNext(context -> assertFromSession(sessionIds, set, maxConcurrency, messageId, contents, context)) - .assertNext(context -> assertFromSession(sessionIds, set, maxConcurrency, messageId, contents, context)) - - .assertNext(context -> assertFromSession(sessionIds, set, maxConcurrency, messageId, contents, context)) - .assertNext(context -> assertFromSession(sessionIds, set, maxConcurrency, messageId, contents, context)) - .assertNext(context -> assertFromSession(sessionIds, set, maxConcurrency, messageId, contents, context)) - - .assertNext(context -> assertFromSession(sessionIds, set, maxConcurrency + 1, messageId, contents, context)) - .assertNext(context -> assertFromSession(sessionIds, set, maxConcurrency + 1, messageId, contents, context)) - .assertNext(context -> assertFromSession(sessionIds, set, maxConcurrency + 1, messageId, contents, context)) - .thenCancel() - .verify(Duration.ofMinutes(2)); - } finally { - subscription.dispose(); - } - } - private void assertFromSession(List sessionIds, Set currentSessions, int maxSize, String messageId, String contents, ServiceBusReceivedMessageContext context) { logger.info("Verifying message: {}", context.getSessionId()); @@ -192,7 +126,7 @@ private void setSenderAndReceiver(MessagingEntityType entityType, int entityInde .buildAsyncClient(); ServiceBusSessionReceiverClientBuilder sessionBuilder = getSessionReceiverBuilder(false, entityType, entityIndex, false); - this.receiver = onBuild.apply(sessionBuilder).buildAsyncClient(); + this.receiver = onBuild.apply(sessionBuilder).buildAsyncClient().acceptSession(sessionId).block(); } private static void assertMessageEquals(String sessionId, String messageId, String contents, diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java index bd0260b853ff..833b78e2b561 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java @@ -155,8 +155,7 @@ void afterEach(TestInfo testInfo) { @Test void receiveNull() { // Arrange - ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, - true, 5); + ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, 5); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, tracerProvider, messageSerializer, receiverOptions); @@ -173,7 +172,7 @@ void receiveNull() { void singleUnnamedSession() { // Arrange ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, false, null, - true, 5); + 5); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, tracerProvider, messageSerializer, receiverOptions); @@ -226,7 +225,7 @@ void singleUnnamedSession() { void multipleSessions() { // Arrange final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, MAX_LOCK_RENEWAL, true, - null, true, 5); + null, 5); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, tracerProvider, messageSerializer, receiverOptions); @@ -348,8 +347,9 @@ void multipleSessions() { void multipleReceiveUnnamedSession() { // Arrange final int expectedLinksCreated = 2; + final Callable onRenewal = () -> OffsetDateTime.now().plus(Duration.ofSeconds(5)); final ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, Duration.ZERO, false, - null, false, 1); + null, 1); sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, tracerProvider, messageSerializer, receiverOptions); @@ -359,6 +359,7 @@ void multipleReceiveUnnamedSession() { when(amqpReceiveLink.getLinkName()).thenReturn(linkName); when(amqpReceiveLink.getSessionId()).thenReturn(Mono.just(sessionId)); + when(amqpReceiveLink.getSessionLockedUntil()).thenReturn(Mono.fromCallable(onRenewal)); // Session 2's final ServiceBusReceiveLink amqpReceiveLink2 = mock(ServiceBusReceiveLink.class); @@ -373,6 +374,7 @@ void multipleReceiveUnnamedSession() { when(amqpReceiveLink2.getEndpointStates()).thenReturn(endpointProcessor); when(amqpReceiveLink2.getLinkName()).thenReturn(linkName2); when(amqpReceiveLink2.getSessionId()).thenReturn(Mono.just(sessionId2)); + when(amqpReceiveLink2.getSessionLockedUntil()).thenReturn(Mono.fromCallable(onRenewal)); final AtomicInteger count = new AtomicInteger(); when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ReceiveMode.class), isNull(), diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java new file mode 100644 index 000000000000..069cb8901a56 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverAsyncClientTest.java @@ -0,0 +1,320 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.core.amqp.AmqpEndpointState; +import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.core.amqp.AmqpTransportType; +import com.azure.core.amqp.ProxyOptions; +import com.azure.core.amqp.implementation.CbsAuthorizationType; +import com.azure.core.amqp.implementation.ConnectionOptions; +import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.amqp.implementation.TracerProvider; +import com.azure.core.credential.TokenCredential; +import com.azure.core.util.ClientOptions; +import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.servicebus.implementation.MessagingEntityType; +import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection; +import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor; +import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode; +import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink; +import com.azure.messaging.servicebus.models.ReceiveMode; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.engine.SslDomain; +import org.apache.qpid.proton.message.Message; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import reactor.core.publisher.EmitterProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.publisher.ReplayProcessor; +import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; +import reactor.test.publisher.TestPublisher; + +import java.time.Duration; +import java.time.OffsetDateTime; +import java.util.Collections; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class ServiceBusSessionReceiverAsyncClientTest { + private static final ClientOptions CLIENT_OPTIONS = new ClientOptions(); + private static final Duration TIMEOUT = Duration.ofSeconds(10); + private static final Duration MAX_LOCK_RENEWAL = Duration.ofSeconds(5); + + private static final String NAMESPACE = "my-namespace-foo.net"; + private static final String ENTITY_PATH = "queue-name"; + private static final MessagingEntityType ENTITY_TYPE = MessagingEntityType.QUEUE; + + private final ClientLogger logger = new ClientLogger(ServiceBusReceiverAsyncClientTest.class); + private final ReplayProcessor endpointProcessor = ReplayProcessor.cacheLast(); + private final FluxSink endpointSink = endpointProcessor.sink(FluxSink.OverflowStrategy.BUFFER); + private final EmitterProcessor messageProcessor = EmitterProcessor.create(); + private final FluxSink messageSink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER); + private final TracerProvider tracerProvider = new TracerProvider(Collections.emptyList()); + + private ServiceBusConnectionProcessor connectionProcessor; + private ServiceBusSessionManager sessionManager; + + @Mock + private ServiceBusReceiveLink amqpReceiveLink; + @Mock + private ServiceBusAmqpConnection connection; + @Mock + private TokenCredential tokenCredential; + @Mock + private MessageSerializer messageSerializer; + @Mock + private ServiceBusManagementNode managementNode; + + + @BeforeAll + static void beforeAll() { + StepVerifier.setDefaultTimeout(Duration.ofSeconds(60)); + } + + @AfterAll + static void afterAll() { + StepVerifier.resetDefaultTimeout(); + } + + @BeforeEach + void beforeEach(TestInfo testInfo) { + logger.info("===== [{}] Setting up. =====", testInfo.getDisplayName()); + + MockitoAnnotations.initMocks(this); + + // Forcing us to publish the messages we receive on the AMQP link on single. Similar to how it is done + // in ReactorExecutor. + when(amqpReceiveLink.receive()).thenReturn(messageProcessor.publishOn(Schedulers.single())); + + when(amqpReceiveLink.getHostname()).thenReturn(NAMESPACE); + when(amqpReceiveLink.getEntityPath()).thenReturn(ENTITY_PATH); + when(amqpReceiveLink.getEndpointStates()).thenReturn(endpointProcessor); + + ConnectionOptions connectionOptions = new ConnectionOptions(NAMESPACE, tokenCredential, + CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP, + new AmqpRetryOptions().setTryTimeout(TIMEOUT), ProxyOptions.SYSTEM_DEFAULTS, Schedulers.boundedElastic(), + CLIENT_OPTIONS, SslDomain.VerifyMode.VERIFY_PEER_NAME); + + when(connection.getEndpointStates()).thenReturn(endpointProcessor); + endpointSink.next(AmqpEndpointState.ACTIVE); + + when(connection.getManagementNode(ENTITY_PATH, ENTITY_TYPE)) + .thenReturn(Mono.just(managementNode)); + + connectionProcessor = + Flux.create(sink -> sink.next(connection)) + .subscribeWith(new ServiceBusConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(), + connectionOptions.getRetry())); + } + + @AfterEach + void afterEach(TestInfo testInfo) { + logger.info("===== [{}] Tearing down. =====", testInfo.getDisplayName()); + + if (sessionManager != null) { + sessionManager.close(); + } + + if (connectionProcessor != null) { + connectionProcessor.dispose(); + } + + Mockito.framework().clearInlineMocks(); + } + + @Test + void acceptSession() { + // Arrange + ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, Duration.ZERO, false, null, null); + final String lockToken = "a-lock-token"; + final String linkName = "my-link-name"; + final String sessionId = linkName; + final OffsetDateTime sessionLockedUntil = OffsetDateTime.now().plus(Duration.ofSeconds(30)); + + final Message message = mock(Message.class); + final ServiceBusReceivedMessage receivedMessage = mock(ServiceBusReceivedMessage.class); + + when(messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)).thenReturn(receivedMessage); + when(receivedMessage.getSessionId()).thenReturn(sessionId); + when(receivedMessage.getLockToken()).thenReturn(lockToken); + + final int numberOfMessages = 5; + + when(amqpReceiveLink.getLinkName()).thenReturn(linkName); + when(amqpReceiveLink.getSessionId()).thenReturn(Mono.just(sessionId)); + when(amqpReceiveLink.getSessionLockedUntil()) + .thenAnswer(invocation -> Mono.just(sessionLockedUntil)); + when(amqpReceiveLink.updateDisposition(lockToken, Accepted.getInstance())).thenReturn(Mono.empty()); + + when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ReceiveMode.class), isNull(), + any(MessagingEntityType.class), eq(sessionId))).thenReturn(Mono.just(amqpReceiveLink)); + + ServiceBusSessionReceiverAsyncClient client = new ServiceBusSessionReceiverAsyncClient( + NAMESPACE, ENTITY_PATH, + MessagingEntityType.QUEUE, receiverOptions, + connectionProcessor, tracerProvider, + messageSerializer, () -> { } + ); + + // Act & Assert + StepVerifier.create(client.acceptSession(sessionId).flatMapMany(ServiceBusReceiverAsyncClient::receiveMessages)) + .then(() -> { + for (int i = 0; i < numberOfMessages; i++) { + messageSink.next(message); + } + }) + .assertNext(context -> assertMessageEquals(sessionId, receivedMessage, context)) + .assertNext(context -> assertMessageEquals(sessionId, receivedMessage, context)) + .assertNext(context -> assertMessageEquals(sessionId, receivedMessage, context)) + .assertNext(context -> assertMessageEquals(sessionId, receivedMessage, context)) + .assertNext(context -> assertMessageEquals(sessionId, receivedMessage, context)) + .thenCancel().verify(); + } + + @Test + void acceptNextSession() { + // Arrange + ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, Duration.ZERO, false, null, null); + sessionManager = new ServiceBusSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, + tracerProvider, messageSerializer, receiverOptions); + + final int numberOfMessages = 5; + final Callable onRenewal = () -> OffsetDateTime.now().plus(Duration.ofSeconds(5)); + + final String sessionId = "session-1"; + final String lockToken = "a-lock-token"; + final String linkName = "my-link-name"; + + final Message message = mock(Message.class); + final ServiceBusReceivedMessage receivedMessage = mock(ServiceBusReceivedMessage.class); + + when(receivedMessage.getSessionId()).thenReturn(sessionId); + when(receivedMessage.getLockToken()).thenReturn(lockToken); + + when(amqpReceiveLink.getLinkName()).thenReturn(linkName); + when(amqpReceiveLink.getSessionId()).thenReturn(Mono.just(sessionId)); + when(amqpReceiveLink.getSessionLockedUntil()).thenReturn(Mono.fromCallable(onRenewal)); + when(amqpReceiveLink.updateDisposition(lockToken, Accepted.getInstance())).thenReturn(Mono.empty()); + + // Session 2's messages + final ServiceBusReceiveLink amqpReceiveLink2 = mock(ServiceBusReceiveLink.class); + final Message message2 = mock(Message.class); + final ServiceBusReceivedMessage receivedMessage2 = mock(ServiceBusReceivedMessage.class); + final String sessionId2 = "session-2"; + final String lockToken2 = "a-lock-token-2"; + final String linkName2 = "my-link-name-2"; + final TestPublisher messagePublisher2 = TestPublisher.create(); + final Flux messageFlux2 = messagePublisher2.flux(); + + when(receivedMessage2.getSessionId()).thenReturn(sessionId2); + when(receivedMessage2.getLockToken()).thenReturn(lockToken2); + + when(amqpReceiveLink2.receive()).thenReturn(messageFlux2); + when(amqpReceiveLink2.getHostname()).thenReturn(NAMESPACE); + when(amqpReceiveLink2.getEntityPath()).thenReturn(ENTITY_PATH); + when(amqpReceiveLink2.getEndpointStates()).thenReturn(endpointProcessor); + when(amqpReceiveLink2.getLinkName()).thenReturn(linkName2); + when(amqpReceiveLink2.getSessionId()).thenReturn(Mono.just(sessionId2)); + when(amqpReceiveLink2.getSessionLockedUntil()).thenReturn(Mono.fromCallable(onRenewal)); + when(amqpReceiveLink2.updateDisposition(lockToken2, Accepted.getInstance())).thenReturn(Mono.empty()); + + final AtomicInteger count = new AtomicInteger(); + when(connection.createReceiveLink(anyString(), eq(ENTITY_PATH), any(ReceiveMode.class), isNull(), + any(MessagingEntityType.class), isNull())).thenAnswer(invocation -> { + final int number = count.getAndIncrement(); + switch (number) { + case 0: + return Mono.just(amqpReceiveLink); + case 1: + return Mono.just(amqpReceiveLink2); + default: + return Mono.empty(); + } + }); + + when(messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)).thenReturn(receivedMessage); + when(messageSerializer.deserialize(message2, ServiceBusReceivedMessage.class)).thenReturn(receivedMessage2); + + when(managementNode.renewSessionLock(sessionId, linkName)).thenReturn(Mono.fromCallable(onRenewal)); + when(managementNode.renewSessionLock(sessionId2, linkName2)).thenReturn(Mono.fromCallable(onRenewal)); + + ServiceBusSessionReceiverAsyncClient client = new ServiceBusSessionReceiverAsyncClient( + NAMESPACE, ENTITY_PATH, + MessagingEntityType.QUEUE, receiverOptions, + connectionProcessor, tracerProvider, + messageSerializer, () -> { } + ); + + // Act & Assert + StepVerifier.create(client.acceptNextSession().flatMapMany(ServiceBusReceiverAsyncClient::receiveMessages)) + .then(() -> { + for (int i = 0; i < numberOfMessages; i++) { + messageSink.next(message); + } + }) + .assertNext(context -> { + assertMessageEquals(sessionId, receivedMessage, context); + }) + .assertNext(context -> { + assertMessageEquals(sessionId, receivedMessage, context); + }) + .assertNext(context -> { + assertMessageEquals(sessionId, receivedMessage, context); + }) + .assertNext(context -> { + assertMessageEquals(sessionId, receivedMessage, context); + }) + .assertNext(context -> { + assertMessageEquals(sessionId, receivedMessage, context); + }) + .thenAwait(Duration.ofSeconds(1)).thenCancel().verify(); + + StepVerifier.create(client.acceptNextSession().flatMapMany(ServiceBusReceiverAsyncClient::receiveMessages)) + .then(() -> { + for (int i = 0; i < 3; i++) { + messagePublisher2.next(message2); + } + }) + .assertNext(context -> { + assertMessageEquals(sessionId2, receivedMessage2, context); + }) + .assertNext(context -> { + assertMessageEquals(sessionId2, receivedMessage2, context); + }) + .assertNext(context -> { + assertMessageEquals(sessionId2, receivedMessage2, context); + }) + .thenAwait(Duration.ofSeconds(1)) + .thenCancel() + .verify(); + } + + private static void assertMessageEquals(String sessionId, ServiceBusReceivedMessage expected, + ServiceBusReceivedMessageContext actual) { + assertEquals(sessionId, actual.getSessionId()); + assertNull(actual.getThrowable()); + + assertEquals(expected, actual.getMessage()); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClientTest.java new file mode 100644 index 000000000000..189636025e76 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionReceiverClientTest.java @@ -0,0 +1,80 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import reactor.core.publisher.Mono; + +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +class ServiceBusSessionReceiverClientTest { + + @Mock + private ServiceBusSessionReceiverAsyncClient sessionAsyncClient; + + @Mock + private ServiceBusReceiverAsyncClient asyncClient; + + @BeforeEach + void beforeEach(TestInfo testInfo) { + MockitoAnnotations.initMocks(this); + } + + @AfterEach + void afterEach(TestInfo testInfo) { + Mockito.framework().clearInlineMocks(); + } + + + @Test + void acceptSession() { + when(sessionAsyncClient.acceptSession(anyString())).thenReturn(Mono.just(asyncClient)); + ServiceBusSessionReceiverClient sessionClient = new ServiceBusSessionReceiverClient(sessionAsyncClient, + Duration.ofMillis(100)); + + assertNotNull(sessionClient.acceptSession("sessionId")); + } + + @Test + void acceptSessionTimeout() { + when(sessionAsyncClient.acceptSession(anyString())).thenReturn(Mono.just(asyncClient) + .delayElement(Duration.ofMillis(100))); + ServiceBusSessionReceiverClient sessionClient = new ServiceBusSessionReceiverClient(sessionAsyncClient, + Duration.ofMillis(50)); + + assertThrows(IllegalStateException.class, + () -> sessionClient.acceptSession("sessionId")); + } + + @Test + void acceptNextSession() { + when(sessionAsyncClient.acceptNextSession()).thenReturn(Mono.just(asyncClient)); + ServiceBusSessionReceiverClient sessionClient = new ServiceBusSessionReceiverClient(sessionAsyncClient, + Duration.ofMillis(100)); + + assertNotNull(sessionClient.acceptNextSession()); + } + + @Test + void acceptNextSessionTimeout() { + when(sessionAsyncClient.acceptNextSession()).thenReturn(Mono.just(asyncClient) + .delayElement(Duration.ofMillis(100))); + ServiceBusSessionReceiverClient sessionClient = new ServiceBusSessionReceiverClient(sessionAsyncClient, + Duration.ofMillis(50)); + + assertThrows(IllegalStateException.class, + () -> sessionClient.acceptNextSession()); + } +}