Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export class BlockProvingState {
| ProofState<BlockRollupPublicInputs, typeof NESTED_RECURSIVE_ROLLUP_HONK_PROOF_LENGTH>
| undefined;
private builtBlockHeader: BlockHeader | undefined;
private builtArchive: AppendOnlyTreeSnapshot | undefined;
private endState: StateReference | undefined;
private endSpongeBlob: SpongeBlob | undefined;
private txs: TxProvingState[] = [];
Expand Down Expand Up @@ -232,6 +233,14 @@ export class BlockProvingState {
return this.builtBlockHeader;
}

/**
 * Records the archive tree snapshot for this block.
 * Set by the orchestrator after the block header has been inserted into the
 * archive tree (see the `updateArchive` call in `setBlockCompleted`).
 */
public setBuiltArchive(archive: AppendOnlyTreeSnapshot) {
  this.builtArchive = archive;
}

/**
 * Returns the archive tree snapshot captured for this block, or `undefined`
 * if `setBuiltArchive` has not been called yet.
 */
public getBuiltArchive() {
  return this.builtArchive;
}

/** Returns the sponge blob state at the start of this block. */
public getStartSpongeBlob() {
  return this.startSpongeBlob;
}
Expand Down
89 changes: 40 additions & 49 deletions yarn-project/prover-client/src/orchestrator/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ import { EpochProvingState, type ProvingResult, type TreeSnapshots } from './epo
import { ProvingOrchestratorMetrics } from './orchestrator_metrics.js';
import { TxProvingState } from './tx-proving-state.js';

/**
 * A per-block world-state fork paired with the promise of its (at most one)
 * `close()` call. `cleanupPromise` is set on the first cleanup attempt so that
 * concurrent/repeat cleanups await the same close instead of closing twice.
 */
type WorldStateFork = {
  fork: MerkleTreeWriteOperations;
  cleanupPromise: Promise<void> | undefined;
};

/**
* Implements an event driven proving scheduler to build the recursive proof tree. The idea being:
* 1. Transactions are provided to the scheduler post simulation.
Expand All @@ -97,7 +92,7 @@ export class ProvingOrchestrator implements EpochProver {
private provingPromise: Promise<ProvingResult> | undefined = undefined;
private metrics: ProvingOrchestratorMetrics;
// eslint-disable-next-line aztec-custom/no-non-primitive-in-collections
private dbs: Map<BlockNumber, WorldStateFork> = new Map();
private dbs: Map<BlockNumber, MerkleTreeWriteOperations> = new Map();
private logger: Logger;

constructor(
Expand Down Expand Up @@ -182,7 +177,7 @@ export class ProvingOrchestrator implements EpochProver {
const db = await this.dbProvider.fork(lastBlockNumber);

const firstBlockNumber = BlockNumber(lastBlockNumber + 1);
this.dbs.set(firstBlockNumber, { fork: db, cleanupPromise: undefined });
this.dbs.set(firstBlockNumber, db);

// Get archive sibling path before any block in this checkpoint lands.
const lastArchiveSiblingPath = await getLastSiblingPath(MerkleTreeId.ARCHIVE, db);
Expand Down Expand Up @@ -240,9 +235,9 @@ export class ProvingOrchestrator implements EpochProver {
if (!this.dbs.has(blockNumber)) {
// Fork world state at the end of the immediately previous block
const db = await this.dbProvider.fork(BlockNumber(blockNumber - 1));
this.dbs.set(blockNumber, { fork: db, cleanupPromise: undefined });
this.dbs.set(blockNumber, db);
}
const db = this.dbs.get(blockNumber)!.fork;
const db = this.getDbForBlock(blockNumber);

// Get archive snapshot and sibling path before any txs in this block lands.
const lastArchiveTreeSnapshot = await getTreeSnapshot(MerkleTreeId.ARCHIVE, db);
Expand Down Expand Up @@ -317,7 +312,7 @@ export class ProvingOrchestrator implements EpochProver {

this.logger.info(`Adding ${txs.length} transactions to block ${blockNumber}`);

const db = this.dbs.get(blockNumber)!.fork;
const db = this.getDbForBlock(blockNumber);
const lastArchive = provingState.lastArchiveTreeSnapshot;
const newL1ToL2MessageTreeSnapshot = provingState.newL1ToL2MessageTreeSnapshot;
const spongeBlobState = provingState.getStartSpongeBlob().clone();
Expand Down Expand Up @@ -445,14 +440,20 @@ export class ProvingOrchestrator implements EpochProver {
throw new Error('Block header mismatch');
}

// Get db for this block
const db = this.dbs.get(provingState.blockNumber)!.fork;
// Get db for this block and remove from map — no other code should use it after this point.
const db = this.getDbForBlock(provingState.blockNumber);
this.dbs.delete(provingState.blockNumber);

// Update the archive tree, so we're ready to start processing the next block:
this.logger.verbose(
`Updating archive tree with block ${provingState.blockNumber} header ${(await header.hash()).toString()}`,
);
await db.updateArchive(header);
// Update the archive tree, capture the snapshot, and close the fork deterministically.
try {
this.logger.verbose(
`Updating archive tree with block ${provingState.blockNumber} header ${(await header.hash()).toString()}`,
);
await db.updateArchive(header);
provingState.setBuiltArchive(await getTreeSnapshot(MerkleTreeId.ARCHIVE, db));
} finally {
await db.close();
}

await this.verifyBuiltBlockAgainstSyncedState(provingState);

Expand All @@ -472,6 +473,13 @@ export class ProvingOrchestrator implements EpochProver {
this.logger.debug('Block root rollup proof not built yet, skipping header check.');
return;
}

const newArchive = provingState.getBuiltArchive();
if (!newArchive) {
this.logger.debug('Archive snapshot not yet captured, skipping header check.');
return;
}

const header = await buildHeaderFromCircuitOutputs(output);

if (!(await header.hash()).equals(await builtBlockHeader.hash())) {
Expand All @@ -480,11 +488,7 @@ export class ProvingOrchestrator implements EpochProver {
return;
}

// Get db for this block
const blockNumber = provingState.blockNumber;
const db = this.dbs.get(blockNumber)!.fork;

const newArchive = await getTreeSnapshot(MerkleTreeId.ARCHIVE, db);
const syncedArchive = await getTreeSnapshot(MerkleTreeId.ARCHIVE, this.dbProvider.getSnapshot(blockNumber));
if (!syncedArchive.equals(newArchive)) {
this.logger.error(
Expand All @@ -502,12 +506,6 @@ export class ProvingOrchestrator implements EpochProver {
provingState.reject(`New archive mismatch.`);
return;
}

// TODO(palla/prover): This closes the fork only on the happy path. If this epoch orchestrator
// is aborted and never reaches this point, it will leak the fork. We need to add a global cleanup,
// but have to make sure it only runs once all operations are completed, otherwise some function here
// will attempt to access the fork after it was closed.
void this.cleanupDBFork(blockNumber);
}

/**
Expand All @@ -523,6 +521,19 @@ export class ProvingOrchestrator implements EpochProver {
}

this.provingState?.cancel();

for (const [blockNumber, db] of this.dbs.entries()) {
void db.close().catch(err => this.logger.error(`Error closing db for block ${blockNumber}`, err));
}
this.dbs.clear();
}

/**
 * Looks up the world-state fork registered for the given block number.
 * @throws if no fork exists for that block (e.g. it was already closed and
 *   removed from the map, or the block was never started).
 */
private getDbForBlock(blockNumber: BlockNumber): MerkleTreeWriteOperations {
  const fork = this.dbs.get(blockNumber);
  if (fork === undefined) {
    throw new Error(`World state fork for block ${blockNumber} not found.`);
  }
  return fork;
}

/**
Expand Down Expand Up @@ -554,24 +565,6 @@ export class ProvingOrchestrator implements EpochProver {
return epochProofResult;
}

/**
 * Closes the world-state fork for the given block and removes it from the map.
 * Safe to call multiple times: the close promise is cached on the fork entry so
 * repeat callers await the same `close()` instead of closing twice.
 * Errors from `close()` are logged, never rethrown (best-effort cleanup).
 */
private async cleanupDBFork(blockNumber: BlockNumber): Promise<void> {
  this.logger.debug(`Cleaning up world state fork for ${blockNumber}`);
  const fork = this.dbs.get(blockNumber);
  if (!fork) {
    // Nothing registered for this block — already cleaned up or never forked.
    return;
  }

  try {
    // Start the close only once; later calls reuse the cached promise.
    if (!fork.cleanupPromise) {
      fork.cleanupPromise = fork.fork.close();
    }
    await fork.cleanupPromise;
    // Only drop the entry after a successful close.
    this.dbs.delete(blockNumber);
  } catch (err) {
    // NOTE(review): on failure the rejected promise stays cached and the entry
    // is never deleted, so every later call re-awaits the same rejection and
    // logs again — presumably intentional best-effort; confirm.
    this.logger.error(`Error closing db for block ${blockNumber}`, err);
  }
}

/**
* Enqueue a job to be scheduled
* @param provingState - The proving state object being operated on
Expand Down Expand Up @@ -894,17 +887,15 @@ export class ProvingOrchestrator implements EpochProver {
const leafLocation = provingState.setBlockRootRollupProof(result);
const checkpointProvingState = provingState.parentCheckpoint;

// If the proofs were slower than the block header building, then we need to try validating the block header hashes here.
// Verification is called from both here and setBlockCompleted. Whichever runs last
// will be the first to see all three pieces (header, proof output, archive) and run the checks.
await this.verifyBuiltBlockAgainstSyncedState(provingState);

if (checkpointProvingState.totalNumBlocks === 1) {
await this.checkAndEnqueueCheckpointRootRollup(checkpointProvingState);
} else {
await this.checkAndEnqueueNextBlockMergeRollup(checkpointProvingState, leafLocation);
}

// We are finished with the block at this point, ensure the fork is cleaned up
void this.cleanupDBFork(provingState.blockNumber);
},
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,7 @@ describe('prover/orchestrator/errors', () => {
await orchestrator.startNewBlock(blockNumber, timestamp, 1);
orchestrator.cancel();

await expect(async () => await orchestrator.addTxs(block.txs)).rejects.toThrow(
'Invalid proving state when adding a tx',
);
await expect(async () => await orchestrator.addTxs(block.txs)).rejects.toThrow('World state fork for block');
});

it('rejects if too many l1 to l2 messages are provided', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ describe('prover/orchestrator', () => {

const result = await orchestrator.finalizeEpoch();
expect(result.proof).toBeDefined();
const numForks = orchestrator.getNumActiveForks();
expect(numForks).toEqual(0);
// Forks are closed deterministically in setBlockCompleted, so no cancel() needed.
expect(orchestrator.getNumActiveForks()).toEqual(0);
});

it('can start chonk verifier proofs before adding processed txs', async () => {
Expand Down
Loading