diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/Directory.Build.props b/sdk/servicebus/Azure.Messaging.ServiceBus/Directory.Build.props
index 687001ded693..03647177e773 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/Directory.Build.props
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/Directory.Build.props
@@ -3,4 +3,16 @@
Add any shared properties you want for the projects under this package directory that need to be set before the auto imported Directory.Build.props
-->
+
+
+
+
+ $(NoWarn);
+ AZC0006;
+ AZC0007;
+
+
+
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpClient.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpClient.cs
index 635e577d62e8..4ce697d9e55f 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpClient.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpClient.cs
@@ -9,7 +9,6 @@
using System.Threading.Tasks;
using Azure.Core;
using Azure.Messaging.ServiceBus.Authorization;
-using Azure.Messaging.ServiceBus.Receiver;
using Azure.Messaging.ServiceBus.Core;
using Azure.Messaging.ServiceBus.Diagnostics;
using Microsoft.Azure.Amqp;
@@ -174,7 +173,7 @@ protected AmqpClient(string host,
///
///
///
- ///
+ ///
///
///
///
@@ -187,37 +186,53 @@ public override async Task> PeekAsync(
string receiveLinkName = null,
CancellationToken cancellationToken = default)
{
- RetriableContext context = new RetriableContext(
- ConnectionScope,
- new Stopwatch(),
- retryPolicy,
- EntityName,
- cancellationToken);
-
- return await context.RunOperation(
- async () => await PeekInternal(
- context,
+ IEnumerable messages = null;
+ Task peekTask = retryPolicy.RunOperation(async (timeout) =>
+ {
+ messages = await PeekInternal(
+ retryPolicy,
fromSequenceNumber,
messageCount,
sessionId,
- receiveLinkName)
- .ConfigureAwait(false))
- .ConfigureAwait(false);
+ receiveLinkName,
+ timeout,
+ cancellationToken).ConfigureAwait(false);
+ },
+ EntityName,
+ ConnectionScope,
+ cancellationToken);
+ await peekTask.ConfigureAwait(false);
+ return messages;
}
- private async Task> PeekInternal(
- RetriableContext context,
- long? fromSequenceNumber,
- int messageCount = 1,
- string sessionId = null,
- string receiveLinkName = null)
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ internal async Task> PeekInternal(
+ ServiceBusRetryPolicy retryPolicy,
+ long? fromSequenceNumber,
+ int messageCount,
+ string sessionId,
+ string receiveLinkName,
+ TimeSpan timeout,
+ CancellationToken cancellationToken = default)
{
+ var stopWatch = new Stopwatch();
+ stopWatch.Start();
AmqpRequestMessage amqpRequestMessage = AmqpRequestMessage.CreateRequest(
ManagementConstants.Operations.PeekMessageOperation,
- context.TimeSpan,
+ timeout,
null);
- await AquireAccessTokenAsync(context.CancellationToken).ConfigureAwait(false);
+ await AquireAccessTokenAsync(cancellationToken).ConfigureAwait(false);
if (receiveLinkName != null)
{
@@ -235,25 +250,16 @@ private async Task> PeekInternal(
RequestResponseAmqpLink link = await ManagementLink.GetOrCreateAsync(
UseMinimum(ConnectionScope.SessionTimeout,
- context.TimeSpan.CalculateRemaining(context.Stopwatch.Elapsed)))
+ timeout.CalculateRemaining(stopWatch.Elapsed)))
.ConfigureAwait(false);
- context.CancellationToken.ThrowIfCancellationRequested();
-
- // This is how Track 1 makes the request
- //var responseAmqpMessage = await Task.Factory.FromAsync(
- //(c, s) => link.BeginRequest(
- // amqpRequestMessage.AmqpMessage,
- // transactionId,
- // TimeSpan.FromSeconds(30),
- // c, s),
- //(a) => link.EndRequest(a),
- //this).ConfigureAwait(false);
+ cancellationToken.ThrowIfCancellationRequested();
using AmqpMessage responseAmqpMessage = await link.RequestAsync(
amqpRequestMessage.AmqpMessage,
- context.TimeSpan.CalculateRemaining(context.Stopwatch.Elapsed))
+ timeout.CalculateRemaining(stopWatch.Elapsed))
.ConfigureAwait(false);
- context.CancellationToken.ThrowIfCancellationRequested();
+
+ cancellationToken.ThrowIfCancellationRequested();
AmqpResponseMessage amqpResponseMessage = AmqpResponseMessage.CreateResponse(responseAmqpMessage);
@@ -301,89 +307,72 @@ public override async Task CancelScheduledMessageAsync(
string receiveLinkName = null,
CancellationToken cancellationToken = default)
{
- var failedAttemptCount = 0;
- var stopWatch = Stopwatch.StartNew();
-
- try
+ Task cancelMessageTask = retryPolicy.RunOperation(async (timeout) =>
{
- var tryTimeout = retryPolicy.CalculateTryTimeout(0);
+ await CancelScheduledMessageInternal(
+ sequenceNumber,
+ retryPolicy,
+ receiveLinkName,
+ timeout,
+ cancellationToken).ConfigureAwait(false);
+ },
+ EntityName,
+ ConnectionScope,
+ cancellationToken);
+ await cancelMessageTask.ConfigureAwait(false);
+ }
- while (!cancellationToken.IsCancellationRequested)
- {
- try
- {
- var request = AmqpRequestMessage.CreateRequest(
- ManagementConstants.Operations.CancelScheduledMessageOperation,
- tryTimeout,
- null);
-
- if (receiveLinkName != null)
- {
- request.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLinkName;
- }
-
- request.Map[ManagementConstants.Properties.SequenceNumbers] = new[] { sequenceNumber };
-
- RequestResponseAmqpLink link = await ManagementLink.GetOrCreateAsync(
- UseMinimum(ConnectionScope.SessionTimeout,
- tryTimeout.CalculateRemaining(stopWatch.Elapsed)))
- .ConfigureAwait(false);
-
- using AmqpMessage response = await link.RequestAsync(
- request.AmqpMessage,
- tryTimeout.CalculateRemaining(stopWatch.Elapsed))
- .ConfigureAwait(false);
-
- cancellationToken.ThrowIfCancellationRequested();
- stopWatch.Stop();
- AmqpResponseMessage amqpResponseMessage = AmqpResponseMessage.CreateResponse(response);
-
-
- if (amqpResponseMessage.StatusCode != AmqpResponseStatusCode.OK)
- {
- throw new Exception();
- //throw response.ToMessagingContractException();
- }
- return;
- }
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ internal async Task CancelScheduledMessageInternal(
+ long sequenceNumber,
+ ServiceBusRetryPolicy retryPolicy,
+ string receiveLinkName,
+ TimeSpan timeout,
+ CancellationToken cancellationToken = default)
+ {
+ var stopWatch = Stopwatch.StartNew();
- catch (Exception ex)
- {
- // Determine if there should be a retry for the next attempt; if so enforce the delay but do not quit the loop.
- // Otherwise, mark the exception as active and break out of the loop.
-
- ++failedAttemptCount;
- TimeSpan? retryDelay = retryPolicy.CalculateRetryDelay(ex, failedAttemptCount);
-
- if (retryDelay.HasValue && !ConnectionScope.IsDisposed && !cancellationToken.IsCancellationRequested)
- {
- ServiceBusEventSource.Log.CancelScheduledMessageError(EntityName, ex.Message);
- await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);
-
- tryTimeout = retryPolicy.CalculateTryTimeout(failedAttemptCount);
- stopWatch.Reset();
- }
- else
- {
- throw ex;
- }
- }
- }
- // If no value has been returned nor exception thrown by this point,
- // then cancellation has been requested.
+ var request = AmqpRequestMessage.CreateRequest(
+ ManagementConstants.Operations.CancelScheduledMessageOperation,
+ timeout,
+ null);
- throw new TaskCanceledException();
- }
- catch (Exception ex)
+ if (receiveLinkName != null)
{
- ServiceBusEventSource.Log.CancelScheduledMessageError(EntityName, ex.Message);
- throw;
+ request.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLinkName;
}
- finally
+
+ request.Map[ManagementConstants.Properties.SequenceNumbers] = new[] { sequenceNumber };
+
+ RequestResponseAmqpLink link = await ManagementLink.GetOrCreateAsync(
+ UseMinimum(ConnectionScope.SessionTimeout,
+ timeout.CalculateRemaining(stopWatch.Elapsed)))
+ .ConfigureAwait(false);
+
+ using AmqpMessage response = await link.RequestAsync(
+ request.AmqpMessage,
+ timeout.CalculateRemaining(stopWatch.Elapsed))
+ .ConfigureAwait(false);
+
+ cancellationToken.ThrowIfCancellationRequested();
+ stopWatch.Stop();
+ AmqpResponseMessage amqpResponseMessage = AmqpResponseMessage.CreateResponse(response);
+
+
+ if (amqpResponseMessage.StatusCode != AmqpResponseStatusCode.OK)
{
- stopWatch.Stop();
- ServiceBusEventSource.Log.CancelScheduledMessageComplete(EntityName);
+ throw new Exception();
+ //throw response.ToMessagingContractException();
}
+ return;
}
///
@@ -400,123 +389,112 @@ public override async Task ScheduleMessageAsync(
string receiveLinkName = null,
CancellationToken cancellationToken = default)
{
- var failedAttemptCount = 0;
+ long sequenceNumber = 0;
+ Task scheduleTask = retryPolicy.RunOperation(async (timeout) =>
+ {
+ sequenceNumber = await ScheduleMessageInternal(
+ message,
+ retryPolicy,
+ receiveLinkName,
+ timeout,
+ cancellationToken).ConfigureAwait(false);
+ },
+ EntityName,
+ ConnectionScope,
+ cancellationToken);
+ await scheduleTask.ConfigureAwait(false);
+ return sequenceNumber;
+ }
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ internal async Task ScheduleMessageInternal(
+ ServiceBusMessage message,
+ ServiceBusRetryPolicy retryPolicy,
+ string receiveLinkName,
+ TimeSpan timeout,
+ CancellationToken cancellationToken = default)
+ {
var stopWatch = Stopwatch.StartNew();
- try
+ using (AmqpMessage amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message))
{
- TimeSpan tryTimeout = retryPolicy.CalculateTryTimeout(0);
- while (!cancellationToken.IsCancellationRequested)
+
+ var request = AmqpRequestMessage.CreateRequest(
+ ManagementConstants.Operations.ScheduleMessageOperation,
+ timeout,
+ null);
+
+ if (receiveLinkName != null)
+ {
+ request.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLinkName;
+ }
+
+ ArraySegment[] payload = amqpMessage.GetPayload();
+ var buffer = new BufferListStream(payload);
+ ArraySegment value = buffer.ReadBytes((int)buffer.Length);
+
+ var entry = new AmqpMap();
{
- try
+ entry[ManagementConstants.Properties.Message] = value;
+ entry[ManagementConstants.Properties.MessageId] = message.MessageId;
+
+ if (!string.IsNullOrWhiteSpace(message.SessionId))
{
- using (AmqpMessage amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message))
- {
-
- var request = AmqpRequestMessage.CreateRequest(
- ManagementConstants.Operations.ScheduleMessageOperation,
- tryTimeout,
- null);
-
- if (receiveLinkName != null)
- {
- request.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLinkName;
- }
-
- ArraySegment[] payload = amqpMessage.GetPayload();
- var buffer = new BufferListStream(payload);
- ArraySegment value = buffer.ReadBytes((int)buffer.Length);
-
- var entry = new AmqpMap();
- {
- entry[ManagementConstants.Properties.Message] = value;
- entry[ManagementConstants.Properties.MessageId] = message.MessageId;
-
- if (!string.IsNullOrWhiteSpace(message.SessionId))
- {
- entry[ManagementConstants.Properties.SessionId] = message.SessionId;
- }
-
- if (!string.IsNullOrWhiteSpace(message.PartitionKey))
- {
- entry[ManagementConstants.Properties.PartitionKey] = message.PartitionKey;
- }
-
- if (!string.IsNullOrWhiteSpace(message.ViaPartitionKey))
- {
- entry[ManagementConstants.Properties.ViaPartitionKey] = message.ViaPartitionKey;
- }
- }
-
- request.Map[ManagementConstants.Properties.Messages] = new List { entry };
-
-
- RequestResponseAmqpLink link = await ManagementLink.GetOrCreateAsync(
- UseMinimum(ConnectionScope.SessionTimeout,
- tryTimeout.CalculateRemaining(stopWatch.Elapsed)))
- .ConfigureAwait(false);
-
- using AmqpMessage response = await link.RequestAsync(
- request.AmqpMessage,
- tryTimeout.CalculateRemaining(stopWatch.Elapsed))
- .ConfigureAwait(false);
-
- cancellationToken.ThrowIfCancellationRequested();
- stopWatch.Stop();
-
- AmqpResponseMessage amqpResponseMessage = AmqpResponseMessage.CreateResponse(response);
-
- if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.OK)
- {
- var sequenceNumbers = amqpResponseMessage.GetValue(ManagementConstants.Properties.SequenceNumbers);
- if (sequenceNumbers == null || sequenceNumbers.Length < 1)
- {
- throw new ServiceBusException(true, "Could not schedule message successfully.");
- }
-
- return sequenceNumbers[0];
-
- }
- else
- {
- throw new Exception();
-
- //throw response.ToMessagingContractException();
- }
- }
+ entry[ManagementConstants.Properties.SessionId] = message.SessionId;
}
- catch (Exception ex)
- {
- // Determine if there should be a retry for the next attempt; if so enforce the delay but do not quit the loop.
- // Otherwise, mark the exception as active and break out of the loop.
- ++failedAttemptCount;
- TimeSpan? retryDelay = retryPolicy.CalculateRetryDelay(ex, failedAttemptCount);
-
- if (retryDelay.HasValue && !ConnectionScope.IsDisposed && !cancellationToken.IsCancellationRequested)
- {
- ServiceBusEventSource.Log.ScheduleMessageError(EntityName, ex.Message);
- await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);
+ if (!string.IsNullOrWhiteSpace(message.PartitionKey))
+ {
+ entry[ManagementConstants.Properties.PartitionKey] = message.PartitionKey;
+ }
- tryTimeout = retryPolicy.CalculateTryTimeout(failedAttemptCount);
- stopWatch.Reset();
- }
+ if (!string.IsNullOrWhiteSpace(message.ViaPartitionKey))
+ {
+ entry[ManagementConstants.Properties.ViaPartitionKey] = message.ViaPartitionKey;
}
}
- // If no value has been returned nor exception thrown by this point,
- // then cancellation has been requested.
- throw new TaskCanceledException();
- }
- catch (Exception ex)
- {
- ServiceBusEventSource.Log.ScheduleMessageError(EntityName, ex.Message);
- throw;
- }
- finally
- {
+ request.Map[ManagementConstants.Properties.Messages] = new List { entry };
+
+ RequestResponseAmqpLink link = await ManagementLink.GetOrCreateAsync(
+ UseMinimum(ConnectionScope.SessionTimeout,
+ timeout.CalculateRemaining(stopWatch.Elapsed)))
+ .ConfigureAwait(false);
+
+ using AmqpMessage response = await link.RequestAsync(
+ request.AmqpMessage,
+ timeout.CalculateRemaining(stopWatch.Elapsed))
+ .ConfigureAwait(false);
+
+ cancellationToken.ThrowIfCancellationRequested();
stopWatch.Stop();
- ServiceBusEventSource.Log.ScheduleMessageComplete(EntityName);
+
+ AmqpResponseMessage amqpResponseMessage = AmqpResponseMessage.CreateResponse(response);
+
+ if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.OK)
+ {
+ var sequenceNumbers = amqpResponseMessage.GetValue(ManagementConstants.Properties.SequenceNumbers);
+ if (sequenceNumbers == null || sequenceNumbers.Length < 1)
+ {
+ throw new ServiceBusException(true, "Could not schedule message successfully.");
+ }
+
+ return sequenceNumbers[0];
+
+ }
+ else
+ {
+ throw new Exception();
+ //throw response.ToMessagingContractException();
+ }
}
}
@@ -562,13 +540,15 @@ public override TransportSender CreateSender(ServiceBusRetryPolicy retryPolicy)
/// The relative priority to associate with the link; for a non-exclusive link, this value should be null.
/// Controls the number of events received and queued locally without regard to whether an operation was requested. If null a default will be used.
///
+ ///
///
/// A configured in the requested manner.
///
public override TransportConsumer CreateConsumer(ServiceBusRetryPolicy retryPolicy,
long? ownerLevel,
uint? prefetchCount,
- string sessionId = default)
+ string sessionId,
+ bool isSessionReceiver)
{
Argument.AssertNotClosed(_closed, nameof(AmqpClient));
@@ -579,7 +559,8 @@ public override TransportConsumer CreateConsumer(ServiceBusRetryPolicy retryPoli
prefetchCount,
ConnectionScope,
retryPolicy,
- sessionId
+ sessionId,
+ isSessionReceiver
);
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs
index 46401edb3b68..acf2ad3a0b4e 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs
@@ -11,7 +11,6 @@
using System.Threading.Tasks;
using Azure.Core;
using Azure.Messaging.ServiceBus.Authorization;
-using Azure.Messaging.ServiceBus.Receiver;
using Azure.Messaging.ServiceBus.Core;
using Azure.Messaging.ServiceBus.Diagnostics;
using Azure.Messaging.ServiceBus.Primitives;
@@ -28,7 +27,7 @@ namespace Azure.Messaging.ServiceBus.Amqp
/// client types within a given scope.
///
///
- internal class AmqpConnectionScope : IDisposable
+ internal class AmqpConnectionScope : TransportConnectionScope
{
/// The name to assign to the SASL handler to specify that CBS tokens are in use.
private const string CbsSaslHandlerName = "MSSBCBS";
@@ -84,7 +83,7 @@ internal class AmqpConnectionScope : IDisposable
/// interval be used when creating or opening AMQP links and related constructs.
///
///
- public TimeSpan SessionTimeout { get; } = TimeSpan.FromSeconds(30);
+ public override TimeSpan SessionTimeout { get; } = TimeSpan.FromSeconds(30);
///
/// Indicates whether this has been disposed.
@@ -92,7 +91,7 @@ internal class AmqpConnectionScope : IDisposable
///
/// true if disposed; otherwise, false.
///
- public bool IsDisposed { get; private set; }
+ public override bool IsDisposed { get; protected set; }
///
/// The cancellation token to use with operations initiated by the scope.
@@ -231,16 +230,19 @@ public virtual async Task OpenManagementLinkAsync(TimeS
/// Controls the number of events received and queued locally without regard to whether an operation was requested.
/// The relative priority to associate with the link; for a non-exclusive link, this value should be null.
///
+ ///
/// The timeout to apply when creating the link.
/// An optional instance to signal the request to cancel the operation.
///
/// A link for use with consumer operations.
///
- public virtual async Task OpenConsumerLinkAsync(TimeSpan timeout,
- uint prefetchCount,
- long? ownerLevel,
- string sessionId,
- CancellationToken cancellationToken)
+ public virtual async Task OpenConsumerLinkAsync(
+ TimeSpan timeout,
+ uint prefetchCount,
+ long? ownerLevel,
+ string sessionId,
+ bool isSessionReceiver,
+ CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
@@ -258,6 +260,7 @@ public virtual async Task OpenConsumerLinkAsync(TimeSpan time
prefetchCount,
ownerLevel,
sessionId,
+ isSessionReceiver,
cancellationToken
).ConfigureAwait(false);
@@ -306,7 +309,7 @@ public virtual async Task OpenProducerLinkAsync(
/// including ensuring that the client itself has been closed.
///
///
- public void Dispose()
+ public override void Dispose()
{
if (IsDisposed)
{
@@ -426,7 +429,7 @@ protected virtual async Task CreateManagementLinkAsync(
var authExpirationUtc = await RequestAuthorizationUsingCbsAsync(connection, TokenProvider, ServiceEndpoint, endpointUri.AbsoluteUri, endpointUri.AbsoluteUri, claims, timeout.CalculateRemaining(stopWatch.Elapsed)).ConfigureAwait(false);
var link = new RequestResponseAmqpLink(/*AmqpManagement.LinkType*/"entity-mgmt", session, entityPath, linkSettings.Properties);
- linkSettings.LinkName = $"{connection.Settings.ContainerId};{connection.Identifier}:{session.Identifier}:{link.Identifier}:111";
+ linkSettings.LinkName = $"{connection.Settings.ContainerId};{connection.Identifier}:{session.Identifier}:{link.Identifier}";
stopWatch.Stop();
// Track the link before returning it, so that it can be managed with the scope.
@@ -471,18 +474,21 @@ protected virtual async Task CreateManagementLinkAsync(
/// Controls the number of events received and queued locally without regard to whether an operation was requested.
/// The relative priority to associate with the link; for a non-exclusive link, this value should be null.
///
+ ///
/// The timeout to apply when creating the link.
/// An optional instance to signal the request to cancel the operation.
///
/// A link for use for operations related to receiving events.
///
- protected virtual async Task CreateReceivingLinkAsync(AmqpConnection connection,
- Uri endpoint,
- TimeSpan timeout,
- uint prefetchCount,
- long? ownerLevel,
- string sessionId,
- CancellationToken cancellationToken)
+ protected virtual async Task CreateReceivingLinkAsync(
+ AmqpConnection connection,
+ Uri endpoint,
+ TimeSpan timeout,
+ uint prefetchCount,
+ long? ownerLevel,
+ string sessionId,
+ bool isSessionReceiver,
+ CancellationToken cancellationToken)
{
Argument.AssertNotDisposed(IsDisposed, nameof(AmqpConnectionScope));
cancellationToken.ThrowIfCancellationRequested();
@@ -506,20 +512,14 @@ protected virtual async Task CreateReceivingLinkAsync(AmqpCon
await OpenAmqpObjectAsync(session, timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
- // Create and open the link.
var filters = new FilterSet();
- if (sessionId != null)
+
+ // even if supplied sessionId is null, we need to add the Session filter if it is a session receiver
+ if (isSessionReceiver)
{
filters.Add(AmqpClientConstants.SessionFilterName, sessionId);
}
-
- //if (this.isSessionReceiver)
- //{
- // filterMap = new FilterSet { { AmqpClientConstants.SessionFilterName, this.SessionIdInternal } };
- //}
-
-
var linkSettings = new AmqpLinkSettings
{
Role = true,
@@ -530,17 +530,14 @@ protected virtual async Task CreateReceivingLinkAsync(AmqpCon
Target = new Target { Address = Guid.NewGuid().ToString() }
};
- //linkSettings.AddProperty(AmqpProperty.EntityType, "0,1,2,3");//(int)AmqpProperty.Entity.ConsumerGroup);
-
if (ownerLevel.HasValue)
{
linkSettings.AddProperty(AmqpProperty.OwnerLevel, ownerLevel.Value);
}
var link = new ReceivingAmqpLink(linkSettings);
- linkSettings.LinkName = $"{connection.Settings.ContainerId};{connection.Identifier}:{session.Identifier}:{link.Identifier}:{linkSettings.Source.ToString()}:test";
+ linkSettings.LinkName = $"{connection.Settings.ContainerId};{connection.Identifier}:{session.Identifier}:{link.Identifier}:{linkSettings.Source.ToString()}";
- //linkSettings.LinkName = $"{ Id };{ connection.Identifier }:{ session.Identifier }:{ link.Identifier }";
link.AttachTo(session);
stopWatch.Stop();
@@ -590,10 +587,11 @@ protected virtual async Task CreateReceivingLinkAsync(AmqpCon
///
/// A link for use for operations related to receiving events.
///
- protected virtual async Task CreateSendingLinkAsync(AmqpConnection connection,
- Uri endpoint,
- TimeSpan timeout,
- CancellationToken cancellationToken)
+ protected virtual async Task CreateSendingLinkAsync(
+ AmqpConnection connection,
+ Uri endpoint,
+ TimeSpan timeout,
+ CancellationToken cancellationToken)
{
Argument.AssertNotDisposed(IsDisposed, nameof(AmqpConnectionScope));
cancellationToken.ThrowIfCancellationRequested();
@@ -628,7 +626,6 @@ protected virtual async Task CreateSendingLinkAsync(AmqpConnect
};
linkSettings.AddProperty(AmqpProperty.Timeout, (uint)timeout.CalculateRemaining(stopWatch.Elapsed).TotalMilliseconds);
- //linkSettings.AddProperty(AmqpProperty.EntityType, "0,1"); // MessagingEntityType
var link = new SendingAmqpLink(linkSettings);
linkSettings.LinkName = $"{ Id };{ connection.Identifier }:{ session.Identifier }:{ link.Identifier }";
@@ -684,8 +681,9 @@ protected virtual async Task CreateSendingLinkAsync(AmqpConnect
/// its communication properties modified.
///
///
- protected virtual void BeginTrackingLinkAsActive(AmqpObject link,
- Timer authorizationRefreshTimer = null)
+ protected virtual void BeginTrackingLinkAsActive(
+ AmqpObject link,
+ Timer authorizationRefreshTimer = null)
{
// Register the link as active and having authorization automatically refreshed, so that it can be
// managed with the scope.
@@ -753,15 +751,16 @@ protected virtual TimeSpan CalculateLinkAuthorizationRefreshInterval(DateTime ex
///
/// A delegate to perform the refresh when a timer is due.
///
- protected virtual TimerCallback CreateAuthorizationRefreshHandler(AmqpConnection connection,
- AmqpObject amqpLink,
- CbsTokenProvider tokenProvider,
- Uri endpoint,
- string audience,
- string resource,
- string[] requiredClaims,
- TimeSpan refreshTimeout,
- Func refreshTimerFactory)
+ protected virtual TimerCallback CreateAuthorizationRefreshHandler(
+ AmqpConnection connection,
+ AmqpObject amqpLink,
+ CbsTokenProvider tokenProvider,
+ Uri endpoint,
+ string audience,
+ string resource,
+ string[] requiredClaims,
+ TimeSpan refreshTimeout,
+ Func refreshTimerFactory)
{
return async _ =>
{
@@ -838,13 +837,14 @@ protected virtual Task OpenAmqpObjectAsync(AmqpObject target,
/// credentials.
///
///
- protected virtual Task RequestAuthorizationUsingCbsAsync(AmqpConnection connection,
- CbsTokenProvider tokenProvider,
- Uri endpoint,
- string audience,
- string resource,
- string[] requiredClaims,
- TimeSpan timeout)
+ protected virtual Task RequestAuthorizationUsingCbsAsync(
+ AmqpConnection connection,
+ CbsTokenProvider tokenProvider,
+ Uri endpoint,
+ string audience,
+ string resource,
+ string[] requiredClaims,
+ TimeSpan timeout)
{
var authLink = connection.Extensions.Find();
return authLink.SendTokenAsync(TokenProvider, endpoint, audience, resource, requiredClaims, timeout);
@@ -883,8 +883,9 @@ private static AmqpSettings CreateAmpqSettings(Version amqpVersion)
///
/// The settings to use for transport.
///
- private static TransportSettings CreateTransportSettingsforTcp(string hostName,
- int port)
+ private static TransportSettings CreateTransportSettingsforTcp(
+ string hostName,
+ int port)
{
var tcpSettings = new TcpTransportSettings
{
@@ -909,8 +910,9 @@ private static TransportSettings CreateTransportSettingsforTcp(string hostName,
///
/// The settings to use for transport.
///
- private static TransportSettings CreateTransportSettingsForWebSockets(string hostName,
- IWebProxy proxy)
+ private static TransportSettings CreateTransportSettingsForWebSockets(
+ string hostName,
+ IWebProxy proxy)
{
var uriBuilder = new UriBuilder(hostName)
{
@@ -935,8 +937,9 @@ private static TransportSettings CreateTransportSettingsForWebSockets(string hos
///
/// The settings to apply to the connection.
///
- private static AmqpConnectionSettings CreateAmqpConnectionSettings(string hostName,
- string identifier)
+ private static AmqpConnectionSettings CreateAmqpConnectionSettings(
+ string hostName,
+ string identifier)
{
var connectionSettings = new AmqpConnectionSettings
{
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConsumer.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConsumer.cs
index dc74d57f4a38..c09c2cd92883 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConsumer.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConsumer.cs
@@ -8,16 +8,16 @@
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
-using Azure.Messaging.ServiceBus.Receiver;
using Azure.Messaging.ServiceBus.Core;
using Azure.Messaging.ServiceBus.Diagnostics;
using Microsoft.Azure.Amqp;
+using Microsoft.Azure.Amqp.Framing;
namespace Azure.Messaging.ServiceBus.Amqp
{
///
/// A transport client abstraction responsible for brokering operations for AMQP-based connections.
- /// It is intended that the public make use of an instance
+ /// It is intended that the public make use of an instance
/// via containment and delegate operations to it.
///
///
@@ -45,7 +45,7 @@ internal class AmqpConsumer : TransportConsumer
/// The name of the Service Bus entity to which the client is bound.
///
///
- private string EntityName { get; }
+ public override string EntityName { get; }
///
/// The identifier of the Service Bus entity partition that this consumer is associated with. Events will be read
@@ -58,25 +58,23 @@ internal class AmqpConsumer : TransportConsumer
/// The policy to use for determining retry behavior for when an operation fails.
///
///
- private ServiceBusRetryPolicy RetryPolicy { get; }
-
- /////
- ///// The converter to use for translating between AMQP messages and client library
- ///// types.
- /////
- //private AmqpMessageConverter MessageConverter { get; }
-
- /////
- ///// The AMQP connection scope responsible for managing transport constructs for this instance.
- /////
- /////
- //internal AmqpConnectionScope ConnectionScope { get; }
-
- /////
- ///// The AMQP link intended for use with receiving operations.
- /////
- /////
- //internal FaultTolerantAmqpObject ReceiveLink { get; }
+ private readonly ServiceBusRetryPolicy _retryPolicy;
+
+ ///
+ /// Indicates whether or not this is a receiver scoped to a session.
+ ///
+ private readonly bool _isSessionReceiver;
+
+ ///
+ /// The AMQP connection scope responsible for managing transport constructs for this instance.
+ ///
+ ///
+ private readonly AmqpConnectionScope _connectionScope;
+
+ ///
+ public override TransportConnectionScope ConnectionScope =>
+ _connectionScope;
+
///
/// Initializes a new instance of the class.
@@ -88,6 +86,7 @@ internal class AmqpConsumer : TransportConsumer
/// The AMQP connection context for operations .
/// The retry policy to consider when an operation fails.
///
+ ///
///
///
/// As an internal type, this class performs only basic sanity checks against its arguments. It
@@ -104,25 +103,26 @@ public AmqpConsumer(
uint? prefetchCount,
AmqpConnectionScope connectionScope,
ServiceBusRetryPolicy retryPolicy,
- string sessionId)
+ string sessionId,
+ bool isSessionReceiver)
{
Argument.AssertNotNullOrEmpty(entityName, nameof(entityName));
Argument.AssertNotNull(connectionScope, nameof(connectionScope));
Argument.AssertNotNull(retryPolicy, nameof(retryPolicy));
EntityName = entityName;
- ConnectionScope = connectionScope;
- RetryPolicy = retryPolicy;
+ _connectionScope = connectionScope;
+ _retryPolicy = retryPolicy;
+ _isSessionReceiver = isSessionReceiver;
ReceiveLink = new FaultTolerantAmqpObject(
timeout =>
- ConnectionScope.OpenConsumerLinkAsync(
- //consumerGroup,
- //partitionId,
- timeout,
- prefetchCount ?? DefaultPrefetchCount,
- ownerLevel,
- sessionId,
- CancellationToken.None),
+ _connectionScope.OpenConsumerLinkAsync(
+ timeout: timeout,
+ prefetchCount: prefetchCount ?? DefaultPrefetchCount,
+ ownerLevel: ownerLevel,
+ sessionId: sessionId,
+ isSessionReceiver: isSessionReceiver,
+ cancellationToken: CancellationToken.None),
link =>
{
CloseLink(link);
@@ -155,7 +155,7 @@ public override async Task> ReceiveAsync(
var receivedMessageCount = 0;
var failedAttemptCount = 0;
- var tryTimeout = RetryPolicy.CalculateTryTimeout(0);
+ var tryTimeout = _retryPolicy.CalculateTryTimeout(0);
var waitTime = (maximumWaitTime ?? tryTimeout);
var link = default(ReceivingAmqpLink);
var retryDelay = default(TimeSpan?);
@@ -222,14 +222,14 @@ public override async Task> ReceiveAsync(
// Otherwise, bubble the exception.
++failedAttemptCount;
- retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
+ retryDelay = _retryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellationRequested))
{
ServiceBusEventSource.Log.MessageReceiveError(EntityName, activeEx.Message);
await Task.Delay(UseMinimum(retryDelay.Value, waitTime.CalculateRemaining(stopWatch.Elapsed)), cancellationToken).ConfigureAwait(false);
- tryTimeout = RetryPolicy.CalculateTryTimeout(failedAttemptCount);
+ tryTimeout = _retryPolicy.CalculateTryTimeout(failedAttemptCount);
}
else if (ex is AmqpException)
{
@@ -316,5 +316,30 @@ private static TimeSpan UseMinimum(
TimeSpan firstOption,
TimeSpan secondOption) =>
(firstOption < secondOption) ? firstOption : secondOption;
+
+ ///
+ /// Get the session Id corresponding to this consumer
+ ///
+ ///
+ ///
+ public override async Task GetSessionId(CancellationToken cancellationToken = default)
+ {
+ if (!_isSessionReceiver)
+ {
+ return null;
+ }
+ ReceivingAmqpLink openedLink = null;
+ await _retryPolicy.RunOperation(
+ async (timeout) =>
+ openedLink = await ReceiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false),
+ EntityName,
+ ConnectionScope,
+ cancellationToken).ConfigureAwait(false);
+
+ var source = (Source)openedLink.Settings.Source;
+ source.FilterSet.TryGetValue(AmqpClientConstants.SessionFilterName, out var sessionId);
+ return sessionId;
+ }
+
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs
index a814b67b3cca..5e096d13d5de 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs
@@ -10,7 +10,6 @@
using Azure.Core;
using Azure.Messaging.ServiceBus.Core;
using Azure.Messaging.ServiceBus.Diagnostics;
-using Azure.Messaging.ServiceBus.Sender;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Framing;
@@ -18,7 +17,7 @@ namespace Azure.Messaging.ServiceBus.Amqp
{
///
/// A transport producer abstraction responsible for brokering operations for AMQP-based connections.
- /// It is intended that the public make use of an instance
+ /// It is intended that the public make use of an instance
/// via containment and delegate operations to it.
///
///
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/RetriableContext.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/RetriableContext.cs
deleted file mode 100644
index 40dd0c053ba0..000000000000
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/RetriableContext.cs
+++ /dev/null
@@ -1,99 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-using System;
-using System.Diagnostics;
-using System.Threading;
-using System.Threading.Tasks;
-using Azure.Messaging.ServiceBus.Amqp;
-using Azure.Messaging.ServiceBus.Diagnostics;
-using Microsoft.Azure.Amqp;
-
-namespace Azure.Messaging.ServiceBus.Core
-{
- internal class RetriableContext
- {
- public Stopwatch Stopwatch { get; }
- public TimeSpan TimeSpan { get; }
- public CancellationToken CancellationToken { get; }
- private ServiceBusRetryPolicy RetryPolicy { get; }
- private string EntityName { get; }
- private AmqpConnectionScope Scope { get; }
-
- public RetriableContext(
- AmqpConnectionScope scope,
- Stopwatch stopwatch,
- ServiceBusRetryPolicy retryPolicy,
- string entityName,
- CancellationToken cancellationToken)
- {
- Scope = scope;
- Stopwatch = stopwatch;
- RetryPolicy = retryPolicy;
- TimeSpan = retryPolicy.CalculateTryTimeout(0);
- CancellationToken = cancellationToken;
- EntityName = entityName;
- }
-
- public async Task RunOperation(Func> operation)
- {
- var failedAttemptCount = 0;
-
- var stopWatch = Stopwatch.StartNew();
- try
- {
- TimeSpan tryTimeout = RetryPolicy.CalculateTryTimeout(0);
-
- while (!CancellationToken.IsCancellationRequested)
- {
-
- try
- {
- return await operation().ConfigureAwait(false);
- }
-
- catch (Exception ex)
- {
- Exception activeEx = ex.TranslateServiceException(EntityName);
-
- // Determine if there should be a retry for the next attempt; if so enforce the delay but do not quit the loop.
- // Otherwise, mark the exception as active and break out of the loop.
-
- ++failedAttemptCount;
- TimeSpan? retryDelay = RetryPolicy.CalculateRetryDelay(ex, failedAttemptCount);
- if (retryDelay.HasValue && !Scope.IsDisposed && !CancellationToken.IsCancellationRequested)
- {
- //EventHubsEventSource.Log.GetPropertiesError(EventHubName, activeEx.Message);
- await Task.Delay(retryDelay.Value, CancellationToken).ConfigureAwait(false);
-
- tryTimeout = RetryPolicy.CalculateTryTimeout(failedAttemptCount);
- stopWatch.Reset();
- }
- else if (ex is AmqpException)
- {
- throw activeEx;
- }
- else
- {
- throw;
- }
- }
- }
- // If no value has been returned nor exception thrown by this point,
- // then cancellation has been requested.
- throw new TaskCanceledException();
- }
- catch (Exception exception)
- {
- throw exception;
- //TODO through correct exception throw AmqpExceptionHelper.GetClientException(exception);
- }
- finally
- {
- stopWatch.Stop();
- //TODO log correct completion event ServiceBusEventSource.Log.PeekMessagesComplete(EntityName);
- }
- }
-
- }
-}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportClient.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportClient.cs
index fa68a71cf9f2..1d98d52e6f2c 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportClient.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportClient.cs
@@ -111,6 +111,7 @@ public abstract Task CancelScheduledMessageAsync(
/// The relative priority to associate with the link; for a non-exclusive link, this value should be null.
/// Controls the number of events received and queued locally without regard to whether an operation was requested. If null a default will be used.
///
+ ///
///
/// A configured in the requested manner.
///
@@ -118,7 +119,8 @@ public abstract TransportConsumer CreateConsumer(
ServiceBusRetryPolicy retryPolicy,
long? ownerLevel,
uint? prefetchCount,
- string sessionId);
+ string sessionId,
+ bool isSessionReceiver);
///
/// Closes the connection to the transport client instance.
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportConnectionScope.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportConnectionScope.cs
new file mode 100644
index 000000000000..886461e30ae8
--- /dev/null
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportConnectionScope.cs
@@ -0,0 +1,29 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System;
+
+namespace Azure.Messaging.ServiceBus.Core
+{
+ internal abstract class TransportConnectionScope : IDisposable
+ {
+ ///
+ /// Indicates whether this has been disposed.
+ ///
+ ///
+ /// true if disposed; otherwise, false.
+ ///
+ public abstract bool IsDisposed { get; protected set; }
+
+ ///
+ /// The recommended timeout to associate with the session.
+ ///
+ ///
+ public abstract TimeSpan SessionTimeout { get; }
+
+ ///
+ /// Disposes of the connection scope.
+ ///
+ public abstract void Dispose();
+ }
+}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportConsumer.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportConsumer.cs
index 54b9f9cbea47..41f1ddc341cc 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportConsumer.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportConsumer.cs
@@ -6,14 +6,13 @@
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Amqp;
-using Azure.Messaging.ServiceBus.Receiver;
using Microsoft.Azure.Amqp;
namespace Azure.Messaging.ServiceBus.Core
{
///
/// Provides an abstraction for generalizing an Event Consumer so that a dedicated instance may provide operations
- /// for a specific transport, such as AMQP or JMS. It is intended that the public employ
+ /// for a specific transport, such as AMQP or JMS. It is intended that the public employ
/// a transport consumer via containment and delegate operations to it rather than understanding protocol-specific details
/// for different transports.
///
@@ -36,14 +35,15 @@ internal abstract class TransportConsumer
public FaultTolerantAmqpObject ReceiveLink { get; protected set; }
///
- ///
+ /// The scope associated with the
+ /// .
///
- public AmqpConnectionScope ConnectionScope { get; set; }
+ public virtual TransportConnectionScope ConnectionScope { get; set; }
///
///
///
- public string SessionId { get; set; }
+ public virtual string EntityName { get; }
///
/// Receives a batch of from the Service Bus entity.
@@ -67,5 +67,13 @@ public abstract Task> ReceiveAsync(
/// An optional instance to signal the request to cancel the operation.
///
public abstract Task CloseAsync(CancellationToken cancellationToken);
+
+ ///
+ /// Get the session Id corresponding to this consumer.
+ ///
+ ///
+ ///
+ public abstract Task GetSessionId(CancellationToken cancellationToken = default);
+
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs
index 6d9e025670b2..746578802952 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs
@@ -4,14 +4,13 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
-using Azure.Messaging.ServiceBus.Sender;
using Microsoft.Azure.Amqp;
namespace Azure.Messaging.ServiceBus.Core
{
///
/// Provides an abstraction for generalizing an Service Bus entity Producer so that a dedicated instance may provide operations
- /// for a specific transport, such as AMQP or JMS. It is intended that the public employ
+ /// for a specific transport, such as AMQP or JMS. It is intended that the public employ
/// a transport producer via containment and delegate operations to it rather than understanding protocol-specific details
/// for different transports.
///
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs
index f3b22ff41610..c8b1025e7ebc 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs
@@ -3,8 +3,7 @@
using System;
using System.Diagnostics.Tracing;
-using Azure.Messaging.ServiceBus.Receiver;
-using Azure.Messaging.ServiceBus.Sender;
+using Azure.Messaging.ServiceBus.Core;
namespace Azure.Messaging.ServiceBus.Diagnostics
{
@@ -183,7 +182,7 @@ public void MessageReceiveError(
///
/// Indicates that a client is closing, which may correspond to an ,
- /// , , or EventProcessorClient.
+ /// , , or EventProcessorClient.
///
///
/// The type of client being closed.
@@ -203,7 +202,7 @@ public void ClientCloseStart(Type clientType,
///
/// Indicates that a client has been closed, which may correspond to an ,
- /// , , or EventProcessorClient.
+ /// , , or EventProcessorClient.
///
///
/// The type of client being closed.
@@ -223,7 +222,7 @@ public void ClientCloseComplete(Type clientType,
///
/// Indicates that an exception was encountered while closing an ,
- /// , , or EventProcessorClient.
+ /// , , or EventProcessorClient.
///
///
/// The type of client being closed.
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/MessageHandlerOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/MessageHandlerOptions.cs
index 7ca941aac14c..f9a6da950219 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/MessageHandlerOptions.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/MessageHandlerOptions.cs
@@ -6,7 +6,7 @@
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Primitives;
-namespace Azure.Messaging.ServiceBus.Receiver
+namespace Azure.Messaging.ServiceBus
{
/// Provides options associated with message pump processing using
/// cref="QueueClient.RegisterMessageHandler(Func{Message, CancellationToken, Task}, MessageHandlerOptions)" /> and
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/QueueReceiverClient.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/QueueReceiverClient.cs
index 5ac08d4ffe13..bfcd9eb7ac5b 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/QueueReceiverClient.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/QueueReceiverClient.cs
@@ -7,12 +7,12 @@
using Azure.Core;
using Azure.Messaging.ServiceBus.Core;
-namespace Azure.Messaging.ServiceBus.Receiver
+namespace Azure.Messaging.ServiceBus
{
///
///
///
- public class QueueReceiverClient : ServiceBusReceiverClient
+ public class QueueReceiverClient : ServiceBusReceiver
{
///
///
@@ -23,14 +23,14 @@ public class QueueReceiverClient : ServiceBusReceiverClient
///
protected QueueReceiverClient(){}
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
///
///
/// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
///
///
/// If the connection string is copied from the Service Bus namespace, it will likely not contain the name of the desired Service Bus entity,
- /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ Service Bus entity NAME ]]" to the end of the
+ /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ SERVICE BUS ENTITY NAME ]]" to the end of the
/// connection string. For example, ";EntityPath=telemetry-hub".
///
/// If you have defined a shared access policy directly on the Service Bus entity itself, then copying the connection string from that
@@ -42,8 +42,30 @@ public QueueReceiverClient(string connectionString)
{
}
+
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
+ ///
+ ///
+ ///
+ /// If the connection string is copied from the Service Bus namespace, it will likely not contain the name of the desired Service Bus entity,
+ /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ SERVICE BUS ENTITY NAME ]]" to the end of the
+ /// connection string. For example, ";EntityPath=telemetry-hub".
+ ///
+ /// If you have defined a shared access policy directly on the Service Bus entity itself, then copying the connection string from that
+ /// Service Bus entity will result in a connection string that contains the name.
+ ///
+ ///
+ public QueueReceiverClient(string connectionString, SessionOptions sessionOptions)
+ : base(connectionString, null, null, sessionOptions, new QueueReceiverClientOptions())
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
///
///
/// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
@@ -51,7 +73,7 @@ public QueueReceiverClient(string connectionString)
///
///
/// If the connection string is copied from the Service Bus namespace, it will likely not contain the name of the desired Service Bus entity,
- /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ Service Bus entity NAME ]]" to the end of the
+ /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ SERVICE BUS ENTITY NAME ]]" to the end of the
/// connection string. For example, ";EntityPath=telemetry-hub".
///
/// If you have defined a shared access policy directly on the Service Bus entity itself, then copying the connection string from that
@@ -66,7 +88,32 @@ public QueueReceiverClient(
}
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
+ ///
+ /// The set of options to use for this consumer.
+ ///
+ ///
+ /// If the connection string is copied from the Service Bus namespace, it will likely not contain the name of the desired Service Bus entity,
+ /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ SERVICE BUS ENTITY NAME ]]" to the end of the
+ /// connection string. For example, ";EntityPath=telemetry-hub".
+ ///
+ /// If you have defined a shared access policy directly on the Service Bus entity itself, then copying the connection string from that
+ /// Service Bus entity will result in a connection string that contains the name.
+ ///
+ ///
+ public QueueReceiverClient(
+ string connectionString,
+ SessionOptions sessionOptions,
+ QueueReceiverClientOptions clientOptions)
+ : base(connectionString, null, sessionOptions, clientOptions?.Clone() ?? new QueueReceiverClientOptions())
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
///
///
/// The connection string to use for connecting to the Service Bus namespace; it is expected that the shared key properties are contained in this connection string, but not the Service Bus entity name.
@@ -88,7 +135,31 @@ public QueueReceiverClient(
}
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The connection string to use for connecting to the Service Bus namespace; it is expected that the shared key properties are contained in this connection string, but not the Service Bus entity name.
+ /// The name of the specific Service Bus entity to associate the consumer with.
+ ///
+ ///
+ ///
+ ///
+ /// If the connection string is copied from the Service Bus entity itself, it will contain the name of the desired Service Bus entity,
+ /// and can be used directly without passing the . The name of the Service Bus entity should be
+ /// passed only once, either as part of the connection string or separately.
+ ///
+ ///
+ public QueueReceiverClient(
+ string connectionString,
+ string queueName,
+ SessionOptions sessionOptions,
+ QueueReceiverClientOptions clientOptions = default)
+ : base(connectionString, queueName, sessionOptions, clientOptions?.Clone() ?? new QueueReceiverClientOptions())
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
///
///
/// The fully qualified Service Bus namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net.
@@ -104,5 +175,25 @@ public QueueReceiverClient(
:base(fullyQualifiedNamespace, queueName, credential, null, clientOptions?.Clone() ?? new QueueReceiverClientOptions())
{
}
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The fully qualified Service Bus namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net.
+ /// The name of the specific Service Bus entity to associate the consumer with.
+ /// The Azure managed identity credential to use for authorization. Access controls may be specified by the Service Bus namespace or the requested Service Bus entity, depending on Azure configuration.
+ ///
+ /// A set of options to apply when configuring the consumer.
+ ///
+ public QueueReceiverClient(
+ string fullyQualifiedNamespace,
+ string queueName,
+ TokenCredential credential,
+ SessionOptions sessionOptions,
+ QueueReceiverClientOptions clientOptions = default)
+ : base(fullyQualifiedNamespace, queueName, credential, sessionOptions, clientOptions?.Clone() ?? new QueueReceiverClientOptions())
+ {
+ }
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/QueueReceiverClientOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/QueueReceiverClientOptions.cs
index 58f89fba147e..209144e84433 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/QueueReceiverClientOptions.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/QueueReceiverClientOptions.cs
@@ -3,7 +3,7 @@
using Azure.Messaging.ServiceBus.Core;
-namespace Azure.Messaging.ServiceBus.Receiver
+namespace Azure.Messaging.ServiceBus
{
///
///
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ReceiveMode.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ReceiveMode.cs
index bee3b1db8ad3..e031aa13c580 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ReceiveMode.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ReceiveMode.cs
@@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
-namespace Azure.Messaging.ServiceBus.Receiver
+namespace Azure.Messaging.ServiceBus
{
///
/// Specifies the behavior of the receiver.
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiverClient.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs
similarity index 78%
rename from sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiverClient.cs
rename to sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs
index c2a81d4dce98..e03de2467eed 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiverClient.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs
@@ -10,11 +10,13 @@
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
+using Azure.Messaging.ServiceBus.Amqp;
using Azure.Messaging.ServiceBus.Core;
using Azure.Messaging.ServiceBus.Diagnostics;
using Microsoft.Azure.Amqp;
+using Microsoft.Azure.Amqp.Framing;
-namespace Azure.Messaging.ServiceBus.Receiver
+namespace Azure.Messaging.ServiceBus.Core
{
///
/// A client responsible for reading from a specific entity
@@ -29,7 +31,7 @@ namespace Azure.Messaging.ServiceBus.Receiver
/// sometimes referred to as "Non-Epoch Consumers."
///
///
- public abstract class ServiceBusReceiverClient : IAsyncDisposable
+ public abstract class ServiceBusReceiver : IAsyncDisposable
{
///
/// The fully qualified Service Bus namespace that the consumer is associated with. This is likely
@@ -50,13 +52,15 @@ public abstract class ServiceBusReceiverClient : IAsyncDisposable
///
public ReceiveMode ReceiveMode { get; private set; }
+ internal bool IsSessionReceiver { get; set; }
+
///
///
///
- public int PrefetchCount { get; }
+ public uint PrefetchCount { get; }
///
- /// Indicates whether or not this has been closed.
+ /// Indicates whether or not this has been closed.
///
///
///
@@ -93,13 +97,44 @@ public abstract class ServiceBusReceiverClient : IAsyncDisposable
internal TransportConsumer Consumer { get; }
///
- /// Initializes a new instance of the class.
+ ///
+ ///
+ public ServiceBusSession Session { get; set; }
+
+ /////
+ ///// Initializes a new instance of the class.
+ /////
+ /////
+ ///// The connection string to use for connecting to the Service Bus namespace; it is expected that the shared key properties are contained in this connection string, but not the Service Bus entity name.
+ ///// The name of the specific Service Bus entity to associate the consumer with.
+ /////
+ /////
+ /////
+ ///// If the connection string is copied from the Service Bus entity itself, it will contain the name of the desired Service Bus entity,
+ ///// and can be used directly without passing the . The name of the Service Bus entity should be
+ ///// passed only once, either as part of the connection string or separately.
+ /////
+ /////
+ //public ServiceBusReceiver(
+ // string connectionString,
+ // string entityName,
+ // SessionOptions sessionOptions) :
+ // this(
+ // connectionString,
+ // entityName,
+ // sessionOptions,
+ // new ServiceBusReceiverClientOptions())
+ //{
+ //}
+
+ ///
+ /// Initializes a new instance of the class.
///
///
/// The connection string to use for connecting to the Service Bus namespace; it is expected that the shared key properties are contained in this connection string, but not the Service Bus entity name.
/// The name of the specific Service Bus entity to associate the consumer with.
/// A set of options to apply when configuring the consumer.
- ///
+ ///
///
///
/// If the connection string is copied from the Service Bus entity itself, it will contain the name of the desired Service Bus entity,
@@ -107,37 +142,95 @@ public abstract class ServiceBusReceiverClient : IAsyncDisposable
/// passed only once, either as part of the connection string or separately.
///
///
- internal ServiceBusReceiverClient(
+ internal ServiceBusReceiver(
string connectionString,
string entityName,
- string sessionId,
+ SessionOptions sessionOptions,
ServiceBusReceiverClientOptions clientOptions)
{
Argument.AssertNotNullOrEmpty(connectionString, nameof(connectionString));
Argument.AssertNotNull(clientOptions, nameof(clientOptions));
+ IsSessionReceiver = sessionOptions != null;
OwnsConnection = true;
Connection = new ServiceBusConnection(connectionString, entityName, clientOptions.ConnectionOptions);
RetryPolicy = clientOptions.RetryOptions.ToRetryPolicy();
ReceiveMode = clientOptions.ReceiveMode;
- Consumer = Connection.CreateTransportConsumer(retryPolicy: RetryPolicy, sessionId: sessionId);
+ Consumer = Connection.CreateTransportConsumer(
+ retryPolicy: RetryPolicy,
+ prefetchCount: PrefetchCount,
+ sessionId: sessionOptions?.SessionId,
+ isSessionReceiver: IsSessionReceiver);
+ Session = new ServiceBusSession(
+ Consumer,
+ RetryPolicy);
}
- ///
- /// Initializes a new instance of the class.
+ /////
+ ///// Initializes a new instance of the class.
+ /////
+ /////
+ ///// The connection string to use for connecting to the Service Bus namespace; it is expected that the shared key properties are contained in this connection string, but not the Service Bus entity name.
+ ///// The name of the specific Service Bus entity to associate the consumer with.
+ /////
+ /////
+ ///// If the connection string is copied from the Service Bus entity itself, it will contain the name of the desired Service Bus entity,
+ ///// and can be used directly without passing the . The name of the Service Bus entity should be
+ ///// passed only once, either as part of the connection string or separately.
+ /////
+ /////
+ //public ServiceBusReceiver(
+ // string connectionString,
+ // string entityName) :
+ // this(
+ // connectionString,
+ // entityName,
+ // null,
+ // new ServiceBusReceiverClientOptions())
+ //{
+ //}
+
+ /////
+ ///// Initializes a new instance of the class.
+ /////
+ /////
+ ///// The connection string to use for connecting to the Service Bus namespace; it is expected that the shared key properties are contained in this connection string, but not the Service Bus entity name.
+ ///// The name of the specific Service Bus entity to associate the consumer with.
+ /////
+ /////
+ /////
+ ///// If the connection string is copied from the Service Bus entity itself, it will contain the name of the desired Service Bus entity,
+ ///// and can be used directly without passing the . The name of the Service Bus entity should be
+ ///// passed only once, either as part of the connection string or separately.
+ /////
+ /////
+ //public ServiceBusReceiver(
+ // string connectionString,
+ // string entityName,
+ // ServiceBusReceiverClientOptions clientOptions) :
+ // this(
+ // connectionString,
+ // entityName,
+ // null,
+ // clientOptions?.Clone() ?? new ServiceBusReceiverClientOptions())
+ //{
+ //}
+
+ ///
+ /// Initializes a new instance of the class.
///
///
/// The fully qualified Service Bus namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net.
/// The name of the specific Service Bus entity to associate the consumer with.
/// The Azure managed identity credential to use for authorization. Access controls may be specified by the Service Bus namespace or the requested Service Bus entity, depending on Azure configuration.
- ///
+ ///
/// A set of options to apply when configuring the consumer.
///
- internal ServiceBusReceiverClient(
+ internal ServiceBusReceiver(
string fullyQualifiedNamespace,
string entityName,
TokenCredential credential,
- string sessionId,
+ SessionOptions sessionOptions,
ServiceBusReceiverClientOptions clientOptions)
{
Argument.AssertNotNullOrEmpty(fullyQualifiedNamespace, nameof(fullyQualifiedNamespace));
@@ -145,41 +238,57 @@ internal ServiceBusReceiverClient(
Argument.AssertNotNull(credential, nameof(credential));
Argument.AssertNotNull(clientOptions, nameof(clientOptions));
+ IsSessionReceiver = sessionOptions != null;
OwnsConnection = true;
Connection = new ServiceBusConnection(fullyQualifiedNamespace, entityName, credential, clientOptions.ConnectionOptions);
RetryPolicy = clientOptions.RetryOptions.ToRetryPolicy();
ReceiveMode = clientOptions.ReceiveMode;
- Consumer = Connection.CreateTransportConsumer(retryPolicy: RetryPolicy, sessionId:sessionId);
+ Consumer = Connection.CreateTransportConsumer(
+ retryPolicy: RetryPolicy,
+ prefetchCount: PrefetchCount,
+ sessionId: sessionOptions?.SessionId,
+ isSessionReceiver: IsSessionReceiver);
+ Session = new ServiceBusSession(
+ Consumer,
+ RetryPolicy);
}
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
///
///
/// The connection to use for communication with the Service Bus service.
- ///
+ ///
/// A set of options to apply when configuring the consumer.
///
- internal ServiceBusReceiverClient(
+ internal ServiceBusReceiver(
ServiceBusConnection connection,
- string sessionId,
+ SessionOptions sessionOptions,
ServiceBusReceiverClientOptions clientOptions)
{
Argument.AssertNotNull(connection, nameof(connection));
Argument.AssertNotNull(clientOptions, nameof(clientOptions));
+ IsSessionReceiver = sessionOptions != null;
OwnsConnection = false;
Connection = connection;
RetryPolicy = clientOptions.RetryOptions.ToRetryPolicy();
ReceiveMode = clientOptions.ReceiveMode;
- Consumer = Connection.CreateTransportConsumer(retryPolicy: RetryPolicy, sessionId: sessionId);
+ Consumer = Connection.CreateTransportConsumer(
+ retryPolicy: RetryPolicy,
+ prefetchCount: PrefetchCount,
+ sessionId: sessionOptions?.SessionId,
+ isSessionReceiver: IsSessionReceiver);
+ Session = new ServiceBusSession(
+ Consumer,
+ RetryPolicy);
}
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
///
///
- protected ServiceBusReceiverClient()
+ protected ServiceBusReceiver()
{
OwnsConnection = false;
}
@@ -201,23 +310,23 @@ public virtual async IAsyncEnumerable ReceiveRangeAsync(
}
}
- ///
- /// Get a SessionReceiverClient scoped to the ServiceBusReceiverClient entity and a specified session.
- /// Note once the SessionReceiverClient is created it will be scoped to only the specified session for its lifetime.
- ///
- /// Session to receive messages from.
- /// A SessionReceiverClient instance scoped to the ServiceBusReceiverClient entity and specified session.
- public SessionReceiverClient GetSessionReceiverClient(string sessionId) =>
- new SessionReceiverClient(Connection, sessionId);
+ /////
+ ///// Get a SessionReceiverClient scoped to the ServiceBusReceiverClient entity and a specified session.
+ ///// Note once the SessionReceiverClient is created it will be scoped to only the specified session for its lifetime.
+ /////
+ ///// Session to receive messages from.
+ ///// A SessionReceiverClient instance scoped to the ServiceBusReceiverClient entity and specified session.
+ //internal SessionReceiverClient GetSessionReceiverClient(string sessionId) =>
+ // new SessionReceiverClient(Connection, sessionId);
- ///
- /// Get a SessionReceiverClient scoped to the current entity without specifying a particular session.
- /// The broker will decide what session to use for operations. Note once the SessionReceiverClient is created,
- /// it will be scoped to only one session for its lifetime.
- ///
- /// A SessionReceiverClient instance scoped to the ServiceBusReceiverClient entity and session determined by the broker.
- public SessionReceiverClient GetSessionReceiverClient() =>
- GetSessionReceiverClient(null);
+ /////
+ ///// Get a SessionReceiverClient scoped to the current entity without specifying a particular session.
+ ///// The broker will decide what session to use for operations. Note once the SessionReceiverClient is created,
+ ///// it will be scoped to only one session for its lifetime.
+ /////
+ ///// A SessionReceiverClient instance scoped to the ServiceBusReceiverClient entity and session determined by the broker.
+ //internal SessionReceiverClient GetSessionReceiverClient() =>
+ // GetSessionReceiverClient(null);
///
///
@@ -239,7 +348,7 @@ public virtual async Task ReceiveAsync(
///
public virtual async Task PeekAsync(CancellationToken cancellationToken = default)
{
- IAsyncEnumerator result = PeekRangeBySequenceAsync(fromSequenceNumber: 1).GetAsyncEnumerator();
+ IAsyncEnumerator result = PeekRangeBySequenceInternal(null).GetAsyncEnumerator();
await result.MoveNextAsync().ConfigureAwait(false);
return result.Current;
}
@@ -270,7 +379,7 @@ public virtual async IAsyncEnumerable PeekRangeAsync(
[EnumeratorCancellation]
CancellationToken cancellationToken = default)
{
- IAsyncEnumerable ret = PeekRangeBySequenceAsync(fromSequenceNumber: 1, maxMessages);
+ IAsyncEnumerable ret = PeekRangeBySequenceInternal(fromSequenceNumber: null, maxMessages);
await foreach (ServiceBusMessage msg in ret.ConfigureAwait(false))
{
yield return msg;
@@ -305,18 +414,25 @@ public virtual async IAsyncEnumerable PeekRangeBySequenceAsyn
///
///
///
- ///
///
///
internal async IAsyncEnumerable PeekRangeBySequenceInternal(
long? fromSequenceNumber,
int maxMessages = 1,
- string sessionId = null,
[EnumeratorCancellation]
CancellationToken cancellationToken = default)
{
- string receiveLinkName = "";
+ if (IsSessionReceiver)
+ {
+ // if this is a session receiver, the receive link must be open in order to peek messages
+ await RetryPolicy.RunOperation(
+ async (timeout) => await Consumer.ReceiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false),
+ EntityName,
+ Consumer.ConnectionScope,
+ cancellationToken).ConfigureAwait(false);
+ }
+ string receiveLinkName = "";
if (Consumer.ReceiveLink.TryGetOpenedObject(out ReceivingAmqpLink link))
{
receiveLinkName = link.Name;
@@ -326,7 +442,7 @@ internal async IAsyncEnumerable PeekRangeBySequenceInternal(
RetryPolicy,
fromSequenceNumber,
maxMessages,
- sessionId,
+ await Session.GetSessionId().ConfigureAwait(false),
receiveLinkName,
cancellationToken)
.ConfigureAwait(false))
@@ -419,7 +535,7 @@ public virtual async Task DeadLetterAsync(string lockToken, IDictionary
/// A lock token can be found in ,
/// only when is set to .
- /// In order to receive a message from the deadletter queue, you will need a new , with the corresponding path.
+ /// In order to receive a message from the deadletter queue, you will need a new , with the corresponding path.
/// You can use EntityNameHelper.FormatDeadLetterPath(string) to help with this.
/// This operation can only be performed on messages that were received by this receiver.
///
@@ -537,7 +653,7 @@ public virtual async Task CloseAsync(CancellationToken cancellationToken = defau
IsClosed = true;
var clientHash = GetHashCode().ToString();
- ServiceBusEventSource.Log.ClientCloseStart(typeof(ServiceBusReceiverClient), EntityName, clientHash);
+ ServiceBusEventSource.Log.ClientCloseStart(typeof(ServiceBusReceiver), EntityName, clientHash);
// Attempt to close the transport consumer. In the event that an exception is encountered,
// it should not impact the attempt to close the connection, assuming ownership.
@@ -550,7 +666,7 @@ public virtual async Task CloseAsync(CancellationToken cancellationToken = defau
}
catch (Exception ex)
{
- ServiceBusEventSource.Log.ClientCloseError(typeof(ServiceBusReceiverClient), EntityName, clientHash, ex.Message);
+ ServiceBusEventSource.Log.ClientCloseError(typeof(ServiceBusReceiver), EntityName, clientHash, ex.Message);
transportConsumerException = ex;
}
@@ -566,13 +682,13 @@ public virtual async Task CloseAsync(CancellationToken cancellationToken = defau
}
catch (Exception ex)
{
- ServiceBusEventSource.Log.ClientCloseError(typeof(ServiceBusReceiverClient), EntityName, clientHash, ex.Message);
+ ServiceBusEventSource.Log.ClientCloseError(typeof(ServiceBusReceiver), EntityName, clientHash, ex.Message);
transportConsumerException = null;
throw;
}
finally
{
- ServiceBusEventSource.Log.ClientCloseComplete(typeof(ServiceBusReceiverClient), EntityName, clientHash);
+ ServiceBusEventSource.Log.ClientCloseComplete(typeof(ServiceBusReceiver), EntityName, clientHash);
}
// If there was an active exception pending from closing the individual
@@ -585,7 +701,7 @@ public virtual async Task CloseAsync(CancellationToken cancellationToken = defau
}
///
- /// Performs the task needed to clean up resources used by the ,
+ /// Performs the task needed to clean up resources used by the ,
/// including ensuring that the client itself has been closed.
///
///
@@ -664,7 +780,7 @@ protected void RegisterMessageHandler(Func to configure the settings of the pump.
// TODO remove if won't be used
- public void RegisterSessionHandler(Func handler, Func exceptionReceivedHandler)
+ internal void RegisterSessionHandler(Func handler, Func exceptionReceivedHandler)
{
var sessionHandlerOptions = new SessionHandlerOptions(exceptionReceivedHandler);
this.RegisterSessionHandler(handler, sessionHandlerOptions);
@@ -678,18 +794,10 @@ public void RegisterSessionHandler(Func contains the session information, and must be used to perform Complete/Abandon/Deadletter or other such operations on the
/// Options used to configure the settings of the session pump.
/// Enable prefetch to speed up the receive rate.
- public void RegisterSessionHandler(Func handler, SessionHandlerOptions sessionHandlerOptions)
+ internal void RegisterSessionHandler(Func handler, SessionHandlerOptions sessionHandlerOptions)
{
// TODO remove if won't be used
}
-
- internal RetriableContext CreateRetriableContext(CancellationToken cancellationToken) =>
- new RetriableContext(
- Consumer.ConnectionScope,
- new Stopwatch(),
- RetryPolicy,
- EntityName,
- cancellationToken);
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiverClientOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiverClientOptions.cs
index 133417e846c5..a8228c98f3aa 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiverClientOptions.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiverClientOptions.cs
@@ -5,10 +5,10 @@
using Azure.Core;
using Azure.Messaging.ServiceBus.Core;
-namespace Azure.Messaging.ServiceBus.Receiver
+namespace Azure.Messaging.ServiceBus.Core
{
///
- /// The baseline set of options that can be specified when creating a
+ /// The baseline set of options that can be specified when creating a
/// to configure its behavior.
///
///
@@ -23,7 +23,7 @@ public abstract class ServiceBusReceiverClientOptions
///
///
///
- public int PrefetchCount = 0;
+ public uint PrefetchCount = 0;
///
///
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusSession.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusSession.cs
new file mode 100644
index 000000000000..7a353070c6dc
--- /dev/null
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusSession.cs
@@ -0,0 +1,71 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Azure.Messaging.ServiceBus.Amqp;
+using Azure.Messaging.ServiceBus.Core;
+using Microsoft.Azure.Amqp;
+using Microsoft.Azure.Amqp.Framing;
+
+namespace Azure.Messaging.ServiceBus
+{
+ ///
+ ///
+ ///
+ public class ServiceBusSession
+ {
+ private readonly TransportConsumer _consumer;
+ private readonly ServiceBusRetryPolicy _retryPolicy;
+
+ internal ServiceBusSession(
+ TransportConsumer consumer,
+ ServiceBusRetryPolicy retryPolicy)
+ {
+ _consumer = consumer;
+ _retryPolicy = retryPolicy;
+ }
+
+ ///
+ /// TODO implement
+ ///
+ ///
+ public virtual async Task GetStateAsync(
+ CancellationToken cancellationToken = default)
+ {
+ return await Task.FromResult(new byte[4]).ConfigureAwait(false);
+ }
+
+ ///
+ /// TODO implement
+ ///
+ ///
+ ///
+ ///
+ public virtual async Task SetStateAsync(byte[] sessionState,
+ CancellationToken cancellationToken = default)
+ {
+ await Task.Delay(1).ConfigureAwait(false);
+ }
+
+ ///
+ /// TODO implement
+ ///
+ ///
+ public virtual async Task RenewSessionLockAsync(CancellationToken cancellationToken = default)
+ {
+ await Task.Delay(1).ConfigureAwait(false);
+ }
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ public virtual async Task GetSessionId(CancellationToken cancellationToken = default) =>
+ await _consumer.GetSessionId(cancellationToken).ConfigureAwait(false);
+ }
+}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SessionOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SessionOptions.cs
new file mode 100644
index 000000000000..730b46cb9874
--- /dev/null
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SessionOptions.cs
@@ -0,0 +1,25 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Azure.Messaging.ServiceBus
+{
+ ///
+ ///
+ ///
+ public class SessionOptions
+ {
+ ///
+ ///
+ ///
+ public string SessionId { get; set; }
+
+ ///
+ ///
+ ///
+ public ServiceBusConnection Connection { get; set; }
+ }
+}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SessionReceiverClient.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SessionReceiverClient.cs
deleted file mode 100755
index 0ad9ab57a494..000000000000
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SessionReceiverClient.cs
+++ /dev/null
@@ -1,333 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-using System;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.Runtime.CompilerServices;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-using Azure.Core;
-using Azure.Messaging.ServiceBus.Amqp;
-using Azure.Messaging.ServiceBus.Core;
-using Microsoft.Azure.Amqp;
-using Microsoft.Azure.Amqp.Framing;
-
-namespace Azure.Messaging.ServiceBus.Receiver
-{
- ///
- ///
- ///
- public class SessionReceiverClient : ServiceBusReceiverClient
- {
- ///
- /// Gets the time that the session identified by is locked until for this client.
- ///
- public virtual DateTime LockedUntilUtc { get; }
-
- ///
- /// The session Id for the session that the receiver is scoped to.
- ///
- public virtual string SessionId { get; set; }
-
- ///
- /// Constructor for mocking.
- ///
- protected SessionReceiverClient() { }
-
- ///
- ///
- ///
- ///
- ///
- ///
- internal SessionReceiverClient(
- ServiceBusConnection connection,
- string sessionId = null,
- SessionReceiverClientOptions clientOptions = default)
- : base(connection, sessionId, clientOptions?.Clone() ?? new SessionReceiverClientOptions())
- {
- SessionId = sessionId;
- }
- ///
- /// Initializes a new instance of the class.
- ///
- ///
- /// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
- ///
- ///
- ///
- /// If the connection string is copied from the Service Bus namespace, it will likely not contain the name of the desired Service Bus entity,
- /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ Service Bus entity NAME ]]" to the end of the
- /// connection string. For example, ";EntityPath=telemetry-hub".
- ///
- /// If you have defined a shared access policy directly on the Service Bus itself, then copying the connection string from that
- /// Service Bus entity will result in a connection string that contains the name.
- ///
- ///
- public SessionReceiverClient(
- string connectionString,
- string entityName)
- : base(connectionString, entityName, null, new SessionReceiverClientOptions())
- {
- }
-
- ///
- /// Initializes a new instance of the class.
- ///
- ///
- /// The connection string to use for connecting to the Service Bus namespace; it is expected that the entity name and the shared key properties are contained in this connection string.
- ///
- ///
- ///
- ///
- /// If the connection string is copied from the Service Bus namespace, it will likely not contain the name of the desired Service Bus entity,
- /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ ENTITY NAME ]]" to the end of the
- /// connection string. For example, ";EntityPath=telemetry-hub".
- ///
- /// If you have defined a shared access policy directly on the Service Bus entity itself, then copying the connection string from that
- /// Service Bus entity will result in a connection string that contains the name.
- ///
- ///
- public SessionReceiverClient(
- string connectionString,
- string entityName,
- SessionReceiverClientOptions clientOptions)
- : base(connectionString, entityName, null, clientOptions?.Clone() ?? new SessionReceiverClientOptions())
- {
- }
-
- ///
- /// Initializes a new instance of the class.
- ///
- ///
- ///
- /// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
- ///
- ///
- ///
- /// If the connection string is copied from the Service Bus namespace, it will likely not contain the name of the desired entity,
- /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ entity name ]]" to the end of the
- /// connection string. For example, ";EntityPath=orders-queue".
- ///
- /// If you have defined a shared access policy directly on the entity itself, then copying the connection string from that
- /// entity will result in a connection string that contains the name.
- ///
- ///
- public SessionReceiverClient(
- string sessionId,
- string connectionString,
- string entityName)
- : base(connectionString, entityName, sessionId, new SessionReceiverClientOptions())
- {
- SessionId = sessionId;
- }
-
- ///
- /// Initializes a new instance of the class.
- ///
- ///
- ///
- /// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
- ///
- /// The set of options to use for this consumer.
- ///
- ///
- /// If the connection string is copied from the Service Bus namespace, it will likely not contain the name of the desired Service Bus entity,
- /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ Service Bus entity NAME ]]" to the end of the
- /// connection string. For example, ";EntityPath=telemetry-hub".
- ///
- /// If you have defined a shared access policy directly on the Service Bus entity itself, then copying the connection string from that
- /// Service Bus entity will result in a connection string that contains the name.
- ///
- ///
- public SessionReceiverClient(
- string sessionId,
- string connectionString,
- string entityName,
- SessionReceiverClientOptions clientOptions)
- : base(connectionString, entityName, sessionId, clientOptions?.Clone() ?? new SessionReceiverClientOptions())
- {
- SessionId = sessionId;
- }
-
-
- ///
- /// Initializes a new instance of the class.
- ///
- ///
- ///
- /// The fully qualified Service Bus namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net.
- /// The name of the specific entity to associate the receiver with.
- /// The Azure managed identity credential to use for authorization. Access controls may be specified by the Service Bus namespace or the requested Service Bus entity, depending on Azure configuration.
- /// A set of options to apply when configuring the consumer.
- ///
- public SessionReceiverClient(
- string sessionId,
- string fullyQualifiedNamespace,
- string entityName,
- TokenCredential credential,
- SessionReceiverClientOptions clientOptions = default)
- : base(fullyQualifiedNamespace, entityName, credential, sessionId, clientOptions?.Clone() ?? new SessionReceiverClientOptions())
- {
- SessionId = sessionId;
- }
-
- ///
- /// Initializes a new instance of the class.
- ///
- ///
- /// The fully qualified Service Bus namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net.
- /// The name of the specific entity to associate the receiver with.
- /// The Azure managed identity credential to use for authorization. Access controls may be specified by the Service Bus namespace or the requested Service Bus entity, depending on Azure configuration.
- /// A set of options to apply when configuring the consumer.
- ///
- public SessionReceiverClient(
- string fullyQualifiedNamespace,
- string entityName,
- TokenCredential credential,
- SessionReceiverClientOptions clientOptions = default)
- : base(fullyQualifiedNamespace, entityName, credential, null, clientOptions?.Clone() ?? new SessionReceiverClientOptions())
- {
- }
-
- ///
- ///
- ///
- ///
- ///
- public override async Task PeekAsync(CancellationToken cancellationToken = default)
- {
- IAsyncEnumerator result = PeekRangeBySequenceInternal(fromSequenceNumber: null).GetAsyncEnumerator();
- await result.MoveNextAsync().ConfigureAwait(false);
- return result.Current;
- }
-
- ///
- ///
- ///
- ///
- ///
- ///
- public override async Task PeekBySequenceAsync(
- long fromSequenceNumber,
- CancellationToken cancellationToken = default)
- {
- IAsyncEnumerator result = PeekRangeBySequenceAsync(fromSequenceNumber: fromSequenceNumber).GetAsyncEnumerator();
- await result.MoveNextAsync().ConfigureAwait(false);
- return result.Current;
- }
-
- ///
- ///
- ///
- ///
- ///
- ///
- public override async IAsyncEnumerable PeekRangeAsync(
- int maxMessages,
- [EnumeratorCancellation]
- CancellationToken cancellationToken = default)
- {
- IAsyncEnumerable ret = PeekRangeBySequenceInternal(fromSequenceNumber: null, maxMessages);
- await foreach (ServiceBusMessage msg in ret.ConfigureAwait(false))
- {
- yield return msg;
- }
- }
-
- ///
- ///
- ///
- ///
- ///
- ///
- ///
- public override async IAsyncEnumerable PeekRangeBySequenceAsync(
- long fromSequenceNumber,
- int maxMessages = 1,
- [EnumeratorCancellation]
- CancellationToken cancellationToken = default)
- {
-
- IAsyncEnumerable ret = PeekRangeBySequenceInternal(
- fromSequenceNumber: fromSequenceNumber,
- maxMessages: maxMessages,
- cancellationToken: cancellationToken);
- await foreach (ServiceBusMessage msg in ret.ConfigureAwait(false))
- {
- yield return msg;
- }
- }
-
- ///
- ///
- ///
- ///
- ///
- ///
- ///
- internal async IAsyncEnumerable PeekRangeBySequenceInternal(
- long? fromSequenceNumber,
- int maxMessages = 1,
- [EnumeratorCancellation]
- CancellationToken cancellationToken = default)
- {
-
- RetriableContext context = CreateRetriableContext(cancellationToken);
- ReceivingAmqpLink openedLink =
- await context.RunOperation(
- async () => await Consumer.ReceiveLink.GetOrCreateAsync(context.TimeSpan)
- .ConfigureAwait(false))
- .ConfigureAwait(false);
-
- var source = (Source)openedLink.Settings.Source;
- if (source.FilterSet.TryGetValue(AmqpClientConstants.SessionFilterName, out var tempSessionId))
- {
- // If one of the constructors not accepting a SessionId was used, the broker will determine which session to send messages from.
- SessionId = tempSessionId;
- }
- IAsyncEnumerable ret = PeekRangeBySequenceInternal(
- fromSequenceNumber: fromSequenceNumber,
- maxMessages: maxMessages,
- sessionId: SessionId,
- cancellationToken: cancellationToken);
- await foreach (ServiceBusMessage msg in ret.ConfigureAwait(false))
- {
- yield return msg;
- }
- }
-
- ///
- /// TODO implement
- ///
- ///
- public virtual async Task GetStateAsync(
- CancellationToken cancellationToken = default)
- {
- return await Task.FromResult(new byte[4]).ConfigureAwait(false);
- }
-
- ///
- /// TODO implement
- ///
- ///
- ///
- ///
- public virtual async Task SetStateAsync(byte[] sessionState,
- CancellationToken cancellationToken = default)
- {
- await Task.Delay(1).ConfigureAwait(false);
- }
-
- ///
- /// TODO implement
- ///
- ///
- public virtual async Task RenewSessionLockAsync(CancellationToken cancellationToken = default)
- {
- await Task.Delay(1).ConfigureAwait(false);
- }
- }
-}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SessionReceiverClientOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SessionReceiverClientOptions.cs
deleted file mode 100644
index 5323d0080eb0..000000000000
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SessionReceiverClientOptions.cs
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright (c) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License.
-
-using Azure.Messaging.ServiceBus.Core;
-
-namespace Azure.Messaging.ServiceBus.Receiver
-{
- ///
- ///
- ///
- public class SessionReceiverClientOptions : ServiceBusReceiverClientOptions
- {
- ///
- /// Creates a new copy of the current , cloning its attributes into a new instance.
- ///
- ///
- /// A new copy of .
- ///
- internal SessionReceiverClientOptions Clone() =>
- new SessionReceiverClientOptions
- {
- _connectionOptions = ConnectionOptions.Clone(),
- _retryOptions = RetryOptions.Clone()
- };
- }
-}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SubscriptionClient.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SubscriptionReceiverClient.cs
similarity index 57%
rename from sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SubscriptionClient.cs
rename to sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SubscriptionReceiverClient.cs
index 3b4a4ba75dba..a8fc12729abd 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SubscriptionClient.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SubscriptionReceiverClient.cs
@@ -11,12 +11,12 @@
using Azure.Messaging.ServiceBus.Core;
using Azure.Messaging.ServiceBus.Filters;
-namespace Azure.Messaging.ServiceBus.Receiver
+namespace Azure.Messaging.ServiceBus
{
///
///
///
- public class SubscriptionClient : ServiceBusReceiverClient
+ public class SubscriptionReceiverClient : ServiceBusReceiver
{
///
/// Gets the path of the corresponding topic.
@@ -37,9 +37,9 @@ public class SubscriptionClient : ServiceBusReceiverClient
///
///
///
- protected SubscriptionClient() { }
+ protected SubscriptionReceiverClient() { }
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
///
///
/// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
@@ -53,16 +53,61 @@ protected SubscriptionClient() { }
/// Service Bus entity will result in a connection string that contains the name.
///
///
- public SubscriptionClient(string connectionString)
- : base(connectionString, null, null, default(SubscriptionClientOptions))
+ public SubscriptionReceiverClient(string connectionString)
+ : base(connectionString, null, null, default(SubscriptionReceiverClientOptions))
{
}
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
///
///
/// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
+ ///
+ ///
+ ///
+ /// If the connection string is copied from the Service Bus namespace, it will likely not contain the name of the desired Service Bus entity,
+ /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ Service Bus entity NAME ]]" to the end of the
+ /// connection string. For example, ";EntityPath=telemetry-hub".
+ ///
+ /// If you have defined a shared access policy directly on the Service Bus entity itself, then copying the connection string from that
+ /// Service Bus entity will result in a connection string that contains the name.
+ ///
+ ///
+ public SubscriptionReceiverClient(string connectionString, SessionOptions sessionOptions)
+ : base(connectionString, null, sessionOptions, default(SubscriptionReceiverClientOptions))
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
+ /// The set of options to use for this consumer.
+ ///
+ ///
+ /// If the connection string is copied from the Service Bus namespace, it will likely not contain the name of the desired Service Bus entity,
+ /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ Service Bus entity NAME ]]" to the end of the
+ /// connection string. For example, ";EntityPath=telemetry-hub".
+ ///
+ /// If you have defined a shared access policy directly on the Service Bus entity itself, then copying the connection string from that
+ /// Service Bus entity will result in a connection string that contains the name.
+ ///
+ ///
+ public SubscriptionReceiverClient(
+ string connectionString,
+ SubscriptionReceiverClientOptions clientOptions)
+ : base(connectionString, null, null, clientOptions?.Clone() ?? new SubscriptionReceiverClientOptions())
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
+ ///
/// The set of options to use for this consumer.
///
///
@@ -74,19 +119,43 @@ public SubscriptionClient(string connectionString)
/// Service Bus entity will result in a connection string that contains the name.
///
///
- public SubscriptionClient(
+ public SubscriptionReceiverClient(
+ string connectionString,
+ SessionOptions sessionOptions,
+ SubscriptionReceiverClientOptions clientOptions)
+ : base(connectionString, null, sessionOptions, clientOptions?.Clone() ?? new SubscriptionReceiverClientOptions())
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The connection string to use for connecting to the Service Bus namespace; it is expected that the shared key properties are contained in this connection string, but not the Service Bus entity name.
+ /// The name of the specific Service Bus entity to associate the consumer with.
+ ///
+ ///
+ ///
+ /// If the connection string is copied from the Service Bus entity itself, it will contain the name of the desired Service Bus entity,
+ /// and can be used directly without passing the . The name of the Service Bus entity should be
+ /// passed only once, either as part of the connection string or separately.
+ ///
+ ///
+ public SubscriptionReceiverClient(
string connectionString,
- SubscriptionClientOptions clientOptions)
- : base(connectionString, null, null, clientOptions?.Clone() ?? new SubscriptionClientOptions())
+ string subscriptionName,
+ SubscriptionReceiverClientOptions clientOptions = default)
+ : base(connectionString, subscriptionName, null, clientOptions?.Clone() ?? new SubscriptionReceiverClientOptions())
{
}
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
///
///
/// The connection string to use for connecting to the Service Bus namespace; it is expected that the shared key properties are contained in this connection string, but not the Service Bus entity name.
/// The name of the specific Service Bus entity to associate the consumer with.
+ ///
///
///
///
@@ -95,29 +164,50 @@ public SubscriptionClient(
/// passed only once, either as part of the connection string or separately.
///
///
- public SubscriptionClient(
+ public SubscriptionReceiverClient(
string connectionString,
string subscriptionName,
- SubscriptionClientOptions clientOptions = default)
- : base(connectionString, subscriptionName, null, clientOptions?.Clone() ?? new SubscriptionClientOptions())
+ SessionOptions sessionOptions,
+ SubscriptionReceiverClientOptions clientOptions = default)
+ : base(connectionString, subscriptionName, sessionOptions, clientOptions?.Clone() ?? new SubscriptionReceiverClientOptions())
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The fully qualified Service Bus namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net.
+ /// The name of the specific Service Bus entity to associate the consumer with.
+ /// The Azure managed identity credential to use for authorization. Access controls may be specified by the Service Bus namespace or the requested Service Bus entity, depending on Azure configuration.
+ /// A set of options to apply when configuring the consumer.
+ ///
+ public SubscriptionReceiverClient(
+ string fullyQualifiedNamespace,
+ string subscriptionName,
+ TokenCredential credential,
+ SubscriptionReceiverClientOptions clientOptions = default)
+ : base(fullyQualifiedNamespace, subscriptionName, credential, null, clientOptions?.Clone() ?? new SubscriptionReceiverClientOptions())
{
}
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
///
///
/// The fully qualified Service Bus namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net.
/// The name of the specific Service Bus entity to associate the consumer with.
/// The Azure managed identity credential to use for authorization. Access controls may be specified by the Service Bus namespace or the requested Service Bus entity, depending on Azure configuration.
+ ///
/// A set of options to apply when configuring the consumer.
///
- public SubscriptionClient(
+ public SubscriptionReceiverClient(
string fullyQualifiedNamespace,
string subscriptionName,
TokenCredential credential,
- SubscriptionClientOptions clientOptions = default)
- : base(fullyQualifiedNamespace, subscriptionName, credential, null, clientOptions?.Clone() ?? new SubscriptionClientOptions())
+ SessionOptions sessionOptions,
+ SubscriptionReceiverClientOptions clientOptions = default)
+ : base(fullyQualifiedNamespace, subscriptionName, credential, sessionOptions, clientOptions?.Clone() ?? new SubscriptionReceiverClientOptions())
{
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SubscriptionClientOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SubscriptionReceiverClientOptions.cs
similarity index 54%
rename from sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SubscriptionClientOptions.cs
rename to sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SubscriptionReceiverClientOptions.cs
index ea4e07a2d621..03062c51d5dc 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SubscriptionClientOptions.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SubscriptionReceiverClientOptions.cs
@@ -3,21 +3,21 @@
using Azure.Messaging.ServiceBus.Core;
-namespace Azure.Messaging.ServiceBus.Receiver
+namespace Azure.Messaging.ServiceBus
{
///
///
///
- public class SubscriptionClientOptions : ServiceBusReceiverClientOptions
+ public class SubscriptionReceiverClientOptions : ServiceBusReceiverClientOptions
{
///
- /// Creates a new copy of the current , cloning its attributes into a new instance.
+ /// Creates a new copy of the current , cloning its attributes into a new instance.
///
///
- /// A new copy of .
+ /// A new copy of .
///
- internal SubscriptionClientOptions Clone() =>
- new SubscriptionClientOptions
+ internal SubscriptionReceiverClientOptions Clone() =>
+ new SubscriptionReceiverClientOptions
{
_connectionOptions = ConnectionOptions.Clone(),
_retryOptions = RetryOptions.Clone()
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/QueueSenderClient.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/QueueSenderClient.cs
new file mode 100644
index 000000000000..018fbe384903
--- /dev/null
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/QueueSenderClient.cs
@@ -0,0 +1,116 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using Azure.Core;
+using Azure.Messaging.ServiceBus.Core;
+
+namespace Azure.Messaging.ServiceBus
+{
+ ///
+ ///
+ ///
+ public class QueueSenderClient : ServiceBusSender
+ {
+ ///
+ ///
+ ///
+ protected QueueSenderClient() { }
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
+ ///
+ ///
+ /// If the connection string is copied from the Service Bus namespace, it will likely not contain the name of the desired Service Bus entity,
+ /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ Service Bus entity NAME ]]" to the end of the
+ /// connection string. For example, ";EntityPath=orders-queue".
+ ///
+ /// If you have defined a shared access policy directly on the Service Bus entity itself, then copying the connection string from that
+ /// Service Bus entity will result in a connection string that contains the name.
+ ///
+ ///
+ public QueueSenderClient(string connectionString) : base(connectionString, null, null)
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
+ /// The set of options to use for this consumer.
+ ///
+ ///
+ /// If the connection string is copied from the Service Bus namespace, it will likely not contain the name of the desired Service Bus entity,
+ /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ Service Bus entity NAME ]]" to the end of the
+ /// connection string. For example, ";EntityPath=orders-queue".
+ ///
+ /// If you have defined a shared access policy directly on the Service Bus entity itself, then copying the connection string from that
+ /// Service Bus entity will result in a connection string that contains the name.
+ ///
+ ///
+ public QueueSenderClient(string connectionString, ServiceBusSenderClientOptions clientOptions)
+ : base(connectionString, null, clientOptions)
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The connection string to use for connecting to the Service Bus namespace; it is expected that the shared key properties are contained in this connection string, but not the Service Bus entity name.
+ /// The name of the specific Service Bus entity to associate the producer with.
+ ///
+ ///
+ /// If the connection string is copied from the Service Bus entity itself, it will contain the name of the desired Service Bus entity,
+ /// and can be used directly without passing the . The name of the Service Bus entity should be
+ /// passed only once, either as part of the connection string or separately.
+ ///
+ ///
+ public QueueSenderClient(string connectionString, string entityName)
+ : base(connectionString, entityName, null)
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The connection string to use for connecting to the Service Bus namespace; it is expected that the shared key properties are contained in this connection string, but not the Service Bus entity name.
+ /// The name of the specific Service Bus entity to associate the producer with.
+ /// A set of options to apply when configuring the producer.
+ ///
+ ///
+ /// If the connection string is copied from the Service Bus entity itself, it will contain the name of the desired Service Bus entity,
+ /// and can be used directly without passing the . The name of the Service Bus entity should be
+ /// passed only once, either as part of the connection string or separately.
+ ///
+ ///
+ public QueueSenderClient(
+ string connectionString,
+ string entityName,
+ ServiceBusSenderClientOptions clientOptions)
+ :base(connectionString, entityName, clientOptions)
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The fully qualified Service Bus namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net.
+ /// The name of the specific Service Bus entity to associated the producer with.
+ /// The Azure managed identity credential to use for authorization. Access controls may be specified by the Service Bus namespace or the requested Service Bus entity, depending on Azure configuration.
+ /// A set of options to apply when configuring the producer.
+ ///
+ public QueueSenderClient(
+ string fullyQualifiedNamespace,
+ string entityName,
+ TokenCredential credential,
+ ServiceBusSenderClientOptions clientOptions = default)
+ : base(fullyQualifiedNamespace, entityName, credential, clientOptions)
+ {
+ }
+ }
+}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSenderClient.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs
similarity index 91%
rename from sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSenderClient.cs
rename to sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs
index fb2ec58662a3..2eabdde227db 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSenderClient.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs
@@ -15,13 +15,13 @@
using Azure.Messaging.ServiceBus.Diagnostics;
using Microsoft.Azure.Amqp;
-namespace Azure.Messaging.ServiceBus.Sender
+namespace Azure.Messaging.ServiceBus.Core
{
///
/// A client responsible for sending to a specific Service Bus entity (queue or topic).
///
///
- public class ServiceBusSenderClient : IAsyncDisposable
+ public abstract class ServiceBusSender : IAsyncDisposable
{
///
/// The fully qualified Service Bus namespace that the producer is associated with. This is likely
@@ -38,7 +38,7 @@ public class ServiceBusSenderClient : IAsyncDisposable
public string EntityName => Connection.EntityName;
///
- /// Indicates whether or not this has been closed.
+ /// Indicates whether or not this has been closed.
///
///
///
@@ -80,7 +80,7 @@ public class ServiceBusSenderClient : IAsyncDisposable
private ClientDiagnostics ClientDiagnostics { get; set; }
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
///
///
/// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
@@ -94,12 +94,12 @@ public class ServiceBusSenderClient : IAsyncDisposable
/// Service Bus entity will result in a connection string that contains the name.
///
///
- public ServiceBusSenderClient(string connectionString) : this(connectionString, null, null)
+ public ServiceBusSender(string connectionString) : this(connectionString, null, null)
{
}
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
///
///
/// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
@@ -114,13 +114,13 @@ public ServiceBusSenderClient(string connectionString) : this(connectionString,
/// Service Bus entity will result in a connection string that contains the name.
///
///
- public ServiceBusSenderClient(string connectionString, ServiceBusSenderClientOptions clientOptions)
+ public ServiceBusSender(string connectionString, ServiceBusSenderClientOptions clientOptions)
: this(connectionString, null, clientOptions)
{
}
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
///
///
/// The connection string to use for connecting to the Service Bus namespace; it is expected that the shared key properties are contained in this connection string, but not the Service Bus entity name.
@@ -132,13 +132,13 @@ public ServiceBusSenderClient(string connectionString, ServiceBusSenderClientOpt
/// passed only once, either as part of the connection string or separately.
///
///
- public ServiceBusSenderClient(string connectionString, string entityName)
+ public ServiceBusSender(string connectionString, string entityName)
: this(connectionString, entityName, null)
{
}
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
///
///
/// The connection string to use for connecting to the Service Bus namespace; it is expected that the shared key properties are contained in this connection string, but not the Service Bus entity name.
@@ -151,7 +151,7 @@ public ServiceBusSenderClient(string connectionString, string entityName)
/// passed only once, either as part of the connection string or separately.
///
///
- public ServiceBusSenderClient(
+ public ServiceBusSender(
string connectionString,
string entityName,
ServiceBusSenderClientOptions clientOptions)
@@ -166,7 +166,7 @@ public ServiceBusSenderClient(
}
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
///
///
/// The fully qualified Service Bus namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net.
@@ -174,7 +174,7 @@ public ServiceBusSenderClient(
/// The Azure managed identity credential to use for authorization. Access controls may be specified by the Service Bus namespace or the requested Service Bus entity, depending on Azure configuration.
/// A set of options to apply when configuring the producer.
///
- public ServiceBusSenderClient(
+ public ServiceBusSender(
string fullyQualifiedNamespace,
string entityName,
TokenCredential credential,
@@ -195,13 +195,13 @@ public ServiceBusSenderClient(
}
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
///
///
/// The connection to use for communication with the Service Bus service.
/// A set of options to apply when configuring the producer.
///
- internal ServiceBusSenderClient(
+ internal ServiceBusSender(
ServiceBusConnection connection,
ServiceBusSenderClientOptions clientOptions = default)
{
@@ -216,34 +216,10 @@ internal ServiceBusSenderClient(
}
///
- /// Initializes a new instance of the class.
+ /// Initializes a new instance of the class.
///
///
- /// The connection to use as the basis for delegation of client-type operations.
- /// The transport producer instance to use as the basis for service communication.
- ///
- ///
- /// This constructor is intended to be used internally for functional
- /// testing only.
- ///
- ///
- internal ServiceBusSenderClient(
- ServiceBusConnection connection,
- TransportSender transportProducer)
- {
- Argument.AssertNotNull(connection, nameof(connection));
- Argument.AssertNotNull(transportProducer, nameof(transportProducer));
-
- OwnsConnection = false;
- Connection = connection;
- InnerSender = transportProducer;
- }
-
- ///
- /// Initializes a new instance of the class.
- ///
- ///
- protected ServiceBusSenderClient()
+ protected ServiceBusSender()
{
OwnsConnection = false;
}
@@ -377,7 +353,7 @@ public virtual async Task CloseAsync(CancellationToken cancellationToken = defau
IsClosed = true;
var identifier = GetHashCode().ToString();
- ServiceBusEventSource.Log.ClientCloseStart(typeof(ServiceBusSenderClient), EntityName, identifier);
+ ServiceBusEventSource.Log.ClientCloseStart(typeof(ServiceBusSender), EntityName, identifier);
// Attempt to close the active transport producers. In the event that an exception is encountered,
// it should not impact the attempt to close the connection, assuming ownership.
@@ -390,7 +366,7 @@ public virtual async Task CloseAsync(CancellationToken cancellationToken = defau
}
catch (Exception ex)
{
- ServiceBusEventSource.Log.ClientCloseError(typeof(ServiceBusSenderClient), EntityName, identifier, ex.Message);
+ ServiceBusEventSource.Log.ClientCloseError(typeof(ServiceBusSender), EntityName, identifier, ex.Message);
transportProducerException = ex;
}
@@ -406,12 +382,12 @@ public virtual async Task CloseAsync(CancellationToken cancellationToken = defau
}
catch (Exception ex)
{
- ServiceBusEventSource.Log.ClientCloseError(typeof(ServiceBusSenderClient), EntityName, identifier, ex.Message);
+ ServiceBusEventSource.Log.ClientCloseError(typeof(ServiceBusSender), EntityName, identifier, ex.Message);
throw;
}
finally
{
- ServiceBusEventSource.Log.ClientCloseComplete(typeof(ServiceBusSenderClient), EntityName, identifier);
+ ServiceBusEventSource.Log.ClientCloseComplete(typeof(ServiceBusSender), EntityName, identifier);
}
// If there was an active exception pending from closing the individual
@@ -424,7 +400,7 @@ public virtual async Task CloseAsync(CancellationToken cancellationToken = defau
}
///
- /// Performs the task needed to clean up resources used by the ,
+ /// Performs the task needed to clean up resources used by the ,
/// including ensuring that the client itself has been closed.
///
///
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSenderClientOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSenderClientOptions.cs
index 0e36f409cd4b..f0ce32257f42 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSenderClientOptions.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSenderClientOptions.cs
@@ -5,10 +5,10 @@
using Azure.Core;
using Azure.Messaging.ServiceBus.Core;
-namespace Azure.Messaging.ServiceBus.Sender
+namespace Azure.Messaging.ServiceBus
{
///
- /// The set of options that can be specified when creating an
+ /// The set of options that can be specified when creating a Sender instance />
/// to configure its behavior.
///
///
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/TopicSenderClient.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/TopicSenderClient.cs
new file mode 100644
index 000000000000..2bca0b7af24b
--- /dev/null
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/TopicSenderClient.cs
@@ -0,0 +1,116 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using Azure.Core;
+using Azure.Messaging.ServiceBus.Core;
+
+namespace Azure.Messaging.ServiceBus
+{
+ ///
+ ///
+ ///
+ public class TopicSenderClient : ServiceBusSender
+ {
+ ///
+ ///
+ ///
+ protected TopicSenderClient() { }
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
+ ///
+ ///
+ /// If the connection string is copied from the Service Bus namespace, it will likely not contain the name of the desired Service Bus entity,
+ /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ Service Bus entity NAME ]]" to the end of the
+ /// connection string. For example, ";EntityPath=orders-queue".
+ ///
+ /// If you have defined a shared access policy directly on the Service Bus entity itself, then copying the connection string from that
+ /// Service Bus entity will result in a connection string that contains the name.
+ ///
+ ///
+ public TopicSenderClient(string connectionString) : base(connectionString, null, null)
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The connection string to use for connecting to the Service Bus namespace; it is expected that the Service Bus entity name and the shared key properties are contained in this connection string.
+ /// The set of options to use for this consumer.
+ ///
+ ///
+ /// If the connection string is copied from the Service Bus namespace, it will likely not contain the name of the desired Service Bus entity,
+ /// which is needed. In this case, the name can be added manually by adding ";EntityPath=[[ Service Bus entity NAME ]]" to the end of the
+ /// connection string. For example, ";EntityPath=orders-queue".
+ ///
+ /// If you have defined a shared access policy directly on the Service Bus entity itself, then copying the connection string from that
+ /// Service Bus entity will result in a connection string that contains the name.
+ ///
+ ///
+ public TopicSenderClient(string connectionString, ServiceBusSenderClientOptions clientOptions)
+ : base(connectionString, null, clientOptions)
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The connection string to use for connecting to the Service Bus namespace; it is expected that the shared key properties are contained in this connection string, but not the Service Bus entity name.
+ /// The name of the specific Service Bus entity to associate the producer with.
+ ///
+ ///
+ /// If the connection string is copied from the Service Bus entity itself, it will contain the name of the desired Service Bus entity,
+ /// and can be used directly without passing the . The name of the Service Bus entity should be
+ /// passed only once, either as part of the connection string or separately.
+ ///
+ ///
+ public TopicSenderClient(string connectionString, string entityName)
+ : base(connectionString, entityName, null)
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The connection string to use for connecting to the Service Bus namespace; it is expected that the shared key properties are contained in this connection string, but not the Service Bus entity name.
+ /// The name of the specific Service Bus entity to associate the producer with.
+ /// A set of options to apply when configuring the producer.
+ ///
+ ///
+ /// If the connection string is copied from the Service Bus entity itself, it will contain the name of the desired Service Bus entity,
+ /// and can be used directly without passing the . The name of the Service Bus entity should be
+ /// passed only once, either as part of the connection string or separately.
+ ///
+ ///
+ public TopicSenderClient(
+ string connectionString,
+ string entityName,
+ ServiceBusSenderClientOptions clientOptions)
+ : base(connectionString, entityName, clientOptions)
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ ///
+ /// The fully qualified Service Bus namespace to connect to. This is likely to be similar to {yournamespace}.servicebus.windows.net.
+ /// The name of the specific Service Bus entity to associated the producer with.
+ /// The Azure managed identity credential to use for authorization. Access controls may be specified by the Service Bus namespace or the requested Service Bus entity, depending on Azure configuration.
+ /// A set of options to apply when configuring the producer.
+ ///
+ public TopicSenderClient(
+ string fullyQualifiedNamespace,
+ string entityName,
+ TokenCredential credential,
+ ServiceBusSenderClientOptions clientOptions = default)
+ : base(fullyQualifiedNamespace, entityName, credential, clientOptions)
+ {
+ }
+ }
+}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/ServiceBusConnection.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/ServiceBusConnection.cs
index 9d368f43a81b..a4ada62fc4dc 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/ServiceBusConnection.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/ServiceBusConnection.cs
@@ -10,7 +10,6 @@
using Azure.Core;
using Azure.Messaging.ServiceBus.Amqp;
using Azure.Messaging.ServiceBus.Authorization;
-using Azure.Messaging.ServiceBus.Receiver;
using Azure.Messaging.ServiceBus.Core;
using Azure.Messaging.ServiceBus.Diagnostics;
using Newtonsoft.Json.Linq;
@@ -27,7 +26,7 @@ namespace Azure.Messaging.ServiceBus
///
///
///
- internal class ServiceBusConnection : IAsyncDisposable
+ public class ServiceBusConnection : IAsyncDisposable
{
///
/// The fully qualified Service Bus namespace that the connection is associated with. This is likely
@@ -310,8 +309,13 @@ internal virtual async Task> PeekAsync(
string sessionId = null,
string receiveLinkName = null,
CancellationToken cancellationToken = default) =>
- await InnerClient.PeekAsync(retryPolicy, fromSequenceNumber, messageCount, sessionId, receiveLinkName, cancellationToken)
- .ConfigureAwait(false);
+ await InnerClient.PeekAsync(
+ retryPolicy,
+ fromSequenceNumber,
+ messageCount,
+ sessionId,
+ receiveLinkName,
+ cancellationToken).ConfigureAwait(false);
///
///
@@ -381,6 +385,7 @@ internal virtual TransportSender CreateTransportProducer(ServiceBusRetryPolicy r
/// The relative priority to associate with the link; for a non-exclusive link, this value should be null.
/// Controls the number of events received and queued locally without regard to whether an operation was requested. If null a default will be used.
///
+ ///
///
/// A configured in the requested manner.
///
@@ -388,10 +393,11 @@ internal virtual TransportConsumer CreateTransportConsumer(
ServiceBusRetryPolicy retryPolicy,
long? ownerLevel = default,
uint? prefetchCount = default,
- string sessionId = default)
+ string sessionId = default,
+ bool isSessionReceiver = default)
{
Argument.AssertNotNull(retryPolicy, nameof(retryPolicy));
- return InnerClient.CreateConsumer(retryPolicy, ownerLevel, prefetchCount, sessionId);
+ return InnerClient.CreateConsumer(retryPolicy, ownerLevel, prefetchCount, sessionId, isSessionReceiver);
}
///
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/ServiceBusException.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/ServiceBusException.cs
index 987b56fbf798..ab225b25e630 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/ServiceBusException.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/ServiceBusException.cs
@@ -3,7 +3,6 @@
using System;
using System.Globalization;
-using Azure.Messaging.ServiceBus.Receiver;
namespace Azure.Messaging.ServiceBus
{
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/ServiceBusRetryPolicy.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/ServiceBusRetryPolicy.cs
index b3c7b1646f76..e872a449fa68 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/ServiceBusRetryPolicy.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/ServiceBusRetryPolicy.cs
@@ -4,6 +4,11 @@
using System;
using System.ComponentModel;
using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+using Azure.Messaging.ServiceBus.Amqp;
+using Azure.Messaging.ServiceBus.Core;
+using Microsoft.Azure.Amqp;
namespace Azure.Messaging.ServiceBus
{
@@ -72,5 +77,75 @@ public abstract class ServiceBusRetryPolicy
///
[EditorBrowsable(EditorBrowsableState.Never)]
public override string ToString() => base.ToString();
+
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ internal async Task RunOperation(
+ Func operation,
+ string entityName,
+ TransportConnectionScope scope,
+ CancellationToken cancellationToken)
+ {
+ var failedAttemptCount = 0;
+
+ try
+ {
+ TimeSpan tryTimeout = CalculateTryTimeout(0);
+
+ while (!cancellationToken.IsCancellationRequested)
+ {
+
+ try
+ {
+ await operation(tryTimeout).ConfigureAwait(false);
+ return;
+ }
+
+ catch (Exception ex)
+ {
+ Exception activeEx = ex.TranslateServiceException(entityName);
+
+ // Determine if there should be a retry for the next attempt; if so enforce the delay but do not quit the loop.
+ // Otherwise, mark the exception as active and break out of the loop.
+
+ ++failedAttemptCount;
+ TimeSpan? retryDelay = CalculateRetryDelay(ex, failedAttemptCount);
+ if (retryDelay.HasValue && !scope.IsDisposed && !cancellationToken.IsCancellationRequested)
+ {
+ //EventHubsEventSource.Log.GetPropertiesError(EventHubName, activeEx.Message);
+ await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);
+
+ tryTimeout = CalculateTryTimeout(failedAttemptCount);
+ }
+ else if (ex is AmqpException)
+ {
+ throw activeEx;
+ }
+ else
+ {
+ throw;
+ }
+ }
+ }
+ // If no value has been returned nor exception thrown by this point,
+ // then cancellation has been requested.
+ throw new TaskCanceledException();
+ }
+ catch (Exception exception)
+ {
+ throw exception;
+ //TODO through correct exception throw AmqpExceptionHelper.GetClientException(exception);
+ }
+ finally
+ {
+ //TODO log correct completion event ServiceBusEventSource.Log.PeekMessagesComplete(EntityName);
+ }
+ }
}
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/SessionHandlerOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/SessionHandlerOptions.cs
index 3d47d67d52e3..3add2eaaa357 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/SessionHandlerOptions.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/SessionHandlerOptions.cs
@@ -6,7 +6,7 @@
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Primitives;
-namespace Azure.Messaging.ServiceBus.Receiver
+namespace Azure.Messaging.ServiceBus
{
/// Provides options associated with session pump processing using
/// cref="QueueClient.RegisterSessionHandler(Func{IMessageSession, Message, CancellationToken, Task}, SessionHandlerOptions)" /> and
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs
old mode 100755
new mode 100644
index 6f2da1d08114..13e2efcf3639
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs
@@ -4,8 +4,8 @@
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
-using Azure.Messaging.ServiceBus.Receiver;
-using Azure.Messaging.ServiceBus.Sender;
+using Azure.Core.Testing;
+using Azure.Messaging.ServiceBus.Core;
using NUnit.Framework;
namespace Azure.Messaging.ServiceBus.Tests
@@ -17,7 +17,7 @@ public async Task Peek()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
- await using var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
+ await using var sender = new QueueSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
var messageCt = 10;
IEnumerable sentMessages = GetMessages(messageCt);
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs
old mode 100755
new mode 100644
index 3763650fc17e..284672079be8
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs
@@ -6,8 +6,7 @@
using System.Threading.Tasks;
using Azure.Identity;
using Azure.Messaging.ServiceBus;
-using Azure.Messaging.ServiceBus.Receiver;
-using Azure.Messaging.ServiceBus.Sender;
+using Azure.Messaging.ServiceBus.Core;
using Azure.Messaging.ServiceBus.Tests;
using NUnit.Framework;
@@ -20,7 +19,7 @@ public async Task Send_ConnString()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
- await using var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
+ await using var sender = new QueueSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
await sender.SendRangeAsync(GetMessages(10));
}
}
@@ -35,7 +34,7 @@ public async Task Send_Token()
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
- await using var sender = new ServiceBusSenderClient(TestEnvironment.FullyQualifiedNamespace, scope.QueueName, credential);
+ await using var sender = new QueueSenderClient(TestEnvironment.FullyQualifiedNamespace, scope.QueueName, credential);
await sender.SendAsync(GetMessage());
}
}
@@ -45,19 +44,18 @@ public async Task Send_Connection_Topic()
{
await using (var scope = await ServiceBusScope.CreateWithTopic(enablePartitioning: false, enableSession: false))
{
- await using var conn = new ServiceBusConnection(TestEnvironment.ServiceBusConnectionString, scope.TopicName);
var options = new ServiceBusSenderClientOptions
{
RetryOptions = new ServiceBusRetryOptions(),
ConnectionOptions = new ServiceBusConnectionOptions()
{
TransportType = ServiceBusTransportType.AmqpWebSockets,
- Proxy = new WebProxy("localHost")
+ Proxy = WebRequest.DefaultWebProxy
}
};
options.RetryOptions.Mode = ServiceBusRetryMode.Exponential;
- await using var sender = new ServiceBusSenderClient(conn, options);
+ await using var sender = new TopicSenderClient(TestEnvironment.ServiceBusConnectionString, scope.TopicName, options);
await sender.SendAsync(GetMessage());
}
}
@@ -67,18 +65,17 @@ public async Task Send_Topic_Session()
{
await using (var scope = await ServiceBusScope.CreateWithTopic(enablePartitioning: false, enableSession: false))
{
- await using var conn = new ServiceBusConnection(TestEnvironment.ServiceBusConnectionString, scope.TopicName);
var options = new ServiceBusSenderClientOptions
{
RetryOptions = new ServiceBusRetryOptions(),
ConnectionOptions = new ServiceBusConnectionOptions()
{
TransportType = ServiceBusTransportType.AmqpWebSockets,
- Proxy = new WebProxy("localHost")
+ Proxy = WebRequest.DefaultWebProxy
}
};
options.RetryOptions.Mode = ServiceBusRetryMode.Exponential;
- await using var sender = new ServiceBusSenderClient(conn, options);
+ await using var sender = new TopicSenderClient(TestEnvironment.ServiceBusConnectionString, scope.TopicName, options);
var message = GetMessage();
message.SessionId = "1";
await sender.SendAsync(message);
@@ -90,7 +87,7 @@ public async Task ClientProperties()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
- await using var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
+ await using var sender = new QueueSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
Assert.AreEqual(scope.QueueName, sender.EntityName);
Assert.AreEqual(TestEnvironment.FullyQualifiedNamespace, sender.FullyQualifiedNamespace);
}
@@ -101,7 +98,7 @@ public async Task Schedule()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
- await using var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
+ await using var sender = new QueueSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
var scheduleTime = DateTimeOffset.UtcNow.AddHours(10);
var sequenceNum = await sender.ScheduleMessageAsync(GetMessage(), scheduleTime);
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderTests.cs
index c5d7a053aea9..2b79f9b14493 100644
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderTests.cs
@@ -9,7 +9,6 @@
using System.Threading.Tasks;
using Azure.Core.Pipeline;
using Azure.Messaging.ServiceBus.Core;
-using Azure.Messaging.ServiceBus.Sender;
using Moq;
using NUnit.Framework;
@@ -20,7 +19,7 @@ public class SenderTests : ServiceBusTestBase
[Test]
public void Send_NullShouldThrow()
{
- var mock = new Mock()
+ var mock = new Mock()
{
CallBase = true
};
@@ -30,7 +29,7 @@ public void Send_NullShouldThrow()
[Test]
public async Task Send_DelegatesToSendRange()
{
- var mock = new Mock()
+ var mock = new Mock()
{
CallBase = true
};
@@ -47,7 +46,7 @@ public async Task Send_DelegatesToSendRange()
[Test]
public void SendRange_NullShouldThrow()
{
- var mock = new Mock()
+ var mock = new Mock()
{
CallBase = true
};
@@ -57,7 +56,7 @@ public void SendRange_NullShouldThrow()
[Test]
public async Task SendRange_DelegatesToInnerSender()
{
- var mock = new Mock()
+ var mock = new Mock()
{
CallBase = true
};
@@ -78,7 +77,7 @@ public void ClientProperties()
var fullyQualifiedNamespace = new UriBuilder($"{account}.servicebus.windows.net/").Host;
var connString = $"Endpoint=sb://{fullyQualifiedNamespace};SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={Encoding.Default.GetString(GetRandomBuffer(64))}";
var queueName = Encoding.Default.GetString(GetRandomBuffer(12));
- var sender = new ServiceBusSenderClient(connString, queueName);
+ var sender = new QueueSenderClient(connString, queueName);
Assert.AreEqual(queueName, sender.EntityName);
Assert.AreEqual(fullyQualifiedNamespace, sender.FullyQualifiedNamespace);
}
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusLiveTestBase.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusLiveTestBase.cs
old mode 100755
new mode 100644
diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs
old mode 100755
new mode 100644
index 07e5ad1bf742..5024c388e159
--- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs
+++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs
@@ -5,8 +5,7 @@
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
-using Azure.Messaging.ServiceBus.Receiver;
-using Azure.Messaging.ServiceBus.Sender;
+using Azure.Messaging.ServiceBus.Core;
using NUnit.Framework;
using NUnit.Framework.Internal;
@@ -23,7 +22,7 @@ public async Task Peek_Session(long? sequenceNumber, string partitionKey)
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
- await using var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
+ await using var sender = new QueueSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
var messageCt = 10;
var sessionId = Guid.NewGuid().ToString();
@@ -37,7 +36,17 @@ public async Task Peek_Session(long? sequenceNumber, string partitionKey)
}
// peek the messages
- await using var receiver = new SessionReceiverClient(sessionId, TestEnvironment.ServiceBusConnectionString, scope.QueueName);
+ var sessionSettings = new SessionOptions()
+ {
+ Connection = new ServiceBusConnection(
+ TestEnvironment.ServiceBusConnectionString,
+ scope.QueueName),
+ SessionId = sessionId
+ };
+ var receiver = new QueueReceiverClient(
+ TestEnvironment.ServiceBusConnectionString,
+ scope.QueueName,
+ sessionSettings);
sequenceNumber ??= 1;
IAsyncEnumerable peekedMessages = receiver.PeekRangeBySequenceAsync(
@@ -70,15 +79,28 @@ public async Task PeekMultipleSessions_ShouldThrow()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
- await using var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
+ await using var sender = new QueueSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
var messageCt = 10;
var sessionId = Guid.NewGuid().ToString();
// send the messages
IEnumerable sentMessages = GetMessages(messageCt, sessionId);
await sender.SendRangeAsync(sentMessages);
- await using var receiver1 = new SessionReceiverClient(sessionId, TestEnvironment.ServiceBusConnectionString, scope.QueueName);
- await using var receiver2 = new SessionReceiverClient(sessionId, TestEnvironment.ServiceBusConnectionString, scope.QueueName);
+ var sessionSettings = new SessionOptions()
+ {
+ Connection = new ServiceBusConnection(
+ TestEnvironment.ServiceBusConnectionString,
+ scope.QueueName),
+ SessionId = sessionId
+ };
+ var receiver1 = new QueueReceiverClient(
+ TestEnvironment.ServiceBusConnectionString,
+ scope.QueueName,
+ sessionSettings);
+ var receiver2 = new QueueReceiverClient(
+ TestEnvironment.ServiceBusConnectionString,
+ scope.QueueName,
+ sessionSettings);
Dictionary sentMessageIdToMsg = new Dictionary();
// peek the messages
@@ -98,17 +120,30 @@ public async Task PeekMultipleSessions_ShouldThrow()
[TestCase(10, 5)]
[TestCase(50, 1)]
[TestCase(50, 10)]
- public async Task PeekRange_IncrementsSequenceNmber(int messageCt, int peekCt)
+ public async Task PeekRange_IncrementsSequenceNumber(int messageCt, int peekCt)
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
- var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
+ var sender = new QueueSenderClient(
+ TestEnvironment.ServiceBusConnectionString,
+ scope.QueueName);
var sessionId = Guid.NewGuid().ToString();
// send the messages
IEnumerable sentMessages = GetMessages(messageCt, sessionId);
await sender.SendRangeAsync(sentMessages);
- await using var receiver = new SessionReceiverClient(sessionId, TestEnvironment.ServiceBusConnectionString, scope.QueueName);
+ var sessionSettings = new SessionOptions()
+ {
+ Connection = new ServiceBusConnection(
+ TestEnvironment.ServiceBusConnectionString,
+ scope.QueueName),
+ SessionId = sessionId
+ };
+ var receiver = new QueueReceiverClient(
+ TestEnvironment.ServiceBusConnectionString,
+ scope.QueueName,
+ sessionSettings);
+
long seq = 0;
for (int i = 0; i < messageCt / peekCt; i++)
@@ -116,6 +151,7 @@ public async Task PeekRange_IncrementsSequenceNmber(int messageCt, int peekCt)
IAsyncEnumerable peekedMessages = receiver.PeekRangeAsync(
maxMessages: peekCt);
+
await foreach (ServiceBusMessage msg in peekedMessages)
{
Assert.IsTrue(msg.SystemProperties.SequenceNumber > seq);
@@ -136,13 +172,24 @@ public async Task Peek_IncrementsSequenceNmber(int messageCt)
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
- await using var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
+ await using var sender = new QueueSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
var sessionId = Guid.NewGuid().ToString();
// send the messages
IEnumerable sentMessages = GetMessages(messageCt, sessionId);
await sender.SendRangeAsync(sentMessages);
- await using var receiver = new SessionReceiverClient(sessionId, TestEnvironment.ServiceBusConnectionString, scope.QueueName);
+ var sessionSettings = new SessionOptions()
+ {
+ Connection = new ServiceBusConnection(
+ TestEnvironment.ServiceBusConnectionString,
+ scope.QueueName),
+ SessionId = sessionId
+ };
+ var receiver = new QueueReceiverClient(
+ TestEnvironment.ServiceBusConnectionString,
+ scope.QueueName,
+ sessionSettings);
+
long seq = 0;
for (int i = 0; i < messageCt; i++)
@@ -159,12 +206,11 @@ public async Task Peek_IncrementsSequenceNmber(int messageCt)
}
[Test]
- [Ignore("Test is currently failing; investigation needed")]
public async Task RoundRobinSessions()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
{
- await using var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
+ await using var sender = new QueueSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
var messageCt = 10;
HashSet sessions = new HashSet() { "1", "2", "3" };
@@ -174,25 +220,33 @@ public async Task RoundRobinSessions()
await sender.SendRangeAsync(GetMessages(messageCt, session));
}
- var receiverClient = new QueueReceiverClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName);
- var sessionId = "";
// create receiver not scoped to a specific session
for (int i = 0; i < 10; i++)
{
- SessionReceiverClient sessionClient = receiverClient.GetSessionReceiverClient();
- IAsyncEnumerable peekedMessages = sessionClient.PeekRangeBySequenceAsync(
+ var sessionSettings = new SessionOptions()
+ {
+ Connection = new ServiceBusConnection(
+ TestEnvironment.ServiceBusConnectionString,
+ scope.QueueName),
+ SessionId = null
+ };
+
+ var receiver = new QueueReceiverClient(
+ TestEnvironment.ServiceBusConnectionString,
+ scope.QueueName,
+ sessionSettings);
+ IAsyncEnumerable peekedMessages = receiver.PeekRangeBySequenceAsync(
fromSequenceNumber: 1,
maxMessages: 10);
await foreach (ServiceBusMessage peekedMessage in peekedMessages)
{
- Assert.AreEqual(sessionClient.SessionId, peekedMessage.SessionId);
+ var sessionId = await receiver.Session.GetSessionId();
+ Assert.AreEqual(sessionId, peekedMessage.SessionId);
}
- TestContext.Progress.WriteLine(sessionId);
- sessionId = sessionClient.SessionId;
- // Close the session client when we are done with it. Since the sessionClient doesn't own the underlying connection, the connection remains open, but the session link will be closed.
- await sessionClient.CloseAsync();
+ // Close the receiver client when we are done with it. Since the sessionClient doesn't own the underlying connection, the connection remains open, but the session link will be closed.
+ await receiver.CloseAsync();
}
}
}