diff --git a/yarn-project/prover-client/src/proving_broker/proving_agent.test.ts b/yarn-project/prover-client/src/proving_broker/proving_agent.test.ts index 0ef040705640..323e797cb974 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_agent.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_agent.test.ts @@ -44,12 +44,12 @@ describe('ProvingAgent', () => { proofDB = { getProofInput: jest.fn(), getProofOutput: jest.fn(), - saveProofInput: jest.fn(), - saveProofOutput: jest.fn(), + saveProofInput: jest.fn(() => Promise.resolve('' as ProofUri)), + saveProofOutput: jest.fn(() => Promise.resolve('' as ProofUri)), }; allowList = [ProvingRequestType.BASE_PARITY]; - agent = new ProvingAgent(jobSource, proofDB, prover, allowList); + agent = new ProvingAgent(jobSource, proofDB, prover, allowList, agentPollIntervalMs); }); afterEach(async () => { @@ -76,12 +76,15 @@ describe('ProvingAgent', () => { await jest.advanceTimersByTimeAsync(agentPollIntervalMs); expect(jobSource.getProvingJob).toHaveBeenCalledTimes(1); + expect(jobSource.reportProvingJobProgress).toHaveBeenCalledTimes(1); await jest.advanceTimersByTimeAsync(agentPollIntervalMs); expect(jobSource.getProvingJob).toHaveBeenCalledTimes(1); + expect(jobSource.reportProvingJobProgress).toHaveBeenCalledTimes(2); await jest.advanceTimersByTimeAsync(agentPollIntervalMs); expect(jobSource.getProvingJob).toHaveBeenCalledTimes(1); + expect(jobSource.reportProvingJobProgress).toHaveBeenCalledTimes(3); // let's resolve the proof const result = makePublicInputsAndRecursiveProof( @@ -121,7 +124,9 @@ describe('ProvingAgent', () => { agent.start(); await jest.advanceTimersByTimeAsync(agentPollIntervalMs); - expect(jobSource.reportProvingJobError).toHaveBeenCalledWith(job.id, 'test error', false, { allowList }); + expect(jobSource.reportProvingJobError).toHaveBeenCalledWith(job.id, expect.stringContaining('test error'), false, { + allowList, + }); }); it('sets the retry flag on when reporting an error', async () => { @@ -134,7 +139,9 @@ describe('ProvingAgent', () => { agent.start(); await jest.advanceTimersByTimeAsync(agentPollIntervalMs); - expect(jobSource.reportProvingJobError).toHaveBeenCalledWith(job.id, err.message, true, { allowList }); + expect(jobSource.reportProvingJobError).toHaveBeenCalledWith(job.id, expect.stringContaining(err.message), true, { + allowList, + }); }); it('reports jobs in progress to the job source', async () => { @@ -193,23 +200,32 @@ describe('ProvingAgent', () => { // this should cause the agent to abort the current job and start the new one const secondJobResponse = makeBaseParityJob(); - jobSource.reportProvingJobProgress.mockResolvedValueOnce(secondJobResponse); proofDB.getProofInput.mockResolvedValueOnce(secondJobResponse.inputs); const secondProof = promiseWithResolvers>(); jest.spyOn(prover, 'getBaseParityProof').mockReturnValueOnce(secondProof.promise); + jobSource.reportProvingJobProgress.mockResolvedValueOnce(secondJobResponse); + await jest.advanceTimersByTimeAsync(agentPollIntervalMs); - expect(jobSource.reportProvingJobProgress).toHaveBeenCalledTimes(3); - expect(jobSource.reportProvingJobProgress).toHaveBeenLastCalledWith(firstJob.job.id, firstJob.time, { + expect(jobSource.reportProvingJobProgress).toHaveBeenCalledTimes(4); + expect(jobSource.reportProvingJobProgress).toHaveBeenNthCalledWith(3, firstJob.job.id, firstJob.time, { allowList: [ProvingRequestType.BASE_PARITY], }); + expect(jobSource.reportProvingJobProgress).toHaveBeenNthCalledWith( + 4, + secondJobResponse.job.id, + secondJobResponse.time, + { + allowList: [ProvingRequestType.BASE_PARITY], + }, + ); expect(firstProofAborted).toBe(true); // agent should have switched now await jest.advanceTimersByTimeAsync(agentPollIntervalMs); - expect(jobSource.reportProvingJobProgress).toHaveBeenCalledTimes(4); + expect(jobSource.reportProvingJobProgress).toHaveBeenCalledTimes(5); expect(jobSource.reportProvingJobProgress).toHaveBeenLastCalledWith( secondJobResponse.job.id, secondJobResponse.time, @@ -217,8 +233,6 @@ describe('ProvingAgent', () => { allowList: [ProvingRequestType.BASE_PARITY], }, ); - - secondProof.resolve(makeBaseParityResult()); }); it('immediately starts working on the next job', async () => { diff --git a/yarn-project/prover-client/src/proving_broker/proving_agent.ts b/yarn-project/prover-client/src/proving_broker/proving_agent.ts index 61f3892df56f..15cd19ff1b3c 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_agent.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_agent.ts @@ -1,10 +1,10 @@ +import { AbortError } from '@aztec/foundation/error'; import { createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; import { truncate } from '@aztec/foundation/string'; -import { Timer } from '@aztec/foundation/timer'; import { ProvingError } from '@aztec/stdlib/errors'; import type { - ProvingJob, + GetProvingJobResponse, ProvingJobConsumer, ProvingJobId, ProvingJobInputs, @@ -31,7 +31,6 @@ export class ProvingAgent implements Traceable { private currentJobController?: ProvingJobController; private runningPromise: RunningPromise; private instrumentation: ProvingAgentInstrumentation; - private idleTimer: Timer | undefined; public readonly tracer: Tracer; @@ -64,7 +63,6 @@ export class ProvingAgent implements Traceable { } public start(): void { - this.idleTimer = new Timer(); this.runningPromise.start(); } @@ -75,39 +73,63 @@ export class ProvingAgent implements Traceable { @trackSpan('ProvingAgent.safeWork') private async work() { - // every tick we need to - // (1) either do a heartbeat, telling the broker that we're working - // (2) get a new job - // If during (1) the broker returns a new job that means we can cancel the current job and start the new one - let maybeJob: { job: ProvingJob; time: number } | undefined; - if (this.currentJobController?.getStatus() === ProvingJobControllerStatus.PROVING) { - maybeJob = await this.broker.reportProvingJobProgress( - this.currentJobController.getJobId(), - this.currentJobController.getStartedAt(), - { allowList: this.proofAllowList }, - ); + // every tick we need to take one of the following actions: + // 1. send a hearbeat to the broker that we're working on some job + // 2. if the job is complete, send its result to the broker + // 3. get a job from the broker + // Any one of these actions could give us a new job to work on. If that happens we abort the current job. + // + // This loop gets triggered in one of two ways: + // - either on a timer (see pollIntervalMs) + // - or when a proof completes + let maybeJob: GetProvingJobResponse | undefined; + + if (this.currentJobController) { + const status = this.currentJobController.getStatus(); + const jobId = this.currentJobController.getJobId(); + const proofType = this.currentJobController.getProofType(); + const startedAt = this.currentJobController.getStartedAt(); + const result = this.currentJobController.getResult(); + + if (status === ProvingJobControllerStatus.RUNNING) { + maybeJob = await this.broker.reportProvingJobProgress(jobId, startedAt, { allowList: this.proofAllowList }); + } else if (status === ProvingJobControllerStatus.DONE) { + if (result) { + maybeJob = await this.reportResult(jobId, proofType, result); + } else { + this.log.warn( + `Job controller for job ${this.currentJobController.getJobId()} is done but doesn't have a result`, + { jobId }, + ); + maybeJob = await this.reportResult( + jobId, + proofType, + new ProvingError('No result found after proving', undefined, /* retry */ true), + ); + } + + this.currentJobController = undefined; + } else { + // IDLE status should not be seen because a job is started as soon as it is created + this.log.warn(`Idle job controller for job: ${this.currentJobController.getJobId()}. Skipping main loop work`, { + jobId: this.currentJobController.getJobId(), + }); + return; + } } else { maybeJob = await this.broker.getProvingJob({ allowList: this.proofAllowList }); } - if (!maybeJob) { - return; - } - - if (this.idleTimer) { - this.instrumentation.recordIdleTime(this.idleTimer); + if (maybeJob) { + await this.startJob(maybeJob); } - this.idleTimer = undefined; - - const { job, time } = maybeJob; - await this.startJob(job, time); } - private async startJob(job: ProvingJob, startedAt: number): Promise { + private async startJob({ job, time: startedAt }: GetProvingJobResponse): Promise { let abortedProofJobId: string | undefined; let abortedProofName: string | undefined; - if (this.currentJobController?.getStatus() === ProvingJobControllerStatus.PROVING) { + if (this.currentJobController?.getStatus() === ProvingJobControllerStatus.RUNNING) { abortedProofJobId = this.currentJobController.getJobId(); abortedProofName = this.currentJobController.getProofTypeName(); this.currentJobController?.abort(); @@ -122,7 +144,7 @@ export class ProvingAgent implements Traceable { }); if (maybeJob) { - return this.startJob(maybeJob.job, maybeJob.time); + return this.startJob(maybeJob); } return; @@ -134,7 +156,11 @@ export class ProvingAgent implements Traceable { job.epochNumber, startedAt, this.circuitProver, - this.handleJobResult, + () => { + // trigger a run of the main work loop when proving completes + // no need to await this here. The controller will stay alive (in DONE state) until the result is send to the broker + void this.runningPromise.trigger(); + }, ); if (abortedProofJobId) { @@ -154,28 +180,30 @@ export class ProvingAgent implements Traceable { this.currentJobController.start(); } - handleJobResult = async ( + private async reportResult( jobId: ProvingJobId, type: T, - err: Error | undefined, - result: ProvingJobResultsMap[T] | undefined, - ) => { - let maybeJob: { job: ProvingJob; time: number } | undefined; - if (err) { - const retry = err.name === ProvingError.NAME ? (err as ProvingError).retry : false; - this.log.error(`Job id=${jobId} type=${ProvingRequestType[type]} failed err=${err.message} retry=${retry}`, err); - maybeJob = await this.broker.reportProvingJobError(jobId, err.message, retry, { allowList: this.proofAllowList }); - } else if (result) { + result: ProvingJobResultsMap[T] | Error, + ): Promise { + let maybeJob: GetProvingJobResponse | undefined; + if (result instanceof AbortError) { + // no-op + this.log.warn(`Job id=${jobId} was aborted. Not reporting result back to broker`, result); + } else if (result instanceof Error) { + const retry = result.name === ProvingError.NAME ? (result as ProvingError).retry : false; + this.log.error( + `Job id=${jobId} type=${ProvingRequestType[type]} failed err=${result.message} retry=${retry}`, + result, + ); + maybeJob = await this.broker.reportProvingJobError(jobId, result.message, retry, { + allowList: this.proofAllowList, + }); + } else { const outputUri = await this.proofStore.saveProofOutput(jobId, type, result); this.log.info(`Job id=${jobId} type=${ProvingRequestType[type]} completed outputUri=${truncate(outputUri)}`); maybeJob = await this.broker.reportProvingJobSuccess(jobId, outputUri, { allowList: this.proofAllowList }); } - if (maybeJob) { - const { job, time } = maybeJob; - await this.startJob(job, time); - } else { - this.idleTimer = new Timer(); - } - }; + return maybeJob; + } } diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker_agent_integration.test.ts b/yarn-project/prover-client/src/proving_broker/proving_broker_agent_integration.test.ts new file mode 100644 index 000000000000..634ab18a81d2 --- /dev/null +++ b/yarn-project/prover-client/src/proving_broker/proving_broker_agent_integration.test.ts @@ -0,0 +1,141 @@ +import { times } from '@aztec/foundation/collection'; +import { randomInt, sha256 } from '@aztec/foundation/crypto'; +import { createLogger } from '@aztec/foundation/log'; +import { promiseWithResolvers } from '@aztec/foundation/promise'; +import { sleep } from '@aztec/foundation/sleep'; +import { ProvingJob, makeProvingJobId } from '@aztec/stdlib/interfaces/server'; +import { ProvingRequestType } from '@aztec/stdlib/proofs'; +import { makeBaseParityInputs, makeParityPublicInputs } from '@aztec/stdlib/testing'; + +import { jest } from '@jest/globals'; + +import { MockProver } from '../test/mock_prover.js'; +import { InlineProofStore } from './proof_store/inline_proof_store.js'; +import type { ProofStore } from './proof_store/proof_store.js'; +import { ProvingAgent } from './proving_agent.js'; +import { ProvingBroker } from './proving_broker.js'; +import { InMemoryBrokerDatabase } from './proving_broker_database/memory.js'; + +const AGENTS = 5; +const TOTAL_JOBS = 200; +const FAILURE_RATE = 0.5; +const JOB_TIMEOUT = 3000; +const WORK_LOOP = 100; + +describe('ProvingBroker <-> ProvingAgent integration', () => { + let broker: ProvingBroker; + let agents: ProvingAgent[]; + let prover: MockProver; + let store: ProofStore; + + beforeEach(async () => { + broker = new ProvingBroker(new InMemoryBrokerDatabase(), { + proverBrokerJobTimeoutMs: JOB_TIMEOUT, + proverBrokerJobMaxRetries: 3, + proverBrokerPollIntervalMs: WORK_LOOP, + proverBrokerMaxEpochsToKeepResultsFor: 1, + }); + + addBrokerDelay('getProvingJob', 5, 50); + addBrokerDelay('reportProvingJobProgress', 1, 10); + addBrokerDelay('reportProvingJobSuccess', 20, 200); // this delay is longer because there's more data to upload + addBrokerDelay('reportProvingJobError', 1, 10); + + prover = new MockProver(); + store = new InlineProofStore(); + agents = times( + AGENTS, + i => new ProvingAgent(broker, store, prover, [], WORK_LOOP, undefined, createLogger('prover-agent-' + i)), + ); + + await broker.start(); + agents.forEach(agent => agent.start()); + }); + + afterEach(async () => { + await Promise.all(agents.map(agent => agent.stop())); + await broker.stop(); + }); + + it('completes job queue', async () => { + const jobs: Record = {}; + const deferreds: Record> = {}; + const signals: Record = {}; + + const duplicateJobs: string[] = []; + + jest.spyOn(prover, 'getBaseParityProof').mockImplementation((inputs, signal) => { + const inputsHash = sha256(inputs.toBuffer()); + const id = makeProvingJobId(0, ProvingRequestType.BASE_PARITY, inputsHash.toString('hex')); + // job was given to two agents + if (deferreds[id]) { + duplicateJobs.push(id); + return deferreds[id].promise; + } + signals[id] = signal!; + deferreds[id] = promiseWithResolvers(); + return deferreds[id].promise; + }); + + const enqueueRandomJob = async () => { + while (true) { + const inputs = makeBaseParityInputs(randomInt(Number.MAX_SAFE_INTEGER)); + const inputsHash = sha256(inputs.toBuffer()); + const id = makeProvingJobId(0, ProvingRequestType.BASE_PARITY, inputsHash.toString('hex')); + if (jobs[id]) { + continue; + } + + jobs[id] = { + id, + type: ProvingRequestType.BASE_PARITY, + inputsUri: await store.saveProofInput(id, ProvingRequestType.BASE_PARITY, inputs), + epochNumber: 0, + }; + await broker.enqueueProvingJob(jobs[id]); + break; + } + }; + + const resolveRandomActiveJobs = (count = 1, failureRate = 0): number => { + const pendingJobs = Object.entries(deferreds); + let completed = 0; + while (pendingJobs.length > 0 && completed < count) { + const [[id, deferred]] = pendingJobs.splice(randomInt(pendingJobs.length), 1); + if (Math.random() < failureRate) { + deferred.reject(new Error('test error')); + } else { + deferred.resolve(makeParityPublicInputs()); + } + delete deferreds[id]; + completed++; + } + return completed; + }; + + for (let i = 0; i < TOTAL_JOBS; i++) { + await enqueueRandomJob(); + } + + let completed = 0; + while (completed < TOTAL_JOBS) { + completed += resolveRandomActiveJobs(Math.ceil(agents.length / 2), FAILURE_RATE); + + // make sure no jobs have been cancelled + expect(Object.values(signals).some(signal => signal.aborted)).toBe(false); + // and no jobs have been double booked + expect(duplicateJobs).toEqual([]); + + await sleep(WORK_LOOP); + } + }); + + function addBrokerDelay(fn: keyof ProvingBroker, minDelay: number, maxDelay: number): void { + const original = broker[fn] as any; + const spy = jest.spyOn(broker, fn as any); + spy.mockImplementation(async (...args) => { + await sleep(minDelay + Math.random() * (maxDelay - minDelay)); + return original.apply(broker, args); + }); + } +}); diff --git a/yarn-project/prover-client/src/proving_broker/proving_job_controller.test.ts b/yarn-project/prover-client/src/proving_broker/proving_job_controller.test.ts index fff18c9d28bf..6878c2f76f80 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_job_controller.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_job_controller.test.ts @@ -1,4 +1,5 @@ import { RECURSIVE_PROOF_LENGTH } from '@aztec/constants'; +import { AbortError } from '@aztec/foundation/error'; import { promiseWithResolvers } from '@aztec/foundation/promise'; import { sleep } from '@aztec/foundation/sleep'; import { type ProvingJobId, makePublicInputsAndRecursiveProof } from '@aztec/stdlib/interfaces/server'; @@ -38,7 +39,7 @@ describe('ProvingJobController', () => { it('reports PROVING status while busy', () => { controller.start(); - expect(controller.getStatus()).toBe(ProvingJobControllerStatus.PROVING); + expect(controller.getStatus()).toBe(ProvingJobControllerStatus.RUNNING); }); it('reports DONE status after job is done', async () => { @@ -47,14 +48,15 @@ describe('ProvingJobController', () => { expect(controller.getStatus()).toBe(ProvingJobControllerStatus.DONE); }); - it('reports ABORTED status after job is aborted', async () => { + it('reports aborted error after cancellation', async () => { controller.start(); controller.abort(); await sleep(1); // give promises a chance to complete - expect(controller.getStatus()).toBe(ProvingJobControllerStatus.ABORTED); + expect(controller.getStatus()).toBe(ProvingJobControllerStatus.DONE); + expect(controller.getResult()).toBeInstanceOf(AbortError); }); - it('calls onComplete with the proof', async () => { + it('calls onComplete', async () => { const resp = makePublicInputsAndRecursiveProof( makeParityPublicInputs(), makeRecursiveProof(RECURSIVE_PROOF_LENGTH), @@ -64,7 +66,8 @@ describe('ProvingJobController', () => { controller.start(); await sleep(1); // give promises a chance to complete - expect(onComplete).toHaveBeenCalledWith('1', ProvingRequestType.BASE_PARITY, undefined, resp); + expect(onComplete).toHaveBeenCalled(); + expect(controller.getResult()).toEqual(resp); }); it('calls onComplete with the error', async () => { @@ -73,7 +76,8 @@ describe('ProvingJobController', () => { controller.start(); await sleep(1); - expect(onComplete).toHaveBeenCalledWith('1', ProvingRequestType.BASE_PARITY, err, undefined); + expect(onComplete).toHaveBeenCalled(); + expect(controller.getResult()).toEqual(err); }); it('does not crash if onComplete throws', async () => { @@ -85,18 +89,10 @@ describe('ProvingJobController', () => { controller.start(); await sleep(1); expect(onComplete).toHaveBeenCalled(); + expect(controller.getResult()).toBeDefined(); }); - it('does not crash if onComplete rejects', async () => { - const err = new Error('test error'); - onComplete.mockRejectedValueOnce(err); - - controller.start(); - await sleep(1); - expect(onComplete).toHaveBeenCalled(); - }); - - it('does not call onComplete if abort is called', async () => { + it('calls onComplete if abort is called but result is masked', async () => { const { promise, resolve } = promiseWithResolvers(); jest.spyOn(prover, 'getBaseParityProof').mockReturnValueOnce(promise); @@ -119,6 +115,7 @@ describe('ProvingJobController', () => { ); await sleep(1); - expect(onComplete).not.toHaveBeenCalled(); + expect(onComplete).toHaveBeenCalled(); + expect(controller.getResult()).toBeInstanceOf(AbortError); }); }); diff --git a/yarn-project/prover-client/src/proving_broker/proving_job_controller.ts b/yarn-project/prover-client/src/proving_broker/proving_job_controller.ts index 21aebcefba43..61654d2a9208 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_job_controller.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_job_controller.ts @@ -1,3 +1,5 @@ +import { randomBytes } from '@aztec/foundation/crypto'; +import { AbortError } from '@aztec/foundation/error'; import { createLogger } from '@aztec/foundation/log'; import type { ProvingJobId, @@ -9,24 +11,15 @@ import { ProvingRequestType } from '@aztec/stdlib/proofs'; export enum ProvingJobControllerStatus { IDLE = 'idle', - PROVING = 'proving', + RUNNING = 'running', DONE = 'done', - ABORTED = 'aborted', -} - -interface ProvingJobCompletionCallback { - ( - jobId: ProvingJobId, - type: T, - error: Error | undefined, - result: ProvingJobResultsMap[T] | undefined, - ): void | Promise; } export class ProvingJobController { private status: ProvingJobControllerStatus = ProvingJobControllerStatus.IDLE; private promise?: Promise; private abortController = new AbortController(); + private result?: ProvingJobResultsMap[ProvingRequestType] | Error; constructor( private jobId: ProvingJobId, @@ -34,13 +27,13 @@ export class ProvingJobController { private epochNumber: number, private startedAt: number, private circuitProver: ServerCircuitProver, - private onComplete: ProvingJobCompletionCallback, - private log = createLogger('prover-client:proving-agent:job-controller'), + private onComplete: () => void, + private log = createLogger('prover-client:proving-agent:job-controller-' + randomBytes(4).toString('hex')), ) {} public start(): void { if (this.status !== ProvingJobControllerStatus.IDLE) { - this.log.verbose( + this.log.warn( `Job controller for jobId=${this.jobId} not starting because it is not idle currentStatus=${this.status}`, { currentStatus: this.status, @@ -50,63 +43,23 @@ export class ProvingJobController { return; } - this.status = ProvingJobControllerStatus.PROVING; - this.log.verbose(`Job controller started jobId=${this.jobId}`, { + this.promise = this.run(); + + this.log.info(`Job controller started jobId=${this.jobId}`, { jobId: this.jobId, }); - - this.promise = this.generateProof() - .then( - result => { - if (this.status === ProvingJobControllerStatus.ABORTED) { - this.log.warn(`Job controller for jobId=${this.jobId} completed successfully but job was aborted`, { - currentStatus: this.status, - jobId: this.jobId, - }); - return; - } - this.status = ProvingJobControllerStatus.DONE; - this.log.verbose(`Job controller for jobId=${this.jobId} completed successfully`, { - jobId: this.jobId, - }); - return this.onComplete(this.jobId, this.inputs.type, undefined, result); - }, - error => { - if (this.status === ProvingJobControllerStatus.ABORTED) { - this.log.warn(`Job controller for jobId=${this.jobId} finished with an error but job was aborted`, { - currentStatus: this.status, - jobId: this.jobId, - }); - return; - } - - if (error.name === 'AbortError') { - // Ignore abort errors - return; - } - - this.log.verbose(`Job controller for jobId=${this.jobId} finished with an error`, { - jobId: this.jobId, - err: error, - }); - - this.status = ProvingJobControllerStatus.DONE; - return this.onComplete(this.jobId, this.inputs.type, error, undefined); - }, - ) - .catch(err => { - this.log.error(`Job constroller failed to send result for jobId=${this.jobId}: ${err}`, err, { - jobId: this.jobId, - }); - }); } public getStatus(): ProvingJobControllerStatus { return this.status; } + public getResult(): ProvingJobResultsMap[ProvingRequestType] | Error | undefined { + return this.result; + } + public abort(): void { - if (this.status !== ProvingJobControllerStatus.PROVING) { + if (this.status !== ProvingJobControllerStatus.RUNNING) { this.log.warn(`Tried to abort job controller for jobId=${this.jobId} but it is not running`, { currentStatus: this.status, jobId: this.jobId, @@ -114,9 +67,8 @@ export class ProvingJobController { return; } - this.status = ProvingJobControllerStatus.ABORTED; this.abortController.abort(); - this.log.verbose(`Aborted job controller for jobId=${this.jobId}`, { + this.log.warn(`Aborted job controller for jobId=${this.jobId}`, { jobId: this.jobId, }); } @@ -125,6 +77,10 @@ export class ProvingJobController { return this.jobId; } + public getProofType(): ProvingRequestType { + return this.inputs.type; + } + public getStartedAt(): number { return this.startedAt; } @@ -133,6 +89,36 @@ export class ProvingJobController { return ProvingRequestType[this.inputs.type]; } + private run = async () => { + this.status = ProvingJobControllerStatus.RUNNING; + let result: ProvingJobResultsMap[ProvingRequestType] | Error; + try { + result = await this.generateProof(); + } catch (err) { + if (err && err instanceof Error) { + result = err; + } else { + result = new Error('Unknown proving error: ' + String(err), { cause: err }); + } + } + + if (this.abortController.signal.aborted) { + this.log.warn(`Job controller for jobId=${this.jobId} completed but job was aborted`, { + currentStatus: this.status, + jobId: this.jobId, + }); + result = new AbortError('Proof was aborted'); + } + + this.result = result; + this.status = ProvingJobControllerStatus.DONE; + try { + this.onComplete(); + } catch (err) { + this.log.warn(`On complete handler error: ${err}`, { jobId: this.jobId }); + } + }; + private async generateProof(): Promise { const { type, inputs } = this.inputs; const signal = this.abortController.signal;