Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -380,22 +380,24 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable {
await validatorClient.registerHandlers();
}
}
}

// If there's no validator client but alwaysReexecuteBlockProposals is enabled,
// create a BlockProposalHandler to reexecute block proposals for monitoring
if (!validatorClient && config.alwaysReexecuteBlockProposals) {
log.info('Setting up block proposal reexecution for monitoring');
createBlockProposalHandler(config, {
checkpointsBuilder: validatorCheckpointsBuilder,
worldState: worldStateSynchronizer,
epochCache,
blockSource: archiver,
l1ToL2MessageSource: archiver,
p2pClient,
dateProvider,
telemetry,
}).registerForReexecution(p2pClient);
}
// If there's no validator client, create a BlockProposalHandler to handle block proposals
// for monitoring or reexecution. Reexecution (default) allows us to follow the pending chain,
// while non-reexecution is used for validating the proposals and collecting their txs.
if (!validatorClient) {
const reexecute = !!config.alwaysReexecuteBlockProposals;
log.info(`Setting up block proposal handler` + (reexecute ? ' with reexecution of proposals' : ''));
createBlockProposalHandler(config, {
checkpointsBuilder: validatorCheckpointsBuilder,
worldState: worldStateSynchronizer,
epochCache,
blockSource: archiver,
l1ToL2MessageSource: archiver,
p2pClient,
dateProvider,
telemetry,
}).register(p2pClient, reexecute);
}

// Start world state and wait for it to sync to the archiver.
Expand Down
22 changes: 0 additions & 22 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import {
type L2TipsStore,
} from '@aztec/stdlib/block';
import type { ContractDataSource } from '@aztec/stdlib/contract';
import { getTimestampForSlot } from '@aztec/stdlib/epoch-helpers';
import { type PeerInfo, tryStop } from '@aztec/stdlib/interfaces/server';
import { type BlockProposal, CheckpointAttestation, type CheckpointProposal, type TopicType } from '@aztec/stdlib/p2p';
import type { BlockHeader, Tx, TxHash } from '@aztec/stdlib/tx';
Expand Down Expand Up @@ -111,27 +110,6 @@ export class P2PClient extends WithTracer implements P2P {
this.telemetry,
);

// Default to collecting all txs when we see a valid proposal
// This can be overridden by the validator client to validate, and it will call getTxsForBlockProposal on its own
// Note: Validators do NOT attest to individual blocks - attestations are only for checkpoint proposals.
// TODO(palla/txs): We should not trigger a request for txs on a proposal before fully validating it. We need to bring
// validator-client code into here so we can validate a proposal is reasonable.
this.registerBlockProposalHandler(async (block, sender) => {
this.log.debug(`Received block proposal from ${sender.toString()}`);
// TODO(palla/txs): Need to subtract validatorReexecuteDeadlineMs from this deadline (see ValidatorClient.getReexecutionDeadline)
const constants = this.txCollection.getConstants();
const nextSlotTimestampSeconds = Number(getTimestampForSlot(SlotNumber(block.slotNumber + 1), constants));
const deadline = new Date(nextSlotTimestampSeconds * 1000);
const parentBlock = await this.l2BlockSource.getBlockHeaderByArchive(block.blockHeader.lastArchive.root);
if (!parentBlock) {
this.log.debug(`Cannot collect txs for proposal as parent block not found`);
return false;
}
const blockNumber = BlockNumber(parentBlock.getBlockNumber() + 1);
await this.txProvider.getTxsForBlockProposal(block, blockNumber, { pinnedPeer: sender, deadline });
return true;
});

this.l2Tips = new L2TipsKVStore(store, 'p2p_client');
this.synchedLatestSlot = store.openSingleton('p2p_pool_last_l2_slot');
}
Expand Down
78 changes: 45 additions & 33 deletions yarn-project/validator-client/src/block_proposal_handler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { INITIAL_L2_BLOCK_NUM } from '@aztec/constants';
import type { EpochCache } from '@aztec/epoch-cache';
import { BlockNumber, CheckpointNumber, SlotNumber } from '@aztec/foundation/branded-types';
import { pick } from '@aztec/foundation/collection';
import { Fr } from '@aztec/foundation/curves/bn254';
import { TimeoutError } from '@aztec/foundation/error';
import { createLogger } from '@aztec/foundation/log';
Expand Down Expand Up @@ -87,25 +88,28 @@ export class BlockProposalHandler {
this.tracer = telemetry.getTracer('BlockProposalHandler');
}

registerForReexecution(p2pClient: P2P): BlockProposalHandler {
// Non-validator handler that re-executes for monitoring but does not attest.
register(p2pClient: P2P, shouldReexecute: boolean): BlockProposalHandler {
// Non-validator handler that processes or re-executes for monitoring but does not attest.
// Returns boolean indicating whether the proposal was valid.
const handler = async (proposal: BlockProposal, proposalSender: PeerId): Promise<boolean> => {
try {
const result = await this.handleBlockProposal(proposal, proposalSender, true);
const { slotNumber, blockNumber } = proposal;
const result = await this.handleBlockProposal(proposal, proposalSender, shouldReexecute);
if (result.isValid) {
this.log.info(`Non-validator reexecution completed for slot ${proposal.slotNumber}`, {
this.log.info(`Non-validator block proposal ${blockNumber} at slot ${slotNumber} handled`, {
blockNumber: result.blockNumber,
slotNumber,
reexecutionTimeMs: result.reexecutionResult?.reexecutionTimeMs,
totalManaUsed: result.reexecutionResult?.totalManaUsed,
numTxs: result.reexecutionResult?.block?.body?.txEffects?.length ?? 0,
reexecuted: shouldReexecute,
});
return true;
} else {
this.log.warn(`Non-validator reexecution failed for slot ${proposal.slotNumber}`, {
blockNumber: result.blockNumber,
reason: result.reason,
});
this.log.warn(
`Non-validator block proposal ${blockNumber} at slot ${slotNumber} failed processing with ${result.reason}`,
{ blockNumber: result.blockNumber, slotNumber, reason: result.reason },
);
return false;
}
} catch (error) {
Expand Down Expand Up @@ -184,6 +188,15 @@ export class BlockProposalHandler {
deadline: this.getReexecutionDeadline(slotNumber, config),
});

// If reexecution is disabled, bail. We are just interested in triggering tx collection.
if (!shouldReexecute) {
this.log.info(
`Received valid block ${blockNumber} proposal at index ${proposal.indexWithinCheckpoint} on slot ${slotNumber}`,
proposalInfo,
);
return { isValid: true, blockNumber };
}

// Compute the checkpoint number for this block and validate checkpoint consistency
const checkpointResult = this.computeCheckpointNumber(proposal, parentBlock, proposalInfo);
if (checkpointResult.reason) {
Expand All @@ -210,30 +223,28 @@ export class BlockProposalHandler {
return { isValid: false, blockNumber, reason: 'txs_not_available' };
}

// Collect the out hashes of all the checkpoints before this one in the same epoch
const epoch = getEpochAtSlot(slotNumber, this.epochCache.getL1Constants());
const previousCheckpointOutHashes = (await this.blockSource.getCheckpointsDataForEpoch(epoch))
.filter(c => c.checkpointNumber < checkpointNumber)
.map(c => c.checkpointOutHash);

// Try re-executing the transactions in the proposal if needed
let reexecutionResult;
if (shouldReexecute) {
// Collect the out hashes of all the checkpoints before this one in the same epoch
const epoch = getEpochAtSlot(slotNumber, this.epochCache.getL1Constants());
const previousCheckpointOutHashes = (await this.blockSource.getCheckpointsDataForEpoch(epoch))
.filter(c => c.checkpointNumber < checkpointNumber)
.map(c => c.checkpointOutHash);

try {
this.log.verbose(`Re-executing transactions in the proposal`, proposalInfo);
reexecutionResult = await this.reexecuteTransactions(
proposal,
blockNumber,
checkpointNumber,
txs,
l1ToL2Messages,
previousCheckpointOutHashes,
);
} catch (error) {
this.log.error(`Error reexecuting txs while processing block proposal`, error, proposalInfo);
const reason = this.getReexecuteFailureReason(error);
return { isValid: false, blockNumber, reason, reexecutionResult };
}
try {
this.log.verbose(`Re-executing transactions in the proposal`, proposalInfo);
reexecutionResult = await this.reexecuteTransactions(
proposal,
blockNumber,
checkpointNumber,
txs,
l1ToL2Messages,
previousCheckpointOutHashes,
);
} catch (error) {
this.log.error(`Error reexecuting txs while processing block proposal`, error, proposalInfo);
const reason = this.getReexecuteFailureReason(error);
return { isValid: false, blockNumber, reason, reexecutionResult };
}

// If we succeeded, push this block into the archiver (unless disabled)
Expand All @@ -242,8 +253,8 @@ export class BlockProposalHandler {
}

this.log.info(
`Successfully processed block ${blockNumber} proposal at index ${proposal.indexWithinCheckpoint} on slot ${slotNumber}`,
proposalInfo,
`Successfully re-executed block ${blockNumber} proposal at index ${proposal.indexWithinCheckpoint} on slot ${slotNumber}`,
{ ...proposalInfo, ...pick(reexecutionResult, 'reexecutionTimeMs', 'totalManaUsed') },
);

return { isValid: true, blockNumber, reexecutionResult };
Expand Down Expand Up @@ -488,10 +499,11 @@ export class BlockProposalHandler {
const { block, failedTxs } = result;
const numFailedTxs = failedTxs.length;

this.log.verbose(`Transaction re-execution complete for slot ${slot}`, {
this.log.verbose(`Block proposal ${blockNumber} at slot ${slot} transaction re-execution complete`, {
numFailedTxs,
numProposalTxs: txHashes.length,
numProcessedTxs: block.body.txEffects.length,
blockNumber,
slot,
});

Expand Down
2 changes: 2 additions & 0 deletions yarn-project/validator-client/src/validator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ describe('ValidatorClient', () => {
});

it('should return false if the transactions are not available', async () => {
enableReexecution();
txProvider.getTxsForBlockProposal.mockImplementation(proposal =>
Promise.resolve({
txs: [],
Expand Down Expand Up @@ -694,6 +695,7 @@ describe('ValidatorClient', () => {
// L1 messages for the checkpoint) will catch it.

it('should return false if global variables do not match parent for non-first block in checkpoint', async () => {
enableReexecution();
// Create a proposal with indexWithinCheckpoint > 0 (non-first block in checkpoint)
const parentSlotNumber = 100;
const parentBlockNumber = 10;
Expand Down
Loading