diff --git a/yarn-project/aztec-node/src/aztec-node/server.ts b/yarn-project/aztec-node/src/aztec-node/server.ts index f21e38592438..8383dc7c2f77 100644 --- a/yarn-project/aztec-node/src/aztec-node/server.ts +++ b/yarn-project/aztec-node/src/aztec-node/server.ts @@ -59,6 +59,7 @@ import { computePublicDataTreeLeafSlot, siloNullifier } from '@aztec/circuits.js import { EpochCache } from '@aztec/epoch-cache'; import { type L1ContractAddresses, createEthereumChain } from '@aztec/ethereum'; import { AztecAddress } from '@aztec/foundation/aztec-address'; +import { compactArray } from '@aztec/foundation/collection'; import { type Logger, createLogger } from '@aztec/foundation/log'; import { DateProvider, Timer } from '@aztec/foundation/timer'; import { type AztecKVStore } from '@aztec/kv-store'; @@ -500,6 +501,15 @@ export class AztecNodeService implements AztecNode, Traceable { return Promise.resolve(this.p2pClient!.getTxByHashFromPool(txHash)); } + /** + * Method to retrieve txs from the mempool or unfinalised chain. + * @param txHash - The transaction hash to return. + * @returns - The txs if it exists. + */ + public async getTxsByHash(txHashes: TxHash[]) { + return compactArray(await Promise.all(txHashes.map(txHash => this.getTxByHash(txHash)))); + } + /** * Find the indexes of the given leaves in the given tree. * @param blockNumber - The block number at which to get the data or 'latest' for latest data diff --git a/yarn-project/circuit-types/src/interfaces/aztec-node.test.ts b/yarn-project/circuit-types/src/interfaces/aztec-node.test.ts index 0a537b6b7ddc..44d52d36ca9e 100644 --- a/yarn-project/circuit-types/src/interfaces/aztec-node.test.ts +++ b/yarn-project/circuit-types/src/interfaces/aztec-node.test.ts @@ -281,6 +281,11 @@ describe('AztecNodeApiSchema', () => { expect(response).toBeInstanceOf(Tx); }); + it('getTxsByHash', async () => { + const response = await context.client.getTxsByHash([TxHash.random()]); + expect(response[0]).toBeInstanceOf(Tx); + }); + it('getPublicStorageAt', async () => { const response = await context.client.getPublicStorageAt(await AztecAddress.random(), Fr.random(), 1); expect(response).toBeInstanceOf(Fr); @@ -559,6 +564,10 @@ class MockAztecNode implements AztecNode { expect(txHash).toBeInstanceOf(TxHash); return Promise.resolve(Tx.random()); } + async getTxsByHash(txHashes: TxHash[]): Promise { + expect(txHashes[0]).toBeInstanceOf(TxHash); + return [await Tx.random()]; + } getPublicStorageAt(contract: AztecAddress, slot: Fr, _blockNumber: number | 'latest'): Promise { expect(contract).toBeInstanceOf(AztecAddress); expect(slot).toBeInstanceOf(Fr); diff --git a/yarn-project/circuit-types/src/interfaces/aztec-node.ts b/yarn-project/circuit-types/src/interfaces/aztec-node.ts index 5f376871ff62..1b5afd8e476a 100644 --- a/yarn-project/circuit-types/src/interfaces/aztec-node.ts +++ b/yarn-project/circuit-types/src/interfaces/aztec-node.ts @@ -53,7 +53,7 @@ import { type SequencerConfig, SequencerConfigSchema } from './configs.js'; import { type L2BlockNumber, L2BlockNumberSchema } from './l2_block_number.js'; import { NullifierMembershipWitness } from './nullifier_membership_witness.js'; import { type ProverConfig, ProverConfigSchema } from './prover-client.js'; -import { type ProverCoordination, ProverCoordinationApiSchema } from './prover-coordination.js'; +import { type ProverCoordination } from './prover-coordination.js'; /** * The aztec node. @@ -371,6 +371,13 @@ export interface AztecNode */ getTxByHash(txHash: TxHash): Promise; + /** + * Method to retrieve multiple pending txs. + * @param txHash - The transaction hashes to return. + * @returns The pending txs if exist. + */ + getTxsByHash(txHashes: TxHash[]): Promise; + /** * Gets the storage value at the given contract storage slot. * @@ -453,9 +460,8 @@ export interface AztecNode } export const AztecNodeApiSchema: ApiSchemaFor = { - ...ProverCoordinationApiSchema, - getL2Tips: z.function().args().returns(L2TipsSchema), + findLeavesIndexes: z .function() .args(L2BlockNumberSchema, z.nativeEnum(MerkleTreeId), z.array(schemas.Fr)) @@ -567,6 +573,8 @@ export const AztecNodeApiSchema: ApiSchemaFor = { getTxByHash: z.function().args(TxHash.schema).returns(Tx.schema.optional()), + getTxsByHash: z.function().args(z.array(TxHash.schema)).returns(z.array(Tx.schema)), + getPublicStorageAt: z.function().args(schemas.AztecAddress, schemas.Fr, L2BlockNumberSchema).returns(schemas.Fr), getBlockHeader: z.function().args(optional(L2BlockNumberSchema)).returns(BlockHeader.schema), diff --git a/yarn-project/circuit-types/src/interfaces/prover-coordination.ts b/yarn-project/circuit-types/src/interfaces/prover-coordination.ts index f0df43c34512..396f21cf36b5 100644 --- a/yarn-project/circuit-types/src/interfaces/prover-coordination.ts +++ b/yarn-project/circuit-types/src/interfaces/prover-coordination.ts @@ -15,6 +15,13 @@ export interface ProverCoordination { */ getTxByHash(txHash: TxHash): Promise; + /** + * Returns a set of transactions given their hashes if available. + * @param txHashes - The hashes of the transactions, used as an ID. + * @returns The transactions, if found, 'undefined' otherwise. + */ + getTxsByHash(txHashes: TxHash[]): Promise; + /** * Receives a quote for an epoch proof and stores it in its EpochProofQuotePool * @param quote - The quote to store @@ -24,5 +31,6 @@ export interface ProverCoordination { export const ProverCoordinationApiSchema: ApiSchemaFor = { getTxByHash: z.function().args(TxHash.schema).returns(Tx.schema.optional()), + getTxsByHash: z.function().args(z.array(TxHash.schema)).returns(z.array(Tx.schema)), addEpochProofQuote: z.function().args(EpochProofQuote.schema).returns(z.void()), }; diff --git a/yarn-project/end-to-end/src/fixtures/utils.ts b/yarn-project/end-to-end/src/fixtures/utils.ts index c097c040546a..4331ad70de23 100644 --- a/yarn-project/end-to-end/src/fixtures/utils.ts +++ b/yarn-project/end-to-end/src/fixtures/utils.ts @@ -716,6 +716,7 @@ export async function createAndSyncProverNode( const aztecNodeWithoutStop = { addEpochProofQuote: aztecNode.addEpochProofQuote.bind(aztecNode), getTxByHash: aztecNode.getTxByHash.bind(aztecNode), + getTxsByHash: aztecNode.getTxsByHash.bind(aztecNode), stop: () => Promise.resolve(), }; diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index 0cb4a5b303d7..6847736c957a 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -10,6 +10,7 @@ import { type P2PApi, type P2PClientType, type PeerInfo, + type ProverCoordination, type Tx, type TxHash, } from '@aztec/circuit-types'; @@ -62,130 +63,131 @@ export interface P2PSyncState { /** * Interface of a P2P client. **/ -export type P2P = P2PApi & { - /** - * Broadcasts a block proposal to other peers. - * - * @param proposal - the block proposal - */ - broadcastProposal(proposal: BlockProposal): void; - - /** - * Queries the EpochProofQuote pool for quotes for the given epoch - * - * @param epoch - the epoch to query - * @returns EpochProofQuotes - */ - getEpochProofQuotes(epoch: bigint): Promise; - - /** - * Adds an EpochProofQuote to the pool and broadcasts an EpochProofQuote to other peers. - * - * @param quote - the quote to broadcast - */ - addEpochProofQuote(quote: EpochProofQuote): Promise; - - /** - * Registers a callback from the validator client that determines how to behave when - * foreign block proposals are received - * - * @param handler - A function taking a received block proposal and producing an attestation - */ - // REVIEW: https://github.com/AztecProtocol/aztec-packages/issues/7963 - // ^ This pattern is not my favorite (md) - registerBlockProposalHandler(handler: (block: BlockProposal) => Promise): void; - - /** - * Request a list of transactions from another peer by their tx hashes. - * @param txHashes - Hashes of the txs to query. - * @returns A list of transactions or undefined if the transactions are not found. - */ - requestTxs(txHashes: TxHash[]): Promise<(Tx | undefined)[]>; - - /** - * Request a transaction from another peer by its tx hash. - * @param txHash - Hash of the tx to query. - */ - requestTxByHash(txHash: TxHash): Promise; - - /** - * Verifies the 'tx' and, if valid, adds it to local tx pool and forwards it to other peers. - * @param tx - The transaction. - **/ - sendTx(tx: Tx): Promise; - - /** - * Deletes 'txs' from the pool, given hashes. - * NOT used if we use sendTx as reconcileTxPool will handle this. - * @param txHashes - Hashes to check. - **/ - deleteTxs(txHashes: TxHash[]): Promise; - - /** - * Returns a transaction in the transaction pool by its hash. - * @param txHash - Hash of tx to return. - * @returns A single tx or undefined. - */ - getTxByHashFromPool(txHash: TxHash): Promise; - - /** - * Returns a transaction in the transaction pool by its hash, requesting it from the network if it is not found. - * @param txHash - Hash of tx to return. - * @returns A single tx or undefined. - */ - getTxByHash(txHash: TxHash): Promise; - - /** - * Returns an archived transaction from the transaction pool by its hash. - * @param txHash - Hash of tx to return. - * @returns A single tx or undefined. - */ - getArchivedTxByHash(txHash: TxHash): Promise; - - /** - * Returns whether the given tx hash is flagged as pending or mined. - * @param txHash - Hash of the tx to query. - * @returns Pending or mined depending on its status, or undefined if not found. - */ - getTxStatus(txHash: TxHash): Promise<'pending' | 'mined' | undefined>; - - /** Returns an iterator over pending txs on the mempool. */ - iteratePendingTxs(): AsyncIterableIterator; - - /** Returns the number of pending txs in the mempool. */ - getPendingTxCount(): Promise; - - /** - * Starts the p2p client. - * @returns A promise signalling the completion of the block sync. - */ - start(): Promise; - - /** - * Stops the p2p client. - * @returns A promise signalling the completion of the stop process. - */ - stop(): Promise; - - /** - * Indicates if the p2p client is ready for transaction submission. - * @returns A boolean flag indicating readiness. - */ - isReady(): boolean; - - /** - * Returns the current status of the p2p client. - */ - getStatus(): Promise; - - /** - * Returns the ENR of this node, if any. - */ - getEnr(): ENR | undefined; - - /** Identifies a p2p client. */ - isP2PClient(): true; -}; +export type P2P = ProverCoordination & + P2PApi & { + /** + * Broadcasts a block proposal to other peers. + * + * @param proposal - the block proposal + */ + broadcastProposal(proposal: BlockProposal): void; + + /** + * Queries the EpochProofQuote pool for quotes for the given epoch + * + * @param epoch - the epoch to query + * @returns EpochProofQuotes + */ + getEpochProofQuotes(epoch: bigint): Promise; + + /** + * Adds an EpochProofQuote to the pool and broadcasts an EpochProofQuote to other peers. + * + * @param quote - the quote to broadcast + */ + addEpochProofQuote(quote: EpochProofQuote): Promise; + + /** + * Registers a callback from the validator client that determines how to behave when + * foreign block proposals are received + * + * @param handler - A function taking a received block proposal and producing an attestation + */ + // REVIEW: https://github.com/AztecProtocol/aztec-packages/issues/7963 + // ^ This pattern is not my favorite (md) + registerBlockProposalHandler(handler: (block: BlockProposal) => Promise): void; + + /** + * Request a list of transactions from another peer by their tx hashes. + * @param txHashes - Hashes of the txs to query. + * @returns A list of transactions or undefined if the transactions are not found. + */ + requestTxs(txHashes: TxHash[]): Promise<(Tx | undefined)[]>; + + /** + * Request a transaction from another peer by its tx hash. + * @param txHash - Hash of the tx to query. + */ + requestTxByHash(txHash: TxHash): Promise; + + /** + * Verifies the 'tx' and, if valid, adds it to local tx pool and forwards it to other peers. + * @param tx - The transaction. + **/ + sendTx(tx: Tx): Promise; + + /** + * Deletes 'txs' from the pool, given hashes. + * NOT used if we use sendTx as reconcileTxPool will handle this. + * @param txHashes - Hashes to check. + **/ + deleteTxs(txHashes: TxHash[]): Promise; + + /** + * Returns a transaction in the transaction pool by its hash. + * @param txHash - Hash of tx to return. + * @returns A single tx or undefined. + */ + getTxByHashFromPool(txHash: TxHash): Promise; + + /** + * Returns a transaction in the transaction pool by its hash, requesting it from the network if it is not found. + * @param txHash - Hash of tx to return. + * @returns A single tx or undefined. + */ + getTxByHash(txHash: TxHash): Promise; + + /** + * Returns an archived transaction from the transaction pool by its hash. + * @param txHash - Hash of tx to return. + * @returns A single tx or undefined. + */ + getArchivedTxByHash(txHash: TxHash): Promise; + + /** + * Returns whether the given tx hash is flagged as pending or mined. + * @param txHash - Hash of the tx to query. + * @returns Pending or mined depending on its status, or undefined if not found. + */ + getTxStatus(txHash: TxHash): Promise<'pending' | 'mined' | undefined>; + + /** Returns an iterator over pending txs on the mempool. */ + iteratePendingTxs(): AsyncIterableIterator; + + /** Returns the number of pending txs in the mempool. */ + getPendingTxCount(): Promise; + + /** + * Starts the p2p client. + * @returns A promise signalling the completion of the block sync. + */ + start(): Promise; + + /** + * Stops the p2p client. + * @returns A promise signalling the completion of the stop process. + */ + stop(): Promise; + + /** + * Indicates if the p2p client is ready for transaction submission. + * @returns A boolean flag indicating readiness. + */ + isReady(): boolean; + + /** + * Returns the current status of the p2p client. + */ + getStatus(): Promise; + + /** + * Returns the ENR of this node, if any. + */ + getEnr(): ENR | undefined; + + /** Identifies a p2p client. */ + isP2PClient(): true; + }; /** * The P2P client implementation. @@ -467,6 +469,17 @@ export class P2PClient return tx; } + /** + * Uses the batched Request Response protocol to request a set of transactions from the network. + */ + public async requestTxsByHash(txHashes: TxHash[]): Promise { + const txs = (await this.p2pService.sendBatchRequest(ReqRespSubProtocol.TX, txHashes)) ?? []; + await this.txPool.addTxs(txs); + const txHashesStr = txHashes.map(tx => tx.toString()).join(', '); + this.log.debug(`Received batched txs ${txHashesStr} (${txs.length} / ${txHashes.length}}) from peers`); + return txs as Tx[]; + } + public getPendingTxs(): Promise { return Promise.resolve(this.getTxs('pending')); } @@ -529,6 +542,27 @@ export class P2PClient return this.requestTxByHash(txHash); } + /** + * Returns transactions in the transaction pool by hash. + * If a transaction is not in the pool, it will be requested from the network. + * @param txHashes - Hashes of the transactions to look for. + * @returns The txs found, not necessarily on the same order as the hashes. + */ + async getTxsByHash(txHashes: TxHash[]): Promise { + const txs = await Promise.all(txHashes.map(txHash => this.txPool.getTxByHash(txHash))); + const missingTxHashes = txs + .map((tx, index) => [tx, index] as const) + .filter(([tx, _index]) => !tx) + .map(([_tx, index]) => txHashes[index]); + + if (missingTxHashes.length === 0) { + return txs as Tx[]; + } + + const missingTxs = await this.requestTxsByHash(missingTxHashes); + return txs.filter((tx): tx is Tx => !!tx).concat(missingTxs); + } + /** * Returns an archived transaction in the transaction pool by its hash. * @param txHash - Hash of the archived transaction to look for. diff --git a/yarn-project/prover-node/src/prover-node.test.ts b/yarn-project/prover-node/src/prover-node.test.ts index c959cd0daf7f..9c6729e49889 100644 --- a/yarn-project/prover-node/src/prover-node.test.ts +++ b/yarn-project/prover-node/src/prover-node.test.ts @@ -151,8 +151,8 @@ describe('prover-node', () => { l2BlockSource.getL1Constants.mockResolvedValue(EmptyL1RollupConstants); // Coordination plays along and returns a tx whenever requested - mockCoordination.getTxByHash.mockImplementation(hash => - Promise.resolve(mock({ getTxHash: () => Promise.resolve(hash) })), + mockCoordination.getTxsByHash.mockImplementation(hashes => + Promise.resolve(hashes.map(hash => mock({ getTxHash: () => Promise.resolve(hash) }))), ); // A sample claim @@ -193,7 +193,7 @@ describe('prover-node', () => { }); it('does not send a quote if there is a tx missing from coordinator', async () => { - mockCoordination.getTxByHash.mockResolvedValue(undefined); + mockCoordination.getTxsByHash.mockResolvedValue([]); await proverNode.handleEpochCompleted(10n); expect(coordination.addEpochProofQuote).not.toHaveBeenCalled(); }); @@ -293,32 +293,11 @@ describe('prover-node', () => { expect(jobs.length).toEqual(1); }); - it('retries acquiring txs if they are not immediately available', async () => { - l2BlockSource.getL2EpochNumber.mockResolvedValue(11n); - publisher.getProofClaim.mockResolvedValue(claim); - const mockGetTxByHash = mockCoordination.getTxByHash.getMockImplementation(); - mockCoordination.getTxByHash.mockResolvedValue(undefined); - - await proverNode.start(); - await sleep(100); - - // initially no job will be started because the txs aren't available - expect(jobs).toHaveLength(0); - expect(mockCoordination.getTxByHash).toHaveBeenCalled(); - - mockCoordination.getTxByHash.mockImplementation(mockGetTxByHash); - await sleep(100); - - // now it should have all the txs necessary to start proving - expect(jobs[0].epochNumber).toEqual(10n); - expect(jobs.length).toEqual(1); - }); - it('does not start proving if txs are not all available', async () => { l2BlockSource.getL2EpochNumber.mockResolvedValue(11n); publisher.getProofClaim.mockResolvedValue(claim); - mockCoordination.getTxByHash.mockResolvedValue(undefined); + mockCoordination.getTxsByHash.mockResolvedValue([]); await proverNode.start(); await sleep(2000); @@ -405,6 +384,12 @@ describe('prover-node', () => { jest.spyOn(p2pClient, 'getTxByHash').mockImplementation(mockGetTxByHash); jest.spyOn(otherP2PClient, 'getTxByHash').mockImplementation(mockGetTxByHash); + // And getTxsByHash just for good measure + const mockGetTxsByHash = (hashes: TxHash[]) => + Promise.resolve(hashes.map(hash => mock({ getTxHash: () => Promise.resolve(hash) }))); + jest.spyOn(p2pClient, 'getTxsByHash').mockImplementation(mockGetTxsByHash); + jest.spyOn(otherP2PClient, 'getTxsByHash').mockImplementation(mockGetTxsByHash); + await Promise.all([p2pClient.start(), otherP2PClient.start()]); // Sleep to enable peer discovery diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index 05a2f0a16da8..55b19b334e17 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -17,12 +17,9 @@ import { tryStop, } from '@aztec/circuit-types'; import { type ContractDataSource } from '@aztec/circuits.js'; -import { asyncPool } from '@aztec/foundation/async-pool'; import { compact } from '@aztec/foundation/collection'; import { memoize } from '@aztec/foundation/decorators'; -import { TimeoutError } from '@aztec/foundation/error'; import { createLogger } from '@aztec/foundation/log'; -import { retryUntil } from '@aztec/foundation/retry'; import { DateProvider } from '@aztec/foundation/timer'; import { type Maybe } from '@aztec/foundation/types'; import { type P2P } from '@aztec/p2p'; @@ -333,68 +330,20 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr } private async gatherTxs(epochNumber: bigint, blocks: L2Block[]) { - let txsToFind: TxHash[] = []; - const txHashToBlock = new Map(); - const results = new Map(); - - for (const block of blocks) { - for (const tx of block.body.txEffects) { - txsToFind.push(tx.txHash); - txHashToBlock.set(tx.txHash.toString(), block.number); - } - } + const txsToFind: TxHash[] = blocks.flatMap(block => block.body.txEffects.map(tx => tx.txHash)); + const txs = await this.coordination.getTxsByHash(txsToFind); - const totalTxsRequired = txsToFind.length; - this.log.info( - `Gathering a total of ${totalTxsRequired} txs for epoch=${epochNumber} made up of ${blocks.length} blocks`, - { epochNumber }, - ); - - let iteration = 0; - try { - await retryUntil( - async () => { - const batch = [...txsToFind]; - txsToFind = []; - const batchResults = await asyncPool(this.options.txGatheringMaxParallelRequests, batch, async txHash => { - const tx = await this.coordination.getTxByHash(txHash); - return [txHash, tx] as const; - }); - let found = 0; - for (const [txHash, maybeTx] of batchResults) { - if (maybeTx) { - found++; - results.set(txHash.toString(), maybeTx); - } else { - txsToFind.push(txHash); - } - } - - this.log.verbose( - `Gathered ${found}/${batch.length} txs in iteration ${iteration} for epoch ${epochNumber}. In total ${results.size}/${totalTxsRequired} have been retrieved.`, - { epochNumber }, - ); - iteration++; - - // stop when we found all transactions - return txsToFind.length === 0; - }, - 'Gather txs', - this.options.txGatheringTimeoutMs / 1_000, - this.options.txGatheringIntervalMs / 1_000, - ); - } catch (err) { - if (err && err instanceof TimeoutError) { - const notFoundList = txsToFind - .map(txHash => `${txHash.toString()} (block ${txHashToBlock.get(txHash.toString())})`) - .join(', '); - throw new Error(`Txs not found for epoch ${epochNumber}: ${notFoundList}`); - } else { - throw err; - } + if (txs.length === txsToFind.length) { + this.log.verbose(`Gathered all ${txs.length} txs for epoch ${epochNumber}`, { epochNumber }); + return txs; } - return Array.from(results.values()); + const txHashesFound = await Promise.all(txs.map(tx => tx.getTxHash())); + const missingTxHashes = txsToFind + .filter(txHashToFind => !txHashesFound.some(txHashFound => txHashToFind.equals(txHashFound))) + .join(', '); + + throw new Error(`Txs not found for epoch ${epochNumber}: ${missingTxHashes}`); } /** Extracted for testing purposes. */ diff --git a/yarn-project/txe/src/node/txe_node.ts b/yarn-project/txe/src/node/txe_node.ts index 5b788eb156fe..39bf2d1e4cd8 100644 --- a/yarn-project/txe/src/node/txe_node.ts +++ b/yarn-project/txe/src/node/txe_node.ts @@ -568,6 +568,10 @@ export class TXENode implements AztecNode { throw new Error('TXE Node method getTxByHash not implemented'); } + getTxsByHash(_txHashes: TxHash[]): Promise { + throw new Error('TXE Node method getTxByHash not implemented'); + } + /** * Gets the storage value at the given contract storage slot. *