From fb5f07107f9275cb08fcbfc1354901287654429e Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Mon, 22 Sep 2025 15:11:20 -0300 Subject: [PATCH] fix: Register validator callbacks on p2p client before starting --- .test_patterns.yml | 5 ++ .../aztec-node/src/aztec-node/server.ts | 49 +++++++++++-------- .../src/services/peer-manager/peer_manager.ts | 19 ++++--- .../stdlib/src/interfaces/validator.ts | 1 - .../validator-client/src/validator.ts | 29 ++++++----- 5 files changed, 60 insertions(+), 43 deletions(-) diff --git a/.test_patterns.yml b/.test_patterns.yml index b06356005118..7179a7862eac 100644 --- a/.test_patterns.yml +++ b/.test_patterns.yml @@ -239,6 +239,11 @@ tests: error_regex: "timeout: sending signal TERM to command|ValidatorSelection__InsufficientValidatorSetSize" owners: - *palla + # http://ci.aztec-labs.com/2927032942451013 + - regex: "src/e2e_p2p/valid_epoch_pruned_slash.test.ts" + error_regex: "Timeout waiting for proposal execution" + owners: + - *palla # yarn-project tests - regex: "p2p/src/services/discv5/discv5_service.test.ts" diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index 972b26f2dcd8..1646aa986ab7 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -300,12 +300,7 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable { deps.p2pClientDeps, ); - // Start world state and wait for it to sync to the archiver. - await worldStateSynchronizer.start(); - - // Start p2p. Note that it depends on world state to be running. - await p2pClient.start(); - + // We should really not be modifying the config object config.txPublicSetupAllowList = config.txPublicSetupAllowList ?? (await getDefaultAllowedSetupFunctions()); const blockBuilder = new BlockBuilder( @@ -316,8 +311,35 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable { telemetry, ); + // We'll accumulate sentinel watchers here const watchers: Watcher[] = []; + // Create validator client if required + const validatorClient = createValidatorClient(config, { + p2pClient, + telemetry, + dateProvider, + epochCache, + blockBuilder, + blockSource: archiver, + l1ToL2MessageSource: archiver, + keyStoreManager, + }); + + // If we have a validator client, register it as a source of offenses for the slasher, + // and have it register callbacks on the p2p client *before* we start it, otherwise messages + // like attestations or auths will fail. + if (validatorClient) { + watchers.push(validatorClient); + await validatorClient.registerHandlers(); + } + + // Start world state and wait for it to sync to the archiver. + await worldStateSynchronizer.start(); + + // Start p2p. Note that it depends on world state to be running. + await p2pClient.start(); + const validatorsSentinel = await createSentinel(epochCache, archiver, p2pClient, config); if (validatorsSentinel) { // we can run a sentinel without trying to slash. @@ -349,21 +371,6 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable { watchers.push(attestationsBlockWatcher); } - const validatorClient = createValidatorClient(config, { - p2pClient, - telemetry, - dateProvider, - epochCache, - blockBuilder, - blockSource: archiver, - l1ToL2MessageSource: archiver, - keyStoreManager, - }); - - if (validatorClient) { - watchers.push(validatorClient); - } - log.verbose(`All Aztec Node subsystems synced`); // Validator enabled, create/start relevant service diff --git a/yarn-project/p2p/src/services/peer-manager/peer_manager.ts b/yarn-project/p2p/src/services/peer-manager/peer_manager.ts index 7fa8bb57632e..792efa6f4e39 100644 --- a/yarn-project/p2p/src/services/peer-manager/peer_manager.ts +++ b/yarn-project/p2p/src/services/peer-manager/peer_manager.ts @@ -883,7 +883,7 @@ export class PeerManager implements PeerManagerInterface { const response = await this.reqresp.sendRequestToPeer(peerId, ReqRespSubProtocol.AUTH, authRequest.toBuffer()); const { status } = response; if (status !== ReqRespStatus.SUCCESS) { - this.logger.debug(`Disconnecting peer ${peerId} who failed to respond auth handshake`, { + this.logger.verbose(`Disconnecting peer ${peerId} who failed to respond auth handshake`, { peerId, status: ReqRespStatus[status], }); @@ -899,7 +899,7 @@ export class PeerManager implements PeerManagerInterface { const peerStatusMessage = peerAuthResponse.status; if (!ourStatus.validate(peerStatusMessage)) { - this.logger.debug(`Disconnecting peer ${peerId} due to failed status handshake as part of auth.`, logData); + this.logger.verbose(`Disconnecting peer ${peerId} due to failed status handshake as part of auth.`, logData); this.markAuthHandshakeFailed(peerId); this.markPeerForDisconnect(peerId); return; @@ -911,12 +911,9 @@ export class PeerManager implements PeerManagerInterface { const registeredValidators = await this.epochCache.getRegisteredValidators(); const found = registeredValidators.find(v => v.toString() === sender.toString()) !== undefined; if (!found) { - this.logger.debug( + this.logger.verbose( `Disconnecting peer ${peerId} due to failed auth handshake, peer is not a registered validator.`, - { - peerId, - address: sender.toString(), - }, + { ...logData, address: sender.toString() }, ); this.markAuthHandshakeFailed(peerId); this.markPeerForDisconnect(peerId); @@ -926,8 +923,9 @@ export class PeerManager implements PeerManagerInterface { // Check to see that this validator address isn't already allocated to a different peer const peerForAddress = this.authenticatedValidatorAddressToPeerId.get(sender.toString()); if (peerForAddress !== undefined && peerForAddress.toString() !== peerIdString) { - this.logger.debug( + this.logger.verbose( `Received auth for validator ${sender.toString()} from peer ${peerIdString}, but this validator is already authenticated to peer ${peerForAddress.toString()}`, + { ...logData, address: sender.toString() }, ); return; } @@ -937,12 +935,13 @@ export class PeerManager implements PeerManagerInterface { this.authenticatedValidatorAddressToPeerId.set(sender.toString(), peerId); this.logger.info( `Successfully completed auth handshake with peer ${peerId}, validator address ${sender.toString()}`, - logData, + { ...logData, address: sender.toString() }, ); } catch (err: any) { //TODO: maybe hard ban these peers in the future - this.logger.debug(`Disconnecting peer ${peerId} due to error during auth handshake: ${err.message ?? err}`, { + this.logger.verbose(`Disconnecting peer ${peerId} due to error during auth handshake: ${err.message}`, { peerId, + err, }); this.markAuthHandshakeFailed(peerId); this.markPeerForDisconnect(peerId); diff --git a/yarn-project/stdlib/src/interfaces/validator.ts b/yarn-project/stdlib/src/interfaces/validator.ts index 900ab756b080..c401114a1c84 100644 --- a/yarn-project/stdlib/src/interfaces/validator.ts +++ b/yarn-project/stdlib/src/interfaces/validator.ts @@ -54,7 +54,6 @@ export const ValidatorClientConfigSchema = z.object({ export interface Validator { start(): Promise; - registerBlockProposalHandler(): void; updateConfig(config: Partial): void; // Block validation responsibilities diff --git a/yarn-project/validator-client/src/validator.ts b/yarn-project/validator-client/src/validator.ts index 182520af1f53..e525d35798ca 100644 --- a/yarn-project/validator-client/src/validator.ts +++ b/yarn-project/validator-client/src/validator.ts @@ -57,6 +57,9 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) private validationService: ValidationService; private metrics: ValidatorMetrics; + // Whether it has already registered handlers on the p2p client + private hasRegisteredHandlers = false; + // Used to check if we are sending the same proposal twice private previousProposal?: BlockProposal; @@ -207,12 +210,9 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) return; } - this.registerBlockProposalHandler(); + await this.registerHandlers(); - // Sync the committee from the smart contract - // https://github.com/AztecProtocol/aztec-packages/issues/7962 const myAddresses = this.getValidatorAddresses(); - const inCommittee = await this.epochCache.filterInCommittee('now', myAddresses); if (inCommittee.length > 0) { this.log.info( @@ -225,9 +225,6 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) } this.epochCacheUpdateLoop.start(); - this.p2pClient.registerThisValidatorAddresses(myAddresses); - await this.p2pClient.addReqRespSubProtocol(ReqRespSubProtocol.AUTH, this.handleAuthRequest.bind(this)); - return Promise.resolve(); } @@ -235,10 +232,20 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter) await this.epochCacheUpdateLoop.stop(); } - public registerBlockProposalHandler() { - const handler = (block: BlockProposal, proposalSender: PeerId): Promise => - this.attestToProposal(block, proposalSender); - this.p2pClient.registerBlockProposalHandler(handler); + /** Register handlers on the p2p client */ + public async registerHandlers() { + if (!this.hasRegisteredHandlers) { + this.hasRegisteredHandlers = true; + + const handler = (block: BlockProposal, proposalSender: PeerId): Promise => + this.attestToProposal(block, proposalSender); + this.p2pClient.registerBlockProposalHandler(handler); + + const myAddresses = this.getValidatorAddresses(); + this.p2pClient.registerThisValidatorAddresses(myAddresses); + + await this.p2pClient.addReqRespSubProtocol(ReqRespSubProtocol.AUTH, this.handleAuthRequest.bind(this)); + } } async attestToProposal(proposal: BlockProposal, proposalSender: PeerId): Promise {