From 2dc5c358cf4db32e250ccbcd24f1f4696bac10b5 Mon Sep 17 00:00:00 2001 From: controlcthenv Date: Sat, 3 Sep 2022 15:59:33 -0600 Subject: [PATCH 01/38] txpool reimpled --- Cargo.lock | 1 - fuel-core-interfaces/src/p2p.rs | 2 + fuel-core-interfaces/src/txpool.rs | 5 + fuel-txpool/src/service.rs | 373 +++++++++++++++++++++++++---- fuel-txpool/src/txpool.rs | 50 +++- 5 files changed, 379 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1ebce38d3c3..671f0bb2845 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2130,7 +2130,6 @@ dependencies = [ "insta", "itertools", "lazy_static", - "prometheus", "rand 0.8.5", "rocksdb", "serde", diff --git a/fuel-core-interfaces/src/p2p.rs b/fuel-core-interfaces/src/p2p.rs index 6f45a865d8c..3e370bf38d5 100644 --- a/fuel-core-interfaces/src/p2p.rs +++ b/fuel-core-interfaces/src/p2p.rs @@ -5,6 +5,7 @@ use fuel_tx::Transaction; use std::sync::Arc; use tokio::sync::oneshot; +#[derive(Debug, PartialEq, Clone)] pub enum TransactionBroadcast { NewTransaction(Transaction), } @@ -18,6 +19,7 @@ pub enum BlockBroadcast { NewBlock(FuelBlock), } +#[derive(Debug)] pub enum P2pRequestEvent { RequestBlock { height: BlockHeight, diff --git a/fuel-core-interfaces/src/txpool.rs b/fuel-core-interfaces/src/txpool.rs index 5ad4fdfae6e..f6c6bc417eb 100644 --- a/fuel-core-interfaces/src/txpool.rs +++ b/fuel-core-interfaces/src/txpool.rs @@ -89,6 +89,11 @@ impl Sender { self.send(TxPoolMpsc::Remove { ids, response }).await?; receiver.await.map_err(Into::into) } + + pub fn channel(buffer: usize) -> (Sender, mpsc::Receiver) { + let (sender, reciever) = mpsc::channel(buffer); + (Sender(sender), reciever) + } } /// RPC commands that can be sent to the TxPool through an MPSC channel. diff --git a/fuel-txpool/src/service.rs b/fuel-txpool/src/service.rs index f6f82163f43..1ed8faa0db9 100644 --- a/fuel-txpool/src/service.rs +++ b/fuel-txpool/src/service.rs @@ -3,16 +3,20 @@ use anyhow::anyhow; use fuel_core_interfaces::block_importer::ImportBlockBroadcast; use fuel_core_interfaces::txpool::{self, TxPoolDb, TxPoolMpsc, TxStatusBroadcast}; use std::sync::Arc; +use fuel_core_interfaces::p2p::P2pRequestEvent; use tokio::sync::{broadcast, mpsc, Mutex, RwLock}; use tokio::task::JoinHandle; +use fuel_core_interfaces::p2p::TransactionBroadcast; pub struct ServiceBuilder { - sender: txpool::Sender, - receiver: mpsc::Receiver, config: Config, - broadcast: broadcast::Sender, db: Option>, - import_block_events: Option>, + txpool_sender: Option, + txpool_receiver: Option>, + tx_status_sender: Option>, + network_sender: Option>, + import_block_receiver: Option>, + incoming_tx_receiver: Option>, } impl Default for ServiceBuilder { @@ -23,24 +27,24 @@ impl Default for ServiceBuilder { impl ServiceBuilder { pub fn new() -> Self { - let (sender, receiver) = mpsc::channel(100); - let (broadcast, _receiver) = broadcast::channel(100); Self { - sender: txpool::Sender::new(sender), - receiver, - db: None, - broadcast, config: Default::default(), - import_block_events: None, + db: None, + txpool_sender: None, + txpool_receiver: None, + tx_status_sender: None, + network_sender: None, + import_block_receiver: None, + incoming_tx_receiver: None, } } pub fn sender(&self) -> &txpool::Sender { - &self.sender + self.txpool_sender.as_ref().unwrap() } pub fn subscribe(&self) -> broadcast::Receiver { - self.broadcast.subscribe() + self.tx_status_sender.as_ref().unwrap().subscribe() } pub fn db(&mut self, db: Box) -> &mut Self { @@ -48,11 +52,42 @@ impl ServiceBuilder { self } + pub fn txpool_sender(&mut self, txpool_sender: txpool::Sender) -> &mut Self { + self.txpool_sender = Some(txpool_sender); + self + } + + pub fn txpool_receiver(&mut self, txpool_receiver: mpsc::Receiver) -> &mut Self { + self.txpool_receiver = Some(txpool_receiver); + self + } + + pub fn tx_status_sender( + &mut self, + tx_status_sender: broadcast::Sender, + ) -> &mut Self { + self.tx_status_sender = Some(tx_status_sender); + self + } + + pub fn incoming_tx_receiver( + &mut self, + incoming_tx_receiver: broadcast::Receiver, + ) -> &mut Self { + self.incoming_tx_receiver = Some(incoming_tx_receiver); + self + } + + pub fn network_sender(&mut self, network_sender: mpsc::Sender) -> &mut Self { + self.network_sender = Some(network_sender); + self + } + pub fn import_block_event( &mut self, - import_block_event: broadcast::Receiver, + import_block_receiver: broadcast::Receiver, ) -> &mut Self { - self.import_block_events = Some(import_block_event); + self.import_block_receiver = Some(import_block_receiver); self } @@ -62,18 +97,27 @@ impl ServiceBuilder { } pub fn build(self) -> anyhow::Result { - if self.db.is_none() || self.import_block_events.is_none() { + if self.db.is_none() + || self.import_block_receiver.is_none() + || self.incoming_tx_receiver.is_none() + || self.network_sender.is_none() + || self.txpool_sender.is_none() + || self.tx_status_sender.is_none() + || self.txpool_receiver.is_none() + { return Err(anyhow!("One of context items are not set")); } let service = Service::new( - self.sender, - self.broadcast.clone(), + self.txpool_sender.unwrap(), + self.tx_status_sender.clone().unwrap(), Context { - receiver: self.receiver, - broadcast: self.broadcast, - db: Arc::new(self.db.unwrap()), - import_block_events: self.import_block_events.unwrap(), config: self.config, + db: Arc::new(self.db.unwrap()), + txpool_receiver: self.txpool_receiver.unwrap(), + tx_status_sender: self.tx_status_sender.unwrap(), + import_block_receiver: self.import_block_receiver.unwrap(), + incoming_tx_receiver: self.incoming_tx_receiver.unwrap(), + network_sender: self.network_sender.unwrap(), }, )?; Ok(service) @@ -82,10 +126,12 @@ impl ServiceBuilder { pub struct Context { pub config: Config, - pub broadcast: broadcast::Sender, pub db: Arc>, - pub receiver: mpsc::Receiver, - pub import_block_events: broadcast::Receiver, + pub txpool_receiver: mpsc::Receiver, + pub network_sender: mpsc::Sender, + pub tx_status_sender: broadcast::Sender, + pub import_block_receiver: broadcast::Receiver, + pub incoming_tx_receiver: broadcast::Receiver, } impl Context { @@ -94,13 +140,30 @@ impl Context { loop { tokio::select! { - event = self.receiver.recv() => { + new_transaction = self.incoming_tx_receiver.recv() => { + let txpool = txpool.clone(); + let db = self.db.clone(); + let tx_status_sender = self.tx_status_sender.clone(); + + tokio::spawn( async move { + let txpool = txpool.as_ref(); + match new_transaction.unwrap() { + TransactionBroadcast::NewTransaction ( tx ) => { + let txs = vec!(Arc::new(tx)); + TxPool::insert(txpool, db.as_ref().as_ref(), tx_status_sender, txs).await + } + } + }); + } + + event = self.txpool_receiver.recv() => { if matches!(event,Some(TxPoolMpsc::Stop) | None) { break; } - let broadcast = self.broadcast.clone(); - let db = self.db.clone(); let txpool = txpool.clone(); + let db = self.db.clone(); + let tx_status_sender = self.tx_status_sender.clone(); + let network_sender = self.network_sender.clone(); // This is little bit risky but we can always add semaphore to limit number of requests. tokio::spawn( async move { @@ -110,7 +173,7 @@ impl Context { let _ = response.send(TxPool::includable(txpool).await); } TxPoolMpsc::Insert { txs, response } => { - let _ = response.send(TxPool::insert(txpool,db.as_ref().as_ref(),broadcast, txs).await); + let _ = response.send(TxPool::insert_with_broadcast(txpool, db.as_ref().as_ref(), tx_status_sender, network_sender, txs).await); } TxPoolMpsc::Find { ids, response } => { let _ = response.send(TxPool::find(txpool,&ids).await); @@ -125,12 +188,12 @@ impl Context { let _ = response.send(TxPool::filter_by_negative(txpool,&ids).await); } TxPoolMpsc::Remove { ids, response } => { - let _ = response.send(TxPool::remove(txpool,broadcast,&ids).await); + let _ = response.send(TxPool::remove(txpool,tx_status_sender,&ids).await); } TxPoolMpsc::Stop => {} }}); } - _block_updated = self.import_block_events.recv() => { + _block_updated = self.import_block_receiver.recv() => { let txpool = txpool.clone(); tokio::spawn( async move { TxPool::block_update(txpool.as_ref()).await @@ -143,21 +206,21 @@ impl Context { } pub struct Service { - sender: txpool::Sender, - broadcast: broadcast::Sender, + txpool_sender: txpool::Sender, + tx_status_sender: broadcast::Sender, join: Mutex>>, context: Arc>>, } impl Service { pub fn new( - sender: txpool::Sender, - broadcast: broadcast::Sender, + txpool_sender: txpool::Sender, + tx_status_sender: broadcast::Sender, context: Context, ) -> anyhow::Result { Ok(Self { - sender, - broadcast, + txpool_sender, + tx_status_sender, join: Mutex::new(None), context: Arc::new(Mutex::new(Some(context))), }) @@ -181,7 +244,7 @@ impl Service { let mut join = self.join.lock().await; let join_handle = join.take(); if let Some(join_handle) = join_handle { - let _ = self.sender.send(TxPoolMpsc::Stop).await; + let _ = self.txpool_sender.send(TxPoolMpsc::Stop).await; let context = self.context.clone(); Some(tokio::spawn(async move { let ret = join_handle.await; @@ -193,11 +256,11 @@ impl Service { } pub fn subscribe_ch(&self) -> broadcast::Receiver { - self.broadcast.subscribe() + self.tx_status_sender.subscribe() } pub fn sender(&self) -> &txpool::Sender { - &self.sender + &self.txpool_sender } } @@ -207,9 +270,9 @@ pub mod tests { use crate::MockDb; use fuel_core_interfaces::{ common::fuel_tx::TransactionBuilder, - txpool::{Error as TxpoolError, TxStatus}, + txpool::{self, Sender, TxPoolMpsc, TxStatusBroadcast, Error as TxpoolError, TxStatus}, }; - use tokio::sync::oneshot; + use tokio::sync::{mpsc::error::TryRecvError, oneshot}; #[tokio::test] async fn test_start_stop() { @@ -217,11 +280,23 @@ pub mod tests { let db = Box::new(MockDb::default()); let (bs, _br) = broadcast::channel(10); + // Meant to simulate p2p's channels which hook in to communicate with txpool + let (network_sender, _) = mpsc::channel(100); + let (_, incoming_tx_receiver) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); + let mut builder = ServiceBuilder::new(); builder .config(config) .db(db) - .import_block_event(bs.subscribe()); + .incoming_tx_receiver(incoming_tx_receiver) + .network_sender(network_sender) + .import_block_event(bs.subscribe()) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver); + let service = builder.build().unwrap(); assert!(service.start().await.is_ok(), "start service"); @@ -237,13 +312,183 @@ pub mod tests { } #[tokio::test] - async fn test_filter_by_negative() { + async fn test_insert_from_p2p() { let config = Config::default(); let db = Box::new(MockDb::default()); let (_bs, br) = broadcast::channel(10); + // Meant to simulate p2p's channels which hook in to communicate with txpool + let (network_sender, _) = mpsc::channel(100); + let (incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); + + let tx1 = + TransactionBuilder::script(vec![], vec![]) + .gas_price(10) + .finalize(); + let mut builder = ServiceBuilder::new(); - builder.config(config).db(db).import_block_event(br); + builder + .config(config) + .db(db) + .incoming_tx_receiver(incoming_tx_receiver) + .network_sender(network_sender) + .import_block_event(br) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver); + let service = builder.build().unwrap(); + service.start().await.ok(); + + let broadcast_tx = TransactionBroadcast::NewTransaction(tx1.clone()); + let mut receiver = service.subscribe_ch(); + let res = incoming_tx_sender.send(broadcast_tx).unwrap(); + let _ = receiver.recv().await; + assert_eq!(1, res); + + let (response, receiver) = oneshot::channel(); + let _ = service + .sender() + .send(TxPoolMpsc::Find { + ids: vec![tx1.id()], + response, + }) + .await; + let out = receiver.await.unwrap(); + + let arc_tx1 = Arc::new(tx1); + + assert_eq!(arc_tx1, *out[0].as_ref().unwrap().tx()); + } + + #[tokio::test] + async fn test_insert_from_local_broadcasts_to_p2p() { + let config = Config::default(); + let db = Box::new(MockDb::default()); + let (_bs, br) = broadcast::channel(10); + + // Meant to simulate p2p's channels which hook in to communicate with txpool + let (network_sender, mut rx) = mpsc::channel(100); + let (_stx, incoming_txs) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); + + let tx1 = Arc::new( + TransactionBuilder::script(vec![], vec![]) + .gas_price(10) + .finalize(), + ); + + + let mut builder = ServiceBuilder::new(); + builder + .config(config) + .db(db) + .incoming_tx_receiver(incoming_txs) + .network_sender(network_sender) + .import_block_event(br) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver); + let service = builder.build().unwrap(); + service.start().await.ok(); + + let mut subscribe = service.subscribe_ch(); + + let (response, receiver) = oneshot::channel(); + let _ = service + .sender() + .send(TxPoolMpsc::Insert { + txs: vec![tx1.clone()], + response, + }) + .await; + let out = receiver.await.unwrap(); + + assert!(out[0].is_ok(), "Tx1 should be OK, got err:{:?}", out); + + // we are sure that included tx are already broadcasted. + assert_eq!( + subscribe.try_recv(), + Ok(TxStatusBroadcast { + tx: tx1.clone(), + status: TxStatus::Submitted, + }), + "First added should be tx1" + ); + + let ret = rx.try_recv().unwrap(); + + if let P2pRequestEvent::BroadcastNewTransaction { transaction } = ret { + assert_eq!(tx1, transaction); + } else { + panic!("Transaction Broadcast Unwrap Failed"); + } + } + + #[tokio::test] + async fn test_insert_from_p2p_does_not_broadcast_to_p2p() { + let config = Config::default(); + let db = Box::new(MockDb::default()); + let (_bs, br) = broadcast::channel(10); + + // Meant to simulate p2p's channels which hook in to communicate with txpool + let (network_sender, mut network_receiver) = mpsc::channel(100); + let (incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); + + let tx1 = + TransactionBuilder::script(vec![], vec![]) + .gas_price(10) + .finalize(); + + + let mut builder = ServiceBuilder::new(); + builder + .config(config) + .db(db) + .incoming_tx_receiver(incoming_tx_receiver) + .network_sender(network_sender) + .import_block_event(br) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver); + let service = builder.build().unwrap(); + service.start().await.ok(); + + let broadcast_tx = TransactionBroadcast::NewTransaction(tx1.clone()); + let mut receiver = service.subscribe_ch(); + let res = incoming_tx_sender.send(broadcast_tx).unwrap(); + let _ = receiver.recv().await; + assert_eq!(1, res); + + let ret = network_receiver.try_recv(); + assert!(ret.is_err()); + assert_eq!(Some(TryRecvError::Empty), ret.err()); + } + + #[tokio::test] + async fn test_filter_by_negative() { + let config = Config::default(); + let db = Box::new(MockDb::default()); + let (bs, br) = broadcast::channel(10); + + let (network_sender, _) = mpsc::channel(100); + let (_, incoming_tx_receiver) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); + + let mut builder = ServiceBuilder::new(); + builder.config(config).db(db) + .incoming_tx_receiver(incoming_tx_receiver) + .network_sender(network_sender) + .import_block_event(bs.subscribe()) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver); + let service = builder.build().unwrap(); service.start().await.ok(); @@ -298,6 +543,12 @@ pub mod tests { let db = Box::new(MockDb::default()); let (_bs, br) = broadcast::channel(10); + // Meant to simulate p2p's channels which hook in to communicate with txpool + let (network_sender, _) = mpsc::channel(100); + let (_, incoming_tx_receiver) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); + let tx1 = Arc::new( TransactionBuilder::script(vec![], vec![]) .gas_price(10) @@ -315,7 +566,17 @@ pub mod tests { ); let mut builder = ServiceBuilder::new(); - builder.config(config).db(db).import_block_event(br); + + builder + .config(config) + .db(db) + .incoming_tx_receiver(incoming_tx_receiver) + .network_sender(network_sender) + .import_block_event(br) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver); + let service = builder.build().unwrap(); service.start().await.ok(); @@ -355,6 +616,14 @@ pub mod tests { let db = Box::new(MockDb::default()); let (_bs, br) = broadcast::channel(10); + // Meant to simulate p2p's channels which hook in to communicate with txpool + let (network_sender, _) = mpsc::channel(100); + let (_, incoming_tx_receiver) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); + + let db = Box::new(MockDb::default()); + let tx1 = Arc::new( TransactionBuilder::script(vec![], vec![]) .gas_price(10) @@ -367,7 +636,17 @@ pub mod tests { ); let mut builder = ServiceBuilder::new(); - builder.config(config).db(db).import_block_event(br); + + builder + .config(config) + .db(db) + .incoming_tx_receiver(incoming_tx_receiver) + .network_sender(network_sender) + .import_block_event(br) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver); + let service = builder.build().unwrap(); service.start().await.ok(); diff --git a/fuel-txpool/src/txpool.rs b/fuel-txpool/src/txpool.rs index 3520d500cc3..d4c0889f9c4 100644 --- a/fuel-txpool/src/txpool.rs +++ b/fuel-txpool/src/txpool.rs @@ -5,11 +5,12 @@ use crate::{ }; use fuel_core_interfaces::{ model::{ArcTx, TxInfo}, + p2p::P2pRequestEvent, txpool::{TxPoolDb, TxStatus, TxStatusBroadcast}, }; use std::cmp::Reverse; use std::collections::HashMap; -use tokio::sync::{broadcast, RwLock}; +use tokio::sync::{broadcast,mpsc, RwLock}; #[derive(Debug, Clone)] pub struct TxPool { @@ -127,11 +128,52 @@ impl TxPool { Ok(()) } + pub async fn insert_with_broadcast( + txpool: &RwLock, + db: &dyn TxPoolDb, + tx_status_sender: broadcast::Sender, + network_sender: mpsc::Sender, + txs: Vec, + ) -> Vec>> { + let mut res = Vec::new(); + for tx in txs.iter() { + let mut pool = txpool.write().await; + res.push(pool.insert_inner(tx.clone(), db).await) + } + for (ret, tx) in res.iter().zip(txs.into_iter()) { + match ret { + Ok(removed) => { + for removed in removed { + let _ = tx_status_sender.send(TxStatusBroadcast { + tx: removed.clone(), + status: TxStatus::SqueezedOut { + reason: Error::Removed, + }, + }); + } + let _ = tx_status_sender.send(TxStatusBroadcast { + tx: tx.clone(), + status: TxStatus::Submitted, + }); + let _ = network_sender + .send(P2pRequestEvent::BroadcastNewTransaction { + transaction: tx.clone(), + }) + .await; + } + Err(_) => { + // @dev should not broadcast tx if error occurred + } + } + } + res + } + /// Import a set of transactions from network gossip or GraphQL endpoints. pub async fn insert( txpool: &RwLock, db: &dyn TxPoolDb, - broadcast: broadcast::Sender, + tx_status_sender: broadcast::Sender, txs: Vec, ) -> Vec>> { // Check if that data is okay (witness match input/output, and if recovered signatures ara valid). @@ -148,14 +190,14 @@ impl TxPool { for removed in removed { // small todo there is possibility to have removal reason (ReplacedByHigherGas, DependencyRemoved) // but for now it is okay to just use Error::Removed. - let _ = broadcast.send(TxStatusBroadcast { + let _ = tx_status_sender.send(TxStatusBroadcast { tx: removed.clone(), status: TxStatus::SqueezedOut { reason: Error::Removed, }, }); } - let _ = broadcast.send(TxStatusBroadcast { + let _ = tx_status_sender.send(TxStatusBroadcast { tx, status: TxStatus::Submitted, }); From 3ab75251d3a5eca96f13689fd313051018b423e6 Mon Sep 17 00:00:00 2001 From: controlcthenv Date: Sat, 3 Sep 2022 16:22:55 -0600 Subject: [PATCH 02/38] Tests go green --- fuel-core/src/service/modules.rs | 30 +++++++++++++++++++++++++----- fuel-p2p/src/orchestrator.rs | 10 ++++++---- fuel-txpool/README.md | 23 +++++++++++++++++++++++ fuel-txpool/src/txpool.rs | 4 +++- fuel-txpool/src/types.rs | 7 +++---- 5 files changed, 60 insertions(+), 14 deletions(-) diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index 0fc6fcae00c..f37f3bdf349 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -7,9 +7,11 @@ use fuel_core_interfaces::p2p::P2pDb; #[cfg(feature = "relayer")] use fuel_core_interfaces::relayer::RelayerDb; use fuel_core_interfaces::txpool::TxPoolDb; +use fuel_core_interfaces::txpool::Sender; use futures::future::join_all; use std::sync::Arc; use tokio::sync::mpsc; +use tokio::sync::broadcast; use tokio::task::JoinHandle; pub struct Modules { @@ -78,11 +80,6 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result) - .import_block_event(block_importer.subscribe()); - #[cfg(feature = "p2p")] let (tx_request_event, rx_request_event) = mpsc::channel(100); #[cfg(feature = "p2p")] @@ -93,6 +90,29 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result) + .incoming_tx_receiver(incoming_tx_reciever) + .network_sender(tx_request_event.clone()) + .import_block_event(block_importer.subscribe()) + .tx_status_sender(tx_status_sender) + .txpool_sender(Sender::new(txpool_sender)) + .txpool_receiver(txpool_receiver); + block_importer.start().await; block_producer.start(txpool_builder.sender().clone()).await; diff --git a/fuel-p2p/src/orchestrator.rs b/fuel-p2p/src/orchestrator.rs index 5fe84160568..cf7ebad4f04 100644 --- a/fuel-p2p/src/orchestrator.rs +++ b/fuel-p2p/src/orchestrator.rs @@ -5,6 +5,8 @@ use fuel_core_interfaces::p2p::{ BlockBroadcast, ConsensusBroadcast, P2pDb, P2pRequestEvent, TransactionBroadcast, }; +use tokio::sync::broadcast; + use libp2p::request_response::RequestId; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::Mutex; @@ -28,7 +30,7 @@ pub struct NetworkOrchestrator { // senders tx_consensus: Sender, - tx_transaction: Sender, + tx_transaction: broadcast::Sender, tx_block: Sender, tx_outbound_responses: Sender>, @@ -41,7 +43,7 @@ impl NetworkOrchestrator { rx_request_event: Receiver, tx_consensus: Sender, - tx_transaction: Sender, + tx_transaction: broadcast::Sender, tx_block: Sender, db: Arc, @@ -152,7 +154,7 @@ impl Service { tx_request_event: Sender, rx_request_event: Receiver, tx_consensus: Sender, - tx_transaction: Sender, + tx_transaction: broadcast::Sender, tx_block: Sender, ) -> Self { let network_orchestrator = NetworkOrchestrator::new( @@ -241,7 +243,7 @@ pub mod tests { let (tx_request_event, rx_request_event) = tokio::sync::mpsc::channel(100); let (tx_consensus, _) = tokio::sync::mpsc::channel(100); - let (tx_transaction, _) = tokio::sync::mpsc::channel(100); + let (tx_transaction, _) = tokio::sync::broadcast::channel(100); let (tx_block, _) = tokio::sync::mpsc::channel(100); let service = Service::new( diff --git a/fuel-txpool/README.md b/fuel-txpool/README.md index 20be34da987..99d70ff8f0d 100644 --- a/fuel-txpool/README.md +++ b/fuel-txpool/README.md @@ -29,6 +29,29 @@ We will need to have at least three structures that do sorting of tx. * by PriceSort: sorted structure that sort all transaction by GasPrice. Most optimal structure can be `BinaryHeap` for fast sorting/inserting but it does have some downsides when we want to remove one item. For use case when a lot of tx are going to be removed we can just recreate structure from scratch. For first interaction we can use simple `BTreeMap` that sorts all inputs. * by Dependency: With every fuel transaction, inputs and outputs change state and we need to be aware of it. Graph is the main defensive structure against DDoS attack, and every transaction that we include in pool should have potential to be included in next block. The graph represents connections between parent and child txs, where child depends on execution output of parent that is found inside database or transaction pool. +## P2P Integration + +The fuel-txpool integrates with fuel-p2p through the use of 2 channels. Since fuel-txpool uses 6 total channels though each one's use is documented below. + +### GraphQL <-> TxPool Channels + +- tx_status_sender: This channel is used to communicate between GraphQL and the TxPool, primarily for status updates on where a transaction is. + +- txpool_receiver: This channel is used to recieve TxPoolMpsc events from downstream consumers. The main use of this channel is GraphQL whose endpoints send various events to either view or submit tx's to the txpool. + +### P2P <-> TxPool Channels + +- network_sender: This channel is used to communicate to p2p from txpool. Transactions are inserted into the TxPool from the GraphQL endpoint will be broadcasted on this channel. + +- incoming_tx_receiver: This channel is used to recieve new txs from p2p, and is important to how txs propogate through the network. Critically though once this is recieved on this channel it is not broadcasted further. + +- import_block_receiver: This channel is used to recieve blocks, so when new blocks are created they are recieved on this channel. + +### ????? +- txpool_sender: TODO Ask someone on this, but looks like pretty much to just stop the TxPool. + +So the 2 channels needed to communicate with p2p are `network_sender` and `incoming_tx_receiver`. The first way the channels connect is that fuel-p2p's `rx_request_event` is the reciever to txpool's `network_sender` sender channel. The combination of both of these channels allows for transactions recieved on the GraphQL endpoint of the node to be broadcasted on p2p. The next channel combination is txpool's `incoming_tx_receiver` which recieves transactions broadcasted across the network from fuel-p2p's `tx_transaction`. + ### Dependency graph Few reasonings on decision and restrains made by txpool diff --git a/fuel-txpool/src/txpool.rs b/fuel-txpool/src/txpool.rs index d4c0889f9c4..a0998fab117 100644 --- a/fuel-txpool/src/txpool.rs +++ b/fuel-txpool/src/txpool.rs @@ -202,7 +202,9 @@ impl TxPool { status: TxStatus::Submitted, }); } - Err(_) => {} + Err(_) => { + // @dev should not broadcast tx if error occurred + } } } res diff --git a/fuel-txpool/src/types.rs b/fuel-txpool/src/types.rs index b70989d7e22..074545d2e39 100644 --- a/fuel-txpool/src/types.rs +++ b/fuel-txpool/src/types.rs @@ -1,7 +1,6 @@ -use fuel_core_interfaces::common::fuel_types::Word; pub use fuel_core_interfaces::common::{ - fuel_tx::ContractId, - fuel_tx::{Transaction, TxId}, + fuel_tx::{ContractId, Transaction, TxId}, + fuel_types::Word, }; -pub type GasPrice = Word; +pub type GasPrice = Word; \ No newline at end of file From 3cc1ae277b6b01935fb1d151fb5e7ff3fef64a25 Mon Sep 17 00:00:00 2001 From: controlcthenv Date: Sat, 3 Sep 2022 16:27:42 -0600 Subject: [PATCH 03/38] clippy --- fuel-client/src/schema.rs | 1 - fuel-core/src/service/modules.rs | 5 +-- fuel-txpool/src/service.rs | 69 +++++++++++++++----------------- fuel-txpool/src/txpool.rs | 2 +- fuel-txpool/src/types.rs | 2 +- 5 files changed, 37 insertions(+), 42 deletions(-) diff --git a/fuel-client/src/schema.rs b/fuel-client/src/schema.rs index 5c715dcfc24..3a2fcbed93b 100644 --- a/fuel-client/src/schema.rs +++ b/fuel-client/src/schema.rs @@ -1,4 +1,3 @@ -#![allow(clippy::derive_partial_eq_without_eq)] use serde::{Deserialize, Serialize}; include! {concat!(env!("OUT_DIR"), "/../schema.rs")} diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index f37f3bdf349..cc970db3327 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -6,12 +6,12 @@ use anyhow::Result; use fuel_core_interfaces::p2p::P2pDb; #[cfg(feature = "relayer")] use fuel_core_interfaces::relayer::RelayerDb; -use fuel_core_interfaces::txpool::TxPoolDb; use fuel_core_interfaces::txpool::Sender; +use fuel_core_interfaces::txpool::TxPoolDb; use futures::future::join_all; use std::sync::Arc; -use tokio::sync::mpsc; use tokio::sync::broadcast; +use tokio::sync::mpsc; use tokio::task::JoinHandle; pub struct Modules { @@ -90,7 +90,6 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result Date: Sat, 3 Sep 2022 16:57:44 -0600 Subject: [PATCH 04/38] final touches? --- fuel-tests/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fuel-tests/Cargo.toml b/fuel-tests/Cargo.toml index 8c98aee7e57..60eae549cb9 100644 --- a/fuel-tests/Cargo.toml +++ b/fuel-tests/Cargo.toml @@ -20,7 +20,7 @@ harness = true [[test]] name = "metrics_test" path = "tests/metrics.rs" -required-features = ["fuel-core/prometheus", "fuel-core/rocksdb"] +required-features = ["fuel-core/metrics", "fuel-core/rocksdb"] harness = true [dependencies] From beca66daadf88dff2f0c6ee378c345c4be73ee12 Mon Sep 17 00:00:00 2001 From: ControlCplusControlV <44706811+ControlCplusControlV@users.noreply.github.com> Date: Sat, 3 Sep 2022 19:20:54 -0600 Subject: [PATCH 05/38] clippy fix --- fuel-core-interfaces/src/p2p.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fuel-core-interfaces/src/p2p.rs b/fuel-core-interfaces/src/p2p.rs index 3e370bf38d5..e74024dc4a2 100644 --- a/fuel-core-interfaces/src/p2p.rs +++ b/fuel-core-interfaces/src/p2p.rs @@ -5,7 +5,7 @@ use fuel_tx::Transaction; use std::sync::Arc; use tokio::sync::oneshot; -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone)] pub enum TransactionBroadcast { NewTransaction(Transaction), } From 68e21fa80ef3400ebe91ec8f8481013187e3aa36 Mon Sep 17 00:00:00 2001 From: ControlCplusControlV <44706811+ControlCplusControlV@users.noreply.github.com> Date: Sat, 3 Sep 2022 21:22:56 -0600 Subject: [PATCH 06/38] clippy maybe? --- fuel-client/src/schema.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/fuel-client/src/schema.rs b/fuel-client/src/schema.rs index 3a2fcbed93b..5c715dcfc24 100644 --- a/fuel-client/src/schema.rs +++ b/fuel-client/src/schema.rs @@ -1,3 +1,4 @@ +#![allow(clippy::derive_partial_eq_without_eq)] use serde::{Deserialize, Serialize}; include! {concat!(env!("OUT_DIR"), "/../schema.rs")} From 716a3fad3c9ad03d2cfacf30d612fc431a2d1bca Mon Sep 17 00:00:00 2001 From: controlcthenv Date: Fri, 9 Sep 2022 16:52:45 -0600 Subject: [PATCH 07/38] updated fmt --- fuel-core/src/service/modules.rs | 11 ++++++++--- fuel-txpool/src/service.rs | 23 +++++++++++++++++------ fuel-txpool/src/types.rs | 2 +- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index 631409618e7..fafd9f29178 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -8,12 +8,17 @@ use anyhow::Result; use fuel_core_interfaces::p2p::P2pDb; #[cfg(feature = "relayer")] use fuel_core_interfaces::relayer::RelayerDb; -use fuel_core_interfaces::txpool::Sender; -use fuel_core_interfaces::txpool::TxPoolDb; +use fuel_core_interfaces::txpool::{ + Sender, + TxPoolDb, +}; use futures::future::join_all; use std::sync::Arc; use tokio::{ - sync::{mpsc, broadcast}, + sync::{ + broadcast, + mpsc, + }, task::JoinHandle, }; diff --git a/fuel-txpool/src/service.rs b/fuel-txpool/src/service.rs index e7c018487f9..4b65daf8e8e 100644 --- a/fuel-txpool/src/service.rs +++ b/fuel-txpool/src/service.rs @@ -5,13 +5,16 @@ use crate::{ use anyhow::anyhow; use fuel_core_interfaces::{ block_importer::ImportBlockBroadcast, + p2p::{ + P2pRequestEvent, + TransactionBroadcast, + }, txpool::{ self, TxPoolDb, TxPoolMpsc, TxStatusBroadcast, }, - p2p::{P2pRequestEvent, TransactionBroadcast}, }; use std::sync::Arc; use tokio::{ @@ -73,7 +76,10 @@ impl ServiceBuilder { self } - pub fn txpool_receiver(&mut self, txpool_receiver: mpsc::Receiver) -> &mut Self { + pub fn txpool_receiver( + &mut self, + txpool_receiver: mpsc::Receiver, + ) -> &mut Self { self.txpool_receiver = Some(txpool_receiver); self } @@ -94,7 +100,10 @@ impl ServiceBuilder { self } - pub fn network_sender(&mut self, network_sender: mpsc::Sender) -> &mut Self { + pub fn network_sender( + &mut self, + network_sender: mpsc::Sender, + ) -> &mut Self { self.network_sender = Some(network_sender); self } @@ -291,11 +300,13 @@ pub mod tests { Sender, TxPoolMpsc, TxStatus, - TxStatusBroadcast - TxStatus, + TxStatusBroadcast, }, }; - use tokio::sync::{mpsc::error::TryRecvError, oneshot}; + use tokio::sync::{ + mpsc::error::TryRecvError, + oneshot, + }; #[tokio::test] async fn test_start_stop() { diff --git a/fuel-txpool/src/types.rs b/fuel-txpool/src/types.rs index dd44a26d13b..597af334fff 100644 --- a/fuel-txpool/src/types.rs +++ b/fuel-txpool/src/types.rs @@ -2,7 +2,7 @@ pub use fuel_core_interfaces::common::{ fuel_tx::{ ContractId, Transaction, - TxId + TxId, }, fuel_types::Word, }; From e248f8c06dcd6c9785d8930ee2d7b919f8c98761 Mon Sep 17 00:00:00 2001 From: controlcthenv Date: Fri, 9 Sep 2022 17:14:43 -0600 Subject: [PATCH 08/38] ci passes? --- fuel-core/src/service/modules.rs | 2 +- fuel-txpool/src/types.rs | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index fafd9f29178..dde05d0765d 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -162,7 +162,7 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result Date: Tue, 20 Sep 2022 19:59:33 -0400 Subject: [PATCH 09/38] Error when tx_receiver channel sender is dropped --- fuel-core/src/service/modules.rs | 19 ++++---------- fuel-txpool/src/service.rs | 45 +++++++++----------------------- 2 files changed, 18 insertions(+), 46 deletions(-) diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index dde05d0765d..3dcaed46678 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -1,24 +1,15 @@ #![allow(clippy::let_unit_value)] -use crate::{ - database::Database, - service::Config, -}; +use crate::{database::Database, service::Config}; use anyhow::Result; #[cfg(feature = "p2p")] use fuel_core_interfaces::p2p::P2pDb; #[cfg(feature = "relayer")] use fuel_core_interfaces::relayer::RelayerDb; -use fuel_core_interfaces::txpool::{ - Sender, - TxPoolDb, -}; +use fuel_core_interfaces::txpool::{Sender, TxPoolDb}; use futures::future::join_all; use std::sync::Arc; use tokio::{ - sync::{ - broadcast, - mpsc, - }, + sync::{broadcast, mpsc}, task::JoinHandle, }; @@ -112,12 +103,12 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result) - .incoming_tx_receiver(incoming_tx_reciever) + .incoming_tx_receiver(incoming_tx_receiver) .network_sender(tx_request_event.clone()) .import_block_event(block_importer.subscribe()) .tx_status_sender(tx_status_sender) diff --git a/fuel-txpool/src/service.rs b/fuel-txpool/src/service.rs index 4b65daf8e8e..02c4da3a6c6 100644 --- a/fuel-txpool/src/service.rs +++ b/fuel-txpool/src/service.rs @@ -1,31 +1,16 @@ -use crate::{ - Config, - TxPool, -}; +use crate::{Config, TxPool}; use anyhow::anyhow; use fuel_core_interfaces::{ block_importer::ImportBlockBroadcast, - p2p::{ - P2pRequestEvent, - TransactionBroadcast, - }, - txpool::{ - self, - TxPoolDb, - TxPoolMpsc, - TxStatusBroadcast, - }, + p2p::{P2pRequestEvent, TransactionBroadcast}, + txpool::{self, TxPoolDb, TxPoolMpsc, TxStatusBroadcast}, }; use std::sync::Arc; use tokio::{ - sync::{ - broadcast, - mpsc, - Mutex, - RwLock, - }, + sync::{broadcast, mpsc, Mutex, RwLock}, task::JoinHandle, }; +use tracing::error; pub struct ServiceBuilder { config: Config, @@ -130,7 +115,7 @@ impl ServiceBuilder { || self.tx_status_sender.is_none() || self.txpool_receiver.is_none() { - return Err(anyhow!("One of context items are not set")) + return Err(anyhow!("One of context items are not set")); } let service = Service::new( self.txpool_sender.unwrap(), @@ -166,6 +151,11 @@ impl Context { loop { tokio::select! { new_transaction = self.incoming_tx_receiver.recv() => { + if new_transaction.is_err() { + error!("Incoming tx receiver channel closed unexpectedly; shutting down transaction pool service."); + break; + } + let txpool = txpool.clone(); let db = self.db.clone(); let tx_status_sender = self.tx_status_sender.clone(); @@ -295,18 +285,9 @@ pub mod tests { use crate::MockDb; use fuel_core_interfaces::{ common::fuel_tx::TransactionBuilder, - txpool::{ - Error as TxpoolError, - Sender, - TxPoolMpsc, - TxStatus, - TxStatusBroadcast, - }, - }; - use tokio::sync::{ - mpsc::error::TryRecvError, - oneshot, + txpool::{Error as TxpoolError, Sender, TxPoolMpsc, TxStatus, TxStatusBroadcast}, }; + use tokio::sync::{mpsc::error::TryRecvError, oneshot}; #[tokio::test] async fn test_start_stop() { From e9b8cc4235a92edfc8bbbe6d7a02a45a6a5535e2 Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Tue, 20 Sep 2022 20:16:26 -0400 Subject: [PATCH 10/38] Updated fmt --- fuel-core/src/service/modules.rs | 15 +++++++++--- fuel-txpool/src/service.rs | 39 ++++++++++++++++++++++++++------ 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index 3dcaed46678..93388e7ebfc 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -1,15 +1,24 @@ #![allow(clippy::let_unit_value)] -use crate::{database::Database, service::Config}; +use crate::{ + database::Database, + service::Config, +}; use anyhow::Result; #[cfg(feature = "p2p")] use fuel_core_interfaces::p2p::P2pDb; #[cfg(feature = "relayer")] use fuel_core_interfaces::relayer::RelayerDb; -use fuel_core_interfaces::txpool::{Sender, TxPoolDb}; +use fuel_core_interfaces::txpool::{ + Sender, + TxPoolDb, +}; use futures::future::join_all; use std::sync::Arc; use tokio::{ - sync::{broadcast, mpsc}, + sync::{ + broadcast, + mpsc, + }, task::JoinHandle, }; diff --git a/fuel-txpool/src/service.rs b/fuel-txpool/src/service.rs index 02c4da3a6c6..102960fae2e 100644 --- a/fuel-txpool/src/service.rs +++ b/fuel-txpool/src/service.rs @@ -1,13 +1,29 @@ -use crate::{Config, TxPool}; +use crate::{ + Config, + TxPool, +}; use anyhow::anyhow; use fuel_core_interfaces::{ block_importer::ImportBlockBroadcast, - p2p::{P2pRequestEvent, TransactionBroadcast}, - txpool::{self, TxPoolDb, TxPoolMpsc, TxStatusBroadcast}, + p2p::{ + P2pRequestEvent, + TransactionBroadcast, + }, + txpool::{ + self, + TxPoolDb, + TxPoolMpsc, + TxStatusBroadcast, + }, }; use std::sync::Arc; use tokio::{ - sync::{broadcast, mpsc, Mutex, RwLock}, + sync::{ + broadcast, + mpsc, + Mutex, + RwLock, + }, task::JoinHandle, }; use tracing::error; @@ -115,7 +131,7 @@ impl ServiceBuilder { || self.tx_status_sender.is_none() || self.txpool_receiver.is_none() { - return Err(anyhow!("One of context items are not set")); + return Err(anyhow!("One of context items are not set")) } let service = Service::new( self.txpool_sender.unwrap(), @@ -285,9 +301,18 @@ pub mod tests { use crate::MockDb; use fuel_core_interfaces::{ common::fuel_tx::TransactionBuilder, - txpool::{Error as TxpoolError, Sender, TxPoolMpsc, TxStatus, TxStatusBroadcast}, + txpool::{ + Error as TxpoolError, + Sender, + TxPoolMpsc, + TxStatus, + TxStatusBroadcast, + }, + }; + use tokio::sync::{ + mpsc::error::TryRecvError, + oneshot, }; - use tokio::sync::{mpsc::error::TryRecvError, oneshot}; #[tokio::test] async fn test_start_stop() { From 06d0e6bf0d5b1ca124d01dbbd3af7cc942fb622e Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Wed, 21 Sep 2022 13:25:42 -0400 Subject: [PATCH 11/38] Spelling --- fuel-core/src/service/modules.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index 93388e7ebfc..f896f898c56 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -103,10 +103,10 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result Date: Thu, 22 Sep 2022 21:29:06 -0400 Subject: [PATCH 12/38] Update modules channel wiring --- fuel-core/src/service/modules.rs | 50 +++++++++++++------------------- fuel-relayer/src/service.rs | 3 -- fuel-tests/Cargo.toml | 3 ++ fuel-txpool/src/service.rs | 9 +++--- 4 files changed, 28 insertions(+), 37 deletions(-) diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index f896f898c56..386ced0fe81 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -63,12 +63,9 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result Result Result) .incoming_tx_receiver(incoming_tx_receiver) - .network_sender(tx_request_event.clone()) + .network_sender(p2p_request_event_sender.clone()) .import_block_event(block_importer.subscribe()) .tx_status_sender(tx_status_sender) .txpool_sender(Sender::new(txpool_sender)) .txpool_receiver(txpool_receiver); + let txpool = txpool_builder.build()?; block_importer.start().await; - - block_producer.start(txpool_builder.sender().clone()).await; + block_producer.start(txpool.sender().clone()).await; bft.start( relayer_sender.clone(), - tx_request_event.clone(), + p2p_request_event_sender.clone(), block_producer.sender().clone(), block_importer.sender().clone(), block_importer.subscribe(), @@ -137,19 +134,14 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result Result = Arc::new(database.clone()); #[cfg(feature = "p2p")] let (tx_consensus, _) = mpsc::channel(100); - #[cfg(feature = "p2p")] - let (tx_transaction, _) = broadcast::channel(100); #[cfg(feature = "p2p")] let network_service = fuel_p2p::orchestrator::Service::new( config.p2p.clone(), p2p_db, - tx_request_event, - rx_request_event, + p2p_request_event_sender, + p2p_request_event_receiver, tx_consensus, - tx_transaction, - tx_block, + incoming_tx_sender, + block_event_sender, ); #[cfg(feature = "p2p")] diff --git a/fuel-relayer/src/service.rs b/fuel-relayer/src/service.rs index 260a9350dea..9bf1877ca65 100644 --- a/fuel-relayer/src/service.rs +++ b/fuel-relayer/src/service.rs @@ -61,9 +61,6 @@ impl ServiceBuilder { config: Default::default(), } } - pub fn sender(&mut self) -> &relayer::Sender { - &self.sender - } pub fn private_key(&mut self, private_key: Vec) -> &mut Self { self.private_key = Some(private_key); diff --git a/fuel-tests/Cargo.toml b/fuel-tests/Cargo.toml index e4bee5adac8..f245897b09e 100644 --- a/fuel-tests/Cargo.toml +++ b/fuel-tests/Cargo.toml @@ -37,3 +37,6 @@ tokio = { version = "1.8", features = ["macros", "rt-multi-thread"] } metrics = ["fuel-core/rocksdb", "fuel-core/metrics"] default = ["fuel-core/default", "metrics"] debug = ["fuel-core-interfaces/debug"] +p2p = ["fuel-core/p2p"] +relayer = ["fuel-core/relayer"] +all-services = ["p2p", "relayer"] diff --git a/fuel-txpool/src/service.rs b/fuel-txpool/src/service.rs index 102960fae2e..9d47b767eca 100644 --- a/fuel-txpool/src/service.rs +++ b/fuel-txpool/src/service.rs @@ -274,6 +274,7 @@ impl Service { pub async fn stop(&self) -> Option> { let mut join = self.join.lock().await; let join_handle = join.take(); + if let Some(join_handle) = join_handle { let _ = self.txpool_sender.send(TxPoolMpsc::Stop).await; let context = self.context.clone(); @@ -322,7 +323,7 @@ pub mod tests { // Meant to simulate p2p's channels which hook in to communicate with txpool let (network_sender, _) = mpsc::channel(100); - let (_, incoming_tx_receiver) = broadcast::channel(100); + let (_incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); let (tx_status_sender, _) = broadcast::channel(100); let (txpool_sender, txpool_receiver) = Sender::channel(100); @@ -512,7 +513,7 @@ pub mod tests { let (bs, _br) = broadcast::channel(10); let (network_sender, _) = mpsc::channel(100); - let (_, incoming_tx_receiver) = broadcast::channel(100); + let (_incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); let (tx_status_sender, _) = broadcast::channel(100); let (txpool_sender, txpool_receiver) = Sender::channel(100); @@ -583,7 +584,7 @@ pub mod tests { // Meant to simulate p2p's channels which hook in to communicate with txpool let (network_sender, _) = mpsc::channel(100); - let (_, incoming_tx_receiver) = broadcast::channel(100); + let (_incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); let (tx_status_sender, _) = broadcast::channel(100); let (txpool_sender, txpool_receiver) = Sender::channel(100); @@ -655,7 +656,7 @@ pub mod tests { // Meant to simulate p2p's channels which hook in to communicate with txpool let (network_sender, _) = mpsc::channel(100); - let (_, incoming_tx_receiver) = broadcast::channel(100); + let (_incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); let (tx_status_sender, _) = broadcast::channel(100); let (txpool_sender, txpool_receiver) = Sender::channel(100); From 386e0733a4b4265fdba7bb929f219c506307ed63 Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Fri, 23 Sep 2022 14:18:05 -0400 Subject: [PATCH 13/38] Keep alive for incoming_tx_sender --- fuel-core/src/service/modules.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index 386ced0fe81..f45c4aa2933 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -110,6 +110,12 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result Date: Fri, 23 Sep 2022 15:31:56 -0400 Subject: [PATCH 14/38] Update features config and tests --- fuel-core/Cargo.toml | 2 +- fuel-core/src/service/modules.rs | 90 +++---- fuel-txpool/Cargo.toml | 3 + fuel-txpool/src/service.rs | 402 +++++++++++++++++-------------- 4 files changed, 271 insertions(+), 226 deletions(-) diff --git a/fuel-core/Cargo.toml b/fuel-core/Cargo.toml index 0081039a213..8bb339a30ce 100644 --- a/fuel-core/Cargo.toml +++ b/fuel-core/Cargo.toml @@ -86,6 +86,6 @@ metrics = ["dep:fuel-metrics"] default = ["rocksdb", "metrics", "debug"] debug = ["fuel-core-interfaces/debug"] relayer = ["dep:fuel-relayer"] -p2p = ["dep:fuel-p2p"] +p2p = ["dep:fuel-p2p", "fuel-txpool/p2p" ] # features to enable in production, but increase build times production = ["rocksdb?/jemalloc"] diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index f45c4aa2933..d5f1a381a8a 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -64,22 +64,21 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result) - .import_block_event(block_importer.subscribe()) - .private_key( - hex::decode( - "c6bd905dcac2a0b1c43f574ab6933df14d7ceee0194902bce523ed054e8e798b", - ) - .unwrap(), - ); - - #[cfg(feature = "relayer")] - let relayer = relayer_builder.build()?; + let relayer = { + let mut relayer_builder = fuel_relayer::ServiceBuilder::new(); + relayer_builder + .config(config.relayer.clone()) + .db(Box::new(database.clone()) as Box) + .import_block_event(block_importer.subscribe()) + .private_key( + hex::decode( + "c6bd905dcac2a0b1c43f574ab6933df14d7ceee0194902bce523ed054e8e798b", + ) + .unwrap(), + ); + + relayer_builder.build()? + }; let relayer_sender = { #[cfg(feature = "relayer")] @@ -92,6 +91,14 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result Result = Arc::new(database.clone()); + let (tx_consensus, _) = mpsc::channel(100); + fuel_p2p::orchestrator::Service::new( + config.p2p.clone(), + p2p_db, + p2p_request_event_sender.clone(), + p2p_request_event_receiver, + tx_consensus, + incoming_tx_sender, + block_event_sender, + ) + }; #[cfg(not(feature = "p2p"))] { let keep_alive = Box::new(incoming_tx_sender); @@ -121,13 +134,18 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result) .incoming_tx_receiver(incoming_tx_receiver) - .network_sender(p2p_request_event_sender.clone()) .import_block_event(block_importer.subscribe()) .tx_status_sender(tx_status_sender) .txpool_sender(Sender::new(txpool_sender)) .txpool_receiver(txpool_receiver); + + #[cfg(feature = "p2p")] + txpool_builder.network_sender(p2p_request_event_sender.clone()); + let txpool = txpool_builder.build()?; + // start services + block_importer.start().await; block_producer.start(txpool.sender().clone()).await; bft.start( @@ -148,34 +166,18 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result = Arc::new(database.clone()); - #[cfg(feature = "p2p")] - let (tx_consensus, _) = mpsc::channel(100); - - #[cfg(feature = "p2p")] - let network_service = fuel_p2p::orchestrator::Service::new( - config.p2p.clone(), - p2p_db, - p2p_request_event_sender, - p2p_request_event_receiver, - tx_consensus, - incoming_tx_sender, - block_event_sender, - ); #[cfg(feature = "p2p")] if !config.p2p.network_name.is_empty() { network_service.start().await?; } + txpool.start().await?; + Ok(Modules { txpool: Arc::new(txpool), block_importer: Arc::new(block_importer), diff --git a/fuel-txpool/Cargo.toml b/fuel-txpool/Cargo.toml index 1c28b365b39..01d2eb7997f 100644 --- a/fuel-txpool/Cargo.toml +++ b/fuel-txpool/Cargo.toml @@ -25,3 +25,6 @@ tracing = "0.1" fuel-core-interfaces = { path = "../fuel-core-interfaces", version = "0.10.1", features = [ "test-helpers", ] } + +[features] +p2p = [] diff --git a/fuel-txpool/src/service.rs b/fuel-txpool/src/service.rs index 9d47b767eca..e8535170e10 100644 --- a/fuel-txpool/src/service.rs +++ b/fuel-txpool/src/service.rs @@ -5,10 +5,7 @@ use crate::{ use anyhow::anyhow; use fuel_core_interfaces::{ block_importer::ImportBlockBroadcast, - p2p::{ - P2pRequestEvent, - TransactionBroadcast, - }, + p2p::TransactionBroadcast, txpool::{ self, TxPoolDb, @@ -28,15 +25,19 @@ use tokio::{ }; use tracing::error; +#[cfg(feature = "p2p")] +use fuel_core_interfaces::p2p::P2pRequestEvent; + pub struct ServiceBuilder { config: Config, db: Option>, txpool_sender: Option, txpool_receiver: Option>, tx_status_sender: Option>, - network_sender: Option>, import_block_receiver: Option>, incoming_tx_receiver: Option>, + #[cfg(feature = "p2p")] + network_sender: Option>, } impl Default for ServiceBuilder { @@ -53,9 +54,10 @@ impl ServiceBuilder { txpool_sender: None, txpool_receiver: None, tx_status_sender: None, - network_sender: None, import_block_receiver: None, incoming_tx_receiver: None, + #[cfg(feature = "p2p")] + network_sender: None, } } @@ -101,6 +103,7 @@ impl ServiceBuilder { self } + #[cfg(feature = "p2p")] pub fn network_sender( &mut self, network_sender: mpsc::Sender, @@ -126,13 +129,18 @@ impl ServiceBuilder { if self.db.is_none() || self.import_block_receiver.is_none() || self.incoming_tx_receiver.is_none() - || self.network_sender.is_none() || self.txpool_sender.is_none() || self.tx_status_sender.is_none() || self.txpool_receiver.is_none() { return Err(anyhow!("One of context items are not set")) } + + #[cfg(feature = "p2p")] + if self.network_sender.is_none() { + return Err(anyhow!("P2P network sender is not set")) + } + let service = Service::new( self.txpool_sender.unwrap(), self.tx_status_sender.clone().unwrap(), @@ -143,6 +151,7 @@ impl ServiceBuilder { tx_status_sender: self.tx_status_sender.unwrap(), import_block_receiver: self.import_block_receiver.unwrap(), incoming_tx_receiver: self.incoming_tx_receiver.unwrap(), + #[cfg(feature = "p2p")] network_sender: self.network_sender.unwrap(), }, )?; @@ -154,10 +163,11 @@ pub struct Context { pub config: Config, pub db: Arc>, pub txpool_receiver: mpsc::Receiver, - pub network_sender: mpsc::Sender, pub tx_status_sender: broadcast::Sender, pub import_block_receiver: broadcast::Receiver, pub incoming_tx_receiver: broadcast::Receiver, + #[cfg(feature = "p2p")] + pub network_sender: mpsc::Sender, } impl Context { @@ -194,7 +204,11 @@ impl Context { let txpool = txpool.clone(); let db = self.db.clone(); let tx_status_sender = self.tx_status_sender.clone(); + + #[cfg(feature = "p2p")] let network_sender = self.network_sender.clone(); + #[cfg(not(feature = "p2p"))] + let (network_sender, _) = mpsc::channel(100); // This is little bit risky but we can always add semaphore to limit number of requests. tokio::spawn( async move { @@ -310,19 +324,13 @@ pub mod tests { TxStatusBroadcast, }, }; - use tokio::sync::{ - mpsc::error::TryRecvError, - oneshot, - }; + use tokio::sync::oneshot; #[tokio::test] async fn test_start_stop() { let config = Config::default(); let db = Box::new(MockDb::default()); let (bs, _br) = broadcast::channel(10); - - // Meant to simulate p2p's channels which hook in to communicate with txpool - let (network_sender, _) = mpsc::channel(100); let (_incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); let (tx_status_sender, _) = broadcast::channel(100); let (txpool_sender, txpool_receiver) = Sender::channel(100); @@ -332,12 +340,17 @@ pub mod tests { .config(config) .db(db) .incoming_tx_receiver(incoming_tx_receiver) - .network_sender(network_sender) .import_block_event(bs.subscribe()) .tx_status_sender(tx_status_sender) .txpool_sender(txpool_sender) .txpool_receiver(txpool_receiver); + #[cfg(feature = "p2p")] + { + let (network_sender, _) = mpsc::channel(100); + builder.network_sender(network_sender); + } + let service = builder.build().unwrap(); assert!(service.start().await.is_ok(), "start service"); @@ -352,167 +365,11 @@ pub mod tests { assert!(service.start().await.is_ok(), "Should start again"); } - #[tokio::test] - async fn test_insert_from_p2p() { - let config = Config::default(); - let db = Box::new(MockDb::default()); - let (_bs, br) = broadcast::channel(10); - - // Meant to simulate p2p's channels which hook in to communicate with txpool - let (network_sender, _) = mpsc::channel(100); - let (incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); - let (tx_status_sender, _) = broadcast::channel(100); - let (txpool_sender, txpool_receiver) = Sender::channel(100); - - let tx1 = TransactionBuilder::script(vec![], vec![]) - .gas_price(10) - .finalize(); - - let mut builder = ServiceBuilder::new(); - builder - .config(config) - .db(db) - .incoming_tx_receiver(incoming_tx_receiver) - .network_sender(network_sender) - .import_block_event(br) - .tx_status_sender(tx_status_sender) - .txpool_sender(txpool_sender) - .txpool_receiver(txpool_receiver); - let service = builder.build().unwrap(); - service.start().await.ok(); - - let broadcast_tx = TransactionBroadcast::NewTransaction(tx1.clone()); - let mut receiver = service.subscribe_ch(); - let res = incoming_tx_sender.send(broadcast_tx).unwrap(); - let _ = receiver.recv().await; - assert_eq!(1, res); - - let (response, receiver) = oneshot::channel(); - let _ = service - .sender() - .send(TxPoolMpsc::Find { - ids: vec![tx1.id()], - response, - }) - .await; - let out = receiver.await.unwrap(); - - let arc_tx1 = Arc::new(tx1); - - assert_eq!(arc_tx1, *out[0].as_ref().unwrap().tx()); - } - - #[tokio::test] - async fn test_insert_from_local_broadcasts_to_p2p() { - let config = Config::default(); - let db = Box::new(MockDb::default()); - let (_bs, br) = broadcast::channel(10); - - // Meant to simulate p2p's channels which hook in to communicate with txpool - let (network_sender, mut rx) = mpsc::channel(100); - let (_stx, incoming_txs) = broadcast::channel(100); - let (tx_status_sender, _) = broadcast::channel(100); - let (txpool_sender, txpool_receiver) = Sender::channel(100); - - let tx1 = Arc::new( - TransactionBuilder::script(vec![], vec![]) - .gas_price(10) - .finalize(), - ); - - let mut builder = ServiceBuilder::new(); - builder - .config(config) - .db(db) - .incoming_tx_receiver(incoming_txs) - .network_sender(network_sender) - .import_block_event(br) - .tx_status_sender(tx_status_sender) - .txpool_sender(txpool_sender) - .txpool_receiver(txpool_receiver); - let service = builder.build().unwrap(); - service.start().await.ok(); - - let mut subscribe = service.subscribe_ch(); - - let (response, receiver) = oneshot::channel(); - let _ = service - .sender() - .send(TxPoolMpsc::Insert { - txs: vec![tx1.clone()], - response, - }) - .await; - let out = receiver.await.unwrap(); - - assert!(out[0].is_ok(), "Tx1 should be OK, got err:{:?}", out); - - // we are sure that included tx are already broadcasted. - assert_eq!( - subscribe.try_recv(), - Ok(TxStatusBroadcast { - tx: tx1.clone(), - status: TxStatus::Submitted, - }), - "First added should be tx1" - ); - - let ret = rx.try_recv().unwrap(); - - if let P2pRequestEvent::BroadcastNewTransaction { transaction } = ret { - assert_eq!(tx1, transaction); - } else { - panic!("Transaction Broadcast Unwrap Failed"); - } - } - - #[tokio::test] - async fn test_insert_from_p2p_does_not_broadcast_to_p2p() { - let config = Config::default(); - let db = Box::new(MockDb::default()); - let (_bs, br) = broadcast::channel(10); - - // Meant to simulate p2p's channels which hook in to communicate with txpool - let (network_sender, mut network_receiver) = mpsc::channel(100); - let (incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); - let (tx_status_sender, _) = broadcast::channel(100); - let (txpool_sender, txpool_receiver) = Sender::channel(100); - - let tx1 = TransactionBuilder::script(vec![], vec![]) - .gas_price(10) - .finalize(); - - let mut builder = ServiceBuilder::new(); - builder - .config(config) - .db(db) - .incoming_tx_receiver(incoming_tx_receiver) - .network_sender(network_sender) - .import_block_event(br) - .tx_status_sender(tx_status_sender) - .txpool_sender(txpool_sender) - .txpool_receiver(txpool_receiver); - let service = builder.build().unwrap(); - service.start().await.ok(); - - let broadcast_tx = TransactionBroadcast::NewTransaction(tx1.clone()); - let mut receiver = service.subscribe_ch(); - let res = incoming_tx_sender.send(broadcast_tx).unwrap(); - let _ = receiver.recv().await; - assert_eq!(1, res); - - let ret = network_receiver.try_recv(); - assert!(ret.is_err()); - assert_eq!(Some(TryRecvError::Empty), ret.err()); - } - #[tokio::test] async fn test_filter_by_negative() { let config = Config::default(); let db = Box::new(MockDb::default()); let (bs, _br) = broadcast::channel(10); - - let (network_sender, _) = mpsc::channel(100); let (_incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); let (tx_status_sender, _) = broadcast::channel(100); let (txpool_sender, txpool_receiver) = Sender::channel(100); @@ -522,12 +379,17 @@ pub mod tests { .config(config) .db(db) .incoming_tx_receiver(incoming_tx_receiver) - .network_sender(network_sender) .import_block_event(bs.subscribe()) .tx_status_sender(tx_status_sender) .txpool_sender(txpool_sender) .txpool_receiver(txpool_receiver); + #[cfg(feature = "p2p")] + { + let (network_sender, _) = mpsc::channel(100); + builder.network_sender(network_sender); + } + let service = builder.build().unwrap(); service.start().await.ok(); @@ -581,9 +443,6 @@ pub mod tests { let config = Config::default(); let db = Box::new(MockDb::default()); let (_bs, br) = broadcast::channel(10); - - // Meant to simulate p2p's channels which hook in to communicate with txpool - let (network_sender, _) = mpsc::channel(100); let (_incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); let (tx_status_sender, _) = broadcast::channel(100); let (txpool_sender, txpool_receiver) = Sender::channel(100); @@ -610,12 +469,17 @@ pub mod tests { .config(config) .db(db) .incoming_tx_receiver(incoming_tx_receiver) - .network_sender(network_sender) .import_block_event(br) .tx_status_sender(tx_status_sender) .txpool_sender(txpool_sender) .txpool_receiver(txpool_receiver); + #[cfg(feature = "p2p")] + { + let (network_sender, _) = mpsc::channel(100); + builder.network_sender(network_sender); + } + let service = builder.build().unwrap(); service.start().await.ok(); @@ -653,9 +517,6 @@ pub mod tests { async fn simple_insert_removal_subscription() { let config = Config::default(); let (_bs, br) = broadcast::channel(10); - - // Meant to simulate p2p's channels which hook in to communicate with txpool - let (network_sender, _) = mpsc::channel(100); let (_incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); let (tx_status_sender, _) = broadcast::channel(100); let (txpool_sender, txpool_receiver) = Sender::channel(100); @@ -679,12 +540,17 @@ pub mod tests { .config(config) .db(db) .incoming_tx_receiver(incoming_tx_receiver) - .network_sender(network_sender) .import_block_event(br) .tx_status_sender(tx_status_sender) .txpool_sender(txpool_sender) .txpool_receiver(txpool_receiver); + #[cfg(feature = "p2p")] + { + let (network_sender, _) = mpsc::channel(100); + builder.network_sender(network_sender); + } + let service = builder.build().unwrap(); service.start().await.ok(); @@ -757,3 +623,177 @@ pub mod tests { ); } } + +#[cfg(all(test, feature = "p2p"))] +pub mod tests_p2p { + use super::*; + use crate::MockDb; + use fuel_core_interfaces::{ + common::fuel_tx::TransactionBuilder, + txpool::{ + Sender, + TxPoolMpsc, + TxStatus, + TxStatusBroadcast, + }, + }; + use tokio::sync::{ + mpsc::error::TryRecvError, + oneshot, + }; + + #[tokio::test] + async fn test_insert_from_p2p() { + let config = Config::default(); + let db = Box::new(MockDb::default()); + let (_bs, br) = broadcast::channel(10); + + // Meant to simulate p2p's channels which hook in to communicate with txpool + let (network_sender, _) = mpsc::channel(100); + let (incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); + + let tx1 = TransactionBuilder::script(vec![], vec![]) + .gas_price(10) + .finalize(); + + let mut builder = ServiceBuilder::new(); + builder + .config(config) + .db(db) + .incoming_tx_receiver(incoming_tx_receiver) + .import_block_event(br) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver) + .network_sender(network_sender); + + let service = builder.build().unwrap(); + service.start().await.ok(); + + let broadcast_tx = TransactionBroadcast::NewTransaction(tx1.clone()); + let mut receiver = service.subscribe_ch(); + let res = incoming_tx_sender.send(broadcast_tx).unwrap(); + let _ = receiver.recv().await; + assert_eq!(1, res); + + let (response, receiver) = oneshot::channel(); + let _ = service + .sender() + .send(TxPoolMpsc::Find { + ids: vec![tx1.id()], + response, + }) + .await; + let out = receiver.await.unwrap(); + + let arc_tx1 = Arc::new(tx1); + + assert_eq!(arc_tx1, *out[0].as_ref().unwrap().tx()); + } + + #[tokio::test] + async fn test_insert_from_local_broadcasts_to_p2p() { + let config = Config::default(); + let db = Box::new(MockDb::default()); + let (_bs, br) = broadcast::channel(10); + + // Meant to simulate p2p's channels which hook in to communicate with txpool + let (network_sender, mut rx) = mpsc::channel(100); + let (_stx, incoming_txs) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); + + let tx1 = Arc::new( + TransactionBuilder::script(vec![], vec![]) + .gas_price(10) + .finalize(), + ); + + let mut builder = ServiceBuilder::new(); + builder + .config(config) + .db(db) + .incoming_tx_receiver(incoming_txs) + .import_block_event(br) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver) + .network_sender(network_sender); + let service = builder.build().unwrap(); + service.start().await.ok(); + + let mut subscribe = service.subscribe_ch(); + + let (response, receiver) = oneshot::channel(); + let _ = service + .sender() + .send(TxPoolMpsc::Insert { + txs: vec![tx1.clone()], + response, + }) + .await; + let out = receiver.await.unwrap(); + + assert!(out[0].is_ok(), "Tx1 should be OK, got err:{:?}", out); + + // we are sure that included tx are already broadcasted. + assert_eq!( + subscribe.try_recv(), + Ok(TxStatusBroadcast { + tx: tx1.clone(), + status: TxStatus::Submitted, + }), + "First added should be tx1" + ); + + let ret = rx.try_recv().unwrap(); + + if let P2pRequestEvent::BroadcastNewTransaction { transaction } = ret { + assert_eq!(tx1, transaction); + } else { + panic!("Transaction Broadcast Unwrap Failed"); + } + } + + #[tokio::test] + async fn test_insert_from_p2p_does_not_broadcast_to_p2p() { + let config = Config::default(); + let db = Box::new(MockDb::default()); + let (_bs, br) = broadcast::channel(10); + + // Meant to simulate p2p's channels which hook in to communicate with txpool + let (network_sender, mut network_receiver) = mpsc::channel(100); + let (incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); + let (tx_status_sender, _) = broadcast::channel(100); + let (txpool_sender, txpool_receiver) = Sender::channel(100); + + let tx1 = TransactionBuilder::script(vec![], vec![]) + .gas_price(10) + .finalize(); + + let mut builder = ServiceBuilder::new(); + builder + .config(config) + .db(db) + .incoming_tx_receiver(incoming_tx_receiver) + .import_block_event(br) + .tx_status_sender(tx_status_sender) + .txpool_sender(txpool_sender) + .txpool_receiver(txpool_receiver) + .network_sender(network_sender); + let service = builder.build().unwrap(); + service.start().await.ok(); + + let broadcast_tx = TransactionBroadcast::NewTransaction(tx1.clone()); + let mut receiver = service.subscribe_ch(); + let res = incoming_tx_sender.send(broadcast_tx).unwrap(); + let _ = receiver.recv().await; + assert_eq!(1, res); + + let ret = network_receiver.try_recv(); + assert!(ret.is_err()); + assert_eq!(Some(TryRecvError::Empty), ret.err()); + } +} From 41669d335d57993c2fe2fdaf63021512d4ab5c3f Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Fri, 23 Sep 2022 15:38:19 -0400 Subject: [PATCH 15/38] Revert unrelated change --- fuel-txpool/src/service.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fuel-txpool/src/service.rs b/fuel-txpool/src/service.rs index e8535170e10..599d1dc4624 100644 --- a/fuel-txpool/src/service.rs +++ b/fuel-txpool/src/service.rs @@ -369,7 +369,7 @@ pub mod tests { async fn test_filter_by_negative() { let config = Config::default(); let db = Box::new(MockDb::default()); - let (bs, _br) = broadcast::channel(10); + let (_bs, br) = broadcast::channel(10); let (_incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); let (tx_status_sender, _) = broadcast::channel(100); let (txpool_sender, txpool_receiver) = Sender::channel(100); @@ -379,7 +379,7 @@ pub mod tests { .config(config) .db(db) .incoming_tx_receiver(incoming_tx_receiver) - .import_block_event(bs.subscribe()) + .import_block_event(br) .tx_status_sender(tx_status_sender) .txpool_sender(txpool_sender) .txpool_receiver(txpool_receiver); From 03300f859c21c0300e47da45d586e970d1d77bae Mon Sep 17 00:00:00 2001 From: ControlCplusControlV <44706811+ControlCplusControlV@users.noreply.github.com> Date: Sat, 24 Sep 2022 11:49:22 -0600 Subject: [PATCH 16/38] test issues now --- fuel-block-producer/tests/integration.rs | 31 +++++++++++++++++++++--- fuel-core/src/service/modules.rs | 8 +++--- fuel-txpool/src/service.rs | 16 ++++++------ 3 files changed, 41 insertions(+), 14 deletions(-) diff --git a/fuel-block-producer/tests/integration.rs b/fuel-block-producer/tests/integration.rs index d0e3fc44d91..5bd0db27cd4 100644 --- a/fuel-block-producer/tests/integration.rs +++ b/fuel-block-producer/tests/integration.rs @@ -37,8 +37,10 @@ use fuel_core_interfaces::{ Coin, CoinStatus, }, + txpool::Sender as TxPoolSender, }; use fuel_txpool::{ + Config as TxPoolConfig, MockDb as TxPoolDb, ServiceBuilder as TxPoolServiceBuilder, }; @@ -48,7 +50,10 @@ use rand::{ SeedableRng, }; use std::sync::Arc; -use tokio::sync::broadcast; +use tokio::sync::{ + broadcast, + mpsc, +}; const COIN_AMOUNT: u64 = 1_000_000_000; @@ -108,8 +113,25 @@ async fn block_producer() -> Result<()> { let (import_block_events_tx, import_block_events_rx) = broadcast::channel(16); let mut txpool_builder = TxPoolServiceBuilder::new(); - txpool_builder.db(Box::new(txpool_db)); - txpool_builder.import_block_event(import_block_events_rx); + txpool_builder + .db(Box::new(txpool_db)) + .import_block_event(import_block_events_rx) + .config(TxPoolConfig::default()); + + let (txpool_sender, txpool_receiver) = mpsc::channel(100); + let (_, incoming_tx_receiver) = broadcast::channel(100); + + let (tx_status_sender, mut tx_status_receiver) = broadcast::channel(100); + + // Remove once tx_status events are used + tokio::spawn(async move { while (tx_status_receiver.recv().await).is_ok() {} }); + + txpool_builder + .incoming_tx_receiver(incoming_tx_receiver) + .tx_status_sender(tx_status_sender) + .txpool_sender(TxPoolSender::new(txpool_sender)) + .txpool_receiver(txpool_receiver); + let txpool = txpool_builder.build().unwrap(); txpool.start().await?; @@ -139,6 +161,7 @@ async fn block_producer() -> Result<()> { (txsize + small_limit) * 2 < max_gas_per_block, "Incorrect test: no space in block" ); + let limit2_takes_whole_block = max_gas_per_block.checked_sub(txsize).unwrap(); let gas_prices = [10, 20, 15]; let results: Vec<_> = txpool @@ -209,6 +232,8 @@ async fn block_producer() -> Result<()> { // Check that the generated block looks right assert_eq!(generated_block.transactions.len(), 0); + println!("Made it here"); + Ok(()) } diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index 28989118b40..e2a4a3bc8ca 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -15,9 +15,9 @@ use fuel_core_interfaces::{ FuelBlock, }, txpool::{ - Sender, - TxPoolDb, - } + Sender, + TxPoolDb, + }, }; use futures::future::join_all; use std::sync::Arc; @@ -153,7 +153,7 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result { if new_transaction.is_err() { + println!("Wack"); error!("Incoming tx receiver channel closed unexpectedly; shutting down transaction pool service."); break; } @@ -186,11 +187,13 @@ impl Context { let db = self.db.clone(); let tx_status_sender = self.tx_status_sender.clone(); + println!("Progressing to move"); tokio::spawn( async move { let txpool = txpool.as_ref(); match new_transaction.unwrap() { TransactionBroadcast::NewTransaction ( tx ) => { let txs = vec!(Arc::new(tx)); + println!("Progressing to insert"); TxPool::insert(txpool, db.as_ref().as_ref(), tx_status_sender, txs).await } } @@ -238,19 +241,18 @@ impl Context { TxPoolMpsc::Stop => {} }}); } - - block_updated = self.import_block_events.recv() => { + block_updated = self.import_block_receiver.recv() => { if let Ok(block_updated) = block_updated { - match block_updated { ImportBlockBroadcast::PendingFuelBlockImported { block } => { let txpool = txpool.clone(); - TxPool::block_update(txpool.as_ref(), block).await - tokio::spawn( async move { - TxPool::block_update(txpool.as_ref(), block).await - }); + TxPool::block_update(txpool.as_ref(), block).await; + // TODO : Not working well + // tokio::spawn( async move { + // TxPool::block_update(txpool.as_ref(), block).await + // }); }, ImportBlockBroadcast::SealedFuelBlockImported { block: _, is_created_by_self: _ } => { // TODO: what to do with sealed blocks? From 58f46542794479506a912b08195c8498d36a077c Mon Sep 17 00:00:00 2001 From: ControlCplusControlV <44706811+ControlCplusControlV@users.noreply.github.com> Date: Sat, 24 Sep 2022 12:43:40 -0600 Subject: [PATCH 17/38] fixes --- fuel-block-producer/tests/integration.rs | 52 +++++++++++++++++------- fuel-txpool/src/service.rs | 1 - 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/fuel-block-producer/tests/integration.rs b/fuel-block-producer/tests/integration.rs index 5bd0db27cd4..652973b6170 100644 --- a/fuel-block-producer/tests/integration.rs +++ b/fuel-block-producer/tests/integration.rs @@ -37,7 +37,9 @@ use fuel_core_interfaces::{ Coin, CoinStatus, }, - txpool::Sender as TxPoolSender, + txpool::{ + Sender as TxPoolSender, + } }; use fuel_txpool::{ Config as TxPoolConfig, @@ -50,10 +52,7 @@ use rand::{ SeedableRng, }; use std::sync::Arc; -use tokio::sync::{ - broadcast, - mpsc, -}; +use tokio::sync::{ broadcast, mpsc}; const COIN_AMOUNT: u64 = 1_000_000_000; @@ -112,22 +111,48 @@ async fn block_producer() -> Result<()> { let (import_block_events_tx, import_block_events_rx) = broadcast::channel(16); + let mut txpool_builder = TxPoolServiceBuilder::new(); - txpool_builder - .db(Box::new(txpool_db)) - .import_block_event(import_block_events_rx) - .config(TxPoolConfig::default()); - - let (txpool_sender, txpool_receiver) = mpsc::channel(100); - let (_, incoming_tx_receiver) = broadcast::channel(100); let (tx_status_sender, mut tx_status_receiver) = broadcast::channel(100); // Remove once tx_status events are used tokio::spawn(async move { while (tx_status_receiver.recv().await).is_ok() {} }); + let (txpool_sender, txpool_receiver) = mpsc::channel(100); + let (incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); + + + #[cfg(feature = "p2p")] + let (p2p_request_event_sender, p2p_request_event_receiver) = mpsc::channel(100); + #[cfg(feature = "p2p")] + let (block_event_sender, block_event_receiver) = mpsc::channel(100); + + #[cfg(feature = "p2p")] + let network_service = { + let p2p_db: Arc = Arc::new(database.clone()); + let (tx_consensus, _) = mpsc::channel(100); + fuel_p2p::orchestrator::Service::new( + config.p2p.clone(), + p2p_db, + p2p_request_event_sender.clone(), + p2p_request_event_receiver, + tx_consensus, + incoming_tx_sender, + block_event_sender, + ) + }; + #[cfg(not(feature = "p2p"))] + { + let keep_alive = Box::new(incoming_tx_sender); + Box::leak(keep_alive); + } + txpool_builder + .config(TxPoolConfig::default()) + .db(Box::new(txpool_db)) .incoming_tx_receiver(incoming_tx_receiver) + .import_block_event(import_block_events_rx) .tx_status_sender(tx_status_sender) .txpool_sender(TxPoolSender::new(txpool_sender)) .txpool_receiver(txpool_receiver); @@ -161,7 +186,6 @@ async fn block_producer() -> Result<()> { (txsize + small_limit) * 2 < max_gas_per_block, "Incorrect test: no space in block" ); - let limit2_takes_whole_block = max_gas_per_block.checked_sub(txsize).unwrap(); let gas_prices = [10, 20, 15]; let results: Vec<_> = txpool @@ -232,8 +256,6 @@ async fn block_producer() -> Result<()> { // Check that the generated block looks right assert_eq!(generated_block.transactions.len(), 0); - println!("Made it here"); - Ok(()) } diff --git a/fuel-txpool/src/service.rs b/fuel-txpool/src/service.rs index 260dcc686c8..2d080e030db 100644 --- a/fuel-txpool/src/service.rs +++ b/fuel-txpool/src/service.rs @@ -178,7 +178,6 @@ impl Context { tokio::select! { new_transaction = self.incoming_tx_receiver.recv() => { if new_transaction.is_err() { - println!("Wack"); error!("Incoming tx receiver channel closed unexpectedly; shutting down transaction pool service."); break; } From 84f0e18898868060107c1d979029b9a503135a99 Mon Sep 17 00:00:00 2001 From: ControlCplusControlV <44706811+ControlCplusControlV@users.noreply.github.com> Date: Sat, 24 Sep 2022 12:45:20 -0600 Subject: [PATCH 18/38] fmt --- fuel-block-producer/tests/integration.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/fuel-block-producer/tests/integration.rs b/fuel-block-producer/tests/integration.rs index 652973b6170..268e81604cc 100644 --- a/fuel-block-producer/tests/integration.rs +++ b/fuel-block-producer/tests/integration.rs @@ -37,9 +37,7 @@ use fuel_core_interfaces::{ Coin, CoinStatus, }, - txpool::{ - Sender as TxPoolSender, - } + txpool::Sender as TxPoolSender, }; use fuel_txpool::{ Config as TxPoolConfig, @@ -52,7 +50,10 @@ use rand::{ SeedableRng, }; use std::sync::Arc; -use tokio::sync::{ broadcast, mpsc}; +use tokio::sync::{ + broadcast, + mpsc, +}; const COIN_AMOUNT: u64 = 1_000_000_000; @@ -111,7 +112,6 @@ async fn block_producer() -> Result<()> { let (import_block_events_tx, import_block_events_rx) = broadcast::channel(16); - let mut txpool_builder = TxPoolServiceBuilder::new(); let (tx_status_sender, mut tx_status_receiver) = broadcast::channel(100); @@ -122,7 +122,6 @@ async fn block_producer() -> Result<()> { let (txpool_sender, txpool_receiver) = mpsc::channel(100); let (incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); - #[cfg(feature = "p2p")] let (p2p_request_event_sender, p2p_request_event_receiver) = mpsc::channel(100); #[cfg(feature = "p2p")] From a081d969a4599c2d26acf9aa70c6bde1dce69230 Mon Sep 17 00:00:00 2001 From: ControlCplusControlV <44706811+ControlCplusControlV@users.noreply.github.com> Date: Sat, 24 Sep 2022 14:07:51 -0600 Subject: [PATCH 19/38] was it that easy? --- fuel-block-producer/Cargo.toml | 2 +- fuel-block-producer/tests/integration.rs | 31 ++++++------------------ fuel-txpool/Cargo.toml | 2 +- 3 files changed, 10 insertions(+), 25 deletions(-) diff --git a/fuel-block-producer/Cargo.toml b/fuel-block-producer/Cargo.toml index 7490da45b94..f2d9cfb0f7e 100644 --- a/fuel-block-producer/Cargo.toml +++ b/fuel-block-producer/Cargo.toml @@ -26,5 +26,5 @@ rand = "0.8" rstest = "0.15" [features] +p2p = ["fuel-txpool/p2p"] test-helpers = ["fuel-core-interfaces/test-helpers"] - diff --git a/fuel-block-producer/tests/integration.rs b/fuel-block-producer/tests/integration.rs index 268e81604cc..0a78906a228 100644 --- a/fuel-block-producer/tests/integration.rs +++ b/fuel-block-producer/tests/integration.rs @@ -122,30 +122,9 @@ async fn block_producer() -> Result<()> { let (txpool_sender, txpool_receiver) = mpsc::channel(100); let (incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); - #[cfg(feature = "p2p")] - let (p2p_request_event_sender, p2p_request_event_receiver) = mpsc::channel(100); - #[cfg(feature = "p2p")] - let (block_event_sender, block_event_receiver) = mpsc::channel(100); - #[cfg(feature = "p2p")] - let network_service = { - let p2p_db: Arc = Arc::new(database.clone()); - let (tx_consensus, _) = mpsc::channel(100); - fuel_p2p::orchestrator::Service::new( - config.p2p.clone(), - p2p_db, - p2p_request_event_sender.clone(), - p2p_request_event_receiver, - tx_consensus, - incoming_tx_sender, - block_event_sender, - ) - }; - #[cfg(not(feature = "p2p"))] - { - let keep_alive = Box::new(incoming_tx_sender); - Box::leak(keep_alive); - } + let keep_alive = Box::new(incoming_tx_sender); + Box::leak(keep_alive); txpool_builder .config(TxPoolConfig::default()) @@ -156,6 +135,12 @@ async fn block_producer() -> Result<()> { .txpool_sender(TxPoolSender::new(txpool_sender)) .txpool_receiver(txpool_receiver); + #[cfg(feature = "p2p")] + let (p2p_request_event_sender, _p2p_request_event_receiver) = mpsc::channel(100); + #[cfg(feature = "p2p")] + txpool_builder.network_sender(p2p_request_event_sender.clone()); + + let txpool = txpool_builder.build().unwrap(); txpool.start().await?; diff --git a/fuel-txpool/Cargo.toml b/fuel-txpool/Cargo.toml index 3a0fc9d337c..447bbf4a663 100644 --- a/fuel-txpool/Cargo.toml +++ b/fuel-txpool/Cargo.toml @@ -27,5 +27,5 @@ fuel-core-interfaces = { path = "../fuel-core-interfaces", version = "0.10.1", f ] } [features] +p2p = [] test-helpers = ["fuel-core-interfaces/test-helpers"] -p2p= [] From a6c942c86c24ebf2e1b240064807e5df2d1f5b80 Mon Sep 17 00:00:00 2001 From: ControlCplusControlV <44706811+ControlCplusControlV@users.noreply.github.com> Date: Sat, 24 Sep 2022 14:08:26 -0600 Subject: [PATCH 20/38] oops --- fuel-block-producer/tests/integration.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/fuel-block-producer/tests/integration.rs b/fuel-block-producer/tests/integration.rs index 0a78906a228..d8e25751a74 100644 --- a/fuel-block-producer/tests/integration.rs +++ b/fuel-block-producer/tests/integration.rs @@ -122,7 +122,6 @@ async fn block_producer() -> Result<()> { let (txpool_sender, txpool_receiver) = mpsc::channel(100); let (incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100); - let keep_alive = Box::new(incoming_tx_sender); Box::leak(keep_alive); @@ -139,7 +138,6 @@ async fn block_producer() -> Result<()> { let (p2p_request_event_sender, _p2p_request_event_receiver) = mpsc::channel(100); #[cfg(feature = "p2p")] txpool_builder.network_sender(p2p_request_event_sender.clone()); - let txpool = txpool_builder.build().unwrap(); txpool.start().await?; From f0ea4bad870c843d766aa63fa8b1e6aa27d9d4f7 Mon Sep 17 00:00:00 2001 From: ControlCplusControlV <44706811+ControlCplusControlV@users.noreply.github.com> Date: Sun, 25 Sep 2022 15:54:54 -0600 Subject: [PATCH 21/38] New testcase for tx gossip --- fuel-tests/tests/lib.rs | 1 + fuel-tests/tests/tx_gossip.rs | 74 +++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) create mode 100644 fuel-tests/tests/tx_gossip.rs diff --git a/fuel-tests/tests/lib.rs b/fuel-tests/tests/lib.rs index 91ea7cb592a..c42b08bb776 100644 --- a/fuel-tests/tests/lib.rs +++ b/fuel-tests/tests/lib.rs @@ -6,6 +6,7 @@ mod contract; mod dap; mod debugger; mod health; +mod tx_gossip; mod helpers; mod messages; #[cfg(feature = "metrics")] diff --git a/fuel-tests/tests/tx_gossip.rs b/fuel-tests/tests/tx_gossip.rs new file mode 100644 index 00000000000..9100475e0a1 --- /dev/null +++ b/fuel-tests/tests/tx_gossip.rs @@ -0,0 +1,74 @@ +use crate::helpers::TestContext; +use chrono::Utc; +use fuel_core::{ + database::Database, + executor::Executor, + model::{ + FuelBlock, + FuelBlockHeader, + }, + service::{ + Config, + FuelService, + }, +}; +use fuel_core_interfaces::{ + common::{ + fuel_tx, + fuel_vm::{ + consts::*, + prelude::*, + }, + }, + executor::ExecutionMode, +}; +use fuel_gql_client::client::{ + types::TransactionStatus, + FuelClient, + PageDirection, + PaginationRequest, +}; + +#[tokio::test] +async fn test_tx_gossiping() { + let node_config = Config::local_node(); + let node_one = FuelService::new_node(node_config.clone()).await.unwrap(); + let client_one = FuelClient::from(node_one.bound_address); + + let node_two = FuelService::new_node(node_config).await.unwrap(); + let client_two = FuelClient::from(node_two.bound_address); + + let gas_price = 0; + let gas_limit = 1_000_000; + let maturity = 0; + + let script = vec![ + Opcode::ADDI(0x10, REG_ZERO, 0xca), + Opcode::ADDI(0x11, REG_ZERO, 0xba), + Opcode::LOG(0x10, 0x11, REG_ZERO, REG_ZERO), + Opcode::RET(REG_ONE), + ]; + let script: Vec = script + .iter() + .flat_map(|op| u32::from(*op).to_be_bytes()) + .collect(); + + let tx = fuel_tx::Transaction::script( + gas_price, + gas_limit, + maturity, + script, + vec![], + vec![], + vec![], + vec![], + ); + + let result = client_one.submit(&tx).await.unwrap(); + + // Perhaps some delay is needed before this query? + + let tx = client_two.transaction(&result.0.to_string()).await.unwrap(); + + assert!(tx.is_some()); +} \ No newline at end of file From 68c780f7e61127cbfa61cd12b5f5ad22160650f6 Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Mon, 26 Sep 2022 12:12:25 -0400 Subject: [PATCH 22/38] Fmt --- fuel-tests/tests/lib.rs | 2 +- fuel-tests/tests/tx_gossip.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fuel-tests/tests/lib.rs b/fuel-tests/tests/lib.rs index c42b08bb776..612610846ff 100644 --- a/fuel-tests/tests/lib.rs +++ b/fuel-tests/tests/lib.rs @@ -6,7 +6,6 @@ mod contract; mod dap; mod debugger; mod health; -mod tx_gossip; mod helpers; mod messages; #[cfg(feature = "metrics")] @@ -15,3 +14,4 @@ mod node_info; mod resource; mod snapshot; mod tx; +mod tx_gossip; diff --git a/fuel-tests/tests/tx_gossip.rs b/fuel-tests/tests/tx_gossip.rs index 9100475e0a1..3211aa70e43 100644 --- a/fuel-tests/tests/tx_gossip.rs +++ b/fuel-tests/tests/tx_gossip.rs @@ -71,4 +71,4 @@ async fn test_tx_gossiping() { let tx = client_two.transaction(&result.0.to_string()).await.unwrap(); assert!(tx.is_some()); -} \ No newline at end of file +} From de0e1543d460ab7a232a3a6fcc346f13d66eba5c Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Mon, 26 Sep 2022 18:06:28 -0400 Subject: [PATCH 23/38] Test p2p connection and integ --- fuel-core/Cargo.toml | 2 +- fuel-core/src/lib.rs | 3 + fuel-p2p/src/peer_info.rs | 1 + fuel-tests/tests/tx/utxo_validation.rs | 115 +++++++++++++++++++++++++ fuel-tests/tests/tx_gossip.rs | 105 +++++++++++----------- fuel-txpool/src/txpool.rs | 1 + 6 files changed, 169 insertions(+), 58 deletions(-) diff --git a/fuel-core/Cargo.toml b/fuel-core/Cargo.toml index 3ba08434bc5..52d255c5853 100644 --- a/fuel-core/Cargo.toml +++ b/fuel-core/Cargo.toml @@ -87,6 +87,6 @@ metrics = ["dep:fuel-metrics"] default = ["rocksdb", "metrics", "debug"] debug = ["fuel-core-interfaces/debug"] relayer = ["dep:fuel-relayer"] -p2p = ["dep:fuel-p2p", "fuel-txpool/p2p" ] +p2p = ["dep:fuel-p2p", "fuel-txpool/p2p", "fuel-block-producer/p2p"] # features to enable in production, but increase build times production = ["rocksdb?/jemalloc"] diff --git a/fuel-core/src/lib.rs b/fuel-core/src/lib.rs index cec5e80f02b..20b7a261cdd 100644 --- a/fuel-core/src/lib.rs +++ b/fuel-core/src/lib.rs @@ -7,3 +7,6 @@ pub mod schema; pub mod service; pub mod state; pub mod tx_pool; + +#[cfg(feature = "p2p")] +pub use fuel_p2p; diff --git a/fuel-p2p/src/peer_info.rs b/fuel-p2p/src/peer_info.rs index b16ce5091f1..bf7b1f06b66 100644 --- a/fuel-p2p/src/peer_info.rs +++ b/fuel-p2p/src/peer_info.rs @@ -145,6 +145,7 @@ impl PeerInfoBehaviour { } _ => { self.peers.insert(*peer_id, PeerInfo::new(connected_point)); + dbg!(&peer_id); } } } diff --git a/fuel-tests/tests/tx/utxo_validation.rs b/fuel-tests/tests/tx/utxo_validation.rs index 1171905f1ed..a39ba035272 100644 --- a/fuel-tests/tests/tx/utxo_validation.rs +++ b/fuel-tests/tests/tx/utxo_validation.rs @@ -26,6 +26,121 @@ use rand::{ }; use std::collections::HashSet; +#[cfg(feature = "p2p")] +mod gossip_tests { + use fuel_core::{ + chain_config::{ + CoinConfig, + StateConfig, + }, + fuel_p2p::config::{ + convert_to_libp2p_keypair, + P2PConfig, + }, + service::{ + Config, + FuelService, + }, + }; + use fuel_core_interfaces::common::{ + fuel_tx, + fuel_tx::TransactionBuilder, + fuel_vm::{ + consts::*, + prelude::*, + }, + }; + use fuel_gql_client::client::FuelClient; + use rand::{ + rngs::StdRng, + Rng, + SeedableRng, + }; + use std::time::Duration; + + #[tokio::test] + async fn test_tx_gossiping() { + let mut rng = StdRng::seed_from_u64(2322); + + let mut node_config = Config::local_node(); + node_config.utxo_validation = true; + + let tx = TransactionBuilder::script( + Opcode::RET(REG_ONE).to_bytes().into_iter().collect(), + vec![], + ) + .gas_limit(100) + .gas_price(1) + .add_unsigned_coin_input( + SecretKey::random(&mut rng), + rng.gen(), + 1000, + Default::default(), + Default::default(), + 0, + ) + .add_output(Output::Change { + amount: 0, + asset_id: Default::default(), + to: rng.gen(), + }) + .finalize(); + + if let Input::CoinSigned { + amount, + owner, + asset_id, + utxo_id, + .. + } + | Input::CoinPredicate { + amount, + owner, + asset_id, + utxo_id, + .. + } = tx.inputs()[0] + { + let mut initial_state = StateConfig::default(); + let coin_config = Some(vec![CoinConfig { + tx_id: Some(*utxo_id.tx_id()), + output_index: Some(utxo_id.output_index() as u64), + block_created: None, + maturity: None, + owner, + amount, + asset_id, + }]); + initial_state.coins = coin_config; + node_config.chain_conf.initial_state = Some(initial_state); + }; + + node_config.p2p.enable_mdns = true; + + let node_one = FuelService::new_node(node_config.clone()).await.unwrap(); + let client_one = FuelClient::from(node_one.bound_address); + + let secret_key = rng.gen::(); + node_config.p2p.local_keypair = convert_to_libp2p_keypair(secret_key).unwrap(); + let node_two = FuelService::new_node(node_config.clone()).await.unwrap(); + let client_two = FuelClient::from(node_two.bound_address); + + tokio::time::sleep(Duration::new(3, 0)).await; + + let result = client_one.submit(&tx).await.unwrap(); + + let tx = client_one.transaction(&result.0.to_string()).await.unwrap(); + assert!(tx.is_some()); + + // Perhaps some delay is needed before this query? + + tokio::time::sleep(Duration::new(3, 0)).await; + + let tx = client_two.transaction(&result.0.to_string()).await.unwrap(); + assert!(tx.is_some()); + } +} + #[tokio::test] async fn submit_utxo_verified_tx_with_min_gas_price() { let mut rng = StdRng::seed_from_u64(2322); diff --git a/fuel-tests/tests/tx_gossip.rs b/fuel-tests/tests/tx_gossip.rs index 3211aa70e43..7036abcf6ed 100644 --- a/fuel-tests/tests/tx_gossip.rs +++ b/fuel-tests/tests/tx_gossip.rs @@ -1,74 +1,65 @@ -use crate::helpers::TestContext; -use chrono::Utc; -use fuel_core::{ - database::Database, - executor::Executor, - model::{ - FuelBlock, - FuelBlockHeader, - }, - service::{ +#[cfg(feature = "p2p")] +mod gossip_tests { + use fuel_core::service::{ Config, FuelService, - }, -}; -use fuel_core_interfaces::{ - common::{ + }; + use fuel_core_interfaces::common::{ fuel_tx, fuel_vm::{ consts::*, prelude::*, }, - }, - executor::ExecutionMode, -}; -use fuel_gql_client::client::{ - types::TransactionStatus, - FuelClient, - PageDirection, - PaginationRequest, -}; + }; + use fuel_gql_client::client::FuelClient; -#[tokio::test] -async fn test_tx_gossiping() { - let node_config = Config::local_node(); - let node_one = FuelService::new_node(node_config.clone()).await.unwrap(); - let client_one = FuelClient::from(node_one.bound_address); + #[tokio::test] + async fn test_tx_gossiping() { + let mut node_config = Config::local_node(); + node_config.utxo_validation = true; - let node_two = FuelService::new_node(node_config).await.unwrap(); - let client_two = FuelClient::from(node_two.bound_address); + let node_one = FuelService::new_node(node_config.clone()).await.unwrap(); + let client_one = FuelClient::from(node_one.bound_address); - let gas_price = 0; - let gas_limit = 1_000_000; - let maturity = 0; + let node_two = FuelService::new_node(node_config.clone()).await.unwrap(); + let client_two = FuelClient::from(node_two.bound_address); - let script = vec![ - Opcode::ADDI(0x10, REG_ZERO, 0xca), - Opcode::ADDI(0x11, REG_ZERO, 0xba), - Opcode::LOG(0x10, 0x11, REG_ZERO, REG_ZERO), - Opcode::RET(REG_ONE), - ]; - let script: Vec = script - .iter() - .flat_map(|op| u32::from(*op).to_be_bytes()) - .collect(); + let gas_price = 0; + let gas_limit = 1_000_000; + let maturity = 0; - let tx = fuel_tx::Transaction::script( - gas_price, - gas_limit, - maturity, - script, - vec![], - vec![], - vec![], - vec![], - ); + let script = vec![ + Opcode::ADDI(0x10, REG_ZERO, 0xca), + Opcode::ADDI(0x11, REG_ZERO, 0xba), + Opcode::LOG(0x10, 0x11, REG_ZERO, REG_ZERO), + Opcode::RET(REG_ONE), + ]; + let script: Vec = script + .iter() + .flat_map(|op| u32::from(*op).to_be_bytes()) + .collect(); - let result = client_one.submit(&tx).await.unwrap(); + let tx = Transaction::script( + gas_price, + gas_limit, + maturity, + script, + vec![], + vec![], + vec![], + vec![], + ); - // Perhaps some delay is needed before this query? + let result = client_one.submit(&tx).await.unwrap(); - let tx = client_two.transaction(&result.0.to_string()).await.unwrap(); + let tx = client_one.transaction(&result.0.to_string()).await.unwrap(); + assert!(tx.is_some()); - assert!(tx.is_some()); + // Perhaps some delay is needed before this query? + + std::thread::sleep(std::time::Duration::new(2, 0)); + + let tx = client_two.transaction(&result.0.to_string()).await.unwrap(); + assert!(tx.is_some()); + } } diff --git a/fuel-txpool/src/txpool.rs b/fuel-txpool/src/txpool.rs index 7cb0982c1a7..6b4a1ae16ce 100644 --- a/fuel-txpool/src/txpool.rs +++ b/fuel-txpool/src/txpool.rs @@ -174,6 +174,7 @@ impl TxPool { tx: tx.clone(), status: TxStatus::Submitted, }); + #[cfg(feature = "p2p")] let _ = network_sender .send(P2pRequestEvent::BroadcastNewTransaction { transaction: tx.clone(), From db0bb65d942d275131011fab8b950dcfe682ab09 Mon Sep 17 00:00:00 2001 From: Brandon Kite Date: Mon, 26 Sep 2022 15:23:02 -0700 Subject: [PATCH 24/38] add tx gossip to default topics --- fuel-p2p/src/config.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fuel-p2p/src/config.rs b/fuel-p2p/src/config.rs index 1b6432d3f17..dca1342d1a4 100644 --- a/fuel-p2p/src/config.rs +++ b/fuel-p2p/src/config.rs @@ -1,3 +1,4 @@ +use crate::gossipsub::topics::NEW_TX_GOSSIP_TOPIC; use libp2p::{ core::{ muxing::StreamMuxerBox, @@ -102,7 +103,7 @@ impl P2PConfig { allow_private_addresses: true, enable_random_walk: true, connection_idle_timeout: Some(Duration::from_secs(120)), - topics: vec![], + topics: vec![NEW_TX_GOSSIP_TOPIC.into()], max_mesh_size: 12, min_mesh_size: 4, ideal_mesh_size: 6, From a2498d774157bcf8c81df90e44cc6bfaecc6ad78 Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Tue, 27 Sep 2022 18:06:06 -0400 Subject: [PATCH 25/38] Clean up use statements and dbg --- fuel-p2p/src/peer_info.rs | 1 - fuel-tests/tests/tx/utxo_validation.rs | 115 ------------------------- fuel-tests/tests/tx_gossip.rs | 105 +++++++++++++++------- fuel-txpool/src/service.rs | 6 +- fuel-txpool/src/txpool.rs | 9 +- 5 files changed, 85 insertions(+), 151 deletions(-) diff --git a/fuel-p2p/src/peer_info.rs b/fuel-p2p/src/peer_info.rs index bf7b1f06b66..b16ce5091f1 100644 --- a/fuel-p2p/src/peer_info.rs +++ b/fuel-p2p/src/peer_info.rs @@ -145,7 +145,6 @@ impl PeerInfoBehaviour { } _ => { self.peers.insert(*peer_id, PeerInfo::new(connected_point)); - dbg!(&peer_id); } } } diff --git a/fuel-tests/tests/tx/utxo_validation.rs b/fuel-tests/tests/tx/utxo_validation.rs index a39ba035272..1171905f1ed 100644 --- a/fuel-tests/tests/tx/utxo_validation.rs +++ b/fuel-tests/tests/tx/utxo_validation.rs @@ -26,121 +26,6 @@ use rand::{ }; use std::collections::HashSet; -#[cfg(feature = "p2p")] -mod gossip_tests { - use fuel_core::{ - chain_config::{ - CoinConfig, - StateConfig, - }, - fuel_p2p::config::{ - convert_to_libp2p_keypair, - P2PConfig, - }, - service::{ - Config, - FuelService, - }, - }; - use fuel_core_interfaces::common::{ - fuel_tx, - fuel_tx::TransactionBuilder, - fuel_vm::{ - consts::*, - prelude::*, - }, - }; - use fuel_gql_client::client::FuelClient; - use rand::{ - rngs::StdRng, - Rng, - SeedableRng, - }; - use std::time::Duration; - - #[tokio::test] - async fn test_tx_gossiping() { - let mut rng = StdRng::seed_from_u64(2322); - - let mut node_config = Config::local_node(); - node_config.utxo_validation = true; - - let tx = TransactionBuilder::script( - Opcode::RET(REG_ONE).to_bytes().into_iter().collect(), - vec![], - ) - .gas_limit(100) - .gas_price(1) - .add_unsigned_coin_input( - SecretKey::random(&mut rng), - rng.gen(), - 1000, - Default::default(), - Default::default(), - 0, - ) - .add_output(Output::Change { - amount: 0, - asset_id: Default::default(), - to: rng.gen(), - }) - .finalize(); - - if let Input::CoinSigned { - amount, - owner, - asset_id, - utxo_id, - .. - } - | Input::CoinPredicate { - amount, - owner, - asset_id, - utxo_id, - .. - } = tx.inputs()[0] - { - let mut initial_state = StateConfig::default(); - let coin_config = Some(vec![CoinConfig { - tx_id: Some(*utxo_id.tx_id()), - output_index: Some(utxo_id.output_index() as u64), - block_created: None, - maturity: None, - owner, - amount, - asset_id, - }]); - initial_state.coins = coin_config; - node_config.chain_conf.initial_state = Some(initial_state); - }; - - node_config.p2p.enable_mdns = true; - - let node_one = FuelService::new_node(node_config.clone()).await.unwrap(); - let client_one = FuelClient::from(node_one.bound_address); - - let secret_key = rng.gen::(); - node_config.p2p.local_keypair = convert_to_libp2p_keypair(secret_key).unwrap(); - let node_two = FuelService::new_node(node_config.clone()).await.unwrap(); - let client_two = FuelClient::from(node_two.bound_address); - - tokio::time::sleep(Duration::new(3, 0)).await; - - let result = client_one.submit(&tx).await.unwrap(); - - let tx = client_one.transaction(&result.0.to_string()).await.unwrap(); - assert!(tx.is_some()); - - // Perhaps some delay is needed before this query? - - tokio::time::sleep(Duration::new(3, 0)).await; - - let tx = client_two.transaction(&result.0.to_string()).await.unwrap(); - assert!(tx.is_some()); - } -} - #[tokio::test] async fn submit_utxo_verified_tx_with_min_gas_price() { let mut rng = StdRng::seed_from_u64(2322); diff --git a/fuel-tests/tests/tx_gossip.rs b/fuel-tests/tests/tx_gossip.rs index 7036abcf6ed..2530a5616b7 100644 --- a/fuel-tests/tests/tx_gossip.rs +++ b/fuel-tests/tests/tx_gossip.rs @@ -1,54 +1,99 @@ #[cfg(feature = "p2p")] mod gossip_tests { - use fuel_core::service::{ - Config, - FuelService, + use fuel_core::{ + chain_config::{ + CoinConfig, + StateConfig, + }, + fuel_p2p::config::convert_to_libp2p_keypair, + service::{ + Config, + FuelService, + }, }; use fuel_core_interfaces::common::{ - fuel_tx, + fuel_tx::TransactionBuilder, fuel_vm::{ consts::*, prelude::*, }, }; use fuel_gql_client::client::FuelClient; + use rand::{ + rngs::StdRng, + Rng, + SeedableRng, + }; + use std::time::Duration; #[tokio::test] async fn test_tx_gossiping() { + let mut rng = StdRng::seed_from_u64(2322); + let mut node_config = Config::local_node(); node_config.utxo_validation = true; + let tx = TransactionBuilder::script( + Opcode::RET(REG_ONE).to_bytes().into_iter().collect(), + vec![], + ) + .gas_limit(100) + .gas_price(1) + .add_unsigned_coin_input( + SecretKey::random(&mut rng), + rng.gen(), + 1000, + Default::default(), + Default::default(), + 0, + ) + .add_output(Output::Change { + amount: 0, + asset_id: Default::default(), + to: rng.gen(), + }) + .finalize(); + + if let Input::CoinSigned { + amount, + owner, + asset_id, + utxo_id, + .. + } + | Input::CoinPredicate { + amount, + owner, + asset_id, + utxo_id, + .. + } = tx.inputs()[0] + { + let mut initial_state = StateConfig::default(); + let coin_config = vec![CoinConfig { + tx_id: Some(*utxo_id.tx_id()), + output_index: Some(utxo_id.output_index() as u64), + block_created: None, + maturity: None, + owner, + amount, + asset_id, + }]; + initial_state.coins = Some(coin_config); + node_config.chain_conf.initial_state = Some(initial_state); + }; + + node_config.p2p.enable_mdns = true; + let node_one = FuelService::new_node(node_config.clone()).await.unwrap(); let client_one = FuelClient::from(node_one.bound_address); + let secret_key = rng.gen::(); + node_config.p2p.local_keypair = convert_to_libp2p_keypair(secret_key).unwrap(); let node_two = FuelService::new_node(node_config.clone()).await.unwrap(); let client_two = FuelClient::from(node_two.bound_address); - let gas_price = 0; - let gas_limit = 1_000_000; - let maturity = 0; - - let script = vec![ - Opcode::ADDI(0x10, REG_ZERO, 0xca), - Opcode::ADDI(0x11, REG_ZERO, 0xba), - Opcode::LOG(0x10, 0x11, REG_ZERO, REG_ZERO), - Opcode::RET(REG_ONE), - ]; - let script: Vec = script - .iter() - .flat_map(|op| u32::from(*op).to_be_bytes()) - .collect(); - - let tx = Transaction::script( - gas_price, - gas_limit, - maturity, - script, - vec![], - vec![], - vec![], - vec![], - ); + tokio::time::sleep(Duration::new(3, 0)).await; let result = client_one.submit(&tx).await.unwrap(); @@ -57,7 +102,7 @@ mod gossip_tests { // Perhaps some delay is needed before this query? - std::thread::sleep(std::time::Duration::new(2, 0)); + tokio::time::sleep(Duration::new(3, 0)).await; let tx = client_two.transaction(&result.0.to_string()).await.unwrap(); assert!(tx.is_some()); diff --git a/fuel-txpool/src/service.rs b/fuel-txpool/src/service.rs index 2d080e030db..8e4b530abbb 100644 --- a/fuel-txpool/src/service.rs +++ b/fuel-txpool/src/service.rs @@ -209,8 +209,6 @@ impl Context { #[cfg(feature = "p2p")] let network_sender = self.network_sender.clone(); - #[cfg(not(feature = "p2p"))] - let (network_sender, _) = mpsc::channel(100); // This is little bit risky but we can always add semaphore to limit number of requests. tokio::spawn( async move { @@ -220,8 +218,12 @@ impl Context { let _ = response.send(TxPool::includable(txpool).await); } TxPoolMpsc::Insert { txs, response } => { + #[cfg(feature = "p2p")] let _ = response.send(TxPool::insert_with_broadcast(txpool, db.as_ref().as_ref(), tx_status_sender, network_sender, txs).await); + #[cfg(not(feature = "p2p"))] + let _ = response.send(TxPool::insert(txpool, db.as_ref().as_ref(), tx_status_sender, txs).await); } + TxPoolMpsc::Find { ids, response } => { let _ = response.send(TxPool::find(txpool,&ids).await); } diff --git a/fuel-txpool/src/txpool.rs b/fuel-txpool/src/txpool.rs index 6b4a1ae16ce..8a64b9cf519 100644 --- a/fuel-txpool/src/txpool.rs +++ b/fuel-txpool/src/txpool.rs @@ -13,7 +13,6 @@ use fuel_core_interfaces::{ FuelBlock, TxInfo, }, - p2p::P2pRequestEvent, txpool::{ TxPoolDb, TxStatus, @@ -27,10 +26,14 @@ use std::{ }; use tokio::sync::{ broadcast, - mpsc, RwLock, }; +#[cfg(feature = "p2p")] +use fuel_core_interfaces::p2p::P2pRequestEvent; +#[cfg(feature = "p2p")] +use tokio::sync::mpsc; + #[derive(Debug, Clone)] pub struct TxPool { by_hash: HashMap, @@ -147,6 +150,7 @@ impl TxPool { Ok(()) } + #[cfg(feature = "p2p")] pub async fn insert_with_broadcast( txpool: &RwLock, db: &dyn TxPoolDb, @@ -174,7 +178,6 @@ impl TxPool { tx: tx.clone(), status: TxStatus::Submitted, }); - #[cfg(feature = "p2p")] let _ = network_sender .send(P2pRequestEvent::BroadcastNewTransaction { transaction: tx.clone(), From 2f2e7d0ed042d846988fec55756f4261c757c878 Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Tue, 27 Sep 2022 18:35:51 -0400 Subject: [PATCH 26/38] Refactor node_config --- fuel-tests/tests/tx_gossip.rs | 119 +++++++++++++++++----------------- 1 file changed, 59 insertions(+), 60 deletions(-) diff --git a/fuel-tests/tests/tx_gossip.rs b/fuel-tests/tests/tx_gossip.rs index 2530a5616b7..72091c27b72 100644 --- a/fuel-tests/tests/tx_gossip.rs +++ b/fuel-tests/tests/tx_gossip.rs @@ -5,7 +5,6 @@ mod gossip_tests { CoinConfig, StateConfig, }, - fuel_p2p::config::convert_to_libp2p_keypair, service::{ Config, FuelService, @@ -13,10 +12,7 @@ mod gossip_tests { }; use fuel_core_interfaces::common::{ fuel_tx::TransactionBuilder, - fuel_vm::{ - consts::*, - prelude::*, - }, + fuel_vm::prelude::*, }; use fuel_gql_client::client::FuelClient; use rand::{ @@ -26,71 +22,76 @@ mod gossip_tests { }; use std::time::Duration; - #[tokio::test] - async fn test_tx_gossiping() { - let mut rng = StdRng::seed_from_u64(2322); - + fn create_node_config_from_inputs(inputs: &[Input]) -> Config { let mut node_config = Config::local_node(); node_config.utxo_validation = true; - let tx = TransactionBuilder::script( - Opcode::RET(REG_ONE).to_bytes().into_iter().collect(), - vec![], - ) - .gas_limit(100) - .gas_price(1) - .add_unsigned_coin_input( - SecretKey::random(&mut rng), - rng.gen(), - 1000, - Default::default(), - Default::default(), - 0, - ) - .add_output(Output::Change { - amount: 0, - asset_id: Default::default(), - to: rng.gen(), - }) - .finalize(); + let mut initial_state = StateConfig::default(); + let mut coin_configs = vec![]; - if let Input::CoinSigned { - amount, - owner, - asset_id, - utxo_id, - .. - } - | Input::CoinPredicate { - amount, - owner, - asset_id, - utxo_id, - .. - } = tx.inputs()[0] - { - let mut initial_state = StateConfig::default(); - let coin_config = vec![CoinConfig { - tx_id: Some(*utxo_id.tx_id()), - output_index: Some(utxo_id.output_index() as u64), - block_created: None, - maturity: None, + for input in inputs { + if let Input::CoinSigned { + amount, owner, + asset_id, + utxo_id, + .. + } + | Input::CoinPredicate { amount, + owner, asset_id, - }]; - initial_state.coins = Some(coin_config); - node_config.chain_conf.initial_state = Some(initial_state); - }; + utxo_id, + .. + } = input + { + let coin_config = CoinConfig { + tx_id: Some(*utxo_id.tx_id()), + output_index: Some(utxo_id.output_index() as u64), + block_created: None, + maturity: None, + owner: *owner, + amount: *amount, + asset_id: *asset_id, + }; + coin_configs.push(coin_config); + }; + } + initial_state.coins = Some(coin_configs); + node_config.chain_conf.initial_state = Some(initial_state); node_config.p2p.enable_mdns = true; + node_config + } + + #[tokio::test] + async fn test_tx_gossiping() { + let mut rng = StdRng::seed_from_u64(2322); - let node_one = FuelService::new_node(node_config.clone()).await.unwrap(); + let tx = TransactionBuilder::script(vec![], vec![]) + .gas_limit(100) + .gas_price(1) + .add_unsigned_coin_input( + SecretKey::random(&mut rng), + rng.gen(), + 1000, + Default::default(), + Default::default(), + 0, + ) + .add_output(Output::Change { + amount: 0, + asset_id: Default::default(), + to: rng.gen(), + }) + .finalize(); + + let node_config = create_node_config_from_inputs(tx.inputs()); + let node_one = FuelService::new_node(node_config).await.unwrap(); let client_one = FuelClient::from(node_one.bound_address); - let secret_key = rng.gen::(); - node_config.p2p.local_keypair = convert_to_libp2p_keypair(secret_key).unwrap(); - let node_two = FuelService::new_node(node_config.clone()).await.unwrap(); + let node_config = create_node_config_from_inputs(tx.inputs()); + let node_two = FuelService::new_node(node_config).await.unwrap(); let client_two = FuelClient::from(node_two.bound_address); tokio::time::sleep(Duration::new(3, 0)).await; @@ -100,8 +101,6 @@ mod gossip_tests { let tx = client_one.transaction(&result.0.to_string()).await.unwrap(); assert!(tx.is_some()); - // Perhaps some delay is needed before this query? - tokio::time::sleep(Duration::new(3, 0)).await; let tx = client_two.transaction(&result.0.to_string()).await.unwrap(); From 93c3890466d163b05b36872557b398cd4c08ce39 Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Tue, 27 Sep 2022 19:00:21 -0400 Subject: [PATCH 27/38] Simplify gossip test mod with feature flag --- fuel-tests/tests/lib.rs | 1 + fuel-tests/tests/tx_gossip.rs | 186 +++++++++++++++++----------------- 2 files changed, 92 insertions(+), 95 deletions(-) diff --git a/fuel-tests/tests/lib.rs b/fuel-tests/tests/lib.rs index 612610846ff..bc0ed4684d4 100644 --- a/fuel-tests/tests/lib.rs +++ b/fuel-tests/tests/lib.rs @@ -14,4 +14,5 @@ mod node_info; mod resource; mod snapshot; mod tx; +#[cfg(feature = "p2p")] mod tx_gossip; diff --git a/fuel-tests/tests/tx_gossip.rs b/fuel-tests/tests/tx_gossip.rs index 72091c27b72..9c3f12f2728 100644 --- a/fuel-tests/tests/tx_gossip.rs +++ b/fuel-tests/tests/tx_gossip.rs @@ -1,109 +1,105 @@ -#[cfg(feature = "p2p")] -mod gossip_tests { - use fuel_core::{ - chain_config::{ - CoinConfig, - StateConfig, - }, - service::{ - Config, - FuelService, - }, - }; - use fuel_core_interfaces::common::{ - fuel_tx::TransactionBuilder, - fuel_vm::prelude::*, - }; - use fuel_gql_client::client::FuelClient; - use rand::{ - rngs::StdRng, - Rng, - SeedableRng, - }; - use std::time::Duration; +use fuel_core::{ + chain_config::{ + CoinConfig, + StateConfig, + }, + service::{ + Config, + FuelService, + }, +}; +use fuel_core_interfaces::common::{ + fuel_tx::TransactionBuilder, + fuel_vm::prelude::*, +}; +use fuel_gql_client::client::FuelClient; +use rand::{ + rngs::StdRng, + Rng, + SeedableRng, +}; +use std::time::Duration; - fn create_node_config_from_inputs(inputs: &[Input]) -> Config { - let mut node_config = Config::local_node(); - node_config.utxo_validation = true; +fn create_node_config_from_inputs(inputs: &[Input]) -> Config { + let mut node_config = Config::local_node(); + let mut initial_state = StateConfig::default(); + let mut coin_configs = vec![]; - let mut initial_state = StateConfig::default(); - let mut coin_configs = vec![]; - - for input in inputs { - if let Input::CoinSigned { - amount, - owner, - asset_id, - utxo_id, - .. - } - | Input::CoinPredicate { - amount, - owner, - asset_id, - utxo_id, - .. - } = input - { - let coin_config = CoinConfig { - tx_id: Some(*utxo_id.tx_id()), - output_index: Some(utxo_id.output_index() as u64), - block_created: None, - maturity: None, - owner: *owner, - amount: *amount, - asset_id: *asset_id, - }; - coin_configs.push(coin_config); - }; + for input in inputs { + if let Input::CoinSigned { + amount, + owner, + asset_id, + utxo_id, + .. } - - initial_state.coins = Some(coin_configs); - node_config.chain_conf.initial_state = Some(initial_state); - node_config.p2p.enable_mdns = true; - node_config + | Input::CoinPredicate { + amount, + owner, + asset_id, + utxo_id, + .. + } = input + { + let coin_config = CoinConfig { + tx_id: Some(*utxo_id.tx_id()), + output_index: Some(utxo_id.output_index() as u64), + block_created: None, + maturity: None, + owner: *owner, + amount: *amount, + asset_id: *asset_id, + }; + coin_configs.push(coin_config); + }; } - #[tokio::test] - async fn test_tx_gossiping() { - let mut rng = StdRng::seed_from_u64(2322); + initial_state.coins = Some(coin_configs); + node_config.chain_conf.initial_state = Some(initial_state); + node_config.utxo_validation = true; + node_config.p2p.enable_mdns = true; + node_config +} - let tx = TransactionBuilder::script(vec![], vec![]) - .gas_limit(100) - .gas_price(1) - .add_unsigned_coin_input( - SecretKey::random(&mut rng), - rng.gen(), - 1000, - Default::default(), - Default::default(), - 0, - ) - .add_output(Output::Change { - amount: 0, - asset_id: Default::default(), - to: rng.gen(), - }) - .finalize(); +#[tokio::test] +async fn test_tx_gossiping() { + let mut rng = StdRng::seed_from_u64(2322); - let node_config = create_node_config_from_inputs(tx.inputs()); - let node_one = FuelService::new_node(node_config).await.unwrap(); - let client_one = FuelClient::from(node_one.bound_address); + let tx = TransactionBuilder::script(vec![], vec![]) + .gas_limit(100) + .gas_price(1) + .add_unsigned_coin_input( + SecretKey::random(&mut rng), + rng.gen(), + 1000, + Default::default(), + Default::default(), + 0, + ) + .add_output(Output::Change { + amount: 0, + asset_id: Default::default(), + to: rng.gen(), + }) + .finalize(); - let node_config = create_node_config_from_inputs(tx.inputs()); - let node_two = FuelService::new_node(node_config).await.unwrap(); - let client_two = FuelClient::from(node_two.bound_address); + let node_config = create_node_config_from_inputs(tx.inputs()); + let node_one = FuelService::new_node(node_config).await.unwrap(); + let client_one = FuelClient::from(node_one.bound_address); - tokio::time::sleep(Duration::new(3, 0)).await; + let node_config = create_node_config_from_inputs(tx.inputs()); + let node_two = FuelService::new_node(node_config).await.unwrap(); + let client_two = FuelClient::from(node_two.bound_address); - let result = client_one.submit(&tx).await.unwrap(); + tokio::time::sleep(Duration::new(3, 0)).await; - let tx = client_one.transaction(&result.0.to_string()).await.unwrap(); - assert!(tx.is_some()); + let result = client_one.submit(&tx).await.unwrap(); - tokio::time::sleep(Duration::new(3, 0)).await; + let tx = client_one.transaction(&result.0.to_string()).await.unwrap(); + assert!(tx.is_some()); - let tx = client_two.transaction(&result.0.to_string()).await.unwrap(); - assert!(tx.is_some()); - } + tokio::time::sleep(Duration::new(3, 0)).await; + + let tx = client_two.transaction(&result.0.to_string()).await.unwrap(); + assert!(tx.is_some()); } From d4e2fac64380b72b6f852dba6e7c268d02fade6e Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Thu, 29 Sep 2022 13:42:38 -0400 Subject: [PATCH 28/38] Remove readme changes --- fuel-txpool/README.md | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/fuel-txpool/README.md b/fuel-txpool/README.md index 99d70ff8f0d..20be34da987 100644 --- a/fuel-txpool/README.md +++ b/fuel-txpool/README.md @@ -29,29 +29,6 @@ We will need to have at least three structures that do sorting of tx. * by PriceSort: sorted structure that sort all transaction by GasPrice. Most optimal structure can be `BinaryHeap` for fast sorting/inserting but it does have some downsides when we want to remove one item. For use case when a lot of tx are going to be removed we can just recreate structure from scratch. For first interaction we can use simple `BTreeMap` that sorts all inputs. * by Dependency: With every fuel transaction, inputs and outputs change state and we need to be aware of it. Graph is the main defensive structure against DDoS attack, and every transaction that we include in pool should have potential to be included in next block. The graph represents connections between parent and child txs, where child depends on execution output of parent that is found inside database or transaction pool. -## P2P Integration - -The fuel-txpool integrates with fuel-p2p through the use of 2 channels. Since fuel-txpool uses 6 total channels though each one's use is documented below. - -### GraphQL <-> TxPool Channels - -- tx_status_sender: This channel is used to communicate between GraphQL and the TxPool, primarily for status updates on where a transaction is. - -- txpool_receiver: This channel is used to recieve TxPoolMpsc events from downstream consumers. The main use of this channel is GraphQL whose endpoints send various events to either view or submit tx's to the txpool. - -### P2P <-> TxPool Channels - -- network_sender: This channel is used to communicate to p2p from txpool. Transactions are inserted into the TxPool from the GraphQL endpoint will be broadcasted on this channel. - -- incoming_tx_receiver: This channel is used to recieve new txs from p2p, and is important to how txs propogate through the network. Critically though once this is recieved on this channel it is not broadcasted further. - -- import_block_receiver: This channel is used to recieve blocks, so when new blocks are created they are recieved on this channel. - -### ????? -- txpool_sender: TODO Ask someone on this, but looks like pretty much to just stop the TxPool. - -So the 2 channels needed to communicate with p2p are `network_sender` and `incoming_tx_receiver`. The first way the channels connect is that fuel-p2p's `rx_request_event` is the reciever to txpool's `network_sender` sender channel. The combination of both of these channels allows for transactions recieved on the GraphQL endpoint of the node to be broadcasted on p2p. The next channel combination is txpool's `incoming_tx_receiver` which recieves transactions broadcasted across the network from fuel-p2p's `tx_transaction`. - ### Dependency graph Few reasonings on decision and restrains made by txpool From df2fe7014ff96a693519a64598403d8872df39b9 Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Thu, 29 Sep 2022 14:18:58 -0400 Subject: [PATCH 29/38] Minor cleanup --- fuel-block-producer/tests/integration.rs | 7 ++++--- fuel-core/src/service/modules.rs | 1 - fuel-p2p/src/orchestrator.rs | 4 +--- fuel-tests/Cargo.toml | 1 - fuel-txpool/src/service.rs | 7 +++---- 5 files changed, 8 insertions(+), 12 deletions(-) diff --git a/fuel-block-producer/tests/integration.rs b/fuel-block-producer/tests/integration.rs index d8e25751a74..20b87cf8bcf 100644 --- a/fuel-block-producer/tests/integration.rs +++ b/fuel-block-producer/tests/integration.rs @@ -135,9 +135,10 @@ async fn block_producer() -> Result<()> { .txpool_receiver(txpool_receiver); #[cfg(feature = "p2p")] - let (p2p_request_event_sender, _p2p_request_event_receiver) = mpsc::channel(100); - #[cfg(feature = "p2p")] - txpool_builder.network_sender(p2p_request_event_sender.clone()); + { + let (p2p_request_event_sender, _p2p_request_event_receiver) = mpsc::channel(100); + txpool_builder.network_sender(p2p_request_event_sender.clone()); + } let txpool = txpool_builder.build().unwrap(); txpool.start().await?; diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index e2a4a3bc8ca..d94e83b6429 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -153,7 +153,6 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result { - if let Ok(block_updated) = block_updated { match block_updated { ImportBlockBroadcast::PendingFuelBlockImported { block } => { let txpool = txpool.clone(); - TxPool::block_update(txpool.as_ref(), block).await; - // TODO : Not working well + TxPool::block_update(txpool.as_ref(), block).await + // TODO: Should this be done in a separate task? Like this: // tokio::spawn( async move { - // TxPool::block_update(txpool.as_ref(), block).await + // TxPool::block_update(txpool.as_ref(), block).await // }); }, ImportBlockBroadcast::SealedFuelBlockImported { block: _, is_created_by_self: _ } => { From fe89f69434901fb9da064db27c54f6f4cddf1af7 Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Thu, 29 Sep 2022 14:25:12 -0400 Subject: [PATCH 30/38] Minor cleanup --- fuel-block-producer/tests/integration.rs | 2 +- fuel-core/src/service/modules.rs | 16 ++++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/fuel-block-producer/tests/integration.rs b/fuel-block-producer/tests/integration.rs index 20b87cf8bcf..c9e1e091f06 100644 --- a/fuel-block-producer/tests/integration.rs +++ b/fuel-block-producer/tests/integration.rs @@ -137,7 +137,7 @@ async fn block_producer() -> Result<()> { #[cfg(feature = "p2p")] { let (p2p_request_event_sender, _p2p_request_event_receiver) = mpsc::channel(100); - txpool_builder.network_sender(p2p_request_event_sender.clone()); + txpool_builder.network_sender(p2p_request_event_sender); } let txpool = txpool_builder.build().unwrap(); diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index d94e83b6429..162bd367c11 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -97,12 +97,6 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result Result Date: Thu, 29 Sep 2022 14:26:36 -0400 Subject: [PATCH 31/38] Add all gossip topics to default p2p config --- fuel-p2p/src/config.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/fuel-p2p/src/config.rs b/fuel-p2p/src/config.rs index dca1342d1a4..4ad32218a9f 100644 --- a/fuel-p2p/src/config.rs +++ b/fuel-p2p/src/config.rs @@ -1,4 +1,8 @@ -use crate::gossipsub::topics::NEW_TX_GOSSIP_TOPIC; +use crate::gossipsub::topics::{ + CON_VOTE_GOSSIP_TOPIC, + NEW_BLOCK_GOSSIP_TOPIC, + NEW_TX_GOSSIP_TOPIC, +}; use libp2p::{ core::{ muxing::StreamMuxerBox, @@ -103,7 +107,11 @@ impl P2PConfig { allow_private_addresses: true, enable_random_walk: true, connection_idle_timeout: Some(Duration::from_secs(120)), - topics: vec![NEW_TX_GOSSIP_TOPIC.into()], + topics: vec![ + NEW_TX_GOSSIP_TOPIC.into(), + NEW_BLOCK_GOSSIP_TOPIC.into(), + CON_VOTE_GOSSIP_TOPIC.into(), + ], max_mesh_size: 12, min_mesh_size: 4, ideal_mesh_size: 6, From f63683a32e17cf15c0ff64440179491136dc3024 Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Thu, 29 Sep 2022 14:28:48 -0400 Subject: [PATCH 32/38] Fix --- fuel-core/src/service/modules.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index 162bd367c11..da1cb4eaf69 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -98,16 +98,12 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result Date: Thu, 29 Sep 2022 14:35:13 -0400 Subject: [PATCH 33/38] Clean up txpool cargo --- fuel-txpool/Cargo.toml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/fuel-txpool/Cargo.toml b/fuel-txpool/Cargo.toml index 447bbf4a663..f34f0145e4e 100644 --- a/fuel-txpool/Cargo.toml +++ b/fuel-txpool/Cargo.toml @@ -21,11 +21,6 @@ thiserror = "1.0" tokio = { version = "1.14", default-features = false, features = ["sync"] } tracing = "0.1" -[dev-dependencies] -fuel-core-interfaces = { path = "../fuel-core-interfaces", version = "0.10.1", features = [ - "test-helpers", -] } - [features] p2p = [] test-helpers = ["fuel-core-interfaces/test-helpers"] From be1a152dc84c0c23b19ef3e29d816439dc8cee11 Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Thu, 29 Sep 2022 14:41:37 -0400 Subject: [PATCH 34/38] Remove debug statements --- fuel-txpool/src/service.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/fuel-txpool/src/service.rs b/fuel-txpool/src/service.rs index e825795a060..8be81e0a28f 100644 --- a/fuel-txpool/src/service.rs +++ b/fuel-txpool/src/service.rs @@ -186,13 +186,11 @@ impl Context { let db = self.db.clone(); let tx_status_sender = self.tx_status_sender.clone(); - println!("Progressing to move"); tokio::spawn( async move { let txpool = txpool.as_ref(); match new_transaction.unwrap() { TransactionBroadcast::NewTransaction ( tx ) => { let txs = vec!(Arc::new(tx)); - println!("Progressing to insert"); TxPool::insert(txpool, db.as_ref().as_ref(), tx_status_sender, txs).await } } From c37a8c67a6fcbf104d3fbd061125f363e93864c4 Mon Sep 17 00:00:00 2001 From: ControlCplusControlV <44706811+ControlCplusControlV@users.noreply.github.com> Date: Sun, 2 Oct 2022 15:50:25 -0600 Subject: [PATCH 35/38] Remove Feature Flags from Tx Gossiping (#664) * cleaned up feature flags * fmt * fix leak: * fmt again --- fuel-block-producer/Cargo.toml | 1 - fuel-block-producer/tests/integration.rs | 7 ++-- fuel-core/Cargo.toml | 2 +- fuel-core/src/service/modules.rs | 7 ++-- fuel-txpool/Cargo.toml | 1 - fuel-txpool/src/service.rs | 46 +++++++----------------- fuel-txpool/src/txpool.rs | 3 -- 7 files changed, 20 insertions(+), 47 deletions(-) diff --git a/fuel-block-producer/Cargo.toml b/fuel-block-producer/Cargo.toml index 1a12c2dbad1..fce145993c3 100644 --- a/fuel-block-producer/Cargo.toml +++ b/fuel-block-producer/Cargo.toml @@ -26,5 +26,4 @@ rand = "0.8" rstest = "0.15" [features] -p2p = ["fuel-txpool/p2p"] test-helpers = ["fuel-core-interfaces/test-helpers"] diff --git a/fuel-block-producer/tests/integration.rs b/fuel-block-producer/tests/integration.rs index c9e1e091f06..86e804b9fba 100644 --- a/fuel-block-producer/tests/integration.rs +++ b/fuel-block-producer/tests/integration.rs @@ -134,11 +134,8 @@ async fn block_producer() -> Result<()> { .txpool_sender(TxPoolSender::new(txpool_sender)) .txpool_receiver(txpool_receiver); - #[cfg(feature = "p2p")] - { - let (p2p_request_event_sender, _p2p_request_event_receiver) = mpsc::channel(100); - txpool_builder.network_sender(p2p_request_event_sender); - } + let (p2p_request_event_sender, _p2p_request_event_receiver) = mpsc::channel(100); + txpool_builder.network_sender(p2p_request_event_sender); let txpool = txpool_builder.build().unwrap(); txpool.start().await?; diff --git a/fuel-core/Cargo.toml b/fuel-core/Cargo.toml index bb6c9467838..e2b46ba8d8e 100644 --- a/fuel-core/Cargo.toml +++ b/fuel-core/Cargo.toml @@ -88,6 +88,6 @@ metrics = ["dep:fuel-metrics"] default = ["rocksdb", "metrics", "debug"] debug = ["fuel-core-interfaces/debug"] relayer = ["dep:fuel-relayer"] -p2p = ["dep:fuel-p2p", "fuel-txpool/p2p", "fuel-block-producer/p2p"] +p2p = ["dep:fuel-p2p"] # features to enable in production, but increase build times production = ["rocksdb?/jemalloc"] diff --git a/fuel-core/src/service/modules.rs b/fuel-core/src/service/modules.rs index da1cb4eaf69..c5ed701300f 100644 --- a/fuel-core/src/service/modules.rs +++ b/fuel-core/src/service/modules.rs @@ -103,7 +103,7 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result Result Result>, @@ -36,7 +36,6 @@ pub struct ServiceBuilder { tx_status_sender: Option>, import_block_receiver: Option>, incoming_tx_receiver: Option>, - #[cfg(feature = "p2p")] network_sender: Option>, } @@ -56,7 +55,6 @@ impl ServiceBuilder { tx_status_sender: None, import_block_receiver: None, incoming_tx_receiver: None, - #[cfg(feature = "p2p")] network_sender: None, } } @@ -103,7 +101,6 @@ impl ServiceBuilder { self } - #[cfg(feature = "p2p")] pub fn network_sender( &mut self, network_sender: mpsc::Sender, @@ -136,7 +133,6 @@ impl ServiceBuilder { return Err(anyhow!("One of context items are not set")) } - #[cfg(feature = "p2p")] if self.network_sender.is_none() { return Err(anyhow!("P2P network sender is not set")) } @@ -151,7 +147,6 @@ impl ServiceBuilder { tx_status_sender: self.tx_status_sender.unwrap(), import_block_receiver: self.import_block_receiver.unwrap(), incoming_tx_receiver: self.incoming_tx_receiver.unwrap(), - #[cfg(feature = "p2p")] network_sender: self.network_sender.unwrap(), }, )?; @@ -166,7 +161,6 @@ pub struct Context { pub tx_status_sender: broadcast::Sender, pub import_block_receiver: broadcast::Receiver, pub incoming_tx_receiver: broadcast::Receiver, - #[cfg(feature = "p2p")] pub network_sender: mpsc::Sender, } @@ -205,7 +199,6 @@ impl Context { let db = self.db.clone(); let tx_status_sender = self.tx_status_sender.clone(); - #[cfg(feature = "p2p")] let network_sender = self.network_sender.clone(); // This is little bit risky but we can always add semaphore to limit number of requests. @@ -216,10 +209,7 @@ impl Context { let _ = response.send(TxPool::includable(txpool).await); } TxPoolMpsc::Insert { txs, response } => { - #[cfg(feature = "p2p")] let _ = response.send(TxPool::insert_with_broadcast(txpool, db.as_ref().as_ref(), tx_status_sender, network_sender, txs).await); - #[cfg(not(feature = "p2p"))] - let _ = response.send(TxPool::insert(txpool, db.as_ref().as_ref(), tx_status_sender, txs).await); } TxPoolMpsc::Find { ids, response } => { @@ -360,11 +350,8 @@ pub mod tests { .txpool_sender(txpool_sender) .txpool_receiver(txpool_receiver); - #[cfg(feature = "p2p")] - { - let (network_sender, _) = mpsc::channel(100); - builder.network_sender(network_sender); - } + let (network_sender, _) = mpsc::channel(100); + builder.network_sender(network_sender); let service = builder.build().unwrap(); @@ -399,11 +386,8 @@ pub mod tests { .txpool_sender(txpool_sender) .txpool_receiver(txpool_receiver); - #[cfg(feature = "p2p")] - { - let (network_sender, _) = mpsc::channel(100); - builder.network_sender(network_sender); - } + let (network_sender, _) = mpsc::channel(100); + builder.network_sender(network_sender); let service = builder.build().unwrap(); service.start().await.ok(); @@ -489,11 +473,8 @@ pub mod tests { .txpool_sender(txpool_sender) .txpool_receiver(txpool_receiver); - #[cfg(feature = "p2p")] - { - let (network_sender, _) = mpsc::channel(100); - builder.network_sender(network_sender); - } + let (network_sender, _) = mpsc::channel(100); + builder.network_sender(network_sender); let service = builder.build().unwrap(); service.start().await.ok(); @@ -560,11 +541,8 @@ pub mod tests { .txpool_sender(txpool_sender) .txpool_receiver(txpool_receiver); - #[cfg(feature = "p2p")] - { - let (network_sender, _) = mpsc::channel(100); - builder.network_sender(network_sender); - } + let (network_sender, _) = mpsc::channel(100); + builder.network_sender(network_sender); let service = builder.build().unwrap(); service.start().await.ok(); diff --git a/fuel-txpool/src/txpool.rs b/fuel-txpool/src/txpool.rs index 8a64b9cf519..0a1f5b2cfaf 100644 --- a/fuel-txpool/src/txpool.rs +++ b/fuel-txpool/src/txpool.rs @@ -29,9 +29,7 @@ use tokio::sync::{ RwLock, }; -#[cfg(feature = "p2p")] use fuel_core_interfaces::p2p::P2pRequestEvent; -#[cfg(feature = "p2p")] use tokio::sync::mpsc; #[derive(Debug, Clone)] @@ -150,7 +148,6 @@ impl TxPool { Ok(()) } - #[cfg(feature = "p2p")] pub async fn insert_with_broadcast( txpool: &RwLock, db: &dyn TxPoolDb, From 6a18f9427181280f2beff703ab031135eb38e6a4 Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Mon, 3 Oct 2022 17:53:58 -0400 Subject: [PATCH 36/38] Refactor insert_with_broadcast --- fuel-txpool/src/service.rs | 13 +++++++++++- fuel-txpool/src/txpool.rs | 41 -------------------------------------- 2 files changed, 12 insertions(+), 42 deletions(-) diff --git a/fuel-txpool/src/service.rs b/fuel-txpool/src/service.rs index a3f91da91a7..c90a3d8675d 100644 --- a/fuel-txpool/src/service.rs +++ b/fuel-txpool/src/service.rs @@ -209,7 +209,18 @@ impl Context { let _ = response.send(TxPool::includable(txpool).await); } TxPoolMpsc::Insert { txs, response } => { - let _ = response.send(TxPool::insert_with_broadcast(txpool, db.as_ref().as_ref(), tx_status_sender, network_sender, txs).await); + let insert = TxPool::insert(txpool, db.as_ref().as_ref(), tx_status_sender,txs.clone()).await; + for (ret, tx) in insert.iter().zip(txs.into_iter()) { + match ret { + Ok(_) => { + let _ = network_sender.send(P2pRequestEvent::BroadcastNewTransaction { + transaction: tx.clone(), + }).await; + } + Err(_) => {} + } + } + let _ = response.send(insert); } TxPoolMpsc::Find { ids, response } => { diff --git a/fuel-txpool/src/txpool.rs b/fuel-txpool/src/txpool.rs index 0a1f5b2cfaf..f7b4eb94619 100644 --- a/fuel-txpool/src/txpool.rs +++ b/fuel-txpool/src/txpool.rs @@ -148,47 +148,6 @@ impl TxPool { Ok(()) } - pub async fn insert_with_broadcast( - txpool: &RwLock, - db: &dyn TxPoolDb, - tx_status_sender: broadcast::Sender, - network_sender: mpsc::Sender, - txs: Vec, - ) -> Vec>> { - let mut res = Vec::new(); - for tx in txs.iter() { - let mut pool = txpool.write().await; - res.push(pool.insert_inner(tx.clone(), db).await) - } - for (ret, tx) in res.iter().zip(txs.into_iter()) { - match ret { - Ok(removed) => { - for removed in removed { - let _ = tx_status_sender.send(TxStatusBroadcast { - tx: removed.clone(), - status: TxStatus::SqueezedOut { - reason: Error::Removed, - }, - }); - } - let _ = tx_status_sender.send(TxStatusBroadcast { - tx: tx.clone(), - status: TxStatus::Submitted, - }); - let _ = network_sender - .send(P2pRequestEvent::BroadcastNewTransaction { - transaction: tx.clone(), - }) - .await; - } - Err(_) => { - // @dev should not broadcast tx if error occurred - } - } - } - res - } - /// Import a set of transactions from network gossip or GraphQL endpoints. pub async fn insert( txpool: &RwLock, From 91fea79a838727a9c15e6447964c6ba02361a008 Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Thu, 6 Oct 2022 11:52:26 -0400 Subject: [PATCH 37/38] Remove insert_with_broadcast --- fuel-txpool/src/txpool.rs | 44 --------------------------------------- 1 file changed, 44 deletions(-) diff --git a/fuel-txpool/src/txpool.rs b/fuel-txpool/src/txpool.rs index 0a1f5b2cfaf..2fa7b718614 100644 --- a/fuel-txpool/src/txpool.rs +++ b/fuel-txpool/src/txpool.rs @@ -29,9 +29,6 @@ use tokio::sync::{ RwLock, }; -use fuel_core_interfaces::p2p::P2pRequestEvent; -use tokio::sync::mpsc; - #[derive(Debug, Clone)] pub struct TxPool { by_hash: HashMap, @@ -148,47 +145,6 @@ impl TxPool { Ok(()) } - pub async fn insert_with_broadcast( - txpool: &RwLock, - db: &dyn TxPoolDb, - tx_status_sender: broadcast::Sender, - network_sender: mpsc::Sender, - txs: Vec, - ) -> Vec>> { - let mut res = Vec::new(); - for tx in txs.iter() { - let mut pool = txpool.write().await; - res.push(pool.insert_inner(tx.clone(), db).await) - } - for (ret, tx) in res.iter().zip(txs.into_iter()) { - match ret { - Ok(removed) => { - for removed in removed { - let _ = tx_status_sender.send(TxStatusBroadcast { - tx: removed.clone(), - status: TxStatus::SqueezedOut { - reason: Error::Removed, - }, - }); - } - let _ = tx_status_sender.send(TxStatusBroadcast { - tx: tx.clone(), - status: TxStatus::Submitted, - }); - let _ = network_sender - .send(P2pRequestEvent::BroadcastNewTransaction { - transaction: tx.clone(), - }) - .await; - } - Err(_) => { - // @dev should not broadcast tx if error occurred - } - } - } - res - } - /// Import a set of transactions from network gossip or GraphQL endpoints. pub async fn insert( txpool: &RwLock, From c1ecf932637c89ff075c1e9729f25e35ee0acceb Mon Sep 17 00:00:00 2001 From: Brandon Vrooman Date: Thu, 6 Oct 2022 11:56:53 -0400 Subject: [PATCH 38/38] Simplify --- fuel-txpool/src/service.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/fuel-txpool/src/service.rs b/fuel-txpool/src/service.rs index 5d3791373ea..0674e4e0ccc 100644 --- a/fuel-txpool/src/service.rs +++ b/fuel-txpool/src/service.rs @@ -129,14 +129,11 @@ impl ServiceBuilder { || self.txpool_sender.is_none() || self.tx_status_sender.is_none() || self.txpool_receiver.is_none() + || self.network_sender.is_none() { return Err(anyhow!("One of context items are not set")) } - if self.network_sender.is_none() { - return Err(anyhow!("P2P network sender is not set")) - } - let service = Service::new( self.txpool_sender.unwrap(), self.tx_status_sender.clone().unwrap(),