diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 9d3d7590fb..88f3be9cfd 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -12,7 +12,6 @@ declare const cold: typeof marbleTestingSignature.cold; declare const expectObservable: typeof marbleTestingSignature.expectObservable; declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions; -const Subscriber = Rx.Subscriber; const Observable = Rx.Observable; declare const __root__: any; @@ -49,18 +48,6 @@ describe('Observable', () => { }); }); - it('should not send error to error handler for observable have source', () => { - const source = Observable.of(1); - const observable = new Observable(); - (observable as any).source = source; - - expect(() => { - observable.subscribe((x) => { - throw new Error('error'); - }); - }).to.throw(); - }); - describe('forEach', () => { it('should iterate and return a Promise', (done: MochaDone) => { const expected = [1, 2, 3]; @@ -123,66 +110,62 @@ describe('Observable', () => { }); }); - it('should handle a synchronous throw from the next handler and tear down', (done: MochaDone) => { - const expected = new Error('I told, you Bobby Boucher, twos are the debil!'); - let unsubscribeCalled = false; + it('should handle a synchronous throw from the next handler', () => { + const expected = new Error('I told, you Bobby Boucher, threes are the debil!'); const syncObservable = new Observable((observer: Rx.Observer) => { observer.next(1); observer.next(2); observer.next(3); - - return () => { - unsubscribeCalled = true; - }; + observer.next(4); }); - const results = []; - syncObservable.forEach((x) => { + const results: Array = []; + + return syncObservable.forEach((x) => { results.push(x); - if (x === 2) { + if (x === 3) { throw expected; } }).then( () => { - done(new Error('should not be called')); + throw new Error('should not be called'); }, (err) => { results.push(err); - expect(results).to.deep.equal([1, 2, expected]); - expect(unsubscribeCalled).to.be.true; - done(); - }); + // Since the consuming code can no longer interfere with the synchronous + // producer, the remaining results are nexted. + expect(results).to.deep.equal([1, 2, 3, 4, expected]); + } + ); }); - it('should handle an asynchronous throw from the next handler and tear down', (done: MochaDone) => { + it('should handle an asynchronous throw from the next handler and tear down', () => { const expected = new Error('I told, you Bobby Boucher, twos are the debil!'); - let unsubscribeCalled = false; - const syncObservable = new Observable((observer: Rx.Observer) => { + const asyncObservable = new Observable((observer: Rx.Observer) => { let i = 1; const id = setInterval(() => observer.next(i++), 1); return () => { clearInterval(id); - unsubscribeCalled = true; }; }); - const results = []; - syncObservable.forEach((x) => { + const results: Array = []; + + return asyncObservable.forEach((x) => { results.push(x); if (x === 2) { throw expected; } }).then( () => { - done(new Error('should not be called')); + throw new Error('should not be called'); }, (err) => { results.push(err); expect(results).to.deep.equal([1, 2, expected]); - expect(unsubscribeCalled).to.be.true; - done(); - }); + } + ); }); }); @@ -263,36 +246,14 @@ describe('Observable', () => { expect(unsubscribeCalled).to.be.false; }); - it('should run unsubscription logic when an error is sent synchronously and subscribe is called with no arguments', () => { - let unsubscribeCalled = false; - const source = new Observable((subscriber: Rx.Subscriber) => { - subscriber.error(0); - return () => { - unsubscribeCalled = true; - }; - }); - - try { - source.subscribe(); - } catch (e) { - // error'ing to an empty Observer re-throws, so catch and ignore it here. - } - - expect(unsubscribeCalled).to.be.true; - }); - it('should run unsubscription logic when an error is sent asynchronously and subscribe is called with no arguments', (done: MochaDone) => { const sandbox = sinon.sandbox.create(); const fakeTimer = sandbox.useFakeTimers(); let unsubscribeCalled = false; - const source = new Observable((subscriber: Rx.Subscriber) => { + const source = new Observable(observer => { const id = setInterval(() => { - try { - subscriber.error(0); - } catch (e) { - // asynchronously error'ing to an empty Observer re-throws, so catch and ignore it here. - } + observer.error(0); }, 1); return () => { clearInterval(id); @@ -300,7 +261,11 @@ describe('Observable', () => { }; }); - source.subscribe(); + source.subscribe({ + error (err) { + /* noop: expected error */ + } + }); setTimeout(() => { let err; @@ -343,128 +308,93 @@ describe('Observable', () => { expect(unsubscribeCalled).to.be.true; }); - it('should run unsubscription logic when an error is thrown sending messages synchronously', () => { - let messageError = false; - let messageErrorValue = false; - let unsubscribeCalled = false; - - let sub; - const source = new Observable((observer: Rx.Observer) => { - observer.next('boo!'); - return () => { - unsubscribeCalled = true; - }; - }); - - try { - sub = source.subscribe((x: string) => { throw x; }); - } catch (e) { - messageError = true; - messageErrorValue = e; - } - - expect(sub).to.be.a('undefined'); - expect(unsubscribeCalled).to.be.true; - expect(messageError).to.be.true; - expect(messageErrorValue).to.equal('boo!'); - }); + it('should ignore next messages after unsubscription', (done) => { + let times = 0; - it('should dispose of the subscriber when an error is thrown sending messages synchronously', () => { - let messageError = false; - let messageErrorValue = false; - let unsubscribeCalled = false; + const subscription = new Observable((observer: Rx.Observer) => { + let i = 0; + const id = setInterval(() => { + observer.next(i++); + }); - let sub; - const subscriber = new Subscriber((x: string) => { throw x; }); - const source = new Observable((observer: Rx.Observer) => { - observer.next('boo!'); return () => { - unsubscribeCalled = true; + clearInterval(id); + expect(times).to.equal(2); + done(); }; - }); - - try { - sub = source.subscribe(subscriber); - } catch (e) { - messageError = true; - messageErrorValue = e; - } - - expect(sub).to.be.a('undefined'); - expect(subscriber.closed).to.be.true; - expect(unsubscribeCalled).to.be.true; - expect(messageError).to.be.true; - expect(messageErrorValue).to.equal('boo!'); - }); - - it('should ignore next messages after unsubscription', () => { - let times = 0; - - new Observable((observer: Rx.Observer) => { - observer.next(0); - observer.next(0); - observer.next(0); - observer.next(0); }) .do(() => times += 1) .subscribe( function() { if (times === 2) { - this.unsubscribe(); + subscription.unsubscribe(); } } ); - expect(times).to.equal(2); }); - it('should ignore error messages after unsubscription', () => { + it('should ignore error messages after unsubscription', (done) => { let times = 0; let errorCalled = false; - new Observable((observer: Rx.Observer) => { - observer.next(0); - observer.next(0); - observer.next(0); - observer.error(0); + const subscription = new Observable((observer: Rx.Observer) => { + let i = 0; + const id = setInterval(() => { + observer.next(i++); + if (i === 3) { + observer.error(new Error()); + } + }); + + return () => { + clearInterval(id); + expect(times).to.equal(2); + expect(errorCalled).to.be.false; + done(); + }; }) .do(() => times += 1) .subscribe( function() { if (times === 2) { - this.unsubscribe(); + subscription.unsubscribe(); } }, function() { errorCalled = true; } ); - - expect(times).to.equal(2); - expect(errorCalled).to.be.false; }); - it('should ignore complete messages after unsubscription', () => { + it('should ignore complete messages after unsubscription', (done) => { let times = 0; let completeCalled = false; - new Observable((observer: Rx.Observer) => { - observer.next(0); - observer.next(0); - observer.next(0); - observer.complete(); + const subscription = new Observable((observer: Rx.Observer) => { + let i = 0; + const id = setInterval(() => { + observer.next(i++); + if (i === 3) { + observer.complete(); + } + }); + + return () => { + clearInterval(id); + expect(times).to.equal(2); + expect(completeCalled).to.be.false; + done(); + }; }) .do(() => times += 1) .subscribe( function() { if (times === 2) { - this.unsubscribe(); + subscription.unsubscribe(); } }, null, function() { completeCalled = true; } ); - - expect(times).to.equal(2); - expect(completeCalled).to.be.false; }); describe('when called with an anonymous observer', () => { @@ -518,109 +448,91 @@ describe('Observable', () => { }).not.to.throw(); }); - it('should run unsubscription logic when an error is thrown sending messages synchronously to an' + - ' anonymous observer', () => { - let messageError = false; - let messageErrorValue = false; - let unsubscribeCalled = false; + it('should ignore next messages after unsubscription', (done) => { + let times = 0; - //intentionally not using lambda to avoid typescript's this context capture - const o = { - myValue: 'foo', - next: function next(x) { - expect(this.myValue).to.equal('foo'); - throw x; - } - }; + const subscription = new Observable((observer: Rx.Observer) => { + let i = 0; + const id = setInterval(() => { + observer.next(i++); + }); - let sub; - const source = new Observable((observer: Rx.Observer) => { - observer.next('boo!'); return () => { - unsubscribeCalled = true; + clearInterval(id); + expect(times).to.equal(2); + done(); }; - }); - - try { - sub = source.subscribe(o); - } catch (e) { - messageError = true; - messageErrorValue = e; - } - - expect(sub).to.be.a('undefined'); - expect(unsubscribeCalled).to.be.true; - expect(messageError).to.be.true; - expect(messageErrorValue).to.equal('boo!'); - }); - - it('should ignore next messages after unsubscription', () => { - let times = 0; - - new Observable((observer: Rx.Observer) => { - observer.next(0); - observer.next(0); - observer.next(0); - observer.next(0); }) .do(() => times += 1) .subscribe({ next() { if (times === 2) { - this.unsubscribe(); + subscription.unsubscribe(); } } }); - - expect(times).to.equal(2); }); - it('should ignore error messages after unsubscription', () => { + it('should ignore error messages after unsubscription', (done) => { let times = 0; let errorCalled = false; - new Observable((observer: Rx.Observer) => { - observer.next(0); - observer.next(0); - observer.next(0); - observer.error(0); + const subscription = new Observable((observer: Rx.Observer) => { + let i = 0; + const id = setInterval(() => { + observer.next(i++); + if (i === 3) { + observer.error(new Error()); + } + }); + return () => { + clearInterval(id); + expect(times).to.equal(2); + expect(errorCalled).to.be.false; + done(); + }; }) .do(() => times += 1) .subscribe({ next() { if (times === 2) { - this.unsubscribe(); + subscription.unsubscribe(); } }, error() { errorCalled = true; } }); - - expect(times).to.equal(2); - expect(errorCalled).to.be.false; }); - it('should ignore complete messages after unsubscription', () => { + it('should ignore complete messages after unsubscription', (done) => { let times = 0; let completeCalled = false; - new Observable((observer: Rx.Observer) => { - observer.next(0); - observer.next(0); - observer.next(0); - observer.complete(); + const subscription = new Observable((observer: Rx.Observer) => { + let i = 0; + const id = setInterval(() => { + observer.next(i++); + if (i === 3) { + observer.complete(); + } + }); + + return () => { + clearInterval(id); + expect(times).to.equal(2); + expect(completeCalled).to.be.false; + done(); + }; }) .do(() => times += 1) .subscribe({ next() { if (times === 2) { - this.unsubscribe(); + subscription.unsubscribe(); } }, complete() { completeCalled = true; } }); - expect(times).to.equal(2); - expect(completeCalled).to.be.false; }); }); }); diff --git a/spec/Subscriber-spec.ts b/spec/Subscriber-spec.ts index cd2d13e18c..46de3d52b6 100644 --- a/spec/Subscriber-spec.ts +++ b/spec/Subscriber-spec.ts @@ -1,36 +1,10 @@ import { expect } from 'chai'; -import * as sinon from 'sinon'; import * as Rx from '../src/Rx'; const Subscriber = Rx.Subscriber; /** @test {Subscriber} */ describe('Subscriber', () => { - describe('when created through create()', () => { - it('should not call error() if next() handler throws an error', () => { - const errorSpy = sinon.spy(); - const completeSpy = sinon.spy(); - - const subscriber = Subscriber.create( - (value: any) => { - if (value === 2) { - throw 'error!'; - } - }, - errorSpy, - completeSpy - ); - - subscriber.next(1); - expect(() => { - subscriber.next(2); - }).to.throw('error!'); - - expect(errorSpy).not.have.been.called; - expect(completeSpy).not.have.been.called; - }); - }); - it('should ignore next messages after unsubscription', () => { let times = 0; diff --git a/spec/observables/dom/ajax-spec.ts b/spec/observables/dom/ajax-spec.ts index 19c3e271d7..8ef53dc638 100644 --- a/spec/observables/dom/ajax-spec.ts +++ b/spec/observables/dom/ajax-spec.ts @@ -57,7 +57,7 @@ describe('Observable.ajax', () => { expect(axObjectStub).to.have.been.called; }); - it('should throw if not able to create XMLHttpRequest', () => { + it('should raise an error if not able to create XMLHttpRequest', () => { root.XMLHttpRequest = null; root.ActiveXObject = null; @@ -66,9 +66,7 @@ describe('Observable.ajax', () => { method: '' }; - expect(() => { - Rx.Observable.ajax(obj).subscribe(); - }).to.throw(); + Rx.Observable.ajax(obj).subscribe(null, err => expect(err).to.exist); }); it('should create XMLHttpRequest for CORS', () => { @@ -100,7 +98,7 @@ describe('Observable.ajax', () => { expect(xDomainStub).to.have.been.called; }); - it('should throw if not able to create CORS request', () => { + it('should raise an error if not able to create CORS request', () => { root.XMLHttpRequest = null; root.XDomainRequest = null; @@ -111,9 +109,7 @@ describe('Observable.ajax', () => { withCredentials: true }; - expect(() => { - Rx.Observable.ajax(obj).subscribe(); - }).to.throw(); + Rx.Observable.ajax(obj).subscribe(null, err => expect(err).to.exist); }); it('should set headers', () => { @@ -805,7 +801,7 @@ describe('Observable.ajax', () => { it('should work fine when XMLHttpRequest ontimeout property is monkey patched', function() { Object.defineProperty(root.XMLHttpRequest.prototype, 'ontimeout', { - set: function (fn: (e: ProgressEvent) => any) { + set(fn: (e: ProgressEvent) => any) { const wrapFn = (ev: ProgressEvent) => { const result = fn.call(this, ev); if (result === false) { @@ -825,7 +821,11 @@ describe('Observable.ajax', () => { }; Rx.Observable.ajax(ajaxRequest) - .subscribe(); + .subscribe({ + error(err) { + /* expected, ignore */ + } + }); const request = MockXMLHttpRequest.mostRecent; try { @@ -881,7 +881,7 @@ describe('Observable.ajax', () => { it('should work fine when XMLHttpRequest onerror property is monkey patched', function() { Object.defineProperty(root.XMLHttpRequest.prototype, 'onerror', { - set: function (fn: (e: ProgressEvent) => any) { + set(fn: (e: ProgressEvent) => any) { const wrapFn = (ev: ProgressEvent) => { const result = fn.call(this, ev); if (result === false) { @@ -899,7 +899,11 @@ describe('Observable.ajax', () => { Rx.Observable.ajax({ url: '/flibbertyJibbet' }) - .subscribe(); + .subscribe({ + error(err) { + /* expected */ + } + }); const request = MockXMLHttpRequest.mostRecent; diff --git a/spec/observables/from-promise-spec.ts b/spec/observables/from-promise-spec.ts index 1bb3f1369b..63f33164a5 100644 --- a/spec/observables/from-promise-spec.ts +++ b/spec/observables/from-promise-spec.ts @@ -155,56 +155,4 @@ describe('Observable.fromPromise', () => { done(); }); }); - - if (typeof process === 'object' && Object.prototype.toString.call(process) === '[object process]') { - it('should globally throw unhandled errors on process', (done: MochaDone) => { - const originalException = process.listeners('uncaughtException'); - process.removeAllListeners('uncaughtException'); - process.once('uncaughtException', function (error) { - expect(error).to.be.an('error', 'fail'); - originalException.forEach(l => process.addListener('uncaughtException', l)); - done(); - }); - - Observable.fromPromise(Promise.reject('bad')) - .subscribe( - (x: any) => { done(new Error('should not be called')); }, - (e: any) => { - expect(e).to.equal('bad'); - throw new Error('fail'); - }, () => { - done(new Error('should not be called')); - }); - }); - } else if (typeof window === 'object' && - (Object.prototype.toString.call(window) === '[object global]' || Object.prototype.toString.call(window) === '[object Window]')) { - it('should globally throw unhandled errors on window', (done: MochaDone) => { - const expected = ['Uncaught fail', 'fail', 'Script error.', 'uncaught exception: fail']; - const current = window.onerror; - window.onerror = null; - - let invoked = false; - function onException(e) { - if (invoked) { - return; - } - invoked = true; - expect(expected).to.contain(e); - window.onerror = current; - done(); - } - - window.onerror = onException; - - Observable.fromPromise(Promise.reject('bad')) - .subscribe( - (x: any) => { done(new Error('should not be called')); }, - (e: any) => { - expect(e).to.equal('bad'); - throw 'fail'; - }, () => { - done(new Error('should not be called')); - }); - }); - } -}); \ No newline at end of file +}); diff --git a/spec/observables/fromEvent-spec.ts b/spec/observables/fromEvent-spec.ts index d6c0cfe0cb..00a297a07c 100644 --- a/spec/observables/fromEvent-spec.ts +++ b/spec/observables/fromEvent-spec.ts @@ -125,8 +125,9 @@ describe('Observable.fromEvent', () => { } }; - const subscribe = () => Observable.fromEvent(obj, 'click').subscribe(); - expect(subscribe).to.throw(TypeError, 'Invalid event target'); + Observable.fromEvent(obj as any, 'click').subscribe({ + error(err) { expect(err).to.deep.equal(new TypeError('Invalid event target')); } + }); }); it('should pass through options to addEventListener', () => { diff --git a/spec/operators/catch-spec.ts b/spec/operators/catch-spec.ts index 22996203b8..13477fcfce 100644 --- a/spec/operators/catch-spec.ts +++ b/spec/operators/catch-spec.ts @@ -297,44 +297,6 @@ describe('Observable.prototype.catch', () => { sandbox.restore(); }); - it('should chain a throw from a promise using throw', (done: MochaDone) => { - const subscribeSpy = sinon.spy(); - const testError = new Error('BROKEN PROMISE'); - Observable.fromPromise(Promise.reject(testError)).catch(err => { - throw new Error('BROKEN THROW'); - }).subscribe(subscribeSpy); - - trueSetTimeout(() => { - try { - timers.tick(1); - } catch (e) { - expect(subscribeSpy).not.to.be.called; - expect(e.message).to.equal('BROKEN THROW'); - return done(); - } - done(new Error('This should have thrown an error')); - }, 0); - }); - - it('should chain a throw from a promise using Observable.throw', (done: MochaDone) => { - const subscribeSpy = sinon.spy(); - const testError = new Error('BROKEN PROMISE'); - Observable.fromPromise(Promise.reject(testError)).catch(err => - Observable.throw(new Error('BROKEN THROW')) - ).subscribe(subscribeSpy); - - trueSetTimeout(() => { - try { - timers.tick(1); - } catch (e) { - expect(subscribeSpy).not.to.be.called; - expect(e.message).to.equal('BROKEN THROW'); - return done(); - } - done(new Error('This should have thrown an error')); - }, 0); - }); - it('should chain a throw from a promise using Observable.throw', (done: MochaDone) => { const subscribeSpy = sinon.spy(); const errorSpy = sinon.spy(); diff --git a/spec/operators/do-spec.ts b/spec/operators/do-spec.ts index 2c22d7f488..397686da83 100644 --- a/spec/operators/do-spec.ts +++ b/spec/operators/do-spec.ts @@ -13,7 +13,8 @@ const Subject = Rx.Subject; /** @test {do} */ describe('Observable.prototype.do', () => { - asDiagram('do(x => console.log(x))')('should mirror multiple values and complete', () => { + asDiagram('do(x => console.log(x))') + ('should mirror multiple values and complete', () => { const e1 = cold('--1--2--3--|'); const e1subs = '^ !'; const expected = '--1--2--3--|'; @@ -68,8 +69,8 @@ describe('Observable.prototype.do', () => { it('should handle everything with a Subject', (done: MochaDone) => { const expected = [1, 2, 3]; - const results = []; - const subject = new Subject(); + const results: number[] = []; + const subject = new Subject(); subject.subscribe({ next: (x: any) => { @@ -85,7 +86,7 @@ describe('Observable.prototype.do', () => { }); Observable.of(1, 2, 3) - .do(>subject) + .do(subject) .subscribe(); }); diff --git a/spec/operators/map-spec.ts b/spec/operators/map-spec.ts index 5b23d44735..e296e485e4 100644 --- a/spec/operators/map-spec.ts +++ b/spec/operators/map-spec.ts @@ -13,7 +13,6 @@ const Observable = Rx.Observable; // function shortcuts const addDrama = function (x) { return x + '!'; }; const identity = function (x) { return x; }; -const throwError = function () { throw new Error(); }; /** @test {map} */ describe('Observable.prototype.map', () => { @@ -89,16 +88,6 @@ describe('Observable.prototype.map', () => { expectSubscriptions(a.subscriptions).toBe(asubs); }); - it('should propagate errors from subscribe', () => { - const r = () => { - Observable.of(1) - .map(identity) - .subscribe(throwError); - }; - - expect(r).to.throw(); - }); - it('should not map an empty observable', () => { const a = cold('|'); const asubs = '(^!)'; @@ -187,19 +176,14 @@ describe('Observable.prototype.map', () => { const expected = '--a--b---c----d--|'; const values = {a: 5, b: 14, c: 23, d: 32}; - let invoked = 0; const foo = { value: 42 }; const r = a .map(function (x: string, index: number) { - invoked++; expect(this).to.equal(foo); return (parseInt(x) + 1) + (index * 10); - }, foo) - .do(null, null, () => { - expect(invoked).to.equal(4); - }); + }, foo); expectObservable(r).toBe(expected, values); expectSubscriptions(a.subscriptions).toBe(asubs); diff --git a/spec/operators/mapTo-spec.ts b/spec/operators/mapTo-spec.ts index b270d3b1db..39a0e3bdec 100644 --- a/spec/operators/mapTo-spec.ts +++ b/spec/operators/mapTo-spec.ts @@ -1,4 +1,4 @@ -import { expect } from 'chai'; + import * as Rx from '../../src/Rx'; import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports @@ -10,9 +10,6 @@ declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscript const Observable = Rx.Observable; -// function shortcuts -const throwError = function () { throw new Error(); }; - /** @test {mapTo} */ describe('Observable.prototype.mapTo', () => { asDiagram('mapTo(\'a\')')('should map multiple values', () => { @@ -61,16 +58,6 @@ describe('Observable.prototype.mapTo', () => { expectSubscriptions(a.subscriptions).toBe(asubs); }); - it('should propagate errors from subscribe', () => { - const r = () => { - Observable.of(1) - .mapTo(-1) - .subscribe(throwError); - }; - - expect(r).to.throw(); - }); - it('should not map an empty observable', () => { const a = cold('|'); const asubs = '(^!)'; diff --git a/spec/util/subscribeToResult-spec.ts b/spec/util/subscribeToResult-spec.ts index d7810c587c..c82ec2f4d5 100644 --- a/spec/util/subscribeToResult-spec.ts +++ b/spec/util/subscribeToResult-spec.ts @@ -4,8 +4,6 @@ import { subscribeToResult } from '../../src/util/subscribeToResult'; import { OuterSubscriber } from '../../src/OuterSubscriber'; import { $$iterator } from '../../src/symbol/iterator'; import $$symbolObservable from 'symbol-observable'; -import { Observable } from '../../src/Observable'; -import { Subject } from '../../src/Subject'; describe('subscribeToResult', () => { it('should synchronously complete when subscribe to scalarObservable', () => { @@ -180,14 +178,4 @@ describe('subscribeToResult', () => { subscribeToResult(subscriber, null); }); - - it('should not swallow exception in inner subscriber', () => { - const source = new Subject(); - - source.mergeMapTo(Observable.of(1, 2, 3)).subscribe(() => { - throw new Error('meh'); - }); - - expect(() => source.next()).to.throw(); - }); -}); \ No newline at end of file +}); diff --git a/src/Observable.ts b/src/Observable.ts index d09810c843..adb3d6e5f2 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -203,13 +203,6 @@ export class Observable implements Subscribable { sink.add(this.source ? this._subscribe(sink) : this._trySubscribe(sink)); } - if (sink.syncErrorThrowable) { - sink.syncErrorThrowable = false; - if (sink.syncErrorThrown) { - throw sink.syncErrorValue; - } - } - return sink; } @@ -217,8 +210,6 @@ export class Observable implements Subscribable { try { return this._subscribe(sink); } catch (err) { - sink.syncErrorThrown = true; - sink.syncErrorValue = err; sink.error(err); } } @@ -248,25 +239,13 @@ export class Observable implements Subscribable { // accessing subscription below in the closure due to Temporal Dead Zone. let subscription: Subscription; subscription = this.subscribe((value) => { - if (subscription) { - // if there is a subscription, then we can surmise - // the next handling is asynchronous. Any errors thrown - // need to be rejected explicitly and unsubscribe must be - // called manually - try { - next(value); - } catch (err) { - reject(err); + try { + next(value); + } catch (err) { + reject(err); + if (subscription) { subscription.unsubscribe(); } - } else { - // if there is NO subscription, then we're getting a nexted - // value synchronously during subscription. We can just call it. - // If it errors, Observable's `subscribe` will ensure the - // unsubscription logic is called, then synchronously rethrow the error. - // After that, Promise will trap the error and send it - // down the rejection path. - next(value); } }, reject, resolve); }); diff --git a/src/Subscriber.ts b/src/Subscriber.ts index 36d8459191..9d1ed87f57 100644 --- a/src/Subscriber.ts +++ b/src/Subscriber.ts @@ -33,14 +33,9 @@ export class Subscriber extends Subscription implements Observer { error?: (e?: any) => void, complete?: () => void): Subscriber { const subscriber = new Subscriber(next, error, complete); - subscriber.syncErrorThrowable = false; return subscriber; } - public syncErrorValue: any = null; - public syncErrorThrown: boolean = false; - public syncErrorThrowable: boolean = false; - protected isStopped: boolean = false; protected destination: PartialObserver; // this `any` is the escape hatch to erase extra type param (e.g. R) @@ -71,14 +66,12 @@ export class Subscriber extends Subscription implements Observer { this.destination = (> destinationOrNext); ( this.destination).add(this); } else { - this.syncErrorThrowable = true; - this.destination = new SafeSubscriber(this, > destinationOrNext); + this.destination = new SafeSubscriber(> destinationOrNext); } break; } default: - this.syncErrorThrowable = true; - this.destination = new SafeSubscriber(this, <((value: T) => void)> destinationOrNext, error, complete); + this.destination = new SafeSubscriber(<((value: T) => void)> destinationOrNext, error, complete); break; } } @@ -167,8 +160,7 @@ class SafeSubscriber extends Subscriber { private _context: any; - constructor(private _parentSubscriber: Subscriber, - observerOrNext?: PartialObserver | ((value: T) => void), + constructor(observerOrNext?: PartialObserver | ((value: T) => void), error?: (e?: any) => void, complete?: () => void) { super(); @@ -199,10 +191,10 @@ class SafeSubscriber extends Subscriber { next(value?: T): void { if (!this.isStopped && this._next) { - const { _parentSubscriber } = this; - if (!_parentSubscriber.syncErrorThrowable) { - this.__tryOrUnsub(this._next, value); - } else if (this.__tryOrSetError(_parentSubscriber, this._next, value)) { + try { + this._next.call(this._context, value); + } catch (err) { + this._hostReportError(err); this.unsubscribe(); } } @@ -210,69 +202,37 @@ class SafeSubscriber extends Subscriber { error(err?: any): void { if (!this.isStopped) { - const { _parentSubscriber } = this; if (this._error) { - if (!_parentSubscriber.syncErrorThrowable) { - this.__tryOrUnsub(this._error, err); - this.unsubscribe(); - } else { - this.__tryOrSetError(_parentSubscriber, this._error, err); - this.unsubscribe(); + try { + this._error.call(this._context, err); + } catch (err) { + this._hostReportError(err); } - } else if (!_parentSubscriber.syncErrorThrowable) { - this.unsubscribe(); - throw err; } else { - _parentSubscriber.syncErrorValue = err; - _parentSubscriber.syncErrorThrown = true; - this.unsubscribe(); + this._hostReportError(err); } + this.unsubscribe(); } } complete(): void { if (!this.isStopped) { - const { _parentSubscriber } = this; if (this._complete) { - const wrappedComplete = () => this._complete.call(this._context); - - if (!_parentSubscriber.syncErrorThrowable) { - this.__tryOrUnsub(wrappedComplete); - this.unsubscribe(); - } else { - this.__tryOrSetError(_parentSubscriber, wrappedComplete); - this.unsubscribe(); + try { + this._complete.call(this._context); + } catch (err) { + this._hostReportError(err); } - } else { - this.unsubscribe(); } - } - } - - private __tryOrUnsub(fn: Function, value?: any): void { - try { - fn.call(this._context, value); - } catch (err) { this.unsubscribe(); - throw err; } } - private __tryOrSetError(parent: Subscriber, fn: Function, value?: any): boolean { - try { - fn.call(this._context, value); - } catch (err) { - parent.syncErrorValue = err; - parent.syncErrorThrown = true; - return true; - } - return false; - } - protected _unsubscribe(): void { - const { _parentSubscriber } = this; this._context = null; - this._parentSubscriber = null; - _parentSubscriber.unsubscribe(); + } + + private _hostReportError(err: any) { + setTimeout(() => { throw err; }); } } diff --git a/src/observable/ErrorObservable.ts b/src/observable/ErrorObservable.ts index e759bc813a..34eb9fb44d 100644 --- a/src/observable/ErrorObservable.ts +++ b/src/observable/ErrorObservable.ts @@ -72,8 +72,6 @@ export class ErrorObservable extends Observable { const error = this.error; const scheduler = this.scheduler; - subscriber.syncErrorThrowable = true; - if (scheduler) { return scheduler.schedule(ErrorObservable.dispatch, 0, { error, subscriber diff --git a/src/operators/tap.ts b/src/operators/tap.ts index a88c7d2018..b9e43128c4 100644 --- a/src/operators/tap.ts +++ b/src/operators/tap.ts @@ -4,6 +4,8 @@ import { Observable } from '../Observable'; import { PartialObserver } from '../Observer'; import { TeardownLogic } from '../Subscription'; import { MonoTypeOperatorFunction } from '../interfaces'; +import { noop } from '../util/noop'; +import { isFunction } from '../util/isFunction'; /* tslint:disable:max-line-length */ export function tap(next: (x: T) => void, error?: (e: any) => void, complete?: () => void): MonoTypeOperatorFunction; @@ -65,7 +67,7 @@ class DoOperator implements Operator { private complete?: () => void) { } call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new DoSubscriber(subscriber, this.nextOrObserver, this.error, this.complete)); + return source.subscribe(new TapSubscriber(subscriber, this.nextOrObserver, this.error, this.complete)); } } @@ -74,49 +76,61 @@ class DoOperator implements Operator { * @ignore * @extends {Ignored} */ -class DoSubscriber extends Subscriber { - private safeSubscriber: Subscriber; +class TapSubscriber extends Subscriber { + private _context: any; + + private _tapNext: ((value: T) => void) = noop; + + private _tapError: ((err: any) => void) = noop; + + private _tapComplete: (() => void) = noop; constructor(destination: Subscriber, - nextOrObserver?: PartialObserver | ((x: T) => void), - error?: (e: any) => void, + observerOrNext?: PartialObserver | ((value: T) => void), + error?: (e?: any) => void, complete?: () => void) { - super(destination); - - const safeSubscriber = new Subscriber(nextOrObserver, error, complete); - safeSubscriber.syncErrorThrowable = true; - this.add(safeSubscriber); - this.safeSubscriber = safeSubscriber; - } + super(destination); + this._tapError = error || noop; + this._tapComplete = complete || noop; + if (isFunction(observerOrNext)) { + this._context = this; + this._tapNext = observerOrNext; + } else if (observerOrNext) { + this._context = observerOrNext; + this._tapNext = observerOrNext.next || noop; + this._tapError = observerOrNext.error || noop; + this._tapComplete = observerOrNext.complete || noop; + } + } - protected _next(value: T): void { - const { safeSubscriber } = this; - safeSubscriber.next(value); - if (safeSubscriber.syncErrorThrown) { - this.destination.error(safeSubscriber.syncErrorValue); - } else { - this.destination.next(value); + _next(value: T) { + try { + this._tapNext.call(this._context, value); + } catch (err) { + this.destination.error(err); + return; } + this.destination.next(value); } - protected _error(err: any): void { - const { safeSubscriber } = this; - safeSubscriber.error(err); - if (safeSubscriber.syncErrorThrown) { - this.destination.error(safeSubscriber.syncErrorValue); - } else { + _error(err: any) { + try { + this._tapError.call(this._context, err); + } catch (err) { this.destination.error(err); + return; } + this.destination.error(err); } - protected _complete(): void { - const { safeSubscriber } = this; - safeSubscriber.complete(); - if (safeSubscriber.syncErrorThrown) { - this.destination.error(safeSubscriber.syncErrorValue); - } else { - this.destination.complete(); + _complete() { + try { + this._tapComplete.call(this._context, ); + } catch (err) { + this.destination.error(err); + return; } + return this.destination.complete(); } } diff --git a/src/util/subscribeToResult.ts b/src/util/subscribeToResult.ts index 944921e17f..ae19fe5795 100644 --- a/src/util/subscribeToResult.ts +++ b/src/util/subscribeToResult.ts @@ -30,7 +30,6 @@ export function subscribeToResult(outerSubscriber: OuterSubscriber, destination.complete(); return null; } else { - destination.syncErrorThrowable = true; return result.subscribe(destination); } } else if (isArrayLike(result)) {