Skip to content
This repository was archived by the owner on Jul 19, 2024. It is now read-only.

Commit 6da7fe7

Browse files
authored
Graceful shutdown (message draining) (#102)
1 parent 6c63de3 commit 6da7fe7

File tree

6 files changed

+831
-257
lines changed

6 files changed

+831
-257
lines changed

src/Microsoft.Azure.WebJobs.Extensions.ServiceBus/Listeners/ServiceBusListener.cs

Lines changed: 72 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider
3838
private ClientEntity _clientEntity;
3939
private bool _disposed;
4040
private bool _started;
41+
// Serialize execution of StopAsync to avoid calling Unregister* concurrently
42+
private readonly SemaphoreSlim _stopAsyncSemaphore = new SemaphoreSlim(1, 1);
4143

4244
private IMessageSession _messageSession;
4345
private SessionMessageProcessor _sessionMessageProcessor;
@@ -118,27 +120,53 @@ public Task StartAsync(CancellationToken cancellationToken)
118120
public async Task StopAsync(CancellationToken cancellationToken)
119121
{
120122
ThrowIfDisposed();
121-
122-
if (!_started)
123+
await _stopAsyncSemaphore.WaitAsync();
123124
{
124-
throw new InvalidOperationException("The listener has not yet been started or has already been stopped.");
125-
}
125+
try
126+
{
127+
if (!_started)
128+
{
129+
throw new InvalidOperationException("The listener has not yet been started or has already been stopped.");
130+
}
126131

127-
// cancel our token source to signal any in progress
128-
// ProcessMessageAsync invocations to cancel
129-
_cancellationTokenSource.Cancel();
132+
// Unregister* methods stop new messages from being processed while allowing in-flight messages to complete.
133+
// As the amount of time functions are allowed to complete processing varies by SKU, we specify max timespan
134+
// as the amount of time Service Bus SDK should wait for in-flight messages to complete procesing after
135+
// unregistering the message handler so that functions have as long as the host continues to run time to complete.
136+
if (_singleDispatch)
137+
{
138+
if (_isSessionsEnabled)
139+
{
140+
if (_clientEntity != null)
141+
{
142+
if (_clientEntity is QueueClient queueClient)
143+
{
144+
await queueClient.UnregisterSessionHandlerAsync(TimeSpan.MaxValue);
145+
}
146+
else
147+
{
148+
SubscriptionClient subscriptionClient = _clientEntity as SubscriptionClient;
149+
await subscriptionClient.UnregisterSessionHandlerAsync(TimeSpan.MaxValue);
150+
}
151+
}
152+
}
153+
else
154+
{
155+
if (_receiver != null && _receiver.IsValueCreated)
156+
{
157+
await Receiver.UnregisterMessageHandlerAsync(TimeSpan.MaxValue);
158+
}
159+
}
160+
}
161+
// Batch processing will be stopped via the _started flag on its next iteration
130162

131-
if (_receiver != null && _receiver.IsValueCreated)
132-
{
133-
await Receiver.CloseAsync();
134-
_receiver = CreateMessageReceiver();
135-
}
136-
if (_clientEntity != null)
137-
{
138-
await _clientEntity.CloseAsync();
139-
_clientEntity = null;
163+
_started = false;
164+
}
165+
finally
166+
{
167+
_stopAsyncSemaphore.Release();
168+
}
140169
}
141-
_started = false;
142170
}
143171

144172
public void Cancel()
@@ -176,6 +204,9 @@ public void Dispose()
176204
_clientEntity = null;
177205
}
178206

207+
_stopAsyncSemaphore.Dispose();
208+
_cancellationTokenSource.Dispose();
209+
179210
_disposed = true;
180211
}
181212
}
@@ -192,37 +223,39 @@ private Lazy<SessionClient> CreateSessionClient()
192223

193224
internal async Task ProcessMessageAsync(Message message, CancellationToken cancellationToken)
194225
{
195-
CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token);
196-
197-
if (!await _messageProcessor.BeginProcessingMessageAsync(message, linkedCts.Token))
226+
using (CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token))
198227
{
199-
return;
200-
}
228+
if (!await _messageProcessor.BeginProcessingMessageAsync(message, linkedCts.Token))
229+
{
230+
return;
231+
}
201232

202-
ServiceBusTriggerInput input = ServiceBusTriggerInput.CreateSingle(message);
203-
input.MessageReceiver = Receiver;
233+
ServiceBusTriggerInput input = ServiceBusTriggerInput.CreateSingle(message);
234+
input.MessageReceiver = Receiver;
204235

205-
TriggeredFunctionData data = input.GetTriggerFunctionData();
206-
FunctionResult result = await _triggerExecutor.TryExecuteAsync(data, linkedCts.Token);
207-
await _messageProcessor.CompleteProcessingMessageAsync(message, result, linkedCts.Token);
236+
TriggeredFunctionData data = input.GetTriggerFunctionData();
237+
FunctionResult result = await _triggerExecutor.TryExecuteAsync(data, linkedCts.Token);
238+
await _messageProcessor.CompleteProcessingMessageAsync(message, result, linkedCts.Token);
239+
}
208240
}
209241

210242
internal async Task ProcessSessionMessageAsync(IMessageSession session, Message message, CancellationToken cancellationToken)
211243
{
212-
CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token);
213-
214-
_messageSession = session;
215-
if (!await _sessionMessageProcessor.BeginProcessingMessageAsync(session, message, linkedCts.Token))
244+
using (CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token))
216245
{
217-
return;
218-
}
246+
_messageSession = session;
247+
if (!await _sessionMessageProcessor.BeginProcessingMessageAsync(session, message, linkedCts.Token))
248+
{
249+
return;
250+
}
219251

220-
ServiceBusTriggerInput input = ServiceBusTriggerInput.CreateSingle(message);
221-
input.MessageReceiver = session;
252+
ServiceBusTriggerInput input = ServiceBusTriggerInput.CreateSingle(message);
253+
input.MessageReceiver = session;
222254

223-
TriggeredFunctionData data = input.GetTriggerFunctionData();
224-
FunctionResult result = await _triggerExecutor.TryExecuteAsync(data, linkedCts.Token);
225-
await _sessionMessageProcessor.CompleteProcessingMessageAsync(session, message, result, linkedCts.Token);
255+
TriggeredFunctionData data = input.GetTriggerFunctionData();
256+
FunctionResult result = await _triggerExecutor.TryExecuteAsync(data, linkedCts.Token);
257+
await _sessionMessageProcessor.CompleteProcessingMessageAsync(session, message, result, linkedCts.Token);
258+
}
226259
}
227260

228261
internal void StartMessageBatchReceiver(CancellationToken cancellationToken)
@@ -246,6 +279,7 @@ internal void StartMessageBatchReceiver(CancellationToken cancellationToken)
246279
{
247280
if (!_started || cancellationToken.IsCancellationRequested)
248281
{
282+
_logger.LogInformation("Message processing has been stopped or cancelled");
249283
return;
250284
}
251285

src/Microsoft.Azure.WebJobs.Extensions.ServiceBus/WebJobs.Extensions.ServiceBus.csproj

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<RootNamespace>Microsoft.Azure.WebJobs.ServiceBus</RootNamespace>
77
<PackageId>Microsoft.Azure.WebJobs.Extensions.ServiceBus</PackageId>
88
<Description>Microsoft Azure WebJobs SDK ServiceBus Extension</Description>
9-
<Version>4.1.2</Version>
9+
<Version>4.2.0$(VersionSuffix)</Version>
1010
<CommitHash Condition="$(CommitHash) == ''">N/A</CommitHash>
1111
<InformationalVersion>$(Version) Commit hash: $(CommitHash)</InformationalVersion>
1212
<Authors>Microsoft</Authors>
@@ -28,6 +28,7 @@
2828
<StyleCopTreatErrorsAsWarnings>false</StyleCopTreatErrorsAsWarnings>
2929
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
3030
<WarningsAsErrors />
31+
<NoWarn>1701;1702</NoWarn>
3132
</PropertyGroup>
3233

3334
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
@@ -37,16 +38,17 @@
3738
</PropertyGroup>
3839

3940
<ItemGroup>
40-
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="4.1.1" />
41-
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.16" />
42-
<PackageReference Include="Microsoft.Azure.WebJobs.Sources" Version="3.0.16" />
41+
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="4.2.0" />
42+
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.19" />
43+
<PackageReference Include="Microsoft.Azure.WebJobs.Sources" Version="3.0.19" />
4344
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004">
4445
<PrivateAssets>all</PrivateAssets>
4546
</PackageReference>
4647
</ItemGroup>
4748

4849
<ItemGroup>
4950
<AdditionalFiles Include="..\..\stylecop.json" Link="stylecop.json" />
51+
<None Include="..\..\webjobs.png" Pack="true" PackagePath="\" />
5052
</ItemGroup>
5153

5254
</Project>
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# Service Bus Extension for Azure Functions guide to running integration tests locally
2+
Integration tests are implemented in the `EndToEndTests` and `SessionsEndToEndTests` classes and require special configuration to execute locally in Visual Studio or via dotnet test.
3+
4+
All configuration is done via a json file called `appsettings.tests` which on windows should be located in the `%USERPROFILE%\.azurefunctions` folder (e.g. `C:\Users\user123\.azurefunctions`)
5+
6+
**Note:** *The specifics of the configuration will change when the validation code is modified so check the code for the latest configuration if the tests do not pass as this readme file may not have been updated with each code change.*
7+
8+
Create the appropriate Azure resources if needed as explained below and create or update the `appsettings.tests` file in the location specified above by copying the configuration below and replacing all the `PLACEHOLDER` values
9+
10+
appsettings.tests contents
11+
```
12+
{
13+
"ConnectionStrings": {
14+
"ServiceBus": "PLACEHOLDER",
15+
"ServiceBusSecondary": "PLACEHOLDER"
16+
},
17+
"AzureWebJobsStorage": "PLACEHOLDER"
18+
}
19+
```
20+
Create a storage account and configure its connection string into `AzureWebJobsStorage`. This will be used by the webjobs hosts created by the tests.
21+
22+
Create two service bus namespaces and configure their connection strings in `ConnectionStrings:ServiceBus` and `ConnectionStrings:ServiceBusSecondary`.
23+
1. In the namespace configured into `ConnectionStrings:ServiceBus`, create queues with the following names:
24+
1. `core-test-queue1`
25+
2. `core-test-queue2`
26+
3. `core-test-queue3`
27+
4. `core-test-queue1-sessions` (enable sessions when creating)
28+
2. In the namespace configured into `ConnectionStrings:ServiceBus`, create topics and subscriptions with the following names:
29+
1. `core-test-topic1` with two subscriptions: `sub1` and `sub2`
30+
2. `core-test-topic1-sessions` with one subscription: `sub1-sessions` (enable sessions in the subscription when creating)
31+
2. In the namespace configured into `ConnectionStrings:ServiceBusSecondary`, create queues with the following names:
32+
1. `core-test-queue1`
33+
34+
Change the message lock duration setting on all queues and subscriptions to 5 minutes to all for delays associated with stepping through code in debug mode.

0 commit comments

Comments
 (0)