Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,14 @@ internal AmqpClient(
/// <param name="entityPath">The entity path to send the message to.</param>
/// <param name="viaEntityPath">The entity path to route the message through. Useful when using transactions.</param>
/// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
/// <param name="identifier">The identifier for the sender.</param>
///
/// <returns>A <see cref="TransportSender"/> configured in the requested manner.</returns>
public override TransportSender CreateSender(
string entityPath,
string viaEntityPath,
ServiceBusRetryPolicy retryPolicy)
ServiceBusRetryPolicy retryPolicy,
string identifier)
{
Argument.AssertNotClosed(_closed, nameof(AmqpClient));

Expand All @@ -132,7 +134,8 @@ public override TransportSender CreateSender(
entityPath,
viaEntityPath,
ConnectionScope,
retryPolicy
retryPolicy,
identifier
);
}

Expand Down Expand Up @@ -182,19 +185,22 @@ public override TransportReceiver CreateReceiver(
///
/// <param name="subscriptionPath">The path of the Service Bus subscription to which the rule manager is bound.</param>
/// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
/// <param name="identifier">The identifier for the rule manager.</param>
///
/// <returns>A <see cref="TransportRuleManager"/> configured in the requested manner.</returns>
public override TransportRuleManager CreateRuleManager(
string subscriptionPath,
ServiceBusRetryPolicy retryPolicy)
ServiceBusRetryPolicy retryPolicy,
string identifier)
{
Argument.AssertNotClosed(_closed, nameof(AmqpClient));

return new AmqpRuleManager
(
subscriptionPath,
ConnectionScope,
retryPolicy
retryPolicy,
identifier
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private async Task<Controller> CreateControllerAsync(TimeSpan timeout)
await amqpSession.CloseAsync(timeout).ConfigureAwait(false);
}

MessagingEventSource.Log.AmqpCreateControllerException(ActiveConnection.ToString(), exception);
ServiceBusEventSource.Log.CreateControllerException(ActiveConnection.ToString(), exception.ToString());
throw;
}

Expand All @@ -217,8 +217,8 @@ protected AmqpConnectionScope()
/// <summary>
/// Opens an AMQP link for use with management operations.
/// </summary>
/// <param name="entityPath"></param>
///
/// <param name="entityPath">The path for the entity.</param>
/// <param name="identifier">The identifier for the sender or receiver that is opening a management link.</param>
/// <param name="timeout">The timeout to apply when creating the link.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
Expand All @@ -231,25 +231,35 @@ protected AmqpConnectionScope()
///
public virtual async Task<RequestResponseAmqpLink> OpenManagementLinkAsync(
string entityPath,
string identifier,
TimeSpan timeout,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var stopWatch = ValueStopwatch.StartNew();
var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
ServiceBusEventSource.Log.CreateManagementLinkStart(identifier);
try
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var link = await CreateManagementLinkAsync(
entityPath,
connection,
timeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
var stopWatch = ValueStopwatch.StartNew();
var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
var link = await CreateManagementLinkAsync(
entityPath,
connection,
timeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

return link;
await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
ServiceBusEventSource.Log.CreateManagementLinkComplete(identifier);
return link;
}
catch (Exception ex)
{
ServiceBusEventSource.Log.CreateManagementLinkException(identifier, ex.ToString());
throw;
}
}

/// <summary>
Expand All @@ -261,6 +271,7 @@ public virtual async Task<RequestResponseAmqpLink> OpenManagementLinkAsync(
/// <param name="receiveMode">The <see cref="ReceiveMode"/> used to specify how messages are received. Defaults to PeekLock mode.</param>
/// <param name="sessionId"></param>
/// <param name="isSessionReceiver"></param>
/// <param name="identifier">The identifier for the receive link.</param>
/// <param name="timeout">The timeout to apply when creating the link.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
Expand All @@ -273,35 +284,44 @@ public virtual async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
ReceiveMode receiveMode,
string sessionId,
bool isSessionReceiver,
string identifier,
Comment thread
ShivangiReja marked this conversation as resolved.
CancellationToken cancellationToken)
{
ServiceBusEventSource.Log.CreateReceiveLinkStart(identifier);
try
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var stopWatch = ValueStopwatch.StartNew();
var receiverEndpoint = new Uri(ServiceEndpoint, entityPath);

var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
var stopWatch = ValueStopwatch.StartNew();
var receiverEndpoint = new Uri(ServiceEndpoint, entityPath);

ReceivingAmqpLink link = await CreateReceivingLinkAsync(
entityPath,
connection,
receiverEndpoint,
timeout.CalculateRemaining(stopWatch.GetElapsedTime()),
prefetchCount,
receiveMode,
sessionId,
isSessionReceiver,
cancellationToken
).ConfigureAwait(false);
var connection = await ActiveConnection.GetOrCreateAsync(timeout).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
ReceivingAmqpLink link = await CreateReceivingLinkAsync(
entityPath,
connection,
receiverEndpoint,
timeout.CalculateRemaining(stopWatch.GetElapsedTime()),
prefetchCount,
receiveMode,
sessionId,
isSessionReceiver,
cancellationToken
).ConfigureAwait(false);

await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

return link;
await OpenAmqpObjectAsync(link, timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
ServiceBusEventSource.Log.CreateReceiveLinkComplete(identifier);
return link;
}
catch (Exception ex)
{
ServiceBusEventSource.Log.CreateReceiveLinkException(identifier, ex.ToString());
throw;
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,14 @@ public AmqpReceiver(
receiveMode: receiveMode,
sessionId: sessionId,
isSessionReceiver: isSessionReceiver,
identifier: _identifier,
cancellationToken: CancellationToken.None),
link => CloseLink(link));

_managementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
timeout => _connectionScope.OpenManagementLinkAsync(
_entityPath,
_identifier,
timeout,
CancellationToken.None),
link => CloseLink(link));
Expand All @@ -173,7 +175,7 @@ private void CloseLink(RequestResponseAmqpLink link)
/// <param name="maxWaitTime">An optional <see cref="TimeSpan"/> specifying the maximum time to wait for the first message before returning an empty list if no messages have been received.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>List of messages received. Returns null if no message is found.</returns>
/// <returns>List of messages received. Returns an empty list if no message is found.</returns>
public override async Task<IList<ServiceBusReceivedMessage>> ReceiveBatchAsync(
int maxMessages,
TimeSpan? maxWaitTime,
Expand All @@ -195,7 +197,7 @@ await _retryPolicy.RunOperation(async (timeout) =>
}

/// <summary>
/// Receives a batch of <see cref="ServiceBusMessage" /> from the Service Bus entity partition.
/// Receives a batch of <see cref="ServiceBusMessage" /> from the Service Bus entity.
/// </summary>
///
/// <param name="maxMessages">The maximum number of messages to receive in this batch.</param>
Expand All @@ -204,7 +206,7 @@ await _retryPolicy.RunOperation(async (timeout) =>
/// <param name="timeout">The per-try timeout specified in the RetryOptions.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>The batch of <see cref="ServiceBusMessage" /> from the Service Bus entity partition this consumer is associated with. If no events are present, an empty enumerable is returned.</returns>
/// <returns>The batch of <see cref="ServiceBusMessage" /> from the Service Bus entity this receiver is associated with. If no messages are present, an empty list is returned.</returns>
///
private async Task<IList<ServiceBusReceivedMessage>> ReceiveBatchAsyncInternal(
int maxMessages,
Expand Down Expand Up @@ -389,9 +391,9 @@ private async Task DisposeMessagesAsync(
ServiceBusEventSource.Log.LinkStateLost(
_identifier,
receiveLink.Name,
receiveLink.State,
receiveLink.State.ToString(),
_isSessionReceiver,
exception);
exception.ToString());
ThrowLockLostException();
}

Expand Down Expand Up @@ -1229,7 +1231,7 @@ private void OnSessionReceiverLinkClosed(object receiver, EventArgs e)
ServiceBusEventSource.Log.SessionReceiverLinkClosed(
_identifier,
SessionId,
LinkException);
LinkException.ToString());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ internal class AmqpRuleManager : TransportRuleManager
/// </summary>
private readonly ServiceBusRetryPolicy _retryPolicy;

/// <summary>
/// The identifier for the rule manager.
/// </summary>
private readonly string _identifier;

/// <summary>
/// The AMQP connection scope responsible for managing transport constructs for this instance.
/// </summary>
Expand Down Expand Up @@ -67,6 +72,7 @@ static AmqpRuleManager()
/// <param name="subscriptionPath">The path of the Service Bus subscription to which the rule manager is bound.</param>
/// <param name="connectionScope">The AMQP connection context for operations.</param>
/// <param name="retryPolicy">The retry policy to consider when an operation fails.</param>
/// <param name="identifier">The identifier for the rule manager.</param>
///
/// <remarks>
/// As an internal type, this class performs only basic sanity checks against its arguments. It
Expand All @@ -79,7 +85,8 @@ static AmqpRuleManager()
public AmqpRuleManager(
string subscriptionPath,
AmqpConnectionScope connectionScope,
ServiceBusRetryPolicy retryPolicy)
ServiceBusRetryPolicy retryPolicy,
string identifier)
{
Argument.AssertNotNullOrEmpty(subscriptionPath, nameof(subscriptionPath));
Argument.AssertNotNull(connectionScope, nameof(connectionScope));
Expand All @@ -88,10 +95,12 @@ public AmqpRuleManager(
_subscriptionPath = subscriptionPath;
_connectionScope = connectionScope;
_retryPolicy = retryPolicy;
_identifier = identifier;

_managementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
timeout => _connectionScope.OpenManagementLinkAsync(
_subscriptionPath,
_identifier,
timeout,
CancellationToken.None),
link =>
Expand Down
Loading