fix(l1): fix broadcast_pool race and offload tx pool insertion to background task #6253
🤖 Kimi Code Review

Security & Correctness Issues

1. Race condition in transaction processing (lines 952-954, 973-977)
   Fix: Collect hashes after filtering in the background task, or use
2. Missing transaction deduplication check

Performance & Resource Management

3. Unbounded task spawning (line 957)
   Fix: Use a bounded channel or semaphore to limit concurrent transaction processing tasks.
4. Missing task cancellation on peer disconnect
   Fix: Pass an

Code Quality & Best Practices

5. Inefficient cloning (line 955)
6. Debug logging inconsistency
7. Missing error handling for spawn failure

Ethereum-Specific Concerns

8. Transaction validation timing

The change appears to be a performance optimization rather than a consensus change, so this is acceptable.

Automated review by Kimi (Moonshot AI) · custom prompt
🤖 Codex Code Review

Findings

Notes

If you want, I can check the broadcaster logic to verify whether the hash suppression path makes Item 1 a real bug or just a mild inefficiency.

Next steps

Automated review by OpenAI Codex · custom prompt
Greptile Summary

Fixed a critical race condition in the transaction broadcaster that was causing transactions to be lost. The race occurred when

Additionally, offloaded expensive transaction pool insertion (signature recovery and validation) to a background task to prevent blocking the ConnectionServer during large transaction batches.
Confidence Score: 4/5
| Filename | Overview |
|---|---|
| crates/blockchain/mempool.rs | Renamed clear_broadcasted_txs() to remove_broadcasted_txs() with targeted removal by hash list to fix race condition |
| crates/networking/p2p/tx_broadcaster.rs | Updated to use targeted remove_broadcasted_txs() instead of clearing entire pool, and removed a trace log statement |
| crates/networking/p2p/rlpx/connection/server.rs | Offloaded expensive tx pool insertion to background task with tokio::spawn to avoid blocking ConnectionServer during validation |
Sequence Diagram
sequenceDiagram
participant B as Broadcaster
participant M as Mempool
participant C as ConnectionServer
Note over B,M: Old behavior (race condition)
B->>M: get_txs_for_broadcast()
M-->>B: [tx1, tx2]
Note over C,M: Meanwhile...
C->>M: add_transaction(tx3)
M->>M: broadcast_pool.insert(tx3)
B->>M: clear_broadcasted_txs()
M->>M: broadcast_pool.clear() ❌
Note over M: tx3 lost! Never broadcast
Note over B,M: New behavior (fixed)
B->>M: get_txs_for_broadcast()
M-->>B: [tx1, tx2]
Note over C,M: Meanwhile...
C->>M: add_transaction(tx3)
M->>M: broadcast_pool.insert(tx3)
B->>M: remove_broadcasted_txs([tx1, tx2])
M->>M: remove only tx1, tx2 ✓
Note over M: tx3 preserved for next broadcast
Last reviewed commit: 1ed81cb
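
The two interleavings in the diagram can be replayed with a small std-only model. This is a minimal sketch, not the ethrex mempool: `BroadcastPool`, the `TxHash = u64` alias, the `insert` method, and the `replay` helper are hypothetical stand-ins chosen for illustration. Only the names `get_txs_for_broadcast`, `clear_broadcasted_txs`, and `remove_broadcasted_txs` come from the PR, and their real signatures may differ.

```rust
use std::collections::HashSet;
use std::sync::Mutex;

/// Hypothetical stand-in for a transaction hash.
type TxHash = u64;

/// Simplified model of the broadcast pool (assumption: a set of hashes
/// behind a lock; the real structure may differ).
#[derive(Default)]
struct BroadcastPool {
    pending: Mutex<HashSet<TxHash>>,
}

impl BroadcastPool {
    /// Called when a new tx is accepted and should be announced later.
    fn insert(&self, hash: TxHash) {
        self.pending.lock().unwrap().insert(hash);
    }

    /// Snapshot of the pool; the lock is released when this returns.
    fn get_txs_for_broadcast(&self) -> Vec<TxHash> {
        self.pending.lock().unwrap().iter().copied().collect()
    }

    /// Old behavior: wipes everything, including txs added after the snapshot.
    fn clear_broadcasted_txs(&self) {
        self.pending.lock().unwrap().clear();
    }

    /// New behavior: removes only the hashes that were actually sent.
    fn remove_broadcasted_txs(&self, sent: &[TxHash]) {
        let mut pending = self.pending.lock().unwrap();
        for hash in sent {
            pending.remove(hash);
        }
    }
}

/// Replays the interleaving from the diagram and returns what is left
/// in the pool for the next broadcast round.
fn replay(targeted: bool) -> Vec<TxHash> {
    let pool = BroadcastPool::default();
    pool.insert(1);
    pool.insert(2);
    let sent = pool.get_txs_for_broadcast(); // broadcaster reads tx1, tx2
    pool.insert(3); // connection server adds tx3 in the meantime
    if targeted {
        pool.remove_broadcasted_txs(&sent);
    } else {
        pool.clear_broadcasted_txs();
    }
    pool.get_txs_for_broadcast()
}
```

In this model, `replay(false)` leaves the pool empty (tx3 is dropped without ever being sent), while `replay(true)` leaves tx3 in place for the next round.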
🤖 Claude Code Review

Review:
Addressing bot reviews

Kimi findings

Codex findings

Claude review

Good observations; agree with all of them. The TOCTOU in

Update: broadcast_pool race condition fix

Investigation revealed the true root cause of the flaky

Any tx added between the read and the clear is lost forever: never broadcast but removed from the pool. With 2000 txs being inserted sequentially and the broadcaster firing every 5ms, this reliably drops transactions.

Fix: Replaced
…kground task

Two fixes for the flaky hive `LargeTxRequest` devp2p eth test:

1. Fix broadcast_pool race condition: `clear_broadcasted_txs()` wiped the entire broadcast_pool, including txs added between the read (`get_txs_for_broadcast`) and the clear. With 2000 txs inserted one-by-one and the broadcaster firing every 5ms, this reliably dropped txs that were never broadcast. Replace with targeted `remove_broadcasted_txs()` that only removes the txs that were actually fetched and sent.
2. Offload incoming tx pool insertion to a background `tokio::spawn` task so the ConnectionServer isn't blocked during signature recovery and storage reads for large transaction batches.
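
The second fix (moving the insertion loop off the message handler) can be illustrated with a std-only sketch. The PR uses `tokio::spawn`; here `std::thread::spawn` is swapped in so the example has no external dependencies, which is an assumption made for illustration and not how ethrex does it. `validate_and_insert` and `offload_insertion` are hypothetical names, and the "even hashes are valid" rule is a placeholder for signature recovery and validation.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Hypothetical stand-in for a transaction hash.
type TxHash = u64;

/// Hypothetical placeholder for signature recovery + pool validation.
/// Odd hashes are rejected purely for illustration.
fn validate_and_insert(hash: TxHash) -> Result<TxHash, String> {
    if hash % 2 == 0 {
        Ok(hash)
    } else {
        Err(format!("tx {} rejected", hash))
    }
}

/// Hands the batch to a background thread and returns immediately,
/// so the caller (the message handler in the PR) is not blocked.
/// The accepted hashes arrive later on the returned receiver.
fn offload_insertion(batch: Vec<TxHash>) -> Receiver<Vec<TxHash>> {
    let (done_tx, done_rx) = channel();
    thread::spawn(move || {
        let accepted: Vec<TxHash> = batch
            .into_iter()
            .filter_map(|hash| validate_and_insert(hash).ok())
            .collect();
        // The receiver may already be gone; that is not an error here.
        let _ = done_tx.send(accepted);
    });
    done_rx
}
```

The caller can keep handling other messages and read the receiver only when it needs the accepted hashes; dropping the receiver turns the work into fire-and-forget.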
3a6895d to
1ed81cb
🤖 Kimi Code Review

Review Summary

Overall, this PR improves transaction handling by moving heavy validation work off the critical path, but introduces a few correctness and safety issues.

Critical Issues

Performance & Safety Issues

Minor Issues

Suggested Fixes
// In handle_incoming_message
// The sender is named `result_tx` so it cannot be confused with the loop variable `tx`.
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
    // Collect only the hashes of transactions the pool accepted.
    let mut successful_hashes = Vec::new();
    for tx in txs.transactions {
        if blockchain.add_transaction_to_pool(tx.clone()).await.is_ok() {
            successful_hashes.push(tx.hash());
        }
    }
    // Ignore the error case where the receiver was dropped.
    let _ = result_tx.send(successful_hashes);
});
if let Ok(successful_hashes) = result_rx.await {
    state.tx_broadcaster.cast(InMessage::AddTxs(successful_hashes, state.node.node_id())).await?;
}

// Read everything the task needs out of `state` before spawning.
let is_l2_mode = state.l2_state.is_supported();
let blockchain = state.blockchain.clone();
let peer = state.node.to_string();
tokio::spawn(async move {
    // `is_l2_mode`, `blockchain`, and `peer` are moved into the task here.
    // ... rest of the code
});

Automated review by Kimi (Moonshot AI) · custom prompt
🤖 Codex Code Review

Review Summary

Overall direction looks good (decoupling expensive tx validation from the connection loop and only removing broadcasted hashes). I do see a couple of correctness/robustness risks worth addressing.

Findings

Notes

If you want, I can scan the broadcaster and mempool internals to verify the dedupe semantics and whether the race in Item 2 is a real behavior issue.

Automated review by OpenAI Codex · custom prompt
🤖 Claude Code Review

Now I have a thorough picture of the changes. Let me write the review.

Review: fix(l1): fix broadcast_pool race and offload tx pool insertion to background task

Summary

Both fixes are well-motivated and correctly implemented. The race condition in

Fix 1:
Benchmark Block Execution Results Comparison Against Main
ElFantasma
left a comment
Just a comment on the JoinHandle. It LGTM
debug!(peer=%state.node, "Rejecting transaction in L2 mode - {tx_type} transactions are not broadcasted in L2");
continue;
}
tokio::spawn(async move {
The JoinHandle is dropped here, making this fire-and-forget. If add_transaction_to_pool panics (e.g., poisoned RwLock), the panic is caught by the tokio runtime but won't surface clearly. More importantly, on node shutdown, in-flight insertion tasks may be silently dropped.
Not necessarily a blocker since the task is bounded (processes a fixed set of txs and the errors are already debug-logged), but worth considering whether to at least log on task failure, e.g.:
tokio::spawn(async move {
    // ... existing code ...
}.instrument(tracing::debug_span!("tx_pool_insert")));

or store the handle in a JoinSet for clean shutdown.
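
The "keep the handle" option from this comment can be sketched without tokio. The block below is a rough std-only analogue of a `JoinSet`, not the tokio API: `InsertionTasks` and its methods are hypothetical names, and `std::thread` is swapped in for `tokio::spawn` to keep the example dependency-free. It shows the two properties the comment asks about: panics become visible to the owner, and shutdown waits for in-flight work instead of dropping it.

```rust
use std::thread::{self, JoinHandle};

/// Hypothetical registry of in-flight insertion tasks. Each task returns
/// the number of transactions it inserted.
#[derive(Default)]
struct InsertionTasks {
    handles: Vec<JoinHandle<usize>>,
}

impl InsertionTasks {
    /// Spawns a task and keeps its handle instead of dropping it.
    fn spawn<F>(&mut self, task: F)
    where
        F: FnOnce() -> usize + Send + 'static,
    {
        self.handles.push(thread::spawn(task));
    }

    /// Waits for every task and returns (total inserted, panicked tasks).
    fn shutdown(self) -> (usize, usize) {
        let mut inserted = 0;
        let mut panicked = 0;
        for handle in self.handles {
            match handle.join() {
                Ok(count) => inserted += count,
                // `join` returns Err when the thread panicked, so the
                // failure can be counted or logged by the owner.
                Err(_) => panicked += 1,
            }
        }
        (inserted, panicked)
    }
}
```

With tokio, the analogous pieces would be `JoinSet::spawn` and draining the set with `join_next` during shutdown; the exact wiring into the ConnectionServer is left open here.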
…kground task (lambdaclass#6253)

## Summary

- **Fix broadcast_pool race condition**: `clear_broadcasted_txs()` wiped the entire `broadcast_pool`, including txs added between the read (`get_txs_for_broadcast()`) and the clear. With 2000 txs being inserted one-by-one and the broadcaster firing every 5ms, this reliably dropped txs that were never announced to peers. Replaced with targeted `remove_broadcasted_txs()` that only removes the txs that were actually fetched and broadcast.
- **Offload tx pool insertion to background task**: Spawn the incoming `Transactions` message pool-insertion loop as a background `tokio::spawn` task instead of running it synchronously in `handle_cast`. This prevents the ConnectionServer from being blocked during signature recovery + storage reads for large transaction batches.

## Motivation

The hive `LargeTxRequest` devp2p eth test intermittently fails in CI ([example run](https://github.com/lambdaclass/ethrex/actions/runs/22335807040)). Root cause is a race condition in the transaction broadcaster:

1. `get_txs_for_broadcast()` reads the `broadcast_pool` (releases lock)
2. Meanwhile, incoming txs continue to be added to `broadcast_pool`
3. `clear_broadcasted_txs()` clears **all** of `broadcast_pool`, including txs added in step 2

Any tx added between steps 1 and 3 is lost forever: never broadcast but cleared from the pool. With 2000 txs being inserted sequentially and the broadcaster firing every 5ms, this race reliably drops transactions, causing the test's 2-second timeout to expire before all tx hashes are announced.

## Test plan

- [x] `cargo check` (with and without `l2` feature)
- [x] Local hive devp2p eth tests pass 3/3 runs: `./hive --sim devp2p --sim.limit "eth" --client ethrex` (20/20 each run)
- [ ] CI hive daily run should show P2P Eth capability back to 20/20