From cf7c5e53b5fc535fd2b63d409d8390a9cc3ec7f4 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 1 Sep 2020 22:55:14 -0500 Subject: [PATCH] fix(throttleTime): ensure the spacing between throttles is always at least the throttled amount Works to align the behavior with expectations set by lodash's throttle - Updates tests - Ensures trailing throttle will wait to notify and then complete - Ensures that every time we emit a value a new throttle period starts fixes #3712 related #4864 fixes #2727 closes #4727 related #4429 --- spec/operators/throttleTime-spec.ts | 338 ++++++++++++++----------- src/internal/operators/throttleTime.ts | 112 ++++---- 2 files changed, 250 insertions(+), 200 deletions(-) diff --git a/spec/operators/throttleTime-spec.ts b/spec/operators/throttleTime-spec.ts index fc48a92654..f16ea44b5f 100644 --- a/spec/operators/throttleTime-spec.ts +++ b/spec/operators/throttleTime-spec.ts @@ -1,200 +1,254 @@ +/** @prettier */ import { expect } from 'chai'; -import { hot, cold, expectObservable, expectSubscriptions, time } from '../helpers/marble-testing'; import { throttleTime, take, map, mergeMap } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import { of, concat, timer } from 'rxjs'; - -declare const rxTestScheduler: TestScheduler; +import { observableMatcher } from '../helpers/observableMatcher'; /** @test {throttleTime} */ describe('throttleTime operator', () => { - it('should immediately emit the first value in each time window', () => { - const e1 = hot('-a-x-y----b---x-cx---|'); - const subs = '^ !'; - const expected = '-a--------b-----c----|'; - - const result = e1.pipe(throttleTime(50, rxTestScheduler)); + let rxTest: TestScheduler; - expectObservable(result).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + beforeEach(() => { + rxTest = new TestScheduler(observableMatcher); }); - it('should throttle events by 50 time units', (done: MochaDone) => { - of(1, 2, 3).pipe(throttleTime(50)) - .subscribe((x: number) => { - expect(x).to.equal(1); - }, null, done); - }); + describe('defailt behavior { leading: true, trailing: false }', () => { + it('should immediately emit the first value in each time window', () => { + rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { + const e1 = hot(' -a-x-y----b---x-cx---|'); + // ----| ----| ----| + const expected = '-a--------b-----c----|'; + const subs = ' ^--------------------!'; - it('should throttle events multiple times', () => { - const expected = ['1-0', '2-0']; - concat( - timer(0, 10, rxTestScheduler).pipe(take(3), map((x: number) => '1-' + x)), - timer(80, 10, rxTestScheduler).pipe(take(5), map((x: number) => '2-' + x)) - ).pipe( - throttleTime(50, rxTestScheduler) - ).subscribe((x: string) => { - expect(x).to.equal(expected.shift()); - }); + const result = e1.pipe(throttleTime(5, rxTest)); - rxTestScheduler.flush(); - }); + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + }); - it('should simply mirror the source if values are not emitted often enough', () => { - const e1 = hot('-a--------b-----c----|'); - const subs = '^ !'; - const expected = '-a--------b-----c----|'; + it('should throttle events by 5 time units', (done: MochaDone) => { + of(1, 2, 3) + .pipe(throttleTime(5)) + .subscribe( + (x: number) => { + expect(x).to.equal(1); + }, + null, + done + ); + }); - expectObservable(e1.pipe(throttleTime(50, rxTestScheduler))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); - }); + it('should throttle events multiple times', () => { + const expected = ['1-0', '2-0']; + concat( + timer(0, 1, rxTest).pipe( + take(3), + map((x: number) => '1-' + x) + ), + timer(8, 1, rxTest).pipe( + take(5), + map((x: number) => '2-' + x) + ) + ) + .pipe(throttleTime(5, rxTest)) + .subscribe((x: string) => { + expect(x).to.equal(expected.shift()); + }); + + rxTest.flush(); + }); - it('should handle a busy producer emitting a regular repeating sequence', () => { - const e1 = hot('abcdefabcdefabcdefabcdefa|'); - const subs = '^ !'; - const expected = 'a-----a-----a-----a-----a|'; + it('should simply mirror the source if values are not emitted often enough', () => { + rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { + const e1 = hot(' -a--------b-----c----|'); + const subs = ' ^--------------------!'; + const expected = '-a--------b-----c----|'; - expectObservable(e1.pipe(throttleTime(50, rxTestScheduler))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); - }); + expectObservable(e1.pipe(throttleTime(5, rxTest))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + }); - it('should complete when source does not emit', () => { - const e1 = hot('-----|'); - const subs = '^ !'; - const expected = '-----|'; + it('should handle a busy producer emitting a regular repeating sequence', () => { + rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { + const e1 = hot(' abcdefabcdefabcdefabcdefa|'); + const subs = ' ^------------------------!'; + const expected = 'a-----a-----a-----a-----a|'; - expectObservable(e1.pipe(throttleTime(50, rxTestScheduler))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); - }); + expectObservable(e1.pipe(throttleTime(5, rxTest))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + }); - it('should raise error when source does not emit and raises error', () => { - const e1 = hot('-----#'); - const subs = '^ !'; - const expected = '-----#'; + it('should complete when source does not emit', () => { + rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { + const e1 = hot(' -----|'); + const subs = ' ^----!'; + const expected = '-----|'; - expectObservable(e1.pipe(throttleTime(10, rxTestScheduler))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); - }); + expectObservable(e1.pipe(throttleTime(5, rxTest))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + }); - it('should handle an empty source', () => { - const e1 = cold('|'); - const subs = '(^!)'; - const expected = '|'; + it('should raise error when source does not emit and raises error', () => { + rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { + const e1 = hot(' -----#'); + const subs = ' ^----!'; + const expected = '-----#'; - expectObservable(e1.pipe(throttleTime(30, rxTestScheduler))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); - }); + expectObservable(e1.pipe(throttleTime(10, rxTest))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + }); - it('should handle a never source', () => { - const e1 = cold('-'); - const subs = '^'; - const expected = '-'; + it('should handle an empty source', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold(' |'); + const subs = ' (^!)'; + const expected = '|'; - expectObservable(e1.pipe(throttleTime(30, rxTestScheduler))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); - }); + expectObservable(e1.pipe(throttleTime(30, rxTest))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + }); - it('should handle a throw source', () => { - const e1 = cold('#'); - const subs = '(^!)'; - const expected = '#'; + it('should handle a never source', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold(' -'); + const subs = ' ^'; + const expected = '-'; - expectObservable(e1.pipe(throttleTime(30, rxTestScheduler))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); - }); + expectObservable(e1.pipe(throttleTime(30, rxTest))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + }); - it('should throttle and does not complete when source does not completes', () => { - const e1 = hot('-a--(bc)-------d----------------'); - const unsub = ' !'; - const subs = '^ !'; - const expected = '-a-------------d----------------'; + it('should handle a throw source', () => { + rxTest.run(({ cold, expectObservable, expectSubscriptions }) => { + const e1 = cold(' #'); + const subs = ' (^!)'; + const expected = '#'; - expectObservable(e1.pipe(throttleTime(50, rxTestScheduler)), unsub).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); - }); + expectObservable(e1.pipe(throttleTime(30, rxTest))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + }); - it('should not break unsubscription chains when result is unsubscribed explicitly', () => { - const e1 = hot('-a--(bc)-------d----------------'); - const subs = '^ !'; - const expected = '-a-------------d----------------'; - const unsub = ' !'; + it('should throttle and does not complete when source does not completes', () => { + rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { + const e1 = hot(' -a--(bc)-------d----------------'); + const unsub = ' -------------------------------!'; + const subs = ' ^------------------------------!'; + const expected = '-a-------------d----------------'; - const result = e1.pipe( - mergeMap((x: string) => of(x)), - throttleTime(50, rxTestScheduler), - mergeMap((x: string) => of(x)) - ); + expectObservable(e1.pipe(throttleTime(5, rxTest)), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + }); - expectObservable(result, unsub).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); - }); + it('should not break unsubscription chains when result is unsubscribed explicitly', () => { + rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { + const e1 = hot(' -a--(bc)-------d----------------'); + const subs = ' ^------------------------------!'; + const expected = '-a-------------d----------------'; + const unsub = ' -------------------------------!'; + + const result = e1.pipe( + mergeMap((x: string) => of(x)), + throttleTime(5, rxTest), + mergeMap((x: string) => of(x)) + ); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + }); - it('should throttle values until source raises error', () => { - const e1 = hot('-a--(bc)-------d---------------#'); - const subs = '^ !'; - const expected = '-a-------------d---------------#'; + it('should throttle values until source raises error', () => { + rxTest.run(({ hot, expectObservable, expectSubscriptions }) => { + const e1 = hot(' -a--(bc)-------d---------------#'); + const subs = ' ^------------------------------!'; + const expected = '-a-------------d---------------#'; - expectObservable(e1.pipe(throttleTime(50, rxTestScheduler))).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(subs); + expectObservable(e1.pipe(throttleTime(5, rxTest))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(subs); + }); + }); }); describe('throttleTime(fn, { leading: true, trailing: true })', () => { - it('should immediately emit the first and last values in each time window', () => { - const e1 = hot('-a-xy-----b--x--cxxx--|'); - const e1subs = '^ !'; - const t = time( '----| '); - const expected = '-a---y----b---x-c---x-|'; - - const result = e1.pipe(throttleTime(t, rxTestScheduler, { leading: true, trailing: true })); - - expectObservable(result).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); + it('should immediately emit the first and last values in each time window', () => { + rxTest.run(({ hot, time, expectObservable, expectSubscriptions }) => { + const e1 = hot(' -a-xy-----b--x--cxxx--|'); + const e1subs = ' ^---------------------!'; + const t = time(' ----| '); + // ----|----|---|---| + const expected = '-a---y----b---x---x---(x|)'; + + const result = e1.pipe(throttleTime(t, rxTest, { leading: true, trailing: true })); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); }); it('should emit the value if only a single one is given', () => { - const e1 = hot('-a--------------------|'); - const t = time('----| '); - const expected = '-a--------------------|'; + rxTest.run(({ hot, time, expectObservable }) => { + const e1 = hot(' -a--------------------|'); + const t = time(' ----| '); + const expected = '-a--------------------|'; - const result = e1.pipe(throttleTime(t, rxTestScheduler, { leading: true, trailing: true })); + const result = e1.pipe(throttleTime(t, rxTest, { leading: true, trailing: true })); - expectObservable(result).toBe(expected); + expectObservable(result).toBe(expected); + }); }); }); describe('throttleTime(fn, { leading: false, trailing: true })', () => { - it('should immediately emit the last value in each time window', () => { - const e1 = hot('-a-xy-----b--x--cxxx--|'); - const e1subs = '^ !'; - const t = time( '----| '); - const expected = '-----y--------x-----x-|'; - - const result = e1.pipe(throttleTime(t, rxTestScheduler, { leading: false, trailing: true })); - - expectObservable(result).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); + it('should immediately emit the last value in each time window', () => { + rxTest.run(({ hot, time, expectObservable, expectSubscriptions }) => { + const e1 = hot(' -a-xy-----b--x--cxxx--|'); + const e1subs = ' ^---------------------!'; + const t = time(' ----| '); + // ----|---|----|---|---| + const expected = '-----y--------x---x---(x|)'; + + const result = e1.pipe(throttleTime(t, rxTest, { leading: false, trailing: true })); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); }); it('should emit the last throttled value when complete', () => { - const e1 = hot('-a-xy-----b--x--cxx|'); - const e1subs = '^ !'; - const t = time('----| '); - const expected = '-----y--------x----(x|)'; + rxTest.run(({ hot, time, expectObservable, expectSubscriptions }) => { + const e1 = hot(' -a-xy-----b--x--cxx|'); + const e1subs = ' ^------------------!'; + const t = time(' ----| '); + // ----|---|----|---|---| + const expected = '-----y--------x---x---|'; - const result = e1.pipe(throttleTime(t, rxTestScheduler, { leading: false, trailing: true })); + const result = e1.pipe(throttleTime(t, rxTest, { leading: false, trailing: true })); - expectObservable(result).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); }); it('should emit the value if only a single one is given', () => { - const e1 = hot('-a--------------------|'); - const t = time('----| '); - const expected = '-----a----------------|'; + rxTest.run(({ hot, time, expectObservable }) => { + const e1 = hot(' -a--------------------|'); + const t = time(' ----| '); + const expected = '-----a----------------|'; - const result = e1.pipe(throttleTime(t, rxTestScheduler, { leading: false, trailing: true })); + const result = e1.pipe(throttleTime(t, rxTest, { leading: false, trailing: true })); - expectObservable(result).toBe(expected); + expectObservable(result).toBe(expected); + }); }); }); }); diff --git a/src/internal/operators/throttleTime.ts b/src/internal/operators/throttleTime.ts index 4f10e8908b..3308a55fd4 100644 --- a/src/internal/operators/throttleTime.ts +++ b/src/internal/operators/throttleTime.ts @@ -1,7 +1,8 @@ +/** @prettier */ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; -import { async } from '../scheduler/async'; +import { asyncScheduler } from '../scheduler/async'; import { Observable } from '../Observable'; import { ThrottleConfig, defaultThrottleConfig } from './throttle'; import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic } from '../types'; @@ -74,34 +75,29 @@ import { lift } from '../util/lift'; * @see {@link sampleTime} * @see {@link throttle} * - * @param {number} duration Time to wait before emitting another value after + * @param duration Time to wait before emitting another value after * emitting the last value, measured in milliseconds or the time unit determined * internally by the optional `scheduler`. - * @param {SchedulerLike} [scheduler=async] The {@link SchedulerLike} to use for - * managing the timers that handle the throttling. - * @param {Object} config a configuration object to define `leading` and + * @param scheduler The {@link SchedulerLike} to use for + * managing the timers that handle the throttling. Defaults to {@link asyncScheduler}. + * @param config a configuration object to define `leading` and * `trailing` behavior. Defaults to `{ leading: true, trailing: false }`. - * @return {Observable} An Observable that performs the throttle operation to + * @return An Observable that performs the throttle operation to * limit the rate of emissions from the source. - * @name throttleTime */ -export function throttleTime(duration: number, - scheduler: SchedulerLike = async, - config: ThrottleConfig = defaultThrottleConfig): MonoTypeOperatorFunction { +export function throttleTime( + duration: number, + scheduler: SchedulerLike = asyncScheduler, + config: ThrottleConfig = defaultThrottleConfig +): MonoTypeOperatorFunction { return (source: Observable) => lift(source, new ThrottleTimeOperator(duration, scheduler, !!config.leading, !!config.trailing)); } class ThrottleTimeOperator implements Operator { - constructor(private duration: number, - private scheduler: SchedulerLike, - private leading: boolean, - private trailing: boolean) { - } + constructor(private duration: number, private scheduler: SchedulerLike, private leading: boolean, private trailing: boolean) {} call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe( - new ThrottleTimeSubscriber(subscriber, this.duration, this.scheduler, this.leading, this.trailing) - ); + return source.subscribe(new ThrottleTimeSubscriber(subscriber, this.duration, this.scheduler, this.leading, this.trailing)); } } @@ -112,63 +108,63 @@ class ThrottleTimeOperator implements Operator { */ class ThrottleTimeSubscriber extends Subscriber { private throttled: Subscription | null = null; - private _hasTrailingValue: boolean = false; - private _trailingValue: T | null = null; + private trailingValue: T | null = null; + private hasTrailingValue = false; + private isComplete = false; - constructor(destination: Subscriber, - private duration: number, - private scheduler: SchedulerLike, - private leading: boolean, - private trailing: boolean) { + constructor( + destination: Subscriber, + private duration: number, + private scheduler: SchedulerLike, + private leading: boolean, + private trailing: boolean + ) { super(destination); } protected _next(value: T) { + const { destination } = this; if (this.throttled) { if (this.trailing) { - this._trailingValue = value; - this._hasTrailingValue = true; + this.trailingValue = value; + this.hasTrailingValue = true; } } else { - this.add(this.throttled = this.scheduler.schedule>(dispatchNext as any, this.duration, { subscriber: this })); if (this.leading) { - this.destination.next(value); + destination.next(value); } else if (this.trailing) { - this._trailingValue = value; - this._hasTrailingValue = true; + this.trailingValue = value; + this.hasTrailingValue = true; } + this.throttle(); } } - protected _complete() { - if (this._hasTrailingValue) { - this.destination.next(this._trailingValue); - this.destination.complete(); - } else { - this.destination.complete(); - } + private throttle() { + const { destination } = this; + (destination as Subscription).add( + (this.throttled = this.scheduler.schedule(() => { + this.throttled = null; + const { trailing, trailingValue, hasTrailingValue } = this; + if (trailing && hasTrailingValue) { + this.hasTrailingValue = false; + this.trailingValue = null; + destination.next(trailingValue); + this.throttle(); + } + if (this.isComplete) { + destination.complete(); + } + }, this.duration)) + ); } - clearThrottle() { - const throttled = this.throttled; - if (throttled) { - if (this.trailing && this._hasTrailingValue) { - this.destination.next(this._trailingValue); - this._trailingValue = null; - this._hasTrailingValue = false; - } - throttled.unsubscribe(); - this.remove(throttled); - this.throttled = null!; + protected _complete() { + this.isComplete = true; + const { trailing, throttled, destination } = this; + if (!throttled || !trailing) { + destination.complete(); } + this.unsubscribe(); } } - -interface DispatchArg { - subscriber: ThrottleTimeSubscriber; -} - -function dispatchNext(arg: DispatchArg) { - const { subscriber } = arg; - subscriber.clearThrottle(); -}