diff --git a/.gitignore b/.gitignore index 76add87..63520da 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ node_modules -dist \ No newline at end of file +dist +build diff --git a/spec/mocha.opts b/spec/mocha.opts index 030e030..ee364a5 100644 --- a/spec/mocha.opts +++ b/spec/mocha.opts @@ -1,2 +1,2 @@ ---compilers ts:ts-node/register -spec/**/*-spec.ts \ No newline at end of file +--require ts-node/register +spec/**/*-spec.ts diff --git a/spec/retryBackoff-spec.ts b/spec/retryBackoff-spec.ts index e05b540..0435ae8 100644 --- a/spec/retryBackoff-spec.ts +++ b/spec/retryBackoff-spec.ts @@ -1,8 +1,8 @@ import { expect } from 'chai'; -import { retryBackoff } from '../src/index'; -import { of, Observable, Observer, throwError, Subject } from 'rxjs'; -import { map, mergeMap, concat, multicast, refCount } from 'rxjs/operators'; +import { Observable, Observer, of, Subject, throwError } from 'rxjs'; +import { concat, map, mergeMap, multicast, refCount } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; +import { retryBackoff } from '../src/index'; describe('retryBackoff operator', () => { let testScheduler: TestScheduler; @@ -253,7 +253,7 @@ describe('retryBackoff operator', () => { of(1, 2, 3) .pipe( concat(throwError('bad!')), - multicast(() => new Subject()), + multicast(() => new Subject()), refCount(), retryBackoff({ initialInterval: 1, maxRetries: 4 }) ) @@ -350,4 +350,101 @@ describe('retryBackoff operator', () => { expectSubscriptions(source.subscriptions).toBe(subs); }); }); + + it('should be referentially transparent', () => { + testScheduler.run(({ expectObservable, cold, expectSubscriptions }) => { + const source1 = cold('--#'); + const source2 = cold('--#'); + const unsub = ' ---------!'; + const subs = [ + ' ^-! ', + ' ---^-! ', + ' -------^-!', + ]; + const expected = ' ----------'; + + const op = retryBackoff({ + initialInterval: 1, + }); + + expectObservable(source1.pipe(op), unsub).toBe(expected); + expectSubscriptions(source1.subscriptions).toBe(subs); + + expectObservable(source2.pipe(op), unsub).toBe(expected); + expectSubscriptions(source2.subscriptions).toBe(subs); + }); + }); + + it('should ensure interval state is per-subscription', () => { + testScheduler.run(({ expectObservable, cold, expectSubscriptions }) => { + const source = cold('--#'); + const sub1 = ' ^--------!'; + const sub2 = ' ----------^--------!'; + const subs = [ + ' ^-! ', + ' ---^-! ', + ' -------^-!', + ' ----------^-! ', + ' -------------^-! ', + ' -----------------^-!', + ]; + const expected = ' ----------'; + + const result = source.pipe(retryBackoff({ + initialInterval: 1, + })); + + expectObservable(result, sub1).toBe(expected); + expectObservable(result, sub2).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + }); + + it('should reset the delay when resetOnSuccess is true', () => { + testScheduler.run(({ expectObservable, cold, expectSubscriptions }) => { + const source = cold('--1-2-3-#'); + const subs = [ + ' ^-------!', + ' ---------^-------!', + ' ------------------^-------!', + ' ---------------------------^-------!' + // interval always reset to 1 ^ + ]; + const unsub = ' -----------------------------------!'; + const expected = ' --1-2-3----1-2-3----1-2-3----1-2-3--'; + + expectObservable( + source.pipe( + retryBackoff({ + initialInterval: 1, + resetOnSuccess: true + }) + ), + unsub + ).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + }); + + it('should not reset the delay on consecutive errors when resetOnSuccess is true', () => { + testScheduler.run(({ expectObservable, cold, expectSubscriptions }) => { + const source = cold('--------#'); + const unsub = ' -------------------------------------!'; + const subs = [ + ' ^-------! ', + ' ---------^-------! ', + ' -------------------^-------! ', + ' -------------------------------^-----!' + ]; + const expected = ' --------------------------------------'; + + const result = source.pipe(retryBackoff({ + initialInterval: 1, + resetOnSuccess: true + })); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + }); }); diff --git a/src/operators/retryBackoff.ts b/src/operators/retryBackoff.ts index f7cd394..7bbde31 100644 --- a/src/operators/retryBackoff.ts +++ b/src/operators/retryBackoff.ts @@ -1,7 +1,6 @@ -import { iif, Observable, throwError, timer } from 'rxjs'; -import { concatMap, retryWhen } from 'rxjs/operators'; - -import { getDelay, exponentialBackoffDelay } from '../utils'; +import { defer, iif, Observable, throwError, timer } from 'rxjs'; +import { concatMap, retryWhen, tap } from 'rxjs/operators'; +import { exponentialBackoffDelay, getDelay } from '../utils'; export interface RetryBackoffConfig { // Initial interval. It will eventually go as high as maxInterval. @@ -10,6 +9,9 @@ export interface RetryBackoffConfig { maxRetries?: number; // Maximum delay between retries. maxInterval?: number; + // When set to `true` every successful emission will reset the delay and the + // error count. + resetOnSuccess?: boolean; // Conditional retry. shouldRetry?: (error: any) => boolean; backoffDelay?: (iteration: number, initialInterval: number) => number; @@ -31,20 +33,32 @@ export function retryBackoff( maxRetries = Infinity, maxInterval = Infinity, shouldRetry = () => true, - backoffDelay = exponentialBackoffDelay + resetOnSuccess = false, + backoffDelay = exponentialBackoffDelay, } = typeof config === 'number' ? { initialInterval: config } : config; return (source: Observable) => - source.pipe( - retryWhen(errors => - errors.pipe( - concatMap((error, i) => - iif( - () => i < maxRetries && shouldRetry(error), - timer(getDelay(backoffDelay(i, initialInterval), maxInterval)), - throwError(error) - ) + defer(() => { + let index = 0; + return source.pipe( + retryWhen(errors => + errors.pipe( + concatMap(error => { + const attempt = index++; + return iif( + () => attempt < maxRetries && shouldRetry(error), + timer( + getDelay(backoffDelay(attempt, initialInterval), maxInterval) + ), + throwError(error) + ); + }) ) - ) - ) - ); + ), + tap(() => { + if (resetOnSuccess) { + index = 0; + } + }) + ); + }); }