-
Notifications
You must be signed in to change notification settings - Fork 3k
/
Copy pathcatchError.ts
141 lines (136 loc) · 4.51 KB
/
catchError.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/** @prettier */
import { Observable } from '../Observable';
import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
import { Subscription } from '../Subscription';
import { innerFrom } from '../observable/from';
import { OperatorSubscriber } from './OperatorSubscriber';
import { operate } from '../util/lift';
/* tslint:disable:max-line-length */
// Overload signature for `catchError`. `selector` is called with the error (`err`) and an
// `Observable<T>` (`caught`) and must return an `ObservableInput`. The resulting operator's
// output type is the union of the source value type `T` and the value type of whatever
// `selector` returns (`ObservedValueOf<O>`).
export function catchError<T, O extends ObservableInput<any>>(
selector: (err: any, caught: Observable<T>) => O
): OperatorFunction<T, T | ObservedValueOf<O>>;
/* tslint:enable:max-line-length */
/**
* Catches errors on the observable to be handled by returning a new observable or throwing an error.
*
* <span class="informal">
* It only listens to the error channel and ignores all other notifications.
* Handles errors from the source observable, and maps them to a new observable.
* The error may also be rethrown, or a new error can be thrown to emit an error from the result.
* </span>
*
* 
*
* This operator handles errors, but forwards along all other events to the resulting observable.
* If the source observable terminates with an error, it will map that error to a new observable,
* subscribe to it, and forward all of its events to the resulting observable.
*
* ## Examples
* Continues with a different Observable when there's an error
*
* ```ts
* import { of } from 'rxjs';
* import { map, catchError } from 'rxjs/operators';
*
* of(1, 2, 3, 4, 5).pipe(
* map(n => {
* if (n === 4) {
* throw 'four!';
* }
* return n;
* }),
* catchError(err => of('I', 'II', 'III', 'IV', 'V')),
* )
* .subscribe(x => console.log(x));
* // 1, 2, 3, I, II, III, IV, V
* ```
*
* Retries the caught source Observable again in case of error, similar to the `retry()` operator
*
* ```ts
* import { of } from 'rxjs';
* import { map, catchError, take } from 'rxjs/operators';
*
* of(1, 2, 3, 4, 5).pipe(
* map(n => {
* if (n === 4) {
* throw 'four!';
* }
* return n;
* }),
* catchError((err, caught) => caught),
* take(30),
* )
* .subscribe(x => console.log(x));
* // 1, 2, 3, 1, 2, 3, ...
* ```
*
* Throws a new error when the source Observable throws an error
*
* ```ts
* import { of } from 'rxjs';
* import { map, catchError } from 'rxjs/operators';
*
* of(1, 2, 3, 4, 5).pipe(
* map(n => {
* if (n === 4) {
* throw 'four!';
* }
* return n;
* }),
* catchError(err => {
* throw 'error in source. Details: ' + err;
* }),
* )
* .subscribe(
* x => console.log(x),
* err => console.log(err)
* );
* // 1, 2, 3, error in source. Details: four!
* ```
*
* @see {@link onErrorResumeNext}
* @see {@link repeat}
* @see {@link repeatWhen}
* @see {@link retry}
* @see {@link retryWhen}
*
* @param {function} selector a function that takes as arguments `err`, which is the error, and `caught`, which
* is the source observable, in case you'd like to "retry" that observable by returning it again. Whatever observable
* is returned by the `selector` will be used to continue the observable chain.
* @return {Observable} An observable that originates from either the source or the observable returned by the
* catch `selector` function.
*/
export function catchError<T, O extends ObservableInput<any>>(
  selector: (err: any, caught: Observable<T>) => O
): OperatorFunction<T, T | ObservedValueOf<O>> {
  return operate((source, subscriber) => {
    // Subscription to `source`. It is still `null` while `source.subscribe(...)` is executing,
    // which is how a synchronous error is told apart from an asynchronous one.
    let sourceSub: Subscription | null = null;
    // Flipped to `true` when the error handler runs before `source.subscribe(...)` has returned.
    let erroredDuringSubscribe = false;
    // Observable built from the value `selector` returned for the caught error.
    let replacement: Observable<ObservedValueOf<O>>;

    const onSourceError = (err: any) => {
      // `caught` is the source piped through this same operator, so a selector that returns it
      // subscribes to the source again with the same error handling applied.
      const caught = catchError(selector)(source);
      replacement = innerFrom(selector(err, caught));

      if (!sourceSub) {
        // No subscription reference yet: the error was raised synchronously, inside the
        // `source.subscribe(...)` call below. Defer the hand-over until that call returns.
        erroredDuringSubscribe = true;
        return;
      }

      // Asynchronous error: tear down the source first, then continue with the replacement.
      sourceSub.unsubscribe();
      sourceSub = null;
      replacement.subscribe(subscriber);
    };

    sourceSub = source.subscribe(new OperatorSubscriber(subscriber, undefined, onSourceError));

    if (erroredDuringSubscribe) {
      // The source errored synchronously. Tear it down immediately so that teardown logic
      // (for example `finalize`) runs between the source error and the subscription to the
      // replacement observable, and only then subscribe to the replacement.
      sourceSub.unsubscribe();
      sourceSub = null;
      replacement!.subscribe(subscriber);
    }
  });
}