Skip to content

Commit

Permalink
Merge pull request #1683 from danielmarbach/event-args-cancellation
Browse files Browse the repository at this point in the history
Event args cancellation
  • Loading branch information
lukebakken authored Sep 19, 2024
2 parents 317945c + 66f0e41 commit 9dab7bc
Show file tree
Hide file tree
Showing 53 changed files with 433 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ public AsyncBasicConsumerFake(ManualResetEventSlim autoResetEvent)
_autoResetEvent = autoResetEvent;
}

public Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
public Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
{
if (Interlocked.Increment(ref _current) == Count)
{
Expand All @@ -28,11 +29,11 @@ public Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool
return Task.CompletedTask;
}

public Task HandleBasicCancelAsync(string consumerTag) => Task.CompletedTask;
public Task HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken = default) => Task.CompletedTask;

public Task HandleBasicCancelOkAsync(string consumerTag) => Task.CompletedTask;
public Task HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken = default) => Task.CompletedTask;

public Task HandleBasicConsumeOkAsync(string consumerTag) => Task.CompletedTask;
public Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken = default) => Task.CompletedTask;

public Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason) => Task.CompletedTask;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ public CountingConsumer(IChannel channel, uint messageCount) : base(channel)
}

/// <inheritdoc />
public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
{
if (Interlocked.Decrement(ref _remainingCount) == 0)
{
Expand Down
79 changes: 42 additions & 37 deletions projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Large diffs are not rendered by default.

27 changes: 17 additions & 10 deletions projects/RabbitMQ.Client/client/api/AsyncDefaultBasicConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQ.Client
Expand Down Expand Up @@ -45,33 +46,36 @@ public string[] ConsumerTags
/// Retrieve the <see cref="IChannel"/> this consumer is associated with,
/// for use in acknowledging received messages, for instance.
/// </summary>
public IChannel Channel { get; private set; }
public IChannel Channel { get; }

/// <summary>
/// Called when the consumer is cancelled for reasons other than by a basicCancel:
/// e.g. the queue has been deleted (either by this channel or by any other channel).
/// See <see cref="HandleBasicCancelOkAsync"/> for notification of consumer cancellation due to basicCancel
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
public virtual Task HandleBasicCancelAsync(string consumerTag)
/// <param name="cancellationToken">The cancellation token.</param>
public virtual Task HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken = default)
{
return OnCancel(consumerTag);
return OnCancelAsync(new[] { consumerTag }, cancellationToken);
}

/// <summary>
/// Called upon successful deregistration of the consumer from the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
public virtual Task HandleBasicCancelOkAsync(string consumerTag)
/// <param name="cancellationToken">The cancellation token.</param>
public virtual Task HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken = default)
{
return OnCancel(consumerTag);
return OnCancelAsync(new[] { consumerTag }, cancellationToken);
}

/// <summary>
/// Called upon successful registration of the consumer with the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
public virtual Task HandleBasicConsumeOkAsync(string consumerTag)
/// <param name="cancellationToken">The cancellation token.</param>
public virtual Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken = default)
{
_consumerTags.Add(consumerTag);
IsRunning = true;
Expand All @@ -94,7 +98,8 @@ public virtual Task HandleBasicDeliverAsync(string consumerTag,
string exchange,
string routingKey,
IReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body)
ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
{
// Nothing to do here.
return Task.CompletedTask;
Expand All @@ -108,18 +113,20 @@ public virtual Task HandleBasicDeliverAsync(string consumerTag,
public virtual Task HandleChannelShutdownAsync(object channel, ShutdownEventArgs reason)
{
ShutdownReason = reason;
return OnCancel(_consumerTags.ToArray());
return OnCancelAsync(ConsumerTags, reason.CancellationToken);
}

/// <summary>
/// Default implementation - overridable in subclasses.</summary>
/// <param name="consumerTags">The set of consumer tags that where cancelled</param>
/// <param name="consumerTags">The set of consumer tags that were cancelled</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <remarks>
/// This default implementation simply sets the <see cref="IsRunning"/> property to false, and takes no further action.
/// </remarks>
public virtual Task OnCancel(params string[] consumerTags)
protected virtual Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default)
{
IsRunning = false;

foreach (string consumerTag in consumerTags)
{
_consumerTags.Remove(consumerTag);
Expand Down
15 changes: 10 additions & 5 deletions projects/RabbitMQ.Client/client/api/IAsyncBasicConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQ.Client
Expand All @@ -20,19 +21,22 @@ public interface IAsyncBasicConsumer
/// See <see cref="HandleBasicCancelOkAsync"/> for notification of consumer cancellation due to basicCancel
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
Task HandleBasicCancelAsync(string consumerTag);
/// <param name="cancellationToken">The cancellation token.</param>
Task HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken = default);

/// <summary>
/// Called upon successful deregistration of the consumer from the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
Task HandleBasicCancelOkAsync(string consumerTag);
/// <param name="cancellationToken">The cancellation token.</param>
Task HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken = default);

/// <summary>
/// Called upon successful registration of the consumer with the broker.
/// </summary>
/// <param name="consumerTag">Consumer tag this consumer is registered.</param>
Task HandleBasicConsumeOkAsync(string consumerTag);
/// <param name="cancellationToken">The cancellation token.</param>
Task HandleBasicConsumeOkAsync(string consumerTag, CancellationToken cancellationToken = default);

/// <summary>
/// Called each time a message arrives for this consumer.
Expand All @@ -44,7 +48,7 @@ public interface IAsyncBasicConsumer
/// </para>
/// <para>
/// NOTE: Using the <c>body</c> outside of
/// <c><seealso cref="IAsyncBasicConsumer.HandleBasicDeliverAsync(string, ulong, bool, string, string, IReadOnlyBasicProperties, ReadOnlyMemory{byte})"/></c>
/// <c><seealso cref="IAsyncBasicConsumer.HandleBasicDeliverAsync(string, ulong, bool, string, string, IReadOnlyBasicProperties, ReadOnlyMemory{byte}, CancellationToken)"/></c>
/// requires that it be copied!
/// </para>
/// </remarks>
Expand All @@ -55,7 +59,8 @@ Task HandleBasicDeliverAsync(string consumerTag,
string exchange,
string routingKey,
IReadOnlyBasicProperties properties,
ReadOnlyMemory<byte> body);
ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default);

/// <summary>
/// Called when the channel shuts down.
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/api/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// <remarks>
/// This event will never fire for connections that disable automatic recovery.
/// </remarks>
event AsyncEventHandler<EventArgs> RecoverySucceededAsync;
event AsyncEventHandler<AsyncEventArgs> RecoverySucceededAsync;

/// <summary>
/// Raised when the connection recovery fails, e.g. because reconnection or topology
Expand Down Expand Up @@ -212,7 +212,7 @@ public interface IConnection : INetworkConnection, IDisposable
/// <summary>
/// Raised when a connection is unblocked by the AMQP broker.
/// </summary>
event AsyncEventHandler<EventArgs> ConnectionUnblockedAsync;
event AsyncEventHandler<AsyncEventArgs> ConnectionUnblockedAsync;

/// <summary>
/// This method updates the secret used to authenticate this connection.
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/api/IRecoverable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ namespace RabbitMQ.Client
/// </summary>
public interface IRecoverable
{
event AsyncEventHandler<EventArgs> RecoveryAsync;
event AsyncEventHandler<AsyncEventArgs> RecoveryAsync;
}
}
16 changes: 10 additions & 6 deletions projects/RabbitMQ.Client/client/api/ShutdownEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
//---------------------------------------------------------------------------

using System;
using System.Threading;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client
{
Expand All @@ -39,7 +41,8 @@ namespace RabbitMQ.Client
/// <remarks>
/// The <see cref="ClassId"/> and <see cref="Initiator"/> properties should be used to determine the originator of the shutdown event.
/// </remarks>
public class ShutdownEventArgs : EventArgs
/// TODO: Should this be moved to the events folder and the namespace be adjusted?
public class ShutdownEventArgs : AsyncEventArgs
{
private readonly Exception? _exception;

Expand All @@ -48,16 +51,17 @@ public class ShutdownEventArgs : EventArgs
/// 0 for <see cref="ClassId"/> and <see cref="MethodId"/>.
/// </summary>
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText,
object? cause = null)
: this(initiator, replyCode, replyText, 0, 0, cause)
object? cause = null, CancellationToken cancellationToken = default)
: this(initiator, replyCode, replyText, 0, 0, cause, cancellationToken: cancellationToken)
{
}

/// <summary>
/// Construct a <see cref="ShutdownEventArgs"/> with the given parameters.
/// </summary>
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText,
ushort classId, ushort methodId, object? cause = null)
ushort classId, ushort methodId, object? cause = null, CancellationToken cancellationToken = default)
: base(cancellationToken)
{
Initiator = initiator;
ReplyCode = replyCode;
Expand All @@ -70,8 +74,8 @@ public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string r
/// <summary>
/// Construct a <see cref="ShutdownEventArgs"/> with the given parameters.
/// </summary>
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, Exception exception)
: this(initiator, replyCode, replyText, 0, 0)
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, Exception exception, CancellationToken cancellationToken = default)
: this(initiator, replyCode, replyText, 0, 0, cancellationToken: cancellationToken)
{
_exception = exception ?? throw new ArgumentNullException(nameof(exception));
}
Expand Down
86 changes: 86 additions & 0 deletions projects/RabbitMQ.Client/client/events/AsyncEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System.Threading;

namespace RabbitMQ.Client.Events
{
/// <summary>
/// Provides data for <see cref="AsyncEventHandler{T}"/>
/// events that can be invoked asynchronously.
/// </summary>
public class AsyncEventArgs
{
/// <summary>
/// Initializes a new instance of the <see cref="AsyncEventArgs"/>
/// class.
/// </summary>
/// <param name="cancellationToken">
/// A cancellation token related to the original operation that raised
/// the event. It's important for your handler to pass this token
/// along to any asynchronous or long-running synchronous operations
/// that take a token so cancellation will correctly propagate. The
/// default value is <see cref="CancellationToken.None"/>.
/// </param>
public AsyncEventArgs(CancellationToken cancellationToken = default)
: base()
{
CancellationToken = cancellationToken;
}

/// <summary>
/// Gets a cancellation token related to the original operation that
/// raised the event. It's important for your handler to pass this
/// token along to any asynchronous or long-running synchronous
/// operations that take a token so cancellation (via something like
/// <code>
/// new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token
/// </code>
/// for example) will correctly propagate.
/// </summary>
public CancellationToken CancellationToken { get; }

public static AsyncEventArgs CreateOrDefault(CancellationToken cancellationToken)
{
if (cancellationToken.CanBeCanceled)
{
return new AsyncEventArgs(cancellationToken);
}

return Empty;
}

/// <summary>
/// Provides a value to use with events that do not have event data.
/// </summary>
public static readonly AsyncEventArgs Empty = new AsyncEventArgs();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@

namespace RabbitMQ.Client.Events
{
public delegate Task AsyncEventHandler<in TEvent>(object sender, TEvent @event);
public delegate Task AsyncEventHandler<in TEvent>(object sender, TEvent @event) where TEvent : AsyncEventArgs;
}
Loading

0 comments on commit 9dab7bc

Please sign in to comment.