Skip to content

Commit

Permalink
Merge pull request #1311 from stebet/asyncconnectandpublish
Browse files Browse the repository at this point in the history
Adding fully asynchronous versions of connect and publish.
  • Loading branch information
lukebakken authored May 2, 2023
2 parents e909e1f + ab93546 commit 5b64ca9
Show file tree
Hide file tree
Showing 24 changed files with 426 additions and 134 deletions.
12 changes: 12 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,18 @@ csharp_style_inlined_variable_declaration = true:suggestion
csharp_style_throw_expression = true:suggestion
csharp_style_conditional_delegate_call = true:suggestion

# Async methods should have "Async" suffix
dotnet_naming_rule.async_methods_end_in_async.symbols = any_async_methods
dotnet_naming_rule.async_methods_end_in_async.style = end_in_async
dotnet_naming_rule.async_methods_end_in_async.severity = warning

dotnet_naming_symbols.any_async_methods.applicable_kinds = method
dotnet_naming_symbols.any_async_methods.applicable_accessibilities = *
dotnet_naming_symbols.any_async_methods.required_modifiers = async

dotnet_naming_style.end_in_async.required_suffix = Async
dotnet_naming_style.end_in_async.capitalization = pascal_case_style

# Other features
csharp_style_prefer_index_operator = false:none
csharp_style_prefer_range_operator = false:none
Expand Down
9 changes: 8 additions & 1 deletion projects/Benchmarks/Benchmarks.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<PropertyGroup Condition="$([MSBuild]::IsOSPlatform('Windows'))">
<TargetFrameworks>net6.0;net472</TargetFrameworks>
</PropertyGroup>

<PropertyGroup Condition="!$([MSBuild]::IsOSPlatform('Windows'))">
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>

<PropertyGroup>
<OutputType>Exe</OutputType>
<AssemblyOriginatorKeyFile>../rabbit.snk</AssemblyOriginatorKeyFile>
<SignAssembly>true</SignAssembly>
Expand Down
20 changes: 20 additions & 0 deletions projects/RabbitMQ.Client/client/api/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,26 @@ void BasicPublish<TProperties>(string exchange, string routingKey, in TPropertie
/// </remarks>
void BasicPublish<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
/// <summary>
/// Asynchronously publishes a message.
/// </summary>
/// <remarks>
/// <para>
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
/// <summary>
/// Asynchronously publishes a message.
/// </summary>
/// <remarks>
/// <para>
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;
#nullable disable

/// <summary>
Expand Down
7 changes: 7 additions & 0 deletions projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Client.client.impl;

namespace RabbitMQ.Client
Expand Down Expand Up @@ -93,6 +94,12 @@ public static void BasicPublish(this IChannel channel, string exchange, string r

public static void BasicPublish(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublish(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);

public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);

public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
#nullable disable

/// <summary>
Expand Down
6 changes: 6 additions & 0 deletions projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
//---------------------------------------------------------------------------

using System.Collections.Generic;
using System.Threading.Tasks;
using RabbitMQ.Client.client.framing;
using RabbitMQ.Client.Impl;

Expand Down Expand Up @@ -109,6 +110,11 @@ public override void _Private_ConnectionOpen(string virtualHost)
ChannelSend(new ConnectionOpen(virtualHost));
}

public override ValueTask _Private_ConnectionOpenAsync(string virtualHost)
{
return ModelSendAsync(new ConnectionOpen(virtualHost));
}

public override void _Private_ConnectionSecureOk(byte[] response)
{
ChannelSend(new ConnectionSecureOk(response));
Expand Down
8 changes: 8 additions & 0 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,14 @@ public void BasicPublish<TProperties>(CachedString exchange, CachedString routin
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublish(exchange, routingKey, in basicProperties, body, mandatory);

public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory);

public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory);

public void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
{
ThrowIfDisposed();
Expand Down
141 changes: 86 additions & 55 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ namespace RabbitMQ.Client.Impl
internal abstract class ChannelBase : IChannel, IRecoverable
{
///<summary>Only used to kick-start a connection open
///sequence. See <see cref="Connection.Open"/> </summary>
internal BlockingCell<ConnectionStartDetails> m_connectionStartCell;
///sequence. See <see cref="Connection.OpenAsync"/> </summary>
internal TaskCompletionSource<ConnectionStartDetails> m_connectionStartCell;

// AMQP only allows one RPC operation to be active at a time.
private readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1);
private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue();
private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim(true);

Expand Down Expand Up @@ -239,32 +241,18 @@ private async Task CloseAsync(ShutdownEventArgs reason, bool abort)
}
}

internal void ConnectionOpen(string virtualHost)
internal async ValueTask ConnectionOpenAsync(string virtualHost)
{
var k = new SimpleBlockingRpcContinuation();
lock (_rpcLock)
{
Enqueue(k);
try
{
_Private_ConnectionOpen(virtualHost);
}
catch (AlreadyClosedException)
{
// let continuation throw OperationInterruptedException,
// which is a much more suitable exception before connection
// negotiation finishes
}
k.GetReply(HandshakeContinuationTimeout);
}
await _Private_ConnectionOpenAsync(virtualHost).TimeoutAfter(HandshakeContinuationTimeout);
}

internal ConnectionSecureOrTune ConnectionSecureOk(byte[] response)
internal async ValueTask<ConnectionSecureOrTune> ConnectionSecureOkAsync(byte[] response)
{
var k = new ConnectionStartRpcContinuation();
lock (_rpcLock)
var k = new ConnectionSecureOrTuneContinuation();
await _rpcSemaphore.WaitAsync().ConfigureAwait(false);
Enqueue(k);
try
{
Enqueue(k);
try
{
_Private_ConnectionSecureOk(response);
Expand All @@ -275,31 +263,40 @@ internal ConnectionSecureOrTune ConnectionSecureOk(byte[] response)
// which is a much more suitable exception before connection
// negotiation finishes
}
k.GetReply(HandshakeContinuationTimeout);

return await k;
}
finally
{
_rpcSemaphore.Release();
}
return k.m_result;
}

internal ConnectionSecureOrTune ConnectionStartOk(IDictionary<string, object> clientProperties, string mechanism, byte[] response, string locale)
internal async ValueTask<ConnectionSecureOrTune> ConnectionStartOkAsync(IDictionary<string, object> clientProperties, string mechanism, byte[] response,
string locale)
{
var k = new ConnectionStartRpcContinuation();
lock (_rpcLock)
var k = new ConnectionSecureOrTuneContinuation();
await _rpcSemaphore.WaitAsync().ConfigureAwait(false);
Enqueue(k);
try
{
Enqueue(k);
try
{
_Private_ConnectionStartOk(clientProperties, mechanism,
response, locale);
_Private_ConnectionStartOk(clientProperties, mechanism, response, locale);
}
catch (AlreadyClosedException)
{
// let continuation throw OperationInterruptedException,
// which is a much more suitable exception before connection
// negotiation finishes
}
k.GetReply(HandshakeContinuationTimeout);

return await k;
}
finally
{
_rpcSemaphore.Release();
}
return k.m_result;
}

protected abstract bool DispatchAsynchronous(in IncomingCommand cmd);
Expand All @@ -324,7 +321,7 @@ internal void FinishClose()
Session.Close(reason);
}

m_connectionStartCell?.ContinueWithValue(null);
m_connectionStartCell?.TrySetResult(null);
}

private void HandleCommand(in IncomingCommand cmd)
Expand Down Expand Up @@ -385,6 +382,12 @@ protected void ChannelSend<T>(in T method) where T : struct, IOutgoingAmqpMethod
Session.Transmit(in method);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected ValueTask ModelSendAsync<T>(in T method) where T : struct, IOutgoingAmqpMethod
{
return Session.TransmitAsync(in method);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected void ChannelSend<TMethod, THeader>(in TMethod method, in THeader header, ReadOnlyMemory<byte> body)
where TMethod : struct, IOutgoingAmqpMethod
Expand All @@ -397,6 +400,19 @@ protected void ChannelSend<TMethod, THeader>(in TMethod method, in THeader heade
Session.Transmit(in method, in header, body);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THeader header, ReadOnlyMemory<byte> body)
where TMethod : struct, IOutgoingAmqpMethod
where THeader : IAmqpHeader
{
if (!_flowControlBlock.IsSet)
{
_flowControlBlock.Wait();
}

return Session.TransmitAsync(in method, in header, body);
}

internal void OnCallbackException(CallbackExceptionEventArgs args)
{
_callbackExceptionWrapper.Invoke(this, args);
Expand Down Expand Up @@ -730,13 +746,7 @@ protected void HandleConnectionClose(in IncomingCommand cmd)

protected void HandleConnectionSecure(in IncomingCommand cmd)
{
var challenge = new ConnectionSecure(cmd.MethodBytes.Span)._challenge;
cmd.ReturnMethodBuffer();
var k = (ConnectionStartRpcContinuation)_continuationQueue.Next();
k.m_result = new ConnectionSecureOrTune
{
m_challenge = challenge
};
var k = (ConnectionSecureOrTuneContinuation)_continuationQueue.Next();
k.HandleCommand(IncomingCommand.Empty); // release the continuation.
}

Expand All @@ -758,25 +768,14 @@ protected void HandleConnectionStart(in IncomingCommand cmd)
m_mechanisms = method._mechanisms,
m_locales = method._locales
};
m_connectionStartCell.ContinueWithValue(details);
m_connectionStartCell?.SetResult(details);
m_connectionStartCell = null;
}

protected void HandleConnectionTune(in IncomingCommand cmd)
{
var connectionTune = new ConnectionTune(cmd.MethodBytes.Span);
cmd.ReturnMethodBuffer();
var k = (ConnectionStartRpcContinuation)_continuationQueue.Next();
k.m_result = new ConnectionSecureOrTune
{
m_tuneDetails =
{
m_channelMax = connectionTune._channelMax,
m_frameMax = connectionTune._frameMax,
m_heartbeatInSeconds = connectionTune._heartbeat
}
};
k.HandleCommand(IncomingCommand.Empty); // release the continuation.
var k = (ConnectionSecureOrTuneContinuation)_continuationQueue.Next();
k.HandleCommand(cmd); // release the continuation.
}

protected void HandleConnectionUnblocked()
Expand Down Expand Up @@ -815,6 +814,8 @@ protected void HandleQueueDeclareOk(in IncomingCommand cmd)

public abstract void _Private_ConnectionOpen(string virtualHost);

public abstract ValueTask _Private_ConnectionOpenAsync(string virtualHost);

public abstract void _Private_ConnectionSecureOk(byte[] response);

public abstract void _Private_ConnectionStartOk(IDictionary<string, object> clientProperties, string mechanism, byte[] response, string locale);
Expand Down Expand Up @@ -930,6 +931,36 @@ public void BasicPublish<TProperties>(CachedString exchange, CachedString routin
ChannelSend(in cmd, in basicProperties, body);
}

public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
if (NextPublishSeqNo > 0)
{
lock (_confirmLock)
{
_pendingDeliveryTags.AddLast(NextPublishSeqNo++);
}
}

var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
return ModelSendAsync(in cmd, in basicProperties, body);
}

public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
{
if (NextPublishSeqNo > 0)
{
lock (_confirmLock)
{
_pendingDeliveryTags.AddLast(NextPublishSeqNo++);
}
}

var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
return ModelSendAsync(in cmd, in basicProperties, body);
}

public void UpdateSecret(string newSecret, string reason)
{
if (newSecret is null)
Expand Down
Loading

0 comments on commit 5b64ca9

Please sign in to comment.