diff --git a/yarn-project/p2p/src/services/libp2p/instrumentation.ts b/yarn-project/p2p/src/services/libp2p/instrumentation.ts new file mode 100644 index 000000000000..69f25e5365aa --- /dev/null +++ b/yarn-project/p2p/src/services/libp2p/instrumentation.ts @@ -0,0 +1,39 @@ +import type { Timer } from '@aztec/foundation/timer'; +import type { TopicType } from '@aztec/stdlib/p2p'; +import { + Attributes, + type Histogram, + Metrics, + type TelemetryClient, + type UpDownCounter, + ValueType, +} from '@aztec/telemetry-client'; + +export class P2PInstrumentation { + private messageValidationDuration: Histogram; + private messagePrevalidationCount: UpDownCounter; + + constructor(client: TelemetryClient, name: string) { + const meter = client.getMeter(name); + + this.messageValidationDuration = meter.createHistogram(Metrics.P2P_GOSSIP_MESSAGE_VALIDATION_DURATION, { + unit: 'ms', + description: 'How long validating a gossiped message takes', + valueType: ValueType.INT, + }); + + this.messagePrevalidationCount = meter.createUpDownCounter(Metrics.P2P_GOSSIP_MESSAGE_PREVALIDATION_COUNT, { + description: 'How many message pass/fail prevalidation', + valueType: ValueType.INT, + }); + } + + public recordMessageValidation(topicName: TopicType, timerOrMs: Timer | number) { + const ms = typeof timerOrMs === 'number' ? timerOrMs : timerOrMs.ms(); + this.messageValidationDuration.record(Math.ceil(ms), { [Attributes.TOPIC_NAME]: topicName }); + } + + public incMessagePrevalidationStatus(passed: boolean, topicName: TopicType | undefined) { + this.messagePrevalidationCount.add(1, { [Attributes.TOPIC_NAME]: topicName, [Attributes.OK]: passed }); + } +} diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index 4421c8b2bf12..4b2f75f02802 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -2,6 +2,7 @@ import type { EpochCacheInterface } from '@aztec/epoch-cache'; import { createLibp2pComponentLogger, createLogger } from '@aztec/foundation/log'; import { SerialQueue } from '@aztec/foundation/queue'; import { RunningPromise } from '@aztec/foundation/running-promise'; +import { Timer } from '@aztec/foundation/timer'; import type { AztecAsyncKVStore } from '@aztec/kv-store'; import { protocolContractTreeRoot } from '@aztec/protocol-contracts'; import type { L2BlockSource } from '@aztec/stdlib/block'; @@ -65,6 +66,7 @@ import { reqGoodbyeHandler } from '../reqresp/protocols/goodbye.js'; import { pingHandler, reqRespBlockHandler, reqRespTxHandler, statusHandler } from '../reqresp/protocols/index.js'; import { ReqResp } from '../reqresp/reqresp.js'; import type { P2PBlockReceivedCallback, P2PService, PeerDiscoveryService } from '../service.js'; +import { P2PInstrumentation } from './instrumentation.js'; interface ValidationResult { name: string; @@ -107,6 +109,8 @@ export class LibP2PService extends private gossipSubEventHandler: (e: CustomEvent) => void; + private instrumentation: P2PInstrumentation; + constructor( private clientType: T, private config: P2PConfig, @@ -122,6 +126,8 @@ export class LibP2PService extends ) { super(telemetry, 'LibP2PService'); + this.instrumentation = new P2PInstrumentation(telemetry, 'LibP2PService'); + this.msgIdSeenValidators[TopicType.tx] = new MessageSeenValidator(config.seenMessageCacheSize); this.msgIdSeenValidators[TopicType.block_proposal] = new MessageSeenValidator(config.seenMessageCacheSize); this.msgIdSeenValidators[TopicType.block_attestation] = new MessageSeenValidator(config.seenMessageCacheSize); @@ -501,25 +507,33 @@ export class LibP2PService extends } protected preValidateReceivedMessage(msg: Message, msgId: string, source: PeerId) { - const getValidator = () => { - if (msg.topic === this.topicStrings[TopicType.tx]) { - return this.msgIdSeenValidators[TopicType.tx]; - } - if (msg.topic === this.topicStrings[TopicType.block_attestation]) { - return this.msgIdSeenValidators[TopicType.block_attestation]; - } - if (msg.topic === this.topicStrings[TopicType.block_proposal]) { - return this.msgIdSeenValidators[TopicType.block_proposal]; - } - this.logger.error(`Received message on unknown topic: ${msg.topic}`); - }; + let topicType: TopicType | undefined; + + switch (msg.topic) { + case this.topicStrings[TopicType.tx]: + topicType = TopicType.tx; + break; + case this.topicStrings[TopicType.block_attestation]: + topicType = TopicType.block_attestation; + break; + case this.topicStrings[TopicType.block_proposal]: + topicType = TopicType.block_proposal; + break; + default: + this.logger.error(`Received message on unknown topic: ${msg.topic}`); + break; + } - const validator = getValidator(); + const validator = topicType ? this.msgIdSeenValidators[topicType] : undefined; if (!validator || !validator.addMessage(msgId)) { + this.instrumentation.incMessagePrevalidationStatus(false, topicType); this.node.services.pubsub.reportMessageValidationResult(msgId, source.toString(), TopicValidatorResult.Ignore); return false; } + + this.instrumentation.incMessagePrevalidationStatus(true, topicType); + return true; } @@ -559,14 +573,20 @@ export class LibP2PService extends validationFunc: () => Promise<{ result: boolean; obj: T }>, msgId: string, source: PeerId, + topicType: TopicType, ): Promise<{ result: boolean; obj: T | undefined }> { let resultAndObj: { result: boolean; obj: T | undefined } = { result: false, obj: undefined }; + const timer = new Timer(); try { resultAndObj = await validationFunc(); } catch (err) { this.logger.error(`Error deserialising and validating message `, err); } + if (resultAndObj.result) { + this.instrumentation.recordMessageValidation(topicType, timer); + } + this.node.services.pubsub.reportMessageValidationResult( msgId, source.toString(), @@ -582,7 +602,7 @@ export class LibP2PService extends return { result, obj: tx }; }; - const { result, obj: tx } = await this.validateReceivedMessage(validationFunc, msgId, source); + const { result, obj: tx } = await this.validateReceivedMessage(validationFunc, msgId, source, TopicType.tx); if (!result || !tx) { return; } @@ -613,6 +633,7 @@ export class LibP2PService extends validationFunc, msgId, source, + TopicType.block_attestation, ); if (!result || !attestation) { return; @@ -640,7 +661,12 @@ export class LibP2PService extends return { result, obj: block }; }; - const { result, obj: block } = await this.validateReceivedMessage(validationFunc, msgId, source); + const { result, obj: block } = await this.validateReceivedMessage( + validationFunc, + msgId, + source, + TopicType.block_proposal, + ); if (!result || !block) { return; } diff --git a/yarn-project/telemetry-client/src/attributes.ts b/yarn-project/telemetry-client/src/attributes.ts index f5bb8aacf9d8..cb26c2fc603e 100644 --- a/yarn-project/telemetry-client/src/attributes.ts +++ b/yarn-project/telemetry-client/src/attributes.ts @@ -110,3 +110,5 @@ export const GAS_DIMENSION = 'aztec.gas_dimension'; export const WORLD_STATE_REQUEST_TYPE = 'aztec.world_state_request'; export const NODEJS_EVENT_LOOP_STATE = 'nodejs.eventloop.state'; + +export const TOPIC_NAME = 'aztec.gossip.topic_name'; diff --git a/yarn-project/telemetry-client/src/metrics.ts b/yarn-project/telemetry-client/src/metrics.ts index 2f892b7beab3..a2cdd63db2dd 100644 --- a/yarn-project/telemetry-client/src/metrics.ts +++ b/yarn-project/telemetry-client/src/metrics.ts @@ -97,6 +97,9 @@ export const P2P_REQ_RESP_RECEIVED_REQUESTS = 'aztec.p2p.req_resp.received_reque export const P2P_REQ_RESP_FAILED_OUTBOUND_REQUESTS = 'aztec.p2p.req_resp.failed_outbound_requests'; export const P2P_REQ_RESP_FAILED_INBOUND_REQUESTS = 'aztec.p2p.req_resp.failed_inbound_requests'; +export const P2P_GOSSIP_MESSAGE_VALIDATION_DURATION = 'aztec.p2p.gossip.message_validation_duration'; +export const P2P_GOSSIP_MESSAGE_PREVALIDATION_COUNT = 'aztec.p2p.gossip.message_validation_count'; + export const PUBLIC_PROCESSOR_TX_DURATION = 'aztec.public_processor.tx_duration'; export const PUBLIC_PROCESSOR_TX_COUNT = 'aztec.public_processor.tx_count'; export const PUBLIC_PROCESSOR_TX_PHASE_COUNT = 'aztec.public_processor.tx_phase_count';