From e9835b5de6da00dc057699c4b320b66d05cf122b Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Fri, 7 Feb 2025 16:24:12 +0000 Subject: [PATCH 1/2] feat: add undici --- .../aztec/src/cli/cmds/start_prover_agent.ts | 6 +- .../aztec/src/cli/cmds/start_prover_node.ts | 8 ++- yarn-project/foundation/package.json | 4 +- .../foundation/src/json-rpc/client/undici.ts | 60 +++++++++++++++++++ .../telemetry-client/src/wrappers/fetch.ts | 4 +- yarn-project/yarn.lock | 8 +++ 6 files changed, 83 insertions(+), 7 deletions(-) create mode 100644 yarn-project/foundation/src/json-rpc/client/undici.ts diff --git a/yarn-project/aztec/src/cli/cmds/start_prover_agent.ts b/yarn-project/aztec/src/cli/cmds/start_prover_agent.ts index 5c7925ded9e5..d704ed8584f1 100644 --- a/yarn-project/aztec/src/cli/cmds/start_prover_agent.ts +++ b/yarn-project/aztec/src/cli/cmds/start_prover_agent.ts @@ -1,5 +1,6 @@ import { times } from '@aztec/foundation/collection'; import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server'; +import { Agent, makeUndiciFetch } from '@aztec/foundation/json-rpc/undici'; import { type LogFn } from '@aztec/foundation/log'; import { buildServerCircuitProver } from '@aztec/prover-client'; import { @@ -10,7 +11,7 @@ import { proverAgentConfigMappings, } from '@aztec/prover-client/broker'; import { getProverNodeAgentConfigFromEnv } from '@aztec/prover-node'; -import { initTelemetryClient, telemetryClientConfigMappings } from '@aztec/telemetry-client'; +import { initTelemetryClient, makeTracedFetch, telemetryClientConfigMappings } from '@aztec/telemetry-client'; import { extractRelevantOptions } from '../util.js'; import { getVersions } from '../versioning.js'; @@ -39,7 +40,8 @@ export async function startProverAgent( process.exit(1); } - const broker = createProvingJobBrokerClient(config.proverBrokerUrl, getVersions()); + const fetch = makeTracedFetch([1, 2, 3], false, makeUndiciFetch(new Agent({ connections: 10 }))); + const broker = createProvingJobBrokerClient(config.proverBrokerUrl, getVersions(), fetch); const telemetry = initTelemetryClient(extractRelevantOptions(options, telemetryClientConfigMappings, 'tel')); const prover = await buildServerCircuitProver(config, telemetry); diff --git a/yarn-project/aztec/src/cli/cmds/start_prover_node.ts b/yarn-project/aztec/src/cli/cmds/start_prover_node.ts index 43d9d04ac1b0..80b26a7c62b3 100644 --- a/yarn-project/aztec/src/cli/cmds/start_prover_node.ts +++ b/yarn-project/aztec/src/cli/cmds/start_prover_node.ts @@ -1,6 +1,7 @@ import { P2PApiSchema, ProverNodeApiSchema, type ProvingJobBroker, createAztecNodeClient } from '@aztec/circuit-types'; import { NULL_KEY } from '@aztec/ethereum'; import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server'; +import { Agent, makeUndiciFetch } from '@aztec/foundation/json-rpc/undici'; import { type LogFn } from '@aztec/foundation/log'; import { ProvingJobConsumerSchema, createProvingJobBrokerClient } from '@aztec/prover-client/broker'; import { @@ -9,7 +10,7 @@ import { getProverNodeConfigFromEnv, proverNodeConfigMappings, } from '@aztec/prover-node'; -import { initTelemetryClient, telemetryClientConfigMappings } from '@aztec/telemetry-client'; +import { initTelemetryClient, makeTracedFetch, telemetryClientConfigMappings } from '@aztec/telemetry-client'; import { mnemonicToAccount } from 'viem/accounts'; @@ -69,7 +70,10 @@ export async function startProverNode( let broker: ProvingJobBroker; if (proverConfig.proverBrokerUrl) { - broker = createProvingJobBrokerClient(proverConfig.proverBrokerUrl, getVersions(proverConfig)); + // at 1TPS we'd enqueue ~1k tube proofs and ~1k AVM proofs immediately + // set a lower connectio limit such that we don't overload the server + const fetch = makeTracedFetch([1, 2, 3], false, makeUndiciFetch(new Agent({ connections: 100 }))); + broker = createProvingJobBrokerClient(proverConfig.proverBrokerUrl, getVersions(proverConfig), fetch); } else if (options.proverBroker) { ({ broker } = await startProverBroker(options, signalHandlers, services, userLog)); } else { diff --git a/yarn-project/foundation/package.json b/yarn-project/foundation/package.json index 85573396351f..fbbc863f0b9e 100644 --- a/yarn-project/foundation/package.json +++ b/yarn-project/foundation/package.json @@ -24,8 +24,9 @@ "./fs": "./dest/fs/index.js", "./buffer": "./dest/buffer/index.js", "./json-rpc": "./dest/json-rpc/index.js", - "./json-rpc/server": "./dest/json-rpc/server/index.js", "./json-rpc/client": "./dest/json-rpc/client/index.js", + "./json-rpc/server": "./dest/json-rpc/server/index.js", + "./json-rpc/undici": "./dest/json-rpc/client/undici.js", "./json-rpc/test": "./dest/json-rpc/test/index.js", "./iterable": "./dest/iterable/index.js", "./log": "./dest/log/index.js", @@ -126,6 +127,7 @@ "pino": "^9.5.0", "pino-pretty": "^13.0.0", "sha3": "^2.1.4", + "undici": "^7.3.0", "zod": "^3.23.8" }, "devDependencies": { diff --git a/yarn-project/foundation/src/json-rpc/client/undici.ts b/yarn-project/foundation/src/json-rpc/client/undici.ts new file mode 100644 index 000000000000..c7ef9c9b00c7 --- /dev/null +++ b/yarn-project/foundation/src/json-rpc/client/undici.ts @@ -0,0 +1,60 @@ +import { Agent, type Dispatcher } from 'undici'; + +import { createLogger } from '../../log/pino-logger.js'; +import { NoRetryError } from '../../retry/index.js'; +import { jsonStringify } from '../convert.js'; + +const log = createLogger('json-rpc:json_rpc_client:undici'); + +export { Agent }; + +export function makeUndiciFetch(client = new Agent()) { + return async ( + host: string, + rpcMethod: string, + body: any, + useApiEndpoints: boolean, + extraHeaders: Record = {}, + noRetry = false, + ) => { + log.trace(`JsonRpcClient.fetch: ${host} ${rpcMethod}`, { host, rpcMethod, body }); + let resp: Dispatcher.ResponseData; + try { + resp = await client.request({ + method: 'POST', + origin: new URL(host), + path: useApiEndpoints ? rpcMethod : '/', + body: jsonStringify(body), + headers: { + ...extraHeaders, + 'content-type': 'application/json', + }, + }); + } catch (err) { + const errorMessage = `Error fetching from host ${host} with method ${rpcMethod}: ${String(err)}`; + throw new Error(errorMessage); + } + + let responseJson: any; + const responseOk = resp.statusCode >= 200 && resp.statusCode <= 299; + try { + responseJson = await resp.body.json(); + } catch (err) { + if (!responseOk) { + throw new Error('HTTP ' + resp.statusCode); + } + throw new Error(`Failed to parse body as JSON: ${await resp.body.text()}`); + } + + if (!responseOk) { + const errorMessage = `Error ${resp.statusCode} response from server ${host} on ${rpcMethod}: ${responseJson.error.message}`; + if (noRetry || (resp.statusCode >= 400 && resp.statusCode < 500)) { + throw new NoRetryError(errorMessage); + } else { + throw new Error(errorMessage); + } + } + + return responseJson; + }; +} diff --git a/yarn-project/telemetry-client/src/wrappers/fetch.ts b/yarn-project/telemetry-client/src/wrappers/fetch.ts index d4506deb8245..9da03aa1b055 100644 --- a/yarn-project/telemetry-client/src/wrappers/fetch.ts +++ b/yarn-project/telemetry-client/src/wrappers/fetch.ts @@ -14,7 +14,7 @@ import { ATTR_JSONRPC_METHOD, ATTR_JSONRPC_REQUEST_ID } from '../vendor/attribut * @param log - Optional logger for logging attempts. * @returns A fetch function. */ -export function makeTracedFetch(retries: number[], defaultNoRetry: boolean, log?: Logger) { +export function makeTracedFetch(retries: number[], defaultNoRetry: boolean, fetch = defaultFetch, log?: Logger) { return ( host: string, rpcMethod: string, @@ -35,7 +35,7 @@ export function makeTracedFetch(retries: number[], defaultNoRetry: boolean, log? const headers = { ...extraHeaders }; propagation.inject(context.active(), headers); return await retry( - () => defaultFetch(host, rpcMethod, body, useApiEndpoints, headers, noRetry ?? defaultNoRetry), + () => fetch(host, rpcMethod, body, useApiEndpoints, headers, noRetry ?? defaultNoRetry), `JsonRpcClient request ${rpcMethod} to ${host}`, makeBackoff(retries), log, diff --git a/yarn-project/yarn.lock b/yarn-project/yarn.lock index 6b39a551bcef..ae1edacd43bd 100644 --- a/yarn-project/yarn.lock +++ b/yarn-project/yarn.lock @@ -812,6 +812,7 @@ __metadata: supertest: "npm:^6.3.3" ts-node: "npm:^10.9.1" typescript: "npm:^5.0.4" + undici: "npm:^7.3.0" viem: "npm:2.22.8" zod: "npm:^3.23.8" languageName: unknown @@ -20953,6 +20954,13 @@ __metadata: languageName: node linkType: hard +"undici@npm:^7.3.0": + version: 7.3.0 + resolution: "undici@npm:7.3.0" + checksum: 10/8165c3a18cd045c70b009db006bee1784c5ba405634e8d2017107f65220a23190e2dfbbcb314bec71add1e43cabcd7515cd29b655c819f22e59b5e2e7025db20 + languageName: node + linkType: hard + "unicorn-magic@npm:^0.1.0": version: 0.1.0 resolution: "unicorn-magic@npm:0.1.0" From 4630fc4c21d22a2105fabda47e55cf13160adf5d Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Wed, 12 Feb 2025 10:54:30 +0000 Subject: [PATCH 2/2] fix: correctly implement json-rpc fetch interface --- .../foundation/src/json-rpc/client/fetch.ts | 9 +++++++++ .../json-rpc/client/safe_json_rpc_client.ts | 4 ++-- .../foundation/src/json-rpc/client/undici.ts | 19 +++++++++++++++++-- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/yarn-project/foundation/src/json-rpc/client/fetch.ts b/yarn-project/foundation/src/json-rpc/client/fetch.ts index 78f8045029f9..7142e9cfc8cc 100644 --- a/yarn-project/foundation/src/json-rpc/client/fetch.ts +++ b/yarn-project/foundation/src/json-rpc/client/fetch.ts @@ -6,6 +6,15 @@ import { jsonStringify } from '../convert.js'; const log = createLogger('json-rpc:json_rpc_client'); +export type JsonRpcFetch = ( + host: string, + rpcMethod: string, + body: any, + useApiEndpoints: boolean, + extraHeaders?: Record, + noRetry?: boolean, +) => Promise<{ response: any; headers: { get: (header: string) => string | null | undefined } }>; + /** * A normal fetch function that does not retry. * Alternatives are a fetch function with retries, or a mocked fetch. diff --git a/yarn-project/foundation/src/json-rpc/client/safe_json_rpc_client.ts b/yarn-project/foundation/src/json-rpc/client/safe_json_rpc_client.ts index 7332df1523f1..ac4783120831 100644 --- a/yarn-project/foundation/src/json-rpc/client/safe_json_rpc_client.ts +++ b/yarn-project/foundation/src/json-rpc/client/safe_json_rpc_client.ts @@ -2,12 +2,12 @@ import { format } from 'util'; import { type Logger, createLogger } from '../../log/pino-logger.js'; import { type ApiSchema, type ApiSchemaFor, schemaHasMethod } from '../../schemas/api.js'; -import { defaultFetch } from './fetch.js'; +import { type JsonRpcFetch, defaultFetch } from './fetch.js'; export type SafeJsonRpcClientOptions = { useApiEndpoints?: boolean; namespaceMethods?: string | false; - fetch?: typeof defaultFetch; + fetch?: JsonRpcFetch; log?: Logger; onResponse?: (res: { response: any; diff --git a/yarn-project/foundation/src/json-rpc/client/undici.ts b/yarn-project/foundation/src/json-rpc/client/undici.ts index c7ef9c9b00c7..14bc5e49f99c 100644 --- a/yarn-project/foundation/src/json-rpc/client/undici.ts +++ b/yarn-project/foundation/src/json-rpc/client/undici.ts @@ -3,12 +3,13 @@ import { Agent, type Dispatcher } from 'undici'; import { createLogger } from '../../log/pino-logger.js'; import { NoRetryError } from '../../retry/index.js'; import { jsonStringify } from '../convert.js'; +import { type JsonRpcFetch } from './fetch.js'; const log = createLogger('json-rpc:json_rpc_client:undici'); export { Agent }; -export function makeUndiciFetch(client = new Agent()) { +export function makeUndiciFetch(client = new Agent()): JsonRpcFetch { return async ( host: string, rpcMethod: string, @@ -55,6 +56,20 @@ export function makeUndiciFetch(client = new Agent()) { } } - return responseJson; + const headers = new Headers(); + for (const [key, value] of Object.entries(resp.headers)) { + if (typeof value === 'string') { + headers.append(key, value); + } else if (Array.isArray(value)) { + for (const v of value) { + headers.append(key, v); + } + } + } + + return { + response: responseJson, + headers, + }; }; }