Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions src/core/Akka.Streams.Tests/Implementation/ChannelSourceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
//-----------------------------------------------------------------------

using System;
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand Down Expand Up @@ -92,5 +95,52 @@ public async Task ChannelSource_must_read_incoming_events()
probe.ExpectNext(4);
probe.ExpectNext(5);
}

/// <summary>
/// Reproduces GitHub issue #7940: NullReferenceException when completing
/// a ChannelReader while the stream is waiting for data.
/// </summary>
[Fact(DisplayName = "ChannelSource should not throw NRE when completing channel while waiting for data")]
public async Task ChannelSource_should_not_throw_NRE_when_completing_channel_while_waiting_for_data()
{
// This test reproduces the race condition from #7940
// Run multiple iterations to increase chance of hitting the race
for (var iteration = 0; iteration < 20; iteration++)
{
var channel = Channel.CreateUnbounded<string>();
var processed = new ConcurrentBag<string>();

// Exactly matches the repro from the issue - using ImmutableArray.Create and Sink.Ignore<Done>
var streamTask = ChannelSource.FromReader(channel.Reader)
.Select(ImmutableArray.Create)
.Select(s =>
{
foreach (var item in s) processed.Add(item);
return Done.Instance;
})
.ToMaterialized(Sink.Ignore<Done>(), Keep.Right)
.Run(_materializer);

// Write some items
var testInput = Enumerable.Range(1, 5).Select(i => i.ToString()).ToList();
foreach (var item in testInput)
await channel.Writer.WriteAsync(item);

// Wait 1 second for stream to process items and then wait for more data
// This is the key to reproducing the race - the stream needs to be
// waiting in WaitToReadAsync when we complete the writer (channel is empty)
await Task.Delay(1000);

// Complete the channel - this can cause NRE if there's a race
// between OnReaderComplete and the async continuation of WaitToReadAsync
channel.Writer.Complete();

// Stream should complete cleanly without exceptions
await streamTask;

// Verify all items were processed
processed.Count.Should().Be(5, $"iteration {iteration} failed");
}
}
}
}
36 changes: 35 additions & 1 deletion src/core/Akka.Streams/Implementation/ChannelSources.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//-----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Akka.Streams.Stage;
Expand All @@ -21,6 +22,10 @@ sealed class ChannelSourceLogic<T> : OutGraphStageLogic
private readonly Action<Exception> _onReaderComplete;
private readonly Action<Task<bool>> _onReadReady;

// Flag to prevent race condition between OnReaderComplete and OnValueRead
// when channel completion and WaitToReadAsync fire simultaneously (issue #7940)
private int _completing;

public ChannelSourceLogic(SourceShape<T> source, Outlet<T> outlet,
ChannelReader<T> reader) : base(source)
{
Expand All @@ -44,20 +49,41 @@ public ChannelSourceLogic(SourceShape<T> source, Outlet<T> outlet,

private void OnReaderComplete(Exception reason)
{
// Use atomic compare-exchange to ensure only one completion path runs
// This prevents race with OnValueRead when both fire simultaneously
if (Interlocked.CompareExchange(ref _completing, 1, 0) != 0)
return; // Already completing from another path

if (reason is null)
CompleteStage();
else
FailStage(reason);
}

private void OnValueReadFailure(Exception reason) => FailStage(reason);
private void OnValueReadFailure(Exception reason)
{
// Use atomic compare-exchange to ensure only one completion path runs
if (Interlocked.CompareExchange(ref _completing, 1, 0) != 0)
return; // Already completing from another path

FailStage(reason);
}

private void OnValueRead(bool dataAvailable)
{
if (dataAvailable && _reader.TryRead(out var element))
{
Push(_outlet, element);
}
else
{
// Use atomic compare-exchange to ensure only one completion path runs
// This prevents race with OnReaderComplete when both fire simultaneously
if (Interlocked.CompareExchange(ref _completing, 1, 0) != 0)
return; // Already completing from another path

CompleteStage();
}
}

public override void OnPull()
Expand All @@ -73,9 +99,17 @@ public override void OnPull()
{
var dataAvailable = continuation.GetAwaiter().GetResult();
if (dataAvailable && _reader.TryRead(out element))
{
Push(_outlet, element);
}
else
{
// Use atomic compare-exchange to ensure only one completion path runs
if (Interlocked.CompareExchange(ref _completing, 1, 0) != 0)
return; // Already completing from another path

CompleteStage();
}
}
else
continuation.AsTask().ContinueWith(_onReadReady);
Expand Down