Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions yarn-project/prover-client/src/proving_broker/proving_agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ describe('ProvingAgent', () => {
proofDB = {
getProofInput: jest.fn(),
getProofOutput: jest.fn(),
saveProofInput: jest.fn(),
saveProofOutput: jest.fn(),
saveProofInput: jest.fn(() => Promise.resolve('' as ProofUri)),
saveProofOutput: jest.fn(() => Promise.resolve('' as ProofUri)),
};

allowList = [ProvingRequestType.BASE_PARITY];
agent = new ProvingAgent(jobSource, proofDB, prover, allowList);
agent = new ProvingAgent(jobSource, proofDB, prover, allowList, agentPollIntervalMs);
});

afterEach(async () => {
Expand All @@ -76,12 +76,15 @@ describe('ProvingAgent', () => {

await jest.advanceTimersByTimeAsync(agentPollIntervalMs);
expect(jobSource.getProvingJob).toHaveBeenCalledTimes(1);
expect(jobSource.reportProvingJobProgress).toHaveBeenCalledTimes(1);

await jest.advanceTimersByTimeAsync(agentPollIntervalMs);
expect(jobSource.getProvingJob).toHaveBeenCalledTimes(1);
expect(jobSource.reportProvingJobProgress).toHaveBeenCalledTimes(2);

await jest.advanceTimersByTimeAsync(agentPollIntervalMs);
expect(jobSource.getProvingJob).toHaveBeenCalledTimes(1);
expect(jobSource.reportProvingJobProgress).toHaveBeenCalledTimes(3);

// let's resolve the proof
const result = makePublicInputsAndRecursiveProof(
Expand Down Expand Up @@ -121,7 +124,9 @@ describe('ProvingAgent', () => {
agent.start();

await jest.advanceTimersByTimeAsync(agentPollIntervalMs);
expect(jobSource.reportProvingJobError).toHaveBeenCalledWith(job.id, 'test error', false, { allowList });
expect(jobSource.reportProvingJobError).toHaveBeenCalledWith(job.id, expect.stringContaining('test error'), false, {
allowList,
});
});

it('sets the retry flag on when reporting an error', async () => {
Expand All @@ -134,7 +139,9 @@ describe('ProvingAgent', () => {
agent.start();

await jest.advanceTimersByTimeAsync(agentPollIntervalMs);
expect(jobSource.reportProvingJobError).toHaveBeenCalledWith(job.id, err.message, true, { allowList });
expect(jobSource.reportProvingJobError).toHaveBeenCalledWith(job.id, expect.stringContaining(err.message), true, {
allowList,
});
});

it('reports jobs in progress to the job source', async () => {
Expand Down Expand Up @@ -193,32 +200,39 @@ describe('ProvingAgent', () => {
// this should cause the agent to abort the current job and start the new one
const secondJobResponse = makeBaseParityJob();

jobSource.reportProvingJobProgress.mockResolvedValueOnce(secondJobResponse);
proofDB.getProofInput.mockResolvedValueOnce(secondJobResponse.inputs);

const secondProof =
promiseWithResolvers<PublicInputsAndRecursiveProof<ParityPublicInputs, typeof RECURSIVE_PROOF_LENGTH>>();
jest.spyOn(prover, 'getBaseParityProof').mockReturnValueOnce(secondProof.promise);

jobSource.reportProvingJobProgress.mockResolvedValueOnce(secondJobResponse);

await jest.advanceTimersByTimeAsync(agentPollIntervalMs);
expect(jobSource.reportProvingJobProgress).toHaveBeenCalledTimes(3);
expect(jobSource.reportProvingJobProgress).toHaveBeenLastCalledWith(firstJob.job.id, firstJob.time, {
expect(jobSource.reportProvingJobProgress).toHaveBeenCalledTimes(4);
expect(jobSource.reportProvingJobProgress).toHaveBeenNthCalledWith(3, firstJob.job.id, firstJob.time, {
allowList: [ProvingRequestType.BASE_PARITY],
});
expect(jobSource.reportProvingJobProgress).toHaveBeenNthCalledWith(
4,
secondJobResponse.job.id,
secondJobResponse.time,
{
allowList: [ProvingRequestType.BASE_PARITY],
},
);
expect(firstProofAborted).toBe(true);

// agent should have switched now
await jest.advanceTimersByTimeAsync(agentPollIntervalMs);
expect(jobSource.reportProvingJobProgress).toHaveBeenCalledTimes(4);
expect(jobSource.reportProvingJobProgress).toHaveBeenCalledTimes(5);
expect(jobSource.reportProvingJobProgress).toHaveBeenLastCalledWith(
secondJobResponse.job.id,
secondJobResponse.time,
{
allowList: [ProvingRequestType.BASE_PARITY],
},
);

secondProof.resolve(makeBaseParityResult());
});

it('immediately starts working on the next job', async () => {
Expand Down
120 changes: 74 additions & 46 deletions yarn-project/prover-client/src/proving_broker/proving_agent.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { AbortError } from '@aztec/foundation/error';
import { createLogger } from '@aztec/foundation/log';
import { RunningPromise } from '@aztec/foundation/running-promise';
import { truncate } from '@aztec/foundation/string';
import { Timer } from '@aztec/foundation/timer';
import { ProvingError } from '@aztec/stdlib/errors';
import type {
ProvingJob,
GetProvingJobResponse,
ProvingJobConsumer,
ProvingJobId,
ProvingJobInputs,
Expand All @@ -31,7 +31,6 @@ export class ProvingAgent implements Traceable {
private currentJobController?: ProvingJobController;
private runningPromise: RunningPromise;
private instrumentation: ProvingAgentInstrumentation;
private idleTimer: Timer | undefined;

public readonly tracer: Tracer;

Expand Down Expand Up @@ -64,7 +63,6 @@ export class ProvingAgent implements Traceable {
}

public start(): void {
this.idleTimer = new Timer();
this.runningPromise.start();
}

Expand All @@ -75,39 +73,63 @@ export class ProvingAgent implements Traceable {

@trackSpan('ProvingAgent.safeWork')
private async work() {
// every tick we need to
// (1) either do a heartbeat, telling the broker that we're working
// (2) get a new job
// If during (1) the broker returns a new job that means we can cancel the current job and start the new one
let maybeJob: { job: ProvingJob; time: number } | undefined;
if (this.currentJobController?.getStatus() === ProvingJobControllerStatus.PROVING) {
maybeJob = await this.broker.reportProvingJobProgress(
this.currentJobController.getJobId(),
this.currentJobController.getStartedAt(),
{ allowList: this.proofAllowList },
);
// every tick we need to take one of the following actions:
// 1. send a hearbeat to the broker that we're working on some job
// 2. if the job is complete, send its result to the broker
// 3. get a job from the broker
// Any one of these actions could give us a new job to work on. If that happens we abort the current job.
//
// This loop gets triggered in one of two ways:
// - either on a timer (see pollIntervalMs)
// - or when a proof completes
let maybeJob: GetProvingJobResponse | undefined;

if (this.currentJobController) {
const status = this.currentJobController.getStatus();
const jobId = this.currentJobController.getJobId();
const proofType = this.currentJobController.getProofType();
const startedAt = this.currentJobController.getStartedAt();
const result = this.currentJobController.getResult();

if (status === ProvingJobControllerStatus.RUNNING) {
maybeJob = await this.broker.reportProvingJobProgress(jobId, startedAt, { allowList: this.proofAllowList });
} else if (status === ProvingJobControllerStatus.DONE) {
if (result) {
maybeJob = await this.reportResult(jobId, proofType, result);
} else {
this.log.warn(
`Job controller for job ${this.currentJobController.getJobId()} is done but doesn't have a result`,
{ jobId },
);
maybeJob = await this.reportResult(
jobId,
proofType,
new ProvingError('No result found after proving', undefined, /* retry */ true),
);
}

this.currentJobController = undefined;
} else {
// IDLE status should not be seen because a job is started as soon as it is created
this.log.warn(`Idle job controller for job: ${this.currentJobController.getJobId()}. Skipping main loop work`, {
jobId: this.currentJobController.getJobId(),
});
return;
}
} else {
maybeJob = await this.broker.getProvingJob({ allowList: this.proofAllowList });
}

if (!maybeJob) {
return;
}

if (this.idleTimer) {
this.instrumentation.recordIdleTime(this.idleTimer);
if (maybeJob) {
await this.startJob(maybeJob);
}
this.idleTimer = undefined;

const { job, time } = maybeJob;
await this.startJob(job, time);
}

private async startJob(job: ProvingJob, startedAt: number): Promise<void> {
private async startJob({ job, time: startedAt }: GetProvingJobResponse): Promise<void> {
let abortedProofJobId: string | undefined;
let abortedProofName: string | undefined;

if (this.currentJobController?.getStatus() === ProvingJobControllerStatus.PROVING) {
if (this.currentJobController?.getStatus() === ProvingJobControllerStatus.RUNNING) {
abortedProofJobId = this.currentJobController.getJobId();
abortedProofName = this.currentJobController.getProofTypeName();
this.currentJobController?.abort();
Expand All @@ -122,7 +144,7 @@ export class ProvingAgent implements Traceable {
});

if (maybeJob) {
return this.startJob(maybeJob.job, maybeJob.time);
return this.startJob(maybeJob);
}

return;
Expand All @@ -134,7 +156,11 @@ export class ProvingAgent implements Traceable {
job.epochNumber,
startedAt,
this.circuitProver,
this.handleJobResult,
() => {
// trigger a run of the main work loop when proving completes
// no need to await this here. The controller will stay alive (in DONE state) until the result is send to the broker
void this.runningPromise.trigger();
},
);

if (abortedProofJobId) {
Expand All @@ -154,28 +180,30 @@ export class ProvingAgent implements Traceable {
this.currentJobController.start();
}

handleJobResult = async <T extends ProvingRequestType>(
private async reportResult<T extends ProvingRequestType>(
jobId: ProvingJobId,
type: T,
err: Error | undefined,
result: ProvingJobResultsMap[T] | undefined,
) => {
let maybeJob: { job: ProvingJob; time: number } | undefined;
if (err) {
const retry = err.name === ProvingError.NAME ? (err as ProvingError).retry : false;
this.log.error(`Job id=${jobId} type=${ProvingRequestType[type]} failed err=${err.message} retry=${retry}`, err);
maybeJob = await this.broker.reportProvingJobError(jobId, err.message, retry, { allowList: this.proofAllowList });
} else if (result) {
result: ProvingJobResultsMap[T] | Error,
): Promise<GetProvingJobResponse | undefined> {
let maybeJob: GetProvingJobResponse | undefined;
if (result instanceof AbortError) {
// no-op
this.log.warn(`Job id=${jobId} was aborted. Not reporting result back to broker`, result);
} else if (result instanceof Error) {
const retry = result.name === ProvingError.NAME ? (result as ProvingError).retry : false;
this.log.error(
`Job id=${jobId} type=${ProvingRequestType[type]} failed err=${result.message} retry=${retry}`,
result,
);
maybeJob = await this.broker.reportProvingJobError(jobId, result.message, retry, {
allowList: this.proofAllowList,
});
} else {
const outputUri = await this.proofStore.saveProofOutput(jobId, type, result);
this.log.info(`Job id=${jobId} type=${ProvingRequestType[type]} completed outputUri=${truncate(outputUri)}`);
maybeJob = await this.broker.reportProvingJobSuccess(jobId, outputUri, { allowList: this.proofAllowList });
}

if (maybeJob) {
const { job, time } = maybeJob;
await this.startJob(job, time);
} else {
this.idleTimer = new Timer();
}
};
return maybeJob;
}
}
Loading