diff --git a/spec/operators/throttleTime-spec.ts b/spec/operators/throttleTime-spec.ts index f16ea44b5f..5868366eb5 100644 --- a/spec/operators/throttleTime-spec.ts +++ b/spec/operators/throttleTime-spec.ts @@ -226,11 +226,11 @@ describe('throttleTime operator', () => { it('should emit the last throttled value when complete', () => { rxTest.run(({ hot, time, expectObservable, expectSubscriptions }) => { - const e1 = hot(' -a-xy-----b--x--cxx|'); - const e1subs = ' ^------------------!'; - const t = time(' ----| '); + const e1 = hot(' -a-xy-----b--x--cxx-|'); + const e1subs = ' ^-------------------!'; + const t = time(' ----| '); // ----|---|----|---|---| - const expected = '-----y--------x---x---|'; + const expected = '-----y--------x---x-|'; const result = e1.pipe(throttleTime(t, rxTest, { leading: false, trailing: true })); diff --git a/src/internal/operators/throttleTime.ts b/src/internal/operators/throttleTime.ts index 3308a55fd4..e16caf3b60 100644 --- a/src/internal/operators/throttleTime.ts +++ b/src/internal/operators/throttleTime.ts @@ -101,11 +101,6 @@ class ThrottleTimeOperator implements Operator { } } -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ class ThrottleTimeSubscriber extends Subscriber { private throttled: Subscription | null = null; private trailingValue: T | null = null; @@ -125,21 +120,36 @@ class ThrottleTimeSubscriber extends Subscriber { protected _next(value: T) { const { destination } = this; if (this.throttled) { + // There is already a throttle timer going, + // we don't emit in this case. if (this.trailing) { + // Update the trailing value, so that when the + // throttle executes, it has a trailing value to emit. this.trailingValue = value; this.hasTrailingValue = true; } } else { if (this.leading) { + // With the leading behavior, if we're not + // throttled we can emit the value right away, + // but then we have to start the throttle. destination.next(value); } else if (this.trailing) { + // We're NOT leading, but we are trailing, then + // we need to set the trailing value so once the throttle + // is done we can emit it. We don't do this if we're leading + // as well, because that would result in a double emission. this.trailingValue = value; this.hasTrailingValue = true; } + this.throttle(); } } + /** + * Schedules the throttle delay. + */ private throttle() { const { destination } = this; (destination as Subscription).add( @@ -147,12 +157,21 @@ class ThrottleTimeSubscriber extends Subscriber { this.throttled = null; const { trailing, trailingValue, hasTrailingValue } = this; if (trailing && hasTrailingValue) { + // We're trailing, so emit the trailing value if + // we have one. this.hasTrailingValue = false; this.trailingValue = null; destination.next(trailingValue); + + // If we have emitted a value, though, we need + // to make sure that we throttle to keep the emitted + // values spaced out by the given throttle. this.throttle(); } + if (this.isComplete) { + // The source completed, we can't get any more values + // so we can complete, which will tear everything down. destination.complete(); } }, this.duration)) @@ -161,8 +180,13 @@ class ThrottleTimeSubscriber extends Subscriber { protected _complete() { this.isComplete = true; - const { trailing, throttled, destination } = this; - if (!throttled || !trailing) { + const { trailing, throttled, hasTrailingValue, destination } = this; + // If we're not throttled, we close because there's clearly nothing we're waiting for. + // If we're not using trailing values, we can close, because there couldn't be an trailing values + // trapped that we want to emit. And even if we are using trailing values, if the source + // completes, and we don't have a trailing value yet, we can complete because it cannot + // possibly provide one at that poin. + if (!throttled || !trailing || !hasTrailingValue) { destination.complete(); } this.unsubscribe();