From 8ffd2c874b94e5b28cfa1ce7adb1426df0f042aa Mon Sep 17 00:00:00 2001 From: Neil Date: Sat, 30 Aug 2025 00:39:05 +0800 Subject: [PATCH 1/6] fix(fetch-instrumentation): propagate cancel to cloned response reader to release HTTP connection --- .../src/fetch.ts | 51 +++++++++++++++++-- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts b/experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts index 2da858869f8..3d056b6ad5e 100644 --- a/experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts +++ b/experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts @@ -410,7 +410,6 @@ export class FetchInstrumentation extends InstrumentationBase { if (!bodyLength) return; - if (plugin._semconvStability & SemconvStability.OLD) { createdSpan.setAttribute( ATTR_HTTP_REQUEST_CONTENT_LENGTH_UNCOMPRESSED, @@ -451,16 +450,60 @@ export class FetchInstrumentation extends InstrumentationBase | null, + readerClone: ReadableStreamDefaultReader + ): ReadableStream | null { + if (!body) return null; + + const reader = body.getReader(); + + return new ReadableStream({ + async pull(controller) { + try { + const { value, done } = await reader.read(); + if (done) { + reader.releaseLock(); + controller.close(); + } else { + controller.enqueue(value); + } + } catch (err) { + controller.error(err); + reader.cancel(err).catch((_) => {}); + + try { reader.releaseLock() } catch(_) {}; + } + }, + cancel(reason) { + readerClone.cancel(reason).catch((_) => {}); + return reader.cancel(reason); + } + }); + } + function onSuccess( span: api.Span, resolve: (value: Response | PromiseLike) => void, response: Response ): void { + + let proxiedResponse: Response | null = null; + try { const resClone = response.clone(); const body = resClone.body; if (body) { const reader = body.getReader(); + + const wrappedBody = withCancelPropagation(response.body, reader); + + proxiedResponse = new Response(wrappedBody, { + status: response.status, + statusText: response.statusText, + headers: response.headers, + }); + const read = (): void => { reader.read().then( ({ done }) => { @@ -481,7 +524,7 @@ export class FetchInstrumentation extends InstrumentationBase { plugin._addHeaders(options, url); - // Important to execute "_callRequestHook" after "_addHeaders", allowing the consumer code to override the request headers. plugin._callRequestHook(createdSpan, options); plugin._tasksCount++; - // TypeScript complains about arrow function captured a this typed as globalThis - // ts(7041) + return original .apply( self, From 56346a99db2be9c6bf2b50cc4c65d5341ae7911c Mon Sep 17 00:00:00 2001 From: Neil Date: Sat, 30 Aug 2025 01:09:06 +0800 Subject: [PATCH 2/6] chore(fetch-instrumentation): fix lint issues and add spec reference comments --- .../src/fetch.ts | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts b/experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts index 3d056b6ad5e..20299670034 100644 --- a/experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts +++ b/experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts @@ -470,15 +470,25 @@ export class FetchInstrumentation extends InstrumentationBase {}); + reader.cancel(err).catch(_ => {}); - try { reader.releaseLock() } catch(_) {}; + try { + reader.releaseLock(); + } catch { + // Spec reference: + // https://streams.spec.whatwg.org/#default-reader-release-lock + // + // releaseLock() only throws if called on an invalid reader + // (i.e. reader.[[stream]] is undefined, meaning the lock is already released + // or the reader was never associated). In normal use this cannot happen. + // This catch is defensive only. + } } }, cancel(reason) { - readerClone.cancel(reason).catch((_) => {}); + readerClone.cancel(reason).catch(_ => {}); return reader.cancel(reason); - } + }, }); } @@ -487,7 +497,6 @@ export class FetchInstrumentation extends InstrumentationBase) => void, response: Response ): void { - let proxiedResponse: Response | null = null; try { From 885082a0053dd804f6dd0da75f68fea7c3a555da Mon Sep 17 00:00:00 2001 From: Neil Date: Tue, 2 Sep 2025 00:10:39 +0800 Subject: [PATCH 3/6] chore(changelog): add entry for fix(instrumentation-fetch) (#5894) --- experimental/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/experimental/CHANGELOG.md b/experimental/CHANGELOG.md index bbb6e59b4be..798bf056e38 100644 --- a/experimental/CHANGELOG.md +++ b/experimental/CHANGELOG.md @@ -41,6 +41,7 @@ For notes on migrating to 2.x / 0.200.x see [the upgrade guide](doc/upgrade-to-2 * fix(otlp-exporter-base): prioritize `esnext` export condition as it is more specific [#5458](https://github.com/open-telemetry/opentelemetry-js/pull/5458) * fix(otlp-exporter-base): consider relative urls as valid in browser environments [#5807](https://github.com/open-telemetry/opentelemetry-js/pull/5807) * fix(instrumentation-fetch): Use ESM version of semconv instead of CJS. Users expecting mixed ESM and CJS modules will now only get ESM modules. [#5878](https://github.com/open-telemetry/opentelemetry-js/pull/5878) @overbalance +* fix(instrumentation-fetch): release HTTP connection when response body is cancelled [#5894](https://github.com/open-telemetry/opentelemetry-js/pull/5894) @Lei-k ### :books: Documentation From 75cbbbd7433a7ce0a949d215a81a539bcc74d415 Mon Sep 17 00:00:00 2001 From: Neil Date: Thu, 4 Sep 2025 23:40:51 +0800 Subject: [PATCH 4/6] test(instrumentation-fetch): add long-lived streaming request test to ensure span ends on reader.cancel() --- .../test/fetch.test.ts | 121 ++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/experimental/packages/opentelemetry-instrumentation-fetch/test/fetch.test.ts b/experimental/packages/opentelemetry-instrumentation-fetch/test/fetch.test.ts index 5a49397f104..d30b966de03 100644 --- a/experimental/packages/opentelemetry-instrumentation-fetch/test/fetch.test.ts +++ b/experimental/packages/opentelemetry-instrumentation-fetch/test/fetch.test.ts @@ -2388,5 +2388,126 @@ describe('fetch', () => { }); }); }); + + describe('long-lived streaming requests', () => { + let tracePromise: Promise | undefined; + let pushes = 0; + let timer: any; + + const streamHandler = () => { + const encoder = new TextEncoder(); + + return msw.http.get('/api/stream', () => { + + const stream = new ReadableStream({ + start(controller) { + // Continuously push data to simulate a long connection + timer = setInterval(() => { + if (pushes >= 50) { + clearInterval(timer); + controller.close(); + return; + } + pushes += 1; + controller.enqueue(encoder.encode(`data: ${pushes}\n`)); + }, 50); + }, + }); + + const response = new msw.HttpResponse(stream, { + status: 200, + headers: { + 'Content-Type': 'text/plain; charset=utf-8', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }, + }); + + return response; + }); + }; + + async function tracedFetch({ + handlers = [streamHandler()], + callback = () => fetch('/api/stream', { method: 'GET' }), + config = {}, + }: { + handlers?: msw.RequestHandler[]; + callback?: () => Promise; + config?: FetchInstrumentationConfig; + } = {}): Promise<{ response: Response }> { + await startWorker(...handlers); + + const response = await new Promise(resolve => { + tracePromise = trace(async () => { + resolve(await callback()); + }, config); + }); + + return { response: response }; + } + + const assertFirstChunk = async (response: Response) => { + assert.ok( + response.body instanceof ReadableStream, + 'response.body should be a ReadableStream' + ); + const reader = response.body.getReader(); + const first = await reader.read(); + assert.strictEqual(first.done, false, 'first chunk should not be done'); + const text = new TextDecoder().decode(first.value); + assert.match( + text, + /^data: \d+\n$/, + 'first chunk should match "data: \\n"' + ); + return reader; + }; + + beforeEach(() => { + if(timer) { + clearInterval(timer); + timer = undefined; + } + + pushes = 0; + tracePromise = undefined; + }); + + afterEach(() => { + if(timer) { + clearInterval(timer); + timer = undefined; + } + }); + + describe('when client cancels the reader', () => { + it('should cancel stream and release the connection', async () => { + const { response } = await tracedFetch(); + + const timeoutPromise = new Promise((_, reject) => { + setTimeout(() => { + reject(new Error('trace should finish before timeout')); + }, 1000); + }); + + // Read the first chunk to confirm the stream is live + const reader = await assertFirstChunk(response); + + reader.cancel('test-cancel'); + + await Promise.race([tracePromise, timeoutPromise]); + + assert.strictEqual( + exportedSpans.length, + 1, + 'should create a single span' + ); + + const span: tracing.ReadableSpan = exportedSpans[0]; + assert.ok(span.ended, 'span should be ended'); + }); + }); + }); }); }); From f0b50977b7d7e8b3190fe09133a7f4a9c0f5c29f Mon Sep 17 00:00:00 2001 From: Neil Date: Wed, 17 Sep 2025 23:03:29 +0800 Subject: [PATCH 5/6] chore(fetch-instrumentation): fix lint --- .../opentelemetry-instrumentation-fetch/test/fetch.test.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/experimental/packages/opentelemetry-instrumentation-fetch/test/fetch.test.ts b/experimental/packages/opentelemetry-instrumentation-fetch/test/fetch.test.ts index d30b966de03..43d3eb6116a 100644 --- a/experimental/packages/opentelemetry-instrumentation-fetch/test/fetch.test.ts +++ b/experimental/packages/opentelemetry-instrumentation-fetch/test/fetch.test.ts @@ -2398,7 +2398,6 @@ describe('fetch', () => { const encoder = new TextEncoder(); return msw.http.get('/api/stream', () => { - const stream = new ReadableStream({ start(controller) { // Continuously push data to simulate a long connection @@ -2465,7 +2464,7 @@ describe('fetch', () => { }; beforeEach(() => { - if(timer) { + if (timer) { clearInterval(timer); timer = undefined; } @@ -2475,7 +2474,7 @@ describe('fetch', () => { }); afterEach(() => { - if(timer) { + if (timer) { clearInterval(timer); timer = undefined; } From 832b061b714d4cecfc910575ab11aacb4e3835b8 Mon Sep 17 00:00:00 2001 From: Neil Date: Thu, 18 Sep 2025 00:11:53 +0800 Subject: [PATCH 6/6] chore(instrumentation-fetch): add TODO to switch to consumer-driven model and drop resClone; keep eager consumption for now (see PR #5894) --- .../packages/opentelemetry-instrumentation-fetch/src/fetch.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts b/experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts index 20299670034..443d1b60c66 100644 --- a/experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts +++ b/experimental/packages/opentelemetry-instrumentation-fetch/src/fetch.ts @@ -500,6 +500,9 @@ export class FetchInstrumentation extends InstrumentationBase