diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs index 9a27440bce8b..39f13797e6dc 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs @@ -53,13 +53,13 @@ internal class AmqpConnectionScope : TransportConnectionScope /// refreshing authorization. Authorization will be refreshed earlier /// than the expected expiration by this amount. /// - private static TimeSpan AuthorizationRefreshBuffer { get; } = TimeSpan.FromMinutes(5); + private static TimeSpan AuthorizationRefreshBuffer { get; } = TimeSpan.FromMinutes(10); /// /// The minimum amount of time for authorization to be refreshed; any calculations that /// call for refreshing more frequently will be substituted with this value. /// - private static TimeSpan MinimumAuthorizationRefresh { get; } = TimeSpan.FromMinutes(4); + private static TimeSpan MinimumAuthorizationRefresh { get; } = TimeSpan.FromMinutes(2); /// /// The amount time to allow to refresh authorization of an AMQP link. @@ -230,6 +230,7 @@ public virtual async Task OpenManagementLinkAsync( var link = await CreateManagementLinkAsync( entityPath, + identifier, connection, timeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false); cancellationToken.ThrowIfCancellationRequested(); @@ -249,6 +250,7 @@ public virtual async Task OpenManagementLinkAsync( /// /// Opens an AMQP link for use with receiver operations. /// + /// The identifier of the entity that is receiving. /// The entity path to receive from. /// The timeout to apply when creating the link. /// Controls the number of events received and queued locally without regard to whether an operation was requested. @@ -260,6 +262,7 @@ public virtual async Task OpenManagementLinkAsync( /// A link for use with consumer operations. /// public virtual async Task OpenReceiverLinkAsync( + string identifier, string entityPath, TimeSpan timeout, uint prefetchCount, @@ -277,15 +280,16 @@ public virtual async Task OpenReceiverLinkAsync( cancellationToken.ThrowIfCancellationRequested(); ReceivingAmqpLink link = await CreateReceivingLinkAsync( - entityPath, - connection, - receiverEndpoint, - timeout.CalculateRemaining(stopWatch.GetElapsedTime()), - prefetchCount, - receiveMode, - sessionId, - isSessionReceiver, - cancellationToken + entityPath: entityPath, + identifier: identifier, + connection: connection, + endpoint: receiverEndpoint, + timeout: timeout.CalculateRemaining(stopWatch.GetElapsedTime()), + prefetchCount: prefetchCount, + receiveMode: receiveMode, + sessionId: sessionId, + isSessionReceiver: isSessionReceiver, + cancellationToken: cancellationToken ).ConfigureAwait(false); cancellationToken.ThrowIfCancellationRequested(); @@ -299,6 +303,7 @@ public virtual async Task OpenReceiverLinkAsync( /// Opens an AMQP link for use with sender operations. /// /// + /// The identifier for the sender that is opening a send link. /// The timeout to apply when creating the link. /// An optional instance to signal the request to cancel the operation. /// @@ -306,6 +311,7 @@ public virtual async Task OpenReceiverLinkAsync( /// public virtual async Task OpenSenderLinkAsync( string entityPath, + string identifier, TimeSpan timeout, CancellationToken cancellationToken) { @@ -317,9 +323,11 @@ public virtual async Task OpenSenderLinkAsync( cancellationToken.ThrowIfCancellationRequested(); SendingAmqpLink link = await CreateSendingLinkAsync( - entityPath, - connection, - timeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false); + entityPath: entityPath, + identifier: identifier, + connection: connection, + timeout: timeout.CalculateRemaining(stopWatch.GetElapsedTime()), + cancellationToken: cancellationToken).ConfigureAwait(false); cancellationToken.ThrowIfCancellationRequested(); @@ -415,7 +423,7 @@ protected virtual async Task CreateAndOpenConnectionAsync( /// Creates an AMQP link for use with management operations. /// /// - /// + /// The identifier for the sender or receiver that is opening a management link. /// The active and opened AMQP connection to use for this link. /// The timeout to apply when creating the link. /// An optional instance to signal the request to cancel the operation. @@ -423,6 +431,7 @@ protected virtual async Task CreateAndOpenConnectionAsync( /// A link for use with management operations. protected virtual async Task CreateManagementLinkAsync( string entityPath, + string identifier, AmqpConnection connection, TimeSpan timeout, CancellationToken cancellationToken) @@ -456,12 +465,13 @@ protected virtual async Task CreateManagementLinkAsync( var endpoint = new Uri(ServiceEndpoint, entityPath); var audience = new[] { endpoint.AbsoluteUri }; DateTime authExpirationUtc = await RequestAuthorizationUsingCbsAsync( - connection, - TokenProvider, - ServiceEndpoint, - audience, - claims, - timeout.CalculateRemaining(stopWatch.GetElapsedTime())) + connection: connection, + tokenProvider: TokenProvider, + endpoint: ServiceEndpoint, + audience: audience, + requiredClaims: claims, + timeout: timeout.CalculateRemaining(stopWatch.GetElapsedTime()), + identifier: identifier) .ConfigureAwait(false); var link = new RequestResponseAmqpLink( @@ -476,16 +486,16 @@ protected virtual async Task CreateManagementLinkAsync( TimerCallback refreshHandler = CreateAuthorizationRefreshHandler ( - entityPath, - connection, - link, - TokenProvider, - ServiceEndpoint, - audience, - claims, - AuthorizationRefreshTimeout, - () => (ActiveLinks.ContainsKey(link) ? refreshTimer : null) - ); + entityPath: entityPath, + connection: connection, + amqpLink: link, + tokenProvider: TokenProvider, + endpoint: ServiceEndpoint, + audience: audience, + requiredClaims: claims, + refreshTimeout: AuthorizationRefreshTimeout, + refreshTimerFactory: () => (ActiveLinks.ContainsKey(link) ? refreshTimer : null), + identifier: identifier); refreshTimer = new Timer(refreshHandler, null, CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.InfiniteTimeSpan); @@ -515,6 +525,7 @@ protected virtual async Task CreateManagementLinkAsync( /// Creates an AMQP link for use with receiving operations. /// /// The entity path to receive from. + /// The identifier for the receiver that is creating a receive link. /// The active and opened AMQP connection to use for this link. /// The fully qualified endpoint to open the link for. /// Controls the number of events received and queued locally without regard to whether an operation was requested. @@ -527,6 +538,7 @@ protected virtual async Task CreateManagementLinkAsync( /// A link for use for operations related to receiving events. protected virtual async Task CreateReceivingLinkAsync( string entityPath, + string identifier, AmqpConnection connection, Uri endpoint, TimeSpan timeout, @@ -549,12 +561,13 @@ protected virtual async Task CreateReceivingLinkAsync( string[] authClaims = new string[] { ServiceBusClaim.Send }; var audience = new[] { endpoint.AbsoluteUri }; DateTime authExpirationUtc = await RequestAuthorizationUsingCbsAsync( - connection, - TokenProvider, - endpoint, - audience, - authClaims, - timeout.CalculateRemaining(stopWatch.GetElapsedTime())).ConfigureAwait(false); + connection: connection, + tokenProvider: TokenProvider, + endpoint: endpoint, + audience: audience, + requiredClaims: authClaims, + timeout: timeout.CalculateRemaining(stopWatch.GetElapsedTime()), + identifier: identifier).ConfigureAwait(false); cancellationToken.ThrowIfCancellationRequested(); // Create and open the AMQP session associated with the link. @@ -600,16 +613,16 @@ protected virtual async Task CreateReceivingLinkAsync( TimerCallback refreshHandler = CreateAuthorizationRefreshHandler ( - entityPath, - connection, - link, - TokenProvider, - endpoint, - audience, - authClaims, - AuthorizationRefreshTimeout, - () => (ActiveLinks.ContainsKey(link) ? refreshTimer : null) - ); + entityPath: entityPath, + connection: connection, + amqpLink: link, + tokenProvider: TokenProvider, + endpoint: endpoint, + audience: audience, + requiredClaims: authClaims, + refreshTimeout: AuthorizationRefreshTimeout, + refreshTimerFactory: () => (ActiveLinks.ContainsKey(link) ? refreshTimer : null), + identifier: identifier); refreshTimer = new Timer(refreshHandler, null, CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.InfiniteTimeSpan); @@ -639,6 +652,7 @@ protected virtual async Task CreateReceivingLinkAsync( /// Creates an AMQP link for use with publishing operations. /// /// The entity path to send to. + /// The identifier of the sender that is creating a send link. /// The active and opened AMQP connection to use for this link. /// The timeout to apply when creating the link. /// An optional instance to signal the request to cancel the operation. @@ -646,6 +660,7 @@ protected virtual async Task CreateReceivingLinkAsync( /// A link for use for operations related to receiving events. protected virtual async Task CreateSendingLinkAsync( string entityPath, + string identifier, AmqpConnection connection, TimeSpan timeout, CancellationToken cancellationToken) @@ -669,12 +684,13 @@ protected virtual async Task CreateSendingLinkAsync( var authClaims = new[] { ServiceBusClaim.Send }; DateTime authExpirationUtc = await RequestAuthorizationUsingCbsAsync( - connection, - TokenProvider, - destinationEndpoint, - audience, - authClaims, - timeout.CalculateRemaining(stopWatch.GetElapsedTime())) + connection: connection, + tokenProvider: TokenProvider, + endpoint: destinationEndpoint, + audience: audience, + requiredClaims: authClaims, + timeout: timeout.CalculateRemaining(stopWatch.GetElapsedTime()), + identifier: identifier) .ConfigureAwait(false); cancellationToken.ThrowIfCancellationRequested(); @@ -709,15 +725,16 @@ protected virtual async Task CreateSendingLinkAsync( TimerCallback refreshHandler = CreateAuthorizationRefreshHandler ( - entityPath, - connection, - link, - TokenProvider, - destinationEndpoint, - audience, - authClaims, - AuthorizationRefreshTimeout, - () => refreshTimer + entityPath: entityPath, + connection: connection, + amqpLink: link, + tokenProvider: TokenProvider, + endpoint: destinationEndpoint, + audience: audience, + requiredClaims: authClaims, + refreshTimeout: AuthorizationRefreshTimeout, + refreshTimerFactory: () => refreshTimer, + identifier: identifier ); refreshTimer = new Timer(refreshHandler, null, CalculateLinkAuthorizationRefreshInterval(authExpirationUtc), Timeout.InfiniteTimeSpan); @@ -819,8 +836,7 @@ protected virtual TimeSpan CalculateLinkAuthorizationRefreshInterval( /// Creates the timer event handler to support refreshing AMQP link authorization /// on a recurring basis. /// - /// - /// + /// The entity path to refresh authorization with. /// The AMQP connection to which the link being refreshed is bound to. /// The AMQP link to refresh authorization for. /// The to use for obtaining access tokens. @@ -829,6 +845,7 @@ protected virtual TimeSpan CalculateLinkAuthorizationRefreshInterval( /// The set of claims required to support the operations of the AMQP link. /// The timeout to apply when requesting authorization refresh. /// A function to allow retrieving the associated with the link authorization. + /// The identifier of the entity that will be refreshing authorization. /// /// A delegate to perform the refresh when a timer is due. protected virtual TimerCallback CreateAuthorizationRefreshHandler( @@ -840,7 +857,8 @@ protected virtual TimerCallback CreateAuthorizationRefreshHandler( string[] audience, string[] requiredClaims, TimeSpan refreshTimeout, - Func refreshTimerFactory) + Func refreshTimerFactory, + string identifier) { return async _ => { @@ -855,12 +873,13 @@ protected virtual TimerCallback CreateAuthorizationRefreshHandler( } DateTime authExpirationUtc = await RequestAuthorizationUsingCbsAsync( - connection, - tokenProvider, - endpoint, - audience, - requiredClaims, - refreshTimeout) + connection: connection, + tokenProvider: tokenProvider, + endpoint: endpoint, + audience: audience, + requiredClaims: requiredClaims, + timeout: refreshTimeout, + identifier: identifier) .ConfigureAwait(false); // Reset the timer for the next refresh. @@ -940,6 +959,7 @@ protected virtual async Task OpenAmqpObjectAsync( /// The audience associated with the authorization. This is likely the absolute URI. /// The set of claims required to support the operations of the AMQP link. /// The timeout to apply when requesting authorization. + /// The identifier of the entity requesting authorization. /// /// The date/time, in UTC, when the authorization expires. /// @@ -954,19 +974,33 @@ protected virtual async Task RequestAuthorizationUsingCbsAsync( Uri endpoint, string[] audience, string[] requiredClaims, - TimeSpan timeout) + TimeSpan timeout, + string identifier) { + string uri = endpoint.AbsoluteUri; + ServiceBusEventSource.Log.RequestAuthorizationStart(identifier, uri); AmqpCbsLink authLink = connection.Extensions.Find(); DateTime cbsTokenExpiresAtUtc = DateTime.MaxValue; - foreach (string resource in audience) + + try { - DateTime expiresAt = - await authLink.SendTokenAsync(TokenProvider, endpoint, resource, resource, requiredClaims, timeout).ConfigureAwait(false); - if (expiresAt < cbsTokenExpiresAtUtc) + foreach (string resource in audience) { - cbsTokenExpiresAtUtc = expiresAt; + DateTime expiresAt = + await authLink.SendTokenAsync(TokenProvider, endpoint, resource, resource, requiredClaims, timeout).ConfigureAwait(false); + if (expiresAt < cbsTokenExpiresAtUtc) + { + cbsTokenExpiresAtUtc = expiresAt; + } } } + catch (Exception ex) + { + ServiceBusEventSource.Log.RequestAuthorizationException(identifier, uri, ex.ToString()); + throw; + } + + ServiceBusEventSource.Log.RequestAuthorizationComplete(identifier, uri, cbsTokenExpiresAtUtc.ToString(CultureInfo.InvariantCulture)); return cbsTokenExpiresAtUtc; } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index ca60ec28bd5a..f11c69b51a30 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -142,7 +142,8 @@ public AmqpReceiver( timeout: timeout, prefetchCount: prefetchCount, receiveMode: receiveMode, - isSessionReceiver: isSessionReceiver), + isSessionReceiver: isSessionReceiver, + identifier: identifier), link => CloseLink(link)); _managementLink = new FaultTolerantAmqpObject( @@ -166,13 +167,15 @@ private async Task OpenReceiverLinkAsync( TimeSpan timeout, uint prefetchCount, ServiceBusReceiveMode receiveMode, - bool isSessionReceiver) + bool isSessionReceiver, + string identifier) { ServiceBusEventSource.Log.CreateReceiveLinkStart(_identifier); try { ReceivingAmqpLink link = await _connectionScope.OpenReceiverLinkAsync( + identifier: identifier, entityPath: _entityPath, timeout: timeout, prefetchCount: prefetchCount, diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs index aee5e5c6ffa7..ca70079af2b7 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs @@ -583,9 +583,10 @@ protected virtual async Task CreateLinkAndEnsureSenderStateAsyn try { SendingAmqpLink link = await _connectionScope.OpenSenderLinkAsync( - _entityPath, - timeout, - cancellationToken).ConfigureAwait(false); + entityPath: _entityPath, + identifier: _identifier, + timeout: timeout, + cancellationToken: cancellationToken).ConfigureAwait(false); if (!MaxMessageSize.HasValue) { diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs index b9a893160cfb..3192a692f0d0 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs @@ -186,6 +186,10 @@ internal ServiceBusEventSource() { } internal const int ProcessorMessageHandlerCompleteEvent = 103; internal const int ProcessorMessageHandlerExceptionEvent = 104; + internal const int RequestAuthorizationStartEvent = 105; + internal const int RequestAuthorizationCompleteEvent = 106; + internal const int RequestAuthorizationExceptionEvent = 107; + #endregion // add new event numbers here incrementing from previous @@ -1096,6 +1100,33 @@ public virtual void ManagementLinkClosedCore( WriteEvent(ManagementLinkClosedEvent, identifier, linkException); } } + + [Event(RequestAuthorizationStartEvent, Level = EventLevel.Verbose, Message = "{0}: Requesting authorization to {1}")] + public virtual void RequestAuthorizationStart(string identifier, string endpoint) + { + if (IsEnabled()) + { + WriteEvent(RequestAuthorizationStartEvent, identifier, endpoint); + } + } + + [Event(RequestAuthorizationCompleteEvent, Level = EventLevel.Verbose, Message = "{0}: Authorization to {1} complete. Expiration time: {2}")] + public virtual void RequestAuthorizationComplete(string identifier, string endpoint, string expiration) + { + if (IsEnabled()) + { + WriteEvent(RequestAuthorizationCompleteEvent, identifier, endpoint, expiration); + } + } + + [Event(RequestAuthorizationExceptionEvent, Level = EventLevel.Verbose, Message = "{0}: An exception occured while requesting authorization to {1}. Exception: {2}.")] + public virtual void RequestAuthorizationException(string identifier, string endpoint, string exception) + { + if (IsEnabled()) + { + WriteEvent(RequestAuthorizationExceptionEvent, identifier, endpoint, exception); + } + } #endregion #region Retries diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpReceiverTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpReceiverTests.cs index 01b75afab2cc..9bf5077a2c76 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpReceiverTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpReceiverTests.cs @@ -156,6 +156,7 @@ public void ReceiveAsyncAppliesTheRetryPolicy(ServiceBusRetryOptions retryOption mockScope .Setup(scope => scope.OpenReceiverLinkAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), @@ -173,6 +174,7 @@ public void ReceiveAsyncAppliesTheRetryPolicy(ServiceBusRetryOptions retryOption mockScope .Verify(scope => scope.OpenReceiverLinkAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), @@ -210,6 +212,7 @@ public void ReceiveAsyncConsidersOperationCanceledExceptionAsRetriable(ServiceBu mockScope .Setup(scope => scope.OpenReceiverLinkAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), @@ -227,6 +230,7 @@ public void ReceiveAsyncConsidersOperationCanceledExceptionAsRetriable(ServiceBu mockScope .Verify(scope => scope.OpenReceiverLinkAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), @@ -272,6 +276,7 @@ public void ReceiveAsyncAppliesTheRetryPolicyForAmqpErrors(ServiceBusRetryOption mockScope .Setup(scope => scope.OpenReceiverLinkAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), @@ -288,6 +293,7 @@ public void ReceiveAsyncAppliesTheRetryPolicyForAmqpErrors(ServiceBusRetryOption cancellationSource.Token), Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason)).EqualTo(ServiceBusFailureReason.ServiceBusy)); mockScope .Verify(scope => scope.OpenReceiverLinkAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), @@ -329,6 +335,7 @@ public void ReceiveAsyncDetectsAnEmbeddedErrorForOperationCanceled() mockScope .Setup(scope => scope.OpenReceiverLinkAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), @@ -346,6 +353,7 @@ public void ReceiveAsyncDetectsAnEmbeddedErrorForOperationCanceled() mockScope .Verify(scope => scope.OpenReceiverLinkAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), @@ -382,6 +390,7 @@ public void ReceiveAsyncDetectsAnEmbeddedAmqpErrorForOperationCanceled() mockScope .Setup(scope => scope.OpenReceiverLinkAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), @@ -399,6 +408,7 @@ public void ReceiveAsyncDetectsAnEmbeddedAmqpErrorForOperationCanceled() mockScope .Verify(scope => scope.OpenReceiverLinkAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), @@ -435,6 +445,7 @@ public void ReceiveAsyncDoesntRetryOnTaskCanceled() mockScope .Setup(scope => scope.OpenReceiverLinkAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), @@ -452,6 +463,7 @@ public void ReceiveAsyncDoesntRetryOnTaskCanceled() mockScope .Verify(scope => scope.OpenReceiverLinkAsync( + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs index 92b31fee00b3..109739fdbc4b 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs @@ -51,6 +51,8 @@ public async Task LogsEvents() await sender.SendMessagesAsync(batch); _listener.SingleEventById(ServiceBusEventSource.CreateSendLinkStartEvent, e => e.Payload.Contains(sender.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationStartEvent, e => e.Payload.Contains(sender.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationCompleteEvent, e => e.Payload.Contains(sender.Identifier)); _listener.SingleEventById(ServiceBusEventSource.CreateSendLinkCompleteEvent, e => e.Payload.Contains(sender.Identifier)); _listener.SingleEventById(ServiceBusEventSource.SendMessageStartEvent, e => e.Payload.Contains(sender.Identifier)); _listener.SingleEventById(ServiceBusEventSource.SendMessageCompleteEvent, e => e.Payload.Contains(sender.Identifier)); @@ -77,6 +79,8 @@ public async Task LogsEvents() } } _listener.SingleEventById(ServiceBusEventSource.CreateReceiveLinkStartEvent, e => e.Payload.Contains(receiver.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationStartEvent, e => e.Payload.Contains(receiver.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationCompleteEvent, e => e.Payload.Contains(receiver.Identifier)); _listener.SingleEventById(ServiceBusEventSource.CreateReceiveLinkCompleteEvent, e => e.Payload.Contains(receiver.Identifier)); Assert.IsTrue(_listener.EventsById(ServiceBusEventSource.ReceiveMessageStartEvent).Any()); Assert.IsTrue(_listener.EventsById(ServiceBusEventSource.ReceiveMessageCompleteEvent).Any()); @@ -144,6 +148,8 @@ public async Task LogsSessionEvents() await sender.SendMessagesAsync(batch); _listener.SingleEventById(ServiceBusEventSource.CreateSendLinkStartEvent, e => e.Payload.Contains(sender.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationStartEvent, e => e.Payload.Contains(sender.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationCompleteEvent, e => e.Payload.Contains(sender.Identifier)); _listener.SingleEventById(ServiceBusEventSource.CreateSendLinkCompleteEvent, e => e.Payload.Contains(sender.Identifier)); _listener.SingleEventById(ServiceBusEventSource.SendMessageStartEvent, e => e.Payload.Contains(sender.Identifier)); _listener.SingleEventById(ServiceBusEventSource.SendMessageCompleteEvent, e => e.Payload.Contains(sender.Identifier)); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs index 914da2130ce6..69206d392031 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs @@ -296,7 +296,10 @@ async Task ProcessMessage(ProcessMessageEventArgs args) Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason)).EqualTo(ServiceBusFailureReason.MessageLockLost)); Interlocked.Increment(ref messageCt); var setIndex = Interlocked.Increment(ref completionSourceIndex); - completionSources[setIndex].SetResult(true); + if (setIndex < numThreads) + { + completionSources[setIndex].SetResult(true); + } } } await Task.WhenAll(completionSources.Select(source => source.Task)); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs index 413565ce8035..b95313c8f7c5 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs @@ -748,7 +748,10 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args) } Interlocked.Increment(ref messageCt); var setIndex = Interlocked.Increment(ref completionSourceIndex); - completionSources[setIndex].SetResult(true); + if (setIndex < numThreads) + { + completionSources[setIndex].SetResult(true); + } } await Task.WhenAll(completionSources.Select(source => source.Task)); await processor.StopProcessingAsync(); diff --git a/sdk/servicebus/test-resources-post.ps1 b/sdk/servicebus/test-resources-post.ps1 new file mode 100644 index 000000000000..28e3cf0a9783 --- /dev/null +++ b/sdk/servicebus/test-resources-post.ps1 @@ -0,0 +1,16 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +# The purpose of this script is to add a small delay between the creation of the live test resources +# and the execution of the live tests. This allows RBAC to replicate and avoids flakiness in the first set +# of live tests that might otherwise start running before RBAC has replicated. + +param ( + [hashtable] $DeploymentOutputs, + [string] $TenantId, + [string] $TestApplicationId, + [string] $TestApplicationSecret +) + +Write-Verbose "Sleeping for 60 seconds to let RBAC replicate" +Start-Sleep -s 60