Skip to content

Commit

Permalink
fix: ensure we add an abort reason
Browse files Browse the repository at this point in the history
  • Loading branch information
wyattjoh committed Oct 18, 2023
1 parent 9b754da commit c242a5f
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 28 deletions.
15 changes: 9 additions & 6 deletions packages/next/src/server/pipe-readable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ import type { ServerResponse } from 'node:http'

import './node-polyfill-web-streams'

import { createAbortController } from './web/spec-extension/adapters/next-request'
import {
ResponseAbortedName,
createAbortController,
} from './web/spec-extension/adapters/next-request'
import { DetachedPromise } from '../lib/detached-promise'

export function isAbortError(e: any): e is Error & { name: 'AbortError' } {
return e?.name === 'AbortError'
return e?.name === 'AbortError' || e?.name === ResponseAbortedName
}

function createWriterFromResponse(
Expand Down Expand Up @@ -66,7 +69,7 @@ function createWriterFromResponse(
}
} catch (err) {
res.end()
throw err
throw new Error('failed to write chunk to response', { cause: err })
}
},
abort: (err) => {
Expand Down Expand Up @@ -101,8 +104,8 @@ export async function pipeToNodeResponse(
await readable.pipeTo(writer, { signal: controller.signal })
} catch (err: any) {
// If this isn't related to an abort error, re-throw it.
if (!isAbortError(err)) {
throw err
}
if (isAbortError(err)) return

throw new Error('failed to pipe response', { cause: err })
}
}
45 changes: 25 additions & 20 deletions packages/next/src/server/stream-utils/node-web-streams-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { nonNullable } from '../../lib/non-nullable'
import { getTracer } from '../lib/trace/tracer'
import { AppRenderSpan } from '../lib/trace/constants'
import { createDecodeTransformStream } from './encode-decode'
import { DetachedPromise } from '../../lib/detached-promise'

const queueTask =
process.env.NEXT_RUNTIME === 'edge' ? globalThis.setTimeout : setImmediate
Expand Down Expand Up @@ -242,7 +243,8 @@ function createDeferredSuffixStream(
function createMergedTransformStream(
dataStream: ReadableStream<Uint8Array>
): TransformStream<Uint8Array, Uint8Array> {
let dataStreamFinished: Promise<void> | null = null
let dataStreamFinished: DetachedPromise<void> | null = null

return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk)
Expand All @@ -256,30 +258,33 @@ function createMergedTransformStream(
// implementation, e.g. with a specific high-water mark. To ensure it's
// the safe timing to pipe the data stream, this extra tick is
// necessary.
dataStreamFinished = new Promise((res) =>
// We use `setTimeout` here to ensure that it's inserted after flushing
// the shell. Note that this implementation might get stale if impl
// details of Fizz change in the future.
queueTask(async () => {
try {
while (true) {
const { done, value } = await dataStreamReader.read()
if (done) {
return res()
}
controller.enqueue(value)
}
} catch (err) {
controller.error(err)
const promise = new DetachedPromise<void>()
dataStreamFinished = promise

// We use `setTimeout` here to ensure that it's inserted after flushing
// the shell. Note that this implementation might get stale if impl
// details of Fizz change in the future.
queueTask(async () => {
try {
while (true) {
const { done, value } = await dataStreamReader.read()
if (done) return promise.resolve()

controller.enqueue(value)
}
res()
})
)
} catch (err) {
controller.error(err)
}

promise.resolve()
})
}
},
flush() {
// If the data stream promise is defined, then return it as its completion
// will be the completion of the stream.
if (dataStreamFinished) {
return dataStreamFinished
return dataStreamFinished.promise
}
},
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ import { getRequestMeta } from '../../../request-meta'
import { fromNodeOutgoingHttpHeaders } from '../../utils'
import { NextRequest } from '../request'

export const ResponseAbortedName = 'ResponseAborted'
export class ResponseAborted extends Error {
public readonly name = ResponseAbortedName
}

/**
* Creates an AbortController tied to the closing of a ServerResponse (or other
* appropriate Writable).
Expand All @@ -23,7 +28,7 @@ export function createAbortController(response: Writable): AbortController {
response.once('close', () => {
if (response.writableFinished) return

controller.abort()
controller.abort(new ResponseAborted())
})

return controller
Expand All @@ -39,7 +44,9 @@ export function createAbortController(response: Writable): AbortController {
*/
export function signalFromNodeResponse(response: Writable): AbortSignal {
const { errored, destroyed } = response
if (errored || destroyed) return AbortSignal.abort(errored)
if (errored || destroyed) {
return AbortSignal.abort(errored ?? new ResponseAborted())
}

const { signal } = createAbortController(response)
return signal
Expand Down

0 comments on commit c242a5f

Please sign in to comment.