diff --git a/src/core/Akka.Streams.Tests/Implementation/ChannelSourceSpec.cs b/src/core/Akka.Streams.Tests/Implementation/ChannelSourceSpec.cs index f0535eef1d6..2961df9b577 100644 --- a/src/core/Akka.Streams.Tests/Implementation/ChannelSourceSpec.cs +++ b/src/core/Akka.Streams.Tests/Implementation/ChannelSourceSpec.cs @@ -6,6 +6,9 @@ //----------------------------------------------------------------------- using System; +using System.Collections.Concurrent; +using System.Collections.Immutable; +using System.Linq; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -92,5 +95,52 @@ public async Task ChannelSource_must_read_incoming_events() probe.ExpectNext(4); probe.ExpectNext(5); } + + /// + /// Reproduces GitHub issue #7940: NullReferenceException when completing + /// a ChannelReader while the stream is waiting for data. + /// + [Fact(DisplayName = "ChannelSource should not throw NRE when completing channel while waiting for data")] + public async Task ChannelSource_should_not_throw_NRE_when_completing_channel_while_waiting_for_data() + { + // This test reproduces the race condition from #7940 + // Run multiple iterations to increase chance of hitting the race + for (var iteration = 0; iteration < 20; iteration++) + { + var channel = Channel.CreateUnbounded(); + var processed = new ConcurrentBag(); + + // Exactly matches the repro from the issue - using ImmutableArray.Create and Sink.Ignore + var streamTask = ChannelSource.FromReader(channel.Reader) + .Select(ImmutableArray.Create) + .Select(s => + { + foreach (var item in s) processed.Add(item); + return Done.Instance; + }) + .ToMaterialized(Sink.Ignore(), Keep.Right) + .Run(_materializer); + + // Write some items + var testInput = Enumerable.Range(1, 5).Select(i => i.ToString()).ToList(); + foreach (var item in testInput) + await channel.Writer.WriteAsync(item); + + // Wait 1 second for stream to process items and then wait for more data + // This is the key to reproducing the race - the stream needs to be + // waiting in WaitToReadAsync when we complete the writer (channel is empty) + await Task.Delay(1000); + + // Complete the channel - this can cause NRE if there's a race + // between OnReaderComplete and the async continuation of WaitToReadAsync + channel.Writer.Complete(); + + // Stream should complete cleanly without exceptions + await streamTask; + + // Verify all items were processed + processed.Count.Should().Be(5, $"iteration {iteration} failed"); + } + } } } diff --git a/src/core/Akka.Streams/Implementation/ChannelSources.cs b/src/core/Akka.Streams/Implementation/ChannelSources.cs index 7097bef3516..5871561c388 100644 --- a/src/core/Akka.Streams/Implementation/ChannelSources.cs +++ b/src/core/Akka.Streams/Implementation/ChannelSources.cs @@ -6,6 +6,7 @@ //----------------------------------------------------------------------- using System; +using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; using Akka.Streams.Stage; @@ -21,6 +22,10 @@ sealed class ChannelSourceLogic : OutGraphStageLogic private readonly Action _onReaderComplete; private readonly Action> _onReadReady; + // Flag to prevent race condition between OnReaderComplete and OnValueRead + // when channel completion and WaitToReadAsync fire simultaneously (issue #7940) + private int _completing; + public ChannelSourceLogic(SourceShape source, Outlet outlet, ChannelReader reader) : base(source) { @@ -44,20 +49,41 @@ public ChannelSourceLogic(SourceShape source, Outlet outlet, private void OnReaderComplete(Exception reason) { + // Use atomic compare-exchange to ensure only one completion path runs + // This prevents race with OnValueRead when both fire simultaneously + if (Interlocked.CompareExchange(ref _completing, 1, 0) != 0) + return; // Already completing from another path + if (reason is null) CompleteStage(); else FailStage(reason); } - private void OnValueReadFailure(Exception reason) => FailStage(reason); + private void OnValueReadFailure(Exception reason) + { + // Use atomic compare-exchange to ensure only one completion path runs + if (Interlocked.CompareExchange(ref _completing, 1, 0) != 0) + return; // Already completing from another path + + FailStage(reason); + } private void OnValueRead(bool dataAvailable) { if (dataAvailable && _reader.TryRead(out var element)) + { Push(_outlet, element); + } else + { + // Use atomic compare-exchange to ensure only one completion path runs + // This prevents race with OnReaderComplete when both fire simultaneously + if (Interlocked.CompareExchange(ref _completing, 1, 0) != 0) + return; // Already completing from another path + CompleteStage(); + } } public override void OnPull() @@ -73,9 +99,17 @@ public override void OnPull() { var dataAvailable = continuation.GetAwaiter().GetResult(); if (dataAvailable && _reader.TryRead(out element)) + { Push(_outlet, element); + } else + { + // Use atomic compare-exchange to ensure only one completion path runs + if (Interlocked.CompareExchange(ref _completing, 1, 0) != 0) + return; // Already completing from another path + CompleteStage(); + } } else continuation.AsTask().ContinueWith(_onReadReady);