Skip to content

Commit 352a7c1

Browse files
Fix starvation issue with session processor when MaxCallsPerSession > 1 (#14937)
1 parent d1ed549 commit 352a7c1

File tree

6 files changed

+143
-31
lines changed

6 files changed

+143
-31
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,6 @@ public AmqpReceiver(
140140
timeout: timeout,
141141
prefetchCount: prefetchCount,
142142
receiveMode: receiveMode,
143-
sessionId: SessionId,
144143
isSessionReceiver: isSessionReceiver),
145144
link => CloseLink(link));
146145

@@ -165,7 +164,6 @@ private async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
165164
TimeSpan timeout,
166165
uint prefetchCount,
167166
ReceiveMode receiveMode,
168-
string sessionId,
169167
bool isSessionReceiver)
170168
{
171169
ServiceBusEventSource.Log.CreateReceiveLinkStart(_identifier);
@@ -177,20 +175,33 @@ private async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
177175
timeout: timeout,
178176
prefetchCount: prefetchCount,
179177
receiveMode: receiveMode,
180-
sessionId: sessionId,
178+
sessionId: SessionId,
181179
isSessionReceiver: isSessionReceiver,
182180
cancellationToken: CancellationToken.None).ConfigureAwait(false);
183181
if (isSessionReceiver)
184182
{
185-
// Refresh the session lock value in case we have reconnected a link.
186-
// SessionId need not be refreshed here as we do not allow reconnecting
187-
// a receiver instance to a different session.
188183
SessionLockedUntil = link.Settings.Properties.TryGetValue<long>(
189184
AmqpClientConstants.LockedUntilUtc, out var lockedUntilUtcTicks) ?
190185
new DateTime(lockedUntilUtcTicks, DateTimeKind.Utc)
191186
: DateTime.MinValue;
187+
188+
var source = (Source)link.Settings.Source;
189+
if (!source.FilterSet.TryGetValue<string>(AmqpClientConstants.SessionFilterName, out var tempSessionId))
190+
{
191+
link.Session.SafeClose();
192+
throw new ServiceBusException(true, Resources.SessionFilterMissing);
193+
}
194+
195+
if (string.IsNullOrWhiteSpace(tempSessionId))
196+
{
197+
link.Session.SafeClose();
198+
throw new ServiceBusException(true, Resources.AmqpFieldSessionId);
199+
}
200+
// This will only have changed if sessionId was left blank when constructing the session
201+
// receiver.
202+
SessionId = tempSessionId;
192203
}
193-
ServiceBusEventSource.Log.CreateReceiveLinkComplete(_identifier);
204+
ServiceBusEventSource.Log.CreateReceiveLinkComplete(_identifier, SessionId);
194205
link.Closed += OnReceiverLinkClosed;
195206
return link;
196207
}
@@ -1280,23 +1291,6 @@ await _retryPolicy.RunOperation(
12801291
link = await _receiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false),
12811292
_connectionScope,
12821293
cancellationToken).ConfigureAwait(false);
1283-
1284-
if (_isSessionReceiver)
1285-
{
1286-
var source = (Source)link.Settings.Source;
1287-
if (!source.FilterSet.TryGetValue<string>(AmqpClientConstants.SessionFilterName, out var tempSessionId))
1288-
{
1289-
link.Session.SafeClose();
1290-
throw new ServiceBusException(true, Resources.SessionFilterMissing);
1291-
}
1292-
1293-
if (string.IsNullOrWhiteSpace(tempSessionId))
1294-
{
1295-
link.Session.SafeClose();
1296-
throw new ServiceBusException(true, Resources.AmqpFieldSessionId);
1297-
}
1298-
SessionId = tempSessionId;
1299-
}
13001294
}
13011295

13021296
private bool HasLinkCommunicationError(ReceivingAmqpLink link) =>

sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ internal ServiceBusEventSource() { }
182182
internal const int SendLinkClosedEvent = 100;
183183
internal const int ManagementLinkClosedEvent = 101;
184184

185+
internal const int ProcessorMessageHandlerStartEvent = 102;
186+
internal const int ProcessorMessageHandlerCompleteEvent = 103;
187+
internal const int ProcessorMessageHandlerExceptionEvent = 104;
188+
185189
#endregion
186190
// add new event numbers here incrementing from previous
187191

@@ -743,6 +747,33 @@ public void ProcessorErrorHandlerThrewException(string exception)
743747
}
744748
}
745749

750+
[Event(ProcessorMessageHandlerStartEvent, Level = EventLevel.Informational, Message = "{0}: User message handler start: Message: SequenceNumber: {1}")]
751+
public void ProcessorMessageHandlerStart(string identifier, long sequenceNumber)
752+
{
753+
if (IsEnabled())
754+
{
755+
WriteEvent(ProcessorMessageHandlerStartEvent, identifier, sequenceNumber);
756+
}
757+
}
758+
759+
[Event(ProcessorMessageHandlerCompleteEvent, Level = EventLevel.Informational, Message = "{0}: User message handler complete: Message: SequenceNumber: {1}")]
760+
public void ProcessorMessageHandlerComplete(string identifier, long sequenceNumber)
761+
{
762+
if (IsEnabled())
763+
{
764+
WriteEvent(ProcessorMessageHandlerCompleteEvent, identifier, sequenceNumber);
765+
}
766+
}
767+
768+
[Event(ProcessorMessageHandlerExceptionEvent, Level = EventLevel.Error, Message = "{0}: User message handler complete: Message: SequenceNumber: {1}, Exception: {2}")]
769+
public void ProcessorMessageHandlerException(string identifier, long sequenceNumber, string exception)
770+
{
771+
if (IsEnabled())
772+
{
773+
WriteEvent(ProcessorMessageHandlerExceptionEvent, identifier, sequenceNumber, exception);
774+
}
775+
}
776+
746777
#endregion region
747778

748779
#region Rule management
@@ -992,12 +1023,12 @@ public virtual void CreateReceiveLinkStart(string identifier)
9921023
}
9931024
}
9941025

995-
[Event(CreateReceiveLinkCompleteEvent, Level = EventLevel.Informational, Message = "Receive link created for Identifier: {0}.")]
996-
public virtual void CreateReceiveLinkComplete(string identifier)
1026+
[Event(CreateReceiveLinkCompleteEvent, Level = EventLevel.Informational, Message = "Receive link created for Identifier: {0}. Session Id: {1}")]
1027+
public virtual void CreateReceiveLinkComplete(string identifier, string sessionId)
9971028
{
9981029
if (IsEnabled())
9991030
{
1000-
WriteEvent(CreateReceiveLinkCompleteEvent, identifier);
1031+
WriteEvent(CreateReceiveLinkCompleteEvent, identifier, sessionId);
10011032
}
10021033
}
10031034

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,17 @@ private async Task ProcessOneMessage(
170170

171171
errorSource = ServiceBusErrorSource.UserCallback;
172172

173-
await OnMessageHandler(message, cancellationToken).ConfigureAwait(false);
173+
try
174+
{
175+
ServiceBusEventSource.Log.ProcessorMessageHandlerStart(_identifier, message.SequenceNumber);
176+
await OnMessageHandler(message, cancellationToken).ConfigureAwait(false);
177+
ServiceBusEventSource.Log.ProcessorMessageHandlerComplete(_identifier, message.SequenceNumber);
178+
}
179+
catch (Exception ex)
180+
{
181+
ServiceBusEventSource.Log.ProcessorMessageHandlerException(_identifier, message.SequenceNumber, ex.ToString());
182+
throw;
183+
}
174184

175185
if (Receiver.ReceiveMode == ReceiveMode.PeekLock &&
176186
_processorOptions.AutoComplete &&

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/SessionReceiverManager.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ internal class SessionReceiverManager : ReceiverManager
3636
private CancellationTokenSource _sessionLockRenewalCancellationSource;
3737
private Task _sessionLockRenewalTask;
3838
private CancellationTokenSource _sessionCancellationSource = new CancellationTokenSource();
39+
private bool _receiveTimeout;
40+
3941
protected override ServiceBusReceiver Receiver => _receiver;
4042

4143
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
@@ -80,10 +82,16 @@ private async Task<bool> EnsureCanProcess(CancellationToken cancellationToken)
8082
{
8183
await WaitSemaphore(cancellationToken).ConfigureAwait(false);
8284
releaseSemaphore = true;
83-
if (_threadCount >= _maxCallsPerSession)
85+
86+
// If a receive call timed out for this session, avoid adding more threads
87+
// if we don't intend to leave the receiver open on receive timeouts. This
88+
// will help ensure other sessions get a chance to be processed.
89+
if (_threadCount >= _maxCallsPerSession ||
90+
(_receiveTimeout && !_keepOpenOnReceiveTimeout))
8491
{
8592
return false;
8693
}
94+
8795
if (_receiver == null)
8896
{
8997
await CreateAndInitializeSessionReceiver(cancellationToken).ConfigureAwait(false);
@@ -185,7 +193,7 @@ public override async Task CloseReceiverIfNeeded(
185193
_threadCount--;
186194
if (_threadCount == 0 && !processorCancellationToken.IsCancellationRequested)
187195
{
188-
if (!_keepOpenOnReceiveTimeout ||
196+
if ((_receiveTimeout && !_keepOpenOnReceiveTimeout) ||
189197
!AutoRenewLock ||
190198
_sessionLockRenewalCancellationSource.IsCancellationRequested)
191199
{
@@ -234,6 +242,7 @@ await RaiseExceptionReceived(
234242
// Always at least attempt to dispose. If this fails, it won't be retried.
235243
await _receiver.DisposeAsync().ConfigureAwait(false);
236244
_receiver = null;
245+
_receiveTimeout = false;
237246
}
238247
}
239248

@@ -269,6 +278,7 @@ public override async Task ReceiveAndProcessMessagesAsync(CancellationToken proc
269278
{
270279
// Break out of the loop to allow a new session to
271280
// be processed.
281+
_receiveTimeout = true;
272282
break;
273283
}
274284
await ProcessOneMessageWithinScopeAsync(

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,5 +271,73 @@ public async Task LogsPluginExceptionEvents()
271271
Assert.AreEqual(2, _listener.EventsById(ServiceBusEventSource.PluginExceptionEvent).Count());
272272
};
273273
}
274+
275+
[Test]
276+
public async Task LogsProcessorEvents()
277+
{
278+
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
279+
{
280+
await using var client = GetClient();
281+
var sender = client.CreateSender(scope.QueueName);
282+
await sender.SendMessageAsync(GetMessage());
283+
await using var processor = client.CreateProcessor(scope.QueueName);
284+
var tcs = new TaskCompletionSource<bool>();
285+
286+
Task ProcessMessage(ProcessMessageEventArgs args)
287+
{
288+
tcs.SetResult(true);
289+
return Task.CompletedTask;
290+
}
291+
292+
Task ExceptionHandler(ProcessErrorEventArgs args)
293+
{
294+
return Task.CompletedTask;
295+
}
296+
297+
processor.ProcessMessageAsync += ProcessMessage;
298+
processor.ProcessErrorAsync += ExceptionHandler;
299+
300+
await processor.StartProcessingAsync();
301+
await tcs.Task;
302+
await processor.StopProcessingAsync();
303+
_listener.SingleEventById(ServiceBusEventSource.ProcessorMessageHandlerStartEvent);
304+
_listener.SingleEventById(ServiceBusEventSource.ProcessorMessageHandlerCompleteEvent);
305+
}
306+
}
307+
308+
[Test]
309+
public async Task LogsProcessorExceptionEvent()
310+
{
311+
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
312+
{
313+
await using var client = GetClient();
314+
var sender = client.CreateSender(scope.QueueName);
315+
await sender.SendMessageAsync(GetMessage());
316+
await using var processor = client.CreateProcessor(scope.QueueName);
317+
var tcs = new TaskCompletionSource<bool>();
318+
319+
Task ProcessMessage(ProcessMessageEventArgs args)
320+
{
321+
tcs.SetResult(true);
322+
throw new Exception();
323+
}
324+
325+
Task ExceptionHandler(ProcessErrorEventArgs args)
326+
{
327+
throw new Exception();
328+
}
329+
330+
processor.ProcessMessageAsync += ProcessMessage;
331+
processor.ProcessErrorAsync += ExceptionHandler;
332+
333+
await processor.StartProcessingAsync();
334+
await tcs.Task;
335+
await processor.StopProcessingAsync();
336+
_listener.SingleEventById(ServiceBusEventSource.ProcessorMessageHandlerStartEvent);
337+
_listener.SingleEventById(ServiceBusEventSource.ProcessorMessageHandlerExceptionEvent);
338+
_listener.SingleEventById(ServiceBusEventSource.ProcessorErrorHandlerThrewExceptionEvent);
339+
340+
}
341+
}
274342
}
275343
}

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1486,7 +1486,6 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
14861486
[TestCase(10, 10, 1)]
14871487
[TestCase(10, 5, 2)]
14881488
[TestCase(10, 20, 5)]
1489-
[Timeout(60 * 1000 * 15)]
14901489
public async Task MaxCallsPerSessionRespected(int numSessions, int maxConcurrentSessions, int maxCallsPerSession)
14911490
{
14921491
await using (var scope = await ServiceBusScope.CreateWithQueue(

0 commit comments

Comments
 (0)