diff --git a/yarn-project/p2p/src/client/interface.ts b/yarn-project/p2p/src/client/interface.ts index bdcd6bf7e2e3..b9579f7d9aba 100644 --- a/yarn-project/p2p/src/client/interface.ts +++ b/yarn-project/p2p/src/client/interface.ts @@ -1,11 +1,13 @@ import type { L2BlockId } from '@aztec/stdlib/block'; import type { P2PApi } from '@aztec/stdlib/interfaces/server'; -import { BlockAttestation, type BlockProposal, type P2PClientType } from '@aztec/stdlib/p2p'; +import type { BlockProposal, P2PClientType } from '@aztec/stdlib/p2p'; import type { Tx, TxHash } from '@aztec/stdlib/tx'; import type { ENR } from '@chainsafe/enr'; +import type { PeerId } from '@libp2p/interface'; import type { P2PConfig } from '../config.js'; +import type { P2PBlockReceivedCallback } from '../services/service.js'; /** * Enum defining the possible states of the p2p client. @@ -50,14 +52,15 @@ export type P2P = P2PApi & { */ // REVIEW: https://github.com/AztecProtocol/aztec-packages/issues/7963 // ^ This pattern is not my favorite (md) - registerBlockProposalHandler(handler: (block: BlockProposal) => Promise): void; + registerBlockProposalHandler(callback: P2PBlockReceivedCallback): void; /** * Request a list of transactions from another peer by their tx hashes. * @param txHashes - Hashes of the txs to query. + * @param pinnedPeerId - An optional peer id that will be used to request the tx from (in addition to other random peers). * @returns A list of transactions or undefined if the transactions are not found. */ - requestTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]>; + requestTxsByHash(txHashes: TxHash[], pinnedPeerId: PeerId): Promise<(Tx | undefined)[]>; /** * Request a transaction from another peer by its tx hash. @@ -115,9 +118,10 @@ export type P2P = P2PApi & { /** * Returns transactions in the transaction pool by hash, requesting from the network if not found. * @param txHashes - Hashes of tx to return. + * @param pinnedPeerId - An optional peer id that will be used to request the tx from (in addition to other random peers). * @returns An array of tx or undefined. */ - getTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]>; + getTxsByHash(txHashes: TxHash[], pinnedPeerId: PeerId | undefined): Promise<(Tx | undefined)[]>; /** * Returns an archived transaction from the transaction pool by its hash. diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index 5ff37d5c0bb3..1247e42e69ba 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -1,5 +1,4 @@ import { MockL2BlockSource } from '@aztec/archiver/test'; -import { times } from '@aztec/foundation/collection'; import { Fr } from '@aztec/foundation/fields'; import { retryUntil } from '@aztec/foundation/retry'; import type { AztecAsyncKVStore } from '@aztec/kv-store'; @@ -15,9 +14,10 @@ import { InMemoryAttestationPool, type P2PService } from '../index.js'; import type { AttestationPool } from '../mem_pools/attestation_pool/attestation_pool.js'; import type { MemPools } from '../mem_pools/interface.js'; import type { TxPool } from '../mem_pools/tx_pool/index.js'; +import { ReqRespSubProtocol } from '../services/reqresp/interface.js'; import { P2PClient } from './p2p_client.js'; -describe('In-Memory P2P Client', () => { +describe('P2P Client', () => { let txPool: MockProxy; let attestationPool: AttestationPool; let mempools: MemPools; @@ -129,25 +129,36 @@ describe('In-Memory P2P Client', () => { }); it('request transactions handles missing items', async () => { - // Mock a batch response that returns undefined items const mockTx1 = await mockTx(); const mockTx2 = await mockTx(); - txPool.hasTxs.mockImplementation(txHashes => Promise.resolve(times(txHashes.length, () => true))); - p2pService.sendBatchRequest.mockResolvedValue([mockTx1, undefined, mockTx2]); + const mockTx3 = await mockTx(); + + // P2P service will not return tx2 + p2pService.sendBatchRequest.mockResolvedValue([mockTx1, undefined, mockTx3]); // Spy on the tx pool addTxs method, it should not be called for the missing tx const addTxsSpy = jest.spyOn(txPool, 'addTxs'); - await client.start(); + // We query for all 3 txs + const query = await Promise.all([mockTx1.getTxHash(), mockTx2.getTxHash(), mockTx3.getTxHash()]); + const results = await client.requestTxsByHash(query, undefined); - const missingTxHash = (await mockTx()).getTxHash(); - const query = await Promise.all([mockTx1.getTxHash(), missingTxHash, mockTx2.getTxHash()]); - const results = await client.requestTxsByHash(query); + // We should receive the found transactions + expect(results).toEqual([mockTx1, undefined, mockTx3]); - expect(results).toEqual([mockTx1, undefined, mockTx2]); + // P2P should have been called with the 3 tx hashes + expect(p2pService.sendBatchRequest).toHaveBeenCalledWith( + ReqRespSubProtocol.TX, + query, + undefined, + expect.anything(), + expect.anything(), + expect.anything(), + ); + // Retrieved txs should have been added to the pool expect(addTxsSpy).toHaveBeenCalledTimes(1); - expect(addTxsSpy).toHaveBeenCalledWith([mockTx1, mockTx2]); + expect(addTxsSpy).toHaveBeenCalledWith([mockTx1, mockTx3]); }); it('getTxsByHash handles missing items', async () => { @@ -173,14 +184,17 @@ describe('In-Memory P2P Client', () => { await client.start(); const query = await Promise.all([txInMempool.getTxHash(), txToBeRequested.getTxHash(), txToNotBeFound.getTxHash()]); - const results = await client.getTxsByHash(query); + const results = await client.getTxsByHash(query, undefined); // We should return the resolved transactions expect(results).toEqual([txInMempool, txToBeRequested]); // We should add the found requested transactions to the pool expect(addTxsSpy).toHaveBeenCalledWith([txToBeRequested]); // We should request the missing transactions from the network, but only find one of them - expect(requestTxsSpy).toHaveBeenCalledWith([await txToBeRequested.getTxHash(), await txToNotBeFound.getTxHash()]); + expect(requestTxsSpy).toHaveBeenCalledWith( + [await txToBeRequested.getTxHash(), await txToNotBeFound.getTxHash()], + undefined, + ); }); describe('Chain prunes', () => { diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index f81aa04ac4cb..aa3ff9cef77e 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -24,6 +24,7 @@ import { } from '@aztec/telemetry-client'; import type { ENR } from '@chainsafe/enr'; +import type { PeerId } from '@libp2p/interface'; import { type P2PConfig, getP2PDefaultConfig } from '../config.js'; import type { AttestationPool } from '../mem_pools/attestation_pool/attestation_pool.js'; @@ -31,6 +32,7 @@ import type { MemPools } from '../mem_pools/interface.js'; import type { TxPool } from '../mem_pools/tx_pool/index.js'; import { ReqRespSubProtocol } from '../services/reqresp/interface.js'; import type { P2PService } from '../services/service.js'; +import { TxCollector } from '../services/tx_collector.js'; import { type P2P, P2PClientState, type P2PSyncState } from './interface.js'; /** @@ -87,6 +89,15 @@ export class P2PClient this.txPool = mempools.txPool; this.attestationPool = mempools.attestationPool!; + // Default to collecting all txs when we see a valid proposal + // This can be overridden by the validator client to attest, and it will call collectForBlockProposal on its own + const txCollector = new TxCollector(this, this.log); + this.registerBlockProposalHandler(async (block, sender) => { + this.log.debug(`Received block proposal from ${sender.toString()}`); + await txCollector.collectForBlockProposal(block, sender); + return undefined; + }); + // REFACTOR: Try replacing these with an L2TipsStore this.synchedBlockHashes = store.openMap('p2p_pool_block_hashes'); this.synchedLatestBlockNumber = store.openSingleton('p2p_pool_last_l2_block'); @@ -328,7 +339,9 @@ export class P2PClient // REVIEW: https://github.com/AztecProtocol/aztec-packages/issues/7963 // ^ This pattern is not my favorite (md) - public registerBlockProposalHandler(handler: (block: BlockProposal) => Promise): void { + public registerBlockProposalHandler( + handler: (block: BlockProposal, sender: any) => Promise, + ): void { this.p2pService.registerBlockReceivedCallback(handler); } @@ -357,7 +370,7 @@ export class P2PClient /** * Uses the batched Request Response protocol to request a set of transactions from the network. */ - public async requestTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]> { + public async requestTxsByHash(txHashes: TxHash[], pinnedPeerId: PeerId | undefined): Promise<(Tx | undefined)[]> { const timeoutMs = 8000; // Longer timeout for now const maxPeers = Math.min(Math.ceil(txHashes.length / 3), 10); const maxRetryAttempts = 10; // Keep retrying within the timeout @@ -365,6 +378,7 @@ export class P2PClient const txs = await this.p2pService.sendBatchRequest( ReqRespSubProtocol.TX, txHashes, + pinnedPeerId, timeoutMs, maxPeers, maxRetryAttempts, @@ -462,7 +476,7 @@ export class P2PClient * @param txHashes - Hashes of the transactions to look for. * @returns The txs found, or undefined if not found in the order requested. */ - async getTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]> { + async getTxsByHash(txHashes: TxHash[], pinnedPeerId: PeerId | undefined): Promise<(Tx | undefined)[]> { const txs = await Promise.all(txHashes.map(txHash => this.txPool.getTxByHash(txHash))); const missingTxHashes = txs .map((tx, index) => [tx, index] as const) @@ -473,7 +487,7 @@ export class P2PClient return txs as Tx[]; } - const missingTxs = await this.requestTxsByHash(missingTxHashes); + const missingTxs = await this.requestTxsByHash(missingTxHashes, pinnedPeerId); const fetchedMissingTxs = missingTxs.filter((tx): tx is Tx => !!tx); // TODO: optimize @@ -678,7 +692,7 @@ export class P2PClient `Requesting ${missingTxHashes.length} missing txs from peers for ${unprovenBlocks.length} unproven mined blocks`, { missingTxHashes, unprovenBlockNumbers: unprovenBlocks.map(block => block.number) }, ); - await this.requestTxsByHash(missingTxHashes); + await this.requestTxsByHash(missingTxHashes, undefined); } } catch (err) { this.log.error(`Error requesting missing txs from unproven blocks`, err, { diff --git a/yarn-project/p2p/src/index.ts b/yarn-project/p2p/src/index.ts index 3e25b0f31ba7..97489a5b032a 100644 --- a/yarn-project/p2p/src/index.ts +++ b/yarn-project/p2p/src/index.ts @@ -1,3 +1,5 @@ +export type { PeerId } from '@libp2p/interface'; + export * from './bootstrap/bootstrap.js'; export * from './client/index.js'; export * from './enr/index.js'; diff --git a/yarn-project/p2p/src/services/dummy_service.ts b/yarn-project/p2p/src/services/dummy_service.ts index 74a0f6409b80..6c759ab28c72 100644 --- a/yarn-project/p2p/src/services/dummy_service.ts +++ b/yarn-project/p2p/src/services/dummy_service.ts @@ -51,7 +51,9 @@ export class DummyP2PService implements P2PService { /** * Register a callback into the validator client for when a block proposal is received */ - public registerBlockReceivedCallback(_: (block: BlockProposal) => Promise) {} + public registerBlockReceivedCallback( + _callback: (block: BlockProposal, sender: PeerId) => Promise, + ) {} /** * Sends a request to a peer. diff --git a/yarn-project/p2p/src/services/index.ts b/yarn-project/p2p/src/services/index.ts index 41c15c7624b6..d65b3dc266e6 100644 --- a/yarn-project/p2p/src/services/index.ts +++ b/yarn-project/p2p/src/services/index.ts @@ -1,2 +1,3 @@ export * from './service.js'; export * from './libp2p/libp2p_service.js'; +export * from './tx_collector.js'; diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index 620e99634d6a..1accf24e5d3b 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -63,7 +63,7 @@ import { DEFAULT_SUB_PROTOCOL_VALIDATORS, ReqRespSubProtocol, type SubProtocolMa import { reqGoodbyeHandler } from '../reqresp/protocols/goodbye.js'; import { pingHandler, reqRespBlockHandler, reqRespTxHandler, statusHandler } from '../reqresp/protocols/index.js'; import { ReqResp } from '../reqresp/reqresp.js'; -import type { P2PService, PeerDiscoveryService } from '../service.js'; +import type { P2PBlockReceivedCallback, P2PService, PeerDiscoveryService } from '../service.js'; interface ValidationResult { name: string; @@ -101,7 +101,7 @@ export class LibP2PService extends * @param block - The block received from the peer. * @returns The attestation for the block, if any. */ - private blockReceivedCallback: (block: BlockProposal) => Promise; + private blockReceivedCallback: P2PBlockReceivedCallback; private gossipSubEventHandler: (e: CustomEvent) => void; @@ -446,8 +446,9 @@ export class LibP2PService extends sendBatchRequest( protocol: SubProtocol, requests: InstanceType[], + pinnedPeerId: PeerId | undefined, ): Promise<(InstanceType | undefined)[]> { - return this.reqresp.sendBatchRequest(protocol, requests); + return this.reqresp.sendBatchRequest(protocol, requests, pinnedPeerId); } /** @@ -458,9 +459,8 @@ export class LibP2PService extends return this.peerDiscoveryService.getEnr(); } - public registerBlockReceivedCallback(callback: (block: BlockProposal) => Promise) { + public registerBlockReceivedCallback(callback: P2PBlockReceivedCallback) { this.blockReceivedCallback = callback; - this.logger.verbose('Block received callback registered'); } /** @@ -610,7 +610,7 @@ export class LibP2PService extends if (!result || !block) { return; } - await this.processValidBlockProposal(block); + await this.processValidBlockProposal(block, source); } // REVIEW: callback pattern https://github.com/AztecProtocol/aztec-packages/issues/7963 @@ -620,7 +620,7 @@ export class LibP2PService extends [Attributes.BLOCK_ARCHIVE]: block.archive.toString(), [Attributes.P2P_ID]: await block.p2pMessageIdentifier().then(i => i.toString()), })) - private async processValidBlockProposal(block: BlockProposal) { + private async processValidBlockProposal(block: BlockProposal, sender: PeerId) { this.logger.verbose( `Received block ${block.blockNumber.toNumber()} for slot ${block.slotNumber.toNumber()} from external peer.`, { @@ -632,7 +632,7 @@ export class LibP2PService extends ); // Mark the txs in this proposal as non-evictable await this.mempools.txPool.markTxsAsNonEvictable(block.payload.txHashes); - const attestation = await this.blockReceivedCallback(block); + const attestation = await this.blockReceivedCallback(block, sender); // TODO: fix up this pattern - the abstraction is not nice // The attestation can be undefined if no handler is registered / the validator deems the block invalid diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts index 5fbe09950c76..9358a7a50d47 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/batch_connection_sampler.ts @@ -26,6 +26,7 @@ export class BatchConnectionSampler { private readonly connectionSampler: ConnectionSampler, batchSize: number, maxPeers: number, + exclude?: PeerId[], ) { if (maxPeers <= 0) { throw new Error('Max peers cannot be 0'); @@ -38,7 +39,8 @@ export class BatchConnectionSampler { this.requestsPerPeer = Math.max(1, Math.floor(batchSize / maxPeers)); // Sample initial peers - this.batch = this.connectionSampler.samplePeersBatch(maxPeers); + const excluding = exclude && new Map(exclude.map(peerId => [peerId.toString(), true] as const)); + this.batch = this.connectionSampler.samplePeersBatch(maxPeers, excluding); } /** @@ -70,7 +72,7 @@ export class BatchConnectionSampler { } const excluding = new Map([[peerId.toString(), true]]); - const newPeer = this.connectionSampler.getPeer(excluding); + const newPeer = this.connectionSampler.getPeer(excluding); // Q: Shouldn't we accumulate all excluded peers? Otherwise the sampler could return us a previously excluded peer? if (newPeer) { this.batch[index] = newPeer; diff --git a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts index e53d3bc55f81..727a64e73b81 100644 --- a/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts +++ b/yarn-project/p2p/src/services/reqresp/connection-sampler/connection_sampler.ts @@ -137,9 +137,10 @@ export class ConnectionSampler { * Samples a batch of unique peers from the libp2p node, prioritizing peers without active connections * * @param numberToSample - The number of peers to sample + * @param excluding - The peers to exclude from the sampling * @returns Array of unique sampled peers, prioritizing those without active connections */ - samplePeersBatch(numberToSample: number): PeerId[] { + samplePeersBatch(numberToSample: number, excluding?: Map): PeerId[] { const peers = this.libp2p.getPeers(); this.logger.debug('Sampling peers batch', { numberToSample, peers }); @@ -149,7 +150,7 @@ export class ConnectionSampler { const batch: PeerId[] = []; const withActiveConnections: Set = new Set(); for (let i = 0; i < numberToSample; i++) { - const { peer, sampledPeers } = this.getPeerFromList(peers, undefined); + const { peer, sampledPeers } = this.getPeerFromList(peers, excluding); if (peer) { batch.push(peer); } diff --git a/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.ts b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.ts index b862196b4071..782021be1daa 100644 --- a/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.ts +++ b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limiter.ts @@ -8,7 +8,7 @@ import { PeerErrorSeverity } from '@aztec/stdlib/p2p'; import type { PeerId } from '@libp2p/interface'; import type { PeerScoring } from '../../peer-manager/peer_scoring.js'; -import type { ReqRespSubProtocol, ReqRespSubProtocolRateLimits } from '../interface.js'; +import type { ProtocolRateLimitQuota, ReqRespSubProtocol, ReqRespSubProtocolRateLimits } from '../interface.js'; import { DEFAULT_RATE_LIMITS } from './rate_limits.js'; // Check for disconnected peers every 10 minutes @@ -177,16 +177,18 @@ export class SubProtocolRateLimiter { */ export class RequestResponseRateLimiter { private subProtocolRateLimiters: Map; + private rateLimits: ReqRespSubProtocolRateLimits; private cleanupInterval: NodeJS.Timeout | undefined = undefined; constructor( private peerScoring: PeerScoring, - rateLimits: ReqRespSubProtocolRateLimits = DEFAULT_RATE_LIMITS, + rateLimits: Partial = {}, ) { this.subProtocolRateLimiters = new Map(); - for (const [subProtocol, protocolLimits] of Object.entries(rateLimits)) { + this.rateLimits = { ...DEFAULT_RATE_LIMITS, ...rateLimits }; + for (const [subProtocol, protocolLimits] of Object.entries(this.rateLimits)) { this.subProtocolRateLimiters.set( subProtocol as ReqRespSubProtocol, new SubProtocolRateLimiter( @@ -228,4 +230,8 @@ export class RequestResponseRateLimiter { stop() { clearInterval(this.cleanupInterval); } + + getRateLimits(protocol: ReqRespSubProtocol): ProtocolRateLimitQuota { + return this.rateLimits[protocol]; + } } diff --git a/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limits.ts b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limits.ts index db3585300850..75fcd33c7b8e 100644 --- a/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limits.ts +++ b/yarn-project/p2p/src/services/reqresp/rate-limiter/rate_limits.ts @@ -29,7 +29,7 @@ export const DEFAULT_RATE_LIMITS: ReqRespSubProtocolRateLimits = { }, globalLimit: { quotaTimeMs: 1000, - quotaCount: 20, + quotaCount: 200, }, }, [ReqRespSubProtocol.BLOCK]: { diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts index 1ddf4abadd12..ea0275067676 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.test.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.test.ts @@ -1,3 +1,4 @@ +import { times } from '@aztec/foundation/collection'; import { Fr } from '@aztec/foundation/fields'; import { createLogger } from '@aztec/foundation/log'; import { sleep } from '@aztec/foundation/sleep'; @@ -153,7 +154,7 @@ describe('ReqResp', () => { }); describe('Tx req protocol', () => { - it('Can request a Tx from TxHash', async () => { + it('can request a Tx from TxHash', async () => { const tx = await mockTx(); const txHash = await tx.getTxHash(); @@ -179,7 +180,7 @@ describe('ReqResp', () => { expect(res).toEqual(tx); }); - it('Handle returning empty buffers', async () => { + it('handles returning empty buffers', async () => { const tx = await mockTx(); const txHash = await tx.getTxHash(); @@ -202,7 +203,7 @@ describe('ReqResp', () => { expect(res).toEqual(undefined); }); - it('Does not crash if tx hash returns undefined', async () => { + it('does not crash if tx hash returns undefined', async () => { const tx = await mockTx(); const txHash = await tx.getTxHash(); @@ -468,7 +469,7 @@ describe('ReqResp', () => { const requests = Array.from({ length: batchSize }, _ => RequestableBuffer.fromBuffer(Buffer.from(`ping`))); const expectResponses = Array.from({ length: batchSize }, _ => RequestableBuffer.fromBuffer(Buffer.from(`pong`))); - const res = await nodes[0].req.sendBatchRequest(ReqRespSubProtocol.PING, requests); + const res = await nodes[0].req.sendBatchRequest(ReqRespSubProtocol.PING, requests, undefined); expect(res).toEqual(expectResponses); // Expect one request to have been sent to each peer @@ -489,6 +490,52 @@ describe('ReqResp', () => { ); }); + it('should send a batch request with a pinned peer', async () => { + const batchSize = 9; + nodes = await createNodes(peerScoring, 4, { + // Bump rate limits so the pinned peer can respond + [ReqRespSubProtocol.PING]: { + peerLimit: { quotaTimeMs: 1000, quotaCount: 50 }, + globalLimit: { quotaTimeMs: 1000, quotaCount: 50 }, + }, + }); + + await startNodes(nodes); + await sleep(500); + await connectToPeers(nodes); + await sleep(500); + + const sendRequestToPeerSpy = jest.spyOn(nodes[0].req, 'sendRequestToPeer'); + + const requests = times(batchSize, i => RequestableBuffer.fromBuffer(Buffer.from(`ping${i}`))); + const expectResponses = times(batchSize, _ => RequestableBuffer.fromBuffer(Buffer.from(`pong`))); + + const res = await nodes[0].req.sendBatchRequest(ReqRespSubProtocol.PING, requests, nodes[1].p2p.peerId); + expect(res).toEqual(expectResponses); + + // Expect pinned peer to have received all requests + for (let i = 0; i < batchSize; i++) { + expect(sendRequestToPeerSpy).toHaveBeenCalledWith( + expect.objectContaining({ publicKey: nodes[1].p2p.peerId.publicKey }), + ReqRespSubProtocol.PING, + Buffer.from(`ping${i}`), + ); + } + + // Expect at least one request to have been sent to each other peer + expect(sendRequestToPeerSpy).toHaveBeenCalledWith( + expect.objectContaining({ publicKey: nodes[2].p2p.peerId.publicKey }), + ReqRespSubProtocol.PING, + expect.any(Buffer), + ); + + expect(sendRequestToPeerSpy).toHaveBeenCalledWith( + expect.objectContaining({ publicKey: nodes[3].p2p.peerId.publicKey }), + ReqRespSubProtocol.PING, + expect.any(Buffer), + ); + }); + it('should stop after max retry attempts', async () => { const batchSize = 12; nodes = await createNodes(peerScoring, 3); @@ -506,7 +553,7 @@ describe('ReqResp', () => { RequestableBuffer.fromBuffer(Buffer.from(`pong`)), ); - const res = await nodes[0].req.sendBatchRequest(ReqRespSubProtocol.PING, requests); + const res = await nodes[0].req.sendBatchRequest(ReqRespSubProtocol.PING, requests, undefined); expect(res).toEqual(expectResponses); // Check that we did detect hitting a rate limit diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index 3036c18ab630..815895eb908a 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -1,4 +1,5 @@ // @attribution: lodestar impl for inspiration +import { compactArray } from '@aztec/foundation/collection'; import { type Logger, createLogger } from '@aztec/foundation/log'; import { executeTimeout } from '@aztec/foundation/timer'; import { PeerErrorSeverity } from '@aztec/stdlib/p2p'; @@ -25,6 +26,7 @@ import { type ReqRespResponse, ReqRespSubProtocol, type ReqRespSubProtocolHandlers, + type ReqRespSubProtocolRateLimits, type ReqRespSubProtocolValidators, type SubProtocolMap, subProtocolMap, @@ -72,6 +74,7 @@ export class ReqResp { config: P2PReqRespConfig, private libp2p: Libp2p, private peerScoring: PeerScoring, + rateLimits: Partial = {}, telemetryClient: TelemetryClient = getTelemetryClient(), ) { this.logger = createLogger('p2p:reqresp'); @@ -79,7 +82,7 @@ export class ReqResp { this.overallRequestTimeoutMs = config.overallRequestTimeoutMs; this.individualRequestTimeoutMs = config.individualRequestTimeoutMs; - this.rateLimiter = new RequestResponseRateLimiter(peerScoring); + this.rateLimiter = new RequestResponseRateLimiter(peerScoring, rateLimits); // Connection sampler is used to sample our connected peers this.connectionSampler = new ConnectionSampler(libp2p); @@ -261,6 +264,7 @@ export class ReqResp { async sendBatchRequest( subProtocol: SubProtocol, requests: InstanceType[], + pinnedPeer: PeerId | undefined, timeoutMs = 10000, maxPeers = Math.max(10, Math.ceil(requests.length / 3)), maxRetryAttempts = 3, @@ -274,10 +278,15 @@ export class ReqResp { const pendingRequestIndices = new Set(requestBuffers.map((_, i) => i)); // Create batch sampler with the total number of requests and max peers - const batchSampler = new BatchConnectionSampler(this.connectionSampler, requests.length, maxPeers); + const batchSampler = new BatchConnectionSampler( + this.connectionSampler, + requests.length, + maxPeers, + compactArray([pinnedPeer]), // Exclude pinned peer from sampling, we will forcefully send all requests to it + ); - if (batchSampler.activePeerCount === 0) { - this.logger.debug('No active peers to send requests to'); + if (batchSampler.activePeerCount === 0 && !pinnedPeer) { + this.logger.warn('No active peers to send requests to'); return []; } @@ -308,6 +317,16 @@ export class ReqResp { requestBatches.get(peerAsString)!.indices.push(requestIndex); } + // If there is a pinned peer, we will always send every request to that peer + // We use the default limits for the subprotocol to avoid hitting the rate limiter + if (pinnedPeer) { + const limit = this.rateLimiter.getRateLimits(subProtocol).peerLimit.quotaCount; + requestBatches.set(pinnedPeer.toString(), { + peerId: pinnedPeer, + indices: Array.from(pendingRequestIndices.values()).slice(0, limit), + }); + } + // Make parallel requests for each peer's batch // A batch entry will look something like this: // PeerId0: [0, 1, 2, 3] @@ -323,6 +342,7 @@ export class ReqResp { const peerResults: { index: number; response: InstanceType }[] = []; for (const index of indices) { + this.logger.trace(`Sending request ${index} to peer ${peerAsString}`); const response = await this.sendRequestToPeer(peer, subProtocol, requestBuffers[index]); // Check the status of the response buffer diff --git a/yarn-project/p2p/src/services/service.ts b/yarn-project/p2p/src/services/service.ts index f735341b4fb8..fe0aaf6b49c8 100644 --- a/yarn-project/p2p/src/services/service.ts +++ b/yarn-project/p2p/src/services/service.ts @@ -13,6 +13,8 @@ export enum PeerDiscoveryState { STOPPED = 'stopped', } +export type P2PBlockReceivedCallback = (block: BlockProposal, sender: PeerId) => Promise; + /** * The interface for a P2P service implementation. */ @@ -57,13 +59,14 @@ export interface P2PService { sendBatchRequest( protocol: Protocol, requests: InstanceType[], + pinnedPeerId?: PeerId, timeoutMs?: number, maxPeers?: number, maxRetryAttempts?: number, ): Promise<(InstanceType | undefined)[]>; // Leaky abstraction: fix https://github.com/AztecProtocol/aztec-packages/issues/7963 - registerBlockReceivedCallback(callback: (block: BlockProposal) => Promise): void; + registerBlockReceivedCallback(callback: P2PBlockReceivedCallback): void; getEnr(): ENR | undefined; diff --git a/yarn-project/p2p/src/services/tx_collector.ts b/yarn-project/p2p/src/services/tx_collector.ts new file mode 100644 index 000000000000..7592bdd30ec7 --- /dev/null +++ b/yarn-project/p2p/src/services/tx_collector.ts @@ -0,0 +1,98 @@ +import { compactArray } from '@aztec/foundation/collection'; +import { type Logger, createLogger } from '@aztec/foundation/log'; +import type { BlockProposal } from '@aztec/stdlib/p2p'; +import type { Tx, TxHash } from '@aztec/stdlib/tx'; + +import type { P2PClient } from '../client/p2p_client.js'; + +export class TxCollector { + constructor( + private p2pClient: Pick< + P2PClient, + 'getTxsByHashFromPool' | 'hasTxsInPool' | 'getTxsByHash' | 'validate' | 'requestTxsByHash' + >, + private log: Logger = createLogger('p2p:tx-collector'), + ) {} + + async collectForBlockProposal( + proposal: BlockProposal, + peerWhoSentTheProposal: any, + ): Promise<{ txs: Tx[]; missing?: TxHash[] }> { + if (proposal.payload.txHashes.length === 0) { + this.log.verbose(`Received block proposal with no transactions, skipping transaction availability check`); + return { txs: [] }; + } + // Is this a new style proposal? + if (proposal.txs && proposal.txs.length > 0 && proposal.txs.length === proposal.payload.txHashes.length) { + // Yes, any txs that we already have we should use + this.log.info(`Using new style proposal with ${proposal.txs.length} transactions`); + + // Request from the pool based on the signed hashes in the payload + const hashesFromPayload = proposal.payload.txHashes; + const txsToUse = await this.p2pClient.getTxsByHashFromPool(hashesFromPayload); + + const missingTxs = txsToUse.filter(tx => tx === undefined).length; + if (missingTxs > 0) { + this.log.verbose( + `Missing ${missingTxs}/${hashesFromPayload.length} transactions in the tx pool, will attempt to take from the proposal`, + ); + } + + let usedFromProposal = 0; + + // Fill any holes with txs in the proposal, provided their hash matches the hash in the payload + for (let i = 0; i < txsToUse.length; i++) { + if (txsToUse[i] === undefined) { + // We don't have the transaction, take from the proposal, provided the hash is the same + const hashOfTxInProposal = await proposal.txs[i].getTxHash(); + if (hashOfTxInProposal.equals(hashesFromPayload[i])) { + // Hash is equal, we can use the tx from the proposal + txsToUse[i] = proposal.txs[i]; + usedFromProposal++; + } else { + this.log.warn( + `Unable to take tx: ${hashOfTxInProposal.toString()} from the proposal, it does not match payload hash: ${hashesFromPayload[ + i + ].toString()}`, + ); + } + } + } + + // See if we still have any holes, if there are then we were not successful and will try the old method + if (txsToUse.some(tx => tx === undefined)) { + this.log.warn(`Failed to use transactions from proposal. Falling back to old proposal logic`); + } else { + this.log.info( + `Successfully used ${usedFromProposal}/${hashesFromPayload.length} transactions from the proposal`, + ); + + await this.p2pClient.validate(txsToUse as Tx[]); + return { txs: txsToUse as Tx[] }; + } + } + + this.log.info(`Using old style proposal with ${proposal.payload.txHashes.length} transactions`); + + // Old style proposal, we will perform a request by hash from pool + // This will request from network any txs that are missing + const txHashes: TxHash[] = proposal.payload.txHashes; + + // This part is just for logging that we are requesting from the network + const availability = await this.p2pClient.hasTxsInPool(txHashes); + const notAvailable = availability.filter(availability => availability === false); + if (notAvailable.length) { + this.log.verbose( + `Missing ${notAvailable.length} transactions in the tx pool, will need to request from the network`, + ); + } + + // This will request from the network any txs that are missing + const retrievedTxs = await this.p2pClient.getTxsByHash(txHashes, peerWhoSentTheProposal); + const missingTxs = compactArray(retrievedTxs.map((tx, index) => (tx === undefined ? txHashes[index] : undefined))); + + await this.p2pClient.validate(retrievedTxs as Tx[]); + + return { txs: retrievedTxs as Tx[], missing: missingTxs }; + } +} diff --git a/yarn-project/p2p/src/test-helpers/reqresp-nodes.ts b/yarn-project/p2p/src/test-helpers/reqresp-nodes.ts index 5d7ef8cddc47..348ef8d46954 100644 --- a/yarn-project/p2p/src/test-helpers/reqresp-nodes.ts +++ b/yarn-project/p2p/src/test-helpers/reqresp-nodes.ts @@ -33,6 +33,7 @@ import type { P2PReqRespConfig } from '../services/reqresp/config.js'; import { ReqRespSubProtocol, type ReqRespSubProtocolHandlers, + type ReqRespSubProtocolRateLimits, type ReqRespSubProtocolValidators, noopValidator, } from '../services/reqresp/interface.js'; @@ -172,8 +173,12 @@ export const MOCK_SUB_PROTOCOL_VALIDATORS: ReqRespSubProtocolValidators = { * @param numberOfNodes - the number of nodes to create * @returns An array of the created nodes */ -export const createNodes = (peerScoring: PeerScoring, numberOfNodes: number): Promise => { - return timesParallel(numberOfNodes, () => createReqResp(peerScoring)); +export const createNodes = ( + peerScoring: PeerScoring, + numberOfNodes: number, + rateLimits: Partial = {}, +): Promise => { + return timesParallel(numberOfNodes, () => createReqResp(peerScoring, rateLimits)); }; export const startNodes = async ( @@ -192,17 +197,17 @@ export const stopNodes = async (nodes: ReqRespNode[]): Promise => { }; // Create a req resp node, exposing the underlying p2p node -export const createReqResp = async (peerScoring: PeerScoring): Promise => { +export const createReqResp = async ( + peerScoring: PeerScoring, + rateLimits: Partial = {}, +): Promise => { const p2p = await createLibp2pNode(); const config: P2PReqRespConfig = { overallRequestTimeoutMs: 4000, individualRequestTimeoutMs: 2000, }; - const req = new ReqResp(config, p2p, peerScoring); - return { - p2p, - req, - }; + const req = new ReqResp(config, p2p, peerScoring, rateLimits); + return { p2p, req }; }; // Given a node list; hand shake all of the nodes with each other diff --git a/yarn-project/prover-node/src/prover-coordination/combined-prover-coordination.ts b/yarn-project/prover-node/src/prover-coordination/combined-prover-coordination.ts index ddbf86a40ca2..37a4b7fc52af 100644 --- a/yarn-project/prover-node/src/prover-coordination/combined-prover-coordination.ts +++ b/yarn-project/prover-node/src/prover-coordination/combined-prover-coordination.ts @@ -25,7 +25,7 @@ export interface TxSource { class P2PCoordinationPool implements CoordinationPool { constructor(private readonly p2p: P2P) {} getTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]> { - return this.p2p.getTxsByHash(txHashes); + return this.p2p.getTxsByHash(txHashes, undefined); } hasTxsInPool(txHashes: TxHash[]): Promise { return this.p2p.hasTxsInPool(txHashes); diff --git a/yarn-project/txe/src/state_machine/dummy_p2p_client.ts b/yarn-project/txe/src/state_machine/dummy_p2p_client.ts index 1f6a7f8aef39..b045cdfe75cb 100644 --- a/yarn-project/txe/src/state_machine/dummy_p2p_client.ts +++ b/yarn-project/txe/src/state_machine/dummy_p2p_client.ts @@ -1,4 +1,4 @@ -import type { ENR, P2P, P2PConfig, P2PSyncState } from '@aztec/p2p'; +import type { ENR, P2P, P2PBlockReceivedCallback, P2PConfig, P2PSyncState } from '@aztec/p2p'; import type { L2BlockStreamEvent, L2Tips } from '@aztec/stdlib/block'; import type { PeerInfo } from '@aztec/stdlib/interfaces/server'; import type { BlockAttestation, BlockProposal } from '@aztec/stdlib/p2p'; @@ -29,7 +29,7 @@ export class DummyP2P implements P2P { throw new Error('DummyP2P does not implement "broadcastProposal"'); } - public registerBlockProposalHandler(_handler: (block: BlockProposal) => Promise): void { + public registerBlockProposalHandler(_handler: P2PBlockReceivedCallback): void { throw new Error('DummyP2P does not implement "registerBlockProposalHandler"'); } diff --git a/yarn-project/validator-client/src/validator.test.ts b/yarn-project/validator-client/src/validator.test.ts index c90b5c7f0ec3..59f055c3d70c 100644 --- a/yarn-project/validator-client/src/validator.test.ts +++ b/yarn-project/validator-client/src/validator.test.ts @@ -4,7 +4,7 @@ import { Secp256k1Signer } from '@aztec/foundation/crypto'; import { EthAddress } from '@aztec/foundation/eth-address'; import { Fr } from '@aztec/foundation/fields'; import { TestDateProvider, Timer } from '@aztec/foundation/timer'; -import type { P2P } from '@aztec/p2p'; +import type { P2P, PeerId } from '@aztec/p2p'; import type { L2Block, L2BlockSource } from '@aztec/stdlib/block'; import type { BlockProposal } from '@aztec/stdlib/p2p'; import { makeBlockAttestation, makeBlockProposal, makeHeader, mockTx } from '@aztec/stdlib/testing'; @@ -20,7 +20,6 @@ import { AttestationTimeoutError, BlockBuilderNotProvidedError, InvalidValidatorPrivateKeyError, - TransactionsNotAvailableError, } from './errors/validator.error.js'; import { ValidatorClient } from './validator.js'; @@ -52,111 +51,6 @@ describe('ValidatorClient', () => { validatorClient = ValidatorClient.new(config, epochCache, p2pClient, blockSource, dateProvider); }); - it('Should throw error if an invalid private key is provided', () => { - config.validatorPrivateKey = '0x1234567890123456789'; - expect(() => ValidatorClient.new(config, epochCache, p2pClient, blockSource, dateProvider)).toThrow( - InvalidValidatorPrivateKeyError, - ); - }); - - it('Should throw an error if re-execution is enabled but no block builder is provided', async () => { - config.validatorReexecute = true; - const fakeTx = await mockTx(); - p2pClient.getTxByHash.mockImplementation(() => Promise.resolve(fakeTx)); - const val = ValidatorClient.new(config, epochCache, p2pClient, blockSource, dateProvider); - await expect( - val.reExecuteTransactions(makeBlockProposal({ txs: [fakeTx], txHashes: [await fakeTx.getTxHash()] }), [fakeTx]), - ).rejects.toThrow(BlockBuilderNotProvidedError); - }); - - it('Should create a valid block proposal with txs', async () => { - const header = makeHeader(); - const archive = Fr.random(); - const txs = await Promise.all([Tx.random(), Tx.random(), Tx.random(), Tx.random(), Tx.random()]); - - const blockProposal = await validatorClient.createBlockProposal( - header.globalVariables.blockNumber, - header.toPropose(), - archive, - header.state, - txs, - { publishFullTxs: true }, - ); - - expect(blockProposal).toBeDefined(); - - const validatorAddress = EthAddress.fromString(validatorAccount.address); - expect(blockProposal?.getSender()).toEqual(validatorAddress); - - expect(blockProposal!.txs).toBeDefined(); - expect(blockProposal!.txs).toBe(txs); - }); - - it('Should create a valid block proposal without txs', async () => { - const header = makeHeader(); - const archive = Fr.random(); - const txs = await Promise.all([Tx.random(), Tx.random(), Tx.random(), Tx.random(), Tx.random()]); - - const blockProposal = await validatorClient.createBlockProposal( - header.globalVariables.blockNumber, - header.toPropose(), - archive, - header.state, - txs, - { publishFullTxs: false }, - ); - - expect(blockProposal).toBeDefined(); - - const validatorAddress = EthAddress.fromString(validatorAccount.address); - expect(blockProposal?.getSender()).toEqual(validatorAddress); - - expect(blockProposal!.txs).toBeUndefined(); - }); - - it('Should a timeout if we do not collect enough attestations in time', async () => { - const proposal = makeBlockProposal(); - - await expect(validatorClient.collectAttestations(proposal, 2, new Date(dateProvider.now() + 100))).rejects.toThrow( - AttestationTimeoutError, - ); - }); - - it('Should throw an error if the transactions are not available', async () => { - const proposal = makeBlockProposal(); - - // mock the p2pClient.getTxStatus to return undefined for all transactions - p2pClient.getTxStatus.mockResolvedValue(undefined); - p2pClient.hasTxsInPool.mockImplementation(txHashes => Promise.resolve(times(txHashes.length, () => false))); - p2pClient.getTxsByHash.mockImplementation(txHashes => Promise.resolve(times(txHashes.length, () => undefined))); - // Mock the p2pClient.requestTxs to return undefined for all transactions - p2pClient.requestTxsByHash.mockImplementation(() => Promise.resolve([undefined])); - - await expect(validatorClient.ensureTransactionsAreAvailable(proposal)).rejects.toThrow( - TransactionsNotAvailableError, - ); - }); - - it('Should not return an attestation if re-execution fails', () => { - const proposal = makeBlockProposal(); - - // mock the p2pClient.getTxStatus to return undefined for all transactions - p2pClient.getTxStatus.mockResolvedValue(undefined); - p2pClient.hasTxsInPool.mockImplementation(txHashes => Promise.resolve(times(txHashes.length, () => false))); - epochCache.getProposerInCurrentOrNextSlot.mockResolvedValue({ - currentProposer: proposal.getSender(), - nextProposer: proposal.getSender(), - currentSlot: proposal.slotNumber.toBigInt(), - nextSlot: proposal.slotNumber.toBigInt() + 1n, - }); - epochCache.isInCommittee.mockResolvedValue(true); - - const val = ValidatorClient.new(config, epochCache, p2pClient, blockSource, dateProvider); - val.registerBlockBuilder(() => { - throw new Error('Failed to build block'); - }); - }); - describe('constructor', () => { it('should throw error if an invalid private key is provided', () => { config.validatorPrivateKey = '0x1234567890123456789'; @@ -178,7 +72,7 @@ describe('ValidatorClient', () => { }); describe('createBlockProposal', () => { - it('should create a valid block proposal', async () => { + it('should create a valid block proposal without txs', async () => { const header = makeHeader(); const archive = Fr.random(); const txs = await Promise.all([1, 2, 3, 4, 5].map(() => mockTx())); @@ -196,6 +90,29 @@ describe('ValidatorClient', () => { const validatorAddress = EthAddress.fromString(validatorAccount.address); expect(blockProposal?.getSender()).toEqual(validatorAddress); + expect(blockProposal!.txs).toBeUndefined(); + }); + + it('should create a valid block proposal with txs', async () => { + const header = makeHeader(); + const archive = Fr.random(); + const txs = await Promise.all([1, 2, 3, 4, 5].map(() => mockTx())); + + const blockProposal = await validatorClient.createBlockProposal( + header.globalVariables.blockNumber, + header.toPropose(), + archive, + header.state, + txs, + { publishFullTxs: true }, + ); + + expect(blockProposal).toBeDefined(); + + const validatorAddress = EthAddress.fromString(validatorAccount.address); + expect(blockProposal?.getSender()).toEqual(validatorAddress); + expect(blockProposal!.txs).toBeDefined(); + expect(blockProposal!.txs).toBe(txs); }); }); @@ -245,22 +162,18 @@ describe('ValidatorClient', () => { describe('attestToProposal', () => { let proposal: BlockProposal; + let sender: PeerId; + + const makeTxFromHash = (txHash: TxHash) => ({ getTxHash: () => Promise.resolve(txHash) }) as Tx; beforeEach(() => { proposal = makeBlockProposal(); + sender = { toString: () => 'proposal-sender-peer-id' } as PeerId; p2pClient.getTxStatus.mockResolvedValue('pending'); p2pClient.hasTxsInPool.mockImplementation(txHashes => Promise.resolve(times(txHashes.length, () => true))); - p2pClient.getTxByHash.mockImplementation((txHash: TxHash) => - Promise.resolve({ getTxHash: () => Promise.resolve(txHash) } as Tx), - ); - p2pClient.getTxsByHash.mockImplementation((txHashes: TxHash[]) => - Promise.resolve( - txHashes.map(txHash => { - return { getTxHash: () => Promise.resolve(txHash) } as Tx; - }), - ), - ); + p2pClient.getTxByHash.mockImplementation((txHash: TxHash) => Promise.resolve(makeTxFromHash(txHash))); + p2pClient.getTxsByHash.mockImplementation((txHashes: TxHash[]) => Promise.resolve(txHashes.map(makeTxFromHash))); epochCache.isInCommittee.mockResolvedValue(true); epochCache.getProposerInCurrentOrNextSlot.mockResolvedValue({ @@ -276,7 +189,7 @@ describe('ValidatorClient', () => { }); it('should attest to proposal', async () => { - const attestation = await validatorClient.attestToProposal(proposal); + const attestation = await validatorClient.attestToProposal(proposal, sender); expect(attestation).toBeDefined(); }); @@ -295,8 +208,25 @@ describe('ValidatorClient', () => { }), ); - const attestation = await validatorClient.attestToProposal(proposal); + const attestation = await validatorClient.attestToProposal(proposal, sender); + expect(attestation).toBeDefined(); + }); + + it('should request txs if missing for attesting', async () => { + p2pClient.hasTxsInPool.mockImplementation(txHashes => Promise.resolve(times(txHashes.length, i => i === 0))); + + const attestation = await validatorClient.attestToProposal(proposal, sender); expect(attestation).toBeDefined(); + expect(p2pClient.getTxsByHash).toHaveBeenCalledWith(proposal.payload.txHashes, sender); + }); + + it('should request txs even if not attestor in this slot', async () => { + p2pClient.hasTxsInPool.mockImplementation(txHashes => Promise.resolve(times(txHashes.length, () => false))); + epochCache.isInCommittee.mockResolvedValue(false); + + const attestation = await validatorClient.attestToProposal(proposal, sender); + expect(attestation).toBeUndefined(); + expect(p2pClient.getTxsByHash).toHaveBeenCalledWith(proposal.payload.txHashes, sender); }); it('should throw an error if the transactions are not available', async () => { @@ -307,11 +237,7 @@ describe('ValidatorClient', () => { // Mock the p2pClient.requestTxs to return undefined for all transactions p2pClient.requestTxsByHash.mockImplementation(() => Promise.resolve([undefined])); - await expect(validatorClient.ensureTransactionsAreAvailable(proposal)).rejects.toThrow( - TransactionsNotAvailableError, - ); - - const attestation = await validatorClient.attestToProposal(proposal); + const attestation = await validatorClient.attestToProposal(proposal, sender); expect(attestation).toBeUndefined(); }); @@ -321,14 +247,14 @@ describe('ValidatorClient', () => { throw new Error('Failed to build block'); }); - const attestation = await validatorClient.attestToProposal(proposal); + const attestation = await validatorClient.attestToProposal(proposal, sender); expect(attestation).toBeUndefined(); }); it('should not return an attestation if the validator is not in the committee', async () => { epochCache.isInCommittee.mockImplementation(() => Promise.resolve(false)); - const attestation = await validatorClient.attestToProposal(proposal); + const attestation = await validatorClient.attestToProposal(proposal, sender); expect(attestation).toBeUndefined(); }); @@ -342,7 +268,7 @@ describe('ValidatorClient', () => { }), ); - const attestation = await validatorClient.attestToProposal(proposal); + const attestation = await validatorClient.attestToProposal(proposal, sender); expect(attestation).toBeUndefined(); }); @@ -354,7 +280,7 @@ describe('ValidatorClient', () => { nextSlot: proposal.slotNumber.toBigInt() + 21n, }); - const attestation = await validatorClient.attestToProposal(proposal); + const attestation = await validatorClient.attestToProposal(proposal, sender); expect(attestation).toBeUndefined(); }); }); diff --git a/yarn-project/validator-client/src/validator.ts b/yarn-project/validator-client/src/validator.ts index 23bfed0f1d9d..d08c48f0a025 100644 --- a/yarn-project/validator-client/src/validator.ts +++ b/yarn-project/validator-client/src/validator.ts @@ -7,11 +7,11 @@ import { createLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; import { sleep } from '@aztec/foundation/sleep'; import { DateProvider, type Timer } from '@aztec/foundation/timer'; -import type { P2P } from '@aztec/p2p'; +import { type P2P, type PeerId, TxCollector } from '@aztec/p2p'; import { BlockProposalValidator } from '@aztec/p2p/msg_validators'; import type { L2Block, L2BlockSource } from '@aztec/stdlib/block'; import type { BlockAttestation, BlockProposal, BlockProposalOptions } from '@aztec/stdlib/p2p'; -import type { ProposedBlockHeader, StateReference, Tx, TxHash } from '@aztec/stdlib/tx'; +import type { ProposedBlockHeader, StateReference, Tx } from '@aztec/stdlib/tx'; import { type TelemetryClient, WithTracer, getTelemetryClient } from '@aztec/telemetry-client'; import type { ValidatorClientConfig } from './config.js'; @@ -61,7 +61,7 @@ export interface Validator { txs: Tx[], options: BlockProposalOptions, ): Promise; - attestToProposal(proposal: BlockProposal): Promise; + attestToProposal(proposal: BlockProposal, sender: PeerId): Promise; broadcastBlockProposal(proposal: BlockProposal): Promise; collectAttestations(proposal: BlockProposal, required: number, deadline: Date): Promise; @@ -85,6 +85,7 @@ export class ValidatorClient extends WithTracer implements Validator { private epochCacheUpdateLoop: RunningPromise; private blockProposalValidator: BlockProposalValidator; + private txCollector: TxCollector; constructor( private keyStore: ValidatorKeyStore, @@ -104,6 +105,8 @@ export class ValidatorClient extends WithTracer implements Validator { this.blockProposalValidator = new BlockProposalValidator(epochCache); + this.txCollector = new TxCollector(p2pClient, this.log); + // Refresh epoch cache every second to trigger alert if participation in committee changes this.myAddress = this.keyStore.getAddress(); this.epochCacheUpdateLoop = new RunningPromise(this.handleEpochCommitteeUpdate.bind(this), log, 1000); @@ -180,8 +183,8 @@ export class ValidatorClient extends WithTracer implements Validator { } public registerBlockProposalHandler() { - const handler = (block: BlockProposal): Promise => { - return this.attestToProposal(block); + const handler = (block: BlockProposal, proposalSender: any): Promise => { + return this.attestToProposal(block, proposalSender); }; this.p2pClient.registerBlockProposalHandler(handler); } @@ -195,7 +198,7 @@ export class ValidatorClient extends WithTracer implements Validator { this.blockBuilder = blockBuilder; } - async attestToProposal(proposal: BlockProposal): Promise { + async attestToProposal(proposal: BlockProposal, proposalSender: PeerId): Promise { const slotNumber = proposal.slotNumber.toNumber(); const blockNumber = proposal.blockNumber.toNumber(); const proposalInfo = { @@ -207,13 +210,8 @@ export class ValidatorClient extends WithTracer implements Validator { }; this.log.verbose(`Received request to attest for slot ${slotNumber}`); - // Check that I am in the committee - if (!(await this.epochCache.isInCommittee(this.keyStore.getAddress()))) { - this.log.verbose(`Not in the committee, skipping attestation`); - return undefined; - } - // Check that the proposal is from the current proposer, or the next proposer. + // Q: Should this be moved to the block proposal validator, so we disregard proposals from anyone? const invalidProposal = await this.blockProposalValidator.validate(proposal); if (invalidProposal) { this.log.verbose(`Proposal is not valid, skipping attestation`); @@ -244,34 +242,42 @@ export class ValidatorClient extends WithTracer implements Validator { } } + // Collect txs from the proposal + const { missing, txs } = await this.txCollector.collectForBlockProposal(proposal, proposalSender); + + // Check that I am in the committee before attesting + if (!(await this.epochCache.isInCommittee(this.keyStore.getAddress()))) { + this.log.verbose(`Not in the committee, skipping attestation`); + return undefined; + } + // Check that all of the transactions in the proposal are available in the tx pool before attesting - this.log.verbose(`Processing attestation for slot ${slotNumber}`, proposalInfo); - try { - const txs = await this.ensureTransactionsAreAvailable(proposal); + if (missing && missing.length > 0) { + this.log.error( + `Missing ${missing.length}/${proposal.payload.txHashes.length} txs to attest to proposal`, + undefined, + { proposalInfo, missing }, + ); + this.metrics.incFailedAttestations('TransactionsNotAvailableError'); + return undefined; + } + // Try re-executing the transactions in the proposal + try { + this.log.verbose(`Processing attestation for slot ${slotNumber}`, proposalInfo); if (this.config.validatorReexecute) { this.log.verbose(`Re-executing transactions in the proposal before attesting`); await this.reExecuteTransactions(proposal, txs); } } catch (error: any) { this.metrics.incFailedAttestations(error instanceof Error ? error.name : 'unknown'); - - // If the transactions are not available, then we should not attempt to attest - if (error instanceof TransactionsNotAvailableError) { - this.log.error(`Transactions not available, skipping attestation`, error, proposalInfo); - } else { - // This branch most commonly be hit if the transactions are available, but the re-execution fails - // Catch all error handler - this.log.error(`Failed to attest to proposal`, error, proposalInfo); - } + this.log.error(`Failed to attest to proposal`, error, proposalInfo); return undefined; } // Provided all of the above checks pass, we can attest to the proposal this.log.info(`Attesting to proposal for slot ${slotNumber}`, proposalInfo); this.metrics.incAttestations(); - - // If the above function does not throw an error, then we can attest to the proposal return this.doAttestToProposal(proposal); } @@ -320,105 +326,6 @@ export class ValidatorClient extends WithTracer implements Validator { } } - /** - * Ensure that all of the transactions in the proposal are available in the tx pool before attesting - * - * 1. Check if the local tx pool contains all of the transactions in the proposal - * 2. If any transactions are not in the local tx pool, request them from the network - * 3. If we cannot retrieve them from the network, throw an error - * @param proposal - The proposal to attest to - */ - async ensureTransactionsAreAvailable(proposal: BlockProposal): Promise { - if (proposal.payload.txHashes.length === 0) { - this.log.verbose(`Received block proposal with no transactions, skipping transaction availability check`); - return []; - } - // Is this a new style proposal? - if (proposal.txs && proposal.txs.length > 0 && proposal.txs.length === proposal.payload.txHashes.length) { - // Yes, any txs that we already have we should use - this.log.info(`Using new style proposal with ${proposal.txs.length} transactions`); - - // Request from the pool based on the signed hashes in the payload - const hashesFromPayload = proposal.payload.txHashes; - const txsToUse = await this.p2pClient.getTxsByHashFromPool(hashesFromPayload); - - const missingTxs = txsToUse.filter(tx => tx === undefined).length; - if (missingTxs > 0) { - this.log.verbose( - `Missing ${missingTxs}/${hashesFromPayload.length} transactions in the tx pool, will attempt to take from the proposal`, - ); - } - - let usedFromProposal = 0; - - // Fill any holes with txs in the proposal, provided their hash matches the hash in the payload - for (let i = 0; i < txsToUse.length; i++) { - if (txsToUse[i] === undefined) { - // We don't have the transaction, take from the proposal, provided the hash is the same - const hashOfTxInProposal = await proposal.txs[i].getTxHash(); - if (hashOfTxInProposal.equals(hashesFromPayload[i])) { - // Hash is equal, we can use the tx from the proposal - txsToUse[i] = proposal.txs[i]; - usedFromProposal++; - } else { - this.log.warn( - `Unable to take tx: ${hashOfTxInProposal.toString()} from the proposal, it does not match payload hash: ${hashesFromPayload[ - i - ].toString()}`, - ); - } - } - } - - // See if we still have any holes, if there are then we were not successful and will try the old method - if (txsToUse.some(tx => tx === undefined)) { - this.log.warn(`Failed to use transactions from proposal. Falling back to old proposal logic`); - } else { - this.log.info( - `Successfully used ${usedFromProposal}/${hashesFromPayload.length} transactions from the proposal`, - ); - - await this.p2pClient.validate(txsToUse as Tx[]); - return txsToUse as Tx[]; - } - } - - this.log.info(`Using old style proposal with ${proposal.payload.txHashes.length} transactions`); - - // Old style proposal, we will perform a request by hash from pool - // This will request from network any txs that are missing - const txHashes: TxHash[] = proposal.payload.txHashes; - - // This part is just for logging that we are requesting from the network - const availability = await this.p2pClient.hasTxsInPool(txHashes); - const notAvailable = availability.filter(availability => availability === false); - if (notAvailable.length) { - this.log.verbose( - `Missing ${notAvailable.length} transactions in the tx pool, will need to request from the network`, - ); - } - - // This will request from the network any txs that are missing - const retrievedTxs = await this.p2pClient.getTxsByHash(txHashes); - const missingTxs = retrievedTxs - .map((tx, index) => { - // Return the hash of any that we did not get - if (tx === undefined) { - return txHashes[index]; - } else { - return undefined; - } - }) - .filter(hash => hash !== undefined); - if (missingTxs.length > 0) { - throw new TransactionsNotAvailableError(missingTxs as TxHash[]); - } - - await this.p2pClient.validate(retrievedTxs as Tx[]); - - return retrievedTxs as Tx[]; - } - async createBlockProposal( blockNumber: Fr, header: ProposedBlockHeader,