Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8ffd2c8
fix(fetch-instrumentation): propagate cancel to cloned response reade…
Lei-k Aug 29, 2025
56346a9
chore(fetch-instrumentation): fix lint issues and add spec reference …
Lei-k Aug 29, 2025
9470bbf
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 1, 2025
885082a
chore(changelog): add entry for fix(instrumentation-fetch) (#5894)
Lei-k Sep 1, 2025
9f5c4f2
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 1, 2025
dc0cd9a
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 2, 2025
1773f13
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 2, 2025
9aec9f3
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 2, 2025
4b60dec
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 3, 2025
75cbbbd
test(instrumentation-fetch): add long-lived streaming request test to…
Lei-k Sep 4, 2025
0bceedc
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 4, 2025
8babf3a
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 8, 2025
738e28a
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 10, 2025
aeaafda
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 10, 2025
e9bf623
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 11, 2025
421196a
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 12, 2025
bebd82a
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 15, 2025
1753d37
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 16, 2025
487c0b5
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 16, 2025
b6f19b6
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 17, 2025
f0b5097
chore(fetch-instrumentation): fix lint
Lei-k Sep 17, 2025
e78faf5
Merge branch 'main' into fix/instrumentation-fetch
Lei-k Sep 17, 2025
832b061
chore(instrumentation-fetch): add TODO to switch to consumer-driven m…
Lei-k Sep 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ For notes on migrating to 2.x / 0.200.x see [the upgrade guide](doc/upgrade-to-2
* fix(otlp-exporter-base): prioritize `esnext` export condition as it is more specific [#5458](https://github.com/open-telemetry/opentelemetry-js/pull/5458)
* fix(otlp-exporter-base): consider relative urls as valid in browser environments [#5807](https://github.com/open-telemetry/opentelemetry-js/pull/5807)
* fix(instrumentation-fetch): Use ESM version of semconv instead of CJS. Users expecting mixed ESM and CJS modules will now only get ESM modules. [#5878](https://github.com/open-telemetry/opentelemetry-js/pull/5878) @overbalance
* fix(instrumentation-fetch): release HTTP connection when response body is cancelled [#5894](https://github.com/open-telemetry/opentelemetry-js/pull/5894) @Lei-k

### :house: Internal

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@ export class FetchInstrumentation extends InstrumentationBase<FetchInstrumentati
getFetchBodyLength(...args)
.then(bodyLength => {
if (!bodyLength) return;

if (plugin._semconvStability & SemconvStability.OLD) {
createdSpan.setAttribute(
ATTR_HTTP_REQUEST_CONTENT_LENGTH_UNCOMPRESSED,
Expand Down Expand Up @@ -451,16 +450,72 @@ export class FetchInstrumentation extends InstrumentationBase<FetchInstrumentati
}
}

function withCancelPropagation(
body: ReadableStream<Uint8Array> | null,
readerClone: ReadableStreamDefaultReader<Uint8Array>
): ReadableStream<Uint8Array> | null {
if (!body) return null;

const reader = body.getReader();

return new ReadableStream({
async pull(controller) {
try {
const { value, done } = await reader.read();
if (done) {
reader.releaseLock();
controller.close();
} else {
controller.enqueue(value);
}
} catch (err) {
controller.error(err);
reader.cancel(err).catch(_ => {});

try {
reader.releaseLock();
} catch {
// Spec reference:
// https://streams.spec.whatwg.org/#default-reader-release-lock
//
// releaseLock() only throws if called on an invalid reader
// (i.e. reader.[[stream]] is undefined, meaning the lock is already released
// or the reader was never associated). In normal use this cannot happen.
// This catch is defensive only.
}
}
},
cancel(reason) {
readerClone.cancel(reason).catch(_ => {});
return reader.cancel(reason);
},
});
}

function onSuccess(
span: api.Span,
resolve: (value: Response | PromiseLike<Response>) => void,
response: Response
): void {
let proxiedResponse: Response | null = null;

try {
// TODO: Switch to a consumer-driven model and drop `resClone`.
// Keeping eager consumption here to preserve current behavior and avoid breaking existing tests.
// Context: discussion in PR #5894 → https://github.com/open-telemetry/opentelemetry-js/pull/5894
const resClone = response.clone();
Comment thread
Lei-k marked this conversation as resolved.
const body = resClone.body;
if (body) {
const reader = body.getReader();

const wrappedBody = withCancelPropagation(response.body, reader);

proxiedResponse = new Response(wrappedBody, {
status: response.status,
statusText: response.statusText,
headers: response.headers,
});

const read = (): void => {
reader.read().then(
({ done }) => {
Expand All @@ -481,7 +536,7 @@ export class FetchInstrumentation extends InstrumentationBase<FetchInstrumentati
endSpanOnSuccess(span, response);
}
} finally {
resolve(response);
resolve(proxiedResponse ?? response);
}
}

Expand All @@ -502,11 +557,9 @@ export class FetchInstrumentation extends InstrumentationBase<FetchInstrumentati
api.trace.setSpan(api.context.active(), createdSpan),
() => {
plugin._addHeaders(options, url);
// Important to execute "_callRequestHook" after "_addHeaders", allowing the consumer code to override the request headers.
plugin._callRequestHook(createdSpan, options);
plugin._tasksCount++;
// TypeScript complains about arrow function captured a this typed as globalThis
// ts(7041)

return original
.apply(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2388,5 +2388,125 @@ describe('fetch', () => {
});
});
});

describe('long-lived streaming requests', () => {
let tracePromise: Promise<api.Span> | undefined;
let pushes = 0;
let timer: any;

const streamHandler = () => {
const encoder = new TextEncoder();

return msw.http.get('/api/stream', () => {
const stream = new ReadableStream<Uint8Array>({
start(controller) {
// Continuously push data to simulate a long connection
timer = setInterval(() => {
if (pushes >= 50) {
clearInterval(timer);
controller.close();
return;
}
pushes += 1;
controller.enqueue(encoder.encode(`data: ${pushes}\n`));
}, 50);
},
});

const response = new msw.HttpResponse(stream, {
status: 200,
headers: {
'Content-Type': 'text/plain; charset=utf-8',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
},
});

return response;
});
};

async function tracedFetch({
handlers = [streamHandler()],
callback = () => fetch('/api/stream', { method: 'GET' }),
config = {},
}: {
handlers?: msw.RequestHandler[];
callback?: () => Promise<Response>;
config?: FetchInstrumentationConfig;
} = {}): Promise<{ response: Response }> {
await startWorker(...handlers);

const response = await new Promise<Response>(resolve => {
tracePromise = trace(async () => {
resolve(await callback());
}, config);
});

return { response: response };
}

const assertFirstChunk = async (response: Response) => {
assert.ok(
response.body instanceof ReadableStream,
'response.body should be a ReadableStream'
);
const reader = response.body.getReader();
const first = await reader.read();
assert.strictEqual(first.done, false, 'first chunk should not be done');
const text = new TextDecoder().decode(first.value);
assert.match(
text,
/^data: \d+\n$/,
'first chunk should match "data: <n>\\n"'
);
return reader;
};

beforeEach(() => {
if (timer) {
clearInterval(timer);
timer = undefined;
}

pushes = 0;
tracePromise = undefined;
});

afterEach(() => {
if (timer) {
clearInterval(timer);
timer = undefined;
}
});

describe('when client cancels the reader', () => {
it('should cancel stream and release the connection', async () => {
const { response } = await tracedFetch();

const timeoutPromise = new Promise((_, reject) => {
setTimeout(() => {
reject(new Error('trace should finish before timeout'));
}, 1000);
});

// Read the first chunk to confirm the stream is live
const reader = await assertFirstChunk(response);

reader.cancel('test-cancel');

await Promise.race([tracePromise, timeoutPromise]);

assert.strictEqual(
exportedSpans.length,
1,
'should create a single span'
);

const span: tracing.ReadableSpan = exportedSpans[0];
assert.ok(span.ended, 'span should be ended');
});
});
});
});
});
Loading