Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 18 additions & 24 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ public AmqpReceiver(
timeout: timeout,
prefetchCount: prefetchCount,
receiveMode: receiveMode,
sessionId: SessionId,
isSessionReceiver: isSessionReceiver),
link => CloseLink(link));

Expand All @@ -165,7 +164,6 @@ private async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
TimeSpan timeout,
uint prefetchCount,
ReceiveMode receiveMode,
string sessionId,
bool isSessionReceiver)
{
ServiceBusEventSource.Log.CreateReceiveLinkStart(_identifier);
Expand All @@ -177,20 +175,33 @@ private async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
timeout: timeout,
prefetchCount: prefetchCount,
receiveMode: receiveMode,
sessionId: sessionId,
sessionId: SessionId,
isSessionReceiver: isSessionReceiver,
cancellationToken: CancellationToken.None).ConfigureAwait(false);
if (isSessionReceiver)
{
// Refresh the session lock value in case we have reconnected a link.
// SessionId need not be refreshed here as we do not allow reconnecting
// a receiver instance to a different session.
SessionLockedUntil = link.Settings.Properties.TryGetValue<long>(
AmqpClientConstants.LockedUntilUtc, out var lockedUntilUtcTicks) ?
new DateTime(lockedUntilUtcTicks, DateTimeKind.Utc)
: DateTime.MinValue;

var source = (Source)link.Settings.Source;
if (!source.FilterSet.TryGetValue<string>(AmqpClientConstants.SessionFilterName, out var tempSessionId))
{
link.Session.SafeClose();
throw new ServiceBusException(true, Resources.SessionFilterMissing);
}

if (string.IsNullOrWhiteSpace(tempSessionId))
{
link.Session.SafeClose();
throw new ServiceBusException(true, Resources.AmqpFieldSessionId);
}
// This will only have changed if sessionId was left blank when constructing the session
// receiver.
SessionId = tempSessionId;
}
ServiceBusEventSource.Log.CreateReceiveLinkComplete(_identifier);
ServiceBusEventSource.Log.CreateReceiveLinkComplete(_identifier, SessionId);
link.Closed += OnReceiverLinkClosed;
return link;
}
Expand Down Expand Up @@ -1280,23 +1291,6 @@ await _retryPolicy.RunOperation(
link = await _receiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false),
_connectionScope,
cancellationToken).ConfigureAwait(false);

if (_isSessionReceiver)
{
var source = (Source)link.Settings.Source;
if (!source.FilterSet.TryGetValue<string>(AmqpClientConstants.SessionFilterName, out var tempSessionId))
{
link.Session.SafeClose();
throw new ServiceBusException(true, Resources.SessionFilterMissing);
}

if (string.IsNullOrWhiteSpace(tempSessionId))
{
link.Session.SafeClose();
throw new ServiceBusException(true, Resources.AmqpFieldSessionId);
}
SessionId = tempSessionId;
}
}

private bool HasLinkCommunicationError(ReceivingAmqpLink link) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ internal ServiceBusEventSource() { }
internal const int SendLinkClosedEvent = 100;
internal const int ManagementLinkClosedEvent = 101;

internal const int ProcessorMessageHandlerStartEvent = 102;
internal const int ProcessorMessageHandlerCompleteEvent = 103;
internal const int ProcessorMessageHandlerExceptionEvent = 104;

#endregion
// add new event numbers here incrementing from previous

Expand Down Expand Up @@ -743,6 +747,33 @@ public void ProcessorErrorHandlerThrewException(string exception)
}
}

[Event(ProcessorMessageHandlerStartEvent, Level = EventLevel.Informational, Message = "{0}: User message handler start: Message: SequenceNumber: {1}")]
public void ProcessorMessageHandlerStart(string identifier, long sequenceNumber)
{
if (IsEnabled())
{
WriteEvent(ProcessorMessageHandlerStartEvent, identifier, sequenceNumber);
}
}

[Event(ProcessorMessageHandlerCompleteEvent, Level = EventLevel.Informational, Message = "{0}: User message handler complete: Message: SequenceNumber: {1}")]
public void ProcessorMessageHandlerComplete(string identifier, long sequenceNumber)
{
if (IsEnabled())
{
WriteEvent(ProcessorMessageHandlerCompleteEvent, identifier, sequenceNumber);
}
}

[Event(ProcessorMessageHandlerExceptionEvent, Level = EventLevel.Error, Message = "{0}: User message handler complete: Message: SequenceNumber: {1}, Exception: {2}")]
public void ProcessorMessageHandlerException(string identifier, long sequenceNumber, string exception)
{
if (IsEnabled())
{
WriteEvent(ProcessorMessageHandlerExceptionEvent, identifier, sequenceNumber, exception);
}
}

#endregion region

#region Rule management
Expand Down Expand Up @@ -992,12 +1023,12 @@ public virtual void CreateReceiveLinkStart(string identifier)
}
}

[Event(CreateReceiveLinkCompleteEvent, Level = EventLevel.Informational, Message = "Receive link created for Identifier: {0}.")]
public virtual void CreateReceiveLinkComplete(string identifier)
[Event(CreateReceiveLinkCompleteEvent, Level = EventLevel.Informational, Message = "Receive link created for Identifier: {0}. Session Id: {1}")]
public virtual void CreateReceiveLinkComplete(string identifier, string sessionId)
{
if (IsEnabled())
{
WriteEvent(CreateReceiveLinkCompleteEvent, identifier);
WriteEvent(CreateReceiveLinkCompleteEvent, identifier, sessionId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to guard against null here for sessionId?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so.. passing null sessionId seems to be fine.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,17 @@ private async Task ProcessOneMessage(

errorSource = ServiceBusErrorSource.UserCallback;

await OnMessageHandler(message, cancellationToken).ConfigureAwait(false);
try
{
ServiceBusEventSource.Log.ProcessorMessageHandlerStart(_identifier, message.SequenceNumber);
await OnMessageHandler(message, cancellationToken).ConfigureAwait(false);
ServiceBusEventSource.Log.ProcessorMessageHandlerComplete(_identifier, message.SequenceNumber);
}
catch (Exception ex)
{
ServiceBusEventSource.Log.ProcessorMessageHandlerException(_identifier, message.SequenceNumber, ex.ToString());
throw;
}

if (Receiver.ReceiveMode == ReceiveMode.PeekLock &&
_processorOptions.AutoComplete &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ internal class SessionReceiverManager : ReceiverManager
private CancellationTokenSource _sessionLockRenewalCancellationSource;
private Task _sessionLockRenewalTask;
private CancellationTokenSource _sessionCancellationSource = new CancellationTokenSource();
private bool _receiveTimeout;

protected override ServiceBusReceiver Receiver => _receiver;

private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
Expand Down Expand Up @@ -80,7 +82,11 @@ private async Task<bool> EnsureCanProcess(CancellationToken cancellationToken)
{
await WaitSemaphore(cancellationToken).ConfigureAwait(false);
releaseSemaphore = true;
if (_threadCount >= _maxCallsPerSession)
if (_threadCount >= _maxCallsPerSession ||
// If a receive call timed out for this session, avoid adding more threads
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest hoisting the comment outside of the if statement. I found it confusing and had to read a couple of times before it clicked for me that it was still part of the preceding conditional.

// if we don't intend to leave the receiver open on receive timeouts. This
// will help ensure other sessions get a chance to be processed.
(_receiveTimeout && !_keepOpenOnReceiveTimeout))
{
return false;
}
Expand Down Expand Up @@ -185,7 +191,7 @@ public override async Task CloseReceiverIfNeeded(
_threadCount--;
if (_threadCount == 0 && !processorCancellationToken.IsCancellationRequested)
{
if (!_keepOpenOnReceiveTimeout ||
if ((_receiveTimeout && !_keepOpenOnReceiveTimeout) ||
!AutoRenewLock ||
_sessionLockRenewalCancellationSource.IsCancellationRequested)
{
Expand Down Expand Up @@ -234,6 +240,7 @@ await RaiseExceptionReceived(
// Always at least attempt to dispose. If this fails, it won't be retried.
await _receiver.DisposeAsync().ConfigureAwait(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the dispose fails, should _receiver and _receiveTimeout still have their state reset?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, similar to how we don't allow retrying close on sender/receiver, if the close here fails the processor will not retain a reference to the receiver and we expect the service to close the link after the idle timeout of 10 minutes.

_receiver = null;
_receiveTimeout = false;
}
}

Expand Down Expand Up @@ -269,6 +276,7 @@ public override async Task ReceiveAndProcessMessagesAsync(CancellationToken proc
{
// Break out of the loop to allow a new session to
// be processed.
_receiveTimeout = true;
break;
}
await ProcessOneMessageWithinScopeAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,5 +271,73 @@ public async Task LogsPluginExceptionEvents()
Assert.AreEqual(2, _listener.EventsById(ServiceBusEventSource.PluginExceptionEvent).Count());
};
}

[Test]
public async Task LogsProcessorEvents()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = GetClient();
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage());
await using var processor = client.CreateProcessor(scope.QueueName);
var tcs = new TaskCompletionSource<bool>();

Task ProcessMessage(ProcessMessageEventArgs args)
{
tcs.SetResult(true);
return Task.CompletedTask;
}

Task ExceptionHandler(ProcessErrorEventArgs args)
{
return Task.CompletedTask;
}

processor.ProcessMessageAsync += ProcessMessage;
processor.ProcessErrorAsync += ExceptionHandler;

await processor.StartProcessingAsync();
await tcs.Task;
await processor.StopProcessingAsync();
_listener.SingleEventById(ServiceBusEventSource.ProcessorMessageHandlerStartEvent);
_listener.SingleEventById(ServiceBusEventSource.ProcessorMessageHandlerCompleteEvent);
}
}

[Test]
public async Task LogsProcessorExceptionEvent()
{
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false))
{
await using var client = GetClient();
var sender = client.CreateSender(scope.QueueName);
await sender.SendMessageAsync(GetMessage());
await using var processor = client.CreateProcessor(scope.QueueName);
var tcs = new TaskCompletionSource<bool>();

Task ProcessMessage(ProcessMessageEventArgs args)
{
tcs.SetResult(true);
throw new Exception();
}

Task ExceptionHandler(ProcessErrorEventArgs args)
{
throw new Exception();
}

processor.ProcessMessageAsync += ProcessMessage;
processor.ProcessErrorAsync += ExceptionHandler;

await processor.StartProcessingAsync();
await tcs.Task;
await processor.StopProcessingAsync();
_listener.SingleEventById(ServiceBusEventSource.ProcessorMessageHandlerStartEvent);
_listener.SingleEventById(ServiceBusEventSource.ProcessorMessageHandlerExceptionEvent);
_listener.SingleEventById(ServiceBusEventSource.ProcessorErrorHandlerThrewExceptionEvent);

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1486,7 +1486,6 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
[TestCase(10, 10, 1)]
[TestCase(10, 5, 2)]
[TestCase(10, 20, 5)]
[Timeout(60 * 1000 * 15)]
public async Task MaxCallsPerSessionRespected(int numSessions, int maxConcurrentSessions, int maxCallsPerSession)
{
await using (var scope = await ServiceBusScope.CreateWithQueue(
Expand Down