Skip to content

Commit

Permalink
fix: reset retryWhen state on recover
Browse files Browse the repository at this point in the history
Fixes RxJS issue ReactiveX/rxjs#1413
  • Loading branch information
jiayihu committed May 27, 2017
1 parent 60cee88 commit b2bc404
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 19 deletions.
76 changes: 63 additions & 13 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,98 @@ function isPageActive(): boolean {
}

export interface IOptions {
/**
* Period of the interval to run the source$
*/
interval: number;

/**
* How many attempts on error, before throwing definitely to polling subscriber
*/
attempts?: number;

/**
* Strategy taken on source$ errors, with attempts to recover.
*
* 'esponential' will retry waiting an increasing esponential time between attempts.
* You can pass the unit amount, which will be multiplied to the esponential factor.
*
* 'random' will retry waiting a random time between attempts. You can pass the range of randomness.
*
* 'consecutive' will retry waiting a constant time between attempts. You can
* pass the constant, otherwise the polling interval will be used.
*/
backoffStrategy?: 'esponential' | 'random' | 'consecutive';

/**
* Esponential delay factors (2, 4, 16, 32...) will be multiplied to the unit
* to get final amount if 'esponential' strategy is used.
*/
esponentialUnit?: number;

/**
* Range of milli-seconds to pick a random delay between error retries if 'random'
* strategy is used.
*/
randomRange?: [number, number];

/**
* Constant time to delay error retries if 'consecutive' strategy is used
*/
constantTime?: number;
}

const defaultOptions = {
const defaultOptions: Partial<IOptions> = {
attempts: 9,
backoffStrategy: 'esponential',
esponentialUnit: 1000, // 1 second
randomRange: [1000, 10000],
};

/**
* Run a polling stream for the source$
* @param source$ Observable to fetch the data
* @param interval Period of the polling
* @param attempts Number of times to retry. The last retry attempt will wait for 2^attempts seconds.
* @param request$ Source Observable which will be ran every interval
* @param userOptions Polling options
* @param scheduler Scheduler of internal timers. Useful for testing.
*/
export default function polling<T>(
request$: Observable<T>,
userOptions: IOptions,
scheduler?: Scheduler,
): Observable<T> {
export default function polling<T>( request$: Observable<T>, userOptions: IOptions, scheduler?: Scheduler ): Observable<T> {
const options = Object.assign({}, defaultOptions, userOptions);

/**
* Currently any new error, after recover, continues the series of increasing
* delays, like 2 consequent errors would do. This is a bug of RxJS. To workaround
* the issue we use the difference with the counter value at the last recover.
* @see https://github.com/ReactiveX/rxjs/issues/1413
*/
let allErrorsCount = 0;
let lastRecoverCount = 0;

return Observable.fromEvent(document, 'visibilitychange')
.startWith(null)
.switchMap(() => {
if (isPageActive()) {
return Observable.interval(options.interval, scheduler)
.startWith(null) // Immediately run the first call
.switchMap(() => request$)
.retryWhen(errors => {
return errors.scan((errorCount, err) => {
.retryWhen(errors$ => {
return errors$.scan((errorCount, err) => {
// If already tempted too many times don't retry
if (errorCount >= options.attempts) throw err;

return errorCount + 1;
}, 0).switchMap(errorCount => {
const esponentialDelay = Math.pow(2, errorCount) * 1000;
allErrorsCount = errorCount;
const consecutiveErrorsCount = allErrorsCount - lastRecoverCount;
const esponentialDelay = Math.pow(2, consecutiveErrorsCount) * options.esponentialUnit;

return Observable.timer(esponentialDelay);
return Observable.timer(esponentialDelay, null, scheduler);
});
});
}

return Observable.empty();
}).do(() => {
// Update the counter after every successful polling
lastRecoverCount = allErrorsCount;
});
}
19 changes: 13 additions & 6 deletions test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,24 @@ describe('Backoff behaviour', function() {
});

test('It should retry on error', () => {
const source$ = scheduler.createColdObservable('-1-2-#');
const expected = '-1-2----1-(2|)';
const polling$ = polling(source$, { interval: 60, esponentialUnit: 10 }, scheduler).take(4);

scheduler.expectObservable(polling$).toBe(expected, { 1: '1', 2: '2' });
scheduler.flush();
});

test('It should reset delays on not consecutive errors', () => {
/**
* This test is a bit tricky. It tests that the source$ errored only once
* and that the error has been recovered. It MUST although avoid erroring twice
* because `.retryWhen` doesn't reset its state after recover. This cause the
* second error to continue the series of increasing delays, like 2 consequent
* `.retryWhen` doesn't reset its state after a recover. This cause the
* next error to continue the series of increasing delays, like 2 consecutive
* errors would do.
* @see https://github.com/ReactiveX/rxjs/issues/1413
*/
const source$ = scheduler.createColdObservable('-1-2-#');
const expected = '-1-2----1-(2|)';
const polling$ = polling(source$, { interval: 60, esponentialUnit: 10 }, scheduler).take(4);
const expected = '-1-2----1-2----(1|)';
const polling$ = polling(source$, { interval: 60, esponentialUnit: 10 }, scheduler).take(5);

scheduler.expectObservable(polling$).toBe(expected, { 1: '1', 2: '2' });
scheduler.flush();
Expand Down

0 comments on commit b2bc404

Please sign in to comment.