diff --git a/fuel-txpool/src/service.rs b/fuel-txpool/src/service.rs index a3f91da91a7..0674e4e0ccc 100644 --- a/fuel-txpool/src/service.rs +++ b/fuel-txpool/src/service.rs @@ -129,14 +129,11 @@ impl ServiceBuilder { || self.txpool_sender.is_none() || self.tx_status_sender.is_none() || self.txpool_receiver.is_none() + || self.network_sender.is_none() { return Err(anyhow!("One of context items are not set")) } - if self.network_sender.is_none() { - return Err(anyhow!("P2P network sender is not set")) - } - let service = Service::new( self.txpool_sender.unwrap(), self.tx_status_sender.clone().unwrap(), @@ -209,9 +206,19 @@ impl Context { let _ = response.send(TxPool::includable(txpool).await); } TxPoolMpsc::Insert { txs, response } => { - let _ = response.send(TxPool::insert_with_broadcast(txpool, db.as_ref().as_ref(), tx_status_sender, network_sender, txs).await); + let insert = TxPool::insert(txpool, db.as_ref().as_ref(), tx_status_sender,txs.clone()).await; + for (ret, tx) in insert.iter().zip(txs.into_iter()) { + match ret { + Ok(_) => { + let _ = network_sender.send(P2pRequestEvent::BroadcastNewTransaction { + transaction: tx.clone(), + }).await; + } + Err(_) => {} + } + } + let _ = response.send(insert); } - TxPoolMpsc::Find { ids, response } => { let _ = response.send(TxPool::find(txpool,&ids).await); } diff --git a/fuel-txpool/src/txpool.rs b/fuel-txpool/src/txpool.rs index 0a1f5b2cfaf..2fa7b718614 100644 --- a/fuel-txpool/src/txpool.rs +++ b/fuel-txpool/src/txpool.rs @@ -29,9 +29,6 @@ use tokio::sync::{ RwLock, }; -use fuel_core_interfaces::p2p::P2pRequestEvent; -use tokio::sync::mpsc; - #[derive(Debug, Clone)] pub struct TxPool { by_hash: HashMap, @@ -148,47 +145,6 @@ impl TxPool { Ok(()) } - pub async fn insert_with_broadcast( - txpool: &RwLock, - db: &dyn TxPoolDb, - tx_status_sender: broadcast::Sender, - network_sender: mpsc::Sender, - txs: Vec, - ) -> Vec>> { - let mut res = Vec::new(); - for tx in txs.iter() { - let mut pool = txpool.write().await; - res.push(pool.insert_inner(tx.clone(), db).await) - } - for (ret, tx) in res.iter().zip(txs.into_iter()) { - match ret { - Ok(removed) => { - for removed in removed { - let _ = tx_status_sender.send(TxStatusBroadcast { - tx: removed.clone(), - status: TxStatus::SqueezedOut { - reason: Error::Removed, - }, - }); - } - let _ = tx_status_sender.send(TxStatusBroadcast { - tx: tx.clone(), - status: TxStatus::Submitted, - }); - let _ = network_sender - .send(P2pRequestEvent::BroadcastNewTransaction { - transaction: tx.clone(), - }) - .await; - } - Err(_) => { - // @dev should not broadcast tx if error occurred - } - } - } - res - } - /// Import a set of transactions from network gossip or GraphQL endpoints. pub async fn insert( txpool: &RwLock,