Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions .changeset/beige-teams-spend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
---
'@graphql-hive/envelop': minor
'@graphql-hive/apollo': minor
'@graphql-hive/core': minor
'@graphql-hive/yoga': minor
---

Support circuit breaking for usage reporting.

Circuit breaking is a fault-tolerance pattern that prevents a system from repeatedly calling a failing service. When errors or timeouts exceed a set threshold, the circuit “opens,” blocking further requests until the service recovers.

This ensures that during a network issue or outage, the service using the Hive SDK remains healthy and is not overwhelmed by failed usage reports or repeated retries.

```ts
import { createClient } from "@graphql-hive/core"

const client = createClient({
agent: {
circuitBreaker: {
/**
* Count of requests before starting evaluating.
* Default: 5
*/
volumeThreshold: 5,
/**
* After which time a request should be treated as a timeout in milleseconds
* Default: 5_000
*/
timeout: 5_000,
/**
* Percentage of requests failing before the circuit breaker kicks in.
* Default: 50
*/
errorThresholdPercentage: 1,
/**
* After what time the circuit breaker is attempting to retry sending requests in milliseconds
* Default: 30_000
*/
resetTimeout: 10_000,
},
}
})
```
2 changes: 2 additions & 0 deletions packages/libraries/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"async-retry": "^1.3.3",
"js-md5": "0.8.3",
"lodash.sortby": "^4.7.0",
"opossum": "^9.0.0",
"tiny-lru": "^8.0.2"
},
"devDependencies": {
Expand All @@ -58,6 +59,7 @@
"@types/async-retry": "1.4.8",
"@types/js-md5": "0.8.0",
"@types/lodash.sortby": "4.7.9",
"@types/opossum": "8.1.9",
"graphql": "16.9.0",
"nock": "14.0.10",
"tslib": "2.8.1",
Expand Down
53 changes: 53 additions & 0 deletions packages/libraries/core/playground/agent-circuit-breaker.ts
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit hard to unit test this functionality, so I added this file that I can play around with the values for the circuit breaker.

I encourage reviewers to do the same to get an understanding on how this works.

Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
*
* Just a small playground to play around with different scenarios arounf the agent.
* You can run it like this: `bun run --watch packages/libraries/core/playground/agent-circuit-breaker.ts`
*/

import { createAgent } from '../src/client/agent.js';

let data: Array<{}> = [];

const agent = createAgent<{}>(
{
debug: true,
endpoint: 'http://127.0.0.1',
token: 'noop',
async fetch(url, opts) {

Check failure on line 16 in packages/libraries/core/playground/agent-circuit-breaker.ts

View workflow job for this annotation

GitHub Actions / code-style / eslint-and-prettier

'opts' is defined but never used. Allowed unused args must match /^_/u

Check failure on line 16 in packages/libraries/core/playground/agent-circuit-breaker.ts

View workflow job for this annotation

GitHub Actions / code-style / eslint-and-prettier

'url' is defined but never used. Allowed unused args must match /^_/u
// throw new Error('FAIL FAIL');
console.log('SENDING!');
return new Response('ok', {
status: 200,
});
},
circuitBreaker: {
timeout: 1_000,
errorThresholdPercentage: 1,
resetTimeout: 10_000,
volumeThreshold: 0,
},
maxSize: 1,
maxRetries: 0,
},
{
body() {
data = [];
return String(data);
},
data: {
clear() {
data = [];
},
size() {
return data.length;
},
set(d) {
data.push(d);
},
},
},
);

setInterval(() => {
agent.capture({});
}, 1_000);
116 changes: 96 additions & 20 deletions packages/libraries/core/src/client/agent.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,42 @@
import CircuitBreaker from 'opossum';
import { fetch as defaultFetch } from '@whatwg-node/fetch';
import { version } from '../version.js';
import { http } from './http-client.js';
import type { Logger } from './types.js';
import { createHiveLogger } from './utils.js';

type ReadOnlyResponse = Pick<Response, 'status' | 'text' | 'json' | 'statusText'>;

export type AgentCircuitBreakerConfiguration = {
/**
* After which time a request should be treated as a timeout in milleseconds
* Default: 5_000
*/
timeout: number;
/**
* Percentage after what the circuit breaker should kick in.
* Default: 50
*/
errorThresholdPercentage: number;
/**
* Count of requests before starting evaluating.
* Default: 5
*/
volumeThreshold: number;
/**
* After what time the circuit breaker is attempting to retry sending requests in milliseconds
* Default: 30_000
*/
resetTimeout: number;
};

const defaultCircuitBreakerConfiguration: AgentCircuitBreakerConfiguration = {
timeout: 5_000,
errorThresholdPercentage: 50,
volumeThreshold: 5,
resetTimeout: 30_000,
};

export interface AgentOptions {
enabled?: boolean;
name?: string;
Expand Down Expand Up @@ -48,7 +81,11 @@ export interface AgentOptions {
* WHATWG Compatible fetch implementation
* used by the agent to send reports
*/
fetch?: typeof fetch;
fetch?: typeof defaultFetch;
/**
* Circuit Breaker Configuration
*/
circuitBreaker?: AgentCircuitBreakerConfiguration;
}

export function createAgent<TEvent>(
Expand All @@ -75,15 +112,16 @@ export function createAgent<TEvent>(
maxRetries: 3,
sendInterval: 10_000,
maxSize: 25,
logger: console,
name: 'hive-client',
circuitBreaker: defaultCircuitBreakerConfiguration,
version,
...pluginOptions,
logger: createHiveLogger(pluginOptions.logger ?? console, '[agent]'),
};

const enabled = options.enabled !== false;

let timeoutID: any = null;
let timeoutID: ReturnType<typeof setTimeout> | null = null;

function schedule() {
if (timeoutID) {
Expand Down Expand Up @@ -143,6 +181,28 @@ export function createAgent<TEvent>(
return send({ throwOnError: true, skipSchedule: true });
}

async function sendHTTPCall(buffer: string | Buffer<ArrayBufferLike>) {
// @ts-expect-error missing definition in typedefs for `opposum`
const signal: AbortSignal = breaker.getSignal();
return await http.post(options.endpoint, buffer, {
headers: {
accept: 'application/json',
'content-type': 'application/json',
Authorization: `Bearer ${options.token}`,
'User-Agent': `${options.name}/${options.version}`,
...headers(),
},
timeout: options.timeout,
retry: {
retries: options.maxRetries,
factor: 2,
},
logger: options.logger,
fetchImplementation: pluginOptions.fetch,
signal,
});
}

async function send(sendOptions?: {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are basically doing the following:

circuit breaker ( # shuts down if underlying fails or times out aggressively
  http client ( # calls fetch and retries if non okay response
      fetch( # good old fetch
          # the payload we want to send
       )
   )
)

throwOnError?: boolean;
skipSchedule: boolean;
Expand All @@ -160,23 +220,7 @@ export function createAgent<TEvent>(
data.clear();

debugLog(`Sending report (queue ${dataToSend})`);
const response = await http
.post(options.endpoint, buffer, {
headers: {
accept: 'application/json',
'content-type': 'application/json',
Authorization: `Bearer ${options.token}`,
'User-Agent': `${options.name}/${options.version}`,
...headers(),
},
timeout: options.timeout,
retry: {
retries: options.maxRetries,
factor: 2,
},
logger: options.logger,
fetchImplementation: pluginOptions.fetch,
})
const response = sendFromBreaker(buffer)
.then(res => {
debugLog(`Report sent!`);
return res;
Expand Down Expand Up @@ -215,6 +259,38 @@ export function createAgent<TEvent>(
});
}

const breaker = new CircuitBreaker(sendHTTPCall, {
...options.circuitBreaker,
autoRenewAbortController: true,
});

const breakerLogger = createHiveLogger(options.logger, ' [circuit breaker]');

async function sendFromBreaker(...args: Parameters<typeof breaker.fire>) {
try {
return await breaker.fire(...args);
} catch (err: unknown) {
if (err instanceof Error && 'code' in err) {
if (err.code === 'EOPENBREAKER') {
breakerLogger.info('circuit open - sending report skipped');
return null;
}
if (err.code === 'ETIMEDOUT') {
breakerLogger.info('circuit open - sending report aborted - timed out');
return null;
}
}

throw err;
}
}

breaker.on('open', () => breakerLogger.error('circuit opened - backend seems unreachable.'));
breaker.on('halfOpen', () =>
breakerLogger.info('circuit half open - testing backend connectivity'),
);
breaker.on('close', () => breakerLogger.info('circuit closed - backend recovered '));

return {
capture,
sendImmediately,
Expand Down
16 changes: 15 additions & 1 deletion packages/libraries/core/src/client/http-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ interface SharedConfig {
* @default {response => response.ok}
**/
isRequestOk?: ResponseAssertFunction;
/** Optional abort signal */
signal?: AbortSignal;
}

/**
Expand Down Expand Up @@ -78,6 +80,8 @@ export async function makeFetchCall(
* @default {response => response.ok}
**/
isRequestOk?: ResponseAssertFunction;
/** Optional abort signal */
signal?: AbortSignal;
},
): Promise<Response> {
const logger = config.logger;
Expand All @@ -104,7 +108,10 @@ export async function makeFetchCall(
);

const getDuration = measureTime();
const signal = AbortSignal.timeout(config.timeout ?? 20_000);
const timeoutSignal = AbortSignal.timeout(config.timeout ?? 20_000);
const signal = config.signal
? AbortSignal.any([config.signal, timeoutSignal])
: timeoutSignal;

const response = await (config.fetchImplementation ?? fetch)(endpoint, {
method: config.method,
Expand Down Expand Up @@ -135,6 +142,13 @@ export async function makeFetchCall(
throw new Error(`Unexpected HTTP error. (x-request-id=${requestId})`, { cause: error });
});

if (config.signal?.aborted === true) {
const error = config.signal.reason ?? new Error('Request aborted externally.');
// TODO: maybe log some message?
bail(error);
throw error;
}

if (isRequestOk(response)) {
logger?.info(
`${config.method} ${endpoint} (x-request-id=${requestId}) succeeded with status ${response.status} ${getDuration()}.`,
Expand Down
Loading
Loading