Skip to content

Commit a542693

Browse files
committed
feat: circuit breaker for usage reporting
1 parent a606559 commit a542693

File tree

4 files changed

+100
-17
lines changed

4 files changed

+100
-17
lines changed

packages/libraries/core/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
"async-retry": "^1.3.3",
5151
"js-md5": "0.8.3",
5252
"lodash.sortby": "^4.7.0",
53+
"opossum": "^9.0.0",
5354
"tiny-lru": "^8.0.2"
5455
},
5556
"devDependencies": {
@@ -58,6 +59,7 @@
5859
"@types/async-retry": "1.4.8",
5960
"@types/js-md5": "0.8.0",
6061
"@types/lodash.sortby": "4.7.9",
62+
"@types/opossum": "8.1.9",
6163
"graphql": "16.9.0",
6264
"nock": "14.0.10",
6365
"tslib": "2.8.1",

packages/libraries/core/src/client/agent.ts

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,33 @@
1+
import CircuitBreaker from 'opossum';
2+
import { fetch as defaultFetch } from '@whatwg-node/fetch';
13
import { version } from '../version.js';
24
import { http } from './http-client.js';
35
import type { Logger } from './types.js';
46

57
type ReadOnlyResponse = Pick<Response, 'status' | 'text' | 'json' | 'statusText'>;
68

9+
export type AgentCircuitBreakerConfiguration = {
10+
/** after which time a request should be treated as a timeout in milleseconds */
11+
timeout: number;
12+
/** percentage after what the circuit breaker should kick in. */
13+
errorThresholdPercentage: number;
14+
/** count of requests before starting evaluating. */
15+
volumeThreshold: number;
16+
/** after what time the circuit breaker is resetted in milliseconds */
17+
resetTimeout: number;
18+
};
19+
20+
const defaultCircuitBreakerConfiguration: AgentCircuitBreakerConfiguration = {
21+
// if call takes > 5s, count as a failure
22+
timeout: 5000,
23+
// trip if 50% of calls fail
24+
errorThresholdPercentage: 50,
25+
// need at least 5 calls before evaluating
26+
volumeThreshold: 5,
27+
// after 30s, try half-open state
28+
resetTimeout: 30000,
29+
};
30+
731
export interface AgentOptions {
832
enabled?: boolean;
933
name?: string;
@@ -47,7 +71,11 @@ export interface AgentOptions {
4771
* WHATWG Compatible fetch implementation
4872
* used by the agent to send reports
4973
*/
50-
fetch?: typeof fetch;
74+
fetch?: typeof defaultFetch;
75+
/**
76+
* Circuit Breaker Configuration
77+
*/
78+
circuitBreaker?: AgentCircuitBreakerConfiguration;
5179
}
5280

5381
export function createAgent<TEvent>(
@@ -76,12 +104,13 @@ export function createAgent<TEvent>(
76104
maxSize: 25,
77105
logger: console,
78106
name: 'hive-client',
107+
circuitBreaker: defaultCircuitBreakerConfiguration,
79108
...pluginOptions,
80109
};
81110

82111
const enabled = options.enabled !== false;
83112

84-
let timeoutID: any = null;
113+
let timeoutID: ReturnType<typeof setTimeout> | null = null;
85114

86115
function schedule() {
87116
if (timeoutID) {
@@ -131,20 +160,22 @@ export function createAgent<TEvent>(
131160

132161
if (data.size() >= options.maxSize) {
133162
debugLog('Sending immediately');
134-
setImmediate(() => send({ throwOnError: false, skipSchedule: true }));
163+
setImmediate(() => breaker.fire({ throwOnError: false, skipSchedule: true }));
135164
}
136165
}
137166

138167
function sendImmediately(event: TEvent): Promise<ReadOnlyResponse | null> {
139168
data.set(event);
140169
debugLog('Sending immediately');
141-
return send({ throwOnError: true, skipSchedule: true });
170+
return breaker.fire({ throwOnError: true, skipSchedule: true });
142171
}
143172

144173
async function send(sendOptions?: {
145174
throwOnError?: boolean;
146175
skipSchedule: boolean;
147176
}): Promise<ReadOnlyResponse | null> {
177+
const signal: AbortSignal = breaker.getSignal();
178+
148179
if (!data.size() || !enabled) {
149180
if (!sendOptions?.skipSchedule) {
150181
schedule();
@@ -174,6 +205,7 @@ export function createAgent<TEvent>(
174205
},
175206
logger: options.logger,
176207
fetchImplementation: pluginOptions.fetch,
208+
signal,
177209
})
178210
.then(res => {
179211
debugLog(`Report sent!`);
@@ -207,12 +239,21 @@ export function createAgent<TEvent>(
207239
await Promise.all(inProgressCaptures);
208240
}
209241

210-
await send({
242+
await breaker.fire({
211243
skipSchedule: true,
212244
throwOnError: false,
213245
});
214246
}
215247

248+
const breaker = new CircuitBreaker(send, {
249+
...options.circuitBreaker,
250+
autoRenewAbortController: true,
251+
});
252+
253+
breaker.on('open', () => errorLog('circuit opened - backend unreachable'));
254+
breaker.on('halfOpen', () => debugLog('testing backend connectivity'));
255+
breaker.on('close', () => debugLog('backend recovered - circuit closed'));
256+
216257
return {
217258
capture,
218259
sendImmediately,

packages/libraries/core/src/client/http-client.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ interface SharedConfig {
2121
* @default {response => response.ok}
2222
**/
2323
isRequestOk?: ResponseAssertFunction;
24+
/** Optional abort signal */
25+
signal?: AbortSignal;
2426
}
2527

2628
/**
@@ -78,6 +80,8 @@ export async function makeFetchCall(
7880
* @default {response => response.ok}
7981
**/
8082
isRequestOk?: ResponseAssertFunction;
83+
/** Optional abort signal */
84+
signal?: AbortSignal;
8185
},
8286
): Promise<Response> {
8387
const logger = config.logger;
@@ -104,7 +108,10 @@ export async function makeFetchCall(
104108
);
105109

106110
const getDuration = measureTime();
107-
const signal = AbortSignal.timeout(config.timeout ?? 20_000);
111+
const timeoutSignal = AbortSignal.timeout(config.timeout ?? 20_000);
112+
const signal = config.signal
113+
? AbortSignal.any([config.signal, timeoutSignal])
114+
: timeoutSignal;
108115

109116
const response = await (config.fetchImplementation ?? fetch)(endpoint, {
110117
method: config.method,
@@ -135,6 +142,12 @@ export async function makeFetchCall(
135142
throw new Error(`Unexpected HTTP error. (x-request-id=${requestId})`, { cause: error });
136143
});
137144

145+
if (config.signal?.aborted === true) {
146+
// TODO: maybe log some message?
147+
bail(new Error('Request aborted.'));
148+
return;
149+
}
150+
138151
if (isRequestOk(response)) {
139152
logger?.info(
140153
`${config.method} ${endpoint} (x-request-id=${requestId}) succeeded with status ${response.status} ${getDuration()}.`,

pnpm-lock.yaml

Lines changed: 38 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)