diff --git a/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus/Listeners/ServiceBusListener.cs b/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus/Listeners/ServiceBusListener.cs index bccdfd5ce..8d8c20d69 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus/Listeners/ServiceBusListener.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus/Listeners/ServiceBusListener.cs @@ -38,6 +38,8 @@ internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider private ClientEntity _clientEntity; private bool _disposed; private bool _started; + // Serialize execution of StopAsync to avoid calling Unregister* concurrently + private readonly SemaphoreSlim _stopAsyncSemaphore = new SemaphoreSlim(1, 1); private IMessageSession _messageSession; private SessionMessageProcessor _sessionMessageProcessor; @@ -118,27 +120,53 @@ public Task StartAsync(CancellationToken cancellationToken) public async Task StopAsync(CancellationToken cancellationToken) { ThrowIfDisposed(); - - if (!_started) + await _stopAsyncSemaphore.WaitAsync(); { - throw new InvalidOperationException("The listener has not yet been started or has already been stopped."); - } + try + { + if (!_started) + { + throw new InvalidOperationException("The listener has not yet been started or has already been stopped."); + } - // cancel our token source to signal any in progress - // ProcessMessageAsync invocations to cancel - _cancellationTokenSource.Cancel(); + // Unregister* methods stop new messages from being processed while allowing in-flight messages to complete. + // As the amount of time functions are allowed to complete processing varies by SKU, we specify max timespan + // as the amount of time Service Bus SDK should wait for in-flight messages to complete procesing after + // unregistering the message handler so that functions have as long as the host continues to run time to complete. + if (_singleDispatch) + { + if (_isSessionsEnabled) + { + if (_clientEntity != null) + { + if (_clientEntity is QueueClient queueClient) + { + await queueClient.UnregisterSessionHandlerAsync(TimeSpan.MaxValue); + } + else + { + SubscriptionClient subscriptionClient = _clientEntity as SubscriptionClient; + await subscriptionClient.UnregisterSessionHandlerAsync(TimeSpan.MaxValue); + } + } + } + else + { + if (_receiver != null && _receiver.IsValueCreated) + { + await Receiver.UnregisterMessageHandlerAsync(TimeSpan.MaxValue); + } + } + } + // Batch processing will be stopped via the _started flag on its next iteration - if (_receiver != null && _receiver.IsValueCreated) - { - await Receiver.CloseAsync(); - _receiver = CreateMessageReceiver(); - } - if (_clientEntity != null) - { - await _clientEntity.CloseAsync(); - _clientEntity = null; + _started = false; + } + finally + { + _stopAsyncSemaphore.Release(); + } } - _started = false; } public void Cancel() @@ -176,6 +204,9 @@ public void Dispose() _clientEntity = null; } + _stopAsyncSemaphore.Dispose(); + _cancellationTokenSource.Dispose(); + _disposed = true; } } @@ -192,37 +223,39 @@ private Lazy CreateSessionClient() internal async Task ProcessMessageAsync(Message message, CancellationToken cancellationToken) { - CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token); - - if (!await _messageProcessor.BeginProcessingMessageAsync(message, linkedCts.Token)) + using (CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token)) { - return; - } + if (!await _messageProcessor.BeginProcessingMessageAsync(message, linkedCts.Token)) + { + return; + } - ServiceBusTriggerInput input = ServiceBusTriggerInput.CreateSingle(message); - input.MessageReceiver = Receiver; + ServiceBusTriggerInput input = ServiceBusTriggerInput.CreateSingle(message); + input.MessageReceiver = Receiver; - TriggeredFunctionData data = input.GetTriggerFunctionData(); - FunctionResult result = await _triggerExecutor.TryExecuteAsync(data, linkedCts.Token); - await _messageProcessor.CompleteProcessingMessageAsync(message, result, linkedCts.Token); + TriggeredFunctionData data = input.GetTriggerFunctionData(); + FunctionResult result = await _triggerExecutor.TryExecuteAsync(data, linkedCts.Token); + await _messageProcessor.CompleteProcessingMessageAsync(message, result, linkedCts.Token); + } } internal async Task ProcessSessionMessageAsync(IMessageSession session, Message message, CancellationToken cancellationToken) { - CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token); - - _messageSession = session; - if (!await _sessionMessageProcessor.BeginProcessingMessageAsync(session, message, linkedCts.Token)) + using (CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token)) { - return; - } + _messageSession = session; + if (!await _sessionMessageProcessor.BeginProcessingMessageAsync(session, message, linkedCts.Token)) + { + return; + } - ServiceBusTriggerInput input = ServiceBusTriggerInput.CreateSingle(message); - input.MessageReceiver = session; + ServiceBusTriggerInput input = ServiceBusTriggerInput.CreateSingle(message); + input.MessageReceiver = session; - TriggeredFunctionData data = input.GetTriggerFunctionData(); - FunctionResult result = await _triggerExecutor.TryExecuteAsync(data, linkedCts.Token); - await _sessionMessageProcessor.CompleteProcessingMessageAsync(session, message, result, linkedCts.Token); + TriggeredFunctionData data = input.GetTriggerFunctionData(); + FunctionResult result = await _triggerExecutor.TryExecuteAsync(data, linkedCts.Token); + await _sessionMessageProcessor.CompleteProcessingMessageAsync(session, message, result, linkedCts.Token); + } } internal void StartMessageBatchReceiver(CancellationToken cancellationToken) @@ -246,6 +279,7 @@ internal void StartMessageBatchReceiver(CancellationToken cancellationToken) { if (!_started || cancellationToken.IsCancellationRequested) { + _logger.LogInformation("Message processing has been stopped or cancelled"); return; } diff --git a/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus/WebJobs.Extensions.ServiceBus.csproj b/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus/WebJobs.Extensions.ServiceBus.csproj index d20542a04..8fc997e3d 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus/WebJobs.Extensions.ServiceBus.csproj +++ b/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus/WebJobs.Extensions.ServiceBus.csproj @@ -6,7 +6,7 @@ Microsoft.Azure.WebJobs.ServiceBus Microsoft.Azure.WebJobs.Extensions.ServiceBus Microsoft Azure WebJobs SDK ServiceBus Extension - 4.1.2 + 4.2.0$(VersionSuffix) N/A $(Version) Commit hash: $(CommitHash) Microsoft @@ -28,6 +28,7 @@ false true + 1701;1702 @@ -37,9 +38,9 @@ - - - + + + all @@ -47,6 +48,7 @@ + diff --git a/test/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests/README.md b/test/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests/README.md new file mode 100644 index 000000000..f25b6393e --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests/README.md @@ -0,0 +1,34 @@ +# Service Bus Extension for Azure Functions guide to running integration tests locally +Integration tests are implemented in the `EndToEndTests` and `SessionsEndToEndTests` classes and require special configuration to execute locally in Visual Studio or via dotnet test. + +All configuration is done via a json file called `appsettings.tests` which on windows should be located in the `%USERPROFILE%\.azurefunctions` folder (e.g. `C:\Users\user123\.azurefunctions`) + +**Note:** *The specifics of the configuration will change when the validation code is modified so check the code for the latest configuration if the tests do not pass as this readme file may not have been updated with each code change.* + +Create the appropriate Azure resources if needed as explained below and create or update the `appsettings.tests` file in the location specified above by copying the configuration below and replacing all the `PLACEHOLDER` values + +appsettings.tests contents +``` +{ + "ConnectionStrings": { + "ServiceBus": "PLACEHOLDER", + "ServiceBusSecondary": "PLACEHOLDER" + }, + "AzureWebJobsStorage": "PLACEHOLDER" +} +``` +Create a storage account and configure its connection string into `AzureWebJobsStorage`. This will be used by the webjobs hosts created by the tests. + +Create two service bus namespaces and configure their connection strings in `ConnectionStrings:ServiceBus` and `ConnectionStrings:ServiceBusSecondary`. +1. In the namespace configured into `ConnectionStrings:ServiceBus`, create queues with the following names: + 1. `core-test-queue1` + 2. `core-test-queue2` + 3. `core-test-queue3` + 4. `core-test-queue1-sessions` (enable sessions when creating) +2. In the namespace configured into `ConnectionStrings:ServiceBus`, create topics and subscriptions with the following names: + 1. `core-test-topic1` with two subscriptions: `sub1` and `sub2` + 2. `core-test-topic1-sessions` with one subscription: `sub1-sessions` (enable sessions in the subscription when creating) +2. In the namespace configured into `ConnectionStrings:ServiceBusSecondary`, create queues with the following names: + 1. `core-test-queue1` + + Change the message lock duration setting on all queues and subscriptions to 5 minutes to all for delays associated with stepping through code in debug mode. \ No newline at end of file diff --git a/test/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests/ServiceBusEndToEndTests.cs b/test/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests/ServiceBusEndToEndTests.cs index 286eb1d8f..f23351ab5 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests/ServiceBusEndToEndTests.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests/ServiceBusEndToEndTests.cs @@ -22,6 +22,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Xunit; +using Xunit.Abstractions; namespace Microsoft.Azure.WebJobs.Host.EndToEndTests { @@ -39,12 +40,20 @@ public class ServiceBusEndToEndTests : IDisposable private const string TopicSubscriptionName2 = "sub2"; private const string TriggerDetailsMessageStart = "Trigger Details:"; + private const string DrainingQueueMessageBody = "queue-message-draining-no-sessions-1"; + private const string DrainingTopicMessageBody = "topic-message-draining-no-sessions-1"; - private const int SBTimeout = 60 * 1000; + private const int SBTimeoutMills = 120 * 1000; + private const int DrainWaitTimeoutMills = 120 * 1000; + private const int DrainSleepMills = 5 * 1000; + private const int MaxAutoRenewDurationMin = 5; + internal static TimeSpan HostShutdownTimeout = TimeSpan.FromSeconds(120); private static EventWaitHandle _topicSubscriptionCalled1; private static EventWaitHandle _topicSubscriptionCalled2; private static EventWaitHandle _eventWait; + private static EventWaitHandle _drainValidationPreDelay; + private static EventWaitHandle _drainValidationPostDelay; // These two variables will be checked at the end of the test private static string _resultMessage1; @@ -54,13 +63,23 @@ public class ServiceBusEndToEndTests : IDisposable private readonly string _primaryConnectionString; private readonly string _secondaryConnectionString; - public ServiceBusEndToEndTests() + private readonly ITestOutputHelper outputLogger; + + public ServiceBusEndToEndTests(ITestOutputHelper output) { + outputLogger = output; + var config = new ConfigurationBuilder() .AddEnvironmentVariables() .AddTestSettings() .Build(); + // Add all test configuration to the environment as WebJobs requires a few of them to be in the environment + foreach (var kv in config.AsEnumerable()) + { + Environment.SetEnvironmentVariable(kv.Key, kv.Value); + } + _eventWait = new ManualResetEvent(initialState: false); _primaryConnectionString = config.GetConnectionStringOrSetting(ServiceBus.Constants.DefaultConnectionStringName); @@ -81,25 +100,29 @@ public async Task ServiceBusEndToEnd() public async Task ServiceBusBinderTest() { var hostType = typeof(ServiceBusTestJobs); - var host = CreateHost(); - var method = typeof(ServiceBusTestJobs).GetMethod("ServiceBusBinderTest"); + using (IHost host = CreateHost()) + { + var method = typeof(ServiceBusTestJobs).GetMethod("ServiceBusBinderTest"); - int numMessages = 10; - var args = new { message = "Test Message", numMessages = numMessages }; - var jobHost = host.GetJobHost(); - await jobHost.CallAsync(method, args); - await jobHost.CallAsync(method, args); - await jobHost.CallAsync(method, args); + int numMessages = 10; + var args = new { message = "Test Message", numMessages = numMessages }; + var jobHost = host.GetJobHost(); + await jobHost.CallAsync(method, args); + await jobHost.CallAsync(method, args); + await jobHost.CallAsync(method, args); - var count = await CleanUpEntity(BinderQueueName); + var count = await CleanUpEntity(BinderQueueName); - Assert.Equal(numMessages * 3, count); + Assert.Equal(numMessages * 3, count); + + await host.StopAsync(); + } } [Fact] public async Task CustomMessageProcessorTest() { - IHost host = new HostBuilder() + using (IHost host = new HostBuilder() .ConfigureDefaultTestHost(b => { b.AddServiceBus(); @@ -108,23 +131,28 @@ public async Task CustomMessageProcessorTest() { services.AddSingleton(); }) - .Build(); - - var loggerProvider = host.GetTestLoggerProvider(); + .ConfigureServices(s => + { + s.Configure(opts => opts.ShutdownTimeout = HostShutdownTimeout); + }) + .Build()) + { + var loggerProvider = host.GetTestLoggerProvider(); - await ServiceBusEndToEndInternal(host: host); + await ServiceBusEndToEndInternal(host: host); - // in addition to verifying that our custom processor was called, we're also - // verifying here that extensions can log - IEnumerable messages = loggerProvider.GetAllLogMessages().Where(m => m.Category == CustomMessagingProvider.CustomMessagingCategory); - Assert.Equal(4, messages.Count(p => p.FormattedMessage.Contains("Custom processor Begin called!"))); - Assert.Equal(4, messages.Count(p => p.FormattedMessage.Contains("Custom processor End called!"))); + // in addition to verifying that our custom processor was called, we're also + // verifying here that extensions can log + IEnumerable messages = loggerProvider.GetAllLogMessages().Where(m => m.Category == CustomMessagingProvider.CustomMessagingCategory); + Assert.Equal(4, messages.Count(p => p.FormattedMessage.Contains("Custom processor Begin called!"))); + Assert.Equal(4, messages.Count(p => p.FormattedMessage.Contains("Custom processor End called!"))); + } } [Fact] public async Task MultipleAccountTest() { - IHost host = new HostBuilder() + using (IHost host = new HostBuilder() .ConfigureDefaultTestHost(b => { b.AddServiceBus(); @@ -133,24 +161,31 @@ public async Task MultipleAccountTest() { services.AddSingleton(); }) - .Build(); - - await WriteQueueMessage(_secondaryConnectionString, FirstQueueName, "Test"); + .ConfigureServices(s => + { + s.Configure(opts => opts.ShutdownTimeout = HostShutdownTimeout); + }) + .Build()) + { + await WriteQueueMessage(_secondaryConnectionString, FirstQueueName, "Test"); - _topicSubscriptionCalled1 = new ManualResetEvent(initialState: false); + _topicSubscriptionCalled1 = new ManualResetEvent(initialState: false); + _topicSubscriptionCalled2 = new ManualResetEvent(initialState: false); - await host.StartAsync(); + await host.StartAsync(); - _topicSubscriptionCalled1.WaitOne(SBTimeout); + _topicSubscriptionCalled1.WaitOne(SBTimeoutMills); + _topicSubscriptionCalled2.WaitOne(SBTimeoutMills); - // ensure all logs have had a chance to flush - await Task.Delay(3000); + // ensure all logs have had a chance to flush + await Task.Delay(3000); - // Wait for the host to terminate - await host.StopAsync(); - host.Dispose(); + // Wait for the host to terminate + await host.StopAsync(); + } Assert.Equal("Test-topic-1", _resultMessage1); + Assert.Equal("Test-topic-2", _resultMessage2); } [Fact] @@ -180,66 +215,162 @@ public async Task TestBatch_DataContractPoco() [Fact] public async Task BindToPoco() { - var host = BuildTestHost(); - - await WriteQueueMessage(_primaryConnectionString, FirstQueueName, "{ Name: 'foo', Value: 'bar' }"); + using (IHost host = BuildTestHost()) + { + await WriteQueueMessage(_primaryConnectionString, FirstQueueName, "{ Name: 'foo', Value: 'bar' }"); - await host.StartAsync(); + await host.StartAsync(); - bool result = _eventWait.WaitOne(SBTimeout); - Assert.True(result); + bool result = _eventWait.WaitOne(SBTimeoutMills); + Assert.True(result); - var logs = host.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage); - Assert.Contains("PocoValues(foo,bar)", logs); + var logs = host.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage); + Assert.Contains("PocoValues(foo,bar)", logs); - await host.StopAsync(); - host.Dispose(); + await host.StopAsync(); + } } [Fact] public async Task BindToString() { - var host = BuildTestHost(); + using (IHost host = BuildTestHost()) + { + var method = typeof(ServiceBusArgumentBindingJob).GetMethod(nameof(ServiceBusArgumentBindingJob.BindToString), BindingFlags.Static | BindingFlags.Public); + var jobHost = host.GetJobHost(); + await jobHost.CallAsync(method, new { input = "foobar" }); - var method = typeof(ServiceBusArgumentBindingJob).GetMethod(nameof(ServiceBusArgumentBindingJob.BindToString), BindingFlags.Static | BindingFlags.Public); - var jobHost = host.GetJobHost(); - await jobHost.CallAsync(method, new { input = "foobar" }); + bool result = _eventWait.WaitOne(SBTimeoutMills); + Assert.True(result); - bool result = _eventWait.WaitOne(SBTimeout); - Assert.True(result); + var logs = host.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage); + Assert.Contains("Input(foobar)", logs); - var logs = host.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage); - Assert.Contains("Input(foobar)", logs); + await host.StopAsync(); + } } - private async Task TestMultiple(bool isXml = false) + [Fact] + public async Task MessageDrainingQueue() + { + await TestSingleDrainMode(true); + } + + [Fact] + public async Task MessageDrainingTopic() + { + await TestSingleDrainMode(false); + } + + [Fact] + public async Task MessageDrainingQueueBatch() { - IHost host = BuildTestHost(); + await TestMultipleDrainMode(true); + } + + [Fact] + public async Task MessageDrainingTopicBatch() + { + await TestMultipleDrainMode(false); + } + + /* + * Helper functions + */ - if (isXml) + private async Task TestSingleDrainMode(bool sendToQueue) + { + using (IHost host = BuildTestHostMessageDraining()) { - await WriteQueueMessage(_primaryConnectionString, FirstQueueName, new TestPoco() { Name = "Test1", Value = "Value" }); - await WriteQueueMessage(_primaryConnectionString, FirstQueueName, new TestPoco() { Name = "Test2", Value = "Value" }); + await host.StartAsync(); + + _drainValidationPreDelay = new ManualResetEvent(initialState: false); + _drainValidationPostDelay = new ManualResetEvent(initialState: false); + + if (sendToQueue) + { + await WriteQueueMessage(_primaryConnectionString, FirstQueueName, DrainingQueueMessageBody); + } + else + { + await WriteTopicMessage(_primaryConnectionString, TopicName, DrainingTopicMessageBody); + } + + // Wait to ensure function invocatoin has started before draining messages + Assert.True(_drainValidationPreDelay.WaitOne(SBTimeoutMills)); + + // Start draining in-flight messages + var drainModeManager = host.Services.GetService(); + await drainModeManager.EnableDrainModeAsync(CancellationToken.None); + + // Validate that function execution was allowed to complete + Assert.True(_drainValidationPostDelay.WaitOne(DrainWaitTimeoutMills + SBTimeoutMills)); + + await host.StopAsync(); } - else + } + + private async Task TestMultiple(bool isXml = false) + { + using (IHost host = BuildTestHost()) { - await WriteQueueMessage(_primaryConnectionString, FirstQueueName, "{'Name': 'Test1', 'Value': 'Value'}"); - await WriteQueueMessage(_primaryConnectionString, FirstQueueName, "{'Name': 'Test2', 'Value': 'Value'}"); + if (isXml) + { + await WriteQueueMessage(_primaryConnectionString, FirstQueueName, new TestPoco() { Name = "Test1", Value = "Value" }); + await WriteQueueMessage(_primaryConnectionString, FirstQueueName, new TestPoco() { Name = "Test2", Value = "Value" }); + } + else + { + await WriteQueueMessage(_primaryConnectionString, FirstQueueName, "{'Name': 'Test1', 'Value': 'Value'}"); + await WriteQueueMessage(_primaryConnectionString, FirstQueueName, "{'Name': 'Test2', 'Value': 'Value'}"); + } + + _topicSubscriptionCalled1 = new ManualResetEvent(initialState: false); + + await host.StartAsync(); + + bool result = _topicSubscriptionCalled1.WaitOne(SBTimeoutMills); + Assert.True(result); + + // ensure message are completed + await Task.Delay(2000); + + // Wait for the host to terminate + await host.StopAsync(); } + } - _topicSubscriptionCalled1 = new ManualResetEvent(initialState: false); + private async Task TestMultipleDrainMode(bool sendToQueue) + { + using (IHost host = BuildTestHostMessageDraining()) + { + await host.StartAsync(); - await host.StartAsync(); + _drainValidationPreDelay = new ManualResetEvent(initialState: false); + _drainValidationPostDelay = new ManualResetEvent(initialState: false); + + if (sendToQueue) + { + await ServiceBusEndToEndTests.WriteQueueMessage(_primaryConnectionString, FirstQueueName, DrainingQueueMessageBody); + } + else + { + await ServiceBusEndToEndTests.WriteTopicMessage(_primaryConnectionString, TopicName, DrainingTopicMessageBody); + } - bool result = _topicSubscriptionCalled1.WaitOne(SBTimeout); - Assert.True(result); + // Wait to ensure function invocatoin has started before draining messages + Assert.True(_drainValidationPreDelay.WaitOne(SBTimeoutMills)); - // ensure message are completed - await Task.Delay(2000); + // Start draining in-flight messages + var drainModeManager = host.Services.GetService(); + await drainModeManager.EnableDrainModeAsync(CancellationToken.None); - // Wait for the host to terminate - await host.StopAsync(); - host.Dispose(); + // Validate that function execution was allowed to complete + Assert.True(_drainValidationPostDelay.WaitOne(DrainWaitTimeoutMills + SBTimeoutMills)); + + // Wait for the host to terminate + await host.StopAsync(); + } } private async Task CleanUpEntity(string queueName, string connectionString = null) @@ -292,12 +423,17 @@ private IHost CreateHost() { services.AddSingleton(_nameResolver); }) + .ConfigureServices(s => + { + s.Configure(opts => opts.ShutdownTimeout = HostShutdownTimeout); + }) .Build(); } private async Task ServiceBusEndToEndInternal(IHost host = null) { - if (host == null) + bool hostSupplied = (host != null); + if (!hostSupplied) { host = CreateHost(); } @@ -309,122 +445,134 @@ private async Task ServiceBusEndToEndInternal(IHost host = null) _topicSubscriptionCalled1 = new ManualResetEvent(initialState: false); _topicSubscriptionCalled2 = new ManualResetEvent(initialState: false); - using (host) - { - await host.StartAsync(); + await host.StartAsync(); - _topicSubscriptionCalled1.WaitOne(SBTimeout); - _topicSubscriptionCalled2.WaitOne(SBTimeout); + _topicSubscriptionCalled1.WaitOne(SBTimeoutMills); + _topicSubscriptionCalled2.WaitOne(SBTimeoutMills); - // ensure all logs have had a chance to flush - await Task.Delay(4000); + // ensure all logs have had a chance to flush + await Task.Delay(4000); - // Wait for the host to terminate - await host.StopAsync(); + // Wait for the host to terminate + await host.StopAsync(); - Assert.Equal("E2E-SBQueue2SBQueue-SBQueue2SBTopic-topic-1", _resultMessage1); - Assert.Equal("E2E-SBQueue2SBQueue-SBQueue2SBTopic-topic-2", _resultMessage2); - - IEnumerable logMessages = host.GetTestLoggerProvider() - .GetAllLogMessages(); - - // filter out anything from the custom processor for easier validation. - IEnumerable consoleOutput = logMessages - .Where(m => m.Category != CustomMessagingProvider.CustomMessagingCategory); - - Assert.DoesNotContain(consoleOutput, p => p.Level == LogLevel.Error); - - string[] consoleOutputLines = consoleOutput - .Where(p => p.FormattedMessage != null) - .SelectMany(p => p.FormattedMessage.Split(Environment.NewLine, StringSplitOptions.RemoveEmptyEntries)) - .OrderBy(p => p) - .ToArray(); - - string[] expectedOutputLines = new string[] - { - "Found the following functions:", - $"{jobContainerType.FullName}.SBQueue2SBQueue", - $"{jobContainerType.FullName}.MultipleAccounts", - $"{jobContainerType.FullName}.SBQueue2SBTopic", - $"{jobContainerType.FullName}.SBTopicListener1", - $"{jobContainerType.FullName}.SBTopicListener2", - $"{jobContainerType.FullName}.ServiceBusBinderTest", - "Job host started", - $"Executing '{jobContainerType.Name}.SBQueue2SBQueue' (Reason='', Id=", - $"Executed '{jobContainerType.Name}.SBQueue2SBQueue' (Succeeded, Id=", - $"Trigger Details:", - $"Executing '{jobContainerType.Name}.SBQueue2SBTopic' (Reason='', Id=", - $"Executed '{jobContainerType.Name}.SBQueue2SBTopic' (Succeeded, Id=", - $"Trigger Details:", - $"Executing '{jobContainerType.Name}.SBTopicListener1' (Reason='', Id=", - $"Executed '{jobContainerType.Name}.SBTopicListener1' (Succeeded, Id=", - $"Trigger Details:", - $"Executing '{jobContainerType.Name}.SBTopicListener2' (Reason='', Id=", - $"Executed '{jobContainerType.Name}.SBTopicListener2' (Succeeded, Id=", - $"Trigger Details:", - "Job host stopped", - "Starting JobHost", - "Stopping JobHost", - "FunctionResultAggregatorOptions", - "{", - " \"BatchSize\": 1000", - " \"FlushTimeout\": \"00:00:30\",", - " \"IsEnabled\": true", - "}", - "LoggerFilterOptions", - "{", - " \"MinLevel\": \"Information\"", - " \"Rules\": []", - "}", - "ServiceBusOptions", - "{", - " \"PrefetchCount\": 0,", - " \"MessageHandlerOptions\": {", - " \"AutoComplete\": true,", - " \"MaxAutoRenewDuration\": \"00:05:00\",", - $" \"MaxConcurrentCalls\": {16 * Utility.GetProcessorCount()}", - " }", - " \"SessionHandlerOptions\": {", - " \"MaxAutoRenewDuration\": \"00:05:00\",", - " \"MessageWaitTimeout\": \"00:01:00\",", - " \"MaxConcurrentSessions\": 2000", - " \"AutoComplete\": true", - " }", - "}", - " \"BatchOptions\": {", - " \"MaxMessageCount\": 1000,", - " \"OperationTimeout\": \"00:01:00\",", - " \"AutoComplete\": true", - " }", - "SingletonOptions", - "{", - " \"ListenerLockPeriod\": \"00:01:00\"", - " \"ListenerLockRecoveryPollingInterval\": \"00:01:00\"", - " \"LockAcquisitionPollingInterval\": \"00:00:05\"", - " \"LockAcquisitionTimeout\": \"", - " \"LockPeriod\": \"00:00:15\"", - "}", - }.OrderBy(p => p).ToArray(); - - expectedOutputLines = expectedOutputLines.Select(x => x.Replace(" ", string.Empty)).ToArray(); - consoleOutputLines = consoleOutputLines.Select(x => x.Replace(" ", string.Empty)).ToArray(); - - Action[] inspectors = expectedOutputLines.Select>(p => (string m) => - { - Assert.True(p.StartsWith(m) || m.StartsWith(p)); - }).ToArray(); - Assert.Collection(consoleOutputLines, inspectors); + Assert.Equal("E2E-SBQueue2SBQueue-SBQueue2SBTopic-topic-1", _resultMessage1); + Assert.Equal("E2E-SBQueue2SBQueue-SBQueue2SBTopic-topic-2", _resultMessage2); - // Verify that trigger details are properly formatted - string[] triggerDetailsConsoleOutput = consoleOutputLines - .Where(m => m.StartsWith(TriggerDetailsMessageStart)).ToArray(); + IEnumerable logMessages = host.GetTestLoggerProvider() + .GetAllLogMessages(); - string expectedPattern = "Trigger Details: MessageId: (.*), DeliveryCount: [0-9]+, EnqueuedTime: (.*), LockedUntil: (.*)"; + // filter out anything from the custom processor for easier validation. + IEnumerable consoleOutput = logMessages + .Where(m => m.Category != CustomMessagingProvider.CustomMessagingCategory); - foreach (string msg in triggerDetailsConsoleOutput) - { - Assert.True(Regex.IsMatch(msg, expectedPattern), $"Expected trace event {expectedPattern} not found."); - } + Assert.DoesNotContain(consoleOutput, p => p.Level == LogLevel.Error); + + string[] consoleOutputLines = consoleOutput + .Where(p => p.FormattedMessage != null) + .SelectMany(p => p.FormattedMessage.Split(Environment.NewLine, StringSplitOptions.RemoveEmptyEntries)) + .OrderBy(p => p) + .ToArray(); + + string[] expectedOutputLines = new string[] + { + "Found the following functions:", + $"{jobContainerType.FullName}.SBQueue2SBQueue", + $"{jobContainerType.FullName}.MultipleAccounts", + $"{jobContainerType.FullName}.SBQueue2SBTopic", + $"{jobContainerType.FullName}.SBTopicListener1", + $"{jobContainerType.FullName}.SBTopicListener2", + $"{jobContainerType.FullName}.ServiceBusBinderTest", + "Job host started", + $"Executing '{jobContainerType.Name}.SBQueue2SBQueue'", + $"Executed '{jobContainerType.Name}.SBQueue2SBQueue' (Succeeded, Id=", + $"Trigger Details:", + $"Executing '{jobContainerType.Name}.SBQueue2SBTopic'", + $"Executed '{jobContainerType.Name}.SBQueue2SBTopic' (Succeeded, Id=", + $"Trigger Details:", + $"Executing '{jobContainerType.Name}.SBTopicListener1'", + $"Executed '{jobContainerType.Name}.SBTopicListener1' (Succeeded, Id=", + $"Trigger Details:", + $"Executing '{jobContainerType.Name}.SBTopicListener2'", + $"Executed '{jobContainerType.Name}.SBTopicListener2' (Succeeded, Id=", + $"Trigger Details:", + "Job host stopped", + "Starting JobHost", + "Stopping JobHost", + "Stoppingthelistener'Microsoft.Azure.WebJobs.ServiceBus.Listeners.ServiceBusListener'forfunction'MultipleAccounts'", + "Stoppedthelistener'Microsoft.Azure.WebJobs.ServiceBus.Listeners.ServiceBusListener'forfunction'MultipleAccounts'", + "Stoppingthelistener'Microsoft.Azure.WebJobs.ServiceBus.Listeners.ServiceBusListener'forfunction'SBQueue2SBQueue'", + "Stoppedthelistener'Microsoft.Azure.WebJobs.ServiceBus.Listeners.ServiceBusListener'forfunction'SBQueue2SBQueue'", + "Stoppingthelistener'Microsoft.Azure.WebJobs.ServiceBus.Listeners.ServiceBusListener'forfunction'SBQueue2SBTopic'", + "Stoppedthelistener'Microsoft.Azure.WebJobs.ServiceBus.Listeners.ServiceBusListener'forfunction'SBQueue2SBTopic'", + "Stoppingthelistener'Microsoft.Azure.WebJobs.ServiceBus.Listeners.ServiceBusListener'forfunction'SBTopicListener1'", + "Stoppedthelistener'Microsoft.Azure.WebJobs.ServiceBus.Listeners.ServiceBusListener'forfunction'SBTopicListener1'", + "Stoppingthelistener'Microsoft.Azure.WebJobs.ServiceBus.Listeners.ServiceBusListener'forfunction'SBTopicListener2'", + "Stoppedthelistener'Microsoft.Azure.WebJobs.ServiceBus.Listeners.ServiceBusListener'forfunction'SBTopicListener2'", + "FunctionResultAggregatorOptions", + "{", + " \"BatchSize\": 1000", + " \"FlushTimeout\": \"00:00:30\",", + " \"IsEnabled\": true", + "}", + "LoggerFilterOptions", + "{", + " \"MinLevel\": \"Information\"", + " \"Rules\": []", + "}", + "ServiceBusOptions", + "{", + " \"PrefetchCount\": 0,", + " \"MessageHandlerOptions\": {", + " \"AutoComplete\": true,", + " \"MaxAutoRenewDuration\": \"00:05:00\",", + $" \"MaxConcurrentCalls\": {16 * Utility.GetProcessorCount()}", + " }", + " \"SessionHandlerOptions\": {", + " \"MaxAutoRenewDuration\": \"00:05:00\",", + " \"MessageWaitTimeout\": \"00:01:00\",", + " \"MaxConcurrentSessions\": 2000", + " \"AutoComplete\": true", + " }", + "}", + " \"BatchOptions\": {", + " \"MaxMessageCount\": 1000,", + " \"OperationTimeout\": \"00:01:00\",", + " \"AutoComplete\": true", + " }", + "SingletonOptions", + "{", + " \"ListenerLockPeriod\": \"00:01:00\"", + " \"ListenerLockRecoveryPollingInterval\": \"00:01:00\"", + " \"LockAcquisitionPollingInterval\": \"00:00:05\"", + " \"LockAcquisitionTimeout\": \"", + " \"LockPeriod\": \"00:00:15\"", + "}", + }.OrderBy(p => p).ToArray(); + + expectedOutputLines = expectedOutputLines.Select(x => x.Replace(" ", string.Empty)).ToArray(); + consoleOutputLines = consoleOutputLines.Select(x => x.Replace(" ", string.Empty)).ToArray(); + + Action[] inspectors = expectedOutputLines.Select>(p => (string m) => + { + Assert.True(p.StartsWith(m) || m.StartsWith(p)); + }).ToArray(); + Assert.Collection(consoleOutputLines, inspectors); + + // Verify that trigger details are properly formatted + string[] triggerDetailsConsoleOutput = consoleOutputLines + .Where(m => m.StartsWith(TriggerDetailsMessageStart)).ToArray(); + + string expectedPattern = "Trigger Details: MessageId: (.*), DeliveryCount: [0-9]+, EnqueuedTime: (.*), LockedUntil: (.*)"; + + foreach (string msg in triggerDetailsConsoleOutput) + { + Assert.True(Regex.IsMatch(msg, expectedPattern), $"Expected trace event {expectedPattern} not found."); + } + + if (!hostSupplied) + { + host.Dispose(); } } @@ -691,6 +839,97 @@ public static void BindToString( } } + public class DrainModeTestJobQueue + { + public async static Task QueueNoSessions( + [ServiceBusTrigger(FirstQueueName)] Message msg, + MessageReceiver messageReceiver, + CancellationToken cancellationToken, + ILogger logger) + { + logger.LogInformation($"DrainModeValidationFunctions.QueueNoSessions: message data {msg.Body}"); + _drainValidationPreDelay.Set(); + await DrainModeHelper.WaitForCancellation(cancellationToken); + Assert.True(cancellationToken.IsCancellationRequested); + await messageReceiver.CompleteAsync(msg.SystemProperties.LockToken); + _drainValidationPostDelay.Set(); + } + } + + public class DrainModeTestJobTopic + { + public async static Task TopicNoSessions( + [ServiceBusTrigger(TopicName, TopicSubscriptionName1)] Message msg, + MessageReceiver messageReceiver, + CancellationToken cancellationToken, + ILogger logger) + { + logger.LogInformation($"DrainModeValidationFunctions.NoSessions: message data {msg.Body}"); + _drainValidationPreDelay.Set(); + await DrainModeHelper.WaitForCancellation(cancellationToken); + Assert.True(cancellationToken.IsCancellationRequested); + await messageReceiver.CompleteAsync(msg.SystemProperties.LockToken); + _drainValidationPostDelay.Set(); + } + } + + public class DrainModeTestJobQueueBatch + { + public async static Task QueueNoSessionsBatch( + [ServiceBusTrigger(FirstQueueName)] Message[] array, + MessageReceiver messageReceiver, + CancellationToken cancellationToken, + ILogger logger) + { + Assert.True(array.Length > 0); + logger.LogInformation($"DrainModeTestJobBatch.QueueNoSessionsBatch: received {array.Length} messages"); + _drainValidationPreDelay.Set(); + await DrainModeHelper.WaitForCancellation(cancellationToken); + Assert.True(cancellationToken.IsCancellationRequested); + foreach (Message msg in array) + { + await messageReceiver.CompleteAsync(msg.SystemProperties.LockToken); + } + _drainValidationPostDelay.Set(); + } + } + + public class DrainModeTestJobTopicBatch + { + public async static Task TopicNoSessionsBatch( + [ServiceBusTrigger(TopicName, TopicSubscriptionName1)] Message[] array, + MessageReceiver messageReceiver, + CancellationToken cancellationToken, + ILogger logger) + { + Assert.True(array.Length > 0); + logger.LogInformation($"DrainModeTestJobBatch.TopicNoSessionsBatch: received {array.Length} messages"); + _drainValidationPreDelay.Set(); + await DrainModeHelper.WaitForCancellation(cancellationToken); + Assert.True(cancellationToken.IsCancellationRequested); + foreach (Message msg in array) + { + await messageReceiver.CompleteAsync(msg.SystemProperties.LockToken); + } + _drainValidationPostDelay.Set(); + } + } + + public class DrainModeHelper + { + public async static Task WaitForCancellation(CancellationToken cancellationToken) + { + // Wait until the drain operation begins, signalled by the cancellation token + int elapsedTimeMills = 0; + while (elapsedTimeMills < DrainWaitTimeoutMills && !cancellationToken.IsCancellationRequested) + { + await Task.Delay(elapsedTimeMills += 500); + } + // Allow some time for the Service Bus SDK to start draining before returning + await Task.Delay(DrainSleepMills); + } + } + private class CustomMessagingProvider : MessagingProvider { public const string CustomMessagingCategory = "CustomMessagingProvider"; @@ -709,7 +948,7 @@ public override MessageProcessor CreateMessageProcessor(string entityPath, strin var options = new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = 3, - MaxAutoRenewDuration = TimeSpan.FromMinutes(1) + MaxAutoRenewDuration = TimeSpan.FromMinutes(MaxAutoRenewDurationMin) }; var messageReceiver = new MessageReceiver(_options.ConnectionString, entityPath); @@ -753,6 +992,33 @@ private IHost BuildTestHost() { b.AddServiceBus(); }, nameResolver: _nameResolver) + .ConfigureServices(s => + { + s.Configure(opts => opts.ShutdownTimeout = HostShutdownTimeout); + }) + .Build(); + + return host; + } + + private IHost BuildTestHostMessageDraining() + { + IHost host = new HostBuilder() + .ConfigureDefaultTestHost(b => + { + b.AddServiceBus(sbOptions => + { + // We want to ensure messages can be completed in the function code before signaling success to the test + sbOptions.MessageHandlerOptions.AutoComplete = false; + sbOptions.BatchOptions.AutoComplete = false; + sbOptions.MessageHandlerOptions.MaxAutoRenewDuration = TimeSpan.FromMinutes(MaxAutoRenewDurationMin); + sbOptions.MessageHandlerOptions.MaxConcurrentCalls = 1; + }); + }, nameResolver: _nameResolver) + .ConfigureServices(s => + { + s.Configure(opts => opts.ShutdownTimeout = HostShutdownTimeout); + }) .Build(); return host; diff --git a/test/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests/ServiceBusSessionsEndToEndTests.cs b/test/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests/ServiceBusSessionsEndToEndTests.cs index dad86b594..4fdfaf955 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests/ServiceBusSessionsEndToEndTests.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests/ServiceBusSessionsEndToEndTests.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Linq; using System.Text; @@ -19,6 +20,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Xunit; +using Xunit.Abstractions; namespace Microsoft.Azure.WebJobs.Host.EndToEndTests { @@ -28,19 +30,37 @@ public class ServiceBusSessionsBusEndToEndTests : IDisposable private const string _queueName = _prefix + "queue1-sessions"; private const string _topicName = _prefix + "topic1-sessions"; private const string _subscriptionName = "sub1-sessions"; + private const string _drainModeSessionId = "drain-session"; + private const string DrainingQueueMessageBody = "queue-message-draining-with-sessions-1"; + private const string DrainingTopicMessageBody = "topic-message-draining-with-sessions-1"; private static EventWaitHandle _waitHandle1; private static EventWaitHandle _waitHandle2; + private static EventWaitHandle _drainValidationPreDelay; + private static EventWaitHandle _drainValidationPostDelay; private readonly RandomNameResolver _nameResolver; - private const int SBTimeout = 120 * 1000; + private const int SBTimeoutMills = 120 * 1000; + private const int DrainWaitTimeoutMills = 120 * 1000; + private const int DrainSleepMills = 5 * 1000; + public const int MaxAutoRenewDurationMin = 5; + internal static TimeSpan HostShutdownTimeout = TimeSpan.FromSeconds(120); private readonly string _connectionString; - public ServiceBusSessionsBusEndToEndTests() + private readonly ITestOutputHelper outputLogger; + + public ServiceBusSessionsBusEndToEndTests(ITestOutputHelper output) { + outputLogger = output; + var config = new ConfigurationBuilder() .AddEnvironmentVariables() .AddTestSettings() .Build(); + // Add all test configuration to the environment as WebJobs requires a few of them to be in the environment + foreach (var kv in config.AsEnumerable()) + { + Environment.SetEnvironmentVariable(kv.Key, kv.Value); + } _connectionString = config.GetConnectionStringOrSetting(ServiceBus.Constants.DefaultConnectionStringName); _nameResolver = new RandomNameResolver(); @@ -63,7 +83,7 @@ public async Task ServiceBusSessionQueue_OrderGuaranteed() await ServiceBusEndToEndTests.WriteQueueMessage(_connectionString, _queueName, "message4", "test-session1"); await ServiceBusEndToEndTests.WriteQueueMessage(_connectionString, _queueName, "message5", "test-session1"); - Assert.True(_waitHandle1.WaitOne(SBTimeout)); + Assert.True(_waitHandle1.WaitOne(SBTimeoutMills)); IEnumerable logMessages = host.GetTestLoggerProvider().GetAllLogMessages(); @@ -77,6 +97,8 @@ public async Task ServiceBusSessionQueue_OrderGuaranteed() { Assert.True(logMessage.FormattedMessage.StartsWith("message" + i++)); } + + await host.StopAsync(); } } @@ -95,7 +117,7 @@ public async Task ServiceBusSessionTopicSubscription_OrderGuaranteed() await ServiceBusEndToEndTests.WriteTopicMessage(_connectionString, _topicName, "message4", "test-session1"); await ServiceBusEndToEndTests.WriteTopicMessage(_connectionString, _topicName, "message5", "test-session1"); - Assert.True(_waitHandle1.WaitOne(SBTimeout)); + Assert.True(_waitHandle1.WaitOne(SBTimeoutMills)); IEnumerable logMessages = host.GetTestLoggerProvider().GetAllLogMessages(); @@ -109,13 +131,14 @@ public async Task ServiceBusSessionTopicSubscription_OrderGuaranteed() { Assert.True(logMessage.FormattedMessage.StartsWith("message" + i++)); } + + await host.StopAsync(); } } [Fact] public async Task ServiceBusSessionQueue_DifferentHosts_DifferentSessions() { - using (var host1 = ServiceBusSessionsTestHelper.CreateHost(_nameResolver, true)) using (var host2 = ServiceBusSessionsTestHelper.CreateHost(_nameResolver, true)) { @@ -140,8 +163,8 @@ public async Task ServiceBusSessionQueue_DifferentHosts_DifferentSessions() await ServiceBusEndToEndTests.WriteQueueMessage(_connectionString, _queueName, "message5", "test-session1"); await ServiceBusEndToEndTests.WriteQueueMessage(_connectionString, _queueName, "message5", "test-session2"); - Assert.True(_waitHandle1.WaitOne(SBTimeout)); - Assert.True(_waitHandle2.WaitOne(SBTimeout)); + Assert.True(_waitHandle1.WaitOne(SBTimeoutMills)); + Assert.True(_waitHandle2.WaitOne(SBTimeoutMills)); IEnumerable logMessages1 = host1.GetTestLoggerProvider().GetAllLogMessages(); List consoleOutput1 = logMessages1.Where(m => m.Category == "Function.SBQueue1Trigger.User").ToList(); @@ -162,6 +185,13 @@ public async Task ServiceBusSessionQueue_DifferentHosts_DifferentSessions() { Assert.Equal(sessionId2, m.FormattedMessage[m.FormattedMessage.Length - 1]); } + + List tasks = new List + { + host1.StopAsync(), + host2.StopAsync() + }; + Task.WaitAll(tasks.ToArray()); } } @@ -192,8 +222,8 @@ public async Task ServiceBusSessionSub_DifferentHosts_DifferentSessions() await ServiceBusEndToEndTests.WriteTopicMessage(_connectionString, _topicName, "message5", "test-session1"); await ServiceBusEndToEndTests.WriteTopicMessage(_connectionString, _topicName, "message5", "test-session2"); - Assert.True(_waitHandle1.WaitOne(SBTimeout)); - Assert.True(_waitHandle2.WaitOne(SBTimeout)); + Assert.True(_waitHandle1.WaitOne(SBTimeoutMills)); + Assert.True(_waitHandle2.WaitOne(SBTimeoutMills)); IEnumerable logMessages1 = host1.GetTestLoggerProvider().GetAllLogMessages(); List consoleOutput1 = logMessages1.Where(m => m.Category == "Function.SBSub1Trigger.User").ToList(); @@ -215,6 +245,13 @@ public async Task ServiceBusSessionSub_DifferentHosts_DifferentSessions() { Assert.Equal(sessionId2, m.FormattedMessage[m.FormattedMessage.Length - 1]); } + + List tasks = new List + { + host1.StopAsync(), + host2.StopAsync() + }; + Task.WaitAll(tasks.ToArray()); } } @@ -243,8 +280,8 @@ public async Task ServiceBusSessionQueue_SessionLocks() await ServiceBusEndToEndTests.WriteQueueMessage(_connectionString, _queueName, "message5", "test-session1"); await ServiceBusEndToEndTests.WriteQueueMessage(_connectionString, _queueName, "message5", "test-session2"); - Assert.True(_waitHandle1.WaitOne(SBTimeout)); - Assert.True(_waitHandle2.WaitOne(SBTimeout)); + Assert.True(_waitHandle1.WaitOne(SBTimeoutMills)); + Assert.True(_waitHandle2.WaitOne(SBTimeoutMills)); IEnumerable logMessages1 = host.GetTestLoggerProvider().GetAllLogMessages(); @@ -266,6 +303,8 @@ public async Task ServiceBusSessionQueue_SessionLocks() consoleOutput1[5].FormattedMessage[consoleOutput1[0].FormattedMessage.Length - 1]); } } + + await host.StopAsync(); } } @@ -294,8 +333,8 @@ public async Task ServiceBusSessionSub_SessionLocks() await ServiceBusEndToEndTests.WriteTopicMessage(_connectionString, _topicName, "message5", "test-session1"); await ServiceBusEndToEndTests.WriteTopicMessage(_connectionString, _topicName, "message5", "test-session2"); - Assert.True(_waitHandle1.WaitOne(SBTimeout)); - Assert.True(_waitHandle2.WaitOne(SBTimeout)); + Assert.True(_waitHandle1.WaitOne(SBTimeoutMills)); + Assert.True(_waitHandle2.WaitOne(SBTimeoutMills)); IEnumerable logMessages1 = host.GetTestLoggerProvider().GetAllLogMessages(); @@ -317,6 +356,8 @@ public async Task ServiceBusSessionSub_SessionLocks() consoleOutput1[5].FormattedMessage[consoleOutput1[0].FormattedMessage.Length - 1]); } } + + await host.StopAsync(); } } @@ -344,14 +385,71 @@ public async Task TestBatch_DataContractPoco() await TestMultiple(true); } + [Fact] + public async Task MessageDraining_QueueWithSessions() + { + await TestSingleDrainMode(true); + } + + [Fact] + public async Task MessageDraining_TopicWithSessions() + { + await TestSingleDrainMode(false); + } + + [Fact] + public async Task MessageDraining_QueueWithSessions_Batch() + { + await TestMultipleDrainMode(true); + } + + [Fact] + public async Task MessageDraining_TopicWithSessions_Batch() + { + await TestMultipleDrainMode(false); + } + + /* + * Helper functions + */ + + private async Task TestSingleDrainMode(bool sendToQueue) + { + _drainValidationPreDelay = new ManualResetEvent(initialState: false); + _drainValidationPostDelay = new ManualResetEvent(initialState: false); + + if (sendToQueue) + { + await ServiceBusEndToEndTests.WriteQueueMessage( + _connectionString, _queueName, DrainingQueueMessageBody, _drainModeSessionId); + } + else + { + await ServiceBusEndToEndTests.WriteTopicMessage( + _connectionString, _topicName, DrainingTopicMessageBody, _drainModeSessionId); + } + + using (IHost host = ServiceBusSessionsTestHelper.CreateHost(_nameResolver, false, false)) + { + await host.StartAsync(); + + // Wait to ensure function invocatoin has started before draining messages + Assert.True(_drainValidationPreDelay.WaitOne(SBTimeoutMills)); + + // Start draining in-flight messages + var drainModeManager = host.Services.GetService(); + await drainModeManager.EnableDrainModeAsync(CancellationToken.None); + + // Validate that function execution was allowed to complete + Assert.True(_drainValidationPostDelay.WaitOne(DrainWaitTimeoutMills + SBTimeoutMills)); + + await host.StopAsync(); + } + } + private async Task TestMultiple(bool isXml = false) { - IHost host = new HostBuilder() - .ConfigureDefaultTestHost(b => - { - b.AddServiceBus(); - }, nameResolver: _nameResolver) - .Build(); + _waitHandle1 = new ManualResetEvent(initialState: false); if (isXml) { @@ -364,19 +462,52 @@ private async Task TestMultiple(bool isXml = false) await ServiceBusEndToEndTests.WriteQueueMessage(_connectionString, _queueName, "{'Name': 'Test2', 'Value': 'Value'}", "sessionId"); } - _waitHandle1 = new ManualResetEvent(initialState: false); + using (IHost host = ServiceBusSessionsTestHelper.CreateHost(_nameResolver)) + { + await host.StartAsync(); - await host.StartAsync(); + bool result = _waitHandle1.WaitOne(SBTimeoutMills); + Assert.True(result); - bool result = _waitHandle1.WaitOne(SBTimeout); - Assert.True(result); + // ensure message are completed + await Task.Delay(2000); - // ensure message are completed - await Task.Delay(2000); + // Wait for the host to terminate + await host.StopAsync(); + } + } - // Wait for the host to terminate - await host.StopAsync(); - host.Dispose(); + private async Task TestMultipleDrainMode(bool sendToQueue) + { + _drainValidationPreDelay = new ManualResetEvent(initialState: false); + _drainValidationPostDelay = new ManualResetEvent(initialState: false); + + if (sendToQueue) + { + await ServiceBusEndToEndTests.WriteQueueMessage(_connectionString, _queueName, DrainingQueueMessageBody, _drainModeSessionId); + } + else + { + await ServiceBusEndToEndTests.WriteTopicMessage(_connectionString, _topicName, DrainingTopicMessageBody, _drainModeSessionId); + } + + using (IHost host = ServiceBusSessionsTestHelper.CreateHost(_nameResolver, false, false)) + { + await host.StartAsync(); + + // Wait to ensure function invocatoin has started before draining messages + Assert.True(_drainValidationPreDelay.WaitOne(SBTimeoutMills)); + + // Start draining in-flight messages + var drainModeManager = host.Services.GetService(); + await drainModeManager.EnableDrainModeAsync(CancellationToken.None); + + // Validate that function execution was allowed to complete + Assert.True(_drainValidationPostDelay.WaitOne(DrainWaitTimeoutMills + SBTimeoutMills)); + + // Wait for the host to terminate + await host.StopAsync(); + } } private async Task Cleanup() @@ -436,12 +567,107 @@ public static void SBSub2Trigger( } } + public class DrainModeTestJobQueue + { + public async static Task QueueWithSessions( + [ServiceBusTrigger(_queueName, IsSessionsEnabled = true)] Message msg, + IMessageSession messageSession, + CancellationToken cancellationToken, + ILogger logger) + { + logger.LogInformation($"DrainModeValidationFunctions.QueueWithSessions: message data {msg.Body} with session id {msg.SessionId}"); + Assert.Equal(_drainModeSessionId, msg.SessionId); + _drainValidationPreDelay.Set(); + await DrainModeHelper.WaitForCancellation(cancellationToken); + Assert.True(cancellationToken.IsCancellationRequested); + await messageSession.CompleteAsync(msg.SystemProperties.LockToken); + _drainValidationPostDelay.Set(); + } + } + + public class DrainModeTestJobTopic + { + public async static Task TopicWithSessions( + [ServiceBusTrigger(_topicName, _subscriptionName, IsSessionsEnabled = true)] Message msg, + IMessageSession messageSession, + CancellationToken cancellationToken, + ILogger logger) + { + logger.LogInformation($"DrainModeValidationFunctions.TopicWithSessions: message data {msg.Body} with session id {msg.SessionId}"); + Assert.Equal(_drainModeSessionId, msg.SessionId); + _drainValidationPreDelay.Set(); + await DrainModeHelper.WaitForCancellation(cancellationToken); + Assert.True(cancellationToken.IsCancellationRequested); + await messageSession.CompleteAsync(msg.SystemProperties.LockToken); + _drainValidationPostDelay.Set(); + } + } + + public class DrainModeTestJobQueueBatch + { + public async static Task QueueWithSessionsBatch( + [ServiceBusTrigger(_queueName, IsSessionsEnabled = true)] Message[] array, + IMessageSession messageSession, + CancellationToken cancellationToken, + ILogger logger) + { + Assert.True(array.Length > 0); + logger.LogInformation($"DrainModeTestJobBatch.QueueWithSessionsBatch: received {array.Length} messages with session id {array[0].SessionId}"); + Assert.Equal(_drainModeSessionId, array[0].SessionId); + _drainValidationPreDelay.Set(); + await DrainModeHelper.WaitForCancellation(cancellationToken); + Assert.True(cancellationToken.IsCancellationRequested); + foreach (Message msg in array) + { + await messageSession.CompleteAsync(msg.SystemProperties.LockToken); + } + _drainValidationPostDelay.Set(); + } + } + + public class DrainModeTestJobTopicBatch + { + public async static Task TopicWithSessionsBatch( + [ServiceBusTrigger(_topicName, _subscriptionName, IsSessionsEnabled = true)] Message[] array, + MessageReceiver messageReceiver, + CancellationToken cancellationToken, + ILogger logger) + { + Assert.True(array.Length > 0); + logger.LogInformation($"DrainModeTestJobBatch.TopicWithSessionsBatch: received {array.Length} messages with session id {array[0].SessionId}"); + Assert.Equal(_drainModeSessionId, array[0].SessionId); + _drainValidationPreDelay.Set(); + await DrainModeHelper.WaitForCancellation(cancellationToken); + Assert.True(cancellationToken.IsCancellationRequested); + foreach (Message msg in array) + { + await messageReceiver.CompleteAsync(msg.SystemProperties.LockToken); + } + _drainValidationPostDelay.Set(); + } + } + + public class DrainModeHelper + { + public async static Task WaitForCancellation(CancellationToken cancellationToken) + { + // Wait until the drain operation begins, signalled by the cancellation token + int elapsedTimeMills = 0; + while (elapsedTimeMills < DrainWaitTimeoutMills && !cancellationToken.IsCancellationRequested) + { + await Task.Delay(elapsedTimeMills += 500); + } + // Allow some time for the Service Bus SDK to start draining before returning + await Task.Delay(DrainSleepMills); + } + } + public class ServiceBusMultipleTestJobsBase { protected static bool firstReceived = false; protected static bool secondReceived = false; - public static void ProcessMessages(string[] messages) + public static void ProcessMessages(string[] messages, EventWaitHandle waitHandle = null) { if (messages.Contains("{'Name': 'Test1', 'Value': 'Value'}")) { @@ -454,7 +680,7 @@ public static void ProcessMessages(string[] messages) if (firstReceived && secondReceived) { - _waitHandle1.Set(); + bool b = (waitHandle !=null) ? waitHandle.Set() : _waitHandle1.Set(); } } } @@ -577,6 +803,7 @@ public void Dispose() { _waitHandle2.Dispose(); } + Cleanup().GetAwaiter().GetResult(); } } @@ -633,21 +860,32 @@ public static string GetStringMessage(Message message) } } - public static IHost CreateHost(INameResolver reolver, bool addCustomProvider = false) + public static IHost CreateHost(INameResolver nameResolver, bool addCustomProvider = false, bool autoComplete = true) { return new HostBuilder() .ConfigureDefaultTestHost(b => { - b.AddServiceBus(); + b.AddServiceBus(sbOptions => + { + // Will be disabled for drain mode validation as messages are completed by functoin code to validate draining allows completion + sbOptions.SessionHandlerOptions.AutoComplete = autoComplete; + sbOptions.BatchOptions.AutoComplete = autoComplete; + sbOptions.SessionHandlerOptions.MaxAutoRenewDuration = TimeSpan.FromMinutes(ServiceBusSessionsBusEndToEndTests.MaxAutoRenewDurationMin); + sbOptions.SessionHandlerOptions.MaxConcurrentSessions = 1; + }); }) .ConfigureServices(services => { - services.AddSingleton(reolver); + services.AddSingleton(nameResolver); if (addCustomProvider) { services.AddSingleton(); } }) + .ConfigureServices(s => + { + s.Configure(opts => opts.ShutdownTimeout = ServiceBusSessionsBusEndToEndTests.HostShutdownTimeout); + }) .Build(); } diff --git a/test/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests/WebJobs.Extensions.ServiceBus.Tests.csproj b/test/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests/WebJobs.Extensions.ServiceBus.Tests.csproj index 8c4a71ffe..1d5a2dd01 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests/WebJobs.Extensions.ServiceBus.Tests.csproj +++ b/test/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests/WebJobs.Extensions.ServiceBus.Tests.csproj @@ -25,8 +25,8 @@ - - + + all