Skip to content

Commit

Permalink
fs: implement byob mode for readableWebStream()
Browse files Browse the repository at this point in the history
Fixes: #45853
PR-URL: #46933
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
debadree25 authored Apr 10, 2023
1 parent a57b8dd commit a9c3d92
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 17 deletions.
10 changes: 9 additions & 1 deletion doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -446,14 +446,22 @@ Reads data from the file and stores that in the given buffer.
If the file is not modified concurrently, the end-of-file is reached when the
number of bytes read is zero.
#### `filehandle.readableWebStream()`
#### `filehandle.readableWebStream(options)`
<!-- YAML
added: v17.0.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/46933
description: Added option to create a 'bytes' stream.
-->
> Stability: 1 - Experimental
* `options` {Object}
* `type` {string|undefined} Whether to open a normal or a `'bytes'` stream.
**Default:** `undefined`
* Returns: {ReadableStream}
Returns a `ReadableStream` that may be used to read the files data.
Expand Down
77 changes: 61 additions & 16 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const {
SafePromisePrototypeFinally,
Symbol,
Uint8Array,
FunctionPrototypeBind,
} = primordials;

const { fs: constants } = internalBinding('constants');
Expand Down Expand Up @@ -249,29 +250,73 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
* } ReadableStream
* @returns {ReadableStream}
*/
readableWebStream() {
readableWebStream(options = kEmptyObject) {
if (this[kFd] === -1)
throw new ERR_INVALID_STATE('The FileHandle is closed');
if (this[kClosePromise])
throw new ERR_INVALID_STATE('The FileHandle is closing');
if (this[kLocked])
throw new ERR_INVALID_STATE('The FileHandle is locked');
this[kLocked] = true;
const {
newReadableStreamFromStreamBase,
} = require('internal/webstreams/adapters');
const readable = newReadableStreamFromStreamBase(
this[kHandle],
undefined,
{ ondone: () => this[kUnref]() });

const {
readableStreamCancel,
} = require('internal/webstreams/readablestream');
this[kRef]();
this.once('close', () => {
readableStreamCancel(readable);
});

if (options.type !== undefined) {
validateString(options.type, 'options.type');
}

let readable;

if (options.type !== 'bytes') {
const {
newReadableStreamFromStreamBase,
} = require('internal/webstreams/adapters');
readable = newReadableStreamFromStreamBase(
this[kHandle],
undefined,
{ ondone: () => this[kUnref]() });

const {
readableStreamCancel,
} = require('internal/webstreams/readablestream');
this[kRef]();
this.once('close', () => {
readableStreamCancel(readable);
});
} else {
const {
readableStreamCancel,
ReadableStream,
} = require('internal/webstreams/readablestream');

const readFn = FunctionPrototypeBind(this.read, this);
const ondone = FunctionPrototypeBind(this[kUnref], this);

readable = new ReadableStream({
type: 'bytes',
autoAllocateChunkSize: 16384,

async pull(controller) {
const view = controller.byobRequest.view;
const { bytesRead } = await readFn(view, view.byteOffset, view.byteLength);

if (bytesRead === 0) {
ondone();
controller.close();
}

controller.byobRequest.respond(bytesRead);
},

cancel() {
ondone();
},
});

this[kRef]();

this.once('close', () => {
readableStreamCancel(readable);
});
}

return readable;
}
Expand Down
84 changes: 84 additions & 0 deletions test/parallel/test-filehandle-readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,87 @@ const check = readFileSync(__filename, { encoding: 'utf8' });
mc.port1.close();
await file.close();
})().then(common.mustCall());

// Make sure 'bytes' stream works
(async () => {
const file = await open(__filename);
const dec = new TextDecoder();
const readable = file.readableWebStream({ type: 'bytes' });
const reader = readable.getReader({ mode: 'byob' });

let data = '';
let result;
do {
const buff = new ArrayBuffer(100);
result = await reader.read(new DataView(buff));
if (result.value !== undefined) {
data += dec.decode(result.value);
assert.ok(result.value.byteLength <= 100);
}
} while (!result.done);

assert.strictEqual(check, data);

assert.throws(() => file.readableWebStream(), {
code: 'ERR_INVALID_STATE',
});

await file.close();
})().then(common.mustCall());

// Make sure that acquiring a ReadableStream 'bytes' stream
// fails if the FileHandle is already closed.
(async () => {
const file = await open(__filename);
await file.close();

assert.throws(() => file.readableWebStream({ type: 'bytes' }), {
code: 'ERR_INVALID_STATE',
});
})().then(common.mustCall());

// Make sure that acquiring a ReadableStream 'bytes' stream
// fails if the FileHandle is already closing.
(async () => {
const file = await open(__filename);
file.close();

assert.throws(() => file.readableWebStream({ type: 'bytes' }), {
code: 'ERR_INVALID_STATE',
});
})().then(common.mustCall());

// Make sure the 'bytes' ReadableStream is closed when the underlying
// FileHandle is closed.
(async () => {
const file = await open(__filename);
const readable = file.readableWebStream({ type: 'bytes' });
const reader = readable.getReader({ mode: 'byob' });
file.close();
await reader.closed;
})().then(common.mustCall());

// Make sure the 'bytes' ReadableStream is closed when the underlying
// FileHandle is closed.
(async () => {
const file = await open(__filename);
const readable = file.readableWebStream({ type: 'bytes' });
file.close();
const reader = readable.getReader({ mode: 'byob' });
await reader.closed;
})().then(common.mustCall());

// Make sure that the FileHandle is properly marked "in use"
// when a 'bytes' ReadableStream has been acquired for it.
(async () => {
const file = await open(__filename);
file.readableWebStream({ type: 'bytes' });
const mc = new MessageChannel();
mc.port1.onmessage = common.mustNotCall();
assert.throws(() => mc.port2.postMessage(file, [file]), {
code: 25,
name: 'DataCloneError',
});
mc.port1.close();
await file.close();
})().then(common.mustCall());

0 comments on commit a9c3d92

Please sign in to comment.