Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdk/servicebus/Microsoft.Azure.ServiceBus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Release History
## 4.2.0
### Improvements
- Enable a way to Unregister Message Handler and Session Handler [PR 14021](https://github.com/Azure/azure-sdk-for-net/pull/14021)

## 4.1.3 (2020-04-17)
- Add `GetQueuesRuntimeInfoAsync`, `GetTopicsRuntimeInfoAsync` and `GetSubscriptionsRuntimeInfoAsync` to `ManagementClient` to allow retrieval of batched entity runtime information. [PR 10261](https://github.com/Azure/azure-sdk-for-net/pull/10261)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ public interface IReceiverClient : IClientEntity
/// <remarks>Enable prefetch to speed up the receive rate.</remarks>
void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, MessageHandlerOptions messageHandlerOptions);

/// <summary>
/// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion
/// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously
/// registered.
/// <param name="inflightMessageHandlerTasksWaitTimeout"> is the waitTimeout for inflight message handling tasks.
/// </summary>
Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout);

/// <summary>
/// Completes a <see cref="Message"/> using its lock token. This will delete the message from the queue.
/// </summary>
Expand Down Expand Up @@ -115,4 +123,4 @@ public interface IReceiverClient : IClientEntity
/// </remarks>
Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ public class MessageReceiver : ClientEntity, IMessageReceiver
int prefetchCount;
long lastPeekedSequenceNumber;
MessageReceivePump receivePump;
// Cancellation token to cancel the message pump. Once this is fired, all future message handling operations registered by application will be
// cancelled.
CancellationTokenSource receivePumpCancellationTokenSource;
// Cancellation token to cancel the inflight message handling operations registered by application in the message pump.
CancellationTokenSource runningTaskCancellationTokenSource;

/// <summary>
/// Creates a new MessageReceiver from a <see cref="ServiceBusConnectionStringBuilder"/>.
Expand Down Expand Up @@ -899,6 +903,51 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
this.OnMessageHandler(messageHandlerOptions, handler);
}

/// <summary>
/// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion
/// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously
/// registered.
/// <param name="inflightMessageHandlerTasksWaitTimeout"> is the waitTimeout for inflight message handling tasks.
/// </summary>
public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout)
{
this.ThrowIfClosed();

if (inflightMessageHandlerTasksWaitTimeout <= TimeSpan.Zero)
{
throw Fx.Exception.ArgumentOutOfRange(nameof(inflightMessageHandlerTasksWaitTimeout), inflightMessageHandlerTasksWaitTimeout, Resources.TimeoutMustBePositiveNonZero.FormatForUser(nameof(inflightMessageHandlerTasksWaitTimeout), inflightMessageHandlerTasksWaitTimeout));
}

MessagingEventSource.Log.UnregisterMessageHandlerStart(this.ClientId);
lock (this.messageReceivePumpSyncLock)
{
if (this.receivePump == null || this.receivePumpCancellationTokenSource.IsCancellationRequested)
{
// Silently return if handler has already been unregistered.
return;
}

this.receivePumpCancellationTokenSource.Cancel();
this.receivePumpCancellationTokenSource.Dispose();
}

Stopwatch stopWatch = Stopwatch.StartNew();
while (this.receivePump != null
&& stopWatch.Elapsed < inflightMessageHandlerTasksWaitTimeout
&& this.receivePump.maxConcurrentCallsSemaphoreSlim.CurrentCount < this.receivePump.registerHandlerOptions.MaxConcurrentCalls)
{
await Task.Delay(10).ConfigureAwait(false);
}

lock (this.messageReceivePumpSyncLock)
{
this.runningTaskCancellationTokenSource.Cancel();
this.runningTaskCancellationTokenSource.Dispose();
this.receivePump = null;
}
MessagingEventSource.Log.UnregisterMessageHandlerStop(this.ClientId);
}

/// <summary>
/// Registers a <see cref="ServiceBusPlugin"/> to be used with this receiver.
/// </summary>
Expand Down Expand Up @@ -1003,6 +1052,9 @@ protected override async Task OnClosingAsync()
{
this.receivePumpCancellationTokenSource.Cancel();
this.receivePumpCancellationTokenSource.Dispose();
// For back-compatibility
this.runningTaskCancellationTokenSource.Cancel();
this.runningTaskCancellationTokenSource.Dispose();
this.receivePump = null;
}
}
Expand Down Expand Up @@ -1279,7 +1331,13 @@ protected virtual void OnMessageHandler(
}

this.receivePumpCancellationTokenSource = new CancellationTokenSource();
this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.receivePumpCancellationTokenSource.Token);

if (this.runningTaskCancellationTokenSource == null)
{
this.runningTaskCancellationTokenSource = new CancellationTokenSource();
}

this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.receivePumpCancellationTokenSource.Token, this.runningTaskCancellationTokenSource.Token);
}

try
Expand All @@ -1295,6 +1353,8 @@ protected virtual void OnMessageHandler(
{
this.receivePumpCancellationTokenSource.Cancel();
this.receivePumpCancellationTokenSource.Dispose();
this.runningTaskCancellationTokenSource.Cancel();
this.runningTaskCancellationTokenSource.Dispose();
this.receivePump = null;
}
}
Expand Down
8 changes: 8 additions & 0 deletions sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,13 @@ public interface IQueueClient : IReceiverClient, ISenderClient
/// <param name="sessionHandlerOptions">Options used to configure the settings of the session pump.</param>
/// <remarks>Enable prefetch to speed up the receive rate. </remarks>
void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, SessionHandlerOptions sessionHandlerOptions);

/// <summary>
/// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion
/// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously
/// registered.
/// <param name="inflightSessionHandlerTasksWaitTimeout"> is the waitTimeout for inflight session handling tasks.
/// </summary>
Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,13 @@ public interface ISubscriptionClient : IReceiverClient
/// <param name="sessionHandlerOptions">Options used to configure the settings of the session pump.</param>
/// <remarks>Enable prefetch to speed up the receive rate. </remarks>
void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, SessionHandlerOptions sessionHandlerOptions);

/// <summary>
/// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion
/// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously
/// registered.
/// <param name="inflightSessionHandlerTasksWaitTimeout"> is the waitTimeout for inflight session handling tasks.
/// </summary>
Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,28 @@ namespace Microsoft.Azure.ServiceBus

sealed class MessageReceivePump
{
public readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim;
public readonly MessageHandlerOptions registerHandlerOptions;
readonly Func<Message, CancellationToken, Task> onMessageCallback;
readonly string endpoint;
readonly MessageHandlerOptions registerHandlerOptions;
readonly IMessageReceiver messageReceiver;
readonly CancellationToken pumpCancellationToken;
readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim;
readonly CancellationToken runningTaskCancellationToken;
readonly ServiceBusDiagnosticSource diagnosticSource;

public MessageReceivePump(IMessageReceiver messageReceiver,
MessageHandlerOptions registerHandlerOptions,
Func<Message, CancellationToken, Task> callback,
Uri endpoint,
CancellationToken pumpCancellationToken)
CancellationToken pumpCancellationToken,
CancellationToken runningTaskCancellationToken)
{
this.messageReceiver = messageReceiver ?? throw new ArgumentNullException(nameof(messageReceiver));
this.registerHandlerOptions = registerHandlerOptions;
this.onMessageCallback = callback;
this.endpoint = endpoint.Authority;
this.pumpCancellationToken = pumpCancellationToken;
this.runningTaskCancellationToken = runningTaskCancellationToken;
this.maxConcurrentCallsSemaphoreSlim = new SemaphoreSlim(this.registerHandlerOptions.MaxConcurrentCalls);
this.diagnosticSource = new ServiceBusDiagnosticSource(messageReceiver.Path, endpoint);
}
Expand Down Expand Up @@ -163,7 +166,7 @@ async Task MessageDispatchTask(Message message)
try
{
MessagingEventSource.Log.MessageReceiverPumpUserCallbackStart(this.messageReceiver.ClientId, message);
await this.onMessageCallback(message, this.pumpCancellationToken).ConfigureAwait(false);
await this.onMessageCallback(message, this.runningTaskCancellationToken).ConfigureAwait(false);

MessagingEventSource.Log.MessageReceiverPumpUserCallbackStop(this.messageReceiver.ClientId, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1380,6 +1380,42 @@ public void ManagementSerializationException(string objectName, string details =
this.WriteEvent(117, objectName, details);
}
}

[Event(118, Level = EventLevel.Informational, Message = "{0}: Unregister MessageHandler start.")]
public void UnregisterMessageHandlerStart(string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(118, clientId);
}
}

[Event(119, Level = EventLevel.Informational, Message = "{0}: Unregister MessageHandler done.")]
public void UnregisterMessageHandlerStop(string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(119, clientId);
}
}

[Event(120, Level = EventLevel.Informational, Message = "{0}: Unregister SessionHandler start.")]
public void UnregisterSessionHandlerStart(string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(120, clientId);
}
}

[Event(121, Level = EventLevel.Informational, Message = "{0}: Unregister SessionHandler done.")]
public void UnregisterSessionHandlerStop(string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(121, clientId);
}
}
}

internal static class TraceHelper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<AssemblyTitle>Azure ServiceBus SDK</AssemblyTitle>
<Description>This is the next generation Azure Service Bus .NET Standard client library that focuses on queues &amp; topics. For more information about Service Bus, see https://azure.microsoft.com/en-us/services/service-bus/</Description>
<Version>4.1.3</Version>
<Version>4.2.0</Version>
<PackageTags>Microsoft;Azure;Service Bus;ServiceBus;.NET;AMQP;IoT;Queue;Topic</PackageTags>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
Expand Down
24 changes: 24 additions & 0 deletions sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,18 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
this.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions);
}

/// <summary>
/// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion
/// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously
/// registered.
/// <param name="inflightMessageHandlerTasksWaitTimeout"> is the waitTimeout for inflight message handling tasks.
/// </summary>
public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout)
{
this.ThrowIfClosed();
await this.InnerReceiver.UnregisterMessageHandlerAsync(inflightMessageHandlerTasksWaitTimeout).ConfigureAwait(false);
}

/// <summary>
/// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to receive session-messages.
/// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time a new message is received by the queue client.
Expand Down Expand Up @@ -476,6 +488,18 @@ public void RegisterSessionHandler(Func<IMessageSession, Message, CancellationTo
this.SessionPumpHost.OnSessionHandler(handler, sessionHandlerOptions);
}

/// <summary>
/// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion
/// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously
/// registered.
/// <param name="inflightSessionHandlerTasksWaitTimeout"> is the waitTimeout for inflight session handling tasks.
/// </summary>
public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout)
{
this.ThrowIfClosed();
await this.SessionPumpHost.UnregisterSessionHandlerAsync(inflightSessionHandlerTasksWaitTimeout).ConfigureAwait(false);
}

/// <summary>
/// Schedules a message to appear on Service Bus at a later time.
/// </summary>
Expand Down
54 changes: 53 additions & 1 deletion sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

namespace Microsoft.Azure.ServiceBus
{
using Microsoft.Azure.ServiceBus.Primitives;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -12,6 +14,7 @@ internal sealed class SessionPumpHost
readonly object syncLock;
SessionReceivePump sessionReceivePump;
CancellationTokenSource sessionPumpCancellationTokenSource;
CancellationTokenSource runningTaskCancellationTokenSource;
readonly Uri endpoint;

public SessionPumpHost(string clientId, ReceiveMode receiveMode, ISessionClient sessionClient, Uri endpoint)
Expand All @@ -35,6 +38,9 @@ public void Close()
{
this.sessionPumpCancellationTokenSource?.Cancel();
this.sessionPumpCancellationTokenSource?.Dispose();
// For back-compatibility
this.runningTaskCancellationTokenSource?.Cancel();
this.runningTaskCancellationTokenSource?.Dispose();
this.sessionReceivePump = null;
}
}
Expand All @@ -53,14 +59,22 @@ public void OnSessionHandler(
}

this.sessionPumpCancellationTokenSource = new CancellationTokenSource();

// Running task cancellation token source can be reused if previously UnregisterSessionHandlerAsync was called
if (this.runningTaskCancellationTokenSource == null)
{
this.runningTaskCancellationTokenSource = new CancellationTokenSource();
}

this.sessionReceivePump = new SessionReceivePump(
this.ClientId,
this.SessionClient,
this.ReceiveMode,
sessionHandlerOptions,
callback,
this.endpoint,
this.sessionPumpCancellationTokenSource.Token);
this.sessionPumpCancellationTokenSource.Token,
this.runningTaskCancellationTokenSource.Token);
}

try
Expand All @@ -82,5 +96,43 @@ public void OnSessionHandler(

MessagingEventSource.Log.RegisterOnSessionHandlerStop(this.ClientId);
}

public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout)
{
if (inflightSessionHandlerTasksWaitTimeout <= TimeSpan.Zero)
{
throw Fx.Exception.ArgumentOutOfRange(nameof(inflightSessionHandlerTasksWaitTimeout), inflightSessionHandlerTasksWaitTimeout, Resources.TimeoutMustBePositiveNonZero.FormatForUser(nameof(inflightSessionHandlerTasksWaitTimeout), inflightSessionHandlerTasksWaitTimeout));
}

MessagingEventSource.Log.UnregisterSessionHandlerStart(this.ClientId);
lock (this.syncLock)
{
if (this.sessionReceivePump == null || this.sessionPumpCancellationTokenSource.IsCancellationRequested)
{
// Silently return if handler has already been unregistered.
return;
}

this.sessionPumpCancellationTokenSource.Cancel();
this.sessionPumpCancellationTokenSource.Dispose();
}

Stopwatch stopWatch = Stopwatch.StartNew();
while (this.sessionReceivePump != null
&& stopWatch.Elapsed < inflightSessionHandlerTasksWaitTimeout
&& (this.sessionReceivePump.maxConcurrentSessionsSemaphoreSlim.CurrentCount <
this.sessionReceivePump.sessionHandlerOptions.MaxConcurrentSessions
|| this.sessionReceivePump.maxPendingAcceptSessionsSemaphoreSlim.CurrentCount <
this.sessionReceivePump.sessionHandlerOptions.MaxConcurrentAcceptSessionCalls))
{
await Task.Delay(10).ConfigureAwait(false);
}

lock (this.sessionPumpCancellationTokenSource)
{
this.sessionReceivePump = null;
}
MessagingEventSource.Log.UnregisterSessionHandlerStop(this.ClientId);
}
}
}
Loading