Skip to content

Commit 56daefa

Browse files
committed
Migrated IAsyncCollection and IBatchAsyncCollection to ValueTask
1 parent 449ccde commit 56daefa

17 files changed

+56
-43
lines changed

AsyncCollections.Benchmark/BlockingCollectionAdapter.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ public void Add( T item )
2929
_collection.Add( item );
3030
}
3131

32-
public Task<T> TakeAsync( System.Threading.CancellationToken cancellationToken )
32+
public ValueTask<T> TakeAsync( System.Threading.CancellationToken cancellationToken )
3333
{
3434
T item = _collection.Take( cancellationToken );
35-
return Task.FromResult( item );
35+
return new ValueTask<T>( item );
3636
}
3737

3838
#endregion

AsyncCollections.Benchmark/NitoAsyncCollectionAdapter.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ public void Add( T item )
2828
_collection.Add( item );
2929
}
3030

31-
public Task<T> TakeAsync( System.Threading.CancellationToken cancellationToken )
31+
public ValueTask<T> TakeAsync( System.Threading.CancellationToken cancellationToken )
3232
{
33-
return _collection.TakeAsync( cancellationToken );
33+
return new ValueTask<T>( _collection.TakeAsync( cancellationToken ) );
3434
}
3535

3636
#endregion

AsyncCollections.Benchmark/TplBatchBlockAdapter.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ public TplBatchBlockAdapter( int batchSize )
2323
public int Count => _batchBlock.OutputCount;
2424

2525
public void Add( T item ) => _batchBlock.Post( item );
26-
public async Task<IReadOnlyList<T>> TakeAsync( CancellationToken cancellationToken ) => await _batchBlock.ReceiveAsync( cancellationToken );
26+
public ValueTask<IReadOnlyList<T>> TakeAsync( CancellationToken cancellationToken ) => new ValueTask<IReadOnlyList<T>>( RecieveAsync( cancellationToken ) );
27+
private async Task<IReadOnlyList<T>> RecieveAsync( CancellationToken cancellationToken ) => await _batchBlock.ReceiveAsync( cancellationToken ).ConfigureAwait( false );
2728
public void Flush() => _batchBlock.TriggerBatch();
2829

2930
public IEnumerator<IReadOnlyList<T>> GetEnumerator()

AsyncCollections.Benchmark/TplDataflowAdapter.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ public void Add( T item )
2121
buffer.Post( item );
2222
}
2323

24-
public Task<T> TakeAsync( CancellationToken cancellationToken )
24+
public ValueTask<T> TakeAsync( CancellationToken cancellationToken )
2525
{
26-
return buffer.ReceiveAsync( cancellationToken );
26+
return new ValueTask<T>( buffer.ReceiveAsync( cancellationToken ) );
2727
}
2828

2929
public IEnumerator<T> GetEnumerator()

AsyncCollections.Test/AsyncCollectionTest.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public void TakeWithCanceledTokenReturnsCanceledTask()
4949
cancelSource.Cancel();
5050
Collection.Add( 42 );
5151

52-
Task<int> itemTask = Collection.TakeAsync( cancelSource.Token );
52+
ValueTask<int> itemTask = Collection.TakeAsync( cancelSource.Token );
5353
itemTask.IsCanceled.Should().BeTrue( "The task should have been canceled." );
5454
}
5555

@@ -60,7 +60,7 @@ public void CancelledTakeCancelsTask()
6060
var itemTask = Collection.TakeAsync( cancelSource.Token );
6161
cancelSource.Cancel();
6262

63-
Func<Task> asyncAct = () => itemTask;
63+
Func<Task> asyncAct = async () => await itemTask;
6464
asyncAct.ShouldThrow<TaskCanceledException>();
6565

6666
Collection.Add( 42 );

AsyncCollections.Test/AsyncQueueTest.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void CountsAreCorrectIfTailsMatch()
8888
public async Task EverythingWorksIfSegmentIsFilledByOneKindOfItems( Order insertionOrder )
8989
{
9090
int[] items = Enumerable.Range( 0, _itemsToOverflowSegment ).ToArray();
91-
Task<int>[] tasks = null;
91+
ValueTask<int>[] tasks = null;
9292

9393
switch ( insertionOrder )
9494
{
@@ -104,7 +104,7 @@ public async Task EverythingWorksIfSegmentIsFilledByOneKindOfItems( Order insert
104104
}
105105

106106
tasks.Should().OnlyContain( t => t.IsCompleted );
107-
int[] values = await Task.WhenAll( tasks ).ConfigureAwait( true );
107+
int[] values = await Task.WhenAll( tasks.Select( t => t.AsTask() ) ).ConfigureAwait( true );
108108
values.Should().BeEquivalentTo( items ).And.BeInAscendingOrder();
109109
}
110110

@@ -132,7 +132,7 @@ public void EnumeratorDoesNotReturnItemsThatHaveBeenRemovedBetweenMoveNextCalls(
132132
}
133133
}
134134

135-
private Task<int>[] InsertAwaiters( int awaiterCount ) => Enumerable.Repeat( 0, awaiterCount ).Select( _ => Collection.TakeAsync() ).ToArray();
135+
private ValueTask<int>[] InsertAwaiters( int awaiterCount ) => Enumerable.Repeat( 0, awaiterCount ).Select( _ => Collection.TakeAsync() ).ToArray();
136136

137137
private void InsertItems( params int[] items )
138138
{

AsyncCollections/AsyncBatchQueue.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,12 @@ public void Add( T item )
5454
/// <summary>
5555
/// Removes and returns a batch from the collection in an asynchronous manner.
5656
/// </summary>
57-
public Task<IReadOnlyList<T>> TakeAsync() => TakeAsync( CancellationToken.None );
57+
public ValueTask<IReadOnlyList<T>> TakeAsync() => TakeAsync( CancellationToken.None );
5858

5959
/// <summary>
6060
/// Removes and returns a batch from the collection in an asynchronous manner.
6161
/// </summary>
62-
public Task<IReadOnlyList<T>> TakeAsync( CancellationToken cancellationToken ) => _batchQueue.TakeAsync( cancellationToken );
62+
public ValueTask<IReadOnlyList<T>> TakeAsync( CancellationToken cancellationToken ) => _batchQueue.TakeAsync( cancellationToken );
6363

6464
/// <summary>
6565
/// <para>Forces a new batch to be created and made available for consuming even if amount of the pending items has not reached <see cref="BatchSize"/> yet.</para>

AsyncCollections/AsyncBoundedPriorityQueue.cs

+10-4
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,21 @@ public void Add( T item, int priority )
8080
/// <summary>
8181
/// Removes and returns an item with the highest priority from the collection in an asynchronous manner.
8282
/// </summary>
83-
public Task<PrioritizedItem<T>> TakeAsync() => TakeAsync( CancellationToken.None );
83+
public ValueTask<PrioritizedItem<T>> TakeAsync() => TakeAsync( CancellationToken.None );
8484

8585
/// <summary>
8686
/// Removes and returns an item with the highest priority from the collection in an asynchronous manner.
8787
/// </summary>
88-
async Task<T> IAsyncCollection<T>.TakeAsync( System.Threading.CancellationToken cancellationToken )
88+
ValueTask<T> IAsyncCollection<T>.TakeAsync( System.Threading.CancellationToken cancellationToken )
8989
{
90-
PrioritizedItem<T> prioritizedItem = await base.TakeAsync( cancellationToken ).ConfigureAwait( false );
91-
return prioritizedItem.Item;
90+
ValueTask<PrioritizedItem<T>> prioritizedItemTask = TakeAsync( cancellationToken );
91+
return prioritizedItemTask.IsCompletedSuccessfully ? new ValueTask<T>( prioritizedItemTask.Result.Item ) : new ValueTask<T>( UnwrapAsync( prioritizedItemTask ) );
92+
}
93+
94+
private async Task<T> UnwrapAsync( ValueTask<PrioritizedItem<T>> prioritizedItemTask )
95+
{
96+
PrioritizedItem<T> result = await prioritizedItemTask.ConfigureAwait( false );
97+
return result.Item;
9298
}
9399

94100
IEnumerator<T> IEnumerable<T>.GetEnumerator() => ( this as IEnumerable<PrioritizedItem<T>> ).Select( prioritizedItem => prioritizedItem.Item ).GetEnumerator();

AsyncCollections/AsyncCollection.cs

+5-5
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,12 @@ private bool TryAdd( T item )
8585
/// <summary>
8686
/// Removes and returns an item from the collection in an asynchronous manner.
8787
/// </summary>
88-
public Task<T> TakeAsync( CancellationToken cancellationToken )
88+
public ValueTask<T> TakeAsync( CancellationToken cancellationToken )
8989
=> cancellationToken.IsCancellationRequested
90-
? CanceledTask<T>.Value
90+
? CanceledValueTask<T>.Value
9191
: TakeAsync( new CompletionSourceAwaiterFactory<T>( cancellationToken ) );
9292

93-
private Task<T> TakeAsync<TAwaiterFactory>( TAwaiterFactory awaiterFactory ) where TAwaiterFactory : IAwaiterFactory<T>
93+
private ValueTask<T> TakeAsync<TAwaiterFactory>( TAwaiterFactory awaiterFactory ) where TAwaiterFactory : IAwaiterFactory<T>
9494
{
9595
long balanceAfterCurrentAwaiter = Interlocked.Decrement( ref _queueBalance );
9696

@@ -111,7 +111,7 @@ private Task<T> TakeAsync<TAwaiterFactory>( TAwaiterFactory awaiterFactory ) whe
111111
while ( !_itemQueue.TryTake( out item ) )
112112
spin.SpinOnce();
113113

114-
return Task.FromResult( item );
114+
return new ValueTask<T>( item );
115115
}
116116
}
117117

@@ -176,7 +176,7 @@ public static Task<AnyResult<T>> TakeFromAnyAsync( AsyncCollection<T>[] collecti
176176
if ( exclusiveSources.IsAwaiterCreated( index ) )
177177
return null;
178178

179-
Task<T> collectionTask = collection.TakeAsync( exclusiveSources.CreateAwaiterFactory( index ) );
179+
ValueTask<T> collectionTask = collection.TakeAsync( exclusiveSources.CreateAwaiterFactory( index ) );
180180

181181
// One of the collections already had an item and returned it directly
182182
if ( collectionTask != null && collectionTask.IsCompleted )

AsyncCollections/AsyncQueue.cs

+11-11
Original file line numberDiff line numberDiff line change
@@ -83,20 +83,20 @@ public void Add( T item )
8383
spin.SpinOnce();
8484
}
8585

86-
public Task<T> TakeAsync( CancellationToken cancellationToken )
86+
public ValueTask<T> TakeAsync( CancellationToken cancellationToken )
8787
=> cancellationToken.IsCancellationRequested
88-
? CanceledTask<T>.Value
88+
? CanceledValueTask<T>.Value
8989
: TakeWithoutValidationAsync( cancellationToken );
9090

91-
private Task<T> TakeWithoutValidationAsync( CancellationToken cancellationToken )
91+
private ValueTask<T> TakeWithoutValidationAsync( CancellationToken cancellationToken )
9292
{
9393
SpinWait spin = new SpinWait();
9494

9595
while ( true )
9696
{
97-
Task<T> result = Volatile.Read( ref _awaiterTail ).TryTakeAsync( cancellationToken );
97+
ValueTask<T>? result = Volatile.Read( ref _awaiterTail ).TryTakeAsync( cancellationToken );
9898
if ( result != null )
99-
return result;
99+
return result.Value;
100100

101101
spin.SpinOnce();
102102
}
@@ -260,17 +260,17 @@ private IAwaiter<T> SpinUntilAwaiterIsReady( int slot )
260260
}
261261
}
262262

263-
public Task<T> TryTakeAsync( CancellationToken cancellationToken )
263+
public ValueTask<T>? TryTakeAsync( CancellationToken cancellationToken )
264264
=> TryTakeAsync( cancellationToken, Interlocked.Increment( ref _awaiterIndex ) );
265265

266-
private Task<T> TryTakeAsync( CancellationToken cancellationToken, int slot )
266+
private ValueTask<T>? TryTakeAsync( CancellationToken cancellationToken, int slot )
267267
=> slot < SegmentSize
268268
? TryTakeWithoutValidationAsync( cancellationToken, slot )
269-
: null;
269+
: (ValueTask<T>?) null;
270270

271-
private Task<T> TryTakeWithoutValidationAsync( CancellationToken cancellationToken, int slot )
271+
private ValueTask<T> TryTakeWithoutValidationAsync( CancellationToken cancellationToken, int slot )
272272
{
273-
Task<T> result;
273+
ValueTask<T> result;
274274

275275
/// The order here differs from what <see cref="TryAdd(T)"/> does: we capture the slot *before* inserting an awaiter.
276276
/// We do it to avoid allocating an awaiter / registering the cancellation that we're not gonna need in case we lose.
@@ -284,7 +284,7 @@ private Task<T> TryTakeWithoutValidationAsync( CancellationToken cancellationTok
284284
}
285285
else
286286
{
287-
result = Task.FromResult( _items[ slot ] );
287+
result = new ValueTask<T>( _items[ slot ] );
288288
ClearSlot( slot );
289289
}
290290

AsyncCollections/IAsyncBatchCollection.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ public interface IAsyncBatchCollection<T> : IReadOnlyCollection<IReadOnlyList<T>
1111

1212
void Add( T item );
1313
void Flush();
14-
Task<IReadOnlyList<T>> TakeAsync( CancellationToken cancellationToken );
14+
ValueTask<IReadOnlyList<T>> TakeAsync( CancellationToken cancellationToken );
1515
}
1616

1717
public static class AsyncBatchCollectionExtensions
1818
{
19-
public static Task<IReadOnlyList<T>> TakeAsync<T>( this IAsyncBatchCollection<T> collection ) => collection.TakeAsync( CancellationToken.None );
19+
public static ValueTask<IReadOnlyList<T>> TakeAsync<T>( this IAsyncBatchCollection<T> collection ) => collection.TakeAsync( CancellationToken.None );
2020
public static TimerAsyncBatchQueue<T> WithFlushEvery<T>( this IAsyncBatchCollection<T> collection, TimeSpan flushPeriod ) => new TimerAsyncBatchQueue<T>( collection, flushPeriod );
2121
}
2222
}

AsyncCollections/IAsyncCollection.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@ public interface IAsyncCollection<T> : IReadOnlyCollection<T>
2525
/// <summary>
2626
/// Removes and returns an item from the collection in an asynchronous manner.
2727
/// </summary>
28-
Task<T> TakeAsync( CancellationToken cancellationToken );
28+
ValueTask<T> TakeAsync( CancellationToken cancellationToken );
2929
}
3030

3131
public static class AsyncCollectionExtensions
3232
{
3333
/// <summary>
3434
/// Removes and returns an item from the collection in an asynchronous manner.
3535
/// </summary>
36-
public static Task<T> TakeAsync<T>( this IAsyncCollection<T> collection )
36+
public static ValueTask<T> TakeAsync<T>( this IAsyncCollection<T> collection )
3737
{
3838
return collection.TakeAsync( CancellationToken.None );
3939
}

AsyncCollections/Internal/CancelledTask.cs

+5
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@
66

77
namespace HellBrick.Collections.Internal
88
{
9+
internal static class CanceledValueTask<T>
10+
{
11+
public static readonly ValueTask<T> Value = new ValueTask<T>( CanceledTask<T>.Value );
12+
}
13+
914
internal static class CanceledTask<T>
1015
{
1116
static CanceledTask()

AsyncCollections/Internal/CompletionSourceAwaiterFactory.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ private class CompletionSourceAwaiter : IAwaiter<T>
2929
public CompletionSourceAwaiter( CancellationToken cancellationToken )
3030
{
3131
_completionSource = new TaskCompletionSource<T>();
32-
Task = _completionSource.Task.WithYield();
32+
Task = new ValueTask<T>( _completionSource.Task.WithYield() );
3333

3434
_registration = cancellationToken.Register(
3535
state =>
@@ -47,7 +47,7 @@ public bool TrySetResult( T result )
4747
return _completionSource.TrySetResult( result );
4848
}
4949

50-
public Task<T> Task { get; }
50+
public ValueTask<T> Task { get; }
5151
}
5252

5353
#region IEquatable<CompletionSourceAwaiterFactory<T>>

AsyncCollections/Internal/ExclusiveCompletionSourceGroup.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public CancellationRegistrationHolder( CancellationTokenRegistration registratio
9090

9191
private class ExclusiveCompletionSource : IAwaiter<T>
9292
{
93+
private static readonly ValueTask<T> _neverEndingTask = new ValueTask<T>( new TaskCompletionSource<T>().Task );
9394
private readonly ExclusiveCompletionSourceGroup<T> _group;
9495
private readonly int _id;
9596

@@ -131,7 +132,7 @@ public bool TrySetResult( T result )
131132
}
132133

133134
// The value will never be actually used.
134-
public Task<T> Task => null;
135+
public ValueTask<T> Task => _neverEndingTask;
135136
}
136137

137138
public struct Factory : IAwaiterFactory<T>, IEquatable<Factory>

AsyncCollections/Internal/IAwaiter.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,6 @@ internal interface IAwaiter<T>
2020
/// <summary>
2121
/// The task that's completed when the awaiter gets the result.
2222
/// </summary>
23-
Task<T> Task { get; }
23+
ValueTask<T> Task { get; }
2424
}
2525
}

AsyncCollections/TimerAsyncBatchQueue.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public TimerAsyncBatchQueue( IAsyncBatchCollection<T> innerCollection, TimeSpan
2222
public int BatchSize => _innerCollection.BatchSize;
2323
public int Count => _innerCollection.Count;
2424
public void Add( T item ) => _innerCollection.Add( item );
25-
public Task<IReadOnlyList<T>> TakeAsync( CancellationToken cancellationToken ) => _innerCollection.TakeAsync( cancellationToken );
25+
public ValueTask<IReadOnlyList<T>> TakeAsync( CancellationToken cancellationToken ) => _innerCollection.TakeAsync( cancellationToken );
2626
public void Flush() => _innerCollection.Flush();
2727
public IEnumerator<IReadOnlyList<T>> GetEnumerator() => _innerCollection.GetEnumerator();
2828
IEnumerator IEnumerable.GetEnumerator() => ( _innerCollection as IEnumerable ).GetEnumerator();

0 commit comments

Comments
 (0)