diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index ffb4ed5bb9a2..21162a9630b9 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -125,6 +125,7 @@ export type EnvVar = | 'P2P_TRUSTED_PEERS' | 'P2P_PRIVATE_PEERS' | 'P2P_MAX_TX_POOL_SIZE' + | 'P2P_TX_POOL_OVERFLOW_FACTOR' | 'PEER_ID_PRIVATE_KEY' | 'PEER_ID_PRIVATE_KEY_PATH' | 'PROVER_AGENT_COUNT' diff --git a/yarn-project/foundation/src/config/index.ts b/yarn-project/foundation/src/config/index.ts index 131097c7c83f..ad3c4597ccf4 100644 --- a/yarn-project/foundation/src/config/index.ts +++ b/yarn-project/foundation/src/config/index.ts @@ -105,6 +105,18 @@ export function numberConfigHelper(defaultVal: number): Pick { + return { + parseEnv: (val: string) => safeParseFloat(val, defaultVal), + defaultValue: defaultVal, + }; +} + /** * Generates parseEnv and default values for a numerical config value. * @param defaultVal - The default numerical value to use if the environment variable is not set or is invalid @@ -171,6 +183,18 @@ function safeParseNumber(value: string, defaultValue: number): number { return Number.isSafeInteger(parsedValue) ? parsedValue : defaultValue; } +/** + * Safely parses a floating point number from a string. + * If the value is not a number, the default value is returned. + * @param value - The string value to parse + * @param defaultValue - The default value to return + * @returns Either parsed value or default value + */ +function safeParseFloat(value: string, defaultValue: number): number { + const parsedValue = parseFloat(value); + return Number.isNaN(parsedValue) ? defaultValue : parsedValue; +} + /** * Picks specific keys from the given configuration mappings. * diff --git a/yarn-project/p2p/src/client/p2p_client.ts b/yarn-project/p2p/src/client/p2p_client.ts index f81aa04ac4cb..6c3221fcd83a 100644 --- a/yarn-project/p2p/src/client/p2p_client.ts +++ b/yarn-project/p2p/src/client/p2p_client.ts @@ -111,11 +111,9 @@ export class P2PClient return this.synchedBlockHashes.getAsync(number); } - public async updateP2PConfig(config: Partial): Promise { - if (typeof config.maxTxPoolSize === 'number' && this.config.maxTxPoolSize !== config.maxTxPoolSize) { - await this.txPool.setMaxTxPoolSize(config.maxTxPoolSize); - this.config.maxTxPoolSize = config.maxTxPoolSize; - } + public updateP2PConfig(config: Partial): Promise { + this.txPool.updateConfig(config); + return Promise.resolve(); } public async getL2Tips(): Promise { diff --git a/yarn-project/p2p/src/config.ts b/yarn-project/p2p/src/config.ts index c04be290031b..cc955cd1bb79 100644 --- a/yarn-project/p2p/src/config.ts +++ b/yarn-project/p2p/src/config.ts @@ -1,6 +1,7 @@ import { type ConfigMappingsType, booleanConfigHelper, + floatConfigHelper, getConfigFromMappings, getDefaultConfig, numberConfigHelper, @@ -196,6 +197,11 @@ export interface P2PConfig extends P2PReqRespConfig, ChainConfig { * The maximum cumulative tx size (in bytes) of pending txs before evicting lower priority txs. */ maxTxPoolSize: number; + + /** + * If the pool is full, it will still accept a few more txs until it reached maxTxPoolOverspillFactor * maxTxPoolSize. Then it will evict + */ + txPoolOverflowFactor: number; } export const DEFAULT_P2P_PORT = 40400; @@ -393,6 +399,11 @@ export const p2pConfigMappings: ConfigMappingsType = { description: 'The maximum cumulative tx size of pending txs (in bytes) before evicting lower priority txs.', ...numberConfigHelper(100_000_000), // 100MB }, + txPoolOverflowFactor: { + env: 'P2P_TX_POOL_OVERFLOW_FACTOR', + description: 'How much the tx pool can overflow before it starts evicting txs. Must be greater than 1', + ...floatConfigHelper(1.1), // 10% overflow + }, ...p2pReqRespConfigMappings, ...chainConfigMappings, }; diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts index 5f12b5daaeb0..295bb96a340b 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts @@ -1,10 +1,13 @@ +import { timesAsync } from '@aztec/foundation/collection'; import { Fr } from '@aztec/foundation/fields'; +import { map, sort, toArray } from '@aztec/foundation/iterable'; import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; import { GasFees } from '@aztec/stdlib/gas'; import type { MerkleTreeReadOperations, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server'; import { mockTx } from '@aztec/stdlib/testing'; -import { MaxBlockNumber, Tx, type TxValidationResult } from '@aztec/stdlib/tx'; +import { MaxBlockNumber, Tx, TxHash, type TxValidationResult } from '@aztec/stdlib/tx'; +import { jest } from '@jest/globals'; import { type MockProxy, mock } from 'jest-mock-extended'; import { ArchiveCache, GasTxValidator } from '../../msg_validators/index.js'; @@ -15,6 +18,8 @@ describe('KV TX pool', () => { let txPool: TestAztecKVTxPool; let worldState: MockProxy; let db: MockProxy; + let nextTxSeed: number; + let mockTxSize: number; const checkPendingTxConsistency = async () => { const pendingTxHashCount = await txPool.getPendingTxHashes().then(h => h.length); @@ -22,6 +27,9 @@ describe('KV TX pool', () => { }; beforeEach(async () => { + nextTxSeed = 1; + mockTxSize = 100; + worldState = worldState = mock(); db = mock(); worldState.getCommitted.mockReturnValue(db); @@ -37,6 +45,12 @@ describe('KV TX pool', () => { describeTxPool(() => txPool); + const mockFixedSizeTx = async (maxPriorityFeesPerGas?: GasFees) => { + const tx = await mockTx(nextTxSeed++, { maxPriorityFeesPerGas }); + jest.spyOn(tx, 'getSize').mockReturnValue(mockTxSize); + return tx; + }; + it('Returns archived txs and purges archived txs once the archived tx limit is reached', async () => { // set the archived tx limit to 2 txPool = new TestAztecKVTxPool(await openTmpStore('p2p'), await openTmpStore('archive'), worldState, undefined, { @@ -140,6 +154,62 @@ describe('KV TX pool', () => { ]); }); + it('respects the overflow factor configured', async () => { + txPool = new TestAztecKVTxPool(await openTmpStore('p2p'), await openTmpStore('archive'), worldState, undefined, { + maxTxPoolSize: mockTxSize * 10, // pool should contain no more than 10 mock txs + txPoolOverflowFactor: 1.5, // but allow it to grow up to 15, but then when it evicts, it evicts until it's left to 10 + }); + + const cmp = (a: TxHash, b: TxHash) => (a.toBigInt() < b.toBigInt() ? -1 : a.toBigInt() > b.toBigInt() ? 1 : 0); + + const firstBatch = await timesAsync(10, () => mockFixedSizeTx()); + await txPool.addTxs(firstBatch); + + // we've just added 10 txs. They should all be availble + expect(await toArray(sort(await txPool.getPendingTxHashes(), cmp))).toEqual( + await toArray( + sort( + map(firstBatch, tx => tx.getTxHash()), + cmp, + ), + ), + ); + + const secondBatch = await timesAsync(2, () => mockFixedSizeTx()); + await txPool.addTxs(secondBatch); + + // we've added two more txs. At this point the pool contains more txs than the limit + // but it still hasn't evicted anything + expect(await toArray(sort(await txPool.getPendingTxHashes(), cmp))).toEqual( + await toArray( + sort( + map([...firstBatch, ...secondBatch], tx => tx.getTxHash()), + cmp, + ), + ), + ); + + const thirdBatch = await timesAsync(3, () => mockFixedSizeTx()); + await txPool.addTxs(thirdBatch); + + // add another 3 txs. The pool has reached the limit. All txs should be available still + // another txs would trigger evictions + expect(await toArray(sort(await txPool.getPendingTxHashes(), cmp))).toEqual( + await toArray( + sort( + map([...firstBatch, ...secondBatch, ...thirdBatch], tx => tx.getTxHash()), + cmp, + ), + ), + ); + + const lastTx = await mockFixedSizeTx(); + await txPool.addTxs([lastTx]); + + // the pool should evict enough txs to stay under the size limit + expect(await txPool.getPendingTxCount()).toBeLessThanOrEqual(10); + }); + it('Evicts txs with nullifiers that are already included in the mined block', async () => { const tx1 = await mockTx(1, { numberOfNonRevertiblePublicCallRequests: 1 }); const tx2 = await mockTx(2, { numberOfNonRevertiblePublicCallRequests: 1 }); diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts index d1e14d6027dd..d3b452bfc383 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts @@ -11,11 +11,13 @@ import { DatabasePublicStateSource } from '@aztec/stdlib/trees'; import { Tx, TxHash } from '@aztec/stdlib/tx'; import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client'; +import assert from 'assert'; + import { ArchiveCache } from '../../msg_validators/tx_validator/archive_cache.js'; import { GasTxValidator } from '../../msg_validators/tx_validator/gas_validator.js'; import { PoolInstrumentation, PoolName } from '../instrumentation.js'; import { getPendingTxPriority } from './priority.js'; -import type { TxPool } from './tx_pool.js'; +import type { TxPool, TxPoolOptions } from './tx_pool.js'; /** * KV implementation of the Transaction Pool. @@ -27,7 +29,10 @@ export class AztecKVTxPool implements TxPool { #txs: AztecAsyncMap; /** The maximum cumulative tx size that the pending txs in the pool take up. */ - #maxTxPoolSize: number | undefined; + #maxTxPoolSize: number = 0; + + /** The tx evicion logic will kick after pool size is greater than maxTxPoolSize * txPoolOverflowFactor */ + txPoolOverflowFactor: number = 1; /** Index from tx hash to the block number in which they were mined, filtered by mined txs. */ #minedTxHashToBlock: AztecAsyncMap; @@ -63,7 +68,7 @@ export class AztecKVTxPool implements TxPool { #archivedTxIndices: AztecAsyncMap; /** Number of txs to archive. */ - #archivedTxLimit: number; + #archivedTxLimit: number = 0; /** The world state synchronizer used in the node. */ #worldStateSynchronizer: WorldStateSynchronizer; @@ -85,12 +90,12 @@ export class AztecKVTxPool implements TxPool { archive: AztecAsyncKVStore, worldStateSynchronizer: WorldStateSynchronizer, telemetry: TelemetryClient = getTelemetryClient(), - config: { - maxTxPoolSize?: number; - archivedTxLimit?: number; - } = {}, + config: TxPoolOptions = {}, log = createLogger('p2p:tx_pool'), ) { + this.#log = log; + this.updateConfig(config); + this.#txs = store.openMap('txs'); this.#minedTxHashToBlock = store.openMap('txHashToBlockMined'); this.#pendingTxPriorityToHash = store.openMultiMap('pendingTxFeeToHash'); @@ -98,18 +103,16 @@ export class AztecKVTxPool implements TxPool { this.#pendingTxHashToHeaderHash = store.openMap('pendingTxHashToHeaderHash'); this.#pendingTxSize = store.openSingleton('pendingTxSize'); this.#pendingTxCount = store.openSingleton('pendingTxCount'); - this.#maxTxPoolSize = config.maxTxPoolSize; + this.#pendingTxs = new Map(); this.#nonEvictableTxs = new Set(); this.#archivedTxs = archive.openMap('archivedTxs'); this.#archivedTxIndices = archive.openMap('archivedTxIndices'); - this.#archivedTxLimit = config.archivedTxLimit ?? 0; this.#store = store; this.#archive = archive; this.#worldStateSynchronizer = worldStateSynchronizer; - this.#log = log; this.#metrics = new PoolInstrumentation(telemetry, PoolName.TX_POOL, () => store.estimateSize()); } @@ -386,9 +389,28 @@ export class AztecKVTxPool implements TxPool { return vals.map(x => TxHash.fromString(x)); } - public setMaxTxPoolSize(maxSizeBytes: number | undefined): Promise { - this.#maxTxPoolSize = maxSizeBytes; - return Promise.resolve(); + public updateConfig({ maxTxPoolSize, txPoolOverflowFactor, archivedTxLimit }: TxPoolOptions): void { + if (typeof maxTxPoolSize === 'number') { + assert(maxTxPoolSize >= 0, 'maxTxPoolSize must be greater or equal to 0'); + this.#maxTxPoolSize = maxTxPoolSize; + + if (maxTxPoolSize === 0) { + this.#log.info(`Disabling maximum tx mempool size. Tx eviction stopped`); + } else { + this.#log.info(`Setting maximum tx mempool size`, { maxTxPoolSize }); + } + } + + if (typeof txPoolOverflowFactor === 'number') { + assert(txPoolOverflowFactor >= 1, 'txPoolOveflowFactor must be greater or equal to 1'); + this.txPoolOverflowFactor = txPoolOverflowFactor; + this.#log.info(`Allowing tx pool size to grow above limit`, { maxTxPoolSize, txPoolOverflowFactor }); + } + + if (typeof archivedTxLimit === 'number') { + assert(archivedTxLimit >= 0, 'archivedTxLimit must be greater or equal to 0'); + this.#archivedTxLimit = archivedTxLimit; + } } public markTxsAsNonEvictable(txHashes: TxHash[]): Promise { @@ -483,7 +505,7 @@ export class AztecKVTxPool implements TxPool { private async evictLowPriorityTxs( newTxHashes: TxHash[], ): Promise<{ numLowPriorityTxsEvicted: number; numNewTxsEvicted: number }> { - if (this.#maxTxPoolSize === undefined) { + if (this.#maxTxPoolSize === undefined || this.#maxTxPoolSize === 0) { return { numLowPriorityTxsEvicted: 0, numNewTxsEvicted: 0 }; } @@ -491,17 +513,22 @@ export class AztecKVTxPool implements TxPool { const txsToEvict: TxHash[] = []; let pendingTxsSize = (await this.#pendingTxSize.getAsync()) ?? 0; - if (pendingTxsSize > this.#maxTxPoolSize) { + if (pendingTxsSize > this.#maxTxPoolSize * this.txPoolOverflowFactor) { for await (const txHash of this.#pendingTxPriorityToHash.valuesAsync()) { if (this.#nonEvictableTxs.has(txHash.toString())) { continue; } - this.#log.verbose(`Evicting tx ${txHash} from pool due to low priority to satisfy max tx size limit`); - txsToEvict.push(TxHash.fromString(txHash)); - const txSize = (await this.#pendingTxHashToSize.getAsync(txHash.toString())) ?? (await this.getPendingTxByHash(txHash))?.getSize(); + + this.#log.verbose(`Evicting tx ${txHash} from pool due to low priority to satisfy max tx size limit`, { + txHash, + txSize, + }); + + txsToEvict.push(TxHash.fromString(txHash)); + if (txSize) { pendingTxsSize -= txSize; if (pendingTxsSize <= this.#maxTxPoolSize) { diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts index cff507cd51b1..660dc5872c3b 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts @@ -5,7 +5,7 @@ import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-clien import { PoolInstrumentation, PoolName } from '../instrumentation.js'; import { getPendingTxPriority } from './priority.js'; -import type { TxPool } from './tx_pool.js'; +import type { TxPool, TxPoolOptions } from './tx_pool.js'; /** * In-memory implementation of the Transaction Pool. @@ -190,9 +190,7 @@ export class InMemoryTxPool implements TxPool { return Promise.resolve(Array.from(this.txs.keys()).map(x => TxHash.fromBigInt(x))); } - setMaxTxPoolSize(_maxSizeBytes: number | undefined): Promise { - return Promise.resolve(); - } + updateConfig(_config: TxPoolOptions): void {} markTxsAsNonEvictable(_: TxHash[]): Promise { return Promise.resolve(); diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts index 72bc7daf8cfb..6a0ccb9f2aa0 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts @@ -1,5 +1,11 @@ import type { Tx, TxHash } from '@aztec/stdlib/tx'; +export type TxPoolOptions = { + maxTxPoolSize?: number; + txPoolOverflowFactor?: number; + archivedTxLimit?: number; +}; + /** * Interface of a transaction pool. The pool includes tx requests and is kept up-to-date by a P2P client. */ @@ -95,7 +101,7 @@ export interface TxPool { * Configure the maximum size of the tx pool * @param maxSizeBytes - The maximum size in bytes of the mempool. Set to undefined to disable it */ - setMaxTxPoolSize(maxSizeBytes: number | undefined): Promise; + updateConfig(config: TxPoolOptions): void; /** Returns whether the pool is empty. */ isEmpty(): Promise; diff --git a/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts b/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts index 9bc130a934f1..86baa89bc708 100644 --- a/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts +++ b/yarn-project/p2p/src/testbench/p2p_client_testbench_worker.ts @@ -49,7 +49,7 @@ function mockTxPool(): TxPool { getTxStatus: () => Promise.resolve(TxStatus.PENDING), getTxsByHash: () => Promise.resolve([]), hasTxs: () => Promise.resolve([]), - setMaxTxPoolSize: () => Promise.resolve(), + updateConfig: () => {}, markTxsAsNonEvictable: () => Promise.resolve(), }; }