Skip to content

Commit

Permalink
[Filesystem] workaround for nodejs/node#35862
Browse files Browse the repository at this point in the history
  • Loading branch information
alekitto committed Oct 30, 2020
1 parent c9358b5 commit 0c52338
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 17 deletions.
54 changes: 54 additions & 0 deletions src/StreamWrapper/File/ReadableStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { Readable } from 'stream';

/**
* @memberOf Jymfony.Component.Filesystem.StreamWrapper.File
* @internal Use this instead of fs.createReadableStream led to undefined behavior
* (https://github.com/nodejs/node/issues/35862).
*/
export default class ReadableStream extends Readable {
/**
* Constructor
*
* @param {Jymfony.Component.Filesystem.StreamWrapper.File.Resource} resource
*/
constructor(resource) {
super();

/**
* @type {Jymfony.Component.Filesystem.StreamWrapper.File.Resource}
*
* @private
*/
this._resource = resource;
}

/**
* @param {number} size
*
* @returns {Promise<void>}
*/
async _read(size) {
const handle = this._resource.handle;
let result;

try {
do {
const { buffer, bytesRead } = await handle.read(Buffer.alloc(size), 0, size, this._resource.position);

if (0 === bytesRead) {
this.push(null);
return;
}

this._resource.advance(bytesRead);
if (bytesRead !== size) {
result = this.push(buffer.slice(0, bytesRead));
} else {
result = this.push(buffer);
}
} while (result);
} catch (err) {
this.destroy(err);
}
}
}
48 changes: 48 additions & 0 deletions src/StreamWrapper/File/WritableStream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { Writable } from 'stream';

/**
* @memberOf Jymfony.Component.Filesystem.StreamWrapper.File
*/
export default class WritableStream extends Writable {
/**
* Constructor
*
* @param {Jymfony.Component.Filesystem.StreamWrapper.File.Resource} resource
*/
constructor(resource) {
super();

/**
* @type {Jymfony.Component.Filesystem.StreamWrapper.File.Resource}
*
* @private
*/
this._resource = resource;
}

/**
* @param {Buffer|string} chunk
* @param {string} encoding
* @param {Function} callback
*
* @returns {Promise<void>}
*
* @private
*/
async _write(chunk, encoding, callback) {
const handle = this._resource.handle;

try {
if (! isBuffer(chunk)) {
chunk = Buffer.from(chunk, encoding);
}

const { bytesWritten } = await handle.write(chunk, 0, chunk.length, this._resource.position);
this._resource.advance(bytesWritten);

callback();
} catch (err) {
callback(err);
}
}
}
15 changes: 5 additions & 10 deletions src/StreamWrapper/FileStreamWrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ import {
symlink,
unlink
} from 'fs/promises';
import { createReadStream, createWriteStream } from 'fs';
import { dirname, resolve as pathResolve } from 'path';
import { parse as urlParse } from 'url';

const File = Jymfony.Component.Filesystem.File;
const AbstractStreamWrapper = Jymfony.Component.Filesystem.StreamWrapper.AbstractStreamWrapper;
const File = Jymfony.Component.Filesystem.File;
const ReadableStream = Jymfony.Component.Filesystem.StreamWrapper.File.ReadableStream;
const Resource = Jymfony.Component.Filesystem.StreamWrapper.File.Resource;
const StreamWrapperInterface = Jymfony.Component.Filesystem.StreamWrapper.StreamWrapperInterface;
const WritableStream = Jymfony.Component.Filesystem.StreamWrapper.File.WritableStream;

const Storage = function () {};
Storage.prototype = {};
Expand Down Expand Up @@ -201,20 +202,14 @@ export default class FileStreamWrapper extends AbstractStreamWrapper {
* @inheritdoc
*/
createReadableStream(resource) {
return createReadStream(null, {
fd: resource.handle.fd,
autoClose: false,
});
return new ReadableStream(resource);
}

/**
* @inheritdoc
*/
createWritableStream(resource) {
return createWriteStream(null, {
fd: resource.handle.fd,
autoClose: false,
});
return new WritableStream(resource);
}

/**
Expand Down
34 changes: 27 additions & 7 deletions test/OpenFileTest.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { Readable, Writable } from 'stream';
import { fsyncSync, readFileSync } from 'fs';
import { expect } from 'chai';
import { promisify } from 'util';

const OpenFile = Jymfony.Component.Filesystem.OpenFile;
const FileStreamWrapper = Jymfony.Component.Filesystem.StreamWrapper.FileStreamWrapper;

const { expect } = require('chai');
const fs = require('fs');
const stream = require('stream');

describe('[Filesystem] OpenFile', function () {
beforeEach(() => {
Expand Down Expand Up @@ -33,24 +35,42 @@ describe('[Filesystem] OpenFile', function () {
const file = await new OpenFile(__dirname + '/../fixtures/TESTFILE.txt', 'r');
const readable = await file.createReadableStream();

expect(readable).to.be.instanceOf(stream.Readable);
expect(readable).to.be.instanceOf(Readable);
readable.on('data', buf => {
expect(buf.toString('utf-8')).to.be.equal('THIS IS A TEST\n');
});

await new Promise((resolve, reject) => {
readable.on('end', resolve);
readable.on('error', reject);

readable.read();
});

await file.close();
});

it('createWritableStream should return a stream', async () => {
const file = await new OpenFile(__dirname + '/../fixtures/WRITEFILE', 'w');
const path = __dirname + '/../fixtures/WRITEFILE';
const file = await new OpenFile(path, 'w');
const writable = await file.createWritableStream();

expect(writable).to.be.instanceOf(stream.Writable);
expect(writable).to.be.instanceOf(Writable);
await promisify(Writable.prototype.write).call(writable, 'This is ', 'utf-8');
await promisify(Writable.prototype.write).call(writable, 'a te', 'utf-8');
await promisify(Writable.prototype.write).call(writable, 'st of writing', 'utf-8');

await file.close();

const str = readFileSync(path, { encoding: 'utf-8' });
expect(str).to.be.equal('This is a test of writing');
});

it('fwrite should write to file', async () => {
const file = await new OpenFile(__dirname + '/../fixtures/WRITEFILE', 'w');
expect(await file.fwrite(Buffer.from('TEST FILE'))).to.be.equal(9);
fs.fsyncSync((await file._resource).handle.fd);

fsyncSync((await file._resource).handle.fd);
expect(await file.getSize()).to.be.equal(9);

await file.close();
Expand Down

0 comments on commit 0c52338

Please sign in to comment.