diff --git a/yarn-project/p2p/src/client/p2p_client.test.ts b/yarn-project/p2p/src/client/p2p_client.test.ts index f4ed001ffd89..cd1736de1293 100644 --- a/yarn-project/p2p/src/client/p2p_client.test.ts +++ b/yarn-project/p2p/src/client/p2p_client.test.ts @@ -134,6 +134,60 @@ describe('In-Memory P2P Client', () => { await client.stop(); }); + it('request transactions handles missing items', async () => { + // Mock a batch response that returns undefined items + const mockTx1 = await mockTx(); + const mockTx2 = await mockTx(); + p2pService.sendBatchRequest.mockResolvedValue([mockTx1, undefined, mockTx2]); + + // Spy on the tx pool addTxs method, it should not be called for the missing tx + const addTxsSpy = jest.spyOn(txPool, 'addTxs'); + + await client.start(); + + const missingTxHash = (await mockTx()).getTxHash(); + const query = await Promise.all([mockTx1.getTxHash(), missingTxHash, mockTx2.getTxHash()]); + const results = await client.requestTxsByHash(query); + + expect(results).toEqual([mockTx1, undefined, mockTx2]); + + expect(addTxsSpy).toHaveBeenCalledTimes(1); + expect(addTxsSpy).toHaveBeenCalledWith([mockTx1, mockTx2]); + }); + + it('getTxsByHash handles missing items', async () => { + // We expect the node to fetch this item from their local p2p pool + const txInMempool = await mockTx(); + // We expect this transaction to be requested from the network + const txToBeRequested = await mockTx(); + // We expect this transaction to not be found + const txToNotBeFound = await mockTx(); + + txPool.getTxByHash.mockImplementation(async txHash => { + if (txHash === (await txInMempool.getTxHash())) { + return txInMempool; + } + return undefined; + }); + + const addTxsSpy = jest.spyOn(txPool, 'addTxs'); + const requestTxsSpy = jest.spyOn(client, 'requestTxsByHash'); + + p2pService.sendBatchRequest.mockResolvedValue([txToBeRequested, undefined]); + + await client.start(); + + const query = await Promise.all([txInMempool.getTxHash(), txToBeRequested.getTxHash(), txToNotBeFound.getTxHash()]); + const results = await client.getTxsByHash(query); + + // We should return the resolved transactions + expect(results).toEqual([txInMempool, txToBeRequested]); + // We should add the found requested transactions to the pool + expect(addTxsSpy).toHaveBeenCalledWith([txToBeRequested]); + // We should request the missing transactions from the network, but only find one of them + expect(requestTxsSpy).toHaveBeenCalledWith([await txToBeRequested.getTxHash(), await txToNotBeFound.getTxHash()]); + }); + describe('Chain prunes', () => { it('moves the tips on a chain reorg', async () => { blockSource.setProvenBlockNumber(0); diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index bb117fb35315..7687b9fb1366 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -445,12 +445,17 @@ export class P2PClient /** * Uses the batched Request Response protocol to request a set of transactions from the network. */ - public async requestTxsByHash(txHashes: TxHash[]): Promise { - const txs = (await this.p2pService.sendBatchRequest(ReqRespSubProtocol.TX, txHashes)) ?? []; - await this.txPool.addTxs(txs); + public async requestTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]> { + const txs = await this.p2pService.sendBatchRequest(ReqRespSubProtocol.TX, txHashes); + + // Some transactions may return undefined, so we filter them out + const filteredTxs = txs.filter((tx): tx is Tx => !!tx); + await this.txPool.addTxs(filteredTxs); const txHashesStr = txHashes.map(tx => tx.toString()).join(', '); this.log.debug(`Received batched txs ${txHashesStr} (${txs.length} / ${txHashes.length}}) from peers`); - return txs as Tx[]; + + // We return all transactions, even the not found ones to the caller, such they can handle missing items themselves. + return txs; } public getPendingTxs(): Promise { @@ -533,7 +538,8 @@ export class P2PClient } const missingTxs = await this.requestTxsByHash(missingTxHashes); - return txs.filter((tx): tx is Tx => !!tx).concat(missingTxs); + const fetchedMissingTxs = missingTxs.filter((tx): tx is Tx => !!tx); + return txs.filter((tx): tx is Tx => !!tx).concat(fetchedMissingTxs); } /** diff --git a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts index 888260fa36f3..0c22d0a5f0fe 100644 --- a/yarn-project/p2p/src/services/libp2p/libp2p_service.ts +++ b/yarn-project/p2p/src/services/libp2p/libp2p_service.ts @@ -434,7 +434,7 @@ export class LibP2PService extends sendBatchRequest( protocol: SubProtocol, requests: InstanceType[], - ): Promise[] | undefined> { + ): Promise<(InstanceType | undefined)[]> { return this.reqresp.sendBatchRequest(protocol, requests); } diff --git a/yarn-project/p2p/src/services/reqresp/reqresp.ts b/yarn-project/p2p/src/services/reqresp/reqresp.ts index fabb9172b31b..a3278c5411ee 100644 --- a/yarn-project/p2p/src/services/reqresp/reqresp.ts +++ b/yarn-project/p2p/src/services/reqresp/reqresp.ts @@ -264,9 +264,9 @@ export class ReqResp { timeoutMs = 10000, maxPeers = Math.min(10, requests.length), maxRetryAttempts = 3, - ): Promise[]> { + ): Promise<(InstanceType | undefined)[]> { const responseValidator = this.subProtocolValidators[subProtocol]; - const responses: InstanceType[] = new Array(requests.length); + const responses: (InstanceType | undefined)[] = new Array(requests.length); const requestBuffers = requests.map(req => req.toBuffer()); const requestFunction = async () => { @@ -378,7 +378,7 @@ export class ReqResp { }; try { - return await executeTimeout[]>( + return await executeTimeout<(InstanceType | undefined)[]>( requestFunction, timeoutMs, () => new CollectiveReqRespTimeoutError(), diff --git a/yarn-project/p2p/src/services/service.ts b/yarn-project/p2p/src/services/service.ts index 334595373a34..d0d1b73bcb06 100644 --- a/yarn-project/p2p/src/services/service.ts +++ b/yarn-project/p2p/src/services/service.ts @@ -56,7 +56,7 @@ export interface P2PService { sendBatchRequest( protocol: Protocol, requests: InstanceType[], - ): Promise[] | undefined>; + ): Promise<(InstanceType | undefined)[]>; // Leaky abstraction: fix https://github.com/AztecProtocol/aztec-packages/issues/7963 registerBlockReceivedCallback(callback: (block: BlockProposal) => Promise): void; diff --git a/yarn-project/stdlib/src/interfaces/prover-coordination.ts b/yarn-project/stdlib/src/interfaces/prover-coordination.ts index 6ce99c0f3dcb..60fc8f392e60 100644 --- a/yarn-project/stdlib/src/interfaces/prover-coordination.ts +++ b/yarn-project/stdlib/src/interfaces/prover-coordination.ts @@ -16,7 +16,7 @@ export interface ProverCoordination { /** * Returns a set of transactions given their hashes if available. * @param txHashes - The hashes of the transactions, used as an ID. - * @returns The transactions, if found, 'undefined' otherwise. + * @returns The transactions found, no necessarily in the same order as the hashes. */ getTxsByHash(txHashes: TxHash[]): Promise; }