make both node & web stream example the same #1

Open
jimmywarting opened this issue Sep 4, 2023 · 9 comments
@jimmywarting

Node.js streams and web streams are both async iterable, and both yield Uint8Array chunks. Both environments also have TextDecoderStream in the global namespace,

so there is no need to use an EventEmitter and write different code for each.
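As a minimal sketch of that point (collectText is a hypothetical helper, not part of this library's API): a single consumer function can drain either kind of stream, decoding bytes with the global TextDecoder.

```javascript
// Hypothetical helper: one consumer drains either a Node Readable or a
// web ReadableStream, since both are async iterable and both yield
// Uint8Array chunks.
async function collectText(byteStream) {
  const decoder = new TextDecoder();
  let text = '';
  for await (const chunk of byteStream) {
    // stream: true keeps multi-byte characters split across chunks intact
    text += decoder.decode(chunk, { stream: true });
  }
  return text + decoder.decode(); // flush any buffered trailing bytes
}
```

The same function accepts the result of fs.createReadStream(filePath) or blob.stream() unchanged.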

so this:

let stream = fs.createReadStream(filePath);

let parser = null;
let result = null;

stream.on('data', (chunk) => {
  // convert from Buffer
  let strChunk = chunk.toString();
  // on first chunk, infer schema and init parser
  parser ??= initParser(inferSchema(strChunk));
  // incremental parse to string arrays
  parser.chunk(strChunk, parser.stringArrs);
});

stream.on('end', () => {
  result = p.end();
});

and this:

let stream = fs.createReadStream(filePath);

let webStream = Stream.Readable.toWeb(stream);
let textStream = webStream.pipeThrough(new TextDecoderStream());

let parser = null;

for await (const strChunk of textStream) {
  parser ??= initParser(inferSchema(strChunk));
  parser.chunk(strChunk, parser.stringArrs);
}

let result = parser.end();

could be written as:

for await (const chunk of stream) {
  // convert from Buffer
  const strChunk = chunk.toString();
  parser ??= initParser(inferSchema(strChunk));
  parser.chunk(strChunk, parser.stringArrs);
}

for await (const strChunk of textStream) {
  parser ??= initParser(inferSchema(strChunk));
  parser.chunk(strChunk, parser.stringArrs);
}
@jimmywarting
Author

but i would change the node example to use TextDecoderStream in any case, instead of relying on Node.js-specific stuff, for better cross-compatibility with other platforms.

And i would have used ReadableStream.from(iterable) instead of importing node:stream, which would add other dependencies into your final web bundle.

let stream = fs.createReadStream(filePath);

let webStream = ReadableStream.from(stream);
let textStream = webStream.pipeThrough(new TextDecoderStream());

let parser = null;

for await (const strChunk of textStream) {
  parser ??= initParser(inferSchema(strChunk));
  parser.chunk(strChunk, parser.stringArrs);
}

let result = parser.end();

@jimmywarting
Author

jimmywarting commented Sep 4, 2023

there is also the new fs.openAsBlob(path), which returns a Blob with a .stream() method.

let blob = await fs.openAsBlob(filePath);
let stream = blob.stream().pipeThrough(new TextDecoderStream());

let parser = null;

for await (const strChunk of stream) {
  parser ??= initParser(inferSchema(strChunk));
  parser.chunk(strChunk, parser.stringArrs);
}

let result = parser.end();

but it requires Node v20+

@jimmywarting
Author

jimmywarting commented Sep 4, 2023

maybe you could also get some perf boost by moving the parser ??= initParser(inferSchema(strChunk)); out of the loop?

let blob = await fs.openAsBlob(filePath);
let stream = blob.stream().pipeThrough(new TextDecoderStream());

let iterator = stream.values();

let firstChunk = await iterator.next();
let parser = initParser(inferSchema(firstChunk.value));

parser.chunk(firstChunk.value, parser.stringArrs);

// continue on the same iterator.
for await (const strChunk of iterator) {
  parser.chunk(strChunk, parser.stringArrs);
}

let result = parser.end();

@jimmywarting
Author

could also use two loops, as described in this answer: https://stackoverflow.com/a/51020535/1008999

let blob = await fs.openAsBlob(filePath);
let stream = blob.stream().pipeThrough(new TextDecoderStream());
let iterator = stream.values();

// the outer loop runs only once, handling the first chunk.
for await (const strChunk of iterator) {
  const parser = initParser(inferSchema(strChunk));
  parser.chunk(strChunk, parser.stringArrs);

  // continue on the same iterator by consuming the rest of the iterator.
  for await (const strChunk of iterator) {
    parser.chunk(strChunk, parser.stringArrs);
  }

  let result = parser.end();
}

@leeoniya leeoniya reopened this Sep 4, 2023
@leeoniya
Owner

leeoniya commented Sep 4, 2023

hey, yeah we can try these out and see.

in my testing, the older/more mature apis tend to be faster. for example, the .toWeb variant is like 20%-30% slower.

all these variations would be good for a wiki page or something, but i'd like to keep the fastest option in the main readme.

this thread is kind of nice affirmation that i made the right choice to keep the chunk-feeding pipeline external to the core.

maybe you could also get some perf boost by moving the parser ??= initParser(inferSchema(strChunk)); out of the loop?

the chunk size is fixed to 64KB [1], so you have 16 of these per MB. if we're maxing out at about 300MB/s then that's 4800 null checks that should be noise-level (even if the JIT doesn't optimize them out, which looks like it should be easy). i'll bet that nullish coalesce on its own is on the order of millions/sec. i'm happy to be proven wrong...with data ;)

[1] nodejs/node#41611 (comment)
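That nullish-coalescing claim can be eyeballed with a rough micro-benchmark sketch (bench and the parser stand-ins are illustrative names; JIT warmup makes absolute numbers unreliable, so treat the output as order-of-magnitude only):

```javascript
// Rough micro-benchmark sketch (illustrative only; a real comparison
// should use a proper harness to account for JIT warmup).
function bench(label, fn, iters = 1e7) {
  const t0 = process.hrtime.bigint();
  for (let i = 0; i < iters; i++) fn();
  const ms = Number(process.hrtime.bigint() - t0) / 1e6;
  console.log(`${label}: ${ms.toFixed(1)} ms / ${iters} calls`);
  return ms;
}

// hypothetical stand-in for the parser in this thread
const initParser = () => ({ chunk() {} });

let parser = null;
bench('with ??= guard', () => {
  parser ??= initParser();
  parser.chunk();
});

const parser2 = initParser();
bench('no guard', () => {
  parser2.chunk();
});
```

At ~16 chunks per MB, even tens of nanoseconds per guarded call would be far below the cost of the parsing itself.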

@leeoniya
Owner

leeoniya commented Sep 5, 2023

it looks like nodejs/node#49089 just landed in v20.6.0: https://nodejs.org/en/blog/release/v20.6.0 👀

@ollyfg

ollyfg commented Sep 7, 2023

Not really worth opening as its own issue, especially if these examples are getting reworked anyhow, but there is a typo in the node example here

That variable should be called parser, not p!

@leeoniya
Owner

leeoniya commented Sep 7, 2023

oops!

@jimmywarting
Author

Wanted to open up a discussion, but that was closed.
I know it was not an issue, so I closed it after I wrote this.
