Skip to content

Commit f0b6ff2

Browse files
committed
Support Streams/Async Iterable in Flight Reply
1 parent 0a0a3af commit f0b6ff2

File tree

4 files changed

+722
-14
lines changed

4 files changed

+722
-14
lines changed

packages/react-client/src/ReactFlightReplyClient.js

+141-5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import type {TemporaryReferenceSet} from './ReactFlightTemporaryReferences';
2020
import {
2121
enableRenderableContext,
2222
enableBinaryFlight,
23+
enableFlightReadableStream,
2324
} from 'shared/ReactFeatureFlags';
2425

2526
import {
@@ -28,6 +29,7 @@ import {
2829
REACT_CONTEXT_TYPE,
2930
REACT_PROVIDER_TYPE,
3031
getIteratorFn,
32+
ASYNC_ITERATOR,
3133
} from 'shared/ReactSymbols';
3234

3335
import {
@@ -198,6 +200,123 @@ export function processReply(
198200
return '$' + tag + blobId.toString(16);
199201
}
200202

203+
function serializeReadableStream(stream: ReadableStream): string {
204+
if (formData === null) {
205+
// Upgrade to use FormData to allow us to stream this value.
206+
formData = new FormData();
207+
}
208+
const data = formData;
209+
210+
pendingParts++;
211+
const streamId = nextPartId++;
212+
213+
// Detect if this is a BYOB stream. BYOB streams should be able to be read as bytes on the
214+
// receiving side. It also implies that different chunks can be split up or merged as opposed
215+
// to a readable stream that happens to have Uint8Array as the type which might expect it to be
216+
// received in the same slices.
217+
// $FlowFixMe: This is a Node.js extension.
218+
let supportsBYOB: void | boolean = stream.supportsBYOB;
219+
if (supportsBYOB === undefined) {
220+
try {
221+
// $FlowFixMe[extra-arg]: This argument is accepted.
222+
stream.getReader({mode: 'byob'}).releaseLock();
223+
supportsBYOB = true;
224+
} catch (x) {
225+
supportsBYOB = false;
226+
}
227+
}
228+
229+
const reader = stream.getReader();
230+
231+
function progress(entry: {done: boolean, value: ReactServerValue, ...}) {
232+
if (entry.done) {
233+
// eslint-disable-next-line react-internal/safe-string-coercion
234+
data.append(formFieldPrefix + streamId, 'C'); // Close signal
235+
pendingParts--;
236+
if (pendingParts === 0) {
237+
resolve(data);
238+
}
239+
} else {
240+
try {
241+
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
242+
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
243+
// eslint-disable-next-line react-internal/safe-string-coercion
244+
data.append(formFieldPrefix + streamId, partJSON);
245+
reader.read().then(progress, reject);
246+
} catch (x) {
247+
reject(x);
248+
}
249+
}
250+
}
251+
reader.read().then(progress, reject);
252+
253+
return '$' + (supportsBYOB ? 'r' : 'R') + streamId.toString(16);
254+
}
255+
256+
function serializeAsyncIterable(
257+
iterable: $AsyncIterable<ReactServerValue, ReactServerValue, void>,
258+
iterator: $AsyncIterator<ReactServerValue, ReactServerValue, void>,
259+
): string {
260+
if (formData === null) {
261+
// Upgrade to use FormData to allow us to stream this value.
262+
formData = new FormData();
263+
}
264+
const data = formData;
265+
266+
pendingParts++;
267+
const streamId = nextPartId++;
268+
269+
// Generators/Iterators are Iterables but they're also their own iterator
270+
// functions. If that's the case, we treat them as single-shot. Otherwise,
271+
// we assume that this iterable might be a multi-shot and allow it to be
272+
// iterated more than once on the client.
273+
const isIterator = iterable === iterator;
274+
275+
// There's a race condition between when the stream is aborted and when the promise
276+
// resolves so we track whether we already aborted it to avoid writing twice.
277+
function progress(
278+
entry:
279+
| {done: false, +value: ReactServerValue, ...}
280+
| {done: true, +value: ReactServerValue, ...},
281+
) {
282+
if (entry.done) {
283+
if (entry.value === undefined) {
284+
// eslint-disable-next-line react-internal/safe-string-coercion
285+
data.append(formFieldPrefix + streamId, 'C'); // Close signal
286+
} else {
287+
// Unlike streams, the last value may not be undefined. If it's not
288+
// we outline it and encode a reference to it in the closing instruction.
289+
try {
290+
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
291+
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
292+
data.append(formFieldPrefix + streamId, 'C' + partJSON); // Close signal
293+
} catch (x) {
294+
reject(x);
295+
return;
296+
}
297+
}
298+
pendingParts--;
299+
if (pendingParts === 0) {
300+
resolve(data);
301+
}
302+
} else {
303+
try {
304+
// $FlowFixMe[incompatible-type]: While plain JSON can return undefined we never do here.
305+
const partJSON: string = JSON.stringify(entry.value, resolveToJSON);
306+
// eslint-disable-next-line react-internal/safe-string-coercion
307+
data.append(formFieldPrefix + streamId, partJSON);
308+
iterator.next().then(progress, reject);
309+
} catch (x) {
310+
reject(x);
311+
return;
312+
}
313+
}
314+
}
315+
316+
iterator.next().then(progress, reject);
317+
return '$' + (isIterator ? 'x' : 'X') + streamId.toString(16);
318+
}
319+
201320
function resolveToJSON(
202321
this:
203322
| {+[key: string | number]: ReactServerValue}
@@ -341,11 +460,9 @@ export function processReply(
341460
reject(reason);
342461
}
343462
},
344-
reason => {
345-
// In the future we could consider serializing this as an error
346-
// that throws on the server instead.
347-
reject(reason);
348-
},
463+
// In the future we could consider serializing this as an error
464+
// that throws on the server instead.
465+
reject,
349466
);
350467
return serializePromiseID(promiseId);
351468
}
@@ -472,6 +589,25 @@ export function processReply(
472589
return Array.from((iterator: any));
473590
}
474591

592+
if (enableFlightReadableStream) {
593+
// TODO: ReadableStream is not available in old Node. Remove the typeof check later.
594+
if (
595+
typeof ReadableStream === 'function' &&
596+
value instanceof ReadableStream
597+
) {
598+
return serializeReadableStream(value);
599+
}
600+
const getAsyncIterator: void | (() => $AsyncIterator<any, any, any>) =
601+
(value: any)[ASYNC_ITERATOR];
602+
if (typeof getAsyncIterator === 'function') {
603+
// We treat AsyncIterables as a Fragment and as such we might need to key them.
604+
return serializeAsyncIterable(
605+
(value: any),
606+
getAsyncIterator.call((value: any)),
607+
);
608+
}
609+
}
610+
475611
// Verify that this is a simple plain object.
476612
const proto = getPrototypeOf(value);
477613
if (

packages/react-server-dom-webpack/src/__tests__/ReactFlightDOMReply-test.js

+161
Original file line numberDiff line numberDiff line change
@@ -376,4 +376,165 @@ describe('ReactFlightDOMReply', () => {
376376
// This should've been the same reference that we already saw.
377377
expect(response.children).toBe(children);
378378
});
379+
380+
// @gate enableFlightReadableStream
381+
it('should supports streaming ReadableStream with objects', async () => {
382+
let controller1;
383+
let controller2;
384+
const s1 = new ReadableStream({
385+
start(c) {
386+
controller1 = c;
387+
},
388+
});
389+
const s2 = new ReadableStream({
390+
start(c) {
391+
controller2 = c;
392+
},
393+
});
394+
395+
const promise = ReactServerDOMClient.encodeReply({s1, s2});
396+
397+
controller1.enqueue({hello: 'world'});
398+
controller2.enqueue({hi: 'there'});
399+
400+
controller1.enqueue('text1');
401+
controller2.enqueue('text2');
402+
403+
controller1.close();
404+
controller2.close();
405+
406+
const body = await promise;
407+
408+
const result = await ReactServerDOMServer.decodeReply(
409+
body,
410+
webpackServerMap,
411+
);
412+
const reader1 = result.s1.getReader();
413+
const reader2 = result.s2.getReader();
414+
415+
expect(await reader1.read()).toEqual({
416+
value: {hello: 'world'},
417+
done: false,
418+
});
419+
expect(await reader2.read()).toEqual({
420+
value: {hi: 'there'},
421+
done: false,
422+
});
423+
424+
expect(await reader1.read()).toEqual({
425+
value: 'text1',
426+
done: false,
427+
});
428+
expect(await reader1.read()).toEqual({
429+
value: undefined,
430+
done: true,
431+
});
432+
expect(await reader2.read()).toEqual({
433+
value: 'text2',
434+
done: false,
435+
});
436+
expect(await reader2.read()).toEqual({
437+
value: undefined,
438+
done: true,
439+
});
440+
});
441+
442+
// @gate enableFlightReadableStream
443+
it('should supports streaming AsyncIterables with objects', async () => {
444+
let resolve;
445+
const wait = new Promise(r => (resolve = r));
446+
const multiShotIterable = {
447+
async *[Symbol.asyncIterator]() {
448+
const next = yield {hello: 'A'};
449+
expect(next).toBe(undefined);
450+
await wait;
451+
yield {hi: 'B'};
452+
return 'C';
453+
},
454+
};
455+
const singleShotIterator = (async function* () {
456+
const next = yield {hello: 'D'};
457+
expect(next).toBe(undefined);
458+
await wait;
459+
yield {hi: 'E'};
460+
return 'F';
461+
})();
462+
463+
await resolve();
464+
465+
const body = await ReactServerDOMClient.encodeReply({
466+
multiShotIterable,
467+
singleShotIterator,
468+
});
469+
const result = await ReactServerDOMServer.decodeReply(
470+
body,
471+
webpackServerMap,
472+
);
473+
474+
const iterator1 = result.multiShotIterable[Symbol.asyncIterator]();
475+
const iterator2 = result.singleShotIterator[Symbol.asyncIterator]();
476+
477+
expect(iterator1).not.toBe(result.multiShotIterable);
478+
expect(iterator2).toBe(result.singleShotIterator);
479+
480+
expect(await iterator1.next()).toEqual({
481+
value: {hello: 'A'},
482+
done: false,
483+
});
484+
expect(await iterator2.next()).toEqual({
485+
value: {hello: 'D'},
486+
done: false,
487+
});
488+
489+
expect(await iterator1.next()).toEqual({
490+
value: {hi: 'B'},
491+
done: false,
492+
});
493+
expect(await iterator2.next()).toEqual({
494+
value: {hi: 'E'},
495+
done: false,
496+
});
497+
expect(await iterator1.next()).toEqual({
498+
value: 'C', // Return value
499+
done: true,
500+
});
501+
expect(await iterator1.next()).toEqual({
502+
value: undefined,
503+
done: true,
504+
});
505+
506+
expect(await iterator2.next()).toEqual({
507+
value: 'F', // Return value
508+
done: true,
509+
});
510+
511+
// Multi-shot iterables should be able to do the same thing again
512+
const iterator3 = result.multiShotIterable[Symbol.asyncIterator]();
513+
514+
expect(iterator3).not.toBe(iterator1);
515+
516+
// We should be able to iterate over the iterable again and it should be
517+
// synchronously available using instrumented promises so that React can
518+
// rerender it synchronously.
519+
expect(iterator3.next().value).toEqual({
520+
value: {hello: 'A'},
521+
done: false,
522+
});
523+
expect(iterator3.next().value).toEqual({
524+
value: {hi: 'B'},
525+
done: false,
526+
});
527+
expect(iterator3.next().value).toEqual({
528+
value: 'C', // Return value
529+
done: true,
530+
});
531+
expect(iterator3.next().value).toEqual({
532+
value: undefined,
533+
done: true,
534+
});
535+
536+
expect(() => iterator3.next('this is not allowed')).toThrow(
537+
'Values cannot be passed to next() of AsyncIterables passed to Client Components.',
538+
);
539+
});
379540
});

0 commit comments

Comments
 (0)