From 881187c8a5cda1d090d1469e85ef8d84874442c4 Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Tue, 17 Feb 2026 10:40:58 +0000 Subject: [PATCH] fix: async world state cleanups --- .../src/orchestrator/block-proving-state.ts | 9 ++ .../src/orchestrator/orchestrator.ts | 89 +++++++++---------- .../orchestrator/orchestrator_errors.test.ts | 4 +- .../orchestrator_workflow.test.ts | 4 +- 4 files changed, 52 insertions(+), 54 deletions(-) diff --git a/yarn-project/prover-client/src/orchestrator/block-proving-state.ts b/yarn-project/prover-client/src/orchestrator/block-proving-state.ts index fb49b4be2d0e..53c7a594db82 100644 --- a/yarn-project/prover-client/src/orchestrator/block-proving-state.ts +++ b/yarn-project/prover-client/src/orchestrator/block-proving-state.ts @@ -55,6 +55,7 @@ export class BlockProvingState { | ProofState | undefined; private builtBlockHeader: BlockHeader | undefined; + private builtArchive: AppendOnlyTreeSnapshot | undefined; private endState: StateReference | undefined; private endSpongeBlob: SpongeBlob | undefined; private txs: TxProvingState[] = []; @@ -232,6 +233,14 @@ export class BlockProvingState { return this.builtBlockHeader; } + public setBuiltArchive(archive: AppendOnlyTreeSnapshot) { + this.builtArchive = archive; + } + + public getBuiltArchive() { + return this.builtArchive; + } + public getStartSpongeBlob() { return this.startSpongeBlob; } diff --git a/yarn-project/prover-client/src/orchestrator/orchestrator.ts b/yarn-project/prover-client/src/orchestrator/orchestrator.ts index 23a9f040a550..001e91e0de34 100644 --- a/yarn-project/prover-client/src/orchestrator/orchestrator.ts +++ b/yarn-project/prover-client/src/orchestrator/orchestrator.ts @@ -71,11 +71,6 @@ import { EpochProvingState, type ProvingResult, type TreeSnapshots } from './epo import { ProvingOrchestratorMetrics } from './orchestrator_metrics.js'; import { TxProvingState } from './tx-proving-state.js'; -type WorldStateFork = { - fork: MerkleTreeWriteOperations; - cleanupPromise: Promise | undefined; -}; - /** * Implements an event driven proving scheduler to build the recursive proof tree. The idea being: * 1. Transactions are provided to the scheduler post simulation. @@ -97,7 +92,7 @@ export class ProvingOrchestrator implements EpochProver { private provingPromise: Promise | undefined = undefined; private metrics: ProvingOrchestratorMetrics; // eslint-disable-next-line aztec-custom/no-non-primitive-in-collections - private dbs: Map = new Map(); + private dbs: Map = new Map(); private logger: Logger; constructor( @@ -182,7 +177,7 @@ export class ProvingOrchestrator implements EpochProver { const db = await this.dbProvider.fork(lastBlockNumber); const firstBlockNumber = BlockNumber(lastBlockNumber + 1); - this.dbs.set(firstBlockNumber, { fork: db, cleanupPromise: undefined }); + this.dbs.set(firstBlockNumber, db); // Get archive sibling path before any block in this checkpoint lands. const lastArchiveSiblingPath = await getLastSiblingPath(MerkleTreeId.ARCHIVE, db); @@ -240,9 +235,9 @@ export class ProvingOrchestrator implements EpochProver { if (!this.dbs.has(blockNumber)) { // Fork world state at the end of the immediately previous block const db = await this.dbProvider.fork(BlockNumber(blockNumber - 1)); - this.dbs.set(blockNumber, { fork: db, cleanupPromise: undefined }); + this.dbs.set(blockNumber, db); } - const db = this.dbs.get(blockNumber)!.fork; + const db = this.getDbForBlock(blockNumber); // Get archive snapshot and sibling path before any txs in this block lands. const lastArchiveTreeSnapshot = await getTreeSnapshot(MerkleTreeId.ARCHIVE, db); @@ -317,7 +312,7 @@ export class ProvingOrchestrator implements EpochProver { this.logger.info(`Adding ${txs.length} transactions to block ${blockNumber}`); - const db = this.dbs.get(blockNumber)!.fork; + const db = this.getDbForBlock(blockNumber); const lastArchive = provingState.lastArchiveTreeSnapshot; const newL1ToL2MessageTreeSnapshot = provingState.newL1ToL2MessageTreeSnapshot; const spongeBlobState = provingState.getStartSpongeBlob().clone(); @@ -445,14 +440,20 @@ export class ProvingOrchestrator implements EpochProver { throw new Error('Block header mismatch'); } - // Get db for this block - const db = this.dbs.get(provingState.blockNumber)!.fork; + // Get db for this block and remove from map — no other code should use it after this point. + const db = this.getDbForBlock(provingState.blockNumber); + this.dbs.delete(provingState.blockNumber); - // Update the archive tree, so we're ready to start processing the next block: - this.logger.verbose( - `Updating archive tree with block ${provingState.blockNumber} header ${(await header.hash()).toString()}`, - ); - await db.updateArchive(header); + // Update the archive tree, capture the snapshot, and close the fork deterministically. + try { + this.logger.verbose( + `Updating archive tree with block ${provingState.blockNumber} header ${(await header.hash()).toString()}`, + ); + await db.updateArchive(header); + provingState.setBuiltArchive(await getTreeSnapshot(MerkleTreeId.ARCHIVE, db)); + } finally { + await db.close(); + } await this.verifyBuiltBlockAgainstSyncedState(provingState); @@ -472,6 +473,13 @@ export class ProvingOrchestrator implements EpochProver { this.logger.debug('Block root rollup proof not built yet, skipping header check.'); return; } + + const newArchive = provingState.getBuiltArchive(); + if (!newArchive) { + this.logger.debug('Archive snapshot not yet captured, skipping header check.'); + return; + } + const header = await buildHeaderFromCircuitOutputs(output); if (!(await header.hash()).equals(await builtBlockHeader.hash())) { @@ -480,11 +488,7 @@ export class ProvingOrchestrator implements EpochProver { return; } - // Get db for this block const blockNumber = provingState.blockNumber; - const db = this.dbs.get(blockNumber)!.fork; - - const newArchive = await getTreeSnapshot(MerkleTreeId.ARCHIVE, db); const syncedArchive = await getTreeSnapshot(MerkleTreeId.ARCHIVE, this.dbProvider.getSnapshot(blockNumber)); if (!syncedArchive.equals(newArchive)) { this.logger.error( @@ -502,12 +506,6 @@ export class ProvingOrchestrator implements EpochProver { provingState.reject(`New archive mismatch.`); return; } - - // TODO(palla/prover): This closes the fork only on the happy path. If this epoch orchestrator - // is aborted and never reaches this point, it will leak the fork. We need to add a global cleanup, - // but have to make sure it only runs once all operations are completed, otherwise some function here - // will attempt to access the fork after it was closed. - void this.cleanupDBFork(blockNumber); } /** @@ -523,6 +521,19 @@ export class ProvingOrchestrator implements EpochProver { } this.provingState?.cancel(); + + for (const [blockNumber, db] of this.dbs.entries()) { + void db.close().catch(err => this.logger.error(`Error closing db for block ${blockNumber}`, err)); + } + this.dbs.clear(); + } + + private getDbForBlock(blockNumber: BlockNumber): MerkleTreeWriteOperations { + const db = this.dbs.get(blockNumber); + if (!db) { + throw new Error(`World state fork for block ${blockNumber} not found.`); + } + return db; } /** @@ -554,24 +565,6 @@ export class ProvingOrchestrator implements EpochProver { return epochProofResult; } - private async cleanupDBFork(blockNumber: BlockNumber): Promise { - this.logger.debug(`Cleaning up world state fork for ${blockNumber}`); - const fork = this.dbs.get(blockNumber); - if (!fork) { - return; - } - - try { - if (!fork.cleanupPromise) { - fork.cleanupPromise = fork.fork.close(); - } - await fork.cleanupPromise; - this.dbs.delete(blockNumber); - } catch (err) { - this.logger.error(`Error closing db for block ${blockNumber}`, err); - } - } - /** * Enqueue a job to be scheduled * @param provingState - The proving state object being operated on @@ -894,7 +887,8 @@ export class ProvingOrchestrator implements EpochProver { const leafLocation = provingState.setBlockRootRollupProof(result); const checkpointProvingState = provingState.parentCheckpoint; - // If the proofs were slower than the block header building, then we need to try validating the block header hashes here. + // Verification is called from both here and setBlockCompleted. Whichever runs last + // will be the first to see all three pieces (header, proof output, archive) and run the checks. await this.verifyBuiltBlockAgainstSyncedState(provingState); if (checkpointProvingState.totalNumBlocks === 1) { @@ -902,9 +896,6 @@ export class ProvingOrchestrator implements EpochProver { } else { await this.checkAndEnqueueNextBlockMergeRollup(checkpointProvingState, leafLocation); } - - // We are finished with the block at this point, ensure the fork is cleaned up - void this.cleanupDBFork(provingState.blockNumber); }, ); } diff --git a/yarn-project/prover-client/src/orchestrator/orchestrator_errors.test.ts b/yarn-project/prover-client/src/orchestrator/orchestrator_errors.test.ts index 6f5e642e54fd..3d148f6beca7 100644 --- a/yarn-project/prover-client/src/orchestrator/orchestrator_errors.test.ts +++ b/yarn-project/prover-client/src/orchestrator/orchestrator_errors.test.ts @@ -151,9 +151,7 @@ describe('prover/orchestrator/errors', () => { await orchestrator.startNewBlock(blockNumber, timestamp, 1); orchestrator.cancel(); - await expect(async () => await orchestrator.addTxs(block.txs)).rejects.toThrow( - 'Invalid proving state when adding a tx', - ); + await expect(async () => await orchestrator.addTxs(block.txs)).rejects.toThrow('World state fork for block'); }); it('rejects if too many l1 to l2 messages are provided', async () => { diff --git a/yarn-project/prover-client/src/orchestrator/orchestrator_workflow.test.ts b/yarn-project/prover-client/src/orchestrator/orchestrator_workflow.test.ts index 2ac46e75b7fc..0ada8a3c4267 100644 --- a/yarn-project/prover-client/src/orchestrator/orchestrator_workflow.test.ts +++ b/yarn-project/prover-client/src/orchestrator/orchestrator_workflow.test.ts @@ -183,8 +183,8 @@ describe('prover/orchestrator', () => { const result = await orchestrator.finalizeEpoch(); expect(result.proof).toBeDefined(); - const numForks = orchestrator.getNumActiveForks(); - expect(numForks).toEqual(0); + // Forks are closed deterministically in setBlockCompleted, so no cancel() needed. + expect(orchestrator.getNumActiveForks()).toEqual(0); }); it('can start chonk verifier proofs before adding processed txs', async () => {