Skip to content

Commit

Permalink
feat(fromNodeStream): adds fromNodeStream readable (#124)
Browse files Browse the repository at this point in the history
* feat(fromNodeStream): adds fromNodeStream readable

* feat(fromNodeStream): add add* file

* build(tsickle): update and patch tsickle to ignore our jsdoc comments

* refactor(ix): use dot notation to silence tsickle warnings

* feat(fromNodeStream): add unit tests

* refactor(fromNodeStream): remove fromNodeStream from Ix export/UMD bundle
  • Loading branch information
mattpodwysocki authored Nov 2, 2017
1 parent 11f862c commit 952509e
Show file tree
Hide file tree
Showing 19 changed files with 192 additions and 19 deletions.
15 changes: 8 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
"commitmsg": "validate-commit-msg",
"lint:src": "tslint --fix --project -p tsconfig.json -c tslint.json \"src/**/*.ts\"",
"lint:spec": "tslint --fix --project -p spec/tsconfig.json -c tslint.json \"spec/**/*.ts\"",
"prepublishOnly": "echo \"Error: do 'npm run release' instead of 'npm publish'\" && exit 1"
"prepublishOnly": "echo \"Error: do 'npm run release' instead of 'npm publish'\" && exit 1",
"postinstall": "git apply --ignore-whitespace patches/tsickle+0.24.1.patch"
},
"author": "Matthew Podwysocki <[email protected]>",
"homepage": "https://github.com/ReactiveX/IxJS#readme",
Expand All @@ -42,11 +43,11 @@
"CHANGELOG.md"
],
"dependencies": {
"tslib": "^1.7.1"
"tslib": "^1.8.0"
},
"devDependencies": {
"@types/node": "8.0.47",
"@std/esm": "0.12.5",
"@types/node": "8.0.47",
"@types/tape": "4.2.31",
"chalk": "2.3.0",
"command-line-args": "4.0.7",
Expand All @@ -61,7 +62,7 @@
"esdoc-standard-plugin": "1.0.0",
"google-closure-compiler": "20171023.0.1",
"gulp": "github:gulpjs/gulp#4.0",
"gulp-json-transform": "0.4.3",
"gulp-json-transform": "0.4.5",
"gulp-rename": "1.2.2",
"gulp-sourcemaps": "2.6.1",
"gulp-typescript": "3.2.3",
Expand All @@ -84,13 +85,13 @@
"tape-async": "2.3.0",
"trash": "4.1.0",
"ts-node": "3.3.0",
"tsickle": "0.23.3",
"tsickle": "0.24.1",
"tslint": "5.8.0",
"tsutils": "2.12.1",
"typescript": "2.5.3",
"uglifyjs-webpack-plugin": "1.0.0",
"uglifyjs-webpack-plugin": "1.0.1",
"validate-commit-msg": "2.14.0",
"webpack": "3.8.0"
"webpack": "3.8.1"
},
"config": {
"commitizen": {
Expand Down
12 changes: 12 additions & 0 deletions patches/tsickle+0.24.1.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
diff --git a/node_modules/tsickle/built/src/jsdoc.js b/node_modules/tsickle/built/src/jsdoc.js
index c29b669..785dee7 100644
--- a/node_modules/tsickle/built/src/jsdoc.js
+++ b/node_modules/tsickle/built/src/jsdoc.js
@@ -111,6 +111,7 @@ var JSDOC_TAGS_WITH_TYPES = new Set([
// others (e.g. @suppress). We should introduce a proper model class with a more suitable data
// strucure (e.g. a Map<TagName, Values[]>).
function parse(comment) {
+ return null;
// Make sure we have proper line endings before parsing on Windows.
comment = util_1.normalizeLineEndings(comment);
// TODO(evanm): this is a pile of hacky regexes for now, because we
2 changes: 2 additions & 0 deletions spec/Ix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import { iterable as iterable_ } from '../src/Ix.internal';
import { iterablePipe as iterablePipe_ } from '../src/Ix.internal';
import { asynciterable as asynciterable_ } from '../src/Ix.internal';
import { asynciterablePipe as asynciterablePipe_ } from '../src/Ix.internal';
import { fromNodeStream as fromNodeStream_ } from '../src/Ix.internal';

export let Iterable: typeof Iterable_ = Ix.Iterable;
export let AsyncSink: typeof AsyncSink_ = Ix.AsyncSink;
Expand All @@ -49,3 +50,4 @@ export let iterable: typeof iterable_ = IxInternal.iterable;
export let iterablePipe: typeof iterablePipe_ = IxInternal.iterablePipe;
export let asynciterable: typeof asynciterable_ = IxInternal.asynciterable;
export let asynciterablePipe: typeof asynciterablePipe_ = IxInternal.asynciterablePipe;
export let fromNodeStream: typeof fromNodeStream_ = IxInternal.fromNodeStream;
48 changes: 48 additions & 0 deletions spec/asynciterable/fromnodestream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import * as Ix from '../Ix';
import * as test from 'tape-async';
const { fromNodeStream } = Ix;
import { Readable, ReadableOptions } from 'stream';

class Counter extends Readable {
private _index: number;
private _max: number;

constructor(options?: ReadableOptions) {
super(options);
this._max = 3;
this._index = 1;
}

_read() {
const i = this._index++;
if (i > this._max) {
this.push(null);
} else {
const buf = Buffer.from(`${i}`, 'ascii');
this.push(buf);
}
}
}

test('AsyncIterable#fromNodeStream with readable', async t => {
const c = new Counter();
const xs = fromNodeStream(c);

const it = xs[Symbol.asyncIterator]();
let next = await it.next();
t.false(next.done);
t.equal((next.value as Buffer).compare(new Buffer('1', 'ascii')), 0);

next = await it.next();
t.false(next.done);
t.equal((next.value as Buffer).compare(new Buffer('2', 'ascii')), 0);

next = await it.next();
t.false(next.done);
t.equal((next.value as Buffer).compare(new Buffer('3', 'ascii')), 0);

next = await it.next();
t.true(next.done);

t.end();
});
3 changes: 3 additions & 0 deletions src/Ix.internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ try {
/* not the UMD bundle */
}
/** end google declarations */

import './add/asynciterable/fromnodestream';
export { fromNodeStream } from './asynciterable/fromnodestream';
2 changes: 1 addition & 1 deletion src/add/asynciterable/case.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AsyncIterableX } from '../../asynciterable/asynciterablex';
import { _case as caseStatic } from '../../asynciterable/case';

AsyncIterableX['case'] = caseStatic;
AsyncIterableX.case = caseStatic;

export declare namespace asynciterable {
let _case: typeof caseStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/asynciterable/catch.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AsyncIterableX } from '../../asynciterable/asynciterablex';
import { _catchStatic as catchStatic } from '../../asynciterable/catch';

AsyncIterableX['catch'] = catchStatic;
AsyncIterableX.catch = catchStatic;

export declare namespace asynciterable {
let _catchStatic: typeof catchStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/asynciterable/for.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AsyncIterableX } from '../../asynciterable/asynciterablex';
import { _for as forStatic } from '../../asynciterable/for';

AsyncIterableX['for'] = forStatic;
AsyncIterableX.for = forStatic;

export declare namespace asynciterable {
let _for: typeof forStatic;
Expand Down
8 changes: 8 additions & 0 deletions src/add/asynciterable/fromnodestream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { AsyncIterableX } from '../../asynciterable/asynciterablex';
import { fromNodeStream as fromNodeStreamStatic } from '../../asynciterable/fromnodestream';

AsyncIterableX.fromNodeStream = fromNodeStreamStatic;

declare module '../../asynciterable/asynciterablex' {
namespace AsyncIterableX { export let fromNodeStream: typeof fromNodeStreamStatic; }
}
2 changes: 1 addition & 1 deletion src/add/asynciterable/if.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AsyncIterableX } from '../../asynciterable/asynciterablex';
import { _if as ifStatic } from '../../asynciterable/if';

AsyncIterableX['if'] = ifStatic;
AsyncIterableX.if = ifStatic;

export declare namespace asynciterable {
let _if: typeof ifStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/asynciterable/throw.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AsyncIterableX } from '../../asynciterable/asynciterablex';
import { _throw as throwStatic } from '../../asynciterable/throw';

AsyncIterableX['throw'] = throwStatic;
AsyncIterableX.throw = throwStatic;

export declare namespace asynciterable {
let _throw: typeof throwStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/asynciterable/while.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AsyncIterableX } from '../../asynciterable/asynciterablex';
import { _while as whileStatic } from '../../asynciterable/while';

AsyncIterableX['while'] = whileStatic;
AsyncIterableX.while = whileStatic;

export declare namespace asynciterable {
let _while: typeof whileStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/iterable/case.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IterableX } from '../../iterable/iterablex';
import { _case as caseStatic } from '../../iterable/case';

IterableX['case'] = caseStatic;
IterableX.case = caseStatic;

export declare namespace iterable {
let _case: typeof caseStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/iterable/catch.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IterableX } from '../../iterable/iterablex';
import { _catchStatic as catchStatic } from '../../iterable/catch';

IterableX['catch'] = catchStatic;
IterableX.catch = catchStatic;

export declare namespace iterable {
let _catchStatic: typeof catchStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/iterable/for.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IterableX } from '../../iterable/iterablex';
import { _for as forStatic } from '../../iterable/for';

IterableX['for'] = forStatic;
IterableX.for = forStatic;

export declare namespace iterable {
let _for: typeof forStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/iterable/if.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IterableX } from '../../iterable/iterablex';
import { _if as ifStatic } from '../../iterable/if';

IterableX['if'] = ifStatic;
IterableX.if = ifStatic;

export declare namespace iterable {
let _if: typeof ifStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/iterable/throw.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IterableX } from '../../iterable/iterablex';
import { _throw as throwStatic } from '../../iterable/throw';

IterableX['throw'] = throwStatic;
IterableX.throw = throwStatic;

export declare namespace iterable {
let _throw: typeof throwStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/iterable/while.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IterableX } from '../../iterable/iterablex';
import { _while as whileStatic } from '../../iterable/while';

IterableX['while'] = whileStatic;
IterableX.while = whileStatic;

export declare namespace iterable {
let _while: typeof whileStatic;
Expand Down
99 changes: 99 additions & 0 deletions src/asynciterable/fromnodestream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { Readable } from 'stream';
import { AsyncIterableX } from './asynciterablex';

enum StreamState {
NonFlowing,
Readable,
Ended,
Errored
}

class ReadableStreamAsyncIterable extends AsyncIterableX<string | Buffer>
implements AsyncIterator<string | Buffer> {
private _stream: Readable;
private _size?: number;
private _state: StreamState;
private _error: any;
private _rejectFns: Set<(err: any) => void>;

constructor(stream: Readable, size?: number) {
super();
this._stream = stream;
this._size = size;
this._state = StreamState.NonFlowing;
this._error = null;
this._rejectFns = new Set<(err: any) => void>();

const onError = (err: any) => {
this._state = StreamState.Errored;
this._error = err;
for (const rejectFn of this._rejectFns) {
rejectFn(err);
}
};

const onEnd = () => {
this._state = StreamState.Ended;
};

this._stream.once('error', onError);
this._stream.once('end', onEnd);
}

[Symbol.asyncIterator](): AsyncIterator<string | Buffer> {
return this;
}

async next(): Promise<IteratorResult<string | Buffer>> {
if (this._state === StreamState.NonFlowing) {
await Promise.race([this._waitReadable(), this._waitEnd()]);
return this.next();
}

if (this._state === StreamState.Ended) {
return ({ done: true, value: undefined } as any) as IteratorResult<string | Buffer>;
}

if (this._state === StreamState.Errored) {
throw this._error;
}

const read = this._stream.read(this._size);
if (read !== null) {
return { done: false, value: <string | Buffer>read };
} else {
this._state = StreamState.NonFlowing;
return this.next();
}
}

private _waitReadable(): Promise<void> {
return new Promise<void>((resolve, reject) => {
const onReadable = () => {
this._state = StreamState.Readable;
this._rejectFns.delete(reject);
resolve();
};

this._rejectFns.add(reject);
this._stream.once('readable', onReadable);
});
}

private _waitEnd(): Promise<void> {
return new Promise<void>((resolve, reject) => {
const onEnd = () => {
this._state = StreamState.Ended;
this._rejectFns.delete(reject);
resolve();
};

this._rejectFns.add(reject);
this._stream.once('end', onEnd);
});
}
}

export function fromNodeStream(stream: Readable, size?: number): AsyncIterableX<string | Buffer> {
return new ReadableStreamAsyncIterable(stream, size);
}

0 comments on commit 952509e

Please sign in to comment.