Skip to content

Commit

Permalink
chore: Address comments and add comments to the code
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Sep 3, 2020
1 parent 6481175 commit d256401
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 11 deletions.
8 changes: 4 additions & 4 deletions spec/operators/throttleTime-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,11 @@ describe('throttleTime operator', () => {

it('should emit the last throttled value when complete', () => {
rxTest.run(({ hot, time, expectObservable, expectSubscriptions }) => {
const e1 = hot(' -a-xy-----b--x--cxx|');
const e1subs = ' ^------------------!';
const t = time(' ----| ');
const e1 = hot(' -a-xy-----b--x--cxx-|');
const e1subs = ' ^-------------------!';
const t = time(' ----| ');
// ----|---|----|---|---|
const expected = '-----y--------x---x---|';
const expected = '-----y--------x---x-|';

const result = e1.pipe(throttleTime(t, rxTest, { leading: false, trailing: true }));

Expand Down
38 changes: 31 additions & 7 deletions src/internal/operators/throttleTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,6 @@ class ThrottleTimeOperator<T> implements Operator<T, T> {
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class ThrottleTimeSubscriber<T> extends Subscriber<T> {
private throttled: Subscription | null = null;
private trailingValue: T | null = null;
Expand All @@ -125,34 +120,58 @@ class ThrottleTimeSubscriber<T> extends Subscriber<T> {
protected _next(value: T) {
const { destination } = this;
if (this.throttled) {
// There is already a throttle timer going,
// we don't emit in this case.
if (this.trailing) {
// Update the trailing value, so that when the
// throttle executes, it has a trailing value to emit.
this.trailingValue = value;
this.hasTrailingValue = true;
}
} else {
if (this.leading) {
// With the leading behavior, if we're not
// throttled we can emit the value right away,
// but then we have to start the throttle.
destination.next(value);
} else if (this.trailing) {
// We're NOT leading, but we are trailing, then
// we need to set the trailing value so once the throttle
// is done we can emit it. We don't do this if we're leading
// as well, because that would result in a double emission.
this.trailingValue = value;
this.hasTrailingValue = true;
}

this.throttle();
}
}

/**
* Schedules the throttle delay.
*/
private throttle() {
const { destination } = this;
(destination as Subscription).add(
(this.throttled = this.scheduler.schedule(() => {
this.throttled = null;
const { trailing, trailingValue, hasTrailingValue } = this;
if (trailing && hasTrailingValue) {
// We're trailing, so emit the trailing value if
// we have one.
this.hasTrailingValue = false;
this.trailingValue = null;
destination.next(trailingValue);

// If we have emitted a value, though, we need
// to make sure that we throttle to keep the emitted
// values spaced out by the given throttle.
this.throttle();
}

if (this.isComplete) {
// The source completed, we can't get any more values
// so we can complete, which will tear everything down.
destination.complete();
}
}, this.duration))
Expand All @@ -161,8 +180,13 @@ class ThrottleTimeSubscriber<T> extends Subscriber<T> {

protected _complete() {
this.isComplete = true;
const { trailing, throttled, destination } = this;
if (!throttled || !trailing) {
const { trailing, throttled, hasTrailingValue, destination } = this;
// If we're not throttled, we close because there's clearly nothing we're waiting for.
// If we're not using trailing values, we can close, because there couldn't be an trailing values
// trapped that we want to emit. And even if we are using trailing values, if the source
// completes, and we don't have a trailing value yet, we can complete because it cannot
// possibly provide one at that poin.
if (!throttled || !trailing || !hasTrailingValue) {
destination.complete();
}
this.unsubscribe();
Expand Down

0 comments on commit d256401

Please sign in to comment.