diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index 9dfbf883fa4d..a91379a0c34f 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -108,7 +108,12 @@ import { getTelemetryClient, trackSpan, } from '@aztec/telemetry-client'; -import { NodeKeystoreAdapter, ValidatorClient, createValidatorClient } from '@aztec/validator-client'; +import { + NodeKeystoreAdapter, + ValidatorClient, + createBlockProposalHandler, + createValidatorClient, +} from '@aztec/validator-client'; import { createWorldStateSynchronizer } from '@aztec/world-state'; import { createPublicClient, fallback, http } from 'viem'; @@ -336,6 +341,21 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable { } } + // If there's no validator client but alwaysReexecuteBlockProposals is enabled, + // create a BlockProposalHandler to reexecute block proposals for monitoring + if (!validatorClient && config.alwaysReexecuteBlockProposals) { + log.info('Setting up block proposal reexecution for monitoring'); + createBlockProposalHandler(config, { + blockBuilder, + epochCache, + blockSource: archiver, + l1ToL2MessageSource: archiver, + p2pClient, + dateProvider, + telemetry, + }).registerForReexecution(p2pClient); + } + // Start world state and wait for it to sync to the archiver. await worldStateSynchronizer.start(); diff --git a/yarn-project/cli/src/config/chain_l2_config.ts b/yarn-project/cli/src/config/chain_l2_config.ts index 3c0c2b0ed0c1..5d6c564772e3 100644 --- a/yarn-project/cli/src/config/chain_l2_config.ts +++ b/yarn-project/cli/src/config/chain_l2_config.ts @@ -63,7 +63,7 @@ const DefaultSlashConfig = { slashProposeInvalidAttestationsPenalty: DefaultL1ContractsConfig.slashAmountLarge, slashAttestDescendantOfInvalidPenalty: DefaultL1ContractsConfig.slashAmountLarge, slashUnknownPenalty: DefaultL1ContractsConfig.slashAmountSmall, - slashBroadcastedInvalidBlockPenalty: DefaultL1ContractsConfig.slashAmountMedium, + slashBroadcastedInvalidBlockPenalty: 0n, // DefaultL1ContractsConfig.slashAmountSmall // Disabled until further testing slashMaxPayloadSize: 50, slashGracePeriodL2Slots: 32 * 2, // Two epochs from genesis slashOffenseExpirationRounds: 8, @@ -142,7 +142,7 @@ export const stagingIgnitionL2ChainConfig: L2ChainConfig = { slashProposeInvalidAttestationsPenalty: 50_000n * 10n ** 18n, slashAttestDescendantOfInvalidPenalty: 50_000n * 10n ** 18n, slashUnknownPenalty: 2_000n * 10n ** 18n, - slashBroadcastedInvalidBlockPenalty: 10_000n * 10n ** 18n, + slashBroadcastedInvalidBlockPenalty: 0n, // 10_000n * 10n ** 18n, Disabled for now until further testing slashMaxPayloadSize: 50, slashGracePeriodL2Slots: 32 * 4, // One round from genesis slashOffenseExpirationRounds: 8, diff --git a/yarn-project/end-to-end/src/e2e_p2p/gossip_network.test.ts b/yarn-project/end-to-end/src/e2e_p2p/gossip_network.test.ts index b2c2881e58aa..0762f9d86901 100644 --- a/yarn-project/end-to-end/src/e2e_p2p/gossip_network.test.ts +++ b/yarn-project/end-to-end/src/e2e_p2p/gossip_network.test.ts @@ -1,5 +1,5 @@ import type { Archiver } from '@aztec/archiver'; -import type { AztecNodeService } from '@aztec/aztec-node'; +import type { AztecNodeConfig, AztecNodeService } from '@aztec/aztec-node'; import { SentTx, retryUntil, sleep } from '@aztec/aztec.js'; import type { ProverNode } from '@aztec/prover-node'; import type { SequencerClient } from '@aztec/sequencer-client'; @@ -12,7 +12,12 @@ import os from 'os'; import path from 'path'; import { shouldCollectMetrics } from '../fixtures/fixtures.js'; -import { ATTESTER_PRIVATE_KEYS_START_INDEX, createNodes, createProverNode } from '../fixtures/setup_p2p_test.js'; +import { + ATTESTER_PRIVATE_KEYS_START_INDEX, + createNodes, + createNonValidatorNode, + createProverNode, +} from '../fixtures/setup_p2p_test.js'; import { AlertChecker, type AlertConfig } from '../quality_of_service/alert_checker.js'; import { P2PNetworkTest, SHORTENED_BLOCK_TIME_CONFIG_NO_PRUNES, WAIT_FOR_TX_TIMEOUT } from './p2p_network.js'; import { submitTransactions } from './shared.js'; @@ -42,6 +47,7 @@ describe('e2e_p2p_network', () => { let t: P2PNetworkTest; let nodes: AztecNodeService[]; let proverNode: ProverNode; + let monitoringNode: AztecNodeService; beforeEach(async () => { t = await P2PNetworkTest.create({ @@ -66,6 +72,9 @@ describe('e2e_p2p_network', () => { afterEach(async () => { await tryStop(proverNode); + fs.rmSync(`${DATA_DIR}-prover`, { recursive: true, force: true, maxRetries: 3 }); + await tryStop(monitoringNode); + fs.rmSync(`${DATA_DIR}-monitor`, { recursive: true, force: true, maxRetries: 3 }); await t.stopNodes(nodes); await t.teardown(); for (let i = 0; i < NUM_VALIDATORS; i++) { @@ -93,7 +102,7 @@ describe('e2e_p2p_network', () => { // should be set so that the only way for rollups to be built // is if the txs are successfully gossiped around the nodes. const txsSentViaDifferentNodes: SentTx[][] = []; - t.logger.info('Creating nodes'); + t.logger.info('Creating validator nodes'); nodes = await createNodes( t.ctx.aztecNodeConfig, t.ctx.dateProvider, @@ -107,6 +116,7 @@ describe('e2e_p2p_network', () => { ); // create a prover node that uses p2p only (not rpc) to gather txs to test prover tx collection + t.logger.warn(`Creating prover node`); proverNode = await createProverNode( t.ctx.aztecNodeConfig, BOOT_NODE_UDP_PORT + NUM_VALIDATORS + 1, @@ -119,6 +129,18 @@ describe('e2e_p2p_network', () => { ); await proverNode.start(); + t.logger.warn(`Creating non validator node`); + const monitoringNodeConfig: AztecNodeConfig = { ...t.ctx.aztecNodeConfig, alwaysReexecuteBlockProposals: true }; + monitoringNode = await createNonValidatorNode( + monitoringNodeConfig, + t.ctx.dateProvider, + BOOT_NODE_UDP_PORT + NUM_VALIDATORS + 2, + t.bootstrapNodeEnr, + t.prefilledPublicData, + `${DATA_DIR}-monitor`, + shouldCollectMetrics(), + ); + // wait a bit for peers to discover each other await sleep(8000); diff --git a/yarn-project/end-to-end/src/fixtures/setup_p2p_test.ts b/yarn-project/end-to-end/src/fixtures/setup_p2p_test.ts index fc20439c5752..dbbd8550cf1a 100644 --- a/yarn-project/end-to-end/src/fixtures/setup_p2p_test.ts +++ b/yarn-project/end-to-end/src/fixtures/setup_p2p_test.ts @@ -78,7 +78,7 @@ export async function createNodes( return nodes; } -// creates a P2P enabled instance of Aztec Node Service +/** Creates a P2P enabled instance of Aztec Node Service with a validator */ export async function createNode( config: AztecNodeConfig & { dontStartSequencer?: boolean }, dateProvider: DateProvider, @@ -102,6 +102,31 @@ export async function createNode( return loggerIdStorage ? await loggerIdStorage.run(tcpPort.toString(), createNode) : createNode(); } +/** Creates a P2P enabled instance of Aztec Node Service without a validator */ +export async function createNonValidatorNode( + baseConfig: AztecNodeConfig, + dateProvider: DateProvider, + tcpPort: number, + bootstrapNode: string | undefined, + prefilledPublicData?: PublicDataTreeLeaf[], + dataDirectory?: string, + metricsPort?: number, + loggerIdStorage?: AsyncLocalStorage, +) { + const createNode = async () => { + const p2pConfig = await createP2PConfig(baseConfig, bootstrapNode, tcpPort, dataDirectory); + const config: AztecNodeConfig = { + ...p2pConfig, + disableValidator: true, + validatorPrivateKeys: undefined, + publisherPrivateKeys: [], + }; + const telemetry = getEndToEndTestTelemetryClient(metricsPort); + return await AztecNodeService.createAndSync(config, { telemetry, dateProvider }, { prefilledPublicData }); + }; + return loggerIdStorage ? await loggerIdStorage.run(tcpPort.toString(), createNode) : createNode(); +} + export async function createProverNode( config: AztecNodeConfig, tcpPort: number, @@ -117,14 +142,13 @@ export async function createProverNode( const proverNodePrivateKey = getPrivateKeyFromIndex(ATTESTER_PRIVATE_KEYS_START_INDEX + addressIndex)!; const telemetry = getEndToEndTestTelemetryClient(metricsPort); - const proverConfig: Partial = { - p2pIp: `127.0.0.1`, - p2pPort: tcpPort ?? (await getPort()), - p2pEnabled: true, - peerCheckIntervalMS: TEST_PEER_CHECK_INTERVAL_MS, - blockCheckIntervalMS: 1000, - bootstrapNodes: bootstrapNode ? [bootstrapNode] : [], - }; + const proverConfig: Partial = await createP2PConfig( + config, + bootstrapNode, + tcpPort, + dataDirectory, + ); + const aztecNodeRpcTxProvider = undefined; return await createAndSyncProverNode( bufferToHex(proverNodePrivateKey), @@ -138,20 +162,14 @@ export async function createProverNode( return loggerIdStorage ? await loggerIdStorage.run(tcpPort.toString(), createProverNode) : createProverNode(); } -export async function createValidatorConfig( +export async function createP2PConfig( config: AztecNodeConfig, bootstrapNodeEnr?: string, port?: number, - addressIndex: number = 1, dataDirectory?: string, ) { port = port ?? (await getPort()); - const attesterPrivateKey = bufferToHex(getPrivateKeyFromIndex(ATTESTER_PRIVATE_KEYS_START_INDEX + addressIndex)!); - - config.validatorPrivateKeys = new SecretValue([attesterPrivateKey]); - config.publisherPrivateKeys = [new SecretValue(attesterPrivateKey)]; - const nodeConfig: AztecNodeConfig = { ...config, p2pIp: `127.0.0.1`, @@ -165,3 +183,22 @@ export async function createValidatorConfig( return nodeConfig; } + +export async function createValidatorConfig( + config: AztecNodeConfig, + bootstrapNodeEnr?: string, + port?: number, + addressIndex: number = 1, + dataDirectory?: string, +) { + const attesterPrivateKey = bufferToHex(getPrivateKeyFromIndex(ATTESTER_PRIVATE_KEYS_START_INDEX + addressIndex)!); + const p2pConfig = await createP2PConfig(config, bootstrapNodeEnr, port, dataDirectory); + const nodeConfig: AztecNodeConfig = { + ...config, + ...p2pConfig, + validatorPrivateKeys: new SecretValue([attesterPrivateKey]), + publisherPrivateKeys: [new SecretValue(attesterPrivateKey)], + }; + + return nodeConfig; +} diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index 8fa354247c93..959e66794030 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -284,6 +284,7 @@ export type EnvVar = | 'K8S_POD_UID' | 'K8S_NAMESPACE_NAME' | 'VALIDATOR_REEXECUTE_DEADLINE_MS' + | 'ALWAYS_REEXECUTE_BLOCK_PROPOSALS' | 'AUTO_UPDATE' | 'AUTO_UPDATE_URL' | 'WEB3_SIGNER_URL'; diff --git a/yarn-project/slasher/src/config.ts b/yarn-project/slasher/src/config.ts index 114de841c75e..d6ee9a4f52a3 100644 --- a/yarn-project/slasher/src/config.ts +++ b/yarn-project/slasher/src/config.ts @@ -83,7 +83,7 @@ export const slasherConfigMappings: ConfigMappingsType = { }, slashBroadcastedInvalidBlockPenalty: { env: 'SLASH_INVALID_BLOCK_PENALTY', - description: 'Penalty amount for slashing a validator for an invalid block.', + description: 'Penalty amount for slashing a validator for an invalid block proposed via p2p.', ...bigintConfigHelper(DefaultSlasherConfig.slashBroadcastedInvalidBlockPenalty), }, slashInactivityTargetPercentage: { diff --git a/yarn-project/stdlib/src/interfaces/validator.ts b/yarn-project/stdlib/src/interfaces/validator.ts index c401114a1c84..cb137bd54a5e 100644 --- a/yarn-project/stdlib/src/interfaces/validator.ts +++ b/yarn-project/stdlib/src/interfaces/validator.ts @@ -32,11 +32,14 @@ export interface ValidatorClientConfig { /** Interval between polling for new attestations from peers */ attestationPollingIntervalMs: number; - /** Re-execute transactions before attesting */ + /** Whether to re-execute transactions in a block proposal before attesting */ validatorReexecute: boolean; /** Will re-execute until this many milliseconds are left in the slot */ validatorReexecuteDeadlineMs: number; + + /** Whether to always reexecute block proposals, even for non-validator nodes or when out of the currnet committee */ + alwaysReexecuteBlockProposals?: boolean; } export type ValidatorClientFullConfig = ValidatorClientConfig & @@ -50,6 +53,7 @@ export const ValidatorClientConfigSchema = z.object({ attestationPollingIntervalMs: z.number().min(0), validatorReexecute: z.boolean(), validatorReexecuteDeadlineMs: z.number().min(0), + alwaysReexecuteBlockProposals: z.boolean().optional(), }) satisfies ZodFor>; export interface Validator { diff --git a/yarn-project/telemetry-client/src/attributes.ts b/yarn-project/telemetry-client/src/attributes.ts index 9fa073c471a6..e3c7971f1904 100644 --- a/yarn-project/telemetry-client/src/attributes.ts +++ b/yarn-project/telemetry-client/src/attributes.ts @@ -79,12 +79,14 @@ export const P2P_GOODBYE_REASON = 'aztec.p2p.goodbye.reason'; export const PROVING_JOB_TYPE = 'aztec.proving.job_type'; /** The proving job id */ export const PROVING_JOB_ID = 'aztec.proving.job_id'; - +/** Merkle tree name */ export const MERKLE_TREE_NAME = 'aztec.merkle_tree.name'; /** The prover-id in a root rollup proof. */ export const ROLLUP_PROVER_ID = 'aztec.rollup.prover_id'; /** Whether the proof submission was timed out (delayed more than 20 min) */ export const PROOF_TIMED_OUT = 'aztec.proof.timed_out'; +/** Status of the validator (eg proposer, in-committee, none) */ +export const VALIDATOR_STATUS = 'aztec.validator_status'; export const P2P_ID = 'aztec.p2p.id'; export const P2P_REQ_RESP_PROTOCOL = 'aztec.p2p.req_resp.protocol'; diff --git a/yarn-project/validator-client/src/block_proposal_handler.ts b/yarn-project/validator-client/src/block_proposal_handler.ts new file mode 100644 index 000000000000..d0b7a218b68e --- /dev/null +++ b/yarn-project/validator-client/src/block_proposal_handler.ts @@ -0,0 +1,314 @@ +import { INITIAL_L2_BLOCK_NUM } from '@aztec/constants'; +import { Fr } from '@aztec/foundation/fields'; +import { createLogger } from '@aztec/foundation/log'; +import { retryUntil } from '@aztec/foundation/retry'; +import { DateProvider, Timer } from '@aztec/foundation/timer'; +import type { P2P, PeerId } from '@aztec/p2p'; +import { TxProvider } from '@aztec/p2p'; +import { BlockProposalValidator } from '@aztec/p2p/msg_validators'; +import { computeInHashFromL1ToL2Messages } from '@aztec/prover-client/helpers'; +import type { L2BlockSource } from '@aztec/stdlib/block'; +import { getTimestampForSlot } from '@aztec/stdlib/epoch-helpers'; +import type { IFullNodeBlockBuilder, ValidatorClientFullConfig } from '@aztec/stdlib/interfaces/server'; +import type { L1ToL2MessageSource } from '@aztec/stdlib/messaging'; +import { type BlockProposal, ConsensusPayload } from '@aztec/stdlib/p2p'; +import { type FailedTx, GlobalVariables, type Tx } from '@aztec/stdlib/tx'; +import { + ReExFailedTxsError, + ReExStateMismatchError, + ReExTimeoutError, + TransactionsNotAvailableError, +} from '@aztec/stdlib/validators'; +import { type TelemetryClient, type Tracer, getTelemetryClient } from '@aztec/telemetry-client'; + +import type { ValidatorMetrics } from './metrics.js'; + +export type BlockProposalValidationFailureReason = + | 'invalid_proposal' + | 'parent_block_not_found' + | 'parent_block_does_not_match' + | 'in_hash_mismatch' + | 'block_number_already_exists' + | 'txs_not_available' + | 'state_mismatch' + | 'failed_txs' + | 'timeout' + | 'unknown_error'; + +export interface BlockProposalValidationResult { + isValid: boolean; + reason?: BlockProposalValidationFailureReason; + reexecutionResult?: { + block: any; + failedTxs: FailedTx[]; + reexecutionTimeMs: number; + totalManaUsed: number; + }; +} + +export class BlockProposalHandler { + public readonly tracer: Tracer; + + constructor( + private blockBuilder: IFullNodeBlockBuilder, + private blockSource: L2BlockSource, + private l1ToL2MessageSource: L1ToL2MessageSource, + private txProvider: TxProvider, + private blockProposalValidator: BlockProposalValidator, + private config: ValidatorClientFullConfig, + private metrics?: ValidatorMetrics, + private dateProvider: DateProvider = new DateProvider(), + telemetry: TelemetryClient = getTelemetryClient(), + private log = createLogger('validator:block-proposal-handler'), + ) { + this.tracer = telemetry.getTracer('BlockProposalHandler'); + } + + registerForReexecution(p2pClient: P2P): BlockProposalHandler { + const handler = async (proposal: BlockProposal, proposalSender: PeerId) => { + try { + const result = await this.handleBlockProposal(proposal, proposalSender, true); + if (result.isValid && result.reexecutionResult) { + this.log.info(`Non-validator reexecution completed for slot ${proposal.slotNumber.toBigInt()}`, { + blockNumber: proposal.blockNumber, + reexecutionTimeMs: result.reexecutionResult.reexecutionTimeMs, + totalManaUsed: result.reexecutionResult.totalManaUsed, + numTxs: result.reexecutionResult.block?.body?.txEffects?.length ?? 0, + }); + } else { + this.log.warn(`Non-validator reexecution failed for slot ${proposal.slotNumber.toBigInt()}`, { + blockNumber: proposal.blockNumber, + reason: result.reason, + }); + } + } catch (error) { + this.log.error('Error processing block proposal in non-validator handler', error); + } + return undefined; // Non-validator nodes don't return attestations + }; + + p2pClient.registerBlockProposalHandler(handler); + return this; + } + + async handleBlockProposal( + proposal: BlockProposal, + proposalSender: PeerId, + shouldReexecute: boolean, + ): Promise { + const slotNumber = proposal.slotNumber.toBigInt(); + const blockNumber = proposal.blockNumber; + const proposer = proposal.getSender(); + + const proposalInfo = { ...proposal.toBlockInfo(), proposer: proposer.toString() }; + this.log.info(`Processing proposal for slot ${slotNumber}`, { + ...proposalInfo, + txHashes: proposal.txHashes.map(t => t.toString()), + }); + + // Check that the proposal is from the current proposer, or the next proposer + // This should have been handled by the p2p layer, but we double check here out of caution + const invalidProposal = await this.blockProposalValidator.validate(proposal); + if (invalidProposal) { + this.log.warn(`Proposal is not valid, skipping processing`, proposalInfo); + return { isValid: false, reason: 'invalid_proposal' }; + } + + // Collect txs from the proposal. We start doing this as early as possible, + // and we do it even if we don't plan to re-execute the txs, so that we have them + // if another node needs them. + const config = this.blockBuilder.getConfig(); + const { txs, missingTxs } = await this.txProvider.getTxsForBlockProposal(proposal, { + pinnedPeer: proposalSender, + deadline: this.getReexecutionDeadline(proposal, config), + }); + + // Check that the parent proposal is a block we know, otherwise reexecution would fail + if (blockNumber > INITIAL_L2_BLOCK_NUM) { + const deadline = this.getReexecutionDeadline(proposal, config); + const currentTime = this.dateProvider.now(); + const timeoutDurationMs = deadline.getTime() - currentTime; + const parentBlock = + timeoutDurationMs <= 0 + ? undefined + : await retryUntil( + async () => { + const block = await this.blockSource.getBlock(blockNumber - 1); + if (block) { + return block; + } + await this.blockSource.syncImmediate(); + return await this.blockSource.getBlock(blockNumber - 1); + }, + 'Force Archiver Sync', + timeoutDurationMs / 1000, + 0.5, + ); + + if (parentBlock === undefined) { + this.log.warn(`Parent block for ${blockNumber} not found, skipping processing`, proposalInfo); + return { isValid: false, reason: 'parent_block_not_found' }; + } + + if (!proposal.payload.header.lastArchiveRoot.equals(parentBlock.archive.root)) { + this.log.warn(`Parent block archive root for proposal does not match, skipping processing`, { + proposalLastArchiveRoot: proposal.payload.header.lastArchiveRoot.toString(), + parentBlockArchiveRoot: parentBlock.archive.root.toString(), + ...proposalInfo, + }); + return { isValid: false, reason: 'parent_block_does_not_match' }; + } + } + + // Check that I have the same set of l1ToL2Messages as the proposal + const l1ToL2Messages = await this.l1ToL2MessageSource.getL1ToL2Messages(blockNumber); + const computedInHash = await computeInHashFromL1ToL2Messages(l1ToL2Messages); + const proposalInHash = proposal.payload.header.contentCommitment.inHash; + if (!computedInHash.equals(proposalInHash)) { + this.log.warn(`L1 to L2 messages in hash mismatch, skipping processing`, { + proposalInHash: proposalInHash.toString(), + computedInHash: computedInHash.toString(), + ...proposalInfo, + }); + return { isValid: false, reason: 'in_hash_mismatch' }; + } + + // Check that this block number does not exist already + const existingBlock = await this.blockSource.getBlockHeader(blockNumber); + if (existingBlock) { + this.log.warn(`Block number ${blockNumber} already exists, skipping processing`, proposalInfo); + return { isValid: false, reason: 'block_number_already_exists' }; + } + + // Check that all of the transactions in the proposal are available + if (missingTxs.length > 0) { + this.log.warn(`Missing ${missingTxs.length} txs to process proposal`, { ...proposalInfo, missingTxs }); + return { isValid: false, reason: 'txs_not_available' }; + } + + // Try re-executing the transactions in the proposal if needed + let reexecutionResult; + if (shouldReexecute) { + try { + this.log.verbose(`Re-executing transactions in the proposal`, proposalInfo); + reexecutionResult = await this.reexecuteTransactions(proposal, txs, l1ToL2Messages); + } catch (error) { + this.log.error(`Error reexecuting txs while processing block proposal`, error, proposalInfo); + const reason = this.getReexecuteFailureReason(error); + return { isValid: false, reason, reexecutionResult }; + } + } + + this.log.info(`Successfully processed proposal for slot ${slotNumber}`, proposalInfo); + return { isValid: true, reexecutionResult }; + } + + private getReexecutionDeadline( + proposal: BlockProposal, + config: { l1GenesisTime: bigint; slotDuration: number }, + ): Date { + const nextSlotTimestampSeconds = Number(getTimestampForSlot(proposal.slotNumber.toBigInt() + 1n, config)); + const msNeededForPropagationAndPublishing = this.config.validatorReexecuteDeadlineMs; + return new Date(nextSlotTimestampSeconds * 1000 - msNeededForPropagationAndPublishing); + } + + private getReexecuteFailureReason(err: any) { + if (err instanceof ReExStateMismatchError) { + return 'state_mismatch'; + } else if (err instanceof ReExFailedTxsError) { + return 'failed_txs'; + } else if (err instanceof ReExTimeoutError) { + return 'timeout'; + } else if (err instanceof Error) { + return 'unknown_error'; + } + } + + async reexecuteTransactions( + proposal: BlockProposal, + txs: Tx[], + l1ToL2Messages: Fr[], + ): Promise<{ + block: any; + failedTxs: FailedTx[]; + reexecutionTimeMs: number; + totalManaUsed: number; + }> { + const { header } = proposal.payload; + const { txHashes } = proposal; + + // If we do not have all of the transactions, then we should fail + if (txs.length !== txHashes.length) { + const foundTxHashes = txs.map(tx => tx.getTxHash()); + const missingTxHashes = txHashes.filter(txHash => !foundTxHashes.includes(txHash)); + throw new TransactionsNotAvailableError(missingTxHashes); + } + + // Use the sequencer's block building logic to re-execute the transactions + const timer = new Timer(); + const config = this.blockBuilder.getConfig(); + + // We source most global variables from the proposal + const globalVariables = GlobalVariables.from({ + slotNumber: proposal.payload.header.slotNumber, // checked in the block proposal validator + coinbase: proposal.payload.header.coinbase, // set arbitrarily by the proposer + feeRecipient: proposal.payload.header.feeRecipient, // set arbitrarily by the proposer + gasFees: proposal.payload.header.gasFees, // validated by the rollup contract + blockNumber: proposal.blockNumber, // checked blockNumber-1 exists in archiver but blockNumber doesnt + timestamp: header.timestamp, // checked in the rollup contract against the slot number + chainId: new Fr(config.l1ChainId), + version: new Fr(config.rollupVersion), + }); + + const { block, failedTxs } = await this.blockBuilder.buildBlock(txs, l1ToL2Messages, globalVariables, { + deadline: this.getReexecutionDeadline(proposal, config), + }); + + const numFailedTxs = failedTxs.length; + const slot = proposal.slotNumber; + this.log.verbose(`Transaction re-execution complete for slot ${slot}`, { + numFailedTxs, + numProposalTxs: txHashes.length, + numProcessedTxs: block.body.txEffects.length, + slot, + }); + + if (numFailedTxs > 0) { + this.metrics?.recordFailedReexecution(proposal); + throw new ReExFailedTxsError(numFailedTxs); + } + + if (block.body.txEffects.length !== txHashes.length) { + this.metrics?.recordFailedReexecution(proposal); + throw new ReExTimeoutError(); + } + + // Throw a ReExStateMismatchError error if state updates do not match + const blockPayload = ConsensusPayload.fromBlock(block); + if (!blockPayload.equals(proposal.payload)) { + this.log.warn(`Re-execution state mismatch for slot ${slot}`, { + expected: blockPayload.toInspect(), + actual: proposal.payload.toInspect(), + }); + this.metrics?.recordFailedReexecution(proposal); + throw new ReExStateMismatchError( + proposal.archive, + block.archive.root, + proposal.payload.stateReference, + block.header.state, + ); + } + + const reexecutionTimeMs = timer.ms(); + const totalManaUsed = block.header.totalManaUsed.toNumber() / 1e6; + + this.metrics?.recordReex(reexecutionTimeMs, txs.length, totalManaUsed); + + return { + block, + failedTxs, + reexecutionTimeMs, + totalManaUsed, + }; + } +} diff --git a/yarn-project/validator-client/src/config.ts b/yarn-project/validator-client/src/config.ts index 0fbfd69c852e..2b7125f33dbd 100644 --- a/yarn-project/validator-client/src/config.ts +++ b/yarn-project/validator-client/src/config.ts @@ -58,6 +58,12 @@ export const validatorClientConfigMappings: ConfigMappingsType, + config: ValidatorClientFullConfig, deps: { blockBuilder: IFullNodeBlockBuilder; p2pClient: P2PClient; diff --git a/yarn-project/validator-client/src/index.ts b/yarn-project/validator-client/src/index.ts index 056a9badd892..21314d97da4b 100644 --- a/yarn-project/validator-client/src/index.ts +++ b/yarn-project/validator-client/src/index.ts @@ -1,3 +1,4 @@ +export * from './block_proposal_handler.js'; export * from './config.js'; export * from './factory.js'; export * from './validator.js'; diff --git a/yarn-project/validator-client/src/metrics.ts b/yarn-project/validator-client/src/metrics.ts index 25e0c26593f1..5f0b73e80fb9 100644 --- a/yarn-project/validator-client/src/metrics.ts +++ b/yarn-project/validator-client/src/metrics.ts @@ -72,9 +72,10 @@ export class ValidatorMetrics { this.attestationsCount.add(num); } - public incFailedAttestations(num: number, reason: string) { + public incFailedAttestations(num: number, reason: string, inCommittee: boolean) { this.failedAttestationsCount.add(num, { [Attributes.ERROR_TYPE]: reason, + [Attributes.VALIDATOR_STATUS]: inCommittee ? 'in-committee' : 'none', }); } } diff --git a/yarn-project/validator-client/src/validator.ts b/yarn-project/validator-client/src/validator.ts index 1a5b54c12831..2dff8c65d847 100644 --- a/yarn-project/validator-client/src/validator.ts +++ b/yarn-project/validator-client/src/validator.ts @@ -1,18 +1,14 @@ -import { INITIAL_L2_BLOCK_NUM } from '@aztec/constants'; import type { EpochCache } from '@aztec/epoch-cache'; import type { EthAddress } from '@aztec/foundation/eth-address'; import type { Signature } from '@aztec/foundation/eth-signature'; import { Fr } from '@aztec/foundation/fields'; import { type Logger, createLogger } from '@aztec/foundation/log'; -import { retryUntil } from '@aztec/foundation/retry'; import { RunningPromise } from '@aztec/foundation/running-promise'; import { sleep } from '@aztec/foundation/sleep'; -import { DateProvider, Timer } from '@aztec/foundation/timer'; +import { DateProvider } from '@aztec/foundation/timer'; import type { KeystoreManager } from '@aztec/node-keystore'; -import type { P2P, PeerId } from '@aztec/p2p'; -import { AuthRequest, AuthResponse, ReqRespSubProtocol, TxProvider } from '@aztec/p2p'; -import { BlockProposalValidator } from '@aztec/p2p/msg_validators'; -import { computeInHashFromL1ToL2Messages } from '@aztec/prover-client/helpers'; +import type { P2P, PeerId, TxProvider } from '@aztec/p2p'; +import { AuthRequest, AuthResponse, BlockProposalValidator, ReqRespSubProtocol } from '@aztec/p2p'; import { OffenseType, type SlasherConfig, @@ -22,29 +18,18 @@ import { } from '@aztec/slasher'; import type { AztecAddress } from '@aztec/stdlib/aztec-address'; import type { CommitteeAttestationsAndSigners, L2BlockSource } from '@aztec/stdlib/block'; -import { getTimestampForSlot } from '@aztec/stdlib/epoch-helpers'; import type { IFullNodeBlockBuilder, Validator, ValidatorClientFullConfig } from '@aztec/stdlib/interfaces/server'; import type { L1ToL2MessageSource } from '@aztec/stdlib/messaging'; -import { - type BlockAttestation, - type BlockProposal, - type BlockProposalOptions, - ConsensusPayload, -} from '@aztec/stdlib/p2p'; +import type { BlockAttestation, BlockProposal, BlockProposalOptions } from '@aztec/stdlib/p2p'; import type { CheckpointHeader } from '@aztec/stdlib/rollup'; -import { GlobalVariables, type StateReference, type Tx } from '@aztec/stdlib/tx'; -import { - AttestationTimeoutError, - ReExFailedTxsError, - ReExStateMismatchError, - ReExTimeoutError, - TransactionsNotAvailableError, -} from '@aztec/stdlib/validators'; +import type { StateReference, Tx } from '@aztec/stdlib/tx'; +import { AttestationTimeoutError } from '@aztec/stdlib/validators'; import { type TelemetryClient, type Tracer, getTelemetryClient } from '@aztec/telemetry-client'; import { EventEmitter } from 'events'; import type { TypedDataDefinition } from 'viem'; +import { BlockProposalHandler, type BlockProposalValidationFailureReason } from './block_proposal_handler.js'; import type { ValidatorClientConfig } from './config.js'; import { ValidationService } from './duties/validation_service.js'; import { NodeKeystoreAdapter } from './key_store/node_keystore_adapter.js'; @@ -54,6 +39,12 @@ import { ValidatorMetrics } from './metrics.js'; // Just cap the set to avoid unbounded growth. const MAX_PROPOSERS_OF_INVALID_BLOCKS = 1000; +// What errors from the block proposal handler result in slashing +const SLASHABLE_BLOCK_PROPOSAL_VALIDATION_RESULT: BlockProposalValidationFailureReason[] = [ + 'state_mismatch', + 'failed_txs', +]; + /** * Validator Client */ @@ -71,18 +62,13 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) private lastEpochForCommitteeUpdateLoop: bigint | undefined; private epochCacheUpdateLoop: RunningPromise; - private blockProposalValidator: BlockProposalValidator; - private proposersOfInvalidBlocks: Set = new Set(); protected constructor( - private blockBuilder: IFullNodeBlockBuilder, private keyStore: NodeKeystoreAdapter, private epochCache: EpochCache, private p2pClient: P2P, - private blockSource: L2BlockSource, - private l1ToL2MessageSource: L1ToL2MessageSource, - private txProvider: TxProvider, + private blockProposalHandler: BlockProposalHandler, private config: ValidatorClientFullConfig, private dateProvider: DateProvider = new DateProvider(), telemetry: TelemetryClient = getTelemetryClient(), @@ -94,8 +80,6 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) this.validationService = new ValidationService(keyStore); - this.blockProposalValidator = new BlockProposalValidator(epochCache); - // Refresh epoch cache every second to trigger alert if participation in committee changes this.epochCacheUpdateLoop = new RunningPromise(this.handleEpochCommitteeUpdate.bind(this), log, 1000); @@ -167,14 +151,25 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) dateProvider: DateProvider = new DateProvider(), telemetry: TelemetryClient = getTelemetryClient(), ) { - const validator = new ValidatorClient( + const metrics = new ValidatorMetrics(telemetry); + const blockProposalValidator = new BlockProposalValidator(epochCache); + const blockProposalHandler = new BlockProposalHandler( blockBuilder, - NodeKeystoreAdapter.fromKeyStoreManager(keyStoreManager), - epochCache, - p2pClient, blockSource, l1ToL2MessageSource, txProvider, + blockProposalValidator, + config, + metrics, + dateProvider, + telemetry, + ); + + const validator = new ValidatorClient( + NodeKeystoreAdapter.fromKeyStoreManager(keyStoreManager), + epochCache, + p2pClient, + blockProposalHandler, config, dateProvider, telemetry, @@ -189,6 +184,15 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) .filter(addr => !this.config.disabledValidators.some(disabled => disabled.equals(addr))); } + public getBlockProposalHandler() { + return this.blockProposalHandler; + } + + // Proxy method for backwards compatibility with tests + public reExecuteTransactions(proposal: BlockProposal, txs: any[], l1ToL2Messages: Fr[]): Promise { + return this.blockProposalHandler.reexecuteTransactions(proposal, txs, l1ToL2Messages); + } + public signWithAddress(addr: EthAddress, msg: TypedDataDefinition) { return this.keyStore.signTypedDataWithAddress(addr, msg); } @@ -256,15 +260,12 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) async attestToProposal(proposal: BlockProposal, proposalSender: PeerId): Promise { const slotNumber = proposal.slotNumber.toBigInt(); - const blockNumber = proposal.blockNumber; const proposer = proposal.getSender(); // Check that I have any address in current committee before attesting const inCommittee = await this.epochCache.filterInCommittee(slotNumber, this.getValidatorAddresses()); const partOfCommittee = inCommittee.length > 0; - const incFailedAttestation = partOfCommittee - ? (reason: string) => this.metrics.incFailedAttestations(1, reason) - : () => {}; + const incFailedAttestation = (reason: string) => this.metrics.incFailedAttestations(1, reason, partOfCommittee); const proposalInfo = { ...proposal.toBlockInfo(), proposer: proposer.toString() }; this.log.info(`Received proposal for slot ${slotNumber}`, { @@ -272,117 +273,39 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) txHashes: proposal.txHashes.map(t => t.toString()), }); - // Collect txs from the proposal. Note that we do this before checking if we have an address in the - // current committee, since we want to collect txs anyway to facilitate propagation. - const config = this.blockBuilder.getConfig(); - const { txs, missingTxs } = await this.txProvider.getTxsForBlockProposal(proposal, { - pinnedPeer: proposalSender, - deadline: this.getReexecutionDeadline(proposal, config), - }); - - // Check that I have any address in current committee before attesting - if (!partOfCommittee) { - this.log.verbose(`No validator in the current committee, skipping attestation`, proposalInfo); - return undefined; - } - - // Check that the proposal is from the current proposer, or the next proposer. - // Q: Should this be moved to the block proposal validator, so we disregard proposals from anyone? - const invalidProposal = await this.blockProposalValidator.validate(proposal); - if (invalidProposal) { - this.log.warn(`Proposal is not valid, skipping attestation`, proposalInfo); - incFailedAttestation('invalid_proposal'); - return undefined; - } + // Reexecute txs if we are part of the committee so we can attest, or if slashing is enabled so we can slash + // invalid proposals even when not in the committee, or if we are configured to always reexecute for monitoring purposes. + const { validatorReexecute, slashBroadcastedInvalidBlockPenalty, alwaysReexecuteBlockProposals } = this.config; + const shouldReexecute = + (slashBroadcastedInvalidBlockPenalty > 0n && validatorReexecute) || + (partOfCommittee && validatorReexecute) || + alwaysReexecuteBlockProposals; + + const validationResult = await this.blockProposalHandler.handleBlockProposal( + proposal, + proposalSender, + !!shouldReexecute, + ); - // Check that the parent proposal is a block we know, otherwise reexecution would fail. - // Q: Should we move this to the block proposal validator? If there, then p2p would check it - // before re-broadcasting it. This means that proposals built on top of an L1-reorg'ed-out block - // would not be rebroadcasted. But it also means that nodes that have not fully synced would - // not rebroadcast the proposal. - if (blockNumber > INITIAL_L2_BLOCK_NUM) { - const deadline = this.getReexecutionDeadline(proposal, config); - const currentTime = this.dateProvider.now(); - const timeoutDurationMs = deadline.getTime() - currentTime; - const parentBlock = - timeoutDurationMs <= 0 - ? undefined - : await retryUntil( - async () => { - const block = await this.blockSource.getBlock(blockNumber - 1); - if (block) { - return block; - } - await this.blockSource.syncImmediate(); - return await this.blockSource.getBlock(blockNumber - 1); - }, - 'Force Archiver Sync', - timeoutDurationMs / 1000, // Continue retrying until the deadline - 0.5, // Retry every 500ms - ); - - if (parentBlock === undefined) { - this.log.warn(`Parent block for ${blockNumber} not found, skipping attestation`, proposalInfo); - incFailedAttestation('parent_block_not_found'); - return undefined; - } + if (!validationResult.isValid) { + this.log.warn(`Proposal validation failed: ${validationResult.reason}`, proposalInfo); + incFailedAttestation(validationResult.reason || 'unknown'); - if (!proposal.payload.header.lastArchiveRoot.equals(parentBlock.archive.root)) { - this.log.warn(`Parent block archive root for proposal does not match, skipping attestation`, { - proposalLastArchiveRoot: proposal.payload.header.lastArchiveRoot.toString(), - parentBlockArchiveRoot: parentBlock.archive.root.toString(), - ...proposalInfo, - }); - incFailedAttestation('parent_block_does_not_match'); - return undefined; + // Slash invalid block proposals + if ( + validationResult.reason && + SLASHABLE_BLOCK_PROPOSAL_VALIDATION_RESULT.includes(validationResult.reason) && + slashBroadcastedInvalidBlockPenalty > 0n + ) { + this.log.warn(`Slashing proposer for invalid block proposal`, proposalInfo); + this.slashInvalidBlock(proposal); } - } - - // Check that I have the same set of l1ToL2Messages as the proposal - // Q: Same as above, should this be part of p2p validation? - const l1ToL2Messages = await this.l1ToL2MessageSource.getL1ToL2Messages(blockNumber); - const computedInHash = await computeInHashFromL1ToL2Messages(l1ToL2Messages); - const proposalInHash = proposal.payload.header.contentCommitment.inHash; - if (!computedInHash.equals(proposalInHash)) { - this.log.warn(`L1 to L2 messages in hash mismatch, skipping attestation`, { - proposalInHash: proposalInHash.toString(), - computedInHash: computedInHash.toString(), - ...proposalInfo, - }); - incFailedAttestation('in_hash_mismatch'); return undefined; } - // Check that this block number does not exist already, otherwise the proposer could signal - // an arbitrary block number in the past, though this would most likely fail on the rollup contract. - const existingBlock = await this.blockSource.getBlockHeader(blockNumber); - if (existingBlock) { - this.log.warn(`Block number ${blockNumber} already exists, skipping attestation`, proposalInfo); - incFailedAttestation('block_number_already_exists'); - return undefined; - } - - // Check that all of the transactions in the proposal are available in the tx pool before attesting - if (missingTxs.length > 0) { - this.log.warn(`Missing ${missingTxs.length} txs to attest to proposal`, { ...proposalInfo, missingTxs }); - incFailedAttestation('TransactionsNotAvailableError'); - return undefined; - } - - // Try re-executing the transactions in the proposal - try { - this.log.verbose(`Processing attestation for slot ${slotNumber}`, proposalInfo); - if (this.config.validatorReexecute) { - this.log.verbose(`Re-executing transactions in the proposal before attesting`); - await this.reExecuteTransactions(proposal, txs, l1ToL2Messages); - } - } catch (error: any) { - incFailedAttestation(error instanceof Error ? error.name : 'unknown'); - this.log.error(`Error reexecuting txs while processing block proposal`, error, proposalInfo); - if (error instanceof ReExStateMismatchError && this.config.slashBroadcastedInvalidBlockPenalty > 0n) { - this.log.warn(`Slashing proposer for invalid block proposal`, proposalInfo); - this.slashInvalidBlock(proposal); - } + // Check that I have any address in current committee before attesting + if (!partOfCommittee) { + this.log.verbose(`No validator in the current committee, skipping attestation`, proposalInfo); return undefined; } @@ -391,84 +314,7 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) this.metrics.incAttestations(inCommittee.length); // If the above function does not throw an error, then we can attest to the proposal - return this.doAttestToProposal(proposal, inCommittee); - } - - private getReexecutionDeadline( - proposal: BlockProposal, - config: { l1GenesisTime: bigint; slotDuration: number }, - ): Date { - const nextSlotTimestampSeconds = Number(getTimestampForSlot(proposal.slotNumber.toBigInt() + 1n, config)); - const msNeededForPropagationAndPublishing = this.config.validatorReexecuteDeadlineMs; - return new Date(nextSlotTimestampSeconds * 1000 - msNeededForPropagationAndPublishing); - } - - /** - * Re-execute the transactions in the proposal and check that the state updates match the header state - * @param proposal - The proposal to re-execute - */ - async reExecuteTransactions(proposal: BlockProposal, txs: Tx[], l1ToL2Messages: Fr[]): Promise { - const { header } = proposal.payload; - const { txHashes } = proposal; - - // If we do not have all of the transactions, then we should fail - if (txs.length !== txHashes.length) { - const foundTxHashes = txs.map(tx => tx.getTxHash()); - const missingTxHashes = txHashes.filter(txHash => !foundTxHashes.includes(txHash)); - throw new TransactionsNotAvailableError(missingTxHashes); - } - - // Use the sequencer's block building logic to re-execute the transactions - const timer = new Timer(); - const config = this.blockBuilder.getConfig(); - - // We source most global variables from the proposal - const globalVariables = GlobalVariables.from({ - slotNumber: proposal.payload.header.slotNumber, // checked in the block proposal validator - coinbase: proposal.payload.header.coinbase, // set arbitrarily by the proposer - feeRecipient: proposal.payload.header.feeRecipient, // set arbitrarily by the proposer - gasFees: proposal.payload.header.gasFees, // validated by the rollup contract - blockNumber: proposal.blockNumber, // checked blockNumber-1 exists in archiver but blockNumber doesnt - timestamp: header.timestamp, // checked in the rollup contract against the slot number - chainId: new Fr(config.l1ChainId), - version: new Fr(config.rollupVersion), - }); - - const { block, failedTxs } = await this.blockBuilder.buildBlock(txs, l1ToL2Messages, globalVariables, { - deadline: this.getReexecutionDeadline(proposal, config), - }); - - this.log.verbose(`Transaction re-execution complete`); - const numFailedTxs = failedTxs.length; - - if (numFailedTxs > 0) { - this.metrics.recordFailedReexecution(proposal); - throw new ReExFailedTxsError(numFailedTxs); - } - - if (block.body.txEffects.length !== txHashes.length) { - this.metrics.recordFailedReexecution(proposal); - throw new ReExTimeoutError(); - } - - // Throw a ReExStateMismatchError error if state updates do not match. - // Note that we check the entire proposal payload here, since it could be inconsistent within itself, - // as in the archive root not being actually derived by its other tree roots. - if (!ConsensusPayload.fromBlock(block).equals(proposal.payload)) { - this.log.warn(`Re-execution state mismatch for slot ${proposal.slotNumber.toBigInt()}`, { - expected: ConsensusPayload.fromBlock(block).toInspect(), - actual: proposal.payload.toInspect(), - }); - this.metrics.recordFailedReexecution(proposal); - throw new ReExStateMismatchError( - proposal.archive, - block.archive.root, - proposal.payload.stateReference, - block.header.state, - ); - } - - this.metrics.recordReex(timer.ms(), txs.length, block.header.totalManaUsed.toNumber() / 1e6); + return this.createBlockAttestationsFromProposal(proposal, inCommittee); } private slashInvalidBlock(proposal: BlockProposal) { @@ -534,7 +380,7 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) const slot = proposal.payload.header.slotNumber.toBigInt(); const inCommittee = await this.epochCache.filterInCommittee(slot, this.getValidatorAddresses()); this.log.debug(`Collecting ${inCommittee.length} self-attestations for slot ${slot}`, { inCommittee }); - return this.doAttestToProposal(proposal, inCommittee); + return this.createBlockAttestationsFromProposal(proposal, inCommittee); } async collectAttestations(proposal: BlockProposal, required: number, deadline: Date): Promise { @@ -584,7 +430,10 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) } } - private async doAttestToProposal(proposal: BlockProposal, attestors: EthAddress[] = []): Promise { + private async createBlockAttestationsFromProposal( + proposal: BlockProposal, + attestors: EthAddress[] = [], + ): Promise { const attestations = await this.validationService.attestToProposal(proposal, attestors); await this.p2pClient.addAttestations(attestations); return attestations; @@ -613,5 +462,3 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) return authResponse.toBuffer(); } } - -// Conversion helpers moved into NodeKeystoreAdapter.