diff --git a/experimental/CHANGELOG.md b/experimental/CHANGELOG.md index 978ca31b371..e033e886091 100644 --- a/experimental/CHANGELOG.md +++ b/experimental/CHANGELOG.md @@ -70,6 +70,7 @@ For notes on migrating to 2.x / 0.200.x see [the upgrade guide](doc/upgrade-to-2 * fix(otlp-exporter-base): prioritize `esnext` export condition as it is more specific [#5458](https://github.com/open-telemetry/opentelemetry-js/pull/5458) * fix(otlp-exporter-base): consider relative urls as valid in browser environments [#5807](https://github.com/open-telemetry/opentelemetry-js/pull/5807) * fix(instrumentation-fetch): Use ESM version of semconv instead of CJS. Users expecting mixed ESM and CJS modules will now only get ESM modules. [#5878](https://github.com/open-telemetry/opentelemetry-js/pull/5878) @overbalance +* fix(instrumentation-fetch): release HTTP connection when response body is cancelled [#5894](https://github.com/open-telemetry/opentelemetry-js/pull/5894) @Lei-k ### :house: Internal diff --git a/experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts b/experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts index 2da858869f8..443d1b60c66 100644 --- a/experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts +++ b/experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts @@ -410,7 +410,6 @@ export class FetchInstrumentation extends InstrumentationBase { if (!bodyLength) return; - if (plugin._semconvStability & SemconvStability.OLD) { createdSpan.setAttribute( ATTR_HTTP_REQUEST_CONTENT_LENGTH_UNCOMPRESSED, @@ -451,16 +450,72 @@ export class FetchInstrumentation extends InstrumentationBase | null, + readerClone: ReadableStreamDefaultReader + ): ReadableStream | null { + if (!body) return null; + + const reader = body.getReader(); + + return new ReadableStream({ + async pull(controller) { + try { + const { value, done } = await reader.read(); + if (done) { + reader.releaseLock(); + controller.close(); + } else { + controller.enqueue(value); + } + } catch (err) { + controller.error(err); + reader.cancel(err).catch(_ => {}); + + try { + reader.releaseLock(); + } catch { + // Spec reference: + // https://streams.spec.whatwg.org/#default-reader-release-lock + // + // releaseLock() only throws if called on an invalid reader + // (i.e. reader.[[stream]] is undefined, meaning the lock is already released + // or the reader was never associated). In normal use this cannot happen. + // This catch is defensive only. + } + } + }, + cancel(reason) { + readerClone.cancel(reason).catch(_ => {}); + return reader.cancel(reason); + }, + }); + } + function onSuccess( span: api.Span, resolve: (value: Response | PromiseLike) => void, response: Response ): void { + let proxiedResponse: Response | null = null; + try { + // TODO: Switch to a consumer-driven model and drop `resClone`. + // Keeping eager consumption here to preserve current behavior and avoid breaking existing tests. + // Context: discussion in PR #5894 → https://github.com/open-telemetry/opentelemetry-js/pull/5894 const resClone = response.clone(); const body = resClone.body; if (body) { const reader = body.getReader(); + + const wrappedBody = withCancelPropagation(response.body, reader); + + proxiedResponse = new Response(wrappedBody, { + status: response.status, + statusText: response.statusText, + headers: response.headers, + }); + const read = (): void => { reader.read().then( ({ done }) => { @@ -481,7 +536,7 @@ export class FetchInstrumentation extends InstrumentationBase { plugin._addHeaders(options, url); - // Important to execute "_callRequestHook" after "_addHeaders", allowing the consumer code to override the request headers. plugin._callRequestHook(createdSpan, options); plugin._tasksCount++; - // TypeScript complains about arrow function captured a this typed as globalThis - // ts(7041) + return original .apply( self, diff --git a/experimental/packages/opentelemetry-instrumentation-fetch/test/fetch.test.ts b/experimental/packages/opentelemetry-instrumentation-fetch/test/fetch.test.ts index 5a49397f104..43d3eb6116a 100644 --- a/experimental/packages/opentelemetry-instrumentation-fetch/test/fetch.test.ts +++ b/experimental/packages/opentelemetry-instrumentation-fetch/test/fetch.test.ts @@ -2388,5 +2388,125 @@ describe('fetch', () => { }); }); }); + + describe('long-lived streaming requests', () => { + let tracePromise: Promise | undefined; + let pushes = 0; + let timer: any; + + const streamHandler = () => { + const encoder = new TextEncoder(); + + return msw.http.get('/api/stream', () => { + const stream = new ReadableStream({ + start(controller) { + // Continuously push data to simulate a long connection + timer = setInterval(() => { + if (pushes >= 50) { + clearInterval(timer); + controller.close(); + return; + } + pushes += 1; + controller.enqueue(encoder.encode(`data: ${pushes}\n`)); + }, 50); + }, + }); + + const response = new msw.HttpResponse(stream, { + status: 200, + headers: { + 'Content-Type': 'text/plain; charset=utf-8', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }, + }); + + return response; + }); + }; + + async function tracedFetch({ + handlers = [streamHandler()], + callback = () => fetch('/api/stream', { method: 'GET' }), + config = {}, + }: { + handlers?: msw.RequestHandler[]; + callback?: () => Promise; + config?: FetchInstrumentationConfig; + } = {}): Promise<{ response: Response }> { + await startWorker(...handlers); + + const response = await new Promise(resolve => { + tracePromise = trace(async () => { + resolve(await callback()); + }, config); + }); + + return { response: response }; + } + + const assertFirstChunk = async (response: Response) => { + assert.ok( + response.body instanceof ReadableStream, + 'response.body should be a ReadableStream' + ); + const reader = response.body.getReader(); + const first = await reader.read(); + assert.strictEqual(first.done, false, 'first chunk should not be done'); + const text = new TextDecoder().decode(first.value); + assert.match( + text, + /^data: \d+\n$/, + 'first chunk should match "data: \\n"' + ); + return reader; + }; + + beforeEach(() => { + if (timer) { + clearInterval(timer); + timer = undefined; + } + + pushes = 0; + tracePromise = undefined; + }); + + afterEach(() => { + if (timer) { + clearInterval(timer); + timer = undefined; + } + }); + + describe('when client cancels the reader', () => { + it('should cancel stream and release the connection', async () => { + const { response } = await tracedFetch(); + + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => { + reject(new Error('trace should finish before timeout')); + }, 1000); + }); + + // Read the first chunk to confirm the stream is live + const reader = await assertFirstChunk(response); + + reader.cancel('test-cancel'); + + await Promise.race([tracePromise, timeoutPromise]); + + assert.strictEqual( + exportedSpans.length, + 1, + 'should create a single span' + ); + + const span: tracing.ReadableSpan = exportedSpans[0]; + assert.ok(span.ended, 'span should be ended'); + }); + }); + }); }); });