Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fs: add FileHandle.prototype.readableStream() #39331

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,52 @@ Reads data from the file and stores that in the given buffer.
If the file is not modified concurrently, the end-of-file is reached when the
number of bytes read is zero.

#### `filehandle.readableWebStream()`
<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental

* Returns: {ReadableStream}

Returns a `ReadableStream` that may be used to read the files data.

An error will be thrown if this method is called more than once or is called
after the `FileHandle` is closed or closing.

```mjs
import {
open,
} from 'node:fs/promises';

const file = await open('./some/file/to/read');

for await (const chunk of file.readableWebStream())
console.log(chunk);

await file.close();
```

```cjs
const {
open,
} = require('fs/promises');

(async () => {
const file = await open('./some/file/to/read');

for await (const chunk of file.readableWebStream())
console.log(chunk);

await file.close();
})();
```

While the `ReadableStream` will read the file to completion, it will not
close the `FileHandle` automatically. User code must still call the
`fileHandle.close()` method.

#### `filehandle.readFile(options)`
<!-- YAML
added: v10.0.0
Expand Down
39 changes: 37 additions & 2 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const {
codes: {
ERR_FS_FILE_TOO_LARGE,
ERR_INVALID_ARG_VALUE,
ERR_INVALID_STATE,
ERR_METHOD_NOT_IMPLEMENTED,
},
AbortError,
Expand Down Expand Up @@ -90,12 +91,21 @@ const kCloseResolve = Symbol('kCloseResolve');
const kCloseReject = Symbol('kCloseReject');
const kRef = Symbol('kRef');
const kUnref = Symbol('kUnref');
const kLocked = Symbol('kLocked');

const { kUsePromises } = binding;
const {
JSTransferable, kDeserialize, kTransfer, kTransferList
} = require('internal/worker/js_transferable');

const {
newReadableStreamFromStreamBase,
} = require('internal/webstreams/adapters');

const {
readableStreamCancel,
} = require('internal/webstreams/readablestream');

const getDirectoryEntriesPromise = promisify(getDirents);
const validateRmOptionsPromise = promisify(validateRmOptions);

Expand Down Expand Up @@ -209,6 +219,33 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
return this[kClosePromise];
}

/**
* @typedef {import('../webstreams/readablestream').ReadableStream
* } ReadableStream
* @returns {ReadableStream}
*/
readableWebStream() {
if (this[kFd] === -1)
throw new ERR_INVALID_STATE('The FileHandle is closed');
if (this[kClosePromise])
throw new ERR_INVALID_STATE('The FileHandle is closing');
if (this[kLocked])
throw new ERR_INVALID_STATE('The FileHandle is locked');
this[kLocked] = true;

const readable = newReadableStreamFromStreamBase(
this[kHandle],
undefined,
{ ondone: () => this[kUnref]() });

this[kRef]();
this.once('close', () => {
readableStreamCancel(readable);
});

return readable;
}
jasnell marked this conversation as resolved.
Show resolved Hide resolved

[kTransfer]() {
if (this[kClosePromise] || this[kRefs] > 1) {
throw lazyDOMException('Cannot transfer FileHandle while in use',
Expand Down Expand Up @@ -788,8 +825,6 @@ module.exports = {
appendFile,
readFile,
watch,

kHandle,
},

FileHandle,
Expand Down
21 changes: 20 additions & 1 deletion lib/internal/webstreams/adapters.js
Original file line number Diff line number Diff line change
Expand Up @@ -859,12 +859,20 @@ function newWritableStreamFromStreamBase(streamBase, strategy) {
* @param {QueuingStrategy} strategy
* @returns {ReadableStream}
*/
function newReadableStreamFromStreamBase(streamBase, strategy) {
function newReadableStreamFromStreamBase(streamBase, strategy, options = {}) {
validateObject(streamBase, 'streamBase');
validateObject(options, 'options');

const {
ondone = () => {},
} = options;

if (typeof streamBase.onread === 'function')
throw new ERR_INVALID_STATE('StreamBase already has a consumer');

if (typeof ondone !== 'function')
throw new ERR_INVALID_ARG_TYPE('options.ondone', 'Function', ondone);

let controller;

streamBase.onread = (arrayBuffer) => {
Expand All @@ -877,6 +885,11 @@ function newReadableStreamFromStreamBase(streamBase, strategy) {
if (nread === UV_EOF) {
controller.close();
streamBase.readStop();
try {
ondone();
} catch (error) {
controller.error(error);
}
return;
}

Expand All @@ -899,6 +912,12 @@ function newReadableStreamFromStreamBase(streamBase, strategy) {

cancel() {
const promise = createDeferredPromise();
try {
ondone();
} catch (error) {
promise.reject(error);
return promise.promise;
}
const req = new ShutdownWrap();
req.oncomplete = () => promise.resolve();
const err = streamBase.shutdown(req);
Expand Down
2 changes: 1 addition & 1 deletion lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const {
const {
isArrayBufferView,
isDataView,
} = require('util/types');
} = require('internal/util/types');

const {
createDeferredPromise,
Expand Down
2 changes: 1 addition & 1 deletion lib/internal/webstreams/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const {

const {
isPromise,
} = require('util/types');
} = require('internal/util/types');

const {
inspect,
Expand Down
73 changes: 40 additions & 33 deletions src/node_file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -345,44 +345,51 @@ MaybeLocal<Promise> FileHandle::ClosePromise() {
Isolate* isolate = env()->isolate();
EscapableHandleScope scope(isolate);
Local<Context> context = env()->context();

Local<Value> close_resolver =
object()->GetInternalField(FileHandle::kClosingPromiseSlot);
if (!close_resolver.IsEmpty() && !close_resolver->IsUndefined()) {
CHECK(close_resolver->IsPromise());
return close_resolver.As<Promise>();
}

CHECK(!closed_);
CHECK(!closing_);
CHECK(!reading_);

auto maybe_resolver = Promise::Resolver::New(context);
CHECK(!maybe_resolver.IsEmpty());
Local<Promise::Resolver> resolver = maybe_resolver.ToLocalChecked();
Local<Promise> promise = resolver.As<Promise>();
CHECK(!reading_);
if (!closed_ && !closing_) {
closing_ = true;
Local<Object> close_req_obj;
if (!env()
->fdclose_constructor_template()
->NewInstance(env()->context())
.ToLocal(&close_req_obj)) {
return MaybeLocal<Promise>();
}
CloseReq* req = new CloseReq(env(), close_req_obj, promise, object());
auto AfterClose = uv_fs_callback_t{[](uv_fs_t* req) {
std::unique_ptr<CloseReq> close(CloseReq::from_req(req));
CHECK_NOT_NULL(close);
close->file_handle()->AfterClose();
Isolate* isolate = close->env()->isolate();
if (req->result < 0) {
HandleScope handle_scope(isolate);
close->Reject(
UVException(isolate, static_cast<int>(req->result), "close"));
} else {
close->Resolve();
}
}};
int ret = req->Dispatch(uv_fs_close, fd_, AfterClose);
if (ret < 0) {
req->Reject(UVException(isolate, ret, "close"));
delete req;

Local<Object> close_req_obj;
if (!env()->fdclose_constructor_template()
->NewInstance(env()->context()).ToLocal(&close_req_obj)) {
return MaybeLocal<Promise>();
}
closing_ = true;
object()->SetInternalField(FileHandle::kClosingPromiseSlot, promise);

CloseReq* req = new CloseReq(env(), close_req_obj, promise, object());
auto AfterClose = uv_fs_callback_t{[](uv_fs_t* req) {
std::unique_ptr<CloseReq> close(CloseReq::from_req(req));
CHECK_NOT_NULL(close);
close->file_handle()->AfterClose();
Isolate* isolate = close->env()->isolate();
if (req->result < 0) {
HandleScope handle_scope(isolate);
close->Reject(
UVException(isolate, static_cast<int>(req->result), "close"));
} else {
close->Resolve();
}
} else {
// Already closed. Just reject the promise immediately
resolver->Reject(context, UVException(isolate, UV_EBADF, "close"))
.Check();
}};
int ret = req->Dispatch(uv_fs_close, fd_, AfterClose);
if (ret < 0) {
req->Reject(UVException(isolate, ret, "close"));
delete req;
}

return scope.Escape(promise);
}

Expand Down Expand Up @@ -2538,7 +2545,7 @@ void Initialize(Local<Object> target,
env->SetProtoMethod(fd, "close", FileHandle::Close);
env->SetProtoMethod(fd, "releaseFD", FileHandle::ReleaseFD);
Local<ObjectTemplate> fdt = fd->InstanceTemplate();
fdt->SetInternalFieldCount(StreamBase::kInternalFieldCount);
fdt->SetInternalFieldCount(FileHandle::kInternalFieldCount);
StreamBase::AddMethods(env, fd);
env->SetConstructorFunction(target, "FileHandle", fd);
env->set_fd_constructor_template(fdt);
Expand Down
6 changes: 6 additions & 0 deletions src/node_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,12 @@ class FileHandleReadWrap final : public ReqWrap<uv_fs_t> {
// the object is garbage collected
class FileHandle final : public AsyncWrap, public StreamBase {
public:
enum InternalFields {
kFileHandleBaseField = StreamBase::kInternalFieldCount,
kClosingPromiseSlot,
kInternalFieldCount
};

static FileHandle* New(BindingData* binding_data,
int fd,
v8::Local<v8::Object> obj = v8::Local<v8::Object>());
Expand Down
5 changes: 5 additions & 0 deletions test/parallel/test-bootstrap-modules.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ const expectedModules = new Set([
'NativeModule internal/util/inspect',
'NativeModule internal/util/iterable_weak_map',
'NativeModule internal/util/types',
'NativeModule internal/webstreams/util',
'NativeModule internal/webstreams/writablestream',
'NativeModule internal/webstreams/readablestream',
'NativeModule internal/webstreams/queuingstrategies',
'NativeModule internal/webstreams/adapters',
'NativeModule internal/validators',
'NativeModule internal/vm/module',
'NativeModule internal/worker/io',
Expand Down
87 changes: 87 additions & 0 deletions test/parallel/test-filehandle-readablestream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
'use strict';

const common = require('../common');
const assert = require('assert');

const {
readFileSync,
} = require('fs');

const {
open,
} = require('fs/promises');

const check = readFileSync(__filename, { encoding: 'utf8' });

// Make sure the ReadableStream works...
(async () => {
const dec = new TextDecoder();
const file = await open(__filename);
let data = '';
for await (const chunk of file.readableWebStream())
data += dec.decode(chunk);

assert.strictEqual(check, data);

assert.throws(() => file.readableWebStream(), {
code: 'ERR_INVALID_STATE',
});

await file.close();
})().then(common.mustCall());

// Make sure that acquiring a ReadableStream fails if the
// FileHandle is already closed.
(async () => {
const file = await open(__filename);
await file.close();

assert.throws(() => file.readableWebStream(), {
code: 'ERR_INVALID_STATE',
});
})().then(common.mustCall());

// Make sure that acquiring a ReadableStream fails if the
// FileHandle is already closing.
(async () => {
const file = await open(__filename);
file.close();

assert.throws(() => file.readableWebStream(), {
code: 'ERR_INVALID_STATE',
});
})().then(common.mustCall());

// Make sure the ReadableStream is closed when the underlying
// FileHandle is closed.
(async () => {
const file = await open(__filename);
const readable = file.readableWebStream();
const reader = readable.getReader();
file.close();
await reader.closed;
})().then(common.mustCall());

// Make sure the ReadableStream is closed when the underlying
// FileHandle is closed.
(async () => {
const file = await open(__filename);
const readable = file.readableWebStream();
file.close();
const reader = readable.getReader();
await reader.closed;
})().then(common.mustCall());

// Make sure that the FileHandle is properly marked "in use"
// when a ReadableStream has been acquired for it.
(async () => {
const file = await open(__filename);
file.readableWebStream();
const mc = new MessageChannel();
mc.port1.onmessage = common.mustNotCall();
assert.throws(() => mc.port2.postMessage(file, [file]), {
code: 25 // DataCloneError
});
mc.port1.close();
await file.close();
})().then(common.mustCall());