Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(throttleTime): leading and trailing true should only emit a single value per time frame #2749

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 73 additions & 23 deletions spec/operators/throttleTime-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,85 +49,134 @@ describe('Observable.prototype.throttleTime', () => {
it('should simply mirror the source if values are not emitted often enough', () => {
const e1 = hot('-a--------b-----c----|');
const subs = '^ !';
const t = time('-----| ');
const expected = '-a--------b-----c----|';

expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected);
expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should handle a busy producer emitting a regular repeating sequence', () => {
const e1 = hot('abcdefabcdefabcdefabcdefa|');
const subs = '^ !';
const expected = 'a-----a-----a-----a-----a|';
it('should handle a busy producer emitting a regular repeating sequence with leading: true, trailing: false', () => {
const e1 = hot('a12345b12345c12345d123|');
const subs = '^ !';
const t = time('-----| ');
const expected = 'a-----b-----c-----d---|';

expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected);
expectObservable(e1.throttleTime(t, rxTestScheduler, {leading: true, trailing: false})).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should handle a busy producer emitting a regular repeating sequence with leading: true, trailing: true', () => {
const e1 = hot('a1234b1234c1234d12e--|');
const subs = '^ !';
const t = time('-----| ');
const expected = 'a----b----c----d----e|';

expectObservable(e1.throttleTime(t, rxTestScheduler, {leading: true, trailing: true})).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should not drop values if values are far apart with leading: true, trailing: true', () => {
const e1 = hot('ab-----------c|');
const subs = '^ !';
const t = time('-----| ');
const expected = 'a----b-------c|';

expectObservable(e1.throttleTime(t, rxTestScheduler, {leading: true, trailing: true})).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should handle a busy producer emitting a regular repeating sequence with leading: false, trailing: true', () => {
const e1 = hot('12345a1234b1234c12d--|');
const subs = '^ !';
const t = time('-----| ');
const expected = '-----a----b----c----d|';

expectObservable(e1.throttleTime(t, rxTestScheduler, {leading: false, trailing: true})).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should only emit trailing values with leading: false, trailing: true', () => {
const e1 = hot('ab-----------c--d--|');
const subs = '^ !';
const t = time('-----| ');
const expected = '-----b------------d|';

expectObservable(e1.throttleTime(t, rxTestScheduler, {leading: false, trailing: true})).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should complete when source does not emit', () => {
const e1 = hot('-----|');
const subs = '^ !';
const t = time('-----|');
const expected = '-----|';

expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected);
expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should raise error when source does not emit and raises error', () => {
const e1 = hot('-----#');
const subs = '^ !';
const t = time('| ');
const expected = '-----#';

expectObservable(e1.throttleTime(10, rxTestScheduler)).toBe(expected);
expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should handle an empty source', () => {
const e1 = cold('|');
const subs = '(^!)';
const t = time('---|');
const expected = '|';

expectObservable(e1.throttleTime(30, rxTestScheduler)).toBe(expected);
expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should handle a never source', () => {
const e1 = cold('-');
const subs = '^';
const t = time('---|');
const expected = '-';

expectObservable(e1.throttleTime(30, rxTestScheduler)).toBe(expected);
expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should handle a throw source', () => {
const e1 = cold('#');
const subs = '(^!)';
const t = time('---|');
const expected = '#';

expectObservable(e1.throttleTime(30, rxTestScheduler)).toBe(expected);
expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should throttle and does not complete when source does not completes', () => {
const e1 = hot('-a--(bc)-------d----------------');
const unsub = ' !';
const subs = '^ !';
const t = time('-----| ');
const expected = '-a-------------d----------------';

expectObservable(e1.throttleTime(50, rxTestScheduler), unsub).toBe(expected);
expectObservable(e1.throttleTime(t, rxTestScheduler), unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should not break unsubscription chains when result is unsubscribed explicitly', () => {
const e1 = hot('-a--(bc)-------d----------------');
const subs = '^ !';
const expected = '-a-------------d----------------';
const t = time('-----| ');
const unsub = ' !';

const result = e1
.mergeMap((x: string) => Observable.of(x))
.throttleTime(50, rxTestScheduler)
.throttleTime(t, rxTestScheduler)
.mergeMap((x: string) => Observable.of(x));

expectObservable(result, unsub).toBe(expected);
Expand All @@ -137,18 +186,19 @@ describe('Observable.prototype.throttleTime', () => {
it('should throttle values until source raises error', () => {
const e1 = hot('-a--(bc)-------d---------------#');
const subs = '^ !';
const t = time('-----| ');
const expected = '-a-------------d---------------#';

expectObservable(e1.throttleTime(50, rxTestScheduler)).toBe(expected);
expectObservable(e1.throttleTime(t, rxTestScheduler)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

describe('throttleTime(fn, { leading: true, trailing: true })', () => {
asDiagram('throttleTime(fn, { leading: true, trailing: true })')('should immediately emit the first value in each time window', () => {
const e1 = hot('-a-xy-----b--x--cxxx--|');
const e1subs = '^ !';
const t = time( '----| ');
const expected = '-a---y----b---x-c---x-|';
const e1 = hot('-a-xy-----b--x--cxxx---|');
const e1subs = '^ !';
const t = time( '----| ');
const expected = '-a---y----b---x---x---x|';

const result = e1.throttleTime(t, rxTestScheduler, { leading: true, trailing: true });

Expand All @@ -158,11 +208,11 @@ describe('Observable.prototype.throttleTime', () => {
});

describe('throttleTime(fn, { leading: false, trailing: true })', () => {
asDiagram('throttleTime(fn, { leading: false, trailing: true })')('should immediately emit the first value in each time window', () => {
const e1 = hot('-a-xy-----b--x--cxxx--|');
const e1subs = '^ !';
const t = time( '----| ');
const expected = '-----y--------x-----x-|';
asDiagram('throttleTime(fn, { leading: false, trailing: true })')('should emit last given value in each time window', () => {
const e1 = hot('-a-xy-----b--x--cxcd---|');
const e1subs = '^ !';
const t = time( '----| ');
const expected = '-----y--------x---c---d|';

const result = e1.throttleTime(t, rxTestScheduler, { leading: false, trailing: true });

Expand Down
35 changes: 21 additions & 14 deletions src/operators/throttleTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,30 +85,37 @@ class ThrottleTimeSubscriber<T> extends Subscriber<T> {
}

protected _next(value: T) {
if (this.throttled) {
if (this.trailing) {
this._trailingValue = value;
this._hasTrailingValue = true;
}
} else {
this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this }));
if (this.leading) {
this.destination.next(value);
}
if (!this.throttled && this.leading) {
this.throttle();
this.destination.next(value);
} else if (this.trailing) {
this._trailingValue = value;
this._hasTrailingValue = true;
}

if (!this.throttled) {
this.throttle();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we refactor this a little to be:

if (!this.trottled) {
  this.throttle();
  if (this.leading) {
    this.destination.next(value);
  }
}

if (this.trailing) {
  this._trailingValue = value;
  this._hasTrailingValue = true;
}

I'm not sure in it's current configuration it will work as desired, because of that else if. If it's trailing, we always want to record the value, not only if we've already started throttling.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then throttleTime would emit the same input twice on {leading:true, trailing: true}. I guess in the end it doesn't matter when throttleTime is used in conjunction with distinctUntilChanged, but it feels wrong regardless.

Observable.of(1).throttleTime(100, {leading:true, trailing: true}).subscribe((n) => console.log(n));

// 1
// 1

vs. lodash

fn = _.throttle((n) => console.log(n), 100, {leading:true, trailing: true});
fn(1);

// 1

}
}

private throttle(): void {
this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this }));
}

clearThrottle() {
const throttled = this.throttled;

if (throttled) {
throttled.unsubscribe();
this.throttled = null;

if (this.trailing && this._hasTrailingValue) {
this.destination.next(this._trailingValue);
const trailingValue = this._trailingValue;
this._trailingValue = null;
this._hasTrailingValue = false;
this.throttle();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is unnecessary and changes the behavior of the operator. It's not supposed to start throttling again when the throttle is cleared.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this is exactly why I opened the PR. Because currently the operator emits 2 values each timeframe instead of 1.

If the doubleemit is intended, this PR can be closed and #2727 & #2859 rejected.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm.. I think there's a disconnect here. throttle() actually starts the throttle measurement. The, with leading and trailing both true, it should emit the same as it would with leading behavior, but then emit the trailing value at the end.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I okay, I actually see what you're saying now. This is to start the throttle again as soon as there's another emission (due to trailing behavior)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay... the more I look at this, the more I think it will have to be a change that lands in v 6.0, because it will be a breaking change for someone. (Albeit a fix)

this.destination.next(trailingValue);
}
throttled.unsubscribe();
this.remove(throttled);
this.throttled = null;
}
}
}
Expand Down