diff --git a/yarn-project/p2p/src/client/test/p2p_client.integration_batch_txs.test.ts b/yarn-project/p2p/src/client/test/p2p_client.integration_batch_txs.test.ts index c30babeac1b7..2a935748e48d 100644 --- a/yarn-project/p2p/src/client/test/p2p_client.integration_batch_txs.test.ts +++ b/yarn-project/p2p/src/client/test/p2p_client.integration_batch_txs.test.ts @@ -22,6 +22,7 @@ import { BatchTxRequester } from '../../services/reqresp/batch-tx-requester/batc import type { BatchTxRequesterLibP2PService } from '../../services/reqresp/batch-tx-requester/interface.js'; import type { IBatchRequestTxValidator } from '../../services/reqresp/batch-tx-requester/tx_validator.js'; import type { ConnectionSampler } from '../../services/reqresp/connection-sampler/connection_sampler.js'; +import { MissingTxsTracker } from '../../services/tx_collection/missing_txs_tracker.js'; import { generatePeerIdPrivateKeys } from '../../test-helpers/generate-peer-id-private-keys.js'; import { getPorts } from '../../test-helpers/get-ports.js'; import { makeEnrs } from '../../test-helpers/make-enrs.js'; @@ -229,7 +230,7 @@ describe('p2p client integration batch txs', () => { mockP2PService.reqResp = (client0 as any).p2pService.reqresp; const requester = new BatchTxRequester( - missingTxHashes, + MissingTxsTracker.fromArray(missingTxHashes), blockProposal, undefined, // no pinned peer 5_000, diff --git a/yarn-project/p2p/src/client/test/tx_proposal_collector/proposal_tx_collector_worker.ts b/yarn-project/p2p/src/client/test/tx_proposal_collector/proposal_tx_collector_worker.ts index fa270e79e6dd..45fba6b2d92a 100644 --- a/yarn-project/p2p/src/client/test/tx_proposal_collector/proposal_tx_collector_worker.ts +++ b/yarn-project/p2p/src/client/test/tx_proposal_collector/proposal_tx_collector_worker.ts @@ -16,13 +16,11 @@ import type { PeerId } from '@libp2p/interface'; import { peerIdFromString } from '@libp2p/peer-id'; import type { P2PConfig } from '../../../config.js'; +import { BatchTxRequesterCollector, SendBatchRequestCollector } from '../../../services/index.js'; import type { IBatchRequestTxValidator } from '../../../services/reqresp/batch-tx-requester/tx_validator.js'; import { RateLimitStatus } from '../../../services/reqresp/rate-limiter/rate_limiter.js'; -import { - BatchTxRequesterCollector, - SendBatchRequestCollector, -} from '../../../services/tx_collection/proposal_tx_collector.js'; -import { AlwaysTrueCircuitVerifier } from '../../../test-helpers/reqresp-nodes.js'; +import { MissingTxsTracker } from '../../../services/tx_collection/missing_txs_tracker.js'; +import { AlwaysTrueCircuitVerifier } from '../../../test-helpers/index.js'; import { BENCHMARK_CONSTANTS, InMemoryAttestationPool, @@ -31,7 +29,7 @@ import { calculateInternalTimeout, createMockEpochCache, createMockWorldStateSynchronizer, -} from '../../../test-helpers/testbench-utils.js'; +} from '../../../test-helpers/index.js'; import { createP2PClient } from '../../index.js'; import type { P2PClient } from '../../p2p_client.js'; import { @@ -214,7 +212,13 @@ async function runCollector(cmd: Extract collector.collectTxs(parsedTxHashes, parsedProposal, pinnedPeer, internalTimeoutMs), + (_signal: AbortSignal) => + collector.collectTxs( + MissingTxsTracker.fromArray(parsedTxHashes), + parsedProposal, + pinnedPeer, + internalTimeoutMs, + ), timeoutMs, () => new Error(`Collector timed out after ${timeoutMs}ms`), ); @@ -226,7 +230,13 @@ async function runCollector(cmd: Extract collector.collectTxs(parsedTxHashes, parsedProposal, pinnedPeer, internalTimeoutMs), + (_signal: AbortSignal) => + collector.collectTxs( + MissingTxsTracker.fromArray(parsedTxHashes), + parsedProposal, + pinnedPeer, + internalTimeoutMs, + ), timeoutMs, () => new Error(`Collector timed out after ${timeoutMs}ms`), ); diff --git a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts index 8e09dd76bacb..03f37cad2482 100644 --- a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts +++ b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.test.ts @@ -17,6 +17,7 @@ import { describe, expect, it, jest } from '@jest/globals'; import { type MockProxy, mock } from 'jest-mock-extended'; import { createSecp256k1PeerId } from '../../../index.js'; +import { MissingTxsTracker } from '../../tx_collection/missing_txs_tracker.js'; import type { ConnectionSampler } from '../connection-sampler/connection_sampler.js'; import type { ReqRespInterface } from '../interface.js'; import { BitVector, BlockTxsRequest, BlockTxsResponse } from '../protocols/index.js'; @@ -95,7 +96,7 @@ describe('BatchTxRequester', () => { const clock = new TestClock(); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, undefined, deadline, @@ -150,7 +151,7 @@ describe('BatchTxRequester', () => { const clock = new TestClock(); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, undefined, deadline, @@ -276,7 +277,7 @@ describe('BatchTxRequester', () => { }); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, undefined, deadline, @@ -330,7 +331,7 @@ describe('BatchTxRequester', () => { const semaphore = new TestSemaphore(new Semaphore(0)); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, undefined, deadline, @@ -382,7 +383,7 @@ describe('BatchTxRequester', () => { const semaphore = new TestSemaphore(new Semaphore(0)); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, pinnedPeer, deadline, @@ -449,7 +450,7 @@ describe('BatchTxRequester', () => { const semaphore = new TestSemaphore(new Semaphore(0)); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, pinnedPeer, deadline, @@ -505,7 +506,7 @@ describe('BatchTxRequester', () => { const semaphore = new TestSemaphore(new Semaphore(0)); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, pinnedPeer, deadline, @@ -561,7 +562,7 @@ describe('BatchTxRequester', () => { ); reqResp.sendRequestToPeer.mockImplementation(mockImplementation); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, pinnedPeer, deadline, @@ -635,7 +636,7 @@ describe('BatchTxRequester', () => { }); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, pinnedPeer, deadline, @@ -749,7 +750,7 @@ describe('BatchTxRequester', () => { }); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, pinnedPeer, deadline, @@ -885,7 +886,7 @@ describe('BatchTxRequester', () => { }); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, pinnedPeer, deadline, @@ -960,7 +961,7 @@ describe('BatchTxRequester', () => { const clock = new TestClock(); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, undefined, shortDeadline, @@ -1018,7 +1019,7 @@ describe('BatchTxRequester', () => { }); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, undefined, deadline, @@ -1090,7 +1091,7 @@ describe('BatchTxRequester', () => { }); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, undefined, deadline, @@ -1143,7 +1144,7 @@ describe('BatchTxRequester', () => { // Create semaphore that starts with 0 permits to block smart workers const semaphore = new TestSemaphore(new Semaphore(0)); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, undefined, deadline, @@ -1217,7 +1218,7 @@ describe('BatchTxRequester', () => { reqResp.sendRequestToPeer.mockImplementation(mockImplementation); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, undefined, deadline, @@ -1298,7 +1299,7 @@ describe('BatchTxRequester', () => { reqResp.sendRequestToPeer.mockImplementation(mockImplementation); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, undefined, deadline, @@ -1369,7 +1370,7 @@ describe('BatchTxRequester', () => { reqResp.sendRequestToPeer.mockImplementation(mockImplementation); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, undefined, deadline, @@ -1401,6 +1402,62 @@ describe('BatchTxRequester', () => { }); }); + describe('External fetching', () => { + it('should not request transactions that were marked as fetched externally', async () => { + const txCount = 16; + const deadline = 5_000; + const missing = Array.from({ length: txCount }, () => TxHash.random()); + + blockProposal = await makeBlockProposal({ + signer: Secp256k1Signer.random(), + blockHeader: makeBlockHeader(1, { blockNumber: BlockNumber(1) }), + archiveRoot: Fr.random(), + txHashes: missing, + }); + + const peer = await createSecp256k1PeerId(); + connectionSampler.getPeerListSortedByConnectionCountAsc.mockReturnValue([peer]); + + const tracker = MissingTxsTracker.fromArray(missing); + + // Peer has only first half of transactions + const peerTransactions = new Map([[peer.toString(), Array.from({ length: TX_BATCH_SIZE }, (_, i) => i)]]); + const { requestLog, mockImplementation } = createRequestLogger(blockProposal, new Set(), peerTransactions); + reqResp.sendRequestToPeer.mockImplementation(mockImplementation); + + // Create requester first + const requester = new BatchTxRequester( + tracker, + blockProposal, + undefined, + deadline, + mockP2PService, + logger, + new DateProvider(), + { + smartParallelWorkerCount: 0, + dumbParallelWorkerCount: 1, + txValidator, + }, + ); + + // Mark transactions 8-15 as fetched externally after creating requester + for (let i = TX_BATCH_SIZE; i < txCount; i++) { + tracker.markFetched(makeTx(missing[i])); + } + + // Run and collect results + const result = await BatchTxRequester.collectAllTxs(requester.run()); + + // Verify only transactions 0-7 were requested (indices 8-15 were marked fetched) + const allRequestedIndices = requestLog.get(peer.toString())?.flatMap(r => r.indices) || []; + const requestedExternallyFetched = allRequestedIndices.filter(idx => idx >= TX_BATCH_SIZE); + + expect(requestedExternallyFetched).toEqual([]); + expect(result.length).toBe(TX_BATCH_SIZE); + }); + }); + describe('Pinned peer functionality', () => { it('Should query pinned peer if available', async () => { const txCount = 10; @@ -1430,7 +1487,7 @@ describe('BatchTxRequester', () => { reqResp.sendRequestToPeer.mockImplementation(mockImplementation); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, pinnedPeer, deadline, @@ -1480,7 +1537,7 @@ describe('BatchTxRequester', () => { reqResp.sendRequestToPeer.mockImplementation(mockImplementation); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, pinnedPeer, deadline, @@ -1561,7 +1618,7 @@ describe('BatchTxRequester', () => { }); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, pinnedPeer, deadline, @@ -1615,7 +1672,7 @@ describe('BatchTxRequester', () => { reqResp.sendRequestToPeer.mockImplementation(mockImplementation); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, pinnedPeer, deadline, @@ -1671,7 +1728,7 @@ describe('BatchTxRequester', () => { reqResp.sendRequestToPeer.mockImplementation(mockImplementation); const requester = new BatchTxRequester( - missing, + MissingTxsTracker.fromArray(missing), blockProposal, pinnedPeer, deadline, diff --git a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts index 2a2f8ca60cc7..309b41c3f4bd 100644 --- a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts +++ b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/batch_tx_requester.ts @@ -10,6 +10,7 @@ import { Tx, TxArray, TxHash } from '@aztec/stdlib/tx'; import type { PeerId } from '@libp2p/interface'; import { peerIdFromString } from '@libp2p/peer-id'; +import type { IMissingTxsTracker } from '../../tx_collection/missing_txs_tracker.js'; import { ReqRespSubProtocol } from '.././interface.js'; import { BlockTxsRequest, BlockTxsResponse, type BlockTxsSource } from '.././protocols/index.js'; import { ReqRespStatus } from '.././status.js'; @@ -20,7 +21,7 @@ import { DEFAULT_BATCH_TX_REQUESTER_TX_BATCH_SIZE, } from './config.js'; import type { BatchTxRequesterLibP2PService, BatchTxRequesterOptions, ITxMetadataCollection } from './interface.js'; -import { MissingTxMetadata, MissingTxMetadataCollection } from './missing_txs.js'; +import { MissingTxMetadataCollection } from './missing_txs.js'; import { type IPeerCollection, PeerCollection } from './peer_collection.js'; import { BatchRequestTxValidator, type IBatchRequestTxValidator } from './tx_validator.js'; @@ -60,7 +61,7 @@ export class BatchTxRequester { private readonly txBatchSize: number; constructor( - missingTxs: TxHash[], + missingTxsTracker: IMissingTxsTracker, blockTxsSource: BlockTxsSource, pinnedPeer: PeerId | undefined, timeoutMs: number, @@ -99,8 +100,7 @@ export class BatchTxRequester { this.p2pService.peerScoring, ); } - const entries: Array<[string, MissingTxMetadata]> = missingTxs.map(h => [h.toString(), new MissingTxMetadata(h)]); - this.txsMetadata = new MissingTxMetadataCollection(entries, this.txBatchSize); + this.txsMetadata = new MissingTxMetadataCollection(missingTxsTracker, this.txBatchSize); this.smartRequesterSemaphore = this.opts.semaphore ?? new Semaphore(0); } @@ -661,7 +661,7 @@ export class BatchTxRequester { /* * @returns true if all missing txs have been fetched */ private fetchedAllTxs() { - return Array.from(this.txsMetadata.values()).every(tx => tx.fetched); + return this.txsMetadata.getMissingTxHashes().size == 0; } /* @@ -679,7 +679,7 @@ export class BatchTxRequester { this.unlockSmartRequesterSemaphores(); } - return aborted || this.txsMetadata.size === 0 || this.fetchedAllTxs() || this.dateProvider.now() > this.deadline; + return aborted || this.fetchedAllTxs() || this.dateProvider.now() > this.deadline; } /* diff --git a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/interface.ts b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/interface.ts index 811c77dff630..f3e895b2d7a9 100644 --- a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/interface.ts +++ b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/interface.ts @@ -6,7 +6,6 @@ import type { PeerId } from '@libp2p/interface'; import type { ConnectionSampler } from '../connection-sampler/connection_sampler.js'; import type { ReqRespInterface } from '../interface.js'; -import type { MissingTxMetadata } from './missing_txs.js'; import type { IPeerCollection } from './peer_collection.js'; import type { BatchRequestTxValidatorConfig, IBatchRequestTxValidator } from './tx_validator.js'; @@ -15,18 +14,15 @@ export interface IPeerPenalizer { } export interface ITxMetadataCollection { - size: number; - values(): IterableIterator; getMissingTxHashes(): Set; + markFetched(peerId: PeerId, tx: Tx): boolean; getTxsToRequestFromThePeer(peer: PeerId): TxHash[]; markRequested(txHash: TxHash): void; markInFlightBySmartPeer(txHash: TxHash): void; markNotInFlightBySmartPeer(txHash: TxHash): void; alreadyFetched(txHash: TxHash): boolean; // Returns true if tx was marked as fetched, false if it was already marked as fetched - markFetched(peerId: PeerId, tx: Tx): boolean; markPeerHas(peerId: PeerId, txHashes: TxHash[]): void; - getFetchedTxs(): Tx[]; } /** diff --git a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/missing_txs.ts b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/missing_txs.ts index 6ef3c2b3c722..56705595faf9 100644 --- a/yarn-project/p2p/src/services/reqresp/batch-tx-requester/missing_txs.ts +++ b/yarn-project/p2p/src/services/reqresp/batch-tx-requester/missing_txs.ts @@ -2,13 +2,13 @@ import { type Tx, TxHash } from '@aztec/stdlib/tx'; import type { PeerId } from '@libp2p/interface'; +import type { IMissingTxsTracker } from '../../tx_collection/missing_txs_tracker.js'; import { DEFAULT_BATCH_TX_REQUESTER_TX_BATCH_SIZE } from './config.js'; import type { ITxMetadataCollection } from './interface.js'; -export class MissingTxMetadata { +class MissingTxMetadata { constructor( - public readonly txHash: TxHash, - public fetched = false, + public readonly txHash: string, public requestedCount = 0, public inFlightCount = 0, public tx: Tx | undefined = undefined, @@ -30,24 +30,6 @@ export class MissingTxMetadata { public isInFlight(): boolean { return this.inFlightCount > 0; } - - //Returns true if this is the first time we mark it as fetched - public markAsFetched(peerId: PeerId, tx: Tx): boolean { - if (this.fetched) { - return false; - } - - this.fetched = true; - this.tx = tx; - - this.peers.add(peerId.toString()); - - return true; - } - - public toString() { - return this.txHash.toString(); - } } /* @@ -55,21 +37,18 @@ export class MissingTxMetadata { * This could be better optimized but given expected count of missing txs (N < 100) * At the moment there is no need for it. And benefit is that we have everything in single store * */ -export class MissingTxMetadataCollection extends Map implements ITxMetadataCollection { +export class MissingTxMetadataCollection implements ITxMetadataCollection { + private txMetadata = new Map(); + constructor( - entries?: readonly (readonly [string, MissingTxMetadata])[] | null, + private missingTxsTracker: IMissingTxsTracker, private readonly txBatchSize: number = DEFAULT_BATCH_TX_REQUESTER_TX_BATCH_SIZE, ) { - super(entries); - } - public getSortedByRequestedCountAsc(txs: string[]): MissingTxMetadata[] { - return Array.from(this.values().filter(txMeta => txs.includes(txMeta.txHash.toString()))).sort( - (a, b) => a.requestedCount - b.requestedCount, - ); + missingTxsTracker.missingTxHashes.forEach(hash => this.txMetadata.set(hash, new MissingTxMetadata(hash))); } public getPrioritizingNotInFlightAndLowerRequestCount(txs: string[]): MissingTxMetadata[] { - const filtered = Array.from(this.values()).filter(txMeta => txs.includes(txMeta.txHash.toString())); + const filtered = Array.from(this.txMetadata.values()).filter(txMeta => txs.includes(txMeta.txHash.toString())); const [notInFlight, inFlight] = filtered.reduce<[MissingTxMetadata[], MissingTxMetadata[]]>( (buckets, tx) => { @@ -85,43 +64,15 @@ export class MissingTxMetadataCollection extends Map return [...notInFlight, ...inFlight]; } - public getFetchedTxHashes(): Set { - return new Set( - this.values() - .filter(t => t.fetched) - .map(t => t.txHash.toString()), - ); - } - public getMissingTxHashes(): Set { - return new Set( - this.values() - .filter(t => !t.fetched) - .map(t => t.txHash.toString()), - ); - } - - public getInFlightTxHashes(): Set { - return new Set( - this.values() - .filter(t => t.isInFlight()) - .map(t => t.txHash.toString()), - ); - } - - public getFetchedTxs(): Tx[] { - return Array.from( - this.values() - .map(t => t.tx) - .filter(t => !!t), - ); + return this.missingTxsTracker.missingTxHashes; } public getTxsPeerHas(peer: PeerId): Set { const peerIdStr = peer.toString(); const txsPeerHas = new Set(); - this.values().forEach(txMeta => { + this.txMetadata.values().forEach(txMeta => { if (txMeta.peers.has(peerIdStr)) { txsPeerHas.add(txMeta.txHash.toString()); } @@ -132,13 +83,13 @@ export class MissingTxMetadataCollection extends Map public getTxsToRequestFromThePeer(peer: PeerId): TxHash[] { const txsPeerHas = this.getTxsPeerHas(peer); - const fetchedTxs = this.getFetchedTxHashes(); + const missingTxHashes = this.getMissingTxHashes(); - const txsToRequest = txsPeerHas.difference(fetchedTxs); + const txsToRequest = txsPeerHas.intersection(missingTxHashes); if (txsToRequest.size >= this.txBatchSize) { return this.getPrioritizingNotInFlightAndLowerRequestCount(Array.from(txsToRequest)) - .map(t => t.txHash) + .map(t => TxHash.fromString(t.txHash)) .slice(0, this.txBatchSize); } @@ -150,13 +101,13 @@ export class MissingTxMetadataCollection extends Map Array.from(this.getMissingTxHashes().difference(txsToRequest)), ) .slice(0, countToFill) - .map(t => t.txHash); + .map(t => TxHash.fromString(t.txHash)); return [...Array.from(txsToRequest).map(t => TxHash.fromString(t)), ...txsToFill]; } public markRequested(txHash: TxHash) { - this.get(txHash.toString())?.markAsRequested(); + this.txMetadata.get(txHash.toString())?.markAsRequested(); } /* @@ -165,7 +116,7 @@ export class MissingTxMetadataCollection extends Map * "dumb" peer might return it, or might not - we don't know * */ public markInFlightBySmartPeer(txHash: TxHash) { - this.get(txHash.toString())?.markInFlight(); + this.txMetadata.get(txHash.toString())?.markInFlight(); } /* @@ -173,16 +124,16 @@ export class MissingTxMetadataCollection extends Map * Because the smart peer should return this tx, whereas * "dumb" peer might return it, or might not - we don't know*/ public markNotInFlightBySmartPeer(txHash: TxHash) { - this.get(txHash.toString())?.markNotInFlight(); + this.txMetadata.get(txHash.toString())?.markNotInFlight(); } public alreadyFetched(txHash: TxHash): boolean { - return this.get(txHash.toString())?.fetched ?? false; + return !this.missingTxsTracker.isMissing(txHash.toString()); } public markFetched(peerId: PeerId, tx: Tx): boolean { const txHashStr = tx.txHash.toString(); - const txMeta = this.get(txHashStr); + const txMeta = this.txMetadata.get(txHashStr); if (!txMeta) { //TODO: what to do about peer which sent txs we didn't request? // 1. don't request from it in the scope of this batch request @@ -192,7 +143,8 @@ export class MissingTxMetadataCollection extends Map return false; } - return txMeta.markAsFetched(peerId, tx); + txMeta.peers.add(peerId.toString()); + return this.missingTxsTracker.markFetched(tx); } public markPeerHas(peerId: PeerId, txHash: TxHash[]) { @@ -200,7 +152,7 @@ export class MissingTxMetadataCollection extends Map txHash .map(t => t.toString()) .forEach(txh => { - const txMeta = this.get(txh); + const txMeta = this.txMetadata.get(txh); if (txMeta) { txMeta.peers.add(peerIdStr); } diff --git a/yarn-project/p2p/src/services/tx_collection/fast_tx_collection.ts b/yarn-project/p2p/src/services/tx_collection/fast_tx_collection.ts index f9bb9f8e399c..ebd7b628f57a 100644 --- a/yarn-project/p2p/src/services/tx_collection/fast_tx_collection.ts +++ b/yarn-project/p2p/src/services/tx_collection/fast_tx_collection.ts @@ -14,6 +14,7 @@ import type { PeerId } from '@libp2p/interface'; import type { BatchTxRequesterConfig } from '../reqresp/batch-tx-requester/config.js'; import type { BatchTxRequesterLibP2PService } from '../reqresp/batch-tx-requester/interface.js'; import type { TxCollectionConfig } from './config.js'; +import { MissingTxsTracker } from './missing_txs_tracker.js'; import { BatchTxRequesterCollector, type MissingTxsCollector, @@ -83,8 +84,7 @@ export class FastTxCollection { ...input, blockInfo, promise, - foundTxs: new Map(), - missingTxHashes: new Set(txHashes.map(t => t.toString())), + missingTxTracker: MissingTxsTracker.fromArray(txHashes), deadline: opts.deadline, }; @@ -92,15 +92,15 @@ export class FastTxCollection { clearTimeout(timeoutTimer); this.log.verbose( - `Collected ${request.foundTxs.size} txs out of ${txHashes.length} for ${input.type} at slot ${blockInfo.slotNumber}`, + `Collected ${request.missingTxTracker.collectedTxs.length} txs out of ${txHashes.length} for ${input.type} at slot ${blockInfo.slotNumber}`, { ...blockInfo, duration, requestType: input.type, - missingTxs: [...request.missingTxHashes], + missingTxs: [...request.missingTxTracker.missingTxHashes], }, ); - return [...request.foundTxs.values()]; + return request.missingTxTracker.collectedTxs; } protected async collectFast( @@ -111,7 +111,7 @@ export class FastTxCollection { const { blockInfo } = request; this.log.debug( - `Starting fast collection of ${request.missingTxHashes.size} txs for ${request.type} at slot ${blockInfo.slotNumber}`, + `Starting fast collection of ${request.missingTxTracker.numberOfMissingTxs} txs for ${request.type} at slot ${blockInfo.slotNumber}`, { ...blockInfo, requestType: request.type, deadline: opts.deadline }, ); @@ -124,7 +124,7 @@ export class FastTxCollection { await Promise.race([request.promise.promise, waitBeforeReqResp]); // If we have collected all txs, we can stop here - if (request.missingTxHashes.size === 0) { + if (request.missingTxTracker.allFetched()) { this.log.debug(`All txs collected for slot ${blockInfo.slotNumber} without reqresp`, blockInfo); return; } @@ -138,7 +138,7 @@ export class FastTxCollection { const logCtx = { ...blockInfo, errorMessage: err instanceof Error ? err.message : undefined, - missingTxs: [...request.missingTxHashes].map(txHash => txHash.toString()), + missingTxs: request.missingTxTracker.missingTxHashes.values().map(txHash => txHash.toString()), }; if (err instanceof Error && err.name === 'TimeoutError') { this.log.warn(`Timed out collecting txs for ${request.type} at slot ${blockInfo.slotNumber}`, logCtx); @@ -166,7 +166,11 @@ export class FastTxCollection { } // Keep a shared priority queue of all txs pending to be requested, sorted by the number of attempts made to collect them. - const attemptsPerTx = [...request.missingTxHashes].map(txHash => ({ txHash, attempts: 0, found: false })); + const attemptsPerTx = [...request.missingTxTracker.missingTxHashes].map(txHash => ({ + txHash, + attempts: 0, + found: false, + })); // Returns once we have finished all node loops. Each loop finishes when the deadline is hit, or all txs have been collected. await Promise.allSettled(this.nodes.map(node => this.collectFastFromNode(request, node, attemptsPerTx, opts))); @@ -179,7 +183,7 @@ export class FastTxCollection { opts: { deadline: Date }, ) { const notFinished = () => - this.dateProvider.now() <= +opts.deadline && request.missingTxHashes.size > 0 && this.requests.has(request); + this.dateProvider.now() <= +opts.deadline && !request.missingTxTracker.allFetched() && this.requests.has(request); const maxParallelRequests = this.config.txCollectionFastMaxParallelRequestsPerNode; const maxBatchSize = this.config.txCollectionNodeRpcMaxBatchSize; @@ -196,7 +200,7 @@ export class FastTxCollection { if (!txToRequest) { // No more txs to process break; - } else if (!request.missingTxHashes.has(txToRequest.txHash)) { + } else if (!request.missingTxTracker.isMissing(txToRequest.txHash)) { // Mark as found if it was found somewhere else, we'll then remove it from the array. // We don't delete it now since 'array.splice' is pretty expensive, so we do it after sorting. txToRequest.found = true; @@ -225,10 +229,17 @@ export class FastTxCollection { return; } + const txHashes = batch.map(({ txHash }) => txHash); // Collect this batch from the node await this.txCollectionSink.collect( - txHashes => node.getTxsByHash(txHashes), - batch.map(({ txHash }) => TxHash.fromString(txHash)), + async () => { + const result = await node.getTxsByHash(txHashes.map(TxHash.fromString)); + for (const tx of result.validTxs) { + request.missingTxTracker.markFetched(tx); + } + return result; + }, + txHashes, { description: `fast ${node.getInfo()}`, node: node.getInfo(), @@ -268,32 +279,44 @@ export class FastTxCollection { } this.log.debug( - `Starting fast reqresp for ${request.missingTxHashes.size} txs for ${request.type} at slot ${blockInfo.slotNumber}`, + `Starting fast reqresp for ${request.missingTxTracker.numberOfMissingTxs} txs for ${request.type} at slot ${blockInfo.slotNumber}`, { ...blockInfo, timeoutMs, pinnedPeer }, ); try { await this.txCollectionSink.collect( - async txHashes => { + async () => { + let result: Tx[]; if (request.type === 'proposal') { - return await this.missingTxsCollector.collectTxs(txHashes, request.blockProposal, pinnedPeer, timeoutMs); + result = await this.missingTxsCollector.collectTxs( + request.missingTxTracker, + request.blockProposal, + pinnedPeer, + timeoutMs, + ); } else if (request.type === 'block') { const blockTxsSource = { txHashes: request.block.body.txEffects.map(e => e.txHash), archive: request.block.archive.root, }; - return await this.missingTxsCollector.collectTxs(txHashes, blockTxsSource, pinnedPeer, timeoutMs); + result = await this.missingTxsCollector.collectTxs( + request.missingTxTracker, + blockTxsSource, + pinnedPeer, + timeoutMs, + ); } else { throw new Error(`Unknown request type: ${(request as any).type}`); } + return { validTxs: result, invalidTxHashes: [] }; }, - Array.from(request.missingTxHashes).map(txHash => TxHash.fromString(txHash)), + Array.from(request.missingTxTracker.missingTxHashes), { description: `reqresp for slot ${slotNumber}`, method: 'fast-req-resp', ...opts, ...request.blockInfo }, this.getAddContext(request), ); } catch (err) { this.log.error(`Error sending fast reqresp request for txs`, err, { - txs: [...request.missingTxHashes], + txs: [...request.missingTxTracker.missingTxHashes], ...blockInfo, }); } @@ -317,22 +340,20 @@ export class FastTxCollection { for (const tx of txs) { const txHash = tx.txHash.toString(); // Remove the tx hash from the missing set, and add it to the found set. - if (request.missingTxHashes.has(txHash)) { - request.missingTxHashes.delete(txHash); - request.foundTxs.set(txHash, tx); + if (request.missingTxTracker.markFetched(tx)) { this.log.trace(`Found tx ${txHash} for fast collection request`, { ...request.blockInfo, txHash: tx.txHash.toString(), type: request.type, }); - // If we found all txs for this request, we resolve the promise - if (request.missingTxHashes.size === 0) { - this.log.trace(`All txs found for fast collection request`, { - ...request.blockInfo, - type: request.type, - }); - request.promise.resolve(); - } + } + // If we found all txs for this request, we resolve the promise + if (request.missingTxTracker.allFetched()) { + this.log.trace(`All txs found for fast collection request`, { + ...request.blockInfo, + type: request.type, + }); + request.promise.resolve(); } } } diff --git a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts index 578a95dc0af9..eacef127cfb2 100644 --- a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts +++ b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.test.ts @@ -30,7 +30,7 @@ describe('FileStoreTxCollection', () => { const makeFileStoreSource = (name: string) => { const source = mock(); source.getInfo.mockReturnValue(name); - source.getTxsByHash.mockResolvedValue([]); + source.getTxsByHash.mockResolvedValue({ validTxs: [], invalidTxHashes: [] }); return source; }; @@ -41,9 +41,12 @@ describe('FileStoreTxCollection', () => { }; const setFileStoreTxs = (source: MockProxy, txs: Tx[]) => { - source.getTxsByHash.mockImplementation(hashes => - Promise.resolve(hashes.map(h => txs.find(tx => tx.getTxHash().equals(h)))), - ); + source.getTxsByHash.mockImplementation(hashes => { + return Promise.resolve({ + validTxs: hashes.map(h => txs.find(tx => tx.getTxHash().equals(h))).filter(tx => tx !== undefined), + invalidTxHashes: [], + }); + }); }; /** Waits for the sink to emit txs-added events for the expected number of txs. */ diff --git a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts index baa5879ea46e..165ba3d9928a 100644 --- a/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts +++ b/yarn-project/p2p/src/services/tx_collection/file_store_tx_collection.ts @@ -126,9 +126,13 @@ export class FileStoreTxCollection { try { const result = await this.txCollectionSink.collect( - hashes => source.getTxsByHash(hashes), - [TxHash.fromString(entry.txHash)], - { description: `file-store ${source.getInfo()}`, method: 'file-store', fileStore: source.getInfo() }, + () => source.getTxsByHash([TxHash.fromString(entry.txHash)]), + [entry.txHash], + { + description: `file-store ${source.getInfo()}`, + method: 'file-store', + fileStore: source.getInfo(), + }, entry.context, ); if (result.txs.length > 0) { diff --git a/yarn-project/p2p/src/services/tx_collection/file_store_tx_source.ts b/yarn-project/p2p/src/services/tx_collection/file_store_tx_source.ts index d9762a955d43..d682e303a0cf 100644 --- a/yarn-project/p2p/src/services/tx_collection/file_store_tx_source.ts +++ b/yarn-project/p2p/src/services/tx_collection/file_store_tx_source.ts @@ -10,7 +10,7 @@ import { getTelemetryClient, } from '@aztec/telemetry-client'; -import type { TxSource } from './tx_source.js'; +import type { TxSource, TxSourceCollectionResult } from './tx_source.js'; /** TxSource implementation that downloads txs from a file store. */ export class FileStoreTxSource implements TxSource { @@ -64,24 +64,37 @@ export class FileStoreTxSource implements TxSource { return `file-store:${this.baseUrl}`; } - public getTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]> { - return Promise.all( - txHashes.map(async txHash => { - const path = `${this.basePath}/txs/${txHash.toString()}.bin`; - const timer = new Timer(); - try { - const buffer = await this.fileStore.read(path); - this.downloadsSuccess.add(1); - this.downloadDuration.record(Math.ceil(timer.ms())); - this.downloadSize.record(buffer.length); - return Tx.fromBuffer(buffer); - } catch { - this.downloadsFailed.add(1); - // Tx not found or error reading - return undefined - return undefined; - } - }), - ); + public async getTxsByHash(txHashes: TxHash[]): Promise { + const invalidTxHashes: string[] = []; + return { + validTxs: ( + await Promise.all( + txHashes.map(async txHash => { + const path = `${this.basePath}/txs/${txHash.toString()}.bin`; + const timer = new Timer(); + try { + const buffer = await this.fileStore.read(path); + const tx = Tx.fromBuffer(buffer); + if ((await tx.validateTxHash()) && txHash.equals(tx.txHash)) { + this.downloadsSuccess.add(1); + this.downloadDuration.record(Math.ceil(timer.ms())); + this.downloadSize.record(buffer.length); + return tx; + } else { + invalidTxHashes.push(tx.txHash.toString()); + this.downloadsFailed.add(1); + return undefined; + } + } catch { + // Tx not found or error reading - return undefined + this.downloadsFailed.add(1); + return undefined; + } + }), + ) + ).filter(tx => tx !== undefined), + invalidTxHashes: invalidTxHashes, + }; } } diff --git a/yarn-project/p2p/src/services/tx_collection/missing_txs_tracker.ts b/yarn-project/p2p/src/services/tx_collection/missing_txs_tracker.ts new file mode 100644 index 000000000000..d206a2fdccd9 --- /dev/null +++ b/yarn-project/p2p/src/services/tx_collection/missing_txs_tracker.ts @@ -0,0 +1,52 @@ +import { TxHash } from '@aztec/stdlib/tx'; +import type { Tx } from '@aztec/stdlib/tx'; + +/** + * Tracks which transactions are still missing and need to be fetched. + * Allows external code to mark transactions as fetched, enabling coordination + * between multiple fetching mechanisms (e.g., BatchTxRequester and Rpc Node requests). + */ +export interface IMissingTxsTracker { + /** Returns the set of transaction hashes that are still missing. */ + get missingTxHashes(): Set; + /** Size of this.missingTxHashes */ + get numberOfMissingTxs(): number; + /** Are all requested txs are fetched */ + allFetched(): boolean; + /** Checks that transaction is still missing */ + isMissing(txHash: string): boolean; + /** Marks a transaction as fetched. Returns true if it was previously missing. */ + markFetched(tx: Tx): boolean; + /** Get list of collected txs */ + get collectedTxs(): Tx[]; +} + +export class MissingTxsTracker implements IMissingTxsTracker { + public readonly collectedTxs: Tx[] = []; + + private constructor(public readonly missingTxHashes: Set) {} + + public static fromArray(hashes: TxHash[] | string[]) { + return new MissingTxsTracker(new Set(hashes.map(hash => hash.toString()))); + } + + markFetched(tx: Tx): boolean { + if (this.missingTxHashes.delete(tx.txHash.toString())) { + this.collectedTxs.push(tx); + return true; + } + return false; + } + + get numberOfMissingTxs(): number { + return this.missingTxHashes.size; + } + + allFetched(): boolean { + return this.numberOfMissingTxs === 0; + } + + isMissing(txHash: string): boolean { + return this.missingTxHashes.has(txHash.toString()); + } +} diff --git a/yarn-project/p2p/src/services/tx_collection/proposal_tx_collector.ts b/yarn-project/p2p/src/services/tx_collection/proposal_tx_collector.ts index bf777a37fdb8..5955631c4247 100644 --- a/yarn-project/p2p/src/services/tx_collection/proposal_tx_collector.ts +++ b/yarn-project/p2p/src/services/tx_collection/proposal_tx_collector.ts @@ -1,6 +1,6 @@ import type { Logger } from '@aztec/foundation/log'; import type { DateProvider } from '@aztec/foundation/timer'; -import type { Tx, TxHash } from '@aztec/stdlib/tx'; +import { type Tx, TxHash } from '@aztec/stdlib/tx'; import type { PeerId } from '@libp2p/interface'; @@ -9,6 +9,7 @@ import type { BatchTxRequesterConfig } from '../reqresp/batch-tx-requester/confi import type { BatchTxRequesterLibP2PService } from '../reqresp/batch-tx-requester/interface.js'; import type { IBatchRequestTxValidator } from '../reqresp/batch-tx-requester/tx_validator.js'; import { type BlockTxsSource, ReqRespSubProtocol, chunkTxHashesRequest } from '../reqresp/index.js'; +import type { IMissingTxsTracker } from './missing_txs_tracker.js'; /** * Strategy interface for collecting missing transactions for a block or proposal. @@ -17,14 +18,14 @@ import { type BlockTxsSource, ReqRespSubProtocol, chunkTxHashesRequest } from '. export interface MissingTxsCollector { /** * Collect missing transactions for a block or proposal. - * @param txHashes - The transaction hashes to collect + * @param missingTxsTracker - The missing transactions tracker * @param blockTxsSource - The block or proposal containing the transactions * @param pinnedPeer - Optional peer expected to have the transactions * @param timeoutMs - Timeout in milliseconds * @returns The collected transactions */ collectTxs( - txHashes: TxHash[], + missingTxsTracker: IMissingTxsTracker, blockTxsSource: BlockTxsSource, pinnedPeer: PeerId | undefined, timeoutMs: number, @@ -45,7 +46,7 @@ export class BatchTxRequesterCollector implements MissingTxsCollector { ) {} async collectTxs( - txHashes: TxHash[], + missingTxsTracker: IMissingTxsTracker, blockTxsSource: BlockTxsSource, pinnedPeer: PeerId | undefined, timeoutMs: number, @@ -58,7 +59,7 @@ export class BatchTxRequesterCollector implements MissingTxsCollector { } = this.batchTxRequesterConfig ?? {}; const batchRequester = new BatchTxRequester( - txHashes, + missingTxsTracker, blockTxsSource, pinnedPeer, timeoutMs, @@ -93,14 +94,14 @@ export class SendBatchRequestCollector implements MissingTxsCollector { ) {} async collectTxs( - txHashes: TxHash[], + missingTxsTracker: IMissingTxsTracker, _blockTxsSource: BlockTxsSource, pinnedPeer: PeerId | undefined, timeoutMs: number, ): Promise { const txs = await this.p2pService.reqResp.sendBatchRequest( ReqRespSubProtocol.TX, - chunkTxHashesRequest(txHashes), + chunkTxHashesRequest(Array.from(missingTxsTracker.missingTxHashes).map(TxHash.fromString)), pinnedPeer, timeoutMs, this.maxPeers, diff --git a/yarn-project/p2p/src/services/tx_collection/slow_tx_collection.ts b/yarn-project/p2p/src/services/tx_collection/slow_tx_collection.ts index 6d264c0a8b5f..12f75ff61514 100644 --- a/yarn-project/p2p/src/services/tx_collection/slow_tx_collection.ts +++ b/yarn-project/p2p/src/services/tx_collection/slow_tx_collection.ts @@ -8,8 +8,7 @@ import type { L2Block } from '@aztec/stdlib/block'; import { type L1RollupConstants, getEpochAtSlot, getTimestampRangeForEpoch } from '@aztec/stdlib/epoch-helpers'; import { type Tx, TxHash } from '@aztec/stdlib/tx'; -import { type ReqRespInterface, ReqRespSubProtocol } from '../reqresp/interface.js'; -import { chunkTxHashesRequest } from '../reqresp/protocols/tx.js'; +import { type ReqRespInterface, ReqRespSubProtocol, chunkTxHashesRequest } from '../reqresp/index.js'; import type { TxCollectionConfig } from './config.js'; import type { FastTxCollection } from './fast_tx_collection.js'; import type { MissingTxInfo } from './tx_collection.js'; @@ -120,8 +119,8 @@ export class SlowTxCollection { const txHashes = entries.map(([txHash]) => TxHash.fromString(txHash)); for (const batch of chunk(txHashes, this.config.txCollectionNodeRpcMaxBatchSize)) { await this.txCollectionSink.collect( - hashes => node.getTxsByHash(hashes), - batch, + () => node.getTxsByHash(batch), + batch.map(h => h.toString()), { description: `node ${node.getInfo()}`, node: node.getInfo(), @@ -166,18 +165,18 @@ export class SlowTxCollection { const txHashes = entries.map(([txHash]) => TxHash.fromString(txHash)); const maxPeers = boundInclusive(Math.ceil(txHashes.length / 3), 4, 16); await this.txCollectionSink.collect( - async hashes => { + async () => { const txs = await this.reqResp.sendBatchRequest( ReqRespSubProtocol.TX, - chunkTxHashesRequest(hashes), + chunkTxHashesRequest(txHashes), pinnedPeer, timeoutMs, maxPeers, maxRetryAttempts, ); - return txs.flat(); + return { validTxs: txs.flat(), invalidTxHashes: [] }; }, - txHashes, + txHashes.map(h => h.toString()), { description: 'slow reqresp', timeoutMs, method: 'slow-req-resp' }, { type: 'mined', block }, ); @@ -197,7 +196,7 @@ export class SlowTxCollection { // from mined unproven blocks it has seen in the past. const fastRequests = this.fastCollection.getFastCollectionRequests(); const fastCollectionTxs: Set = new Set( - ...Array.from(fastRequests.values()).flatMap(r => r.missingTxHashes), + fastRequests.values().flatMap(r => Array.from(r.missingTxTracker.missingTxHashes)), ); // Return all missing txs that are not in fastCollectionTxs and are ready for reqresp if requested diff --git a/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts b/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts index 8cb04633dbb5..5d80a38c6836 100644 --- a/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts +++ b/yarn-project/p2p/src/services/tx_collection/tx_collection.test.ts @@ -46,14 +46,14 @@ describe('TxCollection', () => { const makeNode = (name: string) => { const node = mock(); node.getInfo.mockReturnValue(name); - node.getTxsByHash.mockResolvedValue([]); + node.getTxsByHash.mockResolvedValue({ validTxs: [], invalidTxHashes: [] }); return node; }; const makeFileStoreSource = (name: string) => { const source = mock(); source.getInfo.mockReturnValue(name); - source.getTxsByHash.mockResolvedValue([]); + source.getTxsByHash.mockResolvedValue({ validTxs: [], invalidTxHashes: [] }); return source; }; @@ -76,7 +76,10 @@ describe('TxCollection', () => { const setNodeTxs = (node: MockProxy, txs: Tx[]) => { node.getTxsByHash.mockImplementation(async hashes => { await sleep(1); - return hashes.map(h => txs.find(tx => tx.txHash.equals(h))); + return { + validTxs: hashes.map(h => txs.find(tx => tx.txHash.equals(h))).filter(tx => tx !== undefined), + invalidTxHashes: [], + }; }); }; @@ -513,7 +516,10 @@ describe('TxCollection', () => { const setFileStoreTxs = (source: MockProxy, txsToReturn: Tx[]) => { source.getTxsByHash.mockImplementation(hashes => { - return Promise.resolve(hashes.map(h => txsToReturn.find(tx => tx.txHash.equals(h)))); + return Promise.resolve({ + validTxs: hashes.map(h => txsToReturn.find(tx => tx.txHash.equals(h))).filter(tx => tx !== undefined), + invalidTxHashes: [], + }); }); }; diff --git a/yarn-project/p2p/src/services/tx_collection/tx_collection.ts b/yarn-project/p2p/src/services/tx_collection/tx_collection.ts index eae579e2a555..9797b6bd34a5 100644 --- a/yarn-project/p2p/src/services/tx_collection/tx_collection.ts +++ b/yarn-project/p2p/src/services/tx_collection/tx_collection.ts @@ -7,7 +7,8 @@ import { DateProvider } from '@aztec/foundation/timer'; import type { L2Block, L2BlockInfo } from '@aztec/stdlib/block'; import type { L1RollupConstants } from '@aztec/stdlib/epoch-helpers'; import type { BlockProposal } from '@aztec/stdlib/p2p'; -import { Tx, TxHash } from '@aztec/stdlib/tx'; +import type { Tx } from '@aztec/stdlib/tx'; +import { TxHash } from '@aztec/stdlib/tx'; import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client'; import type { PeerId } from '@libp2p/interface'; @@ -18,6 +19,7 @@ import type { TxCollectionConfig } from './config.js'; import { FastTxCollection } from './fast_tx_collection.js'; import { FileStoreTxCollection } from './file_store_tx_collection.js'; import type { FileStoreTxSource } from './file_store_tx_source.js'; +import type { IMissingTxsTracker } from './missing_txs_tracker.js'; import { SlowTxCollection, getProofDeadlineForSlot } from './slow_tx_collection.js'; import { type TxAddContext, TxCollectionSink } from './tx_collection_sink.js'; import type { TxSource } from './tx_source.js'; @@ -31,11 +33,10 @@ export type FastCollectionRequestInput = | { type: 'proposal'; blockProposal: BlockProposal; blockNumber: BlockNumber }; export type FastCollectionRequest = FastCollectionRequestInput & { - missingTxHashes: Set; + missingTxTracker: IMissingTxsTracker; deadline: Date; blockInfo: L2BlockInfo; promise: PromiseWithResolvers; - foundTxs: Map; }; /** diff --git a/yarn-project/p2p/src/services/tx_collection/tx_collection_sink.ts b/yarn-project/p2p/src/services/tx_collection/tx_collection_sink.ts index 406348907331..7848111d16b0 100644 --- a/yarn-project/p2p/src/services/tx_collection/tx_collection_sink.ts +++ b/yarn-project/p2p/src/services/tx_collection/tx_collection_sink.ts @@ -2,14 +2,15 @@ import type { Logger } from '@aztec/foundation/log'; import { elapsed } from '@aztec/foundation/timer'; import type { TypedEventEmitter } from '@aztec/foundation/types'; import type { L2Block } from '@aztec/stdlib/block'; -import type { BlockHeader, Tx, TxHash } from '@aztec/stdlib/tx'; +import type { BlockHeader, Tx } from '@aztec/stdlib/tx'; import type { TelemetryClient } from '@aztec/telemetry-client'; import EventEmitter from 'node:events'; -import type { TxPoolV2, TxPoolV2Events } from '../../mem_pools/tx_pool_v2/interfaces.js'; +import type { TxPoolV2, TxPoolV2Events } from '../../mem_pools/index.js'; import { TxCollectionInstrumentation } from './instrumentation.js'; import type { CollectionMethod } from './tx_collection.js'; +import type { TxSourceCollectionResult } from './tx_source.js'; /** Context determining how collected txs should be added to the pool. */ export type TxAddContext = { type: 'proposal'; blockHeader: BlockHeader } | { type: 'mined'; block: L2Block }; @@ -31,52 +32,37 @@ export class TxCollectionSink extends (EventEmitter as new () => TypedEventEmitt } public async collect( - collectValidTxsFn: (txHashes: TxHash[]) => Promise<(Tx | undefined)[]>, - requested: TxHash[], + collectValidTxsFn: () => Promise, + requested: string[], info: Record & { description: string; method: CollectionMethod }, context: TxAddContext, ) { this.log.trace(`Requesting ${requested.length} txs via ${info.description}`, { ...info, - requestedTxs: requested.map(t => t.toString()), + requestedTxs: requested, }); // Execute collection function and measure the time taken, catching any errors. - const [duration, txs] = await elapsed(async () => { + const [duration, { validTxs, invalidTxHashes }] = await elapsed(async () => { try { - const response = await collectValidTxsFn(requested); - return response.filter(tx => tx !== undefined); + return await collectValidTxsFn(); } catch (err) { this.log.error(`Error collecting txs via ${info.description}`, err, { ...info, - requestedTxs: requested.map(hash => hash.toString()), + requestedTxs: requested, }); - return [] as Tx[]; + return { validTxs: [] as Tx[], invalidTxHashes: [] as string[] }; } }); - if (txs.length === 0) { + if (validTxs.length === 0 && invalidTxHashes.length === 0) { this.log.trace(`No txs found via ${info.description}`, { ...info, - requestedTxs: requested.map(t => t.toString()), + requestedTxs: requested, }); - return { txs, requested, duration }; + return { txs: validTxs, requested, duration }; } - // Validate tx hashes for all collected txs from external sources - const validTxs: Tx[] = []; - const invalidTxHashes: string[] = []; - await Promise.all( - txs.map(async tx => { - const isValid = await tx.validateTxHash(); - if (isValid) { - validTxs.push(tx); - } else { - invalidTxHashes.push(tx.getTxHash().toString()); - } - }), - ); - if (invalidTxHashes.length > 0) { this.log.warn(`Rejecting ${invalidTxHashes.length} txs with invalid hashes from ${info.description}`, { ...info, @@ -87,7 +73,7 @@ export class TxCollectionSink extends (EventEmitter as new () => TypedEventEmitt if (validTxs.length === 0) { this.log.trace(`No valid txs found via ${info.description} after validation`, { ...info, - requestedTxs: requested.map(t => t.toString()), + requestedTxs: requested, invalidTxHashes, }); return { txs: [], requested, duration }; @@ -99,7 +85,7 @@ export class TxCollectionSink extends (EventEmitter as new () => TypedEventEmitt ...info, duration, txs: validTxs.map(t => t.getTxHash().toString()), - requestedTxs: requested.map(t => t.toString()), + requestedTxs: requested, rejectedCount: invalidTxHashes.length, }, ); diff --git a/yarn-project/p2p/src/services/tx_collection/tx_source.ts b/yarn-project/p2p/src/services/tx_collection/tx_source.ts index e9b6131b9ffd..68e2ad8c1c43 100644 --- a/yarn-project/p2p/src/services/tx_collection/tx_source.ts +++ b/yarn-project/p2p/src/services/tx_collection/tx_source.ts @@ -6,9 +6,11 @@ import type { Tx, TxHash } from '@aztec/stdlib/tx'; import { type ComponentsVersions, getComponentsVersionsFromConfig } from '@aztec/stdlib/versioning'; import { makeTracedFetch } from '@aztec/telemetry-client'; +export type TxSourceCollectionResult = { validTxs: Tx[]; invalidTxHashes: string[] }; + export interface TxSource { getInfo(): string; - getTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]>; + getTxsByHash(txHashes: TxHash[]): Promise; } export class NodeRpcTxSource implements TxSource { @@ -26,8 +28,25 @@ export class NodeRpcTxSource implements TxSource { return this.info; } - public getTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]> { - return this.client.getTxsByHash(txHashes); + public async getTxsByHash(txHashes: TxHash[]): Promise { + return this.verifyTxs(await this.client.getTxsByHash(txHashes)); + } + + private async verifyTxs(txs: Tx[]): Promise { + // Validate tx hashes for all collected txs from external sources + const validTxs: Tx[] = []; + const invalidTxHashes: string[] = []; + await Promise.all( + txs.map(async tx => { + const isValid = await tx.validateTxHash(); + if (isValid) { + validTxs.push(tx); + } else { + invalidTxHashes.push(tx.getTxHash().toString()); + } + }), + ); + return { validTxs: validTxs, invalidTxHashes: invalidTxHashes }; } } diff --git a/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts b/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts index 1a0ab6b19fde..893a0b54aa9d 100644 --- a/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts +++ b/yarn-project/p2p/src/services/tx_file_store/tx_file_store.test.ts @@ -309,6 +309,54 @@ describe('TxFileStore', () => { }, 10000); }); + describe('tx download validation', () => { + it('rejects tx with invalid hash when reading from file store', async () => { + // Write a tx with a mismatched hash directly to the file store + const invalidTx = Tx.random(); // random hash does not match computed hash + await fileStore.save(`${basePath}/txs/${invalidTx.txHash.toString()}.bin`, invalidTx.toBuffer(), { + compress: false, + }); + + // Read it back via FileStoreTxSource + const source = (await FileStoreTxSource.create(`file://${tmpDir}`, basePath, log))!; + const result = await source.getTxsByHash([invalidTx.txHash]); + + expect(result.validTxs).toHaveLength(0); + expect(result.invalidTxHashes).toEqual([invalidTx.txHash.toString()]); + }); + + it('rejects tx when tx with wrong hash is returned', async () => { + // Write a tx with a mismatched hash directly to the file store + const invalidTx = Tx.random(); // random hash does not match computed hash + const validTx = await makeTx(); + await fileStore.save(`${basePath}/txs/${invalidTx.txHash.toString()}.bin`, validTx.toBuffer(), { + compress: false, + }); + + // Read it back via FileStoreTxSource + const source = (await FileStoreTxSource.create(`file://${tmpDir}`, basePath, log))!; + const result = await source.getTxsByHash([invalidTx.txHash]); + + expect(result.validTxs).toHaveLength(0); + expect(result.invalidTxHashes).toEqual([validTx.txHash.toString()]); + }); + + it('accepts correct tx', async () => { + // Write a tx with a correct hash directly to the file store + const validTx = await makeTx(); + await fileStore.save(`${basePath}/txs/${validTx.txHash.toString()}.bin`, validTx.toBuffer(), { + compress: false, + }); + + // Read it back via FileStoreTxSource + const source = (await FileStoreTxSource.create(`file://${tmpDir}`, basePath, log))!; + const result = await source.getTxsByHash([validTx.txHash]); + + expect(result.validTxs).toHaveLength(1); + expect(result.invalidTxHashes).toHaveLength(0); + }); + }); + describe('getPendingUploadCount', () => { it('returns correct count of pending uploads', async () => { txFileStore = await TxFileStore.create(txPool, config, basePath, log, undefined, fileStore); @@ -344,9 +392,9 @@ describe('TxFileStore', () => { expect(txSource).toBeDefined(); const results = await txSource!.getTxsByHash([tx.getTxHash()]); - expect(results).toHaveLength(1); - expect(results[0]).toBeDefined(); - expect(results[0]!.toBuffer()).toEqual(tx.toBuffer()); + expect(results.validTxs).toHaveLength(1); + expect(results.validTxs[0]).toBeDefined(); + expect(results.validTxs[0]!.toBuffer()).toEqual(tx.toBuffer()); }); }); }); diff --git a/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts b/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts index 4f563c4e8672..a44fa2808dea 100644 --- a/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts +++ b/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts @@ -29,22 +29,19 @@ import type { Message, PeerId } from '@libp2p/interface'; import { TopicValidatorResult } from '@libp2p/interface'; import { peerIdFromString } from '@libp2p/peer-id'; -import type { P2PClient } from '../client/p2p_client.js'; +import type { P2PClient } from '../client/index.js'; import type { P2PConfig } from '../config.js'; import { createP2PClient } from '../index.js'; -import type { MemPools } from '../mem_pools/interface.js'; -import { LibP2PService } from '../services/libp2p/libp2p_service.js'; +import type { MemPools } from '../mem_pools/index.js'; +import { BatchTxRequesterCollector, LibP2PService, SendBatchRequestCollector } from '../services/index.js'; import type { PeerManager } from '../services/peer-manager/peer_manager.js'; import type { BatchTxRequesterLibP2PService } from '../services/reqresp/batch-tx-requester/interface.js'; import type { IBatchRequestTxValidator } from '../services/reqresp/batch-tx-requester/tx_validator.js'; import { RateLimitStatus } from '../services/reqresp/rate-limiter/rate_limiter.js'; import type { ReqResp } from '../services/reqresp/reqresp.js'; import type { PeerDiscoveryService } from '../services/service.js'; -import { - BatchTxRequesterCollector, - SendBatchRequestCollector, -} from '../services/tx_collection/proposal_tx_collector.js'; -import { AlwaysTrueCircuitVerifier } from '../test-helpers/reqresp-nodes.js'; +import { MissingTxsTracker } from '../services/tx_collection/missing_txs_tracker.js'; +import { AlwaysTrueCircuitVerifier } from '../test-helpers/index.js'; import { BENCHMARK_CONSTANTS, type CollectorType, @@ -55,7 +52,7 @@ import { createMockEpochCache, createMockWorldStateSynchronizer, filterTxsByDistribution, -} from '../test-helpers/testbench-utils.js'; +} from '../test-helpers/index.js'; import type { PubSubLibp2p } from '../util.js'; export type { DistributionPattern, CollectorType } from '../test-helpers/testbench-utils.js'; @@ -277,7 +274,12 @@ async function runAggregatorBenchmark( new DateProvider(), noopTxValidator, ); - const fetchedTxs = await collector.collectTxs(txHashes, blockProposal, pinnedPeer, timeoutMs); + const fetchedTxs = await collector.collectTxs( + MissingTxsTracker.fromArray(txHashes), + blockProposal, + pinnedPeer, + timeoutMs, + ); const durationMs = timer.ms(); return { type: 'BENCH_RESULT', @@ -292,7 +294,12 @@ async function runAggregatorBenchmark( BENCHMARK_CONSTANTS.FIXED_MAX_PEERS, BENCHMARK_CONSTANTS.FIXED_MAX_RETRY_ATTEMPTS, ); - const fetchedTxs = await collector.collectTxs(txHashes, blockProposal, pinnedPeer, timeoutMs); + const fetchedTxs = await collector.collectTxs( + MissingTxsTracker.fromArray(txHashes), + blockProposal, + pinnedPeer, + timeoutMs, + ); const durationMs = timer.ms(); return { type: 'BENCH_RESULT',