Skip to content

Commit 51e022b

Browse files
kwonojbenlesh
authored andcommitted
feat(onErrorResumeNext): add onErrorResumeNext operator
closes #1665
1 parent 09da093 commit 51e022b

File tree

6 files changed

+278
-0
lines changed

6 files changed

+278
-0
lines changed

Diff for: spec/observables/onErrorResumeNext-spec.ts

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import * as Rx from '../../dist/cjs/Rx';
2+
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};
3+
4+
const Observable = Rx.Observable;
5+
6+
describe('Observable.onErrorResumeNext', () => {
7+
it('should continue with observables', () => {
8+
const source = hot('--a--b--#');
9+
const next1 = cold( '--c--d--#');
10+
const next2 = cold( '--e--#');
11+
const next3 = cold( '--f--g--|');
12+
const subs = '^ !';
13+
const expected = '--a--b----c--d----e----f--g--|';
14+
15+
expectObservable(Observable.onErrorResumeNext(source, next1, next2, next3)).toBe(expected);
16+
expectSubscriptions(source.subscriptions).toBe(subs);
17+
});
18+
19+
it('should continue array of observables', () => {
20+
const source = hot('--a--b--#');
21+
const next = [ source,
22+
cold( '--c--d--#'),
23+
cold( '--e--#'),
24+
cold( '--f--g--|')];
25+
const subs = '^ !';
26+
const expected = '--a--b----c--d----e----f--g--|';
27+
28+
expectObservable(Observable.onErrorResumeNext(next)).toBe(expected);
29+
expectSubscriptions(source.subscriptions).toBe(subs);
30+
});
31+
32+
it('should complete single observable throws', () => {
33+
const source = hot('#');
34+
const subs = '(^!)';
35+
const expected = '|';
36+
37+
expectObservable(Observable.onErrorResumeNext(source)).toBe(expected);
38+
expectSubscriptions(source.subscriptions).toBe(subs);
39+
});
40+
});

Diff for: spec/operators/onErrorResumeNext-spec.ts

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import {expect} from 'chai';
2+
import * as Rx from '../../dist/cjs/Rx';
3+
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};
4+
5+
const Observable = Rx.Observable;
6+
7+
describe('Observable.prototype.onErrorResumeNext', () => {
8+
asDiagram('onErrorResumeNext')('should continue observable sequence with next observable', () => {
9+
const source = hot('--a--b--#');
10+
const next = cold( '--c--d--|');
11+
const subs = '^ !';
12+
const expected = '--a--b----c--d--|';
13+
14+
expectObservable(source.onErrorResumeNext(next)).toBe(expected);
15+
expectSubscriptions(source.subscriptions).toBe(subs);
16+
});
17+
18+
it('should continue with hot observables', () => {
19+
const source = hot('--a--b--#');
20+
const next = hot('-----x----c--d--|');
21+
const subs = '^ !';
22+
const expected = '--a--b----c--d--|';
23+
24+
expectObservable(source.onErrorResumeNext(next)).toBe(expected);
25+
expectSubscriptions(source.subscriptions).toBe(subs);
26+
});
27+
28+
it('should continue with array of multiple observables throw error', () => {
29+
const source = hot('--a--b--#');
30+
const next = [cold( '--c--d--#'),
31+
cold( '--e--#'),
32+
cold( '--f--g--|')];
33+
const subs = '^ !';
34+
const expected = '--a--b----c--d----e----f--g--|';
35+
36+
expectObservable(source.onErrorResumeNext(next)).toBe(expected);
37+
expectSubscriptions(source.subscriptions).toBe(subs);
38+
});
39+
40+
it('should continue with multiple observables throw error', () => {
41+
const source = hot('--a--b--#');
42+
const next1 = cold( '--c--d--#');
43+
const next2 = cold( '--e--#');
44+
const next3 = cold( '--f--g--|');
45+
const subs = '^ !';
46+
const expected = '--a--b----c--d----e----f--g--|';
47+
48+
expectObservable(source.onErrorResumeNext(next1, next2, next3)).toBe(expected);
49+
expectSubscriptions(source.subscriptions).toBe(subs);
50+
});
51+
52+
it('should continue with multiple observables does not throw error', () => {
53+
const source = hot('--a--b--|');
54+
const next1 = cold( '--c--d--|');
55+
const next2 = cold( '--e--|');
56+
const next3 = cold( '--f--g--|');
57+
const subs = '^ !';
58+
const expected = '--a--b----c--d----e----f--g--|';
59+
60+
expectObservable(source.onErrorResumeNext(next1, next2, next3)).toBe(expected);
61+
expectSubscriptions(source.subscriptions).toBe(subs);
62+
});
63+
64+
it('should continue after empty observable', () => {
65+
const source = hot('|');
66+
const next1 = cold('--c--d--|');
67+
const next2 = cold( '--e--#');
68+
const next3 = cold( '--f--g--|');
69+
const subs = '^ !';
70+
const expected = '--c--d----e----f--g--|';
71+
72+
expectObservable(source.onErrorResumeNext(next1, next2, next3)).toBe(expected);
73+
expectSubscriptions(source.subscriptions).toBe(subs);
74+
});
75+
76+
it('should not complete with observble does not ends', () => {
77+
const source = hot('--a--b--|');
78+
const next1 = cold( '--');
79+
const subs = '^ ';
80+
const expected = '--a--b----';
81+
82+
expectObservable(source.onErrorResumeNext(next1)).toBe(expected);
83+
expectSubscriptions(source.subscriptions).toBe(subs);
84+
});
85+
86+
it('should not continue with observble does not ends', () => {
87+
const source = hot('--');
88+
const next1 = cold( '-a--b-');
89+
const subs = '^ ';
90+
const expected = '-';
91+
92+
expectObservable(source.onErrorResumeNext(next1)).toBe(expected);
93+
expectSubscriptions(source.subscriptions).toBe(subs);
94+
});
95+
96+
it('should complete observable with next observable throws', () => {
97+
const source = hot('--a--b--#');
98+
const next = cold( '--c--d--#');
99+
const subs = '^ !';
100+
const expected = '--a--b----c--d--|';
101+
102+
expectObservable(source.onErrorResumeNext(next)).toBe(expected);
103+
expectSubscriptions(source.subscriptions).toBe(subs);
104+
});
105+
106+
it('should work with promise', (done: MochaDone) => {
107+
const expected = [1, 2];
108+
const source = Observable.concat(Observable.of(1), Observable.throw('meh'));
109+
110+
source.onErrorResumeNext(Promise.resolve(2))
111+
.subscribe(x => {
112+
expect(expected.shift()).to.equal(x);
113+
}, (err: any) => {
114+
done(new Error('should not be called'));
115+
}, () => {
116+
expect(expected).to.be.empty;
117+
done();
118+
});
119+
});
120+
});

Diff for: src/Rx.ts

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import './add/observable/merge';
2626
import './add/observable/race';
2727
import './add/observable/never';
2828
import './add/observable/of';
29+
import './add/observable/onErrorResumeNext';
2930
import './add/observable/range';
3031
import './add/observable/using';
3132
import './add/observable/throw';
@@ -91,6 +92,7 @@ import './add/operator/mergeScan';
9192
import './add/operator/min';
9293
import './add/operator/multicast';
9394
import './add/operator/observeOn';
95+
import './add/operator/onErrorResumeNext';
9496
import './add/operator/pairwise';
9597
import './add/operator/partition';
9698
import './add/operator/pluck';

Diff for: src/add/observable/onErrorResumeNext.ts

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import {Observable} from '../../Observable';
2+
import {onErrorResumeNextStatic} from '../../operator/onErrorResumeNext';
3+
4+
Observable.onErrorResumeNext = onErrorResumeNextStatic;
5+
6+
declare module '../../Observable' {
7+
namespace Observable {
8+
export let onErrorResumeNext: typeof onErrorResumeNextStatic;
9+
}
10+
}

Diff for: src/add/operator/onErrorResumeNext.ts

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import {Observable} from '../../Observable';
2+
import {onErrorResumeNext, OnErrorResumeNextSignature} from '../../operator/onErrorResumeNext';
3+
4+
Observable.prototype.onErrorResumeNext = onErrorResumeNext;
5+
6+
declare module '../../Observable' {
7+
interface Observable<T> {
8+
onErrorResumeNext: OnErrorResumeNextSignature<T>;
9+
}
10+
}

Diff for: src/operator/onErrorResumeNext.ts

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import {Observable, ObservableInput} from '../Observable';
2+
import {FromObservable} from '../observable/FromObservable';
3+
import {Operator} from '../Operator';
4+
import {Subscriber} from '../Subscriber';
5+
import {isArray} from '../util/isArray';
6+
import {OuterSubscriber} from '../OuterSubscriber';
7+
import {InnerSubscriber} from '../InnerSubscriber';
8+
import {subscribeToResult} from '../util/subscribeToResult';
9+
10+
export function onErrorResumeNext<T, R>(...nextSources: Array<ObservableInput<any> |
11+
Array<ObservableInput<any>> |
12+
((...values: Array<any>) => R)>): Observable<R> {
13+
if (nextSources.length === 1 && isArray(nextSources[0])) {
14+
nextSources = <Array<Observable<any>>>nextSources[0];
15+
}
16+
17+
return this.lift(new OnErrorResumeNextOperator<T, R>(nextSources));
18+
}
19+
20+
/* tslint:disable:max-line-length */
21+
export interface OnErrorResumeNextSignature<T> {
22+
<R>(v: ObservableInput<R>): Observable<R>;
23+
<T2, T3, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<R>;
24+
<T2, T3, T4, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<R>;
25+
<T2, T3, T4, T5, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): Observable<R>;
26+
<T2, T3, T4, T5, T6, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): Observable<R>;
27+
28+
<R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): Observable<R>;
29+
<R>(array: ObservableInput<any>[]): Observable<R>;
30+
}
31+
/* tslint:enable:max-line-length */
32+
33+
/* tslint:disable:max-line-length */
34+
export function onErrorResumeNextStatic<R>(v: ObservableInput<R>): Observable<R>;
35+
export function onErrorResumeNextStatic<T2, T3, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<R>;
36+
export function onErrorResumeNextStatic<T2, T3, T4, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<R>;
37+
export function onErrorResumeNextStatic<T2, T3, T4, T5, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): Observable<R>;
38+
export function onErrorResumeNextStatic<T2, T3, T4, T5, T6, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): Observable<R>;
39+
40+
export function onErrorResumeNextStatic<R>(...observables: Array<ObservableInput<any> | ((...values: Array<any>) => R)>): Observable<R>;
41+
export function onErrorResumeNextStatic<R>(array: ObservableInput<any>[]): Observable<R>;
42+
/* tslint:enable:max-line-length */
43+
44+
export function onErrorResumeNextStatic<T, R>(...nextSources: Array<ObservableInput<any> |
45+
Array<ObservableInput<any>> |
46+
((...values: Array<any>) => R)>): Observable<R> {
47+
let source: ObservableInput<any> = null;
48+
49+
if (nextSources.length === 1 && isArray(nextSources[0])) {
50+
nextSources = <Array<ObservableInput<any>>>nextSources[0];
51+
}
52+
source = nextSources.shift();
53+
54+
return new FromObservable(source, null).lift(new OnErrorResumeNextOperator<T, R>(nextSources));
55+
}
56+
57+
class OnErrorResumeNextOperator<T, R> implements Operator<T, R> {
58+
constructor(private nextSources: Array<ObservableInput<any>>) {
59+
}
60+
61+
call(subscriber: Subscriber<R>, source: any): any {
62+
return source._subscribe(new OnErrorResumeNextSubscriber(subscriber, this.nextSources));
63+
}
64+
}
65+
66+
class OnErrorResumeNextSubscriber<T, R> extends OuterSubscriber<T, R> {
67+
constructor(protected destination: Subscriber<T>,
68+
private nextSources: Array<ObservableInput<any>>) {
69+
super(destination);
70+
}
71+
72+
notifyError(error: any, innerSub: InnerSubscriber<T, any>): void {
73+
this.subscribeToNextSource();
74+
}
75+
76+
notifyComplete(innerSub: InnerSubscriber<T, any>): void {
77+
this.subscribeToNextSource();
78+
}
79+
80+
protected _error(err: any): void {
81+
this.subscribeToNextSource();
82+
}
83+
84+
protected _complete(): void {
85+
this.subscribeToNextSource();
86+
}
87+
88+
private subscribeToNextSource(): void {
89+
const next = this.nextSources.shift();
90+
if (next) {
91+
this.add(subscribeToResult(this, next));
92+
} else {
93+
this.destination.complete();
94+
}
95+
}
96+
}

0 commit comments

Comments
 (0)