stream: introduce Body #39483
@nodejs/streams
This needs more work on docs and tests but the basics should be there. Would appreciate some general feedback on the concept.
This also helps the ecosystem with consuming streamlike objects, e.g.:
async function myProduceApi() {
  return compose(myReadableOfChoice())
}
async function myConsumeApi(body) {
  await pipeline(body, myWritableOfChoice())
}
function myTransformApi(body) {
  return compose(body, myTransformerOfChoice())
}
For example, in undici we currently need quite complicated and brittle code in order to support as many different input types as possible; this PR would allow the ecosystem to unify this.
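As a rough illustration of the three API shapes above, here is a minimal, self-contained sketch. The source, transformer and sink are hypothetical stand-ins for `myReadableOfChoice()`, `myTransformerOfChoice()` and `myWritableOfChoice()`, and it assumes a Node.js version in which `stream.compose` is available (it is documented as experimental).

```javascript
const { Readable, Writable, Transform, compose } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Hypothetical producer: wraps a readable source in a composed stream.
function myProduceApi() {
  return compose(Readable.from(['a', 'b', 'c']));
}

// Hypothetical transformer: upper-cases every chunk passing through.
function myTransformApi(body) {
  const upper = new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      callback(null, String(chunk).toUpperCase());
    },
  });
  return compose(body, upper);
}

// Hypothetical consumer: drains the body into an array of chunks.
async function myConsumeApi(body) {
  const chunks = [];
  const sink = new Writable({
    objectMode: true,
    write(chunk, _encoding, callback) {
      chunks.push(chunk);
      callback();
    },
  });
  await pipeline(body, sink);
  return chunks;
}
```

Each function only needs to agree on "something `compose` and `pipeline` accept", which is the unification the comment is arguing for.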
@jasnell you might want to consider this for QUIC and/or the new HTTP APIs.
Did compose already ship in a release? If not, it would be better to either avoid shipping it or flag it as experimental; otherwise we will need to go through a deprecation cycle.
It hasn't shipped. Right now it can only ship in v17, as it depends on a semver-major change.
This introduces a new stream primitive called Body which helps with performance, ergonomics and compatibility when working with different types of data producers and consumers. Using Body it will be possible to delay converting streamlike objects as long as possible and enable some optimizations where we can avoid e.g. intermediate node streams.
Co-authored-by: Voltrex <[email protected]>
added: REPLACEME
-->

* Returns: {Readable}
Suggested change
- * Returns: {Readable}
+ * Returns: {Writable}
@@ -1722,6 +1722,141 @@ const cleanup = finished(rs, (err) => {
  // ...
});
```
#### Class: `stream.Body`
While it makes things a bit more complicated, I would generally prefer to separate out the Body mixin pieces here from the Node.js specific additional APIs. Or, if not that, let's not call this Body, to avoid confusion. stream.Consumers perhaps?
Further, I think an API with static methods similar to would be nicer here... it gives us more options for wrapping the methods in various API specific ways.
const ab = await stream.consumers.arrayBuffer(readable);
// ...
const blob = await stream.consumers.blob(stream.compose(async function* () { yield 'hello'; }));
I really don't want to introduce yet another top level data encapsulation object given how many we already have.
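For illustration only: current Node.js releases ship a `stream/consumers` module whose functions have this one-function-per-format shape. The sketch below uses that module with small in-memory sources; whether it matches exactly what is being proposed in this thread is not established here, and the `demo` helper is a hypothetical name.

```javascript
const { Readable } = require('node:stream');
const { text, json, arrayBuffer } = require('node:stream/consumers');

// Hypothetical helper: each call fully consumes its own source stream
// and resolves with the accumulated result.
async function demo() {
  const asText = await text(Readable.from(['hello ', 'world']));
  const asJson = await json(Readable.from(['{"ok":', 'true}']));
  const asBytes = await arrayBuffer(Readable.from([Buffer.from('abc')]));
  return { asText, asJson, byteLength: asBytes.byteLength };
}
```

Because the accumulators are plain functions over a readable, no extra wrapper object is needed to use them.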
Returns a promise that fulfills with a {Buffer} containing a copy of the body data.

### `body.nodeStream()`
Since we already have the node-to-web adapters, and your new compose function is hopefully landing soon, I don't think we need these. Let's keep this focused on the accumulator functions.
Let's do it.
Just to make it explicit: I'm -1 on introducing a new Body object. This should provide utilities that make it possible to implement the Body mixin pattern, but having it as a separate new kind of object is counterproductive.
@jasnell The primary problem this tries to resolve is converting to a specific consumer as late as possible, so as to avoid intermediate glue streams. Consider e.g.:
const composed = compose(createMyWebStream(), ...transforms, createMyWebTransformStream())
myWebApi(composed.toWeb())
Which ends up with:
TransformStream => Duplex (glue) => ReadableStream (glue) => consumer
If we instead had a "placeholder", e.g.:
TransformStream => consumer
If we want to achieve this in terms of performance then IMO we do need another top level data structure (sorry). Even without compose this becomes a problem, as you often have to decide early in what form you receive a data stream, even if that form might not fit the final consumer. In a way I guess this is just extending the BodyMixin? The other ergonomic parts of this PR I would like, but I don't feel so strongly about them.
Except the
async function text() {
  let text = '';
  for await (const chunk of stream.readableWebStream().pipeThrough(new TextDecoderStream()))
    text += chunk;
  return text;
}
Having the additional intermediate
What does make sense to me is providing the underlying accumulator functions so they can be used in multiple places... For instance, in the QUIC stream class I could provide methods...
const {
  text,
  json,
  arrayBuffer,
} = require('stream/accumulators');
// ..
class Stream {
  // ...
  async text() {
    return text(this.readableWebStream());
  }
  async json() {
    return json(this.readableWebStream());
  }
  async arrayBuffer() {
    return arrayBuffer(this.readableWebStream());
  }
}
Or, perhaps using the mixin model... something like...
const {
  kGetReadableWebStream,
  mixinBody,
} = require('stream/accumulators');
// ..
class Stream {
  get [kGetReadableWebStream]() {
    return this.readableWebStream();
  }
}
mixinBody(Stream.prototype);
// Such that mixinBody adds the common set of Body mixin methods to the `Stream` prototype.
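To make the mixin idea concrete, here is a minimal sketch of what such a helper could look like. `mixinBody` and `kGetReadable` are hypothetical and defined locally; the sketch delegates to the `stream/consumers` functions available in current Node.js releases, and uses a Node.js `Readable` rather than a web stream purely to keep the example short.

```javascript
const { Readable } = require('node:stream');
const consumers = require('node:stream/consumers');

// Hypothetical symbol: a class implements this getter to expose its data.
const kGetReadable = Symbol('kGetReadable');

// Hypothetical helper: adds Body-mixin-style methods to a prototype.
// Each method consumes the readable returned by the kGetReadable getter.
function mixinBody(proto) {
  for (const name of ['text', 'json', 'arrayBuffer']) {
    Object.defineProperty(proto, name, {
      configurable: true,
      writable: true,
      value() {
        return consumers[name](this[kGetReadable]);
      },
    });
  }
}

// Example class, assumed for illustration: yields its chunks as a stream.
class Stream {
  constructor(chunks) {
    this.chunks = chunks;
  }
  get [kGetReadable]() {
    return Readable.from(this.chunks);
  }
}
mixinBody(Stream.prototype);
```

With this shape the class itself stays free of accumulator logic; it only has to say where its readable comes from.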
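You still need to run through a lot of glue here. Just writing directly to a buffer or string would still be faster. E.g. in undici we could totally skip all glue and do:
// Symbol and constant definitions are assumed here; the original snippet elides them.
const kPush = Symbol('push')
const kType = Symbol('type')
const kString = Symbol('string')
const kResolve = Symbol('resolve')
const kReject = Symbol('reject')
const CONSUME_STRING_TYPE = 1
class Body {
  // Called by the producer for every chunk; a falsy chunk signals end of data.
  [kPush](chunk) {
    if (this[kType] === CONSUME_STRING_TYPE) {
      if (chunk) {
        this[kString] += chunk
      } else {
        this[kResolve](this[kString])
      }
    }
  }
  text () {
    if (this[kType]) throw new TypeError('disturbed')
    this[kType] = CONSUME_STRING_TYPE
    this[kString] = ''
    return new Promise((resolve, reject) => {
      this[kResolve] = resolve
      this[kReject] = reject
    })
  }
}
Please see https://github.com/nodejs/undici/pull/898/files for a complete example.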
That misses the point that we should try to avoid consuming streams multiple times. But if we skip that concern then yes, I guess that is an option for the stream + Again, the primary point is to avoid glue when consuming data streams, i.e. a top level "lazy stream type" data structure.
So there are 2 parts to this, i.e. we usually have two glue steps every time we consume data, e.g.
1. Is about implementing some kind of body mixin where the different ways of consuming can be implemented without glue. So basically what we want is:
@nodejs/streams I think we need more opinions and feedback here to get further in the discussion.
I've read the thread and see the arguments, but I feel that in order to hold an informed opinion I'd need to spend ±10-20 good hours on this. This is why I've also been avoiding most of the compose discussions: I see the merit, but I feel that coming in with an uninformed opinion because I don't have the capacity to be informed would just hinder the people contributing. If you'd like, I'm happy to take a look in ~a month, when hopefully Daniel (the 👶) sleeps through the night, I'm done with the move and I'm less new at Microsoft :)
Let's wait for what happens with #39520 and whether or not that can be used or affect this PR. I'll wait with pushing for this PR until then.
This introduces a new stream primitive called Body which helps
with performance, ergonomics and compatibility when working
with different types of data producers and consumers.
Using Body it will be possible to delay converting streamlike
objects as long as possible and enable some optimizations where
we can avoid e.g. intermediate node streams.
The current implementation does not yet take advantage of possible optimizations. However, I think that is something we can add and improve on in the future.