Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer dispatcher improvements #997

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions projects/Benchmarks/ConsumerDispatching/AsyncBasicConsumerFake.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Benchmarks
{
internal sealed class AsyncBasicConsumerFake : IAsyncBasicConsumer, IBasicConsumer
{
private readonly ManualResetEventSlim _autoResetEvent;
private int _current;

public int Count { get; set; }

public AsyncBasicConsumerFake(ManualResetEventSlim autoResetEvent)
{
_autoResetEvent = autoResetEvent;
}

public Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
{
if (Interlocked.Increment(ref _current) == Count)
{
_current = 0;
_autoResetEvent.Set();
}
return Task.CompletedTask;
}

void IBasicConsumer.HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
IBasicProperties properties, ReadOnlyMemory<byte> body)
{
if (Interlocked.Increment(ref _current) == Count)
{
_current = 0;
_autoResetEvent.Set();
}
}

public Task HandleBasicCancel(string consumerTag) => Task.CompletedTask;

public Task HandleBasicCancelOk(string consumerTag) => Task.CompletedTask;

public Task HandleBasicConsumeOk(string consumerTag) => Task.CompletedTask;

public Task HandleModelShutdown(object model, ShutdownEventArgs reason) => Task.CompletedTask;

public IModel Model { get; }

event EventHandler<ConsumerEventArgs> IBasicConsumer.ConsumerCancelled
{
add { }
remove { }
}

public event AsyncEventHandler<ConsumerEventArgs> ConsumerCancelled
{
add { }
remove { }
}

void IBasicConsumer.HandleBasicCancelOk(string consumerTag)
{
}

void IBasicConsumer.HandleBasicConsumeOk(string consumerTag)
{
}

void IBasicConsumer.HandleModelShutdown(object model, ShutdownEventArgs reason)
{
}

void IBasicConsumer.HandleBasicCancel(string consumerTag)
{
}
}
}
68 changes: 68 additions & 0 deletions projects/Benchmarks/ConsumerDispatching/ConsumerDispatcher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System.Threading;
using BenchmarkDotNet.Attributes;
using RabbitMQ.Client;
using RabbitMQ.Client.ConsumerDispatching;

namespace RabbitMQ.Benchmarks
{
[Config(typeof(Config))]
[BenchmarkCategory("ConsumerDispatcher")]
public class ConsumerDispatcherBase
{
protected static readonly ManualResetEventSlim _autoResetEvent = new ManualResetEventSlim(false);

private protected IConsumerDispatcher _dispatcher;
private protected readonly AsyncBasicConsumerFake _consumer = new AsyncBasicConsumerFake(_autoResetEvent);
protected readonly string _consumerTag = "ConsumerTag";
protected readonly ulong _deliveryTag = 500UL;
protected readonly string _exchange = "Exchange";
protected readonly string _routingKey = "RoutingKey";
protected readonly IBasicProperties _properties = new Client.Framing.BasicProperties();
protected readonly byte[] _body = new byte[512];
}

public class BasicDeliverConsumerDispatching : ConsumerDispatcherBase
{
[Params(1, 30)]
public int Count { get; set; }

[Params(1, 2)]
public int Concurrency { get; set; }

[GlobalSetup(Target = nameof(AsyncConsumerDispatcher))]
public void SetUpAsyncConsumer()
{
_consumer.Count = Count;
_dispatcher = new AsyncConsumerDispatcher(null, Concurrency);
_dispatcher.HandleBasicConsumeOk(_consumer, _consumerTag);
}
[Benchmark]
public void AsyncConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
{
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
}

[GlobalSetup(Target = nameof(ConsumerDispatcher))]
public void SetUpConsumer()
{
_consumer.Count = Count;
_dispatcher = new ConsumerDispatcher(null, Concurrency);
_dispatcher.HandleBasicConsumeOk(_consumer, _consumerTag);
}
[Benchmark]
public void ConsumerDispatcher()
{
for (int i = 0; i < Count; i++)
{
_dispatcher.HandleBasicDeliver(_consumerTag, _deliveryTag, false, _exchange, _routingKey, _properties, _body, _body);
}
_autoResetEvent.Wait();
_autoResetEvent.Reset();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ namespace RabbitMQ.Client.Events
public abstract class BaseExceptionEventArgs : EventArgs
{
///<summary>Wrap an exception thrown by a callback.</summary>
public BaseExceptionEventArgs(Exception exception)
protected BaseExceptionEventArgs(IDictionary<string, object> detail, Exception exception)
{
Detail = new Dictionary<string, object>();
Detail = detail;
Exception = exception;
}

Expand All @@ -49,15 +49,6 @@ public BaseExceptionEventArgs(Exception exception)

///<summary>Access the wrapped exception.</summary>
public Exception Exception { get; }

public IDictionary<string, object> UpdateDetails(IDictionary<string, object> other)
{
foreach (KeyValuePair<string, object> pair in other)
{
Detail[pair.Key] = pair.Value;
}
return Detail;
}
}


Expand All @@ -83,26 +74,31 @@ public IDictionary<string, object> UpdateDetails(IDictionary<string, object> oth
///</remarks>
public class CallbackExceptionEventArgs : BaseExceptionEventArgs
{
public CallbackExceptionEventArgs(Exception e) : base(e)
private const string ContextString = "context";
private const string ConsumerString = "consumer";

public CallbackExceptionEventArgs(IDictionary<string, object> detail, Exception exception)
: base(detail, exception)
{
}

public static CallbackExceptionEventArgs Build(Exception e,
string context)
public static CallbackExceptionEventArgs Build(Exception e, string context)
{
var details = new Dictionary<string, object>
var details = new Dictionary<string, object>(1)
{
{"context", context}
{ContextString, context}
};
return Build(e, details);
return new CallbackExceptionEventArgs(details, e);
}

public static CallbackExceptionEventArgs Build(Exception e,
IDictionary<string, object> details)
public static CallbackExceptionEventArgs Build(Exception e, string context, object consumer)
{
var exnArgs = new CallbackExceptionEventArgs(e);
exnArgs.UpdateDetails(details);
return exnArgs;
var details = new Dictionary<string, object>(2)
{
{ContextString, context},
{ConsumerString, consumer}
};
return new CallbackExceptionEventArgs(details, e);
}
}
}
10 changes: 2 additions & 8 deletions projects/RabbitMQ.Client/client/framing/Model.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,7 @@ namespace RabbitMQ.Client.Framing.Impl
{
internal class Model: ModelBase
{
public Model(ISession session)
: base(session)
{
}

public Model(ISession session, ConsumerWorkService workService)
: base(session, workService)
public Model(bool dispatchAsync, int concurrency, ISession session) : base(dispatchAsync, concurrency, session)
{
}

Expand Down Expand Up @@ -312,7 +306,7 @@ protected override bool DispatchAsynchronous(in IncomingCommand cmd)
case ProtocolCommandId.BasicCancel:
{
var __impl = (BasicCancel)cmd.Method;
HandleBasicCancel(__impl._consumerTag, __impl._nowait);
HandleBasicCancel(__impl._consumerTag);
return true;
}
case ProtocolCommandId.BasicCancelOk:
Expand Down
85 changes: 0 additions & 85 deletions projects/RabbitMQ.Client/client/impl/AsyncConsumerDispatcher.cs

This file was deleted.

Loading