diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts index b0bc3ee48641..c31af230232a 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts @@ -171,7 +171,7 @@ export class ConnectionSampler { await stream?.close(); } catch (error) { - this.logger.error(`Failed to close connection to peer ${streamId}`, { error }); + this.logger.warn(`Failed to close connection to peer with stream id ${streamId}`); } finally { this.streams.delete(streamId); } diff --git a/yarn-project/p2p/src/services/reqresp/metrics.ts b/yarn-project/p2p/src/services/reqresp/metrics.ts new file mode 100644 index 000000000000..e32b4cdb4f4b --- /dev/null +++ b/yarn-project/p2p/src/services/reqresp/metrics.ts @@ -0,0 +1,57 @@ +// Request response metrics +import { Attributes, Metrics, ValueType } from '@aztec/telemetry-client'; +import { type TelemetryClient, type Tracer, type UpDownCounter } from '@aztec/telemetry-client'; + +export class ReqRespMetrics { + public readonly tracer: Tracer; + + private readonly sentRequests: UpDownCounter; + private readonly receivedRequests: UpDownCounter; + + private readonly failedOutboundRequests: UpDownCounter; + private readonly failedInboundRequests: UpDownCounter; + + constructor(readonly telemetryClient: TelemetryClient, name = 'ReqResp') { + this.tracer = telemetryClient.getTracer(name); + + const meter = telemetryClient.getMeter(name); + this.sentRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_SENT_REQUESTS, { + description: 'Number of requests sent to peers', + unit: 'requests', + valueType: ValueType.INT, + }); + this.receivedRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_RECEIVED_REQUESTS, { + description: 'Number of requests received from peers', + unit: 'requests', + valueType: ValueType.INT, + }); + + this.failedOutboundRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_FAILED_OUTBOUND_REQUESTS, { + description: 'Number of failed outbound requests - nodes not getting valid responses', + unit: 'requests', + valueType: ValueType.INT, + }); + + this.failedInboundRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_FAILED_INBOUND_REQUESTS, { + description: 'Number of failed inbound requests - node failing to respond to requests', + unit: 'requests', + valueType: ValueType.INT, + }); + } + + public recordRequestSent(protocol: string) { + this.sentRequests.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol }); + } + + public recordRequestReceived(protocol: string) { + this.receivedRequests.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol }); + } + + public recordRequestError(protocol: string) { + this.failedOutboundRequests.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol }); + } + + public recordResponseError(protocol: string) { + this.failedInboundRequests.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol }); + } +} diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index 8ed753331be5..c77f7d13b996 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -2,6 +2,7 @@ import { PeerErrorSeverity } from '@aztec/circuit-types'; import { type Logger, createLogger } from '@aztec/foundation/log'; import { executeTimeout } from '@aztec/foundation/timer'; +import { Attributes, type TelemetryClient, getTelemetryClient, trackSpan } from '@aztec/telemetry-client'; import { type IncomingStreamData, type PeerId, type Stream } from '@libp2p/interface'; import { pipe } from 'it-pipe'; @@ -27,6 +28,7 @@ import { type SubProtocolMap, subProtocolMap, } from './interface.js'; +import { ReqRespMetrics } from './metrics.js'; import { RequestResponseRateLimiter } from './rate-limiter/rate_limiter.js'; /** @@ -53,13 +55,19 @@ export class ReqResp { private subProtocolHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS; private subProtocolValidators: ReqRespSubProtocolValidators = DEFAULT_SUB_PROTOCOL_VALIDATORS; + private connectionSampler: ConnectionSampler; private rateLimiter: RequestResponseRateLimiter; private snappyTransform: SnappyTransform; - private connectionSampler: ConnectionSampler; + private metrics: ReqRespMetrics; - constructor(config: P2PReqRespConfig, private libp2p: Libp2p, private peerScoring: PeerScoring) { + constructor( + config: P2PReqRespConfig, + private libp2p: Libp2p, + private peerScoring: PeerScoring, + telemetryClient: TelemetryClient = getTelemetryClient(), + ) { this.logger = createLogger('p2p:reqresp'); this.overallRequestTimeoutMs = config.overallRequestTimeoutMs; @@ -71,6 +79,11 @@ export class ReqResp { this.connectionSampler = new ConnectionSampler(libp2p); this.snappyTransform = new SnappyTransform(); + this.metrics = new ReqRespMetrics(telemetryClient); + } + + get tracer() { + return this.metrics.tracer; } /** @@ -97,6 +110,9 @@ export class ReqResp { } // Close all active connections + await this.connectionSampler.stop(); + this.logger.debug('ReqResp: Connection sampler stopped'); + const closeStreamPromises = this.libp2p.getConnections().map(connection => connection.close()); await Promise.all(closeStreamPromises); this.logger.debug('ReqResp: All active streams closed'); @@ -104,9 +120,6 @@ export class ReqResp { this.rateLimiter.stop(); this.logger.debug('ReqResp: Rate limiter stopped'); - await this.connectionSampler.stop(); - this.logger.debug('ReqResp: Connection sampler stopped'); - // NOTE: We assume libp2p instance is managed by the caller } @@ -213,6 +226,13 @@ export class ReqResp { * * @throws {CollectiveReqRespTimeoutError} - If the request batch exceeds the specified timeout (`timeoutMs`). */ + @trackSpan( + 'ReqResp.sendBatchRequest', + (subProtocol: ReqRespSubProtocol, requests: InstanceType[]) => ({ + [Attributes.P2P_REQ_RESP_PROTOCOL]: subProtocol, + [Attributes.P2P_REQ_RESP_BATCH_REQUESTS_COUNT]: requests.length, + }), + ) async sendBatchRequest( subProtocol: SubProtocol, requests: InstanceType[], @@ -354,6 +374,10 @@ export class ReqResp { * If the stream is not closed by the dialled peer, and a timeout occurs, then * the stream is closed on the requester's end and sender (us) updates its peer score */ + @trackSpan('ReqResp.sendRequestToPeer', (peerId: PeerId, subProtocol: ReqRespSubProtocol, _: Buffer) => ({ + [Attributes.P2P_ID]: peerId.toString(), + [Attributes.P2P_REQ_RESP_PROTOCOL]: subProtocol, + })) public async sendRequestToPeer( peerId: PeerId, subProtocol: ReqRespSubProtocol, @@ -361,6 +385,8 @@ export class ReqResp { ): Promise { let stream: Stream | undefined; try { + this.metrics.recordRequestSent(subProtocol); + stream = await this.connectionSampler.dialProtocol(peerId, subProtocol); // Open the stream with a timeout @@ -372,6 +398,7 @@ export class ReqResp { return result; } catch (e: any) { + this.metrics.recordRequestError(subProtocol); this.handleResponseError(e, peerId, subProtocol); } finally { // Only close the stream if we created it @@ -479,7 +506,13 @@ export class ReqResp { * We check rate limits for each peer, note the peer will be penalised within the rate limiter implementation * if they exceed their peer specific limits. */ + @trackSpan('ReqResp.streamHandler', (protocol: ReqRespSubProtocol, { connection }: IncomingStreamData) => ({ + [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol, + [Attributes.P2P_ID]: connection.remotePeer.toString(), + })) private async streamHandler(protocol: ReqRespSubProtocol, { stream, connection }: IncomingStreamData) { + this.metrics.recordRequestReceived(protocol); + // Store a reference to from this for the async generator if (!this.rateLimiter.allow(protocol, connection.remotePeer)) { this.logger.warn(`Rate limit exceeded for ${protocol} from ${connection.remotePeer}`); @@ -506,6 +539,7 @@ export class ReqResp { ); } catch (e: any) { this.logger.warn(e); + this.metrics.recordResponseError(protocol); } finally { await stream.close(); } diff --git a/yarn-project/telemetry-client/src/attributes.ts b/yarn-project/telemetry-client/src/attributes.ts index 15fdeafa62fa..3df53eaf7b6e 100644 --- a/yarn-project/telemetry-client/src/attributes.ts +++ b/yarn-project/telemetry-client/src/attributes.ts @@ -84,6 +84,8 @@ export const ROLLUP_PROVER_ID = 'aztec.rollup.prover_id'; export const PROOF_TIMED_OUT = 'aztec.proof.timed_out'; export const P2P_ID = 'aztec.p2p.id'; +export const P2P_REQ_RESP_PROTOCOL = 'aztec.p2p.req_resp.protocol'; +export const P2P_REQ_RESP_BATCH_REQUESTS_COUNT = 'aztec.p2p.req_resp.batch_requests_count'; export const POOL_NAME = 'aztec.pool.name'; export const SEQUENCER_STATE = 'aztec.sequencer.state'; diff --git a/yarn-project/telemetry-client/src/metrics.ts b/yarn-project/telemetry-client/src/metrics.ts index f755bbde8f45..5314a2039680 100644 --- a/yarn-project/telemetry-client/src/metrics.ts +++ b/yarn-project/telemetry-client/src/metrics.ts @@ -71,6 +71,11 @@ export const L1_PUBLISHER_TX_BLOBDATA_GAS_COST = 'aztec.l1_publisher.tx_blobdata export const PEER_MANAGER_GOODBYES_SENT = 'aztec.peer_manager.goodbyes_sent'; export const PEER_MANAGER_GOODBYES_RECEIVED = 'aztec.peer_manager.goodbyes_received'; +export const P2P_REQ_RESP_SENT_REQUESTS = 'aztec.p2p.req_resp.sent_requests'; +export const P2P_REQ_RESP_RECEIVED_REQUESTS = 'aztec.p2p.req_resp.received_requests'; +export const P2P_REQ_RESP_FAILED_OUTBOUND_REQUESTS = 'aztec.p2p.req_resp.failed_outbound_requests'; +export const P2P_REQ_RESP_FAILED_INBOUND_REQUESTS = 'aztec.p2p.req_resp.failed_inbound_requests'; + export const PUBLIC_PROCESSOR_TX_DURATION = 'aztec.public_processor.tx_duration'; export const PUBLIC_PROCESSOR_TX_COUNT = 'aztec.public_processor.tx_count'; export const PUBLIC_PROCESSOR_TX_PHASE_COUNT = 'aztec.public_processor.tx_phase_count';