Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ public AmqpReceiver(
timeout: timeout,
prefetchCount: prefetchCount,
receiveMode: receiveMode,
isSessionReceiver: isSessionReceiver),
isSessionReceiver: isSessionReceiver,
identifier: identifier),
link => CloseLink(link));

_managementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
Expand All @@ -166,13 +167,15 @@ private async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
TimeSpan timeout,
uint prefetchCount,
ServiceBusReceiveMode receiveMode,
bool isSessionReceiver)
bool isSessionReceiver,
string identifier)
{
ServiceBusEventSource.Log.CreateReceiveLinkStart(_identifier);

try
{
ReceivingAmqpLink link = await _connectionScope.OpenReceiverLinkAsync(
identifier: identifier,
entityPath: _entityPath,
timeout: timeout,
prefetchCount: prefetchCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,9 +583,10 @@ protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureSenderStateAsyn
try
{
SendingAmqpLink link = await _connectionScope.OpenSenderLinkAsync(
_entityPath,
timeout,
cancellationToken).ConfigureAwait(false);
entityPath: _entityPath,
identifier: _identifier,
timeout: timeout,
cancellationToken: cancellationToken).ConfigureAwait(false);

if (!MaxMessageSize.HasValue)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ internal ServiceBusEventSource() { }
internal const int ProcessorMessageHandlerCompleteEvent = 103;
internal const int ProcessorMessageHandlerExceptionEvent = 104;

internal const int RequestAuthorizationStartEvent = 105;
internal const int RequestAuthorizationCompleteEvent = 106;
internal const int RequestAuthorizationExceptionEvent = 107;

#endregion
// add new event numbers here incrementing from previous

Expand Down Expand Up @@ -1096,6 +1100,33 @@ public virtual void ManagementLinkClosedCore(
WriteEvent(ManagementLinkClosedEvent, identifier, linkException);
}
}

[Event(RequestAuthorizationStartEvent, Level = EventLevel.Verbose, Message = "{0}: Requesting authorization to {1}")]
public virtual void RequestAuthorizationStart(string identifier, string endpoint)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be overlooking, but I'm not sure from usage how these differ from the AmqpLinkAuthorizationRefresh series (ids 39-41). Should we consider merging them?

{
if (IsEnabled())
{
WriteEvent(RequestAuthorizationStartEvent, identifier, endpoint);
}
}

[Event(RequestAuthorizationCompleteEvent, Level = EventLevel.Verbose, Message = "{0}: Authorization to {1} complete. Expiration time: {2}")]
public virtual void RequestAuthorizationComplete(string identifier, string endpoint, string expiration)
{
if (IsEnabled())
{
WriteEvent(RequestAuthorizationCompleteEvent, identifier, endpoint, expiration);
}
}

[Event(RequestAuthorizationExceptionEvent, Level = EventLevel.Verbose, Message = "{0}: An exception occured while requesting authorization to {1}. Exception: {2}.")]
public virtual void RequestAuthorizationException(string identifier, string endpoint, string exception)
{
if (IsEnabled())
{
WriteEvent(RequestAuthorizationExceptionEvent, identifier, endpoint, exception);
}
}
#endregion

#region Retries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public void ReceiveAsyncAppliesTheRetryPolicy(ServiceBusRetryOptions retryOption

mockScope
.Setup(scope => scope.OpenReceiverLinkAsync(
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<TimeSpan>(),
It.IsAny<uint>(),
Expand All @@ -173,6 +174,7 @@ public void ReceiveAsyncAppliesTheRetryPolicy(ServiceBusRetryOptions retryOption

mockScope
.Verify(scope => scope.OpenReceiverLinkAsync(
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<TimeSpan>(),
It.IsAny<uint>(),
Expand Down Expand Up @@ -210,6 +212,7 @@ public void ReceiveAsyncConsidersOperationCanceledExceptionAsRetriable(ServiceBu

mockScope
.Setup(scope => scope.OpenReceiverLinkAsync(
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<TimeSpan>(),
It.IsAny<uint>(),
Expand All @@ -227,6 +230,7 @@ public void ReceiveAsyncConsidersOperationCanceledExceptionAsRetriable(ServiceBu

mockScope
.Verify(scope => scope.OpenReceiverLinkAsync(
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<TimeSpan>(),
It.IsAny<uint>(),
Expand Down Expand Up @@ -272,6 +276,7 @@ public void ReceiveAsyncAppliesTheRetryPolicyForAmqpErrors(ServiceBusRetryOption

mockScope
.Setup(scope => scope.OpenReceiverLinkAsync(
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<TimeSpan>(),
It.IsAny<uint>(),
Expand All @@ -288,6 +293,7 @@ public void ReceiveAsyncAppliesTheRetryPolicyForAmqpErrors(ServiceBusRetryOption
cancellationSource.Token), Throws.InstanceOf<ServiceBusException>().And.Property(nameof(ServiceBusException.Reason)).EqualTo(ServiceBusFailureReason.ServiceBusy));
mockScope
.Verify(scope => scope.OpenReceiverLinkAsync(
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<TimeSpan>(),
It.IsAny<uint>(),
Expand Down Expand Up @@ -329,6 +335,7 @@ public void ReceiveAsyncDetectsAnEmbeddedErrorForOperationCanceled()

mockScope
.Setup(scope => scope.OpenReceiverLinkAsync(
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<TimeSpan>(),
It.IsAny<uint>(),
Expand All @@ -346,6 +353,7 @@ public void ReceiveAsyncDetectsAnEmbeddedErrorForOperationCanceled()

mockScope
.Verify(scope => scope.OpenReceiverLinkAsync(
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<TimeSpan>(),
It.IsAny<uint>(),
Expand Down Expand Up @@ -382,6 +390,7 @@ public void ReceiveAsyncDetectsAnEmbeddedAmqpErrorForOperationCanceled()

mockScope
.Setup(scope => scope.OpenReceiverLinkAsync(
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<TimeSpan>(),
It.IsAny<uint>(),
Expand All @@ -399,6 +408,7 @@ public void ReceiveAsyncDetectsAnEmbeddedAmqpErrorForOperationCanceled()

mockScope
.Verify(scope => scope.OpenReceiverLinkAsync(
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<TimeSpan>(),
It.IsAny<uint>(),
Expand Down Expand Up @@ -435,6 +445,7 @@ public void ReceiveAsyncDoesntRetryOnTaskCanceled()

mockScope
.Setup(scope => scope.OpenReceiverLinkAsync(
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<TimeSpan>(),
It.IsAny<uint>(),
Expand All @@ -452,6 +463,7 @@ public void ReceiveAsyncDoesntRetryOnTaskCanceled()

mockScope
.Verify(scope => scope.OpenReceiverLinkAsync(
It.IsAny<string>(),
It.IsAny<string>(),
It.IsAny<TimeSpan>(),
It.IsAny<uint>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public async Task LogsEvents()

await sender.SendMessagesAsync(batch);
_listener.SingleEventById(ServiceBusEventSource.CreateSendLinkStartEvent, e => e.Payload.Contains(sender.Identifier));
_listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationStartEvent, e => e.Payload.Contains(sender.Identifier));
_listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationCompleteEvent, e => e.Payload.Contains(sender.Identifier));
_listener.SingleEventById(ServiceBusEventSource.CreateSendLinkCompleteEvent, e => e.Payload.Contains(sender.Identifier));
_listener.SingleEventById(ServiceBusEventSource.SendMessageStartEvent, e => e.Payload.Contains(sender.Identifier));
_listener.SingleEventById(ServiceBusEventSource.SendMessageCompleteEvent, e => e.Payload.Contains(sender.Identifier));
Expand All @@ -77,6 +79,8 @@ public async Task LogsEvents()
}
}
_listener.SingleEventById(ServiceBusEventSource.CreateReceiveLinkStartEvent, e => e.Payload.Contains(receiver.Identifier));
_listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationStartEvent, e => e.Payload.Contains(receiver.Identifier));
_listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationCompleteEvent, e => e.Payload.Contains(receiver.Identifier));
_listener.SingleEventById(ServiceBusEventSource.CreateReceiveLinkCompleteEvent, e => e.Payload.Contains(receiver.Identifier));
Assert.IsTrue(_listener.EventsById(ServiceBusEventSource.ReceiveMessageStartEvent).Any());
Assert.IsTrue(_listener.EventsById(ServiceBusEventSource.ReceiveMessageCompleteEvent).Any());
Expand Down Expand Up @@ -144,6 +148,8 @@ public async Task LogsSessionEvents()

await sender.SendMessagesAsync(batch);
_listener.SingleEventById(ServiceBusEventSource.CreateSendLinkStartEvent, e => e.Payload.Contains(sender.Identifier));
_listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationStartEvent, e => e.Payload.Contains(sender.Identifier));
_listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationCompleteEvent, e => e.Payload.Contains(sender.Identifier));
_listener.SingleEventById(ServiceBusEventSource.CreateSendLinkCompleteEvent, e => e.Payload.Contains(sender.Identifier));
_listener.SingleEventById(ServiceBusEventSource.SendMessageStartEvent, e => e.Payload.Contains(sender.Identifier));
_listener.SingleEventById(ServiceBusEventSource.SendMessageCompleteEvent, e => e.Payload.Contains(sender.Identifier));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,10 @@ async Task ProcessMessage(ProcessMessageEventArgs args)
Throws.InstanceOf<ServiceBusException>().And.Property(nameof(ServiceBusException.Reason)).EqualTo(ServiceBusFailureReason.MessageLockLost));
Interlocked.Increment(ref messageCt);
var setIndex = Interlocked.Increment(ref completionSourceIndex);
completionSources[setIndex].SetResult(true);
if (setIndex < numThreads)
{
completionSources[setIndex].SetResult(true);
}
}
}
await Task.WhenAll(completionSources.Select(source => source.Task));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,10 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
}
Interlocked.Increment(ref messageCt);
var setIndex = Interlocked.Increment(ref completionSourceIndex);
completionSources[setIndex].SetResult(true);
if (setIndex < numThreads)
{
completionSources[setIndex].SetResult(true);
}
}
await Task.WhenAll(completionSources.Select(source => source.Task));
await processor.StopProcessingAsync();
Expand Down
16 changes: 16 additions & 0 deletions sdk/servicebus/test-resources-post.ps1
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

# The purpose of this script is to add a small delay between the creation of the live test resources
# and the execution of the live tests. This allows RBAC to replicate and avoids flakiness in the first set
# of live tests that might otherwise start running before RBAC has replicated.

param (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. I'm going to steal this after it merges...

[hashtable] $DeploymentOutputs,
[string] $TenantId,
[string] $TestApplicationId,
[string] $TestApplicationSecret
)

Write-Verbose "Sleeping for 60 seconds to let RBAC replicate"
Start-Sleep -s 60