Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
2dc5c35
txpool reimpled
ControlCplusControlV Sep 3, 2022
3ab7525
Tests go green
ControlCplusControlV Sep 3, 2022
3cc1ae2
clippy
ControlCplusControlV Sep 3, 2022
40e0384
final touches?
ControlCplusControlV Sep 3, 2022
beca66d
clippy fix
ControlCplusControlV Sep 4, 2022
68e21fa
clippy maybe?
ControlCplusControlV Sep 4, 2022
9b56bd8
Merge branch 'master' into controlc/p2p_tx
ControlCplusControlV Sep 6, 2022
d866c11
Merge branch 'master' into controlc/p2p_tx
ControlCplusControlV Sep 9, 2022
716a3fa
updated fmt
ControlCplusControlV Sep 9, 2022
e248f8c
ci passes?
ControlCplusControlV Sep 9, 2022
d8edc4f
Merge branch 'master' into controlc/p2p_tx
ControlCplusControlV Sep 12, 2022
bf32dad
Merge branch 'master' into controlc/p2p_tx
ControlCplusControlV Sep 17, 2022
cd62488
Merge branch 'master' into controlc/p2p_tx
bvrooman Sep 20, 2022
3c3f514
Error when tx_receiver channel sender is dropped
bvrooman Sep 20, 2022
e9b8cc4
Updated fmt
bvrooman Sep 21, 2022
06d0e6b
Spelling
bvrooman Sep 21, 2022
5a2e7a6
Update modules channel wiring
bvrooman Sep 23, 2022
afaa697
Merge branch 'master' into controlc/p2p_tx
ControlCplusControlV Sep 23, 2022
386e073
Keep alive for incoming_tx_sender
bvrooman Sep 23, 2022
afcf626
Merge branch 'controlc/p2p_tx' of https://github.com/FuelLabs/fuel-co…
bvrooman Sep 23, 2022
58034a3
Update features config and tests
bvrooman Sep 23, 2022
41669d3
Revert unrelated change
bvrooman Sep 23, 2022
64dd2e3
Merge branch 'master' into controlc/p2p_tx
ControlCplusControlV Sep 24, 2022
03300f8
test issues now
ControlCplusControlV Sep 24, 2022
58f4654
fixes
ControlCplusControlV Sep 24, 2022
84f0e18
fmt
ControlCplusControlV Sep 24, 2022
a081d96
was it that easy?
ControlCplusControlV Sep 24, 2022
a6c942c
oops
ControlCplusControlV Sep 24, 2022
f0ea4ba
New testcase for tx gossip
ControlCplusControlV Sep 25, 2022
68c780f
Fmt
bvrooman Sep 26, 2022
de0e154
Test p2p connection and integ
bvrooman Sep 26, 2022
db0bb65
add tx gossip to default topics
Voxelot Sep 26, 2022
4e1946c
Merge branch 'master' into controlc/p2p_tx
ControlCplusControlV Sep 27, 2022
a2498d7
Clean up use statements and dbg
bvrooman Sep 27, 2022
2f2e7d0
Refactor node_config
bvrooman Sep 27, 2022
93c3890
Simplify gossip test mod with feature flag
bvrooman Sep 27, 2022
d4e2fac
Remove readme changes
bvrooman Sep 29, 2022
df2fe70
Minor cleanup
bvrooman Sep 29, 2022
fe89f69
Minor cleanup
bvrooman Sep 29, 2022
cb7fcda
Add all gossip topics to default p2p config
bvrooman Sep 29, 2022
f63683a
Fix
bvrooman Sep 29, 2022
9b79b61
Merge branch 'master' into controlc/p2p_tx
bvrooman Sep 29, 2022
4a2346c
Clean up txpool cargo
bvrooman Sep 29, 2022
e1da109
Merge branch 'controlc/p2p_tx' of https://github.com/FuelLabs/fuel-co…
bvrooman Sep 29, 2022
be1a152
Remove debug statements
bvrooman Sep 29, 2022
b9a7812
Merge branch 'master' into controlc/p2p_tx
ControlCplusControlV Sep 30, 2022
427c4d8
Merge branch 'master' into controlc/p2p_tx
ControlCplusControlV Oct 1, 2022
c37a8c6
Remove Feature Flags from Tx Gossiping (#664)
ControlCplusControlV Oct 2, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion fuel-block-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,3 @@ rstest = "0.15"

[features]
test-helpers = ["fuel-core-interfaces/test-helpers"]

33 changes: 30 additions & 3 deletions fuel-block-producer/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ use fuel_core_interfaces::{
Coin,
CoinStatus,
},
txpool::Sender as TxPoolSender,
};
use fuel_txpool::{
Config as TxPoolConfig,
MockDb as TxPoolDb,
ServiceBuilder as TxPoolServiceBuilder,
};
Expand All @@ -48,7 +50,10 @@ use rand::{
SeedableRng,
};
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::{
broadcast,
mpsc,
};

const COIN_AMOUNT: u64 = 1_000_000_000;

Expand Down Expand Up @@ -108,8 +113,30 @@ async fn block_producer() -> Result<()> {
let (import_block_events_tx, import_block_events_rx) = broadcast::channel(16);

let mut txpool_builder = TxPoolServiceBuilder::new();
txpool_builder.db(Box::new(txpool_db));
txpool_builder.import_block_event(import_block_events_rx);

let (tx_status_sender, mut tx_status_receiver) = broadcast::channel(100);

// Remove once tx_status events are used
tokio::spawn(async move { while (tx_status_receiver.recv().await).is_ok() {} });

let (txpool_sender, txpool_receiver) = mpsc::channel(100);
let (incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100);

let keep_alive = Box::new(incoming_tx_sender);
Box::leak(keep_alive);

txpool_builder
.config(TxPoolConfig::default())
.db(Box::new(txpool_db))
.incoming_tx_receiver(incoming_tx_receiver)
.import_block_event(import_block_events_rx)
.tx_status_sender(tx_status_sender)
.txpool_sender(TxPoolSender::new(txpool_sender))
.txpool_receiver(txpool_receiver);

let (p2p_request_event_sender, _p2p_request_event_receiver) = mpsc::channel(100);
txpool_builder.network_sender(p2p_request_event_sender);

let txpool = txpool_builder.build().unwrap();
txpool.start().await?;

Expand Down
2 changes: 2 additions & 0 deletions fuel-core-interfaces/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::oneshot;

#[derive(Debug, PartialEq, Eq, Clone)]
pub enum TransactionBroadcast {
NewTransaction(Transaction),
}
Expand All @@ -24,6 +25,7 @@ pub enum BlockBroadcast {
NewBlock(FuelBlock),
}

#[derive(Debug)]
pub enum P2pRequestEvent {
RequestBlock {
height: BlockHeight,
Expand Down
5 changes: 5 additions & 0 deletions fuel-core-interfaces/src/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ impl Sender {
self.send(TxPoolMpsc::Remove { ids, response }).await?;
receiver.await.map_err(Into::into)
}

pub fn channel(buffer: usize) -> (Sender, mpsc::Receiver<TxPoolMpsc>) {
let (sender, reciever) = mpsc::channel(buffer);
(Sender(sender), reciever)
}
}

/// RPC commands that can be sent to the TxPool through an MPSC channel.
Expand Down
3 changes: 3 additions & 0 deletions fuel-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ pub mod schema;
pub mod service;
pub mod state;
pub mod tx_pool;

#[cfg(feature = "p2p")]
pub use fuel_p2p;
137 changes: 81 additions & 56 deletions fuel-core/src/service/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ use fuel_core_interfaces::{
BlockHeight,
FuelBlock,
},
txpool::TxPoolDb,
txpool::{
Sender,
TxPoolDb,
},
};
use futures::future::join_all;
use std::sync::Arc;
use tokio::{
sync::mpsc,
sync::{
broadcast,
mpsc,
},
task::JoinHandle,
};
use tracing::info;
Expand Down Expand Up @@ -63,105 +69,124 @@ pub async fn start_modules(config: &Config, database: &Database) -> Result<Modul
let bft = fuel_core_bft::Service::new(&config.bft, db).await?;
let sync = fuel_sync::Service::new(&config.sync).await?;

// create builders
#[cfg(feature = "relayer")]
let mut relayer_builder = fuel_relayer::ServiceBuilder::new();
let mut txpool_builder = fuel_txpool::ServiceBuilder::new();

// initiate fields for builders
#[cfg(feature = "relayer")]
relayer_builder
.config(config.relayer.clone())
.db(Box::new(database.clone()) as Box<dyn RelayerDb>)
.import_block_event(block_importer.subscribe())
.private_key(
hex::decode(
"c6bd905dcac2a0b1c43f574ab6933df14d7ceee0194902bce523ed054e8e798b",
)
.unwrap(),
);
let relayer = {
let mut relayer_builder = fuel_relayer::ServiceBuilder::new();
relayer_builder
.config(config.relayer.clone())
.db(Box::new(database.clone()) as Box<dyn RelayerDb>)
.import_block_event(block_importer.subscribe())
.private_key(
hex::decode(
"c6bd905dcac2a0b1c43f574ab6933df14d7ceee0194902bce523ed054e8e798b",
)
.unwrap(),
);

relayer_builder.build()?
};

let relayer_sender = {
#[cfg(feature = "relayer")]
{
relayer_builder.sender().clone()
relayer.sender().clone()
}
#[cfg(not(feature = "relayer"))]
{
fuel_core_interfaces::relayer::Sender::noop()
}
};

let (incoming_tx_sender, incoming_tx_receiver) = broadcast::channel(100);
let (block_event_sender, block_event_receiver) = mpsc::channel(100);

#[cfg(feature = "p2p")]
let (p2p_request_event_sender, p2p_request_event_receiver) = mpsc::channel(100);
#[cfg(not(feature = "p2p"))]
let (p2p_request_event_sender, mut p2p_request_event_receiver) = mpsc::channel(100);

#[cfg(feature = "p2p")]
let network_service = {
let p2p_db: Arc<dyn P2pDb> = Arc::new(database.clone());
let (tx_consensus, _) = mpsc::channel(100);
fuel_p2p::orchestrator::Service::new(
config.p2p.clone(),
p2p_db,
p2p_request_event_sender.clone(),
p2p_request_event_receiver,
tx_consensus,
incoming_tx_sender,
block_event_sender,
)
};
#[cfg(not(feature = "p2p"))]
{
let keep_alive = Box::new(incoming_tx_sender);
Box::leak(keep_alive);

let keep_alive = Box::new(block_event_sender);
Box::leak(keep_alive);

tokio::spawn(async move {
while (p2p_request_event_receiver.recv().await).is_some() {}
});
}

let (tx_status_sender, mut tx_status_receiver) = broadcast::channel(100);

// Remove once tx_status events are used
tokio::spawn(async move { while (tx_status_receiver.recv().await).is_ok() {} });

let (txpool_sender, txpool_receiver) = mpsc::channel(100);

let mut txpool_builder = fuel_txpool::ServiceBuilder::new();
txpool_builder
.config(config.txpool.clone())
.db(Box::new(database.clone()) as Box<dyn TxPoolDb>)
.import_block_event(block_importer.subscribe());
.incoming_tx_receiver(incoming_tx_receiver)
.import_block_event(block_importer.subscribe())
.tx_status_sender(tx_status_sender)
.txpool_sender(Sender::new(txpool_sender))
.txpool_receiver(txpool_receiver);

#[cfg(feature = "p2p")]
let (tx_request_event, rx_request_event) = mpsc::channel(100);
#[cfg(feature = "p2p")]
let (tx_block, rx_block) = mpsc::channel(100);
txpool_builder.network_sender(p2p_request_event_sender.clone());

#[cfg(not(feature = "p2p"))]
let (tx_request_event, _) = mpsc::channel(100);
#[cfg(not(feature = "p2p"))]
let (_, rx_block) = mpsc::channel(100);
let txpool = txpool_builder.build()?;

// start services

block_importer.start().await;

bft.start(
relayer_sender.clone(),
tx_request_event.clone(),
p2p_request_event_sender.clone(),
block_producer.clone(),
block_importer.sender().clone(),
block_importer.subscribe(),
)
.await;

sync.start(
rx_block,
tx_request_event.clone(),
block_event_receiver,
p2p_request_event_sender.clone(),
relayer_sender,
bft.sender().clone(),
block_importer.sender().clone(),
)
.await;

// build services
#[cfg(feature = "relayer")]
let relayer = relayer_builder.build()?;
let txpool = txpool_builder.build()?;

// start services
#[cfg(feature = "relayer")]
if config.relayer.eth_client.is_some() {
relayer.start().await?;
}
txpool.start().await?;

#[cfg(feature = "p2p")]
let p2p_db: Arc<dyn P2pDb> = Arc::new(database.clone());
#[cfg(feature = "p2p")]
let (tx_consensus, _) = mpsc::channel(100);
#[cfg(feature = "p2p")]
let (tx_transaction, _) = mpsc::channel(100);

#[cfg(feature = "p2p")]
let network_service = fuel_p2p::orchestrator::Service::new(
config.p2p.clone(),
p2p_db,
tx_request_event,
rx_request_event,
tx_consensus,
tx_transaction,
tx_block,
);

#[cfg(feature = "p2p")]
if !config.p2p.network_name.is_empty() {
network_service.start().await?;
}

txpool.start().await?;

Ok(Modules {
txpool: Arc::new(txpool),
block_importer: Arc::new(block_importer),
Expand Down
11 changes: 10 additions & 1 deletion fuel-p2p/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
use crate::gossipsub::topics::{
CON_VOTE_GOSSIP_TOPIC,
NEW_BLOCK_GOSSIP_TOPIC,
NEW_TX_GOSSIP_TOPIC,
};
use libp2p::{
core::{
muxing::StreamMuxerBox,
Expand Down Expand Up @@ -102,7 +107,11 @@ impl P2PConfig {
allow_private_addresses: true,
enable_random_walk: true,
connection_idle_timeout: Some(Duration::from_secs(120)),
topics: vec![],
topics: vec![
NEW_TX_GOSSIP_TOPIC.into(),
NEW_BLOCK_GOSSIP_TOPIC.into(),
CON_VOTE_GOSSIP_TOPIC.into(),
],
max_mesh_size: 12,
min_mesh_size: 4,
ideal_mesh_size: 6,
Expand Down
10 changes: 5 additions & 5 deletions fuel-p2p/src/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use fuel_core_interfaces::p2p::{
P2pRequestEvent,
TransactionBroadcast,
};

use libp2p::request_response::RequestId;
use tokio::{
sync::{
broadcast,
mpsc::{
Receiver,
Sender,
Expand Down Expand Up @@ -49,7 +49,7 @@ pub struct NetworkOrchestrator {

// senders
tx_consensus: Sender<ConsensusBroadcast>,
tx_transaction: Sender<TransactionBroadcast>,
tx_transaction: broadcast::Sender<TransactionBroadcast>,
tx_block: Sender<BlockBroadcast>,
tx_outbound_responses: Sender<Option<(OutboundResponse, RequestId)>>,

Expand All @@ -62,7 +62,7 @@ impl NetworkOrchestrator {
rx_request_event: Receiver<P2pRequestEvent>,

tx_consensus: Sender<ConsensusBroadcast>,
tx_transaction: Sender<TransactionBroadcast>,
tx_transaction: broadcast::Sender<TransactionBroadcast>,
tx_block: Sender<BlockBroadcast>,

db: Arc<dyn P2pDb>,
Expand Down Expand Up @@ -174,7 +174,7 @@ impl Service {
tx_request_event: Sender<P2pRequestEvent>,
rx_request_event: Receiver<P2pRequestEvent>,
tx_consensus: Sender<ConsensusBroadcast>,
tx_transaction: Sender<TransactionBroadcast>,
tx_transaction: broadcast::Sender<TransactionBroadcast>,
tx_block: Sender<BlockBroadcast>,
) -> Self {
let network_orchestrator = NetworkOrchestrator::new(
Expand Down Expand Up @@ -274,7 +274,7 @@ pub mod tests {

let (tx_request_event, rx_request_event) = tokio::sync::mpsc::channel(100);
let (tx_consensus, _) = tokio::sync::mpsc::channel(100);
let (tx_transaction, _) = tokio::sync::mpsc::channel(100);
let (tx_transaction, _) = tokio::sync::broadcast::channel(100);
let (tx_block, _) = tokio::sync::mpsc::channel(100);

let service = Service::new(
Expand Down
3 changes: 0 additions & 3 deletions fuel-relayer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ impl ServiceBuilder {
config: Default::default(),
}
}
pub fn sender(&mut self) -> &relayer::Sender {
&self.sender
}

pub fn private_key(&mut self, private_key: Vec<u8>) -> &mut Self {
self.private_key = Some(private_key);
Expand Down
2 changes: 2 additions & 0 deletions fuel-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,5 @@ tokio = { version = "1.21", features = ["macros", "rt-multi-thread"] }
metrics = ["fuel-core/rocksdb", "fuel-core/metrics"]
default = ["fuel-core/default", "metrics"]
debug = ["fuel-core-interfaces/debug"]
p2p = ["fuel-core/p2p"]
relayer = ["fuel-core/relayer"]
2 changes: 2 additions & 0 deletions fuel-tests/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ mod node_info;
mod resource;
mod snapshot;
mod tx;
#[cfg(feature = "p2p")]
mod tx_gossip;
Loading