From 2103dbb04a0b7defe65ce77ca8c50b214a41a788 Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Tue, 11 Feb 2020 17:15:07 -0500 Subject: [PATCH] [Service Bus Client] Initial Live Test Infrastructure The focus of these changes is to introduce infrastructure to allow the Service Bus live tests to dynamically manage resources, creating a namespace for each test run and allowing each test to request a dedicated queue or topic scope. Tests using this infrastructure are safe to run in parallel and isolated such that tests are using individual Azure resources and will not cascade failures nor should one test be able to impact the operation of another. --- eng/Packages.Data.props | 1 + .../src/Receiver/SessionReceiverClient.cs | 2 +- .../Azure.Messaging.ServiceBus.Tests.csproj | 7 +- .../tests/Infrastructure/ServiceBusScope.cs | 627 ++++++++++++++++++ .../tests/Infrastructure/TestCategory.cs | 18 + .../tests/Infrastructure/TestEnvironment.cs | 276 ++++++++ .../tests/Infrastructure/TestRunFixture.cs | 43 ++ .../tests/Properties/AssemblyInfo.cs | 6 + .../tests/ReceiverLiveTests.cs | 59 +- .../tests/SenderLiveTests.cs | 124 ++-- .../tests/ServiceBusLiveTestBase.cs | 8 +- .../tests/ServiceBusTestBase.cs | 9 - .../tests/SessionReceiverLiveTests.cs | 240 +++---- 13 files changed, 1200 insertions(+), 220 deletions(-) mode change 100644 => 100755 sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SessionReceiverClient.cs mode change 100644 => 100755 sdk/servicebus/Azure.Messaging.ServiceBus/tests/Azure.Messaging.ServiceBus.Tests.csproj create mode 100755 sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusScope.cs create mode 100755 sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/TestCategory.cs create mode 100755 sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/TestEnvironment.cs create mode 100755 sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/TestRunFixture.cs create mode 100644 sdk/servicebus/Azure.Messaging.ServiceBus/tests/Properties/AssemblyInfo.cs mode change 100644 => 100755 sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs mode change 100644 => 100755 sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs mode change 100644 => 100755 sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusLiveTestBase.cs mode change 100644 => 100755 sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusTestBase.cs mode change 100644 => 100755 sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs diff --git a/eng/Packages.Data.props b/eng/Packages.Data.props index 8849bbc6dafc..9f2521f9f0cf 100755 --- a/eng/Packages.Data.props +++ b/eng/Packages.Data.props @@ -35,6 +35,7 @@ + diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SessionReceiverClient.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SessionReceiverClient.cs old mode 100644 new mode 100755 index 819c0c6e6d1f..0ad9ab57a494 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SessionReceiverClient.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/SessionReceiverClient.cs @@ -22,7 +22,7 @@ namespace Azure.Messaging.ServiceBus.Receiver public class SessionReceiverClient : ServiceBusReceiverClient { /// - /// Gets the time that the session identified by see cref="SessionId"/> is locked until for this client. + /// Gets the time that the session identified by is locked until for this client. /// public virtual DateTime LockedUntilUtc { get; } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Azure.Messaging.ServiceBus.Tests.csproj b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Azure.Messaging.ServiceBus.Tests.csproj old mode 100644 new mode 100755 index bf3e55ff49f4..b447ac489d4e --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Azure.Messaging.ServiceBus.Tests.csproj +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Azure.Messaging.ServiceBus.Tests.csproj @@ -4,9 +4,7 @@ - - - + @@ -16,6 +14,9 @@ + + + diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusScope.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusScope.cs new file mode 100755 index 000000000000..d95d91e2d3a0 --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusScope.cs @@ -0,0 +1,627 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Net.Sockets; +using System.Runtime.CompilerServices; +using System.Security.Authentication; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Management.ResourceManager; +using Microsoft.Azure.Management.ResourceManager.Models; +using Microsoft.Azure.Management.ServiceBus; +using Microsoft.Azure.Management.ServiceBus.Models; +using Microsoft.IdentityModel.Clients.ActiveDirectory; +using Microsoft.Rest; +using Microsoft.Rest.Azure; +using Polly; + +namespace Azure.Messaging.ServiceBus.Tests +{ + /// + /// Provides access to dynamically created instances of Service Bus resources which exists only in the context + /// of their scope. + /// + /// + public static class ServiceBusScope + { + /// The maximum number of attempts to retry a management operation. + private const int RetryMaximumAttempts = 20; + + /// The number of seconds to use as the basis for backing off on retry attempts. + private const double RetryExponentialBackoffSeconds = 3.0; + + /// The number of seconds to use as the basis for applying jitter to retry back-off calculations. + private const double RetryBaseJitterSeconds = 60.0; + + /// The buffer to apply when considering refreshing; credentials that expire less than this duration will be refreshed. + private static readonly TimeSpan CredentialRefreshBuffer = TimeSpan.FromMinutes(5); + + /// The random number generator to use for each requesting thread. + private static readonly ThreadLocal RandomNumberGenerator = new ThreadLocal(() => new Random(Interlocked.Increment(ref s_randomSeed)), false); + + /// The seed to use for random number generation. + private static int s_randomSeed = Environment.TickCount; + + /// The token credential to be used with the Service Bus management client. + private static ManagementToken s_managementToken; + + /// + /// Performs the tasks needed to create a new Service Bus namespace within a resource group, intended to be used as + /// an ephemeral container for the queue and topic instances used in a given test run. + /// + /// + /// The key attributes for identifying and accessing a dynamically created Service Bus namespace. + /// + public static async Task CreateNamespaceAsync() + { + var azureSubscription = TestEnvironment.ServiceBusAzureSubscription; + var resourceGroup = TestEnvironment.ServiceBusResourceGroup; + var token = await AquireManagementTokenAsync().ConfigureAwait(false); + + string CreateName() => $"net-servicebus-{ Guid.NewGuid().ToString("D").Substring(0, 30) }"; + + using (var client = new ServiceBusManagementClient(new TokenCredentials(token)) { SubscriptionId = azureSubscription }) + { + var location = await QueryResourceGroupLocationAsync(token, resourceGroup, azureSubscription).ConfigureAwait(false); + + var serviceBusNamspace = new SBNamespace(sku: new SBSku(SkuName.Standard, SkuTier.Standard), tags: GenerateTags(), location: location); + serviceBusNamspace = await CreateRetryPolicy().ExecuteAsync(() => client.Namespaces.CreateOrUpdateAsync(resourceGroup, CreateName(), serviceBusNamspace)).ConfigureAwait(false); + + var accessKey = await CreateRetryPolicy().ExecuteAsync(() => client.Namespaces.ListKeysAsync(resourceGroup, serviceBusNamspace.Name, TestEnvironment.ServiceBusDefaultSharedAccessKey)).ConfigureAwait(false); + return new TestEnvironment.NamespaceProperties(serviceBusNamspace.Name, accessKey.PrimaryConnectionString, shouldRemoveAtCompletion: true); + } + } + + /// + /// Performs the tasks needed to remove an ephemeral Service Bus namespace used as a container for queue and topic instances + /// for a specific test run. + /// + /// + /// The name of the namespace to delete. + /// + public static async Task DeleteNamespaceAsync(string namespaceName) + { + var azureSubscription = TestEnvironment.ServiceBusAzureSubscription; + var resourceGroup = TestEnvironment.ServiceBusResourceGroup; + var token = await AquireManagementTokenAsync().ConfigureAwait(false); + + using (var client = new ServiceBusManagementClient(new TokenCredentials(token)) { SubscriptionId = azureSubscription }) + { + await CreateRetryPolicy().ExecuteAsync(() => client.Namespaces.DeleteAsync(resourceGroup, namespaceName)).ConfigureAwait(false); + } + } + + /// + /// Creates a Service Bus scope associated with a queue instance, intended to be used in the context + /// of a single test and disposed when the test has completed. + /// + /// + /// When true, partitioning will be enabled on the queue that is created. + /// When true, a session will be enabled on the queue that is created. + /// When true, forces creation of a new queue even if an environmental override was specified to use an existing one. + /// The name of the calling method; this is intended to be populated by the runtime. + /// + /// The requested Service Bus . + /// + /// + /// If an environmental override was set to use an existing Service Bus queue resource and the flag + /// was not set, the existing queue will be assumed with no validation. In this case the and + /// parameters are also ignored. + /// + /// + public static async Task CreateWithQueue(bool enablePartitioning, + bool enableSession, + bool forceQueueCreation = false, + [CallerMemberName] string caller = "") + { + // If there was an override and the force flag is not set for creation, then build a scope + // for the specified queue. + + if ((!string.IsNullOrEmpty(TestEnvironment.OverrideQueueName)) && (!forceQueueCreation)) + { + return new QueueScope(TestEnvironment.ServiceBusNamespace, TestEnvironment.OverrideQueueName, false); + } + + // Create a new queue specific to the scope being created. + + caller = (caller.Length < 16) ? caller : caller.Substring(0, 15); + + var azureSubscription = TestEnvironment.ServiceBusAzureSubscription; + var resourceGroup = TestEnvironment.ServiceBusResourceGroup; + var serviceBusNamespace = TestEnvironment.ServiceBusNamespace; + var token = await AquireManagementTokenAsync().ConfigureAwait(false); + + string CreateName() => $"{ Guid.NewGuid().ToString("D").Substring(0, 13) }-{ caller }"; + + using (var client = new ServiceBusManagementClient(new TokenCredentials(token)) { SubscriptionId = azureSubscription }) + { + var queueParameters = new SBQueue(enablePartitioning: enablePartitioning, requiresSession: enableSession, maxSizeInMegabytes: 1024); + var queue = await CreateRetryPolicy().ExecuteAsync(() => client.Queues.CreateOrUpdateAsync(resourceGroup, serviceBusNamespace, CreateName(), queueParameters)).ConfigureAwait(false); + + return new QueueScope(serviceBusNamespace, queue.Name, true); + } + } + + /// + /// Creates a Service Bus scope associated with a topic instance, intended to be used in the context + /// of a single test and disposed when the test has completed. + /// + /// + /// When true, partitioning will be enabled on the topic that is created. + /// When true, a session will be enabled on the topic that is created. + /// The set of subscriptions to create for the topic. If null, a default subscription will be assumed. + /// When true, forces creation of a new topic even if an environmental override was specified to use an existing one. + /// The name of the calling method; this is intended to be populated by the runtime. + /// + /// The requested Service Bus . + /// + /// + /// If an environmental override was set to use an existing Service Bus queue resource and the flag + /// was not set, the existing queue will be assumed with no validation. In this case the , + /// , and parameters are also ignored. + /// + /// + public static async Task CreateWithTopic(bool enablePartitioning, + bool enableSession, + IEnumerable topicSubscriptions = null, + bool forceTopicCreation = false, + [CallerMemberName] string caller = "") + { + caller = (caller.Length < 16) ? caller : caller.Substring(0, 15); + topicSubscriptions ??= new[] { "default-subscription" }; + + var azureSubscription = TestEnvironment.ServiceBusAzureSubscription; + var resourceGroup = TestEnvironment.ServiceBusResourceGroup; + var serviceBusNamespace = TestEnvironment.ServiceBusNamespace; + var token = await AquireManagementTokenAsync().ConfigureAwait(false); + + using (var client = new ServiceBusManagementClient(new TokenCredentials(token)) { SubscriptionId = azureSubscription }) + { + // If there was an override and the force flag is not set for creation, then build a scope for the + // specified topic. Query the topic resource to build the list of its subscriptions for the scope. + + if ((!string.IsNullOrEmpty(TestEnvironment.OverrideTopicName)) && (!forceTopicCreation)) + { + var subscriptionPage = await CreateRetryPolicy>().ExecuteAsync(() => client.Subscriptions.ListByTopicAsync(resourceGroup, serviceBusNamespace, TestEnvironment.OverrideTopicName)).ConfigureAwait(false); + var existingSubscriptions = new List(subscriptionPage.Select(item => item.Name)); + + while (!string.IsNullOrEmpty(subscriptionPage.NextPageLink)) + { + subscriptionPage = await CreateRetryPolicy>().ExecuteAsync(() => client.Subscriptions.ListByTopicAsync(resourceGroup, serviceBusNamespace, TestEnvironment.OverrideTopicName)).ConfigureAwait(false); + existingSubscriptions.AddRange(subscriptionPage.Select(item => item.Name)); + } + + return new TopicScope(TestEnvironment.ServiceBusNamespace, TestEnvironment.OverrideTopicName, existingSubscriptions, false); + } + + // Create a new topic specific for the scope being created. + + string CreateName() => $"{ Guid.NewGuid().ToString("D").Substring(0, 13) }-{ caller }"; + + var duplicateDetection = TimeSpan.FromMinutes(10); + var topicParameters = new SBTopic(enablePartitioning: enablePartitioning, duplicateDetectionHistoryTimeWindow: duplicateDetection, maxSizeInMegabytes: 1024); + var topic = await CreateRetryPolicy().ExecuteAsync(() => client.Topics.CreateOrUpdateAsync(resourceGroup, serviceBusNamespace, CreateName(), topicParameters)).ConfigureAwait(false); + + var subscriptionParams = new SBSubscription(requiresSession: enableSession, duplicateDetectionHistoryTimeWindow: duplicateDetection, maxDeliveryCount: 10); + + var activeSubscriptions = await Task.WhenAll + ( + topicSubscriptions.Select(subscriptionName => + CreateRetryPolicy().ExecuteAsync(() => client.Subscriptions.CreateOrUpdateAsync(resourceGroup, serviceBusNamespace, topic.Name, subscriptionName, subscriptionParams))) + + ).ConfigureAwait(false); + + return new TopicScope(serviceBusNamespace, topic.Name, activeSubscriptions.Select(item => item.Name).ToList(), true); + } + } + + /// + /// Queries the location for the requested Azure Resource Group. + /// + /// + /// The access token to use for the query request. + /// The name of the resource group to query the location of. + /// The identifier of the Azure subscription in which the resource group resides. + /// + /// The location code for the requested . + /// + private static async Task QueryResourceGroupLocationAsync(string accessToken, + string resourceGroupName, + string subscriptionId) + { + using var client = new ResourceManagementClient(new TokenCredentials(accessToken)) { SubscriptionId = subscriptionId }; + { + ResourceGroup resourceGroup = await CreateRetryPolicy().ExecuteAsync(() => client.ResourceGroups.GetAsync(resourceGroupName)); + return resourceGroup.Location; + } + } + + /// + /// Acquires a JWT token for use with the Service Bus management client. + /// + /// + /// The token to use for management operations against the Service Bus Live test namespace. + /// + private static async Task AquireManagementTokenAsync() + { + ManagementToken token = s_managementToken; + + // If there was no current token, or it is within the buffer for expiration, request a new token. + // There is a benign race condition here, where there may be multiple requests in-flight for a new token. Since + // this is test infrastructure, just allow the acquired token to replace the current one without attempting to + // coordinate or ensure that the most recent is kept. + + if ((token == null) || (token.ExpiresOn <= DateTimeOffset.UtcNow.Add(CredentialRefreshBuffer))) + { + var credential = new ClientCredential(TestEnvironment.ServiceBusClient, TestEnvironment.ServiceBusSecret); + var context = new AuthenticationContext($"https://login.windows.net/{ TestEnvironment.ServiceBusTenant }"); + var result = await context.AcquireTokenAsync("https://management.core.windows.net/", credential); + + if ((string.IsNullOrEmpty(result?.AccessToken))) + { + throw new AuthenticationException("Unable to acquire an Active Directory token for the Service Bus management client."); + } + + token = new ManagementToken(result.AccessToken, result.ExpiresOn); + Interlocked.Exchange(ref s_managementToken, token); + } + + return token.Token; + } + + /// + /// Generates the set of common metadata tags to apply to an ephemeral cloud resource + /// used for test purposes. + /// + /// + /// The set of metadata tags to apply. + /// + private static Dictionary GenerateTags() => + new Dictionary + { + { "source", typeof(ServiceBusScope).Assembly.GetName().Name }, + { "platform", System.Runtime.InteropServices.RuntimeInformation.OSDescription }, + { "framework", System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription }, + { "created", $"{ DateTimeOffset.UtcNow.ToString("s") }Z" }, + { "cleanup-after", $"{ DateTimeOffset.UtcNow.AddDays(1).ToString("s") }Z" } + }; + + /// + /// Creates the retry policy to apply to a management operation. + /// + /// + /// The expected type of response from the management operation. + /// + /// The maximum retry attempts to allow. + /// The number of seconds to use as the basis for backing off on retry attempts. + /// TThe number of seconds to use as the basis for applying jitter to retry back-off calculations. + /// + /// The retry policy in which to execute the management operation. + /// + private static IAsyncPolicy CreateRetryPolicy(int maxRetryAttempts = RetryMaximumAttempts, + double exponentialBackoffSeconds = RetryExponentialBackoffSeconds, + double baseJitterSeconds = RetryBaseJitterSeconds) => + Policy + .Handle(ex => ShouldRetry(ex)) + .WaitAndRetryAsync(maxRetryAttempts, attempt => CalculateRetryDelay(attempt, exponentialBackoffSeconds, baseJitterSeconds)); + + /// + /// Creates the retry policy to apply to a management operation. + /// + /// + /// The maximum retry attempts to allow. + /// The number of seconds to use as the basis for backing off on retry attempts. + /// TThe number of seconds to use as the basis for applying jitter to retry back-off calculations. + /// + /// The retry policy in which to execute the management operation. + /// + private static IAsyncPolicy CreateRetryPolicy(int maxRetryAttempts = RetryMaximumAttempts, + double exponentialBackoffSeconds = RetryExponentialBackoffSeconds, + double baseJitterSeconds = RetryBaseJitterSeconds) => + Policy + .Handle(ex => ShouldRetry(ex)) + .WaitAndRetryAsync(maxRetryAttempts, attempt => CalculateRetryDelay(attempt, exponentialBackoffSeconds, baseJitterSeconds)); + + /// + /// Determines whether the specified HTTP status code is considered eligible to retry + /// the associated operation. + /// + /// + /// The status code to consider. + /// + /// true if the status code is eligible for retries; otherwise, false. + /// + private static bool IsRetriableStatus(HttpStatusCode statusCode) => + ((statusCode == HttpStatusCode.Unauthorized) + || (statusCode == ((HttpStatusCode)408)) + || (statusCode == HttpStatusCode.Conflict) + || (statusCode == ((HttpStatusCode)429)) + || (statusCode == HttpStatusCode.InternalServerError) + || (statusCode == HttpStatusCode.ServiceUnavailable) + || (statusCode == HttpStatusCode.GatewayTimeout)); + + /// + /// Determines whether the specified exception is considered eligible to retry the associated + /// operation. + /// + /// + /// The exception to consider. + /// + /// true if the exception is eligible for retries; otherwise, false. + /// + private static bool ShouldRetry(Exception ex) => + ((IsRetriableException(ex)) || (IsRetriableException(ex?.InnerException))); + + /// + /// Determines whether the type of the specified exception is considered eligible to retry + /// the associated operation. + /// + /// + /// The exception to consider. + /// + /// true if the exception type is eligible for retries; otherwise, false. + /// + private static bool IsRetriableException(Exception ex) + { + if (ex == null) + { + return false; + } + + switch (ex) + { + case ErrorResponseException erEx: + return IsRetriableStatus(erEx.Response.StatusCode); + + case CloudException clEx: + return IsRetriableStatus(clEx.Response.StatusCode); + + case TimeoutException _: + case TaskCanceledException _: + case OperationCanceledException _: + case HttpRequestException _: + case WebException _: + case SocketException _: + case IOException _: + return true; + + default: + return false; + }; + } + + /// + /// Calculates the retry delay to use for management-related operations. + /// + /// + /// The current attempt number. + /// The exponential back-off amount,, in seconds. + /// The amount of base jitter to include, in seconds. + /// + /// The interval to wait before retrying the attempted operation. + /// + private static TimeSpan CalculateRetryDelay(int attempt, + double exponentialBackoffSeconds, + double baseJitterSeconds) => + TimeSpan.FromSeconds((Math.Pow(2, attempt) * exponentialBackoffSeconds) + (RandomNumberGenerator.Value.NextDouble() * baseJitterSeconds)); + + /// + /// Provides access to dynamically created Service Bus Queue instance which exists only in the context + /// of the scope; disposal removes the resource from Azure. + /// + /// + /// + /// + public sealed class QueueScope : IAsyncDisposable + { + /// Serves as a sentinel flag to denote when the instance has been disposed. + private bool _disposed = false; + + /// + /// The name of the Service Bus namespace associated with the queue. + /// + /// + public string NamespaceName { get; } + + /// + /// The name of the queue. + /// + /// + public string QueueName { get; } + + /// + /// A flag indicating if the queue should be removed when the scope is complete. + /// + /// + /// true if the queue was should be removed at scope completion; otherwise, false. + /// + private bool ShouldRemoveAtScopeCompletion { get; } + + /// + /// Initializes a new instance of the class. + /// + /// + /// The name of the Service Bus namespace to which the queue is associated. + /// The name of the queue. + /// A flag indicating whether the queue should be removed when the scope is complete. + /// + public QueueScope(string serviceBusNamespaceName, + string queueName, + bool shouldRemoveAtScopeCompletion) + { + NamespaceName = serviceBusNamespaceName; + QueueName = queueName; + ShouldRemoveAtScopeCompletion = shouldRemoveAtScopeCompletion; + } + + /// + /// Performs the tasks needed to remove the dynamically created Service Bus Queue. + /// + /// + public async ValueTask DisposeAsync() + { + if (!ShouldRemoveAtScopeCompletion) + { + _disposed = true; + } + + if (_disposed) + { + return; + } + + try + { + var azureSubscription = TestEnvironment.ServiceBusAzureSubscription; + var resourceGroup = TestEnvironment.ServiceBusResourceGroup; + var token = await AquireManagementTokenAsync().ConfigureAwait(false); + + using var client = new ServiceBusManagementClient(new TokenCredentials(token)) { SubscriptionId = azureSubscription }; + await CreateRetryPolicy().ExecuteAsync(() => client.Queues.DeleteAsync(resourceGroup, NamespaceName, QueueName)).ConfigureAwait(false); + } + catch + { + // This should not be considered a critical failure that results in a test failure. Due + // to ARM being temperamental, some management operations may be rejected. Throwing here + // does not help to ensure resource cleanup only flags the test itself as a failure. + // + // If a queue fails to be deleted, removing of the associated namespace at the end of the + // test run will also remove the orphan. + } + + _disposed = true; + } + } + + /// + /// Provides access to dynamically created Service Bus Topic instance which exists only in the context + /// of the scope; disposal removes the resource from Azure. + /// + /// + /// + /// + public sealed class TopicScope : IAsyncDisposable + { + /// Serves as a sentinel flag to denote when the instance has been disposed. + private bool _disposed = false; + + /// + /// The name of the Service Bus namespace associated with the queue. + /// + /// + public string NamespaceName { get; } + + /// + /// The name of the topic. + /// + /// + public string TopicName { get; } + + /// + /// The set of names for the subscriptions associated with the topic. + /// + /// + public IReadOnlyList SubscriptionNames { get; } + + /// + /// A flag indicating if the topic should be removed when the scope is complete. + /// + /// + /// true if the queue was should be removed at scope completion; otherwise, false. + /// + private bool ShouldRemoveAtScopeCompletion { get; } + + /// + /// Initializes a new instance of the class. + /// + /// + /// The name of the Service Bus namespace to which the queue is associated. + /// The name of the topic. + /// The set of names for the subscriptions + /// A flag indicating whether the topic should be removed when the scope is complete. + /// + public TopicScope(string serviceBusNamespaceName, + string topicName, + IReadOnlyList subscriptionNames, + bool shouldRemoveAtScopeCompletion) + { + NamespaceName = serviceBusNamespaceName; + TopicName = topicName; + SubscriptionNames = subscriptionNames; + ShouldRemoveAtScopeCompletion = shouldRemoveAtScopeCompletion; + } + + /// + /// Performs the tasks needed to remove the dynamically created Service Bus Topic. + /// + /// + public async ValueTask DisposeAsync() + { + if (!ShouldRemoveAtScopeCompletion) + { + _disposed = true; + } + + if (_disposed) + { + return; + } + + try + { + var azureSubscription = TestEnvironment.ServiceBusAzureSubscription; + var resourceGroup = TestEnvironment.ServiceBusResourceGroup; + var token = await AquireManagementTokenAsync().ConfigureAwait(false); + + using var client = new ServiceBusManagementClient(new TokenCredentials(token)) { SubscriptionId = azureSubscription }; + await CreateRetryPolicy().ExecuteAsync(() => client.Topics.DeleteAsync(resourceGroup, NamespaceName, TopicName)).ConfigureAwait(false); + } + catch + { + // This should not be considered a critical failure that results in a test failure. Due + // to ARM being temperamental, some management operations may be rejected. Throwing here + // does not help to ensure resource cleanup only flags the test itself as a failure. + // + // If a queue fails to be deleted, removing of the associated namespace at the end of the + // test run will also remove the orphan. + } + + _disposed = true; + } + } + + /// + /// An internal type for tracking the management access token and + /// its associated expiration. + /// + /// + private class ManagementToken + { + /// The value bearer token to use for authorization. + public readonly string Token; + + /// The date and time, in UTC, that the token expires. + public readonly DateTimeOffset ExpiresOn; + + /// + /// Initializes a new instance of the class. + /// + /// + /// The value of the bearer token. + /// The date and time, in UTC, that the token expires on. + /// + public ManagementToken(string token, + DateTimeOffset expiresOn) + { + Token = token; + ExpiresOn = expiresOn; + } + } + } +} diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/TestCategory.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/TestCategory.cs new file mode 100755 index 000000000000..b562573c6d4c --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/TestCategory.cs @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +namespace Azure.Messaging.ServiceBus.Tests +{ + /// + /// The classification of a test or a suite of related tests. + /// + /// + public static class TestCategory + { + /// The associated test is meant to verify a scenario which depends upon one ore more live Azure services. + public const string Live = "Live"; + + /// The associated test should not be included when Visual Studio is performing "Live Unit Testing" runs. + public const string DisallowVisualStudioLiveUnitTesting = "SkipWhenLiveUnitTesting"; + } +} diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/TestEnvironment.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/TestEnvironment.cs new file mode 100755 index 000000000000..be95162c221f --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/TestEnvironment.cs @@ -0,0 +1,276 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus.Core; + +namespace Azure.Messaging.ServiceBus.Tests +{ + /// + /// Represents the ambient environment in which the test suite is + /// being run, offering access to information such as environment + /// variables. + /// + /// + public static class TestEnvironment + { + /// The environment variable value for the Service Bus subscription name, lazily evaluated. + private static readonly Lazy ServiceBusAzureSubscriptionInstance = + new Lazy(() => ReadAndVerifyEnvironmentVariable("SERVICE_BUS_SUBSCRIPTION"), LazyThreadSafetyMode.PublicationOnly); + + /// The environment variable value for the Service Bus resource group name, lazily evaluated. + private static readonly Lazy ServiceBusResourceGroupInstance = + new Lazy(() => ReadAndVerifyEnvironmentVariable("SERVICE_BUS_RESOURCEGROUP"), LazyThreadSafetyMode.PublicationOnly); + + /// The environment variable value for the Azure Active Directory tenant that holds the service principal, lazily evaluated. + private static readonly Lazy ServiceBusTenantInstance = + new Lazy(() => ReadAndVerifyEnvironmentVariable("SERVICE_BUS_TENANT"), LazyThreadSafetyMode.PublicationOnly); + + /// The environment variable value for the Azure Active Directory client identifier of the service principal, lazily evaluated. + private static readonly Lazy ServiceBusClientInstance = + new Lazy(() => ReadAndVerifyEnvironmentVariable("SERVICE_BUS_CLIENT"), LazyThreadSafetyMode.PublicationOnly); + + /// The environment variable value for the Azure Active Directory client secret of the service principal, lazily evaluated. + private static readonly Lazy ServiceBusSecretInstance = + new Lazy(() => ReadAndVerifyEnvironmentVariable("SERVICE_BUS_SECRET"), LazyThreadSafetyMode.PublicationOnly); + + /// The environment variable value for the override connection string to indicate an existing namespace should be used, lazily evaluated. + private static readonly Lazy ServiceBusOverrideConnectionString = + new Lazy(() => ReadEnvironmentVariable("SERVICE_BUS_NAMESPACE_CONNECTION_STRING"), LazyThreadSafetyMode.PublicationOnly); + + /// The environment variable value for the override name of an existing queue should be used when a queue scope is requested, lazily evaluated. + private static readonly Lazy ServiceBusOverrideQueueName = + new Lazy(() => ReadEnvironmentVariable("SERVICE_BUS_OVERRIDE_QUEUE"), LazyThreadSafetyMode.PublicationOnly); + + /// The environment variable value for the override name of an existing queue should be used when a topic scope is requested, lazily evaluated. + private static readonly Lazy ServiceBusOverrideTopicName = + new Lazy(() => ReadEnvironmentVariable("SERVICE_BUS_OVERRIDE_TOPIC"), LazyThreadSafetyMode.PublicationOnly); + + /// The active Service Bus namespace for this test run, lazily created. + private static readonly Lazy ActiveServiceBusNamespace = + new Lazy(EnsureServiceBusNamespace, LazyThreadSafetyMode.ExecutionAndPublication); + + /// The active Service Bus namespace for this test run, lazily created. + private static readonly Lazy ParsedConnectionString = + new Lazy(() => ConnectionStringParser.Parse(ServiceBusConnectionString), LazyThreadSafetyMode.ExecutionAndPublication); + + /// The name of the shared access key to be used for accessing an Service Bus namespace. + public const string ServiceBusDefaultSharedAccessKey = "RootManageSharedAccessKey"; + + /// + /// Indicates whether or not an ephemeral namespace was created for the current test execution. + /// + /// + /// true if an Service Bus namespace was created for the current test run; otherwise, false. + /// + public static bool ShouldRemoveNamespaceAfterTestRunCompletion => (ActiveServiceBusNamespace.IsValueCreated && ActiveServiceBusNamespace.Value.ShouldRemoveAtCompletion); + + /// + /// The connection string for the Service Bus namespace instance to be used for + /// Live tests. + /// + /// + /// The connection string will be determined by creating an ephemeral Service Bus namespace for the test execution. + /// + public static string ServiceBusConnectionString => ActiveServiceBusNamespace.Value.ConnectionString; + + /// + /// The name of the Service Bus namespace to be used for Live tests. + /// + /// + /// The name will be determined by creating an ephemeral Service Bus namespace for the test execution. + /// + public static string ServiceBusNamespace => ActiveServiceBusNamespace.Value.Name; + + /// + /// The name of the Azure subscription containing the Service Bus namespace instance to be used for + /// Live tests. + /// + /// + /// The name of the namespace is read from the "SERVICE_BUS_SUBSCRIPTION" environment variable. + /// + public static string ServiceBusAzureSubscription => ServiceBusAzureSubscriptionInstance.Value; + + /// + /// The name of the resource group containing the Service Bus namespace instance to be used for + /// Live tests. + /// + /// + /// The name of the namespace is read from the "SERVICE_BUS_RESOURCEGROUP" environment variable. + /// + public static string ServiceBusResourceGroup => ServiceBusResourceGroupInstance.Value; + + /// + /// The name of the Azure Active Directory tenant that holds the service principal to use for management + /// of the Service Bus namespace during Live tests. + /// + /// + /// The name of the namespace is read from the "SERVICE_BUS_TENANT" environment variable. + /// + public static string ServiceBusTenant => ServiceBusTenantInstance.Value; + + /// + /// The name of the Azure Active Directory client identifier of the service principal to use for management + /// of the Service Bus namespace during Live tests. + /// + /// + /// The name of the namespace is read from the "SERVICE_BUS_CLIENT" environment variable. + /// + public static string ServiceBusClient => ServiceBusClientInstance.Value; + + /// + /// The name of the Azure Active Directory client secret of the service principal to use for management + /// of the Service Bus namespace during Live tests. + /// + /// + /// The name of the namespace is read from the "SERVICE_BUS_SECRET" environment variable. + /// + public static string ServiceBusSecret => ServiceBusSecretInstance.Value; + + /// + /// The fully qualified namespace for the Service Bus namespace represented by this scope. + /// + /// + /// The fully qualified namespace, as contained within the associated connection string. + /// + public static string FullyQualifiedNamespace => ParsedConnectionString.Value.Endpoint.Host; + + /// + /// The shared access key name for the Service Bus namespace represented by this scope. + /// + /// + /// The shared access key name, as contained within the associated connection string. + /// + public static string SharedAccessKeyName => ParsedConnectionString.Value.SharedAccessKeyName; + + /// + /// The shared access key for the Service Bus namespace represented by this scope. + /// + /// + /// The shared access key, as contained within the associated connection string. + /// + public static string SharedAccessKey => ParsedConnectionString.Value.SharedAccessKey; + + /// + /// The name of an existing Service Bus queue to consider an override and use when + /// requesting a test scope, overriding the creation of a new dynamic queue specific to + /// the scope. + /// + /// + public static string OverrideQueueName => ServiceBusOverrideQueueName.Value; + + /// + /// The name of an existing Service Bus topic to consider an override and use when + /// requesting a test scope, overriding the creation of a new dynamic topic specific to + /// the scope. + /// + /// + public static string OverrideTopicName => ServiceBusOverrideTopicName.Value; + + /// + /// Builds a connection string for a specific Service Bus entity instance under the namespace used for + /// Live tests. + /// + /// + /// The name of the entity for which the connection string is being built. + /// + /// The connection string to the requested Service Bus namespace and entity. + /// + public static string BuildConnectionStringForEntity(string entityName) => $"{ ServiceBusConnectionString };EntityPath={ entityName }"; + + /// + /// Reads an environment variable. + /// + /// + /// The name of the environment variable to read. + /// + /// The value of the environment variable, if present and populated; null otherwise + /// + private static string ReadEnvironmentVariable(string variableName) => + Environment.GetEnvironmentVariable(variableName); + + /// + /// Reads an environment variable, ensuring that it is populated. + /// + /// + /// The name of the environment variable to read. + /// + /// The value of the environment variable, if present and populated; otherwise, a is thrown. + /// + private static string ReadAndVerifyEnvironmentVariable(string variableName) + { + var environmentValue = ReadEnvironmentVariable(variableName); + + if (string.IsNullOrWhiteSpace(environmentValue)) + { + throw new InvalidOperationException($"The environment variable '{ variableName }' was not found or was not populated."); + } + + return environmentValue; + } + + /// + /// Ensures that a Service Bus namespace is available. If the override was set for the environment, + /// that namespace will be respected. Otherwise, a new Service Bus namespace will be created on Azure for this test run. + /// + /// + /// The active Service Bus namespace for this test run. + /// + private static NamespaceProperties EnsureServiceBusNamespace() + { + if (!string.IsNullOrEmpty(ServiceBusOverrideConnectionString.Value)) + { + var parsed = ConnectionStringParser.Parse(ServiceBusOverrideConnectionString.Value); + + return new NamespaceProperties + ( + parsed.Endpoint.Host.Substring(0, parsed.Endpoint.Host.IndexOf('.')), + ServiceBusOverrideConnectionString.Value.Replace($";EntityPath={ parsed.EntityName }", string.Empty), + false + ); + } + + return Task + .Run(async () => await ServiceBusScope.CreateNamespaceAsync().ConfigureAwait(false)) + .ConfigureAwait(false) + .GetAwaiter() + .GetResult(); + } + + /// + /// The key attributes for identifying and accessing a dynamically created Service Bus namespace, + /// intended to serve as an ephemeral container for the entity instances used during a test run. + /// + /// + public struct NamespaceProperties + { + /// The name of the namespace. + public readonly string Name; + + /// The connection string to use for accessing the dynamically created namespace. + public readonly string ConnectionString; + + /// A flag indicating if the namespace was dynamically created by the test environment. + public readonly bool ShouldRemoveAtCompletion; + + /// + /// Initializes a new instance of the struct. + /// + /// + /// The name of the namespace. + /// The connection string to use for accessing the namespace. + /// A flag indicating if the namespace should be removed when the test run has completed. + /// + public NamespaceProperties(string name, + string connectionString, + bool shouldRemoveAtCompletion) + { + Name = name; + ConnectionString = connectionString; + ShouldRemoveAtCompletion = shouldRemoveAtCompletion; + } + } + } +} diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/TestRunFixture.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/TestRunFixture.cs new file mode 100755 index 000000000000..b688ce99f2ef --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/TestRunFixture.cs @@ -0,0 +1,43 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using NUnit.Framework; + +namespace Azure.Messaging.ServiceBus.Tests +{ + /// + /// Serves as a fixture for operations that are scoped to the entire + /// test run pass, rather than specific to a given test or test fixture. + /// + /// + [SetUpFixture] + public class TestRunFixture + { + /// + /// Performs the tasks needed to clean up after a test run + /// has completed. + /// + /// + [OneTimeTearDown] + public void Teardown() + { + try + { + if (TestEnvironment.ShouldRemoveNamespaceAfterTestRunCompletion) + { + ServiceBusScope.DeleteNamespaceAsync(TestEnvironment.ServiceBusNamespace).GetAwaiter().GetResult(); + } + } + catch + { + // This should not be considered a critical failure that results in a test run failure. Due + // to ARM being temperamental, some management operations may be rejected. Throwing here + // does not help to ensure resource cleanup. + // + // Because resources may be orphaned outside of an observed exception, throwing to raise awareness + // is not sufficient for all scenarios; since an external process is already needed to manage + // orphans, there is no benefit to failing the run; allow the test results to be reported. + } + } + } +} diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Properties/AssemblyInfo.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Properties/AssemblyInfo.cs new file mode 100644 index 000000000000..b5f9716641f8 --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Properties/AssemblyInfo.cs @@ -0,0 +1,6 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using NUnit.Framework; + +[assembly: Parallelizable(ParallelScope.All)] diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs old mode 100644 new mode 100755 index 6ec748ff0023..6f2da1d08114 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ReceiverLiveTests.cs @@ -1,11 +1,9 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; -using Azure.Core.Testing; using Azure.Messaging.ServiceBus.Receiver; using Azure.Messaging.ServiceBus.Sender; using NUnit.Framework; @@ -17,35 +15,38 @@ public class ReceiverLiveTests : ServiceBusLiveTestBase [Test] public async Task Peek() { - var sender = new ServiceBusSenderClient(ConnString, QueueName); - var messageCt = 10; - - IEnumerable sentMessages = GetMessages(messageCt); - await sender.SendRangeAsync(sentMessages); - - var receiver = new QueueReceiverClient(ConnString, QueueName); - - Dictionary sentMessageIdToLabel = new Dictionary(); - foreach (ServiceBusMessage message in sentMessages) - { - sentMessageIdToLabel.Add(message.MessageId, Encoding.Default.GetString(message.Body)); - } - IAsyncEnumerable peekedMessages = receiver.PeekRangeAsync( - maxMessages: messageCt); - - var ct = 0; - await foreach (ServiceBusMessage peekedMessage in peekedMessages) + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) { - var peekedText = Encoding.Default.GetString(peekedMessage.Body); - //var sentText = sentMessageIdToLabel[peekedMessage.MessageId]; - - //sentMessageIdToLabel.Remove(peekedMessage.MessageId); - //Assert.AreEqual(sentText, peekedText); - - TestContext.Progress.WriteLine($"{peekedMessage.Label}: {peekedText}"); - ct++; + await using var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName); + var messageCt = 10; + + IEnumerable sentMessages = GetMessages(messageCt); + await sender.SendRangeAsync(sentMessages); + + await using var receiver = new QueueReceiverClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName); + + Dictionary sentMessageIdToLabel = new Dictionary(); + foreach (ServiceBusMessage message in sentMessages) + { + sentMessageIdToLabel.Add(message.MessageId, Encoding.Default.GetString(message.Body)); + } + IAsyncEnumerable peekedMessages = receiver.PeekRangeAsync( + maxMessages: messageCt); + + var ct = 0; + await foreach (ServiceBusMessage peekedMessage in peekedMessages) + { + var peekedText = Encoding.Default.GetString(peekedMessage.Body); + //var sentText = sentMessageIdToLabel[peekedMessage.MessageId]; + + //sentMessageIdToLabel.Remove(peekedMessage.MessageId); + //Assert.AreEqual(sentText, peekedText); + + TestContext.Progress.WriteLine($"{peekedMessage.Label}: {peekedText}"); + ct++; + } + Assert.AreEqual(messageCt, ct); } - Assert.AreEqual(messageCt, ct); } } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs old mode 100644 new mode 100755 index 2d426e46401d..3763650fc17e --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SenderLiveTests.cs @@ -1,21 +1,15 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -using NUnit.Framework; -using Azure.Messaging.ServiceBus.Sender; -using Azure.Messaging.ServiceBus; +using System; +using System.Net; using System.Threading.Tasks; using Azure.Identity; -using System.Net; -using Azure.Messaging.ServiceBus.Tests; -using System; +using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Receiver; -using System.Xml.Schema; -using Azure.Core.Testing; -using Moq; -using Azure.Messaging.ServiceBus.Amqp; -using Azure.Messaging.ServiceBus.Core; -using Azure.Core.Pipeline; +using Azure.Messaging.ServiceBus.Sender; +using Azure.Messaging.ServiceBus.Tests; +using NUnit.Framework; namespace Microsoft.Azure.Template.Tests { @@ -24,83 +18,101 @@ public class SenderLiveTests : ServiceBusLiveTestBase [Test] public async Task Send_ConnString() { - var sender = new ServiceBusSenderClient(ConnString, QueueName); - await sender.SendRangeAsync(GetMessages(10)); + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName); + await sender.SendRangeAsync(GetMessages(10)); + } } [Test] public async Task Send_Token() { ClientSecretCredential credential = new ClientSecretCredential( - TenantId, - ClientId, - ClientSecret); + TestEnvironment.ServiceBusTenant, + TestEnvironment.ServiceBusClient, + TestEnvironment.ServiceBusSecret); - var sender = new ServiceBusSenderClient(Endpoint, QueueName, credential); - await sender.SendAsync(GetMessage()); + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var sender = new ServiceBusSenderClient(TestEnvironment.FullyQualifiedNamespace, scope.QueueName, credential); + await sender.SendAsync(GetMessage()); + } } [Test] public async Task Send_Connection_Topic() { - var conn = new ServiceBusConnection(ConnString, TopicName); - var options = new ServiceBusSenderClientOptions + await using (var scope = await ServiceBusScope.CreateWithTopic(enablePartitioning: false, enableSession: false)) { - RetryOptions = new ServiceBusRetryOptions(), - ConnectionOptions = new ServiceBusConnectionOptions() + await using var conn = new ServiceBusConnection(TestEnvironment.ServiceBusConnectionString, scope.TopicName); + var options = new ServiceBusSenderClientOptions { - TransportType = ServiceBusTransportType.AmqpWebSockets, - Proxy = new WebProxy("localHost") - } - }; - options.RetryOptions.Mode = ServiceBusRetryMode.Exponential; - var sender = new ServiceBusSenderClient(conn, options); + RetryOptions = new ServiceBusRetryOptions(), + ConnectionOptions = new ServiceBusConnectionOptions() + { + TransportType = ServiceBusTransportType.AmqpWebSockets, + Proxy = new WebProxy("localHost") + } + }; + options.RetryOptions.Mode = ServiceBusRetryMode.Exponential; - await sender.SendAsync(GetMessage()); + await using var sender = new ServiceBusSenderClient(conn, options); + await sender.SendAsync(GetMessage()); + } } [Test] public async Task Send_Topic_Session() { - var conn = new ServiceBusConnection(ConnString, "joshtopic"); - var options = new ServiceBusSenderClientOptions + await using (var scope = await ServiceBusScope.CreateWithTopic(enablePartitioning: false, enableSession: false)) { - RetryOptions = new ServiceBusRetryOptions(), - ConnectionOptions = new ServiceBusConnectionOptions() + await using var conn = new ServiceBusConnection(TestEnvironment.ServiceBusConnectionString, scope.TopicName); + var options = new ServiceBusSenderClientOptions { - TransportType = ServiceBusTransportType.AmqpWebSockets, - Proxy = new WebProxy("localHost") - } - }; - options.RetryOptions.Mode = ServiceBusRetryMode.Exponential; - var sender = new ServiceBusSenderClient(conn, options); - var message = GetMessage(); - message.SessionId = "1"; - await sender.SendAsync(message); + RetryOptions = new ServiceBusRetryOptions(), + ConnectionOptions = new ServiceBusConnectionOptions() + { + TransportType = ServiceBusTransportType.AmqpWebSockets, + Proxy = new WebProxy("localHost") + } + }; + options.RetryOptions.Mode = ServiceBusRetryMode.Exponential; + await using var sender = new ServiceBusSenderClient(conn, options); + var message = GetMessage(); + message.SessionId = "1"; + await sender.SendAsync(message); + } } [Test] - public void ClientProperties() + public async Task ClientProperties() { - var sender = new ServiceBusSenderClient(ConnString, QueueName); - Assert.AreEqual(QueueName, sender.EntityName); - Assert.AreEqual(Endpoint, sender.FullyQualifiedNamespace); + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName); + Assert.AreEqual(scope.QueueName, sender.EntityName); + Assert.AreEqual(TestEnvironment.FullyQualifiedNamespace, sender.FullyQualifiedNamespace); + } } [Test] public async Task Schedule() { - var sender = new ServiceBusSenderClient(ConnString, QueueName); - var scheduleTime = DateTimeOffset.UtcNow.AddHours(10); - var sequenceNum = await sender.ScheduleMessageAsync(GetMessage(), scheduleTime); + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) + { + await using var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName); + var scheduleTime = DateTimeOffset.UtcNow.AddHours(10); + var sequenceNum = await sender.ScheduleMessageAsync(GetMessage(), scheduleTime); - var receiver = new QueueReceiverClient(ConnString, QueueName); - ServiceBusMessage msg = await receiver.PeekBySequenceAsync(sequenceNum); - Assert.AreEqual(0, Convert.ToInt32(new TimeSpan(scheduleTime.Ticks - msg.ScheduledEnqueueTimeUtc.Ticks).TotalSeconds)); + await using var receiver = new QueueReceiverClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName); + ServiceBusMessage msg = await receiver.PeekBySequenceAsync(sequenceNum); + Assert.AreEqual(0, Convert.ToInt32(new TimeSpan(scheduleTime.Ticks - msg.ScheduledEnqueueTimeUtc.Ticks).TotalSeconds)); - await sender.CancelScheduledMessageAsync(sequenceNum); - msg = await receiver.PeekBySequenceAsync(sequenceNum); - Assert.IsNull(msg); + await sender.CancelScheduledMessageAsync(sequenceNum); + msg = await receiver.PeekBySequenceAsync(sequenceNum); + Assert.IsNull(msg); + } } } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusLiveTestBase.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusLiveTestBase.cs old mode 100644 new mode 100755 index d2dad0630748..0f330a3f1720 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusLiveTestBase.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusLiveTestBase.cs @@ -1,14 +1,12 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -using System; -using System.Collections.Generic; -using System.Text; -using Azure.Core.Testing; +using NUnit.Framework; namespace Azure.Messaging.ServiceBus.Tests { - [LiveOnly] + [Category(TestCategory.Live)] + [Category(TestCategory.DisallowVisualStudioLiveUnitTesting)] public class ServiceBusLiveTestBase : ServiceBusTestBase { } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusTestBase.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusTestBase.cs old mode 100644 new mode 100755 index d10d2e4c4258..87985cdba958 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusTestBase.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/ServiceBusTestBase.cs @@ -9,15 +9,6 @@ namespace Azure.Messaging.ServiceBus.Tests { public class ServiceBusTestBase { - protected static string ConnString = Environment.GetEnvironmentVariable("SERVICE_BUS_CONN_STRING", EnvironmentVariableTarget.Machine); - protected static string TenantId = Environment.GetEnvironmentVariable("TENANT_ID", EnvironmentVariableTarget.Machine); - protected static string ClientId = Environment.GetEnvironmentVariable("CLIENT_ID", EnvironmentVariableTarget.Machine); - protected static string ClientSecret = Environment.GetEnvironmentVariable("CLIENT_SECRET", EnvironmentVariableTarget.Machine); - protected const string QueueName = "josh"; - protected const string SessionQueueName = "joshsession"; - protected const string TopicName = "joshtopic"; - protected const string Endpoint = "jolovservicebus.servicebus.windows.net"; - protected IEnumerable GetMessages(int count, string sessionId = null, string partitionKey = null) { var messages = new List(); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs old mode 100644 new mode 100755 index e4b0fadc6313..07e5ad1bf742 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/SessionReceiverLiveTests.cs @@ -21,78 +21,76 @@ public class SessionReceiverLiveTests : ServiceBusLiveTestBase [TestCase(null, null)] public async Task Peek_Session(long? sequenceNumber, string partitionKey) { - var sender = new ServiceBusSenderClient(ConnString, SessionQueueName); - var messageCt = 10; - var sessionId = Guid.NewGuid().ToString(); - - // send the messages - IEnumerable sentMessages = GetMessages(messageCt, sessionId, partitionKey); - await sender.SendRangeAsync(sentMessages); - Dictionary sentMessageIdToMsg = new Dictionary(); - foreach (ServiceBusMessage message in sentMessages) + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) { - sentMessageIdToMsg.Add(message.MessageId, message); - } + await using var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName); + var messageCt = 10; + var sessionId = Guid.NewGuid().ToString(); + + // send the messages + IEnumerable sentMessages = GetMessages(messageCt, sessionId, partitionKey); + await sender.SendRangeAsync(sentMessages); + Dictionary sentMessageIdToMsg = new Dictionary(); + foreach (ServiceBusMessage message in sentMessages) + { + sentMessageIdToMsg.Add(message.MessageId, message); + } - // peek the messages - var receiver = new SessionReceiverClient(sessionId, ConnString, SessionQueueName); + // peek the messages + await using var receiver = new SessionReceiverClient(sessionId, TestEnvironment.ServiceBusConnectionString, scope.QueueName); - sequenceNumber ??= 1; - IAsyncEnumerable peekedMessages = receiver.PeekRangeBySequenceAsync( - fromSequenceNumber: (long)sequenceNumber, - maxMessages: messageCt); + sequenceNumber ??= 1; + IAsyncEnumerable peekedMessages = receiver.PeekRangeBySequenceAsync( + fromSequenceNumber: (long)sequenceNumber, + maxMessages: messageCt); - // verify peeked == send - var ct = 0; - await foreach (ServiceBusMessage peekedMessage in peekedMessages) - { - var peekedText = Encoding.Default.GetString(peekedMessage.Body); - var sentMsg = sentMessageIdToMsg[peekedMessage.MessageId]; - - sentMessageIdToMsg.Remove(peekedMessage.MessageId); - Assert.AreEqual(Encoding.Default.GetString(sentMsg.Body), peekedText); - Assert.AreEqual(sentMsg.PartitionKey, peekedMessage.PartitionKey); - Assert.IsTrue(peekedMessage.SystemProperties.SequenceNumber >= sequenceNumber); - TestContext.Progress.WriteLine($"{peekedMessage.Label}: {peekedText}"); - ct++; - } - if (sequenceNumber == 1) - { - Assert.AreEqual(messageCt, ct); + // verify peeked == send + var ct = 0; + await foreach (ServiceBusMessage peekedMessage in peekedMessages) + { + var peekedText = Encoding.Default.GetString(peekedMessage.Body); + var sentMsg = sentMessageIdToMsg[peekedMessage.MessageId]; + + sentMessageIdToMsg.Remove(peekedMessage.MessageId); + Assert.AreEqual(Encoding.Default.GetString(sentMsg.Body), peekedText); + Assert.AreEqual(sentMsg.PartitionKey, peekedMessage.PartitionKey); + Assert.IsTrue(peekedMessage.SystemProperties.SequenceNumber >= sequenceNumber); + TestContext.Progress.WriteLine($"{peekedMessage.Label}: {peekedText}"); + ct++; + } + if (sequenceNumber == 1) + { + Assert.AreEqual(messageCt, ct); + } } } [Test] public async Task PeekMultipleSessions_ShouldThrow() { - var sender = new ServiceBusSenderClient(ConnString, SessionQueueName); - var messageCt = 10; - var sessionId = Guid.NewGuid().ToString(); - // send the messages - IEnumerable sentMessages = GetMessages(messageCt, sessionId); - await sender.SendRangeAsync(sentMessages); - - var receiver1 = new SessionReceiverClient(sessionId, ConnString, SessionQueueName); - var receiver2 = new SessionReceiverClient(sessionId, ConnString, SessionQueueName); - Dictionary sentMessageIdToMsg = new Dictionary(); - - // peek the messages - IAsyncEnumerable peekedMessages1 = receiver1.PeekRangeBySequenceAsync( - fromSequenceNumber: 1, - maxMessages: messageCt); - IAsyncEnumerable peekedMessages2 = receiver2.PeekRangeBySequenceAsync( - fromSequenceNumber: 1, - maxMessages: messageCt); - await peekedMessages1.GetAsyncEnumerator().MoveNextAsync(); - try + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) { - await peekedMessages2.GetAsyncEnumerator().MoveNextAsync(); - } - catch (Exception) - { - return; + await using var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName); + var messageCt = 10; + var sessionId = Guid.NewGuid().ToString(); + // send the messages + IEnumerable sentMessages = GetMessages(messageCt, sessionId); + await sender.SendRangeAsync(sentMessages); + + await using var receiver1 = new SessionReceiverClient(sessionId, TestEnvironment.ServiceBusConnectionString, scope.QueueName); + await using var receiver2 = new SessionReceiverClient(sessionId, TestEnvironment.ServiceBusConnectionString, scope.QueueName); + Dictionary sentMessageIdToMsg = new Dictionary(); + + // peek the messages + IAsyncEnumerable peekedMessages1 = receiver1.PeekRangeBySequenceAsync( + fromSequenceNumber: 1, + maxMessages: messageCt); + IAsyncEnumerable peekedMessages2 = receiver2.PeekRangeBySequenceAsync( + fromSequenceNumber: 1, + maxMessages: messageCt); + await peekedMessages1.GetAsyncEnumerator().MoveNextAsync(); + Assert.That(async () => await peekedMessages2.GetAsyncEnumerator().MoveNextAsync(), Throws.Exception); } - Assert.Fail("No exception!"); } [Test] @@ -102,29 +100,31 @@ public async Task PeekMultipleSessions_ShouldThrow() [TestCase(50, 10)] public async Task PeekRange_IncrementsSequenceNmber(int messageCt, int peekCt) { - var sender = new ServiceBusSenderClient(ConnString, SessionQueueName); - var sessionId = Guid.NewGuid().ToString(); - // send the messages - IEnumerable sentMessages = GetMessages(messageCt, sessionId); - await sender.SendRangeAsync(sentMessages); - - var receiver = new SessionReceiverClient(sessionId, ConnString, SessionQueueName); - - - long seq = 0; - for (int i = 0; i < messageCt/peekCt; i++) + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) { - IAsyncEnumerable peekedMessages = receiver.PeekRangeAsync( - maxMessages: peekCt); + var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName); + var sessionId = Guid.NewGuid().ToString(); + // send the messages + IEnumerable sentMessages = GetMessages(messageCt, sessionId); + await sender.SendRangeAsync(sentMessages); - await foreach (ServiceBusMessage msg in peekedMessages) + await using var receiver = new SessionReceiverClient(sessionId, TestEnvironment.ServiceBusConnectionString, scope.QueueName); + + long seq = 0; + for (int i = 0; i < messageCt / peekCt; i++) { - Assert.IsTrue(msg.SystemProperties.SequenceNumber > seq); - if (seq > 0) + IAsyncEnumerable peekedMessages = receiver.PeekRangeAsync( + maxMessages: peekCt); + + await foreach (ServiceBusMessage msg in peekedMessages) { - Assert.IsTrue(msg.SystemProperties.SequenceNumber == seq + 1); + Assert.IsTrue(msg.SystemProperties.SequenceNumber > seq); + if (seq > 0) + { + Assert.IsTrue(msg.SystemProperties.SequenceNumber == seq + 1); + } + seq = msg.SystemProperties.SequenceNumber; } - seq = msg.SystemProperties.SequenceNumber; } } } @@ -134,60 +134,66 @@ public async Task PeekRange_IncrementsSequenceNmber(int messageCt, int peekCt) [TestCase(50)] public async Task Peek_IncrementsSequenceNmber(int messageCt) { - var sender = new ServiceBusSenderClient(ConnString, SessionQueueName); - var sessionId = Guid.NewGuid().ToString(); - // send the messages - IEnumerable sentMessages = GetMessages(messageCt, sessionId); - await sender.SendRangeAsync(sentMessages); - - var receiver = new SessionReceiverClient(sessionId, ConnString, SessionQueueName); + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) + { + await using var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName); + var sessionId = Guid.NewGuid().ToString(); + // send the messages + IEnumerable sentMessages = GetMessages(messageCt, sessionId); + await sender.SendRangeAsync(sentMessages); + await using var receiver = new SessionReceiverClient(sessionId, TestEnvironment.ServiceBusConnectionString, scope.QueueName); - long seq = 0; - for (int i = 0; i < messageCt ; i++) - { - ServiceBusMessage msg = await receiver.PeekAsync(); - Assert.IsTrue(msg.SystemProperties.SequenceNumber > seq); - if (seq > 0) + long seq = 0; + for (int i = 0; i < messageCt; i++) { - Assert.IsTrue(msg.SystemProperties.SequenceNumber == seq + 1); + ServiceBusMessage msg = await receiver.PeekAsync(); + Assert.IsTrue(msg.SystemProperties.SequenceNumber > seq); + if (seq > 0) + { + Assert.IsTrue(msg.SystemProperties.SequenceNumber == seq + 1); + } + seq = msg.SystemProperties.SequenceNumber; } - seq = msg.SystemProperties.SequenceNumber; } } [Test] + [Ignore("Test is currently failing; investigation needed")] public async Task RoundRobinSessions() { - var sender = new ServiceBusSenderClient(ConnString, SessionQueueName); - var messageCt = 10; - HashSet sessions = new HashSet() { "1", "2", "3" }; - - // send the messages - foreach (string session in sessions) + await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) { - await sender.SendRangeAsync(GetMessages(messageCt, session)); - } - - var receiverClient = new QueueReceiverClient(ConnString, SessionQueueName); - var sessionId = ""; - // create receiver not scoped to a specific session - for (int i = 0; i < 10; i++) - { - SessionReceiverClient sessionClient = receiverClient.GetSessionReceiverClient(); - IAsyncEnumerable peekedMessages = sessionClient.PeekRangeBySequenceAsync( - fromSequenceNumber: 1, - maxMessages: 10); + await using var sender = new ServiceBusSenderClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName); + var messageCt = 10; + HashSet sessions = new HashSet() { "1", "2", "3" }; - await foreach (ServiceBusMessage peekedMessage in peekedMessages) + // send the messages + foreach (string session in sessions) { - Assert.AreEqual(sessionClient.SessionId, peekedMessage.SessionId); + await sender.SendRangeAsync(GetMessages(messageCt, session)); } - TestContext.Progress.WriteLine(sessionId); - sessionId = sessionClient.SessionId; - // Close the session client when we are done with it. Since the sessionClient doesn't own the underlying connection, the connection remains open, but the session link will be closed. - await sessionClient.CloseAsync(); + var receiverClient = new QueueReceiverClient(TestEnvironment.ServiceBusConnectionString, scope.QueueName); + var sessionId = ""; + // create receiver not scoped to a specific session + for (int i = 0; i < 10; i++) + { + SessionReceiverClient sessionClient = receiverClient.GetSessionReceiverClient(); + IAsyncEnumerable peekedMessages = sessionClient.PeekRangeBySequenceAsync( + fromSequenceNumber: 1, + maxMessages: 10); + + await foreach (ServiceBusMessage peekedMessage in peekedMessages) + { + Assert.AreEqual(sessionClient.SessionId, peekedMessage.SessionId); + } + TestContext.Progress.WriteLine(sessionId); + sessionId = sessionClient.SessionId; + + // Close the session client when we are done with it. Since the sessionClient doesn't own the underlying connection, the connection remains open, but the session link will be closed. + await sessionClient.CloseAsync(); + } } } }