Skip to content

Commit d1320b8

Browse files
authored
feat(js-sdk): circuit breaker for usage reporting (#7259)
1 parent 717b5aa commit d1320b8

File tree

9 files changed

+362
-78
lines changed

9 files changed

+362
-78
lines changed

.changeset/beige-teams-spend.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
---
2+
'@graphql-hive/envelop': minor
3+
'@graphql-hive/apollo': minor
4+
'@graphql-hive/core': minor
5+
'@graphql-hive/yoga': minor
6+
---
7+
8+
Support circuit breaking for usage reporting.
9+
10+
Circuit breaking is a fault-tolerance pattern that prevents a system from repeatedly calling a failing service. When errors or timeouts exceed a set threshold, the circuit “opens,” blocking further requests until the service recovers.
11+
12+
This ensures that during a network issue or outage, the service using the Hive SDK remains healthy and is not overwhelmed by failed usage reports or repeated retries.
13+
14+
```ts
15+
import { createClient } from "@graphql-hive/core"
16+
17+
const client = createClient({
18+
agent: {
19+
circuitBreaker: {
20+
/**
21+
* Minimum number of requests that must be recorded before the circuit breaker starts evaluating the error rate.
22+
* Default: 10
23+
*/
24+
volumeThreshold: 5,
25+
/**
26+
* Percentage of requests failing before the circuit breaker kicks in.
27+
* Default: 50
28+
*/
29+
errorThresholdPercentage: 1,
30+
/**
31+
* Time in milliseconds after which the circuit breaker attempts to send requests again.
32+
* Default: 30_000
33+
*/
34+
resetTimeout: 10_000,
35+
},
36+
}
37+
})
38+
```

packages/libraries/core/package.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,13 @@
4545
"graphql": "^0.13.0 || ^14.0.0 || ^15.0.0 || ^16.0.0"
4646
},
4747
"dependencies": {
48+
"@graphql-hive/signal": "^2.0.0",
4849
"@graphql-tools/utils": "^10.0.0",
4950
"@whatwg-node/fetch": "^0.10.6",
5051
"async-retry": "^1.3.3",
5152
"js-md5": "0.8.3",
5253
"lodash.sortby": "^4.7.0",
54+
"opossum": "^9.0.0",
5355
"tiny-lru": "^8.0.2"
5456
},
5557
"devDependencies": {
@@ -58,6 +60,7 @@
5860
"@types/async-retry": "1.4.8",
5961
"@types/js-md5": "0.8.0",
6062
"@types/lodash.sortby": "4.7.9",
63+
"@types/opossum": "8.1.9",
6164
"graphql": "16.9.0",
6265
"nock": "14.0.10",
6366
"tslib": "2.8.1",
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
*
3+
* Just a small playground to play around with different scenarios around the agent.
4+
* You can run it like this: `bun run --watch packages/libraries/core/playground/agent-circuit-breaker.ts`
5+
*/
6+
7+
import { createAgent } from '../src/client/agent.js';
8+
9+
let data: Array<{}> = [];
10+
11+
const agent = createAgent<{}>(
12+
{
13+
debug: true,
14+
endpoint: 'http://127.0.0.1',
15+
token: 'noop',
16+
async fetch(_url, _opts) {
17+
// throw new Error('FAIL FAIL');
18+
console.log('SENDING!');
19+
return new Response('ok', {
20+
status: 200,
21+
});
22+
},
23+
circuitBreaker: {
24+
errorThresholdPercentage: 1,
25+
resetTimeout: 10_000,
26+
volumeThreshold: 0,
27+
},
28+
maxSize: 1,
29+
maxRetries: 0,
30+
},
31+
{
32+
body() {
33+
data = [];
34+
return String(data);
35+
},
36+
data: {
37+
clear() {
38+
data = [];
39+
},
40+
size() {
41+
return data.length;
42+
},
43+
set(d) {
44+
data.push(d);
45+
},
46+
},
47+
},
48+
);
49+
50+
setInterval(() => {
51+
agent.capture({});
52+
}, 1_000);

packages/libraries/core/src/client/agent.ts

Lines changed: 135 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,35 @@
1+
import { fetch as defaultFetch } from '@whatwg-node/fetch';
12
import { version } from '../version.js';
23
import { http } from './http-client.js';
34
import type { Logger } from './types.js';
5+
import { CircuitBreakerInterface, createHiveLogger, loadCircuitBreaker } from './utils.js';
46

57
type ReadOnlyResponse = Pick<Response, 'status' | 'text' | 'json' | 'statusText'>;
68

9+
export type AgentCircuitBreakerConfiguration = {
10+
/**
11+
* Percentage of failed requests after which the circuit breaker should kick in.
12+
* Default: 50
13+
*/
14+
errorThresholdPercentage: number;
15+
/**
16+
* Minimum number of requests that must be recorded before the circuit breaker starts evaluating the error rate.
17+
* Default: 10
18+
*/
19+
volumeThreshold: number;
20+
/**
21+
* Time in milliseconds after which the circuit breaker attempts to send requests again.
22+
* Default: 30_000
23+
*/
24+
resetTimeout: number;
25+
};
26+
27+
const defaultCircuitBreakerConfiguration: AgentCircuitBreakerConfiguration = {
28+
errorThresholdPercentage: 50,
29+
volumeThreshold: 10,
30+
resetTimeout: 30_000,
31+
};
32+
733
export interface AgentOptions {
834
enabled?: boolean;
935
name?: string;
@@ -48,7 +74,14 @@ export interface AgentOptions {
4874
* WHATWG Compatible fetch implementation
4975
* used by the agent to send reports
5076
*/
51-
fetch?: typeof fetch;
77+
fetch?: typeof defaultFetch;
78+
/**
79+
* Circuit Breaker Configuration.
80+
* true -> Use default configuration
81+
* false -> Disable
82+
* object -> use custom configuration see {AgentCircuitBreakerConfiguration}
83+
*/
84+
circuitBreaker?: boolean | AgentCircuitBreakerConfiguration;
5285
}
5386

5487
export function createAgent<TEvent>(
@@ -67,23 +100,31 @@ export function createAgent<TEvent>(
67100
headers?(): Record<string, string>;
68101
},
69102
) {
70-
const options: Required<Omit<AgentOptions, 'fetch'>> = {
103+
const options: Required<Omit<AgentOptions, 'fetch' | 'circuitBreaker'>> & {
104+
circuitBreaker: null | AgentCircuitBreakerConfiguration;
105+
} = {
71106
timeout: 30_000,
72107
debug: false,
73108
enabled: true,
74109
minTimeout: 200,
75110
maxRetries: 3,
76111
sendInterval: 10_000,
77112
maxSize: 25,
78-
logger: console,
79113
name: 'hive-client',
80114
version,
81115
...pluginOptions,
116+
circuitBreaker:
117+
pluginOptions.circuitBreaker == null || pluginOptions.circuitBreaker === true
118+
? defaultCircuitBreakerConfiguration
119+
: pluginOptions.circuitBreaker === false
120+
? null
121+
: pluginOptions.circuitBreaker,
122+
logger: createHiveLogger(pluginOptions.logger ?? console, '[agent]'),
82123
};
83124

84125
const enabled = options.enabled !== false;
85126

86-
let timeoutID: any = null;
127+
let timeoutID: ReturnType<typeof setTimeout> | null = null;
87128

88129
function schedule() {
89130
if (timeoutID) {
@@ -143,6 +184,27 @@ export function createAgent<TEvent>(
143184
return send({ throwOnError: true, skipSchedule: true });
144185
}
145186

187+
async function sendHTTPCall(buffer: string | Buffer<ArrayBufferLike>): Promise<Response> {
188+
const signal = breaker.getSignal();
189+
return await http.post(options.endpoint, buffer, {
190+
headers: {
191+
accept: 'application/json',
192+
'content-type': 'application/json',
193+
Authorization: `Bearer ${options.token}`,
194+
'User-Agent': `${options.name}/${options.version}`,
195+
...headers(),
196+
},
197+
timeout: options.timeout,
198+
retry: {
199+
retries: options.maxRetries,
200+
factor: 2,
201+
},
202+
logger: options.logger,
203+
fetchImplementation: pluginOptions.fetch,
204+
signal,
205+
});
206+
}
207+
146208
async function send(sendOptions?: {
147209
throwOnError?: boolean;
148210
skipSchedule: boolean;
@@ -160,23 +222,7 @@ export function createAgent<TEvent>(
160222
data.clear();
161223

162224
debugLog(`Sending report (queue ${dataToSend})`);
163-
const response = await http
164-
.post(options.endpoint, buffer, {
165-
headers: {
166-
accept: 'application/json',
167-
'content-type': 'application/json',
168-
Authorization: `Bearer ${options.token}`,
169-
'User-Agent': `${options.name}/${options.version}`,
170-
...headers(),
171-
},
172-
timeout: options.timeout,
173-
retry: {
174-
retries: options.maxRetries,
175-
factor: 2,
176-
},
177-
logger: options.logger,
178-
fetchImplementation: pluginOptions.fetch,
179-
})
225+
const response = sendFromBreaker(buffer)
180226
.then(res => {
181227
debugLog(`Report sent!`);
182228
return res;
@@ -215,6 +261,74 @@ export function createAgent<TEvent>(
215261
});
216262
}
217263

264+
let breaker: CircuitBreakerInterface<
265+
Parameters<typeof sendHTTPCall>,
266+
ReturnType<typeof sendHTTPCall>
267+
>;
268+
let loadCircuitBreakerPromise: Promise<void> | null = null;
269+
const breakerLogger = createHiveLogger(options.logger, '[circuit breaker]');
270+
271+
function noopBreaker(): typeof breaker {
272+
return {
273+
getSignal() {
274+
return undefined;
275+
},
276+
fire: sendHTTPCall,
277+
};
278+
}
279+
280+
if (options.circuitBreaker) {
281+
/**
282+
* We support Cloudflare, which does not have the `events` module.
283+
* So we lazy load opossum which has `events` as a dependency.
284+
*/
285+
breakerLogger.info('initialize circuit breaker');
286+
loadCircuitBreakerPromise = loadCircuitBreaker(
287+
CircuitBreaker => {
288+
breakerLogger.info('started');
289+
const realBreaker = new CircuitBreaker(sendHTTPCall, {
290+
...options.circuitBreaker,
291+
timeout: false,
292+
autoRenewAbortController: true,
293+
});
294+
295+
realBreaker.on('open', () =>
296+
breakerLogger.error('circuit opened - backend seems unreachable.'),
297+
);
298+
realBreaker.on('halfOpen', () =>
299+
breakerLogger.info('circuit half open - testing backend connectivity'),
300+
);
301+
realBreaker.on('close', () => breakerLogger.info('circuit closed - backend recovered '));
302+
303+
// @ts-expect-error missing definition in typedefs for `opossum`
304+
breaker = realBreaker;
305+
},
306+
() => {
307+
breakerLogger.info('circuit breaker not supported on platform');
308+
breaker = noopBreaker();
309+
},
310+
);
311+
} else {
312+
breaker = noopBreaker();
313+
}
314+
315+
async function sendFromBreaker(...args: Parameters<typeof breaker.fire>) {
316+
if (!breaker) {
317+
await loadCircuitBreakerPromise;
318+
}
319+
320+
try {
321+
return await breaker.fire(...args);
322+
} catch (err: unknown) {
323+
if (err instanceof Error && 'code' in err && err.code === 'EOPENBREAKER') {
324+
breakerLogger.info('circuit open - sending report skipped');
325+
return null;
326+
}
327+
328+
throw err;
329+
}
330+
}
331+
218332
return {
219333
capture,
220334
sendImmediately,

packages/libraries/core/src/client/http-client.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncRetry from 'async-retry';
2+
import { abortSignalAny } from '@graphql-hive/signal';
23
import { crypto, fetch, URL } from '@whatwg-node/fetch';
34
import type { Logger } from './types.js';
45

@@ -21,6 +22,8 @@ interface SharedConfig {
2122
* @default {response => response.ok}
2223
**/
2324
isRequestOk?: ResponseAssertFunction;
25+
/** Optional abort signal */
26+
signal?: AbortSignal;
2427
}
2528

2629
/**
@@ -78,6 +81,8 @@ export async function makeFetchCall(
7881
* @default {response => response.ok}
7982
**/
8083
isRequestOk?: ResponseAssertFunction;
84+
/** Optional abort signal */
85+
signal?: AbortSignal;
8186
},
8287
): Promise<Response> {
8388
const logger = config.logger;
@@ -87,6 +92,9 @@ export async function makeFetchCall(
8792
let maxTimeout = 2000;
8893
let factor = 1.2;
8994

95+
const actionHeader =
96+
config.method === 'POST' ? { 'x-client-action-id': crypto.randomUUID() } : undefined;
97+
9098
if (config.retry !== false) {
9199
retries = config.retry?.retries ?? 5;
92100
minTimeout = config.retry?.minTimeout ?? 200;
@@ -104,13 +112,15 @@ export async function makeFetchCall(
104112
);
105113

106114
const getDuration = measureTime();
107-
const signal = AbortSignal.timeout(config.timeout ?? 20_000);
115+
const timeoutSignal = AbortSignal.timeout(config.timeout ?? 20_000);
116+
const signal = config.signal ? abortSignalAny([config.signal, timeoutSignal]) : timeoutSignal;
108117

109118
const response = await (config.fetchImplementation ?? fetch)(endpoint, {
110119
method: config.method,
111120
body: config.body,
112121
headers: {
113122
'x-request-id': requestId,
123+
...actionHeader,
114124
...config.headers,
115125
},
116126
signal,
@@ -135,6 +145,12 @@ export async function makeFetchCall(
135145
throw new Error(`Unexpected HTTP error. (x-request-id=${requestId})`, { cause: error });
136146
});
137147

148+
if (config.signal?.aborted === true) {
149+
const error = config.signal.reason ?? new Error('Request aborted externally.');
150+
bail(error);
151+
throw error;
152+
}
153+
138154
if (isRequestOk(response)) {
139155
logger?.info(
140156
`${config.method} ${endpoint} (x-request-id=${requestId}) succeeded with status ${response.status} ${getDuration()}.`,

0 commit comments

Comments
 (0)