Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,11 @@ describe('FeePayerBalanceEvictionRule', () => {
expect(result.txsEvicted).toEqual(expect.arrayContaining(txHashes(tx2)));
expect(result.txsEvicted.map(txHash => txHash.toString())).not.toContain(tx1.getTxHash().toString());
expect(txPool.deleteTxs).toHaveBeenCalledWith(expect.arrayContaining(txHashes(tx2)));
// Ensure syncImmediate is called before accessing the world state snapshot
expect(worldStateSynchronizer.syncImmediate).toHaveBeenCalledWith(blockHeader.getBlockNumber());
// Ensure syncImmediate is called with blockNumber and blockHash before accessing the world state snapshot
expect(worldStateSynchronizer.syncImmediate).toHaveBeenCalledWith(
blockHeader.getBlockNumber(),
await blockHeader.hash(),
);
});

it('handles empty fee payer entries after BLOCK_MINED', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ export class FeePayerBalanceEvictionRule implements EvictionRule {

if (context.event === EvictionEvent.BLOCK_MINED) {
const blockNumber = context.block.getBlockNumber();
const blockHash = await context.block.hash();
// Ensure world state is synced to this block before accessing the snapshot.
// This handles the race where a block is added to the archiver
// but the world state hasn't synced it yet.
await this.worldState.syncImmediate(blockNumber);
await this.worldState.syncImmediate(blockNumber, blockHash);
return await this.evictForFeePayers(context.feePayers, this.worldState.getSnapshot(blockNumber), txPool);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ describe('FeePayerBalanceEvictionRule', () => {

await rule.evict(context, pool);

expect(mockWorldState.syncImmediate).toHaveBeenCalledWith(5);
expect(mockWorldState.syncImmediate).toHaveBeenCalledWith(5, await blockHeader.hash());
expect(mockWorldState.getSnapshot).toHaveBeenCalledWith(5);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ export class FeePayerBalanceEvictionRule implements EvictionRule {

if (context.event === EvictionEvent.BLOCK_MINED) {
const blockNumber = context.block.getBlockNumber();
await this.worldState.syncImmediate(blockNumber);
const blockHash = await context.block.hash();
await this.worldState.syncImmediate(blockNumber, blockHash);
return await this.evictForFeePayers(context.feePayers, this.worldState.getSnapshot(blockNumber), pool);
}

Expand Down
6 changes: 4 additions & 2 deletions yarn-project/prover-node/src/prover-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,15 @@ export class ProverNode implements EpochMonitorHandler, ProverNodeApi, Traceable
const fromCheckpoint = epochData.checkpoints[0].number;
const toCheckpoint = epochData.checkpoints.at(-1)!.number;
const fromBlock = epochData.checkpoints[0].blocks[0].number;
const toBlock = epochData.checkpoints.at(-1)!.blocks.at(-1)!.number;
const lastBlock = epochData.checkpoints.at(-1)!.blocks.at(-1)!;
const toBlock = lastBlock.number;
this.log.verbose(
`Creating proving job for epoch ${epochNumber} for checkpoint range ${fromCheckpoint} to ${toCheckpoint} and block range ${fromBlock} to ${toBlock}`,
);

// Fast forward world state to right before the target block and get a fork
await this.worldState.syncImmediate(toBlock);
const lastBlockHash = await lastBlock.header.hash();
await this.worldState.syncImmediate(toBlock, lastBlockHash);

// Create a processor factory
const publicProcessorFactory = new PublicProcessorFactory(
Expand Down
7 changes: 4 additions & 3 deletions yarn-project/stdlib/src/interfaces/world_state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { PromiseWithResolvers } from '@aztec/foundation/promise';

import { z } from 'zod';

import type { BlockHash } from '../block/block_hash.js';
import type { SnapshotDataKeys } from '../snapshots/types.js';
import type { MerkleTreeReadOperations, MerkleTreeWriteOperations } from './merkle_tree_operations.js';

Expand Down Expand Up @@ -80,12 +81,12 @@ export interface WorldStateSynchronizer extends ReadonlyWorldStateAccess, ForkMe
resumeSync(): void;

/**
* Forces an immediate sync to an optionally provided minimum block number
* Forces an immediate sync to an optionally provided minimum block number.
* @param targetBlockNumber - The target block number that we must sync to. Will download unproven blocks if needed to reach it.
* @param skipThrowIfTargetNotReached - Whether to skip throwing if the target block number is not reached.
* @param blockHash - If provided, verifies the block at targetBlockNumber matches this hash. On mismatch, triggers a resync (reorg detection).
* @returns A promise that resolves with the block number the world state was synced to
*/
syncImmediate(minBlockNumber?: BlockNumber, skipThrowIfTargetNotReached?: boolean): Promise<BlockNumber>;
syncImmediate(minBlockNumber?: BlockNumber, blockHash?: BlockHash): Promise<BlockNumber>;

/** Deletes the db */
clear(): Promise<void>;
Expand Down
8 changes: 4 additions & 4 deletions yarn-project/txe/src/state_machine/synchronizer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { NUMBER_OF_L1_L2_MESSAGES_PER_ROLLUP } from '@aztec/constants';
import { BlockNumber } from '@aztec/foundation/branded-types';
import { Fr } from '@aztec/foundation/curves/bn254';
import type { L2Block } from '@aztec/stdlib/block';
import type { BlockHash, L2Block } from '@aztec/stdlib/block';
import type {
MerkleTreeReadOperations,
MerkleTreeWriteOperations,
Expand Down Expand Up @@ -33,12 +33,12 @@ export class TXESynchronizer implements WorldStateSynchronizer {
}

/**
* Forces an immediate sync to an optionally provided minimum block number
* Forces an immediate sync to an optionally provided minimum block number.
* @param targetBlockNumber - The target block number that we must sync to. Will download unproven blocks if needed to reach it.
* @param skipThrowIfTargetNotReached - Whether to skip throwing if the target block number is not reached.
* @param blockHash - If provided, verifies the block at targetBlockNumber matches this hash.
* @returns A promise that resolves with the block number the world state was synced to
*/
public syncImmediate(_minBlockNumber?: BlockNumber, _skipThrowIfTargetNotReached?: boolean): Promise<BlockNumber> {
public syncImmediate(_minBlockNumber?: BlockNumber, _blockHash?: BlockHash): Promise<BlockNumber> {
return Promise.resolve(this.blockNumber);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,44 @@ describe('ServerWorldStateSynchronizer', () => {
await expect(server.syncImmediate(BlockNumber(8))).rejects.toThrow(/unable to sync/i);
});

it('returns early when blockHash matches at target block number', async () => {
void server.start();
await pushBlocks(1, 5);

const hashFr = Fr.random();
merkleTreeRead.getLeafValue.mockResolvedValue(hashFr);

await server.syncImmediate(BlockNumber(4), new BlockHash(hashFr));
expect(l2BlockStream.sync).not.toHaveBeenCalled();
});

it('triggers resync when blockHash mismatches at target block number', async () => {
void server.start();
await pushBlocks(1, 5);

const correctHash = new BlockHash(new Fr(123n));
// Return a hash that won't match the provided one on the first call (pre-sync)
merkleTreeRead.getLeafValue.mockResolvedValueOnce(new Fr(42n)); // mismatch
// Return the correct hash after sync
merkleTreeRead.getLeafValue.mockResolvedValueOnce(new Fr(123n)); // match

await server.syncImmediate(BlockNumber(4), correctHash);
expect(l2BlockStream.sync).toHaveBeenCalled();
});

it('throws when blockHash mismatches after sync', async () => {
void server.start();
await pushBlocks(1, 5);

l2BlockStream.sync.mockImplementation(() => pushBlocks(6, 8));

// Return wrong hash both before and after sync
merkleTreeRead.getLeafValue.mockResolvedValue(new Fr(999n));
const expectedHash = new BlockHash(new Fr(888n));

await expect(server.syncImmediate(BlockNumber(7), expectedHash)).rejects.toThrow(/block hash mismatch/i);
});

it('throws if you try to immediate sync when not running', async () => {
await expect(server.syncImmediate(BlockNumber(3))).rejects.toThrow(/is not running/i);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { type Logger, createLogger } from '@aztec/foundation/log';
import { promiseWithResolvers } from '@aztec/foundation/promise';
import { elapsed } from '@aztec/foundation/timer';
import {
type BlockHash,
GENESIS_CHECKPOINT_HEADER_HASH,
type L2Block,
type L2BlockId,
Expand Down Expand Up @@ -177,13 +178,10 @@ export class ServerWorldStateSynchronizer
/**
* Forces an immediate sync.
* @param targetBlockNumber - The target block number that we must sync to. Will download unproven blocks if needed to reach it.
* @param skipThrowIfTargetNotReached - Whether to skip throwing if the target block number is not reached.
* @param blockHash - If provided, verifies the block at targetBlockNumber matches this hash. On mismatch, triggers a resync (reorg detection).
* @returns A promise that resolves with the block number the world state was synced to
*/
public async syncImmediate(
targetBlockNumber?: BlockNumber,
skipThrowIfTargetNotReached?: boolean,
): Promise<BlockNumber> {
public async syncImmediate(targetBlockNumber?: BlockNumber, blockHash?: BlockHash): Promise<BlockNumber> {
if (this.currentState !== WorldStateRunningState.RUNNING) {
throw new Error(`World State is not running. Unable to perform sync.`);
}
Expand All @@ -195,7 +193,19 @@ export class ServerWorldStateSynchronizer
// If we have been given a block number to sync to and we have reached that number then return
const currentBlockNumber = await this.getLatestBlockNumber();
if (targetBlockNumber !== undefined && targetBlockNumber <= currentBlockNumber) {
return currentBlockNumber;
if (blockHash === undefined) {
return currentBlockNumber;
}

// If a block hash was provided, verify we're on the expected fork
const currentHash = await this.getL2BlockHash(targetBlockNumber);
if (currentHash === blockHash.toString()) {
return currentBlockNumber;
}
// Hash mismatch: a reorg may have occurred, fall through to trigger sync
this.log.debug(
`World state block hash mismatch at ${targetBlockNumber} (expected ${blockHash}, got ${currentHash}). Triggering resync.`,
);
}
this.log.debug(`World State at ${currentBlockNumber} told to sync to ${targetBlockNumber ?? 'latest'}`);

Expand All @@ -213,7 +223,7 @@ export class ServerWorldStateSynchronizer

// If we have been given a block number to sync to and we have not reached that number then fail
const updatedBlockNumber = await this.getLatestBlockNumber();
if (!skipThrowIfTargetNotReached && targetBlockNumber !== undefined && targetBlockNumber > updatedBlockNumber) {
if (targetBlockNumber !== undefined && targetBlockNumber > updatedBlockNumber) {
throw new WorldStateSynchronizerError(
`Unable to sync to block number ${targetBlockNumber} (last synced is ${updatedBlockNumber})`,
{
Expand All @@ -227,6 +237,24 @@ export class ServerWorldStateSynchronizer
);
}

// If a block hash was provided, verify we're on the expected fork after syncing, throw otherwise
if (blockHash !== undefined && targetBlockNumber !== undefined) {
const updatedHash = await this.getL2BlockHash(targetBlockNumber);
if (updatedHash !== blockHash.toString()) {
throw new WorldStateSynchronizerError(
`Block hash mismatch at block ${targetBlockNumber} (expected ${blockHash} but got ${updatedHash})`,
{
cause: {
reason: 'block_hash_mismatch',
targetBlockNumber,
expectedHash: blockHash.toString(),
actualHash: updatedHash,
},
},
);
}
}

return updatedBlockNumber;
}

Expand Down
Loading