Skip to content

Commit

Permalink
Fix pipe typings, add AsyncIterable fromDOMStream and toDOMStream imp…
Browse files Browse the repository at this point in the history
…lementations (#266)

* test(from-spec): fix Observable typings in from-spec

* fix(asynciterable-pipe): fix asynciterable pipe typings

fix #265

* feat(asynciterable-toDOMStream): add AsyncIterable#toDOMStream implementation

* feat(asynciterable-fromDOMStream): add AsyncIterable.fromDOMStream implementation

* chore(cleanup): remove dead comment, update dependencies

* fix(asynciterable-toNodeStream): the iterator should complete so the tests can pass
  • Loading branch information
trxcllnt authored and mattpodwysocki committed Mar 4, 2019
1 parent 9d66807 commit 2e00c95
Show file tree
Hide file tree
Showing 19 changed files with 738 additions and 229 deletions.
11 changes: 5 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,11 @@
"CHANGELOG.md"
],
"dependencies": {
"@types/node": "^10.12.18",
"is-stream": "1.1.0",
"@types/node": "^11.10.4",
"tslib": "^1.9.3"
},
"devDependencies": {
"@mattiasbuelens/web-streams-polyfill": "0.2.1",
"@mattiasbuelens/web-streams-polyfill": "0.3.1",
"@types/glob": "7.1.1",
"@types/jest": "23.3.11",
"async-done": "1.3.1",
Expand Down Expand Up @@ -79,7 +78,7 @@
"jest": "23.6.0",
"jest-codemods": "0.19.1",
"jest-environment-node-debug": "2.0.0",
"jest-silent-reporter": "0.1.1",
"jest-silent-reporter": "0.1.2",
"json": "9.0.6",
"lerna": "3.8.4",
"lint-staged": "8.1.0",
Expand All @@ -93,10 +92,10 @@
"source-map-loader": "0.2.4",
"terser-webpack-plugin": "1.2.1",
"ts-jest": "23.10.5",
"ts-node": "7.0.1",
"ts-node": "8.0.2",
"tslint": "5.12.0",
"typedoc": "0.13.0",
"typescript": "3.2.2",
"typescript": "3.3.3333",
"validate-commit-msg": "2.14.0",
"web-stream-tools": "0.0.1",
"webpack": "4.28.3"
Expand Down
15 changes: 15 additions & 0 deletions spec/Ix.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
import '@mattiasbuelens/web-streams-polyfill';

/* tslint:disable */
// import this before assigning window global since it does a `typeof window` check
require('web-stream-tools');

/* tslint:disable */
(<any>global).window = (<any>global).window || global;

// Fix for Jest in node v10.x
Object.defineProperty(ArrayBuffer, Symbol.hasInstance, {
writable: true,
configurable: true,
value(inst: any) {
return inst && inst.constructor && inst.constructor.name === 'ArrayBuffer';
}
});

// Require rxjs first so we pick up its polyfilled Symbol.observable
require('rxjs/symbol/observable');

Expand Down
82 changes: 82 additions & 0 deletions spec/asynciterable-operators/todomstream-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import '../asynciterablehelpers';
import { AsyncIterable, toDOMStream } from '../Ix';

(() => {
if (!toDOMStream || process.env.TEST_DOM_STREAMS !== 'true') {
return test('not testing node streams because process.env.TEST_DOM_STREAMS !== "true"', () => {
/**/
});
}

const stringsItr = () => AsyncIterable.from([1, 2, 3]).map(i => `${i}`);
const buffersItr = () => stringsItr().map(val => Buffer.from(val));
const objectsItr = () => stringsItr().map(val => ({ val }));
const compare = <T>(a: T, b: T) => {
let aVal = ArrayBuffer.isView(a) ? `${Buffer.from(a.buffer, a.byteOffset, a.byteLength)}` : a;
let bVal = ArrayBuffer.isView(b) ? `${Buffer.from(b.buffer, b.byteOffset, b.byteLength)}` : b;
// poor man's deep-equals
try {
expect(aVal).toEqual(bVal);
} catch (e) {
return false;
}
return true;
};

describe(`AsyncIterable#toDOMStream`, () => {
describe(`DefaultController`, () => {
const expectedStrings = ['1', '2', '3'];
const expectedObjects = expectedStrings.map(val => ({ val }));
const expectedBuffers = expectedStrings.map(x => Buffer.from(x));
test(`yields Strings`, async () => {
const expected = AsyncIterable.from(expectedStrings);
const actual = stringsItr().toDOMStream();
await expect(actual).toEqualStream(expected, compare);
});
test(`yields Buffers`, async () => {
const expected = AsyncIterable.from(expectedBuffers);
const actual = buffersItr().toDOMStream();
await expect(actual).toEqualStream(expected, compare);
});
test(`yields Objects`, async () => {
const expected = AsyncIterable.from(expectedObjects);
const actual = objectsItr().toDOMStream();
await expect(actual).toEqualStream(expected, compare);
});
});

describe(`ReadableByteStreamController (byobRequest)`, () => {
const expectedStrings = ['123'];
const expectedBuffers = expectedStrings.map(x => Buffer.from(x));
test(`yields Strings`, async () => {
const expected = AsyncIterable.from(expectedBuffers);
const actual = stringsItr()
.map(x => Buffer.from(x))
.toDOMStream({ type: 'bytes' });
await expect(actual).toEqualStream(expected, compare);
});
test(`yields Buffers`, async () => {
const expected = AsyncIterable.from(expectedBuffers);
const actual = buffersItr().toDOMStream({ type: 'bytes' });
await expect(actual).toEqualStream(expected, compare);
});
});

describe(`ReadableByteStreamController (autoAllocateChunkSize)`, () => {
const expectedStrings = ['123'];
const expectedBuffers = expectedStrings.map(x => Buffer.from(x));
test(`yields Strings`, async () => {
const expected = AsyncIterable.from(expectedBuffers);
const actual = stringsItr()
.map(x => Buffer.from(x))
.toDOMStream({ type: 'bytes', autoAllocateChunkSize: 1024 });
await expect(actual).toEqualStream(expected, compare);
});
test(`yields Buffers`, async () => {
const expected = AsyncIterable.from(expectedBuffers);
const actual = buffersItr().toDOMStream({ type: 'bytes', autoAllocateChunkSize: 1024 });
await expect(actual).toEqualStream(expected, compare);
});
});
});
})();
11 changes: 8 additions & 3 deletions spec/asynciterable/from-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import * as Ix from '../Ix';
const { from } = Ix.AsyncIterable;
import { hasNext, noNext } from '../asynciterablehelpers';
import { hasNext, noNext, toObserver } from '../asynciterablehelpers';
import { setInterval, clearInterval } from 'timers';
import { PartialObserver } from '../../src/observer';

test('AsyncIterable#from from promise list', async () => {
const xs: Iterable<Promise<number>> = [
Expand Down Expand Up @@ -153,8 +154,12 @@ class TestObservable<T> implements Observable<T> {
this._subscribe = subscribe;
}

subscribe(observer: Observer<T>) {
return this._subscribe(observer);
subscribe(
next?: PartialObserver<T> | ((value: T) => void) | null,
error?: ((err: any) => void) | null,
complete?: (() => void) | null
) {
return this._subscribe(toObserver(next, error, complete));
}
}

Expand Down
57 changes: 57 additions & 0 deletions spec/asynciterable/fromdomstream-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import '../asynciterablehelpers';
import { Readable, ReadableOptions } from 'stream';
import { fromDOMStream, AsyncIterable } from '../Ix';

(() => {
if (!fromDOMStream || process.env.TEST_DOM_STREAMS !== 'true') {
return test('not testing node streams because process.env.TEST_DOM_STREAMS !== "true"', () => {
/**/
});
}

/* tslint:disable */
const { toStream } = require('web-stream-tools').default;

class Counter extends Readable {
private _index: number;
private _max: number;

constructor(options?: ReadableOptions) {
super(options);
this._max = 3;
this._index = 0;
}

_read() {
this.push(++this._index > this._max ? null : `${this._index}`);
}
}

const compare = <T>(a: T, b: T) => {
let aVal = ArrayBuffer.isView(a) ? `${Buffer.from(a.buffer, a.byteOffset, a.byteLength)}` : a;
let bVal = ArrayBuffer.isView(b) ? `${Buffer.from(b.buffer, b.byteOffset, b.byteLength)}` : b;
// poor man's deep-equals
try {
expect(aVal).toEqual(bVal);
} catch (e) {
return false;
}
return true;
};

describe(`AsyncIterable#fromDOMStream`, () => {
test('objectMode: true', async () => {
const c = toStream(new Counter({ objectMode: true }));
const xs = fromDOMStream(c) as AsyncIterable<string>;
const expected = AsyncIterable.from(['1', '2', '3']);
await expect(xs).toEqualStream(expected, compare);
});

test('objectMode: false', async () => {
const c = toStream(new Counter({ objectMode: false }));
const xs = fromDOMStream(c) as AsyncIterable<Buffer>;
const expected = AsyncIterable.from(['1', '2', '3'].map(s => Buffer.from(s)));
await expect(xs).toEqualStream(expected, compare);
});
});
})();
35 changes: 34 additions & 1 deletion spec/asynciterablehelpers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as Ix from './Ix';
import { Observer, PartialObserver } from '../src/observer';

export async function hasNext<T>(source: AsyncIterator<T>, expected: T) {
const { done, value } = await source.next();
Expand Down Expand Up @@ -65,6 +66,31 @@ function cast(source: any): any {
return source instanceof Ix.AsyncIterable ? source : Ix.AsyncIterable.as(source);
}

const noop = (_?: any) => {
/**/
};

export function toObserver<T>(
next?: PartialObserver<T> | ((value: T) => void) | null,
error?: ((err: any) => void) | null,
complete?: (() => void) | null
): Observer<T> {
if (next && typeof next === 'object') {
const observer = <any>next;
return {
next: (observer.next || noop).bind(observer),
error: (observer.error || noop).bind(observer),
complete: (observer.complete || noop).bind(observer)
};
} else {
return {
next: typeof next === 'function' ? next : noop,
error: typeof error === 'function' ? error : noop,
complete: typeof complete === 'function' ? complete : noop
};
}
}

declare global {
namespace jest {
interface Matchers<R> {
Expand Down Expand Up @@ -102,7 +128,7 @@ expect.extend({
const it1 = Ix.AsyncIterable.from(expected)[Symbol.asyncIterator]();
while (!(next1 = await it1.next()).done) {
expectedCount++;
if (!(next2 = await it2.next()).done) {
if (!(next2 = await it2.next(getValueByteLength(next1.value))).done) {
actualCount++;
if (!(await comparer(next1.value, next2.value))) {
results.push(
Expand All @@ -123,3 +149,10 @@ expect.extend({
return { pass: results.length === 0, message: () => results.join('\n') };
}
});

function getValueByteLength(value: any) {
if (value && value.buffer instanceof ArrayBuffer) {
return value.byteLength;
}
return undefined;
}
6 changes: 3 additions & 3 deletions spec/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* tslint:disable */
process.on('unhandledRejection', error => {
process.on('unhandledRejection', (error: any) => {
// Won't execute
console.log('unhandledRejection', error.test);
});
Expand All @@ -8,5 +8,5 @@ const cwd = process.cwd();
const resolve = require('path').resolve;
require('glob')(`./spec/**/*-spec.ts`, (err: Error, files: string[]) => {
if (err) throw err;
files.forEach((file) => require(resolve(cwd, file)));
});
files.forEach(file => require(resolve(cwd, file)));
});
77 changes: 8 additions & 69 deletions src/Ix.dom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,72 +16,11 @@ export { OrderedAsyncIterable } from './Ix';
export { OrderedAsyncIterableBase } from './Ix';
export { AsyncSink, Iterable, AsyncIterable } from './Ix';

import { OperatorAsyncFunction } from './interfaces';

declare module './asynciterable/asynciterablex' {
interface AsyncIterableX<T> {
pipe(): AsyncIterableX<T>;
pipe<A>(op1: OperatorAsyncFunction<T, A>): AsyncIterableX<A>;
pipe<A, B>(
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>
): AsyncIterableX<B>;
pipe<A, B, C>(
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>,
op3: OperatorAsyncFunction<B, C>
): AsyncIterableX<C>;
pipe<A, B, C, D>(
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>,
op3: OperatorAsyncFunction<B, C>,
op4: OperatorAsyncFunction<C, D>
): AsyncIterableX<D>;
pipe<A, B, C, D, E>(
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>,
op3: OperatorAsyncFunction<B, C>,
op4: OperatorAsyncFunction<C, D>,
op5: OperatorAsyncFunction<D, E>
): AsyncIterableX<E>;
pipe<A, B, C, D, E, F>(
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>,
op3: OperatorAsyncFunction<B, C>,
op4: OperatorAsyncFunction<C, D>,
op5: OperatorAsyncFunction<D, E>,
op6: OperatorAsyncFunction<E, F>
): AsyncIterableX<F>;
pipe<A, B, C, D, E, F, G>(
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>,
op3: OperatorAsyncFunction<B, C>,
op4: OperatorAsyncFunction<C, D>,
op5: OperatorAsyncFunction<D, E>,
op6: OperatorAsyncFunction<E, F>,
op7: OperatorAsyncFunction<F, G>
): AsyncIterableX<G>;
pipe<A, B, C, D, E, F, G, H>(
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>,
op3: OperatorAsyncFunction<B, C>,
op4: OperatorAsyncFunction<C, D>,
op5: OperatorAsyncFunction<D, E>,
op6: OperatorAsyncFunction<E, F>,
op7: OperatorAsyncFunction<F, G>,
op8: OperatorAsyncFunction<G, H>
): AsyncIterableX<H>;
pipe<A, B, C, D, E, F, G, H, I>(
op1: OperatorAsyncFunction<T, A>,
op2: OperatorAsyncFunction<A, B>,
op3: OperatorAsyncFunction<B, C>,
op4: OperatorAsyncFunction<C, D>,
op5: OperatorAsyncFunction<D, E>,
op6: OperatorAsyncFunction<E, F>,
op7: OperatorAsyncFunction<F, G>,
op8: OperatorAsyncFunction<G, H>,
op9: OperatorAsyncFunction<H, I>
): AsyncIterableX<I>;
pipe<R>(...operations: OperatorAsyncFunction<T, R>[]): AsyncIterableX<R>;
}
}
import './add/asynciterable/fromdomstream';
import './add/asynciterable-operators/todomstream';
export { toDOMStream } from './asynciterable/todomstream';
export {
fromDOMStream,
AsyncIterableReadableStream,
AsyncIterableReadableByteStream
} from './asynciterable/fromdomstream';
Loading

0 comments on commit 2e00c95

Please sign in to comment.