Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ export class ConnectionSampler {

await stream?.close();
} catch (error) {
this.logger.error(`Failed to close connection to peer ${streamId}`, { error });
this.logger.warn(`Failed to close connection to peer with stream id ${streamId}`);
} finally {
this.streams.delete(streamId);
}
Expand Down
57 changes: 57 additions & 0 deletions yarn-project/p2p/src/services/reqresp/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Request response metrics
import { Attributes, Metrics, ValueType } from '@aztec/telemetry-client';
import { type TelemetryClient, type Tracer, type UpDownCounter } from '@aztec/telemetry-client';

/**
 * Telemetry instruments for the request/response (ReqResp) service.
 *
 * Holds a tracer plus four up/down counters: requests sent, requests received,
 * failed outbound requests and failed inbound requests. Every recorded data
 * point is tagged with the sub-protocol it belongs to.
 */
export class ReqRespMetrics {
  /** Tracer obtained from the telemetry client under the same name as the meter. */
  public readonly tracer: Tracer;

  private readonly sentRequests: UpDownCounter;
  private readonly receivedRequests: UpDownCounter;
  private readonly failedOutboundRequests: UpDownCounter;
  private readonly failedInboundRequests: UpDownCounter;

  /**
   * @param telemetryClient - Client that supplies the tracer and the meter.
   * @param name - Name passed to both `getTracer` and `getMeter`.
   */
  constructor(readonly telemetryClient: TelemetryClient, name = 'ReqResp') {
    this.tracer = telemetryClient.getTracer(name);

    const meter = telemetryClient.getMeter(name);
    // Every instrument below counts whole requests, so unit and value type are shared.
    const requestCount = { unit: 'requests', valueType: ValueType.INT };

    this.sentRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_SENT_REQUESTS, {
      description: 'Number of requests sent to peers',
      ...requestCount,
    });
    this.receivedRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_RECEIVED_REQUESTS, {
      description: 'Number of requests received from peers',
      ...requestCount,
    });
    this.failedOutboundRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_FAILED_OUTBOUND_REQUESTS, {
      description: 'Number of failed outbound requests - nodes not getting valid responses',
      ...requestCount,
    });
    this.failedInboundRequests = meter.createUpDownCounter(Metrics.P2P_REQ_RESP_FAILED_INBOUND_REQUESTS, {
      description: 'Number of failed inbound requests - node failing to respond to requests',
      ...requestCount,
    });
  }

  /** Counts one request sent to a peer on `protocol`. */
  public recordRequestSent(protocol: string) {
    this.increment(this.sentRequests, protocol);
  }

  /** Counts one request received from a peer on `protocol`. */
  public recordRequestReceived(protocol: string) {
    this.increment(this.receivedRequests, protocol);
  }

  /** Counts one failed outbound request on `protocol`. */
  public recordRequestError(protocol: string) {
    this.increment(this.failedOutboundRequests, protocol);
  }

  /** Counts one failed inbound request on `protocol`. */
  public recordResponseError(protocol: string) {
    this.increment(this.failedInboundRequests, protocol);
  }

  /** Adds 1 to `counter`, tagging the data point with the sub-protocol attribute. */
  private increment(counter: UpDownCounter, protocol: string): void {
    counter.add(1, { [Attributes.P2P_REQ_RESP_PROTOCOL]: protocol });
  }
}
44 changes: 39 additions & 5 deletions yarn-project/p2p/src/services/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import { PeerErrorSeverity } from '@aztec/circuit-types';
import { type Logger, createLogger } from '@aztec/foundation/log';
import { executeTimeout } from '@aztec/foundation/timer';
import { Attributes, type TelemetryClient, getTelemetryClient, trackSpan } from '@aztec/telemetry-client';

import { type IncomingStreamData, type PeerId, type Stream } from '@libp2p/interface';
import { pipe } from 'it-pipe';
Expand All @@ -27,6 +28,7 @@ import {
type SubProtocolMap,
subProtocolMap,
} from './interface.js';
import { ReqRespMetrics } from './metrics.js';
import { RequestResponseRateLimiter } from './rate-limiter/rate_limiter.js';

/**
Expand All @@ -53,13 +55,19 @@ export class ReqResp {
private subProtocolHandlers: ReqRespSubProtocolHandlers = DEFAULT_SUB_PROTOCOL_HANDLERS;
private subProtocolValidators: ReqRespSubProtocolValidators = DEFAULT_SUB_PROTOCOL_VALIDATORS;

private connectionSampler: ConnectionSampler;
private rateLimiter: RequestResponseRateLimiter;

private snappyTransform: SnappyTransform;

private connectionSampler: ConnectionSampler;
private metrics: ReqRespMetrics;

constructor(config: P2PReqRespConfig, private libp2p: Libp2p, private peerScoring: PeerScoring) {
constructor(
config: P2PReqRespConfig,
private libp2p: Libp2p,
private peerScoring: PeerScoring,
telemetryClient: TelemetryClient = getTelemetryClient(),
) {
this.logger = createLogger('p2p:reqresp');

this.overallRequestTimeoutMs = config.overallRequestTimeoutMs;
Expand All @@ -71,6 +79,11 @@ export class ReqResp {
this.connectionSampler = new ConnectionSampler(libp2p);

this.snappyTransform = new SnappyTransform();
this.metrics = new ReqRespMetrics(telemetryClient);
}

/**
 * Tracer owned by the metrics helper of this instance.
 * NOTE(review): presumably this is what the `@trackSpan` decorators on this class read — confirm.
 */
get tracer() {
  const { tracer } = this.metrics;
  return tracer;
}

/**
Expand All @@ -97,16 +110,16 @@ export class ReqResp {
}

// Close all active connections
await this.connectionSampler.stop();
this.logger.debug('ReqResp: Connection sampler stopped');

const closeStreamPromises = this.libp2p.getConnections().map(connection => connection.close());
await Promise.all(closeStreamPromises);
this.logger.debug('ReqResp: All active streams closed');

this.rateLimiter.stop();
this.logger.debug('ReqResp: Rate limiter stopped');

await this.connectionSampler.stop();
this.logger.debug('ReqResp: Connection sampler stopped');

// NOTE: We assume libp2p instance is managed by the caller
}

Expand Down Expand Up @@ -213,6 +226,13 @@ export class ReqResp {
*
* @throws {CollectiveReqRespTimeoutError} - If the request batch exceeds the specified timeout (`timeoutMs`).
*/
@trackSpan(
'ReqResp.sendBatchRequest',
(subProtocol: ReqRespSubProtocol, requests: InstanceType<SubProtocolMap[ReqRespSubProtocol]['request']>[]) => ({
[Attributes.P2P_REQ_RESP_PROTOCOL]: subProtocol,
[Attributes.P2P_REQ_RESP_BATCH_REQUESTS_COUNT]: requests.length,
}),
)
async sendBatchRequest<SubProtocol extends ReqRespSubProtocol>(
subProtocol: SubProtocol,
requests: InstanceType<SubProtocolMap[SubProtocol]['request']>[],
Expand Down Expand Up @@ -354,13 +374,19 @@ export class ReqResp {
* If the stream is not closed by the dialled peer, and a timeout occurs, then
* the stream is closed on the requester's end and sender (us) updates its peer score
*/
@trackSpan('ReqResp.sendRequestToPeer', (peerId: PeerId, subProtocol: ReqRespSubProtocol, _: Buffer) => ({
[Attributes.P2P_ID]: peerId.toString(),
[Attributes.P2P_REQ_RESP_PROTOCOL]: subProtocol,
}))
public async sendRequestToPeer(
peerId: PeerId,
subProtocol: ReqRespSubProtocol,
payload: Buffer,
): Promise<Buffer | undefined> {
let stream: Stream | undefined;
try {
this.metrics.recordRequestSent(subProtocol);

stream = await this.connectionSampler.dialProtocol(peerId, subProtocol);

// Open the stream with a timeout
Expand All @@ -372,6 +398,7 @@ export class ReqResp {

return result;
} catch (e: any) {
this.metrics.recordRequestError(subProtocol);
this.handleResponseError(e, peerId, subProtocol);
} finally {
// Only close the stream if we created it
Expand Down Expand Up @@ -479,7 +506,13 @@ export class ReqResp {
* We check rate limits for each peer, note the peer will be penalised within the rate limiter implementation
* if they exceed their peer specific limits.
*/
@trackSpan('ReqResp.streamHandler', (protocol: ReqRespSubProtocol, { connection }: IncomingStreamData) => ({
[Attributes.P2P_REQ_RESP_PROTOCOL]: protocol,
[Attributes.P2P_ID]: connection.remotePeer.toString(),
}))
private async streamHandler(protocol: ReqRespSubProtocol, { stream, connection }: IncomingStreamData) {
this.metrics.recordRequestReceived(protocol);

// Store a reference to `this` for the async generator
if (!this.rateLimiter.allow(protocol, connection.remotePeer)) {
this.logger.warn(`Rate limit exceeded for ${protocol} from ${connection.remotePeer}`);
Expand All @@ -506,6 +539,7 @@ export class ReqResp {
);
} catch (e: any) {
this.logger.warn(e);
this.metrics.recordResponseError(protocol);
} finally {
await stream.close();
}
Expand Down
2 changes: 2 additions & 0 deletions yarn-project/telemetry-client/src/attributes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ export const ROLLUP_PROVER_ID = 'aztec.rollup.prover_id';
export const PROOF_TIMED_OUT = 'aztec.proof.timed_out';

export const P2P_ID = 'aztec.p2p.id';
/** Attribute key for the req/resp sub-protocol that a span or metric data point refers to. */
export const P2P_REQ_RESP_PROTOCOL = 'aztec.p2p.req_resp.protocol';
/** Attribute key for the number of requests contained in a batch request. */
export const P2P_REQ_RESP_BATCH_REQUESTS_COUNT = 'aztec.p2p.req_resp.batch_requests_count';
export const POOL_NAME = 'aztec.pool.name';

export const SEQUENCER_STATE = 'aztec.sequencer.state';
Expand Down
5 changes: 5 additions & 0 deletions yarn-project/telemetry-client/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ export const L1_PUBLISHER_TX_BLOBDATA_GAS_COST = 'aztec.l1_publisher.tx_blobdata
export const PEER_MANAGER_GOODBYES_SENT = 'aztec.peer_manager.goodbyes_sent';
export const PEER_MANAGER_GOODBYES_RECEIVED = 'aztec.peer_manager.goodbyes_received';

/** Metric name: number of requests sent to peers. */
export const P2P_REQ_RESP_SENT_REQUESTS = 'aztec.p2p.req_resp.sent_requests';
/** Metric name: number of requests received from peers. */
export const P2P_REQ_RESP_RECEIVED_REQUESTS = 'aztec.p2p.req_resp.received_requests';
/** Metric name: number of failed outbound requests (no valid response obtained). */
export const P2P_REQ_RESP_FAILED_OUTBOUND_REQUESTS = 'aztec.p2p.req_resp.failed_outbound_requests';
/** Metric name: number of failed inbound requests (this node failed to respond). */
export const P2P_REQ_RESP_FAILED_INBOUND_REQUESTS = 'aztec.p2p.req_resp.failed_inbound_requests';

export const PUBLIC_PROCESSOR_TX_DURATION = 'aztec.public_processor.tx_duration';
export const PUBLIC_PROCESSOR_TX_COUNT = 'aztec.public_processor.tx_count';
export const PUBLIC_PROCESSOR_TX_PHASE_COUNT = 'aztec.public_processor.tx_phase_count';
Expand Down
Loading