This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

Introducing System.IO.Pipelines #980

Merged
merged 3 commits into from
Nov 15, 2016

Conversation

davidfowl
Member

@davidfowl davidfowl commented Nov 15, 2016

  • Golang took the name channels so now we have pipelines

@KrzysztofCwalina

- Golang took the name channels so now we have pipelines
@jchannon

So what if Golang named it first, why can't 2 languages have the same named concept?

If a golang user comes to .net they will immediately know what channels is.

@JesperTreetop

@jchannon Yes, they will have an idea of what channels should be and be highly confused, because channels in Go are a completely different concept. What is now called Pipelines in .NET is an optimized specialization of streams. In Go, "Channels are a typed conduit through which you can send and receive values with the channel operator, <-."

@stephentoub
Member

The project at https://github.com/dotnet/corefxlab/tree/master/src/System.Threading.Tasks.Channels maps much more closely in concept to Go's channel.

ArraySegment<byte> data;
if (memory.TryGetArray(out data))
{
await stream.WriteAsync(data.Array, data.Offset, data.Count);
Member

All of the awaits on tasks should use ConfigureAwait(false)
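
As an illustration of this suggestion, here is a minimal sketch of the copy loop with ConfigureAwait(false) on the await. The CopySketch class and the segments parameter are hypothetical stand-ins for the PR's ReadableBuffer extension method, not the code under review.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class CopySketch
{
    // Hypothetical stand-in for the PR's CopyToAsync: writes each segment to the stream.
    public static async Task CopyToAsync(IEnumerable<ArraySegment<byte>> segments, Stream stream)
    {
        foreach (var data in segments)
        {
            // ConfigureAwait(false): do not capture the caller's SynchronizationContext,
            // so the continuation is not posted back to it.
            await stream.WriteAsync(data.Array, data.Offset, data.Count).ConfigureAwait(false);
        }
    }
}
```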

Member Author

Oh god, ConfigureAwait....

Is this still true?

/// <returns></returns>
public static async Task CopyToAsync(this ReadableBuffer buffer, Stream stream)
{
foreach (var memory in buffer)
Member

Is it common/likely for buffer to have more than one "memory"? If a single memory is the common case, you could special-case that and return the result of stream.WriteAsync directly, avoiding the extra async method frame.
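
One way such a fast path could look, sketched under the assumption that a single segment is the common case. FastPathSketch, CopyManyAsync, and the IReadOnlyList<ArraySegment<byte>> parameter are hypothetical; the PR's ReadableBuffer API is not reproduced here.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class FastPathSketch
{
    // Not marked async: in the single-segment case the Task from WriteAsync
    // is returned as-is, so no async state machine is created.
    public static Task CopyToAsync(IReadOnlyList<ArraySegment<byte>> segments, Stream stream)
    {
        if (segments.Count == 1)
        {
            var data = segments[0];
            return stream.WriteAsync(data.Array, data.Offset, data.Count);
        }

        return CopyManyAsync(segments, stream);
    }

    // Slow path (hypothetical helper): awaits each write in turn.
    private static async Task CopyManyAsync(IReadOnlyList<ArraySegment<byte>> segments, Stream stream)
    {
        foreach (var data in segments)
        {
            await stream.WriteAsync(data.Array, data.Offset, data.Count).ConfigureAwait(false);
        }
    }
}
```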

/// <summary>
/// Returns a boolean indicating if the gate is "open"
/// </summary>
public bool IsCompleted => _gateState == _gateIsOpen;
Member

_gateState == _gateIsOpen

Unless it's misused, checking here and in other places in the file != null should provide the same behavior as == _gateIsOpen, and without needing to load the static field.

Member Author

I'll file bugs for the Gate/Signal changes. I want to get this in asap since it's a giant "move the code over" PR. I'd prefer if there was a reusable TCS in the framework, these 2 types are basically that.

Member

> I'd prefer if there was a reusable TCS in the framework, these 2 types are basically that.

I've considered that several times over the years. I think it'd be reasonable to add something to TCS that would essentially allow it to dump its existing task and create a new one, but that's really not sufficient to replace these custom awaitables... for that, you'd not only need to make TCS reusable, but you'd need to allow a Task to go from completed to not completed, which could potentially break a whole lot of code if used incorrectly. Such a task-resetting feature could only be used in a closed environment where the producer and consumer of the task coordinate, as otherwise the producer won't know when it's safe to reset (the same kind of issue that exists with pooled data structures in general).

Member Author

Agreed, this is one of those systems. We just need primitives that give the power. Maybe a new namespace? System.*.KnowWhatYouAreDoing

public void GetResult()
{
// Clear the active continuation to "reset" the state of this event
Interlocked.Exchange(ref _gateState, null);
Member

Interlocked.Exchange

Why an interlocked?

private MemoryPoolBlock Lease(
[CallerMemberName] string memberName = "",
[CallerFilePath] string sourceFilePath = "",
[CallerLineNumber] int sourceLineNumber = 0)
Member

Did you intend to store these in DEBUG-only fields?

Member Author

We'll need to figure out a way to get these diagnostics without being in debug mode. But these are fine for now

Member

> But these are fine for now

How so? They don't appear to be stored or used anywhere, even when compiled for DEBUG, so they don't appear to be useful right now at all, even in DEBUG.

Member Author

You're right, these are remnants of the past.

@khalidabuhakmeh
Contributor

I like it!

else if (fin)
{
return new ValueTask<int>(0);
}
Member

@stephentoub stephentoub Nov 15, 2016

Couldn't this just be:

if (actual != 0 || fin)
{
    return new ValueTask<int>(actual);
}

?

Member Author

fin being true doesn't mean the buffer is empty. I'd rather keep the logic split for now and do further testing to make sure that length is 0 when fin is true (in this case).

else
{
// Needs .AsTask to match Stream's Async method return types
_cachedTask = task.AsTask();
Member

_cachedTask = task.AsTask();

I'm not sure this is a good thing to do. First, the operation could fail, in which case you've cached a task that has an exception, and the next time you check _cachedTask.Result, it'll throw. Second, you could end up in a situation where a call to ReadAsync blocks waiting on the _cachedTask. I think you only want to cache it in the task.IsCompletedSuccessfully case.
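
A minimal sketch of caching only in the successfully completed case, assuming the cache exists to reuse a Task<int> for repeated identical read results. ReadTaskCache and ToTask are hypothetical names and this is not the adapter code from the PR.

```csharp
using System;
using System.Threading.Tasks;

public sealed class ReadTaskCache
{
    // Only ever holds a task that has already completed successfully,
    // so reading _cachedTask.Result can neither block nor throw.
    private Task<int> _cachedTask;

    public Task<int> ToTask(ValueTask<int> task)
    {
        if (task.IsCompletedSuccessfully)
        {
            int result = task.Result;
            if (_cachedTask == null || _cachedTask.Result != result)
            {
                _cachedTask = Task.FromResult(result);
            }
            return _cachedTask;
        }

        // Pending, faulted, or canceled: return a fresh task and leave the cache untouched.
        return task.AsTask();
    }
}
```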

public override Task FlushAsync(CancellationToken cancellationToken)
{
// No-op since writes are immediate.
return Task.FromResult(0);
Member

Task.FromResult(0)

Task.CompletedTask
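
For reference, a small sketch of the suggested replacement. FlushSketch is a hypothetical class standing in for the stream adapter, whose other members are omitted here.

```csharp
using System.Threading.Tasks;

public static class FlushSketch
{
    public static Task FlushAsync()
    {
        // No-op since writes are immediate. Task.CompletedTask is a shared,
        // already-completed Task, so no Task<int> is created just to signal completion.
        return Task.CompletedTask;
    }
}
```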

task.ContinueWith(t => callback.Invoke(t));
}
return task;
}

{
writer.CompleteReader();
}
});
Member

Would be better as:

writer.CopyToAsync(stream).ContinueWith((t,s) =>
{
    var innerWriter = (PipelineReaderWriter)s;
    if (t.IsFaulted) innerWriter.CompleteReader(t.Exception.InnerException);
    else innerWriter.CompleteReader();
}, writer, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

That way, you avoid the closure/delegate allocation closing over state, the continuation is invoked as part of the antecedent task's completion (rather than queueing another work item), and the continuation will not be forced back to any custom TaskScheduler that was present. Similar comments apply to other uses of ContinueWith in this PR.

Member Author

Ah yea, I was being lazy here. Though the passing of those arguments is one of the reasons I try to avoid ContinueWith...

/// <summary>
/// Provides a wrapper around the ZLib decompression API
/// </summary>
internal sealed class Inflater : IDisposable
Member

cc: @ianhays... we should think through exposing Inflater/Deflater in corefx publicly, so that other consumers besides DeflateStream can use them directly.

Member

Yes, but we need to design clean low level APIs.


Value = value;
}
public T Value { get; set; }
}
Member

System.Runtime.CompilerServices.StrongBox
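
A short sketch of how the framework's StrongBox<T> could stand in for a hand-written box type like the one above. StrongBoxSketch and Increment are hypothetical names used only for illustration.

```csharp
using System.Runtime.CompilerServices;

public static class StrongBoxSketch
{
    // StrongBox<T> is a reference type exposing a public mutable Value field,
    // so callers holding the same box observe the update.
    public static int Increment(StrongBox<int> box)
    {
        box.Value++;
        return box.Value;
    }
}
```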

Member Author

Nice


// Win32 file impl
// TODO: Other platforms
public unsafe void OpenReadFile(string path)
Member

Do you still need this now that FileStream has an optimized CopyToAsync implementation? What further changes would be necessary there to obviate the need for this here?

Member Author

This was a proof of concept done way before FileStream.CopyToAsync had those changes. Actually it would be better to use FileStream.CTA adapter directly now (at least on .NET Core) because you get the cross platform benefits. There are some questions as to whether we ship this at all but for now it's just an experiment. Ideally we'd have a unix impl so we could see what that looks like.

/// a custom awaiter is provided. Works like a <see cref="ManualResetEvent"/> - the <see cref="Reset"/> method must
/// be called between operations.
/// </summary>
internal class Signal : INotifyCompletion
Member

ICriticalNotifyCompletion


public void OnCompleted(Action continuation)
{
if (continuation != null)
Member

if (continuation != null)

This would only happen if it was misused... the compiler will never pass a null continuation.

var oldValue = Interlocked.CompareExchange(ref _continuation, continuation, null);

if (ReferenceEquals(oldValue, _completedSentinel))
{
Member

Same comments as on Gate about this.

private Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken, object state)
{
var tcs = new TaskCompletionSource<object>(state);
var task = WriteAsync(buffer, offset, count, cancellationToken);
Member

var task = WriteAsync(buffer, offset, count, cancellationToken);

In cases like this, where the async operation is likely to complete synchronously, you're better off adding a fast path, e.g.

Task task = WriteAsync(...);
if (task.Status == TaskStatus.RanToCompletion)
{
    return task;
}

var tcs = ...
...

@adamralph

Does it matter that golang might introduce something called pipelines later which is again completely different to this?

@nzall

nzall commented Nov 15, 2016

@adamralph golang already has a concept called pipelines, which is a series of stages (goroutines running the same function) connected by channels. Since there is only a limited set of words in the English vocabulary that are suitable for software purposes, chances are that the good ones are already taken.

@omariom
Contributor

omariom commented Nov 15, 2016

@davidfowl
What is the allocation profile of such an approach compared to traditional streams?

var listener = new UvTcpListener(thread, new IPEndPoint(ip, port));
listener.OnConnection(async connection =>
{
var channel = MakePipeline(connection);

shouldn't you rename "channel" as well?

@KrzysztofCwalina
Member

using System.Buffers;

All the files should have the standard copyright/license notice


Refers to: src/System.IO.Pipelines/ArrayBufferPool.cs:1 in 56ee6e7.


namespace System.IO.Pipelines
{
public class PipelineConnectionStream : Stream
Member

@KrzysztofCwalina KrzysztofCwalina Nov 15, 2016

Stream

I think the adapters should be separated (namespace and possibly package wise) from the pure-pipeline APIs

Member Author

Suggestions?

Member

System.IO.Pipelines.Bridge? System.IO.Pipelines.Streams?

/// <summary>
/// The underlying <see cref="PipelineReaderWriter"/> the <see cref="PipelineReader"/> communicates over.
/// </summary>
protected readonly PipelineReaderWriter _input;
Member

@KrzysztofCwalina KrzysztofCwalina Nov 15, 2016

Yeah, the fact that reader wraps reader/writers shows that we don't yet have a good name for the reader/writer. Having said that, I think we should merge as-is and think about it later.


namespace System.IO.Pipelines.Networking.Libuv.Interop
{
public struct SockAddr

No need for struct layout?

@KrzysztofCwalina
Member

🕐

@adamralph

With reference to @stephentoub's comment, is the implication that System.IO.Pipelines aligns more closely with golang pipelines (referenced by @nzall)?

@stephentoub
Member

stephentoub commented Nov 15, 2016

> With reference to @stephentoub's comment, is the implication that System.IO.Pipelines aligns more closely with golang pipelines (referenced by @nzall)?

A "pipeline" is a general pattern, a series of processing elements (often called stages) with data handed from one stage to the next. A pipeline can be implemented in many languages/frameworks; in Go, it's typically implemented using a goroutine for each stage and using a channel as the mechanism for handing data between stages. But the concept is in no way specific to Go.
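
To make the general pattern concrete, here is a rough C# sketch of a three-stage pipeline. It uses BlockingCollection<T> as the hand-off mechanism between stages, which is an assumption chosen for illustration; it is not the System.IO.Pipelines API and has no error handling. PipelinePatternSketch and SumOfSquares are hypothetical names.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PipelinePatternSketch
{
    public static int SumOfSquares(IEnumerable<int> source)
    {
        // Bounded queues play the role of the hand-off between stages.
        var numbers = new BlockingCollection<int>(boundedCapacity: 4);
        var squares = new BlockingCollection<int>(boundedCapacity: 4);

        // Stage 1: produce the input values.
        var produce = Task.Run(() =>
        {
            foreach (var n in source) numbers.Add(n);
            numbers.CompleteAdding();
        });

        // Stage 2: transform each value as it arrives.
        var square = Task.Run(() =>
        {
            foreach (var n in numbers.GetConsumingEnumerable()) squares.Add(n * n);
            squares.CompleteAdding();
        });

        // Stage 3: consume the transformed values.
        var sum = Task.Run(() =>
        {
            var total = 0;
            foreach (var s in squares.GetConsumingEnumerable()) total += s;
            return total;
        });

        Task.WaitAll(produce, square, sum);
        return sum.Result;
    }
}
```

Each stage runs concurrently and only sees the output of the stage before it, which is the property the comment above describes.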

@adamralph

@stephentoub thanks for the explanation. I'm trying to rationalise the rename from "channels" to "pipelines". I can see how the rename probably makes sense in this case, but basing the decision on another platform having "taken" the generic noun you intended to use feels a bit off somehow.

@JesperTreetop

@adamralph Look at the repository @stephentoub linked to - they're not changing the name of this thing "because we can't use names taken in other languages", but because they already are testing something which would be closer to channels and want to name that thing channels instead.

@adamralph

@JesperTreetop that sounds like a good reason. It's a very different reason to that given by the OP.

@dazinator

Couldn't resist commenting.. I have literally no idea what this is.. but I love it +1. Look forward to working it out through experimentation :-)

@malachib

malachib commented Dec 9, 2016

I'm excited about this, it looks like a potentially fresher and better tool than Streams, especially for in-process / short-hop data movement

// Copy from the file reader to the console writer
input.CopyToAsync(output).GetAwaiter().GetResult();

input.Complete();
@dazinator dazinator Dec 9, 2016

What happens if you forget to call Complete()? Do you think there would be any benefit if IPipelineReader / 'IPipelineReader' implemented IDisposable, and called Complete() in its Dispose?

@davidfowl davidfowl deleted the davidfowl/pipelines branch December 23, 2016 16:20