diff --git a/spec/operators/throttle-spec.ts b/spec/operators/throttle-spec.ts index 06e1d9bc13..52fcb49273 100644 --- a/spec/operators/throttle-spec.ts +++ b/spec/operators/throttle-spec.ts @@ -7,6 +7,8 @@ declare const hot: typeof marbleTestingSignature.hot; declare const cold: typeof marbleTestingSignature.cold; declare const expectObservable: typeof marbleTestingSignature.expectObservable; declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions; +declare const time: typeof marbleTestingSignature.time; +declare const rxTestScheduler: typeof marbleTestingSignature.rxTestScheduler; const Observable = Rx.Observable; @@ -212,28 +214,24 @@ describe('Observable.prototype.throttle', () => { }); it('should propagate error thrown from durationSelector function', () => { - const e1 = hot('abcdefabcdabcdefghabca| '); - const e1subs = '^ ! '; - const e2 = [cold('-----| '), - cold( '---| '), - cold( '-------| ')]; - const e2subs = ['^ ! ', - ' ^ ! ']; - const expected = 'a-----a---# '; + const s1 = hot('--^--x--x--x--x--x--x--e--x--x--x--|'); + const s1Subs = '^ !'; + const n1 = cold( '----|'); + const n1Subs = [' ^ ! ', + ' ^ ! ', + ' ^ ! ']; + const exp = '---x-----x-----x-----(e#)'; let i = 0; - const result = e1.throttle(() => { - if (i === 2) { - throw 'error'; + const result = s1.throttle(() => { + if (i++ === 3) { + throw new Error('lol'); } - return e2[i++]; + return n1; }); - - expectObservable(result).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - for (let j = 0; j < e2subs.length; j++) { - expectSubscriptions(e2[j].subscriptions).toBe(e2subs[j]); - } + expectObservable(result).toBe(exp, undefined, new Error('lol')); + expectSubscriptions(s1.subscriptions).toBe(s1Subs); + expectSubscriptions(n1.subscriptions).toBe(n1Subs); }); it('should complete when source does not emit', () => { @@ -340,13 +338,16 @@ describe('Observable.prototype.throttle', () => { describe('throttle(fn, { leading: true, trailing: true })', () => { asDiagram('throttle(fn, { leading: true, trailing: true })')('should immediately emit the first value in each time window', () => { - const e1 = hot('-a-xy-----b--x--cxxx--|'); - const e1subs = '^ !'; - const e2 = cold( '----| '); - const e2subs = [' ^ ! ', - ' ^ ! ', - ' ^ ! ']; - const expected = '-a---y----b---x-c---x-|'; + const e1 = hot('-a-xy-----b--x--cxxx------|'); + const e1subs = '^ !'; + const e2 = cold( '----| '); + const e2subs = [' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ ! ', + ' ^ !']; + const expected = '-a---y----b---x---x---x---|'; const result = e1.throttle(() => e2, { leading: true, trailing: true }); @@ -354,6 +355,19 @@ describe('Observable.prototype.throttle', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + + it('should work for individual values', () => { + const s1 = hot('-^-x------------------|'); + const s1Subs = '^ !'; + const n1 = cold( '------------------------|'); + const n1Subs = [' ^ !']; + const exp = '--x------------------|'; + + const result = s1.throttle(() => n1, { leading: true, trailing: true }); + expectObservable(result).toBe(exp); + expectSubscriptions(s1.subscriptions).toBe(s1Subs); + expectSubscriptions(n1.subscriptions).toBe(n1Subs); + }); }); describe('throttle(fn, { leading: false, trailing: true })', () => { @@ -362,9 +376,11 @@ describe('Observable.prototype.throttle', () => { const e1subs = '^ !'; const e2 = cold( '----| '); const e2subs = [' ^ ! ', + ' ^ ! ', ' ^ ! ', - ' ^ ! ']; - const expected = '-----y--------x-----x-|'; + ' ^ ! ', + ' ^ !']; + const expected = '-----y--------x---x---|'; const result = e1.throttle(() => e2, { leading: false, trailing: true }); @@ -372,5 +388,14 @@ describe('Observable.prototype.throttle', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + + it('should work for individual values', () => { + const s1 = cold('---------x---------x--------x------|'); + const n = cold('----(n|)'); + const exp = '-------------x---------x--------x--|'; + + expectObservable(s1.throttle(() => n, { trailing: true })) + .toBe(exp); + }); }); }); diff --git a/src/operators/throttle.ts b/src/operators/throttle.ts index f8085bd38a..fd93716a6d 100644 --- a/src/operators/throttle.ts +++ b/src/operators/throttle.ts @@ -83,9 +83,9 @@ class ThrottleOperator implements Operator { * @extends {Ignored} */ class ThrottleSubscriber extends OuterSubscriber { - private throttled: Subscription; - private _trailingValue: T; - private _hasTrailingValue = false; + private _throttled: Subscription; + private _sendValue: T; + private _hasValue = false; constructor(protected destination: Subscriber, private durationSelector: (value: T) => SubscribableOrPromise, @@ -95,26 +95,35 @@ class ThrottleSubscriber extends OuterSubscriber { } protected _next(value: T): void { - if (this.throttled) { - if (this._trailing) { - this._hasTrailingValue = true; - this._trailingValue = value; - } - } else { - const duration = this.tryDurationSelector(value); - if (duration) { - this.add(this.throttled = subscribeToResult(this, duration)); - } + this._hasValue = true; + this._sendValue = value; + + if (!this._throttled) { if (this._leading) { - this.destination.next(value); - if (this._trailing) { - this._hasTrailingValue = true; - this._trailingValue = value; - } + this.send(); + } else { + this.throttle(value); } } } + private send() { + const { _hasValue, _sendValue } = this; + if (_hasValue) { + this.destination.next(_sendValue); + this.throttle(_sendValue); + } + this._hasValue = false; + this._sendValue = null; + } + + private throttle(value: T): void { + const duration = this.tryDurationSelector(value); + if (duration) { + this.add(this._throttled = subscribeToResult(this, duration)); + } + } + private tryDurationSelector(value: T): SubscribableOrPromise { try { return this.durationSelector(value); @@ -124,37 +133,25 @@ class ThrottleSubscriber extends OuterSubscriber { } } - protected _unsubscribe() { - const { throttled, _trailingValue, _hasTrailingValue, _trailing } = this; - - this._trailingValue = null; - this._hasTrailingValue = false; - - if (throttled) { - this.remove(throttled); - this.throttled = null; - throttled.unsubscribe(); + private throttlingDone() { + const { _throttled, _trailing } = this; + if (_throttled) { + _throttled.unsubscribe(); } - } + this._throttled = null; - private _sendTrailing() { - const { destination, throttled, _trailing, _trailingValue, _hasTrailingValue } = this; - if (throttled && _trailing && _hasTrailingValue) { - destination.next(_trailingValue); - this._trailingValue = null; - this._hasTrailingValue = false; + if (_trailing) { + this.send(); } } notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number, innerSub: InnerSubscriber): void { - this._sendTrailing(); - this._unsubscribe(); + this.throttlingDone(); } notifyComplete(): void { - this._sendTrailing(); - this._unsubscribe(); + this.throttlingDone(); } }