Summit Topic: Node.js Streams promise support #216
I’d wager this is probably “intermediate” and should list my session as a follow-up |
@mcollina How is this different from the other session on Streams? |
@WaleedAshraf - sounds good |
From 30 minutes to 1 hour |
Promisify
|
Very rough idea: no promisified `writable.write`; instead we use duality and do the async iterator API. Async iterators do:

```ts
{
  next(value): Promise<nextValueResult>
  return(value): Promise<nextIfNoYieldOrFinallyOrLastValueResult>
  throw(err): Promise<nextValueResult>
}
```

You can make a direct dual interface (moving parts around):

```ts
{
  next(nextValueResult): Promise<value>
  return(nextIfNoYieldOrFinallyOrLastValueResult): Promise<value>
  throw(nextValueResult): MaybeRejectedPromise<err>
}
```

The first thing you will notice is that async iterators as an interface are already a dual interface. We haven't actually changed anything here in order to get the API, since async iterators are bidirectional and kinda-dual already. That is, if you had an API that did:

Error handling would work like in an async iterator. |
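The duality above can be made concrete: a sink can be driven through the ordinary async generator protocol, where each `next(value)` call returns a promise that settles once the value has been consumed. A rough sketch (all names here are illustrative, not a proposed API):

```javascript
// A sink expressed as an async generator: it never yields data out,
// it only receives chunks passed in via next(value).
async function* makeSink(consume) {
  while (true) {
    const chunk = yield;
    await consume(chunk);
  }
}

async function demo() {
  const written = [];
  const sink = makeSink(async (chunk) => { written.push(chunk); });
  await sink.next();    // prime: run up to the first `yield`
  await sink.next('a'); // resolves after consume('a') has finished
  await sink.next('b');
  await sink.return();  // like end(): runs finally blocks, closes the sink
  return written;
}
```

Backpressure falls out naturally here: the promise returned by `next()` is the "you may write more" signal.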
What do we put in HTTP.promises? Nothing. Another idea is service workers, but that is not really a server HTTP API. I would recommend we just promisify the building blocks of HTTP (streams and event emitters). For the client: probably |
proof of concept:

```js
Writable.prototype.write[promisify.custom] = function writeProm () {
  return (chunk) => new Promise((resolve, reject) => {
    let isError = false
    const handleErr = (err) => {
      reject(err)
      isError = true
    }
    this.once('error', handleErr)
    const needsDrain = this.write(chunk) === false
    if (needsDrain) {
      this.once('drain', () => {
        this.removeListener('error', handleErr)
        resolve()
      })
      return
    }
    waitForTheRightMoment(() => {
      this.removeListener('error', handleErr)
      resolve()
    })
  })
}
```

example usage:

```js
const { promisify } = require('util')
// promisify returns the custom above; calling it with the stream as `this`
// yields the chunk-writing function
const write = promisify(MyWritableStream.write).call(MyWritableStream)
async function run () {
  for await (const item of iterable) {
    await write(item)
  }
}
run().catch((err) => console.error(err))
```
|
Using a `WriteController`:

```js
const writeController = new WriteController();
for await (const chunk of source) {
  const buffered = sink.write(chunk, { writeController });
  await somethingRisky();
  // Produces a promise that represents the successful flushing of all previous
  // chunks written to the sink and associated with the `WriteController`.
  await writeController.next();
}
```

If you do not want to block on each chunk but are only interested in knowing whether all chunks have been collectively flushed:

```js
const writeController = new WriteController();
for await (const chunk of source) {
  const buffered = sink.write(chunk, { writeController });
  // Maybe interested in knowing about highWaterMark
  await somethingRisky();
}
// Since we're outside the `for await` loop, we can also easily defer waiting
// for flushing here.
await writeController.next();
```

The idea here could also live totally outside of the stream itself:

```js
const writeController = new WriteController();
for await (const chunk of source) {
  const shouldWait = writeController.write(sink, chunk);
  if (shouldWait) {
    await writeController.draining();
  }
}
// I want to make sure all my chunks have flushed
await writeController.flushed();
```
|
Might be incorrect, but this could be what it looks like:

```js
const http = require('http')
const { promisify } = require('util')

http.promises.get = promisify(http.get);

(async () => {
  try {
    const res = await http.promises.get('localhost:3000')
    for await (const chunk of res) {
      await ...
    }
  } catch (err) {
    console.log(err)
    process.exit(1)
  }
})()
```
|
Potential API:

```js
const req = request('https://example.com');
const res = await req.response;
for await (const chunk of res.body) {
  console.log(chunk);
}
```
|
@Ethan-Arrowood fwiw that's basically the fetch shape:

```js
import { fetch } from 'http';

const res = await fetch('http://localhost:3000'); // we can use top-level await
for await (const chunk of res) {
  await ...
}
```
|
Easy one: promisify `server.listen`:

```js
const server = http.createServer(() => { });
const address = await server.listen(0);
```
|
```js
const chunks = [a, b, c]
for (const chunk of chunks) {
  // Returns a promise that resolves when we can write the next chunk (equivalent to true/false return)
  const c = w.write(chunk, () => {})
  // Resolves when writing the chunk has been completed (equivalent to the write callback)
  c.done.then(() => {})
  await c;
}
```
|
Using thenables for `write`:

```js
async function run () {
  for await (const item of iterable) {
    const queued = w.write(item)
    // await queued.then()
    // await queued.flushed.then()
  }
}
run().catch((err) => console.error(err))
```

For errors, I think that an error on the chunk should be called to the |
This is what I was thinking too. But also |
A few constraints I can think of for promisifying
I think the sequence of events is very similar for successful requests and those that errored, and if you were to offer a Promise API then you would be able to return control to the caller just once, either with a rejected or a resolved Promise.

Issue 1: If you were to reject a promise returned by

Issue 2: In the case of a premature connection close after the response is received, the callback API still offers the result of the request through the 'response' event; but even if you had the data available to include in the rejection, it is considered an anti-pattern to include such information in a promise rejection. |
For incoming connections:

```js
for await (const connection of http.createServer()) {
  // ...
}
```

Anything in this block that uses the natural async function flow (via await) will always block the next connection. The only way to avoid this is by creating a callback off of some async primitive. |
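The workaround described above can be sketched like this (`handle` is a hypothetical per-connection handler): start the handler without awaiting it, so a slow handler cannot block the next connection:

```javascript
// Accept connections from any async iterable and dispatch each one
// without blocking the loop on the handler's completion.
async function acceptLoop(connections, handle) {
  for await (const connection of connections) {
    // Fire-and-forget: errors must be caught here (or inside handle),
    // otherwise they become unhandled rejections.
    handle(connection).catch((err) => console.error(err));
  }
}
```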
```js
const server = http.createServer(async (req, res) => {
  const chunks = [];
  for await (const data of req.data) {
    chunks.push(data)
  }
  // or if you only care about the full body, not chunks
  await req.dataEnd();
})
```

Plus similar async iterator methods for any |
Is it possible to stream this session by any chance? (As in open a zoom session) |
@Fishrock123 why would they have to be thenables and not regular promises? @BridgeAR it recently ended, I believe. |
yeah agreed, I've updated #216 (comment) with "waitForTheRightMoment" to symbolize the order of events. We can use internals to establish the "right moment" and adapt that to a more internally coupled implementation |
@benjamingr yes, it was in parallel with the releases one, so I could not join earlier, but I thought maybe they were still ongoing. Thanks though. |
Sure thing, see you in March |
@benjamingr to avoid unhandled promises when people do not actually need to wait for one or the other. Other options:
|
People can join on the zoom link.
On Fri, Dec 13, 2019 at 11:54 AM Ruben Bridgewater wrote:
> Is it possible to stream this session by any chance? (As in open a zoom session)
--
Eva Howe
Operations Manager
This Dot Labs
[email protected]
|
@evahowe note that this session has ended a while ago and the next one is streaming :] |
I feel like something like this works reasonably well, whether you care about errors or not:

```js
Writable.prototype.write[promisify.custom] = () =>
  function promisifiedWrite(chunk, encoding) {
    let syncErr;
    const needDrain = !this.write(chunk, encoding, (err) => {
      syncErr = err
    });
    const stream = this;
    return {
      then() {
        if (syncErr) {
          throw syncErr;
        }
        if (needDrain) {
          return new Promise((resolve, reject) => {
            stream.once('error', reject);
            stream.once('drain', () => {
              stream.removeListener('error', reject);
              resolve();
            });
          });
        }
      }
    };
  };
```

Wouldn't leave it as a promisified function though, ideally it's first-class. |
apapirovski did you mean to make that:

```js
Writable.prototype.write[promisify.custom] = () => function (chunk, encoding) {
  let syncErr;
  const needDrain = !this.write(chunk, encoding, (err) => {
    syncErr = err
  });
  if (syncErr) return Promise.reject(syncErr); // does this always happen synchronously?
  if (!needDrain) return Promise.resolve();
  return EventEmitter.once(this, 'drain'); // once handles errors
};
```
|
@benjamingr nope, meant that to be as is... The implementation you've posted will reject a promise regardless; avoiding that was an explicit design decision. But also, to be fair, the throw is kinda sketchy. I don't think that might even work correctly lol. (Although it seems to in my testing...) Either way, that line could be swapped for

Anyway, sounds like you're suggesting something more like this...

```js
Writable.prototype.write[promisify.custom] = () =>
  function promisifiedWrite(chunk, encoding) {
    let syncErr;
    const needDrain = !this.write(chunk, encoding, (err) => {
      syncErr = err
    });
    const stream = this;
    return {
      get then() {
        if (syncErr) {
          return Promise.reject(syncErr);
        }
        if (needDrain) {
          return new Promise((resolve, reject) => {
            stream.once('error', reject);
            stream.once('drain', () => {
              stream.removeListener('error', reject);
              resolve();
            });
          });
        }
      }
    };
  };
```

Similar in behavior, but the downside is that inspecting the returned object in any way can cause the promises to be created. Either way, my proposal technically can leak memory if someone awaits the promise too late and it required draining... which I guess brings you to something more like this:

```js
Writable.prototype.write[promisify.custom] = () =>
  function promisifiedWrite(chunk, encoding) {
    let syncErr;
    let needDrain = !this.write(chunk, encoding, (err) => {
      syncErr = err
    });
    const stream = this;
    let drainFn;
    if (needDrain) {
      stream.once('drain', () => {
        needDrain = false;
        if (drainFn) {
          drainFn();
        }
      });
    }
    return {
      then() {
        if (syncErr) {
          return Promise.reject(syncErr);
        }
        if (needDrain) {
          return new Promise((resolve, reject) => {
            stream.once('error', reject);
            drainFn = () => {
              stream.removeListener('error', reject);
              resolve();
            };
          });
        }
      }
    };
  };
```
|
The Promises/A+ spec (and thus the ECMAScript spec) explicitly requires calling

I think lazy promises are mostly confusing (since promises are not actions), and I agree that if we explore it we should not execute the action on the

If what we want is to not create a rejection unless someone is listening, and to not listen to certain events until someone is listening (which is arguably confusing), maybe something like:

```js
// extend Promise so we get catch, finally, etc. that all call `then`
const lazy = (fn) => new (class extends Promise {
  #promise = null;
  constructor() {
    super(() => {}); // never settles on its own; the overridden `then` delegates instead
  }
  then(onFulfilled, onRejected) {
    this.#promise = this.#promise || fn();
    return this.#promise.then(onFulfilled, onRejected);
  }
})();

Writable.prototype.write[promisify.custom] = () => function (chunk, encoding) {
  let syncErr;
  const needDrain = !this.write(chunk, encoding, (err) => {
    syncErr = err
  });
  if (syncErr) {
    const writeError = Promise.reject(syncErr);
    writeError.catch(() => {}); // don't care about this rejection, no need to track it
    return writeError;
  }
  if (!needDrain) return Promise.resolve(); // no effects
  return lazy(() => EventEmitter.once(this, 'drain'));
};
```
|
That's much neater haha
Yeah, I was just taking off from where Jeremiah left it. I can see why some people would want to have the option to not get unhandled rejections in certain scenarios when using the promisified API. (Also need to handle the |
Tweet thread of the session https://twitter.com/trivikram/status/1205512468954521600 |
@mcollina are there any notes or other artifacts for this session? If so can you share them here? I am happy to make a PR and add them to the summit directory or feel free to raise one by yourself. Thanks! |
Most of the discussion is captured in this issue. I plan to open a couple of issues on Node in the coming weeks to keep the discussion going. |
Ok, gonna close this issue then. |
This is my take:

```js
Writable.prototype.write[promisify.custom] = () => function (chunk, encoding) {
  return new Promise((resolve, reject) => {
    let complete = false;
    const needDrain = !this.write(chunk, encoding, (err) => {
      complete = true;
      if (err) {
        reject(err)
      } else {
        resolve();
      }
      this.off('error', reject);
    });
    if (complete) {
      // Do nothing
    } else if (needDrain) {
      this.on('error', reject);
    } else {
      resolve();
    }
  });
};
```

Of course all of these have the downside of:

```js
let bytesSuccessfullyWritten = 0;
await w.write(buf);
bytesSuccessfullyWritten += buf.length; // BUG
// Coming here does NOT mean that the above write was successful
```

I think an API like this would make more semantic sense:

```js
let bytesSuccessfullyWritten = 0;
let bytesBuffered = 0;
for await (const chunk of source) {
  bytesBuffered += chunk.length;
  if (!w.write(chunk)) {
    await w.flush();
    bytesSuccessfullyWritten += bytesBuffered;
    bytesBuffered = 0;
  }
}
await w.end();
```

Which would avoid confusion caused by the natural assumption that a successfully resolved promise from |
I think the main problem with any variation of

I was thinking about some of the challenges around tracking which writes have been flushed and which haven't. Imagine scenarios in which there are multiple bits of logic 'concurrently' writing to the same `Writable`. In that scenario, I imagine a `Writer`.

Hypothetical interface:

```ts
interface Writer {
  /**
   * Create a Writer for a given Writable stream.
   *
   * _Note: No absolute need for this to be a class-style api. It could
   * easily be `require('stream').createWriter(writable)`._
   */
  new(writable: Stream.Writable): Writer;

  /**
   * Wait for the stream to be drained.
   *
   * Moral equivalent of `require('events').once(this.writable, 'drain')`.
   */
  drained(): Promise<void>;

  /**
   * Wait for all chunks written via this `Writer` to be flushed.
   */
  flushed(): Promise<void>;

  /**
   * Write a chunk of data to the stream and produce a `WriteResult`.
   *
   * @param chunk The chunk of data to be written
   * @param encoding Optional chunk encoding
   */
  write(chunk: string | Buffer, encoding?: string): WriteResult;
}

interface WriteResult {
  /**
   * Return a Promise for when this chunk of data is flushed.
   *
   * _Note: Could also be a getter. The important bit is that a promise is
   * only produced when this api is *used*._
   */
  flushed(): Promise<void>;

  /**
   * Indication of whether the consumer should block on the `Promise`
   * returned by `Writer#drained`.
   */
  shouldWaitForDrain: boolean;
}
```

Usage example:

```js
const writer = new Writer(sink);
for await (const chunk of source) {
  const result = writer.write(chunk);
  if (result.shouldWaitForDrain) {
    await writer.drained();
  }
  // If I was interested in making sure each individual chunk was
  // flushed, I could uncomment the following:
  //
  // await result.flushed();
}
// Now that all chunks from source have been written to sink, we want to
// make sure that all those chunks have actually been flushed.
// We're not interested in each specific chunk's flush state but instead
// that _ALL_ chunks have been flushed here, so we're not looking at
// `WriteResult#flushed`.
await writer.flushed();
```
|
Topic of the session
We have added async iterators to streams quite successfully! It's time to think about what's missing.