Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions yarn-project/p2p/src/client/p2p_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,60 @@ describe('In-Memory P2P Client', () => {
await client.stop();
});

it('request transactions handles missing items', async () => {
// Mock a batch response that returns undefined items
const mockTx1 = await mockTx();
const mockTx2 = await mockTx();
p2pService.sendBatchRequest.mockResolvedValue([mockTx1, undefined, mockTx2]);

// Spy on the tx pool addTxs method, it should not be called for the missing tx
const addTxsSpy = jest.spyOn(txPool, 'addTxs');

await client.start();

const missingTxHash = (await mockTx()).getTxHash();
const query = await Promise.all([mockTx1.getTxHash(), missingTxHash, mockTx2.getTxHash()]);
const results = await client.requestTxsByHash(query);

expect(results).toEqual([mockTx1, undefined, mockTx2]);

expect(addTxsSpy).toHaveBeenCalledTimes(1);
expect(addTxsSpy).toHaveBeenCalledWith([mockTx1, mockTx2]);
});

it('getTxsByHash handles missing items', async () => {
// We expect the node to fetch this item from their local p2p pool
const txInMempool = await mockTx();
// We expect this transaction to be requested from the network
const txToBeRequested = await mockTx();
// We expect this transaction to not be found
const txToNotBeFound = await mockTx();

txPool.getTxByHash.mockImplementation(async txHash => {
if (txHash === (await txInMempool.getTxHash())) {
return txInMempool;
}
return undefined;
});

const addTxsSpy = jest.spyOn(txPool, 'addTxs');
const requestTxsSpy = jest.spyOn(client, 'requestTxsByHash');

p2pService.sendBatchRequest.mockResolvedValue([txToBeRequested, undefined]);

await client.start();

const query = await Promise.all([txInMempool.getTxHash(), txToBeRequested.getTxHash(), txToNotBeFound.getTxHash()]);
const results = await client.getTxsByHash(query);

// We should return the resolved transactions
expect(results).toEqual([txInMempool, txToBeRequested]);
// We should add the found requested transactions to the pool
expect(addTxsSpy).toHaveBeenCalledWith([txToBeRequested]);
// We should request the missing transactions from the network, but only find one of them
expect(requestTxsSpy).toHaveBeenCalledWith([await txToBeRequested.getTxHash(), await txToNotBeFound.getTxHash()]);
});

describe('Chain prunes', () => {
it('moves the tips on a chain reorg', async () => {
blockSource.setProvenBlockNumber(0);
Expand Down
16 changes: 11 additions & 5 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -445,12 +445,17 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
/**
* Uses the batched Request Response protocol to request a set of transactions from the network.
*/
public async requestTxsByHash(txHashes: TxHash[]): Promise<Tx[]> {
const txs = (await this.p2pService.sendBatchRequest(ReqRespSubProtocol.TX, txHashes)) ?? [];
await this.txPool.addTxs(txs);
public async requestTxsByHash(txHashes: TxHash[]): Promise<(Tx | undefined)[]> {
const txs = await this.p2pService.sendBatchRequest(ReqRespSubProtocol.TX, txHashes);

// Some transactions may return undefined, so we filter them out
const filteredTxs = txs.filter((tx): tx is Tx => !!tx);
await this.txPool.addTxs(filteredTxs);
const txHashesStr = txHashes.map(tx => tx.toString()).join(', ');
this.log.debug(`Received batched txs ${txHashesStr} (${txs.length} / ${txHashes.length}}) from peers`);
return txs as Tx[];

// We return all transactions, even the not found ones to the caller, such they can handle missing items themselves.
return txs;
}

public getPendingTxs(): Promise<Tx[]> {
Expand Down Expand Up @@ -533,7 +538,8 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
}

const missingTxs = await this.requestTxsByHash(missingTxHashes);
return txs.filter((tx): tx is Tx => !!tx).concat(missingTxs);
const fetchedMissingTxs = missingTxs.filter((tx): tx is Tx => !!tx);
return txs.filter((tx): tx is Tx => !!tx).concat(fetchedMissingTxs);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/services/libp2p/libp2p_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ export class LibP2PService<T extends P2PClientType = P2PClientType.Full> extends
sendBatchRequest<SubProtocol extends ReqRespSubProtocol>(
protocol: SubProtocol,
requests: InstanceType<SubProtocolMap[SubProtocol]['request']>[],
): Promise<InstanceType<SubProtocolMap[SubProtocol]['response']>[] | undefined> {
): Promise<(InstanceType<SubProtocolMap[SubProtocol]['response']> | undefined)[]> {
return this.reqresp.sendBatchRequest(protocol, requests);
}

Expand Down
6 changes: 3 additions & 3 deletions yarn-project/p2p/src/services/reqresp/reqresp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,9 @@ export class ReqResp {
timeoutMs = 10000,
maxPeers = Math.min(10, requests.length),
maxRetryAttempts = 3,
): Promise<InstanceType<SubProtocolMap[SubProtocol]['response']>[]> {
): Promise<(InstanceType<SubProtocolMap[SubProtocol]['response']> | undefined)[]> {
const responseValidator = this.subProtocolValidators[subProtocol];
const responses: InstanceType<SubProtocolMap[SubProtocol]['response']>[] = new Array(requests.length);
const responses: (InstanceType<SubProtocolMap[SubProtocol]['response']> | undefined)[] = new Array(requests.length);
const requestBuffers = requests.map(req => req.toBuffer());

const requestFunction = async () => {
Expand Down Expand Up @@ -378,7 +378,7 @@ export class ReqResp {
};

try {
return await executeTimeout<InstanceType<SubProtocolMap[SubProtocol]['response']>[]>(
return await executeTimeout<(InstanceType<SubProtocolMap[SubProtocol]['response']> | undefined)[]>(
requestFunction,
timeoutMs,
() => new CollectiveReqRespTimeoutError(),
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/p2p/src/services/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export interface P2PService {
sendBatchRequest<Protocol extends ReqRespSubProtocol>(
protocol: Protocol,
requests: InstanceType<SubProtocolMap[Protocol]['request']>[],
): Promise<InstanceType<SubProtocolMap[Protocol]['response']>[] | undefined>;
): Promise<(InstanceType<SubProtocolMap[Protocol]['response']> | undefined)[]>;

// Leaky abstraction: fix https://github.com/AztecProtocol/aztec-packages/issues/7963
registerBlockReceivedCallback(callback: (block: BlockProposal) => Promise<BlockAttestation | undefined>): void;
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/stdlib/src/interfaces/prover-coordination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export interface ProverCoordination {
/**
* Returns a set of transactions given their hashes if available.
* @param txHashes - The hashes of the transactions, used as an ID.
* @returns The transactions, if found, 'undefined' otherwise.
* @returns The transactions found, no necessarily in the same order as the hashes.
*/
getTxsByHash(txHashes: TxHash[]): Promise<Tx[]>;
}
Expand Down