Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Web Streams cleanup #56819

Merged
merged 17 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions packages/next/src/experimental/testmode/proxy/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,14 @@ import { UNHANDLED } from './types'
import type { FetchHandler } from './fetch-api'
import { handleFetch } from './fetch-api'

function readBody(req: IncomingMessage): Promise<Buffer> {
return new Promise<Buffer>((resolve, reject) => {
const acc: Buffer[] = []
req.on('data', (chunk) => {
acc.push(chunk)
})
req.on('end', () => {
resolve(Buffer.concat(acc))
})
req.on('error', reject)
})
async function readBody(req: IncomingMessage): Promise<Buffer> {
const acc: Buffer[] = []

for await (const chunk of req) {
acc.push(chunk)
}

return Buffer.concat(acc)
}

export async function createProxyServer({
Expand Down
5 changes: 1 addition & 4 deletions packages/next/src/export/routes/app-page.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ export async function generatePrefetchRsc(
renderOpts
)

prefetchRenderResult.pipe(res)
await res.hasStreamed

const prefetchRscData = Buffer.concat(res.buffers)
const prefetchRscData = await prefetchRenderResult.toUnchunkedString(true)

if ((renderOpts as any).store.staticPrefetchBailout) return

Expand Down
6 changes: 5 additions & 1 deletion packages/next/src/export/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ export interface AmpValidation {
export type FileWriter = (
type: string,
path: string,
content: any,
content:
| string
| NodeJS.ArrayBufferView
| Iterable<string | NodeJS.ArrayBufferView>
| AsyncIterable<string | NodeJS.ArrayBufferView>,
encodingOptions?: WriteFileOptions
) => Promise<void>

Expand Down
21 changes: 11 additions & 10 deletions packages/next/src/server/app-render/action-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,20 @@ function nodeToWebReadableStream(nodeReadable: import('stream').Readable) {
return Readable.toWeb(nodeReadable)
}

const iterator = nodeReadable[Symbol.asyncIterator]()

return new ReadableStream({
start(controller) {
nodeReadable.on('data', (chunk) => {
controller.enqueue(chunk)
})
pull: async (controller) => {
const { value, done } = await iterator.next()

nodeReadable.on('end', () => {
if (done) {
controller.close()
})

nodeReadable.on('error', (error) => {
controller.error(error)
})
} else {
controller.enqueue(value)
}
},
cancel: () => {
iterator.return?.()
},
})
} else {
Expand Down
7 changes: 3 additions & 4 deletions packages/next/src/server/app-render/app-render.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import {
renderToInitialFizzStream,
createBufferedTransformStream,
continueFizzStream,
streamToBufferedResult,
cloneTransformStream,
} from '../stream-utils/node-web-streams-helper'
import { canSegmentBeOverridden } from '../../client/components/match-segments'
Expand Down Expand Up @@ -908,7 +907,7 @@ async function renderToHTMLOrFlightImpl(
})

if (staticGenerationStore.isStaticGeneration) {
const htmlResult = await streamToBufferedResult(renderResult)
const htmlResult = await renderResult.toUnchunkedString(true)

// if we encountered any unexpected errors during build
// we fail the prerendering phase and the build
Expand All @@ -918,9 +917,9 @@ async function renderToHTMLOrFlightImpl(

// TODO-APP: derive this from same pass to prevent additional
// render during static generation
const stringifiedFlightPayload = await streamToBufferedResult(
const stringifiedFlightPayload = await (
await generateFlight(ctx)
)
).toUnchunkedString(true)

if (staticGenerationStore.forceStatic === false) {
staticGenerationStore.revalidate = 0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type { RenderOpts } from './types'
import type { FlightResponseRef } from './flight-response-ref'
import type { AppPageModule } from '../future/route-modules/app-page/module'
import type { createErrorHandler } from './create-error-handler'

import React, { use } from 'react'
import type { createErrorHandler } from './create-error-handler'
import { useFlightResponse } from './use-flight-response'

/**
Expand Down
90 changes: 46 additions & 44 deletions packages/next/src/server/app-render/use-flight-response.tsx
Original file line number Diff line number Diff line change
@@ -1,14 +1,47 @@
import type { ClientReferenceManifest } from '../../build/webpack/plugins/flight-manifest-plugin'
import type { FlightResponseRef } from './flight-response-ref'
import { encodeText, decodeText } from '../stream-utils/encode-decode'

import { htmlEscapeJsonString } from '../htmlescape'
import {
createDecodeTransformStream,
createEncodeTransformStream,
} from '../stream-utils/encode-decode'

const isEdgeRuntime = process.env.NEXT_RUNTIME === 'edge'

const INLINE_FLIGHT_PAYLOAD_BOOTSTRAP = 0
const INLINE_FLIGHT_PAYLOAD_DATA = 1
const INLINE_FLIGHT_PAYLOAD_FORM_STATE = 2

function createFlightTransformer(
nonce: string | undefined,
formState: unknown | null
) {
const startScriptTag = nonce
? `<script nonce=${JSON.stringify(nonce)}>`
: '<script>'

return new TransformStream<string, string>({
// Bootstrap the flight information.
start(controller) {
controller.enqueue(
`${startScriptTag}(self.__next_f=self.__next_f||[]).push(${htmlEscapeJsonString(
JSON.stringify([INLINE_FLIGHT_PAYLOAD_BOOTSTRAP])
)});self.__next_f.push(${htmlEscapeJsonString(
JSON.stringify([INLINE_FLIGHT_PAYLOAD_FORM_STATE, formState])
)})</script>`
)
},
transform(chunk, controller) {
const scripts = `${startScriptTag}self.__next_f.push(${htmlEscapeJsonString(
JSON.stringify([INLINE_FLIGHT_PAYLOAD_DATA, chunk])
)})</script>`

controller.enqueue(scripts)
},
})
}

/**
* Render Flight stream.
* This is only used for renderToHTML, the Flight response does not need additional wrappers.
Expand Down Expand Up @@ -49,50 +82,19 @@ export function useFlightResponse(
})
flightResponseRef.current = res

let bootstrapped = false
// We only attach CSS chunks to the inlined data.
const forwardReader = forwardStream.getReader()
const writer = writable.getWriter()
const startScriptTag = nonce
? `<script nonce=${JSON.stringify(nonce)}>`
: '<script>'
const textDecoder = new TextDecoder()

function read() {
forwardReader.read().then(({ done, value }) => {
if (!bootstrapped) {
bootstrapped = true
writer.write(
encodeText(
`${startScriptTag}(self.__next_f=self.__next_f||[]).push(${htmlEscapeJsonString(
JSON.stringify([INLINE_FLIGHT_PAYLOAD_BOOTSTRAP])
)});self.__next_f.push(${htmlEscapeJsonString(
JSON.stringify([INLINE_FLIGHT_PAYLOAD_FORM_STATE, formState])
)})</script>`
)
)
}
if (done) {
// Add a setTimeout here because the error component is too small, the first forwardReader.read() read will return the full chunk
// and then it immediately set flightResponseRef.current as null.
// react renders the component twice, the second render will run into the state with useFlightResponse where flightResponseRef.current is null,
// so it tries to render the flight payload again
setTimeout(() => {
flightResponseRef.current = null
})
writer.close()
} else {
const responsePartial = decodeText(value, textDecoder)
const scripts = `${startScriptTag}self.__next_f.push(${htmlEscapeJsonString(
JSON.stringify([INLINE_FLIGHT_PAYLOAD_DATA, responsePartial])
)})</script>`

writer.write(encodeText(scripts))
read()
}
forwardStream
.pipeThrough(createDecodeTransformStream())
.pipeThrough(createFlightTransformer(nonce, formState))
.pipeThrough(createEncodeTransformStream())
.pipeTo(writable)
.finally(() => {
// Once the last encoding stream has flushed, then unset the flight
// response ref.
flightResponseRef.current = null
})
.catch((err) => {
console.error('Unexpected error while rendering Flight stream', err)
})
}
read()

return res
}
40 changes: 19 additions & 21 deletions packages/next/src/server/base-http/web.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import type { IncomingHttpHeaders, OutgoingHttpHeaders } from 'http'
import type { FetchMetrics } from './index'

import { toNodeOutgoingHttpHeaders } from '../web/utils'
import { BaseNextRequest, BaseNextResponse } from './index'
import { DetachedPromise } from '../../lib/detached-promise'

export class WebNextRequest extends BaseNextRequest<ReadableStream | null> {
public request: Request
Expand Down Expand Up @@ -32,27 +34,10 @@ export class WebNextRequest extends BaseNextRequest<ReadableStream | null> {
export class WebNextResponse extends BaseNextResponse<WritableStream> {
private headers = new Headers()
private textBody: string | undefined = undefined
private _sent = false

private sendPromise = new Promise<void>((resolve) => {
this.sendResolve = resolve
})
private sendResolve?: () => void
private response = this.sendPromise.then(() => {
return new Response(this.textBody ?? this.transformStream.readable, {
headers: this.headers,
status: this.statusCode,
statusText: this.statusMessage,
})
})

public statusCode: number | undefined
public statusMessage: string | undefined

get sent() {
return this._sent
}

constructor(public transformStream = new TransformStream()) {
super(transformStream.writable)
}
Expand Down Expand Up @@ -99,12 +84,25 @@ export class WebNextResponse extends BaseNextResponse<WritableStream> {
return this
}

send() {
this.sendResolve?.()
private readonly sendPromise = new DetachedPromise<void>()
private _sent = false
public send() {
this.sendPromise.resolve()
this._sent = true
}

toResponse() {
return this.response
get sent() {
return this._sent
}

public async toResponse() {
// If we haven't called `send` yet, wait for it to be called.
if (!this.sent) await this.sendPromise.promise
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Why not just return a promise, and resolve that promise with the response?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

toResponse is (as expected) called every request. As soon as you call await it will pause execution to get the value of the promise (even if it's already resolved). To avoid unnecissary pauses, this check bypasses that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this is an async function, so this will always have at least 1 tick delay. If you just return a promise resolved to the Response, we keep the same number of overall ticks and (slightly) simplify this method.


return new Response(this.textBody ?? this.transformStream.readable, {
headers: this.headers,
status: this.statusCode,
statusText: this.statusMessage,
})
}
}
9 changes: 4 additions & 5 deletions packages/next/src/server/body-streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ export function requestToBodyStream(
stream: Readable
) {
return new context.ReadableStream({
start(controller) {
stream.on('data', (chunk) =>
start: async (controller) => {
for await (const chunk of stream) {
controller.enqueue(new KUint8Array([...new Uint8Array(chunk)]))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to construct a Uint8Array just to clone it into a KUint8Array?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)
stream.on('end', () => controller.close())
stream.on('error', (err) => controller.error(err))
}
controller.close()
},
})
}
Expand Down
28 changes: 3 additions & 25 deletions packages/next/src/server/font-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
DEFAULT_SANS_SERIF_FONT,
} from '../shared/lib/constants'
const capsizeFontsMetrics = require('next/dist/server/capsize-font-metrics.json')
const https = require('https')

const CHROME_UA =
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36'
Expand All @@ -22,30 +21,9 @@ function isGoogleFont(url: string): boolean {
return url.startsWith(GOOGLE_FONT_PROVIDER)
}

function getFontForUA(url: string, UA: string): Promise<String> {
return new Promise((resolve, reject) => {
let rawData: any = ''
https
.get(
url,
{
headers: {
'user-agent': UA,
},
},
(res: any) => {
res.on('data', (chunk: any) => {
rawData += chunk
})
res.on('end', () => {
resolve(rawData.toString('utf8'))
})
}
)
.on('error', (e: Error) => {
reject(e)
})
})
async function getFontForUA(url: string, UA: string): Promise<string> {
const res = await fetch(url, { headers: { 'user-agent': UA } })
return await res.text()
Comment on lines +24 to +26
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was too simple to pass up updating 😅

}

export async function getFontDefinitionFromNetwork(
Expand Down
Loading
Loading