Skip to content

Commit

Permalink
Add resetOnSuccess option to retryBackoff operator
Browse files Browse the repository at this point in the history
  • Loading branch information
vially committed May 9, 2020
1 parent 296637b commit d1356ab
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 10 deletions.
48 changes: 48 additions & 0 deletions spec/retryBackoff-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -399,4 +399,52 @@ describe('retryBackoff operator', () => {
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should reset the delay when resetOnSuccess is true', () => {
testScheduler.run(({ expectObservable, cold, expectSubscriptions }) => {
const source = cold('--1-2-3-#');
const subs = [
' ^-------!',
' ---------^-------!',
' ------------------^-------!',
' ---------------------------^-------!'
// interval always reset to 1 ^
];
const unsub = ' -----------------------------------!';
const expected = ' --1-2-3----1-2-3----1-2-3----1-2-3--';

expectObservable(
source.pipe(
retryBackoff({
initialInterval: 1,
resetOnSuccess: true
})
),
unsub
).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should not reset the delay on consecutive errors when resetOnSuccess is true', () => {
testScheduler.run(({ expectObservable, cold, expectSubscriptions }) => {
const source = cold('--------#');
const unsub = ' -------------------------------------!';
const subs = [
' ^-------! ',
' ---------^-------! ',
' -------------------^-------! ',
' -------------------------------^-----!'
];
const expected = ' --------------------------------------';

const result = source.pipe(retryBackoff({
initialInterval: 1,
resetOnSuccess: true
}));

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
});
32 changes: 22 additions & 10 deletions src/operators/retryBackoff.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { iif, Observable, throwError, timer } from 'rxjs';
import { concatMap, retryWhen } from 'rxjs/operators';
import { concatMap, retryWhen, tap } from 'rxjs/operators';
import { exponentialBackoffDelay, getDelay } from '../utils';

import { getDelay, exponentialBackoffDelay } from '../utils';

export interface RetryBackoffConfig {
// Initial interval. It will eventually go as high as maxInterval.
Expand All @@ -10,6 +10,9 @@ export interface RetryBackoffConfig {
maxRetries?: number;
// Maximum delay between retries.
maxInterval?: number;
// When set to `true` every successful emission will reset the delay and the
// error count.
resetOnSuccess?: boolean;
// Conditional retry.
shouldRetry?: (error: any) => boolean;
backoffDelay?: (iteration: number, initialInterval: number) => number;
Expand All @@ -31,20 +34,29 @@ export function retryBackoff(
maxRetries = Infinity,
maxInterval = Infinity,
shouldRetry = () => true,
resetOnSuccess = false,
backoffDelay = exponentialBackoffDelay
} = typeof config === 'number' ? { initialInterval: config } : config;
return <T>(source: Observable<T>) =>
source.pipe(
return <T>(source: Observable<T>) => {
let index = 0;
return source.pipe(
retryWhen<T>(errors =>
errors.pipe(
concatMap((error, i) =>
iif(
() => i < maxRetries && shouldRetry(error),
timer(getDelay(backoffDelay(i, initialInterval), maxInterval)),
concatMap(error => {
const attempt = index++;
return iif(
() => attempt < maxRetries && shouldRetry(error),
timer(getDelay(backoffDelay(attempt, initialInterval), maxInterval)),
throwError(error)
)
)
})
)
)
),
tap((_: T) => {
if (resetOnSuccess) {
index = 0;
}
})
);
}
}

0 comments on commit d1356ab

Please sign in to comment.