diff --git a/yarn-project/p2p/src/mocks/index.ts b/yarn-project/p2p/src/mocks/index.ts index 6f598a5f1d65..7788db5e73f0 100644 --- a/yarn-project/p2p/src/mocks/index.ts +++ b/yarn-project/p2p/src/mocks/index.ts @@ -26,7 +26,7 @@ import { type BootnodeConfig, type P2PConfig } from '../config.js'; import { type MemPools } from '../mem_pools/interface.js'; import { DiscV5Service } from '../services/discv5/discV5_service.js'; import { LibP2PService } from '../services/libp2p/libp2p_service.js'; -import { type PeerManager } from '../services/peer_manager.js'; +import { type PeerScoring } from '../services/peer-manager/peer_scoring.js'; import { type P2PReqRespConfig } from '../services/reqresp/config.js'; import { ReqRespSubProtocol, @@ -34,8 +34,7 @@ import { type ReqRespSubProtocolValidators, noopValidator, } from '../services/reqresp/interface.js'; -import { pingHandler } from '../services/reqresp/protocols/ping.js'; -import { statusHandler } from '../services/reqresp/protocols/status.js'; +import { pingHandler, statusHandler } from '../services/reqresp/protocols/index.js'; import { ReqResp } from '../services/reqresp/reqresp.js'; import { type PubSubLibp2p } from '../util.js'; @@ -153,6 +152,7 @@ export const MOCK_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = { [ReqRespSubProtocol.PING]: pingHandler, [ReqRespSubProtocol.STATUS]: statusHandler, [ReqRespSubProtocol.TX]: (_msg: any) => Promise.resolve(Buffer.from('tx')), + [ReqRespSubProtocol.GOODBYE]: (_msg: any) => Promise.resolve(Buffer.from('goodbye')), }; // By default, all requests are valid @@ -161,14 +161,15 @@ export const MOCK_SUB_PROTOCOL_VALIDATORS: ReqRespSubProtocolValidators = { [ReqRespSubProtocol.PING]: noopValidator, [ReqRespSubProtocol.STATUS]: noopValidator, [ReqRespSubProtocol.TX]: noopValidator, + [ReqRespSubProtocol.GOODBYE]: noopValidator, }; /** * @param numberOfNodes - the number of nodes to create * @returns An array of the created nodes */ -export const createNodes = async (peerManager: PeerManager, numberOfNodes: number): Promise => { - return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp(peerManager))); +export const createNodes = async (peerScoring: PeerScoring, numberOfNodes: number): Promise => { + return await Promise.all(Array.from({ length: numberOfNodes }, () => createReqResp(peerScoring))); }; export const startNodes = async ( @@ -191,13 +192,13 @@ export const stopNodes = async (nodes: ReqRespNode[]): Promise => { }; // Create a req resp node, exposing the underlying p2p node -export const createReqResp = async (peerManager: PeerManager): Promise => { +export const createReqResp = async (peerScoring: PeerScoring): Promise => { const p2p = await createLibp2pNode(); const config: P2PReqRespConfig = { overallRequestTimeoutMs: 4000, individualRequestTimeoutMs: 2000, }; - const req = new ReqResp(config, p2p, peerManager); + const req = new ReqResp(config, p2p, peerScoring); return { p2p, req, diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index e302fe2cdb0a..7394469bb28f 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -56,14 +56,10 @@ import { import { type PubSubLibp2p, convertToMultiaddr } from '../../util.js'; import { AztecDatastore } from '../data_store.js'; import { SnappyTransform, fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from '../encoding.js'; -import { PeerManager } from '../peer_manager.js'; -import { - DEFAULT_SUB_PROTOCOL_HANDLERS, - DEFAULT_SUB_PROTOCOL_VALIDATORS, - ReqRespSubProtocol, - type ReqRespSubProtocolHandlers, - type SubProtocolMap, -} from '../reqresp/interface.js'; +import { PeerManager } from '../peer-manager/peer_manager.js'; +import { PeerScoring } from '../peer-manager/peer_scoring.js'; +import { DEFAULT_SUB_PROTOCOL_VALIDATORS, ReqRespSubProtocol, type SubProtocolMap } from '../reqresp/interface.js'; +import { reqGoodbyeHandler } from '../reqresp/protocols/goodbye.js'; import { pingHandler, statusHandler } from '../reqresp/protocols/index.js'; import { reqRespTxHandler } from '../reqresp/protocols/tx.js'; import { ReqResp } from '../reqresp/reqresp.js'; @@ -115,21 +111,32 @@ export class LibP2PService extends WithTracer implement private peerDiscoveryService: PeerDiscoveryService, private mempools: MemPools, private l2BlockSource: L2BlockSource, - private epochCache: EpochCache, + epochCache: EpochCache, private proofVerifier: ClientProtocolCircuitVerifier, private worldStateSynchronizer: WorldStateSynchronizer, - private telemetry: TelemetryClient, - private requestResponseHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS, + telemetry: TelemetryClient, private logger = createLogger('p2p:libp2p_service'), ) { super(telemetry, 'LibP2PService'); - this.peerManager = new PeerManager(node, peerDiscoveryService, config, telemetry, logger); + const peerScoring = new PeerScoring(config); + this.reqresp = new ReqResp(config, node, peerScoring); + + this.peerManager = new PeerManager( + node, + peerDiscoveryService, + config, + telemetry, + logger, + peerScoring, + this.reqresp, + ); + + // Update gossipsub score params this.node.services.pubsub.score.params.appSpecificScore = (peerId: string) => { return this.peerManager.getPeerScore(peerId); }; this.node.services.pubsub.score.params.appSpecificWeight = 10; - this.reqresp = new ReqResp(config, node, this.peerManager); this.attestationValidator = new AttestationValidator(epochCache); this.blockProposalValidator = new BlockProposalValidator(epochCache); @@ -143,95 +150,6 @@ export class LibP2PService extends WithTracer implement }; } - /** - * Starts the LibP2P service. - * @returns An empty promise. - */ - public async start() { - // Check if service is already started - if (this.node.status === 'started') { - throw new Error('P2P service already started'); - } - - // Get listen & announce addresses for logging - const { tcpListenAddress, tcpAnnounceAddress } = this.config; - if (!tcpAnnounceAddress) { - throw new Error('Announce address not provided.'); - } - const announceTcpMultiaddr = convertToMultiaddr(tcpAnnounceAddress, 'tcp'); - - // Start job queue, peer discovery service and libp2p node - this.jobQueue.start(); - await this.peerDiscoveryService.start(); - await this.node.start(); - - // Subscribe to standard GossipSub topics by default - for (const topic of getTopicTypeForClientType(this.clientType)) { - this.subscribeToTopic(TopicTypeMap[topic].p2pTopic); - } - - // Add p2p topic validators - // As they are stored within a kv pair, there is no need to register them conditionally - // based on the client type - const topicValidators = { - [Tx.p2pTopic]: this.validatePropagatedTxFromMessage.bind(this), - [BlockAttestation.p2pTopic]: this.validatePropagatedAttestationFromMessage.bind(this), - [BlockProposal.p2pTopic]: this.validatePropagatedBlockFromMessage.bind(this), - [EpochProofQuote.p2pTopic]: this.validatePropagatedEpochProofQuoteFromMessage.bind(this), - }; - for (const [topic, validator] of Object.entries(topicValidators)) { - this.node.services.pubsub.topicValidators.set(topic, validator); - } - - // add GossipSub listener - this.node.services.pubsub.addEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this)); - - // Start running promise for peer discovery - this.discoveryRunningPromise = new RunningPromise( - () => this.peerManager.heartbeat(), - this.logger, - this.config.peerCheckIntervalMS, - ); - this.discoveryRunningPromise.start(); - - // Define the sub protocol validators - This is done within this start() method to gain a callback to the existing validateTx function - const reqrespSubProtocolValidators = { - ...DEFAULT_SUB_PROTOCOL_VALIDATORS, - [ReqRespSubProtocol.TX]: this.validateRequestedTx.bind(this), - }; - await this.reqresp.start(this.requestResponseHandlers, reqrespSubProtocolValidators); - this.logger.info(`Started P2P service`, { - listen: tcpListenAddress, - announce: announceTcpMultiaddr, - peerId: this.node.peerId.toString(), - }); - } - - /** - * Stops the LibP2P service. - * @returns An empty promise. - */ - public async stop() { - // Remove gossip sub listener - this.node.services.pubsub.removeEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this)); - - // Stop peer manager - this.logger.debug('Stopping peer manager...'); - this.peerManager.stop(); - - this.logger.debug('Stopping job queue...'); - await this.jobQueue.end(); - this.logger.debug('Stopping running promise...'); - await this.discoveryRunningPromise?.stop(); - this.logger.debug('Stopping peer discovery service...'); - await this.peerDiscoveryService.stop(); - this.logger.debug('Request response service stopped...'); - await this.reqresp.stop(); - this.logger.debug('Stopping LibP2P...'); - await this.stopLibP2P(); - this.logger.info('LibP2P service stopped'); - } - /** * Creates an instance of the LibP2P service. * @param config - The configuration to use when creating the service. @@ -334,15 +252,6 @@ export class LibP2PService extends WithTracer implement }, }); - // Create request response protocol handlers - const txHandler = reqRespTxHandler(mempools); - - const requestResponseHandlers = { - [ReqRespSubProtocol.PING]: pingHandler, - [ReqRespSubProtocol.STATUS]: statusHandler, - [ReqRespSubProtocol.TX]: txHandler, - }; - return new LibP2PService( clientType, config, @@ -354,10 +263,109 @@ export class LibP2PService extends WithTracer implement proofVerifier, worldStateSynchronizer, telemetry, - requestResponseHandlers, ); } + /** + * Starts the LibP2P service. + * @returns An empty promise. + */ + public async start() { + // Check if service is already started + if (this.node.status === 'started') { + throw new Error('P2P service already started'); + } + + // Get listen & announce addresses for logging + const { tcpListenAddress, tcpAnnounceAddress } = this.config; + if (!tcpAnnounceAddress) { + throw new Error('Announce address not provided.'); + } + const announceTcpMultiaddr = convertToMultiaddr(tcpAnnounceAddress, 'tcp'); + + // Start job queue, peer discovery service and libp2p node + this.jobQueue.start(); + await this.peerDiscoveryService.start(); + await this.node.start(); + + // Subscribe to standard GossipSub topics by default + for (const topic of getTopicTypeForClientType(this.clientType)) { + this.subscribeToTopic(TopicTypeMap[topic].p2pTopic); + } + + // Create request response protocol handlers + const txHandler = reqRespTxHandler(this.mempools); + const goodbyeHandler = reqGoodbyeHandler(this.peerManager); + + const requestResponseHandlers = { + [ReqRespSubProtocol.PING]: pingHandler, + [ReqRespSubProtocol.STATUS]: statusHandler, + [ReqRespSubProtocol.TX]: txHandler.bind(this), + [ReqRespSubProtocol.GOODBYE]: goodbyeHandler.bind(this), + }; + + // Add p2p topic validators + // As they are stored within a kv pair, there is no need to register them conditionally + // based on the client type + const topicValidators = { + [Tx.p2pTopic]: this.validatePropagatedTxFromMessage.bind(this), + [BlockAttestation.p2pTopic]: this.validatePropagatedAttestationFromMessage.bind(this), + [BlockProposal.p2pTopic]: this.validatePropagatedBlockFromMessage.bind(this), + [EpochProofQuote.p2pTopic]: this.validatePropagatedEpochProofQuoteFromMessage.bind(this), + }; + for (const [topic, validator] of Object.entries(topicValidators)) { + this.node.services.pubsub.topicValidators.set(topic, validator); + } + + // add GossipSub listener + this.node.services.pubsub.addEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this)); + + // Start running promise for peer discovery + this.discoveryRunningPromise = new RunningPromise( + () => this.peerManager.heartbeat(), + this.logger, + this.config.peerCheckIntervalMS, + ); + this.discoveryRunningPromise.start(); + + // Define the sub protocol validators - This is done within this start() method to gain a callback to the existing validateTx function + const reqrespSubProtocolValidators = { + ...DEFAULT_SUB_PROTOCOL_VALIDATORS, + [ReqRespSubProtocol.TX]: this.validateRequestedTx.bind(this), + }; + await this.reqresp.start(requestResponseHandlers, reqrespSubProtocolValidators); + this.logger.info(`Started P2P service`, { + listen: tcpListenAddress, + announce: announceTcpMultiaddr, + peerId: this.node.peerId.toString(), + }); + } + + /** + * Stops the LibP2P service. + * @returns An empty promise. + */ + public async stop() { + // Remove gossip sub listener + this.node.services.pubsub.removeEventListener(GossipSubEvent.MESSAGE, this.handleGossipSubEvent.bind(this)); + + // Stop peer manager + this.logger.debug('Stopping peer manager...'); + await this.peerManager.stop(); + + this.logger.debug('Stopping job queue...'); + await this.jobQueue.end(); + this.logger.debug('Stopping running promise...'); + await this.discoveryRunningPromise?.stop(); + this.logger.debug('Stopping peer discovery service...'); + await this.peerDiscoveryService.stop(); + this.logger.debug('Request response service stopped...'); + await this.reqresp.stop(); + this.logger.debug('Stopping LibP2P...'); + await this.stopLibP2P(); + this.logger.info('LibP2P service stopped'); + } + public getPeers(includePending?: boolean): PeerInfo[] { return this.peerManager.getPeers(includePending); } diff --git a/yarn-project/p2p/src/services/peer-manager/metrics.ts b/yarn-project/p2p/src/services/peer-manager/metrics.ts new file mode 100644 index 000000000000..7725f22ef7d9 --- /dev/null +++ b/yarn-project/p2p/src/services/peer-manager/metrics.ts @@ -0,0 +1,41 @@ +import { + Attributes, + Metrics, + type TelemetryClient, + type Tracer, + type UpDownCounter, + ValueType, +} from '@aztec/telemetry-client'; + +import { type GoodByeReason, prettyGoodbyeReason } from '../reqresp/protocols/index.js'; + +export class PeerManagerMetrics { + private sentGoodbyes: UpDownCounter; + private receivedGoodbyes: UpDownCounter; + + public readonly tracer: Tracer; + + constructor(public readonly telemetryClient: TelemetryClient, name = 'PeerManager') { + this.tracer = telemetryClient.getTracer(name); + + const meter = telemetryClient.getMeter(name); + this.sentGoodbyes = meter.createUpDownCounter(Metrics.PEER_MANAGER_GOODBYES_SENT, { + description: 'Number of goodbyes sent to peers', + unit: 'peers', + valueType: ValueType.INT, + }); + this.receivedGoodbyes = meter.createUpDownCounter(Metrics.PEER_MANAGER_GOODBYES_RECEIVED, { + description: 'Number of goodbyes received from peers', + unit: 'peers', + valueType: ValueType.INT, + }); + } + + public recordGoodbyeSent(reason: GoodByeReason) { + this.sentGoodbyes.add(1, { [Attributes.P2P_GOODBYE_REASON]: prettyGoodbyeReason(reason) }); + } + + public recordGoodbyeReceived(reason: GoodByeReason) { + this.receivedGoodbyes.add(1, { [Attributes.P2P_GOODBYE_REASON]: prettyGoodbyeReason(reason) }); + } +} diff --git a/yarn-project/p2p/src/services/peer_manager.test.ts b/yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts similarity index 72% rename from yarn-project/p2p/src/services/peer_manager.test.ts rename to yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts index dda028e647d6..a39a90319500 100644 --- a/yarn-project/p2p/src/services/peer_manager.test.ts +++ b/yarn-project/p2p/src/services/peer-manager/peer_manager.test.ts @@ -1,16 +1,19 @@ import { PeerErrorSeverity } from '@aztec/circuit-types'; import { createLogger } from '@aztec/foundation/log'; import { sleep } from '@aztec/foundation/sleep'; -import { getTelemetryClient } from '@aztec/telemetry-client'; +import { Attributes, getTelemetryClient } from '@aztec/telemetry-client'; import { type ENR, SignableENR } from '@chainsafe/enr'; import { jest } from '@jest/globals'; import { createSecp256k1PeerId } from '@libp2p/peer-id-factory'; import { multiaddr } from '@multiformats/multiaddr'; -import { getP2PDefaultConfig } from '../config.js'; +import { type P2PConfig, getP2PDefaultConfig } from '../../config.js'; +import { ReqRespSubProtocol } from '../reqresp/interface.js'; +import { GoodByeReason } from '../reqresp/protocols/index.js'; +import { PeerEvent } from '../types.js'; import { PeerManager } from './peer_manager.js'; -import { PeerEvent } from './types.js'; +import { PeerScoring } from './peer_scoring.js'; describe('PeerManager', () => { const mockLibP2PNode: any = { @@ -33,6 +36,12 @@ describe('PeerManager', () => { runRandomNodesQuery: jest.fn(), }; + const mockReqResp: any = { + sendRequestToPeer: jest.fn(), + }; + + let peerScoring: PeerScoring; + let peerManager: PeerManager; // The function provided to the discovery servive callback will be run here let discoveredPeerCallback: (enr: ENR) => Promise; @@ -53,12 +62,15 @@ describe('PeerManager', () => { } }); + peerScoring = new PeerScoring({} as P2PConfig); peerManager = new PeerManager( mockLibP2PNode, mockPeerDiscoveryService, getP2PDefaultConfig(), getTelemetryClient(), createLogger('test'), + peerScoring, + mockReqResp, ); }); @@ -120,6 +132,20 @@ describe('PeerManager', () => { // Verify that discover was called expect(mockPeerDiscoveryService.runRandomNodesQuery).toHaveBeenCalled(); }); + + it('should send goodbye to peers on shutdown', async () => { + const peerId = await createSecp256k1PeerId(); + const peerId2 = await createSecp256k1PeerId(); + mockLibP2PNode.getPeers.mockReturnValue([peerId, peerId2]); + + const goodbyeAndDisconnectPeerSpy = jest.spyOn(peerManager as any, 'goodbyeAndDisconnectPeer'); + + await peerManager.stop(); + + // Both peers were sent goodbyes on shutdown + expect(goodbyeAndDisconnectPeerSpy).toHaveBeenCalledWith(peerId, GoodByeReason.SHUTDOWN); + expect(goodbyeAndDisconnectPeerSpy).toHaveBeenCalledWith(peerId2, GoodByeReason.SHUTDOWN); + }); }); describe('peer timeout functionality', () => { @@ -280,6 +306,7 @@ describe('PeerManager', () => { // Set the peer scores to trigger different states peerManager.penalizePeer(bannedPeerId, PeerErrorSeverity.LowToleranceError); // Will set score below -100 peerManager.penalizePeer(bannedPeerId, PeerErrorSeverity.LowToleranceError); // Additional penalty to ensure banned state + peerManager.penalizePeer(bannedPeerId, PeerErrorSeverity.HighToleranceError); peerManager.penalizePeer(disconnectPeerId, PeerErrorSeverity.LowToleranceError); // Will set score between -100 and -50 peerManager.penalizePeer(disconnectPeerId, PeerErrorSeverity.HighToleranceError); @@ -289,9 +316,20 @@ describe('PeerManager', () => { await sleep(100); - // Verify that hangUp was called for both unhealthy peers + // Verify that hangUp and a goodbye was sent for both unhealthy peers expect(mockLibP2PNode.hangUp).toHaveBeenCalledWith(bannedPeerId); + expect(mockReqResp.sendRequestToPeer).toHaveBeenCalledWith( + bannedPeerId, + ReqRespSubProtocol.GOODBYE, + Buffer.from([GoodByeReason.BANNED]), + ); + expect(mockLibP2PNode.hangUp).toHaveBeenCalledWith(disconnectPeerId); + expect(mockReqResp.sendRequestToPeer).toHaveBeenCalledWith( + disconnectPeerId, + ReqRespSubProtocol.GOODBYE, + Buffer.from([GoodByeReason.DISCONNECTED]), + ); // Verify that hangUp was not called for the healthy peer expect(mockLibP2PNode.hangUp).not.toHaveBeenCalledWith(healthyPeerId); @@ -301,14 +339,61 @@ describe('PeerManager', () => { }); it('should properly clean up peers on stop', async () => { - const enr = await createMockENR(); - await discoveredPeerCallback(enr); + mockLibP2PNode.getPeers.mockReturnValue([await createSecp256k1PeerId(), await createSecp256k1PeerId()]); - peerManager.stop(); + await peerManager.stop(); expect(mockLibP2PNode.removeEventListener).toHaveBeenCalledWith(PeerEvent.CONNECTED, expect.any(Function)); expect(mockLibP2PNode.removeEventListener).toHaveBeenCalledWith(PeerEvent.DISCONNECTED, expect.any(Function)); expect(mockPeerDiscoveryService.off).toHaveBeenCalledWith(PeerEvent.DISCOVERED, expect.any(Function)); + + // Verify that goodbyes were sent to all peers + expect(mockReqResp.sendRequestToPeer).toHaveBeenCalledTimes(2); + }); + }); + + describe('goodbye metrics', () => { + it('should record metrics when receiving goodbye messages', async () => { + const peerId = await createSecp256k1PeerId(); + + // Get reference to the counter's add function + const goodbyeReceivedMetric = jest.spyOn((peerManager as any).metrics.receivedGoodbyes, 'add'); + + // Test receiving goodbye for different reasons + peerManager.goodbyeReceived(peerId, GoodByeReason.BANNED); + expect(goodbyeReceivedMetric).toHaveBeenCalledWith(1, { [Attributes.P2P_GOODBYE_REASON]: 'banned' }); + + peerManager.goodbyeReceived(peerId, GoodByeReason.DISCONNECTED); + expect(goodbyeReceivedMetric).toHaveBeenCalledWith(1, { [Attributes.P2P_GOODBYE_REASON]: 'disconnected' }); + + peerManager.goodbyeReceived(peerId, GoodByeReason.SHUTDOWN); + expect(goodbyeReceivedMetric).toHaveBeenCalledWith(1, { [Attributes.P2P_GOODBYE_REASON]: 'shutdown' }); + }); + + it('should record metrics when sending goodbye messages', async () => { + const peerId = await createSecp256k1PeerId(); + + // Get reference to the counter's add function + const goodbyeSentMetric = jest.spyOn((peerManager as any).metrics.sentGoodbyes, 'add'); + + // Mock connections to include our test peer + mockLibP2PNode.getConnections.mockReturnValue([{ remotePeer: peerId }]); + + // Test sending goodbye for different scenarios + + // Test banned scenario + peerManager.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError); // Set score below -100 + peerManager.penalizePeer(peerId, PeerErrorSeverity.LowToleranceError); + peerManager.penalizePeer(peerId, PeerErrorSeverity.HighToleranceError); + peerManager.heartbeat(); + expect(goodbyeSentMetric).toHaveBeenCalledWith(1, { [Attributes.P2P_GOODBYE_REASON]: 'banned' }); + + // Reset mocks + mockLibP2PNode.getPeers.mockReturnValue([{ remotePeer: peerId }]); + + // Test shutdown scenario + await peerManager.stop(); + expect(goodbyeSentMetric).toHaveBeenCalledWith(1, { [Attributes.P2P_GOODBYE_REASON]: 'shutdown' }); }); }); }); diff --git a/yarn-project/p2p/src/services/peer_manager.ts b/yarn-project/p2p/src/services/peer-manager/peer_manager.ts similarity index 84% rename from yarn-project/p2p/src/services/peer_manager.ts rename to yarn-project/p2p/src/services/peer-manager/peer_manager.ts index 59052dfb9adb..6b5b9ba45706 100644 --- a/yarn-project/p2p/src/services/peer_manager.ts +++ b/yarn-project/p2p/src/services/peer-manager/peer_manager.ts @@ -1,17 +1,21 @@ import { type PeerErrorSeverity, type PeerInfo } from '@aztec/circuit-types'; import { createLogger } from '@aztec/foundation/log'; -import { type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client'; +import { type TelemetryClient, trackSpan } from '@aztec/telemetry-client'; import { type ENR } from '@chainsafe/enr'; import { type Connection, type PeerId } from '@libp2p/interface'; import { type Multiaddr } from '@multiformats/multiaddr'; import { inspect } from 'util'; -import { type P2PConfig } from '../config.js'; -import { type PubSubLibp2p } from '../util.js'; -import { PeerScoreState, PeerScoring } from './peer-scoring/peer_scoring.js'; -import { type PeerDiscoveryService } from './service.js'; -import { PeerEvent } from './types.js'; +import { type P2PConfig } from '../../config.js'; +import { type PubSubLibp2p } from '../../util.js'; +import { ReqRespSubProtocol } from '../reqresp/interface.js'; +import { GoodByeReason, prettyGoodbyeReason } from '../reqresp/protocols/goodbye.js'; +import { type ReqResp } from '../reqresp/reqresp.js'; +import { type PeerDiscoveryService } from '../service.js'; +import { PeerEvent } from '../types.js'; +import { PeerManagerMetrics } from './metrics.js'; +import { PeerScoreState, type PeerScoring } from './peer_scoring.js'; const MAX_DIAL_ATTEMPTS = 3; const MAX_CACHED_PEERS = 100; @@ -31,23 +35,25 @@ type TimedOutPeer = { timeoutUntilMs: number; }; -export class PeerManager extends WithTracer { +export class PeerManager { private cachedPeers: Map = new Map(); - private peerScoring: PeerScoring; private heartbeatCounter: number = 0; private displayPeerCountsPeerHeartbeat: number = 0; private timedOutPeers: Map = new Map(); + private metrics: PeerManagerMetrics; + constructor( private libP2PNode: PubSubLibp2p, private peerDiscoveryService: PeerDiscoveryService, private config: P2PConfig, telemetryClient: TelemetryClient, private logger = createLogger('p2p:peer-manager'), + private peerScoring: PeerScoring, + private reqresp: ReqResp, ) { - super(telemetryClient, 'PeerManager'); + this.metrics = new PeerManagerMetrics(telemetryClient, 'PeerManager'); - this.peerScoring = new PeerScoring(config); // Handle new established connections this.libP2PNode.addEventListener(PeerEvent.CONNECTED, this.handleConnectedPeerEvent.bind(this)); // Handle lost connections @@ -60,6 +66,10 @@ export class PeerManager extends WithTracer { this.displayPeerCountsPeerHeartbeat = Math.floor(60_000 / this.config.peerCheckIntervalMS); } + get tracer() { + return this.metrics.tracer; + } + @trackSpan('PeerManager.heartbeat') public heartbeat() { this.heartbeatCounter++; @@ -113,11 +123,23 @@ export class PeerManager extends WithTracer { } } + /** + * Handles a goodbye received from a peer. + * + * Used as the reqresp handler when a peer sends us goodbye message. + * @param peerId - The peer ID. + * @param reason - The reason for the goodbye. + */ + public goodbyeReceived(peerId: PeerId, reason: GoodByeReason) { + this.logger.debug(`Goodbye received from peer ${peerId.toString()} with reason ${prettyGoodbyeReason(reason)}`); + + this.metrics.recordGoodbyeReceived(reason); + + void this.disconnectPeer(peerId); + } + public penalizePeer(peerId: PeerId, penalty: PeerErrorSeverity) { - const id = peerId.toString(); - const penaltyValue = this.peerScoring.peerPenalties[penalty]; - const newScore = this.peerScoring.updateScore(id, -penaltyValue); - this.logger.verbose(`Penalizing peer ${id} with ${penalty} (new score is ${newScore})`); + this.peerScoring.penalizePeer(peerId, penalty); } public getPeerScore(peerId: string): number { @@ -227,10 +249,11 @@ export class PeerManager extends WithTracer { for (const peer of connections) { const score = this.peerScoring.getScoreState(peer.remotePeer.toString()); switch (score) { - // TODO: add goodbye and give reasons case PeerScoreState.Banned: + void this.goodbyeAndDisconnectPeer(peer.remotePeer, GoodByeReason.BANNED); + break; case PeerScoreState.Disconnect: - void this.disconnectPeer(peer.remotePeer); + void this.goodbyeAndDisconnectPeer(peer.remotePeer, GoodByeReason.DISCONNECTED); break; case PeerScoreState.Healthy: connectedHealthyPeers.push(peer); @@ -240,10 +263,26 @@ export class PeerManager extends WithTracer { return connectedHealthyPeers; } - // TODO: send a goodbye with a reason to the peer + private async goodbyeAndDisconnectPeer(peer: PeerId, reason: GoodByeReason) { + this.logger.debug(`Disconnecting peer ${peer.toString()} with reason ${prettyGoodbyeReason(reason)}`); + + this.metrics.recordGoodbyeSent(reason); + + try { + await this.reqresp.sendRequestToPeer(peer, ReqRespSubProtocol.GOODBYE, Buffer.from([reason])); + } catch (error) { + this.logger.debug(`Failed to send goodbye to peer ${peer.toString()}: ${error}`); + } finally { + await this.disconnectPeer(peer); + } + } + private async disconnectPeer(peer: PeerId) { - this.logger.debug(`Disconnecting peer ${peer.toString()}`); - await this.libP2PNode.hangUp(peer); + try { + await this.libP2PNode.hangUp(peer); + } catch (error) { + this.logger.debug(`Failed to disconnect peer ${peer.toString()}`, { error: inspect(error) }); + } } /** @@ -281,7 +320,7 @@ export class PeerManager extends WithTracer { } // check if peer is already connected const connections = this.libP2PNode.getConnections(); - if (connections.some(conn => conn.remotePeer.equals(peerId))) { + if (connections.some((conn: Connection) => conn.remotePeer.equals(peerId))) { this.logger.trace(`Already connected to peer ${peerId}`); return; } @@ -371,10 +410,16 @@ export class PeerManager extends WithTracer { * Stops the peer manager. * Removing all event listeners. */ - public stop() { + public async stop() { + this.peerDiscoveryService.off(PeerEvent.DISCOVERED, this.handleDiscoveredPeer); + + // Send goodbyes to all peers + await Promise.all( + this.libP2PNode.getPeers().map(peer => this.goodbyeAndDisconnectPeer(peer, GoodByeReason.SHUTDOWN)), + ); + this.libP2PNode.removeEventListener(PeerEvent.CONNECTED, this.handleConnectedPeerEvent); this.libP2PNode.removeEventListener(PeerEvent.DISCONNECTED, this.handleDisconnectedPeerEvent); - this.peerDiscoveryService.off(PeerEvent.DISCOVERED, this.handleDiscoveredPeer); } } diff --git a/yarn-project/p2p/src/services/peer-scoring/peer_scoring.test.ts b/yarn-project/p2p/src/services/peer-manager/peer_scoring.test.ts similarity index 100% rename from yarn-project/p2p/src/services/peer-scoring/peer_scoring.test.ts rename to yarn-project/p2p/src/services/peer-manager/peer_scoring.test.ts diff --git a/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts b/yarn-project/p2p/src/services/peer-manager/peer_scoring.ts similarity index 81% rename from yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts rename to yarn-project/p2p/src/services/peer-manager/peer_scoring.ts index 34233435b865..ffc1b65501f0 100644 --- a/yarn-project/p2p/src/services/peer-scoring/peer_scoring.ts +++ b/yarn-project/p2p/src/services/peer-manager/peer_scoring.ts @@ -1,5 +1,8 @@ import { PeerErrorSeverity } from '@aztec/circuit-types'; import { median } from '@aztec/foundation/collection'; +import { createLogger } from '@aztec/foundation/log'; + +import { type PeerId } from '@libp2p/interface'; import { type P2PConfig } from '../../config.js'; @@ -20,6 +23,7 @@ const MIN_SCORE_BEFORE_BAN = -100; const MIN_SCORE_BEFORE_DISCONNECT = -50; export class PeerScoring { + private logger = createLogger('p2p:peer-scoring'); private scores: Map = new Map(); private lastUpdateTime: Map = new Map(); private decayInterval = 1000 * 60; // 1 minute @@ -38,6 +42,14 @@ export class PeerScoring { }; } + public penalizePeer(peerId: PeerId, penalty: PeerErrorSeverity) { + const id = peerId.toString(); + const penaltyValue = this.peerPenalties[penalty]; + const newScore = this.updateScore(id, -penaltyValue); + this.logger.verbose(`Penalizing peer ${id} with ${penalty} (new score is ${newScore})`); + return newScore; + } + updateScore(peerId: string, scoreDelta: number): number { const currentTime = Date.now(); const lastUpdate = this.lastUpdateTime.get(peerId) || currentTime; @@ -75,12 +87,13 @@ export class PeerScoring { return this.scores.get(peerId) || 0; } - getScoreState(peerId: string) { - // TODO: permanently store banned peers??? + public getScoreState(peerId: string): PeerScoreState { + // TODO(#11329): permanently store banned peers? const score = this.getScore(peerId); if (score < MIN_SCORE_BEFORE_BAN) { return PeerScoreState.Banned; - } else if (score < MIN_SCORE_BEFORE_DISCONNECT) { + } + if (score < MIN_SCORE_BEFORE_DISCONNECT) { return PeerScoreState.Disconnect; } return PeerScoreState.Healthy; diff --git a/yarn-project/p2p/src/services/reqresp/interface.ts b/yarn-project/p2p/src/services/reqresp/interface.ts index 13efa4f055e9..43e5b9a0ecdb 100644 --- a/yarn-project/p2p/src/services/reqresp/interface.ts +++ b/yarn-project/p2p/src/services/reqresp/interface.ts @@ -5,21 +5,23 @@ import { type PeerId } from '@libp2p/interface'; /* * Request Response Sub Protocols */ -const PING_PROTOCOL = '/aztec/req/ping/0.1.0'; -const STATUS_PROTOCOL = '/aztec/req/status/0.1.0'; -const TX_PROTOCOL = '/aztec/req/tx/0.1.0'; +export const PING_PROTOCOL = '/aztec/req/ping/0.1.0'; +export const STATUS_PROTOCOL = '/aztec/req/status/0.1.0'; +export const GOODBYE_PROTOCOL = '/aztec/req/goodbye/0.1.0'; +export const TX_REQ_PROTOCOL = '/aztec/req/tx/0.1.0'; export enum ReqRespSubProtocol { PING = PING_PROTOCOL, STATUS = STATUS_PROTOCOL, - TX = TX_PROTOCOL, + GOODBYE = GOODBYE_PROTOCOL, + TX = TX_REQ_PROTOCOL, } /** * A handler for a sub protocol * The message will arrive as a buffer, and the handler must return a buffer */ -export type ReqRespSubProtocolHandler = (msg: Buffer) => Promise; +export type ReqRespSubProtocolHandler = (peerId: PeerId, msg: Buffer) => Promise; /** * A type mapping from supprotocol to it's rate limits @@ -72,6 +74,7 @@ export const DEFAULT_SUB_PROTOCOL_VALIDATORS: ReqRespSubProtocolValidators = { [ReqRespSubProtocol.PING]: noopValidator, [ReqRespSubProtocol.STATUS]: noopValidator, [ReqRespSubProtocol.TX]: noopValidator, + [ReqRespSubProtocol.GOODBYE]: noopValidator, }; /** @@ -97,6 +100,7 @@ export const DEFAULT_SUB_PROTOCOL_HANDLERS: ReqRespSubProtocolHandlers = { [ReqRespSubProtocol.PING]: defaultHandler, [ReqRespSubProtocol.STATUS]: defaultHandler, [ReqRespSubProtocol.TX]: defaultHandler, + [ReqRespSubProtocol.GOODBYE]: defaultHandler, }; /** @@ -150,4 +154,8 @@ export const subProtocolMap: SubProtocolMap = { request: TxHash, response: Tx, }, + [ReqRespSubProtocol.GOODBYE]: { + request: RequestableBuffer, + response: RequestableBuffer, + }, }; diff --git a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.test.ts b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.test.ts new file mode 100644 index 000000000000..20554469e3db --- /dev/null +++ b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.test.ts @@ -0,0 +1,15 @@ +import { GoodByeReason, decodeGoodbyeReason, encodeGoodbyeReason } from './goodbye.js'; + +describe('goodbye', () => { + it('should encode and decode goodbye reason', () => { + const reason = GoodByeReason.SHUTDOWN; + const encoded = encodeGoodbyeReason(reason); + const decoded = decodeGoodbyeReason(encoded); + expect(decoded).toBe(reason); + }); + + it('should return unknown if the goodbye reason buffer length is invalid', () => { + const invalidBuffer = Buffer.from([0x1, 0x2]); + expect(decodeGoodbyeReason(invalidBuffer)).toBe(GoodByeReason.UNKNOWN); + }); +}); diff --git a/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts new file mode 100644 index 000000000000..888e9e8d2cd5 --- /dev/null +++ b/yarn-project/p2p/src/services/reqresp/protocols/goodbye.ts @@ -0,0 +1,101 @@ +import { createLogger } from '@aztec/foundation/log'; + +import { type PeerId } from '@libp2p/interface'; + +import { type PeerManager } from '../../peer-manager/peer_manager.js'; +import { ReqRespSubProtocol, type ReqRespSubProtocolHandler } from '../interface.js'; +import { type ReqResp } from '../reqresp.js'; + +/** + * Enum defining the possible reasons for a goodbye message. + */ +export enum GoodByeReason { + /** The peer has shutdown, will be received whenever a peer's node is routinely stopped */ + SHUTDOWN = 0x1, + /** Whenever the peer must disconnect due to maintaining max peers */ + DISCONNECTED = 0x2, + /** The peer has a low score, will be received whenever a peer's score is low */ + LOW_SCORE = 0x3, + /** The peer has been banned, will be received whenever a peer is banned */ + BANNED = 0x4, + /** Wrong network / fork */ + WRONG_NETWORK = 0x5, + /** Unknown reason */ + UNKNOWN = 0x6, +} + +export function encodeGoodbyeReason(reason: GoodByeReason): Buffer { + return Buffer.from([reason]); +} + +export function decodeGoodbyeReason(buffer: Buffer): GoodByeReason { + try { + if (buffer.length !== 1) { + throw new Error('Invalid goodbye reason buffer length'); + } + return buffer[0] as GoodByeReason; + } catch (error) { + return GoodByeReason.UNKNOWN; + } +} + +/** + * Pretty prints the goodbye reason. + * @param reason - The goodbye reason. + * @returns The pretty printed goodbye reason. + */ +export function prettyGoodbyeReason(reason: GoodByeReason): string { + switch (reason) { + case GoodByeReason.SHUTDOWN: + return 'shutdown'; + case GoodByeReason.DISCONNECTED: + return 'disconnected'; + case GoodByeReason.LOW_SCORE: + return 'low_score'; + case GoodByeReason.BANNED: + return 'banned'; + // TODO(#11328): implement + case GoodByeReason.WRONG_NETWORK: + return 'wrong_network'; + case GoodByeReason.UNKNOWN: + return 'unknown'; + } +} + +/** + * Handles a goodbye message request + */ +export class GoodbyeProtocolHandler { + private logger = createLogger('p2p:goodbye-protocol'); + + constructor(private reqresp: ReqResp) {} + + public async sendGoodbye(peerId: PeerId, reason: GoodByeReason): Promise { + try { + await this.reqresp.sendRequestToPeer(peerId, ReqRespSubProtocol.GOODBYE, Buffer.from([reason])); + this.logger.debug(`Sent goodbye to peer ${peerId.toString()} with reason ${reason}`); + } catch (error) { + this.logger.debug(`Failed to send goodbye to peer ${peerId.toString()}: ${error}`); + } + } +} + +/** + * Handles the goodbye request. In request response, the goodbye request is handled by the peer manager. + * + * @dev Implemented as returning a function as the function is bound in the libp2p service, however + * its implementation is here to keep functionality together. + * + * @param peerManager - The peer manager. + * @returns A resolved promise with the goodbye response. + */ +export function reqGoodbyeHandler(peerManager: PeerManager): ReqRespSubProtocolHandler { + return (peerId: PeerId, _msg: Buffer) => { + const reason = decodeGoodbyeReason(_msg); + + peerManager.goodbyeReceived(peerId, reason); + + // Return a buffer of length 1 as an acknowledgement + return Promise.resolve(Buffer.from([0x0])); + }; +} diff --git a/yarn-project/p2p/src/services/reqresp/protocols/index.ts b/yarn-project/p2p/src/services/reqresp/protocols/index.ts index 51902f46f3d1..ffc009fe37a5 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/index.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/index.ts @@ -4,3 +4,4 @@ export * from './ping.js'; export * from './status.js'; export * from './tx.js'; +export * from './goodbye.js'; diff --git a/yarn-project/p2p/src/services/reqresp/protocols/tx.ts b/yarn-project/p2p/src/services/reqresp/protocols/tx.ts index 9aabc5a81851..415cf4293c65 100644 --- a/yarn-project/p2p/src/services/reqresp/protocols/tx.ts +++ b/yarn-project/p2p/src/services/reqresp/protocols/tx.ts @@ -1,7 +1,10 @@ import { type P2PClientType } from '@aztec/circuit-types'; import { TxHash } from '@aztec/circuit-types/tx_hash'; +import { type PeerId } from '@libp2p/interface'; + import { type MemPools } from '../../../mem_pools/interface.js'; +import { type ReqRespSubProtocolHandler } from '../interface.js'; /** * We want to keep the logic of the req resp handler in this file, but we do not have a reference to the mempools here @@ -11,13 +14,13 @@ import { type MemPools } from '../../../mem_pools/interface.js'; * @param mempools - the mempools * @returns the tx response message */ -export function reqRespTxHandler(mempools: MemPools): (msg: Buffer) => Promise { +export function reqRespTxHandler(mempools: MemPools): ReqRespSubProtocolHandler { /** * Handler for tx requests * @param msg - the tx request message * @returns the tx response message */ - return (msg: Buffer) => { + return (_peerId: PeerId, msg: Buffer) => { const txHash = TxHash.fromBuffer(msg); const foundTx = mempools.txPool.getTxByHash(txHash); const buf = foundTx ? foundTx.toBuffer() : Buffer.alloc(0); diff --git a/yarn-project/p2p/src/services/reqresp/rate_limiter/index.ts b/yarn-project/p2p/src/services/reqresp/rate-limiter/index.ts similarity index 100% rename from yarn-project/p2p/src/services/reqresp/rate_limiter/index.ts rename to yarn-project/p2p/src/services/reqresp/rate-limiter/index.ts diff --git a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.test.ts similarity index 93% rename from yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts rename to yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.test.ts index 08bff68bec9c..68c6e044f3fc 100644 --- a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.test.ts +++ b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.test.ts @@ -4,7 +4,7 @@ import { jest } from '@jest/globals'; import { type PeerId } from '@libp2p/interface'; import { type MockProxy, mock } from 'jest-mock-extended'; -import { type PeerManager } from '../../peer_manager.js'; +import { type PeerScoring } from '../../peer-manager/peer_scoring.js'; import { ReqRespSubProtocol, type ReqRespSubProtocolRateLimits } from '../interface.js'; import { RequestResponseRateLimiter } from './rate_limiter.js'; @@ -24,7 +24,7 @@ const makePeer = (id: string): PeerId => { describe('rate limiter', () => { let rateLimiter: RequestResponseRateLimiter; - let peerManager: MockProxy; + let peerScoring: MockProxy; beforeEach(() => { jest.useFakeTimers(); @@ -43,9 +43,9 @@ describe('rate limiter', () => { }, } as ReqRespSubProtocolRateLimits; // force type as we will not provide descriptions of all protocols - peerManager = mock(); + peerScoring = mock(); - rateLimiter = new RequestResponseRateLimiter(peerManager, config); + rateLimiter = new RequestResponseRateLimiter(peerScoring, config); }); afterEach(() => { @@ -77,7 +77,7 @@ describe('rate limiter', () => { expect(rateLimiter.allow(ReqRespSubProtocol.TX, peerId)).toBe(false); // Spy on the peer manager and check that penalizePeer is called - expect(peerManager.penalizePeer).toHaveBeenCalledWith(peerId, PeerErrorSeverity.MidToleranceError); + expect(peerScoring.penalizePeer).toHaveBeenCalledWith(peerId, PeerErrorSeverity.MidToleranceError); }); it('Should allow requests within the global limit', () => { @@ -137,7 +137,7 @@ describe('rate limiter', () => { }, }, } as ReqRespSubProtocolRateLimits; - const multiProtocolRateLimiter = new RequestResponseRateLimiter(peerManager, config); + const multiProtocolRateLimiter = new RequestResponseRateLimiter(peerScoring, config); const peerId = makePeer('peer1'); @@ -157,7 +157,7 @@ describe('rate limiter', () => { }); it('Should allow requests if no rate limiter is configured', () => { - const rateLimiter = new RequestResponseRateLimiter(peerManager, {} as ReqRespSubProtocolRateLimits); + const rateLimiter = new RequestResponseRateLimiter(peerScoring, {} as ReqRespSubProtocolRateLimits); expect(rateLimiter.allow(ReqRespSubProtocol.TX, makePeer('peer1'))).toBe(true); }); diff --git a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.ts similarity index 97% rename from yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts rename to yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.ts index 495aea8bac24..5477b65d295e 100644 --- a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limiter.ts +++ b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.ts @@ -7,7 +7,7 @@ import { PeerErrorSeverity } from '@aztec/circuit-types'; import { type PeerId } from '@libp2p/interface'; -import { type PeerManager } from '../../peer_manager.js'; +import { type PeerScoring } from '../../peer-manager/peer_scoring.js'; import { type ReqRespSubProtocol, type ReqRespSubProtocolRateLimits } from '../interface.js'; import { DEFAULT_RATE_LIMITS } from './rate_limits.js'; @@ -169,7 +169,7 @@ export class RequestResponseRateLimiter { private cleanupInterval: NodeJS.Timeout | undefined = undefined; - constructor(private peerManager: PeerManager, rateLimits: ReqRespSubProtocolRateLimits = DEFAULT_RATE_LIMITS) { + constructor(private peerScoring: PeerScoring, rateLimits: ReqRespSubProtocolRateLimits = DEFAULT_RATE_LIMITS) { this.subProtocolRateLimiters = new Map(); for (const [subProtocol, protocolLimits] of Object.entries(rateLimits)) { @@ -200,7 +200,7 @@ export class RequestResponseRateLimiter { switch (rateLimitStatus) { case RateLimitStatus.DeniedPeer: - this.peerManager.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); + this.peerScoring.penalizePeer(peerId, PeerErrorSeverity.MidToleranceError); return false; case RateLimitStatus.DeniedGlobal: return false; diff --git a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limits.ts b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limits.ts similarity index 80% rename from yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limits.ts rename to yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limits.ts index 11c46c595519..e0c2dbdb6e44 100644 --- a/yarn-project/p2p/src/services/reqresp/rate_limiter/rate_limits.ts +++ b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limits.ts @@ -32,4 +32,14 @@ export const DEFAULT_RATE_LIMITS: ReqRespSubProtocolRateLimits = { quotaCount: 10, }, }, + [ReqRespSubProtocol.GOODBYE]: { + peerLimit: { + quotaTimeMs: 1000, + quotaCount: 5, + }, + globalLimit: { + quotaTimeMs: 1000, + quotaCount: 10, + }, + }, }; diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts index 86a66f59d4ea..c5b0fd43e5bd 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts @@ -2,6 +2,7 @@ import { PeerErrorSeverity, TxHash, mockTx } from '@aztec/circuit-types'; import { sleep } from '@aztec/foundation/sleep'; import { describe, expect, it, jest } from '@jest/globals'; +import { type PeerId } from '@libp2p/interface'; import { type MockProxy, mock } from 'jest-mock-extended'; import { CollectiveReqRespTimeoutError, IndividualReqRespTimeoutError } from '../../errors/reqresp.error.js'; @@ -14,8 +15,10 @@ import { startNodes, stopNodes, } from '../../mocks/index.js'; -import { type PeerManager } from '../peer_manager.js'; +import { type PeerManager } from '../peer-manager/peer_manager.js'; +import { type PeerScoring } from '../peer-manager/peer_scoring.js'; import { ReqRespSubProtocol, RequestableBuffer } from './interface.js'; +import { GoodByeReason, reqGoodbyeHandler } from './protocols/goodbye.js'; const PING_REQUEST = RequestableBuffer.fromBuffer(Buffer.from('ping')); @@ -23,9 +26,11 @@ const PING_REQUEST = RequestableBuffer.fromBuffer(Buffer.from('ping')); // and ask for specific data that they missed via the traditional gossip protocol. describe('ReqResp', () => { let peerManager: MockProxy; + let peerScoring: MockProxy; let nodes: ReqRespNode[]; beforeEach(() => { + peerScoring = mock(); peerManager = mock(); }); @@ -38,7 +43,7 @@ describe('ReqResp', () => { it('Should perform a ping request', async () => { // Create two nodes // They need to discover each other - nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerScoring, 2); const { req: pinger } = nodes[0]; await startNodes(nodes); @@ -55,7 +60,7 @@ describe('ReqResp', () => { }); it('Should handle gracefully if a peer connected peer is offline', async () => { - nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerScoring, 2); const { req: pinger } = nodes[0]; const { req: ponger } = nodes[1]; @@ -74,7 +79,7 @@ describe('ReqResp', () => { }); it('Should request from a later peer if other peers are offline', async () => { - nodes = await createNodes(peerManager, 4); + nodes = await createNodes(peerScoring, 4); await startNodes(nodes); await sleep(500); @@ -110,7 +115,7 @@ describe('ReqResp', () => { }); it('Should hit a rate limit if too many requests are made in quick succession', async () => { - nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerScoring, 2); await startNodes(nodes); @@ -137,7 +142,7 @@ describe('ReqResp', () => { const txHash = tx.getTxHash(); const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; - protocolHandlers[ReqRespSubProtocol.TX] = (message: Buffer): Promise => { + protocolHandlers[ReqRespSubProtocol.TX] = (_peerId: PeerId, message: Buffer): Promise => { const receivedHash = TxHash.fromBuffer(message); if (txHash.equals(receivedHash)) { return Promise.resolve(tx.toBuffer()); @@ -145,7 +150,7 @@ describe('ReqResp', () => { return Promise.resolve(Buffer.from('')); }; - nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerScoring, 2); await startNodes(nodes, protocolHandlers); await sleep(500); @@ -163,11 +168,11 @@ describe('ReqResp', () => { const txHash = tx.getTxHash(); const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; - protocolHandlers[ReqRespSubProtocol.TX] = (_message: Buffer): Promise => { + protocolHandlers[ReqRespSubProtocol.TX] = (_peerId: PeerId, _message: Buffer): Promise => { return Promise.resolve(Buffer.alloc(0)); }; - nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerScoring, 2); const spySendRequestToPeer = jest.spyOn(nodes[0].req, 'sendRequestToPeer'); @@ -187,11 +192,11 @@ describe('ReqResp', () => { const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; // Return nothing - protocolHandlers[ReqRespSubProtocol.TX] = (_message: Buffer): Promise => { + protocolHandlers[ReqRespSubProtocol.TX] = (_peerId: PeerId, _message: Buffer): Promise => { return Promise.resolve(Buffer.from('')); }; - nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerScoring, 2); await startNodes(nodes, protocolHandlers); await sleep(500); @@ -203,7 +208,7 @@ describe('ReqResp', () => { }); it('Should hit individual timeout if nothing is returned over the stream', async () => { - nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerScoring, 2); await startNodes(nodes); @@ -235,7 +240,7 @@ describe('ReqResp', () => { ); // Expect the peer to be penalized for timing out - expect(peerManager.penalizePeer).toHaveBeenCalledWith( + expect(peerScoring.penalizePeer).toHaveBeenCalledWith( expect.objectContaining({ publicKey: nodes[1].p2p.peerId.publicKey, // must use objectContaining as we do not match exactly, as private key is contained in this test mapping }), @@ -244,7 +249,7 @@ describe('ReqResp', () => { }); it('Should hit collective timeout if nothing is returned over the stream from multiple peers', async () => { - nodes = await createNodes(peerManager, 4); + nodes = await createNodes(peerScoring, 4); await startNodes(nodes); @@ -276,7 +281,7 @@ describe('ReqResp', () => { // Mock that the node will respond with the tx const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; - protocolHandlers[ReqRespSubProtocol.TX] = (message: Buffer): Promise => { + protocolHandlers[ReqRespSubProtocol.TX] = (_peerId: PeerId, message: Buffer): Promise => { const receivedHash = TxHash.fromBuffer(message); if (txHash.equals(receivedHash)) { return Promise.resolve(tx.toBuffer()); @@ -287,11 +292,11 @@ describe('ReqResp', () => { // Mock that the receiving node will find that the transaction is invalid const protocolValidators = MOCK_SUB_PROTOCOL_VALIDATORS; protocolValidators[ReqRespSubProtocol.TX] = (_request, _response, peer) => { - peerManager.penalizePeer(peer, PeerErrorSeverity.LowToleranceError); + peerScoring.penalizePeer(peer, PeerErrorSeverity.LowToleranceError); return Promise.resolve(false); }; - nodes = await createNodes(peerManager, 2); + nodes = await createNodes(peerScoring, 2); await startNodes(nodes, protocolHandlers, protocolValidators); await sleep(500); @@ -302,7 +307,7 @@ describe('ReqResp', () => { expect(res).toBeUndefined(); // Expect the peer to be penalized for sending an invalid response - expect(peerManager.penalizePeer).toHaveBeenCalledWith( + expect(peerScoring.penalizePeer).toHaveBeenCalledWith( expect.objectContaining({ publicKey: nodes[1].p2p.peerId.publicKey, // must use objectContaining as we do not match exactly, as private key is contained in this test mapping }), @@ -310,4 +315,36 @@ describe('ReqResp', () => { ); }); }); + + describe('Goodbye protocol', () => { + it('should send a goodbye message to a peer', async () => { + nodes = await createNodes(peerScoring, 2); + + const protocolHandlers = MOCK_SUB_PROTOCOL_HANDLERS; + // Req Goodbye Handler is defined in the reqresp.ts file + protocolHandlers[ReqRespSubProtocol.GOODBYE] = reqGoodbyeHandler(peerManager); + + await startNodes(nodes, protocolHandlers); + await sleep(500); + await connectToPeers(nodes); + await sleep(500); + + const response = await nodes[0].req.sendRequestToPeer( + nodes[1].p2p.peerId, + ReqRespSubProtocol.GOODBYE, + Buffer.from([GoodByeReason.SHUTDOWN]), + ); + + // Node 1 Peer manager receives the goodbye from the sending node + expect(peerManager.goodbyeReceived).toHaveBeenCalledWith( + expect.objectContaining({ + publicKey: nodes[0].p2p.peerId.publicKey, + }), + GoodByeReason.SHUTDOWN, + ); + + // Expect the response to be a buffer of length 1 + expect(response).toEqual(Buffer.from([0x0])); + }); + }); }); diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index e6b8d386c9cb..a242e1357ee0 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -14,7 +14,7 @@ import { InvalidResponseError, } from '../../errors/reqresp.error.js'; import { SnappyTransform } from '../encoding.js'; -import { type PeerManager } from '../peer_manager.js'; +import { type PeerScoring } from '../peer-manager/peer_scoring.js'; import { type P2PReqRespConfig } from './config.js'; import { DEFAULT_SUB_PROTOCOL_HANDLERS, @@ -25,7 +25,7 @@ import { type SubProtocolMap, subProtocolMap, } from './interface.js'; -import { RequestResponseRateLimiter } from './rate_limiter/rate_limiter.js'; +import { RequestResponseRateLimiter } from './rate-limiter/rate_limiter.js'; /** * The Request Response Service @@ -55,13 +55,13 @@ export class ReqResp { private snappyTransform: SnappyTransform; - constructor(config: P2PReqRespConfig, protected readonly libp2p: Libp2p, private peerManager: PeerManager) { + constructor(config: P2PReqRespConfig, private libp2p: Libp2p, private peerScoring: PeerScoring) { this.logger = createLogger('p2p:reqresp'); this.overallRequestTimeoutMs = config.overallRequestTimeoutMs; this.individualRequestTimeoutMs = config.individualRequestTimeoutMs; - this.rateLimiter = new RequestResponseRateLimiter(peerManager); + this.rateLimiter = new RequestResponseRateLimiter(peerScoring); this.snappyTransform = new SnappyTransform(); } @@ -194,7 +194,7 @@ export class ReqResp { * If the stream is not closed by the dialled peer, and a timeout occurs, then * the stream is closed on the requester's end and sender (us) updates its peer score */ - async sendRequestToPeer( + public async sendRequestToPeer( peerId: PeerId, subProtocol: ReqRespSubProtocol, payload: Buffer, @@ -241,7 +241,7 @@ export class ReqResp { private handleResponseError(e: any, peerId: PeerId, subProtocol: ReqRespSubProtocol): void { const severity = this.categorizeError(e, peerId, subProtocol); if (severity) { - this.peerManager.penalizePeer(peerId, severity); + this.peerScoring.penalizePeer(peerId, severity); } } @@ -339,7 +339,7 @@ export class ReqResp { async function* (source: any) { for await (const chunkList of source) { const msg = Buffer.from(chunkList.subarray()); - const response = await handler(msg); + const response = await handler(connection.remotePeer, msg); yield new Uint8Array(transform.outboundTransformNoTopic(response)); } }, diff --git a/yarn-project/telemetry-client/src/attributes.ts b/yarn-project/telemetry-client/src/attributes.ts index 56b1105052b7..15fdeafa62fa 100644 --- a/yarn-project/telemetry-client/src/attributes.ts +++ b/yarn-project/telemetry-client/src/attributes.ts @@ -70,6 +70,8 @@ export const L1_TX_TYPE = 'aztec.l1.tx_type'; export const L1_SENDER = 'aztec.l1.sender'; /** The phase of the transaction */ export const TX_PHASE_NAME = 'aztec.tx.phase_name'; +/** The reason for disconnecting a peer */ +export const P2P_GOODBYE_REASON = 'aztec.p2p.goodbye.reason'; /** The proving job type */ export const PROVING_JOB_TYPE = 'aztec.proving.job_type'; /** The proving job id */ diff --git a/yarn-project/telemetry-client/src/metrics.ts b/yarn-project/telemetry-client/src/metrics.ts index ccd4ed7c5fc0..f755bbde8f45 100644 --- a/yarn-project/telemetry-client/src/metrics.ts +++ b/yarn-project/telemetry-client/src/metrics.ts @@ -68,6 +68,9 @@ export const L1_PUBLISHER_TX_CALLDATA_GAS = 'aztec.l1_publisher.tx_calldata_gas' export const L1_PUBLISHER_TX_BLOBDATA_GAS_USED = 'aztec.l1_publisher.tx_blobdata_gas_used'; export const L1_PUBLISHER_TX_BLOBDATA_GAS_COST = 'aztec.l1_publisher.tx_blobdata_gas_cost'; +export const PEER_MANAGER_GOODBYES_SENT = 'aztec.peer_manager.goodbyes_sent'; +export const PEER_MANAGER_GOODBYES_RECEIVED = 'aztec.peer_manager.goodbyes_received'; + export const PUBLIC_PROCESSOR_TX_DURATION = 'aztec.public_processor.tx_duration'; export const PUBLIC_PROCESSOR_TX_COUNT = 'aztec.public_processor.tx_count'; export const PUBLIC_PROCESSOR_TX_PHASE_COUNT = 'aztec.public_processor.tx_phase_count';