Skip to content

Akka.Streams: Have SelectAsyncUnordered use local async function instead of ContinueWith#7531

Merged
Aaronontheweb merged 3 commits into
akkadotnet:devfrom
Aaronontheweb:selectasyncunordered
Mar 18, 2025
Merged

Akka.Streams: Have SelectAsyncUnordered use local async function instead of ContinueWith#7531
Aaronontheweb merged 3 commits into
akkadotnet:devfrom
Aaronontheweb:selectasyncunordered

Conversation

@Aaronontheweb

@Aaronontheweb Aaronontheweb commented Mar 18, 2025

Copy link
Copy Markdown
Member

Changes

Replicates some of the behaviors and fixes we made to SelectAsync in #7521

Checklist

For significant changes, please ensure that the following have been completed (delete if not relevant):

…ntinueWith`

Replicates some of the behaviors and fixes we made to `SelectAsync` in akkadotnet#7521
@Aaronontheweb Aaronontheweb added this to the 1.5.40 milestone Mar 18, 2025
@Aaronontheweb

Copy link
Copy Markdown
Member Author

More than likely I am going to need to update some unit tests

@Aaronontheweb Aaronontheweb left a comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Detailed my changes

TryPull(_stage.In);
return;

async Task RunTask(Task<TOut> tt)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use an async local function with an await on the end-user Task<TOut> instead of using ContinueWith - this will result in better error handling and fewer weird task context issues.

Replicates the same work we did on #7521

var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectError().Message.Should().Be("err2");
(await c.ExpectErrorAsync()).Message.Should().Be("err2");

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer get AggregateExceptions here unless the test completes immediately, in which case we might have to harden this in the future.

var sub = await c.ExpectSubscriptionAsync();
sub.Request(10);
c.ExpectError().InnerException.Message.Should().Be("err1");
(await c.ExpectErrorAsync()).Message.Should().Be("err1");

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer get AggregateExceptions here unless the test completes immediately, in which case we might have to harden this in the future.

@Arkatufus Arkatufus left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Aaronontheweb Aaronontheweb enabled auto-merge (squash) March 18, 2025 15:46
@Aaronontheweb Aaronontheweb disabled auto-merge March 18, 2025 18:21
@Aaronontheweb Aaronontheweb merged commit 5015c5c into akkadotnet:dev Mar 18, 2025
@Aaronontheweb Aaronontheweb deleted the selectasyncunordered branch March 18, 2025 18:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants