Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .test_patterns.yml
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,11 @@ tests:
error_regex: "timeout: sending signal TERM to command|ValidatorSelection__InsufficientValidatorSetSize"
owners:
- *palla
# http://ci.aztec-labs.com/2927032942451013
- regex: "src/e2e_p2p/valid_epoch_pruned_slash.test.ts"
error_regex: "Timeout waiting for proposal execution"
owners:
- *palla

# yarn-project tests
- regex: "p2p/src/services/discv5/discv5_service.test.ts"
Expand Down
49 changes: 28 additions & 21 deletions yarn-project/aztec-node/src/aztec-node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,7 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable {
deps.p2pClientDeps,
);

// Start world state and wait for it to sync to the archiver.
await worldStateSynchronizer.start();

// Start p2p. Note that it depends on world state to be running.
await p2pClient.start();

// We should really not be modifying the config object
config.txPublicSetupAllowList = config.txPublicSetupAllowList ?? (await getDefaultAllowedSetupFunctions());

const blockBuilder = new BlockBuilder(
Expand All @@ -316,8 +311,35 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable {
telemetry,
);

// We'll accumulate sentinel watchers here
const watchers: Watcher[] = [];

// Create validator client if required
const validatorClient = createValidatorClient(config, {
p2pClient,
telemetry,
dateProvider,
epochCache,
blockBuilder,
blockSource: archiver,
l1ToL2MessageSource: archiver,
keyStoreManager,
});

// If we have a validator client, register it as a source of offenses for the slasher,
// and have it register callbacks on the p2p client *before* we start it, otherwise messages
// like attestations or auths will fail.
if (validatorClient) {
watchers.push(validatorClient);
await validatorClient.registerHandlers();
}

// Start world state and wait for it to sync to the archiver.
await worldStateSynchronizer.start();

// Start p2p. Note that it depends on world state to be running.
await p2pClient.start();

const validatorsSentinel = await createSentinel(epochCache, archiver, p2pClient, config);
if (validatorsSentinel) {
// we can run a sentinel without trying to slash.
Expand Down Expand Up @@ -349,21 +371,6 @@ export class AztecNodeService implements AztecNode, AztecNodeAdmin, Traceable {
watchers.push(attestationsBlockWatcher);
}

const validatorClient = createValidatorClient(config, {
p2pClient,
telemetry,
dateProvider,
epochCache,
blockBuilder,
blockSource: archiver,
l1ToL2MessageSource: archiver,
keyStoreManager,
});

if (validatorClient) {
watchers.push(validatorClient);
}

log.verbose(`All Aztec Node subsystems synced`);

// Validator enabled, create/start relevant service
Expand Down
19 changes: 9 additions & 10 deletions yarn-project/p2p/src/services/peer-manager/peer_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ export class PeerManager implements PeerManagerInterface {
const response = await this.reqresp.sendRequestToPeer(peerId, ReqRespSubProtocol.AUTH, authRequest.toBuffer());
const { status } = response;
if (status !== ReqRespStatus.SUCCESS) {
this.logger.debug(`Disconnecting peer ${peerId} who failed to respond auth handshake`, {
this.logger.verbose(`Disconnecting peer ${peerId} who failed to respond auth handshake`, {
peerId,
status: ReqRespStatus[status],
});
Expand All @@ -899,7 +899,7 @@ export class PeerManager implements PeerManagerInterface {

const peerStatusMessage = peerAuthResponse.status;
if (!ourStatus.validate(peerStatusMessage)) {
this.logger.debug(`Disconnecting peer ${peerId} due to failed status handshake as part of auth.`, logData);
this.logger.verbose(`Disconnecting peer ${peerId} due to failed status handshake as part of auth.`, logData);
this.markAuthHandshakeFailed(peerId);
this.markPeerForDisconnect(peerId);
return;
Expand All @@ -911,12 +911,9 @@ export class PeerManager implements PeerManagerInterface {
const registeredValidators = await this.epochCache.getRegisteredValidators();
const found = registeredValidators.find(v => v.toString() === sender.toString()) !== undefined;
if (!found) {
this.logger.debug(
this.logger.verbose(
`Disconnecting peer ${peerId} due to failed auth handshake, peer is not a registered validator.`,
{
peerId,
address: sender.toString(),
},
{ ...logData, address: sender.toString() },
);
this.markAuthHandshakeFailed(peerId);
this.markPeerForDisconnect(peerId);
Expand All @@ -926,8 +923,9 @@ export class PeerManager implements PeerManagerInterface {
// Check to see that this validator address isn't already allocated to a different peer
const peerForAddress = this.authenticatedValidatorAddressToPeerId.get(sender.toString());
if (peerForAddress !== undefined && peerForAddress.toString() !== peerIdString) {
this.logger.debug(
this.logger.verbose(
`Received auth for validator ${sender.toString()} from peer ${peerIdString}, but this validator is already authenticated to peer ${peerForAddress.toString()}`,
{ ...logData, address: sender.toString() },
);
return;
}
Expand All @@ -937,12 +935,13 @@ export class PeerManager implements PeerManagerInterface {
this.authenticatedValidatorAddressToPeerId.set(sender.toString(), peerId);
this.logger.info(
`Successfully completed auth handshake with peer ${peerId}, validator address ${sender.toString()}`,
logData,
{ ...logData, address: sender.toString() },
);
} catch (err: any) {
//TODO: maybe hard ban these peers in the future
this.logger.debug(`Disconnecting peer ${peerId} due to error during auth handshake: ${err.message ?? err}`, {
this.logger.verbose(`Disconnecting peer ${peerId} due to error during auth handshake: ${err.message}`, {
peerId,
err,
});
this.markAuthHandshakeFailed(peerId);
this.markPeerForDisconnect(peerId);
Expand Down
1 change: 0 additions & 1 deletion yarn-project/stdlib/src/interfaces/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ export const ValidatorClientConfigSchema = z.object({

export interface Validator {
start(): Promise<void>;
registerBlockProposalHandler(): void;
updateConfig(config: Partial<ValidatorClientFullConfig>): void;

// Block validation responsibilities
Expand Down
29 changes: 18 additions & 11 deletions yarn-project/validator-client/src/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter)
private validationService: ValidationService;
private metrics: ValidatorMetrics;

// Whether it has already registered handlers on the p2p client
private hasRegisteredHandlers = false;

// Used to check if we are sending the same proposal twice
private previousProposal?: BlockProposal;

Expand Down Expand Up @@ -207,12 +210,9 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter)
return;
}

this.registerBlockProposalHandler();
await this.registerHandlers();

// Sync the committee from the smart contract
// https://github.com/AztecProtocol/aztec-packages/issues/7962
const myAddresses = this.getValidatorAddresses();

const inCommittee = await this.epochCache.filterInCommittee('now', myAddresses);
if (inCommittee.length > 0) {
this.log.info(
Expand All @@ -225,20 +225,27 @@ export class ValidatorClient extends (EventEmitter as new () => WatcherEmitter)
}
this.epochCacheUpdateLoop.start();

this.p2pClient.registerThisValidatorAddresses(myAddresses);
await this.p2pClient.addReqRespSubProtocol(ReqRespSubProtocol.AUTH, this.handleAuthRequest.bind(this));

return Promise.resolve();
}

public async stop() {
await this.epochCacheUpdateLoop.stop();
}

public registerBlockProposalHandler() {
const handler = (block: BlockProposal, proposalSender: PeerId): Promise<BlockAttestation[] | undefined> =>
this.attestToProposal(block, proposalSender);
this.p2pClient.registerBlockProposalHandler(handler);
/** Register handlers on the p2p client */
public async registerHandlers() {
if (!this.hasRegisteredHandlers) {
this.hasRegisteredHandlers = true;

const handler = (block: BlockProposal, proposalSender: PeerId): Promise<BlockAttestation[] | undefined> =>
this.attestToProposal(block, proposalSender);
this.p2pClient.registerBlockProposalHandler(handler);

const myAddresses = this.getValidatorAddresses();
this.p2pClient.registerThisValidatorAddresses(myAddresses);

await this.p2pClient.addReqRespSubProtocol(ReqRespSubProtocol.AUTH, this.handleAuthRequest.bind(this));
}
}

async attestToProposal(proposal: BlockProposal, proposalSender: PeerId): Promise<BlockAttestation[] | undefined> {
Expand Down
Loading