Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
355 changes: 321 additions & 34 deletions yarn-project/p2p/src/client/p2p_client.integration.test.ts

Large diffs are not rendered by default.

44 changes: 34 additions & 10 deletions yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import {
type Gossipable,
P2PClientType,
PeerErrorSeverity,
TopicTypeMap,
TopicType,
createTopicString,
getTopicTypeForClientType,
metricsTopicStrToLabels,
} from '@aztec/stdlib/p2p';
import { DatabasePublicStateSource, MerkleTreeId } from '@aztec/stdlib/trees';
import { Tx, type TxHash, type TxValidationResult } from '@aztec/stdlib/tx';
import { compressComponentVersions } from '@aztec/stdlib/versioning';
import { Attributes, OtelMetricsAdapter, type TelemetryClient, WithTracer, trackSpan } from '@aztec/telemetry-client';

import type { ENR } from '@chainsafe/enr';
Expand Down Expand Up @@ -57,6 +59,7 @@ import {
} from '../../msg_validators/tx_validator/index.js';
import { GossipSubEvent } from '../../types/index.js';
import { type PubSubLibp2p, convertToMultiaddr } from '../../util.js';
import { getVersions } from '../../versioning.js';
import { AztecDatastore } from '../data_store.js';
import { SnappyTransform, fastMsgIdFn, getMsgIdFn, msgIdToStrFn } from '../encoding.js';
import { gossipScoreThresholds } from '../gossipsub/scoring.js';
Expand Down Expand Up @@ -95,6 +98,9 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
private attestationValidator: AttestationValidator;
private blockProposalValidator: BlockProposalValidator;

private protocolVersion = '';
private topicStrings: Record<TopicType, string> = {} as Record<TopicType, string>;

// Request and response sub service
public reqresp: ReqResp;

Expand Down Expand Up @@ -127,6 +133,17 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
) {
super(telemetry, 'LibP2PService');

const versions = getVersions(config);
this.protocolVersion = compressComponentVersions(versions);
logger.info(`Started libp2p service with protocol version ${this.protocolVersion}`);

this.topicStrings[TopicType.tx] = createTopicString(TopicType.tx, this.protocolVersion);
this.topicStrings[TopicType.block_proposal] = createTopicString(TopicType.block_proposal, this.protocolVersion);
this.topicStrings[TopicType.block_attestation] = createTopicString(
TopicType.block_attestation,
this.protocolVersion,
);

const peerScoring = new PeerScoring(config);
this.reqresp = new ReqResp(config, node, peerScoring);

Expand Down Expand Up @@ -198,6 +215,13 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
peerDiscovery.push(bootstrap({ list: bootstrapNodes }));
}

const versions = getVersions(config);
const protocolVersion = compressComponentVersions(versions);

const txTopic = createTopicString(TopicType.tx, protocolVersion);
const blockProposalTopic = createTopicString(TopicType.block_proposal, protocolVersion);
const blockAttestationTopic = createTopicString(TopicType.block_attestation, protocolVersion);

const node = await createLibp2p({
start: false,
peerId,
Expand Down Expand Up @@ -251,24 +275,24 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
fastMsgIdFn: fastMsgIdFn,
dataTransform: new SnappyTransform(),
metricsRegister: otelMetricsAdapter,
metricsTopicStrToLabel: metricsTopicStrToLabels(),
metricsTopicStrToLabel: metricsTopicStrToLabels(protocolVersion),
asyncValidation: true,
scoreThresholds: gossipScoreThresholds,
scoreParams: createPeerScoreParams({
// IPColocation factor can be disabled for local testing - default to -5
IPColocationFactorWeight: config.debugDisableColocationPenalty ? 0 : -5.0,
topics: {
[Tx.p2pTopic]: createTopicScoreParams({
[txTopic]: createTopicScoreParams({
topicWeight: 1,
invalidMessageDeliveriesWeight: -20,
invalidMessageDeliveriesDecay: 0.5,
}),
[BlockAttestation.p2pTopic]: createTopicScoreParams({
[blockAttestationTopic]: createTopicScoreParams({
topicWeight: 1,
invalidMessageDeliveriesWeight: -20,
invalidMessageDeliveriesDecay: 0.5,
}),
[BlockProposal.p2pTopic]: createTopicScoreParams({
[blockProposalTopic]: createTopicScoreParams({
topicWeight: 1,
invalidMessageDeliveriesWeight: -20,
invalidMessageDeliveriesDecay: 0.5,
Expand Down Expand Up @@ -324,7 +348,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends

// Subscribe to standard GossipSub topics by default
for (const topic of getTopicTypeForClientType(this.clientType)) {
this.subscribeToTopic(TopicTypeMap[topic].p2pTopic);
this.subscribeToTopic(this.topicStrings[topic]);
}

// Create request response protocol handlers
Expand Down Expand Up @@ -483,13 +507,13 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
* @param data - The message data
*/
protected async handleNewGossipMessage(msg: Message, msgId: string, source: PeerId) {
if (msg.topic === Tx.p2pTopic) {
if (msg.topic === this.topicStrings[TopicType.tx]) {
await this.handleGossipedTx(msg, msgId, source);
}
if (msg.topic === BlockAttestation.p2pTopic && this.clientType === P2PClientType.Full) {
if (msg.topic === this.topicStrings[TopicType.block_attestation] && this.clientType === P2PClientType.Full) {
await this.processAttestationFromPeer(msg, msgId, source);
}
if (msg.topic == BlockProposal.p2pTopic) {
if (msg.topic === this.topicStrings[TopicType.block_proposal]) {
await this.processBlockFromPeer(msg, msgId, source);
}

Expand Down Expand Up @@ -901,7 +925,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
const identifier = await message.p2pMessageIdentifier().then(i => i.toString());
this.logger.trace(`Sending message ${identifier}`, { p2pMessageIdentifier: identifier });

const recipientsNum = await this.publishToTopic(parent.p2pTopic, message.toBuffer());
const recipientsNum = await this.publishToTopic(this.topicStrings[parent.p2pTopic], message.toBuffer());
this.logger.debug(`Sent message ${identifier} to ${recipientsNum} peers`, {
p2pMessageIdentifier: identifier,
sourcePeer: this.node.peerId.toString(),
Expand Down
22 changes: 20 additions & 2 deletions yarn-project/p2p/src/test-helpers/make-test-p2p-clients.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { MockL2BlockSource } from '@aztec/archiver/test';
import type { EpochCache } from '@aztec/epoch-cache';
import { type Logger, createLogger } from '@aztec/foundation/log';
import { sleep } from '@aztec/foundation/sleep';
import type { DataStoreConfig } from '@aztec/kv-store/config';
import { openTmpStore } from '@aztec/kv-store/lmdb-v2';
import type { WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server';
Expand Down Expand Up @@ -57,6 +58,7 @@ export async function makeTestP2PClient(
p2pEnabled: true,
peerIdPrivateKey,
p2pIp: `127.0.0.1`,
listenAddress: `127.0.0.1`,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2P test wouldn't run at all on '0.0.0.0', had to set to this.

p2pPort: port,
bootstrapNodes: peers,
peerCheckIntervalMS: 1000,
Expand Down Expand Up @@ -101,7 +103,16 @@ export async function makeTestP2PClients(numberOfPeers: number, testConfig: Make
const clients: P2PClient[] = [];
const peerIdPrivateKeys = generatePeerIdPrivateKeys(numberOfPeers);

const ports = await getPorts(numberOfPeers);
let ports = [];
while (true) {
try {
ports = await getPorts(numberOfPeers);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regularly got failures on mainframe with this.

break;
} catch (err) {
await sleep(1000);
}
}

const peerEnrs = await makeEnrs(peerIdPrivateKeys, ports, testConfig.p2pBaseConfig);

for (let i = 0; i < numberOfPeers; i++) {
Expand All @@ -113,5 +124,12 @@ export async function makeTestP2PClients(numberOfPeers: number, testConfig: Make
}

await Promise.all(clients.map(client => client.isReady()));
return clients;
return clients.map((client, index) => {
return {
client,
peerPrivateKey: peerIdPrivateKeys[index],
port: ports[index],
enr: peerEnrs[index],
};
});
}
4 changes: 2 additions & 2 deletions yarn-project/stdlib/src/p2p/block_attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type { ZodFor } from '../schemas/index.js';
import { ConsensusPayload } from './consensus_payload.js';
import { Gossipable } from './gossipable.js';
import { SignatureDomainSeparator, getHashedSignaturePayloadEthSignedMessage } from './signature_utils.js';
import { TopicType, createTopicString } from './topic_type.js';
import { TopicType } from './topic_type.js';

export class BlockAttestationHash extends Buffer32 {
constructor(hash: Buffer) {
Expand All @@ -26,7 +26,7 @@ export class BlockAttestationHash extends Buffer32 {
* will produce a block attestation over the header of the block
*/
export class BlockAttestation extends Gossipable {
static override p2pTopic = createTopicString(TopicType.block_attestation);
static override p2pTopic = TopicType.block_attestation;

private sender: EthAddress | undefined;

Expand Down
4 changes: 2 additions & 2 deletions yarn-project/stdlib/src/p2p/block_proposal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
getHashedSignaturePayload,
getHashedSignaturePayloadEthSignedMessage,
} from './signature_utils.js';
import { TopicType, createTopicString } from './topic_type.js';
import { TopicType } from './topic_type.js';

export class BlockProposalHash extends Buffer32 {
constructor(hash: Buffer) {
Expand All @@ -27,7 +27,7 @@ export class BlockProposalHash extends Buffer32 {
* be included in the head of the chain
*/
export class BlockProposal extends Gossipable {
static override p2pTopic = createTopicString(TopicType.block_proposal);
static override p2pTopic = TopicType.block_proposal;

private sender: EthAddress | undefined;

Expand Down
4 changes: 3 additions & 1 deletion yarn-project/stdlib/src/p2p/gossipable.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { Buffer32 } from '@aztec/foundation/buffer';

import type { TopicType } from './topic_type.js';

/**
* Gossipable
*
Expand All @@ -10,7 +12,7 @@ export abstract class Gossipable {
*
* - The p2p topic identifier, this determines how the message is handled
*/
static p2pTopic: string;
static p2pTopic: TopicType;

/** p2p Message Identifier
*
Expand Down
24 changes: 0 additions & 24 deletions yarn-project/stdlib/src/p2p/interface.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,4 @@
import { Tx } from '../tx/tx.js';
import { BlockAttestation } from './block_attestation.js';
import { BlockProposal } from './block_proposal.js';
import type { Gossipable } from './gossipable.js';
import { TopicType } from './topic_type.js';

export interface RawGossipMessage {
topic: string;
data: Uint8Array;
}

// Force casts as we know that each field here extends Gossipable, and we just want types from Gossipable
export const TopicTypeMap: Record<string, typeof Gossipable> = {
[TopicType.tx]: Tx as unknown as typeof Gossipable,
[TopicType.block_proposal]: BlockProposal as unknown as typeof Gossipable,
[TopicType.block_attestation]: BlockAttestation as unknown as typeof Gossipable,
};

/**
* Map from topic to deserialiser
*
* Used in msgIdFn libp2p to get the p2pMessageIdentifier from a message
*/
export const TopicToDeserializer = {
[Tx.p2pTopic]: Tx.fromBuffer,
[BlockProposal.p2pTopic]: BlockProposal.fromBuffer,
[BlockAttestation.p2pTopic]: BlockAttestation.fromBuffer,
};
8 changes: 4 additions & 4 deletions yarn-project/stdlib/src/p2p/topic_type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { P2PClientType } from './client_type.js';
* @param topicType
* @returns
*/
export function createTopicString(topicType: TopicType) {
return '/aztec/' + topicType + '/0.1.0';
export function createTopicString(topicType: TopicType, protocolVersion: string) {
return `/aztec/${TopicType[topicType]}/${protocolVersion}`;
}

/**
Expand Down Expand Up @@ -35,10 +35,10 @@ export function getTopicTypeForClientType(clientType: P2PClientType) {
* ...
* }
*/
export function metricsTopicStrToLabels() {
export function metricsTopicStrToLabels(protocolVersion: string) {
const topicStrToLabel = new Map<string, string>();
for (const topic in TopicType) {
topicStrToLabel.set(createTopicString(TopicType[topic as keyof typeof TopicType]), topic);
topicStrToLabel.set(createTopicString(TopicType[topic as keyof typeof TopicType], protocolVersion), topic);
}

return topicStrToLabel;
Expand Down
9 changes: 2 additions & 7 deletions yarn-project/stdlib/src/tx/tx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import type { ScopedLogHash } from '../kernel/log_hash.js';
import { PrivateKernelTailCircuitPublicInputs } from '../kernel/private_kernel_tail_circuit_public_inputs.js';
import { ContractClassLog } from '../logs/contract_class_log.js';
import { Gossipable } from '../p2p/gossipable.js';
import { TopicType, createTopicString } from '../p2p/topic_type.js';
import { TopicType } from '../p2p/topic_type.js';
import { ClientIvcProof } from '../proofs/client_ivc_proof.js';
import type { TxStats } from '../stats/stats.js';
import { HashedValues } from './hashed_values.js';
Expand All @@ -26,7 +26,7 @@ import { TxHash } from './tx_hash.js';
* The interface of an L2 transaction.
*/
export class Tx extends Gossipable {
static override p2pTopic: string;
static override p2pTopic = TopicType.tx;
// For memoization
private txHash: TxHash | undefined;
private calldataMap: Map<string, Fr[]> | undefined;
Expand All @@ -53,11 +53,6 @@ export class Tx extends Gossipable {
super();
}

// Gossipable method
static {
this.p2pTopic = createTopicString(TopicType.tx);
}

// Gossipable method
override async p2pMessageIdentifier(): Promise<Buffer32> {
return new Buffer32((await this.getTxHash()).toBuffer());
Expand Down