Skip to content
This repository was archived by the owner on Jul 19, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider
private ClientEntity _clientEntity;
private bool _disposed;
private bool _started;
private bool _isStopping;

private IMessageSession _messageSession;
private SessionMessageProcessor _sessionMessageProcessor;
Expand Down Expand Up @@ -124,20 +125,45 @@ public async Task StopAsync(CancellationToken cancellationToken)
throw new InvalidOperationException("The listener has not yet been started or has already been stopped.");
}

// cancel our token source to signal any in progress
// ProcessMessageAsync invocations to cancel
_cancellationTokenSource.Cancel();

if (_receiver != null && _receiver.IsValueCreated)
if (_isStopping)
{
await Receiver.CloseAsync();
_receiver = CreateMessageReceiver();
throw new InvalidOperationException("The listener is currently stopping");
}
if (_clientEntity != null)

// Unregister* methods stop new messages from being processed while allowing in-flight messages to complete.
// As the amount of time functions are allowed to complete processing varies by SKU, we specify max timespan
// as the amount of time Service Bus SDK should wait for in-flight messages to complete procesing after
// unregistering the message handler so that functions have as long as the host continues to run time to complete.
if (_singleDispatch)
{
await _clientEntity.CloseAsync();
_clientEntity = null;
_isStopping = true;

if (_isSessionsEnabled)
{
if (_clientEntity != null)
{
if (_clientEntity is QueueClient queueClient)
{
await queueClient.UnregisterSessionHandlerAsync(TimeSpan.MaxValue);
}
else
{
SubscriptionClient subscriptionClient = _clientEntity as SubscriptionClient;
await subscriptionClient.UnregisterSessionHandlerAsync(TimeSpan.MaxValue);
}
}
}
else
{
if (_receiver != null && _receiver.IsValueCreated)
{
await Receiver.UnregisterMessageHandlerAsync(TimeSpan.MaxValue);
}
}
}
// Batch processing will be stopped via the _started flag on its next iteration

_isStopping = false;
_started = false;
}

Expand Down Expand Up @@ -246,6 +272,7 @@ internal void StartMessageBatchReceiver(CancellationToken cancellationToken)
{
if (!_started || cancellationToken.IsCancellationRequested)
{
_logger.LogInformation("Message processing has been stopped or cancelled");
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
<RootNamespace>Microsoft.Azure.WebJobs.ServiceBus</RootNamespace>
<PackageId>Microsoft.Azure.WebJobs.Extensions.ServiceBus</PackageId>
<Description>Microsoft Azure WebJobs SDK ServiceBus Extension</Description>
<Version>4.1.2</Version>
<Version>4.2.0$(VersionSuffix)</Version>
<CommitHash Condition="$(CommitHash) == ''">N/A</CommitHash>
<InformationalVersion>$(Version) Commit hash: $(CommitHash)</InformationalVersion>
<Authors>Microsoft</Authors>
<Company>Microsoft</Company>
<Copyright>© Microsoft Corporation. All rights reserved.</Copyright>
<PackageLicenseUrl>https://go.microsoft.com/fwlink/?linkid=2028464</PackageLicenseUrl>
<PackageIconUrl>https://raw.githubusercontent.com/Azure/azure-webjobs-sdk/dev/webjobs.png</PackageIconUrl>
<PackageIcon>webjobs.png</PackageIcon>
<PackageProjectUrl>http://go.microsoft.com/fwlink/?LinkID=320972</PackageProjectUrl>
<RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/Azure/azure-functions-servicebus-extension</RepositoryUrl>
Expand All @@ -28,6 +29,7 @@
<StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<WarningsAsErrors />
<NoWarn>1701;1702;NU5125;NU5048</NoWarn>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
Expand All @@ -37,16 +39,17 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="4.1.1" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.16" />
<PackageReference Include="Microsoft.Azure.WebJobs.Sources" Version="3.0.16" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="4.2.0" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.19" />
<PackageReference Include="Microsoft.Azure.WebJobs.Sources" Version="3.0.19" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004">
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<AdditionalFiles Include="..\..\stylecop.json" Link="stylecop.json" />
<None Include="..\..\webjobs.png" Pack="true" PackagePath="\" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,16 @@ public class ServiceBusEndToEndTests : IDisposable

private const string TriggerDetailsMessageStart = "Trigger Details:";

private const int SBTimeout = 60 * 1000;
private const int SBTimeout = 120 * 1000;
private const int DrainSleepTime = 60 * 1000;
private const int MaxAutoRenewDurationMin = 5;
internal static TimeSpan HostShutdownTimeout = TimeSpan.FromSeconds(120);

private static EventWaitHandle _topicSubscriptionCalled1;
private static EventWaitHandle _topicSubscriptionCalled2;
private static EventWaitHandle _eventWait;
private static EventWaitHandle _drainValidationPreDelay;
private static EventWaitHandle _drainValidationPostDelay;

// These two variables will be checked at the end of the test
private static string _resultMessage1;
Expand Down Expand Up @@ -108,6 +113,10 @@ public async Task CustomMessageProcessorTest()
{
services.AddSingleton<MessagingProvider, CustomMessagingProvider>();
})
.ConfigureServices(s =>
{
s.Configure<HostOptions>(opts => opts.ShutdownTimeout = HostShutdownTimeout);
})
.Build();

var loggerProvider = host.GetTestLoggerProvider();
Expand All @@ -133,15 +142,21 @@ public async Task MultipleAccountTest()
{
services.AddSingleton<MessagingProvider, CustomMessagingProvider>();
})
.ConfigureServices(s =>
{
s.Configure<HostOptions>(opts => opts.ShutdownTimeout = HostShutdownTimeout);
})
.Build();

await WriteQueueMessage(_secondaryConnectionString, FirstQueueName, "Test");

_topicSubscriptionCalled1 = new ManualResetEvent(initialState: false);
_topicSubscriptionCalled2 = new ManualResetEvent(initialState: false);

await host.StartAsync();

_topicSubscriptionCalled1.WaitOne(SBTimeout);
_topicSubscriptionCalled2.WaitOne(SBTimeout);

// ensure all logs have had a chance to flush
await Task.Delay(3000);
Expand All @@ -150,7 +165,8 @@ public async Task MultipleAccountTest()
await host.StopAsync();
host.Dispose();

Assert.Equal("Test-topic-1", _resultMessage1);
Assert.Equal("Test-SBQueue2SBQueue-SBQueue2SBTopic-topic-1", _resultMessage1);
Assert.Equal("Test-SBQueue2SBQueue-SBQueue2SBTopic-topic-2", _resultMessage2);
}

[Fact]
Expand Down Expand Up @@ -212,6 +228,65 @@ public async Task BindToString()
Assert.Contains("Input(foobar)", logs);
}

[Fact]
public async Task MessageDrainingQueue()
{
await TestSingleDrainMode<DrainModeValidationFunctions>(true);
}

[Fact]
public async Task MessageDrainingTopic()
{
await TestSingleDrainMode<DrainModeValidationFunctions>(false);
}

[Fact]
public async Task MessageDrainingQueueBatch()
{
await TestMultipleDrainMode<DrainModeValidationFunctions>(true);
}

[Fact]
public async Task MessageDrainingTopicBatch()
{
await TestMultipleDrainMode<DrainModeValidationFunctions>(false);
}

/*
* Helper functions
*/

private async Task TestSingleDrainMode<T>(bool sendToQueue)
{
var host = BuildTestHost<DrainModeValidationFunctions>();
await host.StartAsync();

_drainValidationPreDelay = new ManualResetEvent(initialState: false);
_drainValidationPostDelay = new ManualResetEvent(initialState: false);

if (sendToQueue)
{
await WriteQueueMessage(_primaryConnectionString, FirstQueueName, "queue-message-draining-no-sessions-1");
}
else
{
await WriteTopicMessage(_primaryConnectionString, TopicName, "topic-message-draining-no-sessions-1");
}

// Wait to ensure function invocatoin has started before draining messages
Assert.True(_drainValidationPreDelay.WaitOne(SBTimeout));

// Start draining in-flight messages
var drainModeManager = host.Services.GetService<IDrainModeManager>();
await drainModeManager.EnableDrainModeAsync(CancellationToken.None);

// Validate that function execution was allowed to complete
Assert.True(_drainValidationPostDelay.WaitOne(DrainSleepTime + SBTimeout));

await host.StopAsync();
host.Dispose();
}

private async Task TestMultiple<T>(bool isXml = false)
{
IHost host = BuildTestHost<T>();
Expand Down Expand Up @@ -242,6 +317,50 @@ private async Task TestMultiple<T>(bool isXml = false)
host.Dispose();
}

private async Task TestMultipleDrainMode<T>(bool sendToQueue)
{
IHost host = new HostBuilder()
.ConfigureDefaultTestHost<T>(b =>
{
b.AddServiceBus();
}, nameResolver: _nameResolver)
.ConfigureServices(s =>
{
s.Configure<HostOptions>(opts => opts.ShutdownTimeout = HostShutdownTimeout);
})
.Build();

await host.StartAsync();

_drainValidationPreDelay = new ManualResetEvent(initialState: false);
_drainValidationPostDelay = new ManualResetEvent(initialState: false);

if (sendToQueue)
{
await ServiceBusEndToEndTests.WriteQueueMessage(_primaryConnectionString, FirstQueueName, "{'Name': 'Test1', 'Value': 'Value'}");
await ServiceBusEndToEndTests.WriteQueueMessage(_primaryConnectionString, FirstQueueName, "{'Name': 'Test2', 'Value': 'Value'}");
}
else
{
await ServiceBusEndToEndTests.WriteTopicMessage(_primaryConnectionString, TopicName, "{'Name': 'Test1', 'Value': 'Value'}");
await ServiceBusEndToEndTests.WriteTopicMessage(_primaryConnectionString, TopicName, "{'Name': 'Test2', 'Value': 'Value'}");
}

// Wait to ensure function invocatoin has started before draining messages
Assert.True(_drainValidationPreDelay.WaitOne(SBTimeout));

// Start draining in-flight messages
var drainModeManager = host.Services.GetService<IDrainModeManager>();
await drainModeManager.EnableDrainModeAsync(CancellationToken.None);

// Validate that function execution was allowed to complete
Assert.True(_drainValidationPostDelay.WaitOne(DrainSleepTime + SBTimeout));

// Wait for the host to terminate
await host.StopAsync();
host.Dispose();
}

private async Task<int> CleanUpEntity(string queueName, string connectionString = null)
{
var messageReceiver = new MessageReceiver(!string.IsNullOrEmpty(connectionString) ? connectionString : _primaryConnectionString, queueName, ReceiveMode.ReceiveAndDelete);
Expand Down Expand Up @@ -292,6 +411,10 @@ private IHost CreateHost<T>()
{
services.AddSingleton<INameResolver>(_nameResolver);
})
.ConfigureServices(s =>
{
s.Configure<HostOptions>(opts => opts.ShutdownTimeout = HostShutdownTimeout);
})
.Build();
}

Expand Down Expand Up @@ -691,6 +814,33 @@ public static void BindToString(
}
}

public class DrainModeValidationFunctions
{
public async static void QueueNoSessions(
[ServiceBusTrigger(FirstQueueName)] Message msg,
string messageId,
ILogger logger)
{
logger.LogInformation($"DrainModeValidationFunctions.QueueNoSessions: message data {msg.Body}");
_drainValidationPreDelay.Set();
// Simulate a long running function execution to validate that drain invocation allows this to complete
await Task.Delay(DrainSleepTime);
_drainValidationPostDelay.Set();
}

public async static void TopicNoSessions(
[ServiceBusTrigger(TopicName, TopicSubscriptionName1)] Message msg,
string messageId,
ILogger logger)
{
logger.LogInformation($"DrainModeValidationFunctions.NoSessions: message data {msg.Body}");
_drainValidationPreDelay.Set();
// Simulate a long running function execution to validate that drain invocation allows this to complete
await Task.Delay(DrainSleepTime);
_drainValidationPostDelay.Set();
}
}

private class CustomMessagingProvider : MessagingProvider
{
public const string CustomMessagingCategory = "CustomMessagingProvider";
Expand All @@ -709,7 +859,7 @@ public override MessageProcessor CreateMessageProcessor(string entityPath, strin
var options = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 3,
MaxAutoRenewDuration = TimeSpan.FromMinutes(1)
MaxAutoRenewDuration = TimeSpan.FromMinutes(MaxAutoRenewDurationMin)
};

var messageReceiver = new MessageReceiver(_options.ConnectionString, entityPath);
Expand Down Expand Up @@ -753,6 +903,10 @@ private IHost BuildTestHost<TJobClass>()
{
b.AddServiceBus();
}, nameResolver: _nameResolver)
.ConfigureServices(s =>
{
s.Configure<HostOptions>(opts => opts.ShutdownTimeout = HostShutdownTimeout);
})
.Build();

return host;
Expand Down
Loading