Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion fuel-block-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,4 @@ rand = "0.8"
rstest = "0.15"

[features]
p2p = ["fuel-txpool/p2p"]
test-helpers = ["fuel-core-interfaces/test-helpers"]
7 changes: 2 additions & 5 deletions fuel-block-producer/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,8 @@ async fn block_producer() -> Result<()> {
.txpool_sender(TxPoolSender::new(txpool_sender))
.txpool_receiver(txpool_receiver);

#[cfg(feature = "p2p")]
{
let (p2p_request_event_sender, _p2p_request_event_receiver) = mpsc::channel(100);
txpool_builder.network_sender(p2p_request_event_sender);
}
let (p2p_request_event_sender, _p2p_request_event_receiver) = mpsc::channel(100);
txpool_builder.network_sender(p2p_request_event_sender);

let txpool = txpool_builder.build().unwrap();
txpool.start().await?;
Expand Down
2 changes: 1 addition & 1 deletion fuel-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,6 @@ metrics = ["dep:fuel-metrics"]
default = ["rocksdb", "metrics", "debug"]
debug = ["fuel-core-interfaces/debug"]
relayer = ["dep:fuel-relayer"]
p2p = ["dep:fuel-p2p", "fuel-txpool/p2p", "fuel-block-producer/p2p"]
p2p = ["dep:fuel-p2p"]
# features to enable in production, but increase build times
production = ["rocksdb?/jemalloc"]
7 changes: 5 additions & 2 deletions fuel-core/src/service/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result<Modul
#[cfg(feature = "p2p")]
let (p2p_request_event_sender, p2p_request_event_receiver) = mpsc::channel(100);
#[cfg(not(feature = "p2p"))]
let (p2p_request_event_sender, _p2p_request_event_receiver) = mpsc::channel(100);
let (p2p_request_event_sender, mut p2p_request_event_receiver) = mpsc::channel(100);

#[cfg(feature = "p2p")]
let network_service = {
Expand All @@ -126,6 +126,10 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result<Modul

let keep_alive = Box::new(block_event_sender);
Box::leak(keep_alive);

tokio::spawn(async move {
while (p2p_request_event_receiver.recv().await).is_some() {}
});
}

let (tx_status_sender, mut tx_status_receiver) = broadcast::channel(100);
Expand All @@ -145,7 +149,6 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result<Modul
.txpool_sender(Sender::new(txpool_sender))
.txpool_receiver(txpool_receiver);

#[cfg(feature = "p2p")]
txpool_builder.network_sender(p2p_request_event_sender.clone());

let txpool = txpool_builder.build()?;
Expand Down
1 change: 0 additions & 1 deletion fuel-txpool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,4 @@ tokio = { version = "1.21", default-features = false, features = ["sync"] }
tracing = "0.1"

[features]
p2p = []
test-helpers = ["fuel-core-interfaces/test-helpers"]
46 changes: 12 additions & 34 deletions fuel-txpool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use crate::{
use anyhow::anyhow;
use fuel_core_interfaces::{
block_importer::ImportBlockBroadcast,
p2p::TransactionBroadcast,
p2p::{
P2pRequestEvent,
TransactionBroadcast,
},
txpool::{
self,
TxPoolDb,
Expand All @@ -25,9 +28,6 @@ use tokio::{
};
use tracing::error;

#[cfg(feature = "p2p")]
use fuel_core_interfaces::p2p::P2pRequestEvent;

pub struct ServiceBuilder {
config: Config,
db: Option<Box<dyn TxPoolDb>>,
Expand All @@ -36,7 +36,6 @@ pub struct ServiceBuilder {
tx_status_sender: Option<broadcast::Sender<TxStatusBroadcast>>,
import_block_receiver: Option<broadcast::Receiver<ImportBlockBroadcast>>,
incoming_tx_receiver: Option<broadcast::Receiver<TransactionBroadcast>>,
#[cfg(feature = "p2p")]
network_sender: Option<mpsc::Sender<P2pRequestEvent>>,
}

Expand All @@ -56,7 +55,6 @@ impl ServiceBuilder {
tx_status_sender: None,
import_block_receiver: None,
incoming_tx_receiver: None,
#[cfg(feature = "p2p")]
network_sender: None,
}
}
Expand Down Expand Up @@ -103,7 +101,6 @@ impl ServiceBuilder {
self
}

#[cfg(feature = "p2p")]
pub fn network_sender(
&mut self,
network_sender: mpsc::Sender<P2pRequestEvent>,
Expand Down Expand Up @@ -136,7 +133,6 @@ impl ServiceBuilder {
return Err(anyhow!("One of context items are not set"))
}

#[cfg(feature = "p2p")]
if self.network_sender.is_none() {
return Err(anyhow!("P2P network sender is not set"))
}
Expand All @@ -151,7 +147,6 @@ impl ServiceBuilder {
tx_status_sender: self.tx_status_sender.unwrap(),
import_block_receiver: self.import_block_receiver.unwrap(),
incoming_tx_receiver: self.incoming_tx_receiver.unwrap(),
#[cfg(feature = "p2p")]
network_sender: self.network_sender.unwrap(),
},
)?;
Expand All @@ -166,7 +161,6 @@ pub struct Context {
pub tx_status_sender: broadcast::Sender<TxStatusBroadcast>,
pub import_block_receiver: broadcast::Receiver<ImportBlockBroadcast>,
pub incoming_tx_receiver: broadcast::Receiver<TransactionBroadcast>,
#[cfg(feature = "p2p")]
pub network_sender: mpsc::Sender<P2pRequestEvent>,
}

Expand Down Expand Up @@ -205,7 +199,6 @@ impl Context {
let db = self.db.clone();
let tx_status_sender = self.tx_status_sender.clone();

#[cfg(feature = "p2p")]
let network_sender = self.network_sender.clone();

// This is little bit risky but we can always add semaphore to limit number of requests.
Expand All @@ -216,10 +209,7 @@ impl Context {
let _ = response.send(TxPool::includable(txpool).await);
}
TxPoolMpsc::Insert { txs, response } => {
#[cfg(feature = "p2p")]
let _ = response.send(TxPool::insert_with_broadcast(txpool, db.as_ref().as_ref(), tx_status_sender, network_sender, txs).await);
#[cfg(not(feature = "p2p"))]
let _ = response.send(TxPool::insert(txpool, db.as_ref().as_ref(), tx_status_sender, txs).await);
}

TxPoolMpsc::Find { ids, response } => {
Expand Down Expand Up @@ -360,11 +350,8 @@ pub mod tests {
.txpool_sender(txpool_sender)
.txpool_receiver(txpool_receiver);

#[cfg(feature = "p2p")]
{
let (network_sender, _) = mpsc::channel(100);
builder.network_sender(network_sender);
}
let (network_sender, _) = mpsc::channel(100);
builder.network_sender(network_sender);

let service = builder.build().unwrap();

Expand Down Expand Up @@ -399,11 +386,8 @@ pub mod tests {
.txpool_sender(txpool_sender)
.txpool_receiver(txpool_receiver);

#[cfg(feature = "p2p")]
{
let (network_sender, _) = mpsc::channel(100);
builder.network_sender(network_sender);
}
let (network_sender, _) = mpsc::channel(100);
builder.network_sender(network_sender);

let service = builder.build().unwrap();
service.start().await.ok();
Expand Down Expand Up @@ -489,11 +473,8 @@ pub mod tests {
.txpool_sender(txpool_sender)
.txpool_receiver(txpool_receiver);

#[cfg(feature = "p2p")]
{
let (network_sender, _) = mpsc::channel(100);
builder.network_sender(network_sender);
}
let (network_sender, _) = mpsc::channel(100);
builder.network_sender(network_sender);

let service = builder.build().unwrap();
service.start().await.ok();
Expand Down Expand Up @@ -560,11 +541,8 @@ pub mod tests {
.txpool_sender(txpool_sender)
.txpool_receiver(txpool_receiver);

#[cfg(feature = "p2p")]
{
let (network_sender, _) = mpsc::channel(100);
builder.network_sender(network_sender);
}
let (network_sender, _) = mpsc::channel(100);
builder.network_sender(network_sender);

let service = builder.build().unwrap();
service.start().await.ok();
Expand Down
3 changes: 0 additions & 3 deletions fuel-txpool/src/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ use tokio::sync::{
RwLock,
};

#[cfg(feature = "p2p")]
use fuel_core_interfaces::p2p::P2pRequestEvent;
#[cfg(feature = "p2p")]
use tokio::sync::mpsc;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -150,7 +148,6 @@ impl TxPool {
Ok(())
}

#[cfg(feature = "p2p")]
pub async fn insert_with_broadcast(
txpool: &RwLock<Self>,
db: &dyn TxPoolDb,
Expand Down