diff --git a/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts index c9537af6c601..8c45fe0f55cd 100644 --- a/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts +++ b/yarn-project/prover-client/src/proving_broker/broker_prover_facade.ts @@ -4,7 +4,9 @@ import type { NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH, RECURSIVE_PROOF_LENGTH, } from '@aztec/constants'; +import { asyncPool } from '@aztec/foundation/async-pool'; import { EpochNumber } from '@aztec/foundation/branded-types'; +import { chunk } from '@aztec/foundation/collection'; import { type Logger, type LoggerBindings, createLogger } from '@aztec/foundation/log'; import { type PromiseWithResolvers, RunningPromise, promiseWithResolvers } from '@aztec/foundation/promise'; import { truncate } from '@aztec/foundation/string'; @@ -226,17 +228,11 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { // We collect all returned notifications and return them const allCompleted = new Set(); try { - let numRequests = 0; - while (ids.length > 0) { - const slice = ids.splice(0, SNAPSHOT_SYNC_CHECK_MAX_REQUEST_SIZE); - const completed = await this.broker.getCompletedJobs(slice); + const batches = ids.length > 0 ? chunk(ids, SNAPSHOT_SYNC_CHECK_MAX_REQUEST_SIZE) : [[]]; + await asyncPool(1, batches, async batch => { + const completed = await this.broker.getCompletedJobs(batch); completed.forEach(id => allCompleted.add(id)); - ++numRequests; - } - if (numRequests === 0) { - const final = await this.broker.getCompletedJobs([]); - final.forEach(id => allCompleted.add(id)); - } + }); } catch (err) { this.log.error(`Error thrown when requesting completed job notifications from the broker`, err); } @@ -352,12 +348,8 @@ export class BrokerCircuitProverFacade implements ServerCircuitProver { .map(id => this.jobs.get(id)!) .filter(x => x !== undefined); const totalJobsToRetrieve = toBeRetrieved.length; - let totalJobsRetrieved = 0; - while (toBeRetrieved.length > 0) { - const slice = toBeRetrieved.splice(0, MAX_CONCURRENT_JOB_SETTLED_REQUESTS); - const results = await Promise.all(slice.map(job => processJob(job!))); - totalJobsRetrieved += results.filter(x => x).length; - } + const results = await asyncPool(MAX_CONCURRENT_JOB_SETTLED_REQUESTS, toBeRetrieved, job => processJob(job)); + const totalJobsRetrieved = results.filter(x => x).length; if (totalJobsToRetrieve > 0) { this.log.verbose( `Successfully retrieved ${totalJobsRetrieved} of ${totalJobsToRetrieve} jobs that should be ready, total ready jobs is now: ${this.jobsToRetrieve.size}`,