Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions yarn-project/p2p/src/client/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,6 @@ export type P2P<T extends P2PClientType = P2PClientType.Full> = P2PApiFull<T> &
*/
hasTxsInPool(txHashes: TxHash[]): Promise<boolean[]>;

/**
* Returns transactions in the transaction pool by hash, requesting from the network if not found.
* @param txHashes - Hashes of tx to return.
* @param pinnedPeerId - An optional peer id that will be used to request the tx from (in addition to other random peers).
* @returns An array of tx or undefined.
*/
getTxsByHash(txHashes: TxHash[], pinnedPeerId: PeerId | undefined): Promise<(Tx | undefined)[]>;

/**
* Returns an archived transaction from the transaction pool by its hash.
* @param txHash - Hash of tx to return.
Expand Down
79 changes: 1 addition & 78 deletions yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { L2Block } from '@aztec/stdlib/block';
import { EmptyL1RollupConstants, type L1RollupConstants } from '@aztec/stdlib/epoch-helpers';
import { P2PClientType } from '@aztec/stdlib/p2p';
import { mockTx } from '@aztec/stdlib/testing';
import { TxArray, TxHash, TxHashArray } from '@aztec/stdlib/tx';
import { TxHash } from '@aztec/stdlib/tx';

import { expect, jest } from '@jest/globals';
import { type MockProxy, mock } from 'jest-mock-extended';
Expand All @@ -19,7 +19,6 @@ import type { P2PService } from '../index.js';
import { type AttestationPool, createTestAttestationPool } from '../mem_pools/attestation_pool/attestation_pool.js';
import type { MemPools } from '../mem_pools/interface.js';
import type { TxPoolV2 } from '../mem_pools/tx_pool_v2/interfaces.js';
import { ReqRespSubProtocol } from '../services/reqresp/interface.js';
import type { TxCollection } from '../services/tx_collection/tx_collection.js';
import { P2PClient } from './p2p_client.js';

Expand Down Expand Up @@ -198,82 +197,6 @@ describe('P2P Client', () => {
await client.stop();
});

it('request transactions handles missing items', async () => {
const mockTx1 = await mockTx();
const mockTx2 = await mockTx();
const mockTx3 = await mockTx();

// None of the txs are in the pool
txPool.getTxByHash.mockResolvedValue(undefined);

// P2P service will not return tx2
p2pService.sendBatchRequest.mockResolvedValue([new TxArray(...[mockTx1, mockTx3])]);

// Spy on the tx pool addPendingTxs method, it should not be called for the missing tx
const addTxsSpy = jest.spyOn(txPool, 'addPendingTxs');

await client.start();

// We query for all 3 txs via getTxsByHash which internally requests from the network
const txHashes = await Promise.all([mockTx1.getTxHash(), mockTx2.getTxHash(), mockTx3.getTxHash()]);
const results = await client.getTxsByHash(txHashes, undefined);

// We should receive the found transactions (tx2 will be undefined)
expect(results).toEqual([mockTx1, undefined, mockTx3]);

// P2P should have been called with the 3 tx hashes (all missing from pool)
expect(p2pService.sendBatchRequest).toHaveBeenCalledWith(
ReqRespSubProtocol.TX,
txHashes.map(hash => new TxHashArray(...[hash])),
undefined,
expect.anything(),
expect.anything(),
expect.anything(),
);

// Retrieved txs should have been added to the pool
expect(addTxsSpy).toHaveBeenCalledTimes(1);
expect(addTxsSpy).toHaveBeenCalledWith([mockTx1, mockTx3]);

await client.stop();
});

it('getTxsByHash handles missing items', async () => {
// We expect the node to fetch this item from their local p2p pool
const txInMempool = await mockTx();
// We expect this transaction to be requested from the network
const txToBeRequested = await mockTx();
// We expect this transaction to not be found
const txToNotBeFound = await mockTx();

txPool.getTxByHash.mockImplementation(txHash =>
Promise.resolve(txHash === txInMempool.getTxHash() ? txInMempool : undefined),
);

const addTxsSpy = jest.spyOn(txPool, 'addPendingTxs');

p2pService.sendBatchRequest.mockResolvedValue([new TxArray(...[txToBeRequested])]);

await client.start();

const query = await Promise.all([txInMempool.getTxHash(), txToBeRequested.getTxHash(), txToNotBeFound.getTxHash()]);
const results = await client.getTxsByHash(query, undefined);

// We should return the resolved transactions (txToNotBeFound is undefined)
expect(results).toEqual([txInMempool, txToBeRequested, undefined]);
// We should add the found requested transactions to the pool
expect(addTxsSpy).toHaveBeenCalledWith([txToBeRequested]);
// The p2p service should have been called to request the missing txs
expect(p2pService.sendBatchRequest).toHaveBeenCalledWith(
ReqRespSubProtocol.TX,
expect.anything(),
undefined,
expect.anything(),
expect.anything(),
expect.anything(),
);
});

it('getPendingTxs respects pagination', async () => {
const txs = await timesAsync(20, i => mockTx(i));
txPool.getPendingTxHashes.mockResolvedValue(await Promise.all(txs.map(tx => tx.getTxHash())));
Expand Down
74 changes: 0 additions & 74 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import {
type ReqRespSubProtocolHandler,
type ReqRespSubProtocolValidators,
} from '../services/reqresp/interface.js';
import { chunkTxHashesRequest } from '../services/reqresp/protocols/tx.js';
import type {
DuplicateAttestationInfo,
DuplicateProposalInfo,
Expand Down Expand Up @@ -433,36 +432,6 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
this.p2pService.registerDuplicateAttestationCallback(callback);
}

/**
* Uses the batched Request Response protocol to request a set of transactions from the network.
*/
private async requestTxsByHash(txHashes: TxHash[], pinnedPeerId: PeerId | undefined): Promise<Tx[]> {
const timeoutMs = 8000; // Longer timeout for now
const maxRetryAttempts = 10; // Keep retrying within the timeout
const requests = chunkTxHashesRequest(txHashes);
const maxPeers = Math.min(Math.ceil(requests.length / 3), 10);

const txBatches = await this.p2pService.sendBatchRequest(
ReqRespSubProtocol.TX,
requests,
pinnedPeerId,
timeoutMs,
maxPeers,
maxRetryAttempts,
);

const txs = txBatches.flat();
if (txs.length > 0) {
await this.txPool.addPendingTxs(txs);
}

const txHashesStr = txHashes.map(tx => tx.toString()).join(', ');
this.log.debug(`Requested txs ${txHashesStr} (${txs.length} / ${txHashes.length}) from peers`);

// We return all transactions, even the not found ones to the caller, such they can handle missing items themselves.
return txs;
}

public async getPendingTxs(limit?: number, after?: TxHash): Promise<Tx[]> {
if (limit !== undefined && limit <= 0) {
throw new TypeError('limit must be greater than 0');
Expand Down Expand Up @@ -530,49 +499,6 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
return this.txPool.hasTxs(txHashes);
}

/**
* Returns transactions in the transaction pool by hash.
* If a transaction is not in the pool, it will be requested from the network.
* @param txHashes - Hashes of the transactions to look for.
* @returns The txs found, or undefined if not found in the order requested.
*/
async getTxsByHash(txHashes: TxHash[], pinnedPeerId: PeerId | undefined): Promise<(Tx | undefined)[]> {
const txs = await Promise.all(txHashes.map(txHash => this.txPool.getTxByHash(txHash)));
const missingTxHashes = txs
.map((tx, index) => [tx, index] as const)
.filter(([tx, _index]) => !tx)
.map(([_tx, index]) => txHashes[index]);

if (missingTxHashes.length === 0) {
return txs as Tx[];
}

const missingTxs = await this.requestTxsByHash(missingTxHashes, pinnedPeerId);
// TODO: optimize
// Merge the found txs in order
const mergingTxs = txHashes.map(txHash => {
// Is it in the txs list from the mempool?
for (const tx of txs) {
if (tx !== undefined && tx.getTxHash().equals(txHash)) {
return tx;
}
}

// Is it in the fetched missing txs?
// Note: this is an O(n^2) operation, but we expect the number of missing txs to be small.
for (const tx of missingTxs) {
if (tx.getTxHash().equals(txHash)) {
return tx;
}
}

// Otherwise return undefined
return undefined;
});

return mergingTxs;
}

/**
* Returns an archived transaction in the transaction pool by its hash.
* @param txHash - Hash of the archived transaction to look for.
Expand Down
Loading
Loading