Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(fromNodeStream): adds fromNodeStream readable #124

Merged
merged 7 commits into from
Nov 2, 2017
Merged
15 changes: 8 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
"commitmsg": "validate-commit-msg",
"lint:src": "tslint --fix --project -p tsconfig.json -c tslint.json \"src/**/*.ts\"",
"lint:spec": "tslint --fix --project -p spec/tsconfig.json -c tslint.json \"spec/**/*.ts\"",
"prepublishOnly": "echo \"Error: do 'npm run release' instead of 'npm publish'\" && exit 1"
"prepublishOnly": "echo \"Error: do 'npm run release' instead of 'npm publish'\" && exit 1",
"postinstall": "git apply --ignore-whitespace patches/tsickle+0.24.1.patch"
},
"author": "Matthew Podwysocki <[email protected]>",
"homepage": "https://github.com/ReactiveX/IxJS#readme",
Expand All @@ -42,11 +43,11 @@
"CHANGELOG.md"
],
"dependencies": {
"tslib": "^1.7.1"
"tslib": "^1.8.0"
},
"devDependencies": {
"@types/node": "8.0.47",
"@std/esm": "0.12.5",
"@types/node": "8.0.47",
"@types/tape": "4.2.31",
"chalk": "2.3.0",
"command-line-args": "4.0.7",
Expand All @@ -61,7 +62,7 @@
"esdoc-standard-plugin": "1.0.0",
"google-closure-compiler": "20171023.0.1",
"gulp": "github:gulpjs/gulp#4.0",
"gulp-json-transform": "0.4.3",
"gulp-json-transform": "0.4.5",
"gulp-rename": "1.2.2",
"gulp-sourcemaps": "2.6.1",
"gulp-typescript": "3.2.3",
Expand All @@ -84,13 +85,13 @@
"tape-async": "2.3.0",
"trash": "4.1.0",
"ts-node": "3.3.0",
"tsickle": "0.23.3",
"tsickle": "0.24.1",
"tslint": "5.8.0",
"tsutils": "2.12.1",
"typescript": "2.5.3",
"uglifyjs-webpack-plugin": "1.0.0",
"uglifyjs-webpack-plugin": "1.0.1",
"validate-commit-msg": "2.14.0",
"webpack": "3.8.0"
"webpack": "3.8.1"
},
"config": {
"commitizen": {
Expand Down
12 changes: 12 additions & 0 deletions patches/tsickle+0.24.1.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
diff --git a/node_modules/tsickle/built/src/jsdoc.js b/node_modules/tsickle/built/src/jsdoc.js
index c29b669..785dee7 100644
--- a/node_modules/tsickle/built/src/jsdoc.js
+++ b/node_modules/tsickle/built/src/jsdoc.js
@@ -111,6 +111,7 @@ var JSDOC_TAGS_WITH_TYPES = new Set([
// others (e.g. @suppress). We should introduce a proper model class with a more suitable data
// strucure (e.g. a Map<TagName, Values[]>).
function parse(comment) {
+ return null;
// Make sure we have proper line endings before parsing on Windows.
comment = util_1.normalizeLineEndings(comment);
// TODO(evanm): this is a pile of hacky regexes for now, because we
2 changes: 2 additions & 0 deletions spec/Ix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import { iterable as iterable_ } from '../src/Ix.internal';
import { iterablePipe as iterablePipe_ } from '../src/Ix.internal';
import { asynciterable as asynciterable_ } from '../src/Ix.internal';
import { asynciterablePipe as asynciterablePipe_ } from '../src/Ix.internal';
import { fromNodeStream as fromNodeStream_ } from '../src/Ix.internal';

export let Iterable: typeof Iterable_ = Ix.Iterable;
export let AsyncSink: typeof AsyncSink_ = Ix.AsyncSink;
Expand All @@ -49,3 +50,4 @@ export let iterable: typeof iterable_ = IxInternal.iterable;
export let iterablePipe: typeof iterablePipe_ = IxInternal.iterablePipe;
export let asynciterable: typeof asynciterable_ = IxInternal.asynciterable;
export let asynciterablePipe: typeof asynciterablePipe_ = IxInternal.asynciterablePipe;
export let fromNodeStream: typeof fromNodeStream_ = IxInternal.fromNodeStream;
48 changes: 48 additions & 0 deletions spec/asynciterable/fromnodestream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import * as Ix from '../Ix';
import * as test from 'tape-async';
const { fromNodeStream } = Ix;
import { Readable, ReadableOptions } from 'stream';

class Counter extends Readable {
private _index: number;
private _max: number;

constructor(options?: ReadableOptions) {
super(options);
this._max = 3;
this._index = 1;
}

_read() {
const i = this._index++;
if (i > this._max) {
this.push(null);
} else {
const buf = Buffer.from(`${i}`, 'ascii');
this.push(buf);
}
}
}

test('AsyncIterable#fromNodeStream with readable', async t => {
const c = new Counter();
const xs = fromNodeStream(c);

const it = xs[Symbol.asyncIterator]();
let next = await it.next();
t.false(next.done);
t.equal((next.value as Buffer).compare(new Buffer('1', 'ascii')), 0);

next = await it.next();
t.false(next.done);
t.equal((next.value as Buffer).compare(new Buffer('2', 'ascii')), 0);

next = await it.next();
t.false(next.done);
t.equal((next.value as Buffer).compare(new Buffer('3', 'ascii')), 0);

next = await it.next();
t.true(next.done);

t.end();
});
3 changes: 3 additions & 0 deletions src/Ix.internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ try {
/* not the UMD bundle */
}
/** end google declarations */

import './add/asynciterable/fromnodestream';
export { fromNodeStream } from './asynciterable/fromnodestream';
2 changes: 1 addition & 1 deletion src/add/asynciterable/case.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AsyncIterableX } from '../../asynciterable/asynciterablex';
import { _case as caseStatic } from '../../asynciterable/case';

AsyncIterableX['case'] = caseStatic;
AsyncIterableX.case = caseStatic;

export declare namespace asynciterable {
let _case: typeof caseStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/asynciterable/catch.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AsyncIterableX } from '../../asynciterable/asynciterablex';
import { _catchStatic as catchStatic } from '../../asynciterable/catch';

AsyncIterableX['catch'] = catchStatic;
AsyncIterableX.catch = catchStatic;

export declare namespace asynciterable {
let _catchStatic: typeof catchStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/asynciterable/for.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AsyncIterableX } from '../../asynciterable/asynciterablex';
import { _for as forStatic } from '../../asynciterable/for';

AsyncIterableX['for'] = forStatic;
AsyncIterableX.for = forStatic;

export declare namespace asynciterable {
let _for: typeof forStatic;
Expand Down
8 changes: 8 additions & 0 deletions src/add/asynciterable/fromnodestream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { AsyncIterableX } from '../../asynciterable/asynciterablex';
import { fromNodeStream as fromNodeStreamStatic } from '../../asynciterable/fromnodestream';

AsyncIterableX.fromNodeStream = fromNodeStreamStatic;

declare module '../../asynciterable/asynciterablex' {
namespace AsyncIterableX { export let fromNodeStream: typeof fromNodeStreamStatic; }
}
2 changes: 1 addition & 1 deletion src/add/asynciterable/if.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AsyncIterableX } from '../../asynciterable/asynciterablex';
import { _if as ifStatic } from '../../asynciterable/if';

AsyncIterableX['if'] = ifStatic;
AsyncIterableX.if = ifStatic;

export declare namespace asynciterable {
let _if: typeof ifStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/asynciterable/throw.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AsyncIterableX } from '../../asynciterable/asynciterablex';
import { _throw as throwStatic } from '../../asynciterable/throw';

AsyncIterableX['throw'] = throwStatic;
AsyncIterableX.throw = throwStatic;

export declare namespace asynciterable {
let _throw: typeof throwStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/asynciterable/while.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AsyncIterableX } from '../../asynciterable/asynciterablex';
import { _while as whileStatic } from '../../asynciterable/while';

AsyncIterableX['while'] = whileStatic;
AsyncIterableX.while = whileStatic;

export declare namespace asynciterable {
let _while: typeof whileStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/iterable/case.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IterableX } from '../../iterable/iterablex';
import { _case as caseStatic } from '../../iterable/case';

IterableX['case'] = caseStatic;
IterableX.case = caseStatic;

export declare namespace iterable {
let _case: typeof caseStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/iterable/catch.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IterableX } from '../../iterable/iterablex';
import { _catchStatic as catchStatic } from '../../iterable/catch';

IterableX['catch'] = catchStatic;
IterableX.catch = catchStatic;

export declare namespace iterable {
let _catchStatic: typeof catchStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/iterable/for.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IterableX } from '../../iterable/iterablex';
import { _for as forStatic } from '../../iterable/for';

IterableX['for'] = forStatic;
IterableX.for = forStatic;

export declare namespace iterable {
let _for: typeof forStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/iterable/if.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IterableX } from '../../iterable/iterablex';
import { _if as ifStatic } from '../../iterable/if';

IterableX['if'] = ifStatic;
IterableX.if = ifStatic;

export declare namespace iterable {
let _if: typeof ifStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/iterable/throw.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IterableX } from '../../iterable/iterablex';
import { _throw as throwStatic } from '../../iterable/throw';

IterableX['throw'] = throwStatic;
IterableX.throw = throwStatic;

export declare namespace iterable {
let _throw: typeof throwStatic;
Expand Down
2 changes: 1 addition & 1 deletion src/add/iterable/while.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IterableX } from '../../iterable/iterablex';
import { _while as whileStatic } from '../../iterable/while';

IterableX['while'] = whileStatic;
IterableX.while = whileStatic;

export declare namespace iterable {
let _while: typeof whileStatic;
Expand Down
99 changes: 99 additions & 0 deletions src/asynciterable/fromnodestream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { Readable } from 'stream';
import { AsyncIterableX } from './asynciterablex';

enum StreamState {
NonFlowing,
Readable,
Ended,
Errored
}

class ReadableStreamAsyncIterable extends AsyncIterableX<string | Buffer>
implements AsyncIterator<string | Buffer> {
private _stream: Readable;
private _size?: number;
private _state: StreamState;
private _error: any;
private _rejectFns: Set<(err: any) => void>;

constructor(stream: Readable, size?: number) {
super();
this._stream = stream;
this._size = size;
this._state = StreamState.NonFlowing;
this._error = null;
this._rejectFns = new Set<(err: any) => void>();

const onError = (err: any) => {
this._state = StreamState.Errored;
this._error = err;
for (const rejectFn of this._rejectFns) {
rejectFn(err);
}
};

const onEnd = () => {
this._state = StreamState.Ended;
};

this._stream.once('error', onError);
this._stream.once('end', onEnd);
}

[Symbol.asyncIterator](): AsyncIterator<string | Buffer> {
return this;
}

async next(): Promise<IteratorResult<string | Buffer>> {
if (this._state === StreamState.NonFlowing) {
await Promise.race([this._waitReadable(), this._waitEnd()]);
return this.next();
}

if (this._state === StreamState.Ended) {
return ({ done: true, value: undefined } as any) as IteratorResult<string | Buffer>;
}

if (this._state === StreamState.Errored) {
throw this._error;
}

const read = this._stream.read(this._size);
if (read !== null) {
return { done: false, value: <string | Buffer>read };
} else {
this._state = StreamState.NonFlowing;
return this.next();
}
}

private _waitReadable(): Promise<void> {
return new Promise<void>((resolve, reject) => {
const onReadable = () => {
this._state = StreamState.Readable;
this._rejectFns.delete(reject);
resolve();
};

this._rejectFns.add(reject);
this._stream.once('readable', onReadable);
});
}

private _waitEnd(): Promise<void> {
return new Promise<void>((resolve, reject) => {
const onEnd = () => {
this._state = StreamState.Ended;
this._rejectFns.delete(reject);
resolve();
};

this._rejectFns.add(reject);
this._stream.once('end', onEnd);
});
}
}

export function fromNodeStream(stream: Readable, size?: number): AsyncIterableX<string | Buffer> {
return new ReadableStreamAsyncIterable(stream, size);
}