Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"art": "0.10.1",
"babel-plugin-syntax-hermes-parser": "^0.32.0",
"babel-plugin-syntax-trailing-function-commas": "^6.5.0",
"busboy": "^1.6.0",
"chalk": "^3.0.0",
"cli-table": "^0.3.1",
"coffee-script": "^1.12.7",
Expand Down
104 changes: 83 additions & 21 deletions packages/react-server-dom-esm/src/server/ReactFlightDOMServerNode.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import {
} from 'react-client/src/ReactFlightClientStreamConfigNode';

import type {TemporaryReferenceSet} from 'react-server/src/ReactFlightServerTemporaryReferences';
import type {FileHandle} from 'react-server/src/ReactFlightReplyServer';

export {createTemporaryReferenceSet} from 'react-server/src/ReactFlightServerTemporaryReferences';

Expand Down Expand Up @@ -329,6 +330,16 @@ function prerenderToNodeStream(
});
}

type PendingFile = {
name: string,
file: FileHandle,
complete: boolean,
// Lazily allocated when a text field arrives after this file's 'file'
// event but before its (deferred) 'end' event. Stored as flat
// [name1, value1, name2, value2, ...] pairs.
queuedFields: null | Array<string>,
};

function decodeReplyFromBusboy<T>(
busboyStream: Busboy,
moduleBasePath: ServerManifest,
Expand All @@ -344,14 +355,62 @@ function decodeReplyFromBusboy<T>(
undefined,
options ? options.arraySizeLimit : undefined,
);
let pendingFiles = 0;
const queuedFields: Array<string> = [];

// Buffer of files in arrival (payload) order. Text fields that arrive while a
// file is in flight are queued on the most recent pending file's
// `queuedFields` so they can be resolved together when that file completes.
// Fields that arrive while the buffer is empty bypass it and resolve
// immediately. This makes the backing FormData's insertion order match the
// payload's entry order.
//
// We drain by advancing a pointer rather than shifting from the front so the
// total drain stays O(N).
const pendingFiles: Array<PendingFile> = [];
let flushedUpTo = 0;
let bodyFinished = false;
let closed = false;

function flush() {
while (flushedUpTo < pendingFiles.length) {
const pendingFile = pendingFiles[flushedUpTo];
if (!pendingFile.complete) {
// This file is still streaming. Hold later files and fields until it
// completes so the backing FormData reflects payload order.
return;
Comment thread
unstubbable marked this conversation as resolved.
}
try {
resolveFileComplete(response, pendingFile.name, pendingFile.file);
const queuedFields = pendingFile.queuedFields;
if (queuedFields !== null) {
for (let i = 0; i < queuedFields.length; i += 2) {
resolveField(response, queuedFields[i], queuedFields[i + 1]);
}
}
} catch (error) {
busboyStream.destroy(error);
return;
}
flushedUpTo++;
}
// Fully drained — release the drained wrapper objects for GC instead of
// letting them accumulate until the response promise resolves.
pendingFiles.length = 0;
flushedUpTo = 0;
if (bodyFinished && !closed) {
closed = true;
close(response);
}
}

busboyStream.on('field', (name, value) => {
if (pendingFiles > 0) {
// Because the 'end' event fires two microtasks after the next 'field'
// we would resolve files and fields out of order. To handle this properly
// we queue any fields we receive until the previous file is done.
queuedFields.push(name, value);
if (flushedUpTo < pendingFiles.length) {
// A file is in flight; queue the field on the most recent pending file so
// it resolves after that file, preserving payload order.
const mostRecentPendingFile = pendingFiles[pendingFiles.length - 1];
if (mostRecentPendingFile.queuedFields === null) {
mostRecentPendingFile.queuedFields = [];
}
mostRecentPendingFile.queuedFields.push(name, value);
} else {
try {
resolveField(response, name, value);
Expand All @@ -371,29 +430,32 @@ function decodeReplyFromBusboy<T>(
);
return;
}
pendingFiles++;
const file = resolveFileInfo(response, name, filename, mimeType);
const pendingFile: PendingFile = {
name,
file,
complete: false,
queuedFields: null,
};
pendingFiles.push(pendingFile);
value.on('data', chunk => {
resolveFileChunk(response, file, chunk);
});
value.on('end', () => {
try {
resolveFileComplete(response, name, file);
pendingFiles--;
if (pendingFiles === 0) {
// Release any queued fields
for (let i = 0; i < queuedFields.length; i += 2) {
resolveField(response, queuedFields[i], queuedFields[i + 1]);
}
queuedFields.length = 0;
}
resolveFileChunk(response, file, chunk);
} catch (error) {
busboyStream.destroy(error);
}
});
value.on('error', error => {
busboyStream.destroy(error);
});
value.on('end', () => {
pendingFile.complete = true;
flush();
});
});
busboyStream.on('finish', () => {
close(response);
bodyFinished = true;
flush();
Comment thread
unstubbable marked this conversation as resolved.
});
busboyStream.on('error', err => {
reportGlobalError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import {
import {textEncoder} from 'react-server/src/ReactServerStreamConfigNode';

import type {TemporaryReferenceSet} from 'react-server/src/ReactFlightServerTemporaryReferences';
import type {FileHandle} from 'react-server/src/ReactFlightReplyServer';

export {createTemporaryReferenceSet} from 'react-server/src/ReactFlightServerTemporaryReferences';

Expand Down Expand Up @@ -560,6 +561,16 @@ export function registerServerActions(manifest: ServerManifest) {
serverManifest = manifest;
}

type PendingFile = {
name: string,
file: FileHandle,
complete: boolean,
// Lazily allocated when a text field arrives after this file's 'file'
// event but before its (deferred) 'end' event. Stored as flat
// [name1, value1, name2, value2, ...] pairs.
queuedFields: null | Array<string>,
};

export function decodeReplyFromBusboy<T>(
busboyStream: Busboy,
options?: {
Expand All @@ -574,14 +585,62 @@ export function decodeReplyFromBusboy<T>(
undefined,
options ? options.arraySizeLimit : undefined,
);
let pendingFiles = 0;
const queuedFields: Array<string> = [];

// Buffer of files in arrival (payload) order. Text fields that arrive while a
// file is in flight are queued on the most recent pending file's
// `queuedFields` so they can be resolved together when that file completes.
// Fields that arrive while the buffer is empty bypass it and resolve
// immediately. This makes the backing FormData's insertion order match the
// payload's entry order.
//
// We drain by advancing a pointer rather than shifting from the front so the
// total drain stays O(N).
const pendingFiles: Array<PendingFile> = [];
let flushedUpTo = 0;
let bodyFinished = false;
let closed = false;

function flush() {
while (flushedUpTo < pendingFiles.length) {
const pendingFile = pendingFiles[flushedUpTo];
if (!pendingFile.complete) {
// This file is still streaming. Hold later files and fields until it
// completes so the backing FormData reflects payload order.
return;
}
try {
resolveFileComplete(response, pendingFile.name, pendingFile.file);
const queuedFields = pendingFile.queuedFields;
if (queuedFields !== null) {
for (let i = 0; i < queuedFields.length; i += 2) {
resolveField(response, queuedFields[i], queuedFields[i + 1]);
}
}
} catch (error) {
busboyStream.destroy(error);
return;
}
flushedUpTo++;
}
// Fully drained — release the drained wrapper objects for GC instead of
// letting them accumulate until the response promise resolves.
pendingFiles.length = 0;
flushedUpTo = 0;
if (bodyFinished && !closed) {
closed = true;
close(response);
}
}

busboyStream.on('field', (name, value) => {
if (pendingFiles > 0) {
// Because the 'end' event fires two microtasks after the next 'field'
// we would resolve files and fields out of order. To handle this properly
// we queue any fields we receive until the previous file is done.
queuedFields.push(name, value);
if (flushedUpTo < pendingFiles.length) {
// A file is in flight; queue the field on the most recent pending file so
// it resolves after that file, preserving payload order.
const mostRecentPendingFile = pendingFiles[pendingFiles.length - 1];
if (mostRecentPendingFile.queuedFields === null) {
mostRecentPendingFile.queuedFields = [];
}
mostRecentPendingFile.queuedFields.push(name, value);
} else {
try {
resolveField(response, name, value);
Expand All @@ -601,29 +660,32 @@ export function decodeReplyFromBusboy<T>(
);
return;
}
pendingFiles++;
const file = resolveFileInfo(response, name, filename, mimeType);
const pendingFile: PendingFile = {
name,
file,
complete: false,
queuedFields: null,
};
pendingFiles.push(pendingFile);
value.on('data', chunk => {
resolveFileChunk(response, file, chunk);
});
value.on('end', () => {
try {
resolveFileComplete(response, name, file);
pendingFiles--;
if (pendingFiles === 0) {
// Release any queued fields
for (let i = 0; i < queuedFields.length; i += 2) {
resolveField(response, queuedFields[i], queuedFields[i + 1]);
}
queuedFields.length = 0;
}
resolveFileChunk(response, file, chunk);
} catch (error) {
busboyStream.destroy(error);
}
});
value.on('error', error => {
busboyStream.destroy(error);
});
value.on('end', () => {
pendingFile.complete = true;
flush();
});
});
busboyStream.on('finish', () => {
close(response);
bodyFinished = true;
flush();
});
busboyStream.on('error', err => {
reportGlobalError(
Expand Down
Loading
Loading