fix(streams): prevent race condition in ChannelSource on channel completion #7941

Merged
Aaronontheweb merged 5 commits into akkadotnet:dev from Aaronontheweb:claude-wt-20251124-132115 on Nov 25, 2025

@Aaronontheweb

Member

Summary

Root Cause

When channel.Writer.Complete() is called while the stream is blocked on WaitToReadAsync(), two async callbacks can fire simultaneously:

  1. _reader.Completion continuation → OnReaderComplete(null) → CompleteStage()
  2. WaitToReadAsync() continuation → OnValueRead(false) → CompleteStage()

Both paths could pass the IsStageCompleted check before either completed the stage, leading to concurrent access of stage internals and an NRE.
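
The check-then-act problem described above can be reproduced in isolation. The following is a minimal sketch, not the ChannelSourceLogic code: `_completed`, `_completionRuns`, and `TryComplete` are hypothetical stand-ins, and a `Barrier` deliberately forces both callers past the check before either one acts, so the interleaving that is intermittent in the real stage happens every time here.

```csharp
using System;
using System.Threading;

public static class CheckThenActRace
{
    // Hypothetical stand-in for the stage's "already completed" state.
    private static volatile bool _completed;

    // Counts how many callers ran the completion path.
    private static int _completionRuns;

    // Holds both callers after the check so neither can act first.
    private static readonly Barrier BothPassedCheck = new Barrier(2);

    private static void TryComplete()
    {
        if (_completed) return;                      // check
        BothPassedCheck.SignalAndWait();             // force the unlucky interleaving
        Interlocked.Increment(ref _completionRuns);  // act (stands in for CompleteStage)
        _completed = true;
    }

    // Runs two competing callers and reports how many completed the "stage".
    public static int Run()
    {
        _completed = false;
        _completionRuns = 0;

        var first = new Thread(TryComplete);
        var second = new Thread(TryComplete);
        first.Start();
        second.Start();
        first.Join();
        second.Join();

        return _completionRuns; // 2: both callers passed the non-atomic check
    }
}
```

Because the check and the state change are separate steps, a plain boolean test cannot guarantee a single winner. That is the gap the atomic flag closes.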

Fix

Added an atomic flag (_completing) using Interlocked.Exchange to ensure only one completion path ever executes. Both paths ultimately call CompleteStage(), so it doesn't matter which one wins - we just need to ensure only one runs.
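
A rough sketch of the guard pattern, assuming an integer flag where 0 means "not started" and 1 means "claimed". Only the `_completing` name and the use of `Interlocked.Exchange` come from the description above. The `CompletionGuard` class, `TryComplete`, and the counter are illustrative and are not taken from the actual stage implementation.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class CompletionGuard
{
    // 0 = no completion path has started, 1 = a path has claimed completion.
    private int _completing;

    // Illustrative counter: how many callers actually ran the completion work.
    private int _completionRuns;

    public int CompletionRuns => Volatile.Read(ref _completionRuns);

    // Returns true only for the single caller that wins the flag.
    public bool TryComplete()
    {
        // Exchange atomically writes 1 and returns the previous value,
        // so exactly one caller can ever observe 0.
        if (Interlocked.Exchange(ref _completing, 1) != 0)
            return false;

        Interlocked.Increment(ref _completionRuns); // stands in for CompleteStage()
        return true;
    }

    // Hammers one guard from many threads and reports the number of winners.
    public static int RunConcurrently(int callers)
    {
        var guard = new CompletionGuard();
        Parallel.For(0, callers, i => { guard.TryComplete(); });
        return guard.CompletionRuns; // 1, regardless of how many callers raced
    }
}
```

Under this pattern each completion path would call `TryComplete()` first and return early on `false`, so it does not matter which path arrives first.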

Test plan

  • Added reproduction test ChannelSource_should_not_throw_NRE_when_completing_channel_while_waiting_for_data
  • All existing ChannelSourceSpec tests pass
  • Ran tests multiple times to verify stability

…letion (akkadotnet#7940)

Fixed a race condition in ChannelSourceLogic that caused intermittent
NullReferenceException when completing a ChannelWriter while the stream
was waiting for data.

The issue occurred because two async callbacks could fire simultaneously
when the channel writer completed:
1. The _reader.Completion continuation → OnReaderComplete → CompleteStage
2. The WaitToReadAsync continuation → OnValueRead(false) → CompleteStage

Both paths could pass the IsStageCompleted check before either completed
the stage, leading to concurrent access of stage internals.

The fix adds an atomic flag (_completing) using Interlocked.Exchange to
ensure only one completion path ever executes. This is applied to:
- OnReaderComplete - channel completion callback
- OnValueRead - when data is not available
- OnValueReadFailure - when read fails
- OnPull - synchronous completion path

refactor: use CompareExchange instead of Exchange for atomic flag

CompareExchange is more semantically correct - it only sets the value
if it's currently 0, rather than unconditionally setting it.
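
To illustrate the distinction, here is a small sketch comparing the two calls on a plain integer flag. The helper names are hypothetical. Both variants pick the same winner, since both return the previous value. The difference is that `CompareExchange` leaves the flag untouched when it is already non-zero, while `Exchange` writes 1 every time.

```csharp
using System;
using System.Threading;

public static class ExchangeVsCompareExchange
{
    // Unconditionally stores 1; the caller won if the previous value was 0.
    public static bool ClaimWithExchange(ref int flag) =>
        Interlocked.Exchange(ref flag, 1) == 0;

    // Stores 1 only if the flag currently holds 0; the caller won if it did.
    public static bool ClaimWithCompareExchange(ref int flag) =>
        Interlocked.CompareExchange(ref flag, 1, 0) == 0;
}
```

With a flag that only ever moves from 0 to 1 the observable behavior is the same. The conditional form mainly states the intent ("set only if unset") more directly.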

@Aaronontheweb Aaronontheweb left a comment

Member Author

Key to this PR is using Interlocked.CompareExchange to prevent two competing sources of shutdown signaling from overriding each other:

  1. Internal Akka.Streams stage completion
  2. External ChannelReader completion

This change makes shutdown thread-safe by using an internal shutdown flag that is read and set atomically through Interlocked.

@Arkatufus Arkatufus left a comment

Contributor

LGTM

@Aaronontheweb Aaronontheweb enabled auto-merge (squash) November 24, 2025 23:04
@Aaronontheweb Aaronontheweb merged commit 41df361 into akkadotnet:dev Nov 25, 2025
11 checks passed
Aaronontheweb added a commit to Aaronontheweb/akka.net that referenced this pull request Nov 25, 2025
…letion (akkadotnet#7941)

* fix(streams): prevent race condition in ChannelSource on channel completion (akkadotnet#7940)

Fixed a race condition in ChannelSourceLogic that caused intermittent
NullReferenceException when completing a ChannelWriter while the stream
was waiting for data.

The issue occurred because two async callbacks could fire simultaneously
when the channel writer completed:
1. The _reader.Completion continuation → OnReaderComplete → CompleteStage
2. The WaitToReadAsync continuation → OnValueRead(false) → CompleteStage

Both paths could pass the IsStageCompleted check before either completed
the stage, leading to concurrent access of stage internals.

The fix adds an atomic flag (_completing) using Interlocked.Exchange to
ensure only one completion path ever executes. This is applied to:
- OnReaderComplete - channel completion callback
- OnValueRead - when data is not available
- OnValueReadFailure - when read fails
- OnPull - synchronous completion path

* refactor: use CompareExchange instead of Exchange for atomic flag

CompareExchange is more semantically correct - it only sets the value
if it's currently 0, rather than unconditionally setting it.
Aaronontheweb added a commit that referenced this pull request Nov 25, 2025
…letion (#7941) (#7951)

* fix(streams): prevent race condition in ChannelSource on channel completion (#7940)

Fixed a race condition in ChannelSourceLogic that caused intermittent
NullReferenceException when completing a ChannelWriter while the stream
was waiting for data.

The issue occurred because two async callbacks could fire simultaneously
when the channel writer completed:
1. The _reader.Completion continuation → OnReaderComplete → CompleteStage
2. The WaitToReadAsync continuation → OnValueRead(false) → CompleteStage

Both paths could pass the IsStageCompleted check before either completed
the stage, leading to concurrent access of stage internals.

The fix adds an atomic flag (_completing) using Interlocked.Exchange to
ensure only one completion path ever executes. This is applied to:
- OnReaderComplete - channel completion callback
- OnValueRead - when data is not available
- OnValueReadFailure - when read fails
- OnPull - synchronous completion path

* refactor: use CompareExchange instead of Exchange for atomic flag

CompareExchange is more semantically correct - it only sets the value
if it's currently 0, rather than unconditionally setting it.

Development

Successfully merging this pull request may close these issues.

Akka.Streams: Source.ChannelReader NullReferenceException upon dispose

2 participants