Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

πŸ› Bug Report β€” Runtime APIs: piping ReadableStream to an IdentityTransformStream results in uncaught TypeError: This WritableStream has been closed. #992

Open
mrbbot opened this issue Aug 8, 2023 · 4 comments

Comments

@mrbbot
Copy link
Contributor

mrbbot commented Aug 8, 2023

Hey! πŸ‘‹ With the following workerd configuration...

using Workerd = import "/workerd/workerd.capnp";

const config :Workerd.Config = (
  services = [
    ( name = "main", worker = .worker ),
  ],
  sockets = [
    ( name = "http", address = "*:8080", http = (), service = "main" ),
  ]
);

const worker :Workerd.Worker = (
  modules = [
    ( name = "index.mjs", esModule = embed "index.mjs" )
  ],
  compatibilityDate = "2023-08-01",
);
// index.mjs
export default {
  async fetch(request, env, ctx) {
    const readable = new ReadableStream({
      pull(controller) {
        controller.enqueue(new TextEncoder().encode("abc"));
        controller.close();
      }
    });

    const identity = new IdentityTransformStream();
    ctx.waitUntil(readable.pipeTo(identity.writable));

    return new Response(identity.readable);
  }
}

...and running workerd serve config.capnp --verbose, logs Uncaught (in promise); exception = TypeError: This WritableStream has been closed. on request.

Finding the error string and adding KJ_DBGs to all those places shows the error is coming from:

auto reason = js.v8TypeError("This WritableStream has been closed."_kj);

Replacing const readable = new ReadableStream(...); with...

const { readable, writable } = new TransformStream();
const writer = writable.getWriter();
void writer.write(encoder.encode("abc"));
void writer.close();

...also leads to the same error.

Replacing new IdentityTransformStream() with new TransformStream() fixes the issue.

@hoangprod
Copy link

  // Create a DecompressionStream for the 'deflate' format
  const decompressionStream = new DecompressionStream('deflate');
  
  // Create a ReadableStream with the decrypted data
  const readableStream = new ReadableStream({
      start(controller) {
      controller.enqueue(new Uint8Array(decrypted));
      controller.close();
      },
  });


  // Pipe the decrypted data through the decompression stream
  return readableStream.pipeThrough(decompressionStream);

I am getting the same here after the return of the new Response

    const inflatedBody = await decrypt_r2(c, await file_contents.arrayBuffer());

    return new Response(inflatedBody, { 
        status: 200,
        headers: {
            'Content-Type': 'application/octet-stream',
            'Content-Disposition': `attachment; filename="${filename}"`
        }
    });
X [ERROR] Uncaught (async) TypeError: This WritableStream has been closed.
X [ERROR] Uncaught (in promise) TypeError: This WritableStream has been close

@marbemac
Copy link

marbemac commented Sep 13, 2023

Ditto on the CompressionStream side of things. As soon as I add .pipeThrough(new CompressionStream('gzip')) I start receiving the error described in this issue.

I think the issue might be broader than the title suggests... in my case I'm just piping a ReadableStream through a CompressionStream (no IdentityTransformStream involved), along these lines:

const eventStream = someReadableStream
  .pipeThrough(new TextEncoderStream())
  .pipeThrough(new CompressionStream('gzip')); // the error goes away if compression stream is not included

This is during local dev with "wrangler": "3.7.0" - have not checked prod yet.

@gildas-lormeau
Copy link

gildas-lormeau commented May 21, 2024

This problem has also been reported to me in zip.js. Here's a complete example below to reproduce the issue with the CompressionStream API.

export default {
  async fetch() {
    const { readable, writable } = new CompressionStream("deflate-raw");
    const helloWorldStream = new ReadableStream({
      start(controller) {
        controller.enqueue(new TextEncoder().encode("Hello, World!"));
        controller.close();
      }
    });

    helloWorldStream.pipeTo(writable);
    
    return new Response(readable, {
      headers: {
        "Content-Disposition": 'attachment; filename="file.raw"',
        "Content-Type": "application/octet-stream",
        "Cache-Control": "no-cache",
      }
    });
  }
};

It can be circumvented with a fake CompressionStream API defined as below.

globalThis.CompressionStream = class {
  constructor(format) {
    const compressionStream = new CompressionStream(format);
    const writer = compressionStream.writable.getWriter();
    return new TransformStream({
      async transform(chunk) {
        await writer.write(chunk);
      },
      async flush() {
        await writer.close();
      }
    });
  }
};

// App code below
// ...

@hhoughgg
Copy link

hhoughgg commented Aug 12, 2024

I ran into the same issue and pipe seems to cause the same problems as pipeThrough just with less frequency. This is an example of what I did as a workaround.

async function pipeThrough(r: ReadableStream, w: WritableStream) {
  const writer = w.getWriter()
  for await (const chunk of r) {
    writer.write(chunk)
  }
  writer.close()
}

const { readable, writable } = new TransformStream<string, string>()
const compressor = new CompressionStream('gzip')

const pipe = readable
  .pipeThrough(new TextEncoderStream())

pipeThrough(pipe, compressor.writable) // replacement for .pipeThrough(new CompressionStream('gzip'))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants