Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export type EnvVar =
| 'P2P_TRUSTED_PEERS'
| 'P2P_PRIVATE_PEERS'
| 'P2P_MAX_TX_POOL_SIZE'
| 'P2P_TX_POOL_OVERFLOW_FACTOR'
| 'PEER_ID_PRIVATE_KEY'
| 'PEER_ID_PRIVATE_KEY_PATH'
| 'PROVER_AGENT_COUNT'
Expand Down
24 changes: 24 additions & 0 deletions yarn-project/foundation/src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,18 @@ export function numberConfigHelper(defaultVal: number): Pick<ConfigMapping, 'par
};
}

/**
* Generates parseEnv and default values for a numerical config value.
* @param defaultVal - The default numerical value to use if the environment variable is not set or is invalid
* @returns Object with parseEnv and default values for a numerical config value
*/
export function floatConfigHelper(defaultVal: number): Pick<ConfigMapping, 'parseEnv' | 'defaultValue'> {
return {
parseEnv: (val: string) => safeParseFloat(val, defaultVal),
defaultValue: defaultVal,
};
}

/**
* Generates parseEnv and default values for a numerical config value.
* @param defaultVal - The default numerical value to use if the environment variable is not set or is invalid
Expand Down Expand Up @@ -171,6 +183,18 @@ function safeParseNumber(value: string, defaultValue: number): number {
return Number.isSafeInteger(parsedValue) ? parsedValue : defaultValue;
}

/**
* Safely parses a floating point number from a string.
* If the value is not a number, the default value is returned.
* @param value - The string value to parse
* @param defaultValue - The default value to return
* @returns Either parsed value or default value
*/
function safeParseFloat(value: string, defaultValue: number): number {
const parsedValue = parseFloat(value);
return Number.isNaN(parsedValue) ? defaultValue : parsedValue;
}

/**
* Picks specific keys from the given configuration mappings.
*
Expand Down
8 changes: 3 additions & 5 deletions yarn-project/p2p/src/client/p2p_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,9 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
return this.synchedBlockHashes.getAsync(number);
}

public async updateP2PConfig(config: Partial<P2PConfig>): Promise<void> {
if (typeof config.maxTxPoolSize === 'number' && this.config.maxTxPoolSize !== config.maxTxPoolSize) {
await this.txPool.setMaxTxPoolSize(config.maxTxPoolSize);
this.config.maxTxPoolSize = config.maxTxPoolSize;
}
public updateP2PConfig(config: Partial<P2PConfig>): Promise<void> {
this.txPool.updateConfig(config);
return Promise.resolve();
}

public async getL2Tips(): Promise<L2Tips> {
Expand Down
11 changes: 11 additions & 0 deletions yarn-project/p2p/src/config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
type ConfigMappingsType,
booleanConfigHelper,
floatConfigHelper,
getConfigFromMappings,
getDefaultConfig,
numberConfigHelper,
Expand Down Expand Up @@ -196,6 +197,11 @@ export interface P2PConfig extends P2PReqRespConfig, ChainConfig {
* The maximum cumulative tx size (in bytes) of pending txs before evicting lower priority txs.
*/
maxTxPoolSize: number;

/**
* If the pool is full, it will still accept a few more txs until it reached maxTxPoolOverspillFactor * maxTxPoolSize. Then it will evict
*/
txPoolOverflowFactor: number;
}

export const DEFAULT_P2P_PORT = 40400;
Expand Down Expand Up @@ -393,6 +399,11 @@ export const p2pConfigMappings: ConfigMappingsType<P2PConfig> = {
description: 'The maximum cumulative tx size of pending txs (in bytes) before evicting lower priority txs.',
...numberConfigHelper(100_000_000), // 100MB
},
txPoolOverflowFactor: {
env: 'P2P_TX_POOL_OVERFLOW_FACTOR',
description: 'How much the tx pool can overflow before it starts evicting txs. Must be greater than 1',
...floatConfigHelper(1.1), // 10% overflow
},
...p2pReqRespConfigMappings,
...chainConfigMappings,
};
Expand Down
72 changes: 71 additions & 1 deletion yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { timesAsync } from '@aztec/foundation/collection';
import { Fr } from '@aztec/foundation/fields';
import { map, sort, toArray } from '@aztec/foundation/iterable';
import { openTmpStore } from '@aztec/kv-store/lmdb-v2';
import { GasFees } from '@aztec/stdlib/gas';
import type { MerkleTreeReadOperations, WorldStateSynchronizer } from '@aztec/stdlib/interfaces/server';
import { mockTx } from '@aztec/stdlib/testing';
import { MaxBlockNumber, Tx, type TxValidationResult } from '@aztec/stdlib/tx';
import { MaxBlockNumber, Tx, TxHash, type TxValidationResult } from '@aztec/stdlib/tx';

import { jest } from '@jest/globals';
import { type MockProxy, mock } from 'jest-mock-extended';

import { ArchiveCache, GasTxValidator } from '../../msg_validators/index.js';
Expand All @@ -15,13 +18,18 @@ describe('KV TX pool', () => {
let txPool: TestAztecKVTxPool;
let worldState: MockProxy<WorldStateSynchronizer>;
let db: MockProxy<MerkleTreeReadOperations>;
let nextTxSeed: number;
let mockTxSize: number;

const checkPendingTxConsistency = async () => {
const pendingTxHashCount = await txPool.getPendingTxHashes().then(h => h.length);
expect(await txPool.getPendingTxCount()).toEqual(pendingTxHashCount);
};

beforeEach(async () => {
nextTxSeed = 1;
mockTxSize = 100;

worldState = worldState = mock<WorldStateSynchronizer>();
db = mock<MerkleTreeReadOperations>();
worldState.getCommitted.mockReturnValue(db);
Expand All @@ -37,6 +45,12 @@ describe('KV TX pool', () => {

describeTxPool(() => txPool);

const mockFixedSizeTx = async (maxPriorityFeesPerGas?: GasFees) => {
const tx = await mockTx(nextTxSeed++, { maxPriorityFeesPerGas });
jest.spyOn(tx, 'getSize').mockReturnValue(mockTxSize);
return tx;
};

it('Returns archived txs and purges archived txs once the archived tx limit is reached', async () => {
// set the archived tx limit to 2
txPool = new TestAztecKVTxPool(await openTmpStore('p2p'), await openTmpStore('archive'), worldState, undefined, {
Expand Down Expand Up @@ -140,6 +154,62 @@ describe('KV TX pool', () => {
]);
});

it('respects the overflow factor configured', async () => {
txPool = new TestAztecKVTxPool(await openTmpStore('p2p'), await openTmpStore('archive'), worldState, undefined, {
maxTxPoolSize: mockTxSize * 10, // pool should contain no more than 10 mock txs
txPoolOverflowFactor: 1.5, // but allow it to grow up to 15, but then when it evicts, it evicts until it's left to 10
});

const cmp = (a: TxHash, b: TxHash) => (a.toBigInt() < b.toBigInt() ? -1 : a.toBigInt() > b.toBigInt() ? 1 : 0);

const firstBatch = await timesAsync(10, () => mockFixedSizeTx());
await txPool.addTxs(firstBatch);

// we've just added 10 txs. They should all be availble
expect(await toArray(sort(await txPool.getPendingTxHashes(), cmp))).toEqual(
await toArray(
sort(
map(firstBatch, tx => tx.getTxHash()),
cmp,
),
),
);

const secondBatch = await timesAsync(2, () => mockFixedSizeTx());
await txPool.addTxs(secondBatch);

// we've added two more txs. At this point the pool contains more txs than the limit
// but it still hasn't evicted anything
expect(await toArray(sort(await txPool.getPendingTxHashes(), cmp))).toEqual(
await toArray(
sort(
map([...firstBatch, ...secondBatch], tx => tx.getTxHash()),
cmp,
),
),
);

const thirdBatch = await timesAsync(3, () => mockFixedSizeTx());
await txPool.addTxs(thirdBatch);

// add another 3 txs. The pool has reached the limit. All txs should be available still
// another txs would trigger evictions
expect(await toArray(sort(await txPool.getPendingTxHashes(), cmp))).toEqual(
await toArray(
sort(
map([...firstBatch, ...secondBatch, ...thirdBatch], tx => tx.getTxHash()),
cmp,
),
),
);

const lastTx = await mockFixedSizeTx();
await txPool.addTxs([lastTx]);

// the pool should evict enough txs to stay under the size limit
expect(await txPool.getPendingTxCount()).toBeLessThanOrEqual(10);
});

it('Evicts txs with nullifiers that are already included in the mined block', async () => {
const tx1 = await mockTx(1, { numberOfNonRevertiblePublicCallRequests: 1 });
const tx2 = await mockTx(2, { numberOfNonRevertiblePublicCallRequests: 1 });
Expand Down
63 changes: 45 additions & 18 deletions yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import { DatabasePublicStateSource } from '@aztec/stdlib/trees';
import { Tx, TxHash } from '@aztec/stdlib/tx';
import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-client';

import assert from 'assert';

import { ArchiveCache } from '../../msg_validators/tx_validator/archive_cache.js';
import { GasTxValidator } from '../../msg_validators/tx_validator/gas_validator.js';
import { PoolInstrumentation, PoolName } from '../instrumentation.js';
import { getPendingTxPriority } from './priority.js';
import type { TxPool } from './tx_pool.js';
import type { TxPool, TxPoolOptions } from './tx_pool.js';

/**
* KV implementation of the Transaction Pool.
Expand All @@ -27,7 +29,10 @@ export class AztecKVTxPool implements TxPool {
#txs: AztecAsyncMap<string, Buffer>;

/** The maximum cumulative tx size that the pending txs in the pool take up. */
#maxTxPoolSize: number | undefined;
#maxTxPoolSize: number = 0;

/** The tx evicion logic will kick after pool size is greater than maxTxPoolSize * txPoolOverflowFactor */
txPoolOverflowFactor: number = 1;

/** Index from tx hash to the block number in which they were mined, filtered by mined txs. */
#minedTxHashToBlock: AztecAsyncMap<string, number>;
Expand Down Expand Up @@ -63,7 +68,7 @@ export class AztecKVTxPool implements TxPool {
#archivedTxIndices: AztecAsyncMap<number, string>;

/** Number of txs to archive. */
#archivedTxLimit: number;
#archivedTxLimit: number = 0;

/** The world state synchronizer used in the node. */
#worldStateSynchronizer: WorldStateSynchronizer;
Expand All @@ -85,31 +90,29 @@ export class AztecKVTxPool implements TxPool {
archive: AztecAsyncKVStore,
worldStateSynchronizer: WorldStateSynchronizer,
telemetry: TelemetryClient = getTelemetryClient(),
config: {
maxTxPoolSize?: number;
archivedTxLimit?: number;
} = {},
config: TxPoolOptions = {},
log = createLogger('p2p:tx_pool'),
) {
this.#log = log;
this.updateConfig(config);

this.#txs = store.openMap('txs');
this.#minedTxHashToBlock = store.openMap('txHashToBlockMined');
this.#pendingTxPriorityToHash = store.openMultiMap('pendingTxFeeToHash');
this.#pendingTxHashToSize = store.openMap('pendingTxHashToSize');
this.#pendingTxHashToHeaderHash = store.openMap('pendingTxHashToHeaderHash');
this.#pendingTxSize = store.openSingleton('pendingTxSize');
this.#pendingTxCount = store.openSingleton('pendingTxCount');
this.#maxTxPoolSize = config.maxTxPoolSize;

this.#pendingTxs = new Map<string, Tx>();
this.#nonEvictableTxs = new Set<string>();

this.#archivedTxs = archive.openMap('archivedTxs');
this.#archivedTxIndices = archive.openMap('archivedTxIndices');
this.#archivedTxLimit = config.archivedTxLimit ?? 0;

this.#store = store;
this.#archive = archive;
this.#worldStateSynchronizer = worldStateSynchronizer;
this.#log = log;
this.#metrics = new PoolInstrumentation(telemetry, PoolName.TX_POOL, () => store.estimateSize());
}

Expand Down Expand Up @@ -386,9 +389,28 @@ export class AztecKVTxPool implements TxPool {
return vals.map(x => TxHash.fromString(x));
}

public setMaxTxPoolSize(maxSizeBytes: number | undefined): Promise<void> {
this.#maxTxPoolSize = maxSizeBytes;
return Promise.resolve();
public updateConfig({ maxTxPoolSize, txPoolOverflowFactor, archivedTxLimit }: TxPoolOptions): void {
if (typeof maxTxPoolSize === 'number') {
assert(maxTxPoolSize >= 0, 'maxTxPoolSize must be greater or equal to 0');
this.#maxTxPoolSize = maxTxPoolSize;

if (maxTxPoolSize === 0) {
this.#log.info(`Disabling maximum tx mempool size. Tx eviction stopped`);
} else {
this.#log.info(`Setting maximum tx mempool size`, { maxTxPoolSize });
}
}

if (typeof txPoolOverflowFactor === 'number') {
assert(txPoolOverflowFactor >= 1, 'txPoolOveflowFactor must be greater or equal to 1');
this.txPoolOverflowFactor = txPoolOverflowFactor;
this.#log.info(`Allowing tx pool size to grow above limit`, { maxTxPoolSize, txPoolOverflowFactor });
}

if (typeof archivedTxLimit === 'number') {
assert(archivedTxLimit >= 0, 'archivedTxLimit must be greater or equal to 0');
this.#archivedTxLimit = archivedTxLimit;
}
}

public markTxsAsNonEvictable(txHashes: TxHash[]): Promise<void> {
Expand Down Expand Up @@ -483,25 +505,30 @@ export class AztecKVTxPool implements TxPool {
private async evictLowPriorityTxs(
newTxHashes: TxHash[],
): Promise<{ numLowPriorityTxsEvicted: number; numNewTxsEvicted: number }> {
if (this.#maxTxPoolSize === undefined) {
if (this.#maxTxPoolSize === undefined || this.#maxTxPoolSize === 0) {
return { numLowPriorityTxsEvicted: 0, numNewTxsEvicted: 0 };
}

let numNewTxsEvicted = 0;
const txsToEvict: TxHash[] = [];

let pendingTxsSize = (await this.#pendingTxSize.getAsync()) ?? 0;
if (pendingTxsSize > this.#maxTxPoolSize) {
if (pendingTxsSize > this.#maxTxPoolSize * this.txPoolOverflowFactor) {
for await (const txHash of this.#pendingTxPriorityToHash.valuesAsync()) {
if (this.#nonEvictableTxs.has(txHash.toString())) {
continue;
}
this.#log.verbose(`Evicting tx ${txHash} from pool due to low priority to satisfy max tx size limit`);
txsToEvict.push(TxHash.fromString(txHash));

const txSize =
(await this.#pendingTxHashToSize.getAsync(txHash.toString())) ??
(await this.getPendingTxByHash(txHash))?.getSize();

this.#log.verbose(`Evicting tx ${txHash} from pool due to low priority to satisfy max tx size limit`, {
txHash,
txSize,
});

txsToEvict.push(TxHash.fromString(txHash));

if (txSize) {
pendingTxsSize -= txSize;
if (pendingTxsSize <= this.#maxTxPoolSize) {
Expand Down
6 changes: 2 additions & 4 deletions yarn-project/p2p/src/mem_pools/tx_pool/memory_tx_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { type TelemetryClient, getTelemetryClient } from '@aztec/telemetry-clien

import { PoolInstrumentation, PoolName } from '../instrumentation.js';
import { getPendingTxPriority } from './priority.js';
import type { TxPool } from './tx_pool.js';
import type { TxPool, TxPoolOptions } from './tx_pool.js';

/**
* In-memory implementation of the Transaction Pool.
Expand Down Expand Up @@ -190,9 +190,7 @@ export class InMemoryTxPool implements TxPool {
return Promise.resolve(Array.from(this.txs.keys()).map(x => TxHash.fromBigInt(x)));
}

setMaxTxPoolSize(_maxSizeBytes: number | undefined): Promise<void> {
return Promise.resolve();
}
updateConfig(_config: TxPoolOptions): void {}

markTxsAsNonEvictable(_: TxHash[]): Promise<void> {
return Promise.resolve();
Expand Down
8 changes: 7 additions & 1 deletion yarn-project/p2p/src/mem_pools/tx_pool/tx_pool.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import type { Tx, TxHash } from '@aztec/stdlib/tx';

export type TxPoolOptions = {
maxTxPoolSize?: number;
txPoolOverflowFactor?: number;
archivedTxLimit?: number;
};

/**
* Interface of a transaction pool. The pool includes tx requests and is kept up-to-date by a P2P client.
*/
Expand Down Expand Up @@ -95,7 +101,7 @@ export interface TxPool {
* Configure the maximum size of the tx pool
* @param maxSizeBytes - The maximum size in bytes of the mempool. Set to undefined to disable it
*/
setMaxTxPoolSize(maxSizeBytes: number | undefined): Promise<void>;
updateConfig(config: TxPoolOptions): void;

/** Returns whether the pool is empty. */
isEmpty(): Promise<boolean>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ function mockTxPool(): TxPool {
getTxStatus: () => Promise.resolve(TxStatus.PENDING),
getTxsByHash: () => Promise.resolve([]),
hasTxs: () => Promise.resolve([]),
setMaxTxPoolSize: () => Promise.resolve(),
updateConfig: () => {},
markTxsAsNonEvictable: () => Promise.resolve(),
};
}
Expand Down