From cd9626a4f93cac6f631d5a97dd9c9b2aa8e4b5db Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Fri, 10 Nov 2017 23:38:56 +0100 Subject: [PATCH] feat(Observable): unhandled errors are now reported to HostReportErrors (#3062) BREAKING CHANGE: Unhandled errors are no longer caught and rethrown, rather they are caught and scheduled to be thrown, which causes them to be reported to window.onerror or process.on('error'), depending on the environment. Consequently, teardown after a synchronous, unhandled, error will no longer occur, as the teardown would not exist, and producer interference cannot occur --- spec/Observable-spec.ts | 322 ++++++++++---------------- spec/Subscriber-spec.ts | 26 --- spec/observables/dom/ajax-spec.ts | 28 ++- spec/observables/from-promise-spec.ts | 54 +---- spec/observables/fromEvent-spec.ts | 5 +- spec/operators/catch-spec.ts | 38 --- spec/operators/do-spec.ts | 9 +- spec/operators/map-spec.ts | 18 +- spec/operators/mapTo-spec.ts | 15 +- spec/util/subscribeToResult-spec.ts | 14 +- src/Observable.ts | 31 +-- src/Subscriber.ts | 82 ++----- src/observable/ErrorObservable.ts | 2 - src/operators/tap.ts | 78 ++++--- src/util/subscribeToResult.ts | 1 - 15 files changed, 217 insertions(+), 506 deletions(-) diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 9d3d7590fb..88f3be9cfd 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -12,7 +12,6 @@ declare const cold: typeof marbleTestingSignature.cold; declare const expectObservable: typeof marbleTestingSignature.expectObservable; declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions; -const Subscriber = Rx.Subscriber; const Observable = Rx.Observable; declare const __root__: any; @@ -49,18 +48,6 @@ describe('Observable', () => { }); }); - it('should not send error to error handler for observable have source', () => { - const source = Observable.of(1); - const observable = new Observable(); - (observable as any).source = source; - - expect(() => { - observable.subscribe((x) => { - throw new Error('error'); - }); - }).to.throw(); - }); - describe('forEach', () => { it('should iterate and return a Promise', (done: MochaDone) => { const expected = [1, 2, 3]; @@ -123,66 +110,62 @@ describe('Observable', () => { }); }); - it('should handle a synchronous throw from the next handler and tear down', (done: MochaDone) => { - const expected = new Error('I told, you Bobby Boucher, twos are the debil!'); - let unsubscribeCalled = false; + it('should handle a synchronous throw from the next handler', () => { + const expected = new Error('I told, you Bobby Boucher, threes are the debil!'); const syncObservable = new Observable((observer: Rx.Observer) => { observer.next(1); observer.next(2); observer.next(3); - - return () => { - unsubscribeCalled = true; - }; + observer.next(4); }); - const results = []; - syncObservable.forEach((x) => { + const results: Array = []; + + return syncObservable.forEach((x) => { results.push(x); - if (x === 2) { + if (x === 3) { throw expected; } }).then( () => { - done(new Error('should not be called')); + throw new Error('should not be called'); }, (err) => { results.push(err); - expect(results).to.deep.equal([1, 2, expected]); - expect(unsubscribeCalled).to.be.true; - done(); - }); + // Since the consuming code can no longer interfere with the synchronous + // producer, the remaining results are nexted. + expect(results).to.deep.equal([1, 2, 3, 4, expected]); + } + ); }); - it('should handle an asynchronous throw from the next handler and tear down', (done: MochaDone) => { + it('should handle an asynchronous throw from the next handler and tear down', () => { const expected = new Error('I told, you Bobby Boucher, twos are the debil!'); - let unsubscribeCalled = false; - const syncObservable = new Observable((observer: Rx.Observer) => { + const asyncObservable = new Observable((observer: Rx.Observer) => { let i = 1; const id = setInterval(() => observer.next(i++), 1); return () => { clearInterval(id); - unsubscribeCalled = true; }; }); - const results = []; - syncObservable.forEach((x) => { + const results: Array = []; + + return asyncObservable.forEach((x) => { results.push(x); if (x === 2) { throw expected; } }).then( () => { - done(new Error('should not be called')); + throw new Error('should not be called'); }, (err) => { results.push(err); expect(results).to.deep.equal([1, 2, expected]); - expect(unsubscribeCalled).to.be.true; - done(); - }); + } + ); }); }); @@ -263,36 +246,14 @@ describe('Observable', () => { expect(unsubscribeCalled).to.be.false; }); - it('should run unsubscription logic when an error is sent synchronously and subscribe is called with no arguments', () => { - let unsubscribeCalled = false; - const source = new Observable((subscriber: Rx.Subscriber) => { - subscriber.error(0); - return () => { - unsubscribeCalled = true; - }; - }); - - try { - source.subscribe(); - } catch (e) { - // error'ing to an empty Observer re-throws, so catch and ignore it here. - } - - expect(unsubscribeCalled).to.be.true; - }); - it('should run unsubscription logic when an error is sent asynchronously and subscribe is called with no arguments', (done: MochaDone) => { const sandbox = sinon.sandbox.create(); const fakeTimer = sandbox.useFakeTimers(); let unsubscribeCalled = false; - const source = new Observable((subscriber: Rx.Subscriber) => { + const source = new Observable(observer => { const id = setInterval(() => { - try { - subscriber.error(0); - } catch (e) { - // asynchronously error'ing to an empty Observer re-throws, so catch and ignore it here. - } + observer.error(0); }, 1); return () => { clearInterval(id); @@ -300,7 +261,11 @@ describe('Observable', () => { }; }); - source.subscribe(); + source.subscribe({ + error (err) { + /* noop: expected error */ + } + }); setTimeout(() => { let err; @@ -343,128 +308,93 @@ describe('Observable', () => { expect(unsubscribeCalled).to.be.true; }); - it('should run unsubscription logic when an error is thrown sending messages synchronously', () => { - let messageError = false; - let messageErrorValue = false; - let unsubscribeCalled = false; - - let sub; - const source = new Observable((observer: Rx.Observer) => { - observer.next('boo!'); - return () => { - unsubscribeCalled = true; - }; - }); - - try { - sub = source.subscribe((x: string) => { throw x; }); - } catch (e) { - messageError = true; - messageErrorValue = e; - } - - expect(sub).to.be.a('undefined'); - expect(unsubscribeCalled).to.be.true; - expect(messageError).to.be.true; - expect(messageErrorValue).to.equal('boo!'); - }); + it('should ignore next messages after unsubscription', (done) => { + let times = 0; - it('should dispose of the subscriber when an error is thrown sending messages synchronously', () => { - let messageError = false; - let messageErrorValue = false; - let unsubscribeCalled = false; + const subscription = new Observable((observer: Rx.Observer) => { + let i = 0; + const id = setInterval(() => { + observer.next(i++); + }); - let sub; - const subscriber = new Subscriber((x: string) => { throw x; }); - const source = new Observable((observer: Rx.Observer) => { - observer.next('boo!'); return () => { - unsubscribeCalled = true; + clearInterval(id); + expect(times).to.equal(2); + done(); }; - }); - - try { - sub = source.subscribe(subscriber); - } catch (e) { - messageError = true; - messageErrorValue = e; - } - - expect(sub).to.be.a('undefined'); - expect(subscriber.closed).to.be.true; - expect(unsubscribeCalled).to.be.true; - expect(messageError).to.be.true; - expect(messageErrorValue).to.equal('boo!'); - }); - - it('should ignore next messages after unsubscription', () => { - let times = 0; - - new Observable((observer: Rx.Observer) => { - observer.next(0); - observer.next(0); - observer.next(0); - observer.next(0); }) .do(() => times += 1) .subscribe( function() { if (times === 2) { - this.unsubscribe(); + subscription.unsubscribe(); } } ); - expect(times).to.equal(2); }); - it('should ignore error messages after unsubscription', () => { + it('should ignore error messages after unsubscription', (done) => { let times = 0; let errorCalled = false; - new Observable((observer: Rx.Observer) => { - observer.next(0); - observer.next(0); - observer.next(0); - observer.error(0); + const subscription = new Observable((observer: Rx.Observer) => { + let i = 0; + const id = setInterval(() => { + observer.next(i++); + if (i === 3) { + observer.error(new Error()); + } + }); + + return () => { + clearInterval(id); + expect(times).to.equal(2); + expect(errorCalled).to.be.false; + done(); + }; }) .do(() => times += 1) .subscribe( function() { if (times === 2) { - this.unsubscribe(); + subscription.unsubscribe(); } }, function() { errorCalled = true; } ); - - expect(times).to.equal(2); - expect(errorCalled).to.be.false; }); - it('should ignore complete messages after unsubscription', () => { + it('should ignore complete messages after unsubscription', (done) => { let times = 0; let completeCalled = false; - new Observable((observer: Rx.Observer) => { - observer.next(0); - observer.next(0); - observer.next(0); - observer.complete(); + const subscription = new Observable((observer: Rx.Observer) => { + let i = 0; + const id = setInterval(() => { + observer.next(i++); + if (i === 3) { + observer.complete(); + } + }); + + return () => { + clearInterval(id); + expect(times).to.equal(2); + expect(completeCalled).to.be.false; + done(); + }; }) .do(() => times += 1) .subscribe( function() { if (times === 2) { - this.unsubscribe(); + subscription.unsubscribe(); } }, null, function() { completeCalled = true; } ); - - expect(times).to.equal(2); - expect(completeCalled).to.be.false; }); describe('when called with an anonymous observer', () => { @@ -518,109 +448,91 @@ describe('Observable', () => { }).not.to.throw(); }); - it('should run unsubscription logic when an error is thrown sending messages synchronously to an' + - ' anonymous observer', () => { - let messageError = false; - let messageErrorValue = false; - let unsubscribeCalled = false; + it('should ignore next messages after unsubscription', (done) => { + let times = 0; - //intentionally not using lambda to avoid typescript's this context capture - const o = { - myValue: 'foo', - next: function next(x) { - expect(this.myValue).to.equal('foo'); - throw x; - } - }; + const subscription = new Observable((observer: Rx.Observer) => { + let i = 0; + const id = setInterval(() => { + observer.next(i++); + }); - let sub; - const source = new Observable((observer: Rx.Observer) => { - observer.next('boo!'); return () => { - unsubscribeCalled = true; + clearInterval(id); + expect(times).to.equal(2); + done(); }; - }); - - try { - sub = source.subscribe(o); - } catch (e) { - messageError = true; - messageErrorValue = e; - } - - expect(sub).to.be.a('undefined'); - expect(unsubscribeCalled).to.be.true; - expect(messageError).to.be.true; - expect(messageErrorValue).to.equal('boo!'); - }); - - it('should ignore next messages after unsubscription', () => { - let times = 0; - - new Observable((observer: Rx.Observer) => { - observer.next(0); - observer.next(0); - observer.next(0); - observer.next(0); }) .do(() => times += 1) .subscribe({ next() { if (times === 2) { - this.unsubscribe(); + subscription.unsubscribe(); } } }); - - expect(times).to.equal(2); }); - it('should ignore error messages after unsubscription', () => { + it('should ignore error messages after unsubscription', (done) => { let times = 0; let errorCalled = false; - new Observable((observer: Rx.Observer) => { - observer.next(0); - observer.next(0); - observer.next(0); - observer.error(0); + const subscription = new Observable((observer: Rx.Observer) => { + let i = 0; + const id = setInterval(() => { + observer.next(i++); + if (i === 3) { + observer.error(new Error()); + } + }); + return () => { + clearInterval(id); + expect(times).to.equal(2); + expect(errorCalled).to.be.false; + done(); + }; }) .do(() => times += 1) .subscribe({ next() { if (times === 2) { - this.unsubscribe(); + subscription.unsubscribe(); } }, error() { errorCalled = true; } }); - - expect(times).to.equal(2); - expect(errorCalled).to.be.false; }); - it('should ignore complete messages after unsubscription', () => { + it('should ignore complete messages after unsubscription', (done) => { let times = 0; let completeCalled = false; - new Observable((observer: Rx.Observer) => { - observer.next(0); - observer.next(0); - observer.next(0); - observer.complete(); + const subscription = new Observable((observer: Rx.Observer) => { + let i = 0; + const id = setInterval(() => { + observer.next(i++); + if (i === 3) { + observer.complete(); + } + }); + + return () => { + clearInterval(id); + expect(times).to.equal(2); + expect(completeCalled).to.be.false; + done(); + }; }) .do(() => times += 1) .subscribe({ next() { if (times === 2) { - this.unsubscribe(); + subscription.unsubscribe(); } }, complete() { completeCalled = true; } }); - expect(times).to.equal(2); - expect(completeCalled).to.be.false; }); }); }); diff --git a/spec/Subscriber-spec.ts b/spec/Subscriber-spec.ts index cd2d13e18c..46de3d52b6 100644 --- a/spec/Subscriber-spec.ts +++ b/spec/Subscriber-spec.ts @@ -1,36 +1,10 @@ import { expect } from 'chai'; -import * as sinon from 'sinon'; import * as Rx from '../src/Rx'; const Subscriber = Rx.Subscriber; /** @test {Subscriber} */ describe('Subscriber', () => { - describe('when created through create()', () => { - it('should not call error() if next() handler throws an error', () => { - const errorSpy = sinon.spy(); - const completeSpy = sinon.spy(); - - const subscriber = Subscriber.create( - (value: any) => { - if (value === 2) { - throw 'error!'; - } - }, - errorSpy, - completeSpy - ); - - subscriber.next(1); - expect(() => { - subscriber.next(2); - }).to.throw('error!'); - - expect(errorSpy).not.have.been.called; - expect(completeSpy).not.have.been.called; - }); - }); - it('should ignore next messages after unsubscription', () => { let times = 0; diff --git a/spec/observables/dom/ajax-spec.ts b/spec/observables/dom/ajax-spec.ts index 19c3e271d7..8ef53dc638 100644 --- a/spec/observables/dom/ajax-spec.ts +++ b/spec/observables/dom/ajax-spec.ts @@ -57,7 +57,7 @@ describe('Observable.ajax', () => { expect(axObjectStub).to.have.been.called; }); - it('should throw if not able to create XMLHttpRequest', () => { + it('should raise an error if not able to create XMLHttpRequest', () => { root.XMLHttpRequest = null; root.ActiveXObject = null; @@ -66,9 +66,7 @@ describe('Observable.ajax', () => { method: '' }; - expect(() => { - Rx.Observable.ajax(obj).subscribe(); - }).to.throw(); + Rx.Observable.ajax(obj).subscribe(null, err => expect(err).to.exist); }); it('should create XMLHttpRequest for CORS', () => { @@ -100,7 +98,7 @@ describe('Observable.ajax', () => { expect(xDomainStub).to.have.been.called; }); - it('should throw if not able to create CORS request', () => { + it('should raise an error if not able to create CORS request', () => { root.XMLHttpRequest = null; root.XDomainRequest = null; @@ -111,9 +109,7 @@ describe('Observable.ajax', () => { withCredentials: true }; - expect(() => { - Rx.Observable.ajax(obj).subscribe(); - }).to.throw(); + Rx.Observable.ajax(obj).subscribe(null, err => expect(err).to.exist); }); it('should set headers', () => { @@ -805,7 +801,7 @@ describe('Observable.ajax', () => { it('should work fine when XMLHttpRequest ontimeout property is monkey patched', function() { Object.defineProperty(root.XMLHttpRequest.prototype, 'ontimeout', { - set: function (fn: (e: ProgressEvent) => any) { + set(fn: (e: ProgressEvent) => any) { const wrapFn = (ev: ProgressEvent) => { const result = fn.call(this, ev); if (result === false) { @@ -825,7 +821,11 @@ describe('Observable.ajax', () => { }; Rx.Observable.ajax(ajaxRequest) - .subscribe(); + .subscribe({ + error(err) { + /* expected, ignore */ + } + }); const request = MockXMLHttpRequest.mostRecent; try { @@ -881,7 +881,7 @@ describe('Observable.ajax', () => { it('should work fine when XMLHttpRequest onerror property is monkey patched', function() { Object.defineProperty(root.XMLHttpRequest.prototype, 'onerror', { - set: function (fn: (e: ProgressEvent) => any) { + set(fn: (e: ProgressEvent) => any) { const wrapFn = (ev: ProgressEvent) => { const result = fn.call(this, ev); if (result === false) { @@ -899,7 +899,11 @@ describe('Observable.ajax', () => { Rx.Observable.ajax({ url: '/flibbertyJibbet' }) - .subscribe(); + .subscribe({ + error(err) { + /* expected */ + } + }); const request = MockXMLHttpRequest.mostRecent; diff --git a/spec/observables/from-promise-spec.ts b/spec/observables/from-promise-spec.ts index 1bb3f1369b..63f33164a5 100644 --- a/spec/observables/from-promise-spec.ts +++ b/spec/observables/from-promise-spec.ts @@ -155,56 +155,4 @@ describe('Observable.fromPromise', () => { done(); }); }); - - if (typeof process === 'object' && Object.prototype.toString.call(process) === '[object process]') { - it('should globally throw unhandled errors on process', (done: MochaDone) => { - const originalException = process.listeners('uncaughtException'); - process.removeAllListeners('uncaughtException'); - process.once('uncaughtException', function (error) { - expect(error).to.be.an('error', 'fail'); - originalException.forEach(l => process.addListener('uncaughtException', l)); - done(); - }); - - Observable.fromPromise(Promise.reject('bad')) - .subscribe( - (x: any) => { done(new Error('should not be called')); }, - (e: any) => { - expect(e).to.equal('bad'); - throw new Error('fail'); - }, () => { - done(new Error('should not be called')); - }); - }); - } else if (typeof window === 'object' && - (Object.prototype.toString.call(window) === '[object global]' || Object.prototype.toString.call(window) === '[object Window]')) { - it('should globally throw unhandled errors on window', (done: MochaDone) => { - const expected = ['Uncaught fail', 'fail', 'Script error.', 'uncaught exception: fail']; - const current = window.onerror; - window.onerror = null; - - let invoked = false; - function onException(e) { - if (invoked) { - return; - } - invoked = true; - expect(expected).to.contain(e); - window.onerror = current; - done(); - } - - window.onerror = onException; - - Observable.fromPromise(Promise.reject('bad')) - .subscribe( - (x: any) => { done(new Error('should not be called')); }, - (e: any) => { - expect(e).to.equal('bad'); - throw 'fail'; - }, () => { - done(new Error('should not be called')); - }); - }); - } -}); \ No newline at end of file +}); diff --git a/spec/observables/fromEvent-spec.ts b/spec/observables/fromEvent-spec.ts index d6c0cfe0cb..00a297a07c 100644 --- a/spec/observables/fromEvent-spec.ts +++ b/spec/observables/fromEvent-spec.ts @@ -125,8 +125,9 @@ describe('Observable.fromEvent', () => { } }; - const subscribe = () => Observable.fromEvent(obj, 'click').subscribe(); - expect(subscribe).to.throw(TypeError, 'Invalid event target'); + Observable.fromEvent(obj as any, 'click').subscribe({ + error(err) { expect(err).to.deep.equal(new TypeError('Invalid event target')); } + }); }); it('should pass through options to addEventListener', () => { diff --git a/spec/operators/catch-spec.ts b/spec/operators/catch-spec.ts index 22996203b8..13477fcfce 100644 --- a/spec/operators/catch-spec.ts +++ b/spec/operators/catch-spec.ts @@ -297,44 +297,6 @@ describe('Observable.prototype.catch', () => { sandbox.restore(); }); - it('should chain a throw from a promise using throw', (done: MochaDone) => { - const subscribeSpy = sinon.spy(); - const testError = new Error('BROKEN PROMISE'); - Observable.fromPromise(Promise.reject(testError)).catch(err => { - throw new Error('BROKEN THROW'); - }).subscribe(subscribeSpy); - - trueSetTimeout(() => { - try { - timers.tick(1); - } catch (e) { - expect(subscribeSpy).not.to.be.called; - expect(e.message).to.equal('BROKEN THROW'); - return done(); - } - done(new Error('This should have thrown an error')); - }, 0); - }); - - it('should chain a throw from a promise using Observable.throw', (done: MochaDone) => { - const subscribeSpy = sinon.spy(); - const testError = new Error('BROKEN PROMISE'); - Observable.fromPromise(Promise.reject(testError)).catch(err => - Observable.throw(new Error('BROKEN THROW')) - ).subscribe(subscribeSpy); - - trueSetTimeout(() => { - try { - timers.tick(1); - } catch (e) { - expect(subscribeSpy).not.to.be.called; - expect(e.message).to.equal('BROKEN THROW'); - return done(); - } - done(new Error('This should have thrown an error')); - }, 0); - }); - it('should chain a throw from a promise using Observable.throw', (done: MochaDone) => { const subscribeSpy = sinon.spy(); const errorSpy = sinon.spy(); diff --git a/spec/operators/do-spec.ts b/spec/operators/do-spec.ts index 2c22d7f488..397686da83 100644 --- a/spec/operators/do-spec.ts +++ b/spec/operators/do-spec.ts @@ -13,7 +13,8 @@ const Subject = Rx.Subject; /** @test {do} */ describe('Observable.prototype.do', () => { - asDiagram('do(x => console.log(x))')('should mirror multiple values and complete', () => { + asDiagram('do(x => console.log(x))') + ('should mirror multiple values and complete', () => { const e1 = cold('--1--2--3--|'); const e1subs = '^ !'; const expected = '--1--2--3--|'; @@ -68,8 +69,8 @@ describe('Observable.prototype.do', () => { it('should handle everything with a Subject', (done: MochaDone) => { const expected = [1, 2, 3]; - const results = []; - const subject = new Subject(); + const results: number[] = []; + const subject = new Subject(); subject.subscribe({ next: (x: any) => { @@ -85,7 +86,7 @@ describe('Observable.prototype.do', () => { }); Observable.of(1, 2, 3) - .do(>subject) + .do(subject) .subscribe(); }); diff --git a/spec/operators/map-spec.ts b/spec/operators/map-spec.ts index 5b23d44735..e296e485e4 100644 --- a/spec/operators/map-spec.ts +++ b/spec/operators/map-spec.ts @@ -13,7 +13,6 @@ const Observable = Rx.Observable; // function shortcuts const addDrama = function (x) { return x + '!'; }; const identity = function (x) { return x; }; -const throwError = function () { throw new Error(); }; /** @test {map} */ describe('Observable.prototype.map', () => { @@ -89,16 +88,6 @@ describe('Observable.prototype.map', () => { expectSubscriptions(a.subscriptions).toBe(asubs); }); - it('should propagate errors from subscribe', () => { - const r = () => { - Observable.of(1) - .map(identity) - .subscribe(throwError); - }; - - expect(r).to.throw(); - }); - it('should not map an empty observable', () => { const a = cold('|'); const asubs = '(^!)'; @@ -187,19 +176,14 @@ describe('Observable.prototype.map', () => { const expected = '--a--b---c----d--|'; const values = {a: 5, b: 14, c: 23, d: 32}; - let invoked = 0; const foo = { value: 42 }; const r = a .map(function (x: string, index: number) { - invoked++; expect(this).to.equal(foo); return (parseInt(x) + 1) + (index * 10); - }, foo) - .do(null, null, () => { - expect(invoked).to.equal(4); - }); + }, foo); expectObservable(r).toBe(expected, values); expectSubscriptions(a.subscriptions).toBe(asubs); diff --git a/spec/operators/mapTo-spec.ts b/spec/operators/mapTo-spec.ts index b270d3b1db..39a0e3bdec 100644 --- a/spec/operators/mapTo-spec.ts +++ b/spec/operators/mapTo-spec.ts @@ -1,4 +1,4 @@ -import { expect } from 'chai'; + import * as Rx from '../../src/Rx'; import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports @@ -10,9 +10,6 @@ declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscript const Observable = Rx.Observable; -// function shortcuts -const throwError = function () { throw new Error(); }; - /** @test {mapTo} */ describe('Observable.prototype.mapTo', () => { asDiagram('mapTo(\'a\')')('should map multiple values', () => { @@ -61,16 +58,6 @@ describe('Observable.prototype.mapTo', () => { expectSubscriptions(a.subscriptions).toBe(asubs); }); - it('should propagate errors from subscribe', () => { - const r = () => { - Observable.of(1) - .mapTo(-1) - .subscribe(throwError); - }; - - expect(r).to.throw(); - }); - it('should not map an empty observable', () => { const a = cold('|'); const asubs = '(^!)'; diff --git a/spec/util/subscribeToResult-spec.ts b/spec/util/subscribeToResult-spec.ts index d7810c587c..c82ec2f4d5 100644 --- a/spec/util/subscribeToResult-spec.ts +++ b/spec/util/subscribeToResult-spec.ts @@ -4,8 +4,6 @@ import { subscribeToResult } from '../../src/util/subscribeToResult'; import { OuterSubscriber } from '../../src/OuterSubscriber'; import { $$iterator } from '../../src/symbol/iterator'; import $$symbolObservable from 'symbol-observable'; -import { Observable } from '../../src/Observable'; -import { Subject } from '../../src/Subject'; describe('subscribeToResult', () => { it('should synchronously complete when subscribe to scalarObservable', () => { @@ -180,14 +178,4 @@ describe('subscribeToResult', () => { subscribeToResult(subscriber, null); }); - - it('should not swallow exception in inner subscriber', () => { - const source = new Subject(); - - source.mergeMapTo(Observable.of(1, 2, 3)).subscribe(() => { - throw new Error('meh'); - }); - - expect(() => source.next()).to.throw(); - }); -}); \ No newline at end of file +}); diff --git a/src/Observable.ts b/src/Observable.ts index d09810c843..adb3d6e5f2 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -203,13 +203,6 @@ export class Observable implements Subscribable { sink.add(this.source ? this._subscribe(sink) : this._trySubscribe(sink)); } - if (sink.syncErrorThrowable) { - sink.syncErrorThrowable = false; - if (sink.syncErrorThrown) { - throw sink.syncErrorValue; - } - } - return sink; } @@ -217,8 +210,6 @@ export class Observable implements Subscribable { try { return this._subscribe(sink); } catch (err) { - sink.syncErrorThrown = true; - sink.syncErrorValue = err; sink.error(err); } } @@ -248,25 +239,13 @@ export class Observable implements Subscribable { // accessing subscription below in the closure due to Temporal Dead Zone. let subscription: Subscription; subscription = this.subscribe((value) => { - if (subscription) { - // if there is a subscription, then we can surmise - // the next handling is asynchronous. Any errors thrown - // need to be rejected explicitly and unsubscribe must be - // called manually - try { - next(value); - } catch (err) { - reject(err); + try { + next(value); + } catch (err) { + reject(err); + if (subscription) { subscription.unsubscribe(); } - } else { - // if there is NO subscription, then we're getting a nexted - // value synchronously during subscription. We can just call it. - // If it errors, Observable's `subscribe` will ensure the - // unsubscription logic is called, then synchronously rethrow the error. - // After that, Promise will trap the error and send it - // down the rejection path. - next(value); } }, reject, resolve); }); diff --git a/src/Subscriber.ts b/src/Subscriber.ts index 36d8459191..9d1ed87f57 100644 --- a/src/Subscriber.ts +++ b/src/Subscriber.ts @@ -33,14 +33,9 @@ export class Subscriber extends Subscription implements Observer { error?: (e?: any) => void, complete?: () => void): Subscriber { const subscriber = new Subscriber(next, error, complete); - subscriber.syncErrorThrowable = false; return subscriber; } - public syncErrorValue: any = null; - public syncErrorThrown: boolean = false; - public syncErrorThrowable: boolean = false; - protected isStopped: boolean = false; protected destination: PartialObserver; // this `any` is the escape hatch to erase extra type param (e.g. R) @@ -71,14 +66,12 @@ export class Subscriber extends Subscription implements Observer { this.destination = (> destinationOrNext); ( this.destination).add(this); } else { - this.syncErrorThrowable = true; - this.destination = new SafeSubscriber(this, > destinationOrNext); + this.destination = new SafeSubscriber(> destinationOrNext); } break; } default: - this.syncErrorThrowable = true; - this.destination = new SafeSubscriber(this, <((value: T) => void)> destinationOrNext, error, complete); + this.destination = new SafeSubscriber(<((value: T) => void)> destinationOrNext, error, complete); break; } } @@ -167,8 +160,7 @@ class SafeSubscriber extends Subscriber { private _context: any; - constructor(private _parentSubscriber: Subscriber, - observerOrNext?: PartialObserver | ((value: T) => void), + constructor(observerOrNext?: PartialObserver | ((value: T) => void), error?: (e?: any) => void, complete?: () => void) { super(); @@ -199,10 +191,10 @@ class SafeSubscriber extends Subscriber { next(value?: T): void { if (!this.isStopped && this._next) { - const { _parentSubscriber } = this; - if (!_parentSubscriber.syncErrorThrowable) { - this.__tryOrUnsub(this._next, value); - } else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) { + try { + this._next.call(this._context, value); + } catch (err) { + this._hostReportError(err); this.unsubscribe(); } } @@ -210,69 +202,37 @@ class SafeSubscriber extends Subscriber { error(err?: any): void { if (!this.isStopped) { - const { _parentSubscriber } = this; if (this._error) { - if (!_parentSubscriber.syncErrorThrowable) { - this.__tryOrUnsub(this._error, err); - this.unsubscribe(); - } else { - this.__tryOrSetError(_parentSubscriber, this._error, err); - this.unsubscribe(); + try { + this._error.call(this._context, err); + } catch (err) { + this._hostReportError(err); } - } else if (!_parentSubscriber.syncErrorThrowable) { - this.unsubscribe(); - throw err; } else { - _parentSubscriber.syncErrorValue = err; - _parentSubscriber.syncErrorThrown = true; - this.unsubscribe(); + this._hostReportError(err); } + this.unsubscribe(); } } complete(): void { if (!this.isStopped) { - const { _parentSubscriber } = this; if (this._complete) { - const wrappedComplete = () => this._complete.call(this._context); - - if (!_parentSubscriber.syncErrorThrowable) { - this.__tryOrUnsub(wrappedComplete); - this.unsubscribe(); - } else { - this.__tryOrSetError(_parentSubscriber, wrappedComplete); - this.unsubscribe(); + try { + this._complete.call(this._context); + } catch (err) { + this._hostReportError(err); } - } else { - this.unsubscribe(); } - } - } - - private __tryOrUnsub(fn: Function, value?: any): void { - try { - fn.call(this._context, value); - } catch (err) { this.unsubscribe(); - throw err; } } - private __tryOrSetError(parent: Subscriber, fn: Function, value?: any): boolean { - try { - fn.call(this._context, value); - } catch (err) { - parent.syncErrorValue = err; - parent.syncErrorThrown = true; - return true; - } - return false; - } - protected _unsubscribe(): void { - const { _parentSubscriber } = this; this._context = null; - this._parentSubscriber = null; - _parentSubscriber.unsubscribe(); + } + + private _hostReportError(err: any) { + setTimeout(() => { throw err; }); } } diff --git a/src/observable/ErrorObservable.ts b/src/observable/ErrorObservable.ts index e759bc813a..34eb9fb44d 100644 --- a/src/observable/ErrorObservable.ts +++ b/src/observable/ErrorObservable.ts @@ -72,8 +72,6 @@ export class ErrorObservable extends Observable { const error = this.error; const scheduler = this.scheduler; - subscriber.syncErrorThrowable = true; - if (scheduler) { return scheduler.schedule(ErrorObservable.dispatch, 0, { error, subscriber diff --git a/src/operators/tap.ts b/src/operators/tap.ts index a88c7d2018..b9e43128c4 100644 --- a/src/operators/tap.ts +++ b/src/operators/tap.ts @@ -4,6 +4,8 @@ import { Observable } from '../Observable'; import { PartialObserver } from '../Observer'; import { TeardownLogic } from '../Subscription'; import { MonoTypeOperatorFunction } from '../interfaces'; +import { noop } from '../util/noop'; +import { isFunction } from '../util/isFunction'; /* tslint:disable:max-line-length */ export function tap(next: (x: T) => void, error?: (e: any) => void, complete?: () => void): MonoTypeOperatorFunction; @@ -65,7 +67,7 @@ class DoOperator implements Operator { private complete?: () => void) { } call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new DoSubscriber(subscriber, this.nextOrObserver, this.error, this.complete)); + return source.subscribe(new TapSubscriber(subscriber, this.nextOrObserver, this.error, this.complete)); } } @@ -74,49 +76,61 @@ class DoOperator implements Operator { * @ignore * @extends {Ignored} */ -class DoSubscriber extends Subscriber { - private safeSubscriber: Subscriber; +class TapSubscriber extends Subscriber { + private _context: any; + + private _tapNext: ((value: T) => void) = noop; + + private _tapError: ((err: any) => void) = noop; + + private _tapComplete: (() => void) = noop; constructor(destination: Subscriber, - nextOrObserver?: PartialObserver | ((x: T) => void), - error?: (e: any) => void, + observerOrNext?: PartialObserver | ((value: T) => void), + error?: (e?: any) => void, complete?: () => void) { - super(destination); - - const safeSubscriber = new Subscriber(nextOrObserver, error, complete); - safeSubscriber.syncErrorThrowable = true; - this.add(safeSubscriber); - this.safeSubscriber = safeSubscriber; - } + super(destination); + this._tapError = error || noop; + this._tapComplete = complete || noop; + if (isFunction(observerOrNext)) { + this._context = this; + this._tapNext = observerOrNext; + } else if (observerOrNext) { + this._context = observerOrNext; + this._tapNext = observerOrNext.next || noop; + this._tapError = observerOrNext.error || noop; + this._tapComplete = observerOrNext.complete || noop; + } + } - protected _next(value: T): void { - const { safeSubscriber } = this; - safeSubscriber.next(value); - if (safeSubscriber.syncErrorThrown) { - this.destination.error(safeSubscriber.syncErrorValue); - } else { - this.destination.next(value); + _next(value: T) { + try { + this._tapNext.call(this._context, value); + } catch (err) { + this.destination.error(err); + return; } + this.destination.next(value); } - protected _error(err: any): void { - const { safeSubscriber } = this; - safeSubscriber.error(err); - if (safeSubscriber.syncErrorThrown) { - this.destination.error(safeSubscriber.syncErrorValue); - } else { + _error(err: any) { + try { + this._tapError.call(this._context, err); + } catch (err) { this.destination.error(err); + return; } + this.destination.error(err); } - protected _complete(): void { - const { safeSubscriber } = this; - safeSubscriber.complete(); - if (safeSubscriber.syncErrorThrown) { - this.destination.error(safeSubscriber.syncErrorValue); - } else { - this.destination.complete(); + _complete() { + try { + this._tapComplete.call(this._context, ); + } catch (err) { + this.destination.error(err); + return; } + return this.destination.complete(); } } diff --git a/src/util/subscribeToResult.ts b/src/util/subscribeToResult.ts index 944921e17f..ae19fe5795 100644 --- a/src/util/subscribeToResult.ts +++ b/src/util/subscribeToResult.ts @@ -30,7 +30,6 @@ export function subscribeToResult(outerSubscriber: OuterSubscriber, destination.complete(); return null; } else { - destination.syncErrorThrowable = true; return result.subscribe(destination); } } else if (isArrayLike(result)) {