Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"art": "0.10.1",
"babel-plugin-syntax-hermes-parser": "^0.32.0",
"babel-plugin-syntax-trailing-function-commas": "^6.5.0",
"busboy": "^1.6.0",
"chalk": "^3.0.0",
"cli-table": "^0.3.1",
"coffee-script": "^1.12.7",
Expand Down
111 changes: 90 additions & 21 deletions packages/react-server-dom-esm/src/server/ReactFlightDOMServerNode.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import {
} from 'react-client/src/ReactFlightClientStreamConfigNode';

import type {TemporaryReferenceSet} from 'react-server/src/ReactFlightServerTemporaryReferences';
import type {FileHandle} from 'react-server/src/ReactFlightReplyServer';

export {createTemporaryReferenceSet} from 'react-server/src/ReactFlightServerTemporaryReferences';

Expand Down Expand Up @@ -329,6 +330,17 @@ function prerenderToNodeStream(
});
}

type PendingFile = {
name: string,
file: FileHandle,
complete: boolean,
// Lazily allocated when a text field arrives after this file's 'file'
// event but before its (deferred) 'end' event. Stored as flat
// [name1, value1, name2, value2, ...] pairs.
queuedFields: null | Array<string>,
next: null | PendingFile,
};

function decodeReplyFromBusboy<T>(
busboyStream: Busboy,
moduleBasePath: ServerManifest,
Expand All @@ -344,14 +356,55 @@ function decodeReplyFromBusboy<T>(
undefined,
options ? options.arraySizeLimit : undefined,
);
let pendingFiles = 0;
const queuedFields: Array<string> = [];

// Linked list of pending files in arrival (payload) order. Text fields that
// arrive while a file is in flight are queued on the tail file's
// `queuedFields` so they can be resolved together when that file completes.
// Fields that arrive while the list is empty bypass it and resolve
// immediately. This makes the backing FormData's insertion order match the
// payload's entry order.
let head: null | PendingFile = null;
let tail: null | PendingFile = null;
let bodyFinished = false;
let closed = false;

function flush() {
while (head !== null) {
const current = head;
if (!current.complete) {
// This file is still streaming. Hold later files and fields until it
// completes so the backing FormData reflects payload order.
return;
Comment thread
unstubbable marked this conversation as resolved.
}
try {
resolveFileComplete(response, current.name, current.file);
const queuedFields = current.queuedFields;
if (queuedFields !== null) {
for (let i = 0; i < queuedFields.length; i += 2) {
resolveField(response, queuedFields[i], queuedFields[i + 1]);
}
}
} catch (error) {
busboyStream.destroy(error);
return;
}
head = current.next;
}
tail = null;
if (bodyFinished && !closed) {
closed = true;
close(response);
}
}

busboyStream.on('field', (name, value) => {
if (pendingFiles > 0) {
// Because the 'end' event fires two microtasks after the next 'field'
// we would resolve files and fields out of order. To handle this properly
// we queue any fields we receive until the previous file is done.
queuedFields.push(name, value);
if (tail !== null) {
// A file is in flight; queue the field on the tail (most recent) pending
// file so it resolves after that file, preserving payload order.
if (tail.queuedFields === null) {
tail.queuedFields = [];
}
tail.queuedFields.push(name, value);
} else {
try {
resolveField(response, name, value);
Expand All @@ -371,29 +424,45 @@ function decodeReplyFromBusboy<T>(
);
return;
}
pendingFiles++;
const file = resolveFileInfo(response, name, filename, mimeType);
const pendingFile: PendingFile = {
name,
file,
complete: false,
queuedFields: null,
next: null,
};
if (tail === null) {
head = pendingFile;
} else {
tail.next = pendingFile;
}
tail = pendingFile;
value.on('data', chunk => {
resolveFileChunk(response, file, chunk);
});
value.on('end', () => {
try {
resolveFileComplete(response, name, file);
pendingFiles--;
if (pendingFiles === 0) {
// Release any queued fields
for (let i = 0; i < queuedFields.length; i += 2) {
resolveField(response, queuedFields[i], queuedFields[i + 1]);
}
queuedFields.length = 0;
}
resolveFileChunk(response, file, chunk);
} catch (error) {
busboyStream.destroy(error);
}
});
value.on('error', error => {
busboyStream.destroy(error);
});
value.on('end', () => {
pendingFile.complete = true;
flush();
});
});
busboyStream.on('finish', () => {
close(response);
bodyFinished = true;
flush();
Comment thread
unstubbable marked this conversation as resolved.
if (!closed) {
// Invariant: busboy delays 'finish' until every file's 'end' event has
// fired, so the flush above should always close the response.
busboyStream.destroy(
new Error('Reply finished with incomplete file part.'),
);
Comment thread
unstubbable marked this conversation as resolved.
Outdated
}
});
busboyStream.on('error', err => {
reportGlobalError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import {
import {textEncoder} from 'react-server/src/ReactServerStreamConfigNode';

import type {TemporaryReferenceSet} from 'react-server/src/ReactFlightServerTemporaryReferences';
import type {FileHandle} from 'react-server/src/ReactFlightReplyServer';

export {createTemporaryReferenceSet} from 'react-server/src/ReactFlightServerTemporaryReferences';

Expand Down Expand Up @@ -560,6 +561,17 @@ export function registerServerActions(manifest: ServerManifest) {
serverManifest = manifest;
}

type PendingFile = {
name: string,
file: FileHandle,
complete: boolean,
// Lazily allocated when a text field arrives after this file's 'file'
// event but before its (deferred) 'end' event. Stored as flat
// [name1, value1, name2, value2, ...] pairs.
queuedFields: null | Array<string>,
next: null | PendingFile,
};

export function decodeReplyFromBusboy<T>(
busboyStream: Busboy,
options?: {
Expand All @@ -574,14 +586,55 @@ export function decodeReplyFromBusboy<T>(
undefined,
options ? options.arraySizeLimit : undefined,
);
let pendingFiles = 0;
const queuedFields: Array<string> = [];

// Linked list of pending files in arrival (payload) order. Text fields that
// arrive while a file is in flight are queued on the tail file's
// `queuedFields` so they can be resolved together when that file completes.
// Fields that arrive while the list is empty bypass it and resolve
// immediately. This makes the backing FormData's insertion order match the
// payload's entry order.
let head: null | PendingFile = null;
let tail: null | PendingFile = null;
let bodyFinished = false;
let closed = false;

function flush() {
while (head !== null) {
const current = head;
if (!current.complete) {
// This file is still streaming. Hold later files and fields until it
// completes so the backing FormData reflects payload order.
return;
}
try {
resolveFileComplete(response, current.name, current.file);
const queuedFields = current.queuedFields;
if (queuedFields !== null) {
for (let i = 0; i < queuedFields.length; i += 2) {
resolveField(response, queuedFields[i], queuedFields[i + 1]);
}
}
} catch (error) {
busboyStream.destroy(error);
return;
}
head = current.next;
}
tail = null;
if (bodyFinished && !closed) {
closed = true;
close(response);
}
}

busboyStream.on('field', (name, value) => {
if (pendingFiles > 0) {
// Because the 'end' event fires two microtasks after the next 'field'
// we would resolve files and fields out of order. To handle this properly
// we queue any fields we receive until the previous file is done.
queuedFields.push(name, value);
if (tail !== null) {
// A file is in flight; queue the field on the tail (most recent) pending
// file so it resolves after that file, preserving payload order.
if (tail.queuedFields === null) {
tail.queuedFields = [];
}
tail.queuedFields.push(name, value);
} else {
try {
resolveField(response, name, value);
Expand All @@ -601,29 +654,45 @@ export function decodeReplyFromBusboy<T>(
);
return;
}
pendingFiles++;
const file = resolveFileInfo(response, name, filename, mimeType);
const pendingFile: PendingFile = {
name,
file,
complete: false,
queuedFields: null,
next: null,
};
if (tail === null) {
head = pendingFile;
} else {
tail.next = pendingFile;
}
tail = pendingFile;
value.on('data', chunk => {
resolveFileChunk(response, file, chunk);
});
value.on('end', () => {
try {
resolveFileComplete(response, name, file);
pendingFiles--;
if (pendingFiles === 0) {
// Release any queued fields
for (let i = 0; i < queuedFields.length; i += 2) {
resolveField(response, queuedFields[i], queuedFields[i + 1]);
}
queuedFields.length = 0;
}
resolveFileChunk(response, file, chunk);
} catch (error) {
busboyStream.destroy(error);
}
});
value.on('error', error => {
busboyStream.destroy(error);
});
value.on('end', () => {
pendingFile.complete = true;
flush();
});
});
busboyStream.on('finish', () => {
close(response);
bodyFinished = true;
flush();
if (!closed) {
// Invariant: busboy delays 'finish' until every file's 'end' event has
// fired, so the flush above should always close the response.
busboyStream.destroy(
new Error('Reply finished with incomplete file part.'),
);
}
});
busboyStream.on('error', err => {
reportGlobalError(
Expand Down
Loading
Loading