Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions yarn-project/aztec/src/cli/cmds/start_prover_agent.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { times } from '@aztec/foundation/collection';
import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server';
import { Agent, makeUndiciFetch } from '@aztec/foundation/json-rpc/undici';
import { type LogFn } from '@aztec/foundation/log';
import { buildServerCircuitProver } from '@aztec/prover-client';
import {
Expand All @@ -10,7 +11,7 @@ import {
proverAgentConfigMappings,
} from '@aztec/prover-client/broker';
import { getProverNodeAgentConfigFromEnv } from '@aztec/prover-node';
import { initTelemetryClient, telemetryClientConfigMappings } from '@aztec/telemetry-client';
import { initTelemetryClient, makeTracedFetch, telemetryClientConfigMappings } from '@aztec/telemetry-client';

import { extractRelevantOptions } from '../util.js';
import { getVersions } from '../versioning.js';
Expand Down Expand Up @@ -39,7 +40,8 @@ export async function startProverAgent(
process.exit(1);
}

const broker = createProvingJobBrokerClient(config.proverBrokerUrl, getVersions());
const fetch = makeTracedFetch([1, 2, 3], false, makeUndiciFetch(new Agent({ connections: 10 })));
const broker = createProvingJobBrokerClient(config.proverBrokerUrl, getVersions(), fetch);

const telemetry = initTelemetryClient(extractRelevantOptions(options, telemetryClientConfigMappings, 'tel'));
const prover = await buildServerCircuitProver(config, telemetry);
Expand Down
8 changes: 6 additions & 2 deletions yarn-project/aztec/src/cli/cmds/start_prover_node.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { P2PApiSchema, ProverNodeApiSchema, type ProvingJobBroker, createAztecNodeClient } from '@aztec/circuit-types';
import { NULL_KEY } from '@aztec/ethereum';
import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server';
import { Agent, makeUndiciFetch } from '@aztec/foundation/json-rpc/undici';
import { type LogFn } from '@aztec/foundation/log';
import { ProvingJobConsumerSchema, createProvingJobBrokerClient } from '@aztec/prover-client/broker';
import {
Expand All @@ -9,7 +10,7 @@ import {
getProverNodeConfigFromEnv,
proverNodeConfigMappings,
} from '@aztec/prover-node';
import { initTelemetryClient, telemetryClientConfigMappings } from '@aztec/telemetry-client';
import { initTelemetryClient, makeTracedFetch, telemetryClientConfigMappings } from '@aztec/telemetry-client';

import { mnemonicToAccount } from 'viem/accounts';

Expand Down Expand Up @@ -69,7 +70,10 @@ export async function startProverNode(

let broker: ProvingJobBroker;
if (proverConfig.proverBrokerUrl) {
broker = createProvingJobBrokerClient(proverConfig.proverBrokerUrl, getVersions(proverConfig));
// at 1TPS we'd enqueue ~1k tube proofs and ~1k AVM proofs immediately
// set a lower connectio limit such that we don't overload the server
const fetch = makeTracedFetch([1, 2, 3], false, makeUndiciFetch(new Agent({ connections: 100 })));
broker = createProvingJobBrokerClient(proverConfig.proverBrokerUrl, getVersions(proverConfig), fetch);
} else if (options.proverBroker) {
({ broker } = await startProverBroker(options, signalHandlers, services, userLog));
} else {
Expand Down
4 changes: 3 additions & 1 deletion yarn-project/foundation/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
"./fs": "./dest/fs/index.js",
"./buffer": "./dest/buffer/index.js",
"./json-rpc": "./dest/json-rpc/index.js",
"./json-rpc/server": "./dest/json-rpc/server/index.js",
"./json-rpc/client": "./dest/json-rpc/client/index.js",
"./json-rpc/server": "./dest/json-rpc/server/index.js",
"./json-rpc/undici": "./dest/json-rpc/client/undici.js",
"./json-rpc/test": "./dest/json-rpc/test/index.js",
"./iterable": "./dest/iterable/index.js",
"./log": "./dest/log/index.js",
Expand Down Expand Up @@ -126,6 +127,7 @@
"pino": "^9.5.0",
"pino-pretty": "^13.0.0",
"sha3": "^2.1.4",
"undici": "^7.3.0",
"zod": "^3.23.8"
},
"devDependencies": {
Expand Down
9 changes: 9 additions & 0 deletions yarn-project/foundation/src/json-rpc/client/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ import { jsonStringify } from '../convert.js';

const log = createLogger('json-rpc:json_rpc_client');

export type JsonRpcFetch = (
host: string,
rpcMethod: string,
body: any,
useApiEndpoints: boolean,
extraHeaders?: Record<string, string>,
noRetry?: boolean,
) => Promise<{ response: any; headers: { get: (header: string) => string | null | undefined } }>;

/**
* A normal fetch function that does not retry.
* Alternatives are a fetch function with retries, or a mocked fetch.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import { format } from 'util';

import { type Logger, createLogger } from '../../log/pino-logger.js';
import { type ApiSchema, type ApiSchemaFor, schemaHasMethod } from '../../schemas/api.js';
import { defaultFetch } from './fetch.js';
import { type JsonRpcFetch, defaultFetch } from './fetch.js';

export type SafeJsonRpcClientOptions = {
useApiEndpoints?: boolean;
namespaceMethods?: string | false;
fetch?: typeof defaultFetch;
fetch?: JsonRpcFetch;
log?: Logger;
onResponse?: (res: {
response: any;
Expand Down
75 changes: 75 additions & 0 deletions yarn-project/foundation/src/json-rpc/client/undici.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { Agent, type Dispatcher } from 'undici';

import { createLogger } from '../../log/pino-logger.js';
import { NoRetryError } from '../../retry/index.js';
import { jsonStringify } from '../convert.js';
import { type JsonRpcFetch } from './fetch.js';

const log = createLogger('json-rpc:json_rpc_client:undici');

export { Agent };

export function makeUndiciFetch(client = new Agent()): JsonRpcFetch {
return async (
host: string,
rpcMethod: string,
body: any,
useApiEndpoints: boolean,
extraHeaders: Record<string, string> = {},
noRetry = false,
) => {
log.trace(`JsonRpcClient.fetch: ${host} ${rpcMethod}`, { host, rpcMethod, body });
let resp: Dispatcher.ResponseData<string>;
try {
resp = await client.request<string>({
method: 'POST',
origin: new URL(host),
path: useApiEndpoints ? rpcMethod : '/',
body: jsonStringify(body),
headers: {
...extraHeaders,
'content-type': 'application/json',
},
});
} catch (err) {
const errorMessage = `Error fetching from host ${host} with method ${rpcMethod}: ${String(err)}`;
throw new Error(errorMessage);
}

let responseJson: any;
const responseOk = resp.statusCode >= 200 && resp.statusCode <= 299;
try {
responseJson = await resp.body.json();
} catch (err) {
if (!responseOk) {
throw new Error('HTTP ' + resp.statusCode);
}
throw new Error(`Failed to parse body as JSON: ${await resp.body.text()}`);
}

if (!responseOk) {
const errorMessage = `Error ${resp.statusCode} response from server ${host} on ${rpcMethod}: ${responseJson.error.message}`;
if (noRetry || (resp.statusCode >= 400 && resp.statusCode < 500)) {
throw new NoRetryError(errorMessage);
} else {
throw new Error(errorMessage);
}
}

const headers = new Headers();
for (const [key, value] of Object.entries(resp.headers)) {
if (typeof value === 'string') {
headers.append(key, value);
} else if (Array.isArray(value)) {
for (const v of value) {
headers.append(key, v);
}
}
}

return {
response: responseJson,
headers,
};
};
}
4 changes: 2 additions & 2 deletions yarn-project/telemetry-client/src/wrappers/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { ATTR_JSONRPC_METHOD, ATTR_JSONRPC_REQUEST_ID } from '../vendor/attribut
* @param log - Optional logger for logging attempts.
* @returns A fetch function.
*/
export function makeTracedFetch(retries: number[], defaultNoRetry: boolean, log?: Logger) {
export function makeTracedFetch(retries: number[], defaultNoRetry: boolean, fetch = defaultFetch, log?: Logger) {
return (
host: string,
rpcMethod: string,
Expand All @@ -35,7 +35,7 @@ export function makeTracedFetch(retries: number[], defaultNoRetry: boolean, log?
const headers = { ...extraHeaders };
propagation.inject(context.active(), headers);
return await retry(
() => defaultFetch(host, rpcMethod, body, useApiEndpoints, headers, noRetry ?? defaultNoRetry),
() => fetch(host, rpcMethod, body, useApiEndpoints, headers, noRetry ?? defaultNoRetry),
`JsonRpcClient request ${rpcMethod} to ${host}`,
makeBackoff(retries),
log,
Expand Down
8 changes: 8 additions & 0 deletions yarn-project/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,7 @@ __metadata:
supertest: "npm:^6.3.3"
ts-node: "npm:^10.9.1"
typescript: "npm:^5.0.4"
undici: "npm:^7.3.0"
viem: "npm:2.22.8"
zod: "npm:^3.23.8"
languageName: unknown
Expand Down Expand Up @@ -20953,6 +20954,13 @@ __metadata:
languageName: node
linkType: hard

"undici@npm:^7.3.0":
version: 7.3.0
resolution: "undici@npm:7.3.0"
checksum: 10/8165c3a18cd045c70b009db006bee1784c5ba405634e8d2017107f65220a23190e2dfbbcb314bec71add1e43cabcd7515cd29b655c819f22e59b5e2e7025db20
languageName: node
linkType: hard

"unicorn-magic@npm:^0.1.0":
version: 0.1.0
resolution: "unicorn-magic@npm:0.1.0"
Expand Down