How to pipe a writeable stream? #472

Closed
jz222 opened this issue Jun 24, 2020 · 4 comments

jz222 commented Jun 24, 2020

Hi,

I would like to pipe the json2csv parser into a writable stream so the CSV file is written directly to a bucket without blocking the event loop, as we are processing rather large sets of data. I came up with the approach below, but I was hoping there is a more elegant solution. I already have the data as an array of objects, where each object represents one row in the CSV file. What I don't like is that I have to loop through the data array myself, and I was wondering how this can be avoided.

const { AsyncParser } = require('json2csv');

// Create async parser
const opts = { delimiter: separator || ',', quote: '"', flatten: false };
const transformOpts = { objectMode: true };
const asyncParser = new AsyncParser(opts, transformOpts);

// Create bucket and file instance
const bucket = storage.bucket(bucketName);
const file = bucket.file(fileName);

// Create writeable stream
const writableStream = file.createWriteStream();

// Pipe stream
asyncParser.toOutput(writableStream);

// dataArray is an array of objects. Each object represents one row in the CSV. I would like to avoid this loop.
for (let obj of dataArray) {
    asyncParser.input.push(obj);
}

asyncParser.input.push(null);

writableStream
    .on('finish', async () => console.log('successfully uploaded'));
@juanjoDiaz
Collaborator

Hi @jz222,

Why do you want to remove the loop? It's just a loop; it won't take any extra memory or anything.

You can use the functional Array method forEach

dataArray.forEach(obj => asyncParser.input.push(obj));
asyncParser.input.push(null);

#468 will allow you to do

asyncParser
  .from(dataArray)
  .to(writableStream);

But it won't be in until v6.

In any case, where are you getting dataArray from? If dataArray comes from a database, an API call, etc., you should be able to get it as a ReadableStream and pass it to asyncParser.from(readableStream) so you don't need to load the whole thing into memory.
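
For reference, here is a rough sketch of that with the current API (fromInput is the pre-v6 counterpart of from), reusing opts, transformOpts, and writableStream from the snippet above. Readable.from() is used purely as a stand-in for a real streaming source:

const { Readable } = require('stream');
const { AsyncParser } = require('json2csv');

// Stand-in source: in a real setup this Readable would come straight from the
// database driver, HTTP client, etc. instead of an in-memory array.
const readableStream = Readable.from(dataArray);

const asyncParser = new AsyncParser(opts, transformOpts);

// Pipe source -> json2csv -> bucket without pushing rows manually.
asyncParser
  .fromInput(readableStream)
  .toOutput(writableStream);

writableStream.on('finish', () => console.log('successfully uploaded'));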

@knownasilya
Collaborator

Might be a good example showing how to pipe to a writable stream. Maybe a recipes section in the docs would be nice.


jz222 commented Jun 25, 2020

Thanks for getting back to me @juanjoDiaz. We are working on an integration platform, so dataArray can originate from different sources. Since we perform various data-manipulation steps, dataArray will always end up as an in-memory array. It can contain up to 700-800k elements, and I was hoping to avoid looping through the entire thing. The changes in v6 look promising.

Thank you for your time.

@juanjoDiaz
Collaborator

No problem.

If you are integrating data from various sources and doing data manipulations, I'd really recommend that you look into doing everything with streams, since they let you keep memory usage under control regardless of the amount of data you are processing.
ReadableStreams can easily be created for HTTP requests, local file reads, database queries, etc.
Data transformations are easy to express as a Transform.
And WritableStreams can be created to send the data to an HTTP endpoint, a database, Amazon S3, etc.
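
Something along these lines (just a sketch: the sample source, destination, and the exportedAt field are made up for the example; in practice the source would be your database cursor stream and the destination your bucket's write stream):

const { Readable, Transform, pipeline } = require('stream');
const { createWriteStream } = require('fs');
const { Transform: Json2csvTransform } = require('json2csv');

// Stand-ins for a real source and destination.
const sourceStream = Readable.from([
  { id: 1, name: 'Alice' },
  { id: 2, name: 'Bob' },
]);
const destinationStream = createWriteStream('export.csv');

// One data-manipulation step expressed as a stream Transform.
const enrich = new Transform({
  objectMode: true,
  transform(row, _encoding, callback) {
    callback(null, { ...row, exportedAt: new Date().toISOString() });
  },
});

// json2csv's streaming Transform turns the object stream into CSV text.
const toCsv = new Json2csvTransform({ delimiter: ',' }, { objectMode: true });

pipeline(sourceStream, enrich, toCsv, destinationStream, (err) => {
  if (err) console.error('Export failed', err);
  else console.log('Export finished');
});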

But that's just my advice that no one asked for 😅

I'll close this issue. Feel free to reopen if you feel that there is something still to be answered.
