Skip to content

Commit

Permalink
Add option to reset interval on success (#18)
Browse files Browse the repository at this point in the history
* Add build directory to gitignore

* Fix ts-node test compilation error

* Replace compilers flag with require

The mocha `--compilers` flag is deprecated in favor of `--require`:
https://github.com/mochajs/mocha/wiki/compilers-deprecation

* Add resetOnSuccess option to retryBackoff operator

* Add tests related to internal state

* Fix referential transparency issue

* fixup! Add resetOnSuccess option to retryBackoff operator

* Format retryBackoff with prettier
  • Loading branch information
vially authored May 17, 2020
1 parent ae90c16 commit 7d38283
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 24 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
node_modules
dist
dist
build
4 changes: 2 additions & 2 deletions spec/mocha.opts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
--compilers ts:ts-node/register
spec/**/*-spec.ts
--require ts-node/register
spec/**/*-spec.ts
105 changes: 101 additions & 4 deletions spec/retryBackoff-spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { expect } from 'chai';
import { retryBackoff } from '../src/index';
import { of, Observable, Observer, throwError, Subject } from 'rxjs';
import { map, mergeMap, concat, multicast, refCount } from 'rxjs/operators';
import { Observable, Observer, of, Subject, throwError } from 'rxjs';
import { concat, map, mergeMap, multicast, refCount } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { retryBackoff } from '../src/index';

describe('retryBackoff operator', () => {
let testScheduler: TestScheduler;
Expand Down Expand Up @@ -253,7 +253,7 @@ describe('retryBackoff operator', () => {
of(1, 2, 3)
.pipe(
concat(throwError('bad!')),
multicast(() => new Subject()),
multicast(() => new Subject<number>()),
refCount(),
retryBackoff({ initialInterval: 1, maxRetries: 4 })
)
Expand Down Expand Up @@ -350,4 +350,101 @@ describe('retryBackoff operator', () => {
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should be referentially transparent', () => {
testScheduler.run(({ expectObservable, cold, expectSubscriptions }) => {
const source1 = cold('--#');
const source2 = cold('--#');
const unsub = ' ---------!';
const subs = [
' ^-! ',
' ---^-! ',
' -------^-!',
];
const expected = ' ----------';

const op = retryBackoff({
initialInterval: 1,
});

expectObservable(source1.pipe(op), unsub).toBe(expected);
expectSubscriptions(source1.subscriptions).toBe(subs);

expectObservable(source2.pipe(op), unsub).toBe(expected);
expectSubscriptions(source2.subscriptions).toBe(subs);
});
});

it('should ensure interval state is per-subscription', () => {
testScheduler.run(({ expectObservable, cold, expectSubscriptions }) => {
const source = cold('--#');
const sub1 = ' ^--------!';
const sub2 = ' ----------^--------!';
const subs = [
' ^-! ',
' ---^-! ',
' -------^-!',
' ----------^-! ',
' -------------^-! ',
' -----------------^-!',
];
const expected = ' ----------';

const result = source.pipe(retryBackoff({
initialInterval: 1,
}));

expectObservable(result, sub1).toBe(expected);
expectObservable(result, sub2).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should reset the delay when resetOnSuccess is true', () => {
testScheduler.run(({ expectObservable, cold, expectSubscriptions }) => {
const source = cold('--1-2-3-#');
const subs = [
' ^-------!',
' ---------^-------!',
' ------------------^-------!',
' ---------------------------^-------!'
// interval always reset to 1 ^
];
const unsub = ' -----------------------------------!';
const expected = ' --1-2-3----1-2-3----1-2-3----1-2-3--';

expectObservable(
source.pipe(
retryBackoff({
initialInterval: 1,
resetOnSuccess: true
})
),
unsub
).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});

it('should not reset the delay on consecutive errors when resetOnSuccess is true', () => {
testScheduler.run(({ expectObservable, cold, expectSubscriptions }) => {
const source = cold('--------#');
const unsub = ' -------------------------------------!';
const subs = [
' ^-------! ',
' ---------^-------! ',
' -------------------^-------! ',
' -------------------------------^-----!'
];
const expected = ' --------------------------------------';

const result = source.pipe(retryBackoff({
initialInterval: 1,
resetOnSuccess: true
}));

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
});
48 changes: 31 additions & 17 deletions src/operators/retryBackoff.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { iif, Observable, throwError, timer } from 'rxjs';
import { concatMap, retryWhen } from 'rxjs/operators';

import { getDelay, exponentialBackoffDelay } from '../utils';
import { defer, iif, Observable, throwError, timer } from 'rxjs';
import { concatMap, retryWhen, tap } from 'rxjs/operators';
import { exponentialBackoffDelay, getDelay } from '../utils';

export interface RetryBackoffConfig {
// Initial interval. It will eventually go as high as maxInterval.
Expand All @@ -10,6 +9,9 @@ export interface RetryBackoffConfig {
maxRetries?: number;
// Maximum delay between retries.
maxInterval?: number;
// When set to `true` every successful emission will reset the delay and the
// error count.
resetOnSuccess?: boolean;
// Conditional retry.
shouldRetry?: (error: any) => boolean;
backoffDelay?: (iteration: number, initialInterval: number) => number;
Expand All @@ -31,20 +33,32 @@ export function retryBackoff(
maxRetries = Infinity,
maxInterval = Infinity,
shouldRetry = () => true,
backoffDelay = exponentialBackoffDelay
resetOnSuccess = false,
backoffDelay = exponentialBackoffDelay,
} = typeof config === 'number' ? { initialInterval: config } : config;
return <T>(source: Observable<T>) =>
source.pipe(
retryWhen<T>(errors =>
errors.pipe(
concatMap((error, i) =>
iif(
() => i < maxRetries && shouldRetry(error),
timer(getDelay(backoffDelay(i, initialInterval), maxInterval)),
throwError(error)
)
defer(() => {
let index = 0;
return source.pipe(
retryWhen<T>(errors =>
errors.pipe(
concatMap(error => {
const attempt = index++;
return iif(
() => attempt < maxRetries && shouldRetry(error),
timer(
getDelay(backoffDelay(attempt, initialInterval), maxInterval)
),
throwError(error)
);
})
)
)
)
);
),
tap(() => {
if (resetOnSuccess) {
index = 0;
}
})
);
});
}

0 comments on commit 7d38283

Please sign in to comment.