diff --git a/yarn-project/archiver/src/test/mock_l2_block_source.ts b/yarn-project/archiver/src/test/mock_l2_block_source.ts index 3d06e42e7391..c767266a449d 100644 --- a/yarn-project/archiver/src/test/mock_l2_block_source.ts +++ b/yarn-project/archiver/src/test/mock_l2_block_source.ts @@ -17,7 +17,12 @@ import { } from '@aztec/stdlib/block'; import { Checkpoint, L1PublishedData, PublishedCheckpoint } from '@aztec/stdlib/checkpoint'; import type { ContractClassPublic, ContractDataSource, ContractInstanceWithAddress } from '@aztec/stdlib/contract'; -import { EmptyL1RollupConstants, type L1RollupConstants, getSlotRangeForEpoch } from '@aztec/stdlib/epoch-helpers'; +import { + EmptyL1RollupConstants, + type L1RollupConstants, + getEpochAtSlot, + getSlotRangeForEpoch, +} from '@aztec/stdlib/epoch-helpers'; import { type BlockHeader, TxExecutionResult, TxHash, TxReceipt, TxStatus } from '@aztec/stdlib/tx'; import type { UInt64 } from '@aztec/stdlib/types'; @@ -30,6 +35,7 @@ export class MockL2BlockSource implements L2BlockSource, ContractDataSource { private provenBlockNumber: number = 0; private finalizedBlockNumber: number = 0; private checkpointedBlockNumber: number = 0; + private currentSlotNumber: SlotNumber = SlotNumber(0); private log = createLogger('archiver:mock_l2_block_source'); @@ -40,6 +46,7 @@ export class MockL2BlockSource implements L2BlockSource, ContractDataSource { this.l2Blocks.push(block); } + this.currentSlotNumber = SlotNumber(this.l2Blocks[this.l2Blocks.length - 1].header.globalVariables.slotNumber); this.log.verbose(`Created ${numBlocks} blocks in the mock L2 block source`); } @@ -68,6 +75,10 @@ export class MockL2BlockSource implements L2BlockSource, ContractDataSource { this.checkpointedBlockNumber = checkpointedBlockNumber; } + public setCurrentSlotNumber(slot: SlotNumber) { + this.currentSlotNumber = slot; + } + /** * Method to fetch the rollup contract address at the base-layer. * @returns The rollup address. @@ -396,11 +407,12 @@ export class MockL2BlockSource implements L2BlockSource, ContractDataSource { } getL2EpochNumber(): Promise { - throw new Error('Method not implemented.'); + const epochDuration = DefaultL1ContractsConfig.aztecEpochDuration; + return Promise.resolve(getEpochAtSlot(this.currentSlotNumber, { epochDuration })); } getL2SlotNumber(): Promise { - throw new Error('Method not implemented.'); + return Promise.resolve(this.currentSlotNumber); } isEpochComplete(_epochNumber: EpochNumber): Promise { diff --git a/yarn-project/end-to-end/src/fixtures/e2e_prover_test.ts b/yarn-project/end-to-end/src/fixtures/e2e_prover_test.ts index 004c672d2afb..8cec1540d153 100644 --- a/yarn-project/end-to-end/src/fixtures/e2e_prover_test.ts +++ b/yarn-project/end-to-end/src/fixtures/e2e_prover_test.ts @@ -255,6 +255,7 @@ export class FullProverTest { txGatheringTimeoutMs: 24_000, proverNodeFailedEpochStore: undefined, proverNodeEpochProvingDelayMs: undefined, + proverNodeOptimisticProcessing: true, }; const sponsoredFPCAddress = await getSponsoredFPCAddress(); const { prefilledPublicData } = await getGenesisValues( diff --git a/yarn-project/end-to-end/src/fixtures/setup.ts b/yarn-project/end-to-end/src/fixtures/setup.ts index 32d4255595b2..9eb263eac163 100644 --- a/yarn-project/end-to-end/src/fixtures/setup.ts +++ b/yarn-project/end-to-end/src/fixtures/setup.ts @@ -739,6 +739,7 @@ export function createAndSyncProverNode( proverNodeFailedEpochStore: undefined, proverId: EthAddress.fromNumber(1), proverNodeEpochProvingDelayMs: undefined, + proverNodeOptimisticProcessing: true, ...proverNodeConfig, }; diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index 7d3d4bcf32d1..89e956977ae6 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -171,6 +171,7 @@ export type EnvVar = | 'PROVER_NODE_TX_GATHERING_BATCH_SIZE' | 'PROVER_NODE_TX_GATHERING_MAX_PARALLEL_REQUESTS_PER_NODE' | 'PROVER_NODE_TX_GATHERING_TIMEOUT_MS' + | 'PROVER_NODE_OPTIMISTIC_PROCESSING' | 'PROVER_PUBLISHER_PRIVATE_KEY' | 'PROVER_PUBLISHER_PRIVATE_KEYS' | 'PROVER_PUBLISHER_ADDRESSES' diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index 3afbcf88dfe4..bd2b0205b7aa 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -203,6 +203,8 @@ export class P2PClient break; case 'chain-checkpointed': break; + case 'epoch-completed': + break; default: { const _: never = event; break; diff --git a/yarn-project/prover-client/src/orchestrator/checkpoint-proving-state.ts b/yarn-project/prover-client/src/orchestrator/checkpoint-proving-state.ts index 7b92edcd2d6e..58fa9a4d5a55 100644 --- a/yarn-project/prover-client/src/orchestrator/checkpoint-proving-state.ts +++ b/yarn-project/prover-client/src/orchestrator/checkpoint-proving-state.ts @@ -68,7 +68,7 @@ export class CheckpointProvingState { public readonly index: number, public readonly constants: CheckpointConstantData, public readonly totalNumBlocks: number, - private readonly finalBlobBatchingChallenges: FinalBlobBatchingChallenges, + private finalBlobBatchingChallenges: FinalBlobBatchingChallenges | undefined, private readonly headerOfLastBlockInPreviousCheckpoint: BlockHeader, private readonly lastArchiveSiblingPath: Tuple, private readonly l1ToL2Messages: Fr[], @@ -91,6 +91,11 @@ export class CheckpointProvingState { this.firstBlockNumber = BlockNumber(headerOfLastBlockInPreviousCheckpoint.globalVariables.blockNumber + 1); } + /** Sets the final blob batching challenges. Called when epoch structure is known. */ + public setFinalBlobBatchingChallenges(challenges: FinalBlobBatchingChallenges) { + this.finalBlobBatchingChallenges = challenges; + } + public get epochNumber(): number { return this.parentEpoch.epochNumber; } @@ -283,6 +288,9 @@ export class CheckpointProvingState { if (!this.startBlobAccumulator) { throw new Error('Start blob accumulator is not set.'); } + if (!this.finalBlobBatchingChallenges) { + throw new Error('Final blob batching challenges are not set.'); + } // `blobFields` must've been set if `startBlobAccumulator` is set (in `accumulateBlobs`). const blobFields = this.blobFields!; diff --git a/yarn-project/prover-client/src/orchestrator/epoch-proving-state.ts b/yarn-project/prover-client/src/orchestrator/epoch-proving-state.ts index 36fbd0856ab8..f3c1c063e3d0 100644 --- a/yarn-project/prover-client/src/orchestrator/epoch-proving-state.ts +++ b/yarn-project/prover-client/src/orchestrator/epoch-proving-state.ts @@ -51,19 +51,25 @@ export type ProvingResult = { status: 'success' } | { status: 'failure'; reason: * Captures resolve and reject callbacks to provide a promise base interface to the consumer of our proving. */ export class EpochProvingState { - private checkpointProofs: UnbalancedTreeStore< - ProofState - >; + // Deferred until setStructure() is called. + private checkpointProofs: + | UnbalancedTreeStore> + | undefined; private checkpointPaddingProof: | ProofState | undefined; private rootRollupProof: ProofState | undefined; private checkpoints: (CheckpointProvingState | undefined)[] = []; - private startBlobAccumulator: BatchedBlobAccumulator; + // Deferred until setStructure() is called. + private startBlobAccumulator: BatchedBlobAccumulator | undefined; private endBlobAccumulator: BatchedBlobAccumulator | undefined; private finalBatchedBlob: BatchedBlob | undefined; private provingStateLifecycle = PROVING_STATE_LIFECYCLE.PROVING_STATE_CREATED; + // Deferred until setStructure() is called. + private _totalNumCheckpoints: number | undefined; + private _finalBlobBatchingChallenges: FinalBlobBatchingChallenges | undefined; + // Map from tx hash to chonk verifier proof promise. Used when kickstarting chonk verifier proofs before tx processing. public readonly cachedChonkVerifierProofs = new Map< string, @@ -74,12 +80,28 @@ export class EpochProvingState { constructor( public readonly epochNumber: EpochNumber, - public readonly totalNumCheckpoints: number, - private readonly finalBlobBatchingChallenges: FinalBlobBatchingChallenges, private onCheckpointBlobAccumulatorSet: (checkpoint: CheckpointProvingState) => void, private completionCallback: (result: ProvingResult) => void, private rejectionCallback: (reason: string) => void, - ) { + ) {} + + /** Returns the total number of checkpoints. Throws if structure has not been set. */ + public get totalNumCheckpoints(): number { + if (this._totalNumCheckpoints === undefined) { + throw new Error('Epoch structure not set. Call setStructure() first.'); + } + return this._totalNumCheckpoints; + } + + /** Returns true if the epoch structure (totalNumCheckpoints, finalBlobBatchingChallenges) has been set. */ + public hasStructure(): boolean { + return this._totalNumCheckpoints !== undefined; + } + + /** Sets the epoch structure. Called when the epoch is complete. */ + public setStructure(totalNumCheckpoints: number, finalBlobBatchingChallenges: FinalBlobBatchingChallenges) { + this._totalNumCheckpoints = totalNumCheckpoints; + this._finalBlobBatchingChallenges = finalBlobBatchingChallenges; this.checkpointProofs = new UnbalancedTreeStore(totalNumCheckpoints); this.startBlobAccumulator = BatchedBlobAccumulator.newWithChallenges(finalBlobBatchingChallenges); } @@ -98,7 +120,8 @@ export class EpochProvingState { newL1ToL2MessageTreeSnapshot: AppendOnlyTreeSnapshot, newL1ToL2MessageSubtreeRootSiblingPath: Tuple, ): CheckpointProvingState { - if (checkpointIndex >= this.totalNumCheckpoints) { + // If structure is already set, validate the checkpoint index. + if (this.hasStructure() && checkpointIndex >= this.totalNumCheckpoints) { throw new Error( `Unable to start a new checkpoint at index ${checkpointIndex}. Expected at most ${this.totalNumCheckpoints} checkpoints.`, ); @@ -108,7 +131,7 @@ export class EpochProvingState { checkpointIndex, constants, totalNumBlocks, - this.finalBlobBatchingChallenges, + this._finalBlobBatchingChallenges, previousBlockHeader, lastArchiveSiblingPath, l1ToL2Messages, @@ -121,7 +144,7 @@ export class EpochProvingState { ); this.checkpoints[checkpointIndex] = checkpoint; - if (this.checkpoints.filter(c => !!c).length === this.totalNumCheckpoints) { + if (this.hasStructure() && this.checkpoints.filter(c => !!c).length === this.totalNumCheckpoints) { this.provingStateLifecycle = PROVING_STATE_LIFECYCLE.PROVING_STATE_FULL; } @@ -155,6 +178,10 @@ export class EpochProvingState { // Returns true if we are still able to accept checkpoints, false otherwise. public isAcceptingCheckpoints() { + // If structure isn't set yet, always accept checkpoints (we don't know the limit). + if (!this.hasStructure()) { + return true; + } return this.checkpoints.filter(c => !!c).length < this.totalNumCheckpoints; } @@ -165,14 +192,14 @@ export class EpochProvingState { typeof NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH >, ): TreeNodeLocation { - return this.checkpointProofs.setLeaf(checkpointIndex, { provingOutput }); + return this.getCheckpointProofs().setLeaf(checkpointIndex, { provingOutput }); } public tryStartProvingCheckpointMerge(location: TreeNodeLocation) { - if (this.checkpointProofs.getNode(location)?.isProving) { + if (this.getCheckpointProofs().getNode(location)?.isProving) { return false; } else { - this.checkpointProofs.setNode(location, { isProving: true }); + this.getCheckpointProofs().setNode(location, { isProving: true }); return true; } } @@ -184,7 +211,7 @@ export class EpochProvingState { typeof NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH >, ) { - this.checkpointProofs.setNode(location, { provingOutput }); + this.getCheckpointProofs().setNode(location, { provingOutput }); } public tryStartProvingRootRollup() { @@ -219,6 +246,11 @@ export class EpochProvingState { } public async accumulateCheckpointOutHashes() { + // Cannot accumulate until structure is known. + if (!this.hasStructure()) { + return; + } + const treeCalculator = await MerkleTreeCalculator.create(OUT_HASH_TREE_HEIGHT, undefined, (left, right) => Promise.resolve(shaMerkleHash(left, right)), ); @@ -261,6 +293,11 @@ export class EpochProvingState { } public async setBlobAccumulators() { + // Cannot accumulate until structure is known and start blob accumulator is created. + if (!this.hasStructure() || !this.startBlobAccumulator) { + return; + } + let previousAccumulator = this.startBlobAccumulator; // Accumulate blobs as far as we can for this epoch. for (let i = 0; i < this.totalNumCheckpoints; i++) { @@ -292,11 +329,13 @@ export class EpochProvingState { } public getParentLocation(location: TreeNodeLocation) { - return this.checkpointProofs.getParentLocation(location); + return this.getCheckpointProofs().getParentLocation(location); } public getCheckpointMergeRollupInputs(mergeLocation: TreeNodeLocation) { - const [left, right] = this.checkpointProofs.getChildren(mergeLocation).map(c => c?.provingOutput); + const [left, right] = this.getCheckpointProofs() + .getChildren(mergeLocation) + .map(c => c?.provingOutput); if (!left || !right) { throw new Error('At least one child is not ready for the checkpoint merge rollup.'); } @@ -334,6 +373,9 @@ export class EpochProvingState { } public isReadyForCheckpointMerge(location: TreeNodeLocation) { + if (!this.checkpointProofs) { + return false; + } return !!this.checkpointProofs.getSibling(location)?.provingOutput; } @@ -368,11 +410,20 @@ export class EpochProvingState { this.completionCallback(result); } + /** Returns the checkpointProofs store, asserting that structure has been set. */ + private getCheckpointProofs() { + if (!this.checkpointProofs) { + throw new Error('Epoch structure not set. Call setStructure() first.'); + } + return this.checkpointProofs; + } + #getChildProofsForRoot() { const rootLocation = { level: 0, index: 0 }; + const proofs = this.getCheckpointProofs(); // If there's only 1 block, its block root proof will be stored at the root. return this.totalNumCheckpoints === 1 - ? [this.checkpointProofs.getNode(rootLocation)?.provingOutput, this.checkpointPaddingProof?.provingOutput] - : this.checkpointProofs.getChildren(rootLocation).map(c => c?.provingOutput); + ? [proofs.getNode(rootLocation)?.provingOutput, this.checkpointPaddingProof?.provingOutput] + : proofs.getChildren(rootLocation).map(c => c?.provingOutput); } } diff --git a/yarn-project/prover-client/src/orchestrator/orchestrator.ts b/yarn-project/prover-client/src/orchestrator/orchestrator.ts index 4ac203a6c271..6c2808f95cbe 100644 --- a/yarn-project/prover-client/src/orchestrator/orchestrator.ts +++ b/yarn-project/prover-client/src/orchestrator/orchestrator.ts @@ -129,11 +129,7 @@ export class ProvingOrchestrator implements EpochProver { return Promise.resolve(); } - public startNewEpoch( - epochNumber: EpochNumber, - totalNumCheckpoints: number, - finalBlobBatchingChallenges: FinalBlobBatchingChallenges, - ) { + public startNewEpoch(epochNumber: EpochNumber) { if (this.provingState?.verifyState()) { throw new Error( `Cannot start epoch ${epochNumber} when epoch ${this.provingState.epochNumber} is still being processed.`, @@ -142,11 +138,9 @@ export class ProvingOrchestrator implements EpochProver { const { promise: _promise, resolve, reject } = promiseWithResolvers(); const promise = _promise.catch((reason): ProvingResult => ({ status: 'failure', reason })); - this.logger.info(`Starting epoch ${epochNumber} with ${totalNumCheckpoints} checkpoints.`); + this.logger.info(`Starting epoch ${epochNumber}. Block-level proving can begin immediately.`); this.provingState = new EpochProvingState( epochNumber, - totalNumCheckpoints, - finalBlobBatchingChallenges, provingState => this.checkAndEnqueueCheckpointRootRollup(provingState), resolve, reject, @@ -154,6 +148,32 @@ export class ProvingOrchestrator implements EpochProver { this.provingPromise = promise; } + public async setEpochStructure( + totalNumCheckpoints: number, + finalBlobBatchingChallenges: FinalBlobBatchingChallenges, + ) { + if (!this.provingState) { + throw new Error('Empty epoch proving state. Call startNewEpoch before setting epoch structure.'); + } + + this.logger.info( + `Setting epoch ${this.provingState.epochNumber} structure with ${totalNumCheckpoints} checkpoints.`, + ); + this.provingState.setStructure(totalNumCheckpoints, finalBlobBatchingChallenges); + + // Update blob batching challenges on all existing checkpoints. + for (let i = 0; i < totalNumCheckpoints; i++) { + const checkpoint = this.provingState.getCheckpointProvingState(i); + if (checkpoint) { + checkpoint.setFinalBlobBatchingChallenges(finalBlobBatchingChallenges); + } + } + + // Re-trigger accumulation for all completed checkpoints. + await this.provingState.accumulateCheckpointOutHashes(); + await this.provingState.setBlobAccumulators(); + } + /** * Starts a new checkpoint. * @param checkpointIndex - The index of the checkpoint in the epoch. diff --git a/yarn-project/prover-client/src/orchestrator/orchestrator_errors.test.ts b/yarn-project/prover-client/src/orchestrator/orchestrator_errors.test.ts index 6f5e642e54fd..658863edc8a6 100644 --- a/yarn-project/prover-client/src/orchestrator/orchestrator_errors.test.ts +++ b/yarn-project/prover-client/src/orchestrator/orchestrator_errors.test.ts @@ -39,7 +39,8 @@ describe('prover/orchestrator/errors', () => { describe('errors', () => { it('throws if adding too many transactions', async () => { - orchestrator.startNewEpoch(EpochNumber(1), 1 /* numCheckpoints */, finalBlobChallenges); + orchestrator.startNewEpoch(EpochNumber(1)); + await orchestrator.setEpochStructure(1 /* numCheckpoints */, finalBlobChallenges); await orchestrator.startNewCheckpoint( 0, // checkpointIndex @@ -58,7 +59,8 @@ describe('prover/orchestrator/errors', () => { }); it('throws if adding too many blocks', async () => { - orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + orchestrator.startNewEpoch(EpochNumber(1)); + await orchestrator.setEpochStructure(1, finalBlobChallenges); await orchestrator.startNewCheckpoint( 0, // checkpointIndex @@ -79,7 +81,8 @@ describe('prover/orchestrator/errors', () => { }); it('throws if adding empty block as non-first block', async () => { - orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + orchestrator.startNewEpoch(EpochNumber(1)); + await orchestrator.setEpochStructure(1, finalBlobChallenges); await orchestrator.startNewCheckpoint( 0, // checkpointIndex @@ -103,7 +106,8 @@ describe('prover/orchestrator/errors', () => { }); it('throws if adding a transaction before starting checkpoint', async () => { - orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + orchestrator.startNewEpoch(EpochNumber(1)); + await orchestrator.setEpochStructure(1, finalBlobChallenges); await expect(async () => await orchestrator.addTxs(block.txs)).rejects.toThrow( /Proving state for block 1 not found/, @@ -111,7 +115,8 @@ describe('prover/orchestrator/errors', () => { }); it('throws if adding a transaction before starting block', async () => { - orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + orchestrator.startNewEpoch(EpochNumber(1)); + await orchestrator.setEpochStructure(1, finalBlobChallenges); await orchestrator.startNewCheckpoint( 0, // checkpointIndex constants, @@ -125,7 +130,8 @@ describe('prover/orchestrator/errors', () => { }); it('throws if completing a block before start', async () => { - orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + orchestrator.startNewEpoch(EpochNumber(1)); + await orchestrator.setEpochStructure(1, finalBlobChallenges); await orchestrator.startNewCheckpoint( 0, // checkpointIndex constants, @@ -139,7 +145,8 @@ describe('prover/orchestrator/errors', () => { }); it('throws if adding to a cancelled block', async () => { - orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + orchestrator.startNewEpoch(EpochNumber(1)); + await orchestrator.setEpochStructure(1, finalBlobChallenges); await orchestrator.startNewCheckpoint( 0, // checkpointIndex constants, @@ -158,7 +165,8 @@ describe('prover/orchestrator/errors', () => { it('rejects if too many l1 to l2 messages are provided', async () => { const l1ToL2Messages = new Array(NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP + 1).fill(new Fr(0n)); - orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + orchestrator.startNewEpoch(EpochNumber(1)); + await orchestrator.setEpochStructure(1, finalBlobChallenges); await expect( async () => await orchestrator.startNewCheckpoint( diff --git a/yarn-project/prover-client/src/orchestrator/orchestrator_failures.test.ts b/yarn-project/prover-client/src/orchestrator/orchestrator_failures.test.ts index 0c127406b133..03896bab6784 100644 --- a/yarn-project/prover-client/src/orchestrator/orchestrator_failures.test.ts +++ b/yarn-project/prover-client/src/orchestrator/orchestrator_failures.test.ts @@ -54,7 +54,8 @@ describe('prover/orchestrator/failures', () => { ); const finalBlobChallenges = await context.getFinalBlobChallenges(); - orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + orchestrator.startNewEpoch(EpochNumber(1)); + await orchestrator.setEpochStructure(1, finalBlobChallenges); for (let checkpointIndex = 0; checkpointIndex < checkpoints.length; checkpointIndex++) { const { constants, blocks, l1ToL2Messages, previousBlockHeader } = checkpoints[checkpointIndex]; diff --git a/yarn-project/prover-client/src/orchestrator/orchestrator_lifecycle.test.ts b/yarn-project/prover-client/src/orchestrator/orchestrator_lifecycle.test.ts index d8838354136c..1bb9254ea4c6 100644 --- a/yarn-project/prover-client/src/orchestrator/orchestrator_lifecycle.test.ts +++ b/yarn-project/prover-client/src/orchestrator/orchestrator_lifecycle.test.ts @@ -48,7 +48,8 @@ describe('prover/orchestrator/lifecycle', () => { }); const finalBlobChallenges = await context.getFinalBlobChallenges(); - orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + orchestrator.startNewEpoch(EpochNumber(1)); + await orchestrator.setEpochStructure(1, finalBlobChallenges); await orchestrator.startNewCheckpoint( 0, // checkpointIndex @@ -92,7 +93,8 @@ describe('prover/orchestrator/lifecycle', () => { }); const finalBlobChallenges = await context.getFinalBlobChallenges(); - orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + orchestrator.startNewEpoch(EpochNumber(1)); + await orchestrator.setEpochStructure(1, finalBlobChallenges); await orchestrator.startNewCheckpoint(0, constants, [], 1, previousBlockHeader); diff --git a/yarn-project/prover-client/src/orchestrator/orchestrator_multiple_checkpoints.test.ts b/yarn-project/prover-client/src/orchestrator/orchestrator_multiple_checkpoints.test.ts index 0763a535ff45..dd576b1cb328 100644 --- a/yarn-project/prover-client/src/orchestrator/orchestrator_multiple_checkpoints.test.ts +++ b/yarn-project/prover-client/src/orchestrator/orchestrator_multiple_checkpoints.test.ts @@ -35,7 +35,8 @@ describe('prover/orchestrator/multi-checkpoints', () => { logger.info(`Starting new epoch with ${numCheckpoints} checkpoints`); const finalBlobChallenges = await context.getFinalBlobChallenges(); - context.orchestrator.startNewEpoch(EpochNumber(1), numCheckpoints, finalBlobChallenges); + context.orchestrator.startNewEpoch(EpochNumber(1)); + await context.orchestrator.setEpochStructure(numCheckpoints, finalBlobChallenges); for (let i = 0; i < checkpoints.length; i++) { const { @@ -90,7 +91,8 @@ describe('prover/orchestrator/multi-checkpoints', () => { const epochNumber = epochIndex + 1; const { checkpoints, finalBlobChallenges } = epochs[epochIndex]; logger.info(`Starting epoch ${epochNumber} with ${checkpoints.length} checkpoints`); - context.orchestrator.startNewEpoch(EpochNumber(epochNumber), checkpoints.length, finalBlobChallenges); + context.orchestrator.startNewEpoch(EpochNumber(epochNumber)); + await context.orchestrator.setEpochStructure(checkpoints.length, finalBlobChallenges); for (let i = 0; i < checkpoints.length; i++) { const { diff --git a/yarn-project/prover-client/src/orchestrator/orchestrator_rollup_structure.test.ts b/yarn-project/prover-client/src/orchestrator/orchestrator_rollup_structure.test.ts index 5ca5d0bcbe66..8e85b163b760 100644 --- a/yarn-project/prover-client/src/orchestrator/orchestrator_rollup_structure.test.ts +++ b/yarn-project/prover-client/src/orchestrator/orchestrator_rollup_structure.test.ts @@ -122,7 +122,8 @@ describe('prover/orchestrator/rollup-structure', () => { }); const finalBlobChallenges = await context.getFinalBlobChallenges(); - context.orchestrator.startNewEpoch(EpochNumber(1) /* epochNumber */, numCheckpoints, finalBlobChallenges); + context.orchestrator.startNewEpoch(EpochNumber(1) /* epochNumber */); + await context.orchestrator.setEpochStructure(numCheckpoints, finalBlobChallenges); for (let checkpointIndex = 0; checkpointIndex < checkpoints.length; checkpointIndex++) { const { constants, blocks, l1ToL2Messages, previousBlockHeader } = checkpoints[checkpointIndex]; @@ -198,7 +199,8 @@ describe('prover/orchestrator/rollup-structure', () => { }); const finalBlobChallenges = await context.getFinalBlobChallenges(); - context.orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + context.orchestrator.startNewEpoch(EpochNumber(1)); + await context.orchestrator.setEpochStructure(1, finalBlobChallenges); await context.orchestrator.startNewCheckpoint( 0, // checkpointIndex diff --git a/yarn-project/prover-client/src/orchestrator/orchestrator_single_blocks.test.ts b/yarn-project/prover-client/src/orchestrator/orchestrator_single_blocks.test.ts index 79fd868b090e..0382d2b7b0fc 100644 --- a/yarn-project/prover-client/src/orchestrator/orchestrator_single_blocks.test.ts +++ b/yarn-project/prover-client/src/orchestrator/orchestrator_single_blocks.test.ts @@ -26,7 +26,8 @@ describe('prover/orchestrator/blocks', () => { } = await context.makeCheckpoint(1, { numTxsPerBlock: 0 }); const finalBlobChallenges = await context.getFinalBlobChallenges(); - context.orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + context.orchestrator.startNewEpoch(EpochNumber(1)); + await context.orchestrator.setEpochStructure(1, finalBlobChallenges); await context.orchestrator.startNewCheckpoint( 0, // checkpointIndex @@ -52,7 +53,8 @@ describe('prover/orchestrator/blocks', () => { } = await context.makeCheckpoint(1, { numTxsPerBlock: 1 }); const finalBlobChallenges = await context.getFinalBlobChallenges(); - context.orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + context.orchestrator.startNewEpoch(EpochNumber(1)); + await context.orchestrator.setEpochStructure(1, finalBlobChallenges); await context.orchestrator.startNewCheckpoint( 0, // checkpointIndex @@ -84,7 +86,8 @@ describe('prover/orchestrator/blocks', () => { }); const finalBlobChallenges = await context.getFinalBlobChallenges(); - context.orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + context.orchestrator.startNewEpoch(EpochNumber(1)); + await context.orchestrator.setEpochStructure(1, finalBlobChallenges); await context.orchestrator.startNewCheckpoint( 0, // checkpointIndex diff --git a/yarn-project/prover-client/src/orchestrator/orchestrator_single_checkpoint.test.ts b/yarn-project/prover-client/src/orchestrator/orchestrator_single_checkpoint.test.ts index be7a9b017843..a5d5e2d11618 100644 --- a/yarn-project/prover-client/src/orchestrator/orchestrator_single_checkpoint.test.ts +++ b/yarn-project/prover-client/src/orchestrator/orchestrator_single_checkpoint.test.ts @@ -34,7 +34,8 @@ describe('prover/orchestrator/single-checkpoint', () => { }); const finalBlobChallenges = await context.getFinalBlobChallenges(); - context.orchestrator.startNewEpoch(EpochNumber(1), numCheckpoints, finalBlobChallenges); + context.orchestrator.startNewEpoch(EpochNumber(1)); + await context.orchestrator.setEpochStructure(numCheckpoints, finalBlobChallenges); await context.orchestrator.startNewCheckpoint( 0, // checkpointIndex @@ -80,7 +81,8 @@ describe('prover/orchestrator/single-checkpoint', () => { }); const finalBlobChallenges = await context.getFinalBlobChallenges(); - context.orchestrator.startNewEpoch(EpochNumber(1), numCheckpoints, finalBlobChallenges); + context.orchestrator.startNewEpoch(EpochNumber(1)); + await context.orchestrator.setEpochStructure(numCheckpoints, finalBlobChallenges); await context.orchestrator.startNewCheckpoint( 0, // checkpointIndex diff --git a/yarn-project/prover-client/src/orchestrator/orchestrator_workflow.test.ts b/yarn-project/prover-client/src/orchestrator/orchestrator_workflow.test.ts index 2ac46e75b7fc..781dac6bd83d 100644 --- a/yarn-project/prover-client/src/orchestrator/orchestrator_workflow.test.ts +++ b/yarn-project/prover-client/src/orchestrator/orchestrator_workflow.test.ts @@ -84,7 +84,8 @@ describe('prover/orchestrator', () => { }); const finalBlobChallenges = await context.getFinalBlobChallenges(); - orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + orchestrator.startNewEpoch(EpochNumber(1)); + await orchestrator.setEpochStructure(1, finalBlobChallenges); await orchestrator.startNewCheckpoint( 0, // checkpointIndex @@ -136,7 +137,8 @@ describe('prover/orchestrator', () => { } = await context.makeCheckpoint(numBlocks); const finalBlobChallenges = await context.getFinalBlobChallenges(); - orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + orchestrator.startNewEpoch(EpochNumber(1)); + await orchestrator.setEpochStructure(1, finalBlobChallenges); await orchestrator.startNewCheckpoint( 0, // checkpointIndex @@ -165,7 +167,8 @@ describe('prover/orchestrator', () => { } = await context.makeCheckpoint(numBlocks); const finalBlobChallenges = await context.getFinalBlobChallenges(); - orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + orchestrator.startNewEpoch(EpochNumber(1)); + await orchestrator.setEpochStructure(1, finalBlobChallenges); await orchestrator.startNewCheckpoint( 0, // checkpointIndex @@ -201,7 +204,8 @@ describe('prover/orchestrator', () => { }); const finalBlobChallenges = await context.getFinalBlobChallenges(); - orchestrator.startNewEpoch(EpochNumber(1), 1, finalBlobChallenges); + orchestrator.startNewEpoch(EpochNumber(1)); + await orchestrator.setEpochStructure(1, finalBlobChallenges); await orchestrator.startNewCheckpoint( 0, // checkpointIndex @@ -248,7 +252,8 @@ describe('prover/orchestrator', () => { ); const finalBlobChallenges = await context.getFinalBlobChallenges(); - context.orchestrator.startNewEpoch(EpochNumber(1), numCheckpoints, finalBlobChallenges); + context.orchestrator.startNewEpoch(EpochNumber(1)); + await context.orchestrator.setEpochStructure(numCheckpoints, finalBlobChallenges); // Start checkpoint in reverse order. for (let checkpointIndex = numCheckpoints - 1; checkpointIndex >= 0; checkpointIndex--) { @@ -276,6 +281,40 @@ describe('prover/orchestrator', () => { expect(epoch.proof).toBeDefined(); }); + it('completes epoch proof when setEpochStructure is called after block processing', async () => { + const numBlocks = 1; + const { + constants, + blocks: [{ header, txs }], + previousBlockHeader, + } = await context.makeCheckpoint(numBlocks); + + const finalBlobChallenges = await context.getFinalBlobChallenges(); + orchestrator.startNewEpoch(EpochNumber(1)); + // Do NOT call setEpochStructure yet — structure is deferred. + + await orchestrator.startNewCheckpoint( + 0, // checkpointIndex + constants, + [], + numBlocks, + previousBlockHeader, + ); + + const { blockNumber, timestamp } = header.globalVariables; + await orchestrator.startNewBlock(blockNumber, timestamp, txs.length); + + // Process txs and complete the block without epoch structure. + await orchestrator.addTxs(txs); + await orchestrator.setBlockCompleted(blockNumber); + + // NOW set the epoch structure — this triggers accumulation and unblocks checkpoint root + root rollup. + await orchestrator.setEpochStructure(1, finalBlobChallenges); + + const result = await orchestrator.finalizeEpoch(); + expect(result.proof).toBeDefined(); + }); + it('can add checkpoints asynchronously', async () => { const numCheckpoints = 4; const numBlocksPerCheckpoint = 2; @@ -285,7 +324,8 @@ describe('prover/orchestrator', () => { ); const finalBlobChallenges = await context.getFinalBlobChallenges(); - context.orchestrator.startNewEpoch(EpochNumber(1), numCheckpoints, finalBlobChallenges); + context.orchestrator.startNewEpoch(EpochNumber(1)); + await context.orchestrator.setEpochStructure(numCheckpoints, finalBlobChallenges); await Promise.all( checkpoints.map(async (checkpoint, checkpointIndex) => { diff --git a/yarn-project/prover-client/src/prover-client/server-epoch-prover.ts b/yarn-project/prover-client/src/prover-client/server-epoch-prover.ts index dd1715757d6c..94237ee55810 100644 --- a/yarn-project/prover-client/src/prover-client/server-epoch-prover.ts +++ b/yarn-project/prover-client/src/prover-client/server-epoch-prover.ts @@ -18,13 +18,15 @@ export class ServerEpochProver implements EpochProver { private orchestrator: ProvingOrchestrator, ) {} - startNewEpoch( - epochNumber: EpochNumber, + startNewEpoch(epochNumber: EpochNumber): void { + this.orchestrator.startNewEpoch(epochNumber); + this.facade.start(); + } + setEpochStructure( totalNumCheckpoints: number, finalBlobBatchingChallenges: FinalBlobBatchingChallenges, - ): void { - this.orchestrator.startNewEpoch(epochNumber, totalNumCheckpoints, finalBlobBatchingChallenges); - this.facade.start(); + ): Promise { + return this.orchestrator.setEpochStructure(totalNumCheckpoints, finalBlobBatchingChallenges); } startNewCheckpoint( checkpointIndex: number, diff --git a/yarn-project/prover-client/src/test/bb_prover_full_rollup.test.ts b/yarn-project/prover-client/src/test/bb_prover_full_rollup.test.ts index bc66a883cc0c..5c1b98080247 100644 --- a/yarn-project/prover-client/src/test/bb_prover_full_rollup.test.ts +++ b/yarn-project/prover-client/src/test/bb_prover_full_rollup.test.ts @@ -53,7 +53,8 @@ describe('prover/bb_prover/full-rollup', () => { ); const finalBlobChallenges = await context.getFinalBlobChallenges(); - context.orchestrator.startNewEpoch(EpochNumber(1), numCheckpoints, finalBlobChallenges); + context.orchestrator.startNewEpoch(EpochNumber(1)); + await context.orchestrator.setEpochStructure(numCheckpoints, finalBlobChallenges); for (let checkpointIndex = 0; checkpointIndex < numCheckpoints; checkpointIndex++) { const { constants, blocks, l1ToL2Messages, previousBlockHeader } = checkpoints[checkpointIndex]; diff --git a/yarn-project/prover-node/src/actions/rerun-epoch-proving-job.ts b/yarn-project/prover-node/src/actions/rerun-epoch-proving-job.ts index 2108d5a09342..570b9d4c5cbb 100644 --- a/yarn-project/prover-node/src/actions/rerun-epoch-proving-job.ts +++ b/yarn-project/prover-node/src/actions/rerun-epoch-proving-job.ts @@ -34,7 +34,6 @@ export async function rerunEpochProvingJob( const publicProcessorFactory = new PublicProcessorFactory(archiver, undefined, undefined, log.getBindings()); const publisher = { submitEpochProof: () => Promise.resolve(true) }; - const l2BlockSourceForReorgDetection = undefined; const deadline = undefined; // This starts a local proving broker that does not get exposed as a service. This should be good enough for @@ -44,18 +43,32 @@ export async function rerunEpochProvingJob( const prover = await createProverClient(config, worldState, broker, telemetry); const provingJob = new EpochProvingJob( - jobData, + jobData.epochNumber, worldState, prover.createEpochProver(), publicProcessorFactory, publisher, - l2BlockSourceForReorgDetection, metrics, deadline, - { skipEpochCheck: true }, + {}, + undefined, // submissionGate — not needed for reruns. log.getBindings(), ); + // Push all checkpoints and mark epoch complete. + const lastBlocks = jobData.checkpoints.map(checkpoint => checkpoint.blocks.at(-1)!); + const previousBlockHeaders = [jobData.previousBlockHeader, ...lastBlocks.map(block => block.header).slice(0, -1)]; + for (let i = 0; i < jobData.checkpoints.length; i++) { + const checkpoint = jobData.checkpoints[i]; + provingJob.addCheckpoint( + checkpoint, + jobData.l1ToL2Messages[checkpoint.number] ?? [], + previousBlockHeaders[i], + jobData.txs, + ); + } + provingJob.setEpochComplete(jobData.attestations); + log.info(`Rerunning epoch proving job for epoch ${jobData.epochNumber}`); await provingJob.run(); log.info(`Completed job for epoch ${jobData.epochNumber} with status ${provingJob.getState()}`); diff --git a/yarn-project/prover-node/src/config.ts b/yarn-project/prover-node/src/config.ts index dac85d6f3f45..f4116e9aa269 100644 --- a/yarn-project/prover-node/src/config.ts +++ b/yarn-project/prover-node/src/config.ts @@ -47,6 +47,8 @@ export type SpecificProverNodeConfig = { proverNodeFailedEpochStore: string | undefined; proverNodeEpochProvingDelayMs: number | undefined; proverNodeDisableProofPublish?: boolean; + /** Enable optimistic proving: start block-level proving during epoch instead of waiting for completion. */ + proverNodeOptimisticProcessing: boolean; txGatheringTimeoutMs: number; txGatheringIntervalMs: number; txGatheringBatchSize: number; @@ -103,6 +105,11 @@ const specificProverNodeConfigMappings: ConfigMappingsType = { diff --git a/yarn-project/prover-node/src/factory.ts b/yarn-project/prover-node/src/factory.ts index 6f74ee485d3c..c0d6865f9472 100644 --- a/yarn-project/prover-node/src/factory.ts +++ b/yarn-project/prover-node/src/factory.ts @@ -29,7 +29,6 @@ import { createWorldStateSynchronizer } from '@aztec/world-state'; import { createPublicClient, fallback, http } from 'viem'; import { type ProverNodeConfig, createKeyStoreForProver } from './config.js'; -import { EpochMonitor } from './monitors/epoch-monitor.js'; import { ProverNode } from './prover-node.js'; import { ProverPublisherFactory } from './prover-publisher-factory.js'; @@ -195,18 +194,13 @@ export async function createProverNode( 'txGatheringTimeoutMs', 'proverNodeFailedEpochStore', 'proverNodeDisableProofPublish', + 'proverNodeOptimisticProcessing', 'dataDirectory', 'l1ChainId', 'rollupVersion', ), }; - const epochMonitor = await EpochMonitor.create( - archiver, - { pollingIntervalMs: config.proverNodePollingIntervalMs, provingDelayMs: config.proverNodeEpochProvingDelayMs }, - telemetry, - ); - const l1Metrics = new L1Metrics( telemetry.getMeter('ProverNodeL1Metrics'), publicClient, @@ -221,7 +215,6 @@ export async function createProverNode( archiver, worldStateSynchronizer, p2pClient, - epochMonitor, rollupContract, l1Metrics, proverNodeConfig, diff --git a/yarn-project/prover-node/src/job/epoch-proving-job.test.ts b/yarn-project/prover-node/src/job/epoch-proving-job.test.ts index 3f572901e304..43cbed011e71 100644 --- a/yarn-project/prover-node/src/job/epoch-proving-job.test.ts +++ b/yarn-project/prover-node/src/job/epoch-proving-job.test.ts @@ -1,14 +1,15 @@ import { BatchedBlob } from '@aztec/blob-lib/types'; -import { BlockNumber, CheckpointNumber, EpochNumber } from '@aztec/foundation/branded-types'; +import { CheckpointNumber, EpochNumber } from '@aztec/foundation/branded-types'; import { fromEntries, times, timesParallel } from '@aztec/foundation/collection'; import { EthAddress } from '@aztec/foundation/eth-address'; import { toArray } from '@aztec/foundation/iterable'; +import { promiseWithResolvers } from '@aztec/foundation/promise'; +import { retryUntil } from '@aztec/foundation/retry'; import { sleep } from '@aztec/foundation/sleep'; import type { PublicProcessor, PublicProcessorFactory } from '@aztec/simulator/server'; import { PublicSimulatorConfig } from '@aztec/stdlib/avm'; -import { CommitteeAttestation, type L2BlockSource } from '@aztec/stdlib/block'; -import { Checkpoint, PublishedCheckpoint } from '@aztec/stdlib/checkpoint'; -import type { L1RollupConstants } from '@aztec/stdlib/epoch-helpers'; +import { CommitteeAttestation } from '@aztec/stdlib/block'; +import { Checkpoint } from '@aztec/stdlib/checkpoint'; import type { EpochProver, MerkleTreeWriteOperations, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; import { Proof } from '@aztec/stdlib/proofs'; import { RootRollupPublicInputs } from '@aztec/stdlib/rollup'; @@ -20,14 +21,12 @@ import { type MockProxy, mock } from 'jest-mock-extended'; import { ProverNodeJobMetrics } from '../metrics.js'; import type { ProverNodePublisher } from '../prover-node-publisher.js'; -import type { EpochProvingJobData } from './epoch-proving-job-data.js'; import { EpochProvingJob } from './epoch-proving-job.js'; describe('epoch-proving-job', () => { // Dependencies let prover: MockProxy; let publisher: MockProxy; - let l2BlockSource: MockProxy; let worldState: MockProxy; let publicProcessorFactory: MockProxy; let metrics: ProverNodeJobMetrics; @@ -54,34 +53,44 @@ describe('epoch-proving-job', () => { const proverId = EthAddress.random(); // Subject factory - const createJob = (opts: { deadline?: Date; parallelBlockLimit?: number; skipSubmitProof?: boolean } = {}) => { + const createJob = ( + opts: { + deadline?: Date; + parallelBlockLimit?: number; + skipSubmitProof?: boolean; + submissionGate?: Promise; + } = {}, + ) => { const txsMap = new Map(txs.map(tx => [tx.getTxHash().toString(), tx])); + const l1ToL2Messages: Record = fromEntries(checkpoints.map(c => [c.number, []])); - const data: EpochProvingJobData = { - checkpoints, - txs: txsMap, - epochNumber: EpochNumber(epochNumber), - l1ToL2Messages: fromEntries(checkpoints.map(c => [c.number, []])), - previousBlockHeader: initialHeader, - attestations, - }; - return new EpochProvingJob( - data, + const job = new EpochProvingJob( + EpochNumber(epochNumber), worldState, prover, publicProcessorFactory, publisher, - l2BlockSource, metrics, opts.deadline, { parallelBlockLimit: opts.parallelBlockLimit ?? 32, skipSubmitProof: opts.skipSubmitProof }, + opts.submissionGate, ); + + // Push checkpoints and mark epoch complete. + const lastBlocks = checkpoints.map(checkpoint => checkpoint.blocks.at(-1)!); + const previousBlockHeaders = [initialHeader, ...lastBlocks.map(block => block.header).slice(0, -1)]; + for (let i = 0; i < checkpoints.length; i++) { + const checkpoint = checkpoints[i]; + job.addCheckpoint(checkpoint, l1ToL2Messages[checkpoint.number] ?? [], previousBlockHeaders[i], txsMap); + } + job.setEpochComplete(attestations); + + return job; }; beforeEach(async () => { prover = mock(); publisher = mock(); - l2BlockSource = mock(); worldState = mock(); publicProcessorFactory = mock(); db = mock(); @@ -114,14 +123,6 @@ describe('epoch-proving-job', () => { const txHashes = checkpoints.map(c => c.blocks.map(b => b.body.txEffects.map(tx => tx.txHash))).flat(2); txs = txHashes.map(txHash => ({ txHash, getTxHash: () => txHash }) as Tx); - l2BlockSource.getBlockHeader.mockResolvedValue(initialHeader); - l2BlockSource.getL1Constants.mockResolvedValue({ ethereumSlotDuration: 0.1 } as L1RollupConstants); - l2BlockSource.getCheckpointedBlockHeadersForEpoch.mockResolvedValue( - checkpoints.map(c => c.blocks.map(b => b.header)).flat(), - ); - l2BlockSource.getCheckpoints.mockResolvedValue([ - { checkpoint: checkpoints.at(-1)!, attestations } as PublishedCheckpoint, - ]); publicProcessorFactory.create.mockReturnValue(publicProcessor); db.getInitialHeader.mockReturnValue(initialHeader); worldState.fork.mockResolvedValue(db); @@ -217,16 +218,107 @@ describe('epoch-proving-job', () => { expect(publisher.submitEpochProof).not.toHaveBeenCalled(); }); - it('halts if a new block for the epoch is found', async () => { - const newHeaders = times(NUM_BLOCKS + 1, i => BlockHeader.random({ blockNumber: BlockNumber(i + 1) })); - l2BlockSource.getCheckpointedBlockHeadersForEpoch.mockResolvedValue(newHeaders); + it('processes checkpoints pushed after run starts', async () => { + const txsMap = new Map(txs.map(tx => [tx.getTxHash().toString(), tx])); + const l1ToL2Messages: Record = fromEntries(checkpoints.map(c => [c.number, []])); - const job = createJob(); - await job.run(); + const job = new EpochProvingJob( + EpochNumber(epochNumber), + worldState, + prover, + publicProcessorFactory, + publisher, + metrics, + undefined, // deadline + { parallelBlockLimit: 32 }, + ); + + // Start run() — it calls startNewEpoch() then blocks on epochCompleteResolver. + const runPromise = job.run(); + + // Give run() a tick to execute past startNewEpoch() and hit the await. + await sleep(10); + + // Push checkpoints while run() is waiting on epochCompleteResolver. + const lastBlocks = checkpoints.map(checkpoint => checkpoint.blocks.at(-1)!); + const previousBlockHeaders = [initialHeader, ...lastBlocks.map(block => block.header).slice(0, -1)]; + for (let i = 0; i < checkpoints.length; i++) { + const checkpoint = checkpoints[i]; + job.addCheckpoint(checkpoint, l1ToL2Messages[checkpoint.number] ?? [], previousBlockHeaders[i], txsMap); + } + + // Signal epoch complete — unblocks run()'s first await. + job.setEpochComplete(attestations); + + // run() now waits for Promise.all(checkpointProcessingPromises), then finalizes. + await runPromise; + + expect(job.getState()).toEqual('completed'); + // Verify startNewEpoch was called (happens in run() before waiting). + expect(prover.startNewEpoch).toHaveBeenCalled(); + // Verify all checkpoints were processed. + expect(prover.startNewCheckpoint).toHaveBeenCalledTimes(NUM_CHECKPOINTS); + expect(publisher.submitEpochProof).toHaveBeenCalled(); + }); + + it('waits for slow checkpoint processing after epoch marked complete', async () => { + // Make block processing slow so checkpoints take time to finish. + prover.startNewBlock.mockImplementation(() => sleep(500)); + + const txsMap = new Map(txs.map(tx => [tx.getTxHash().toString(), tx])); + const l1ToL2Messages: Record = fromEntries(checkpoints.map(c => [c.number, []])); + + const job = new EpochProvingJob( + EpochNumber(epochNumber), + worldState, + prover, + publicProcessorFactory, + publisher, + metrics, + undefined, + { parallelBlockLimit: 32 }, + ); + + const runPromise = job.run(); + await sleep(10); + + // Push one checkpoint — starts slow processing. + const lastBlocks = checkpoints.map(checkpoint => checkpoint.blocks.at(-1)!); + const previousBlockHeaders = [initialHeader, ...lastBlocks.map(block => block.header).slice(0, -1)]; + job.addCheckpoint(checkpoints[0], l1ToL2Messages[checkpoints[0].number] ?? [], previousBlockHeaders[0], txsMap); + + // Epoch complete signal arrives while checkpoint is still processing. + job.setEpochComplete(attestations); + + // run() should NOT finalize until checkpoint processing completes. + await runPromise; + + expect(job.getState()).toEqual('completed'); + expect(prover.finalizeEpoch).toHaveBeenCalled(); + expect(publisher.submitEpochProof).toHaveBeenCalled(); + }); + + it('stops gracefully when waiting for epoch completion', async () => { + const job = new EpochProvingJob( + EpochNumber(epochNumber), + worldState, + prover, + publicProcessorFactory, + publisher, + metrics, + undefined, + { parallelBlockLimit: 32 }, + ); - expect(job.getState()).toEqual('reorg'); + // run() blocks on epochCompleteResolver. + void job.run(); + await sleep(50); + + // stop() resolves epochCompleteResolver to unblock run(). + await job.stop(); + + expect(job.getState()).toEqual('stopped'); expect(publisher.submitEpochProof).not.toHaveBeenCalled(); - expect(prover.cancel).toHaveBeenCalled(); }); it('skips publishing when skipSubmitProof is enabled', async () => { @@ -237,4 +329,78 @@ describe('epoch-proving-job', () => { expect(prover.finalizeEpoch).toHaveBeenCalled(); expect(publisher.submitEpochProof).not.toHaveBeenCalled(); }); + + it('awaits submission gate before publishing proof', async () => { + const gate = promiseWithResolvers(); + const job = createJob({ submissionGate: gate.promise }); + + void job.run(); + + // Wait for the job to reach 'awaiting-submission'. + await retryUntil(() => job.getState() === 'awaiting-submission', 'awaiting-submission', 5); + expect(job.getState()).toEqual('awaiting-submission'); + expect(publisher.submitEpochProof).not.toHaveBeenCalled(); + + // Resolve gate → job should complete. + gate.resolve(); + await retryUntil(() => job.getState() === 'completed', 'completed', 5); + expect(job.getState()).toEqual('completed'); + expect(publisher.submitEpochProof).toHaveBeenCalled(); + }); + + it('stops cleanly when awaiting submission gate', async () => { + const gate = promiseWithResolvers(); + const job = createJob({ submissionGate: gate.promise }); + + void job.run(); + + // Wait for the job to reach 'awaiting-submission'. + await retryUntil(() => job.getState() === 'awaiting-submission', 'awaiting-submission', 5); + expect(job.getState()).toEqual('awaiting-submission'); + + // Stop should not deadlock — stopResolver unblocks the gate race. + await job.stop(); + expect(job.getState()).toEqual('stopped'); + }); + + it('times out while awaiting submission gate', async () => { + const gate = promiseWithResolvers(); + const deadline = new Date(Date.now() + 100); + const job = createJob({ submissionGate: gate.promise, deadline }); + + await job.run(); + + expect(job.getState()).toEqual('timed-out'); + expect(publisher.submitEpochProof).not.toHaveBeenCalled(); + }); + + it('ignores duplicate checkpoint additions', async () => { + const txsMap = new Map(txs.map(tx => [tx.getTxHash().toString(), tx])); + + const job = new EpochProvingJob( + EpochNumber(epochNumber), + worldState, + prover, + publicProcessorFactory, + publisher, + metrics, + undefined, + { parallelBlockLimit: 32 }, + ); + + const runPromise = job.run(); + await sleep(10); + + // Add same checkpoint twice. + const previousHeader = initialHeader; + job.addCheckpoint(checkpoints[0], [], previousHeader, txsMap); + job.addCheckpoint(checkpoints[0], [], previousHeader, txsMap); + + job.setEpochComplete(attestations); + await runPromise; + + expect(job.getState()).toEqual('completed'); + // startNewCheckpoint should be called only once for the single unique checkpoint. + expect(prover.startNewCheckpoint).toHaveBeenCalledTimes(1); + }); }); diff --git a/yarn-project/prover-node/src/job/epoch-proving-job.ts b/yarn-project/prover-node/src/job/epoch-proving-job.ts index 0546940e4806..e2e8be89ec87 100644 --- a/yarn-project/prover-node/src/job/epoch-proving-job.ts +++ b/yarn-project/prover-node/src/job/epoch-proving-job.ts @@ -1,17 +1,16 @@ import { NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP } from '@aztec/constants'; -import { asyncPool } from '@aztec/foundation/async-pool'; import { BlockNumber, EpochNumber } from '@aztec/foundation/branded-types'; import { padArrayEnd } from '@aztec/foundation/collection'; import { Fr } from '@aztec/foundation/curves/bn254'; import { type Logger, type LoggerBindings, createLogger } from '@aztec/foundation/log'; -import { RunningPromise, promiseWithResolvers } from '@aztec/foundation/promise'; +import { promiseWithResolvers } from '@aztec/foundation/promise'; import { Timer } from '@aztec/foundation/timer'; import { getVKTreeRoot } from '@aztec/noir-protocol-circuits-types/vk-tree'; import { protocolContractsHash } from '@aztec/protocol-contracts'; import { buildFinalBlobChallenges } from '@aztec/prover-client/helpers'; import type { PublicProcessor, PublicProcessorFactory } from '@aztec/simulator/server'; import { PublicSimulatorConfig } from '@aztec/stdlib/avm'; -import type { L2Block, L2BlockSource } from '@aztec/stdlib/block'; +import type { CommitteeAttestation, L2Block } from '@aztec/stdlib/block'; import type { Checkpoint } from '@aztec/stdlib/checkpoint'; import { type EpochProver, @@ -21,25 +20,33 @@ import { } from '@aztec/stdlib/interfaces/server'; import { CheckpointConstantData } from '@aztec/stdlib/rollup'; import { MerkleTreeId } from '@aztec/stdlib/trees'; -import type { ProcessedTx, Tx } from '@aztec/stdlib/tx'; +import type { BlockHeader, ProcessedTx, Tx } from '@aztec/stdlib/tx'; import { Attributes, type Traceable, type Tracer, trackSpan } from '@aztec/telemetry-client'; import * as crypto from 'node:crypto'; import type { ProverNodeJobMetrics } from '../metrics.js'; import type { ProverNodePublisher } from '../prover-node-publisher.js'; -import { type EpochProvingJobData, validateEpochProvingJobData } from './epoch-proving-job-data.js'; +import type { EpochProvingJobData } from './epoch-proving-job-data.js'; export type EpochProvingJobOptions = { parallelBlockLimit?: number; - skipEpochCheck?: boolean; skipSubmitProof?: boolean; }; +/** Data needed to process a single checkpoint within an epoch proving job. */ +type CheckpointData = { + checkpoint: Checkpoint; + txs: Map; + l1ToL2Messages: Fr[]; + previousBlockHeader: BlockHeader; +}; + /** - * Job that grabs a range of blocks from the unfinalized chain from L1, gets their txs given their hashes, - * re-executes their public calls, generates a rollup proof, and submits it to L1. This job will update the - * world state as part of public call execution via the public processor. + * Job that proves an epoch and submits the proof to L1. Supports both push-based (optimistic) + * and batch modes. In push-based mode, checkpoints are added via addCheckpoint() as they arrive + * during the epoch, and setEpochComplete() signals finalization. In batch mode, all data is + * provided upfront via addCheckpoint() followed immediately by setEpochComplete(). */ export class EpochProvingJob implements Traceable { private state: EpochProvingJobState = 'initialized'; @@ -47,28 +54,41 @@ export class EpochProvingJob implements Traceable { private uuid: string; private runPromise: Promise | undefined; - private epochCheckPromise: RunningPromise | undefined; private deadlineTimeoutHandler: NodeJS.Timeout | undefined; + /** Promises for all checkpoint processing tasks. */ + private checkpointProcessingPromises: Promise[] = []; + /** Successfully processed checkpoints, in order they completed. */ + private processedCheckpoints: Checkpoint[] = []; + /** Resolves when the epoch is complete and we know all checkpoints. */ + private epochCompleteResolver = promiseWithResolvers<{ attestations: CommitteeAttestation[] }>(); + + /** Resolves to unblock the submission gate on stop/timeout. */ + private stopResolver = promiseWithResolvers(); + + /** Tracks the next expected checkpoint index. */ + private nextCheckpointIndex = 0; + /** Checkpoint numbers already added, for dedup. */ + private addedCheckpointNumbers: Set = new Set(); + public readonly tracer: Tracer; constructor( - private data: EpochProvingJobData, + private readonly epochNumber: EpochNumber, private dbProvider: Pick, private prover: EpochProver, private publicProcessorFactory: PublicProcessorFactory, private publisher: Pick, - private l2BlockSource: L2BlockSource | undefined, private metrics: ProverNodeJobMetrics, private deadline: Date | undefined, private config: EpochProvingJobOptions, + private submissionGate?: Promise, bindings?: LoggerBindings, ) { - validateEpochProvingJobData(data); this.uuid = crypto.randomUUID(); this.log = createLogger('prover-node:epoch-proving-job', { ...bindings, - instanceId: `epoch-${data.epochNumber}`, + instanceId: `epoch-${epochNumber}`, }); this.tracer = metrics.tracer; } @@ -82,62 +102,93 @@ export class EpochProvingJob implements Traceable { } public getEpochNumber(): EpochNumber { - return this.data.epochNumber; + return this.epochNumber; } public getDeadline(): Date | undefined { return this.deadline; } + /** + * Returns proving data for failure upload. Collects all processed checkpoints into the legacy format. + * Note: This may be incomplete if the job failed before all checkpoints were processed. + */ public getProvingData(): EpochProvingJobData { - return this.data; + const checkpoints = this.processedCheckpoints.sort((a, b) => a.number - b.number); + const txs = new Map(); + const l1ToL2Messages: Record = {}; + + // We don't have perfect data reconstruction — this is best-effort for debugging. + return { + epochNumber: this.epochNumber, + checkpoints, + txs, + l1ToL2Messages, + previousBlockHeader: undefined as any, // May not be available + attestations: [], + }; } - private get epochNumber() { - return this.data.epochNumber; - } + /** + * Called by ProverNode when a new checkpoint arrives for this epoch. + * Gathers checkpoint data and starts processing immediately. + */ + addCheckpoint( + checkpoint: Checkpoint, + l1ToL2Messages: Fr[], + previousBlockHeader: BlockHeader, + txs: Map, + ): void { + if (this.addedCheckpointNumbers.has(checkpoint.number)) { + this.log.warn(`Duplicate checkpoint ${checkpoint.number} ignored`, { uuid: this.uuid }); + return; + } + this.addedCheckpointNumbers.add(checkpoint.number); + const checkpointIndex = this.nextCheckpointIndex++; + this.log.verbose(`Adding checkpoint ${checkpoint.number} (index ${checkpointIndex}) for processing`, { + uuid: this.uuid, + checkpointNumber: checkpoint.number, + }); - private get checkpoints() { - return this.data.checkpoints; - } + const data: CheckpointData = { + checkpoint, + txs, + l1ToL2Messages, + previousBlockHeader, + }; - private get txs() { - return this.data.txs; + const promise = this.processCheckpoint(checkpointIndex, data).catch(err => { + if (err && err.name === 'HaltExecutionError') { + return; + } + this.log.error(`Error processing checkpoint ${checkpoint.number}`, err, { uuid: this.uuid }); + throw err; + }); + this.checkpointProcessingPromises.push(promise); } - private get attestations() { - return this.data.attestations; + /** + * Called by ProverNode when epoch is complete. + * @param attestations - The attestations for the last checkpoint. + */ + setEpochComplete(attestations: CommitteeAttestation[]): void { + this.log.verbose(`Epoch ${this.epochNumber} marked complete`, { uuid: this.uuid }); + this.epochCompleteResolver.resolve({ attestations }); } /** - * Proves the given epoch and submits the proof to L1. + * Proves the epoch and submits the proof to L1. + * Waits for epoch completion signal and all checkpoint processing to finish, + * then finalizes the epoch proof. */ @trackSpan('EpochProvingJob.run', function () { - return { [Attributes.EPOCH_NUMBER]: this.data.epochNumber }; + return { [Attributes.EPOCH_NUMBER]: this.epochNumber }; }) public async run() { this.scheduleDeadlineStop(); - if (!this.config.skipEpochCheck) { - await this.scheduleEpochCheck(); - } - const attestations = this.attestations.map(attestation => attestation.toViem()); const epochNumber = this.epochNumber; - const epochSizeCheckpoints = this.checkpoints.length; - const epochSizeBlocks = this.checkpoints.reduce((accum, checkpoint) => accum + checkpoint.blocks.length, 0); - const epochSizeTxs = this.checkpoints.reduce( - (accum, checkpoint) => - accum + checkpoint.blocks.reduce((accumC, block) => accumC + block.body.txEffects.length, 0), - 0, - ); - const fromCheckpoint = this.checkpoints[0].number; - const toCheckpoint = this.checkpoints.at(-1)!.number; - const fromBlock = this.checkpoints[0].blocks[0].number; - const toBlock = this.checkpoints.at(-1)!.blocks.at(-1)!.number; - this.log.info(`Starting epoch ${epochNumber} proving job with checkpoints ${fromCheckpoint} to ${toCheckpoint}`, { - fromBlock, - toBlock, - epochSizeTxs, + this.log.info(`Starting epoch ${epochNumber} proving job`, { epochNumber, uuid: this.uuid, }); @@ -148,103 +199,56 @@ export class EpochProvingJob implements Traceable { this.runPromise = promise; try { - const blobFieldsPerCheckpoint = this.checkpoints.map(checkpoint => checkpoint.toBlobFields()); - const finalBlobBatchingChallenges = await buildFinalBlobChallenges(blobFieldsPerCheckpoint); - - this.prover.startNewEpoch(epochNumber, epochSizeCheckpoints, finalBlobBatchingChallenges); - await this.prover.startChonkVerifierCircuits(Array.from(this.txs.values())); + this.prover.startNewEpoch(epochNumber); - // Everything in the epoch should have the same chainId and version. - const { chainId, version } = this.checkpoints[0].blocks[0].header.globalVariables; + // Wait for epoch completion signal. + const { attestations } = await this.epochCompleteResolver.promise; - const previousBlockHeaders = this.gatherPreviousBlockHeaders(); + // Wait for all checkpoint processing to finish. + await Promise.all(this.checkpointProcessingPromises); - await asyncPool(this.config.parallelBlockLimit ?? 32, this.checkpoints, async checkpoint => { - this.checkState(); - - const checkpointIndex = checkpoint.number - fromCheckpoint; - const checkpointConstants = CheckpointConstantData.from({ - chainId, - version, - vkTreeRoot: getVKTreeRoot(), - protocolContractsHash: protocolContractsHash, - proverId: this.prover.getProverId().toField(), - slotNumber: checkpoint.header.slotNumber, - coinbase: checkpoint.header.coinbase, - feeRecipient: checkpoint.header.feeRecipient, - gasFees: checkpoint.header.gasFees, - }); - const previousHeader = previousBlockHeaders[checkpointIndex]; - const l1ToL2Messages = this.getL1ToL2Messages(checkpoint); - - this.log.verbose(`Starting processing checkpoint ${checkpoint.number}`, { - number: checkpoint.number, - checkpointHash: checkpoint.hash().toString(), - lastArchive: checkpoint.header.lastArchiveRoot, - previousHeader: previousHeader.hash(), - uuid: this.uuid, - }); - - await this.prover.startNewCheckpoint( - checkpointIndex, - checkpointConstants, - l1ToL2Messages, - checkpoint.blocks.length, - previousHeader, - ); + // === Phase 2: Finalize === + const allCheckpoints = this.processedCheckpoints.sort((a, b) => a.number - b.number); + const epochSizeCheckpoints = allCheckpoints.length; + const epochSizeBlocks = allCheckpoints.reduce((accum, cp) => accum + cp.blocks.length, 0); + const epochSizeTxs = allCheckpoints.reduce( + (accum, cp) => accum + cp.blocks.reduce((accumC, block) => accumC + block.body.txEffects.length, 0), + 0, + ); + const fromCheckpoint = allCheckpoints[0].number; + const toCheckpoint = allCheckpoints.at(-1)!.number; - for (const block of checkpoint.blocks) { - const globalVariables = block.header.globalVariables; - const txs = this.getTxs(block); - - this.log.verbose(`Starting processing block ${block.number}`, { - number: block.number, - blockHash: (await block.hash()).toString(), - lastArchive: block.header.lastArchive.root, - noteHashTreeRoot: block.header.state.partial.noteHashTree.root, - nullifierTreeRoot: block.header.state.partial.nullifierTree.root, - publicDataTreeRoot: block.header.state.partial.publicDataTree.root, - ...globalVariables, - numTxs: txs.length, - }); - - // Start block proving - await this.prover.startNewBlock(block.number, globalVariables.timestamp, txs.length); - - // Process public fns - const db = await this.createFork(BlockNumber(block.number - 1), l1ToL2Messages); - const config = PublicSimulatorConfig.from({ - proverId: this.prover.getProverId().toField(), - skipFeeEnforcement: false, - collectDebugLogs: false, - collectHints: true, - collectPublicInputs: true, - collectStatistics: false, - }); - const publicProcessor = this.publicProcessorFactory.create(db, globalVariables, config); - const processed = await this.processTxs(publicProcessor, txs); - await this.prover.addTxs(processed); - await db.close(); - this.log.verbose(`Processed all ${txs.length} txs for block ${block.number}`, { - blockNumber: block.number, - blockHash: (await block.hash()).toString(), - uuid: this.uuid, - }); - - // Mark block as completed to pad it - const expectedBlockHeader = block.header; - await this.prover.setBlockCompleted(block.number, expectedBlockHeader); - } + this.log.info(`All ${epochSizeCheckpoints} checkpoints processed. Setting epoch structure and finalizing.`, { + epochNumber, + fromCheckpoint, + toCheckpoint, + epochSizeBlocks, + epochSizeTxs, + uuid: this.uuid, }); + // Compute final blob challenges and set epoch structure. + const blobFieldsPerCheckpoint = allCheckpoints.map(checkpoint => checkpoint.toBlobFields()); + const finalBlobBatchingChallenges = await buildFinalBlobChallenges(blobFieldsPerCheckpoint); + await this.prover.setEpochStructure(epochSizeCheckpoints, finalBlobBatchingChallenges); + const executionTime = timer.ms(); this.progressState('awaiting-prover'); const { publicInputs, proof, batchedBlobInputs } = await this.prover.finalizeEpoch(); this.log.info(`Finalized proof for epoch ${epochNumber}`, { epochNumber, uuid: this.uuid, duration: timer.ms() }); + if (this.submissionGate) { + this.progressState('awaiting-submission'); + this.log.verbose(`Awaiting submission gate for epoch ${epochNumber}`, { uuid: this.uuid }); + await Promise.race([this.submissionGate, this.stopResolver.promise]); + this.checkState(); + } + this.progressState('publishing-proof'); + const viemAttestations = attestations.map(a => a.toViem()); + if (this.config.skipSubmitProof) { this.log.info( `Proof publishing is disabled. Dropping valid proof for epoch ${epochNumber} (checkpoints ${fromCheckpoint} to ${toCheckpoint})`, @@ -261,7 +265,7 @@ export class EpochProvingJob implements Traceable { publicInputs, proof, batchedBlobInputs, - attestations, + attestations: viemAttestations, }); if (!success) { throw new Error('Failed to submit epoch proof to L1'); @@ -283,17 +287,112 @@ export class EpochProvingJob implements Traceable { return; } this.log.error(`Error running epoch ${epochNumber} prover job`, err, { uuid: this.uuid, epochNumber }); - if (this.state === 'processing' || this.state === 'awaiting-prover' || this.state === 'publishing-proof') { + if ( + this.state === 'processing' || + this.state === 'awaiting-prover' || + this.state === 'awaiting-submission' || + this.state === 'publishing-proof' + ) { this.state = 'failed'; } } finally { clearTimeout(this.deadlineTimeoutHandler); - await this.epochCheckPromise?.stop(); await this.prover.stop(); resolve(); } } + /** Processes a single checkpoint: starts chonk verifiers, processes each block. */ + private async processCheckpoint(checkpointIndex: number, data: CheckpointData) { + this.checkState(); + + const { checkpoint, txs, l1ToL2Messages, previousBlockHeader } = data; + + const { chainId, version } = checkpoint.blocks[0].header.globalVariables; + const checkpointConstants = CheckpointConstantData.from({ + chainId, + version, + vkTreeRoot: getVKTreeRoot(), + protocolContractsHash: protocolContractsHash, + proverId: this.prover.getProverId().toField(), + slotNumber: checkpoint.header.slotNumber, + coinbase: checkpoint.header.coinbase, + feeRecipient: checkpoint.header.feeRecipient, + gasFees: checkpoint.header.gasFees, + }); + + this.log.verbose(`Starting processing checkpoint ${checkpoint.number}`, { + number: checkpoint.number, + checkpointHash: checkpoint.hash().toString(), + lastArchive: checkpoint.header.lastArchiveRoot, + previousHeader: previousBlockHeader.hash(), + uuid: this.uuid, + }); + + await this.prover.startNewCheckpoint( + checkpointIndex, + checkpointConstants, + l1ToL2Messages, + checkpoint.blocks.length, + previousBlockHeader, + ); + + // Start chonk verifiers for this checkpoint's txs. + const allTxs = checkpoint.blocks.flatMap(block => + block.body.txEffects.map(txEffect => txs.get(txEffect.txHash.toString())!), + ); + await this.prover.startChonkVerifierCircuits(allTxs); + + for (const block of checkpoint.blocks) { + const globalVariables = block.header.globalVariables; + const blockTxs = this.getBlockTxs(block, txs); + + this.log.verbose(`Starting processing block ${block.number}`, { + number: block.number, + blockHash: (await block.hash()).toString(), + lastArchive: block.header.lastArchive.root, + noteHashTreeRoot: block.header.state.partial.noteHashTree.root, + nullifierTreeRoot: block.header.state.partial.nullifierTree.root, + publicDataTreeRoot: block.header.state.partial.publicDataTree.root, + ...globalVariables, + numTxs: blockTxs.length, + }); + + // Start block proving. + await this.prover.startNewBlock(block.number, globalVariables.timestamp, blockTxs.length); + + // Process public fns. + const db = await this.createFork(BlockNumber(block.number - 1), l1ToL2Messages); + const config = PublicSimulatorConfig.from({ + proverId: this.prover.getProverId().toField(), + skipFeeEnforcement: false, + collectDebugLogs: false, + collectHints: true, + collectPublicInputs: true, + collectStatistics: false, + }); + const publicProcessor = this.publicProcessorFactory.create(db, globalVariables, config); + const processed = await this.processTxs(publicProcessor, blockTxs); + await this.prover.addTxs(processed); + await db.close(); + this.log.verbose(`Processed all ${blockTxs.length} txs for block ${block.number}`, { + blockNumber: block.number, + blockHash: (await block.hash()).toString(), + uuid: this.uuid, + }); + + // Mark block as completed. + const expectedBlockHeader = block.header; + await this.prover.setBlockCompleted(block.number, expectedBlockHeader); + } + + this.processedCheckpoints.push(checkpoint); + } + + private getBlockTxs(block: L2Block, txs: Map): Tx[] { + return block.body.txEffects.map(txEffect => txs.get(txEffect.txHash.toString())!); + } + /** * Create a new db fork for tx processing, inserting all L1 to L2. * REFACTOR: The prover already spawns a db fork of its own for each block, so we may be able to do away with just one fork. @@ -328,6 +427,10 @@ export class EpochProvingJob implements Traceable { public async stop(state: EpochProvingJobTerminalState = 'stopped') { this.state = state; this.prover.cancel(); + // Resolve the stop resolver to unblock the submission gate if waiting. + this.stopResolver.resolve(); + // Resolve the epoch complete promise to unblock run() if waiting. + this.epochCompleteResolver.resolve({ attestations: [] }); if (this.runPromise) { await this.runPromise; } @@ -353,57 +456,6 @@ export class EpochProvingJob implements Traceable { } } - /** - * Kicks off a running promise that queries the archiver for the set of L2 blocks of the current epoch. - * If those change, stops the proving job with a `rerun` state, so the node re-enqueues it. - */ - private async scheduleEpochCheck() { - const l2BlockSource = this.l2BlockSource; - if (!l2BlockSource) { - this.log.warn(`No L2 block source available, skipping epoch check`); - return; - } - - const intervalMs = Math.ceil((await l2BlockSource.getL1Constants()).ethereumSlotDuration / 2) * 1000; - this.epochCheckPromise = new RunningPromise( - async () => { - const blockHeaders = await l2BlockSource.getCheckpointedBlockHeadersForEpoch(this.epochNumber); - const blockHashes = await Promise.all(blockHeaders.map(header => header.hash())); - const thisBlocks = this.checkpoints.flatMap(checkpoint => checkpoint.blocks); - const thisBlockHashes = await Promise.all(thisBlocks.map(block => block.hash())); - if ( - blockHeaders.length !== thisBlocks.length || - !blockHashes.every((block, i) => block.equals(thisBlockHashes[i])) - ) { - this.log.warn('Epoch blocks changed underfoot', { - uuid: this.uuid, - epochNumber: this.epochNumber, - oldBlockHashes: thisBlockHashes, - newBlockHashes: blockHashes, - }); - void this.stop('reorg'); - } - }, - this.log, - intervalMs, - ).start(); - this.log.verbose(`Scheduled epoch check for epoch ${this.epochNumber} every ${intervalMs}ms`); - } - - /* Returns the last block header in the previous checkpoint for all checkpoints in the epoch */ - private gatherPreviousBlockHeaders() { - const lastBlocks = this.checkpoints.map(checkpoint => checkpoint.blocks.at(-1)!); - return [this.data.previousBlockHeader, ...lastBlocks.map(block => block.header).slice(0, -1)]; - } - - private getTxs(block: L2Block): Tx[] { - return block.body.txEffects.map(txEffect => this.txs.get(txEffect.txHash.toString())!); - } - - private getL1ToL2Messages(checkpoint: Checkpoint) { - return this.data.l1ToL2Messages[checkpoint.number]; - } - private async processTxs(publicProcessor: PublicProcessor, txs: Tx[]): Promise { const { deadline } = this; const [processedTxs, failedTxs] = await publicProcessor.process(txs, { deadline }); diff --git a/yarn-project/prover-node/src/prover-node.test.ts b/yarn-project/prover-node/src/prover-node.test.ts index 4b9c28d2c8b7..08723a47b6ee 100644 --- a/yarn-project/prover-node/src/prover-node.test.ts +++ b/yarn-project/prover-node/src/prover-node.test.ts @@ -1,15 +1,14 @@ import { GENESIS_BLOCK_HEADER_HASH } from '@aztec/constants'; import { RollupContract } from '@aztec/ethereum/contracts'; -import { BlockNumber, CheckpointNumber, EpochNumber } from '@aztec/foundation/branded-types'; +import { BlockNumber, CheckpointNumber, EpochNumber, SlotNumber } from '@aztec/foundation/branded-types'; import { timesParallel } from '@aztec/foundation/collection'; import { EthAddress } from '@aztec/foundation/eth-address'; import { promiseWithResolvers } from '@aztec/foundation/promise'; import { retryUntil } from '@aztec/foundation/retry'; import { sleep } from '@aztec/foundation/sleep'; import type { P2PClient, TxProvider } from '@aztec/p2p'; -import type { PublicProcessorFactory } from '@aztec/simulator/server'; import { CommitteeAttestation, GENESIS_CHECKPOINT_HEADER_HASH, type L2BlockSource } from '@aztec/stdlib/block'; -import { Checkpoint, type PublishedCheckpoint } from '@aztec/stdlib/checkpoint'; +import { Checkpoint, L1PublishedData, PublishedCheckpoint } from '@aztec/stdlib/checkpoint'; import type { ContractDataSource } from '@aztec/stdlib/contract'; import { EmptyL1RollupConstants } from '@aztec/stdlib/epoch-helpers'; import { @@ -26,9 +25,7 @@ import { L1Metrics } from '@aztec/telemetry-client'; import { type MockProxy, mock } from 'jest-mock-extended'; import type { SpecificProverNodeConfig } from './config.js'; -import type { EpochProvingJobData } from './job/epoch-proving-job-data.js'; import type { EpochProvingJob } from './job/epoch-proving-job.js'; -import { EpochMonitor } from './monitors/epoch-monitor.js'; import type { ProverNodePublisher } from './prover-node-publisher.js'; import { ProverNode } from './prover-node.js'; import { ProverPublisherFactory } from './prover-publisher-factory.js'; @@ -43,7 +40,6 @@ describe('prover-node', () => { let worldState: MockProxy; let p2p: MockProxy; let txProvider: MockProxy; - let epochMonitor: MockProxy; let config: SpecificProverNodeConfig; let rollupContract: MockProxy; let publisherFactory: MockProxy; @@ -75,7 +71,6 @@ describe('prover-node', () => { contractDataSource, worldState, p2p, - epochMonitor, rollupContract, l1Metrics, config, @@ -90,10 +85,13 @@ describe('prover-node', () => { l1ToL2MessageSource = mock(); contractDataSource = mock(); worldState = mock(); - epochMonitor = mock(); txProvider = mock(); rollupContract = mock(); + rollupContract.getTips.mockResolvedValue({ + pending: CheckpointNumber(0), + proven: CheckpointNumber(0), + }); publisherFactory = mock(); publisherFactory.create.mockResolvedValue(publisher); @@ -106,6 +104,7 @@ describe('prover-node', () => { proverNodeMaxPendingJobs: 3, proverNodePollingIntervalMs: 10, proverNodeMaxParallelBlocksPerEpoch: 32, + proverNodeOptimisticProcessing: false, txGatheringIntervalMs: 100, txGatheringBatchSize: 10, txGatheringMaxParallelRequestsPerNode: 5, @@ -190,35 +189,39 @@ describe('prover-node', () => { }); it('starts a proof on a finished epoch', async () => { - await proverNode.handleEpochReadyToProve(EpochNumber.fromBigInt(10n)); + await proverNode.startProof(EpochNumber.fromBigInt(10n)); expect(jobs[0].epochNumber).toEqual(EpochNumber.fromBigInt(10n)); expect(jobs[0].job.getDeadline()).toEqual(new Date((l1GenesisTime + 10 + 2) * 1000)); expect(proverNode.totalJobCount).toEqual(1); }); it('requests a publisher for each epoch', async () => { - await proverNode.handleEpochReadyToProve(EpochNumber.fromBigInt(10n)); + await proverNode.startProof(EpochNumber.fromBigInt(10n)); expect(publisherFactory.create).toHaveBeenCalledTimes(1); }); it('does not start a proof if there are no checkpoints in the epoch', async () => { l2BlockSource.getCheckpointsForEpoch.mockResolvedValue([]); - await proverNode.handleEpochReadyToProve(EpochNumber.fromBigInt(10n)); - expect(proverNode.totalJobCount).toEqual(0); + await proverNode.startProof(EpochNumber.fromBigInt(10n)); + // Job is created but immediately cleaned up when gatherEpochData fails. + expect(proverNode.totalJobCount).toEqual(1); + expect(proverNode.getActiveJobCount()).toEqual(0); }); it('does not start a proof if there is a tx missing from coordinator', async () => { txProvider.getTxsForBlock.mockResolvedValue({ missingTxs: [TxHash.random()], txs: [] }); - await proverNode.handleEpochReadyToProve(EpochNumber.fromBigInt(10n)); - expect(proverNode.totalJobCount).toEqual(0); + await proverNode.startProof(EpochNumber.fromBigInt(10n)); + // Job is created but immediately cleaned up when gatherTxs fails. + expect(proverNode.totalJobCount).toEqual(1); + expect(proverNode.getActiveJobCount()).toEqual(0); }); it('does not prove the same epoch twice', async () => { const firstJob = promiseWithResolvers(); proverNode.nextJobRun = () => firstJob.promise; proverNode.nextJobState = 'processing'; - await proverNode.handleEpochReadyToProve(EpochNumber.fromBigInt(10n)); - await proverNode.handleEpochReadyToProve(EpochNumber.fromBigInt(10n)); + await proverNode.startProof(EpochNumber.fromBigInt(10n)); + await proverNode.startProof(EpochNumber.fromBigInt(10n)); firstJob.resolve(); expect(proverNode.totalJobCount).toEqual(1); @@ -226,46 +229,660 @@ describe('prover-node', () => { it('restarts a proof on a reorg', async () => { proverNode.nextJobState = 'reorg'; - await proverNode.handleEpochReadyToProve(EpochNumber.fromBigInt(10n)); + await proverNode.startProof(EpochNumber.fromBigInt(10n)); await retryUntil(() => proverNode.totalJobCount === 2, 'job retried', 5); expect(proverNode.totalJobCount).toEqual(2); }); it('does not restart a proof on an error', async () => { proverNode.nextJobState = 'failed'; - await proverNode.handleEpochReadyToProve(EpochNumber.fromBigInt(10n)); + await proverNode.startProof(EpochNumber.fromBigInt(10n)); await sleep(1000); expect(proverNode.totalJobCount).toEqual(1); }); + it('cleans up failed job so epoch can be retried', async () => { + // First call: gatherEpochData throws because no checkpoints exist. + l2BlockSource.getCheckpointsForEpoch.mockResolvedValueOnce([]); + await proverNode.startProof(EpochNumber.fromBigInt(10n)); + // createJobForEpoch was called (totalJobCount incremented) but createProvingJob cleaned + // up the internal maps when gatherEpochData threw. + expect(proverNode.totalJobCount).toEqual(1); + expect(proverNode.getActiveJobCount()).toEqual(0); + + // Second call: gatherEpochData succeeds. Without the cleanup in createProvingJob's catch, + // the orphaned job would remain in the internal activeJobsByEpoch map. + await proverNode.startProof(EpochNumber.fromBigInt(10n)); + expect(proverNode.totalJobCount).toEqual(2); + }); + + describe('block stream event routing', () => { + // Checkpoints with controlled slot numbers for event-driven tests. + // All slots in epoch 1 (with epochDuration=100, slots 100-199 map to epoch 1). + const EPOCH_DURATION = 100; + const EPOCH_NUMBER = EpochNumber(1); + let publishedCheckpoints: PublishedCheckpoint[]; + + beforeEach(async () => { + // Override L1 constants so all checkpoint slots fall in the same epoch. + l2BlockSource.getL1Constants.mockResolvedValue({ + ...EmptyL1RollupConstants, + l1GenesisTime: BigInt(l1GenesisTime), + epochDuration: EPOCH_DURATION, + }); + + // Create checkpoints with slot numbers all in epoch 1. + const startBlockNumber = 20; + checkpoints = await Promise.all( + [0, 1, 2].map(i => + Checkpoint.random(CheckpointNumber(i + 1), { + numBlocks: 1, + startBlockNumber: startBlockNumber + i, + slotNumber: SlotNumber(EPOCH_DURATION + i), // slots 100, 101, 102 → epoch 1 + }), + ), + ); + previousBlockHeader = BlockHeader.random({ blockNumber: BlockNumber(startBlockNumber - 1) }); + + publishedCheckpoints = checkpoints.map( + cp => new PublishedCheckpoint(cp, L1PublishedData.random(), [CommitteeAttestation.random()]), + ); + + l2BlockSource.getProvenBlockNumber.mockResolvedValue(BlockNumber.ZERO); + + // getBlockHeader returns the previous block header for the first block's predecessor. + l2BlockSource.getBlockHeader.mockImplementation(number => + Promise.resolve(number === checkpoints[0].blocks[0].number - 1 ? previousBlockHeader : undefined), + ); + + // Recreate prover node so the fresh L1Constants mock is used. + proverNode = createProverNode(); + }); + + it('creates job when epoch completes in non-optimistic mode', async () => { + config.proverNodeOptimisticProcessing = false; + proverNode = createProverNode(); + + const lastBlock = checkpoints.at(-1)!.blocks.at(-1)!; + + // Checkpoints arrive — no job yet (non-optimistic waits for epoch completion). + for (const pub of publishedCheckpoints) { + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: pub, + block: { number: lastBlock.number, hash: (await lastBlock.hash()).toString() }, + }); + } + expect(jobs.length).toEqual(0); + + // Epoch completes — job created with all checkpoints pushed. + await proverNode.handleBlockStreamEvent({ + type: 'epoch-completed', + epochNumber: EPOCH_NUMBER, + }); + + expect(jobs.length).toEqual(1); + expect(jobs[0].epochNumber).toEqual(EPOCH_NUMBER); + expect(jobs[0].job.addCheckpoint).toHaveBeenCalledTimes(3); + expect(jobs[0].job.setEpochComplete).toHaveBeenCalledTimes(1); + }); + + it('creates job immediately on first checkpoint in optimistic mode', async () => { + config.proverNodeOptimisticProcessing = true; + proverNode = createProverNode(); + + // Keep job.run() pending so runJob doesn't clean up state while we send events. + const { promise: runPromise, resolve: resolveRun } = promiseWithResolvers(); + proverNode.nextJobRun = () => runPromise; + proverNode.nextJobState = 'processing'; + + const lastBlock = checkpoints.at(-1)!.blocks.at(-1)!; + const blockId = { number: lastBlock.number, hash: (await lastBlock.hash()).toString() }; + + // First checkpoint — job created immediately. + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: publishedCheckpoints[0], + block: blockId, + }); + expect(jobs.length).toEqual(1); + expect(jobs[0].job.addCheckpoint).toHaveBeenCalledTimes(1); + expect(jobs[0].job.setEpochComplete).not.toHaveBeenCalled(); + + // More checkpoints — pushed to the same job. + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: publishedCheckpoints[1], + block: blockId, + }); + expect(jobs.length).toEqual(1); + expect(jobs[0].job.addCheckpoint).toHaveBeenCalledTimes(2); + + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: publishedCheckpoints[2], + block: blockId, + }); + expect(jobs[0].job.addCheckpoint).toHaveBeenCalledTimes(3); + + // Epoch completes — setEpochComplete called on existing job. + await proverNode.handleBlockStreamEvent({ + type: 'epoch-completed', + epochNumber: EPOCH_NUMBER, + }); + expect(jobs.length).toEqual(1); + expect(jobs[0].job.setEpochComplete).toHaveBeenCalledTimes(1); + + // Let run complete for clean shutdown. + resolveRun(); + }); + + it('stops active jobs on chain-pruned event', async () => { + config.proverNodeOptimisticProcessing = true; + proverNode = createProverNode(); + + // Keep job.run() pending so the job stays active. + const { promise: runPromise, resolve: resolveRun } = promiseWithResolvers(); + proverNode.nextJobRun = () => runPromise; + proverNode.nextJobState = 'processing'; + + const lastBlock = checkpoints.at(-1)!.blocks.at(-1)!; + const blockId = { number: lastBlock.number, hash: (await lastBlock.hash()).toString() }; + + // Create an active job. + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: publishedCheckpoints[0], + block: blockId, + }); + expect(jobs.length).toEqual(1); + + // Chain pruned — job stopped. + await proverNode.handleBlockStreamEvent({ + type: 'chain-pruned', + block: { number: BlockNumber(10), hash: '0xdead' }, + checkpoint: { number: CheckpointNumber(0), hash: '0xdead' }, + }); + + expect(jobs[0].job.stop).toHaveBeenCalledWith('reorg'); + + // Let run complete for clean shutdown. + resolveRun(); + }); + + it('skips checkpoints for already proven blocks', async () => { + config.proverNodeOptimisticProcessing = true; + proverNode = createProverNode(); + + // Mark all blocks as already proven. + l2BlockSource.getProvenBlockNumber.mockResolvedValue(BlockNumber(100)); + + const lastBlock = checkpoints.at(-1)!.blocks.at(-1)!; + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: publishedCheckpoints[0], + block: { number: lastBlock.number, hash: (await lastBlock.hash()).toString() }, + }); + + // No job created because blocks are already proven. + expect(jobs.length).toEqual(0); + }); + + it('deduplicates repeated checkpoint events', async () => { + config.proverNodeOptimisticProcessing = true; + proverNode = createProverNode(); + + const { promise: runPromise, resolve: resolveRun } = promiseWithResolvers(); + proverNode.nextJobRun = () => runPromise; + proverNode.nextJobState = 'processing'; + + const lastBlock = checkpoints.at(-1)!.blocks.at(-1)!; + const blockId = { number: lastBlock.number, hash: (await lastBlock.hash()).toString() }; + + // Send same checkpoint event twice. + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: publishedCheckpoints[0], + block: blockId, + }); + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: publishedCheckpoints[0], + block: blockId, + }); + + expect(jobs.length).toEqual(1); + expect(jobs[0].job.addCheckpoint).toHaveBeenCalledTimes(1); + + resolveRun(); + }); + + it('passes through distinct checkpoints', async () => { + config.proverNodeOptimisticProcessing = true; + proverNode = createProverNode(); + + const { promise: runPromise, resolve: resolveRun } = promiseWithResolvers(); + proverNode.nextJobRun = () => runPromise; + proverNode.nextJobState = 'processing'; + + const lastBlock = checkpoints.at(-1)!.blocks.at(-1)!; + const blockId = { number: lastBlock.number, hash: (await lastBlock.hash()).toString() }; + + // Send 3 different checkpoint events. + for (const pub of publishedCheckpoints) { + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: pub, + block: blockId, + }); + } + + expect(jobs.length).toEqual(1); + expect(jobs[0].job.addCheckpoint).toHaveBeenCalledTimes(3); + + resolveRun(); + }); + + it('does not skip intermediate checkpoints that become proven mid-epoch', async () => { + config.proverNodeOptimisticProcessing = true; + proverNode = createProverNode(); + + const { promise: runPromise, resolve: resolveRun } = promiseWithResolvers(); + proverNode.nextJobRun = () => runPromise; + proverNode.nextJobState = 'processing'; + + const lastBlock = checkpoints.at(-1)!.blocks.at(-1)!; + const blockId = { number: lastBlock.number, hash: (await lastBlock.hash()).toString() }; + + // First checkpoint: proven block is 0, so it passes the guard and creates a job. + l2BlockSource.getProvenBlockNumber.mockResolvedValueOnce(BlockNumber.ZERO); + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: publishedCheckpoints[0], + block: blockId, + }); + expect(jobs.length).toEqual(1); + expect(jobs[0].job.addCheckpoint).toHaveBeenCalledTimes(1); + + // Simulate checkpoint 1's blocks becoming proven (e.g. via cheat codes). + const checkpoint1LastBlock = checkpoints[0].blocks.at(-1)!.number; + l2BlockSource.getProvenBlockNumber.mockResolvedValue(BlockNumber(checkpoint1LastBlock)); + + // Second checkpoint: its last block <= provenBlockNumber, but the epoch is already + // in progress so it must NOT be skipped. + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: publishedCheckpoints[1], + block: blockId, + }); + expect(jobs[0].job.addCheckpoint).toHaveBeenCalledTimes(2); + + // Third checkpoint: same — must also pass through. + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: publishedCheckpoints[2], + block: blockId, + }); + expect(jobs[0].job.addCheckpoint).toHaveBeenCalledTimes(3); + + resolveRun(); + }); + }); + + describe('submission gate ordering', () => { + // Checkpoints with controlled slot numbers for gate tests. + const EPOCH_DURATION = 100; + let publishedCheckpoints: PublishedCheckpoint[]; + + beforeEach(async () => { + l2BlockSource.getL1Constants.mockResolvedValue({ + ...EmptyL1RollupConstants, + l1GenesisTime: BigInt(l1GenesisTime), + epochDuration: EPOCH_DURATION, + }); + + const startBlockNumber = 20; + checkpoints = await Promise.all( + [0, 1, 2].map(i => + Checkpoint.random(CheckpointNumber(i + 1), { + numBlocks: 1, + startBlockNumber: startBlockNumber + i, + slotNumber: SlotNumber(EPOCH_DURATION + i), + }), + ), + ); + previousBlockHeader = BlockHeader.random({ blockNumber: BlockNumber(startBlockNumber - 1) }); + + publishedCheckpoints = checkpoints.map( + cp => new PublishedCheckpoint(cp, L1PublishedData.random(), [CommitteeAttestation.random()]), + ); + + l2BlockSource.getProvenBlockNumber.mockResolvedValue(BlockNumber.ZERO); + // Return a header for any requested block number so custom checkpoints work. + l2BlockSource.getBlockHeader.mockImplementation(number => + Promise.resolve(BlockHeader.random({ blockNumber: BlockNumber(number as number) })), + ); + + proverNode = createProverNode(); + }); + + it('resolves gate when L1 proven tip allows', async () => { + config.proverNodeOptimisticProcessing = true; + proverNode = createProverNode(); + + // proven=0 on L1. Epoch 1 has fromCheckpoint=1 → 1-1=0 <= 0 → gate resolves. + rollupContract.getTips.mockResolvedValue({ + pending: CheckpointNumber(10), + proven: CheckpointNumber(0), + }); + + const { promise: runPromise, resolve: resolveRun } = promiseWithResolvers(); + proverNode.nextJobRun = () => runPromise; + proverNode.nextJobState = 'processing'; + + const lastBlock = checkpoints.at(-1)!.blocks.at(-1)!; + const blockId = { number: lastBlock.number, hash: (await lastBlock.hash()).toString() }; + + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: publishedCheckpoints[0], + block: blockId, + }); + + // Gate should be resolved and removed from the map. + expect(proverNode.getSubmissionGates().has(1)).toBe(false); + + resolveRun(); + }); + + it('does not resolve gate when L1 proven tip is behind', async () => { + config.proverNodeOptimisticProcessing = true; + proverNode = createProverNode(); + + // proven=0 on L1. We need a checkpoint with fromCheckpoint=3 → 3-1=2 > 0 → gate not resolved. + rollupContract.getTips.mockResolvedValue({ + pending: CheckpointNumber(10), + proven: CheckpointNumber(0), + }); + + const { promise: runPromise, resolve: resolveRun } = promiseWithResolvers(); + proverNode.nextJobRun = () => runPromise; + proverNode.nextJobState = 'processing'; + + // Create a checkpoint with number 3 for epoch 1. + const cp3 = await Checkpoint.random(CheckpointNumber(3), { + numBlocks: 1, + startBlockNumber: 30, + slotNumber: SlotNumber(EPOCH_DURATION), + }); + const pub3 = new PublishedCheckpoint(cp3, L1PublishedData.random(), [CommitteeAttestation.random()]); + + const lastBlock = cp3.blocks.at(-1)!; + const blockId = { number: lastBlock.number, hash: (await lastBlock.hash()).toString() }; + + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: pub3, + block: blockId, + }); + + // Gate should still be in the map (not resolved). + const epoch1 = EpochNumber(1); + expect(proverNode.getSubmissionGates().has(epoch1)).toBe(true); + + resolveRun(); + }); + + it('resolves gate on chain-proven event', async () => { + config.proverNodeOptimisticProcessing = true; + proverNode = createProverNode(); + + const { promise: runPromise, resolve: resolveRun } = promiseWithResolvers(); + proverNode.nextJobRun = () => runPromise; + proverNode.nextJobState = 'processing'; + + // Initially proven=0. Create two epochs worth of checkpoints. + rollupContract.getTips.mockResolvedValue({ + pending: CheckpointNumber(10), + proven: CheckpointNumber(0), + }); + + // Epoch 1: checkpoint 1 (fromCheckpoint=1, 1-1=0 <= 0 → resolves immediately). + const lastBlock1 = checkpoints[0].blocks.at(-1)!; + const blockId1 = { number: lastBlock1.number, hash: (await lastBlock1.hash()).toString() }; + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: publishedCheckpoints[0], + block: blockId1, + }); + + const epoch1 = EpochNumber(1); + expect(proverNode.getSubmissionGates().has(epoch1)).toBe(false); // Resolved. + + // Epoch 2: checkpoint 4 with slot in epoch 2. fromCheckpoint=4, 4-1=3 > 0 → not resolved. + const { promise: runPromise2, resolve: resolveRun2 } = promiseWithResolvers(); + proverNode.nextJobRun = () => runPromise2; + proverNode.nextJobState = 'processing'; + + const cp4 = await Checkpoint.random(CheckpointNumber(4), { + numBlocks: 1, + startBlockNumber: 40, + slotNumber: SlotNumber(EPOCH_DURATION * 2), + }); + const pub4 = new PublishedCheckpoint(cp4, L1PublishedData.random(), [CommitteeAttestation.random()]); + const lastBlock2 = cp4.blocks.at(-1)!; + const blockId2 = { number: lastBlock2.number, hash: (await lastBlock2.hash()).toString() }; + + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: pub4, + block: blockId2, + }); + + const epoch2 = EpochNumber(2); + expect(proverNode.getSubmissionGates().has(epoch2)).toBe(true); // Not resolved. + + // Now proven advances to 3 (epoch 1 fully proven). Fire chain-proven. + rollupContract.getTips.mockResolvedValue({ + pending: CheckpointNumber(10), + proven: CheckpointNumber(3), + }); + + await proverNode.handleBlockStreamEvent({ + type: 'chain-proven', + block: { number: BlockNumber(22), hash: '0xabc' }, + }); + + // Epoch 2 gate should now be resolved (4-1=3 <= 3). + expect(proverNode.getSubmissionGates().has(epoch2)).toBe(false); + + resolveRun(); + resolveRun2(); + }); + + it('does not resolve gate on partial epoch proven', async () => { + config.proverNodeOptimisticProcessing = true; + proverNode = createProverNode(); + + const { promise: runPromise, resolve: resolveRun } = promiseWithResolvers(); + proverNode.nextJobRun = () => runPromise; + proverNode.nextJobState = 'processing'; + + // Epoch 1 has checkpoints [1,2,3]. All slots in epoch 1. + rollupContract.getTips.mockResolvedValue({ + pending: CheckpointNumber(10), + proven: CheckpointNumber(0), + }); + + const lastBlock = checkpoints.at(-1)!.blocks.at(-1)!; + const blockId = { number: lastBlock.number, hash: (await lastBlock.hash()).toString() }; + + // Push all 3 checkpoints for epoch 1. + for (const pub of publishedCheckpoints) { + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: pub, + block: blockId, + }); + } + + const epoch1 = EpochNumber(1); + expect(proverNode.getSubmissionGates().has(epoch1)).toBe(false); // fromCheckpoint=1, 1-1=0 <= 0. + + // Epoch 2 starts at checkpoint 4, slot in epoch 2. + const { promise: runPromise2, resolve: resolveRun2 } = promiseWithResolvers(); + proverNode.nextJobRun = () => runPromise2; + proverNode.nextJobState = 'processing'; + + const cp4 = await Checkpoint.random(CheckpointNumber(4), { + numBlocks: 1, + startBlockNumber: 40, + slotNumber: SlotNumber(EPOCH_DURATION * 2), + }); + const pub4 = new PublishedCheckpoint(cp4, L1PublishedData.random(), [CommitteeAttestation.random()]); + const lastBlock2 = cp4.blocks.at(-1)!; + const blockId2 = { number: lastBlock2.number, hash: (await lastBlock2.hash()).toString() }; + + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: pub4, + block: blockId2, + }); + + const epoch2 = EpochNumber(2); + expect(proverNode.getSubmissionGates().has(epoch2)).toBe(true); // 4-1=3 > 0. + + // Partial proven: only checkpoints 1-2 proven → proven=2. + rollupContract.getTips.mockResolvedValue({ + pending: CheckpointNumber(10), + proven: CheckpointNumber(2), + }); + await proverNode.handleBlockStreamEvent({ + type: 'chain-proven', + block: { number: BlockNumber(21), hash: '0xpartial' }, + }); + + // Epoch 2 should NOT resolve (4-1=3 > 2). + expect(proverNode.getSubmissionGates().has(epoch2)).toBe(true); + + // Full epoch 1 proven: proven=3. + rollupContract.getTips.mockResolvedValue({ + pending: CheckpointNumber(10), + proven: CheckpointNumber(3), + }); + await proverNode.handleBlockStreamEvent({ + type: 'chain-proven', + block: { number: BlockNumber(22), hash: '0xfull' }, + }); + + // Now epoch 2 resolves (4-1=3 <= 3). + expect(proverNode.getSubmissionGates().has(epoch2)).toBe(false); + + resolveRun(); + resolveRun2(); + }); + + it('cleans up gates on chain prune', async () => { + config.proverNodeOptimisticProcessing = true; + proverNode = createProverNode(); + + rollupContract.getTips.mockResolvedValue({ + pending: CheckpointNumber(10), + proven: CheckpointNumber(0), + }); + + // Keep job pending. + const { promise: runPromise, resolve: resolveRun } = promiseWithResolvers(); + proverNode.nextJobRun = () => runPromise; + proverNode.nextJobState = 'processing'; + + // Create a checkpoint with number 3 so gate won't immediately resolve (3-1=2 > 0). + const cp3 = await Checkpoint.random(CheckpointNumber(3), { + numBlocks: 1, + startBlockNumber: 30, + slotNumber: SlotNumber(EPOCH_DURATION), + }); + const pub3 = new PublishedCheckpoint(cp3, L1PublishedData.random(), [CommitteeAttestation.random()]); + const lastBlock = cp3.blocks.at(-1)!; + const blockId = { number: lastBlock.number, hash: (await lastBlock.hash()).toString() }; + + await proverNode.handleBlockStreamEvent({ + type: 'chain-checkpointed', + checkpoint: pub3, + block: blockId, + }); + + const epoch1 = EpochNumber(1); + expect(proverNode.getSubmissionGates().has(epoch1)).toBe(true); + + // Chain prune — gate should be removed. + await proverNode.handleBlockStreamEvent({ + type: 'chain-pruned', + block: { number: BlockNumber(10), hash: '0xdead' }, + checkpoint: { number: CheckpointNumber(0), hash: '0xdead' }, + }); + + expect(proverNode.getSubmissionGates().has(epoch1)).toBe(false); + + resolveRun(); + }); + }); + class TestProverNode extends ProverNode { public totalJobCount = 0; public nextJobState: EpochProvingJobState = 'completed'; public nextJobRun: () => Promise = () => Promise.resolve(); - protected override doCreateEpochProvingJob( - data: EpochProvingJobData, - deadline: Date | undefined, - _publicProcessorFactory: PublicProcessorFactory, - ): EpochProvingJob { + protected override async createJobForEpoch(epochNumber: EpochNumber): Promise { + this.publisher = await this.publisherFactory.create(); const state = this.nextJobState; this.nextJobState = 'completed'; const run = this.nextJobRun; this.nextJobRun = () => Promise.resolve(); + const deadlineTs = (l1GenesisTime + Number(epochNumber) + 2) * 1000; const job = mock({ run, getState: () => state, - getEpochNumber: () => data.epochNumber, - getDeadline: () => deadline, + getEpochNumber: () => epochNumber, + getDeadline: () => new Date(deadlineTs), }); job.getId.mockReturnValue(jobs.length.toString()); - jobs.push({ epochNumber: data.epochNumber, job }); + job.addCheckpoint.mockImplementation(() => {}); + job.setEpochComplete.mockImplementation(() => {}); + job.getProvingData.mockReturnValue({ + epochNumber, + checkpoints: [], + txs: new Map(), + l1ToL2Messages: {}, + previousBlockHeader: BlockHeader.empty(), + attestations: [], + }); + // Register in the base class's maps (same as base createJobForEpoch). + this.jobs.set(job.getId(), job); + this.activeJobsByEpoch.set(epochNumber, job); + + // Create submission gate (mirrors base class). + const gate = promiseWithResolvers(); + this.submissionGates.set(epochNumber, gate); + await this.tryResolveNextSubmission(); + + jobs.push({ epochNumber, job }); this.totalJobCount++; return job; } - public override triggerMonitors() { - return super.triggerMonitors(); + /** Exposes the size of the internal activeJobsByEpoch map for testing. */ + public getActiveJobCount() { + return this.activeJobsByEpoch.size; + } + + /** Exposes submission gates for testing. */ + public getSubmissionGates() { + return this.submissionGates; + } + + public override triggerBlockStream() { + return super.triggerBlockStream(); } public override getJobs(): Promise<{ uuid: string; status: EpochProvingJobState; epochNumber: EpochNumber }[]> { diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index 06d61cb3f080..bfc43718e1e2 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -5,15 +5,17 @@ import { assertRequired, compact, pick, sum } from '@aztec/foundation/collection import type { Fr } from '@aztec/foundation/curves/bn254'; import { memoize } from '@aztec/foundation/decorators'; import { createLogger } from '@aztec/foundation/log'; +import { type PromiseWithResolvers, promiseWithResolvers } from '@aztec/foundation/promise'; import { DateProvider } from '@aztec/foundation/timer'; import type { DataStoreConfig } from '@aztec/kv-store/config'; import type { P2PClient } from '@aztec/p2p'; import { PublicProcessorFactory } from '@aztec/simulator/server'; -import type { L2BlockSource } from '@aztec/stdlib/block'; -import type { Checkpoint } from '@aztec/stdlib/checkpoint'; +import type { L2BlockSource, L2BlockStreamEvent, L2BlockStreamEventHandler } from '@aztec/stdlib/block'; +import { L2BlockStream, L2TipsMemoryStore } from '@aztec/stdlib/block'; +import type { Checkpoint, PublishedCheckpoint } from '@aztec/stdlib/checkpoint'; import type { ChainConfig } from '@aztec/stdlib/config'; import type { ContractDataSource } from '@aztec/stdlib/contract'; -import { getProofSubmissionDeadlineTimestamp } from '@aztec/stdlib/epoch-helpers'; +import { getEpochAtSlot, getProofSubmissionDeadlineTimestamp } from '@aztec/stdlib/epoch-helpers'; import { type EpochProverManager, EpochProvingJobTerminalState, @@ -25,7 +27,7 @@ import { } from '@aztec/stdlib/interfaces/server'; import type { L1ToL2MessageSource } from '@aztec/stdlib/messaging'; import type { P2PClientType } from '@aztec/stdlib/p2p'; -import type { Tx } from '@aztec/stdlib/tx'; +import type { BlockHeader, Tx } from '@aztec/stdlib/tx'; import { Attributes, L1Metrics, @@ -41,7 +43,6 @@ import type { SpecificProverNodeConfig } from './config.js'; import type { EpochProvingJobData } from './job/epoch-proving-job-data.js'; import { EpochProvingJob, type EpochProvingJobState } from './job/epoch-proving-job.js'; import { ProverNodeJobMetrics, ProverNodeRewardsMetrics } from './metrics.js'; -import type { EpochMonitor, EpochMonitorHandler } from './monitors/epoch-monitor.js'; import type { ProverNodePublisher } from './prover-node-publisher.js'; import type { ProverPublisherFactory } from './prover-publisher-factory.js'; @@ -53,15 +54,29 @@ type DataStoreOptions = Pick & Pick = new Map(); + protected jobs: Map = new Map(); + /** Active jobs indexed by epoch number. */ + protected activeJobsByEpoch: Map = new Map(); + /** Tracks epochs that have been marked as complete. */ + private completedEpochs: Set = new Set(); + /** Pending checkpoints per epoch, for non-optimistic mode. */ + private pendingCheckpoints: Map = new Map(); + /** Previous block headers per epoch, for linking checkpoints. */ + private previousBlockHeaders: Map = new Map(); + /** Submission gates per epoch: resolved when L1 proven tip allows submission. */ + protected submissionGates: Map> = new Map(); + private config: ProverNodeOptions; private jobMetrics: ProverNodeJobMetrics; private rewardsMetrics: ProverNodeRewardsMetrics; + private blockStream: L2BlockStream | undefined; + private l2TipsStore: L2TipsMemoryStore | undefined; + public readonly tracer: Tracer; protected publisher: ProverNodePublisher | undefined; @@ -74,7 +89,6 @@ export class ProverNode implements EpochMonitorHandler, ProverNodeApi, Traceable protected readonly contractDataSource: ContractDataSource, protected readonly worldState: WorldStateSynchronizer, protected readonly p2pClient: Pick, 'getTxProvider'> & Partial, - protected readonly epochsMonitor: EpochMonitor, protected readonly rollupContract: RollupContract, protected readonly l1Metrics: L1Metrics, config: Partial = {}, @@ -84,6 +98,7 @@ export class ProverNode implements EpochMonitorHandler, ProverNodeApi, Traceable proverNodePollingIntervalMs: 1_000, proverNodeMaxPendingJobs: 100, proverNodeMaxParallelBlocksPerEpoch: 32, + proverNodeOptimisticProcessing: true, txGatheringIntervalMs: 1_000, txGatheringBatchSize: 10, txGatheringMaxParallelRequestsPerNode: 100, @@ -111,43 +126,21 @@ export class ProverNode implements EpochMonitorHandler, ProverNodeApi, Traceable return this.p2pClient; } - /** - * Handles an epoch being completed by starting a proof for it if there are no active jobs for it. - * @param epochNumber - The epoch number that was just completed. - * @returns false if there is an error, true otherwise - */ - async handleEpochReadyToProve(epochNumber: EpochNumber): Promise { - try { - this.log.debug(`Running jobs as ${epochNumber} is ready to prove`, { - jobs: Array.from(this.jobs.values()).map(job => `${job.getEpochNumber()}:${job.getId()}`), - }); - const activeJobs = await this.getActiveJobsForEpoch(epochNumber); - if (activeJobs.length > 0) { - this.log.warn(`Not starting proof for ${epochNumber} since there are active jobs for the epoch`, { - activeJobs: activeJobs.map(job => job.uuid), - }); - return true; - } - await this.startProof(epochNumber); - return true; - } catch (err) { - if (err instanceof EmptyEpochError) { - this.log.info(`Not starting proof for ${epochNumber} since no blocks were found`); - } else { - this.log.error(`Error handling epoch completed`, err); - } - return false; - } - } - /** * Starts the prover node so it periodically checks for unproven epochs in the unfinalized chain from L1 and * starts proving jobs for them. */ async start() { - this.epochsMonitor.start(this); await this.publisherFactory.start(); this.publisher = await this.publisherFactory.create(); + + // Set up the L2BlockStream to receive checkpoint events. + this.l2TipsStore = new L2TipsMemoryStore(); + this.blockStream = new L2BlockStream(this.l2BlockSource, this.l2TipsStore, this, this.log, { + pollIntervalMS: this.config.proverNodePollingIntervalMs, + }); + this.blockStream.start(); + await this.rewardsMetrics.start(); this.l1Metrics.start(); this.log.info(`Started Prover Node with prover id ${this.prover.getProverId().toString()}`, this.config); @@ -158,12 +151,16 @@ export class ProverNode implements EpochMonitorHandler, ProverNodeApi, Traceable */ async stop() { this.log.info('Stopping ProverNode'); - await this.epochsMonitor.stop(); + await this.blockStream?.stop(); await this.prover.stop(); await tryStop(this.p2pClient); await tryStop(this.l2BlockSource); await tryStop(this.publisherFactory); this.publisher?.interrupt(); + for (const [, gate] of this.submissionGates) { + gate.resolve(); + } + this.submissionGates.clear(); await Promise.all(Array.from(this.jobs.values()).map(job => job.stop())); await this.worldState.stop(); this.rewardsMetrics.stop(); @@ -172,6 +169,163 @@ export class ProverNode implements EpochMonitorHandler, ProverNodeApi, Traceable this.log.info('Stopped ProverNode'); } + /** Handles events from the L2BlockStream. */ + public async handleBlockStreamEvent(event: L2BlockStreamEvent): Promise { + await this.l2TipsStore?.handleBlockStreamEvent(event); + switch (event.type) { + case 'chain-checkpointed': + await this.onCheckpointAvailable(event.checkpoint); + break; + case 'chain-pruned': + await this.onChainPruned(event); + break; + case 'chain-proven': + await this.tryResolveNextSubmission(); + break; + case 'epoch-completed': + await this.onEpochCompleted(event.epochNumber); + break; + default: + break; + } + } + + @trackSpan('ProverNode.onCheckpointAvailable', _pub => ({ + [Attributes.EPOCH_NUMBER]: _pub.checkpoint.header.slotNumber, + })) + private async onCheckpointAvailable(publishedCheckpoint: PublishedCheckpoint) { + const checkpoint = publishedCheckpoint.checkpoint; + const l1Constants = await this.getL1Constants(); + const epoch = getEpochAtSlot(checkpoint.header.slotNumber, l1Constants); + + const lastBlockInCheckpoint = checkpoint.blocks.at(-1)!.number; + + // Skip already proven checkpoints, but only if we haven't started this epoch yet. + // Once any checkpoint for an epoch is accepted, ALL subsequent checkpoints must be processed + // to maintain the previousBlockHeader chain (otherwise we get gaps when intermediate + // checkpoints become proven before the stream delivers them). + if (!this.pendingCheckpoints.has(epoch) && !this.activeJobsByEpoch.has(epoch)) { + const lastProvenBlock = await this.l2BlockSource.getProvenBlockNumber(); + if (lastBlockInCheckpoint <= lastProvenBlock) { + return; + } + } + + this.log.debug(`Checkpoint ${checkpoint.number} received for epoch ${epoch}`, { + checkpointNumber: checkpoint.number, + epoch, + lastBlockInCheckpoint, + }); + + // Track checkpoint per epoch, deduplicating by checkpoint number. + if (!this.pendingCheckpoints.has(epoch)) { + this.pendingCheckpoints.set(epoch, []); + } + const existing = this.pendingCheckpoints.get(epoch)!; + if (existing.some(e => e.checkpoint.number === checkpoint.number)) { + return; + } + existing.push({ checkpoint, published: publishedCheckpoint }); + + if (this.config.proverNodeOptimisticProcessing) { + // Optimistic mode: find or create job, push checkpoint immediately. + let job = this.activeJobsByEpoch.get(epoch); + if (!job) { + job = await this.createJobForEpoch(epoch); + void this.runJob(job); + } + await this.pushCheckpointToJob(job, epoch, checkpoint, publishedCheckpoint); + } + } + + private async onChainPruned(event: { + block: { number: BlockNumber; hash: string }; + checkpoint: { number: CheckpointNumber; hash: string }; + }) { + const prunedBlockNumber = event.block.number; + this.log.warn(`Chain pruned to block ${prunedBlockNumber}`, event); + + // Stop jobs for epochs affected by the reorg. + for (const [epoch, job] of this.activeJobsByEpoch) { + // If any of the job's processed checkpoints have blocks past the pruned block, stop the job. + this.submissionGates.delete(epoch); + await job.stop('reorg'); + this.activeJobsByEpoch.delete(epoch); + this.completedEpochs.delete(epoch); + this.pendingCheckpoints.delete(epoch); + this.previousBlockHeaders.delete(epoch); + this.jobs.delete(job.getId()); + } + } + + private async onEpochCompleted(completedEpoch: EpochNumber) { + for (const [epoch, checkpoints] of this.pendingCheckpoints) { + if (this.completedEpochs.has(epoch)) { + continue; + } + if (completedEpoch < epoch) { + continue; + } + + this.completedEpochs.add(epoch); + const attestations = checkpoints.at(-1)?.published.attestations ?? []; + + let job = this.activeJobsByEpoch.get(epoch); + if (!job) { + job = await this.createJobForEpoch(EpochNumber(epoch)); + for (const entry of checkpoints) { + await this.pushCheckpointToJob(job, epoch, entry.checkpoint, entry.published); + } + void this.runJob(job); + } + job.setEpochComplete(attestations); + } + } + + /** Pushes a checkpoint to a job with all required data. */ + private async pushCheckpointToJob( + job: EpochProvingJob, + epoch: number, + checkpoint: Checkpoint, + _publishedCheckpoint: PublishedCheckpoint, + ) { + // Gather L1 to L2 messages. + const l1ToL2Messages = await this.l1ToL2MessageSource.getL1ToL2Messages(checkpoint.number); + + // Gather txs. + const deadline = new Date(this.dateProvider.now() + this.config.txGatheringTimeoutMs); + const txProvider = this.p2pClient.getTxProvider(); + const blocks = checkpoint.blocks; + const txsByBlock = await Promise.all(blocks.map(block => txProvider.getTxsForBlock(block, { deadline }))); + const txs = new Map(); + for (const { txs: blockTxs, missingTxs } of txsByBlock) { + if (missingTxs.length > 0) { + throw new Error( + `Txs not found for checkpoint ${checkpoint.number}: ${missingTxs.map(h => h.toString()).join(', ')}`, + ); + } + for (const tx of blockTxs) { + txs.set(tx.getTxHash().toString(), tx); + } + } + + // Determine previous block header. + let previousBlockHeader = this.previousBlockHeaders.get(epoch); + if (!previousBlockHeader) { + const firstBlockNumber = checkpoint.blocks[0].number; + previousBlockHeader = await this.gatherPreviousBlockHeader(EpochNumber(epoch), firstBlockNumber - 1); + } + + // Sync world state up to the last block in the checkpoint. + const lastBlockNumber = checkpoint.blocks.at(-1)!.number; + await this.worldState.syncImmediate(lastBlockNumber); + + job.addCheckpoint(checkpoint, l1ToL2Messages, previousBlockHeader, txs); + + // Update the previous block header for the next checkpoint. + this.previousBlockHeaders.set(epoch, checkpoint.blocks.at(-1)!.header); + } + /** Returns world state status. */ public async getWorldStateSyncStatus(): Promise { const { syncSummary } = await this.worldState.status(); @@ -185,10 +339,21 @@ export class ProverNode implements EpochMonitorHandler, ProverNodeApi, Traceable /** * Starts a proving process and returns immediately. + * Skips if there's already an active job for this epoch. */ public async startProof(epochNumber: EpochNumber) { - const job = await this.createProvingJob(epochNumber, { skipEpochCheck: true }); - void this.runJob(job); + const activeJobs = await this.getActiveJobsForEpoch(epochNumber); + if (activeJobs.length > 0) { + this.log.debug(`Skipping proof for epoch ${epochNumber}, already has active job`, { epochNumber }); + return; + } + + try { + const job = await this.createProvingJob(epochNumber); + void this.runJob(job); + } catch (err) { + this.log.error(`Error creating proving job for epoch ${epochNumber}`, err, { epochNumber }); + } } private async runJob(job: EpochProvingJob) { @@ -201,8 +366,7 @@ export class ProverNode implements EpochMonitorHandler, ProverNodeApi, Traceable ctx.state = state; if (state === 'reorg') { - this.log.warn(`Running new job for epoch ${epochNumber} due to reorg`, ctx); - await this.createProvingJob(epochNumber); + this.log.warn(`Job for epoch ${epochNumber} stopped due to reorg, will retry`, ctx); } else if (state === 'failed') { this.log.error(`Job for ${epochNumber} exited with state ${state}`, ctx); await this.tryUploadEpochFailure(job); @@ -213,6 +377,45 @@ export class ProverNode implements EpochMonitorHandler, ProverNodeApi, Traceable this.log.error(`Error proving epoch ${epochNumber}`, err, ctx); } finally { this.jobs.delete(job.getId()); + this.activeJobsByEpoch.delete(epochNumber); + this.completedEpochs.delete(epochNumber); + this.pendingCheckpoints.delete(epochNumber); + this.previousBlockHeaders.delete(epochNumber); + this.submissionGates.delete(epochNumber); + void this.tryResolveNextSubmission(); + + // Retry on reorg: the epoch data may have changed. + if (job.getState() === 'reorg') { + void this.startProof(epochNumber); + } + } + } + + /** Resolves submission gates for epochs whose fromCheckpoint can submit given the L1 proven tip. */ + protected async tryResolveNextSubmission(): Promise { + if (this.submissionGates.size === 0) { + return; + } + const { proven } = await this.rollupContract.getTips(); + const sortedEpochs = [...this.submissionGates.keys()].sort((a, b) => a - b); + for (const epoch of sortedEpochs) { + const checkpoints = this.pendingCheckpoints.get(epoch); + if (!checkpoints || checkpoints.length === 0) { + // No checkpoint info yet (legacy/non-optimistic path) — resolve immediately. + const gate = this.submissionGates.get(epoch)!; + gate.resolve(); + this.submissionGates.delete(epoch); + continue; + } + const fromCheckpoint = checkpoints[0].checkpoint.number; + if (fromCheckpoint - 1 <= proven) { + const gate = this.submissionGates.get(epoch)!; + this.log.verbose(`Resolving submission gate for epoch ${epoch}`, { proven, fromCheckpoint }); + gate.resolve(); + this.submissionGates.delete(epoch); + } else { + break; // Epochs are sorted; later ones can't submit either. + } } } @@ -264,26 +467,11 @@ export class ProverNode implements EpochMonitorHandler, ProverNodeApi, Traceable } } - @trackSpan('ProverNode.createProvingJob', epochNumber => ({ [Attributes.EPOCH_NUMBER]: epochNumber })) - private async createProvingJob(epochNumber: EpochNumber, opts: { skipEpochCheck?: boolean } = {}) { + /** Creates a new EpochProvingJob for the given epoch. Does NOT gather data or push checkpoints. */ + protected async createJobForEpoch(epochNumber: EpochNumber): Promise { this.checkMaximumPendingJobs(); - this.publisher = await this.publisherFactory.create(); - // Gather all data for this epoch - const epochData = await this.gatherEpochData(epochNumber); - const fromCheckpoint = epochData.checkpoints[0].number; - const toCheckpoint = epochData.checkpoints.at(-1)!.number; - const fromBlock = epochData.checkpoints[0].blocks[0].number; - const toBlock = epochData.checkpoints.at(-1)!.blocks.at(-1)!.number; - this.log.verbose( - `Creating proving job for epoch ${epochNumber} for checkpoint range ${fromCheckpoint} to ${toCheckpoint} and block range ${fromBlock} to ${toBlock}`, - ); - - // Fast forward world state to right before the target block and get a fork - await this.worldState.syncImmediate(toBlock); - - // Create a processor factory const publicProcessorFactory = new PublicProcessorFactory( this.contractDataSource, this.dateProvider, @@ -291,14 +479,79 @@ export class ProverNode implements EpochMonitorHandler, ProverNodeApi, Traceable this.log.getBindings(), ); - // Set deadline for this job to run. It will abort if it takes too long. const deadlineTs = getProofSubmissionDeadlineTimestamp(epochNumber, await this.getL1Constants()); const deadline = new Date(Number(deadlineTs) * 1000); - const job = this.doCreateEpochProvingJob(epochData, deadline, publicProcessorFactory, this.publisher, opts); + + const { proverNodeMaxParallelBlocksPerEpoch: parallelBlockLimit, proverNodeDisableProofPublish } = this.config; + + const gate = promiseWithResolvers(); + this.submissionGates.set(epochNumber, gate); + + const job = new EpochProvingJob( + epochNumber, + this.worldState, + this.prover.createEpochProver(), + publicProcessorFactory, + this.publisher, + this.jobMetrics, + deadline, + { parallelBlockLimit, skipSubmitProof: proverNodeDisableProofPublish }, + gate.promise, + this.log.getBindings(), + ); + this.jobs.set(job.getId(), job); + this.activeJobsByEpoch.set(epochNumber, job); + await this.tryResolveNextSubmission(); return job; } + /** + * Creates a proving job with all data gathered upfront (legacy/non-optimistic path). + * Used by startProof() which is called externally. + */ + @trackSpan('ProverNode.createProvingJob', epochNumber => ({ [Attributes.EPOCH_NUMBER]: epochNumber })) + private async createProvingJob(epochNumber: EpochNumber) { + const job = await this.createJobForEpoch(epochNumber); + + try { + // Gather all data for this epoch. + const epochData = await this.gatherEpochData(epochNumber); + const fromCheckpoint = epochData.checkpoints[0].number; + const toCheckpoint = epochData.checkpoints.at(-1)!.number; + const fromBlock = epochData.checkpoints[0].blocks[0].number; + const toBlock = epochData.checkpoints.at(-1)!.blocks.at(-1)!.number; + this.log.verbose( + `Creating proving job for epoch ${epochNumber} for checkpoint range ${fromCheckpoint} to ${toCheckpoint} and block range ${fromBlock} to ${toBlock}`, + ); + + // Fast forward world state. + await this.worldState.syncImmediate(toBlock); + + // Push all checkpoints to the job. + const previousBlockHeaders = this.gatherPreviousBlockHeaders(epochData); + for (let i = 0; i < epochData.checkpoints.length; i++) { + const checkpoint = epochData.checkpoints[i]; + job.addCheckpoint( + checkpoint, + epochData.l1ToL2Messages[checkpoint.number], + previousBlockHeaders[i], + epochData.txs, + ); + } + + // Mark epoch complete. + job.setEpochComplete(epochData.attestations); + + return job; + } catch (err) { + // Clean up the registered job so the epoch is not permanently blocked. + this.jobs.delete(job.getId()); + this.activeJobsByEpoch.delete(epochNumber); + throw err; + } + } + @memoize private getL1Constants() { return this.l2BlockSource.getL1Constants(); @@ -366,32 +619,17 @@ export class ProverNode implements EpochMonitorHandler, ProverNodeApi, Traceable return header; } - /** Extracted for testing purposes. */ - protected doCreateEpochProvingJob( - data: EpochProvingJobData, - deadline: Date | undefined, - publicProcessorFactory: PublicProcessorFactory, - publisher: ProverNodePublisher, - opts: { skipEpochCheck?: boolean } = {}, - ) { - const { proverNodeMaxParallelBlocksPerEpoch: parallelBlockLimit, proverNodeDisableProofPublish } = this.config; - return new EpochProvingJob( - data, - this.worldState, - this.prover.createEpochProver(), - publicProcessorFactory, - publisher, - this.l2BlockSource, - this.jobMetrics, - deadline, - { parallelBlockLimit, skipSubmitProof: proverNodeDisableProofPublish, ...opts }, - this.log.getBindings(), - ); + /** Returns the last block header in the previous checkpoint for all checkpoints in the epoch. */ + private gatherPreviousBlockHeaders(epochData: EpochProvingJobData) { + const lastBlocks = epochData.checkpoints.map(checkpoint => checkpoint.blocks.at(-1)!); + return [epochData.previousBlockHeader, ...lastBlocks.map(block => block.header).slice(0, -1)]; } /** Extracted for testing purposes. */ - protected async triggerMonitors() { - await this.epochsMonitor.work(); + protected async triggerBlockStream() { + if (this.blockStream) { + await this.blockStream.sync(); + } } private validateConfig() { diff --git a/yarn-project/pxe/src/block_synchronizer/block_synchronizer.ts b/yarn-project/pxe/src/block_synchronizer/block_synchronizer.ts index 30ce59561faf..e3645fe90d27 100644 --- a/yarn-project/pxe/src/block_synchronizer/block_synchronizer.ts +++ b/yarn-project/pxe/src/block_synchronizer/block_synchronizer.ts @@ -90,6 +90,8 @@ export class BlockSynchronizer implements L2BlockStreamEventHandler { } break; } + case 'epoch-completed': + break; case 'chain-pruned': { const currentAnchorBlockHeader = await this.anchorBlockStore.getBlockHeader(); const currentAnchorBlockNumber = currentAnchorBlockHeader.getBlockNumber(); diff --git a/yarn-project/stdlib/src/block/l2_block_stream/interfaces.ts b/yarn-project/stdlib/src/block/l2_block_stream/interfaces.ts index dbd8fc28c3f9..5f468dca929d 100644 --- a/yarn-project/stdlib/src/block/l2_block_stream/interfaces.ts +++ b/yarn-project/stdlib/src/block/l2_block_stream/interfaces.ts @@ -1,3 +1,5 @@ +import type { EpochNumber } from '@aztec/foundation/branded-types'; + import type { PublishedCheckpoint } from '../../checkpoint/published_checkpoint.js'; import type { L2Block } from '../l2_block.js'; import type { CheckpointId, L2BlockId, L2Tips } from '../l2_block_source.js'; @@ -39,6 +41,10 @@ export type L2BlockStreamEvent = | /** Reports new finalized block (proven and finalized on L1). */ { type: 'chain-finalized'; block: L2BlockId; + } + | /** Reports that an epoch has completed based on L1 time. */ { + type: 'epoch-completed'; + epochNumber: EpochNumber; }; export type L2TipsStore = L2BlockStreamEventHandler & L2BlockStreamLocalDataProvider; diff --git a/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.test.ts b/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.test.ts index 9d6e5beeebac..7014dfd663eb 100644 --- a/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.test.ts +++ b/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.test.ts @@ -1,5 +1,5 @@ import { GENESIS_BLOCK_HEADER_HASH } from '@aztec/constants'; -import { BlockNumber, CheckpointNumber } from '@aztec/foundation/branded-types'; +import { BlockNumber, CheckpointNumber, EpochNumber } from '@aztec/foundation/branded-types'; import { compactArray } from '@aztec/foundation/collection'; import { Fr } from '@aztec/foundation/curves/bn254'; @@ -122,6 +122,9 @@ describe('L2BlockStream', () => { ), ); + // Default: no epoch information available + blockSource.getL2EpochNumber.mockResolvedValue(undefined); + // Returns published checkpoints - each checkpoint contains just the one block for simplicity // Respects the limit parameter and returns up to `limit` checkpoints blockSource.getCheckpoints.mockImplementation((checkpointNumber: CheckpointNumber, limit: number) => @@ -1450,6 +1453,105 @@ describe('L2BlockStream', () => { }); }); + describe('epoch-completed', () => { + let localData: TestL2BlockStreamLocalDataProvider; + let handler: TestL2BlockStreamEventHandler; + let blockStream: TestL2BlockStream; + + beforeEach(() => { + localData = new TestL2BlockStreamLocalDataProvider(); + handler = new TestL2BlockStreamEventHandler(); + blockStream = new TestL2BlockStream(blockSource, localData, handler, undefined, { batchSize: 10 }); + }); + + it('does not emit epoch-completed on first work call (initialization only)', async () => { + setRemoteTips(5); + blockSource.getL2EpochNumber.mockResolvedValue(EpochNumber(2)); + + await blockStream.work(); + + const epochEvents = handler.events.filter(e => e.type === 'epoch-completed'); + expect(epochEvents).toHaveLength(0); + }); + + it('does not emit epoch-completed when epoch is unchanged between polls', async () => { + setRemoteTips(5); + blockSource.getL2EpochNumber.mockResolvedValue(EpochNumber(2)); + + await blockStream.work(); + handler.clearEvents(); + + await blockStream.work(); + + const epochEvents = handler.events.filter(e => e.type === 'epoch-completed'); + expect(epochEvents).toHaveLength(0); + }); + + it('emits epoch-completed when epoch advances', async () => { + setRemoteTips(5); + blockSource.getL2EpochNumber.mockResolvedValue(EpochNumber(2)); + + await blockStream.work(); + handler.clearEvents(); + + blockSource.getL2EpochNumber.mockResolvedValue(EpochNumber(3)); + + await blockStream.work(); + + const epochEvents = handler.events.filter(e => e.type === 'epoch-completed'); + expect(epochEvents).toHaveLength(1); + expect(epochEvents[0]).toEqual({ type: 'epoch-completed', epochNumber: EpochNumber(2) }); + }); + + it('emits epoch-completed after all other events', async () => { + setRemoteTips(5); + blockSource.getL2EpochNumber.mockResolvedValue(EpochNumber(2)); + + await blockStream.work(); + handler.clearEvents(); + + setRemoteTips(10, 0, 8, 6); + localData.proposed.number = BlockNumber(5); + localData.proven.block.number = BlockNumber(5); + localData.finalized.block.number = BlockNumber(5); + blockSource.getL2EpochNumber.mockResolvedValue(EpochNumber(3)); + + await blockStream.work(); + + const lastEvent = handler.events.at(-1)!; + expect(lastEvent).toEqual({ type: 'epoch-completed', epochNumber: EpochNumber(2) }); + }); + + it('emits only one event when multiple epochs elapse', async () => { + setRemoteTips(5); + blockSource.getL2EpochNumber.mockResolvedValue(EpochNumber(1)); + + await blockStream.work(); + handler.clearEvents(); + + blockSource.getL2EpochNumber.mockResolvedValue(EpochNumber(4)); + + await blockStream.work(); + + const epochEvents = handler.events.filter(e => e.type === 'epoch-completed'); + expect(epochEvents).toHaveLength(1); + expect(epochEvents[0]).toEqual({ type: 'epoch-completed', epochNumber: EpochNumber(3) }); + }); + + it('does not emit epoch-completed when getL2EpochNumber returns undefined', async () => { + setRemoteTips(5); + // getL2EpochNumber returns undefined (default mock) + + await blockStream.work(); + handler.clearEvents(); + + await blockStream.work(); + + const epochEvents = handler.events.filter(e => e.type === 'epoch-completed'); + expect(epochEvents).toHaveLength(0); + }); + }); + describe('skipFinalized', () => { let localData: TestL2BlockStreamLocalDataProvider; let handler: TestL2BlockStreamEventHandler; diff --git a/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.ts b/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.ts index dae14e6fb962..10d4b2dc2365 100644 --- a/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.ts +++ b/yarn-project/stdlib/src/block/l2_block_stream/l2_block_stream.ts @@ -1,4 +1,4 @@ -import { BlockNumber, CheckpointNumber } from '@aztec/foundation/branded-types'; +import { BlockNumber, CheckpointNumber, EpochNumber } from '@aztec/foundation/branded-types'; import { AbortError } from '@aztec/foundation/error'; import { createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; @@ -15,12 +15,14 @@ export class L2BlockStream { private readonly runningPromise: RunningPromise; private isSyncing = false; private hasStarted = false; + private lastKnownEpoch: EpochNumber | undefined; constructor( private l2BlockSource: Pick< L2BlockSource, 'getBlocks' | 'getBlockHeader' | 'getL2Tips' | 'getCheckpoints' | 'getCheckpointedBlocks' - >, + > & + Partial>, private localData: L2BlockStreamLocalDataProvider, private handler: L2BlockStreamEventHandler, private readonly log = createLogger('types:block_stream'), @@ -231,6 +233,16 @@ export class L2BlockStream { if (localTips.finalized !== undefined && sourceTips.finalized.block.number !== localTips.finalized.block.number) { await this.emitEvent({ type: 'chain-finalized', block: sourceTips.finalized.block }); } + + // Detect epoch transitions based on L1 time. + const currentEpoch = await this.l2BlockSource.getL2EpochNumber?.(); + if (currentEpoch !== undefined && this.lastKnownEpoch !== undefined && currentEpoch > this.lastKnownEpoch) { + const completedEpoch = EpochNumber(currentEpoch - 1); + await this.emitEvent({ type: 'epoch-completed', epochNumber: completedEpoch }); + } + if (currentEpoch !== undefined) { + this.lastKnownEpoch = currentEpoch; + } } catch (err: any) { if (err.name === 'AbortError') { return; @@ -277,7 +289,7 @@ export class L2BlockStream { private async emitEvent(event: L2BlockStreamEvent) { this.log.debug( - `Emitting ${event.type} (${event.type === 'blocks-added' ? event.blocks.length : event.type === 'chain-checkpointed' ? event.checkpoint.checkpoint.number : event.block.number})`, + `Emitting ${event.type} (${event.type === 'blocks-added' ? event.blocks.length : event.type === 'chain-checkpointed' ? event.checkpoint.checkpoint.number : event.type === 'epoch-completed' ? event.epochNumber : event.block.number})`, ); await this.handler.handleBlockStreamEvent(event); if (!this.isRunning() && !this.isSyncing) { diff --git a/yarn-project/stdlib/src/block/l2_block_stream/l2_tips_store_base.ts b/yarn-project/stdlib/src/block/l2_block_stream/l2_tips_store_base.ts index 80ea95e7190c..a92e57e2b577 100644 --- a/yarn-project/stdlib/src/block/l2_block_stream/l2_tips_store_base.ts +++ b/yarn-project/stdlib/src/block/l2_block_stream/l2_tips_store_base.ts @@ -105,6 +105,8 @@ export abstract class L2TipsStoreBase implements L2BlockStreamEventHandler, L2Bl case 'chain-finalized': await this.handleChainFinalized(event); break; + case 'epoch-completed': + break; } } diff --git a/yarn-project/stdlib/src/interfaces/epoch-prover.ts b/yarn-project/stdlib/src/interfaces/epoch-prover.ts index 4a0103ea66b8..21c14eeeda15 100644 --- a/yarn-project/stdlib/src/interfaces/epoch-prover.ts +++ b/yarn-project/stdlib/src/interfaces/epoch-prover.ts @@ -14,16 +14,22 @@ import type { IBlockFactory } from './block-builder.js'; /** Coordinates the proving of an entire epoch. */ export interface EpochProver extends Omit { /** - * Starts a new epoch. Must be the first method to be called. + * Starts a new epoch. Block-level proving can begin immediately after this call. + * Call setEpochStructure() once the epoch is complete to unblock checkpoint root rollups. * @param epochNumber - The epoch number. - * @param totalNumCheckpoints - The total number of checkpoints expected in the epoch (must be at least one). - * @param finalBlobBatchingChallenges - The final blob batching challenges for the epoch. **/ - startNewEpoch( - epochNumber: EpochNumber, + startNewEpoch(epochNumber: EpochNumber): void; + + /** + * Sets epoch structure once the epoch is complete. Must be called before finalizeEpoch(). + * Unblocks checkpoint root rollups and above. + * @param totalNumCheckpoints - The total number of checkpoints in the epoch. + * @param finalBlobBatchingChallenges - The final blob batching challenges for the epoch. + */ + setEpochStructure( totalNumCheckpoints: number, finalBlobBatchingChallenges: FinalBlobBatchingChallenges, - ): void; + ): Promise; /** * Starts a new checkpoint. diff --git a/yarn-project/stdlib/src/interfaces/prover-node.ts b/yarn-project/stdlib/src/interfaces/prover-node.ts index b28438708136..abe8321ef403 100644 --- a/yarn-project/stdlib/src/interfaces/prover-node.ts +++ b/yarn-project/stdlib/src/interfaces/prover-node.ts @@ -8,6 +8,7 @@ const EpochProvingJobState = [ 'initialized', 'processing', 'awaiting-prover', + 'awaiting-submission', 'publishing-proof', 'completed', 'failed', diff --git a/yarn-project/telemetry-client/src/wrappers/l2_block_stream.ts b/yarn-project/telemetry-client/src/wrappers/l2_block_stream.ts index 665b8f83329b..3270f9a1c033 100644 --- a/yarn-project/telemetry-client/src/wrappers/l2_block_stream.ts +++ b/yarn-project/telemetry-client/src/wrappers/l2_block_stream.ts @@ -14,7 +14,8 @@ export class TraceableL2BlockStream extends L2BlockStream implements Traceable { l2BlockSource: Pick< L2BlockSource, 'getBlocks' | 'getBlockHeader' | 'getL2Tips' | 'getCheckpoints' | 'getCheckpointedBlocks' - >, + > & + Partial>, localData: L2BlockStreamLocalDataProvider, handler: L2BlockStreamEventHandler, public readonly tracer: Tracer, diff --git a/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts b/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts index daa67b1882d1..1dda03099cea 100644 --- a/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts +++ b/yarn-project/world-state/src/synchronizer/server_world_state_synchronizer.ts @@ -291,6 +291,8 @@ export class ServerWorldStateSynchronizer case 'chain-finalized': await this.handleChainFinalized(event.block.number); break; + case 'epoch-completed': + break; } }