From ade1c0e51ebed01a6b8721b722932800ceb945db Mon Sep 17 00:00:00 2001 From: Alexander von Weiss Date: Tue, 26 Sep 2017 11:49:33 +0200 Subject: [PATCH 1/5] fix(throttleTime): leading and trailing true should only emit a single value per time frame --- spec/operators/throttleTime-spec.ts | 74 ++++++++++++++++------------- src/operators/throttleTime.ts | 32 ++++++++----- 2 files changed, 60 insertions(+), 46 deletions(-) diff --git a/spec/operators/throttleTime-spec.ts b/spec/operators/throttleTime-spec.ts index e668c9b9c7..2b19a74a78 100644 --- a/spec/operators/throttleTime-spec.ts +++ b/spec/operators/throttleTime-spec.ts @@ -55,12 +55,48 @@ describe('Observable.prototype.throttleTime', () => { expectSubscriptions(e1.subscriptions).toBe(subs); }); - it('should handle a busy producer emitting a regular repeating sequence', () => { - const e1 = hot('abcdefabcdefabcdefabcdefa|'); - const subs = '^ !'; - const expected = 'a-----a-----a-----a-----a|'; + it('should handle a busy producer emitting a regular repeating sequence with leading: true, trailing: false', () => { + const e1 = hot('a12345b12345c12345d123|'); + const subs = '^ !'; + const expected = 'a-----b-----c-----d---|'; - expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectObservable(e1.throttleTime(50, rxTestScheduler, {leading: true, trailing: false})).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a busy producer emitting a regular repeating sequence with leading: true, trailing: true', () => { + const e1 = hot('a1234b1234c1234d12e--|'); + const subs = '^ !'; + const expected = 'a----b----c----d----e|'; + + expectObservable(e1.throttleTime(50, rxTestScheduler, {leading: true, trailing: true})).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should not drop values if values are far apart with leading: true, trailing: true', () => { + const e1 = hot('ab-----------c|'); + const subs = '^ !'; + const expected = 'a----b-------c|'; + + expectObservable(e1.throttleTime(50, rxTestScheduler, {leading: true, trailing: true})).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should handle a busy producer emitting a regular repeating sequence with leading: false, trailing: true', () => { + const e1 = hot('12345a1234b1234c12d--|'); + const subs = '^ !'; + const expected = '-----a----b----c----d|'; + + expectObservable(e1.throttleTime(50, rxTestScheduler, {leading: false, trailing: true})).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + + it('should only emit trailing values with leading: false, trailing: true', () => { + const e1 = hot('ab-----------c--d--|'); + const subs = '^ !'; + const expected = '-----b------------d|'; + + expectObservable(e1.throttleTime(50, rxTestScheduler, {leading: false, trailing: true})).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); @@ -142,32 +178,4 @@ describe('Observable.prototype.throttleTime', () => { expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); - - describe('throttleTime(fn, { leading: true, trailing: true })', () => { - asDiagram('throttleTime(fn, { leading: true, trailing: true })')('should immediately emit the first value in each time window', () => { - const e1 = hot('-a-xy-----b--x--cxxx--|'); - const e1subs = '^ !'; - const t = time( '----| '); - const expected = '-a---y----b---x-c---x-|'; - - const result = e1.throttleTime(t, rxTestScheduler, { leading: true, trailing: true }); - - expectObservable(result).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - }); - - describe('throttleTime(fn, { leading: false, trailing: true })', () => { - asDiagram('throttleTime(fn, { leading: false, trailing: true })')('should immediately emit the first value in each time window', () => { - const e1 = hot('-a-xy-----b--x--cxxx--|'); - const e1subs = '^ !'; - const t = time( '----| '); - const expected = '-----y--------x-----x-|'; - - const result = e1.throttleTime(t, rxTestScheduler, { leading: false, trailing: true }); - - expectObservable(result).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); - }); - }); }); diff --git a/src/operators/throttleTime.ts b/src/operators/throttleTime.ts index d81920a38d..e3bc893d6a 100644 --- a/src/operators/throttleTime.ts +++ b/src/operators/throttleTime.ts @@ -85,30 +85,36 @@ class ThrottleTimeSubscriber extends Subscriber { } protected _next(value: T) { - if (this.throttled) { - if (this.trailing) { - this._trailingValue = value; - this._hasTrailingValue = true; - } - } else { - this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this })); - if (this.leading) { - this.destination.next(value); - } + if (!this.throttled && this.leading) { + this.destination.next(value); + } else if (this.trailing) { + this._trailingValue = value; + this._hasTrailingValue = true; + } + + if (!this.throttled) { + this.throttle(); } } + private throttle(): void { + this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this })); + } + clearThrottle() { const throttled = this.throttled; + if (throttled) { + throttled.unsubscribe(); + this.remove(throttled); + this.throttled = null; + if (this.trailing && this._hasTrailingValue) { this.destination.next(this._trailingValue); this._trailingValue = null; this._hasTrailingValue = false; + this.throttle(); } - throttled.unsubscribe(); - this.remove(throttled); - this.throttled = null; } } } From 9cda512b1340e5ad6a62243167a65799acbc3330 Mon Sep 17 00:00:00 2001 From: Alexander von Weiss Date: Tue, 26 Sep 2017 11:53:22 +0200 Subject: [PATCH 2/5] fix(throttleTime): feedback from @Parakleta - schedule timer before emitting value --- src/operators/throttleTime.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/operators/throttleTime.ts b/src/operators/throttleTime.ts index e3bc893d6a..041802d93f 100644 --- a/src/operators/throttleTime.ts +++ b/src/operators/throttleTime.ts @@ -86,6 +86,7 @@ class ThrottleTimeSubscriber extends Subscriber { protected _next(value: T) { if (!this.throttled && this.leading) { + this.throttle(); this.destination.next(value); } else if (this.trailing) { this._trailingValue = value; @@ -110,10 +111,11 @@ class ThrottleTimeSubscriber extends Subscriber { this.throttled = null; if (this.trailing && this._hasTrailingValue) { - this.destination.next(this._trailingValue); + const trailingValue = this._trailingValue; this._trailingValue = null; this._hasTrailingValue = false; this.throttle(); + this.destination.next(trailingValue); } } } From 9fc78aab068dd20da94acdbd9e8b2fbeab035497 Mon Sep 17 00:00:00 2001 From: Alexander von Weiss Date: Tue, 26 Sep 2017 13:46:06 +0200 Subject: [PATCH 3/5] fix(throttleTime): add back asDiagram specs --- spec/operators/throttleTime-spec.ts | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/spec/operators/throttleTime-spec.ts b/spec/operators/throttleTime-spec.ts index 2b19a74a78..b1323f8c2a 100644 --- a/spec/operators/throttleTime-spec.ts +++ b/spec/operators/throttleTime-spec.ts @@ -178,4 +178,32 @@ describe('Observable.prototype.throttleTime', () => { expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); + + describe('throttleTime(fn, { leading: true, trailing: true })', () => { + asDiagram('throttleTime(fn, { leading: true, trailing: true })')('should immediately emit the first value in each time window', () => { + const e1 = hot('-a-xy-----b--x--cxxx---|'); + const e1subs = '^ !'; + const t = time( '----| '); + const expected = '-a---y----b---x---x---x|'; + + const result = e1.throttleTime(t, rxTestScheduler, { leading: true, trailing: true }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + }); + + describe('throttleTime(fn, { leading: false, trailing: true })', () => { + asDiagram('throttleTime(fn, { leading: false, trailing: true })')('should emit last given value in each time window', () => { + const e1 = hot('-a-xy-----b--x--cxcd---|'); + const e1subs = '^ !'; + const t = time( '----| '); + const expected = '-----y--------x---c---d|'; + + const result = e1.throttleTime(t, rxTestScheduler, { leading: false, trailing: true }); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + }); }); From 15701a3acc3fd4f0498247d19443af000f11bb04 Mon Sep 17 00:00:00 2001 From: Alexander von Weiss Date: Tue, 26 Sep 2017 18:09:01 +0200 Subject: [PATCH 4/5] fix(throttleTime): .remove(throttled) is a noop as throttled.unsubscribe() already does it --- src/operators/throttleTime.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/operators/throttleTime.ts b/src/operators/throttleTime.ts index 041802d93f..fe6f151c7f 100644 --- a/src/operators/throttleTime.ts +++ b/src/operators/throttleTime.ts @@ -107,7 +107,6 @@ class ThrottleTimeSubscriber extends Subscriber { if (throttled) { throttled.unsubscribe(); - this.remove(throttled); this.throttled = null; if (this.trailing && this._hasTrailingValue) { From 1b6c92abb2fc61b7b0f317bf2436ee4fb3fe7927 Mon Sep 17 00:00:00 2001 From: Alexander von Weiss Date: Tue, 26 Sep 2017 18:09:41 +0200 Subject: [PATCH 5/5] fix(throttleTime): use time() to generate throttle amount --- spec/operators/throttleTime-spec.ts | 46 +++++++++++++++++++---------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/spec/operators/throttleTime-spec.ts b/spec/operators/throttleTime-spec.ts index b1323f8c2a..a017237ad1 100644 --- a/spec/operators/throttleTime-spec.ts +++ b/spec/operators/throttleTime-spec.ts @@ -49,99 +49,110 @@ describe('Observable.prototype.throttleTime', () => { it('should simply mirror the source if values are not emitted often enough', () => { const e1 = hot('-a--------b-----c----|'); const subs = '^ !'; + const t = time('-----| '); const expected = '-a--------b-----c----|'; - expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should handle a busy producer emitting a regular repeating sequence with leading: true, trailing: false', () => { const e1 = hot('a12345b12345c12345d123|'); const subs = '^ !'; + const t = time('-----| '); const expected = 'a-----b-----c-----d---|'; - expectObservable(e1.throttleTime(50, rxTestScheduler, {leading: true, trailing: false})).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler, {leading: true, trailing: false})).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should handle a busy producer emitting a regular repeating sequence with leading: true, trailing: true', () => { const e1 = hot('a1234b1234c1234d12e--|'); const subs = '^ !'; + const t = time('-----| '); const expected = 'a----b----c----d----e|'; - expectObservable(e1.throttleTime(50, rxTestScheduler, {leading: true, trailing: true})).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler, {leading: true, trailing: true})).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should not drop values if values are far apart with leading: true, trailing: true', () => { const e1 = hot('ab-----------c|'); const subs = '^ !'; + const t = time('-----| '); const expected = 'a----b-------c|'; - expectObservable(e1.throttleTime(50, rxTestScheduler, {leading: true, trailing: true})).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler, {leading: true, trailing: true})).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should handle a busy producer emitting a regular repeating sequence with leading: false, trailing: true', () => { const e1 = hot('12345a1234b1234c12d--|'); const subs = '^ !'; + const t = time('-----| '); const expected = '-----a----b----c----d|'; - expectObservable(e1.throttleTime(50, rxTestScheduler, {leading: false, trailing: true})).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler, {leading: false, trailing: true})).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should only emit trailing values with leading: false, trailing: true', () => { const e1 = hot('ab-----------c--d--|'); const subs = '^ !'; + const t = time('-----| '); const expected = '-----b------------d|'; - expectObservable(e1.throttleTime(50, rxTestScheduler, {leading: false, trailing: true})).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler, {leading: false, trailing: true})).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should complete when source does not emit', () => { const e1 = hot('-----|'); const subs = '^ !'; + const t = time('-----|'); const expected = '-----|'; - expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should raise error when source does not emit and raises error', () => { const e1 = hot('-----#'); const subs = '^ !'; + const t = time('| '); const expected = '-----#'; - expectObservable(e1.throttleTime(10, rxTestScheduler)).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should handle an empty source', () => { const e1 = cold('|'); const subs = '(^!)'; + const t = time('---|'); const expected = '|'; - expectObservable(e1.throttleTime(30, rxTestScheduler)).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should handle a never source', () => { const e1 = cold('-'); const subs = '^'; + const t = time('---|'); const expected = '-'; - expectObservable(e1.throttleTime(30, rxTestScheduler)).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); it('should handle a throw source', () => { const e1 = cold('#'); const subs = '(^!)'; + const t = time('---|'); const expected = '#'; - expectObservable(e1.throttleTime(30, rxTestScheduler)).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); @@ -149,9 +160,10 @@ describe('Observable.prototype.throttleTime', () => { const e1 = hot('-a--(bc)-------d----------------'); const unsub = ' !'; const subs = '^ !'; + const t = time('-----| '); const expected = '-a-------------d----------------'; - expectObservable(e1.throttleTime(50, rxTestScheduler), unsub).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler), unsub).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); @@ -159,11 +171,12 @@ describe('Observable.prototype.throttleTime', () => { const e1 = hot('-a--(bc)-------d----------------'); const subs = '^ !'; const expected = '-a-------------d----------------'; + const t = time('-----| '); const unsub = ' !'; const result = e1 .mergeMap((x: string) => Observable.of(x)) - .throttleTime(50, rxTestScheduler) + .throttleTime(t, rxTestScheduler) .mergeMap((x: string) => Observable.of(x)); expectObservable(result, unsub).toBe(expected); @@ -173,9 +186,10 @@ describe('Observable.prototype.throttleTime', () => { it('should throttle values until source raises error', () => { const e1 = hot('-a--(bc)-------d---------------#'); const subs = '^ !'; + const t = time('-----| '); const expected = '-a-------------d---------------#'; - expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected); + expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(subs); }); @@ -183,7 +197,7 @@ describe('Observable.prototype.throttleTime', () => { asDiagram('throttleTime(fn, { leading: true, trailing: true })')('should immediately emit the first value in each time window', () => { const e1 = hot('-a-xy-----b--x--cxxx---|'); const e1subs = '^ !'; - const t = time( '----| '); + const t = time( '----| '); const expected = '-a---y----b---x---x---x|'; const result = e1.throttleTime(t, rxTestScheduler, { leading: true, trailing: true }); @@ -197,7 +211,7 @@ describe('Observable.prototype.throttleTime', () => { asDiagram('throttleTime(fn, { leading: false, trailing: true })')('should emit last given value in each time window', () => { const e1 = hot('-a-xy-----b--x--cxcd---|'); const e1subs = '^ !'; - const t = time( '----| '); + const t = time( '----| '); const expected = '-----y--------x---c---d|'; const result = e1.throttleTime(t, rxTestScheduler, { leading: false, trailing: true });