diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs index dc00d23a11ac..a2cc83a3746d 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs @@ -29,7 +29,8 @@ internal class ReceiverManager protected readonly ServiceBusProcessorOptions ProcessorOptions; private readonly MessagingClientDiagnostics _clientDiagnostics; - protected bool AutoRenewLock => ProcessorOptions.MaxAutoLockRenewalDuration > TimeSpan.Zero; + protected bool AutoRenewLock => ProcessorOptions.MaxAutoLockRenewalDuration > TimeSpan.Zero || + ProcessorOptions.MaxAutoLockRenewalDuration == Timeout.InfiniteTimeSpan; public ReceiverManager( ServiceBusProcessor processor, diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessorOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessorOptions.cs index be045477cc4f..16e791074ad9 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessorOptions.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessorOptions.cs @@ -3,6 +3,7 @@ using System; using System.ComponentModel; +using System.Threading; using Azure.Core; namespace Azure.Messaging.ServiceBus @@ -62,6 +63,7 @@ public int PrefetchCount /// /// Gets or sets the maximum duration within which the lock will be renewed automatically. This /// value should be greater than the longest message lock duration; for example, the LockDuration Property. + /// To specify an infinite duration, use . /// /// /// The maximum duration during which message locks are automatically renewed. The default value is 5 minutes. @@ -77,7 +79,10 @@ public TimeSpan MaxAutoLockRenewalDuration set { - Argument.AssertNotNegative(value, nameof(MaxAutoLockRenewalDuration)); + if (value != Timeout.InfiniteTimeSpan) + { + Argument.AssertNotNegative(value, nameof(MaxAutoLockRenewalDuration)); + } _maxAutoRenewDuration = value; } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessorOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessorOptions.cs index 6a975d80f7ed..906e35fdba43 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessorOptions.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessorOptions.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.ComponentModel; +using System.Threading; using Azure.Core; namespace Azure.Messaging.ServiceBus @@ -51,7 +52,7 @@ public int PrefetchCount /// /// Gets or sets the maximum duration within which the session lock will be renewed automatically. This value - /// should be greater than the queue's LockDuration Property. + /// should be greater than the queue's LockDuration Property. To specify an infinite duration, use . /// /// /// The maximum duration during which session locks are automatically renewed. The default value is 5 minutes. @@ -66,7 +67,10 @@ public TimeSpan MaxAutoLockRenewalDuration set { - Argument.AssertNotNegative(value, nameof(MaxAutoLockRenewalDuration)); + if (value != Timeout.InfiniteTimeSpan) + { + Argument.AssertNotNegative(value, nameof(MaxAutoLockRenewalDuration)); + } _maxAutoRenewDuration = value; } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs index f505f37d48fe..a664f355127c 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs @@ -178,6 +178,7 @@ public void ProcessorOptionsValidation() options.PrefetchCount = 0; options.MaxReceiveWaitTime = TimeSpan.FromSeconds(1); options.MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(0); + options.MaxAutoLockRenewalDuration = Timeout.InfiniteTimeSpan; } [Test] diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs index 8f629269e373..78ac0ea48d30 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs @@ -188,6 +188,7 @@ public void ProcessorOptionsValidation() options.PrefetchCount = 0; options.SessionIdleTimeout = TimeSpan.FromSeconds(1); options.MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(0); + options.MaxAutoLockRenewalDuration = Timeout.InfiniteTimeSpan; } [Test] diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusOptions.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusOptions.cs index b94c74e1575f..d6f931ea6699 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusOptions.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Config/ServiceBusOptions.cs @@ -3,6 +3,7 @@ using System; using System.Net; +using System.Threading; using System.Threading.Tasks; using Azure.Core; using Azure.Messaging.ServiceBus; @@ -73,6 +74,8 @@ public ServiceBusRetryOptions ClientRetryOptions /// Gets or sets the maximum duration within which the lock will be renewed automatically. This /// value should be greater than the longest message lock duration; for example, the LockDuration Property. /// The default value is 5 minutes. This does not apply for functions that receive a batch of messages. + /// To specify an infinite duration, use or -00:00:00.0010000 + /// if specifying via host.json. /// public TimeSpan MaxAutoLockRenewalDuration { @@ -80,7 +83,11 @@ public TimeSpan MaxAutoLockRenewalDuration set { - Argument.AssertNotNegative(value, nameof(MaxAutoLockRenewalDuration)); + if (value != Timeout.InfiniteTimeSpan) + { + Argument.AssertNotNegative(value, nameof(MaxAutoLockRenewalDuration)); + } + _maxAutoRenewDuration = value; } } diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj index 432c181e8dfe..b64f6c379bef 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj @@ -37,4 +37,9 @@ + + + + + diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Config/ServiceBusHostBuilderExtensionsTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Config/ServiceBusHostBuilderExtensionsTests.cs index ae7df7d419ff..9a8397092d6b 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Config/ServiceBusHostBuilderExtensionsTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Config/ServiceBusHostBuilderExtensionsTests.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Linq; using System.Net; +using System.Threading; using Azure.Messaging.ServiceBus; using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Config; @@ -99,6 +100,32 @@ public void ConfigureOptions_Format_Returns_Expected() Assert.AreEqual(10, result.ClientRetryOptions.MaxRetries); } + [Test] + public void ConfigureOptions_InfiniteTimeSpans_Format_Returns_Expected() + { + ServiceBusOptions options = CreateOptionsFromConfigInfiniteTimeSpans(); + JObject jObject = new JObject + { + { ExtensionPath, JObject.Parse(((IOptionsFormatter)options).Format()) } + }; + + ServiceBusOptions result = TestHelpers.GetConfiguredOptions( + b => + { + b.AddServiceBus(); + }, + jsonStream: new BinaryData(jObject.ToString()).ToStream()); + + Assert.AreEqual(123, result.PrefetchCount); + + Assert.AreEqual(123, result.MaxConcurrentCalls); + Assert.False(result.AutoCompleteMessages); + Assert.AreEqual(Timeout.InfiniteTimeSpan, result.MaxAutoLockRenewalDuration); + Assert.AreEqual(Timeout.InfiniteTimeSpan, result.MaxBatchWaitTime); + Assert.AreEqual("http://proxyserver:8080/", ((WebProxy)result.WebProxy).Address.AbsoluteUri); + Assert.AreEqual(10, result.ClientRetryOptions.MaxRetries); + } + private static ServiceBusOptions CreateOptionsFromConfig() { var values = new Dictionary @@ -124,6 +151,31 @@ private static ServiceBusOptions CreateOptionsFromConfig() return options; } + private static ServiceBusOptions CreateOptionsFromConfigInfiniteTimeSpans() + { + var values = new Dictionary + { + { $"{ExtensionPath}:PrefetchCount", "123" }, + { $"ConnectionStrings:ServiceBus", "TestConnectionString" }, + { $"{ExtensionPath}:MaxConcurrentCalls", "123" }, + { $"{ExtensionPath}:AutoCompleteMessages", "false" }, + { $"{ExtensionPath}:MaxAutoLockRenewalDuration", "-00:00:00.0010000" }, + { $"{ExtensionPath}:MaxConcurrentSessions", "123" }, + { $"{ExtensionPath}:TransportType", "AmqpWebSockets" }, + { $"{ExtensionPath}:MaxMessageBatchSize", "20" }, + { $"{ExtensionPath}:MinMessageBatchSize", "10" }, + { $"{ExtensionPath}:MaxBatchWaitTime", "-00:00:00.0010000" }, + { $"{ExtensionPath}:WebProxy", "http://proxyserver:8080/" }, + { $"{ExtensionPath}:ClientRetryOptions:MaxRetries", "10" }, + }; + + ServiceBusOptions options = TestHelpers.GetConfiguredOptions(b => + { + b.AddServiceBus(); + }, values); + return options; + } + private static ServiceBusOptions CreateOptionsFromConfigBackCompat() { var values = new Dictionary diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Config/ServiceBusOptionsTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Config/ServiceBusOptionsTests.cs index d2b18b8546d3..a65060f4b804 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Config/ServiceBusOptionsTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Config/ServiceBusOptionsTests.cs @@ -106,6 +106,25 @@ public void ToProcessorOptions_ReturnsExpectedValue() Assert.AreEqual(sbOptions.MaxConcurrentCalls, processorOptions.MaxConcurrentCalls); } + [Test] + public void ToProcessorOptions_InfiniteTimeSpans_ReturnsExpectedValue() + { + ServiceBusOptions sbOptions = new ServiceBusOptions + { + AutoCompleteMessages = false, + PrefetchCount = 123, + MaxAutoLockRenewalDuration = Timeout.InfiniteTimeSpan, + SessionIdleTimeout = Timeout.InfiniteTimeSpan, + MaxConcurrentCalls = 123 + }; + + ServiceBusProcessorOptions processorOptions = sbOptions.ToProcessorOptions(true, false); + Assert.AreEqual(true, processorOptions.AutoCompleteMessages); + Assert.AreEqual(sbOptions.PrefetchCount, processorOptions.PrefetchCount); + Assert.AreEqual(sbOptions.MaxAutoLockRenewalDuration, processorOptions.MaxAutoLockRenewalDuration); + Assert.AreEqual(sbOptions.MaxConcurrentCalls, processorOptions.MaxConcurrentCalls); + } + [Test] [Category("DynamicConcurrency")] public void ToProcessorOptions_DynamicConcurrencyEnabled_ReturnsExpectedValue() diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs index e4ebe3bb657c..30e5df90a5df 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/ServiceBusEndToEndTests.cs @@ -27,6 +27,7 @@ using Azure.Core.Shared; using Azure.Core.Tests; using Azure.Messaging.ServiceBus.Diagnostics; +using Microsoft.Extensions.Configuration; using Constants = Microsoft.Azure.WebJobs.ServiceBus.Constants; namespace Microsoft.Azure.WebJobs.Host.EndToEndTests @@ -242,6 +243,22 @@ public async Task TestSingle_AutoCompleteEnabledOnTrigger_CompleteInFunction() } } + [Test] + public async Task TestSingle_InfiniteLockRenewal() + { + await WriteQueueMessage("{'Name': 'Test1', 'Value': 'Value'}"); + var host = BuildHost( + SetInfiniteLockRenewal); + using (host) + { + bool result = _waitHandle1.WaitOne(SBTimeoutMills); + Assert.True(result); + await host.StopAsync(); + var logs = host.GetTestLoggerProvider().GetAllLogMessages(); + Assert.IsNotEmpty(logs.Where(message => message.FormattedMessage.Contains("RenewMessageLock"))); + } + } + [Test] public async Task TestSingle_Dispose() { @@ -829,6 +846,14 @@ private async Task TestSingleDrainMode(bool sendToQueue) sbOptions.AutoCompleteMessages = false; })); + private static Action SetInfiniteLockRenewal => + builder => + builder.ConfigureAppConfiguration(b => + b.AddInMemoryCollection(new Dictionary + { + { "AzureWebJobs:Extensions:ServiceBus:MaxAutoLockRenewalDuration", "-00:00:00.0010000" }, + })); + private static Action BuildDrainHost() { return builder => @@ -1632,6 +1657,19 @@ public static async Task RunAsync( } } + public class TestSingleInfiniteLockRenewal + { + public static async Task RunAsync( + [ServiceBusTrigger(FirstQueueNameKey)] + ServiceBusReceivedMessage message, + ServiceBusMessageActions messageActions) + { + // wait long enough to trigger lock renewal + await Task.Delay(TimeSpan.FromSeconds(20)); + _waitHandle1.Set(); + } + } + public class TestSingleDispose { public static async Task RunAsync(