Skip to content

Commit

Permalink
fix(throttle): now properly trailing throttles for individual values
Browse files Browse the repository at this point in the history
BREAKING CHANGES: This changes the behavior of throttle, in particular
throttling with both leading and trailing behaviors set to true, to more
closely match the throttling behavior of lodash and other libraries.
Throttling now starts immediately after any emission from the
observable, and values will not be double emitted for both leading and
trailing values

closes ReactiveX#2864
  • Loading branch information
benlesh committed Mar 30, 2018
1 parent 7b8a3e3 commit 5fabee8
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 73 deletions.
94 changes: 61 additions & 33 deletions spec/operators/throttle-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,28 +209,24 @@ describe('Observable.prototype.throttle', () => {
});

it('should propagate error thrown from durationSelector function', () => {
const e1 = hot('abcdefabcdabcdefghabca| ');
const e1subs = '^ ! ';
const e2 = [cold('-----| '),
cold( '---| '),
cold( '-------| ')];
const e2subs = ['^ ! ',
' ^ ! '];
const expected = 'a-----a---# ';
const s1 = hot('--^--x--x--x--x--x--x--e--x--x--x--|');
const s1Subs = '^ !';
const n1 = cold( '----|');
const n1Subs = [' ^ ! ',
' ^ ! ',
' ^ ! '];
const exp = '---x-----x-----x-----(e#)';

let i = 0;
const result = e1.throttle(() => {
if (i === 2) {
throw 'error';
const result = s1.throttle(() => {
if (i++ === 3) {
throw new Error('lol');
}
return e2[i++];
return n1;
});

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
for (let j = 0; j < e2subs.length; j++) {
expectSubscriptions(e2[j].subscriptions).toBe(e2subs[j]);
}
expectObservable(result).toBe(exp, undefined, new Error('lol'));
expectSubscriptions(s1.subscriptions).toBe(s1Subs);
expectSubscriptions(n1.subscriptions).toBe(n1Subs);
});

it('should complete when source does not emit', () => {
Expand Down Expand Up @@ -353,37 +349,69 @@ describe('Observable.prototype.throttle', () => {

describe('throttle(fn, { leading: true, trailing: true })', () => {
asDiagram('throttle(fn, { leading: true, trailing: true })')('should immediately emit the first value in each time window', () => {
const e1 = hot('-a-xy-----b--x--cxxx--|');
const e1subs = '^ !';
const e2 = cold( '----| ');
const e2subs = [' ^ ! ',
' ^ ! ',
' ^ ! '];
const expected = '-a---y----b---x-c---x-|';
const e1 = hot('-a-xy-----b--x--cxxx------|');
const e1subs = '^ !';
const e2 = cold( '----| ');
const e2subs = [' ^ ! ',
' ^ ! ',
' ^ ! ',
' ^ ! ',
' ^ ! ',
' ^ !'];
const expected = '-a---y----b---x---x---x---|';

const result = e1.throttle(() => e2, { leading: true, trailing: true });

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should work for individual values', () => {
const s1 = hot('-^-x------------------|');
const s1Subs = '^ !';
const n1 = cold( '------------------------|');
const n1Subs = [' ^ !'];
const exp = '--x------------------|';

const result = s1.throttle(() => n1, { leading: true, trailing: true });
expectObservable(result).toBe(exp);
expectSubscriptions(s1.subscriptions).toBe(s1Subs);
expectSubscriptions(n1.subscriptions).toBe(n1Subs);
});
});

describe('throttle(fn, { leading: false, trailing: true })', () => {
asDiagram('throttle(fn, { leading: false, trailing: true })')('should immediately emit the first value in each time window', () => {
const e1 = hot('-a-xy-----b--x--cxxx--|');
const e1subs = '^ !';
const e2 = cold( '----| ');
const e2subs = [' ^ ! ',
' ^ ! ',
' ^ ! '];
const expected = '-----y--------x-----x-|';
const e1 = hot('-a-xy-----b--x--cxxx------|');
const e1subs = '^ !';
const e2 = cold( '----| ');
const e2subs = [' ^ ! ',
' ^ ! ',
' ^ ! ',
' ^ ! ',
' ^ ! ',
' ^ !'];
const expected = '-a---y----b---x---x---x---|';

const result = e1.throttle(() => e2, { leading: false, trailing: true });
const result = e1.throttle(() => e2, { leading: true, trailing: true });

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should work for individual values', () => {
const s1 = hot('-^-x------------------|');
const s1Subs = '^ !';
const n1 = cold( '------------------------|');
const n1Subs = [' ^ !'];
const exp = '--x------------------|';

const result = s1.throttle(() => n1, { leading: true, trailing: true });
expectObservable(result).toBe(exp);
expectSubscriptions(s1.subscriptions).toBe(s1Subs);
expectSubscriptions(n1.subscriptions).toBe(n1Subs);
});
});
});
77 changes: 37 additions & 40 deletions src/internal/operators/throttle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,38 +83,47 @@ class ThrottleOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {
private throttled: Subscription;
private _trailingValue: T;
private _hasTrailingValue = false;
private _throttled: Subscription;
private _sendValue: T;
private _hasValue = false;

constructor(protected destination: Subscriber<T>,
private durationSelector: (value: T) => SubscribableOrPromise<any>,
private durationSelector: (value: T) => SubscribableOrPromise<number>,
private _leading: boolean,
private _trailing: boolean) {
super(destination);
}

protected _next(value: T): void {
if (this.throttled) {
if (this._trailing) {
this._hasTrailingValue = true;
this._trailingValue = value;
}
} else {
const duration = this.tryDurationSelector(value);
if (duration) {
this.add(this.throttled = subscribeToResult(this, duration));
}
this._hasValue = true;
this._sendValue = value;

if (!this._throttled) {
if (this._leading) {
this.destination.next(value);
if (this._trailing) {
this._hasTrailingValue = true;
this._trailingValue = value;
}
this.send();
} else {
this.throttle(value);
}
}
}

private send() {
const { _hasValue, _sendValue } = this;
if (_hasValue) {
this.destination.next(_sendValue);
this.throttle(_sendValue);
}
this._hasValue = false;
this._sendValue = null;
}

private throttle(value: T): void {
const duration = this.tryDurationSelector(value);
if (duration) {
this.add(this._throttled = subscribeToResult(this, duration));
}
}

private tryDurationSelector(value: T): SubscribableOrPromise<any> {
try {
return this.durationSelector(value);
Expand All @@ -124,37 +133,25 @@ class ThrottleSubscriber<T, R> extends OuterSubscriber<T, R> {
}
}

protected _unsubscribe() {
const { throttled, _trailingValue, _hasTrailingValue, _trailing } = this;

this._trailingValue = null;
this._hasTrailingValue = false;

if (throttled) {
this.remove(throttled);
this.throttled = null;
throttled.unsubscribe();
private throttlingDone() {
const { _throttled, _trailing } = this;
if (_throttled) {
_throttled.unsubscribe();
}
}
this._throttled = null;

private _sendTrailing() {
const { destination, throttled, _trailing, _trailingValue, _hasTrailingValue } = this;
if (throttled && _trailing && _hasTrailingValue) {
destination.next(_trailingValue);
this._trailingValue = null;
this._hasTrailingValue = false;
if (_trailing) {
this.send();
}
}

notifyNext(outerValue: T, innerValue: R,
outerIndex: number, innerIndex: number,
innerSub: InnerSubscriber<T, R>): void {
this._sendTrailing();
this._unsubscribe();
this.throttlingDone();
}

notifyComplete(): void {
this._sendTrailing();
this._unsubscribe();
this.throttlingDone();
}
}

0 comments on commit 5fabee8

Please sign in to comment.