Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions streams/to_transform_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,21 @@
*
* @typeparam I The type of the chunks in the source stream.
* @typeparam O The type of the chunks in the transformed stream.
* @param transformer A function to transform.
* @param writableStrategy An object that optionally defines a queuing strategy for the stream.
* @param readableStrategy An object that optionally defines a queuing strategy for the stream.
* @param transformer A function to transform. Must return an iterable or async iterable.
* @param writableStrategy An object that optionally defines a queuing strategy
* for the stream's internal buffer between source and transformer.
* @param readableStrategy An object that optionally defines a queuing strategy
* for the stream's output buffer.
* @returns A {@linkcode TransformStream} that transforms the source stream as defined by the provided transformer.
*
* @throws {TypeError} If `transformer` does not return an iterable or async iterable.
*
* When the output stream is cancelled, the cancellation is propagated to both
* the iterator (via `throw()`) and the source readable stream.
*
* When the iterator throws an error, the error is propagated to both the output
* readable stream and the source readable stream (via `cancel()`).
*
* @example Build a transform stream that multiplies each value by 100
* ```ts
* import { toTransformStream } from "@std/streams/to-transform-stream";
Expand Down Expand Up @@ -72,9 +82,14 @@ export function toTransformStream<I, O>(
} = new TransformStream<I, I>(undefined, writableStrategy);

const iterable = transformer(readable);
const iterator: Iterator<O> | AsyncIterator<O> =
(iterable as AsyncIterable<O>)[Symbol.asyncIterator]?.() ??
(iterable as Iterable<O>)[Symbol.iterator]?.();
const iterator: Iterator<O> | AsyncIterator<O> | undefined =
(iterable as AsyncIterable<O> | null)?.[Symbol.asyncIterator]?.() ??
(iterable as Iterable<O> | null)?.[Symbol.iterator]?.();
if (!iterator) {
throw new TypeError(
"Transformer must return an iterable or async iterable",
);
}
return {
writable,
readable: new ReadableStream<O>({
Expand Down
58 changes: 57 additions & 1 deletion streams/to_transform_stream_test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018-2026 the Deno authors. MIT license.

import { assertEquals, assertRejects } from "@std/assert";
import { assertEquals, assertRejects, assertThrows } from "@std/assert";
import { toTransformStream } from "./to_transform_stream.ts";

Deno.test({
Expand Down Expand Up @@ -33,6 +33,38 @@ Deno.test({
},
});

Deno.test({
name: "toTransformStream() throws if transformer returns non-iterable",
fn() {
assertThrows(
() => {
// @ts-expect-error Testing invalid input
toTransformStream(() => null);
},
TypeError,
"Transformer must return an iterable or async iterable",
);

assertThrows(
() => {
// @ts-expect-error Testing invalid input
toTransformStream(() => 123);
},
TypeError,
"Transformer must return an iterable or async iterable",
);

assertThrows(
() => {
// @ts-expect-error Testing invalid input
toTransformStream(() => ({ notAnIterator: true }));
},
TypeError,
"Transformer must return an iterable or async iterable",
);
},
});

Deno.test({
name: "toTransformStream() propagates the error from readable 1",
async fn(t) {
Expand Down Expand Up @@ -207,3 +239,27 @@ Deno.test({
);
},
});

Deno.test({
name: "toTransformStream() executes finally blocks on cancellation",
async fn() {
let finallyCalled = false;

const readable = ReadableStream.from([1, 2, 3])
.pipeThrough(toTransformStream(async function* (src) {
try {
for await (const chunk of src) {
yield chunk;
}
} finally {
finallyCalled = true;
}
}));

const reader = readable.getReader();
await reader.read(); // Read first chunk
await reader.cancel("test cancellation");

assertEquals(finallyCalled, true);
},
});