Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ export type EnvVar =
| 'P2P_PRIVATE_PEERS'
| 'P2P_MAX_TX_POOL_SIZE'
| 'P2P_TX_POOL_OVERFLOW_FACTOR'
| 'P2P_SEEN_MSG_CACHE_SIZE'
| 'PEER_ID_PRIVATE_KEY'
| 'PEER_ID_PRIVATE_KEY_PATH'
| 'PROVER_AGENT_COUNT'
Expand Down
10 changes: 10 additions & 0 deletions yarn-project/p2p/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ export interface P2PConfig extends P2PReqRespConfig, ChainConfig {
* If the pool is full, it will still accept a few more txs until it reached maxTxPoolOverspillFactor * maxTxPoolSize. Then it will evict
*/
txPoolOverflowFactor: number;

/**
* The node's seen message ID cache size
*/
seenMessageCacheSize: number;
}

export const DEFAULT_P2P_PORT = 40400;
Expand Down Expand Up @@ -404,6 +409,11 @@ export const p2pConfigMappings: ConfigMappingsType<P2PConfig> = {
description: 'How much the tx pool can overflow before it starts evicting txs. Must be greater than 1',
...floatConfigHelper(1.1), // 10% overflow
},
seenMessageCacheSize: {
env: 'P2P_SEEN_MSG_CACHE_SIZE',
description: 'The number of messages to keep in the seen message cache',
...numberConfigHelper(100_000), // 100K
},
...p2pReqRespConfigMappings,
...chainConfigMappings,
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { Fr } from '@aztec/foundation/fields';

import { MessageSeenValidator } from './msg_seen_validator.js';

describe('MsgSeenValidator', () => {
let validator: MessageSeenValidator;

const makeMsgId = () => Fr.random().toString();

it('throws if created with invalid length', () => {
expect(() => new MessageSeenValidator(0)).toThrow('Queue length must be greater than 0');
expect(() => new MessageSeenValidator(-1)).toThrow('Queue length must be greater than 0');
});

it('adds a message successfully', () => {
validator = new MessageSeenValidator(10); // 10 messages max
const msgId = makeMsgId();
expect(validator.addMessage(msgId)).toBe(true);
});

it('fails to add a message that is already in the queue', () => {
validator = new MessageSeenValidator(10); // 10 messages max
const msgId = makeMsgId();
validator.addMessage(msgId);
// should fail
expect(validator.addMessage(msgId)).toBe(false);
});

it('adds a duplicate message after it has exited the queue', () => {
validator = new MessageSeenValidator(10); // 10 messages max
const msgId = makeMsgId();

expect(validator.addMessage(msgId)).toBe(true);

// Can't add the message again
expect(validator.addMessage(msgId)).toBe(false);

// add 9 more messages
for (let i = 0; i < 9; i++) {
expect(validator.addMessage(makeMsgId())).toBe(true);
}

// Still can't add the message
expect(validator.addMessage(msgId)).toBe(false);

// Now add one more
expect(validator.addMessage(makeMsgId())).toBe(true);

// now we should be able to add the first message again
expect(validator.addMessage(msgId)).toBe(true);
});

it('can take many more messages than the queue length', () => {
validator = new MessageSeenValidator(10); // 10 messages max

// add 1000 messages
for (let i = 0; i < 1000; i++) {
expect(validator.addMessage(makeMsgId())).toBe(true);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: I'd add expect(validator.queue.length === 10) just to be sure this is not growing indefinitely.

}
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Implements a queue of message IDs
export class MessageSeenValidator {
private queue: Array<string>;
private writePointer = 0;
private seenMessages: Set<string> = new Set();

constructor(private queueLength: number) {
if (this.queueLength <= 0) {
throw new Error('Queue length must be greater than 0');
}
this.queue = new Array<string>(this.queueLength);
}

// Adds a message if not seen before. Returns true if added, false if already seen.
public addMessage(msgId: string): boolean {
// Check if the message is already in the cache
if (this.seenMessages.has(msgId)) {
return false;
}
// If we are at the cache limit, remove the oldest msg ID
if (this.seenMessages.size >= this.queueLength) {
const msgToRemove = this.queue[this.writePointer];
this.seenMessages.delete(msgToRemove);
}

// Insert the message into the cache and the queue
this.seenMessages.add(msgId);
this.queue[this.writePointer] = msgId;
this.writePointer = this.writePointer === this.queueLength - 1 ? 0 : this.writePointer + 1;
return true;
}

public size() {
return this.seenMessages.size;
}
}
43 changes: 42 additions & 1 deletion yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import { createLibp2p } from 'libp2p';
import type { P2PConfig } from '../../config.js';
import type { MemPools } from '../../mem_pools/interface.js';
import { AttestationValidator, BlockProposalValidator } from '../../msg_validators/index.js';
import { MessageSeenValidator } from '../../msg_validators/msg_seen_validator/msg_seen_validator.js';
import { getDefaultAllowedSetupFunctions } from '../../msg_validators/tx_validator/allowed_public_setup.js';
import { type MessageValidator, createTxMessageValidators } from '../../msg_validators/tx_validator/factory.js';
import { DoubleSpendTxValidator, TxProofValidator } from '../../msg_validators/tx_validator/index.js';
Expand Down Expand Up @@ -80,6 +81,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
private jobQueue: SerialQueue = new SerialQueue();
private peerManager: PeerManager;
private discoveryRunningPromise?: RunningPromise;
private msgIdSeenValidators: Record<TopicType, MessageSeenValidator> = {} as Record<TopicType, MessageSeenValidator>;

// Message validators
private attestationValidator: AttestationValidator;
Expand Down Expand Up @@ -120,6 +122,10 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
) {
super(telemetry, 'LibP2PService');

this.msgIdSeenValidators[TopicType.tx] = new MessageSeenValidator(config.seenMessageCacheSize);
this.msgIdSeenValidators[TopicType.block_proposal] = new MessageSeenValidator(config.seenMessageCacheSize);
this.msgIdSeenValidators[TopicType.block_attestation] = new MessageSeenValidator(config.seenMessageCacheSize);

const versions = getVersions(config);
this.protocolVersion = compressComponentVersions(versions);
logger.info(`Started libp2p service with protocol version ${this.protocolVersion}`);
Expand Down Expand Up @@ -494,6 +500,28 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
return result.recipients.length;
}

protected preValidateReceivedMessage(msg: Message, msgId: string, source: PeerId) {
const getValidator = () => {
if (msg.topic === this.topicStrings[TopicType.tx]) {
return this.msgIdSeenValidators[TopicType.tx];
}
if (msg.topic === this.topicStrings[TopicType.block_attestation]) {
return this.msgIdSeenValidators[TopicType.block_attestation];
}
if (msg.topic === this.topicStrings[TopicType.block_proposal]) {
return this.msgIdSeenValidators[TopicType.block_proposal];
}
};
Comment thread
PhilWindle marked this conversation as resolved.

const validator = getValidator();
Comment on lines +504 to +517

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be simplified to const validator = this.msgIdSeenValidators[msg.topic]; given that we check taht validator exist: if(!validator ...).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We tried this but TopicType.tx is just a prefix of msg.topic (the topic of the actual message contains the compressed version string as a suffix)


if (!validator || !validator.addMessage(msgId)) {
this.node.services.pubsub.reportMessageValidationResult(msgId, source.toString(), TopicValidatorResult.Ignore);
return false;
}
return true;
}

/**
* Handles a new gossip message that was received by the client.
* @param topic - The message's topic.
Expand All @@ -508,6 +536,11 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
messageId: p2pMessage.id,
messageLatency,
});

if (!this.preValidateReceivedMessage(msg, msgId, source)) {
return;
}

if (msg.topic === this.topicStrings[TopicType.tx]) {
await this.handleGossipedTx(p2pMessage.payload, msgId, source);
}
Expand Down Expand Up @@ -621,15 +654,23 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
[Attributes.P2P_ID]: await block.p2pMessageIdentifier().then(i => i.toString()),
}))
private async processValidBlockProposal(block: BlockProposal, sender: PeerId) {
const slot = block.slotNumber.toBigInt();
const previousSlot = slot - 1n;
const epoch = slot / 32n;
this.logger.verbose(
`Received block ${block.blockNumber.toNumber()} for slot ${block.slotNumber.toNumber()} from external peer.`,
`Received block ${block.blockNumber.toNumber()} for slot ${slot}, epoch ${epoch} from external peer.`,
{
p2pMessageIdentifier: await block.p2pMessageIdentifier(),
slot: block.slotNumber.toNumber(),
archive: block.archive.toString(),
block: block.blockNumber.toNumber(),
},
);
const attestationsForPreviousSlot = await this.mempools.attestationPool?.getAttestationsForSlot(previousSlot);
if (attestationsForPreviousSlot !== undefined) {
this.logger.verbose(`Received ${attestationsForPreviousSlot.length} attestations for slot ${previousSlot}`);
}

// Mark the txs in this proposal as non-evictable
await this.mempools.txPool.markTxsAsNonEvictable(block.payload.txHashes);
const attestation = await this.blockReceivedCallback(block, sender);
Expand Down