diff --git a/sdk/servicebus/azure-messaging-servicebus/README.md b/sdk/servicebus/azure-messaging-servicebus/README.md index 18f538c07392..833445f2b593 100644 --- a/sdk/servicebus/azure-messaging-servicebus/README.md +++ b/sdk/servicebus/azure-messaging-servicebus/README.md @@ -250,7 +250,7 @@ Unlike non-session-enabled queues or subscriptions, only a single receiver can r receiver fetches a session, Service Bus locks the session for that receiver, and it has exclusive access to messages in that session. -#### Send message to a session +#### Send a message to a session Create a `ServiceBusSenderClient` for a session enabled queue or topic subscription. Setting `.setSessionId(String)` on a `ServiceBusMessage` will publish the message to that session. If the session does not exist, it is created. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java index 6be7af21f44f..06a9a4ac643c 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ReceiverOptions.java @@ -5,6 +5,8 @@ import com.azure.messaging.servicebus.models.ReceiveMode; +import java.time.Duration; + /** * Options set when creating a service bus receiver. */ @@ -15,26 +17,39 @@ class ReceiverOptions { private final boolean isRollingSessionReceiver; private final Integer maxConcurrentSessions; private final boolean isSessionReceiver; + private final Duration maxAutoLockRenewalDuration; - ReceiverOptions(ReceiveMode receiveMode, int prefetchCount) { + ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxAutoLockRenewalDuration) { this.receiveMode = receiveMode; this.prefetchCount = prefetchCount; + this.maxAutoLockRenewalDuration = maxAutoLockRenewalDuration; this.sessionId = null; this.isRollingSessionReceiver = false; this.maxConcurrentSessions = null; this.isSessionReceiver = false; } - ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, String sessionId, - boolean isRollingSessionReceiver, Integer maxConcurrentSessions) { + ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxAutoLockRenewalDuration, + String sessionId, boolean isRollingSessionReceiver, Integer maxConcurrentSessions) { this.receiveMode = receiveMode; this.prefetchCount = prefetchCount; + this.maxAutoLockRenewalDuration = maxAutoLockRenewalDuration; this.sessionId = sessionId; this.isRollingSessionReceiver = isRollingSessionReceiver; this.maxConcurrentSessions = maxConcurrentSessions; this.isSessionReceiver = true; } + /** + * Gets whether or not auto-lock renewal is enabled. If the receiver is a session aware receiver, it renews the lock + * for the entire session; otherwise, renews the lock for each message. + * + * @return true if it renews the session or message lock; false otherwise. + */ + boolean autoLockRenewalEnabled() { + return maxAutoLockRenewalDuration != null && maxAutoLockRenewalDuration != Duration.ZERO; + } + /** * Gets the receive mode for the message. * @@ -62,6 +77,11 @@ int getPrefetchCount() { return prefetchCount; } + /** + * Gets whether or not the receiver is a session-aware receiver. + * + * @return true if it is a session-aware receiver; false otherwise. + */ boolean isSessionReceiver() { return isSessionReceiver; } @@ -84,4 +104,14 @@ public boolean isRollingSessionReceiver() { public Integer getMaxConcurrentSessions() { return maxConcurrentSessions; } + + /** + * Gets the maximum Duration to renew the message or session lock. + * + * @return The maximum Duration to renew the message or session lock; {@code null} or {@link Duration#ZERO} + * if auto-lock renewal is disabled. + */ + public Duration getMaxAutoLockRenewalDuration() { + return maxAutoLockRenewalDuration; + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index b7e607cbf84f..32127b31606e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -38,6 +38,7 @@ import java.net.InetSocketAddress; import java.net.Proxy; +import java.time.Duration; import java.util.Locale; import java.util.Map; import java.util.Objects; @@ -443,6 +444,9 @@ private static String getEntityPath(ClientLogger logger, MessagingEntityType ent /** * Builder for creating {@link ServiceBusSenderClient} and {@link ServiceBusSenderAsyncClient} to publish messages * to Service Bus. + * + * @see ServiceBusSenderAsyncClient + * @see ServiceBusSenderClient */ @ServiceClientBuilder(serviceClients = {ServiceBusSenderClient.class, ServiceBusSenderAsyncClient.class}) public final class ServiceBusSenderClientBuilder { @@ -530,7 +534,10 @@ public ServiceBusSenderClient buildClient() { /** * Builder for creating {@link ServiceBusReceiverClient} and {@link ServiceBusReceiverAsyncClient} to consume - * messages from a session aware Service Bus entity. + * messages from a session aware Service Bus entity. + * + * @see ServiceBusReceiverAsyncClient + * @see ServiceBusReceiverClient */ @ServiceClientBuilder(serviceClients = {ServiceBusReceiverClient.class, ServiceBusReceiverAsyncClient.class}) public final class ServiceBusSessionReceiverClientBuilder { @@ -542,16 +549,29 @@ public final class ServiceBusSessionReceiverClientBuilder { private String sessionId; private String subscriptionName; private String topicName; + private Duration maxAutoLockRenewalDuration; private ServiceBusSessionReceiverClientBuilder() { } + /** + * Enables auto-lock renewal by renewing each session lock until the {@code maxAutoLockRenewalDuration} has + * elapsed. + * + * @param maxAutoLockRenewalDuration Maximum amount of time to renew the session lock. + * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object. + */ + public ServiceBusSessionReceiverClientBuilder maxAutoLockRenewalDuration(Duration maxAutoLockRenewalDuration) { + this.maxAutoLockRenewalDuration = maxAutoLockRenewalDuration; + return this; + } + /** * Enables session processing roll-over by processing at most {@code maxConcurrentSessions}. * * @param maxConcurrentSessions Maximum number of concurrent sessions to process at any given time. * - * @return The modified {@link ServiceBusReceiverClientBuilder} object. + * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object. * @throws IllegalArgumentException if {@code maxConcurrentSessions} is less than 1. */ public ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSessions) { @@ -574,7 +594,7 @@ public ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcu * * @param prefetchCount The prefetch count. * - * @return The modified {@link ServiceBusReceiverClientBuilder} object. + * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object. */ public ServiceBusSessionReceiverClientBuilder prefetchCount(int prefetchCount) { this.prefetchCount = prefetchCount; @@ -586,7 +606,7 @@ public ServiceBusSessionReceiverClientBuilder prefetchCount(int prefetchCount) { * * @param queueName Name of the queue. * - * @return The modified {@link ServiceBusReceiverClientBuilder} object. + * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object. */ public ServiceBusSessionReceiverClientBuilder queueName(String queueName) { this.queueName = queueName; @@ -598,7 +618,7 @@ public ServiceBusSessionReceiverClientBuilder queueName(String queueName) { * * @param receiveMode Mode for receiving messages. * - * @return The modified {@link ServiceBusReceiverClientBuilder} object. + * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object. */ public ServiceBusSessionReceiverClientBuilder receiveMode(ReceiveMode receiveMode) { this.receiveMode = receiveMode; @@ -610,7 +630,7 @@ public ServiceBusSessionReceiverClientBuilder receiveMode(ReceiveMode receiveMod * * @param sessionId session id. * - * @return The modified {@link ServiceBusReceiverClientBuilder} object. + * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object. */ public ServiceBusSessionReceiverClientBuilder sessionId(String sessionId) { this.sessionId = sessionId; @@ -623,7 +643,7 @@ public ServiceBusSessionReceiverClientBuilder sessionId(String sessionId) { * * @param subscriptionName Name of the subscription. * - * @return The modified {@link ServiceBusReceiverClientBuilder} object. + * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object. * @see #topicName A topic name should be set as well. */ public ServiceBusSessionReceiverClientBuilder subscriptionName(String subscriptionName) { @@ -636,7 +656,7 @@ public ServiceBusSessionReceiverClientBuilder subscriptionName(String subscripti * * @param topicName Name of the topic. * - * @return The modified {@link ServiceBusReceiverClientBuilder} object. + * @return The modified {@link ServiceBusSessionReceiverClientBuilder} object. * @see #subscriptionName A subscription name should be set as well. */ public ServiceBusSessionReceiverClientBuilder topicName(String topicName) { @@ -665,11 +685,14 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() { if (prefetchCount < 1) { throw logger.logExceptionAsError(new IllegalArgumentException(String.format( "prefetchCount (%s) cannot be less than 1.", prefetchCount))); + } else if (maxAutoLockRenewalDuration != null && maxAutoLockRenewalDuration.isNegative()) { + throw logger.logExceptionAsError(new IllegalArgumentException(String.format( + "maxAutoLockRenewalDuration (%s) cannot be negative.", maxAutoLockRenewalDuration))); } final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer); - final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount, sessionId, - isRollingSessionReceiver(), maxConcurrentSessions); + final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount, + maxAutoLockRenewalDuration, sessionId, isRollingSessionReceiver(), maxConcurrentSessions); if (CoreUtils.isNullOrEmpty(sessionId)) { final UnnamedSessionManager sessionManager = new UnnamedSessionManager(entityPath, entityType, @@ -726,6 +749,9 @@ private boolean isRollingSessionReceiver() { /** * Builder for creating {@link ServiceBusReceiverClient} and {@link ServiceBusReceiverAsyncClient} to consume * messages from Service Bus. + * + * @see ServiceBusReceiverAsyncClient + * @see ServiceBusReceiverClient */ @ServiceClientBuilder(serviceClients = {ServiceBusReceiverClient.class, ServiceBusReceiverAsyncClient.class}) public final class ServiceBusReceiverClientBuilder { @@ -734,10 +760,23 @@ public final class ServiceBusReceiverClientBuilder { private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK; private String subscriptionName; private String topicName; + private Duration maxAutoLockRenewalDuration; private ServiceBusReceiverClientBuilder() { } + /** + * Enables auto-lock renewal by renewing each message lock renewal until the {@code maxAutoLockRenewalDuration} + * has elapsed. + * + * @param maxAutoLockRenewalDuration Maximum amount of time to renew the session lock. + * @return The modified {@link ServiceBusReceiverClientBuilder} object. + */ + public ServiceBusReceiverClientBuilder maxAutoLockRenewalDuration(Duration maxAutoLockRenewalDuration) { + this.maxAutoLockRenewalDuration = maxAutoLockRenewalDuration; + return this; + } + /** * Sets the prefetch count of the receiver. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link * ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1. @@ -827,10 +866,14 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() { if (prefetchCount < 1) { throw logger.logExceptionAsError(new IllegalArgumentException(String.format( "prefetchCount (%s) cannot be less than 1.", prefetchCount))); + } else if (maxAutoLockRenewalDuration != null && maxAutoLockRenewalDuration.isNegative()) { + throw logger.logExceptionAsError(new IllegalArgumentException(String.format( + "maxAutoLockRenewalDuration (%s) cannot be negative.", maxAutoLockRenewalDuration))); } final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer); - final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount); + final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount, + maxAutoLockRenewalDuration); return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath, entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT, diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java index 87746a1284f3..71c3d937fafc 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java @@ -5,6 +5,7 @@ import com.azure.core.util.Context; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; import java.util.Arrays; @@ -49,6 +50,17 @@ public class ServiceBusMessage { private String to; private String viaPartitionKey; + /** + * Creates a {@link ServiceBusMessage} with a {@link java.nio.charset.StandardCharsets#UTF_8 UTF_8} encoded body. + * + * @param body The content of the Service bus message. + * + * @throws NullPointerException if {@code body} is null. + */ + public ServiceBusMessage(String body) { + this(Objects.requireNonNull(body, "'body' cannot be null.").getBytes(StandardCharsets.UTF_8)); + } + /** * Creates a {@link ServiceBusMessage} containing the {@code body}. * @@ -62,8 +74,8 @@ public ServiceBusMessage(byte[] body) { } /** - * Creates a {@link ServiceBusMessage} using properties from {@code receivedMessage}. - * This is normally used when a {@link ServiceBusReceivedMessage} needs to be sent to another entity. + * Creates a {@link ServiceBusMessage} using properties from {@code receivedMessage}. This is normally used when a + * {@link ServiceBusReceivedMessage} needs to be sent to another entity. * * @param receivedMessage The received message to create new message from. * @@ -142,7 +154,6 @@ public ServiceBusMessage setContentType(String contentType) { *

* * @return correlation id of this message - * * @see Message * Routing and Correlation */ @@ -156,7 +167,6 @@ public String getCorrelationId() { * @param correlationId correlation id of this message * * @return The updated {@link ServiceBusMessage}. - * * @see #getCorrelationId() */ public ServiceBusMessage setCorrelationId(String correlationId) { @@ -214,7 +224,6 @@ public ServiceBusMessage setMessageId(String messageId) { * this value. * * @return The partition key of this message - * * @see Partitioned * entities */ @@ -228,7 +237,6 @@ public String getPartitionKey() { * @param partitionKey partition key of this message * * @return The updated {@link ServiceBusMessage}. - * * @see #getPartitionKey() */ public ServiceBusMessage setPartitionKey(String partitionKey) { @@ -244,7 +252,6 @@ public ServiceBusMessage setPartitionKey(String partitionKey) { * it expects the reply to be sent to. * * @return ReplyTo property value of this message - * * @see Message * Routing and Correlation */ @@ -258,7 +265,6 @@ public String getReplyTo() { * @param replyTo ReplyTo property value of this message * * @return The updated {@link ServiceBusMessage}. - * * @see #getReplyTo() */ public ServiceBusMessage setReplyTo(String replyTo) { @@ -302,7 +308,6 @@ public ServiceBusMessage setTo(String to) { * does. * * @return Time to live duration of this message - * * @see Message Expiration */ public Duration getTimeToLive() { @@ -315,7 +320,6 @@ public Duration getTimeToLive() { * @param timeToLive Time to Live duration of this message * * @return The updated {@link ServiceBusMessage}. - * * @see #getTimeToLive() */ public ServiceBusMessage setTimeToLive(Duration timeToLive) { @@ -333,7 +337,6 @@ public ServiceBusMessage setTimeToLive(Duration timeToLive) { *

* * @return the instant at which the message will be enqueued in Azure Service Bus - * * @see Message Sequencing and * Timestamps */ @@ -347,7 +350,6 @@ public Instant getScheduledEnqueueTime() { * @param scheduledEnqueueTime the instant at which this message should be enqueued in Azure Service Bus. * * @return The updated {@link ServiceBusMessage}. - * * @see #getScheduledEnqueueTime() */ public ServiceBusMessage setScheduledEnqueueTime(Instant scheduledEnqueueTime) { @@ -362,7 +364,6 @@ public ServiceBusMessage setScheduledEnqueueTime(Instant scheduledEnqueueTime) { * to the reply entity. * * @return ReplyToSessionId property value of this message - * * @see Message * Routing and Correlation */ @@ -385,12 +386,13 @@ public ServiceBusMessage setReplyToSessionId(String replyToSessionId) { /** * Gets the partition key for sending a message to a entity via another partitioned transfer entity. * - * If a message is sent via a transfer queue in the scope of a transaction, this value selects the - * transfer queue partition: This is functionally equivalent to {@link #getPartitionKey()} and ensures that - * messages are kept together and in order as they are transferred. + * If a message is sent via a transfer queue in the scope of a transaction, this value selects the transfer queue + * partition: This is functionally equivalent to {@link #getPartitionKey()} and ensures that messages are kept + * together and in order as they are transferred. * * @return partition key on the via queue. - * @see Transfers and Send Via + * @see Transfers + * and Send Via */ public String getViaPartitionKey() { return viaPartitionKey; @@ -400,6 +402,7 @@ public String getViaPartitionKey() { * Sets a via-partition key for sending a message to a destination entity via another partitioned entity * * @param viaPartitionKey via-partition key of this message + * * @return The updated {@link ServiceBusMessage}. * @see #getViaPartitionKey() */ @@ -446,7 +449,6 @@ Context getContext() { * @param value The value for this context object. * * @return The updated {@link ServiceBusMessage}. - * * @throws NullPointerException if {@code key} or {@code value} is null. */ public ServiceBusMessage addContext(String key, Object value) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 70262794a3ee..6d918cf9aa1f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -15,6 +15,7 @@ import com.azure.core.util.CoreUtils; import com.azure.core.util.IterableStream; import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder; import com.azure.messaging.servicebus.implementation.DispositionStatus; import com.azure.messaging.servicebus.implementation.MessageLockContainer; import com.azure.messaging.servicebus.implementation.MessagingEntityType; @@ -22,7 +23,6 @@ import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink; import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor; import com.azure.messaging.servicebus.models.DeadLetterOptions; -import com.azure.messaging.servicebus.models.ReceiveAsyncOptions; import com.azure.messaging.servicebus.models.ReceiveMode; import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; @@ -52,10 +52,27 @@ * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiateWithDefaultCredential} * *

Receive all messages from Service Bus resource

- * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.receive#all } + * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.receive#all} * *

Receive messages in {@link ReceiveMode#RECEIVE_AND_DELETE} mode from Service Bus resource

- * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.receiveWithReceiveAndDeleteMode } + * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.receiveWithReceiveAndDeleteMode} + * + *

Receive messages from a specific session

+ *

To fetch messages from a specific session, set {@link ServiceBusSessionReceiverClientBuilder#sessionId(String)}. + *

+ * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId} + * + *

Process messages from multiple sessions

+ *

To process messages from multiple sessions, set + * {@link ServiceBusSessionReceiverClientBuilder#maxConcurrentSessions(int)}. This will process in parallel at most + * {@code maxConcurrentSessions}. In addition, when all the messages in a session have been consumed, it will find the + * next available session to process.

+ * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#multiplesessions} + * + *

Process messages from the first available session

+ *

To process messages from the first available session, switch to {@link ServiceBusSessionReceiverClientBuilder} and + * build the receiver client. It will find the first available session to process messages from.

+ * {@codesnippet com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#singlesession} * *

Rate limiting consumption of messages from Service Bus resource

*

For message receivers that need to limit the number of messages they receive at a given time, they can use @@ -79,7 +96,6 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable { private final ServiceBusConnectionProcessor connectionProcessor; private final TracerProvider tracerProvider; private final MessageSerializer messageSerializer; - private final ReceiveAsyncOptions defaultReceiveOptions; private final Runnable onClientClose; private final UnnamedSessionManager unnamedSessionManager; @@ -114,9 +130,6 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable { this.onClientClose = Objects.requireNonNull(onClientClose, "'onClientClose' cannot be null."); this.managementNodeLocks = new MessageLockContainer(cleanupInterval); - this.defaultReceiveOptions = new ReceiveAsyncOptions() - .setIsAutoCompleteEnabled(true) - .setMaxAutoLockRenewalDuration(connectionProcessor.getRetryOptions().getTryTimeout()); this.unnamedSessionManager = null; } @@ -136,9 +149,6 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable { this.unnamedSessionManager = Objects.requireNonNull(unnamedSessionManager, "'sessionManager' cannot be null."); this.managementNodeLocks = new MessageLockContainer(cleanupInterval); - this.defaultReceiveOptions = new ReceiveAsyncOptions() - .setIsAutoCompleteEnabled(true) - .setMaxAutoLockRenewalDuration(connectionProcessor.getRetryOptions().getTryTimeout()); } /** @@ -244,7 +254,11 @@ public Mono abandon(MessageLockToken lockToken, Map proper * @throws IllegalArgumentException if {@link MessageLockToken#getLockToken()} returns a null lock token. */ public Mono complete(MessageLockToken lockToken) { - return complete(lockToken, receiverOptions.getSessionId()); + if (lockToken instanceof ServiceBusReceivedMessage) { + return complete(lockToken, ((ServiceBusReceivedMessage) lockToken).getSessionId()); + } else { + return complete(lockToken, null); + } } /** @@ -446,8 +460,8 @@ public Mono getSessionState(String sessionId) { * @return A peeked {@link ServiceBusReceivedMessage}. * @see Message browsing */ - public Mono peek() { - return peek(receiverOptions.getSessionId()); + public Mono browse() { + return browse(receiverOptions.getSessionId()); } /** @@ -460,7 +474,7 @@ public Mono peek() { * @return A peeked {@link ServiceBusReceivedMessage}. * @see Message browsing */ - public Mono peek(String sessionId) { + public Mono browse(String sessionId) { if (isDisposed.get()) { return monoError(logger, new IllegalStateException( String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "peek"))); @@ -492,8 +506,8 @@ public Mono peek(String sessionId) { * @return A peeked {@link ServiceBusReceivedMessage}. * @see Message browsing */ - public Mono peekAt(long sequenceNumber) { - return peekAt(sequenceNumber, receiverOptions.getSessionId()); + public Mono browseAt(long sequenceNumber) { + return browseAt(sequenceNumber, receiverOptions.getSessionId()); } /** @@ -506,7 +520,7 @@ public Mono peekAt(long sequenceNumber) { * @return A peeked {@link ServiceBusReceivedMessage}. * @see Message browsing */ - public Mono peekAt(long sequenceNumber, String sessionId) { + public Mono browseAt(long sequenceNumber, String sessionId) { if (isDisposed.get()) { return monoError(logger, new IllegalStateException( String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "peekAt"))); @@ -526,8 +540,8 @@ public Mono peekAt(long sequenceNumber, String sessio * @throws IllegalArgumentException if {@code maxMessages} is not a positive integer. * @see Message browsing */ - public Flux peekBatch(int maxMessages) { - return peekBatch(maxMessages, receiverOptions.getSessionId()); + public Flux browseBatch(int maxMessages) { + return browseBatch(maxMessages, receiverOptions.getSessionId()); } /** @@ -540,7 +554,7 @@ public Flux peekBatch(int maxMessages) { * @throws IllegalArgumentException if {@code maxMessages} is not a positive integer. * @see Message browsing */ - public Flux peekBatch(int maxMessages, String sessionId) { + public Flux browseBatch(int maxMessages, String sessionId) { if (isDisposed.get()) { return fluxError(logger, new IllegalStateException( String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "peekBatch"))); @@ -587,8 +601,8 @@ public Flux peekBatch(int maxMessages, String session * @throws IllegalArgumentException if {@code maxMessages} is not a positive integer. * @see Message browsing */ - public Flux peekBatchAt(int maxMessages, long sequenceNumber) { - return peekBatchAt(maxMessages, sequenceNumber, receiverOptions.getSessionId()); + public Flux browseBatchAt(int maxMessages, long sequenceNumber) { + return browseBatchAt(maxMessages, sequenceNumber, receiverOptions.getSessionId()); } /** @@ -603,7 +617,7 @@ public Flux peekBatchAt(int maxMessages, long sequenc * @throws IllegalArgumentException if {@code maxMessages} is not a positive integer. * @see Message browsing */ - public Flux peekBatchAt(int maxMessages, long sequenceNumber, String sessionId) { + public Flux browseBatchAt(int maxMessages, long sequenceNumber, String sessionId) { if (isDisposed.get()) { return fluxError(logger, new IllegalStateException( String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "peekBatchAt"))); @@ -631,43 +645,10 @@ public Flux peekBatchAt(int maxMessages, long sequenc * downstream consumers are still processing the message. */ public Flux receive() { - return receive(defaultReceiveOptions); - } - - /** - * Receives a stream of {@link ServiceBusReceivedMessage messages} from the Service Bus entity with a set of - * options. To disable lock auto-renewal, set {@link ReceiveAsyncOptions#setMaxAutoLockRenewalDuration(Duration) - * setMaxAutoRenewDuration} to {@link Duration#ZERO} or {@code null}. - * - * @param options Set of options to set when receiving messages. - * - * @return A stream of messages from the Service Bus entity. - * @throws NullPointerException if {@code options} is null. - * @throws IllegalArgumentException if {@link ReceiveAsyncOptions#getMaxAutoLockRenewalDuration() max auto-renew - * duration} is negative. - */ - public Flux receive(ReceiveAsyncOptions options) { - if (isDisposed.get()) { - return fluxError(logger, new IllegalStateException( - String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "receive"))); - } - - if (Objects.isNull(options)) { - return fluxError(logger, new NullPointerException("'options' cannot be null")); - } else if (options.getMaxAutoLockRenewalDuration() != null - && options.getMaxAutoLockRenewalDuration().isNegative()) { - return fluxError(logger, new IllegalArgumentException("'maxAutoRenewDuration' cannot be negative.")); - } - - if (receiverOptions.getReceiveMode() != ReceiveMode.PEEK_LOCK && options.isAutoCompleteEnabled()) { - return fluxError(logger, new UnsupportedOperationException( - "Auto-complete is not supported on a receiver opened in ReceiveMode.RECEIVE_AND_DELETE.")); - } - if (unnamedSessionManager != null) { - return unnamedSessionManager.receive(options); + return unnamedSessionManager.receive(); } else { - return getOrCreateConsumer(options).receive().map(message -> new ServiceBusReceivedMessageContext(message)); + return getOrCreateConsumer().receive().map(message -> new ServiceBusReceivedMessageContext(message)); } } @@ -913,12 +894,27 @@ private Mono updateDisposition(MessageLockToken message, DispositionStatus } final String lockToken = message.getLockToken(); - logger.info("{}: Update started. Disposition: {}. Lock: {}.", entityPath, dispositionStatus, lockToken); + final String sessionIdToUse; + if (message instanceof ServiceBusReceivedMessage) { + sessionIdToUse = ((ServiceBusReceivedMessage) message).getSessionId(); + if (!CoreUtils.isNullOrEmpty(sessionIdToUse) && !CoreUtils.isNullOrEmpty(sessionId) + && !sessionIdToUse.equals(sessionId)) { + logger.warning("Given sessionId '{}' does not match message's sessionId '{}'", + sessionId, sessionIdToUse); + } + } else if (sessionId == null && !CoreUtils.isNullOrEmpty(receiverOptions.getSessionId())) { + sessionIdToUse = receiverOptions.getSessionId(); + } else { + sessionIdToUse = sessionId; + } + + logger.info("{}: Update started. Disposition: {}. Lock: {}. SessionId {}.", entityPath, dispositionStatus, + lockToken, sessionIdToUse); final Mono performOnManagement = connectionProcessor .flatMap(connection -> connection.getManagementNode(entityPath, entityType)) .flatMap(node -> node.updateDisposition(lockToken, dispositionStatus, deadLetterReason, - deadLetterErrorDescription, propertiesToModify, sessionId, getLinkName(sessionId))) + deadLetterErrorDescription, propertiesToModify, sessionIdToUse, getLinkName(sessionIdToUse))) .then(Mono.fromRunnable(() -> { logger.info("{}: Update completed. Disposition: {}. Lock: {}.", entityPath, dispositionStatus, lockToken); @@ -927,8 +923,8 @@ deadLetterErrorDescription, propertiesToModify, sessionId, getLinkName(sessionId })); if (unnamedSessionManager != null) { - return unnamedSessionManager.updateDisposition(message, sessionId, dispositionStatus, propertiesToModify, - deadLetterReason, deadLetterErrorDescription) + return unnamedSessionManager.updateDisposition(message, sessionIdToUse, dispositionStatus, + propertiesToModify, deadLetterReason, deadLetterErrorDescription) .flatMap(isSuccess -> { if (isSuccess) { return Mono.empty(); @@ -950,7 +946,7 @@ deadLetterErrorDescription, propertiesToModify, sessionId, getLinkName(sessionId } } - private ServiceBusAsyncConsumer getOrCreateConsumer(ReceiveAsyncOptions options) { + private ServiceBusAsyncConsumer getOrCreateConsumer() { final ServiceBusAsyncConsumer existing = consumer.get(); if (existing != null) { return existing; @@ -981,12 +977,9 @@ private ServiceBusAsyncConsumer getOrCreateConsumer(ReceiveAsyncOptions options) final ServiceBusReceiveLinkProcessor linkMessageProcessor = receiveLink.subscribeWith( new ServiceBusReceiveLinkProcessor(receiverOptions.getPrefetchCount(), retryPolicy, connectionProcessor, context)); - final boolean isAutoLockRenewal = options.getMaxAutoLockRenewalDuration() != null - && !options.getMaxAutoLockRenewalDuration().isZero(); - final ServiceBusAsyncConsumer newConsumer = new ServiceBusAsyncConsumer(linkName, linkMessageProcessor, - messageSerializer, options.isAutoCompleteEnabled(), isAutoLockRenewal, - options.getMaxAutoLockRenewalDuration(), connectionProcessor.getRetryOptions(), + messageSerializer, false, receiverOptions.autoLockRenewalEnabled(), + receiverOptions.getMaxAutoLockRenewalDuration(), connectionProcessor.getRetryOptions(), (token, associatedLinkName) -> renewMessageLock(token, associatedLinkName)); // There could have been multiple threads trying to create this async consumer when the result was null. @@ -1000,12 +993,12 @@ private ServiceBusAsyncConsumer getOrCreateConsumer(ReceiveAsyncOptions options) } /** - * * @return receiver options set by user; */ ReceiverOptions getReceiverOptions() { return receiverOptions; } + /** * Renews the message lock, and updates its value in the container. */ @@ -1023,8 +1016,8 @@ private Mono renewMessageLock(MessageLockToken lockToken, String linkNa } /** - * If the receiver has not connected via {@link #receive(ReceiveAsyncOptions)} or {@link #receive()}, all its - * current operations have been performed through the management node. + * If the receiver has not connected via {@link #receive()}, all its current operations have been performed through + * the management node. * * @return The name of the receive link, or null of it has not connected via a receive link. */ diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index ef7c67dc9d2d..3beab123a9b5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -7,7 +7,6 @@ import com.azure.core.util.IterableStream; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.models.DeadLetterOptions; -import com.azure.messaging.servicebus.models.ReceiveAsyncOptions; import com.azure.messaging.servicebus.models.ReceiveMode; import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; @@ -37,9 +36,6 @@ public final class ServiceBusReceiverClient implements AutoCloseable { private final ServiceBusReceiverAsyncClient asyncClient; private final Duration operationTimeout; private final Object lock = new Object(); - private static final ReceiveAsyncOptions DEFAULT_RECEIVE_OPTIONS = new ReceiveAsyncOptions() - .setIsAutoCompleteEnabled(false) - .setMaxAutoLockRenewalDuration(Duration.ZERO); private final AtomicReference> messageProcessor = new AtomicReference<>(); @@ -308,8 +304,8 @@ public byte[] getSessionState(String sessionId) { * @return A peeked {@link ServiceBusReceivedMessage}. * @see Message browsing */ - public ServiceBusReceivedMessage peek() { - return asyncClient.peek().block(operationTimeout); + public ServiceBusReceivedMessage browse() { + return asyncClient.browse().block(operationTimeout); } /** @@ -322,8 +318,8 @@ public ServiceBusReceivedMessage peek() { * @return A peeked {@link ServiceBusReceivedMessage}. * @see Message browsing */ - public ServiceBusReceivedMessage peek(String sessionId) { - return asyncClient.peek(sessionId).block(operationTimeout); + public ServiceBusReceivedMessage browse(String sessionId) { + return asyncClient.browse(sessionId).block(operationTimeout); } /** @@ -335,8 +331,8 @@ public ServiceBusReceivedMessage peek(String sessionId) { * @return A peeked {@link ServiceBusReceivedMessage}. * @see Message browsing */ - public ServiceBusReceivedMessage peekAt(long sequenceNumber) { - return asyncClient.peekAt(sequenceNumber).block(operationTimeout); + public ServiceBusReceivedMessage browseAt(long sequenceNumber) { + return asyncClient.browseAt(sequenceNumber).block(operationTimeout); } /** @@ -349,8 +345,8 @@ public ServiceBusReceivedMessage peekAt(long sequenceNumber) { * @return A peeked {@link ServiceBusReceivedMessage}. * @see Message browsing */ - public ServiceBusReceivedMessage peekAt(long sequenceNumber, String sessionId) { - return asyncClient.peekAt(sequenceNumber, sessionId).block(operationTimeout); + public ServiceBusReceivedMessage browseAt(long sequenceNumber, String sessionId) { + return asyncClient.browseAt(sequenceNumber, sessionId).block(operationTimeout); } /** @@ -362,13 +358,13 @@ public ServiceBusReceivedMessage peekAt(long sequenceNumber, String sessionId) { * @throws IllegalArgumentException if {@code maxMessages} is not a positive integer. * @see Message browsing */ - public IterableStream peekBatch(int maxMessages) { + public IterableStream browseBatch(int maxMessages) { if (maxMessages <= 0) { throw logger.logExceptionAsError(new IllegalArgumentException( "'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages)); } - final Flux messages = asyncClient.peekBatch(maxMessages) + final Flux messages = asyncClient.browseBatch(maxMessages) .timeout(operationTimeout); // Subscribe so we can kick off this operation. @@ -387,13 +383,13 @@ public IterableStream peekBatch(int maxMessages) { * @throws IllegalArgumentException if {@code maxMessages} is not a positive integer. * @see Message browsing */ - public IterableStream peekBatch(int maxMessages, String sessionId) { + public IterableStream browseBatch(int maxMessages, String sessionId) { if (maxMessages <= 0) { throw logger.logExceptionAsError(new IllegalArgumentException( "'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages)); } - final Flux messages = asyncClient.peekBatch(maxMessages, sessionId) + final Flux messages = asyncClient.browseBatch(maxMessages, sessionId) .timeout(operationTimeout); // Subscribe so we can kick off this operation. @@ -413,13 +409,13 @@ public IterableStream peekBatch(int maxMessages, Stri * @throws IllegalArgumentException if {@code maxMessages} is not a positive integer. * @see Message browsing */ - public IterableStream peekBatchAt(int maxMessages, long sequenceNumber) { + public IterableStream browseBatchAt(int maxMessages, long sequenceNumber) { if (maxMessages <= 0) { throw logger.logExceptionAsError(new IllegalArgumentException( "'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages)); } - final Flux messages = asyncClient.peekBatchAt(maxMessages, sequenceNumber) + final Flux messages = asyncClient.browseBatchAt(maxMessages, sequenceNumber) .timeout(operationTimeout); // Subscribe so we can kick off this operation. @@ -440,15 +436,15 @@ public IterableStream peekBatchAt(int maxMessages, lo * @throws IllegalArgumentException if {@code maxMessages} is not a positive integer. * @see Message browsing */ - public IterableStream peekBatchAt(int maxMessages, long sequenceNumber, + public IterableStream browseBatchAt(int maxMessages, long sequenceNumber, String sessionId) { if (maxMessages <= 0) { throw logger.logExceptionAsError(new IllegalArgumentException( "'maxMessages' cannot be less than or equal to 0. maxMessages: " + maxMessages)); } - final Flux messages = asyncClient.peekBatchAt(maxMessages, sequenceNumber, sessionId) - .timeout(operationTimeout); + final Flux messages = asyncClient.browseBatchAt(maxMessages, sequenceNumber, + sessionId).timeout(operationTimeout); // Subscribe so we can kick off this operation. messages.subscribe(); @@ -635,7 +631,7 @@ private void queueWork(int maximumMessageCount, Duration maxWaitTime, logger.info("[{}]: Started synchronous message subscriber.", id); if (emitterProcessor == null) { - emitterProcessor = this.asyncClient.receive(DEFAULT_RECEIVE_OPTIONS) + emitterProcessor = this.asyncClient.receive() .subscribeWith(EmitterProcessor.create(asyncClient.getReceiverOptions().getPrefetchCount(), false)); messageProcessor.set(emitterProcessor); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/UnnamedSessionManager.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/UnnamedSessionManager.java index 03b2706ff685..ccd70edfa212 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/UnnamedSessionManager.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/UnnamedSessionManager.java @@ -17,7 +17,6 @@ import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor; import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode; import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink; -import com.azure.messaging.servicebus.models.ReceiveAsyncOptions; import org.apache.qpid.proton.amqp.transport.DeliveryState; import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; @@ -40,7 +39,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import static com.azure.core.util.FluxUtil.fluxError; import static com.azure.core.util.FluxUtil.monoError; import static com.azure.messaging.servicebus.implementation.Messages.INVALID_OPERATION_DISPOSED_RECEIVER; import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE; @@ -75,7 +73,6 @@ class UnnamedSessionManager implements AutoCloseable { private final FluxSink> sessionReceiveSink; private volatile Flux receiveFlux; - private volatile ReceiveAsyncOptions receiveAsyncOptions; UnnamedSessionManager(String entityPath, MessagingEntityType entityType, ServiceBusConnectionProcessor connectionProcessor, Duration operationTimeout, TracerProvider tracerProvider, @@ -141,17 +138,12 @@ Mono getSessionState(String sessionId) { * * @return A Flux of messages merged from different sessions. */ - Flux receive(ReceiveAsyncOptions options) { - if (options == null) { - return fluxError(logger, new NullPointerException("'options' cannot be null.")); - } - + Flux receive() { if (!isStarted.getAndSet(true)) { - receiveAsyncOptions = options; this.sessionReceiveSink.onRequest(this::onSessionRequest); if (!receiverOptions.isRollingSessionReceiver()) { - receiveFlux = getSession(options, schedulers.get(0)); + receiveFlux = getSession(schedulers.get(0), false); } else { receiveFlux = Flux.merge(processor, receiverOptions.getMaxConcurrentSessions()); } @@ -279,20 +271,21 @@ private Mono getActiveLink() { * Gets the next available unnamed session with the given receive options and publishes its contents on the given * {@code scheduler}. * - * @param options Receive options. - * + * @param scheduler Scheduler to coordinate received methods on. + * @param disposeOnIdle true to dispose receiver when it idles; false otherwise. * @return A Mono that completes with an unnamed session receiver. */ - private Flux getSession(ReceiveAsyncOptions options, Scheduler scheduler) { + private Flux getSession(Scheduler scheduler, boolean disposeOnIdle) { return getActiveLink().flatMap(link -> link.getSessionId() .map(linkName -> sessionReceivers.compute(linkName, (key, existing) -> { if (existing != null) { return existing; } - return new UnnamedSessionReceiver(link, messageSerializer, options.isAutoCompleteEnabled(), - options.getMaxAutoLockRenewalDuration(), connectionProcessor.getRetryOptions(), - receiverOptions.getPrefetchCount(), scheduler, this::renewSessionLock); + return new UnnamedSessionReceiver(link, messageSerializer, connectionProcessor.getRetryOptions(), + receiverOptions.getPrefetchCount(), disposeOnIdle, scheduler, + receiverOptions.autoLockRenewalEnabled(), receiverOptions.getMaxAutoLockRenewalDuration(), + this::renewSessionLock); }))) .flatMapMany(session -> session.receive().doFinally(signalType -> { logger.verbose("Adding scheduler back to pool."); @@ -312,12 +305,6 @@ private Mono getManagementNode() { * @param request Number of unnamed active sessions to emit. */ private void onSessionRequest(long request) { - if (receiveAsyncOptions == null) { - sessionReceiveSink.error(new IllegalStateException( - "Cannot create receiver when there are no receive options set.")); - return; - } - if (isDisposed.get()) { logger.info("Session manager is disposed. Not emitting more unnamed sessions."); return; @@ -337,7 +324,7 @@ private void onSessionRequest(long request) { return; } - Flux session = getSession(receiveAsyncOptions, scheduler); + Flux session = getSession(scheduler, true); sessionReceiveSink.next(session); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/UnnamedSessionReceiver.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/UnnamedSessionReceiver.java index ea45e6bfd6eb..5166b252999c 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/UnnamedSessionReceiver.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/UnnamedSessionReceiver.java @@ -41,6 +41,7 @@ class UnnamedSessionReceiver implements AutoCloseable { private final AtomicReference sessionId = new AtomicReference<>(); private final ClientLogger logger = new ClientLogger(UnnamedSessionReceiver.class); private final ServiceBusReceiveLink receiveLink; + private final boolean enableSessionLockRenewal; private final Duration maxSessionLockRenewDuration; private final Function> renewSessionLock; private final Disposable.Composite subscriptions; @@ -49,10 +50,27 @@ class UnnamedSessionReceiver implements AutoCloseable { private final DirectProcessor messageReceivedEmitter = DirectProcessor.create(); private final FluxSink messageReceivedSink = messageReceivedEmitter.sink(FluxSink.OverflowStrategy.BUFFER); + /** + * Creates a receiver for the first available session. + * + * @param receiveLink Service Bus receive link for available session. + * @param messageSerializer Serializes and deserializes messages from Service Bus. + * @param retryOptions Retry options for the receiver. + * @param prefetch Number of messages to prefetch from session. + * @param disposeOnIdle true to dispose the session receiver if there are no more messages and the receiver is + * idle. + * @param scheduler The scheduler to publish messages on. + * @param maxSessionLockRenewDuration Maximum time to renew the session lock for. {@code null} or {@link + * Duration#ZERO} to disable session lock renewal. + * @param renewSessionLock Function to renew the session lock. + */ UnnamedSessionReceiver(ServiceBusReceiveLink receiveLink, MessageSerializer messageSerializer, - boolean isAutoComplete, Duration maxSessionLockRenewDuration, AmqpRetryOptions retryOptions, int prefetch, - Scheduler scheduler, Function> renewSessionLock) { + AmqpRetryOptions retryOptions, int prefetch, boolean disposeOnIdle, Scheduler scheduler, + boolean enableSessionLockRenewal, Duration maxSessionLockRenewDuration, + Function> renewSessionLock) { + this.receiveLink = receiveLink; + this.enableSessionLockRenewal = enableSessionLockRenewal; this.maxSessionLockRenewDuration = maxSessionLockRenewDuration; this.renewSessionLock = renewSessionLock; this.lockContainer = new MessageLockContainer(ServiceBusConstants.OPERATION_TIMEOUT); @@ -66,13 +84,13 @@ class UnnamedSessionReceiver implements AutoCloseable { final Flux receivedMessagesFlux = receiveLink .receive() .publishOn(scheduler) - .doOnSubscribe(subscription -> { + .doOnSubscribe(subscription -> { logger.verbose("Adding prefetch to receive link."); receiveLink.addCredits(prefetch); }) .takeUntilOther(cancelReceiveProcessor) .map(message -> messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)) - .subscribeWith(new ServiceBusMessageProcessor(receiveLink.getLinkName(), isAutoComplete, false, + .subscribeWith(new ServiceBusMessageProcessor(receiveLink.getLinkName(), false, false, Duration.ZERO, retryOptions, errorContext, messageManagement)) .map(message -> { if (!CoreUtils.isNullOrEmpty(message.getLockToken())) { @@ -98,16 +116,26 @@ class UnnamedSessionReceiver implements AutoCloseable { this.receivedMessages = Flux.concat(receivedMessagesFlux, cancelReceiveProcessor); this.subscriptions = Disposables.composite(); - this.subscriptions.add(Flux.switchOnNext(messageReceivedEmitter - .flatMap(lockToken -> Mono.delay(retryOptions.getTryTimeout())) - .handle((l, sink) -> { - logger.info("entityPath[{}]. sessionId[{}]. Did not a receive message within timeout {}.", - receiveLink.getEntityPath(), sessionId.get(), retryOptions.getTryTimeout()); - cancelReceiveProcessor.onComplete(); - sink.complete(); - })) - .subscribe()); - this.subscriptions.add(receiveLink.getSessionId().subscribe(id -> sessionId.set(id))); + + // Creates a subscription that disposes/closes the receiver when there are no more messages in the session and + // receiver is idle. + if (disposeOnIdle) { + this.subscriptions.add(Flux.switchOnNext(messageReceivedEmitter + .flatMap(lockToken -> Mono.delay(retryOptions.getTryTimeout())) + .handle((l, sink) -> { + logger.info("entityPath[{}]. sessionId[{}]. Did not a receive message within timeout {}.", + receiveLink.getEntityPath(), sessionId.get(), retryOptions.getTryTimeout()); + cancelReceiveProcessor.onComplete(); + sink.complete(); + })) + .subscribe()); + } + + this.subscriptions.add(receiveLink.getSessionId().subscribe(id -> { + if (!sessionId.compareAndSet(null, id)) { + logger.warning("Another method set sessionId. Existing: {}. Returned: {}.", sessionId.get(), id); + } + })); this.subscriptions.add(receiveLink.getSessionLockedUntil().subscribe(lockedUntil -> { if (!sessionLockedUntil.compareAndSet(null, lockedUntil)) { logger.info("SessionLockedUntil was already set: {}", sessionLockedUntil); @@ -121,8 +149,9 @@ class UnnamedSessionReceiver implements AutoCloseable { * Gets whether or not the receiver contains the lock token. * * @param lockToken Lock token for the message. + * * @return {@code true} if the session receiver contains the lock token to the unsettled delivery; {@code false} - * otherwise. + * otherwise. * @throws NullPointerException if {@code lockToken} is null. * @throws IllegalArgumentException if {@code lockToken} is empty. */ @@ -153,6 +182,11 @@ Flux receive() { return receivedMessages; } + /** + * Updates the session lock time. + * + * @param lockedUntil Gets the time when the session is locked until. + */ void setSessionLockedUntil(Instant lockedUntil) { sessionLockedUntil.set(lockedUntil); } @@ -192,25 +226,15 @@ private Disposable getRenewLockOperation(Instant initialLockedUntil) { // Adjust the interval, so we can buffer time for the time it'll take to refresh. sink.next(MessageUtils.adjustServerTimeout(initialInterval)); - // TODO (conniey): Do we need to stop processing lock renewal after a max duration?? - // if (maxSessionLockRenewDuration == null || maxSessionLockRenewDuration.isZero()) { - // return Disposables.disposed(); - // } - // - // final Disposable timeoutOperation = Mono.delay(maxSessionLockRenewDuration) - // .subscribe(l -> { - // if (!sink.isCancelled()) { - // sink.error(new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, - // "Could not complete session processing within renewal time. Max session renewal time: " - // + maxSessionLockRenewDuration, - // new LinkErrorContext(receiveLink.getHostname(), receiveLink.getEntityPath(), null, null))); - // } - // }); - // - // return Disposables.composite(renewLockSubscription, timeoutOperation); - - return Flux.switchOnNext(emitterProcessor.map(i -> Flux.interval(i))) - .takeUntilOther(cancelReceiveProcessor) + final Flux cancellationSignals; + if (enableSessionLockRenewal) { + cancellationSignals = Flux.first(cancelReceiveProcessor, Mono.delay(maxSessionLockRenewDuration)); + } else { + cancellationSignals = Flux.first(cancelReceiveProcessor); + } + + return Flux.switchOnNext(emitterProcessor.map(Flux::interval)) + .takeUntilOther(cancellationSignals) .flatMap(delay -> { final String id = sessionId.get(); @@ -228,14 +252,14 @@ private Disposable getRenewLockOperation(Instant initialLockedUntil) { sink.next(MessageUtils.adjustServerTimeout(next)); return instant; }) - .subscribe( - lockedUntil -> logger.verbose("lockToken[{}]. lockedUntil[{}]. Lock renewal successful.", sessionId, - lockedUntil), - error -> { + .subscribe(lockedUntil -> { + logger.verbose("lockToken[{}]. lockedUntil[{}]. Lock renewal successful.", sessionId, + lockedUntil); + sessionLockedUntil.set(lockedUntil); + }, error -> { logger.error("Error occurred while renewing lock token.", error); cancelReceiveProcessor.onNext(new ServiceBusReceivedMessageContext(sessionId.get(), error)); - }, - () -> { + }, () -> { logger.verbose("Renewing session lock task completed."); cancelReceiveProcessor.onComplete(); }); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/ReceiveAsyncOptions.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/ReceiveAsyncOptions.java deleted file mode 100644 index 32b65f7bf672..000000000000 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/models/ReceiveAsyncOptions.java +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.messaging.servicebus.models; - -import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient; - -import java.time.Duration; - -/** - * Options set when receiving using {@link ServiceBusReceiverAsyncClient}. - */ -public class ReceiveAsyncOptions { - private boolean enableAutoComplete; - private Duration maxAutoRenewDuration; - - /** - * Gets whether the message should be automatically completed when consumers are finished processing the message. - * - * @return {@code true} to automatically complete the message; {@code false} otherwise. - */ - public boolean isAutoCompleteEnabled() { - return enableAutoComplete; - } - - /** - * Sets whether the message should be automatically completed when consumers are finished processing the message. - * - * @param isAutoCompleteEnabled {@code true} to automatically complete the message; {@code false} otherwise. - * - * @return The updated {@link ReceiveAsyncOptions} object. - */ - public ReceiveAsyncOptions setIsAutoCompleteEnabled(boolean isAutoCompleteEnabled) { - this.enableAutoComplete = isAutoCompleteEnabled; - return this; - } - - /** - * Gets the amount of time to continue auto-renewing the message lock. - * - * @return the amount of time to continue auto-renewing the message lock. {@link Duration#ZERO} or {@code null} - * indicates that auto-renewal is disabled. - */ - public Duration getMaxAutoLockRenewalDuration() { - return maxAutoRenewDuration; - } - - /** - * Sets the amount of time to continue auto-renewing the message lock. Setting {@link Duration#ZERO} or {@code null} - * disables auto-renewal. - * - * @param maxAutoLockRenewalDuration the amount of time to continue auto-renewing the message lock. {@link - * Duration#ZERO} or {@code null} indicates that auto-renewal is disabled. - * - * @return The updated {@link ReceiveAsyncOptions} object. - */ - public ReceiveAsyncOptions setMaxAutoLockRenewalDuration(Duration maxAutoLockRenewalDuration) { - this.maxAutoRenewDuration = maxAutoLockRenewalDuration; - return this; - } -} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/PeekMessageAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/PeekMessageAsyncSample.java index 456142f1ecc5..140908d131a0 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/PeekMessageAsyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/PeekMessageAsyncSample.java @@ -32,7 +32,7 @@ public static void main(String[] args) throws InterruptedException { .queueName("<>") .buildAsyncClient(); - receiver.peek().subscribe( + receiver.browse().subscribe( message -> { System.out.println("Received Message Id: " + message.getMessageId()); System.out.println("Received Message: " + new String(message.getBody())); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAndSettleAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAndSettleAsyncSample.java index 21d363a26bfc..09e836866d2e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAndSettleAsyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAndSettleAsyncSample.java @@ -3,18 +3,16 @@ package com.azure.messaging.servicebus; -import com.azure.messaging.servicebus.models.ReceiveAsyncOptions; -import com.azure.messaging.servicebus.models.ReceiveMode; import reactor.core.Disposable; -import java.time.Duration; import java.util.concurrent.TimeUnit; /** * Sample demonstrates how to receive an {@link ServiceBusReceivedMessage} from an Azure Service Bus Queue and settle * it. Settling of message include {@link ServiceBusReceiverAsyncClient#complete(MessageLockToken) complete()}, {@link - * ServiceBusReceiverAsyncClient#defer(MessageLockToken) defer()}, {@link ServiceBusReceiverAsyncClient#abandon(MessageLockToken) - * abandon}, or {@link ServiceBusReceiverAsyncClient#deadLetter(MessageLockToken) dead-letter} a message. + * ServiceBusReceiverAsyncClient#defer(MessageLockToken) defer()}, + * {@link ServiceBusReceiverAsyncClient#abandon(MessageLockToken) abandon}, or + * {@link ServiceBusReceiverAsyncClient#deadLetter(MessageLockToken) dead-letter} a message. */ public class ReceiveMessageAndSettleAsyncSample { @@ -41,19 +39,10 @@ public static void main(String[] args) throws InterruptedException { ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .connectionString(connectionString) .receiver() - .receiveMode(ReceiveMode.PEEK_LOCK) .queueName("<>") .buildAsyncClient(); - // At most, the receiver will automatically renew the message lock until 120 seconds have elapsed. - // By default, after messages are processed, they are completed (ie. removed from the queue/topic). Setting - // enableAutoComplete to false, means the onus is on users to complete, abandon, defer, or dead-letter the - // message when they are finished with it. - final ReceiveAsyncOptions options = new ReceiveAsyncOptions() - .setIsAutoCompleteEnabled(false) - .setMaxAutoLockRenewalDuration(Duration.ofSeconds(120)); - - Disposable subscription = receiver.receive(options) + Disposable subscription = receiver.receive() .flatMap(context -> { boolean messageProcessed = false; // Process the context and its message here. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAsyncSample.java index 3fb571973270..bf7b9598a5e9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAsyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAsyncSample.java @@ -5,6 +5,7 @@ import reactor.core.Disposable; +import java.time.Instant; import java.util.concurrent.TimeUnit; /** @@ -32,20 +33,30 @@ public static void main(String[] args) throws InterruptedException { // "<>" will look similar to "{your-namespace}.servicebus.windows.net" // "<>" will be the name of the Service Bus queue instance you created // inside the Service Bus namespace. - ServiceBusReceiverAsyncClient receiverAsyncClient = new ServiceBusClientBuilder() + ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() .connectionString(connectionString) .receiver() .queueName("<>") .buildAsyncClient(); - Disposable subscription = receiverAsyncClient.receive() - .subscribe(context -> { + Disposable subscription = receiver.receive() + .flatMap(context -> { ServiceBusReceivedMessage message = context.getMessage(); - System.out.println("Received Message Id:" + message.getMessageId()); - System.out.println("Received Message:" + new String(message.getBody())); - // By default, the message will be auto completed. - }, + // process message + System.out.println("Received Message Id: " + message.getMessageId()); + System.out.println("Received Message: " + new String(message.getBody())); + + boolean isSuccessfullyProcessed = processMessage(message); + + // When we are finished processing the message, then complete or abandon it. + if (isSuccessfullyProcessed) { + return receiver.complete(message).thenReturn("Completed: " + message.getMessageId()); + } else { + return receiver.abandon(message).thenReturn("Abandoned: " + message.getMessageId()); + } + }) + .subscribe(message -> System.out.printf("Processed at %s. %s%n", Instant.now(), message), error -> System.err.println("Error occurred while receiving message: " + error), () -> System.out.println("Receiving complete.")); @@ -57,6 +68,10 @@ public static void main(String[] args) throws InterruptedException { subscription.dispose(); // Close the receiver. - receiverAsyncClient.close(); + receiver.close(); + } + + private static boolean processMessage(ServiceBusReceivedMessage message) { + return true; } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAutoLockRenewal.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAutoLockRenewal.java new file mode 100644 index 000000000000..62d6f362c092 --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAutoLockRenewal.java @@ -0,0 +1,66 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import reactor.core.Disposable; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +/** + * Demonstrates how to enable automatic lock renewal for a message when receiving from Service Bus. + */ +public class ReceiveMessageAutoLockRenewal { + /** + * Main method to invoke this demo on how to receive an {@link ServiceBusReceivedMessage} from Service Bus and + * automatically renew the message lock. + * + * @param args Unused arguments to the program. + * + * @throws InterruptedException If the program is unable to sleep while waiting for the operations to complete. + */ + public static void main(String[] args) throws InterruptedException { + + // The connection string value can be obtained by: + // 1. Going to your Service Bus namespace in Azure Portal. + // 2. Go to "Shared access policies" + // 3. Copy the connection string for the "RootManageSharedAccessKey" policy. + String connectionString = "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" + + "SharedAccessKey={key}"; + + // Create a receiver. + // "<>" will look similar to "{your-namespace}.servicebus.windows.net" + // "<>" will be the name of the Service Bus queue instance you created + // inside the Service Bus namespace. + // At most, the receiver will automatically renew the message lock until 120 seconds have elapsed. + ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + .connectionString(connectionString) + .receiver() + .maxAutoLockRenewalDuration(Duration.ofSeconds(120)) + .queueName("<>") + .buildAsyncClient(); + + Disposable subscription = receiver.receive() + .flatMap(context -> { + boolean messageProcessed = false; + // Process the context and its message here. + // Change the `messageProcessed` according to you business logic and if you are able to process the + // message successfully. + if (messageProcessed) { + return receiver.complete(context.getMessage()); + } else { + return receiver.abandon(context.getMessage()); + } + }).subscribe(); + + // Subscribe is not a blocking call so we sleep here so the program does not end. + TimeUnit.SECONDS.sleep(60); + + // Disposing of the subscription will cancel the receive() operation. + subscription.dispose(); + + // Close the receiver. + receiver.close(); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAzureIdentityAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAzureIdentityAsyncSample.java index 21369c99db18..fe544d25aadc 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAzureIdentityAsyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageAzureIdentityAsyncSample.java @@ -19,6 +19,7 @@ public class ReceiveMessageAzureIdentityAsyncSample { * Service Bus Subscription for a Topic. * * @param args Unused arguments to the program. + * * @throws InterruptedException If the program is unable to sleep while waiting for the operations to complete. */ public static void main(String[] args) throws InterruptedException { @@ -47,12 +48,15 @@ public static void main(String[] args) throws InterruptedException { .buildAsyncClient(); Disposable subscription = receiverAsyncClient.receive() - .subscribe(context -> { + .flatMap(context -> { ServiceBusReceivedMessage message = context.getMessage(); System.out.println("Received Message Id:" + message.getMessageId()); System.out.println("Received Message:" + new String(message.getBody())); - }, + + return receiverAsyncClient.complete(message); + }) + .subscribe(aVoid -> System.out.println("Processed message."), error -> System.err.println("Error occurred while receiving message: " + error), () -> System.out.println("Receiving complete.")); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java index a23782c0fafc..0d689124f05c 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMessageSyncSample.java @@ -29,25 +29,29 @@ public static void main(String[] args) { // "<>" will look similar to "{your-namespace}.servicebus.windows.net" // "<>" will be the name of the Service Bus queue instance you created // inside the Service Bus namespace. - ServiceBusReceiverClient receiverClient = new ServiceBusClientBuilder() + ServiceBusReceiverClient receiver = new ServiceBusClientBuilder() .connectionString(connectionString) .receiver() .queueName("<>") .buildClient(); - final IterableStream receivedMessages = - receiverClient.receive(5); + // Try to receive a set of messages from Service Bus 10 times. A batch of messages are returned when 5 messages + // are received, or the operation timeout has elapsed, whichever occurs first. + for (int i = 0; i < 10; i++) { + final IterableStream receivedMessages = + receiver.receive(5); - receivedMessages.stream().forEach(context -> { - ServiceBusReceivedMessage message = context.getMessage(); + receivedMessages.stream().forEach(context -> { + ServiceBusReceivedMessage message = context.getMessage(); - System.out.println("Received Message Id: " + message.getMessageId()); - System.out.println("Received Message: " + new String(message.getBody())); + System.out.println("Received Message Id: " + message.getMessageId()); + System.out.println("Received Message: " + new String(message.getBody())); - receiverClient.complete(message); - }); + receiver.complete(message); + }); + } // Close the receiver. - receiverClient.close(); + receiver.close(); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java index 184805b6311e..1cbc442b62a1 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveMultipleSessionsAsyncSample.java @@ -3,8 +3,6 @@ package com.azure.messaging.servicebus; -import com.azure.messaging.servicebus.models.ReceiveAsyncOptions; -import com.azure.messaging.servicebus.models.ReceiveMode; import reactor.core.Disposable; import reactor.core.publisher.Mono; @@ -42,17 +40,10 @@ public static void main(String[] args) throws InterruptedException { .connectionString(connectionString) .sessionReceiver() .maxConcurrentSessions(3) - .receiveMode(ReceiveMode.PEEK_LOCK) .queueName("<>") .buildAsyncClient(); - // By default, after messages are processed, they are completed (ie. removed from the queue/topic). Setting - // enableAutoComplete to true will tell the processor to complete or abandon the message depending on whether or - // not processing the message results in an exception. - ReceiveAsyncOptions options = new ReceiveAsyncOptions() - .setIsAutoCompleteEnabled(false); - - Disposable subscription = receiver.receive(options) + Disposable subscription = receiver.receive() .flatMap(context -> { if (context.hasError()) { System.out.printf("An error occurred in session %s. Error: %s%n", diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java index 6b33ecbd6f95..1defafd86f1b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java @@ -5,6 +5,7 @@ import com.azure.messaging.servicebus.models.ReceiveMode; import reactor.core.Disposable; +import reactor.core.publisher.Mono; import java.util.concurrent.TimeUnit; @@ -41,20 +42,20 @@ public static void main(String[] args) throws InterruptedException { .buildAsyncClient(); Disposable subscription = receiver.receive() - .subscribe(context -> { + .flatMap(context -> { if (context.hasError()) { System.out.printf("An error occurred in session %s. Error: %s%n", context.getSessionId(), context.getThrowable()); - return; + return Mono.empty(); } System.out.println("Processing message from session: " + context.getSessionId()); - // Process message - // The message is automatically completed if no exceptions are thrown while processing message. - }, error -> { - System.err.println("Error occurred: " + error); - }); + // Process message then complete it. + return receiver.complete(context.getMessage()); + }) + .subscribe(aVoid -> { + }, error -> System.err.println("Error occurred: " + error)); // Subscribe is not a blocking call so we sleep here so the program does not end. TimeUnit.SECONDS.sleep(60); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionSample.java new file mode 100644 index 000000000000..84818bc226ec --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionSample.java @@ -0,0 +1,74 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.core.util.IterableStream; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Demonstrates how to receive messages from a named session using {@link ServiceBusReceiverClient}. + * The sample below runs for 2 minutes. In those two minutes, it will poll Service Bus for batches of messages. + */ +public class ReceiveNamedSessionSample { + + /** + * Main method to invoke this demo on how to receive messages from a session with id "greetings" in an Azure Service + * Bus Queue. + * + * @param args Unused arguments to the program. + */ + public static void main(String[] args) { + final AtomicBoolean isRunning = new AtomicBoolean(true); + + Mono.delay(Duration.ofMinutes(2)).subscribe(index -> { + System.out.println("2 minutes has elapsed, stopping receive loop."); + isRunning.set(false); + }); + + // The connection string value can be obtained by: + // 1. Going to your Service Bus namespace in Azure Portal. + // 2. Go to "Shared access policies" + // 3. Copy the connection string for the "RootManageSharedAccessKey" policy. + String connectionString = "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" + + "SharedAccessKey={key}"; + + // Create a receiver. + // "<>" will be the name of the Service Bus session-enabled queue instance you created inside the + // Service Bus namespace. + ServiceBusReceiverClient receiver = new ServiceBusClientBuilder() + .connectionString(connectionString) + .sessionReceiver() + .sessionId("greetings") + .queueName("<>") + .buildClient(); + + while (isRunning.get()) { + IterableStream messages = receiver.receive(10, Duration.ofSeconds(30)); + + for (ServiceBusReceivedMessageContext context : messages) { + System.out.println("Processing message from session: " + context.getSessionId()); + + ServiceBusReceivedMessage message = context.getMessage(); + boolean isSuccessfullyProcessed = processMessage(message); + + if (isSuccessfullyProcessed) { + receiver.complete(message); + } else { + receiver.abandon(message); + } + } + } + + // Close the receiver. + receiver.close(); + } + + private static boolean processMessage(ServiceBusReceivedMessage message) { + System.out.println("Processing message: " + message.getMessageId()); + return true; + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java index 6b1ec92583d6..23f95a2b3768 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveSingleSessionAsyncSample.java @@ -5,6 +5,7 @@ import com.azure.messaging.servicebus.models.ReceiveMode; import reactor.core.Disposable; +import reactor.core.publisher.Mono; import java.util.concurrent.TimeUnit; @@ -39,23 +40,20 @@ public static void main(String[] args) throws InterruptedException { .subscriptionName("<>") .buildAsyncClient(); - // Messages are continuously received until there are no more messages in the current session. Then, the Flux - // completes. Disposable subscription = receiver.receive() - .subscribe(context -> { + .flatMap(context -> { if (context.hasError()) { System.out.printf("An error occurred in session %s. Error: %s%n", context.getSessionId(), context.getThrowable()); - return; + return Mono.empty(); } System.out.println("Processing message from session: " + context.getSessionId()); // Process message - // The message is automatically completed if no exceptions are thrown while processing message. - }, error -> { - System.err.println("Error occurred: " + error); - }); + return receiver.complete(context.getMessage()); + }).subscribe(aVoid -> { + }, error -> System.err.println("Error occurred: " + error)); // Subscribe is not a blocking call so we sleep here so the program does not end. TimeUnit.SECONDS.sleep(60); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/SendAndReceiveSessionMessageSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/SendAndReceiveSessionMessageSample.java index 0b519fca2e84..a8309a21209e 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/SendAndReceiveSessionMessageSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/SendAndReceiveSessionMessageSample.java @@ -22,6 +22,7 @@ public class SendAndReceiveSessionMessageSample { * session-enabled Azure Service Bus queue. * * @param args Unused arguments to the program. + * * @throws InterruptedException If the program is unable to sleep while waiting for the operations to complete. */ public static void main(String[] args) throws InterruptedException { @@ -81,13 +82,15 @@ public static void main(String[] args) throws InterruptedException { () -> System.out.println("Batch send complete.")); // After sending that message, we receive the messages for that sessionId. - receiver.receive().subscribe(context -> { + receiver.receive().flatMap(context -> { ServiceBusReceivedMessage message = context.getMessage(); System.out.println("Received Message Id: " + message.getMessageId()); System.out.println("Received Message Session Id: " + message.getSessionId()); System.out.println("Received Message: " + new String(message.getBody())); - }); + + return receiver.complete(message); + }).subscribe(); // subscribe() is not a blocking call. We sleep here so the program does not end before the send is complete. TimeUnit.SECONDS.sleep(10); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientJavaDocCodeSamples.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientJavaDocCodeSamples.java index 65fe2b3fbe77..3484f6163d4f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientJavaDocCodeSamples.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientJavaDocCodeSamples.java @@ -123,15 +123,69 @@ public void receiveAll() { .buildAsyncClient(); // BEGIN: com.azure.messaging.servicebus.servicebusasyncreceiverclient.receive#all - Disposable subscription = receiver.receive().subscribe(context -> { + Disposable subscription = receiver.receive().flatMap(context -> { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Received message id: %s%n", message.getMessageId()); System.out.printf("Contents of message as string: %s%n", new String(message.getBody(), UTF_8)); - }); + return receiver.complete(message); + }).subscribe(aVoid -> System.out.println("Processed message."), + error -> System.out.println("Error occurred: " + error)); // When program ends, or you're done receiving all messages. receiver.close(); subscription.dispose(); // END: com.azure.messaging.servicebus.servicebusasyncreceiverclient.receive#all } + + /** + * Demonstrates how to create a session receiver for a single, first available session. + */ + public void sessionReceiverSingleInstantiation() { + // BEGIN: com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#singlesession + ServiceBusReceiverAsyncClient consumer = new ServiceBusClientBuilder() + .connectionString("Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" + + "SharedAccessKey={key};EntityPath={eh-name}") + .sessionReceiver() + .queueName("<< QUEUE NAME >>") + .buildAsyncClient(); + // END: com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#singlesession + + consumer.close(); + } + + /** + * Demonstrates how to create a session receiver for a specific session. + */ + public void sessionReceiverNamedInstantiation() { + // BEGIN: com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId + ServiceBusReceiverAsyncClient consumer = new ServiceBusClientBuilder() + .connectionString("Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" + + "SharedAccessKey={key};EntityPath={eh-name}") + .sessionReceiver() + .topicName("<< TOPIC NAME >>") + .subscriptionName("<< SUBSCRIPTION NAME >>") + .sessionId("<< my-session-id >>") + .buildAsyncClient(); + // END: com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#sessionId + + consumer.close(); + } + + /** + * Demonstrates how to create a session receiver for processing multiple sessions. + */ + public void sessionReceiverMultipleInstantiation() { + // BEGIN: com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#multiplesessions + ServiceBusReceiverAsyncClient consumer = new ServiceBusClientBuilder() + .connectionString("Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};" + + "SharedAccessKey={key};EntityPath={eh-name}") + .sessionReceiver() + .topicName("<< TOPIC NAME >>") + .subscriptionName("<< SUBSCRIPTION NAME >>") + .maxConcurrentSessions(3) + .buildAsyncClient(); + // END: com.azure.messaging.servicebus.servicebusasyncreceiverclient.instantiation#multiplesessions + + consumer.close(); + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/IntegrationTestBase.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/IntegrationTestBase.java index d3f9918d0d83..da2410597d69 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/IntegrationTestBase.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/IntegrationTestBase.java @@ -231,7 +231,8 @@ protected ServiceBusClientBuilder getBuilder(boolean useCredentials) { } } - protected ServiceBusSenderClientBuilder getSenderBuilder(boolean useCredentials, MessagingEntityType entityType, boolean isSessionAware) { + protected ServiceBusSenderClientBuilder getSenderBuilder(boolean useCredentials, MessagingEntityType entityType, + boolean isSessionAware) { switch (entityType) { case QUEUE: @@ -246,48 +247,46 @@ protected ServiceBusSenderClientBuilder getSenderBuilder(boolean useCredentials, assertNotNull(topicName, "'topicName' cannot be null."); assertNotNull(subscriptionName, "'subscriptionName' cannot be null."); - return getBuilder(useCredentials).sender() - .topicName(topicName); + return getBuilder(useCredentials).sender().topicName(topicName); default: throw logger.logExceptionAsError(new IllegalArgumentException("Unknown entity type: " + entityType)); } } protected ServiceBusReceiverClientBuilder getReceiverBuilder(boolean useCredentials, - MessagingEntityType entityType) { + MessagingEntityType entityType, + Function onBuilderCreate) { + + final ServiceBusClientBuilder builder = onBuilderCreate.apply(getBuilder(useCredentials)); + switch (entityType) { case QUEUE: final String queueName = getQueueName(); assertNotNull(queueName, "'queueName' cannot be null."); - return getBuilder(useCredentials).receiver() - .queueName(queueName); + return builder.receiver().queueName(queueName); case SUBSCRIPTION: final String topicName = getTopicName(); final String subscriptionName = getSubscriptionName(); assertNotNull(topicName, "'topicName' cannot be null."); assertNotNull(subscriptionName, "'subscriptionName' cannot be null."); - return getBuilder(useCredentials).receiver() - .topicName(topicName).subscriptionName(subscriptionName); + return builder.receiver().topicName(topicName).subscriptionName(subscriptionName); default: throw logger.logExceptionAsError(new IllegalArgumentException("Unknown entity type: " + entityType)); } } - protected ServiceBusSessionReceiverClientBuilder getSessionReceiverBuilder( - boolean useCredentials, MessagingEntityType entityType, - Function onBuilderCreate, - Function onReceiverCreate) { + protected ServiceBusSessionReceiverClientBuilder getSessionReceiverBuilder(boolean useCredentials, + MessagingEntityType entityType, Function onBuilderCreate) { switch (entityType) { case QUEUE: final String queueName = getSessionQueueName(); assertNotNull(queueName, "'queueName' cannot be null."); - final ServiceBusSessionReceiverClientBuilder builder = onBuilderCreate.apply(getBuilder(useCredentials)) + return onBuilderCreate.apply(getBuilder(useCredentials)) .sessionReceiver() .queueName(queueName); - return onReceiverCreate.apply(builder); case SUBSCRIPTION: final String topicName = getTopicName(); @@ -295,10 +294,9 @@ protected ServiceBusSessionReceiverClientBuilder getSessionReceiverBuilder( assertNotNull(topicName, "'topicName' cannot be null."); assertNotNull(subscriptionName, "'subscriptionName' cannot be null."); - final ServiceBusSessionReceiverClientBuilder builder2 = onBuilderCreate.apply(getBuilder(useCredentials)) + return onBuilderCreate.apply(getBuilder(useCredentials)) .sessionReceiver() .topicName(topicName).subscriptionName(subscriptionName); - return onReceiverCreate.apply(builder2); default: throw logger.logExceptionAsError(new IllegalArgumentException("Unknown entity type: " + entityType)); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ProxyReceiveTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ProxyReceiveTest.java index 22053b5b43d0..11ab4a1e73e1 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ProxyReceiveTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ProxyReceiveTest.java @@ -7,7 +7,6 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.jproxy.ProxyServer; import com.azure.messaging.servicebus.jproxy.SimpleProxy; -import com.azure.messaging.servicebus.models.ReceiveAsyncOptions; import com.azure.messaging.servicebus.models.ReceiveMode; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -98,9 +97,6 @@ public void receiveMessage() { .queueName(queueName) .buildAsyncClient(); - final ReceiveAsyncOptions options = new ReceiveAsyncOptions() - .setIsAutoCompleteEnabled(false); - // Act & Assert try { StepVerifier.create(sender.createBatch() @@ -113,7 +109,7 @@ public void receiveMessage() { })) .verifyComplete(); - StepVerifier.create(receiver.receive(options).take(NUMBER_OF_EVENTS)) + StepVerifier.create(receiver.receive().take(NUMBER_OF_EVENTS)) .expectNextCount(NUMBER_OF_EVENTS) .expectComplete() .verify(TIMEOUT); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageTest.java index f9daccffd2d8..b060a8d73156 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusMessageTest.java @@ -7,6 +7,7 @@ import org.junit.jupiter.api.Test; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertThrows; public class ServiceBusMessageTest { @@ -14,13 +15,33 @@ public class ServiceBusMessageTest { private static final String PAYLOAD = new String(new char[10000]).replace("\0", "a"); private static final byte[] PAYLOAD_BYTES = PAYLOAD.getBytes(UTF_8); + /** + * Verify UTF_8 encoded body is created. + */ + @Test + void bodyAsString() { + // Arrange + String body = "some-contents"; + byte[] encoded = body.getBytes(UTF_8); + + // Act + ServiceBusMessage message = new ServiceBusMessage(body); + + // Assert + assertArrayEquals(encoded, message.getBody()); + } + + /** + * Verify that expected exceptions are thrown. + */ @Test - public void byteArrayNotNull() { + void bodyNotNull() { + assertThrows(NullPointerException.class, () -> new ServiceBusMessage((String) null)); assertThrows(NullPointerException.class, () -> new ServiceBusMessage((byte[]) null)); } @Test - public void messagePropertiesShouldNotBeNull() { + void messagePropertiesShouldNotBeNull() { // Act final ServiceBusMessage serviceBusMessageData = new ServiceBusMessage(PAYLOAD_BYTES); @@ -34,7 +55,7 @@ public void messagePropertiesShouldNotBeNull() { * Verify that we can create an Message with an empty byte array. */ @Test - public void canCreateWithEmptyArray() { + void canCreateWithEmptyArray() { // Arrange byte[] byteArray = new byte[0]; @@ -51,7 +72,7 @@ public void canCreateWithEmptyArray() { * Verify that we can create an Message with the correct body contents. */ @Test - public void canCreateWithBytePayload() { + void canCreateWithBytePayload() { // Act final ServiceBusMessage serviceBusMessageData = new ServiceBusMessage(PAYLOAD_BYTES); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java index a8917f312a67..88cea0bb2f20 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java @@ -6,8 +6,8 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.implementation.DispositionStatus; import com.azure.messaging.servicebus.implementation.MessagingEntityType; -import com.azure.messaging.servicebus.models.ReceiveAsyncOptions; import com.azure.messaging.servicebus.models.ReceiveMode; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import reactor.core.publisher.Mono; @@ -33,6 +33,7 @@ /** * Integration tests for {@link ServiceBusReceiverAsyncClient} from queues or subscriptions. */ +@Tag("integration") class ServiceBusReceiverAsyncClientIntegrationTest extends IntegrationTestBase { private final ClientLogger logger = new ClientLogger(ServiceBusReceiverAsyncClientIntegrationTest.class); private final AtomicInteger messagesPending = new AtomicInteger(); @@ -68,7 +69,7 @@ protected void afterTest() { if (isSessionEnabled) { logger.info("Sessioned receiver. It is probably locked until some time."); } else { - receiveAndDeleteReceiver.receive(new ReceiveAsyncOptions().setIsAutoCompleteEnabled(false)) + receiveAndDeleteReceiver.receive() .take(pending) .map(message -> { logger.info("Message received: {}", message.getMessage().getSequenceNumber()); @@ -147,7 +148,7 @@ void peekMessage(MessagingEntityType entityType, boolean isSessionEnabled) { sendMessage(message).block(TIMEOUT); // Assert & Act - StepVerifier.create(receiver.peek()) + StepVerifier.create(receiver.browse()) .assertNext(receivedMessage -> assertMessageEquals(receivedMessage, messageId, isSessionEnabled)) .verifyComplete(); } @@ -164,12 +165,11 @@ void sendScheduledMessageAndReceive(MessagingEntityType entityType, boolean isSe final String messageId = UUID.randomUUID().toString(); final ServiceBusMessage message = getMessage(messageId, isSessionEnabled); final Instant scheduledEnqueueTime = Instant.now().plusSeconds(2); - final ReceiveAsyncOptions options = new ReceiveAsyncOptions().setIsAutoCompleteEnabled(false); sender.scheduleMessage(message, scheduledEnqueueTime).block(TIMEOUT); // Assert & Act - StepVerifier.create(Mono.delay(Duration.ofSeconds(3)).then(receiveAndDeleteReceiver.receive(options).next())) + StepVerifier.create(Mono.delay(Duration.ofSeconds(3)).then(receiveAndDeleteReceiver.receive().next())) .assertNext(receivedMessage -> { assertMessageEquals(receivedMessage, messageId, isSessionEnabled); messagesPending.decrementAndGet(); @@ -224,15 +224,14 @@ void peekFromSequenceNumberMessage(MessagingEntityType entityType, boolean isSes sendMessage(message).block(TIMEOUT); - final ReceiveAsyncOptions options = new ReceiveAsyncOptions().setIsAutoCompleteEnabled(false); - final ServiceBusReceivedMessageContext receivedContext = receiver.receive(options).next().block(TIMEOUT); + final ServiceBusReceivedMessageContext receivedContext = receiver.receive().next().block(TIMEOUT); assertNotNull(receivedContext); final ServiceBusReceivedMessage receivedMessage = receivedContext.getMessage(); assertNotNull(receivedMessage); // Assert & Act - StepVerifier.create(receiver.peekAt(receivedMessage.getSequenceNumber())) + StepVerifier.create(receiver.browseAt(receivedMessage.getSequenceNumber())) .assertNext(m -> { assertEquals(receivedMessage.getSequenceNumber(), m.getSequenceNumber()); assertMessageEquals(m, messageId, isSessionEnabled); @@ -266,20 +265,20 @@ void peekBatchMessages(MessagingEntityType entityType, boolean isSessionEnabled) sendMessage(messages).block(TIMEOUT); // Assert & Act - StepVerifier.create(receiver.peekBatch(3)) + StepVerifier.create(receiver.browseBatch(3)) .assertNext(message -> checkCorrectMessage.accept(message, 0)) .assertNext(message -> checkCorrectMessage.accept(message, 1)) .assertNext(message -> checkCorrectMessage.accept(message, 2)) .verifyComplete(); - StepVerifier.create(receiver.peekBatch(4)) + StepVerifier.create(receiver.browseBatch(4)) .assertNext(message -> checkCorrectMessage.accept(message, 3)) .assertNext(message -> checkCorrectMessage.accept(message, 4)) .assertNext(message -> checkCorrectMessage.accept(message, 5)) .assertNext(message -> checkCorrectMessage.accept(message, 6)) .verifyComplete(); - StepVerifier.create(receiver.peek()) + StepVerifier.create(receiver.browse()) .assertNext(message -> checkCorrectMessage.accept(message, 7)) .verifyComplete(); } @@ -301,7 +300,7 @@ void peekBatchMessagesFromSequence(MessagingEntityType entityType) { Mono.when(sendMessage(message), sendMessage(message)).block(TIMEOUT); // Assert & Act - StepVerifier.create(receiver.peekBatchAt(maxMessages, fromSequenceNumber)) + StepVerifier.create(receiver.browseBatchAt(maxMessages, fromSequenceNumber)) .expectNextCount(maxMessages) .verifyComplete(); } @@ -320,8 +319,7 @@ void deadLetterMessage(MessagingEntityType entityType, boolean isSessionEnabled) sendMessage(message).block(TIMEOUT); - final ReceiveAsyncOptions options = new ReceiveAsyncOptions().setIsAutoCompleteEnabled(false); - final ServiceBusReceivedMessageContext receivedContext = receiver.receive(options).next().block(TIMEOUT); + final ServiceBusReceivedMessageContext receivedContext = receiver.receive().next().block(TIMEOUT); assertNotNull(receivedContext); final ServiceBusReceivedMessage receivedMessage = receivedContext.getMessage(); @@ -345,8 +343,7 @@ void receiveAndComplete(MessagingEntityType entityType, boolean isSessionEnabled sendMessage(message).block(TIMEOUT); - final ReceiveAsyncOptions options = new ReceiveAsyncOptions().setIsAutoCompleteEnabled(false); - final ServiceBusReceivedMessageContext receivedContext = receiver.receive(options).next().block(TIMEOUT); + final ServiceBusReceivedMessageContext receivedContext = receiver.receive().next().block(TIMEOUT); assertNotNull(receivedContext); final ServiceBusReceivedMessage receivedMessage = receivedContext.getMessage(); @@ -370,13 +367,11 @@ void receiveAndRenewLock(MessagingEntityType entityType) { final String messageId = UUID.randomUUID().toString(); final ServiceBusMessage message = getMessage(messageId, false); - final ReceiveAsyncOptions options = new ReceiveAsyncOptions() - .setIsAutoCompleteEnabled(false); // Blocking here because it is not part of the scenario we want to test. sendMessage(message).block(TIMEOUT); - final ServiceBusReceivedMessageContext receivedContext = receiver.receive(options).next().block(TIMEOUT); + final ServiceBusReceivedMessageContext receivedContext = receiver.receive().next().block(TIMEOUT); assertNotNull(receivedContext); final ServiceBusReceivedMessage receivedMessage = receivedContext.getMessage(); @@ -414,19 +409,19 @@ void receiveAndRenewLock(MessagingEntityType entityType) { @ParameterizedTest void autoRenewLockOnReceiveMessage(MessagingEntityType entityType, boolean isSessionEnabled) { // Arrange +// setSenderAndReceiver(entityType, isSessionEnabled, +// builder -> builder.maxAutoLockRenewalDuration(Duration.ofSeconds(120))); + setSenderAndReceiver(entityType, isSessionEnabled); final String messageId = UUID.randomUUID().toString(); final ServiceBusMessage message = getMessage(messageId, isSessionEnabled); - final ReceiveAsyncOptions options = new ReceiveAsyncOptions() - .setIsAutoCompleteEnabled(false) - .setMaxAutoLockRenewalDuration(Duration.ofSeconds(120)); // Send the message to verify. sendMessage(message).block(TIMEOUT); // Act & Assert - StepVerifier.create(receiver.receive(options).map(ServiceBusReceivedMessageContext::getMessage)) + StepVerifier.create(receiver.receive().map(ServiceBusReceivedMessageContext::getMessage)) .assertNext(received -> { assertNotNull(received.getLockedUntil()); assertNotNull(received.getLockToken()); @@ -474,12 +469,10 @@ void receiveAndAbandon(MessagingEntityType entityType, boolean isSessionEnabled) final String messageId = UUID.randomUUID().toString(); final ServiceBusMessage message = getMessage(messageId, isSessionEnabled); - final ReceiveAsyncOptions options = new ReceiveAsyncOptions() - .setIsAutoCompleteEnabled(false); sendMessage(message).block(TIMEOUT); - final ServiceBusReceivedMessageContext receivedContext = receiver.receive(options).next().block(TIMEOUT); + final ServiceBusReceivedMessageContext receivedContext = receiver.receive().next().block(TIMEOUT); assertNotNull(receivedContext); final ServiceBusReceivedMessage receivedMessage = receivedContext.getMessage(); @@ -502,8 +495,7 @@ void receiveAndDefer(MessagingEntityType entityType, boolean isSessionEnabled) { sendMessage(message).block(TIMEOUT); - final ReceiveAsyncOptions options = new ReceiveAsyncOptions().setIsAutoCompleteEnabled(false); - final ServiceBusReceivedMessageContext receivedContext = receiver.receive(options).next().block(TIMEOUT); + final ServiceBusReceivedMessageContext receivedContext = receiver.receive().next().block(TIMEOUT); assertNotNull(receivedContext); final ServiceBusReceivedMessage receivedMessage = receivedContext.getMessage(); @@ -528,8 +520,7 @@ void receiveDeferredMessageBySequenceNumber(MessagingEntityType entityType, Disp final ServiceBusMessage message = getMessage(messageId, false); sendMessage(message).block(TIMEOUT); - final ReceiveAsyncOptions options = new ReceiveAsyncOptions().setIsAutoCompleteEnabled(false); - final ServiceBusReceivedMessageContext receivedContext = receiver.receive(options).next().block(TIMEOUT); + final ServiceBusReceivedMessageContext receivedContext = receiver.receive().next().block(TIMEOUT); assertNotNull(receivedContext); final ServiceBusReceivedMessage receivedMessage = receivedContext.getMessage(); @@ -578,7 +569,6 @@ void sendReceiveMessageWithVariousPropertyTypes(MessagingEntityType entityType, final String messageId = UUID.randomUUID().toString(); final ServiceBusMessage messageToSend = getMessage(messageId, isSessionEnabled); - final ReceiveAsyncOptions options = new ReceiveAsyncOptions().setIsAutoCompleteEnabled(false); Map sentProperties = messageToSend.getProperties(); sentProperties.put("NullProperty", null); @@ -596,7 +586,7 @@ void sendReceiveMessageWithVariousPropertyTypes(MessagingEntityType entityType, sendMessage(messageToSend).block(TIMEOUT); // Assert & Act - StepVerifier.create(receiveAndDeleteReceiver.receive(options)) + StepVerifier.create(receiveAndDeleteReceiver.receive()) .assertNext(receivedMessage -> { messagesPending.decrementAndGet(); assertMessageEquals(receivedMessage, messageId, isSessionEnabled); @@ -657,21 +647,30 @@ void setAndGetSessionState(MessagingEntityType entityType) { * Sets the sender and receiver. If session is enabled, then a single-named session receiver is created. */ private void setSenderAndReceiver(MessagingEntityType entityType, boolean isSessionEnabled) { + setSenderAndReceiver(entityType, isSessionEnabled, null); + } + + private void setSenderAndReceiver(MessagingEntityType entityType, boolean isSessionEnabled, + Duration autoLockRenewal) { this.isSessionEnabled = isSessionEnabled; this.sender = getSenderBuilder(false, entityType, isSessionEnabled).buildAsyncClient(); if (isSessionEnabled) { assertNotNull(sessionId, "'sessionId' should have been set."); - this.receiver = getSessionReceiverBuilder(false, entityType, - Function.identity(), - builder -> builder.sessionId(sessionId)).buildAsyncClient(); - this.receiveAndDeleteReceiver = getSessionReceiverBuilder(false, entityType, - Function.identity(), - builder -> builder.sessionId(sessionId).receiveMode(ReceiveMode.RECEIVE_AND_DELETE)) + this.receiver = getSessionReceiverBuilder(false, entityType, Function.identity()) + .sessionId(sessionId) + .maxAutoLockRenewalDuration(autoLockRenewal) + .buildAsyncClient(); + this.receiveAndDeleteReceiver = getSessionReceiverBuilder(false, entityType, Function.identity()) + .sessionId(sessionId) + .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) .buildAsyncClient(); } else { - this.receiver = getReceiverBuilder(false, entityType).buildAsyncClient(); - this.receiveAndDeleteReceiver = getReceiverBuilder(false, entityType) + this.receiver = getReceiverBuilder(false, entityType, Function.identity()) + .maxAutoLockRenewalDuration(autoLockRenewal) + .buildAsyncClient(); + this.receiveAndDeleteReceiver = getReceiverBuilder(false, entityType, Function.identity()) + .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) .buildAsyncClient(); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java index 274f280ed5c8..01299f23b5da 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java @@ -13,6 +13,7 @@ import com.azure.core.amqp.implementation.TracerProvider; import com.azure.core.credential.TokenCredential; import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusReceiverClientBuilder; import com.azure.messaging.servicebus.implementation.DispositionStatus; import com.azure.messaging.servicebus.implementation.MessageWithLockToken; import com.azure.messaging.servicebus.implementation.MessagingEntityType; @@ -21,9 +22,7 @@ import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode; import com.azure.messaging.servicebus.implementation.ServiceBusReactorReceiver; import com.azure.messaging.servicebus.models.DeadLetterOptions; -import com.azure.messaging.servicebus.models.ReceiveAsyncOptions; import com.azure.messaging.servicebus.models.ReceiveMode; -import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.transport.DeliveryState.DeliveryStateType; import org.apache.qpid.proton.message.Message; @@ -56,7 +55,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -73,7 +71,6 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; class ServiceBusReceiverAsyncClientTest { @@ -99,6 +96,7 @@ class ServiceBusReceiverAsyncClientTest { private ServiceBusConnectionProcessor connectionProcessor; private ServiceBusReceiverAsyncClient receiver; private ServiceBusReceiverAsyncClient sessionReceiver; + private Duration maxAutoLockRenewalDuration; @Mock private ServiceBusReactorReceiver amqpReceiveLink; @@ -163,11 +161,11 @@ CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP, new AmqpRe connectionOptions.getRetry())); receiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, - new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH), connectionProcessor, CLEANUP_INTERVAL, + new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH, maxAutoLockRenewalDuration), connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); sessionReceiver = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, - new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH, "Some-Session", false, null), + new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH, maxAutoLockRenewalDuration, "Some-Session", false, null), connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); } @@ -195,12 +193,12 @@ void peekTwoMessages() { .thenReturn(Mono.just(receivedMessage), Mono.just(receivedMessage2)); // Act & Assert - StepVerifier.create(receiver.peek()) + StepVerifier.create(receiver.browse()) .expectNext(receivedMessage) .verifyComplete(); // Act & Assert - StepVerifier.create(receiver.peek()) + StepVerifier.create(receiver.browse()) .expectNext(receivedMessage2) .verifyComplete(); @@ -226,7 +224,7 @@ void peekWithSequenceOneMessage() { when(managementNode.peek(fromSequenceNumber, null, null)).thenReturn(Mono.just(receivedMessage)); // Act & Assert - StepVerifier.create(receiver.peekAt(fromSequenceNumber)) + StepVerifier.create(receiver.browseAt(fromSequenceNumber)) .expectNext(receivedMessage) .verifyComplete(); } @@ -240,9 +238,6 @@ void receivesNumberOfEvents() { // Arrange final int numberOfEvents = 1; final List messages = getMessages(10); - final ReceiveAsyncOptions options = new ReceiveAsyncOptions() - .setMaxAutoLockRenewalDuration(Duration.ZERO) - .setIsAutoCompleteEnabled(false); ServiceBusReceivedMessage receivedMessage = mock(ServiceBusReceivedMessage.class); when(receivedMessage.getLockToken()).thenReturn(UUID.randomUUID().toString()); @@ -250,7 +245,7 @@ void receivesNumberOfEvents() { .thenReturn(receivedMessage); // Act & Assert - StepVerifier.create(receiver.receive(options).take(numberOfEvents)) + StepVerifier.create(receiver.receive().take(numberOfEvents)) .then(() -> messages.forEach(m -> messageSink.next(m))) .expectNextCount(numberOfEvents) .verifyComplete(); @@ -258,104 +253,6 @@ void receivesNumberOfEvents() { verify(amqpReceiveLink).addCredits(PREFETCH); } - /** - * Verifies that we can receive messages from the processor. - */ - @Test - void receivesAndAutoCompletes() throws InterruptedException { - // Arrange - final ReceiverOptions options = new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH); - final ServiceBusReceiverAsyncClient consumer2 = new ServiceBusReceiverAsyncClient( - NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, options, connectionProcessor, CLEANUP_INTERVAL, - tracerProvider, messageSerializer, onClientClose); - - final UUID lockToken1 = UUID.randomUUID(); - final UUID lockToken2 = UUID.randomUUID(); - final Instant expiration = Instant.now().plus(Duration.ofMinutes(1)); - - final MessageWithLockToken message = mock(MessageWithLockToken.class); - final MessageWithLockToken message2 = mock(MessageWithLockToken.class); - - when(message.getLockToken()).thenReturn(lockToken1); - when(message2.getLockToken()).thenReturn(lockToken2); - - when(messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)).thenReturn(receivedMessage); - when(messageSerializer.deserialize(message2, ServiceBusReceivedMessage.class)).thenReturn(receivedMessage2); - - when(receivedMessage.getLockToken()).thenReturn(lockToken1.toString()); - when(receivedMessage.getLockedUntil()).thenReturn(expiration); - when(receivedMessage2.getLockToken()).thenReturn(lockToken2.toString()); - when(receivedMessage2.getLockedUntil()).thenReturn(expiration); - - when(connection.getManagementNode(ENTITY_PATH, ENTITY_TYPE)) - .thenReturn(Mono.just(managementNode)); - - when(amqpReceiveLink.updateDisposition(eq(lockToken1.toString()), argThat(e -> e.getType() == DeliveryStateType.Accepted))).thenReturn(Mono.empty()); - when(amqpReceiveLink.updateDisposition(eq(lockToken2.toString()), argThat(e -> e.getType() == DeliveryStateType.Accepted))).thenReturn(Mono.empty()); - - // Act and Assert - StepVerifier.create(consumer2.receive().take(2)) - .then(() -> { - messageSink.next(message); - messageSink.next(message2); - }) - .assertNext(context -> Assertions.assertEquals(receivedMessage, context.getMessage())) - .assertNext(context -> Assertions.assertEquals(receivedMessage2, context.getMessage())) - .verifyComplete(); - - TimeUnit.SECONDS.sleep(2); - - logger.info("Verifying assertions."); - verify(amqpReceiveLink).updateDisposition(eq(lockToken1.toString()), any(Accepted.class)); - verify(amqpReceiveLink).updateDisposition(eq(lockToken2.toString()), any(Accepted.class)); - } - - /** - * Verifies that if there is no lock token, and auto-complete is requested. It errors. - */ - @Test - void receivesAndAutoCompleteWithoutLockTokenErrors() { - // Arrange - final ReceiverOptions options = new ReceiverOptions(ReceiveMode.PEEK_LOCK, PREFETCH); - final ServiceBusReceiverAsyncClient consumer2 = new ServiceBusReceiverAsyncClient( - NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, options, connectionProcessor, CLEANUP_INTERVAL, - tracerProvider, messageSerializer, onClientClose); - - final MessageWithLockToken message = mock(MessageWithLockToken.class); - final MessageWithLockToken message2 = mock(MessageWithLockToken.class); - - when(messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)).thenReturn(receivedMessage); - when(messageSerializer.deserialize(message2, ServiceBusReceivedMessage.class)).thenReturn(receivedMessage2); - - final Instant lockedUntil = Instant.now().plusSeconds(30); - final Instant lockedUntil2 = Instant.now().plusSeconds(30); - - when(receivedMessage.getLockToken()).thenReturn(null); - when(receivedMessage.getLockedUntil()).thenReturn(lockedUntil); - - when(receivedMessage2.getLockToken()).thenReturn(UUID.randomUUID().toString()); - when(receivedMessage2.getLockedUntil()).thenReturn(lockedUntil2); - - when(connection.getManagementNode(ENTITY_PATH, ENTITY_TYPE)) - .thenReturn(Mono.just(managementNode)); - - when(managementNode.updateDisposition(any(), eq(DispositionStatus.COMPLETED), isNull(), isNull(), isNull(), - isNull(), isNull())) - .thenReturn(Mono.delay(Duration.ofMillis(250)).then()); - - // Act and Assert - try { - StepVerifier.create(consumer2.receive().take(2)) - .then(() -> messageSink.next(message)) - .expectError(IllegalStateException.class) - .verify(); - } finally { - consumer2.close(); - } - - verifyZeroInteractions(managementNode); - } - /** * Verifies that we error if we try to complete a message without a lock token. */ @@ -390,7 +287,7 @@ void completeNullMessage() { */ @Test void completeInReceiveAndDeleteMode() { - final ReceiverOptions options = new ReceiverOptions(ReceiveMode.RECEIVE_AND_DELETE, PREFETCH); + final ReceiverOptions options = new ReceiverOptions(ReceiveMode.RECEIVE_AND_DELETE, PREFETCH, maxAutoLockRenewalDuration); ServiceBusReceiverAsyncClient client = new ServiceBusReceiverAsyncClient(NAMESPACE, ENTITY_PATH, MessagingEntityType.QUEUE, options, connectionProcessor, CLEANUP_INTERVAL, tracerProvider, messageSerializer, onClientClose); @@ -420,7 +317,7 @@ void peekBatchMessages() { .thenReturn(Flux.fromArray(new ServiceBusReceivedMessage[]{receivedMessage, receivedMessage2})); // Act & Assert - StepVerifier.create(receiver.peekBatch(numberOfEvents)) + StepVerifier.create(receiver.browseBatch(numberOfEvents)) .expectNextCount(numberOfEvents) .verifyComplete(); } @@ -438,7 +335,7 @@ void peekBatchWithSequenceNumberMessages() { .thenReturn(Flux.fromArray(new ServiceBusReceivedMessage[]{receivedMessage, receivedMessage2})); // Act & Assert - StepVerifier.create(receiver.peekBatchAt(numberOfEvents, fromSequenceNumber)) + StepVerifier.create(receiver.browseBatchAt(numberOfEvents, fromSequenceNumber)) .expectNext(receivedMessage, receivedMessage2) .verifyComplete(); } @@ -626,23 +523,15 @@ void callsClientCloseOnce() { @Test void receiveIllegalOptions() { // Arrange - final ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder() + ServiceBusReceiverClientBuilder builder = new ServiceBusClientBuilder() .connectionString(NAMESPACE_CONNECTION_STRING) .receiver() .topicName("baz").subscriptionName("bar") - .receiveMode(ReceiveMode.PEEK_LOCK) - .buildAsyncClient(); - final ReceiveAsyncOptions options = new ReceiveAsyncOptions() - .setMaxAutoLockRenewalDuration(Duration.ofSeconds(-1)); + .maxAutoLockRenewalDuration(Duration.ofSeconds(-1)) + .receiveMode(ReceiveMode.PEEK_LOCK); // Act & Assert - StepVerifier.create(receiver.receive(options)) - .expectError(IllegalArgumentException.class) - .verify(); - - StepVerifier.create(receiver.receive(null)) - .expectError(NullPointerException.class) - .verify(); + Assertions.assertThrows(IllegalArgumentException.class, () -> builder.buildAsyncClient()); } @Test diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java index e69d9f2794c4..b8e834a24ec2 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientIntegrationTest.java @@ -9,6 +9,7 @@ import com.azure.messaging.servicebus.implementation.MessagingEntityType; import com.azure.messaging.servicebus.models.ReceiveMode; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -32,6 +33,7 @@ /** * Integration tests for {@link ServiceBusReceiverClient} from queues or subscriptions. */ +@Tag("integration") class ServiceBusReceiverClientIntegrationTest extends IntegrationTestBase { /* Sometime not all the messages are cleaned-up. This is buffer to ensure all the messages are cleaned-up.*/ @@ -235,7 +237,7 @@ void peekMessage(MessagingEntityType entityType, boolean isSessionEnabled) { sendMessage(message); // Act - ServiceBusReceivedMessage receivedMessage = receiver.peek(); + ServiceBusReceivedMessage receivedMessage = receiver.browse(); // Assert assertMessageEquals(receivedMessage, messageId, isSessionEnabled); @@ -265,7 +267,7 @@ void peekFromSequenceNumberMessage(MessagingEntityType entityType, boolean isSes assertNotNull(receivedMessage); // Act - ServiceBusReceivedMessage receivedPeekMessage = receiver.peekAt(receivedMessage.getSequenceNumber()); + ServiceBusReceivedMessage receivedPeekMessage = receiver.browseAt(receivedMessage.getSequenceNumber()); // Assert assertEquals(receivedMessage.getSequenceNumber(), receivedPeekMessage.getSequenceNumber()); @@ -289,7 +291,7 @@ void peekBatchMessages(MessagingEntityType entityType, boolean isSessionEnabled) sendMessage(message); // Act - IterableStream iterableMessages = receiver.peekBatch(maxMessages); + IterableStream iterableMessages = receiver.browseBatch(maxMessages); // Assert Assertions.assertEquals(maxMessages, (int) iterableMessages.stream().count()); @@ -313,7 +315,7 @@ void peekBatchMessagesFromSequence(MessagingEntityType entityType) { sendMessage(message); // Act - IterableStream iterableMessages = receiver.peekBatchAt(maxMessages, fromSequenceNumber); + IterableStream iterableMessages = receiver.browseBatchAt(maxMessages, fromSequenceNumber); // Assert final List asList = iterableMessages.stream().collect(Collectors.toList()); @@ -580,26 +582,34 @@ void sendReceiveMessageWithVariousPropertyTypes(MessagingEntityType entityType, } } + /** * Sets the sender and receiver. If session is enabled, then a single-named session receiver is created. */ private void setSenderAndReceiver(MessagingEntityType entityType, boolean isSessionEnabled) { - sender = getSenderBuilder(false, entityType, isSessionEnabled).buildClient(); + setSenderAndReceiver(entityType, isSessionEnabled, null); + } + + private void setSenderAndReceiver(MessagingEntityType entityType, boolean isSessionEnabled, + Duration autoLockRenewal) { + this.sender = getSenderBuilder(false, entityType, isSessionEnabled).buildClient(); if (isSessionEnabled) { assertNotNull(sessionId, "'sessionId' should have been set."); - - receiver = getSessionReceiverBuilder(false, entityType, - Function.identity(), - builder -> builder.sessionId(sessionId)).buildClient(); - receiveAndDeleteReceiver = getSessionReceiverBuilder(false, entityType, - Function.identity(), - builder -> builder.sessionId(sessionId).receiveMode(ReceiveMode.RECEIVE_AND_DELETE)) + this.receiver = getSessionReceiverBuilder(false, entityType, Function.identity()) + .sessionId(sessionId) + .maxAutoLockRenewalDuration(autoLockRenewal) + .buildClient(); + this.receiveAndDeleteReceiver = getSessionReceiverBuilder(false, entityType, Function.identity()) + .sessionId(sessionId) + .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) .buildClient(); } else { - receiver = getReceiverBuilder(false, entityType).buildClient(); - receiveAndDeleteReceiver = getReceiverBuilder(false, entityType). - receiveMode(ReceiveMode.RECEIVE_AND_DELETE) + this.receiver = getReceiverBuilder(false, entityType, Function.identity()) + .maxAutoLockRenewalDuration(autoLockRenewal) + .buildClient(); + this.receiveAndDeleteReceiver = getReceiverBuilder(false, entityType, Function.identity()) + .receiveMode(ReceiveMode.RECEIVE_AND_DELETE) .buildClient(); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java index 6747d6a08cf4..1cd4d30bbf90 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java @@ -6,7 +6,6 @@ import com.azure.core.util.IterableStream; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.models.DeadLetterOptions; -import com.azure.messaging.servicebus.models.ReceiveAsyncOptions; import com.azure.messaging.servicebus.models.ReceiveMode; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -50,6 +49,8 @@ class ServiceBusReceiverClientTest { private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(5); private final ClientLogger logger = new ClientLogger(ServiceBusReceiverClientTest.class); + + private Duration maxAutoLockRenewalDuration; private ServiceBusReceiverClient client; @Mock @@ -65,7 +66,8 @@ void setup() { when(asyncClient.getEntityPath()).thenReturn(ENTITY_PATH); when(asyncClient.getFullyQualifiedNamespace()).thenReturn(NAMESPACE); - when(asyncClient.getReceiverOptions()).thenReturn(new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1)); + when(asyncClient.getReceiverOptions()).thenReturn(new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, + maxAutoLockRenewalDuration)); when(messageLockToken.getLockToken()).thenReturn(LOCK_TOKEN); @@ -212,10 +214,10 @@ void getSessionStateNull() { void peekMessage() { // Arrange final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); - when(asyncClient.peek()).thenReturn(Mono.just(message)); + when(asyncClient.browse()).thenReturn(Mono.just(message)); // Act - final ServiceBusReceivedMessage actual = client.peek(); + final ServiceBusReceivedMessage actual = client.browse(); // Assert assertEquals(message, actual); @@ -226,10 +228,10 @@ void peekMessageFromSequence() { // Arrange final long sequenceNumber = 154; final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); - when(asyncClient.peekAt(sequenceNumber)).thenReturn(Mono.just(message)); + when(asyncClient.browseAt(sequenceNumber)).thenReturn(Mono.just(message)); // Act - final ServiceBusReceivedMessage actual = client.peekAt(sequenceNumber); + final ServiceBusReceivedMessage actual = client.browseAt(sequenceNumber); // Assert assertEquals(message, actual); @@ -265,10 +267,10 @@ void peekBatchMessagesMax() { } }); }); - when(asyncClient.peekBatch(maxMessages)).thenReturn(messages); + when(asyncClient.browseBatch(maxMessages)).thenReturn(messages); // Act - final IterableStream actual = client.peekBatch(maxMessages); + final IterableStream actual = client.browseBatch(maxMessages); // Assert assertNotNull(actual); @@ -309,10 +311,10 @@ void peekBatchMessagesLessThan() { }); }); - when(asyncClient.peekBatch(maxMessages)).thenReturn(messages); + when(asyncClient.browseBatch(maxMessages)).thenReturn(messages); // Act - final IterableStream actual = client.peekBatch(maxMessages); + final IterableStream actual = client.browseBatch(maxMessages); // Assert assertNotNull(actual); @@ -338,10 +340,10 @@ void peekBatchMessagesMaxSequenceNumber() { sink.complete(); }); }); - when(asyncClient.peekBatchAt(maxMessages, sequenceNumber)).thenReturn(messages); + when(asyncClient.browseBatchAt(maxMessages, sequenceNumber)).thenReturn(messages); // Act - final IterableStream actual = client.peekBatchAt(maxMessages, sequenceNumber); + final IterableStream actual = client.browseBatchAt(maxMessages, sequenceNumber); // Assert assertNotNull(actual); @@ -411,7 +413,7 @@ void receiveMessagesWithUserSpecifiedTimeout() { sink.complete(); }); }); - when(asyncClient.receive(any(ReceiveAsyncOptions.class))).thenReturn(messageSink); + when(asyncClient.receive()).thenReturn(messageSink); // Act final IterableStream actual = client.receive(maxMessages, receiveTimeout); @@ -457,7 +459,7 @@ void receiveMessagesMax() { }); }); - when(asyncClient.receive(any(ReceiveAsyncOptions.class))).thenReturn(messageSink); + when(asyncClient.receive()).thenReturn(messageSink); // Act final IterableStream actual = client.receive(maxMessages); @@ -503,7 +505,7 @@ void receiveMessagesTimeout() { sink.complete(); }); }); - when(asyncClient.receive(any(ReceiveAsyncOptions.class))).thenReturn(messageSink); + when(asyncClient.receive()).thenReturn(messageSink); // Act final IterableStream actual = client.receive(maxMessages); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientIntegrationTest.java index e2b2315e257d..5651efed89a9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientIntegrationTest.java @@ -6,9 +6,9 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.implementation.MessagingEntityType; import com.azure.messaging.servicebus.models.CreateBatchOptions; -import com.azure.messaging.servicebus.models.ReceiveAsyncOptions; import com.azure.messaging.servicebus.models.ReceiveMode; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -24,6 +24,7 @@ /** * Integration tests for {@link ServiceBusSenderAsyncClient} from queues or subscriptions. */ +@Tag("integration") class ServiceBusSenderAsyncClientIntegrationTest extends IntegrationTestBase { private ServiceBusSenderAsyncClient sender; private ServiceBusReceiverAsyncClient receiver; @@ -43,7 +44,7 @@ protected void afterTest() { } try { - receiver.receive(new ReceiveAsyncOptions().setIsAutoCompleteEnabled(false)) + receiver.receive() .take(numberOfMessages) .map(message -> { logger.info("Message received: {}", message.getMessage().getSequenceNumber()); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderClientIntegrationTest.java index ed31ee6fafb4..09c69f32dfbf 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderClientIntegrationTest.java @@ -6,9 +6,9 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.implementation.MessagingEntityType; import com.azure.messaging.servicebus.models.CreateBatchOptions; -import com.azure.messaging.servicebus.models.ReceiveAsyncOptions; import com.azure.messaging.servicebus.models.ReceiveMode; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -24,6 +24,7 @@ /** * Integration tests for the {@link ServiceBusSenderClient}. */ +@Tag("integration") class ServiceBusSenderClientIntegrationTest extends IntegrationTestBase { private ServiceBusSenderClient sender; private ServiceBusReceiverAsyncClient receiver; @@ -38,7 +39,7 @@ protected void afterTest() { dispose(sender); try { - receiver.receive(new ReceiveAsyncOptions().setIsAutoCompleteEnabled(false)) + receiver.receive() .take(messagesPending.get()) .map(message -> { logger.info("Message received: {}", message.getMessage().getSequenceNumber()); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/UnnamedSessionManagerIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/UnnamedSessionManagerIntegrationTest.java index cd30868eda9c..db904b724360 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/UnnamedSessionManagerIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/UnnamedSessionManagerIntegrationTest.java @@ -8,7 +8,7 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder; import com.azure.messaging.servicebus.implementation.MessagingEntityType; -import com.azure.messaging.servicebus.models.ReceiveAsyncOptions; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -37,6 +37,7 @@ /** * Integration tests for {@link UnnamedSessionManager}. */ +@Tag("integration") class UnnamedSessionManagerIntegrationTest extends IntegrationTestBase { private final AtomicInteger messagesPending = new AtomicInteger(); @@ -66,11 +67,8 @@ void singleUnnamedSession(MessagingEntityType entityType) { final String sessionId = "singleUnnamedSession-" + Instant.now().toString(); final String contents = "hello world"; final int numberToSend = 5; - final ReceiveAsyncOptions receiveOptions = new ReceiveAsyncOptions() - .setMaxAutoLockRenewalDuration(Duration.ofMinutes(2)) - .setIsAutoCompleteEnabled(true); - setSenderAndReceiver(entityType, TIMEOUT, Function.identity()); + setSenderAndReceiver(entityType, TIMEOUT, builder -> builder.maxAutoLockRenewalDuration(Duration.ofMinutes(2))); final Disposable subscription = Flux.interval(Duration.ofMillis(500)) .take(numberToSend) @@ -85,7 +83,7 @@ void singleUnnamedSession(MessagingEntityType entityType) { // Act & Assert try { - StepVerifier.create(receiver.receive(receiveOptions)) + StepVerifier.create(receiver.receive()) .assertNext(context -> assertMessageEquals(sessionId, messageId, contents, context)) .assertNext(context -> assertMessageEquals(sessionId, messageId, contents, context)) .assertNext(context -> assertMessageEquals(sessionId, messageId, contents, context)) @@ -120,12 +118,9 @@ void multipleSessions() { final int maxMessages = numberToSend * sessionIds.size(); final int maxConcurrency = 2; final Set set = new HashSet<>(); - final ReceiveAsyncOptions receiveOptions = new ReceiveAsyncOptions() - .setMaxAutoLockRenewalDuration(Duration.ofMinutes(2)) - .setIsAutoCompleteEnabled(true); setSenderAndReceiver(MessagingEntityType.SUBSCRIPTION, Duration.ofSeconds(20), - builder -> builder.maxConcurrentSessions(maxConcurrency)); + builder -> builder.maxConcurrentSessions(maxConcurrency).maxAutoLockRenewalDuration(Duration.ofMinutes(2))); final Disposable subscription = Flux.interval(Duration.ofMillis(500)) .take(maxMessages) @@ -143,7 +138,7 @@ void multipleSessions() { // Act & Assert try { - StepVerifier.create(receiver.receive(receiveOptions)) + StepVerifier.create(receiver.receive()) .assertNext(context -> assertFromSession(sessionIds, set, maxConcurrency, messageId, contents, context)) .assertNext(context -> assertFromSession(sessionIds, set, maxConcurrency, messageId, contents, context)) .assertNext(context -> assertFromSession(sessionIds, set, maxConcurrency, messageId, contents, context)) @@ -185,10 +180,10 @@ private void setSenderAndReceiver(MessagingEntityType entityType, Duration opera Function onBuild) { this.sender = getSenderBuilder(false, entityType, true).buildAsyncClient(); - this.receiver = getSessionReceiverBuilder(false, entityType, - builder -> builder.retryOptions(new AmqpRetryOptions().setTryTimeout(operationTimeout)), - builder -> onBuild.apply(builder)) - .buildAsyncClient(); + ServiceBusSessionReceiverClientBuilder sessionBuilder = getSessionReceiverBuilder(false, entityType, + builder -> builder.retryOptions(new AmqpRetryOptions().setTryTimeout(operationTimeout))); + + this.receiver = onBuild.apply(sessionBuilder).buildAsyncClient(); } private static void assertMessageEquals(String sessionId, String messageId, String contents, diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/UnnamedSessionManagerTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/UnnamedSessionManagerTest.java index 9d888dcbd421..fd206b444401 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/UnnamedSessionManagerTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/UnnamedSessionManagerTest.java @@ -18,7 +18,6 @@ import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor; import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode; import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink; -import com.azure.messaging.servicebus.models.ReceiveAsyncOptions; import com.azure.messaging.servicebus.models.ReceiveMode; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.message.Message; @@ -133,12 +132,12 @@ void afterEach(TestInfo testInfo) { @Test void receiveNull() { // Arrange - ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, null, true, 5); + ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, Duration.ZERO, null, true, 5); sessionManager = new UnnamedSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, TIMEOUT, tracerProvider, messageSerializer, receiverOptions); // Act & Assert - StepVerifier.create(sessionManager.receive(null)) + StepVerifier.create(sessionManager.receive()) .expectError(NullPointerException.class) .verify(); } @@ -149,13 +148,11 @@ void receiveNull() { @Test void singleUnnamedSession() { // Arrange - ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, null, false, null); + ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, Duration.ofSeconds(20), + null, false, null); sessionManager = new UnnamedSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, TIMEOUT, tracerProvider, messageSerializer, receiverOptions); - ReceiveAsyncOptions options = new ReceiveAsyncOptions() - .setMaxAutoLockRenewalDuration(Duration.ofSeconds(20)) - .setIsAutoCompleteEnabled(true); final String sessionId = "session-1"; final String lockToken = "a-lock-token"; final String linkName = "my-link-name"; @@ -182,7 +179,7 @@ void singleUnnamedSession() { when(managementNode.renewSessionLock(sessionId, linkName)).thenReturn(Mono.empty()); // Act & Assert - StepVerifier.create(sessionManager.receive(options)) + StepVerifier.create(sessionManager.receive()) .then(() -> { for (int i = 0; i < numberOfMessages; i++) { messageSink.next(message); @@ -203,14 +200,12 @@ void singleUnnamedSession() { @Test void multipleSessions() { // Arrange - ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, null, false, null); + ReceiverOptions receiverOptions = new ReceiverOptions(ReceiveMode.PEEK_LOCK, 1, Duration.ofSeconds(20), + null, false, null); sessionManager = new UnnamedSessionManager(ENTITY_PATH, ENTITY_TYPE, connectionProcessor, TIMEOUT, tracerProvider, messageSerializer, receiverOptions); final int numberOfMessages = 5; - final ReceiveAsyncOptions options = new ReceiveAsyncOptions() - .setMaxAutoLockRenewalDuration(Duration.ofSeconds(20)) - .setIsAutoCompleteEnabled(true); final String sessionId = "session-1"; final String lockToken = "a-lock-token"; final String linkName = "my-link-name"; @@ -236,7 +231,7 @@ void multipleSessions() { when(managementNode.renewSessionLock(sessionId, linkName)).thenReturn(Mono.empty()); // Act & Assert - StepVerifier.create(sessionManager.receive(options)) + StepVerifier.create(sessionManager.receive()) .then(() -> { for (int i = 0; i < numberOfMessages; i++) { messageSink.next(message);