Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 132 additions & 121 deletions yarn-project/p2p/src/mem_pools/tx_pool_v2/tx_pool_v2_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,13 @@ export class TxPoolV2Impl {
}
}
}

// Run post-add eviction rules for pending txs (inside transaction for atomicity)
if (acceptedPending.size > 0) {
const feePayers = Array.from(acceptedPending).map(txHash => this.#indices.getMetadata(txHash)!.feePayer);
const uniqueFeePayers = new Set<string>(feePayers);
await this.#evictionManager.evictAfterNewTxs(Array.from(acceptedPending), [...uniqueFeePayers]);
}
});

// Build final accepted list for pending txs (excludes intra-batch evictions)
Expand All @@ -249,13 +256,6 @@ export class TxPoolV2Impl {
this.#instrumentation.recordRejected(rejected.length);
}

// Run post-add eviction rules for pending txs
if (acceptedPending.size > 0) {
const feePayers = Array.from(acceptedPending).map(txHash => this.#indices.getMetadata(txHash)!.feePayer);
const uniqueFeePayers = new Set<string>(feePayers);
await this.#evictionManager.evictAfterNewTxs(Array.from(acceptedPending), [...uniqueFeePayers]);
}

return { accepted, ignored, rejected, ...(errors.size > 0 ? { errors } : {}) };
}

Expand Down Expand Up @@ -379,33 +379,35 @@ export class TxPoolV2Impl {
let softDeletedHits = 0;
let missingPreviouslyEvicted = 0;

for (const txHash of txHashes) {
const txHashStr = txHash.toString();

if (this.#indices.has(txHashStr)) {
// Update protection for existing tx
this.#indices.updateProtection(txHashStr, slotNumber);
} else if (this.#deletedPool.isSoftDeleted(txHashStr)) {
// Resurrect soft-deleted tx as protected
const buffer = await this.#txsDB.getAsync(txHashStr);
if (buffer) {
const tx = Tx.fromBuffer(buffer);
await this.#addTx(tx, { protected: slotNumber });
softDeletedHits++;
await this.#store.transactionAsync(async () => {
for (const txHash of txHashes) {
const txHashStr = txHash.toString();

if (this.#indices.has(txHashStr)) {
// Update protection for existing tx
this.#indices.updateProtection(txHashStr, slotNumber);
} else if (this.#deletedPool.isSoftDeleted(txHashStr)) {
// Resurrect soft-deleted tx as protected
const buffer = await this.#txsDB.getAsync(txHashStr);
if (buffer) {
const tx = Tx.fromBuffer(buffer);
await this.#addTx(tx, { protected: slotNumber });
softDeletedHits++;
} else {
// Data missing despite soft-delete flag — treat as truly missing
this.#indices.setProtection(txHashStr, slotNumber);
missing.push(txHash);
}
} else {
// Data missing despite soft-delete flag — treat as truly missing
// Truly missing — pre-record protection for tx we don't have yet
this.#indices.setProtection(txHashStr, slotNumber);
missing.push(txHash);
}
} else {
// Truly missing — pre-record protection for tx we don't have yet
this.#indices.setProtection(txHashStr, slotNumber);
missing.push(txHash);
if (this.#evictedTxHashes.has(txHashStr)) {
missingPreviouslyEvicted++;
if (this.#evictedTxHashes.has(txHashStr)) {
missingPreviouslyEvicted++;
}
}
}
}
});

// Record metrics
if (softDeletedHits > 0) {
Expand Down Expand Up @@ -466,56 +468,60 @@ export class TxPoolV2Impl {
}
}

// Step 4: Mark txs as mined (only those we have in the pool)
for (const meta of found) {
this.#indices.markAsMined(meta, blockId);
await this.#deletedPool.clearIfMinedHigher(meta.txHash, blockId.number);
}
await this.#store.transactionAsync(async () => {
// Step 4: Mark txs as mined (only those we have in the pool)
for (const meta of found) {
this.#indices.markAsMined(meta, blockId);
await this.#deletedPool.clearIfMinedHigher(meta.txHash, blockId.number);
}

// Step 5: Run eviction rules (remove pending txs with conflicting nullifiers/expired timestamps)
await this.#evictionManager.evictAfterNewBlock(block.header, nullifiers, feePayers);
// Step 5: Run post-event eviction rules (inside transaction for atomicity)
await this.#evictionManager.evictAfterNewBlock(block.header, nullifiers, feePayers);
});

this.#log.info(`Marked ${found.length} txs as mined in block ${blockId.number}`);
}

async prepareForSlot(slotNumber: SlotNumber): Promise<void> {
// Step 0: Clean up slot-deleted txs from previous slots
await this.#deletedPool.cleanupSlotDeleted(slotNumber);
await this.#store.transactionAsync(async () => {
// Step 0: Clean up slot-deleted txs from previous slots
await this.#deletedPool.cleanupSlotDeleted(slotNumber);

// Step 1: Find expired protected txs
const expiredProtected = this.#indices.findExpiredProtectedTxs(slotNumber);
// Step 1: Find expired protected txs
const expiredProtected = this.#indices.findExpiredProtectedTxs(slotNumber);

// Step 2: Clear protection for all expired entries (including those without metadata)
this.#indices.clearProtection(expiredProtected);
// Step 2: Clear protection for all expired entries (including those without metadata)
this.#indices.clearProtection(expiredProtected);

// Step 3: Filter to only txs that have metadata and are not mined
const txsToRestore = this.#indices.filterRestorable(expiredProtected);
if (txsToRestore.length === 0) {
this.#log.debug(`Preparing for slot ${slotNumber}, no txs to unprotect`);
return;
}
// Step 3: Filter to only txs that have metadata and are not mined
const txsToRestore = this.#indices.filterRestorable(expiredProtected);
if (txsToRestore.length === 0) {
this.#log.debug(`Preparing for slot ${slotNumber}, no txs to unprotect`);
return;
}

this.#log.info(`Preparing for slot ${slotNumber}: unprotecting ${txsToRestore.length} txs`);
this.#log.info(`Preparing for slot ${slotNumber}: unprotecting ${txsToRestore.length} txs`);

// Step 4: Validate for pending pool
const { valid, invalid } = await this.#revalidateMetadata(txsToRestore, 'during prepareForSlot');
// Step 4: Validate for pending pool
const { valid, invalid } = await this.#revalidateMetadata(txsToRestore, 'during prepareForSlot');

// Step 5: Resolve nullifier conflicts and add winners to pending indices
const { added, toEvict } = this.#applyNullifierConflictResolution(valid);
// Step 5: Resolve nullifier conflicts and add winners to pending indices
const { added, toEvict } = this.#applyNullifierConflictResolution(valid);

// Step 6: Delete invalid txs and evict conflict losers
await this.#deleteTxsBatch(invalid);
await this.#evictTxs(toEvict, 'NullifierConflict');
// Step 6: Delete invalid txs and evict conflict losers
await this.#deleteTxsBatch(invalid);
await this.#evictTxs(toEvict, 'NullifierConflict');

// Step 7: Run eviction rules (enforce pool size limit)
if (added.length > 0) {
const feePayers = added.map(meta => meta.feePayer);
const uniqueFeePayers = new Set<string>(feePayers);
await this.#evictionManager.evictAfterNewTxs(
added.map(m => m.txHash),
[...uniqueFeePayers],
);
}
// Step 7: Run eviction rules (enforce pool size limit)
if (added.length > 0) {
const feePayers = added.map(meta => meta.feePayer);
const uniqueFeePayers = new Set<string>(feePayers);
await this.#evictionManager.evictAfterNewTxs(
added.map(m => m.txHash),
[...uniqueFeePayers],
);
}
});
}

async handlePrunedBlocks(latestBlock: L2BlockId, options?: { deleteAllTxs?: boolean }): Promise<void> {
Expand All @@ -528,57 +534,60 @@ export class TxPoolV2Impl {

this.#log.info(`Handling prune to block ${latestBlock.number}: un-mining ${txsToUnmine.length} txs`);

// Step 2: Mark ALL un-mined txs with their original mined block number
// This ensures they get soft-deleted if removed later, and only hard-deleted
// when their original mined block is finalized
await this.#deletedPool.markFromPrunedBlock(
txsToUnmine.map(m => ({
txHash: m.txHash,
minedAtBlock: BlockNumber(m.minedL2BlockId!.number),
})),
);
await this.#store.transactionAsync(async () => {
// Step 2: Mark ALL un-mined txs with their original mined block number
// This ensures they get soft-deleted if removed later, and only hard-deleted
// when their original mined block is finalized
await this.#deletedPool.markFromPrunedBlock(
txsToUnmine.map(m => ({
txHash: m.txHash,
minedAtBlock: BlockNumber(m.minedL2BlockId!.number),
})),
);

// Step 3: Unmine - clear mined status from metadata
for (const meta of txsToUnmine) {
this.#indices.markAsUnmined(meta);
}
// Step 3: Unmine - clear mined status from metadata
for (const meta of txsToUnmine) {
this.#indices.markAsUnmined(meta);
}

// If deleteAllTxs is set (epoch prune), delete all un-mined txs and return early
if (options?.deleteAllTxs) {
const allTxHashes = txsToUnmine.map(m => m.txHash);
await this.#deleteTxsBatch(allTxHashes);
this.#log.info(
`Handled prune to block ${latestBlock.number} with deleteAllTxs: deleted ${allTxHashes.length} txs`,
);
return;
}
// If deleteAllTxs is set (epoch prune), delete all un-mined txs and return early
if (options?.deleteAllTxs) {
const allTxHashes = txsToUnmine.map(m => m.txHash);
await this.#deleteTxsBatch(allTxHashes);
this.#log.info(
`Handled prune to block ${latestBlock.number} with deleteAllTxs: deleted ${allTxHashes.length} txs`,
);
return;
}

// Step 4: Filter out protected txs (they'll be handled by prepareForSlot)
const unprotectedTxs = this.#indices.filterUnprotected(txsToUnmine);
// Step 4: Filter out protected txs (they'll be handled by prepareForSlot)
const unprotectedTxs = this.#indices.filterUnprotected(txsToUnmine);

// Step 5: Validate for pending pool
const { valid, invalid } = await this.#revalidateMetadata(unprotectedTxs, 'during handlePrunedBlocks');
// Step 5: Validate for pending pool
const { valid, invalid } = await this.#revalidateMetadata(unprotectedTxs, 'during handlePrunedBlocks');

// Step 6: Resolve nullifier conflicts and add winners to pending indices
const { toEvict } = this.#applyNullifierConflictResolution(valid);
// Step 6: Resolve nullifier conflicts and add winners to pending indices
const { toEvict } = this.#applyNullifierConflictResolution(valid);

// Step 7: Delete invalid txs and evict conflict losers
await this.#deleteTxsBatch(invalid);
await this.#evictTxs(toEvict, 'NullifierConflict');
// Step 7: Delete invalid txs and evict conflict losers
await this.#deleteTxsBatch(invalid);
await this.#evictTxs(toEvict, 'NullifierConflict');

this.#log.info(
`Handled prune to block ${latestBlock.number}: ${valid.length} txs restored to pending, ${invalid.length} invalid, ${toEvict.length} evicted due to nullifier conflicts`,
{ txHashesRestored: valid.map(m => m.txHash), txHashesInvalid: invalid, txHashesEvicted: toEvict },
);
this.#log.info(
`Handled prune to block ${latestBlock.number}: ${valid.length} txs restored to pending, ${invalid.length} invalid, ${toEvict.length} evicted due to nullifier conflicts`,
{ txHashesRestored: valid.map(m => m.txHash), txHashesInvalid: invalid, txHashesEvicted: toEvict },
);

// Step 8: Run eviction rules for ALL pending txs (not just restored ones)
// This handles cases like existing pending txs with invalid fee payer balances
await this.#evictionManager.evictAfterChainPrune(latestBlock.number);
// Step 8: Run eviction rules for ALL pending txs (not just restored ones)
// This handles cases like existing pending txs with invalid fee payer balances
await this.#evictionManager.evictAfterChainPrune(latestBlock.number);
});
}

async handleFailedExecution(txHashes: TxHash[]): Promise<void> {
// Delete failed txs
await this.#deleteTxsBatch(txHashes.map(h => h.toString()));
await this.#store.transactionAsync(async () => {
await this.#deleteTxsBatch(txHashes.map(h => h.toString()));
});

this.#log.info(`Deleted ${txHashes.length} failed txs`, { txHashes: txHashes.map(h => h.toString()) });
}
Expand All @@ -589,27 +598,29 @@ export class TxPoolV2Impl {
// Step 1: Find mined txs at or before finalized block
const minedTxsToFinalize = this.#indices.findTxsMinedAtOrBefore(blockNumber);

// Step 2: Collect mined txs for archiving (before deletion)
const txsToArchive: Tx[] = [];
if (this.#archive.isEnabled()) {
for (const txHashStr of minedTxsToFinalize) {
const buffer = await this.#txsDB.getAsync(txHashStr);
if (buffer) {
txsToArchive.push(Tx.fromBuffer(buffer));
await this.#store.transactionAsync(async () => {
// Step 2: Collect mined txs for archiving (before deletion)
const txsToArchive: Tx[] = [];
if (this.#archive.isEnabled()) {
for (const txHashStr of minedTxsToFinalize) {
const buffer = await this.#txsDB.getAsync(txHashStr);
if (buffer) {
txsToArchive.push(Tx.fromBuffer(buffer));
}
}
}
}

// Step 3: Delete mined txs from active pool
await this.#deleteTxsBatch(minedTxsToFinalize);
// Step 3: Delete mined txs from active pool
await this.#deleteTxsBatch(minedTxsToFinalize);

// Step 4: Finalize soft-deleted txs
await this.#deletedPool.finalizeBlock(blockNumber);
// Step 4: Finalize soft-deleted txs
await this.#deletedPool.finalizeBlock(blockNumber);

// Step 5: Archive mined txs
if (txsToArchive.length > 0) {
await this.#archive.archiveTxs(txsToArchive);
}
// Step 5: Archive mined txs
if (txsToArchive.length > 0) {
await this.#archive.archiveTxs(txsToArchive);
}
});

if (minedTxsToFinalize.length > 0) {
this.#log.info(`Finalized ${minedTxsToFinalize.length} mined txs from blocks up to ${blockNumber}`, {
Expand Down
Loading