diff --git a/fuel-block-producer/Cargo.toml b/fuel-block-producer/Cargo.toml index 182099fb34d..fce145993c3 100644 --- a/fuel-block-producer/Cargo.toml +++ b/fuel-block-producer/Cargo.toml @@ -27,4 +27,3 @@ rstest = "0.15" [features] test-helpers = ["fuel-core-interfaces/test-helpers"] - diff --git a/fuel-block-producer/tests/integration.rs b/fuel-block-producer/tests/integration.rs index d0e3fc44d91..86e804b9fba 100644 --- a/fuel-block-producer/tests/integration.rs +++ b/fuel-block-producer/tests/integration.rs @@ -37,8 +37,10 @@ use fuel_core_interfaces::{ Coin, CoinStatus, }, + txpool::Sender as TxPoolSender, }; use fuel_txpool::{ + Config as TxPoolConfig, MockDb as TxPoolDb, ServiceBuilder as TxPoolServiceBuilder, }; @@ -48,7 +50,10 @@ use rand::{ SeedableRng, }; use std::sync::Arc; -use tokio::sync::broadcast; +use tokio::sync::{ + broadcast, + mpsc, +}; const COIN_AMOUNT: u64 = 1_000_000_000; @@ -108,8 +113,30 @@ async fn block_producer() -> Result<()> { let (import_block_events_tx, import_block_events_rx) = broadcast::channel(16); let mut txpool_builder = TxPoolServiceBuilder::new(); - txpool_builder.db(Box::new(txpool_db)); - txpool_builder.import_block_event(import_block_events_rx); + + let (tx_status_sender, mut tx_status_receiver) = broadcast::channel(100); + + // Remove once tx_status events are used + tokio::spawn(async move { while (tx_status_receiver.recv().await).is_ok() {} }); + + let (txpool_sender, txpool_receiver) = mpsc::channel(100); + let (incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); + + let keep_alive = Box::new(incoming_tx_sender); + Box::leak(keep_alive); + + txpool_builder + .config(TxPoolConfig::default()) + .db(Box::new(txpool_db)) + .incoming_tx_receiver(incoming_tx_receiver) + .import_block_event(import_block_events_rx) + .tx_status_sender(tx_status_sender) + .txpool_sender(TxPoolSender::new(txpool_sender)) + .txpool_receiver(txpool_receiver); + + let (p2p_request_event_sender, _p2p_request_event_receiver) = mpsc::channel(100); + txpool_builder.network_sender(p2p_request_event_sender); + let txpool = txpool_builder.build().unwrap(); txpool.start().await?; diff --git a/fuel-core-interfaces/src/p2p.rs b/fuel-core-interfaces/src/p2p.rs index 1a5cd17e4e7..dbe565b0685 100644 --- a/fuel-core-interfaces/src/p2p.rs +++ b/fuel-core-interfaces/src/p2p.rs @@ -11,6 +11,7 @@ use async_trait::async_trait; use std::sync::Arc; use tokio::sync::oneshot; +#[derive(Debug, PartialEq, Eq, Clone)] pub enum TransactionBroadcast { NewTransaction(Transaction), } @@ -24,6 +25,7 @@ pub enum BlockBroadcast { NewBlock(FuelBlock), } +#[derive(Debug)] pub enum P2pRequestEvent { RequestBlock { height: BlockHeight, diff --git a/fuel-core-interfaces/src/txpool.rs b/fuel-core-interfaces/src/txpool.rs index a939a6c89cf..e89a2deb667 100644 --- a/fuel-core-interfaces/src/txpool.rs +++ b/fuel-core-interfaces/src/txpool.rs @@ -119,6 +119,11 @@ impl Sender { self.send(TxPoolMpsc::Remove { ids, response }).await?; receiver.await.map_err(Into::into) } + + pub fn channel(buffer: usize) -> (Sender, mpsc::Receiver) { + let (sender, reciever) = mpsc::channel(buffer); + (Sender(sender), reciever) + } } /// RPC commands that can be sent to the TxPool through an MPSC channel. diff --git a/fuel-core/src/lib.rs b/fuel-core/src/lib.rs index 12055ff897b..c70540e8e0a 100644 --- a/fuel-core/src/lib.rs +++ b/fuel-core/src/lib.rs @@ -7,3 +7,6 @@ pub mod schema; pub mod service; pub mod state; pub mod tx_pool; + +#[cfg(feature = "p2p")] +pub use fuel_p2p; diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index 2f802d50503..c5ed701300f 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -14,12 +14,18 @@ use fuel_core_interfaces::{ BlockHeight, FuelBlock, }, - txpool::TxPoolDb, + txpool::{ + Sender, + TxPoolDb, + }, }; use futures::future::join_all; use std::sync::Arc; use tokio::{ - sync::mpsc, + sync::{ + broadcast, + mpsc, + }, task::JoinHandle, }; use tracing::info; @@ -63,28 +69,27 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result) - .import_block_event(block_importer.subscribe()) - .private_key( - hex::decode( - "c6bd905dcac2a0b1c43f574ab6933df14d7ceee0194902bce523ed054e8e798b", - ) - .unwrap(), - ); + let relayer = { + let mut relayer_builder = fuel_relayer::ServiceBuilder::new(); + relayer_builder + .config(config.relayer.clone()) + .db(Box::new(database.clone()) as Box) + .import_block_event(block_importer.subscribe()) + .private_key( + hex::decode( + "c6bd905dcac2a0b1c43f574ab6933df14d7ceee0194902bce523ed054e8e798b", + ) + .unwrap(), + ); + + relayer_builder.build()? + }; let relayer_sender = { #[cfg(feature = "relayer")] { - relayer_builder.sender().clone() + relayer.sender().clone() } #[cfg(not(feature = "relayer"))] { @@ -92,26 +97,69 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result = Arc::new(database.clone()); + let (tx_consensus, _) = mpsc::channel(100); + fuel_p2p::orchestrator::Service::new( + config.p2p.clone(), + p2p_db, + p2p_request_event_sender.clone(), + p2p_request_event_receiver, + tx_consensus, + incoming_tx_sender, + block_event_sender, + ) + }; + #[cfg(not(feature = "p2p"))] + { + let keep_alive = Box::new(incoming_tx_sender); + Box::leak(keep_alive); + + let keep_alive = Box::new(block_event_sender); + Box::leak(keep_alive); + + tokio::spawn(async move { + while (p2p_request_event_receiver.recv().await).is_some() {} + }); + } + + let (tx_status_sender, mut tx_status_receiver) = broadcast::channel(100); + + // Remove once tx_status events are used + tokio::spawn(async move { while (tx_status_receiver.recv().await).is_ok() {} }); + + let (txpool_sender, txpool_receiver) = mpsc::channel(100); + + let mut txpool_builder = fuel_txpool::ServiceBuilder::new(); txpool_builder .config(config.txpool.clone()) .db(Box::new(database.clone()) as Box) - .import_block_event(block_importer.subscribe()); + .incoming_tx_receiver(incoming_tx_receiver) + .import_block_event(block_importer.subscribe()) + .tx_status_sender(tx_status_sender) + .txpool_sender(Sender::new(txpool_sender)) + .txpool_receiver(txpool_receiver); - #[cfg(feature = "p2p")] - let (tx_request_event, rx_request_event) = mpsc::channel(100); - #[cfg(feature = "p2p")] - let (tx_block, rx_block) = mpsc::channel(100); + txpool_builder.network_sender(p2p_request_event_sender.clone()); - #[cfg(not(feature = "p2p"))] - let (tx_request_event, _) = mpsc::channel(100); - #[cfg(not(feature = "p2p"))] - let (_, rx_block) = mpsc::channel(100); + let txpool = txpool_builder.build()?; + + // start services block_importer.start().await; bft.start( relayer_sender.clone(), - tx_request_event.clone(), + p2p_request_event_sender.clone(), block_producer.clone(), block_importer.sender().clone(), block_importer.subscribe(), @@ -119,49 +167,26 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result = Arc::new(database.clone()); - #[cfg(feature = "p2p")] - let (tx_consensus, _) = mpsc::channel(100); - #[cfg(feature = "p2p")] - let (tx_transaction, _) = mpsc::channel(100); - - #[cfg(feature = "p2p")] - let network_service = fuel_p2p::orchestrator::Service::new( - config.p2p.clone(), - p2p_db, - tx_request_event, - rx_request_event, - tx_consensus, - tx_transaction, - tx_block, - ); #[cfg(feature = "p2p")] if !config.p2p.network_name.is_empty() { network_service.start().await?; } + txpool.start().await?; + Ok(Modules { txpool: Arc::new(txpool), block_importer: Arc::new(block_importer), diff --git a/fuel-p2p/src/config.rs b/fuel-p2p/src/config.rs index 1b6432d3f17..4ad32218a9f 100644 --- a/fuel-p2p/src/config.rs +++ b/fuel-p2p/src/config.rs @@ -1,3 +1,8 @@ +use crate::gossipsub::topics::{ + CON_VOTE_GOSSIP_TOPIC, + NEW_BLOCK_GOSSIP_TOPIC, + NEW_TX_GOSSIP_TOPIC, +}; use libp2p::{ core::{ muxing::StreamMuxerBox, @@ -102,7 +107,11 @@ impl P2PConfig { allow_private_addresses: true, enable_random_walk: true, connection_idle_timeout: Some(Duration::from_secs(120)), - topics: vec![], + topics: vec![ + NEW_TX_GOSSIP_TOPIC.into(), + NEW_BLOCK_GOSSIP_TOPIC.into(), + CON_VOTE_GOSSIP_TOPIC.into(), + ], max_mesh_size: 12, min_mesh_size: 4, ideal_mesh_size: 6, diff --git a/fuel-p2p/src/orchestrator.rs b/fuel-p2p/src/orchestrator.rs index 81df8f5783a..0ebc3deeda1 100644 --- a/fuel-p2p/src/orchestrator.rs +++ b/fuel-p2p/src/orchestrator.rs @@ -8,10 +8,10 @@ use fuel_core_interfaces::p2p::{ P2pRequestEvent, TransactionBroadcast, }; - use libp2p::request_response::RequestId; use tokio::{ sync::{ + broadcast, mpsc::{ Receiver, Sender, @@ -49,7 +49,7 @@ pub struct NetworkOrchestrator { // senders tx_consensus: Sender, - tx_transaction: Sender, + tx_transaction: broadcast::Sender, tx_block: Sender, tx_outbound_responses: Sender>, @@ -62,7 +62,7 @@ impl NetworkOrchestrator { rx_request_event: Receiver, tx_consensus: Sender, - tx_transaction: Sender, + tx_transaction: broadcast::Sender, tx_block: Sender, db: Arc, @@ -174,7 +174,7 @@ impl Service { tx_request_event: Sender, rx_request_event: Receiver, tx_consensus: Sender, - tx_transaction: Sender, + tx_transaction: broadcast::Sender, tx_block: Sender, ) -> Self { let network_orchestrator = NetworkOrchestrator::new( @@ -274,7 +274,7 @@ pub mod tests { let (tx_request_event, rx_request_event) = tokio::sync::mpsc::channel(100); let (tx_consensus, _) = tokio::sync::mpsc::channel(100); - let (tx_transaction, _) = tokio::sync::mpsc::channel(100); + let (tx_transaction, _) = tokio::sync::broadcast::channel(100); let (tx_block, _) = tokio::sync::mpsc::channel(100); let service = Service::new( diff --git a/fuel-relayer/src/service.rs b/fuel-relayer/src/service.rs index 260a9350dea..9bf1877ca65 100644 --- a/fuel-relayer/src/service.rs +++ b/fuel-relayer/src/service.rs @@ -61,9 +61,6 @@ impl ServiceBuilder { config: Default::default(), } } - pub fn sender(&mut self) -> &relayer::Sender { - &self.sender - } pub fn private_key(&mut self, private_key: Vec) -> &mut Self { self.private_key = Some(private_key); diff --git a/fuel-tests/Cargo.toml b/fuel-tests/Cargo.toml index 64a75deec93..733aaec0046 100644 --- a/fuel-tests/Cargo.toml +++ b/fuel-tests/Cargo.toml @@ -37,3 +37,5 @@ tokio = { version = "1.21", features = ["macros", "rt-multi-thread"] } metrics = ["fuel-core/rocksdb", "fuel-core/metrics"] default = ["fuel-core/default", "metrics"] debug = ["fuel-core-interfaces/debug"] +p2p = ["fuel-core/p2p"] +relayer = ["fuel-core/relayer"] diff --git a/fuel-tests/tests/lib.rs b/fuel-tests/tests/lib.rs index 91ea7cb592a..bc0ed4684d4 100644 --- a/fuel-tests/tests/lib.rs +++ b/fuel-tests/tests/lib.rs @@ -14,3 +14,5 @@ mod node_info; mod resource; mod snapshot; mod tx; +#[cfg(feature = "p2p")] +mod tx_gossip; diff --git a/fuel-tests/tests/tx_gossip.rs b/fuel-tests/tests/tx_gossip.rs new file mode 100644 index 00000000000..9c3f12f2728 --- /dev/null +++ b/fuel-tests/tests/tx_gossip.rs @@ -0,0 +1,105 @@ +use fuel_core::{ + chain_config::{ + CoinConfig, + StateConfig, + }, + service::{ + Config, + FuelService, + }, +}; +use fuel_core_interfaces::common::{ + fuel_tx::TransactionBuilder, + fuel_vm::prelude::*, +}; +use fuel_gql_client::client::FuelClient; +use rand::{ + rngs::StdRng, + Rng, + SeedableRng, +}; +use std::time::Duration; + +fn create_node_config_from_inputs(inputs: &[Input]) -> Config { + let mut node_config = Config::local_node(); + let mut initial_state = StateConfig::default(); + let mut coin_configs = vec![]; + + for input in inputs { + if let Input::CoinSigned { + amount, + owner, + asset_id, + utxo_id, + .. + } + | Input::CoinPredicate { + amount, + owner, + asset_id, + utxo_id, + .. + } = input + { + let coin_config = CoinConfig { + tx_id: Some(*utxo_id.tx_id()), + output_index: Some(utxo_id.output_index() as u64), + block_created: None, + maturity: None, + owner: *owner, + amount: *amount, + asset_id: *asset_id, + }; + coin_configs.push(coin_config); + }; + } + + initial_state.coins = Some(coin_configs); + node_config.chain_conf.initial_state = Some(initial_state); + node_config.utxo_validation = true; + node_config.p2p.enable_mdns = true; + node_config +} + +#[tokio::test] +async fn test_tx_gossiping() { + let mut rng = StdRng::seed_from_u64(2322); + + let tx = TransactionBuilder::script(vec![], vec![]) + .gas_limit(100) + .gas_price(1) + .add_unsigned_coin_input( + SecretKey::random(&mut rng), + rng.gen(), + 1000, + Default::default(), + Default::default(), + 0, + ) + .add_output(Output::Change { + amount: 0, + asset_id: Default::default(), + to: rng.gen(), + }) + .finalize(); + + let node_config = create_node_config_from_inputs(tx.inputs()); + let node_one = FuelService::new_node(node_config).await.unwrap(); + let client_one = FuelClient::from(node_one.bound_address); + + let node_config = create_node_config_from_inputs(tx.inputs()); + let node_two = FuelService::new_node(node_config).await.unwrap(); + let client_two = FuelClient::from(node_two.bound_address); + + tokio::time::sleep(Duration::new(3, 0)).await; + + let result = client_one.submit(&tx).await.unwrap(); + + let tx = client_one.transaction(&result.0.to_string()).await.unwrap(); + assert!(tx.is_some()); + + tokio::time::sleep(Duration::new(3, 0)).await; + + let tx = client_two.transaction(&result.0.to_string()).await.unwrap(); + assert!(tx.is_some()); +} diff --git a/fuel-txpool/src/service.rs b/fuel-txpool/src/service.rs index f1adb34e8b6..a3f91da91a7 100644 --- a/fuel-txpool/src/service.rs +++ b/fuel-txpool/src/service.rs @@ -5,6 +5,10 @@ use crate::{ use anyhow::anyhow; use fuel_core_interfaces::{ block_importer::ImportBlockBroadcast, + p2p::{ + P2pRequestEvent, + TransactionBroadcast, + }, txpool::{ self, TxPoolDb, @@ -22,14 +26,17 @@ use tokio::{ }, task::JoinHandle, }; +use tracing::error; pub struct ServiceBuilder { - sender: txpool::Sender, - receiver: mpsc::Receiver, config: Config, - broadcast: broadcast::Sender, db: Option>, - import_block_events: Option>, + txpool_sender: Option, + txpool_receiver: Option>, + tx_status_sender: Option>, + import_block_receiver: Option>, + incoming_tx_receiver: Option>, + network_sender: Option>, } impl Default for ServiceBuilder { @@ -40,24 +47,24 @@ impl Default for ServiceBuilder { impl ServiceBuilder { pub fn new() -> Self { - let (sender, receiver) = mpsc::channel(100); - let (broadcast, _receiver) = broadcast::channel(100); Self { - sender: txpool::Sender::new(sender), - receiver, - db: None, - broadcast, config: Default::default(), - import_block_events: None, + db: None, + txpool_sender: None, + txpool_receiver: None, + tx_status_sender: None, + import_block_receiver: None, + incoming_tx_receiver: None, + network_sender: None, } } pub fn sender(&self) -> &txpool::Sender { - &self.sender + self.txpool_sender.as_ref().unwrap() } pub fn subscribe(&self) -> broadcast::Receiver { - self.broadcast.subscribe() + self.tx_status_sender.as_ref().unwrap().subscribe() } pub fn db(&mut self, db: Box) -> &mut Self { @@ -65,11 +72,48 @@ impl ServiceBuilder { self } + pub fn txpool_sender(&mut self, txpool_sender: txpool::Sender) -> &mut Self { + self.txpool_sender = Some(txpool_sender); + self + } + + pub fn txpool_receiver( + &mut self, + txpool_receiver: mpsc::Receiver, + ) -> &mut Self { + self.txpool_receiver = Some(txpool_receiver); + self + } + + pub fn tx_status_sender( + &mut self, + tx_status_sender: broadcast::Sender, + ) -> &mut Self { + self.tx_status_sender = Some(tx_status_sender); + self + } + + pub fn incoming_tx_receiver( + &mut self, + incoming_tx_receiver: broadcast::Receiver, + ) -> &mut Self { + self.incoming_tx_receiver = Some(incoming_tx_receiver); + self + } + + pub fn network_sender( + &mut self, + network_sender: mpsc::Sender, + ) -> &mut Self { + self.network_sender = Some(network_sender); + self + } + pub fn import_block_event( &mut self, - import_block_event: broadcast::Receiver, + import_block_receiver: broadcast::Receiver, ) -> &mut Self { - self.import_block_events = Some(import_block_event); + self.import_block_receiver = Some(import_block_receiver); self } @@ -79,18 +123,31 @@ impl ServiceBuilder { } pub fn build(self) -> anyhow::Result { - if self.db.is_none() || self.import_block_events.is_none() { + if self.db.is_none() + || self.import_block_receiver.is_none() + || self.incoming_tx_receiver.is_none() + || self.txpool_sender.is_none() + || self.tx_status_sender.is_none() + || self.txpool_receiver.is_none() + { return Err(anyhow!("One of context items are not set")) } + + if self.network_sender.is_none() { + return Err(anyhow!("P2P network sender is not set")) + } + let service = Service::new( - self.sender, - self.broadcast.clone(), + self.txpool_sender.unwrap(), + self.tx_status_sender.clone().unwrap(), Context { - receiver: self.receiver, - broadcast: self.broadcast, - db: Arc::new(self.db.unwrap()), - import_block_events: self.import_block_events.unwrap(), config: self.config, + db: Arc::new(self.db.unwrap()), + txpool_receiver: self.txpool_receiver.unwrap(), + tx_status_sender: self.tx_status_sender.unwrap(), + import_block_receiver: self.import_block_receiver.unwrap(), + incoming_tx_receiver: self.incoming_tx_receiver.unwrap(), + network_sender: self.network_sender.unwrap(), }, )?; Ok(service) @@ -99,10 +156,12 @@ impl ServiceBuilder { pub struct Context { pub config: Config, - pub broadcast: broadcast::Sender, pub db: Arc>, - pub receiver: mpsc::Receiver, - pub import_block_events: broadcast::Receiver, + pub txpool_receiver: mpsc::Receiver, + pub tx_status_sender: broadcast::Sender, + pub import_block_receiver: broadcast::Receiver, + pub incoming_tx_receiver: broadcast::Receiver, + pub network_sender: mpsc::Sender, } impl Context { @@ -111,13 +170,36 @@ impl Context { loop { tokio::select! { - event = self.receiver.recv() => { - if matches!(event,Some(TxPoolMpsc::Stop) | None) { + new_transaction = self.incoming_tx_receiver.recv() => { + if new_transaction.is_err() { + error!("Incoming tx receiver channel closed unexpectedly; shutting down transaction pool service."); break; } - let broadcast = self.broadcast.clone(); + + let txpool = txpool.clone(); let db = self.db.clone(); + let tx_status_sender = self.tx_status_sender.clone(); + + tokio::spawn( async move { + let txpool = txpool.as_ref(); + match new_transaction.unwrap() { + TransactionBroadcast::NewTransaction ( tx ) => { + let txs = vec!(Arc::new(tx)); + TxPool::insert(txpool, db.as_ref().as_ref(), tx_status_sender, txs).await + } + } + }); + } + + event = self.txpool_receiver.recv() => { + if matches!(event,Some(TxPoolMpsc::Stop) | None) { + break; + } let txpool = txpool.clone(); + let db = self.db.clone(); + let tx_status_sender = self.tx_status_sender.clone(); + + let network_sender = self.network_sender.clone(); // This is little bit risky but we can always add semaphore to limit number of requests. tokio::spawn( async move { @@ -127,8 +209,9 @@ impl Context { let _ = response.send(TxPool::includable(txpool).await); } TxPoolMpsc::Insert { txs, response } => { - let _ = response.send(TxPool::insert(txpool,db.as_ref().as_ref(),broadcast, txs).await); + let _ = response.send(TxPool::insert_with_broadcast(txpool, db.as_ref().as_ref(), tx_status_sender, network_sender, txs).await); } + TxPoolMpsc::Find { ids, response } => { let _ = response.send(TxPool::find(txpool,&ids).await); } @@ -142,14 +225,14 @@ impl Context { let _ = response.send(TxPool::filter_by_negative(txpool,&ids).await); } TxPoolMpsc::Remove { ids, response } => { - let _ = response.send(TxPool::remove(txpool,broadcast,&ids).await); + let _ = response.send(TxPool::remove(txpool,tx_status_sender,&ids).await); } TxPoolMpsc::Stop => {} }}); } - block_updated = self.import_block_events.recv() => { - if let Ok(block_updated) = block_updated { + block_updated = self.import_block_receiver.recv() => { + if let Ok(block_updated) = block_updated { match block_updated { ImportBlockBroadcast::PendingFuelBlockImported { block } => { let txpool = txpool.clone(); @@ -173,21 +256,21 @@ impl Context { } pub struct Service { - sender: txpool::Sender, - broadcast: broadcast::Sender, + txpool_sender: txpool::Sender, + tx_status_sender: broadcast::Sender, join: Mutex>>, context: Arc>>, } impl Service { pub fn new( - sender: txpool::Sender, - broadcast: broadcast::Sender, + txpool_sender: txpool::Sender, + tx_status_sender: broadcast::Sender, context: Context, ) -> anyhow::Result { Ok(Self { - sender, - broadcast, + txpool_sender, + tx_status_sender, join: Mutex::new(None), context: Arc::new(Mutex::new(Some(context))), }) @@ -210,8 +293,9 @@ impl Service { pub async fn stop(&self) -> Option> { let mut join = self.join.lock().await; let join_handle = join.take(); + if let Some(join_handle) = join_handle { - let _ = self.sender.send(TxPoolMpsc::Stop).await; + let _ = self.txpool_sender.send(TxPoolMpsc::Stop).await; let context = self.context.clone(); Some(tokio::spawn(async move { let ret = join_handle.await; @@ -223,11 +307,11 @@ impl Service { } pub fn subscribe_ch(&self) -> broadcast::Receiver { - self.broadcast.subscribe() + self.tx_status_sender.subscribe() } pub fn sender(&self) -> &txpool::Sender { - &self.sender + &self.txpool_sender } } @@ -239,7 +323,10 @@ pub mod tests { common::fuel_tx::TransactionBuilder, txpool::{ Error as TxpoolError, + Sender, + TxPoolMpsc, TxStatus, + TxStatusBroadcast, }, }; use tokio::sync::oneshot; @@ -249,12 +336,23 @@ pub mod tests { let config = Config::default(); let db = Box::new(MockDb::default()); let (bs, _br) = broadcast::channel(10); + let (_incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); let mut builder = ServiceBuilder::new(); builder .config(config) .db(db) - .import_block_event(bs.subscribe()); + .incoming_tx_receiver(incoming_tx_receiver) + .import_block_event(bs.subscribe()) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver); + + let (network_sender, _) = mpsc::channel(100); + builder.network_sender(network_sender); + let service = builder.build().unwrap(); assert!(service.start().await.is_ok(), "start service"); @@ -274,9 +372,23 @@ pub mod tests { let config = Config::default(); let db = Box::new(MockDb::default()); let (_bs, br) = broadcast::channel(10); + let (_incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); let mut builder = ServiceBuilder::new(); - builder.config(config).db(db).import_block_event(br); + builder + .config(config) + .db(db) + .incoming_tx_receiver(incoming_tx_receiver) + .import_block_event(br) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver); + + let (network_sender, _) = mpsc::channel(100); + builder.network_sender(network_sender); + let service = builder.build().unwrap(); service.start().await.ok(); @@ -330,6 +442,9 @@ pub mod tests { let config = Config::default(); let db = Box::new(MockDb::default()); let (_bs, br) = broadcast::channel(10); + let (_incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); let tx1 = Arc::new( TransactionBuilder::script(vec![], vec![]) @@ -348,7 +463,19 @@ pub mod tests { ); let mut builder = ServiceBuilder::new(); - builder.config(config).db(db).import_block_event(br); + + builder + .config(config) + .db(db) + .incoming_tx_receiver(incoming_tx_receiver) + .import_block_event(br) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver); + + let (network_sender, _) = mpsc::channel(100); + builder.network_sender(network_sender); + let service = builder.build().unwrap(); service.start().await.ok(); @@ -385,8 +512,12 @@ pub mod tests { #[tokio::test] async fn simple_insert_removal_subscription() { let config = Config::default(); - let db = Box::new(MockDb::default()); let (_bs, br) = broadcast::channel(10); + let (_incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); + + let db = Box::new(MockDb::default()); let tx1 = Arc::new( TransactionBuilder::script(vec![], vec![]) @@ -400,7 +531,19 @@ pub mod tests { ); let mut builder = ServiceBuilder::new(); - builder.config(config).db(db).import_block_event(br); + + builder + .config(config) + .db(db) + .incoming_tx_receiver(incoming_tx_receiver) + .import_block_event(br) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver); + + let (network_sender, _) = mpsc::channel(100); + builder.network_sender(network_sender); + let service = builder.build().unwrap(); service.start().await.ok(); @@ -473,3 +616,177 @@ pub mod tests { ); } } + +#[cfg(all(test, feature = "p2p"))] +pub mod tests_p2p { + use super::*; + use crate::MockDb; + use fuel_core_interfaces::{ + common::fuel_tx::TransactionBuilder, + txpool::{ + Sender, + TxPoolMpsc, + TxStatus, + TxStatusBroadcast, + }, + }; + use tokio::sync::{ + mpsc::error::TryRecvError, + oneshot, + }; + + #[tokio::test] + async fn test_insert_from_p2p() { + let config = Config::default(); + let db = Box::new(MockDb::default()); + let (_bs, br) = broadcast::channel(10); + + // Meant to simulate p2p's channels which hook in to communicate with txpool + let (network_sender, _) = mpsc::channel(100); + let (incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); + + let tx1 = TransactionBuilder::script(vec![], vec![]) + .gas_price(10) + .finalize(); + + let mut builder = ServiceBuilder::new(); + builder + .config(config) + .db(db) + .incoming_tx_receiver(incoming_tx_receiver) + .import_block_event(br) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver) + .network_sender(network_sender); + + let service = builder.build().unwrap(); + service.start().await.ok(); + + let broadcast_tx = TransactionBroadcast::NewTransaction(tx1.clone()); + let mut receiver = service.subscribe_ch(); + let res = incoming_tx_sender.send(broadcast_tx).unwrap(); + let _ = receiver.recv().await; + assert_eq!(1, res); + + let (response, receiver) = oneshot::channel(); + let _ = service + .sender() + .send(TxPoolMpsc::Find { + ids: vec![tx1.id()], + response, + }) + .await; + let out = receiver.await.unwrap(); + + let arc_tx1 = Arc::new(tx1); + + assert_eq!(arc_tx1, *out[0].as_ref().unwrap().tx()); + } + + #[tokio::test] + async fn test_insert_from_local_broadcasts_to_p2p() { + let config = Config::default(); + let db = Box::new(MockDb::default()); + let (_bs, br) = broadcast::channel(10); + + // Meant to simulate p2p's channels which hook in to communicate with txpool + let (network_sender, mut rx) = mpsc::channel(100); + let (_stx, incoming_txs) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); + + let tx1 = Arc::new( + TransactionBuilder::script(vec![], vec![]) + .gas_price(10) + .finalize(), + ); + + let mut builder = ServiceBuilder::new(); + builder + .config(config) + .db(db) + .incoming_tx_receiver(incoming_txs) + .import_block_event(br) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver) + .network_sender(network_sender); + let service = builder.build().unwrap(); + service.start().await.ok(); + + let mut subscribe = service.subscribe_ch(); + + let (response, receiver) = oneshot::channel(); + let _ = service + .sender() + .send(TxPoolMpsc::Insert { + txs: vec![tx1.clone()], + response, + }) + .await; + let out = receiver.await.unwrap(); + + assert!(out[0].is_ok(), "Tx1 should be OK, got err:{:?}", out); + + // we are sure that included tx are already broadcasted. + assert_eq!( + subscribe.try_recv(), + Ok(TxStatusBroadcast { + tx: tx1.clone(), + status: TxStatus::Submitted, + }), + "First added should be tx1" + ); + + let ret = rx.try_recv().unwrap(); + + if let P2pRequestEvent::BroadcastNewTransaction { transaction } = ret { + assert_eq!(tx1, transaction); + } else { + panic!("Transaction Broadcast Unwrap Failed"); + } + } + + #[tokio::test] + async fn test_insert_from_p2p_does_not_broadcast_to_p2p() { + let config = Config::default(); + let db = Box::new(MockDb::default()); + let (_bs, br) = broadcast::channel(10); + + // Meant to simulate p2p's channels which hook in to communicate with txpool + let (network_sender, mut network_receiver) = mpsc::channel(100); + let (incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); + + let tx1 = TransactionBuilder::script(vec![], vec![]) + .gas_price(10) + .finalize(); + + let mut builder = ServiceBuilder::new(); + builder + .config(config) + .db(db) + .incoming_tx_receiver(incoming_tx_receiver) + .import_block_event(br) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver) + .network_sender(network_sender); + let service = builder.build().unwrap(); + service.start().await.ok(); + + let broadcast_tx = TransactionBroadcast::NewTransaction(tx1.clone()); + let mut receiver = service.subscribe_ch(); + let res = incoming_tx_sender.send(broadcast_tx).unwrap(); + let _ = receiver.recv().await; + assert_eq!(1, res); + + let ret = network_receiver.try_recv(); + assert!(ret.is_err()); + assert_eq!(Some(TryRecvError::Empty), ret.err()); + } +} diff --git a/fuel-txpool/src/txpool.rs b/fuel-txpool/src/txpool.rs index 0378b5af388..0a1f5b2cfaf 100644 --- a/fuel-txpool/src/txpool.rs +++ b/fuel-txpool/src/txpool.rs @@ -29,6 +29,9 @@ use tokio::sync::{ RwLock, }; +use fuel_core_interfaces::p2p::P2pRequestEvent; +use tokio::sync::mpsc; + #[derive(Debug, Clone)] pub struct TxPool { by_hash: HashMap, @@ -145,11 +148,52 @@ impl TxPool { Ok(()) } + pub async fn insert_with_broadcast( + txpool: &RwLock, + db: &dyn TxPoolDb, + tx_status_sender: broadcast::Sender, + network_sender: mpsc::Sender, + txs: Vec, + ) -> Vec>> { + let mut res = Vec::new(); + for tx in txs.iter() { + let mut pool = txpool.write().await; + res.push(pool.insert_inner(tx.clone(), db).await) + } + for (ret, tx) in res.iter().zip(txs.into_iter()) { + match ret { + Ok(removed) => { + for removed in removed { + let _ = tx_status_sender.send(TxStatusBroadcast { + tx: removed.clone(), + status: TxStatus::SqueezedOut { + reason: Error::Removed, + }, + }); + } + let _ = tx_status_sender.send(TxStatusBroadcast { + tx: tx.clone(), + status: TxStatus::Submitted, + }); + let _ = network_sender + .send(P2pRequestEvent::BroadcastNewTransaction { + transaction: tx.clone(), + }) + .await; + } + Err(_) => { + // @dev should not broadcast tx if error occurred + } + } + } + res + } + /// Import a set of transactions from network gossip or GraphQL endpoints. pub async fn insert( txpool: &RwLock, db: &dyn TxPoolDb, - broadcast: broadcast::Sender, + tx_status_sender: broadcast::Sender, txs: Vec, ) -> Vec>> { // Check if that data is okay (witness match input/output, and if recovered signatures ara valid). @@ -166,19 +210,21 @@ impl TxPool { for removed in removed { // small todo there is possibility to have removal reason (ReplacedByHigherGas, DependencyRemoved) // but for now it is okay to just use Error::Removed. - let _ = broadcast.send(TxStatusBroadcast { + let _ = tx_status_sender.send(TxStatusBroadcast { tx: removed.clone(), status: TxStatus::SqueezedOut { reason: Error::Removed, }, }); } - let _ = broadcast.send(TxStatusBroadcast { + let _ = tx_status_sender.send(TxStatusBroadcast { tx, status: TxStatus::Submitted, }); } - Err(_) => {} + Err(_) => { + // @dev should not broadcast tx if error occurred + } } } res diff --git a/fuel-txpool/src/types.rs b/fuel-txpool/src/types.rs index 2b268a87f3e..ebf10a3e74c 100644 --- a/fuel-txpool/src/types.rs +++ b/fuel-txpool/src/types.rs @@ -1,8 +1,10 @@ -pub use fuel_core_interfaces::common::fuel_tx::{ - ContractId, - Transaction, - TxId, +pub use fuel_core_interfaces::common::{ + fuel_tx::{ + ContractId, + Transaction, + TxId, + }, + fuel_types::Word, }; -use fuel_core_interfaces::common::fuel_types::Word; pub type GasPrice = Word;