Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions yarn-project/p2p/src/services/libp2p/instrumentation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import type { Timer } from '@aztec/foundation/timer';
import type { TopicType } from '@aztec/stdlib/p2p';
import {
Attributes,
type Histogram,
Metrics,
type TelemetryClient,
type UpDownCounter,
ValueType,
} from '@aztec/telemetry-client';

export class P2PInstrumentation {
private messageValidationDuration: Histogram;
private messagePrevalidationCount: UpDownCounter;

constructor(client: TelemetryClient, name: string) {
const meter = client.getMeter(name);

this.messageValidationDuration = meter.createHistogram(Metrics.P2P_GOSSIP_MESSAGE_VALIDATION_DURATION, {
unit: 'ms',
description: 'How long validating a gossiped message takes',
valueType: ValueType.INT,
});

this.messagePrevalidationCount = meter.createUpDownCounter(Metrics.P2P_GOSSIP_MESSAGE_PREVALIDATION_COUNT, {
description: 'How many message pass/fail prevalidation',
valueType: ValueType.INT,
});
}

public recordMessageValidation(topicName: TopicType, timerOrMs: Timer | number) {
const ms = typeof timerOrMs === 'number' ? timerOrMs : timerOrMs.ms();
this.messageValidationDuration.record(Math.ceil(ms), { [Attributes.TOPIC_NAME]: topicName });
}

public incMessagePrevalidationStatus(passed: boolean, topicName: TopicType | undefined) {
this.messagePrevalidationCount.add(1, { [Attributes.TOPIC_NAME]: topicName, [Attributes.OK]: passed });
}
}
56 changes: 41 additions & 15 deletions yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { EpochCacheInterface } from '@aztec/epoch-cache';
import { createLibp2pComponentLogger, createLogger } from '@aztec/foundation/log';
import { SerialQueue } from '@aztec/foundation/queue';
import { RunningPromise } from '@aztec/foundation/running-promise';
import { Timer } from '@aztec/foundation/timer';
import type { AztecAsyncKVStore } from '@aztec/kv-store';
import { protocolContractTreeRoot } from '@aztec/protocol-contracts';
import type { L2BlockSource } from '@aztec/stdlib/block';
Expand Down Expand Up @@ -65,6 +66,7 @@ import { reqGoodbyeHandler } from '../reqresp/protocols/goodbye.js';
import { pingHandler, reqRespBlockHandler, reqRespTxHandler, statusHandler } from '../reqresp/protocols/index.js';
import { ReqResp } from '../reqresp/reqresp.js';
import type { P2PBlockReceivedCallback, P2PService, PeerDiscoveryService } from '../service.js';
import { P2PInstrumentation } from './instrumentation.js';

interface ValidationResult {
name: string;
Expand Down Expand Up @@ -107,6 +109,8 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends

private gossipSubEventHandler: (e: CustomEvent<GossipsubMessage>) => void;

private instrumentation: P2PInstrumentation;

constructor(
private clientType: T,
private config: P2PConfig,
Expand All @@ -122,6 +126,8 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
) {
super(telemetry, 'LibP2PService');

this.instrumentation = new P2PInstrumentation(telemetry, 'LibP2PService');

this.msgIdSeenValidators[TopicType.tx] = new MessageSeenValidator(config.seenMessageCacheSize);
this.msgIdSeenValidators[TopicType.block_proposal] = new MessageSeenValidator(config.seenMessageCacheSize);
this.msgIdSeenValidators[TopicType.block_attestation] = new MessageSeenValidator(config.seenMessageCacheSize);
Expand Down Expand Up @@ -501,25 +507,33 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
}

protected preValidateReceivedMessage(msg: Message, msgId: string, source: PeerId) {
const getValidator = () => {
if (msg.topic === this.topicStrings[TopicType.tx]) {
return this.msgIdSeenValidators[TopicType.tx];
}
if (msg.topic === this.topicStrings[TopicType.block_attestation]) {
return this.msgIdSeenValidators[TopicType.block_attestation];
}
if (msg.topic === this.topicStrings[TopicType.block_proposal]) {
return this.msgIdSeenValidators[TopicType.block_proposal];
}
this.logger.error(`Received message on unknown topic: ${msg.topic}`);
};
let topicType: TopicType | undefined;

switch (msg.topic) {
case this.topicStrings[TopicType.tx]:
topicType = TopicType.tx;
break;
case this.topicStrings[TopicType.block_attestation]:
topicType = TopicType.block_attestation;
break;
case this.topicStrings[TopicType.block_proposal]:
topicType = TopicType.block_proposal;
break;
default:
this.logger.error(`Received message on unknown topic: ${msg.topic}`);
break;
}

const validator = getValidator();
const validator = topicType ? this.msgIdSeenValidators[topicType] : undefined;

if (!validator || !validator.addMessage(msgId)) {
this.instrumentation.incMessagePrevalidationStatus(false, topicType);
this.node.services.pubsub.reportMessageValidationResult(msgId, source.toString(), TopicValidatorResult.Ignore);
return false;
}

this.instrumentation.incMessagePrevalidationStatus(true, topicType);

return true;
}

Expand Down Expand Up @@ -559,14 +573,20 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
validationFunc: () => Promise<{ result: boolean; obj: T }>,
msgId: string,
source: PeerId,
topicType: TopicType,
): Promise<{ result: boolean; obj: T | undefined }> {
let resultAndObj: { result: boolean; obj: T | undefined } = { result: false, obj: undefined };
const timer = new Timer();
try {
resultAndObj = await validationFunc();
} catch (err) {
this.logger.error(`Error deserialising and validating message `, err);
}

if (resultAndObj.result) {
this.instrumentation.recordMessageValidation(topicType, timer);
}

this.node.services.pubsub.reportMessageValidationResult(
msgId,
source.toString(),
Expand All @@ -582,7 +602,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
return { result, obj: tx };
};

const { result, obj: tx } = await this.validateReceivedMessage<Tx>(validationFunc, msgId, source);
const { result, obj: tx } = await this.validateReceivedMessage<Tx>(validationFunc, msgId, source, TopicType.tx);
if (!result || !tx) {
return;
}
Expand Down Expand Up @@ -613,6 +633,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
validationFunc,
msgId,
source,
TopicType.block_attestation,
);
if (!result || !attestation) {
return;
Expand Down Expand Up @@ -640,7 +661,12 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
return { result, obj: block };
};

const { result, obj: block } = await this.validateReceivedMessage<BlockProposal>(validationFunc, msgId, source);
const { result, obj: block } = await this.validateReceivedMessage<BlockProposal>(
validationFunc,
msgId,
source,
TopicType.block_proposal,
);
if (!result || !block) {
return;
}
Expand Down
2 changes: 2 additions & 0 deletions yarn-project/telemetry-client/src/attributes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,5 @@ export const GAS_DIMENSION = 'aztec.gas_dimension';
export const WORLD_STATE_REQUEST_TYPE = 'aztec.world_state_request';

export const NODEJS_EVENT_LOOP_STATE = 'nodejs.eventloop.state';

export const TOPIC_NAME = 'aztec.gossip.topic_name';
3 changes: 3 additions & 0 deletions yarn-project/telemetry-client/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ export const P2P_REQ_RESP_RECEIVED_REQUESTS = 'aztec.p2p.req_resp.received_reque
export const P2P_REQ_RESP_FAILED_OUTBOUND_REQUESTS = 'aztec.p2p.req_resp.failed_outbound_requests';
export const P2P_REQ_RESP_FAILED_INBOUND_REQUESTS = 'aztec.p2p.req_resp.failed_inbound_requests';

export const P2P_GOSSIP_MESSAGE_VALIDATION_DURATION = 'aztec.p2p.gossip.message_validation_duration';
export const P2P_GOSSIP_MESSAGE_PREVALIDATION_COUNT = 'aztec.p2p.gossip.message_validation_count';

export const PUBLIC_PROCESSOR_TX_DURATION = 'aztec.public_processor.tx_duration';
export const PUBLIC_PROCESSOR_TX_COUNT = 'aztec.public_processor.tx_count';
export const PUBLIC_PROCESSOR_TX_PHASE_COUNT = 'aztec.public_processor.tx_phase_count';
Expand Down