Skip to content

Commit

Permalink
Merge pull request #3176 from benlesh/smaller-timeout
Browse files Browse the repository at this point in the history
refactor(timeout): implement in terms of timeoutWith
  • Loading branch information
benlesh authored Mar 30, 2018
2 parents 92ce14b + dd0cc7c commit 21ac689
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 71 deletions.
2 changes: 1 addition & 1 deletion compat/operator/timeoutWith.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@ export function timeoutWith<T, R>(this: Observable<T>, due: number | Date, withO
export function timeoutWith<T, R>(this: Observable<T>, due: number | Date,
withObservable: ObservableInput<R>,
scheduler: SchedulerLike = asyncScheduler): Observable<T | R> {
return higherOrder(due, withObservable, scheduler)(this as any);
return higherOrder<T, R>(due, withObservable, scheduler)(this as any);
}
72 changes: 3 additions & 69 deletions src/internal/operators/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { TimeoutError } from '../util/TimeoutError';
import { MonoTypeOperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic } from '../types';
import { timeoutWith } from './timeoutWith';
import { throwError } from '../observable/throwError';

/**
*
Expand Down Expand Up @@ -73,73 +75,5 @@ import { MonoTypeOperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic
*/
export function timeout<T>(due: number | Date,
scheduler: SchedulerLike = async): MonoTypeOperatorFunction<T> {
const absoluteTimeout = isDate(due);
const waitFor = absoluteTimeout ? (+due - scheduler.now()) : Math.abs(<number>due);
return (source: Observable<T>) => source.lift(new TimeoutOperator(waitFor, absoluteTimeout, scheduler, new TimeoutError()));
}

class TimeoutOperator<T> implements Operator<T, T> {
constructor(private waitFor: number,
private absoluteTimeout: boolean,
private scheduler: SchedulerLike,
private errorInstance: TimeoutError) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new TimeoutSubscriber<T>(
subscriber, this.absoluteTimeout, this.waitFor, this.scheduler, this.errorInstance
));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class TimeoutSubscriber<T> extends Subscriber<T> {

private action: SchedulerAction<TimeoutSubscriber<T>> = null;

constructor(destination: Subscriber<T>,
private absoluteTimeout: boolean,
private waitFor: number,
private scheduler: SchedulerLike,
private errorInstance: TimeoutError) {
super(destination);
this.scheduleTimeout();
}

private static dispatchTimeout<T>(subscriber: TimeoutSubscriber<T>): void {
subscriber.error(subscriber.errorInstance);
}

private scheduleTimeout(): void {
const { action } = this;
if (action) {
// Recycle the action if we've already scheduled one. All the production
// Scheduler Actions mutate their state/delay time and return themeselves.
// VirtualActions are immutable, so they create and return a clone. In this
// case, we need to set the action reference to the most recent VirtualAction,
// to ensure that's the one we clone from next time.
this.action = (<SchedulerAction<TimeoutSubscriber<T>>> action.schedule(this, this.waitFor));
} else {
this.add(this.action = (<SchedulerAction<TimeoutSubscriber<T>>> this.scheduler.schedule<TimeoutSubscriber<T>>(
TimeoutSubscriber.dispatchTimeout, this.waitFor, this
)));
}
}

protected _next(value: T): void {
if (!this.absoluteTimeout) {
this.scheduleTimeout();
}
super._next(value);
}

protected _unsubscribe() {
this.action = null;
this.scheduler = null;
this.errorInstance = null;
}
return timeoutWith(due, throwError(new TimeoutError()), scheduler);
}
1 change: 0 additions & 1 deletion src/internal/operators/timeoutWith.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { subscribeToResult } from '../util/subscribeToResult';
import { ObservableInput, OperatorFunction, MonoTypeOperatorFunction, SchedulerAction, SchedulerLike, TeardownLogic } from '../types';

/* tslint:disable:max-line-length */
export function timeoutWith<T>(due: number | Date, withObservable: ObservableInput<T>, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
export function timeoutWith<T, R>(due: number | Date, withObservable: ObservableInput<R>, scheduler?: SchedulerLike): OperatorFunction<T, T | R>;
/* tslint:enable:max-line-length */

Expand Down

0 comments on commit 21ac689

Please sign in to comment.