Skip to content

Commit

Permalink
Merge pull request #1681 from danielmarbach/reset-event
Browse files Browse the repository at this point in the history
Async flow control
  • Loading branch information
lukebakken authored Sep 18, 2024
2 parents fd36d23 + a15c1f8 commit 317945c
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 51 deletions.
3 changes: 2 additions & 1 deletion projects/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<PackageVersion Include="System.Threading.Channels" Version="6.0.0" />
<PackageVersion Include="System.Text.Json" Version="6.0.0" />
<PackageVersion Include="System.Net.Http.Json" Version="6.0.0" />
<PackageVersion Include="Microsoft.Bcl.AsyncInterfaces" Version="6.0.0" />
</ItemGroup>
<ItemGroup Condition="$(TargetFramework)=='net472'">
<PackageVersion Include="System.Text.Json" Version="6.0.0" />
Expand All @@ -46,4 +47,4 @@
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
<GlobalPackageReference Include="MinVer" Version="5.0.0" />
</ItemGroup>
</Project>
</Project>
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,6 @@
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
<PackageReference Include="System.Memory" />
<PackageReference Include="System.Threading.Channels" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" />
</ItemGroup>
</Project>
6 changes: 3 additions & 3 deletions projects/RabbitMQ.Client/client/framing/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,21 @@ public override ValueTask BasicAckAsync(ulong deliveryTag, bool multiple,
CancellationToken cancellationToken)
{
var method = new BasicAck(deliveryTag, multiple);
return ModelSendAsync(method, cancellationToken);
return ModelSendAsync(in method, cancellationToken);
}

public override ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue,
CancellationToken cancellationToken)
{
var method = new BasicNack(deliveryTag, multiple, requeue);
return ModelSendAsync(method, cancellationToken);
return ModelSendAsync(in method, cancellationToken);
}

public override ValueTask BasicRejectAsync(ulong deliveryTag, bool requeue,
CancellationToken cancellationToken)
{
var method = new BasicReject(deliveryTag, requeue);
return ModelSendAsync(method, cancellationToken);
return ModelSendAsync(in method, cancellationToken);
}

/// <summary>
Expand Down
155 changes: 155 additions & 0 deletions projects/RabbitMQ.Client/client/impl/AsyncManualResetEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

namespace RabbitMQ.Client.client.impl
{
sealed class AsyncManualResetEvent : IValueTaskSource
{
private ManualResetValueTaskSourceCore<bool> _valueTaskSource;
private bool _isSet;

public AsyncManualResetEvent(bool initialState = false)
{
_isSet = initialState;
_valueTaskSource.Reset();
if (initialState)
{
_valueTaskSource.SetResult(true);
}
}

public bool IsSet => Volatile.Read(ref _isSet);

public async ValueTask WaitAsync(CancellationToken cancellationToken)
{
if (IsSet)
{
return;
}

cancellationToken.ThrowIfCancellationRequested();

CancellationTokenRegistration tokenRegistration =
#if NET6_0_OR_GREATER
cancellationToken.UnsafeRegister(
static state =>
{
var (source, token) = ((ManualResetValueTaskSourceCore<bool>, CancellationToken))state!;
source.SetException(new OperationCanceledException(token));
}, (_valueTaskSource, cancellationToken));
#else
cancellationToken.Register(
static state =>
{
var (source, token) = ((ManualResetValueTaskSourceCore<bool>, CancellationToken))state!;
source.SetException(new OperationCanceledException(token));
},
state: (_valueTaskSource, cancellationToken), useSynchronizationContext: false);
#endif
try
{
await new ValueTask(this, _valueTaskSource.Version)
.ConfigureAwait(false);
}
finally
{
#if NET6_0_OR_GREATER
await tokenRegistration.DisposeAsync()
.ConfigureAwait(false);
#else
tokenRegistration.Dispose();
#endif
}
}

public void Set()
{
if (IsSet)
{
return;
}

Volatile.Write(ref _isSet, true);
_valueTaskSource.SetResult(true);
}

public void Reset()
{
if (!IsSet)
{
return;
}

Volatile.Write(ref _isSet, false);
_valueTaskSource.Reset();
}

void IValueTaskSource.GetResult(short token)
{
if (token != _valueTaskSource.Version)
{
ThrowIncorrectTokenException();
}

_valueTaskSource.GetResult(token);
}

ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
{
if (token != _valueTaskSource.Version)
{
ThrowIncorrectTokenException();
}

return _valueTaskSource.GetStatus(token);
}

void IValueTaskSource.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags)
{
if (token != _valueTaskSource.Version)
{
ThrowIncorrectTokenException();
}

_valueTaskSource.OnCompleted(continuation, state, token, flags);
}

[DoesNotReturn]
static void ThrowIncorrectTokenException() =>
throw new InvalidOperationException("ValueTask cannot be awaited multiple times.");
}
}
Loading

0 comments on commit 317945c

Please sign in to comment.