feat(p2p): Integrate TxPoolV2 across codebase #20172
alexghr merged 15 commits into merge-train/spartan
Conversation
## TxPoolV2Impl Refactoring Plan

### Architecture Overview

The transaction pool consists of two main classes.

### Transaction State Machine

Transactions move through states:

State is derived from:

### In-Memory Index Structure

Five private maps manage pool state:

```ts
#metadata: Map<string, TxMetaData>              // Primary store: txHash → metadata
#nullifierToTxHash: Map<string, string>         // Conflict detection (pending only)
#feePayerToTxHashes: Map<string, Set<string>>   // Balance-based eviction (pending only)
#pendingByPriority: Map<bigint, Set<string>>    // Priority ordering (pending only)
#protectedTransactions: Map<string, SlotNumber> // Protection tracking
```

Key invariant: Only pending txs appear in the nullifier/feePayer/priority indices.

### Eviction System

Two types of eviction rules:

### Persistence

### Key Patterns
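The five-map index structure and its "pending-only" invariant can be sketched as a single class that updates every map together, which is the motivation for extracting index management into its own unit. This is an illustrative sketch only: the class name, method names, and the `TxMetaData` shape here are stand-ins, not the actual implementation.

```typescript
// Stand-in metadata type; the real TxMetaData has more fields.
type TxMetaData = {
  txHash: string;
  nullifiers: string[];
  feePayer: string;
  priorityFee: bigint;
  minedL2BlockId?: string;
};

// Hypothetical index manager: all secondary indices are updated in one place,
// so the "only pending txs in secondary indices" invariant cannot be broken
// by a single forgotten map update.
class TxPoolIndicesSketch {
  #metadata = new Map<string, TxMetaData>();
  #nullifierToTxHash = new Map<string, string>();
  #feePayerToTxHashes = new Map<string, Set<string>>();
  #pendingByPriority = new Map<bigint, Set<string>>();

  addPending(meta: TxMetaData): void {
    this.#metadata.set(meta.txHash, meta);
    for (const n of meta.nullifiers) this.#nullifierToTxHash.set(n, meta.txHash);
    let byPayer = this.#feePayerToTxHashes.get(meta.feePayer);
    if (!byPayer) this.#feePayerToTxHashes.set(meta.feePayer, (byPayer = new Set()));
    byPayer.add(meta.txHash);
    let byFee = this.#pendingByPriority.get(meta.priorityFee);
    if (!byFee) this.#pendingByPriority.set(meta.priorityFee, (byFee = new Set()));
    byFee.add(meta.txHash);
  }

  // Marking a tx mined removes it from every pending-only index at once,
  // while the primary metadata store keeps the entry.
  markMined(txHash: string, blockId: string): void {
    const meta = this.#metadata.get(txHash);
    if (!meta) return;
    meta.minedL2BlockId = blockId;
    for (const n of meta.nullifiers) this.#nullifierToTxHash.delete(n);
    this.#feePayerToTxHashes.get(meta.feePayer)?.delete(txHash);
    this.#pendingByPriority.get(meta.priorityFee)?.delete(txHash);
  }

  isPending(txHash: string): boolean {
    const meta = this.#metadata.get(txHash);
    return !!meta && meta.minedL2BlockId === undefined;
  }

  has(txHash: string): boolean {
    return this.#metadata.has(txHash);
  }
}
```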
### 1. Extract Index Management to `TxPoolIndices`

| Method | Lines | Action |
|---|---|---|
| `#isDuplicateTx(txHashStr)` | 1041-1043 | Inline: just `this.#indices.has(txHashStr)` |
| `#partitionByMinedStatus(txs)` | 953-969 | Inline: simple partition loop |
| `#populateMinedIndices(metas)` | 990-994 | Inline: trivial iteration |
| `#unmineTxs(txs)` | 816-821 | Inline: just `meta.minedL2BlockId = undefined` in caller |
| `#persistTx(txHashStr, tx)` | 878-881 | Keep: provides semantic clarity |
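As a concrete illustration of the "inline" actions in the table, the `#unmineTxs` row suggests the caller clears the field directly instead of delegating to a one-line helper. The sketch below is standalone with a stand-in `TxMetaData` type and a hypothetical caller name, not the real code.

```typescript
// Stand-in metadata type; the real TxMetaData has more fields.
type TxMetaData = { txHash: string; minedL2BlockId?: string };

// Hypothetical caller: after the refactor, the one-line #unmineTxs helper is
// gone and the caller clears minedL2BlockId inline.
function handlePrunedMetas(metas: TxMetaData[]): TxMetaData[] {
  for (const meta of metas) {
    meta.minedL2BlockId = undefined; // was: this.#unmineTxs(txs)
  }
  return metas;
}
```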
### 5. Consolidate Add Transaction Methods

Current methods (`tx_pool_v2_impl.ts`):

- `#addNewPendingTx` (lines 1046-1059)
- `#addNewProtectedTx` (lines 884-895)
- `#addNewMinedTx` (lines 898-909)

All share: build metadata → persist tx → add to indices → log.

Refactor: Single `#addTx` with a state parameter:

```ts
async #addTx(tx: Tx, state: 'pending' | { protected: SlotNumber } | { mined: L2BlockId }): Promise<TxMetaData> {
  const txHashStr = tx.getTxHash().toString();
  const meta = await buildTxMetaData(tx);
  await this.#txsDB.set(txHashStr, tx.toBuffer());
  if (state === 'pending') {
    this.#indices.addPending(meta);
  } else if ('protected' in state) {
    this.#indices.addProtected(meta, state.protected);
  } else {
    meta.minedL2BlockId = state.mined;
    this.#indices.addMined(meta, state.mined);
  }
  this.#log.verbose(`Added ${typeof state === 'string' ? state : Object.keys(state)[0]} tx ${txHashStr}`);
  return meta;
}
```

### 6. Bug Fixes
**Bug 1: `handleMinedBlock` emits wrong txHashes (line 408)**

Current: Emits all txHashes from the block body.
Fix: Only emit hashes that were actually in the pool.

```ts
// Change line 408 from:
this.#callbacks.onTxsRemoved(txHashes.map(h => h.toBigInt()));
// To:
this.#callbacks.onTxsRemoved(found.map(m => BigInt('0x' + m.txHash)));
```

**Bug 2: Non-deterministic nullifier conflict resolution (`checkNullifierConflict` in `tx_metadata.ts:148`)**
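The essence of the Bug 1 fix is the filtering step: emit only the hashes the pool actually held. A minimal standalone sketch (the function name `hashesToEmit` and the plain `Map` standing in for the metadata store are illustrative):

```typescript
// Given the block's txHashes and the pool's primary metadata store, compute
// the removal events to emit: only hashes present in the pool, converted to
// bigint the same way as the fix above (hex string → BigInt).
function hashesToEmit(blockTxHashes: string[], metadata: Map<string, unknown>): bigint[] {
  return blockTxHashes.filter(h => metadata.has(h)).map(h => BigInt('0x' + h));
}
```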
Current: Only compares `priorityFee`, with no txHash tiebreaker.
Risk: Equal-fee conflicts are resolved by Map iteration order (non-deterministic).
Fix: Use `comparePriority` for consistent ordering.

```ts
// Change line 148 from:
if (incomingMeta.priorityFee > conflictingMeta.priorityFee) {
// To:
if (comparePriority(incomingMeta, conflictingMeta) > 0) {
```

### 7. Files to Create/Modify
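To see why a total order fixes the non-determinism, here is a hedged sketch of what a `comparePriority`-style comparator could look like: higher `priorityFee` wins, and equal fees fall back to the txHash so two nodes always resolve the same conflict the same way. The real `comparePriority` signature and tiebreak direction may differ; this only demonstrates the technique.

```typescript
// Minimal metadata shape for the comparison; the real TxMetaData has more fields.
type Meta = { txHash: string; priorityFee: bigint };

// Deterministic comparator: positive if `a` wins, negative if `b` wins.
// The txHash tiebreak direction is an arbitrary but fixed choice.
function comparePrioritySketch(a: Meta, b: Meta): number {
  if (a.priorityFee !== b.priorityFee) {
    return a.priorityFee > b.priorityFee ? 1 : -1;
  }
  // Equal fees: break the tie by hash so the result never depends on
  // Map iteration order.
  return a.txHash < b.txHash ? 1 : a.txHash > b.txHash ? -1 : 0;
}
```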
| File | Action |
|---|---|
| `tx_pool_indices.ts` | Create: New class for index management |
| `tx_pool_v2_impl.ts` | Modify: Extract index management, consolidate duplicates, inline trivial methods |
| `tx_metadata.ts` | Modify: Fix `checkNullifierConflict` to use `comparePriority` |
### 8. Verification

- Build: `yarn build` from the yarn-project root
- Unit tests: `yarn workspace @aztec/p2p test src/mem_pools/tx_pool_v2/`
- Compat tests: `yarn workspace @aztec/p2p test src/mem_pools/tx_pool_v2/tx_pool_v2.compat.test.ts`
- Benchmark: `yarn workspace @aztec/p2p test src/mem_pools/tx_pool_v2/tx_pool_v2_bench.test.ts`
### Expected Results
- TxPoolV2Impl: Reduced from ~1265 lines to ~800 lines
- TxPoolIndices: New ~350 line class
- Duplicate code: Eliminated validation duplication
- Readability: Clearer separation of concerns between business logic and data structure management
- Bug fixes: 2 correctness issues resolved
Migrates all consumers from TxPool to TxPoolV2, the new event-driven
transaction pool implementation.
## Key API Changes
- `addTxs` → `addPendingTxs` (returns AddTxsResult with accepted/ignored/rejected)
- `markAsMined` → `handleMinedBlock` (takes full L2Block)
- `markMinedAsPending` → `handlePrunedBlocks` (takes L2BlockId)
- `markTxsAsNonEvictable/clearNonEvictableTxs` → `protectTxs/prepareForSlot`
- `deleteTxs` → `handleFailedExecution` or `handleFinalizedBlock`
- New lifecycle: `start()` must be called before use
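The `addTxs` → `addPendingTxs` change above means callers now get a structured result instead of fire-and-forget. The shape below is a hypothetical sketch based on the "accepted/ignored/rejected" description; the real `AddTxsResult` fields may differ.

```typescript
// Assumed result shape for addPendingTxs, per the API-change notes above.
type TxHash = string;
type AddTxsResult = { accepted: TxHash[]; ignored: TxHash[]; rejected: TxHash[] };

// Illustrative caller: log a one-line summary of what the pool did with a batch.
function summarize(result: AddTxsResult): string {
  return `accepted=${result.accepted.length} ignored=${result.ignored.length} rejected=${result.rejected.length}`;
}
```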
## Integration Points
### P2P Client (p2p_client.ts)
- Block stream handlers now use pool event methods:
- `handleLatestL2Blocks` → `handleMinedBlock` per block
- `handleFinalizedL2Blocks` → `handleFinalizedBlock` per block
- `handlePruneL2Blocks` → `handlePrunedBlocks` with L2BlockId
- `markTxsAsNonEvictable` now requires BlockHeader for slot-based protection
- `getTxStatus` maps 'protected' → 'pending' for external API
- `getTxs('all')` combines pending + mined (no getAllTxs in V2)
- Pool started/stopped with client lifecycle
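Since V2 has no `getAllTxs`, the `getTxs('all')` path described above has to union the pending and mined sets itself. A minimal sketch of that combination step (the function name is illustrative; hashes are strings here for simplicity):

```typescript
// Combine pending + mined hashes, deduplicating in case a hash appears in
// both views during a state transition.
function allTxHashes(pending: string[], mined: string[]): string[] {
  return [...new Set([...pending, ...mined])];
}
```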
### Factory (factory.ts)
- Creates AggregateTxValidator for pending tx validation
- Instantiates AztecKVTxPoolV2 with dependencies:
- l2BlockSource (archiver)
- worldStateSynchronizer
- pendingTxValidator
### Libp2p Service (libp2p_service.ts)
- Block proposal handler: `protectTxs(txHashes, block.blockHeader)`
- Checkpoint proposal handler: `protectTxs(txHashes, checkpoint.lastBlock.blockHeader)`
### Services
- TxProvider: `addPendingTxs` for proposal txs
- TxCollectionSink: `addPendingTxs` for gossip txs
- BlockTxsHandler: Type change only (query methods unchanged)
### Sequencer (sequencer.ts)
- TODO added for `prepareForSlot` at slot boundaries
## TODOs for Follow-up
- Refactor validator creation into TxValidatorFactory
- Wire `prepareForSlot` calls at slot boundaries
- Add context on expected tx state when adding txs
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
```ts
txPool:
  deps.txPool ??
  new AztecKVTxPool(store, archive, worldStateSynchronizer, telemetry, {
    // TODO(pw/tx-pool): Refactor into a TxValidatorFactory that can be called whenever we need a validator for a block
```

**Review comment:**
- Implement the factory taking a block as input
- Define exactly which validations we need here
```diff
@@ -359,58 +341,6 @@ describe('P2P Client', () => {
       finalized: { block: { number: BlockNumber(50), hash: expect.any(String) }, checkpoint: anyCheckpoint },
     });
   });

-  it('deletes txs created from a pruned block', async () => {
```

**Review comment:** Double-check these tests are captured in the tx pool v2 unit tests.
```diff
@@ -401,7 +402,7 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>

     const txs = txBatches.flat();
     if (txs.length > 0) {
-      await this.txPool.addTxs(txs);
+      await this.txPool.addPendingTxs(txs);
```

**Review comment:** Review if this parent method is used at all and try to delete it.
```diff
@@ -444,8 +445,10 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
     let txHashes: TxHash[];

     if (filter === 'all') {
```

**Review comment:** Is this filter used?
```diff
-    await this.txPool.deleteTxs(txHashes, { permanently: true });
-    await this.txPool.cleanupDeletedMinedTxs(lastBlockNum);
+    for (const block of blocks) {
+      await this.txPool.handleFinalizedBlock(block.header);
```

**Review comment:** Call with the last one, do not iterate.

```ts
// TODO(pw/tx-pool): Figure out when to call!
```
```diff
@@ -607,7 +611,7 @@ export class P2PClient<T extends P2PClientType = P2PClientType.Full>
    **/
   public async deleteTxs(txHashes: TxHash[]): Promise<void> {
     this.#assertIsReady();
-    await this.txPool.deleteTxs(txHashes);
+    await this.txPool.handleFailedExecution(txHashes);
```

**Review comment:** No. We should expose `handleFailedExecution`.
```ts
      const txHash = tx.getTxHash();
      txsToDelete.set(txHash.toString(), txHash);
    }
    const header = await this.l2BlockSource.getBlockHeader(latestBlock);
```

**Review comment:** Don't we get the block hash from the blockstream, so we don't need to go back to the archiver here?
```diff
-  public markTxsAsNonEvictable(txHashes: TxHash[]): Promise<void> {
-    return this.txPool.markTxsAsNonEvictable(txHashes);
+  public async markTxsAsNonEvictable(txHashes: TxHash[], blockHeader: BlockHeader): Promise<TxHash[]> {
+    return this.txPool.protectTxs(txHashes, blockHeader);
```

**Review comment:** Use the same function name in the p2p client as in the tx pool, so that we can then move the tx pool out of the p2p client.
```diff
@@ -114,7 +114,8 @@ export class TxCollectionSink extends (EventEmitter as new () => TypedEventEmitter

     // Add the txs to the tx pool (should not fail, but we catch it just in case)
     try {
-      await this.txPool.addTxs(txs, { source: `tx-collection` });
+      // TODO(pw/tx-pool): Add context on the expected state on this tx
```

**Review comment:** Implement this TODO.
```diff
@@ -227,6 +227,7 @@ export class TxProvider implements ITxProvider {
       return;
     }
     await this.txValidator.validate(txs);
-    await this.txPool.addTxs(txs);
+    // TODO(pw/tx-pool): Add context on the expected state on this tx
```
```diff
@@ -376,6 +376,7 @@ export class Sequencer extends (EventEmitter as new () => TypedEventEmitter<Sequ
     }

     this.lastSlotForCheckpointProposalJob = slot;
+    // TODO(pw/tx-pool): Call txPool.prepareForSlot(slotNumber) here when transitioning to a new slot
```

**Review comment:** Implement as well.
Flakey Tests: 🤖 says: This CI run detected 2 tests that failed, but were tolerated due to a `.test_patterns.yml` entry.
```diff
-    await this.markTxsAsMinedFromBlocks(blocks);
-    await this.txPool.clearNonEvictableTxs();
+    await this.handleMinedBlocks(blocks);
+    await this.maybeCallPrepareForSlot();
```
```ts
// Ensure we always start with source 0 so we can test the fallback to source 1
jest.spyOn(Math, 'random').mockReturnValue(0);
```
BEGIN_COMMIT_OVERRIDE
chore(ci3): add optional local cache for bootstrap artifacts (#20305)
fix: Fix p2p integration test (#20331)
chore: reduce fee log severity (#20336)
feat: restrict response sizes to expected sizes (#20287)
feat: retry web3signer connection (#20342)
feat(p2p): Integrate TxPoolV2 across codebase (#20172)
feat: review and optimize Claude configuration, agents, and skills (#20270)
fix(prover): handle cross-chain messages when proving mbps (#20354)
chore: retry flakes. if retry pass, is a flake as we know it now. fail both is hard fail (#19322)
chore(p2p): add mock reqresp layer for tests (#20370)
fix: (A-370) don't propagate on tx mempool add failure (#20374)
chore: Skip the HA test (#20376)
feat: Retain pruned transactions until pruned block is finalised (#20237)
END_COMMIT_OVERRIDE