Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ZJONSSON committed May 11, 2024
1 parent ca37f3b commit 9ef3ade
Show file tree
Hide file tree
Showing 8 changed files with 378 additions and 367 deletions.
13 changes: 13 additions & 0 deletions jsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{
"compilerOptions": {
"checkJs": true,
"target": "ES2022",
"moduleResolution":"node",
"types": ["node"]
},
"exclude": [
"node_modules",
"test",
"coverage"
]
}
2 changes: 1 addition & 1 deletion lib/BufferStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const Stream = require('stream');
module.exports = function(entry) {
return new Promise(function(resolve, reject) {
const chunks = [];
const bufferStream = Stream.Transform()
const bufferStream = new Stream.Transform()
.on('finish', function() {
resolve(Buffer.concat(chunks));
})
Expand Down
10 changes: 5 additions & 5 deletions lib/Open/directory.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const signature = Buffer.alloc(4);
signature.writeUInt32LE(0x06054b50, 0);

async function getCrxHeader(source) {
const sourceStream = source.stream(0).pipe(PullStream());
const sourceStream = source.stream(0).pipe(new PullStream());

let data = await sourceStream.pull(4);
const signature = data.readUInt32LE(0);
Expand Down Expand Up @@ -46,7 +46,7 @@ function getZip64CentralDirectory(source, zip64CDL) {
throw new Error('invalid zip64 end of central dir locator signature (0x07064b50): 0x' + d64loc.signature.toString(16));
}

const dir64 = PullStream();
const dir64 = new PullStream();
source.stream(d64loc.offsetToStartOfCentralDirectory).pipe(dir64);

return dir64.pull(56);
Expand Down Expand Up @@ -75,8 +75,8 @@ function parseZip64DirRecord (dir64record) {
}

module.exports = async function centralDirectory(source, options) {
const endDir = PullStream();
const records = PullStream();
const endDir = new PullStream();
const records = new PullStream();
const tailSize = (options && options.tailSize) || 80;
let crxHeader, vars;

Expand Down Expand Up @@ -115,7 +115,7 @@ module.exports = async function centralDirectory(source, options) {
// Offset to zip64 CDL is 20 bytes before normal CDR
const zip64CDLSize = 20;
const zip64CDLOffset = sourceSize - (tailSize - endDir.match + zip64CDLSize);
const zip64CDLStream = PullStream();
const zip64CDLStream = new PullStream();

source.stream(zip64CDLOffset).pipe(zip64CDLStream);

Expand Down
2 changes: 1 addition & 1 deletion lib/Open/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module.exports = {
buffer: function(buffer, options) {
const source = {
stream: function(offset, length) {
const stream = Stream.PassThrough();
const stream = new Stream.PassThrough();
const end = length ? offset + length : undefined;
stream.end(buffer.slice(offset, end));
return stream;
Expand Down
6 changes: 3 additions & 3 deletions lib/Open/unzip.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ const parseDateTime = require('../parseDateTime');
const parseBuffer = require('../parseBuffer');

module.exports = function unzip(source, offset, _password, directoryVars, length, _entry) {
const file = PullStream();
const entry = _entry || Stream.PassThrough();
const file = new PullStream();
const entry = _entry || new Stream.PassThrough();

const req = source.stream(offset, length);
req.pipe(file).on('error', function(e) {
Expand Down Expand Up @@ -85,7 +85,7 @@ module.exports = function unzip(source, offset, _password, directoryVars, length
const fileSizeKnown = !(vars.flags & 0x08) || vars.compressedSize > 0;
let eof;

const inflater = vars.compressionMethod ? zlib.createInflateRaw() : Stream.PassThrough();
const inflater = vars.compressionMethod ? zlib.createInflateRaw() : new Stream.PassThrough();

if (fileSizeKnown) {
entry.size = vars.uncompressedSize;
Expand Down
220 changes: 109 additions & 111 deletions lib/PullStream.js
Original file line number Diff line number Diff line change
@@ -1,139 +1,137 @@
const Stream = require('stream');
const util = require('util');
const strFunction = 'function';

function PullStream() {
if (!(this instanceof PullStream))
return new PullStream();

Stream.Duplex.call(this, {decodeStrings:false, objectMode:true});
this.buffer = Buffer.from('');
const self = this;
self.on('finish', function() {
self.finished = true;
self.emit('chunk', false);
});
}

util.inherits(PullStream, Stream.Duplex);
class PullStream extends Stream.Duplex {
finished;
match;
__emittedError;
constructor(opts) {
super({decodeStrings:false, objectMode:true});
this._opts = opts;
this.buffer = Buffer.from('');
this.on('finish', () => {
this.finished = true;
this.emit('chunk', false);
});
}

PullStream.prototype._write = function(chunk, e, cb) {
this.buffer = Buffer.concat([this.buffer, chunk]);
this.cb = cb;
this.emit('chunk');
};
_write(chunk, e, cb) {
this.buffer = Buffer.concat([this.buffer, chunk]);
this.cb = cb;
this.emit('chunk');
};


// The `eof` parameter is interpreted as `file_length` if the type is number
// otherwise (i.e. buffer) it is interpreted as a pattern signaling end of stream
PullStream.prototype.stream = function(eof, includeEof) {
const p = Stream.PassThrough();
let done;
const self= this;
// The `eof` parameter is interpreted as `file_length` if the type is number
// otherwise (i.e. buffer) it is interpreted as a pattern signaling end of stream
stream(eof, includeEof) {
const p = new Stream.PassThrough();
let done;
const self= this;

function cb() {
if (typeof self.cb === strFunction) {
const callback = self.cb;
self.cb = undefined;
return callback();
function cb() {
if (typeof self.cb === strFunction) {
const callback = self.cb;
self.cb = undefined;
return callback();
}
}
}

function pull() {
let packet;
if (self.buffer && self.buffer.length) {
if (typeof eof === 'number') {
packet = self.buffer.slice(0, eof);
self.buffer = self.buffer.slice(eof);
eof -= packet.length;
done = done || !eof;
} else {
let match = self.buffer.indexOf(eof);
if (match !== -1) {
function pull() {
let packet;
if (self.buffer && self.buffer.length) {
if (typeof eof === 'number') {
packet = self.buffer.slice(0, eof);
self.buffer = self.buffer.slice(eof);
eof -= packet.length;
done = done || !eof;
} else {
let match = self.buffer.indexOf(eof);
if (match !== -1) {
// store signature match byte offset to allow us to reference
// this for zip64 offset
self.match = match;
if (includeEof) match = match + eof.length;
packet = self.buffer.slice(0, match);
self.buffer = self.buffer.slice(match);
done = true;
} else {
const len = self.buffer.length - eof.length;
if (len <= 0) {
cb();
self.match = match;
if (includeEof) match = match + eof.length;
packet = self.buffer.slice(0, match);
self.buffer = self.buffer.slice(match);
done = true;
} else {
packet = self.buffer.slice(0, len);
self.buffer = self.buffer.slice(len);
const len = self.buffer.length - eof.length;
if (len <= 0) {
cb();
} else {
packet = self.buffer.slice(0, len);
self.buffer = self.buffer.slice(len);
}
}
}
if (packet) p.write(packet, function() {
if (self.buffer.length === 0 || (eof.length && self.buffer.length <= eof.length)) cb();
});
}
if (packet) p.write(packet, function() {
if (self.buffer.length === 0 || (eof.length && self.buffer.length <= eof.length)) cb();
});
}

if (!done) {
if (self.finished) {
if (!done) {
if (self.finished) {
self.removeListener('chunk', pull);
self.emit('error', new Error('FILE_ENDED'));
return;
}

} else {
self.removeListener('chunk', pull);
self.emit('error', new Error('FILE_ENDED'));
return;
p.end();
}

} else {
self.removeListener('chunk', pull);
p.end();
}
}

self.on('chunk', pull);
pull();
return p;
};

PullStream.prototype.pull = function(eof, includeEof) {
if (eof === 0) return Promise.resolve('');
self.on('chunk', pull);
pull();
return p;
};

// If we already have the required data in buffer
// we can resolve the request immediately
if (!isNaN(eof) && this.buffer.length > eof) {
const data = this.buffer.slice(0, eof);
this.buffer = this.buffer.slice(eof);
return Promise.resolve(data);
}
pull(eof, includeEof) {
if (eof === 0) return Promise.resolve('');

// Otherwise we stream until we have it
let buffer = Buffer.from('');
const self = this;
// If we already have the required data in buffer
// we can resolve the request immediately
if (!isNaN(eof) && this.buffer.length > eof) {
const data = this.buffer.slice(0, eof);
this.buffer = this.buffer.slice(eof);
return Promise.resolve(data);
}

const concatStream = new Stream.Transform();
concatStream._transform = function(d, e, cb) {
buffer = Buffer.concat([buffer, d]);
cb();
};
// Otherwise we stream until we have it
let buffer = Buffer.from('');
const self = this;

let rejectHandler;
let pullStreamRejectHandler;
return new Promise(function(resolve, reject) {
rejectHandler = reject;
pullStreamRejectHandler = function(e) {
self.__emittedError = e;
reject(e);
const concatStream = new Stream.Transform();
concatStream._transform = function(d, e, cb) {
buffer = Buffer.concat([buffer, d]);
cb();
};
if (self.finished)
return reject(new Error('FILE_ENDED'));
self.once('error', pullStreamRejectHandler); // reject any errors from pullstream itself
self.stream(eof, includeEof)
.on('error', reject)
.pipe(concatStream)
.on('finish', function() {resolve(buffer);})
.on('error', reject);
})
.finally(function() {
self.removeListener('error', rejectHandler);
self.removeListener('error', pullStreamRejectHandler);
});
};

PullStream.prototype._read = function(){};
let rejectHandler;
let pullStreamRejectHandler;
return new Promise(function(resolve, reject) {
rejectHandler = reject;
pullStreamRejectHandler = function(e) {
self.__emittedError = e;
reject(e);
};
if (self.finished)
return reject(new Error('FILE_ENDED'));
self.once('error', pullStreamRejectHandler); // reject any errors from pullstream itself
self.stream(eof, includeEof)
.on('error', reject)
.pipe(concatStream)
.on('finish', function() {resolve(buffer);})
.on('error', reject);
})
.finally(function() {
self.removeListener('error', rejectHandler);
self.removeListener('error', pullStreamRejectHandler);
});
};
_read(){};
}

module.exports = PullStream;
Loading

0 comments on commit 9ef3ade

Please sign in to comment.