From b2bc404d408515a5cc7ce246d75eeaa741706d62 Mon Sep 17 00:00:00 2001 From: Jiayi Hu Date: Sat, 27 May 2017 17:05:51 +0200 Subject: [PATCH] fix: reset `retryWhen` state on recover Fixes RxJS issue https://github.com/ReactiveX/rxjs/issues/1413 --- index.ts | 76 ++++++++++++++++++++++++++++++++++++++-------- test/index.spec.ts | 19 ++++++++---- 2 files changed, 76 insertions(+), 19 deletions(-) diff --git a/index.ts b/index.ts index 5e2970c..ee50293 100644 --- a/index.ts +++ b/index.ts @@ -18,27 +18,72 @@ function isPageActive(): boolean { } export interface IOptions { + /** + * Period of the interval to run the source$ + */ interval: number; + + /** + * How many attempts on error, before throwing definitely to polling subscriber + */ attempts?: number; + + /** + * Strategy taken on source$ errors, with attempts to recover. + * + * 'esponential' will retry waiting an increasing esponential time between attempts. + * You can pass the unit amount, which will be multiplied to the esponential factor. + * + * 'random' will retry waiting a random time between attempts. You can pass the range of randomness. + * + * 'consecutive' will retry waiting a constant time between attempts. You can + * pass the constant, otherwise the polling interval will be used. + */ + backoffStrategy?: 'esponential' | 'random' | 'consecutive'; + + /** + * Esponential delay factors (2, 4, 16, 32...) will be multiplied to the unit + * to get final amount if 'esponential' strategy is used. + */ + esponentialUnit?: number; + + /** + * Range of milli-seconds to pick a random delay between error retries if 'random' + * strategy is used. + */ + randomRange?: [number, number]; + + /** + * Constant time to delay error retries if 'consecutive' strategy is used + */ + constantTime?: number; } -const defaultOptions = { +const defaultOptions: Partial = { attempts: 9, + backoffStrategy: 'esponential', + esponentialUnit: 1000, // 1 second + randomRange: [1000, 10000], }; /** * Run a polling stream for the source$ - * @param source$ Observable to fetch the data - * @param interval Period of the polling - * @param attempts Number of times to retry. The last retry attempt will wait for 2^attempts seconds. + * @param request$ Source Observable which will be ran every interval + * @param userOptions Polling options + * @param scheduler Scheduler of internal timers. Useful for testing. */ -export default function polling( - request$: Observable, - userOptions: IOptions, - scheduler?: Scheduler, -): Observable { +export default function polling( request$: Observable, userOptions: IOptions, scheduler?: Scheduler ): Observable { const options = Object.assign({}, defaultOptions, userOptions); + /** + * Currently any new error, after recover, continues the series of increasing + * delays, like 2 consequent errors would do. This is a bug of RxJS. To workaround + * the issue we use the difference with the counter value at the last recover. + * @see https://github.com/ReactiveX/rxjs/issues/1413 + */ + let allErrorsCount = 0; + let lastRecoverCount = 0; + return Observable.fromEvent(document, 'visibilitychange') .startWith(null) .switchMap(() => { @@ -46,20 +91,25 @@ export default function polling( return Observable.interval(options.interval, scheduler) .startWith(null) // Immediately run the first call .switchMap(() => request$) - .retryWhen(errors => { - return errors.scan((errorCount, err) => { + .retryWhen(errors$ => { + return errors$.scan((errorCount, err) => { // If already tempted too many times don't retry if (errorCount >= options.attempts) throw err; return errorCount + 1; }, 0).switchMap(errorCount => { - const esponentialDelay = Math.pow(2, errorCount) * 1000; + allErrorsCount = errorCount; + const consecutiveErrorsCount = allErrorsCount - lastRecoverCount; + const esponentialDelay = Math.pow(2, consecutiveErrorsCount) * options.esponentialUnit; - return Observable.timer(esponentialDelay); + return Observable.timer(esponentialDelay, null, scheduler); }); }); } return Observable.empty(); + }).do(() => { + // Update the counter after every successful polling + lastRecoverCount = allErrorsCount; }); } diff --git a/test/index.spec.ts b/test/index.spec.ts index f0dd1f6..158cb52 100644 --- a/test/index.spec.ts +++ b/test/index.spec.ts @@ -102,17 +102,24 @@ describe('Backoff behaviour', function() { }); test('It should retry on error', () => { + const source$ = scheduler.createColdObservable('-1-2-#'); + const expected = '-1-2----1-(2|)'; + const polling$ = polling(source$, { interval: 60, esponentialUnit: 10 }, scheduler).take(4); + + scheduler.expectObservable(polling$).toBe(expected, { 1: '1', 2: '2' }); + scheduler.flush(); + }); + + test('It should reset delays on not consecutive errors', () => { /** - * This test is a bit tricky. It tests that the source$ errored only once - * and that the error has been recovered. It MUST although avoid erroring twice - * because `.retryWhen` doesn't reset its state after recover. This cause the - * second error to continue the series of increasing delays, like 2 consequent + * `.retryWhen` doesn't reset its state after a recover. This cause the + * next error to continue the series of increasing delays, like 2 consecutive * errors would do. * @see https://github.com/ReactiveX/rxjs/issues/1413 */ const source$ = scheduler.createColdObservable('-1-2-#'); - const expected = '-1-2----1-(2|)'; - const polling$ = polling(source$, { interval: 60, esponentialUnit: 10 }, scheduler).take(4); + const expected = '-1-2----1-2----(1|)'; + const polling$ = polling(source$, { interval: 60, esponentialUnit: 10 }, scheduler).take(5); scheduler.expectObservable(polling$).toBe(expected, { 1: '1', 2: '2' }); scheduler.flush();