Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions .changeset/beige-teams-spend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
---
'@graphql-hive/envelop': minor
'@graphql-hive/apollo': minor
'@graphql-hive/core': minor
'@graphql-hive/yoga': minor
---

Support circuit breaking for usage reporting.

Circuit breaking is a fault-tolerance pattern that prevents a system from repeatedly calling a failing service. When errors or timeouts exceed a set threshold, the circuit “opens,” blocking further requests until the service recovers.

This ensures that during a network issue or outage, the service using the Hive SDK remains healthy and is not overwhelmed by failed usage reports or repeated retries.

```ts
import { createClient } from "@graphql-hive/core"

const client = createClient({
agent: {
circuitBreaker: {
/**
* Count of requests before starting evaluating.
* Default: 5
*/
volumeThreshold: 5,
/**
* After which time a request should be treated as a timeout in milleseconds
* Default: 5_000
*/
timeout: 5_000,
/**
* Percentage of requests failing before the circuit breaker kicks in.
* Default: 50
*/
errorThresholdPercentage: 1,
/**
* After what time the circuit breaker is attempting to retry sending requests in milliseconds
* Default: 30_000
*/
resetTimeout: 10_000,
},
}
})
```
3 changes: 3 additions & 0 deletions packages/libraries/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@
"graphql": "^0.13.0 || ^14.0.0 || ^15.0.0 || ^16.0.0"
},
"dependencies": {
"@graphql-hive/signal": "^2.0.0",
"@graphql-tools/utils": "^10.0.0",
"@whatwg-node/fetch": "^0.10.6",
"async-retry": "^1.3.3",
"js-md5": "0.8.3",
"lodash.sortby": "^4.7.0",
"opossum": "^9.0.0",
"tiny-lru": "^8.0.2"
},
"devDependencies": {
Expand All @@ -58,6 +60,7 @@
"@types/async-retry": "1.4.8",
"@types/js-md5": "0.8.0",
"@types/lodash.sortby": "4.7.9",
"@types/opossum": "8.1.9",
"graphql": "16.9.0",
"nock": "14.0.10",
"tslib": "2.8.1",
Expand Down
53 changes: 53 additions & 0 deletions packages/libraries/core/playground/agent-circuit-breaker.ts
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit hard to unit test this functionality, so I added this file that I can play around with the values for the circuit breaker.

I encourage reviewers to do the same to get an understanding on how this works.

Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
*
* Just a small playground to play around with different scenarios arounf the agent.
* You can run it like this: `bun run --watch packages/libraries/core/playground/agent-circuit-breaker.ts`
*/

import { createAgent } from '../src/client/agent.js';

let data: Array<{}> = [];

const agent = createAgent<{}>(
{
debug: true,
endpoint: 'http://127.0.0.1',
token: 'noop',
async fetch(_url, _opts) {
// throw new Error('FAIL FAIL');
console.log('SENDING!');
return new Response('ok', {
status: 200,
});
},
circuitBreaker: {
timeout: 1_000,
errorThresholdPercentage: 1,
resetTimeout: 10_000,
volumeThreshold: 0,
},
maxSize: 1,
maxRetries: 0,
},
{
body() {
data = [];
return String(data);
},
data: {
clear() {
data = [];
},
size() {
return data.length;
},
set(d) {
data.push(d);
},
},
},
);

setInterval(() => {
agent.capture({});
}, 1_000);
166 changes: 146 additions & 20 deletions packages/libraries/core/src/client/agent.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,42 @@
import type CircuitBreaker from 'opossum';
import { fetch as defaultFetch } from '@whatwg-node/fetch';
import { version } from '../version.js';
import { http } from './http-client.js';
import type { Logger } from './types.js';
import { createHiveLogger } from './utils.js';

type ReadOnlyResponse = Pick<Response, 'status' | 'text' | 'json' | 'statusText'>;

export type AgentCircuitBreakerConfiguration = {
/**
* After which time a request should be treated as a timeout in milleseconds
* Default: 5_000
*/
timeout: number;
/**
* Percentage after what the circuit breaker should kick in.
* Default: 50
*/
errorThresholdPercentage: number;
/**
* Count of requests before starting evaluating.
* Default: 5
*/
volumeThreshold: number;
/**
* After what time the circuit breaker is attempting to retry sending requests in milliseconds
* Default: 30_000
*/
resetTimeout: number;
};

const defaultCircuitBreakerConfiguration: AgentCircuitBreakerConfiguration = {
timeout: 5_000,
errorThresholdPercentage: 50,
volumeThreshold: 5,
resetTimeout: 30_000,
};

export interface AgentOptions {
enabled?: boolean;
name?: string;
Expand Down Expand Up @@ -48,7 +81,11 @@ export interface AgentOptions {
* WHATWG Compatible fetch implementation
* used by the agent to send reports
*/
fetch?: typeof fetch;
fetch?: typeof defaultFetch;
/**
* Circuit Breaker Configuration
*/
circuitBreaker?: AgentCircuitBreakerConfiguration;
}

export function createAgent<TEvent>(
Expand All @@ -75,15 +112,16 @@ export function createAgent<TEvent>(
maxRetries: 3,
sendInterval: 10_000,
maxSize: 25,
logger: console,
name: 'hive-client',
circuitBreaker: defaultCircuitBreakerConfiguration,
version,
...pluginOptions,
logger: createHiveLogger(pluginOptions.logger ?? console, '[agent]'),
};

const enabled = options.enabled !== false;

let timeoutID: any = null;
let timeoutID: ReturnType<typeof setTimeout> | null = null;

function schedule() {
if (timeoutID) {
Expand Down Expand Up @@ -143,6 +181,27 @@ export function createAgent<TEvent>(
return send({ throwOnError: true, skipSchedule: true });
}

async function sendHTTPCall(buffer: string | Buffer<ArrayBufferLike>): Promise<Response> {
const signal = breaker.getSignal();
return await http.post(options.endpoint, buffer, {
headers: {
accept: 'application/json',
'content-type': 'application/json',
Authorization: `Bearer ${options.token}`,
'User-Agent': `${options.name}/${options.version}`,
...headers(),
},
timeout: options.timeout,
retry: {
retries: options.maxRetries,
factor: 2,
},
logger: options.logger,
fetchImplementation: pluginOptions.fetch,
signal,
});
}

async function send(sendOptions?: {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are basically doing the following:

circuit breaker ( # shuts down if underlying fails or times out aggressively
  http client ( # calls fetch and retries if non okay response
      fetch( # good old fetch
          # the payload we want to send
       )
   )
)

throwOnError?: boolean;
skipSchedule: boolean;
Expand All @@ -160,23 +219,7 @@ export function createAgent<TEvent>(
data.clear();

debugLog(`Sending report (queue ${dataToSend})`);
const response = await http
.post(options.endpoint, buffer, {
headers: {
accept: 'application/json',
'content-type': 'application/json',
Authorization: `Bearer ${options.token}`,
'User-Agent': `${options.name}/${options.version}`,
...headers(),
},
timeout: options.timeout,
retry: {
retries: options.maxRetries,
factor: 2,
},
logger: options.logger,
fetchImplementation: pluginOptions.fetch,
})
const response = sendFromBreaker(buffer)
.then(res => {
debugLog(`Report sent!`);
return res;
Expand Down Expand Up @@ -215,9 +258,92 @@ export function createAgent<TEvent>(
});
}

/**
* We support Cloudflare, which does not has the `events` module.
* So we lazy load opossum which has `events` as a dependency.
*/
const breakerLogger = createHiveLogger(options.logger, '[circuit breaker]');

let breaker: CircuitBreakerInterface<
Parameters<typeof sendHTTPCall>,
ReturnType<typeof sendHTTPCall>
>;

breakerLogger.info('initialize circuit breaker');
const loadCircuitBreakerPromise = loadCircuitBreaker(
CircuitBreaker => {
breakerLogger.info('started');
const realBreaker = new CircuitBreaker(sendHTTPCall, {
...options.circuitBreaker,
autoRenewAbortController: true,
});

realBreaker.on('open', () =>
breakerLogger.error('circuit opened - backend seems unreachable.'),
);
realBreaker.on('halfOpen', () =>
breakerLogger.info('circuit half open - testing backend connectivity'),
);
realBreaker.on('close', () => breakerLogger.info('circuit closed - backend recovered '));

// @ts-expect-error missing definition in typedefs for `opposum`
breaker = realBreaker;
},
() => {
breakerLogger.info('circuit breaker not supported on platform');
breaker = {
getSignal() {
return undefined;
},
fire: sendHTTPCall,
};
},
);

async function sendFromBreaker(...args: Parameters<typeof breaker.fire>) {
if (!breaker) {
await loadCircuitBreakerPromise;
}

try {
return await breaker.fire(...args);
} catch (err: unknown) {
if (err instanceof Error && 'code' in err) {
if (err.code === 'EOPENBREAKER') {
breakerLogger.info('circuit open - sending report skipped');
return null;
}
if (err.code === 'ETIMEDOUT') {
breakerLogger.info('circuit open - sending report aborted - timed out');
return null;
}
}

throw err;
}
}

return {
capture,
sendImmediately,
dispose,
};
}

type CircuitBreakerInterface<TI extends unknown[] = unknown[], TR = unknown> = {
fire(...args: TI): TR;
getSignal(): AbortSignal | undefined;
};

async function loadCircuitBreaker(
success: (breaker: typeof CircuitBreaker) => void,
error: () => void,
): Promise<void> {
const packageName = 'opossum';
try {
const module = await import(packageName);
success(module.default);
} catch (err) {
error();
}
}
14 changes: 13 additions & 1 deletion packages/libraries/core/src/client/http-client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncRetry from 'async-retry';
import { abortSignalAny } from '@graphql-hive/signal';
import { crypto, fetch, URL } from '@whatwg-node/fetch';
import type { Logger } from './types.js';

Expand All @@ -21,6 +22,8 @@ interface SharedConfig {
* @default {response => response.ok}
**/
isRequestOk?: ResponseAssertFunction;
/** Optional abort signal */
signal?: AbortSignal;
}

/**
Expand Down Expand Up @@ -78,6 +81,8 @@ export async function makeFetchCall(
* @default {response => response.ok}
**/
isRequestOk?: ResponseAssertFunction;
/** Optional abort signal */
signal?: AbortSignal;
},
): Promise<Response> {
const logger = config.logger;
Expand All @@ -104,7 +109,8 @@ export async function makeFetchCall(
);

const getDuration = measureTime();
const signal = AbortSignal.timeout(config.timeout ?? 20_000);
const timeoutSignal = AbortSignal.timeout(config.timeout ?? 20_000);
const signal = config.signal ? abortSignalAny([config.signal, timeoutSignal]) : timeoutSignal;

const response = await (config.fetchImplementation ?? fetch)(endpoint, {
method: config.method,
Expand Down Expand Up @@ -135,6 +141,12 @@ export async function makeFetchCall(
throw new Error(`Unexpected HTTP error. (x-request-id=${requestId})`, { cause: error });
});

if (config.signal?.aborted === true) {
const error = config.signal.reason ?? new Error('Request aborted externally.');
bail(error);
throw error;
}

if (isRequestOk(response)) {
logger?.info(
`${config.method} ${endpoint} (x-request-id=${requestId}) succeeded with status ${response.status} ${getDuration()}.`,
Expand Down
Loading
Loading