Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/servicebus/azure-messaging-servicebus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ Unlike non-session-enabled queues or subscriptions, only a single receiver can r
receiver fetches a session, Service Bus locks the session for that receiver, and it has exclusive access to messages in
that session.

#### Send message to a session
#### Send a message to a session

Create a `ServiceBusSenderClient` for a session enabled queue or topic subscription. Setting `.setSessionId(String)` on
a `ServiceBusMessage` will publish the message to that session. If the session does not exist, it is created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import com.azure.messaging.servicebus.models.ReceiveMode;

import java.time.Duration;

/**
* Options set when creating a service bus receiver.
*/
Expand All @@ -15,26 +17,39 @@ class ReceiverOptions {
private final boolean isRollingSessionReceiver;
private final Integer maxConcurrentSessions;
private final boolean isSessionReceiver;
private final Duration maxAutoLockRenewalDuration;

ReceiverOptions(ReceiveMode receiveMode, int prefetchCount) {
ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxAutoLockRenewalDuration) {
this.receiveMode = receiveMode;
this.prefetchCount = prefetchCount;
this.maxAutoLockRenewalDuration = maxAutoLockRenewalDuration;
this.sessionId = null;
this.isRollingSessionReceiver = false;
this.maxConcurrentSessions = null;
this.isSessionReceiver = false;
}

ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, String sessionId,
boolean isRollingSessionReceiver, Integer maxConcurrentSessions) {
ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxAutoLockRenewalDuration,
String sessionId, boolean isRollingSessionReceiver, Integer maxConcurrentSessions) {
this.receiveMode = receiveMode;
this.prefetchCount = prefetchCount;
this.maxAutoLockRenewalDuration = maxAutoLockRenewalDuration;
this.sessionId = sessionId;
this.isRollingSessionReceiver = isRollingSessionReceiver;
this.maxConcurrentSessions = maxConcurrentSessions;
this.isSessionReceiver = true;
}

/**
* Gets whether or not auto-lock renewal is enabled. If the receiver is a session aware receiver, it renews the lock
* for the entire session; otherwise, renews the lock for each message.
*
* @return true if it renews the session or message lock; false otherwise.
*/
boolean autoLockRenewalEnabled() {
return maxAutoLockRenewalDuration != null && maxAutoLockRenewalDuration != Duration.ZERO;
}

/**
* Gets the receive mode for the message.
*
Expand Down Expand Up @@ -62,6 +77,11 @@ int getPrefetchCount() {
return prefetchCount;
}

/**
* Gets whether or not the receiver is a session-aware receiver.
*
* @return true if it is a session-aware receiver; false otherwise.
*/
boolean isSessionReceiver() {
return isSessionReceiver;
}
Expand All @@ -84,4 +104,14 @@ public boolean isRollingSessionReceiver() {
public Integer getMaxConcurrentSessions() {
return maxConcurrentSessions;
}

/**
* Gets the maximum Duration to renew the message or session lock.
*
* @return The maximum Duration to renew the message or session lock; {@code null} or {@link Duration#ZERO}
* if auto-lock renewal is disabled.
*/
public Duration getMaxAutoLockRenewalDuration() {
return maxAutoLockRenewalDuration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import java.net.InetSocketAddress;
import java.net.Proxy;
import java.time.Duration;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -443,6 +444,9 @@ private static String getEntityPath(ClientLogger logger, MessagingEntityType ent
/**
* Builder for creating {@link ServiceBusSenderClient} and {@link ServiceBusSenderAsyncClient} to publish messages
* to Service Bus.
*
* @see ServiceBusSenderAsyncClient
* @see ServiceBusSenderClient
*/
@ServiceClientBuilder(serviceClients = {ServiceBusSenderClient.class, ServiceBusSenderAsyncClient.class})
public final class ServiceBusSenderClientBuilder {
Expand Down Expand Up @@ -530,7 +534,10 @@ public ServiceBusSenderClient buildClient() {

/**
* Builder for creating {@link ServiceBusReceiverClient} and {@link ServiceBusReceiverAsyncClient} to consume
* messages from a session aware Service Bus entity.
* messages from a <b>session aware</b> Service Bus entity.
*
* @see ServiceBusReceiverAsyncClient
* @see ServiceBusReceiverClient
*/
@ServiceClientBuilder(serviceClients = {ServiceBusReceiverClient.class, ServiceBusReceiverAsyncClient.class})
public final class ServiceBusSessionReceiverClientBuilder {
Expand All @@ -542,16 +549,29 @@ public final class ServiceBusSessionReceiverClientBuilder {
private String sessionId;
private String subscriptionName;
private String topicName;
private Duration maxAutoLockRenewalDuration;

private ServiceBusSessionReceiverClientBuilder() {
}

/**
* Enables auto-lock renewal by renewing each session lock until the {@code maxAutoLockRenewalDuration} has
* elapsed.
*
* @param maxAutoLockRenewalDuration Maximum amount of time to renew the session lock.
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
*/
public ServiceBusSessionReceiverClientBuilder maxAutoLockRenewalDuration(Duration maxAutoLockRenewalDuration) {
this.maxAutoLockRenewalDuration = maxAutoLockRenewalDuration;
return this;
}

/**
* Enables session processing roll-over by processing at most {@code maxConcurrentSessions}.
*
* @param maxConcurrentSessions Maximum number of concurrent sessions to process at any given time.
*
* @return The modified {@link ServiceBusReceiverClientBuilder} object.
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
* @throws IllegalArgumentException if {@code maxConcurrentSessions} is less than 1.
*/
public ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSessions) {
Expand All @@ -574,7 +594,7 @@ public ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcu
*
* @param prefetchCount The prefetch count.
*
* @return The modified {@link ServiceBusReceiverClientBuilder} object.
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
*/
public ServiceBusSessionReceiverClientBuilder prefetchCount(int prefetchCount) {
this.prefetchCount = prefetchCount;
Expand All @@ -586,7 +606,7 @@ public ServiceBusSessionReceiverClientBuilder prefetchCount(int prefetchCount) {
*
* @param queueName Name of the queue.
*
* @return The modified {@link ServiceBusReceiverClientBuilder} object.
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
*/
public ServiceBusSessionReceiverClientBuilder queueName(String queueName) {
this.queueName = queueName;
Expand All @@ -598,7 +618,7 @@ public ServiceBusSessionReceiverClientBuilder queueName(String queueName) {
*
* @param receiveMode Mode for receiving messages.
*
* @return The modified {@link ServiceBusReceiverClientBuilder} object.
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
*/
public ServiceBusSessionReceiverClientBuilder receiveMode(ReceiveMode receiveMode) {
this.receiveMode = receiveMode;
Expand All @@ -610,7 +630,7 @@ public ServiceBusSessionReceiverClientBuilder receiveMode(ReceiveMode receiveMod
*
* @param sessionId session id.
*
* @return The modified {@link ServiceBusReceiverClientBuilder} object.
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
*/
public ServiceBusSessionReceiverClientBuilder sessionId(String sessionId) {
this.sessionId = sessionId;
Expand All @@ -623,7 +643,7 @@ public ServiceBusSessionReceiverClientBuilder sessionId(String sessionId) {
*
* @param subscriptionName Name of the subscription.
*
* @return The modified {@link ServiceBusReceiverClientBuilder} object.
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
* @see #topicName A topic name should be set as well.
*/
public ServiceBusSessionReceiverClientBuilder subscriptionName(String subscriptionName) {
Expand All @@ -636,7 +656,7 @@ public ServiceBusSessionReceiverClientBuilder subscriptionName(String subscripti
*
* @param topicName Name of the topic.
*
* @return The modified {@link ServiceBusReceiverClientBuilder} object.
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
* @see #subscriptionName A subscription name should be set as well.
*/
public ServiceBusSessionReceiverClientBuilder topicName(String topicName) {
Expand Down Expand Up @@ -665,11 +685,14 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() {
if (prefetchCount < 1) {
throw logger.logExceptionAsError(new IllegalArgumentException(String.format(
"prefetchCount (%s) cannot be less than 1.", prefetchCount)));
} else if (maxAutoLockRenewalDuration != null && maxAutoLockRenewalDuration.isNegative()) {
throw logger.logExceptionAsError(new IllegalArgumentException(String.format(
"maxAutoLockRenewalDuration (%s) cannot be negative.", maxAutoLockRenewalDuration)));
}

final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount, sessionId,
isRollingSessionReceiver(), maxConcurrentSessions);
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
maxAutoLockRenewalDuration, sessionId, isRollingSessionReceiver(), maxConcurrentSessions);

if (CoreUtils.isNullOrEmpty(sessionId)) {
final UnnamedSessionManager sessionManager = new UnnamedSessionManager(entityPath, entityType,
Expand Down Expand Up @@ -726,6 +749,9 @@ private boolean isRollingSessionReceiver() {
/**
* Builder for creating {@link ServiceBusReceiverClient} and {@link ServiceBusReceiverAsyncClient} to consume
* messages from Service Bus.
*
* @see ServiceBusReceiverAsyncClient
* @see ServiceBusReceiverClient
*/
@ServiceClientBuilder(serviceClients = {ServiceBusReceiverClient.class, ServiceBusReceiverAsyncClient.class})
public final class ServiceBusReceiverClientBuilder {
Expand All @@ -734,10 +760,23 @@ public final class ServiceBusReceiverClientBuilder {
private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK;
private String subscriptionName;
private String topicName;
private Duration maxAutoLockRenewalDuration;

private ServiceBusReceiverClientBuilder() {
}

/**
* Enables auto-lock renewal by renewing each message lock renewal until the {@code maxAutoLockRenewalDuration}
* has elapsed.
*
* @param maxAutoLockRenewalDuration Maximum amount of time to renew the session lock.
* @return The modified {@link ServiceBusReceiverClientBuilder} object.
*/
public ServiceBusReceiverClientBuilder maxAutoLockRenewalDuration(Duration maxAutoLockRenewalDuration) {
this.maxAutoLockRenewalDuration = maxAutoLockRenewalDuration;
return this;
}

/**
* Sets the prefetch count of the receiver. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link
* ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
Expand Down Expand Up @@ -827,10 +866,14 @@ public ServiceBusReceiverAsyncClient buildAsyncClient() {
if (prefetchCount < 1) {
throw logger.logExceptionAsError(new IllegalArgumentException(String.format(
"prefetchCount (%s) cannot be less than 1.", prefetchCount)));
} else if (maxAutoLockRenewalDuration != null && maxAutoLockRenewalDuration.isNegative()) {
throw logger.logExceptionAsError(new IllegalArgumentException(String.format(
"maxAutoLockRenewalDuration (%s) cannot be negative.", maxAutoLockRenewalDuration)));
}

final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount);
final ReceiverOptions receiverOptions = new ReceiverOptions(receiveMode, prefetchCount,
maxAutoLockRenewalDuration);

return new ServiceBusReceiverAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), entityPath,
entityType, receiverOptions, connectionProcessor, ServiceBusConstants.OPERATION_TIMEOUT,
Expand Down
Loading