Skip to content

Commit

Permalink
feat(from): Collapse from to include Observable
Browse files Browse the repository at this point in the history
  • Loading branch information
mattpodwysocki committed Jul 21, 2017
1 parent a92ac44 commit b130e9c
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 163 deletions.
98 changes: 98 additions & 0 deletions spec/asynciterable/from-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,101 @@ test('AsyncIterable#from from array-like with selector', async t => {
await noNext(t, it);
t.end();
});

test('AsyncIterable#from from promise', async t => {
const xs = Promise.resolve(42);
const res = from(xs);

const it = res[Symbol.asyncIterator]();
await hasNext(t, it, 42);
await noNext(t, it);
t.end();
});

test('AsyncIterable#from from promise with selector', async t => {
const xs = Promise.resolve(42);
const res = from(xs, (x, i) => x + i);

const it = res[Symbol.asyncIterator]();
await hasNext(t, it, 42);
await noNext(t, it);
t.end();
});

interface Observer<T> {
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}

interface Subscription {
unsubscribe: () => void;
}

interface Observable<T> {
subscribe: (observer: Observer<T>) => Subscription;
}

class EmptySubscription implements Subscription {
unsubscribe() {
// tslint:disable-next-line:no-empty
}
}

class TestObservable<T> implements Observable<T> {
private _subscribe: (observer: Observer<T>) => Subscription;

constructor(subscribe: (observer: Observer<T>) => Subscription) {
this._subscribe = subscribe;
}

subscribe(observer: Observer<T>) {
return this._subscribe(observer);
}
}

test('AsyncIterable#fromObservable with completion', async t => {
const xs = new TestObservable<number>(obs => {
obs.next(42);
obs.complete();
return new EmptySubscription();
});
const ys = from(xs);

const it = ys[Symbol.asyncIterator]();
await hasNext(t, it, 42);
await noNext(t, it);
t.end();
});

test('AsyncIterable#fromObservable with completion', async t => {
const xs = new TestObservable<number>(obs => {
obs.next(42);
obs.complete();
return new EmptySubscription();
});
const ys = from(xs, (x, i) => x + i);

const it = ys[Symbol.asyncIterator]();
await hasNext(t, it, 42);
await noNext(t, it);
t.end();
});

test('AsyncIterable#fromObservable with error', async t => {
const err = new Error();
const xs = new TestObservable<number>(obs => {
obs.error(err);
return new EmptySubscription();
});
const ys = from(xs);

const it = ys[Symbol.asyncIterator]();
try {
await it.next();
} catch (e) {
t.same(err, e);
}

t.end();
});
68 changes: 0 additions & 68 deletions spec/asynciterable/fromobservable-spec.ts

This file was deleted.

10 changes: 0 additions & 10 deletions src/add/asynciterable/fromobservable.ts

This file was deleted.

1 change: 0 additions & 1 deletion src/asynciterable/__modules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ export { forEach } from './foreach';
export { from } from './from';
export { fromEvent } from './fromevent';
export { fromEventPattern } from './fromeventpattern';
export { fromObservable } from './fromobservable';
export { generate } from './generate';
export { generateTime } from './generatetime';
export { groupBy } from './groupby';
Expand Down
162 changes: 147 additions & 15 deletions src/asynciterable/from.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,171 @@ import { bindCallback } from '../internal/bindcallback';
import { identityAsync } from '../internal/identity';
import { toLength } from '../internal/tolength';
import { isIterable, isAsyncIterable } from '../internal/isiterable';
import { Observable } from '../observer';

class FromArrayIterable<TSource, TResult = TSource> extends AsyncIterableX<TResult> {
private _source: ArrayLike<TSource>;
private _selector: (value: TSource, index: number) => TResult | Promise<TResult>;

constructor(
source: ArrayLike<TSource>,
selector: (value: TSource, index: number) => TResult | Promise<TResult>) {
super();
this._source = source;
this._selector = selector;
}

async *[Symbol.asyncIterator]() {
let i = 0;
const length = toLength((<ArrayLike<TSource>>this._source).length);
while (i < length) {
yield await this._selector(this._source[i], i++);
}
}
}

class FromAsyncIterable<TSource, TResult = TSource> extends AsyncIterableX<TResult> {
private _source: Iterable<TSource | PromiseLike<TSource>> | AsyncIterable<TSource> | ArrayLike<TSource>;
private _source: Iterable<TSource | PromiseLike<TSource>> | AsyncIterable<TSource>;
private _selector: (value: TSource, index: number) => TResult | Promise<TResult>;

constructor(
source: Iterable<TSource | PromiseLike<TSource>> | AsyncIterable<TSource> | ArrayLike<TSource>,
source: Iterable<TSource | PromiseLike<TSource>> | AsyncIterable<TSource>,
selector: (value: TSource, index: number) => TResult | Promise<TResult>) {
super();
this._source = source;
this._selector = selector;
}

async *[Symbol.asyncIterator]() {
const iterable = isIterable(this._source) || isAsyncIterable(this._source);
let i = 0;
if (iterable) {
for await (let item of <AsyncIterable<TSource>>this._source) {
yield await this._selector(item, i++);
}
} else {
let length = toLength((<ArrayLike<TSource>>this._source).length);
while (i < length) {
let val = (<ArrayLike<TSource>>this._source)[i];
yield await this._selector(val, i++);
for await (let item of <AsyncIterable<TSource>>this._source) {
yield await this._selector(item, i++);
}
}
}

class FromPromiseIterable<TSource, TResult = TSource> extends AsyncIterableX<TResult> {
private _source: PromiseLike<TSource>;
private _selector: (value: TSource, index: number) => TResult | Promise<TResult>;

constructor(
source: PromiseLike<TSource>,
selector: (value: TSource, index: number) => TResult | Promise<TResult>) {
super();
this._source = source;
this._selector = selector;
}

async *[Symbol.asyncIterator]() {
const item = await this._source;
yield await this._selector(item, 0);
}
}

class AsyncObserver<TSource> {
public values: TSource[];
public hasError: boolean;
public hasCompleted: boolean;
public errorValue: any;
public closed: boolean;

constructor() {
this.values = [];
this.hasCompleted = false;
this.hasError = false;
this.errorValue = null;
this.closed = false;
}

next(value: TSource) {
if (!this.closed) {
this.values.push(value);
}
}

error(err: any) {
if (!this.closed) {
this.closed = true;
this.hasError = true;
this.errorValue = err;
}
}

complete() {
if (!this.closed) {
this.closed = true;
}
}
}

class FromObservableAsyncIterable<TSource, TResult = TSource> extends AsyncIterableX<TResult> {
private _observable: Observable<TSource>;
private _selector: (value: TSource, index: number) => TResult | Promise<TResult>;

constructor(
observable: Observable<TSource>,
selector: (value: TSource, index: number) => TResult | Promise<TResult>) {
super();
this._observable = observable;
this._selector = selector;
}

async *[Symbol.asyncIterator]() {
const observer = new AsyncObserver<TSource>();
const subscription = this._observable.subscribe(observer);

let i = 0;
while (1) {
if (observer.values.length > 0) {
yield await this._selector(observer.values.shift(), i++);
} else if (observer.closed) {
subscription.unsubscribe();
if (observer.hasError) {
throw observer.errorValue;
} else {
break;
}
}
}
}
}

export type AsyncIterableInput<TSource> =
Iterable<TSource | PromiseLike<TSource>> |
AsyncIterable<TSource> |
ArrayLike<TSource> |
PromiseLike<TSource> |
Observable<TSource>;

function isPromise(x: any): x is PromiseLike<any> {
return x != null && Object(x) === x && typeof x['then'] === 'function';
}

function isObservable(x: any): x is Observable<any> {
return x != null && Object(x) === x && typeof x['subscribe'] === 'function';
}

function isArrayLike(x: any): x is ArrayLike<any> {
return x != null && Object(x) === x && typeof x['length'] === 'number';
}

export function from<TSource, TResult = TSource>(
source: Iterable<TSource | PromiseLike<TSource>> | AsyncIterable<TSource> | ArrayLike<TSource>,
fn: (value: TSource, index: number) => TResult | Promise<TResult> = identityAsync,
source: AsyncIterableInput<TSource>,
selector: (value: TSource, index: number) => TResult | Promise<TResult> = identityAsync,
thisArg?: any): AsyncIterableX<TResult> {
return new FromAsyncIterable<TSource, TResult>(source, bindCallback(fn, thisArg, 2));
const fn = bindCallback(selector, thisArg, 2);
if (isIterable(source) || isAsyncIterable(source)) {
return new FromAsyncIterable<TSource, TResult>(source, fn);
}
if (isPromise(source)) {
return new FromPromiseIterable<TSource, TResult>(source, fn);
}
if (isObservable(source)) {
return new FromObservableAsyncIterable<TSource, TResult>(source, fn);
}
if (isArrayLike(source)) {
return new FromArrayIterable<TSource, TResult>(source, fn);
}

throw new TypeError('Input type not supported');
}
Loading

0 comments on commit b130e9c

Please sign in to comment.