Skip to content

Commit ec9400d

Browse files
authored
[Flight Reply] Encode ReadableStream and AsyncIterables (#28893)
Same as #28847 but in the other direction. Like other promises, this doesn't actually stream in the outgoing direction. It buffers until the stream is done. This is mainly due to our protocol remains compatible with Safari's lack of outgoing streams until recently. However, the stream chunks are encoded as separate fields and so does support the busboy streaming on the receiving side.
1 parent 5fcfd71 commit ec9400d

File tree

4 files changed

+722
-14
lines changed

4 files changed

+722
-14
lines changed

packages/react-client/src/ReactFlightReplyClient.js

+141-5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import type {TemporaryReferenceSet} from './ReactFlightTemporaryReferences';
2020
import {
2121
enableRenderableContext,
2222
enableBinaryFlight,
23+
enableFlightReadableStream,
2324
} from 'shared/ReactFeatureFlags';
2425

2526
import {
@@ -28,6 +29,7 @@ import {
2829
REACT_CONTEXT_TYPE,
2930
REACT_PROVIDER_TYPE,
3031
getIteratorFn,
32+
ASYNC_ITERATOR,
3133
} from 'shared/ReactSymbols';
3234

3335
import {
@@ -206,6 +208,123 @@ export function processReply(
206208
return '$' + tag + blobId.toString(16);
207209
}
208210

211+
function serializeReadableStream(stream: ReadableStream): string {
212+
if (formData === null) {
213+
// Upgrade to use FormData to allow us to stream this value.
214+
formData = new FormData();
215+
}
216+
const data = formData;
217+
218+
pendingParts++;
219+
const streamId = nextPartId++;
220+
221+
// Detect if this is a BYOB stream. BYOB streams should be able to be read as bytes on the
222+
// receiving side. It also implies that different chunks can be split up or merged as opposed
223+
// to a readable stream that happens to have Uint8Array as the type which might expect it to be
224+
// received in the same slices.
225+
// $FlowFixMe: This is a Node.js extension.
226+
let supportsBYOB: void | boolean = stream.supportsBYOB;
227+
if (supportsBYOB === undefined) {
228+
try {
229+
// $FlowFixMe[extra-arg]: This argument is accepted.
230+
stream.getReader({mode: 'byob'}).releaseLock();
231+
supportsBYOB = true;
232+
} catch (x) {
233+
supportsBYOB = false;
234+
}
235+
}
236+
237+
const reader = stream.getReader();
238+
239+
function progress(entry: {done: boolean, value: ReactServerValue, ...}) {
240+
if (entry.done) {
241+
// eslint-disable-next-line react-internal/safe-string-coercion
242+
data.append(formFieldPrefix + streamId, 'C'); // Close signal
243+
pendingParts--;
244+
if (pendingParts === 0) {
245+
resolve(data);
246+
}
247+
} else {
248+
try {
249+
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
250+
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
251+
// eslint-disable-next-line react-internal/safe-string-coercion
252+
data.append(formFieldPrefix + streamId, partJSON);
253+
reader.read().then(progress, reject);
254+
} catch (x) {
255+
reject(x);
256+
}
257+
}
258+
}
259+
reader.read().then(progress, reject);
260+
261+
return '$' + (supportsBYOB ? 'r' : 'R') + streamId.toString(16);
262+
}
263+
264+
function serializeAsyncIterable(
265+
iterable: $AsyncIterable<ReactServerValue, ReactServerValue, void>,
266+
iterator: $AsyncIterator<ReactServerValue, ReactServerValue, void>,
267+
): string {
268+
if (formData === null) {
269+
// Upgrade to use FormData to allow us to stream this value.
270+
formData = new FormData();
271+
}
272+
const data = formData;
273+
274+
pendingParts++;
275+
const streamId = nextPartId++;
276+
277+
// Generators/Iterators are Iterables but they're also their own iterator
278+
// functions. If that's the case, we treat them as single-shot. Otherwise,
279+
// we assume that this iterable might be a multi-shot and allow it to be
280+
// iterated more than once on the receiving server.
281+
const isIterator = iterable === iterator;
282+
283+
// There's a race condition between when the stream is aborted and when the promise
284+
// resolves so we track whether we already aborted it to avoid writing twice.
285+
function progress(
286+
entry:
287+
| {done: false, +value: ReactServerValue, ...}
288+
| {done: true, +value: ReactServerValue, ...},
289+
) {
290+
if (entry.done) {
291+
if (entry.value === undefined) {
292+
// eslint-disable-next-line react-internal/safe-string-coercion
293+
data.append(formFieldPrefix + streamId, 'C'); // Close signal
294+
} else {
295+
// Unlike streams, the last value may not be undefined. If it's not
296+
// we outline it and encode a reference to it in the closing instruction.
297+
try {
298+
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
299+
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
300+
data.append(formFieldPrefix + streamId, 'C' + partJSON); // Close signal
301+
} catch (x) {
302+
reject(x);
303+
return;
304+
}
305+
}
306+
pendingParts--;
307+
if (pendingParts === 0) {
308+
resolve(data);
309+
}
310+
} else {
311+
try {
312+
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
313+
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
314+
// eslint-disable-next-line react-internal/safe-string-coercion
315+
data.append(formFieldPrefix + streamId, partJSON);
316+
iterator.next().then(progress, reject);
317+
} catch (x) {
318+
reject(x);
319+
return;
320+
}
321+
}
322+
}
323+
324+
iterator.next().then(progress, reject);
325+
return '$' + (isIterator ? 'x' : 'X') + streamId.toString(16);
326+
}
327+
209328
function resolveToJSON(
210329
this:
211330
| {+[key: string | number]: ReactServerValue}
@@ -349,11 +468,9 @@ export function processReply(
349468
reject(reason);
350469
}
351470
},
352-
reason => {
353-
// In the future we could consider serializing this as an error
354-
// that throws on the server instead.
355-
reject(reason);
356-
},
471+
// In the future we could consider serializing this as an error
472+
// that throws on the server instead.
473+
reject,
357474
);
358475
return serializePromiseID(promiseId);
359476
}
@@ -486,6 +603,25 @@ export function processReply(
486603
return Array.from((iterator: any));
487604
}
488605

606+
if (enableFlightReadableStream) {
607+
// TODO: ReadableStream is not available in old Node. Remove the typeof check later.
608+
if (
609+
typeof ReadableStream === 'function' &&
610+
value instanceof ReadableStream
611+
) {
612+
return serializeReadableStream(value);
613+
}
614+
const getAsyncIterator: void | (() => $AsyncIterator<any, any, any>) =
615+
(value: any)[ASYNC_ITERATOR];
616+
if (typeof getAsyncIterator === 'function') {
617+
// We treat AsyncIterables as a Fragment and as such we might need to key them.
618+
return serializeAsyncIterable(
619+
(value: any),
620+
getAsyncIterator.call((value: any)),
621+
);
622+
}
623+
}
624+
489625
// Verify that this is a simple plain object.
490626
const proto = getPrototypeOf(value);
491627
if (

packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReply-test.js

+161
Original file line numberDiff line numberDiff line change
@@ -376,4 +376,165 @@ describe('ReactFlightDOMReply', () => {
376376
// This should've been the same reference that we already saw.
377377
expect(response.children).toBe(children);
378378
});
379+
380+
// @gate enableFlightReadableStream
381+
it('should supports streaming ReadableStream with objects', async () => {
382+
let controller1;
383+
let controller2;
384+
const s1 = new ReadableStream({
385+
start(c) {
386+
controller1 = c;
387+
},
388+
});
389+
const s2 = new ReadableStream({
390+
start(c) {
391+
controller2 = c;
392+
},
393+
});
394+
395+
const promise = ReactServerDOMClient.encodeReply({s1, s2});
396+
397+
controller1.enqueue({hello: 'world'});
398+
controller2.enqueue({hi: 'there'});
399+
400+
controller1.enqueue('text1');
401+
controller2.enqueue('text2');
402+
403+
controller1.close();
404+
controller2.close();
405+
406+
const body = await promise;
407+
408+
const result = await ReactServerDOMServer.decodeReply(
409+
body,
410+
webpackServerMap,
411+
);
412+
const reader1 = result.s1.getReader();
413+
const reader2 = result.s2.getReader();
414+
415+
expect(await reader1.read()).toEqual({
416+
value: {hello: 'world'},
417+
done: false,
418+
});
419+
expect(await reader2.read()).toEqual({
420+
value: {hi: 'there'},
421+
done: false,
422+
});
423+
424+
expect(await reader1.read()).toEqual({
425+
value: 'text1',
426+
done: false,
427+
});
428+
expect(await reader1.read()).toEqual({
429+
value: undefined,
430+
done: true,
431+
});
432+
expect(await reader2.read()).toEqual({
433+
value: 'text2',
434+
done: false,
435+
});
436+
expect(await reader2.read()).toEqual({
437+
value: undefined,
438+
done: true,
439+
});
440+
});
441+
442+
// @gate enableFlightReadableStream
443+
it('should supports streaming AsyncIterables with objects', async () => {
444+
let resolve;
445+
const wait = new Promise(r => (resolve = r));
446+
const multiShotIterable = {
447+
async *[Symbol.asyncIterator]() {
448+
const next = yield {hello: 'A'};
449+
expect(next).toBe(undefined);
450+
await wait;
451+
yield {hi: 'B'};
452+
return 'C';
453+
},
454+
};
455+
const singleShotIterator = (async function* () {
456+
const next = yield {hello: 'D'};
457+
expect(next).toBe(undefined);
458+
await wait;
459+
yield {hi: 'E'};
460+
return 'F';
461+
})();
462+
463+
await resolve();
464+
465+
const body = await ReactServerDOMClient.encodeReply({
466+
multiShotIterable,
467+
singleShotIterator,
468+
});
469+
const result = await ReactServerDOMServer.decodeReply(
470+
body,
471+
webpackServerMap,
472+
);
473+
474+
const iterator1 = result.multiShotIterable[Symbol.asyncIterator]();
475+
const iterator2 = result.singleShotIterator[Symbol.asyncIterator]();
476+
477+
expect(iterator1).not.toBe(result.multiShotIterable);
478+
expect(iterator2).toBe(result.singleShotIterator);
479+
480+
expect(await iterator1.next()).toEqual({
481+
value: {hello: 'A'},
482+
done: false,
483+
});
484+
expect(await iterator2.next()).toEqual({
485+
value: {hello: 'D'},
486+
done: false,
487+
});
488+
489+
expect(await iterator1.next()).toEqual({
490+
value: {hi: 'B'},
491+
done: false,
492+
});
493+
expect(await iterator2.next()).toEqual({
494+
value: {hi: 'E'},
495+
done: false,
496+
});
497+
expect(await iterator1.next()).toEqual({
498+
value: 'C', // Return value
499+
done: true,
500+
});
501+
expect(await iterator1.next()).toEqual({
502+
value: undefined,
503+
done: true,
504+
});
505+
506+
expect(await iterator2.next()).toEqual({
507+
value: 'F', // Return value
508+
done: true,
509+
});
510+
511+
// Multi-shot iterables should be able to do the same thing again
512+
const iterator3 = result.multiShotIterable[Symbol.asyncIterator]();
513+
514+
expect(iterator3).not.toBe(iterator1);
515+
516+
// We should be able to iterate over the iterable again and it should be
517+
// synchronously available using instrumented promises so that React can
518+
// rerender it synchronously.
519+
expect(iterator3.next().value).toEqual({
520+
value: {hello: 'A'},
521+
done: false,
522+
});
523+
expect(iterator3.next().value).toEqual({
524+
value: {hi: 'B'},
525+
done: false,
526+
});
527+
expect(iterator3.next().value).toEqual({
528+
value: 'C', // Return value
529+
done: true,
530+
});
531+
expect(iterator3.next().value).toEqual({
532+
value: undefined,
533+
done: true,
534+
});
535+
536+
expect(() => iterator3.next('this is not allowed')).toThrow(
537+
'Values cannot be passed to next() of AsyncIterables passed to Client Components.',
538+
);
539+
});
379540
});

0 commit comments

Comments
 (0)