From 3aab8aaaf45f3ffe285c3712bf7f0182f8e995d9 Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Fri, 6 Dec 2024 18:25:56 -0300 Subject: [PATCH 01/11] chore: Clean up p2p logs --- .../foundation/src/collection/array.ts | 10 +++++ .../p2p/src/service/discV5_service.ts | 23 +++++++---- .../p2p/src/service/libp2p_service.ts | 17 ++++---- yarn-project/p2p/src/service/peer_manager.ts | 41 +++++++++++-------- yarn-project/p2p/src/service/peer_scoring.ts | 9 +++- .../p2p/src/service/reqresp/reqresp.ts | 2 +- 6 files changed, 68 insertions(+), 34 deletions(-) diff --git a/yarn-project/foundation/src/collection/array.ts b/yarn-project/foundation/src/collection/array.ts index 9f37779727e1..dc1af0328841 100644 --- a/yarn-project/foundation/src/collection/array.ts +++ b/yarn-project/foundation/src/collection/array.ts @@ -145,3 +145,13 @@ export function areArraysEqual(a: T[], b: T[], eq: (a: T, b: T) => boolean = export function maxBy(arr: T[], fn: (x: T) => number): T | undefined { return arr.reduce((max, x) => (fn(x) > fn(max) ? x : max), arr[0]); } + +/** Computes the median of a numeric array. Returns undefined if array is empty. */ +export function median(arr: number[]) { + if (arr.length === 0) { + return undefined; + } + const sorted = arr.toSorted((a, b) => a - b); + const mid = Math.floor(sorted.length / 2); + return sorted.length % 2 !== 0 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2; +} diff --git a/yarn-project/p2p/src/service/discV5_service.ts b/yarn-project/p2p/src/service/discV5_service.ts index a39d58725e8c..5e79c7c50aff 100644 --- a/yarn-project/p2p/src/service/discV5_service.ts +++ b/yarn-project/p2p/src/service/discV5_service.ts @@ -5,7 +5,7 @@ import { OtelMetricsAdapter, type TelemetryClient } from '@aztec/telemetry-clien import { Discv5, type Discv5EventEmitter } from '@chainsafe/discv5'; import { ENR, SignableENR } from '@chainsafe/enr'; import type { PeerId } from '@libp2p/interface'; -import { multiaddr } from '@multiformats/multiaddr'; +import { type Multiaddr, multiaddr } from '@multiformats/multiaddr'; import EventEmitter from 'events'; import type { P2PConfig } from '../config.js'; @@ -35,6 +35,9 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService /** This instance's ENR */ private enr: SignableENR; + /** UDP listen addr */ + private listenMultiAddrUdp: Multiaddr; + private currentState = PeerDiscoveryState.STOPPED; private bootstrapNodes: string[]; @@ -66,7 +69,7 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService `${convertToMultiaddr(udpAnnounceAddress || tcpAnnounceAddress, 'udp')}/p2p/${peerId.toString()}`, ); - const listenMultiAddrUdp = multiaddr(convertToMultiaddr(udpListenAddress, 'udp')); + this.listenMultiAddrUdp = multiaddr(convertToMultiaddr(udpListenAddress, 'udp')); // set location multiaddr in ENR record this.enr.setLocationMultiaddr(multiAddrUdp); @@ -76,7 +79,7 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService this.discv5 = Discv5.create({ enr: this.enr, peerId, - bindAddrs: { ip4: listenMultiAddrUdp }, + bindAddrs: { ip4: this.listenMultiAddrUdp }, config: { lookupTimeout: 2000, requestTimeout: 2000, @@ -85,14 +88,14 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService metricsRegistry, }); - this.logger.info(`ENR NodeId: ${this.enr.nodeId}`); + this.logger.verbose(`DiscV5 ENR NodeId: ${this.enr.nodeId}`); this.logger.info(`ENR UDP: ${multiAddrUdp.toString()}`); (this.discv5 as Discv5EventEmitter).on('discovered', (enr: ENR) => this.onDiscovered(enr)); (this.discv5 as Discv5EventEmitter).on('enrAdded', async (enr: ENR) => { const multiAddrTcp = await enr.getFullMultiaddr('tcp'); const multiAddrUdp = await enr.getFullMultiaddr('udp'); - this.logger.debug(`ENR multiaddr: ${multiAddrTcp?.toString()}, ${multiAddrUdp?.toString()}`); + this.logger.debug(`Added ENR`, { multiAddrTcp, multiAddrUdp, nodeId: enr.nodeId }); this.onDiscovered(enr); }); } @@ -101,18 +104,22 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService if (this.currentState === PeerDiscoveryState.RUNNING) { throw new Error('DiscV5Service already started'); } - this.logger.info('Starting DiscV5'); + this.logger.debug('Starting DiscV5'); await this.discv5.start(); this.startTime = Date.now(); - this.logger.info('DiscV5 started'); + this.logger.info(`DiscV5 service started`, { + nodeId: this.enr.nodeId, + listenUdp: this.listenMultiAddrUdp, + peerId: this.peerId, + }); this.currentState = PeerDiscoveryState.RUNNING; // Add bootnode ENR if provided if (this.bootstrapNodes?.length) { // Do this conversion once since it involves an async function call this.bootstrapNodePeerIds = await Promise.all(this.bootstrapNodes.map(enr => ENR.decodeTxt(enr).peerId())); - this.logger.info(`Adding bootstrap ENRs: ${this.bootstrapNodes.join(', ')}`); + this.logger.info(`Adding bootstrap nodes ENRs: ${this.bootstrapNodes.join(', ')}`); try { this.bootstrapNodes.forEach(enr => { this.discv5.addEnr(enr); diff --git a/yarn-project/p2p/src/service/libp2p_service.ts b/yarn-project/p2p/src/service/libp2p_service.ts index 8ee0e1789cb5..ab249a951167 100644 --- a/yarn-project/p2p/src/service/libp2p_service.ts +++ b/yarn-project/p2p/src/service/libp2p_service.ts @@ -117,20 +117,17 @@ export class LibP2PService extends WithTracer implements P2PService { throw new Error('P2P service already started'); } - // Log listen & announce addresses + // Get listen & announce addresses for logging const { tcpListenAddress, tcpAnnounceAddress } = this.config; - this.logger.info(`Starting P2P node on ${tcpListenAddress}`); if (!tcpAnnounceAddress) { throw new Error('Announce address not provided.'); } const announceTcpMultiaddr = convertToMultiaddr(tcpAnnounceAddress, 'tcp'); - this.logger.info(`Announcing at ${announceTcpMultiaddr}`); // Start job queue, peer discovery service and libp2p node this.jobQueue.start(); await this.peerDiscoveryService.start(); await this.node.start(); - this.logger.info(`Started P2P client with Peer ID ${this.node.peerId.toString()}`); // Subscribe to standard GossipSub topics by default for (const topic in TopicType) { @@ -157,6 +154,11 @@ export class LibP2PService extends WithTracer implements P2PService { [TX_REQ_PROTOCOL]: this.validateRequestedTx.bind(this), }; await this.reqresp.start(this.requestResponseHandlers, reqrespSubProtocolValidators); + this.logger.info(`Started P2P service`, { + listen: tcpListenAddress, + announce: announceTcpMultiaddr, + peerId: this.node.peerId.toString(), + }); } /** @@ -175,7 +177,6 @@ export class LibP2PService extends WithTracer implements P2PService { this.logger.debug('Stopping LibP2P...'); await this.stopLibP2P(); this.logger.info('LibP2P service stopped'); - this.logger.debug('Stopping request response service...'); } /** @@ -583,10 +584,10 @@ export class LibP2PService extends WithTracer implements P2PService { const parent = message.constructor as typeof Gossipable; const identifier = message.p2pMessageIdentifier().toString(); - this.logger.verbose(`[${identifier}] sending`); + this.logger.trace(`Sending message ${identifier}`); const recipientsNum = await this.publishToTopic(parent.p2pTopic, message.toBuffer()); - this.logger.verbose(`[${identifier}] sent to ${recipientsNum} peers`); + this.logger.debug(`Sent message ${identifier} to ${recipientsNum} peers`); } // Libp2p seems to hang sometimes if new peers are initiating connections. @@ -597,7 +598,7 @@ export class LibP2PService extends WithTracer implements P2PService { }); try { await Promise.race([this.node.stop(), timeout]); - this.logger.debug('Libp2p stopped'); + this.logger.debug('LibP2P stopped'); } catch (error) { this.logger.error('Error during stop or timeout:', error); } diff --git a/yarn-project/p2p/src/service/peer_manager.ts b/yarn-project/p2p/src/service/peer_manager.ts index b413db5d59cc..8c229bac2444 100644 --- a/yarn-project/p2p/src/service/peer_manager.ts +++ b/yarn-project/p2p/src/service/peer_manager.ts @@ -22,6 +22,7 @@ type CachedPeer = { export class PeerManager { private cachedPeers: Map = new Map(); private peerScoring: PeerScoring; + private heartbeatCounter: number = 0; constructor( private libP2PNode: PubSubLibp2p, @@ -34,9 +35,9 @@ export class PeerManager { this.libP2PNode.addEventListener('peer:connect', evt => { const peerId = evt.detail; if (this.peerDiscoveryService.isBootstrapPeer(peerId)) { - this.logger.debug(`Connected to bootstrap peer ${peerId.toString()}`); + this.logger.verbose(`Connected to bootstrap peer ${peerId.toString()}`); } else { - this.logger.debug(`Connected to transaction peer ${peerId.toString()}`); + this.logger.verbose(`Connected to transaction peer ${peerId.toString()}`); } }); @@ -44,9 +45,9 @@ export class PeerManager { this.libP2PNode.addEventListener('peer:disconnect', evt => { const peerId = evt.detail; if (this.peerDiscoveryService.isBootstrapPeer(peerId)) { - this.logger.debug(`Disconnected from bootstrap peer ${peerId.toString()}`); + this.logger.verbose(`Disconnected from bootstrap peer ${peerId.toString()}`); } else { - this.logger.debug(`Disconnected from transaction peer ${peerId.toString()}`); + this.logger.verbose(`Disconnected from transaction peer ${peerId.toString()}`); } }); @@ -57,6 +58,7 @@ export class PeerManager { } public heartbeat() { + this.heartbeatCounter++; this.discover(); this.peerScoring.decayAllScores(); } @@ -64,7 +66,8 @@ export class PeerManager { public penalizePeer(peerId: PeerId, penalty: PeerErrorSeverity) { const id = peerId.toString(); const penaltyValue = this.peerScoring.peerPenalties[penalty]; - this.peerScoring.updateScore(id, -penaltyValue); + const newScore = this.peerScoring.updateScore(id, -penaltyValue); + this.logger.verbose(`Penalizing peer ${id} with ${penalty} (new score is ${newScore})`); } public getPeerScore(peerId: string): number { @@ -81,9 +84,13 @@ export class PeerManager { // Calculate how many connections we're looking to make const peersToConnect = this.config.maxPeerCount - connections.length; - this.logger.debug( - `Connections: ${connections.length}, Peers to connect: ${peersToConnect}, maxPeerCount: ${this.config.maxPeerCount}, cachedPeers: ${this.cachedPeers.size}`, - ); + const logLevel = this.heartbeatCounter % 60 === 0 ? 'info' : 'debug'; + this.logger[logLevel](`P2P peers status`, { + connections: connections.length, + maxPeerCount: this.config.maxPeerCount, + cachedPeers: this.cachedPeers.size, + ...this.peerScoring.getStats(), + }); // Exit if no peers to connect if (peersToConnect <= 0) { @@ -119,7 +126,7 @@ export class PeerManager { // if we need more peers, start randomNodesQuery if (peersToConnect > 0) { - this.logger.debug('Running random nodes query'); + this.logger.debug(`Running random nodes query to connect to ${peersToConnect} peers`); void this.peerDiscoveryService.runRandomNodesQuery(); } } @@ -134,11 +141,11 @@ export class PeerManager { // check if peer is already connected const [peerId, multiaddrTcp] = await Promise.all([enr.peerId(), enr.getFullMultiaddr('tcp')]); - this.logger.debug(`Handling discovered peer ${peerId.toString()}, ${multiaddrTcp?.toString()}`); + this.logger.debug(`Handling discovered peer ${peerId.toString()} at ${multiaddrTcp?.toString()}`); // throw if no tcp addr in multiaddr if (!multiaddrTcp) { - this.logger.debug(`No TCP address in discovered node's multiaddr: ${enr.toString()}`); + this.logger.warn(`No TCP address in discovered node's multiaddr ${enr.encodeTxt()}`); return; } const connections = this.libP2PNode.getConnections(); @@ -150,7 +157,7 @@ export class PeerManager { // check if peer is already in cache const id = peerId.toString(); if (this.cachedPeers.has(id)) { - this.logger.debug(`Already in cache ${id}`); + this.logger.debug(`Peer already in cache ${id}`); return; } @@ -164,7 +171,6 @@ export class PeerManager { // Determine if we should dial immediately or not if (this.shouldDialPeer()) { - this.logger.debug(`Dialing peer ${id}`); void this.dialPeer(cachedPeer); } else { this.logger.debug(`Caching peer ${id}`); @@ -182,11 +188,12 @@ export class PeerManager { try { await this.libP2PNode.dial(peer.multiaddrTcp); } catch { - this.logger.debug(`Failed to dial peer ${id}`); peer.dialAttempts++; if (peer.dialAttempts < MAX_DIAL_ATTEMPTS) { + this.logger.debug(`Failed to dial peer ${id} (attempt ${peer.dialAttempts})`); this.cachedPeers.set(id, peer); } else { + this.logger.debug(`Failed to dial peer ${id} (dropping)`); this.cachedPeers.delete(id); } } @@ -194,9 +201,10 @@ export class PeerManager { private shouldDialPeer(): boolean { const connections = this.libP2PNode.getConnections().length; - this.logger.debug(`Connections: ${connections}, maxPeerCount: ${this.config.maxPeerCount}`); if (connections >= this.config.maxPeerCount) { - this.logger.debug('Not dialing peer, maxPeerCount reached'); + this.logger.debug( + `Not dialing peer due to max peer count of ${this.config.maxPeerCount} reached (${connections} current connections)`, + ); return false; } return true; @@ -211,6 +219,7 @@ export class PeerManager { // Remove the oldest peers for (const key of this.cachedPeers.keys()) { this.cachedPeers.delete(key); + this.logger.debug(`Pruning peer ${key} from cache`); peersToDelete--; if (peersToDelete <= 0) { break; diff --git a/yarn-project/p2p/src/service/peer_scoring.ts b/yarn-project/p2p/src/service/peer_scoring.ts index d59cb10b1824..896eb0a69b65 100644 --- a/yarn-project/p2p/src/service/peer_scoring.ts +++ b/yarn-project/p2p/src/service/peer_scoring.ts @@ -1,3 +1,5 @@ +import { median } from '@aztec/foundation/collection'; + import { type P2PConfig } from '../config.js'; export enum PeerErrorSeverity { @@ -43,7 +45,7 @@ export class PeerScoring { }; } - updateScore(peerId: string, scoreDelta: number): void { + updateScore(peerId: string, scoreDelta: number): number { const currentTime = Date.now(); const lastUpdate = this.lastUpdateTime.get(peerId) || currentTime; const timePassed = currentTime - lastUpdate; @@ -59,6 +61,7 @@ export class PeerScoring { this.scores.set(peerId, currentScore); this.lastUpdateTime.set(peerId, currentTime); + return currentScore; } decayAllScores(): void { @@ -78,4 +81,8 @@ export class PeerScoring { getScore(peerId: string): number { return this.scores.get(peerId) || 0; } + + getStats(): { median: number } { + return { median: median(Array.from(this.scores.values())) ?? 0 }; + } } diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.ts b/yarn-project/p2p/src/service/reqresp/reqresp.ts index 0512c048f832..da6b3e87b53e 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.ts @@ -210,7 +210,7 @@ export class ReqResp { return result; } catch (e: any) { - this.logger.error(`${e.message} | peerId: ${peerId.toString()} | subProtocol: ${subProtocol}`); + this.logger.error(`Error sending request to peer`, e, { peerId, subProtocol }); this.peerManager.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError); } finally { if (stream) { From 8d91298395a147abf1553483ba0dd6b456ab09e8 Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Mon, 9 Dec 2024 10:33:09 -0300 Subject: [PATCH 02/11] Define p2p public API and add getPeers --- .../aztec-node/src/aztec-node/server.ts | 11 ++- yarn-project/aztec/src/cli/cli.ts | 2 +- yarn-project/aztec/src/cli/cmds/start_node.ts | 5 +- .../aztec/src/cli/cmds/start_p2p_bootstrap.ts | 25 ++++-- .../aztec/src/cli/cmds/start_prover_node.ts | 8 +- .../circuit-types/src/interfaces/index.ts | 2 + .../src/interfaces/p2p-bootstrap.ts | 21 +++++ .../circuit-types/src/interfaces/p2p.test.ts | 82 +++++++++++++++++++ .../circuit-types/src/interfaces/p2p.ts | 71 ++++++++++++++++ .../src/p2p/block_attestation.ts | 12 +++ .../src/p2p/consensus_payload.ts | 11 +++ yarn-project/p2p/src/bootstrap/bootstrap.ts | 28 +++++-- .../p2p/src/client/p2p_client.test.ts | 61 ++++---------- yarn-project/p2p/src/client/p2p_client.ts | 40 +++++---- yarn-project/p2p/src/service/dummy_service.ts | 7 +- .../p2p/src/service/libp2p_service.ts | 5 ++ yarn-project/p2p/src/service/peer_manager.ts | 34 +++++++- .../reqresp/reqresp.integration.test.ts | 2 +- yarn-project/p2p/src/service/service.ts | 4 +- yarn-project/prover-node/src/prover-node.ts | 9 ++ .../src/sequencer/sequencer.test.ts | 36 ++++---- .../src/sequencer/sequencer.ts | 2 +- 22 files changed, 370 insertions(+), 108 deletions(-) create mode 100644 yarn-project/circuit-types/src/interfaces/p2p-bootstrap.ts create mode 100644 yarn-project/circuit-types/src/interfaces/p2p.test.ts create mode 100644 yarn-project/circuit-types/src/interfaces/p2p.ts diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index d0a5271640d7..2865e1f8649e 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -218,6 +218,10 @@ export class AztecNodeService implements AztecNode { return this.blockSource; } + public getP2P(): P2P { + return this.p2pClient; + } + /** * Method to return the currently deployed L1 contract addresses. * @returns - The currently deployed L1 contract addresses. @@ -427,11 +431,12 @@ export class AztecNodeService implements AztecNode { * @returns - The pending txs. */ public getPendingTxs() { - return Promise.resolve(this.p2pClient!.getTxs('pending')); + return this.p2pClient!.getPendingTxs(); } - public getPendingTxCount() { - return Promise.resolve(this.p2pClient!.getTxs('pending').length); + public async getPendingTxCount() { + const pendingTxs = await this.getPendingTxs(); + return pendingTxs.length; } /** diff --git a/yarn-project/aztec/src/cli/cli.ts b/yarn-project/aztec/src/cli/cli.ts index 173a9ff2f46a..9655f5169a7a 100644 --- a/yarn-project/aztec/src/cli/cli.ts +++ b/yarn-project/aztec/src/cli/cli.ts @@ -95,7 +95,7 @@ export function injectAztecCommands(program: Command, userLog: LogFn, debugLogge await startArchiver(options, signalHandlers, services); } else if (options.p2pBootstrap) { const { startP2PBootstrap } = await import('./cmds/start_p2p_bootstrap.js'); - await startP2PBootstrap(options, userLog, debugLogger); + await startP2PBootstrap(options, signalHandlers, services, userLog); } else if (options.proverAgent) { const { startProverAgent } = await import('./cmds/start_prover_agent.js'); await startProverAgent(options, signalHandlers, services, userLog); diff --git a/yarn-project/aztec/src/cli/cmds/start_node.ts b/yarn-project/aztec/src/cli/cmds/start_node.ts index 939776c345bf..2459533aa9fc 100644 --- a/yarn-project/aztec/src/cli/cmds/start_node.ts +++ b/yarn-project/aztec/src/cli/cmds/start_node.ts @@ -1,5 +1,5 @@ import { aztecNodeConfigMappings } from '@aztec/aztec-node'; -import { AztecNodeApiSchema, type PXE } from '@aztec/circuit-types'; +import { AztecNodeApiSchema, P2PApiSchema, type PXE } from '@aztec/circuit-types'; import { NULL_KEY } from '@aztec/ethereum'; import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server'; import { type LogFn } from '@aztec/foundation/log'; @@ -93,8 +93,9 @@ export async function startNode( // Create and start Aztec Node const node = await createAztecNode(nodeConfig, telemetryClient); - // Add node to services list + // Add node and p2p to services list services.node = [node, AztecNodeApiSchema]; + services.p2p = [node.getP2P(), P2PApiSchema]; // Add node stop function to signal handlers signalHandlers.push(node.stop.bind(node)); diff --git a/yarn-project/aztec/src/cli/cmds/start_p2p_bootstrap.ts b/yarn-project/aztec/src/cli/cmds/start_p2p_bootstrap.ts index 4d4f7618d80c..975f339aef65 100644 --- a/yarn-project/aztec/src/cli/cmds/start_p2p_bootstrap.ts +++ b/yarn-project/aztec/src/cli/cmds/start_p2p_bootstrap.ts @@ -1,7 +1,8 @@ -import { type Logger } from '@aztec/aztec.js'; -import { type LogFn } from '@aztec/foundation/log'; -import { type BootnodeConfig, bootnodeConfigMappings } from '@aztec/p2p'; -import runBootstrapNode from '@aztec/p2p-bootstrap'; +import { P2PBootstrapApiSchema } from '@aztec/circuit-types'; +import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server'; +import { type LogFn, createLogger } from '@aztec/foundation/log'; +import { createStore } from '@aztec/kv-store/lmdb'; +import { type BootnodeConfig, BootstrapNode, bootnodeConfigMappings } from '@aztec/p2p'; import { createAndStartTelemetryClient, getConfigEnvVars as getTelemetryClientConfig, @@ -9,11 +10,19 @@ import { import { extractRelevantOptions } from '../util.js'; -export const startP2PBootstrap = async (options: any, userLog: LogFn, debugLogger: Logger) => { +export async function startP2PBootstrap( + options: any, + signalHandlers: (() => Promise)[], + services: NamespacedApiHandlers, + userLog: LogFn, +) { // Start a P2P bootstrap node. const config = extractRelevantOptions(options, bootnodeConfigMappings, 'p2p'); const telemetryClient = await createAndStartTelemetryClient(getTelemetryClientConfig()); - - await runBootstrapNode(config, telemetryClient, debugLogger); + const store = await createStore('p2p-bootstrap', config, createLogger('p2p:bootstrap:store')); + const node = new BootstrapNode(store, telemetryClient); + await node.start(config); + signalHandlers.push(() => node.stop()); + services.bootstrap = [node, P2PBootstrapApiSchema]; userLog(`P2P bootstrap node started on ${config.udpListenAddress}`); -}; +} diff --git a/yarn-project/aztec/src/cli/cmds/start_prover_node.ts b/yarn-project/aztec/src/cli/cmds/start_prover_node.ts index 0d6fa266edc8..ceb4cc00ff52 100644 --- a/yarn-project/aztec/src/cli/cmds/start_prover_node.ts +++ b/yarn-project/aztec/src/cli/cmds/start_prover_node.ts @@ -1,4 +1,4 @@ -import { ProverNodeApiSchema, type ProvingJobBroker, createAztecNodeClient } from '@aztec/circuit-types'; +import { P2PApiSchema, ProverNodeApiSchema, type ProvingJobBroker, createAztecNodeClient } from '@aztec/circuit-types'; import { NULL_KEY } from '@aztec/ethereum'; import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server'; import { type LogFn } from '@aztec/foundation/log'; @@ -81,12 +81,16 @@ export async function startProverNode( const proverNode = await createProverNode(proverConfig, { telemetry, broker }); services.proverNode = [proverNode, ProverNodeApiSchema]; + const p2p = proverNode.getP2P(); + if (p2p) { + services.p2p = [proverNode.getP2P(), P2PApiSchema]; + } + if (!proverConfig.proverBrokerUrl) { services.provingJobSource = [proverNode.getProver().getProvingJobSource(), ProvingJobConsumerSchema]; } signalHandlers.push(proverNode.stop.bind(proverNode)); - // Automatically start proving unproven blocks await proverNode.start(); } diff --git a/yarn-project/circuit-types/src/interfaces/index.ts b/yarn-project/circuit-types/src/interfaces/index.ts index c717ceae6490..3f05c960e1ba 100644 --- a/yarn-project/circuit-types/src/interfaces/index.ts +++ b/yarn-project/circuit-types/src/interfaces/index.ts @@ -21,3 +21,5 @@ export * from './service.js'; export * from './sync-status.js'; export * from './world_state.js'; export * from './prover-broker.js'; +export * from './p2p.js'; +export * from './p2p-bootstrap.js'; diff --git a/yarn-project/circuit-types/src/interfaces/p2p-bootstrap.ts b/yarn-project/circuit-types/src/interfaces/p2p-bootstrap.ts new file mode 100644 index 000000000000..8283f4da841e --- /dev/null +++ b/yarn-project/circuit-types/src/interfaces/p2p-bootstrap.ts @@ -0,0 +1,21 @@ +import { type ApiSchemaFor } from '@aztec/foundation/schemas'; + +import { z } from 'zod'; + +/** Exposed API to the P2P bootstrap node. */ +export interface P2PBootstrapApi { + /** + * Returns the ENR for this node. + */ + getEncodedEnr(): Promise; + + /** + * Returns ENRs for all nodes in the routing table. + */ + getRoutingTable(): Promise; +} + +export const P2PBootstrapApiSchema: ApiSchemaFor = { + getEncodedEnr: z.function().returns(z.string()), + getRoutingTable: z.function().returns(z.array(z.string())), +}; diff --git a/yarn-project/circuit-types/src/interfaces/p2p.test.ts b/yarn-project/circuit-types/src/interfaces/p2p.test.ts new file mode 100644 index 000000000000..f8a962e025bd --- /dev/null +++ b/yarn-project/circuit-types/src/interfaces/p2p.test.ts @@ -0,0 +1,82 @@ +import { type JsonRpcTestContext, createJsonRpcTestSetup } from '@aztec/foundation/json-rpc/test'; + +import { BlockAttestation } from '../p2p/block_attestation.js'; +import { EpochProofQuote } from '../prover_coordination/epoch_proof_quote.js'; +import { Tx } from '../tx/tx.js'; +import { type P2PApi, P2PApiSchema, type PeerInfo } from './p2p.js'; + +describe('P2PApiSchema', () => { + let handler: MockP2P; + let context: JsonRpcTestContext; + + const tested = new Set(); + + beforeEach(async () => { + handler = new MockP2P(); + context = await createJsonRpcTestSetup(handler, P2PApiSchema); + }); + + afterEach(() => { + tested.add(/^P2PApiSchema\s+([^(]+)/.exec(expect.getState().currentTestName!)![1]); + context.httpServer.close(); + }); + + afterAll(() => { + const all = Object.keys(P2PApiSchema); + expect([...tested].sort()).toEqual(all.sort()); + }); + + it('getAttestationsForSlot', async () => { + const attestations = await context.client.getAttestationsForSlot(BigInt(1), 'proposalId'); + expect(attestations).toEqual([BlockAttestation.empty()]); + expect(attestations[0]).toBeInstanceOf(BlockAttestation); + }); + + it('getEpochProofQuotes', async () => { + const quotes = await context.client.getEpochProofQuotes(BigInt(1)); + expect(quotes).toEqual([EpochProofQuote.empty()]); + expect(quotes[0]).toBeInstanceOf(EpochProofQuote); + }); + + it('getPendingTxs', async () => { + const txs = await context.client.getPendingTxs(); + expect(txs[0]).toBeInstanceOf(Tx); + }); + + it('getEncodedEnr', async () => { + const enr = await context.client.getEncodedEnr(); + expect(enr).toEqual('enr'); + }); + + it('getPeers', async () => { + const peers = await context.client.getPeers(); + expect(peers).toEqual(peers); + }); +}); + +const peers: PeerInfo[] = [ + { status: 'connected', score: 1, id: 'id' }, + { status: 'dialing', dialStatus: 'dialStatus', id: 'id', addresses: ['address'] }, + { status: 'cached', id: 'id', addresses: ['address'], enr: 'enr', dialAttempts: 1 }, +]; + +class MockP2P implements P2PApi { + getAttestationsForSlot(slot: bigint, proposalId?: string | undefined): Promise { + expect(slot).toEqual(1n); + expect(proposalId).toEqual('proposalId'); + return Promise.resolve([BlockAttestation.empty()]); + } + getEpochProofQuotes(epoch: bigint): Promise { + expect(epoch).toEqual(1n); + return Promise.resolve([EpochProofQuote.empty()]); + } + getPendingTxs(): Promise { + return Promise.resolve([Tx.random()]); + } + getEncodedEnr(): Promise { + return Promise.resolve('enr'); + } + getPeers(): Promise { + return Promise.resolve(peers); + } +} diff --git a/yarn-project/circuit-types/src/interfaces/p2p.ts b/yarn-project/circuit-types/src/interfaces/p2p.ts new file mode 100644 index 000000000000..e50a8bde866f --- /dev/null +++ b/yarn-project/circuit-types/src/interfaces/p2p.ts @@ -0,0 +1,71 @@ +import { type ApiSchemaFor, optional, schemas } from '@aztec/foundation/schemas'; + +import { z } from 'zod'; + +import { BlockAttestation } from '../p2p/block_attestation.js'; +import { EpochProofQuote } from '../prover_coordination/epoch_proof_quote.js'; +import { Tx } from '../tx/tx.js'; + +export type PeerInfo = + | { status: 'connected'; score: number; id: string } + | { status: 'dialing'; dialStatus: string; id: string; addresses: string[] } + | { status: 'cached'; id: string; addresses: string[]; enr: string; dialAttempts: number }; + +const PeerInfoSchema = z.discriminatedUnion('status', [ + z.object({ status: z.literal('connected'), score: z.number(), id: z.string() }), + z.object({ status: z.literal('dialing'), dialStatus: z.string(), id: z.string(), addresses: z.array(z.string()) }), + z.object({ + status: z.literal('cached'), + id: z.string(), + addresses: z.array(z.string()), + enr: z.string(), + dialAttempts: z.number(), + }), +]); + +/** Exposed API to the P2P module. */ +export interface P2PApi { + /** + * Queries the Attestation pool for attestations for the given slot + * + * @param slot - the slot to query + * @param proposalId - the proposal id to query, or undefined to query all proposals for the slot + * @returns BlockAttestations + */ + getAttestationsForSlot(slot: bigint, proposalId?: string): Promise; + + /** + * Queries the EpochProofQuote pool for quotes for the given epoch + * + * @param epoch - the epoch to query + * @returns EpochProofQuotes + */ + getEpochProofQuotes(epoch: bigint): Promise; + + /** + * Returns all pending transactions in the transaction pool. + * @returns An array of Txs. + */ + getPendingTxs(): Promise; + + /** + * Returns the ENR for this node, if any. + */ + getEncodedEnr(): Promise; + + /** + * Returns info for all connected, dialing, and cached peers. + */ + getPeers(): Promise; +} + +export const P2PApiSchema: ApiSchemaFor = { + getAttestationsForSlot: z + .function() + .args(schemas.BigInt, optional(z.string())) + .returns(z.array(BlockAttestation.schema)), + getEpochProofQuotes: z.function().args(schemas.BigInt).returns(z.array(EpochProofQuote.schema)), + getPendingTxs: z.function().returns(z.array(Tx.schema)), + getEncodedEnr: z.function().returns(z.string().optional()), + getPeers: z.function().returns(z.array(PeerInfoSchema)), +}; diff --git a/yarn-project/circuit-types/src/p2p/block_attestation.ts b/yarn-project/circuit-types/src/p2p/block_attestation.ts index 04ccfdf4d52b..4ac0babe6905 100644 --- a/yarn-project/circuit-types/src/p2p/block_attestation.ts +++ b/yarn-project/circuit-types/src/p2p/block_attestation.ts @@ -3,8 +3,11 @@ import { keccak256, recoverAddress } from '@aztec/foundation/crypto'; import { type EthAddress } from '@aztec/foundation/eth-address'; import { Signature } from '@aztec/foundation/eth-signature'; import { type Fr } from '@aztec/foundation/fields'; +import { type ZodFor } from '@aztec/foundation/schemas'; import { BufferReader, serializeToBuffer } from '@aztec/foundation/serialize'; +import { z } from 'zod'; + import { ConsensusPayload } from './consensus_payload.js'; import { Gossipable } from './gossipable.js'; import { SignatureDomainSeperator, getHashedSignaturePayloadEthSignedMessage } from './signature_utils.js'; @@ -37,6 +40,15 @@ export class BlockAttestation extends Gossipable { super(); } + static get schema(): ZodFor { + return z + .object({ + payload: ConsensusPayload.schema, + signature: Signature.schema, + }) + .transform(obj => new BlockAttestation(obj.payload, obj.signature)); + } + override p2pMessageIdentifier(): Buffer32 { return new BlockAttestationHash(keccak256(this.signature.toBuffer())); } diff --git a/yarn-project/circuit-types/src/p2p/consensus_payload.ts b/yarn-project/circuit-types/src/p2p/consensus_payload.ts index a043a8d20101..37fedc508da7 100644 --- a/yarn-project/circuit-types/src/p2p/consensus_payload.ts +++ b/yarn-project/circuit-types/src/p2p/consensus_payload.ts @@ -5,6 +5,7 @@ import { hexToBuffer } from '@aztec/foundation/string'; import { type FieldsOf } from '@aztec/foundation/types'; import { encodeAbiParameters, parseAbiParameters } from 'viem'; +import { z } from 'zod'; import { TxHash } from '../tx/tx_hash.js'; import { type Signable, type SignatureDomainSeperator } from './signature_utils.js'; @@ -21,6 +22,16 @@ export class ConsensusPayload implements Signable { public readonly txHashes: TxHash[], ) {} + static get schema() { + return z + .object({ + header: BlockHeader.schema, + archive: Fr.schema, + txHashes: z.array(TxHash.schema), + }) + .transform(obj => new ConsensusPayload(obj.header, obj.archive, obj.txHashes)); + } + static getFields(fields: FieldsOf) { return [fields.header, fields.archive, fields.txHashes] as const; } diff --git a/yarn-project/p2p/src/bootstrap/bootstrap.ts b/yarn-project/p2p/src/bootstrap/bootstrap.ts index d0d459be642d..ab4f84b6b797 100644 --- a/yarn-project/p2p/src/bootstrap/bootstrap.ts +++ b/yarn-project/p2p/src/bootstrap/bootstrap.ts @@ -1,3 +1,4 @@ +import { type P2PBootstrapApi } from '@aztec/circuit-types/interfaces'; import { createLogger } from '@aztec/foundation/log'; import { type AztecKVStore } from '@aztec/kv-store'; import { OtelMetricsAdapter, type TelemetryClient } from '@aztec/telemetry-client'; @@ -14,7 +15,7 @@ import { convertToMultiaddr, createLibP2PPeerIdFromPrivateKey, getPeerIdPrivateK /** * Encapsulates a 'Bootstrap' node, used for the purpose of assisting new joiners in acquiring peers. */ -export class BootstrapNode { +export class BootstrapNode implements P2PBootstrapApi { private node?: Discv5 = undefined; private peerId?: PeerId; @@ -47,7 +48,7 @@ export class BootstrapNode { enr.setLocationMultiaddr(publicAddr); enr.set(AZTEC_ENR_KEY, Uint8Array.from([AZTEC_NET])); - this.logger.info(`Starting bootstrap node ${peerId}, listening on ${listenAddrUdp.toString()}`); + this.logger.debug(`Starting bootstrap node ${peerId} listening on ${listenAddrUdp.toString()}`); const metricsRegistry = new OtelMetricsAdapter(this.telemetry); this.node = Discv5.create({ enr, @@ -65,17 +66,15 @@ export class BootstrapNode { }); (this.node as Discv5EventEmitter).on('discovered', async (enr: SignableENR) => { const addr = await enr.getFullMultiaddr('udp'); - this.logger.verbose(`Discovered new peer, enr: ${enr.encodeTxt()}, addr: ${addr?.toString()}`); + this.logger.verbose(`Discovered new peer`, { enr: enr.encodeTxt(), addr: addr?.toString() }); }); try { await this.node.start(); - this.logger.info('Discv5 started'); + this.logger.info('Bootstrap node started', { peerId, enr: enr.encodeTxt(), addr: listenAddrUdp.toString() }); } catch (e) { this.logger.error('Error starting Discv5', e); } - - this.logger.info(`ENR: ${this.node?.enr.encodeTxt()}`); } /** @@ -84,8 +83,9 @@ export class BootstrapNode { */ public async stop() { // stop libp2p + this.logger.debug('Stopping bootstrap node'); await this.node?.stop(); - this.logger.debug('Discv5 has stopped'); + this.logger.info('Bootstrap node stopped'); } /** @@ -105,4 +105,18 @@ export class BootstrapNode { } return this.node?.enr.toENR(); } + + public getEncodedEnr() { + if (!this.node) { + throw new Error('Node not started'); + } + return Promise.resolve(this.node.enr.encodeTxt()); + } + + public getRoutingTable() { + if (!this.node) { + throw new Error('Node not started'); + } + return Promise.resolve(this.node.kadValues().map(enr => enr.encodeTxt())); + } } diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index 24a112ae839b..e09fef66999f 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -6,7 +6,8 @@ import { sleep } from '@aztec/foundation/sleep'; import { type AztecKVStore } from '@aztec/kv-store'; import { openTmpStore } from '@aztec/kv-store/lmdb'; -import { expect, jest } from '@jest/globals'; +import { expect } from '@jest/globals'; +import { type MockProxy, mock } from 'jest-mock-extended'; import { type EpochProofQuotePool, type P2PService } from '../index.js'; import { type AttestationPool } from '../mem_pools/attestation_pool/attestation_pool.js'; @@ -14,59 +15,31 @@ import { type MemPools } from '../mem_pools/interface.js'; import { type TxPool } from '../mem_pools/tx_pool/index.js'; import { P2PClient } from './p2p_client.js'; -/** - * Mockify helper for testing purposes. - */ -type Mockify = { - [P in keyof T]: ReturnType; -}; - describe('In-Memory P2P Client', () => { - let txPool: Mockify; - let attestationPool: Mockify; - let epochProofQuotePool: Mockify; + let txPool: MockProxy; + let attestationPool: MockProxy; + let epochProofQuotePool: MockProxy; let mempools: MemPools; let blockSource: MockL2BlockSource; - let p2pService: Mockify; + let p2pService: MockProxy; let kvStore: AztecKVStore; let client: P2PClient; beforeEach(() => { - txPool = { - addTxs: jest.fn(), - getTxByHash: jest.fn().mockReturnValue(undefined), - deleteTxs: jest.fn(), - getAllTxs: jest.fn().mockReturnValue([]), - getAllTxHashes: jest.fn().mockReturnValue([]), - getMinedTxHashes: jest.fn().mockReturnValue([]), - getPendingTxHashes: jest.fn().mockReturnValue([]), - getTxStatus: jest.fn().mockReturnValue(undefined), - markAsMined: jest.fn(), - markMinedAsPending: jest.fn(), - }; + txPool = mock({ + getAllTxs: () => [], + getAllTxHashes: () => [], + getPendingTxHashes: () => [], + getMinedTxHashes: () => [], + }); - p2pService = { - start: jest.fn(), - stop: jest.fn(), - propagate: jest.fn(), - registerBlockReceivedCallback: jest.fn(), - sendRequest: jest.fn(), - getEnr: jest.fn(), - }; + p2pService = mock(); - attestationPool = { - addAttestations: jest.fn(), - deleteAttestations: jest.fn(), - deleteAttestationsForSlot: jest.fn(), - deleteAttestationsOlderThan: jest.fn(), - getAttestationsForSlot: jest.fn().mockReturnValue(undefined), - }; + attestationPool = mock(); - epochProofQuotePool = { - addQuote: jest.fn(), - getQuotes: jest.fn().mockReturnValue([]), - deleteQuotesToEpoch: jest.fn(), - }; + epochProofQuotePool = mock({ + getQuotes: () => [], + }); blockSource = new MockL2BlockSource(); blockSource.createBlocks(100); diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index fdb5f5b23a5c..071543ebbe8f 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -8,6 +8,8 @@ import { L2BlockStream, type L2BlockStreamEvent, type L2Tips, + type P2PApi, + type PeerInfo, type Tx, type TxHash, } from '@aztec/circuit-types'; @@ -54,7 +56,7 @@ export interface P2PSyncState { /** * Interface of a P2P client. **/ -export interface P2P { +export interface P2P extends P2PApi { /** * Broadcasts a block proposal to other peers. * @@ -62,15 +64,6 @@ export interface P2P { */ broadcastProposal(proposal: BlockProposal): void; - /** - * Queries the Attestation pool for attestations for the given slot - * - * @param slot - the slot to query - * @param proposalId - the proposal id to query - * @returns BlockAttestations - */ - getAttestationsForSlot(slot: bigint, proposalId: string): Promise; - /** * Queries the EpochProofQuote pool for quotes for the given epoch * @@ -122,12 +115,6 @@ export interface P2P { **/ deleteTxs(txHashes: TxHash[]): Promise; - /** - * Returns all transactions in the transaction pool. - * @returns An array of Txs. - */ - getTxs(filter: 'all' | 'pending' | 'mined'): Tx[]; - /** * Returns a transaction in the transaction pool by its hash. * @param txHash - Hash of tx to return. @@ -173,9 +160,12 @@ export interface P2P { getStatus(): Promise; /** - * Returns the ENR for this node, if any. + * Returns the ENR of this node, if any. */ getEnr(): ENR | undefined; + + /** Identifies a p2p client. */ + isP2PClient(): true; } /** @@ -245,6 +235,14 @@ export class P2PClient extends WithTracer implements P2P { this.epochProofQuotePool = mempools.epochProofQuotePool; } + public isP2PClient(): true { + return true; + } + + public getPeers(): Promise { + return Promise.resolve(this.p2pService.getPeers()); + } + public getL2BlockHash(number: number): Promise { return Promise.resolve(this.synchedBlockHashes.get(number)); } @@ -444,6 +442,10 @@ export class P2PClient extends WithTracer implements P2P { return tx; } + public getPendingTxs(): Promise { + return Promise.resolve(this.getTxs('pending')); + } + /** * Returns all transactions in the transaction pool. * @returns An array of Txs. @@ -514,6 +516,10 @@ export class P2PClient extends WithTracer implements P2P { return this.p2pService.getEnr(); } + public getEncodedEnr(): Promise { + return Promise.resolve(this.p2pService.getEnr()?.encodeTxt()); + } + /** * Deletes the 'txs' from the pool. * NOT used if we use sendTx as reconcileTxPool will handle this. diff --git a/yarn-project/p2p/src/service/dummy_service.ts b/yarn-project/p2p/src/service/dummy_service.ts index be5b82d9c8e8..e108cd47ac67 100644 --- a/yarn-project/p2p/src/service/dummy_service.ts +++ b/yarn-project/p2p/src/service/dummy_service.ts @@ -1,4 +1,4 @@ -import type { BlockAttestation, BlockProposal, Gossipable, TxHash } from '@aztec/circuit-types'; +import type { BlockAttestation, BlockProposal, Gossipable, PeerInfo, TxHash } from '@aztec/circuit-types'; import type { PeerId } from '@libp2p/interface'; import EventEmitter from 'events'; @@ -10,6 +10,11 @@ import { type P2PService, type PeerDiscoveryService, PeerDiscoveryState } from ' * A dummy implementation of the P2P Service. */ export class DummyP2PService implements P2PService { + /** Returns an empty array for peers. */ + getPeers(): PeerInfo[] { + return []; + } + /** * Starts the dummy implementation. * @returns A resolved promise. diff --git a/yarn-project/p2p/src/service/libp2p_service.ts b/yarn-project/p2p/src/service/libp2p_service.ts index ab249a951167..244430def067 100644 --- a/yarn-project/p2p/src/service/libp2p_service.ts +++ b/yarn-project/p2p/src/service/libp2p_service.ts @@ -6,6 +6,7 @@ import { type Gossipable, type L2BlockSource, MerkleTreeId, + type PeerInfo, type RawGossipMessage, TopicType, TopicTypeMap, @@ -310,6 +311,10 @@ export class LibP2PService extends WithTracer implements P2PService { ); } + public getPeers(): PeerInfo[] { + return this.peerManager.getPeers(); + } + /** * Send Request via the ReqResp service * The subprotocol defined will determine the request and response types diff --git a/yarn-project/p2p/src/service/peer_manager.ts b/yarn-project/p2p/src/service/peer_manager.ts index 8c229bac2444..5656884ece8d 100644 --- a/yarn-project/p2p/src/service/peer_manager.ts +++ b/yarn-project/p2p/src/service/peer_manager.ts @@ -1,3 +1,4 @@ +import { type PeerInfo } from '@aztec/circuit-types'; import { createLogger } from '@aztec/foundation/log'; import { type ENR } from '@chainsafe/enr'; @@ -28,7 +29,7 @@ export class PeerManager { private libP2PNode: PubSubLibp2p, private peerDiscoveryService: PeerDiscoveryService, private config: P2PConfig, - private logger = createLogger('p2p:peer_manager'), + private logger = createLogger('p2p:peer-manager'), ) { this.peerScoring = new PeerScoring(config); // Handle new established connections @@ -74,6 +75,35 @@ export class PeerManager { return this.peerScoring.getScore(peerId); } + public getPeers(): PeerInfo[] { + const connected = this.libP2PNode + .getPeers() + .map(peer => ({ id: peer.toString(), score: this.getPeerScore(peer.toString()), status: 'connected' as const })); + + const dialQueue = this.libP2PNode + .getDialQueue() + .filter(peer => !!peer.peerId) + .map(peer => ({ + id: peer.peerId!.toString(), + status: 'dialing' as const, + dialStatus: peer.status, + addresses: peer.multiaddrs.map(m => m.toString()), + })); + + const cachedPeers = Array.from(this.cachedPeers.values()) + .filter(peer => !dialQueue.some(dialPeer => dialPeer.id && peer.peerId.toString() === dialPeer.id.toString())) + .filter(peer => !connected.some(connPeer => connPeer.id.toString() === peer.peerId.toString())) + .map(peer => ({ + status: 'cached' as const, + id: peer.peerId.toString(), + addresses: [peer.multiaddrTcp.toString()], + dialAttempts: peer.dialAttempts, + enr: peer.enr.encodeTxt(), + })); + + return [...connected, ...dialQueue, ...cachedPeers]; + } + /** * Discovers peers. */ @@ -145,7 +175,7 @@ export class PeerManager { // throw if no tcp addr in multiaddr if (!multiaddrTcp) { - this.logger.warn(`No TCP address in discovered node's multiaddr ${enr.encodeTxt()}`); + this.logger.debug(`No TCP address in discovered node's multiaddr ${enr.encodeTxt()}`); return; } const connections = this.libP2PNode.getConnections(); diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts b/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts index 67436bad9874..7a08de566809 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts @@ -223,7 +223,7 @@ describe('Req Resp p2p client integration', () => { // We want to create a set of nodes and request transaction from them const clients = await createClients(NUMBER_OF_PEERS, /*valid proofs*/ false); const [client1, client2] = clients; - const client2PeerId = (await client2.getEnr()?.peerId())!; + const client2PeerId = client2.getEnr()!; // Give the nodes time to discover each other await sleep(6000); diff --git a/yarn-project/p2p/src/service/service.ts b/yarn-project/p2p/src/service/service.ts index ce486e0b2bba..db3bbc4b577e 100644 --- a/yarn-project/p2p/src/service/service.ts +++ b/yarn-project/p2p/src/service/service.ts @@ -1,4 +1,4 @@ -import type { BlockAttestation, BlockProposal, Gossipable } from '@aztec/circuit-types'; +import type { BlockAttestation, BlockProposal, Gossipable, PeerInfo } from '@aztec/circuit-types'; import type { ENR } from '@chainsafe/enr'; import type { PeerId } from '@libp2p/interface'; @@ -49,6 +49,8 @@ export interface P2PService { registerBlockReceivedCallback(callback: (block: BlockProposal) => Promise): void; getEnr(): ENR | undefined; + + getPeers(): PeerInfo[]; } /** diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index 51cad0f12275..bd87117f6fa7 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -18,6 +18,7 @@ import { compact } from '@aztec/foundation/collection'; import { sha256 } from '@aztec/foundation/crypto'; import { createLogger } from '@aztec/foundation/log'; import { type Maybe } from '@aztec/foundation/types'; +import { type P2P } from '@aztec/p2p'; import { type L1Publisher } from '@aztec/sequencer-client'; import { PublicProcessorFactory } from '@aztec/simulator'; import { type TelemetryClient } from '@aztec/telemetry-client'; @@ -78,6 +79,14 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr this.metrics = new ProverNodeMetrics(telemetryClient, 'ProverNode'); } + public getP2P() { + const asP2PClient = this.coordination as P2P; + if (asP2PClient.isP2PClient && asP2PClient.isP2PClient()) { + return asP2PClient; + } + return undefined; + } + async handleClaim(proofClaim: EpochProofClaim): Promise { if (proofClaim.epochToProve === this.latestEpochWeAreProving) { this.log.verbose(`Already proving claim for epoch ${proofClaim.epochToProve}`); diff --git a/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts b/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts index 5acbbd261f6a..aeb73d79725b 100644 --- a/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts +++ b/yarn-project/sequencer-client/src/sequencer/sequencer.test.ts @@ -208,7 +208,7 @@ describe('sequencer', () => { tx.data.constants.txContext.chainId = chainId; const txHash = tx.getTxHash(); - p2p.getTxs.mockReturnValueOnce([tx]); + p2p.getPendingTxs.mockResolvedValueOnce([tx]); blockBuilder.setBlockCompleted.mockResolvedValue(block); publisher.proposeL2Block.mockResolvedValueOnce(true); @@ -240,7 +240,7 @@ describe('sequencer', () => { const tx = mockTxForRollup(); tx.data.constants.txContext.chainId = chainId; - p2p.getTxs.mockReturnValueOnce([tx]); + p2p.getPendingTxs.mockResolvedValueOnce([tx]); blockBuilder.setBlockCompleted.mockResolvedValue(block); publisher.proposeL2Block.mockResolvedValueOnce(true); @@ -262,7 +262,7 @@ describe('sequencer', () => { tx.data.constants.txContext.chainId = chainId; const txHash = tx.getTxHash(); - p2p.getTxs.mockReturnValue([tx]); + p2p.getPendingTxs.mockResolvedValue([tx]); blockBuilder.setBlockCompleted.mockResolvedValue(block); publisher.proposeL2Block.mockResolvedValueOnce(true); @@ -307,7 +307,7 @@ describe('sequencer', () => { const doubleSpendTx = txs[doubleSpendTxIndex]; - p2p.getTxs.mockReturnValueOnce(txs); + p2p.getPendingTxs.mockResolvedValueOnce(txs); blockBuilder.setBlockCompleted.mockResolvedValue(block); publisher.proposeL2Block.mockResolvedValueOnce(true); @@ -341,7 +341,7 @@ describe('sequencer', () => { const invalidChainTx = txs[invalidChainTxIndex]; const validTxHashes = txs.filter((_, i) => i !== invalidChainTxIndex).map(tx => tx.getTxHash()); - p2p.getTxs.mockReturnValueOnce(txs); + p2p.getPendingTxs.mockResolvedValueOnce(txs); blockBuilder.setBlockCompleted.mockResolvedValue(block); publisher.proposeL2Block.mockResolvedValueOnce(true); @@ -370,7 +370,7 @@ describe('sequencer', () => { }); const validTxHashes = txs.filter((_, i) => i !== invalidTransactionIndex).map(tx => tx.getTxHash()); - p2p.getTxs.mockReturnValueOnce(txs); + p2p.getPendingTxs.mockResolvedValueOnce(txs); blockBuilder.setBlockCompleted.mockResolvedValue(block); publisher.proposeL2Block.mockResolvedValueOnce(true); @@ -407,19 +407,19 @@ describe('sequencer', () => { sequencer.updateConfig({ minTxsPerBlock: 4 }); // block is not built with 0 txs - p2p.getTxs.mockReturnValueOnce([]); - //p2p.getTxs.mockReturnValueOnce(txs.slice(0, 4)); + p2p.getPendingTxs.mockResolvedValueOnce([]); + //p2p.getPendingTxs.mockResolvedValueOnce(txs.slice(0, 4)); await sequencer.doRealWork(); expect(blockBuilder.startNewBlock).toHaveBeenCalledTimes(0); // block is not built with 3 txs - p2p.getTxs.mockReturnValueOnce(txs.slice(0, 3)); + p2p.getPendingTxs.mockResolvedValueOnce(txs.slice(0, 3)); await sequencer.doRealWork(); expect(blockBuilder.startNewBlock).toHaveBeenCalledTimes(0); // block is built with 4 txs - p2p.getTxs.mockReturnValueOnce(txs.slice(0, 4)); + p2p.getPendingTxs.mockResolvedValueOnce(txs.slice(0, 4)); const txHashes = txs.slice(0, 4).map(tx => tx.getTxHash()); await sequencer.doRealWork(); @@ -448,12 +448,12 @@ describe('sequencer', () => { sequencer.updateConfig({ minTxsPerBlock: 4 }); // block is not built with 0 txs - p2p.getTxs.mockReturnValueOnce([]); + p2p.getPendingTxs.mockResolvedValueOnce([]); await sequencer.doRealWork(); expect(blockBuilder.startNewBlock).toHaveBeenCalledTimes(0); // block is not built with 3 txs - p2p.getTxs.mockReturnValueOnce(txs.slice(0, 3)); + p2p.getPendingTxs.mockResolvedValueOnce(txs.slice(0, 3)); await sequencer.doRealWork(); expect(blockBuilder.startNewBlock).toHaveBeenCalledTimes(0); @@ -461,7 +461,7 @@ describe('sequencer', () => { sequencer.flush(); // block is built with 0 txs - p2p.getTxs.mockReturnValueOnce([]); + p2p.getPendingTxs.mockResolvedValueOnce([]); await sequencer.doRealWork(); expect(blockBuilder.startNewBlock).toHaveBeenCalledTimes(1); expect(blockBuilder.startNewBlock).toHaveBeenCalledWith( @@ -489,12 +489,12 @@ describe('sequencer', () => { sequencer.updateConfig({ minTxsPerBlock: 4 }); // block is not built with 0 txs - p2p.getTxs.mockReturnValueOnce([]); + p2p.getPendingTxs.mockResolvedValueOnce([]); await sequencer.doRealWork(); expect(blockBuilder.startNewBlock).toHaveBeenCalledTimes(0); // block is not built with 3 txs - p2p.getTxs.mockReturnValueOnce(txs.slice(0, 3)); + p2p.getPendingTxs.mockResolvedValueOnce(txs.slice(0, 3)); await sequencer.doRealWork(); expect(blockBuilder.startNewBlock).toHaveBeenCalledTimes(0); @@ -503,7 +503,7 @@ describe('sequencer', () => { // block is built with 3 txs const postFlushTxs = txs.slice(0, 3); - p2p.getTxs.mockReturnValueOnce(postFlushTxs); + p2p.getPendingTxs.mockResolvedValueOnce(postFlushTxs); const postFlushTxHashes = postFlushTxs.map(tx => tx.getTxHash()); await sequencer.doRealWork(); expect(blockBuilder.startNewBlock).toHaveBeenCalledTimes(1); @@ -521,7 +521,7 @@ describe('sequencer', () => { const tx = mockTxForRollup(); tx.data.constants.txContext.chainId = chainId; - p2p.getTxs.mockReturnValueOnce([tx]); + p2p.getPendingTxs.mockResolvedValueOnce([tx]); blockBuilder.setBlockCompleted.mockResolvedValue(block); publisher.proposeL2Block.mockResolvedValueOnce(true); @@ -597,7 +597,7 @@ describe('sequencer', () => { tx.data.constants.txContext.chainId = chainId; txHash = tx.getTxHash(); - p2p.getTxs.mockReturnValue([tx]); + p2p.getPendingTxs.mockResolvedValue([tx]); blockBuilder.setBlockCompleted.mockResolvedValue(block); }; diff --git a/yarn-project/sequencer-client/src/sequencer/sequencer.ts b/yarn-project/sequencer-client/src/sequencer/sequencer.ts index 618984a69ce4..f1df58e18b04 100644 --- a/yarn-project/sequencer-client/src/sequencer/sequencer.ts +++ b/yarn-project/sequencer-client/src/sequencer/sequencer.ts @@ -289,7 +289,7 @@ export class Sequencer { this.setState(SequencerState.WAITING_FOR_TXS, slot); // Get txs to build the new block. - const pendingTxs = this.p2pClient.getTxs('pending'); + const pendingTxs = await this.p2pClient.getPendingTxs(); if (!this.shouldProposeBlock(historicalHeader, { pendingTxsCount: pendingTxs.length })) { return; From f8b2476008d33c344b50e46248ed4faf7d18db29 Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Mon, 9 Dec 2024 16:06:52 -0300 Subject: [PATCH 03/11] Default pretty logs to singleline --- yarn-project/foundation/src/config/env_var.ts | 1 + yarn-project/foundation/src/log/pino-logger.ts | 2 ++ 2 files changed, 3 insertions(+) diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index 41e2d61b49f6..8f6bd085fd34 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -56,6 +56,7 @@ export type EnvVar = | 'L2_QUEUE_SIZE' | 'LOG_ELAPSED_TIME' | 'LOG_JSON' + | 'LOG_MULTILINE' | 'LOG_LEVEL' | 'MNEMONIC' | 'NETWORK_NAME' diff --git a/yarn-project/foundation/src/log/pino-logger.ts b/yarn-project/foundation/src/log/pino-logger.ts index 10b19086939c..db1b39075847 100644 --- a/yarn-project/foundation/src/log/pino-logger.ts +++ b/yarn-project/foundation/src/log/pino-logger.ts @@ -77,7 +77,9 @@ const pinoPrettyOpts = { customLevels: 'fatal:60,error:50,warn:40,info:30,verbose:25,debug:20,trace:10', customColors: 'fatal:bgRed,error:red,warn:yellow,info:green,verbose:magenta,debug:blue,trace:gray', minimumLevel: 'trace' as const, + singleLine: !['1', 'true'].includes(process.env.LOG_MULTILINE ?? ''), }; + const prettyTransport: pino.TransportSingleOptions = { target: 'pino-pretty', options: pinoPrettyOpts, From 89625bc7c6bc66a8eb4fc04f9b0e9d994a2e8ae8 Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Mon, 9 Dec 2024 16:07:37 -0300 Subject: [PATCH 04/11] More logs tweaks --- yarn-project/p2p/src/client/index.ts | 3 +++ yarn-project/p2p/src/client/p2p_client.ts | 3 ++- .../p2p/src/service/discV5_service.ts | 8 +++---- yarn-project/p2p/src/service/peer_manager.ts | 21 +++++++++++-------- yarn-project/prover-node/src/prover-node.ts | 13 ++++++++---- 5 files changed, 29 insertions(+), 19 deletions(-) diff --git a/yarn-project/p2p/src/client/index.ts b/yarn-project/p2p/src/client/index.ts index 9340b9a25749..509e5d51614d 100644 --- a/yarn-project/p2p/src/client/index.ts +++ b/yarn-project/p2p/src/client/index.ts @@ -35,6 +35,7 @@ export const createP2PClient = async ( } = {}, ) => { let config = { ..._config }; + const logger = createLogger('p2p'); const store = deps.store ?? (await createStore('p2p', config, createLogger('p2p:lmdb'))); const mempools: MemPools = { @@ -46,6 +47,7 @@ export const createP2PClient = async ( let p2pService; if (_config.p2pEnabled) { + logger.verbose('P2P is enabled. Using LibP2P service.'); config = await configureP2PClientAddresses(_config); // Create peer discovery service @@ -65,6 +67,7 @@ export const createP2PClient = async ( telemetry, ); } else { + logger.verbose('P2P is disabled. Using dummy P2P service'); p2pService = new DummyP2PService(); } return new P2PClient(store, l2BlockSource, mempools, p2pService, config.keepProvenTxsInPoolFor, telemetry); diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index 071543ebbe8f..87e985a96032 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -715,8 +715,9 @@ export class P2PClient extends WithTracer implements P2P { * @param newState - New state value. */ private setCurrentState(newState: P2PClientState) { + const oldState = this.currentState; this.currentState = newState; - this.log.debug(`Moved to state ${P2PClientState[this.currentState]}`); + this.log.debug(`Moved from state ${P2PClientState[oldState]} to ${P2PClientState[this.currentState]}`); } private async publishStoredTxs() { diff --git a/yarn-project/p2p/src/service/discV5_service.ts b/yarn-project/p2p/src/service/discV5_service.ts index 5e79c7c50aff..5b2ceffcd4c9 100644 --- a/yarn-project/p2p/src/service/discV5_service.ts +++ b/yarn-project/p2p/src/service/discV5_service.ts @@ -88,14 +88,11 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService metricsRegistry, }); - this.logger.verbose(`DiscV5 ENR NodeId: ${this.enr.nodeId}`); - this.logger.info(`ENR UDP: ${multiAddrUdp.toString()}`); - (this.discv5 as Discv5EventEmitter).on('discovered', (enr: ENR) => this.onDiscovered(enr)); (this.discv5 as Discv5EventEmitter).on('enrAdded', async (enr: ENR) => { const multiAddrTcp = await enr.getFullMultiaddr('tcp'); const multiAddrUdp = await enr.getFullMultiaddr('udp'); - this.logger.debug(`Added ENR`, { multiAddrTcp, multiAddrUdp, nodeId: enr.nodeId }); + this.logger.debug(`Added ENR ${enr.encodeTxt()}`, { multiAddrTcp, multiAddrUdp, nodeId: enr.nodeId }); this.onDiscovered(enr); }); } @@ -110,8 +107,9 @@ export class DiscV5Service extends EventEmitter implements PeerDiscoveryService this.logger.info(`DiscV5 service started`, { nodeId: this.enr.nodeId, - listenUdp: this.listenMultiAddrUdp, peerId: this.peerId, + enrUdp: await this.enr.getFullMultiaddr('udp'), + enrTcp: await this.enr.getFullMultiaddr('tcp'), }); this.currentState = PeerDiscoveryState.RUNNING; diff --git a/yarn-project/p2p/src/service/peer_manager.ts b/yarn-project/p2p/src/service/peer_manager.ts index 5656884ece8d..511915c6a891 100644 --- a/yarn-project/p2p/src/service/peer_manager.ts +++ b/yarn-project/p2p/src/service/peer_manager.ts @@ -4,6 +4,7 @@ import { createLogger } from '@aztec/foundation/log'; import { type ENR } from '@chainsafe/enr'; import { type PeerId } from '@libp2p/interface'; import { type Multiaddr } from '@multiformats/multiaddr'; +import { inspect } from 'util'; import { type P2PConfig } from '../config.js'; import { type PubSubLibp2p } from '../util.js'; @@ -115,7 +116,7 @@ export class PeerManager { const peersToConnect = this.config.maxPeerCount - connections.length; const logLevel = this.heartbeatCounter % 60 === 0 ? 'info' : 'debug'; - this.logger[logLevel](`P2P peers status`, { + this.logger[logLevel](`Connected to ${connections.length} peers`, { connections: connections.length, maxPeerCount: this.config.maxPeerCount, cachedPeers: this.cachedPeers.size, @@ -156,7 +157,7 @@ export class PeerManager { // if we need more peers, start randomNodesQuery if (peersToConnect > 0) { - this.logger.debug(`Running random nodes query to connect to ${peersToConnect} peers`); + this.logger.trace(`Running random nodes query to connect to ${peersToConnect} peers`); void this.peerDiscoveryService.runRandomNodesQuery(); } } @@ -171,7 +172,9 @@ export class PeerManager { // check if peer is already connected const [peerId, multiaddrTcp] = await Promise.all([enr.peerId(), enr.getFullMultiaddr('tcp')]); - this.logger.debug(`Handling discovered peer ${peerId.toString()} at ${multiaddrTcp?.toString()}`); + this.logger.trace( + `Handling discovered peer ${peerId.toString()} at ${multiaddrTcp?.toString() ?? 'undefined address'}`, + ); // throw if no tcp addr in multiaddr if (!multiaddrTcp) { @@ -180,14 +183,14 @@ export class PeerManager { } const connections = this.libP2PNode.getConnections(); if (connections.some(conn => conn.remotePeer.equals(peerId))) { - this.logger.debug(`Already connected to peer ${peerId.toString()}`); + this.logger.trace(`Already connected to peer ${peerId.toString()}`); return; } // check if peer is already in cache const id = peerId.toString(); if (this.cachedPeers.has(id)) { - this.logger.debug(`Peer already in cache ${id}`); + this.logger.trace(`Peer already in cache ${id}`); return; } @@ -203,7 +206,7 @@ export class PeerManager { if (this.shouldDialPeer()) { void this.dialPeer(cachedPeer); } else { - this.logger.debug(`Caching peer ${id}`); + this.logger.trace(`Caching peer ${id}`); this.cachedPeers.set(id, cachedPeer); // Prune set of cached peers this.pruneCachedPeers(); @@ -217,13 +220,13 @@ export class PeerManager { this.logger.debug(`Dialing peer ${id}`); try { await this.libP2PNode.dial(peer.multiaddrTcp); - } catch { + } catch (error) { peer.dialAttempts++; if (peer.dialAttempts < MAX_DIAL_ATTEMPTS) { - this.logger.debug(`Failed to dial peer ${id} (attempt ${peer.dialAttempts})`); + this.logger.trace(`Failed to dial peer ${id} (attempt ${peer.dialAttempts})`, { error: inspect(error) }); this.cachedPeers.set(id, peer); } else { - this.logger.debug(`Failed to dial peer ${id} (dropping)`); + this.logger.debug(`Failed to dial peer ${id} (dropping)`, { error: inspect(error) }); this.cachedPeers.delete(id); } } diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index bd87117f6fa7..0e4b6f40b7ce 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -80,11 +80,16 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr } public getP2P() { - const asP2PClient = this.coordination as P2P; - if (asP2PClient.isP2PClient && asP2PClient.isP2PClient()) { - return asP2PClient; + // TODO(palla): Remove try/catch once #10544 lands + try { + const asP2PClient = this.coordination as P2P; + if (typeof asP2PClient.isP2PClient === 'function' && asP2PClient.isP2PClient()) { + return asP2PClient; + } + return undefined; + } catch { + return undefined; } - return undefined; } async handleClaim(proofClaim: EpochProofClaim): Promise { From e5165e1e8949bb85d0ca9f6467a8ce4d40237108 Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Mon, 9 Dec 2024 18:36:32 -0300 Subject: [PATCH 05/11] chore: Remove Proxy from json rpc client Using a proxy as a json rpc client means that dynamic checks for functions in the returned object fail. For instance, `typeof pxe.getTransactions` returns true even if the method is not part of the pxe schema, since the proxy creates a fake function for every single property requested. But then it fails when we try to invoke it. Since we now have schemas, we can drop usage of the proxy and just create an object with exactly the methods we need. --- .../json-rpc/client/safe_json_rpc_client.ts | 18 +++++------------- .../src/json-rpc/test/integration.test.ts | 6 ++---- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/yarn-project/foundation/src/json-rpc/client/safe_json_rpc_client.ts b/yarn-project/foundation/src/json-rpc/client/safe_json_rpc_client.ts index 8949ca3e2eac..106042592992 100644 --- a/yarn-project/foundation/src/json-rpc/client/safe_json_rpc_client.ts +++ b/yarn-project/foundation/src/json-rpc/client/safe_json_rpc_client.ts @@ -44,18 +44,10 @@ export function createSafeJsonRpcClient( return (schema as ApiSchema)[methodName].returnType().parse(res.result); }; - // Intercept any RPC methods with a proxy - const proxy = new Proxy( - {}, - { - get: (target, method: string) => { - if (['then', 'catch'].includes(method)) { - return Reflect.get(target, method); - } - return (...params: any[]) => request(method, params); - }, - }, - ) as T; + const proxy: any = {}; + for (const method of Object.keys(schema)) { + proxy[method] = (...params: any[]) => request(method, params); + } - return proxy; + return proxy as T; } diff --git a/yarn-project/foundation/src/json-rpc/test/integration.test.ts b/yarn-project/foundation/src/json-rpc/test/integration.test.ts index b48d0f8d7a56..d1611d0acde0 100644 --- a/yarn-project/foundation/src/json-rpc/test/integration.test.ts +++ b/yarn-project/foundation/src/json-rpc/test/integration.test.ts @@ -118,10 +118,8 @@ describe('JsonRpc integration', () => { await expect(() => client.fail()).rejects.toThrow('Test state failed'); }); - it('fails if calls non-existing method in handler', async () => { - await expect(() => (client as TestState).forceClear()).rejects.toThrow( - 'Unspecified method forceClear in client schema', - ); + it('fails if calls non-existing method in handler', () => { + expect(() => (client as TestState).forceClear()).toThrow(/not a function/i); }); }); From 351523cebd74ef9b1ae7d924656cc82246620632 Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Mon, 9 Dec 2024 18:45:34 -0300 Subject: [PATCH 06/11] Fix p2p check in prover-node --- yarn-project/prover-node/src/prover-node.ts | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index 0e4b6f40b7ce..b02b1d45fd19 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -80,16 +80,11 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr } public getP2P() { - // TODO(palla): Remove try/catch once #10544 lands - try { - const asP2PClient = this.coordination as P2P; - if (typeof asP2PClient.isP2PClient === 'function' && asP2PClient.isP2PClient()) { - return asP2PClient; - } - return undefined; - } catch { - return undefined; + const asP2PClient = this.coordination as P2P; + if (typeof asP2PClient.isP2PClient === 'function' && asP2PClient.isP2PClient()) { + return asP2PClient; } + return undefined; } async handleClaim(proofClaim: EpochProofClaim): Promise { From 4ac396fb7c9bf0b56d8d5ab867cb62b5cb5060c8 Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Mon, 9 Dec 2024 18:53:23 -0300 Subject: [PATCH 07/11] Fix --- yarn-project/foundation/src/collection/array.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn-project/foundation/src/collection/array.ts b/yarn-project/foundation/src/collection/array.ts index dc1af0328841..b2ac6bec8733 100644 --- a/yarn-project/foundation/src/collection/array.ts +++ b/yarn-project/foundation/src/collection/array.ts @@ -151,7 +151,7 @@ export function median(arr: number[]) { if (arr.length === 0) { return undefined; } - const sorted = arr.toSorted((a, b) => a - b); + const sorted = [...arr].sort((a, b) => a - b); const mid = Math.floor(sorted.length / 2); return sorted.length % 2 !== 0 ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2; } From 2498ee72cc838d2d61e2c85cb3a5c9e63a2440f7 Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Tue, 10 Dec 2024 09:18:51 -0300 Subject: [PATCH 08/11] Add includePending flag to getPeers --- yarn-project/circuit-types/src/interfaces/p2p.test.ts | 8 +++++++- yarn-project/circuit-types/src/interfaces/p2p.ts | 4 ++-- yarn-project/p2p/src/client/p2p_client.ts | 4 ++-- yarn-project/p2p/src/service/libp2p_service.ts | 4 ++-- yarn-project/p2p/src/service/peer_manager.ts | 6 +++++- yarn-project/p2p/src/service/service.ts | 2 +- 6 files changed, 19 insertions(+), 9 deletions(-) diff --git a/yarn-project/circuit-types/src/interfaces/p2p.test.ts b/yarn-project/circuit-types/src/interfaces/p2p.test.ts index f8a962e025bd..d9eb0cc654a0 100644 --- a/yarn-project/circuit-types/src/interfaces/p2p.test.ts +++ b/yarn-project/circuit-types/src/interfaces/p2p.test.ts @@ -52,6 +52,11 @@ describe('P2PApiSchema', () => { const peers = await context.client.getPeers(); expect(peers).toEqual(peers); }); + + it('getPeers(true)', async () => { + const peers = await context.client.getPeers(true); + expect(peers).toEqual(peers); + }); }); const peers: PeerInfo[] = [ @@ -76,7 +81,8 @@ class MockP2P implements P2PApi { getEncodedEnr(): Promise { return Promise.resolve('enr'); } - getPeers(): Promise { + getPeers(includePending?: boolean): Promise { + expect(includePending === undefined || includePending === true).toBeTruthy(); return Promise.resolve(peers); } } diff --git a/yarn-project/circuit-types/src/interfaces/p2p.ts b/yarn-project/circuit-types/src/interfaces/p2p.ts index e50a8bde866f..00fa526899d3 100644 --- a/yarn-project/circuit-types/src/interfaces/p2p.ts +++ b/yarn-project/circuit-types/src/interfaces/p2p.ts @@ -56,7 +56,7 @@ export interface P2PApi { /** * Returns info for all connected, dialing, and cached peers. */ - getPeers(): Promise; + getPeers(includePending?: boolean): Promise; } export const P2PApiSchema: ApiSchemaFor = { @@ -67,5 +67,5 @@ export const P2PApiSchema: ApiSchemaFor = { getEpochProofQuotes: z.function().args(schemas.BigInt).returns(z.array(EpochProofQuote.schema)), getPendingTxs: z.function().returns(z.array(Tx.schema)), getEncodedEnr: z.function().returns(z.string().optional()), - getPeers: z.function().returns(z.array(PeerInfoSchema)), + getPeers: z.function().args(optional(z.boolean())).returns(z.array(PeerInfoSchema)), }; diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index 87e985a96032..9884d4b4e3fe 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -239,8 +239,8 @@ export class P2PClient extends WithTracer implements P2P { return true; } - public getPeers(): Promise { - return Promise.resolve(this.p2pService.getPeers()); + public getPeers(includePending?: boolean): Promise { + return Promise.resolve(this.p2pService.getPeers(includePending)); } public getL2BlockHash(number: number): Promise { diff --git a/yarn-project/p2p/src/service/libp2p_service.ts b/yarn-project/p2p/src/service/libp2p_service.ts index 244430def067..b0f10aec8bfc 100644 --- a/yarn-project/p2p/src/service/libp2p_service.ts +++ b/yarn-project/p2p/src/service/libp2p_service.ts @@ -311,8 +311,8 @@ export class LibP2PService extends WithTracer implements P2PService { ); } - public getPeers(): PeerInfo[] { - return this.peerManager.getPeers(); + public getPeers(includePending?: boolean): PeerInfo[] { + return this.peerManager.getPeers(includePending); } /** diff --git a/yarn-project/p2p/src/service/peer_manager.ts b/yarn-project/p2p/src/service/peer_manager.ts index 511915c6a891..44f01f2f473c 100644 --- a/yarn-project/p2p/src/service/peer_manager.ts +++ b/yarn-project/p2p/src/service/peer_manager.ts @@ -76,11 +76,15 @@ export class PeerManager { return this.peerScoring.getScore(peerId); } - public getPeers(): PeerInfo[] { + public getPeers(includePending = false): PeerInfo[] { const connected = this.libP2PNode .getPeers() .map(peer => ({ id: peer.toString(), score: this.getPeerScore(peer.toString()), status: 'connected' as const })); + if (!includePending) { + return connected; + } + const dialQueue = this.libP2PNode .getDialQueue() .filter(peer => !!peer.peerId) diff --git a/yarn-project/p2p/src/service/service.ts b/yarn-project/p2p/src/service/service.ts index db3bbc4b577e..d668d6ca441b 100644 --- a/yarn-project/p2p/src/service/service.ts +++ b/yarn-project/p2p/src/service/service.ts @@ -50,7 +50,7 @@ export interface P2PService { getEnr(): ENR | undefined; - getPeers(): PeerInfo[]; + getPeers(includePending?: boolean): PeerInfo[]; } /** From a14808fe94666951b3bf196e49921e7c8d922a9b Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Tue, 10 Dec 2024 09:19:45 -0300 Subject: [PATCH 09/11] Downgrade logs to trace --- yarn-project/p2p/src/service/peer_manager.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn-project/p2p/src/service/peer_manager.ts b/yarn-project/p2p/src/service/peer_manager.ts index 44f01f2f473c..7f12a4679bcb 100644 --- a/yarn-project/p2p/src/service/peer_manager.ts +++ b/yarn-project/p2p/src/service/peer_manager.ts @@ -239,7 +239,7 @@ export class PeerManager { private shouldDialPeer(): boolean { const connections = this.libP2PNode.getConnections().length; if (connections >= this.config.maxPeerCount) { - this.logger.debug( + this.logger.trace( `Not dialing peer due to max peer count of ${this.config.maxPeerCount} reached (${connections} current connections)`, ); return false; @@ -256,7 +256,7 @@ export class PeerManager { // Remove the oldest peers for (const key of this.cachedPeers.keys()) { this.cachedPeers.delete(key); - this.logger.debug(`Pruning peer ${key} from cache`); + this.logger.trace(`Pruning peer ${key} from cache`); peersToDelete--; if (peersToDelete <= 0) { break; From 5726eee0442c433130e24c1d8fa269099c26a751 Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Tue, 10 Dec 2024 12:18:54 -0300 Subject: [PATCH 10/11] Fix p2p unit tests --- yarn-project/p2p/src/client/p2p_client.test.ts | 16 +++++++--------- .../service/reqresp/reqresp.integration.test.ts | 2 +- .../p2p/src/service/reqresp/reqresp.test.ts | 12 +++++++----- yarn-project/p2p/src/service/reqresp/reqresp.ts | 2 +- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index e09fef66999f..5d926dec4810 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -26,20 +26,18 @@ describe('In-Memory P2P Client', () => { let client: P2PClient; beforeEach(() => { - txPool = mock({ - getAllTxs: () => [], - getAllTxHashes: () => [], - getPendingTxHashes: () => [], - getMinedTxHashes: () => [], - }); + txPool = mock(); + txPool.getAllTxs.mockReturnValue([]); + txPool.getPendingTxHashes.mockReturnValue([]); + txPool.getMinedTxHashes.mockReturnValue([]); + txPool.getAllTxHashes.mockReturnValue([]); p2pService = mock(); attestationPool = mock(); - epochProofQuotePool = mock({ - getQuotes: () => [], - }); + epochProofQuotePool = mock(); + epochProofQuotePool.getQuotes.mockReturnValue([]); blockSource = new MockL2BlockSource(); blockSource.createBlocks(100); diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts b/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts index 7a08de566809..739da0d5a094 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.integration.test.ts @@ -223,7 +223,7 @@ describe('Req Resp p2p client integration', () => { // We want to create a set of nodes and request transaction from them const clients = await createClients(NUMBER_OF_PEERS, /*valid proofs*/ false); const [client1, client2] = clients; - const client2PeerId = client2.getEnr()!; + const client2PeerId = await client2.getEnr()!.peerId(); // Give the nodes time to discover each other await sleep(6000); diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts index 349b3a8f6b50..f237a86cb5fc 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts @@ -114,7 +114,7 @@ describe('ReqResp', () => { expect(loggerSpy).toHaveBeenCalledWith(errorMessage); }); - describe('TX REQ PROTOCOL', () => { + describe('Tx req protocol', () => { it('Can request a Tx from TxHash', async () => { const tx = mockTx(); const txHash = tx.getTxHash(); @@ -181,10 +181,12 @@ describe('ReqResp', () => { expect(res).toBeUndefined(); // Make sure the error message is logged - const errorMessage = `${ - new IndiviualReqRespTimeoutError().message - } | peerId: ${nodes[1].p2p.peerId.toString()} | subProtocol: ${TX_REQ_PROTOCOL}`; - expect(loggerSpy).toHaveBeenCalledWith(errorMessage); + const peerId = nodes[1].p2p.peerId.toString(); + expect(loggerSpy).toHaveBeenCalledWith( + expect.stringMatching(/Error sending request to peer/i), + expect.any(Error), + { peerId, subProtocol: '/aztec/req/tx/0.1.0' }, + ); // Expect the peer to be penalized for timing out expect(peerManager.penalizePeer).toHaveBeenCalledWith( diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.ts b/yarn-project/p2p/src/service/reqresp/reqresp.ts index da6b3e87b53e..86aea3ee7817 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.ts @@ -210,7 +210,7 @@ export class ReqResp { return result; } catch (e: any) { - this.logger.error(`Error sending request to peer`, e, { peerId, subProtocol }); + this.logger.error(`Error sending request to peer`, e, { peerId: peerId.toString(), subProtocol }); this.peerManager.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError); } finally { if (stream) { From faf9ce2f03540e4469bde5e2a3df1890f18536ba Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Tue, 10 Dec 2024 12:36:45 -0300 Subject: [PATCH 11/11] Lint --- yarn-project/p2p/src/service/reqresp/reqresp.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts index f237a86cb5fc..f7091c2d5eb5 100644 --- a/yarn-project/p2p/src/service/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/service/reqresp/reqresp.test.ts @@ -4,7 +4,7 @@ import { sleep } from '@aztec/foundation/sleep'; import { describe, expect, it, jest } from '@jest/globals'; import { type MockProxy, mock } from 'jest-mock-extended'; -import { CollectiveReqRespTimeoutError, IndiviualReqRespTimeoutError } from '../../errors/reqresp.error.js'; +import { CollectiveReqRespTimeoutError } from '../../errors/reqresp.error.js'; import { MOCK_SUB_PROTOCOL_HANDLERS, MOCK_SUB_PROTOCOL_VALIDATORS,