diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts index e7f1c35ba152..a769fef1f162 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool.ts @@ -21,6 +21,15 @@ export interface AttestationPool { */ getBlockProposal(id: string): Promise; + /** + * Check if a block proposal exists in the pool + * + * @param idOrProposal - The ID of the block proposal or the block proposal itself to check. The ID is proposal.payload.archive + * + * @return True if the block proposal exists, false otherwise. + */ + hasBlockProposal(idOrProposal: string | BlockProposal): Promise; + /** * AddAttestations * @@ -84,6 +93,14 @@ export interface AttestationPool { */ getAttestationsForSlotAndProposal(slot: bigint, proposalId: string): Promise; + /** + * Check if a specific attestation exists in the pool + * + * @param attestation - The attestation to check + * @return True if the attestation exists, false otherwise + */ + hasAttestation(attestation: BlockAttestation): Promise; + /** Returns whether the pool is empty. */ isEmpty(): Promise; } diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts index f731a746d54f..f36f8111ce3c 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/attestation_pool_test_suite.ts @@ -72,6 +72,11 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo expect(retrievedAttestations.length).toBe(attestations.length); compareAttestations(retrievedAttestations, attestations); + // Check hasAttestation for added attestations + for (const attestation of attestations) { + expect(await ap.hasAttestation(attestation)).toBe(true); + } + const retrievedAttestationsForSlot = await ap.getAttestationsForSlot(BigInt(slotNumber)); expect(retrievedAttestationsForSlot.length).toBe(attestations.length); compareAttestations(retrievedAttestationsForSlot, attestations); @@ -85,6 +90,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo ); expect(retrievedAttestationsAfterAdd.length).toBe(attestations.length + 1); compareAttestations(retrievedAttestationsAfterAdd, [...attestations, newAttestation]); + expect(await ap.hasAttestation(newAttestation)).toBe(true); const retrievedAttestationsForSlotAfterAdd = await ap.getAttestationsForSlot(BigInt(slotNumber)); expect(retrievedAttestationsForSlotAfterAdd.length).toBe(attestations.length + 1); compareAttestations(retrievedAttestationsForSlotAfterAdd, [...attestations, newAttestation]); @@ -97,6 +103,11 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo archive.toString(), ); expect(retreivedAttestationsAfterDelete.length).toBe(0); + // Check hasAttestation after deletion + for (const attestation of attestations) { + expect(await ap.hasAttestation(attestation)).toBe(false); + } + expect(await ap.hasAttestation(newAttestation)).toBe(false); }); it('should handle duplicate proposals in a slot', async () => { @@ -170,10 +181,20 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo expect(retreivedAttestations.length).toBe(NUMBER_OF_SIGNERS_PER_TEST); compareAttestations(retreivedAttestations, attestations); + // Check hasAttestation before deletion + for (const attestation of attestations) { + expect(await ap.hasAttestation(attestation)).toBe(true); + } + await ap.deleteAttestations(attestations); const gottenAfterDelete = await ap.getAttestationsForSlotAndProposal(BigInt(slotNumber), proposalId); expect(gottenAfterDelete.length).toBe(0); + + // Check hasAttestation after deletion + for (const attestation of attestations) { + expect(await ap.hasAttestation(attestation)).toBe(false); + } }); it('should blanket delete attestations per slot', async () => { @@ -265,12 +286,19 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo expect(retrievedProposal).toBeDefined(); expect(retrievedProposal!).toEqual(proposal); + + // Check hasBlockProposal with both id and object + expect(await ap.hasBlockProposal(proposalId)).toBe(true); + expect(await ap.hasBlockProposal(proposal)).toBe(true); }); it('should return undefined for non-existent block proposal', async () => { const nonExistentId = Fr.random().toString(); const retrievedProposal = await ap.getBlockProposal(nonExistentId); expect(retrievedProposal).toBeUndefined(); + + // Check hasBlockProposal returns false for non-existent proposal + expect(await ap.hasBlockProposal(nonExistentId)).toBe(false); }); it('should update block proposal if added twice with same id', async () => { @@ -323,6 +351,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo // Verify proposal exists let retrievedProposal = await ap.getBlockProposal(proposalId); expect(retrievedProposal).toBeDefined(); + expect(await ap.hasBlockProposal(proposalId)).toBe(true); // Delete attestations for slot and proposal await ap.deleteAttestationsForSlotAndProposal(BigInt(slotNumber), proposalId); @@ -330,6 +359,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo // Proposal should be deleted retrievedProposal = await ap.getBlockProposal(proposalId); expect(retrievedProposal).toBeUndefined(); + expect(await ap.hasBlockProposal(proposalId)).toBe(false); }); it('should delete block proposal when deleting attestations for slot', async () => { @@ -344,6 +374,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo // Verify proposal exists let retrievedProposal = await ap.getBlockProposal(proposalId); expect(retrievedProposal).toBeDefined(); + expect(await ap.hasBlockProposal(proposal)).toBe(true); // Delete attestations for slot await ap.deleteAttestationsForSlot(BigInt(slotNumber)); @@ -351,6 +382,7 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo // Proposal should be deleted retrievedProposal = await ap.getBlockProposal(proposalId); expect(retrievedProposal).toBeUndefined(); + expect(await ap.hasBlockProposal(proposal)).toBe(false); }); it('should be able to fetch both block proposal and attestations', async () => { @@ -372,8 +404,13 @@ export function describeAttestationPool(getAttestationPool: () => AttestationPoo expect(retrievedProposal).toBeDefined(); expect(retrievedProposal).toEqual(proposal); + expect(await ap.hasBlockProposal(proposalId)).toBe(true); compareAttestations(retrievedAttestations, attestations); + // Check hasAttestation for all attestations + for (const attestation of attestations) { + expect(await ap.hasAttestation(attestation)).toBe(true); + } }); }); } diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts index 7b9c209d29b2..24fd544ca098 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/kv_attestation_pool.ts @@ -213,6 +213,22 @@ export class KvAttestationPool implements AttestationPool { }); } + public async hasAttestation(attestation: BlockAttestation): Promise { + const slotNumber = attestation.payload.header.slotNumber; + const proposalId = attestation.archive; + const sender = attestation.getSender(); + + // Attestations with invalid signatures are never in the pool + if (!sender) { + return false; + } + + const address = sender.toString(); + const key = this.getAttestationKey(slotNumber, proposalId, address); + + return await this.attestations.hasAsync(key); + } + public async getBlockProposal(id: string): Promise { const buffer = await this.proposals.getAsync(id); try { @@ -226,6 +242,11 @@ export class KvAttestationPool implements AttestationPool { return Promise.resolve(undefined); } + public async hasBlockProposal(idOrProposal: string | BlockProposal): Promise { + const id = typeof idOrProposal === 'string' ? idOrProposal : idOrProposal.payload.archive.toString(); + return await this.proposals.hasAsync(id); + } + public async addBlockProposal(blockProposal: BlockProposal): Promise { await this.store.transactionAsync(async () => { await this.proposalsForSlot.set(blockProposal.slotNumber.toString(), blockProposal.archive.toString()); diff --git a/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.ts b/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.ts index fdcae61f6034..44494f4c7b7a 100644 --- a/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.ts +++ b/yarn-project/p2p/src/mem_pools/attestation_pool/memory_attestation_pool.ts @@ -173,6 +173,29 @@ export class InMemoryAttestationPool implements AttestationPool { return Promise.resolve(); } + public hasAttestation(attestation: BlockAttestation): Promise { + const slotNumber = attestation.payload.header.slotNumber; + const proposalId = attestation.archive.toString(); + const sender = attestation.getSender(); + + // Attestations with invalid signatures are never in the pool + if (!sender) { + return Promise.resolve(false); + } + + const slotAttestationMap = this.attestations.get(slotNumber.toBigInt()); + if (!slotAttestationMap) { + return Promise.resolve(false); + } + + const proposalAttestationMap = slotAttestationMap.get(proposalId); + if (!proposalAttestationMap) { + return Promise.resolve(false); + } + + return Promise.resolve(proposalAttestationMap.has(sender.toString())); + } + public addBlockProposal(blockProposal: BlockProposal): Promise { // We initialize slot-proposal mapping if it does not exist // This is important to ensure we can delete this proposal if there were not attestations for it @@ -186,6 +209,11 @@ export class InMemoryAttestationPool implements AttestationPool { public getBlockProposal(id: string): Promise { return Promise.resolve(this.proposals.get(id)); } + + public hasBlockProposal(idOrProposal: string | BlockProposal): Promise { + const id = typeof idOrProposal === 'string' ? idOrProposal : idOrProposal.payload.archive.toString(); + return Promise.resolve(this.proposals.has(id)); + } } /** diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts index afe43861452a..622d70163f46 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts @@ -273,6 +273,11 @@ export class AztecKVTxPool extends (EventEmitter as new () => TypedEventEmitter< return await Promise.all(txHashes.map(txHash => this.#txs.hasAsync(txHash.toString()))); } + async hasTx(txHash: TxHash): Promise { + const result = await this.hasTxs([txHash]); + return result[0]; + } + /** * Checks if an archived tx exists and returns it. * @param txHash - The tx hash. diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts index 113e9d6a8e5d..f08775868dcd 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts @@ -151,6 +151,11 @@ export class InMemoryTxPool extends (EventEmitter as new () => TypedEventEmitter return Promise.resolve(txHashes.map(txHash => this.txs.has(txHash.toBigInt()))); } + async hasTx(txHash: TxHash): Promise { + const result = await this.hasTxs([txHash]); + return result[0]; + } + public getArchivedTxByHash(): Promise { return Promise.resolve(undefined); } diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts index f1011641278f..5c25cb7f68e3 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts @@ -43,6 +43,13 @@ export interface TxPool extends TypedEventEmitter { */ hasTxs(txHashes: TxHash[]): Promise; + /** + * Checks if a transaction exists in the pool + * @param txHash - The hash of the transaction to check for + * @returns True if the transaction exists, false otherwise + */ + hasTx(txHash: TxHash): Promise; + /** * Checks if an archived transaction exists in the pool and returns it. * @param txHash - The hash of the transaction, used as an ID. diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index d1a07811ea4e..007eaa458244 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -107,6 +107,11 @@ interface ValidationResult { type ValidationOutcome = { allPassed: true } | { allPassed: false; failure: ValidationResult }; +// REFACTOR: Unify with the type above +type ReceivedMessageValidationResult = + | { obj: T; result: Exclude } + | { obj?: undefined; result: TopicValidatorResult.Reject }; + /** * Lib P2P implementation of the P2PService interface. */ @@ -615,6 +620,11 @@ export class LibP2PService extends return result.recipients.length; } + /** + * Checks if this message has already been seen, based on its msgId computed from hashing the message data. + * Note that we do not rely on the seenCache from gossipsub since we want to keep a longer history of seen + * messages to avoid tx echoes across the network. + */ protected preValidateReceivedMessage( msg: Message, msgId: string, @@ -678,42 +688,57 @@ export class LibP2PService extends } protected async validateReceivedMessage( - validationFunc: () => Promise<{ result: boolean; obj: T }>, + validationFunc: () => Promise>, msgId: string, source: PeerId, topicType: TopicType, - ): Promise<{ result: boolean; obj: T | undefined }> { - let resultAndObj: { result: boolean; obj: T | undefined } = { result: false, obj: undefined }; + ): Promise> { + let resultAndObj: ReceivedMessageValidationResult = { result: TopicValidatorResult.Reject }; const timer = new Timer(); try { resultAndObj = await validationFunc(); } catch (err) { - this.logger.error(`Error deserializing and validating message `, err); + this.logger.error(`Error deserializing and validating gossipsub message`, err, { + msgId, + source: source.toString(), + topicType, + }); } - if (resultAndObj.result) { + if (resultAndObj.result === TopicValidatorResult.Accept) { this.instrumentation.recordMessageValidation(topicType, timer); } - this.node.services.pubsub.reportMessageValidationResult( - msgId, - source.toString(), - resultAndObj.result && resultAndObj.obj ? TopicValidatorResult.Accept : TopicValidatorResult.Reject, - ); + this.node.services.pubsub.reportMessageValidationResult(msgId, source.toString(), resultAndObj.result); return resultAndObj; } protected async handleGossipedTx(payloadData: Buffer, msgId: string, source: PeerId) { - const validationFunc = async () => { + const validationFunc: () => Promise> = async () => { const tx = Tx.fromBuffer(payloadData); - const result = await this.validatePropagatedTx(tx, source); - return { result, obj: tx }; + const isValid = await this.validatePropagatedTx(tx, source); + const exists = isValid && (await this.mempools.txPool.hasTx(tx.getTxHash())); + + this.logger.trace(`Validate propagated tx`, { + isValid, + exists, + [Attributes.P2P_ID]: source.toString(), + }); + + if (!isValid) { + return { result: TopicValidatorResult.Reject }; + } else if (exists) { + return { result: TopicValidatorResult.Ignore, obj: tx }; + } else { + return { result: TopicValidatorResult.Accept, obj: tx }; + } }; const { result, obj: tx } = await this.validateReceivedMessage(validationFunc, msgId, source, TopicType.tx); - if (!result || !tx) { + if (result !== TopicValidatorResult.Accept || !tx) { return; } + const txHash = tx.getTxHash(); const txHashString = txHash.toString(); this.logger.verbose(`Received tx ${txHashString} from external peer ${source.toString()} via gossip`, { @@ -722,7 +747,7 @@ export class LibP2PService extends }); if (this.config.dropTransactions && randomInt(1000) < this.config.dropTransactionsProbability * 1000) { - this.logger.debug(`Intentionally dropping tx ${txHashString} (probability rule)`); + this.logger.warn(`Intentionally dropping tx ${txHashString} (probability rule)`); return; } @@ -736,14 +761,25 @@ export class LibP2PService extends * @param attestation - The attestation to process. */ private async processAttestationFromPeer(payloadData: Buffer, msgId: string, source: PeerId): Promise { - const validationFunc = async () => { + const validationFunc: () => Promise> = async () => { const attestation = BlockAttestation.fromBuffer(payloadData); - const result = await this.validateAttestation(source, attestation); - this.logger.trace(`validatePropagatedAttestation: ${result}`, { + const isValid = await this.validateAttestation(source, attestation); + const exists = isValid && (await this.mempools.attestationPool!.hasAttestation(attestation)); + + this.logger.trace(`Validate propagated block attestation`, { + isValid, + exists, [Attributes.SLOT_NUMBER]: attestation.payload.header.slotNumber.toString(), [Attributes.P2P_ID]: source.toString(), }); - return { result, obj: attestation }; + + if (!isValid) { + return { result: TopicValidatorResult.Reject }; + } else if (exists) { + return { result: TopicValidatorResult.Ignore, obj: attestation }; + } else { + return { result: TopicValidatorResult.Accept, obj: attestation }; + } }; const { result, obj: attestation } = await this.validateReceivedMessage( @@ -752,9 +788,11 @@ export class LibP2PService extends source, TopicType.block_attestation, ); - if (!result || !attestation) { + + if (result !== TopicValidatorResult.Accept || !attestation) { return; } + this.logger.debug( `Received attestation for slot ${attestation.slotNumber.toNumber()} from external peer ${source.toString()}`, { @@ -764,18 +802,30 @@ export class LibP2PService extends source: source.toString(), }, ); + await this.mempools.attestationPool!.addAttestations([attestation]); } private async processBlockFromPeer(payloadData: Buffer, msgId: string, source: PeerId): Promise { - const validationFunc = async () => { + const validationFunc: () => Promise> = async () => { const block = BlockProposal.fromBuffer(payloadData); - const result = await this.validateBlockProposal(source, block); - this.logger.trace(`validatePropagatedBlock: ${result}`, { + const isValid = await this.validateBlockProposal(source, block); + const exists = isValid && (await this.mempools.attestationPool!.hasBlockProposal(block)); + + this.logger.trace(`Validate propagated block proposal`, { + isValid, + exists, [Attributes.SLOT_NUMBER]: block.payload.header.slotNumber.toString(), [Attributes.P2P_ID]: source.toString(), }); - return { result, obj: block }; + + if (!isValid) { + return { result: TopicValidatorResult.Reject }; + } else if (exists) { + return { result: TopicValidatorResult.Ignore, obj: block }; + } else { + return { result: TopicValidatorResult.Accept, obj: block }; + } }; const { result, obj: block } = await this.validateReceivedMessage( @@ -784,6 +834,7 @@ export class LibP2PService extends source, TopicType.block_proposal, ); + if (!result || !block) { return; } diff --git a/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts b/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts index f979280ef780..aecb2e4c7991 100644 --- a/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts +++ b/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts @@ -52,6 +52,7 @@ function mockTxPool(): TxPool { getTxStatus: () => Promise.resolve(TxStatus.PENDING), getTxsByHash: () => Promise.resolve([]), hasTxs: () => Promise.resolve([]), + hasTx: () => Promise.resolve(false), updateConfig: () => {}, markTxsAsNonEvictable: () => Promise.resolve(), cleanupDeletedMinedTxs: () => Promise.resolve(0), @@ -71,6 +72,8 @@ function mockAttestationPool(): AttestationPool { getAttestationsForSlotAndProposal: () => Promise.resolve([]), addBlockProposal: () => Promise.resolve(), getBlockProposal: () => Promise.resolve(undefined), + hasBlockProposal: () => Promise.resolve(false), + hasAttestation: () => Promise.resolve(false), }; } diff --git a/yarn-project/stdlib/src/p2p/gossipable.ts b/yarn-project/stdlib/src/p2p/gossipable.ts index 138e56f473a9..bc3a00ff88b3 100644 --- a/yarn-project/stdlib/src/p2p/gossipable.ts +++ b/yarn-project/stdlib/src/p2p/gossipable.ts @@ -28,15 +28,12 @@ export class P2PMessage { */ export abstract class Gossipable { private cachedId: Buffer32 | undefined; - /** p2p Topic - * - * - The p2p topic identifier, this determines how the message is handled - */ + /** The p2p topic identifier, this determines how the message is handled */ static p2pTopic: TopicType; - /** p2p Message Identifier - * - * - A digest of the message information, this key is used for deduplication + /** + * A digest of the message information **used for logging only**. + * The identifier used for deduplication is `getMsgIdFn` as defined in `encoding.ts` which is a hash over topic and data. */ async p2pMessageIdentifier(): Promise { if (this.cachedId) { @@ -48,10 +45,6 @@ export abstract class Gossipable { abstract generateP2PMessageIdentifier(): Promise; - /** To Buffer - * - * - Serialization method - */ abstract toBuffer(): Buffer; toMessage(): Buffer { @@ -60,7 +53,6 @@ export abstract class Gossipable { /** * Get the size of the gossipable object. - * * This is used for metrics recording. */ abstract getSize(): number;