Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions crates/blockchain/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,11 @@ impl Mempool {
Ok(txs)
}

pub fn clear_broadcasted_txs(&self) -> Result<(), StoreError> {
self.write()?.broadcast_pool.clear();
pub fn remove_broadcasted_txs(&self, hashes: &[H256]) -> Result<(), StoreError> {
let mut inner = self.write()?;
for hash in hashes {
inner.broadcast_pool.remove(hash);
}
Ok(())
}

Expand Down
51 changes: 29 additions & 22 deletions crates/networking/p2p/rlpx/connection/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -945,34 +945,41 @@ async fn handle_incoming_message(
Message::Transactions(txs) if peer_supports_eth => {
// https://github.com/ethereum/devp2p/blob/master/caps/eth.md#transactions-0x02
if state.blockchain.is_synced() {
let tx_hashes: Vec<_> = txs.transactions.iter().map(|tx| tx.hash()).collect();

// Offload pool insertion to a background task so we don't block
// the ConnectionServer (validation + signature recovery are expensive).
let blockchain = state.blockchain.clone();
let peer = state.node.to_string();
#[cfg(feature = "l2")]
let is_l2_mode = state.l2_state.is_supported();
for tx in &txs.transactions {
// Reject blob transactions in L2 mode
#[cfg(feature = "l2")]
if (is_l2_mode && matches!(tx, Transaction::EIP4844Transaction(_)))
|| tx.is_privileged()
{
let tx_type = tx.tx_type();
debug!(peer=%state.node, "Rejecting transaction in L2 mode - {tx_type} transactions are not broadcasted in L2");
continue;
}
tokio::spawn(async move {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JoinHandle is dropped here, making this fire-and-forget. If add_transaction_to_pool panics (e.g., poisoned RwLock), the panic is caught by the tokio runtime but won't surface clearly. More importantly, on node shutdown, in-flight insertion tasks may be silently dropped.

Not necessarily a blocker since the task is bounded (processes a fixed set of txs and the errors are already debug-logged), but worth considering whether to at least log on task failure, e.g.:

tokio::spawn(async move {
    // ... existing code ...
}.instrument(tracing::debug_span!("tx_pool_insert")));

or store the handle in a JoinSet for clean shutdown.

for tx in txs.transactions {
#[cfg(feature = "l2")]
if (is_l2_mode && matches!(tx, Transaction::EIP4844Transaction(_)))
|| tx.is_privileged()
{
let tx_type = tx.tx_type();
debug!(peer=%peer, "Rejecting transaction in L2 mode - {tx_type} transactions are not broadcasted in L2");
continue;
}

if let Err(e) = state.blockchain.add_transaction_to_pool(tx.clone()).await {
debug!(
peer=%state.node,
error=%e,
"Error adding transaction"
);
continue;
if let Err(e) = blockchain.add_transaction_to_pool(tx).await {
debug!(
peer=%peer,
error=%e,
"Error adding transaction"
);
}
}
}
});

// Notify the broadcaster immediately — it only tracks hashes
// to avoid re-broadcasting to the sender. The actual broadcast
// happens on a periodic timer that queries the mempool directly.
state
.tx_broadcaster
.cast(InMessage::AddTxs(
txs.transactions.iter().map(|tx| tx.hash()).collect(),
state.node.node_id(),
))
.cast(InMessage::AddTxs(tx_hashes, state.node.node_id()))
.await
.map_err(|e| PeerConnectionError::BroadcastError(e.to_string()))?;
}
Expand Down
6 changes: 4 additions & 2 deletions crates/networking/p2p/tx_broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ impl TxBroadcaster {
.get_txs_for_broadcast()
.map_err(|_| TxBroadcasterError::Broadcast)?;
if txs_to_broadcast.is_empty() {
trace!("No transactions to broadcast");
return Ok(());
}
let peers = self.peer_table.get_peers_with_capabilities().await?;
Expand Down Expand Up @@ -244,7 +243,10 @@ impl TxBroadcaster {
)
.await?;
}
self.blockchain.mempool.clear_broadcasted_txs()?;
let broadcasted_hashes: Vec<H256> = txs_to_broadcast.iter().map(|tx| tx.hash()).collect();
self.blockchain
.mempool
.remove_broadcasted_txs(&broadcasted_hashes)?;
Ok(())
}

Expand Down
Loading