diff --git a/streams/to_transform_stream.ts b/streams/to_transform_stream.ts index fee7f4e66bb2..35fd12f8b473 100644 --- a/streams/to_transform_stream.ts +++ b/streams/to_transform_stream.ts @@ -6,11 +6,21 @@ * * @typeparam I The type of the chunks in the source stream. * @typeparam O The type of the chunks in the transformed stream. - * @param transformer A function to transform. - * @param writableStrategy An object that optionally defines a queuing strategy for the stream. - * @param readableStrategy An object that optionally defines a queuing strategy for the stream. + * @param transformer A function to transform. Must return an iterable or async iterable. + * @param writableStrategy An object that optionally defines a queuing strategy + * for the stream's internal buffer between source and transformer. + * @param readableStrategy An object that optionally defines a queuing strategy + * for the stream's output buffer. * @returns A {@linkcode TransformStream} that transforms the source stream as defined by the provided transformer. * + * @throws {TypeError} If `transformer` does not return an iterable or async iterable. + * + * When the output stream is cancelled, the cancellation is propagated to both + * the iterator (via `throw()`) and the source readable stream. + * + * When the iterator throws an error, the error is propagated to both the output + * readable stream and the source readable stream (via `cancel()`). + * * @example Build a transform stream that multiplies each value by 100 * ```ts * import { toTransformStream } from "@std/streams/to-transform-stream"; @@ -72,9 +82,14 @@ export function toTransformStream( } = new TransformStream(undefined, writableStrategy); const iterable = transformer(readable); - const iterator: Iterator | AsyncIterator = - (iterable as AsyncIterable)[Symbol.asyncIterator]?.() ?? - (iterable as Iterable)[Symbol.iterator]?.(); + const iterator: Iterator | AsyncIterator | undefined = + (iterable as AsyncIterable | null)?.[Symbol.asyncIterator]?.() ?? + (iterable as Iterable | null)?.[Symbol.iterator]?.(); + if (!iterator) { + throw new TypeError( + "Transformer must return an iterable or async iterable", + ); + } return { writable, readable: new ReadableStream({ diff --git a/streams/to_transform_stream_test.ts b/streams/to_transform_stream_test.ts index 2721dd09ab66..4b5c5bd2a945 100644 --- a/streams/to_transform_stream_test.ts +++ b/streams/to_transform_stream_test.ts @@ -1,6 +1,6 @@ // Copyright 2018-2026 the Deno authors. MIT license. -import { assertEquals, assertRejects } from "@std/assert"; +import { assertEquals, assertRejects, assertThrows } from "@std/assert"; import { toTransformStream } from "./to_transform_stream.ts"; Deno.test({ @@ -33,6 +33,38 @@ Deno.test({ }, }); +Deno.test({ + name: "toTransformStream() throws if transformer returns non-iterable", + fn() { + assertThrows( + () => { + // @ts-expect-error Testing invalid input + toTransformStream(() => null); + }, + TypeError, + "Transformer must return an iterable or async iterable", + ); + + assertThrows( + () => { + // @ts-expect-error Testing invalid input + toTransformStream(() => 123); + }, + TypeError, + "Transformer must return an iterable or async iterable", + ); + + assertThrows( + () => { + // @ts-expect-error Testing invalid input + toTransformStream(() => ({ notAnIterator: true })); + }, + TypeError, + "Transformer must return an iterable or async iterable", + ); + }, +}); + Deno.test({ name: "toTransformStream() propagates the error from readable 1", async fn(t) { @@ -207,3 +239,27 @@ Deno.test({ ); }, }); + +Deno.test({ + name: "toTransformStream() executes finally blocks on cancellation", + async fn() { + let finallyCalled = false; + + const readable = ReadableStream.from([1, 2, 3]) + .pipeThrough(toTransformStream(async function* (src) { + try { + for await (const chunk of src) { + yield chunk; + } + } finally { + finallyCalled = true; + } + })); + + const reader = readable.getReader(); + await reader.read(); // Read first chunk + await reader.cancel("test cancellation"); + + assertEquals(finallyCalled, true); + }, +});