
How to use pipeline inside an infinite while loop #45861

Closed
amustaque97 opened this issue Dec 14, 2022 · 13 comments
Labels
question Issues that look for answers.

Comments

@amustaque97

Hi there, I'm new to the stream API. I'm finding it hard to understand what is wrong with my code and what the exact fix would be. Suppose there is an API with 500 pages. I need to stream data from that API (all 500 pages) to a destination. Below is a code example:

import fetch from "node-fetch";
import fs from "fs";
import { Readable, Writable, pipeline } from "stream";

const ss = fs.createWriteStream("output.txt");

let i = 0;
while (true) {
  const result = await (
    await fetch(URL, {
      method: "POST",
      body: JSON.stringify(data),
      headers: {
        "Content-Type": "application/json",
        Connection: "keep-Alive",
      },
    })
  ).json();
  pipeline(
    Readable.from(result.data),
    async function* transform(src) {
      for await (const chunk of src) {
        yield JSON.stringify(chunk);
      }
    },
    ss
  );
  i += 1;
  if (i == 500) break;
}

When I run the above code, I get the following warning:

(node:9561) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 close listeners added to [Stream]. Use emitter.setMaxListeners() to increase limit

As per my understanding, ss is ended after the first pipeline finishes, so in the second iteration, when i = 1, ss is already closed and it throws an error. I'm not sure how to keep it open, or how to reopen ss for writing.

@mscdex
Contributor

mscdex commented Dec 14, 2022

  1. You're missing an await before pipeline().
  2. Create the fs.createWriteStream() at the top of the loop body and have it open the file in append mode.
  3. (Optional) Your while loop is really a for loop, so you're probably better off using for instead.
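
Putting all three suggestions together, a minimal sketch might look like this (using the promise-based pipeline from node:stream/promises so it can be awaited; URL and data are placeholders from the original post):

import fetch from "node-fetch";
import fs from "fs";
import { Readable } from "stream";
import { pipeline } from "stream/promises";

for (let i = 0; i < 500; i++) {
  const result = await (
    await fetch(URL, {
      method: "POST",
      body: JSON.stringify(data),
      headers: { "Content-Type": "application/json" },
    })
  ).json();

  // Open output.txt in append mode on each iteration; pipeline() will
  // end and close this stream when it finishes, which is fine here
  // because the next iteration opens a fresh one.
  const ss = fs.createWriteStream("output.txt", { flags: "a" });

  await pipeline(
    Readable.from(result.data),
    async function* transform(src) {
      for await (const chunk of src) {
        yield JSON.stringify(chunk);
      }
    },
    ss
  );
}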

@mscdex added the question label Dec 14, 2022
@amustaque97
Author

@mscdex, thank you for the quick response; your solution works like a charm. Thank you once again 👏🏻

I have a follow-up question: what if my destination is a custom Transform stream class that doesn't support append mode? Here is a code example for better understanding:

import fetch from 'node-fetch';
import { Readable, pipeline, Transform } from "stream";
import exceljs from 'exceljs';
import util from 'util';

const cpipeline = util.promisify(pipeline);

class ExcelStream extends Transform {
  constructor(options = {}) {
    super({ ...options, objectMode: true });
    this.i = 0;
    const o = {
      filename: "./test-file.xlsx",
      useStyles: true,
      useSharedStrings: true,
    };
    this.workbook = new exceljs.stream.xlsx.WorkbookWriter(o);
    this.workbook.creator = "ABCDEF";
    this.workbook.created = new Date();
    this.sheet = this.workbook.addWorksheet("Discussion Event");
    this.sheet.columns = [
      {
        header: "testColumn",
        width: 10,
        style: {
          font: {
            bold: true,
          },
        },
      },
    ];
  }

  async _transform(chunk, encoding, done) {
    try {
      this.i += 1;
      this.sheet.addRow([chunk]).commit();
      done();
    } catch (error) {
      console.log("Error while transforming data - commit failed");
      done(error);
    }
  }

  async _flush(cb) {
    try {
      this.sheet.commit();
      await this.workbook.commit();
      cb();
    } catch (error) {
      console.log("Error cannot commit workbook");
      cb(error);
    }
  }
}

const ss = new ExcelStream();

let i = 0;
while (true) {
  const result = await (
    await fetch(URL, {
      method: "POST",
      body: JSON.stringify(data),
      headers: {
        "Content-Type": "application/json",
        Connection: "keep-Alive",
      },
    })
  ).json();
  await cpipeline(
    Readable.from(result.data),
    async function* transform(src) {
      for await (const chunk of src) {
        yield JSON.stringify(chunk);
      }
    },
    ss
  );
  i += 1;
  if (i == 500) break;
}

I get the same warning as mentioned in the original issue. How can I fix it?

@mscdex
Contributor

mscdex commented Dec 14, 2022

AFAIK there is currently no way to prevent pipeline() from destroying/closing the destination stream. If you want to reuse the destination, you would have to use source.pipe(destination, { end: false }) instead.

@amustaque97
Author

amustaque97 commented Dec 14, 2022

After removing the pipeline() call, my code looks like the below. It works, but I want to understand a few points.

  1. My code works only when I call r.destroy(); when I write something like r.emit('close') instead, it fails to write data to the Excel file (the ExcelStream object).
import fetch from 'node-fetch';
import { Readable, pipeline, Transform } from "stream";
import exceljs from 'exceljs';
import util from 'util';

const cpipeline = util.promisify(pipeline);

class ExcelStream extends Transform {
  constructor(options = {}) {
    super({ ...options, objectMode: true });
    this.i = 0;
    const o = {
      filename: "./test-file.xlsx",
      useStyles: true,
      useSharedStrings: true,
    };
    this.workbook = new exceljs.stream.xlsx.WorkbookWriter(o);
    this.workbook.creator = "ABCDEF";
    this.workbook.created = new Date();
    this.sheet = this.workbook.addWorksheet("Discussion Event");
    this.sheet.columns = [
      {
        header: "testColumn",
        width: 10,
        style: {
          font: {
            bold: true,
          },
        },
      },
    ];
  }

  async _transform(chunk, encoding, done) {
    try {
      this.i += 1;
      this.sheet.addRow([chunk]).commit();
      done();
    } catch (error) {
      console.log("Error while transforming data - commit failed");
      done(error);
    }
  }

  async _flush(cb) {
    try {
      this.sheet.commit();
      await this.workbook.commit();
      cb();
    } catch (error) {
      console.log("Error cannot commit workbook");
      cb(error);
    }
  }
}

const ss = new ExcelStream();

let i = 0;
while (true) {
  const result = await (
    await fetch(URL, {
      method: "POST",
      body: JSON.stringify(data),
      headers: {
        "Content-Type": "application/json",
        Connection: "keep-Alive",
      },
    })
  ).json();
  const r = Readable.from(result.data);
  r.pipe(ss, { end: false });
  if (i === 5) {
    r.destroy(); // why destroy? why can't r.emit('close')
    break;
  }
  i += 1;
}
ss.end();
  2. When we write Readable.from(['a', 'b', 'c', 'd', 'e']) to read data from the array ['a', 'b', 'c', 'd', 'e'], is it always going to pick one element at a time?
  3. Could you please share some pointers on how to handle error and close conditions, basically the cleanup that the pipeline method does?

@mscdex
Contributor

mscdex commented Dec 15, 2022

pipe() is async, so you'll need to create your own pipe() that returns a promise you can await. Resolve the promise when you see the 'unpipe' event on your destination stream. This will keep your data from being mixed together.

However, since it seems like you're storing the entire http response in memory, you don't need Readable.from() at all. Just write() the response data directly to your destination stream and wait until the operation completes before the next iteration of the loop.
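
A minimal sketch of such an awaitable pipe, per the suggestion above (the pipePromise name is hypothetical):

// Resolves once the destination emits 'unpipe' for this source, which
// happens after the source ends; { end: false } leaves the destination
// open for reuse. Rejects if the source errors first.
function pipePromise(source, destination) {
  return new Promise((resolve, reject) => {
    const onUnpipe = (src) => {
      if (src !== source) return;
      destination.removeListener("unpipe", onUnpipe);
      resolve();
    };
    destination.on("unpipe", onUnpipe);
    source.once("error", (err) => {
      destination.removeListener("unpipe", onUnpipe);
      reject(err);
    });
    source.pipe(destination, { end: false });
  });
}

// Usage inside the loop from the earlier comments:
// await pipePromise(Readable.from(result.data), ss);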

@aduh95
Contributor

aduh95 commented Dec 15, 2022

You can use

import { pipeline } from 'node:stream/promises';

await pipeline(stream1, stream2, { end: false });

@mscdex
Contributor

mscdex commented Dec 15, 2022

@aduh95 It seems that is undocumented?

@aduh95
Contributor

aduh95 commented Dec 15, 2022

@aduh95 It seems that is undocumented?

That's being tracked in #45821.

@amustaque97
Author

Hey @aduh95, thank you for the reply. Would it be possible to answer my follow-up query in the comment above?

If you mean something like this:

import { pipeline } from 'node:stream/promises';

await pipeline(stream1, stream2, { end: false });

Then I get an error in the second iteration of the while loop and the program terminates.

Error [ERR_STREAM_WRITE_AFTER_END]: write after end
    at new NodeError (node:internal/errors:371:5)
    at _write (node:internal/streams/writable:319:11)
    at ExcelStream.Writable.write (node:internal/streams/writable:334:10)
    at pump (node:internal/streams/pipeline:150:21)
    at processTicksAndRejections (node:internal/process/task_queues:96:5) {
  code: 'ERR_STREAM_WRITE_AFTER_END'
}

@mcollina
Member

Could you provide a repository to reproduce the problem? Also include the server.

Something you could also do:

import fetch from "node-fetch";
import fs from "fs";
import { Readable } from "stream";
import { once } from "events";

const ss = fs.createWriteStream("output.txt");

let i = 0;
while (true) {
  const result = await (
    await fetch(URL, {
      method: "POST",
      body: JSON.stringify(data),
      headers: {
        "Content-Type": "application/json",
        Connection: "keep-Alive",
      },
    })
  ).json();
  for await (const chunk of Readable.from(result.data)) {
    const toWrite = JSON.stringify(chunk);
    if (!ss.write(toWrite)) {
      await once(ss, 'drain');
    }
  }
  i += 1;
  if (i == 500) break;
}

@amustaque97
Author

Hey @mcollina, thank you for the reply! Here is the repo link; please go through the README.md file:
https://github.com/amustaque97/node-streams
Looking forward to hearing from you.

@mcollina
Member

Here is my solution to this problem:

import fetch from 'node-fetch';
import {
  Readable,
  pipeline,
  Writable
} from 'stream';
import exceljs from 'exceljs';
import util from 'util';

const URL = 'https://jsonplaceholder.typicode.com/posts/'

const cpipeline = util.promisify(pipeline);

class ExcelStream extends Writable {
  constructor(options = {}) {
    super({
      ...options,
      objectMode: true
    });
    this.i = 0;
    const o = {
      filename: "./test-file.xlsx",
      useStyles: true,
      useSharedStrings: true,
    };
    this.workbook = new exceljs.stream.xlsx.WorkbookWriter(o);
    this.workbook.creator = "ABCDEF";
    this.workbook.created = new Date();
    this.sheet = this.workbook.addWorksheet("Discussion Event");
    this.sheet.columns = [{
      header: "testColumn",
      width: 10,
      style: {
        font: {
          bold: true,
        },
      },
    }];
  }

  _write(chunk, encoding, done) {
    console.log(chunk)
    try {
      this.i += 1;
      this.sheet.addRow([chunk]).commit();
    } catch (error) {
      console.log("Error while transforming data - commit failed");
      done(error)
      return
    }
    done()
  }

  // Note: Writable streams run end-of-stream cleanup in _final(), not _flush().
  _final(cb) {
    try {
      this.sheet.commit();
      this.workbook.commit().then(() => cb(), cb);
    } catch (error) {
      console.log("Error cannot commit workbook");
      cb(error);
    }
  }
}

async function run() {
  let i = 1;
  await cpipeline(async function * () {
    while (true) {
      let u = URL + i
      const response = await fetch(u)
      const result = await response.json()
      if (result) {
        yield result
      } else {
        break
      }

      i++
      if (i == 20) break
    }
  }, new ExcelStream())
}

run()

@joematune

A note for future readers: fetch's response body is a readable stream. To pipe successive responses to the same write stream declared outside the loop, you can do:

import fs from "node:fs";
import { pipeline } from "node:stream/promises";

const writeStream = fs.createWriteStream("output.txt");

while (true) {
  const response = await fetch(url, { /* ... */ });
  await pipeline(response.body, writeStream, { end: false });
}
