diff --git a/spec/operators/throttleTime-spec.ts b/spec/operators/throttleTime-spec.ts index e668c9b9c7..a017237ad1 100644 --- a/spec/operators/throttleTime-spec.ts +++ b/spec/operators/throttleTime-spec.ts @@ -49,63 +49,110 @@ describe('Observable.prototype.throttleTime', () => { it('should simply mirror the source if values are not emitted often enough', () => { const e1 = hot('-a--------b-----c----|'); const subs = '^ !'; + const t = time('-----| '); const expected = '-a--------b-----c----|'; - expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); - it('should handle a busy producer emitting a regular repeating sequence', () => { - const e1 = hot('abcdefabcdefabcdefabcdefa|'); - const subs = '^ !'; - const expected = 'a-----a-----a-----a-----a|'; + it('should handle a busy producer emitting a regular repeating sequence with leading: true, trailing: false', () => { + const e1 = hot('a12345b12345c12345d123|'); + const subs = '^ !'; + const t = time('-----| '); + const expected = 'a-----b-----c-----d---|'; - expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler, {leading: true, trailing: false})).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a busy producer emitting a regular repeating sequence with leading: true, trailing: true', () => { + const e1 = hot('a1234b1234c1234d12e--|'); + const subs = '^ !'; + const t = time('-----| '); + const expected = 'a----b----c----d----e|'; + + expectObservable(e1.throttleTime(t, rxTestScheduler, {leading: true, trailing: true})).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should not drop values if values are far apart with leading: true, trailing: true', () => { + const e1 = hot('ab-----------c|'); + const subs = '^ !'; + const t = time('-----| '); + const expected = 'a----b-------c|'; + + expectObservable(e1.throttleTime(t, rxTestScheduler, {leading: true, trailing: true})).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a busy producer emitting a regular repeating sequence with leading: false, trailing: true', () => { + const e1 = hot('12345a1234b1234c12d--|'); + const subs = '^ !'; + const t = time('-----| '); + const expected = '-----a----b----c----d|'; + + expectObservable(e1.throttleTime(t, rxTestScheduler, {leading: false, trailing: true})).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should only emit trailing values with leading: false, trailing: true', () => { + const e1 = hot('ab-----------c--d--|'); + const subs = '^ !'; + const t = time('-----| '); + const expected = '-----b------------d|'; + + expectObservable(e1.throttleTime(t, rxTestScheduler, {leading: false, trailing: true})).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should complete when source does not emit', () => { const e1 = hot('-----|'); const subs = '^ !'; + const t = time('-----|'); const expected = '-----|'; - expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should raise error when source does not emit and raises error', () => { const e1 = hot('-----#'); const subs = '^ !'; + const t = time('| '); const expected = '-----#'; - expectObservable(e1.throttleTime(10, rxTestScheduler)).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should handle an empty source', () => { const e1 = cold('|'); const subs = '(^!)'; + const t = time('---|'); const expected = '|'; - expectObservable(e1.throttleTime(30, rxTestScheduler)).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should handle a never source', () => { const e1 = cold('-'); const subs = '^'; + const t = time('---|'); const expected = '-'; - expectObservable(e1.throttleTime(30, rxTestScheduler)).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should handle a throw source', () => { const e1 = cold('#'); const subs = '(^!)'; + const t = time('---|'); const expected = '#'; - expectObservable(e1.throttleTime(30, rxTestScheduler)).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); @@ -113,9 +160,10 @@ describe('Observable.prototype.throttleTime', () => { const e1 = hot('-a--(bc)-------d----------------'); const unsub = ' !'; const subs = '^ !'; + const t = time('-----| '); const expected = '-a-------------d----------------'; - expectObservable(e1.throttleTime(50, rxTestScheduler), unsub).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler), unsub).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); @@ -123,11 +171,12 @@ describe('Observable.prototype.throttleTime', () => { const e1 = hot('-a--(bc)-------d----------------'); const subs = '^ !'; const expected = '-a-------------d----------------'; + const t = time('-----| '); const unsub = ' !'; const result = e1 .mergeMap((x: string) => Observable.of(x)) - .throttleTime(50, rxTestScheduler) + .throttleTime(t, rxTestScheduler) .mergeMap((x: string) => Observable.of(x)); expectObservable(result, unsub).toBe(expected); @@ -137,18 +186,19 @@ describe('Observable.prototype.throttleTime', () => { it('should throttle values until source raises error', () => { const e1 = hot('-a--(bc)-------d---------------#'); const subs = '^ !'; + const t = time('-----| '); const expected = '-a-------------d---------------#'; - expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); describe('throttleTime(fn, { leading: true, trailing: true })', () => { asDiagram('throttleTime(fn, { leading: true, trailing: true })')('should immediately emit the first value in each time window', () => { - const e1 = hot('-a-xy-----b--x--cxxx--|'); - const e1subs = '^ !'; - const t = time( '----| '); - const expected = '-a---y----b---x-c---x-|'; + const e1 = hot('-a-xy-----b--x--cxxx---|'); + const e1subs = '^ !'; + const t = time( '----| '); + const expected = '-a---y----b---x---x---x|'; const result = e1.throttleTime(t, rxTestScheduler, { leading: true, trailing: true }); @@ -158,11 +208,11 @@ describe('Observable.prototype.throttleTime', () => { }); describe('throttleTime(fn, { leading: false, trailing: true })', () => { - asDiagram('throttleTime(fn, { leading: false, trailing: true })')('should immediately emit the first value in each time window', () => { - const e1 = hot('-a-xy-----b--x--cxxx--|'); - const e1subs = '^ !'; - const t = time( '----| '); - const expected = '-----y--------x-----x-|'; + asDiagram('throttleTime(fn, { leading: false, trailing: true })')('should emit last given value in each time window', () => { + const e1 = hot('-a-xy-----b--x--cxcd---|'); + const e1subs = '^ !'; + const t = time( '----| '); + const expected = '-----y--------x---c---d|'; const result = e1.throttleTime(t, rxTestScheduler, { leading: false, trailing: true }); diff --git a/src/operators/throttleTime.ts b/src/operators/throttleTime.ts index d81920a38d..fe6f151c7f 100644 --- a/src/operators/throttleTime.ts +++ b/src/operators/throttleTime.ts @@ -85,30 +85,37 @@ class ThrottleTimeSubscriber extends Subscriber { } protected _next(value: T) { - if (this.throttled) { - if (this.trailing) { - this._trailingValue = value; - this._hasTrailingValue = true; - } - } else { - this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this })); - if (this.leading) { - this.destination.next(value); - } + if (!this.throttled && this.leading) { + this.throttle(); + this.destination.next(value); + } else if (this.trailing) { + this._trailingValue = value; + this._hasTrailingValue = true; + } + + if (!this.throttled) { + this.throttle(); } } + private throttle(): void { + this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this })); + } + clearThrottle() { const throttled = this.throttled; + if (throttled) { + throttled.unsubscribe(); + this.throttled = null; + if (this.trailing && this._hasTrailingValue) { - this.destination.next(this._trailingValue); + const trailingValue = this._trailingValue; this._trailingValue = null; this._hasTrailingValue = false; + this.throttle(); + this.destination.next(trailingValue); } - throttled.unsubscribe(); - this.remove(throttled); - this.throttled = null; } } }