Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Have MergedLinesEnumerable implement IAsyncEnumerable<string> #109

Open
wants to merge 30 commits into
base: release-1.7
Choose a base branch
from

Conversation

Bartleby2718
Copy link

@Bartleby2718 Bartleby2718 commented Feb 26, 2024

I'm not really familiar with IAsyncEnumerable, but I took a stab at it. Looking forward to your feedback @madelson.

This closes #98.

MedallionShell.Tests/AttachingTests.cs Outdated Show resolved Hide resolved
MedallionShell.Tests/MedallionShell.Tests.csproj Outdated Show resolved Hide resolved
MedallionShell.Tests/MedallionShell.Tests.csproj Outdated Show resolved Hide resolved
MedallionShell.Tests/MedallionShell.Tests.csproj Outdated Show resolved Hide resolved
stylecop.analyzers.ruleset Outdated Show resolved Hide resolved
MedallionShell.Tests/MedallionShell.Tests.csproj Outdated Show resolved Hide resolved
MedallionShell/Streams/MergedLinesEnumerable.cs Outdated Show resolved Hide resolved
MedallionShell/Streams/MergedLinesEnumerable.cs Outdated Show resolved Hide resolved
@Bartleby2718
Copy link
Author

Build failed because the CI server uses MSBuild 16.11.2.50704, which doesn't support C# 12.

@Bartleby2718 Bartleby2718 marked this pull request as ready for review February 26, 2024 00:58
Copy link
Owner

@madelson madelson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for implementing! I left a few comments; let me know if you have questions.

@Bartleby2718 Bartleby2718 changed the base branch from master to release-1.7 March 7, 2024 00:31
@Bartleby2718 Bartleby2718 marked this pull request as draft March 7, 2024 00:31
}
}

var task1 = Task.Run(() => WriteStrings(strings1, pipe1));
var task2 = Task.Run(() => WriteStrings(strings2, pipe2));
var consumeTask = Task.Run(() => enumerable.ToList());
Task.WaitAll(task1, task2, consumeTask);
Task.WaitAll(task1, task2);

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I had to swap the order for tests to pass. Is this a red flag?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah let's revert this change and make sure it still passes. Also, does this pass or fail on main? You didn't make any changes to Pipe I think so it may be an issue with the release branch.

Copy link
Author

@Bartleby2718 Bartleby2718 Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson I looked more into this and gathered some numbers, but I'm lost as to how I should debug this.

Note:

  • Workarounds used in the test project to run tests on master:
    • updated PackageReferences to nunit 3.13.3, NUnit3TestAdapter 3.17.0, Microsoft.NET.Test.Sdk 15.9.0
    • added NU1902,NU1903 to NoWarn and set CheckEolTargetFramework to false
chance \ Branch master
(net46 / netcoreapp2.2)
release-1.7
(net462 / net6.0)
AsyncEnumerable with Task.WaitAll(task1, task2, consumeTask)
(net462 / net6.0)
25% (current) 516ms / 8s
image
511ms / 8.4s
image
522ms / timeout in the async case, 9.2s in the sync case
image
20% (new) 408ms / 6.4s
image
374ms / 6.2s
image
282ms / timeout in the async case, 5.9s in the sync case
image

However, I noticed that a small change makes a difference.

  1. If I start consume before waiting writes, I get a timeout:
            var consumeTask = Task.Run(enumerable.ToListAsync);
            Task.WaitAll(task1, task2);
  1. If I wait for writes before I start consuming, the performance is similar to the first two columns:
            Task.WaitAll(task1, task2);
            var consumeTask = Task.Run(enumerable.ToListAsync);
  • This proves that the problem lies in consumeTask, not SpinWait.
  1. If I do the same as but use much shorter strings1 and strings2, the test completes within 20ms.
            // originally 2000
            var strings1 = Enumerable.Range(0, 20).Select(_ => Guid.NewGuid().ToString()).ToArray();
            // originally 2300
            var strings2 = Enumerable.Range(0, 23).Select(_ => Guid.NewGuid().ToString()).ToArray();
...
            // same as master or release-1.7
            var consumeTask = Task.Run(enumerable.ToListAsync);
            Task.WaitAll(task1, task2, consumeTask);

            CollectionAssert.AreEquivalent(strings1.Concat(strings2).ToList(), consumeTask.Result);

Therefore, I believe that my IAsyncEnumerable implementation is flawed in a way that somehow "explodes" for bigger inputs (the threshold fluctuates, but it's somewhere between 70 and low 100s).

Any idea how I should debug this? For one thing, I think replacing Guid.NewGuid() with a human-friendly value will help, but that's all I can think of. I can also temporarily comment out SpinWait-related code.

Copy link
Author

@Bartleby2718 Bartleby2718 Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also found that consumeTask.Status is WaitingForActivation even after a few seconds (if I don't await it or include consumeTask in the WaitAll).

Copy link
Author

@Bartleby2718 Bartleby2718 Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like ChatGPT came to the rescue again. (It wasn't helping a few hours ago.)

I lost its message, but it said something along the lines of "StreamWriter wasn't being disposed properly, causing the Pipe's InputStream to wait indefinitely."

Now the test passes, but I can't run spinWait.SpinOnce(); as often. Do you think the frequency should also be a protected virtual value?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I lost its message, but it said something along the lines of "StreamWriter wasn't being disposed properly, causing the Pipe's InputStream to wait indefinitely."

Makes sense; we have do dispose the writer to end the stream. Can you point me to the relevant code change?

Now the test passes, but I can't run spinWait.SpinOnce(); as often. Do you think the frequency should also be a protected virtual value?

I'm not sure I follow here. As often as what? Does it fail when it runs more often? In what way? How does the overall time for this test case compare before and after the changes (I would expect it to be the same). Who would be overriding the frequency if it were protected virtual?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson

  1. StreamWriter changes
    FuzzTest is the one where I had to change this. Note the parameter changes of the local function WriteStrings.
  2. spinWait.SpinOnce() changes
    In FuzzTest, specifically https://github.com/madelson/MedallionShell/pull/109/files#diff-68fdbc9634d30b7e1a0bb438ab37b458f0c478766fba188c44ece72f93e41cacR102-R121, I updated the if condition from random.Next(4) == 1 to random.Next(110) == 1, so I'm spinning left often (25% -> 0.91%). If I use a greater value for random.Next (i.e. spin more often), then the test never ends (at least for like 30 minutes, after which I stop the test) only in the async case. Therefore I was wondering if MergedLinesEnumerableTestAsync and MergedLinesEnumerableTestSync should use different frequencies by overriding a protected virtual variable.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson Let me know if the above makes sense!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson Bumping this thread!

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the test never ends (at least for like 30 minutes, after which I stop the test) only in the async case

This makes me feel like there is a bug somewhere. It could be in the MergedLinesEnumerable changes, it could be in the test code, or it could be in the Pipe code.

What I would suggest is to (temporarily) add some logging statements to the code like this:

static class TempLogger
{
    private static readonly object Lock = new();

    public static void Log(string message)
    {
       lock (Lock)
       {
            File.AppendAllLines(@"c:\dev\log.txt", [$"[{DateTime.Now}] {message}"]);
       }
    }
}

My assumption is that at some point we should stop seeing log statements as the code will enter a hung state. We can then add additional logs to try to get closer and closer to the point where each thread stops.

From there, hopefully we can deduce why it is hanging.

@Bartleby2718
Copy link
Author

@madelson I think the PR is ready for another round of review.

BTW is there any chance we can use .NET 8 SDK in the Ubuntu pipeline?

@madelson
Copy link
Owner

BTW is there any chance we can use .NET 8 SDK in the Ubuntu pipeline?

Build failed because the CI server uses MSBuild 16.11.2.50704, which doesn't support C# 12.

These should be code-driven via the .yaml files in the repo. If you fiddle with them on your branch I think you should be able to get the behavior you're looking for.

}

public IAsyncEnumerator<string> GetAsyncEnumerator(CancellationToken cancellationToken = default) =>
// this does not allow consuming the same IEnumerable twice
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson TestConsumeTwice started failing after making this change. Explanation from ChatGPT

In the AsyncEnumerableAdapter class, the GetAsyncEnumerator() method creates a new AsyncEnumeratorAdapter instance that wraps the original IEnumerator<string>. This means that it's directly using the original enumerator from the IEnumerable<string>, which can only be enumerated once. If you try to get the async enumerator twice, it will fail because the underlying enumerator has already been exhausted.

On the other hand, the AsAsyncEnumerable() extension method creates a new enumerator each time it's called. It does this by iterating over the IEnumerable<T> items in a foreach loop. This means you can call GetAsyncEnumerator() multiple times without an issue because each call creates a new enumerator.

So, if your test expects an InvalidOperationException when calling GetAsyncEnumerator() twice, it will fail when using AsAsyncEnumerable() because this method allows multiple enumerations. If you want to preserve the single-use behavior, you should stick with the AsyncEnumerableAdapter class. If you want to allow multiple enumerations, then AsAsyncEnumerable() is the way to go. It all depends on the specific requirements of your code.

For posterity, I added this comment.

@Bartleby2718
Copy link
Author

Bartleby2718 commented Mar 11, 2024

The three failing tests are:

  1. TestStopBufferingAndDiscard - likely a problem in release-1.7 see the red X in aa09a7f
  2. FuzzTest - see this thread
  3. TestPipeline - likely a problem in release-1.7 see the red X in b0c1041
  4. TestProcessIOIsCancellable - likely a problem in release-1.7 see the red X in 2632299

Hence re-requesting review! @madelson

Copy link
Owner

@madelson madelson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Bartleby2718 Bartleby2718 requested a review from madelson June 29, 2024 09:57
@Bartleby2718
Copy link
Author

@madelson To dispose the writer to end the stream, I replaced Task.WaitAll(task1, task2, consumeTask); with Task.WaitAll(task1, task2);, and all tests seem to pass now. Does this look good now?

@Bartleby2718 Bartleby2718 changed the title Close #98: Have MergedLinesEnumerable implement IAsyncEnumerable<string> Have MergedLinesEnumerable implement IAsyncEnumerable<string> Jun 29, 2024
CollectionAssert.AreEquivalent(strings1.Concat(strings2).ToList(), consumeTask.Result);
var task1 = Task.Run(() => WriteStrings(strings1, pipe1));
var task2 = Task.Run(() => WriteStrings(strings2, pipe2));
Task.WaitAll(task1, task2); // need to dispose the writer to end the stream
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Bartleby2718 can we try await Task.WhenAll(task1, task2, consumeTask); here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson I've been trying to get that working, but this test fails for the async case if I do that. Not sure if the test logic is flawed (i.e. shouldn't await consumeTask if the input streams may not have been closed?) or there's a bug somewhere else.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you try specifically with the test being async and using await Task.WhenAll instead of Task.WaitAll? If that doesn't work, could be some kind of threading bug in 1.7

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson Yes, MergedLinesEnumerableTestAsync's FuzzTest fails but MergedLinesEnumerableTestSync's FuzzTest is fine. It fails even when it's run alone, but I did notice that TestPipeline(2) always fails with it if all tests are run together:
image

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madelson I somehow got the test to pass with await Task.WhenAll(task1, task2, consumeTask);! Not sure if this is the fix or it means something else needs to be fixed, but this does look promising.

Let me know what you think! (FWIW the test didn't pass within 10 secodns with if (random.Next(4) == 1).)

@Bartleby2718 Bartleby2718 requested a review from madelson June 30, 2024 07:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants