diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a50f626c70..c993c8c8ba 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -206,7 +206,7 @@ jobs: BPF_ALIAS: /coda/0.0.1/29936104443aaf264a7f0192ac64b1c7173198c1ed404c1bcff5e562e05eb7f6-0.0.0.0 strategy: matrix: - test: [p2p_basic_connections, p2p_basic_incoming, p2p_basic_outgoing, p2p_pubsub] + test: [p2p_basic_connections, p2p_basic_incoming, p2p_basic_outgoing, p2p_pubsub, p2p_kad] fail-fast: false services: @@ -295,9 +295,13 @@ jobs: test: - single_node - multi_node + - connection_discovery_ocaml_to_rust_via_seed + - connection_discovery_ocaml_to_rust + - connection_discovery_rust_as_seed + - connection_discovery_rust_to_ocaml_via_seed + - connection_discovery_rust_to_ocaml # - webrtc_single_node # - webrtc_multi_node - # - connection_discovery fail-fast: false services: diff --git a/node/src/snark/snark_effects.rs b/node/src/snark/snark_effects.rs index 23b1ac001d..3a095fa39e 100644 --- a/node/src/snark/snark_effects.rs +++ b/node/src/snark/snark_effects.rs @@ -1,6 +1,6 @@ use snark::work_verify::SnarkWorkVerifyAction; -use crate::{snark_pool::candidate::SnarkPoolCandidateAction, Service, SnarkPoolAction, Store}; +use crate::{snark_pool::candidate::SnarkPoolCandidateAction, Service, Store}; use super::{SnarkAction, SnarkActionWithMeta}; diff --git a/node/testing/src/cluster/runner/mod.rs b/node/testing/src/cluster/runner/mod.rs index 8d4b262b59..3416d73d0e 100644 --- a/node/testing/src/cluster/runner/mod.rs +++ b/node/testing/src/cluster/runner/mod.rs @@ -9,6 +9,7 @@ use node::{event_source::Event, ledger::LedgerService, ActionKind, State}; use rand::{rngs::StdRng, SeedableRng}; use time::OffsetDateTime; +use crate::node::OcamlStep; use crate::{ cluster::{Cluster, ClusterNodeId, ClusterOcamlNodeId}, network_debugger::Debugger, @@ -450,4 +451,15 @@ impl<'a> ClusterRunner<'a> { ) .await } + + pub async fn wait_for_ocaml(&mut self, node_id: ClusterOcamlNodeId) { + self.exec_step(ScenarioStep::Ocaml { + node_id, + step: OcamlStep::WaitReady { + timeout: Duration::from_secs(6 * 60), + }, + }) + .await + .expect("Error waiting for ocaml node"); + } } diff --git a/node/testing/src/node/ocaml/config.rs b/node/testing/src/node/ocaml/config.rs index 1233a5f3f9..7f5db038d3 100644 --- a/node/testing/src/node/ocaml/config.rs +++ b/node/testing/src/node/ocaml/config.rs @@ -15,6 +15,16 @@ pub struct OcamlNodeTestingConfig { pub block_producer: Option, } +impl Default for OcamlNodeTestingConfig { + fn default() -> Self { + Self { + initial_peers: vec![], + daemon_json: DaemonJson::Custom("/var/lib/coda/config_dc6bf78b.json".to_owned()), + block_producer: None, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub enum DaemonJson { // TODO(binier): have presets. diff --git a/node/testing/src/node/ocaml/mod.rs b/node/testing/src/node/ocaml/mod.rs index 4be1edfa0b..b6ac601093 100644 --- a/node/testing/src/node/ocaml/mod.rs +++ b/node/testing/src/node/ocaml/mod.rs @@ -25,7 +25,7 @@ pub struct OcamlNode { temp_dir: temp_dir::TempDir, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub enum OcamlStep { /// Wait till ocaml node is ready. /// @@ -294,18 +294,9 @@ impl OcamlNode { } /// Queries graphql to get chain_id. - pub fn chain_id(&self) -> anyhow::Result { - let res = self.grapql_query("query { daemonStatus { chainId } }")?; - let chain_id = res["data"]["daemonStatus"]["chainId"] - .as_str() - .ok_or_else(|| anyhow::anyhow!("empty chain_id response"))?; - ChainId::from_hex(chain_id).map_err(|e| anyhow::anyhow!("invalid chain_id: {}", e)) - } - - /// Queries graphql to get chain_id. - pub async fn chain_id_async(&self) -> anyhow::Result { + pub async fn chain_id(&self) -> anyhow::Result { let res = self - .grapql_query_async("query { daemonStatus { chainId } }") + .grapql_query("query { daemonStatus { chainId } }") .await?; let chain_id = res["data"]["daemonStatus"]["chainId"] .as_str() @@ -315,21 +306,9 @@ impl OcamlNode { /// Queries graphql to check if ocaml node is synced, /// returning it's best tip hash if yes. - pub fn synced_best_tip(&self) -> anyhow::Result> { - let mut res = self.grapql_query("query { daemonStatus { syncStatus, stateHash } }")?; - let data = &mut res["data"]["daemonStatus"]; - if data["syncStatus"].as_str() == Some("SYNCED") { - Ok(Some(serde_json::from_value(data["stateHash"].take())?)) - } else { - Ok(None) - } - } - - /// Queries graphql to check if ocaml node is synced, - /// returning it's best tip hash if yes. - pub async fn synced_best_tip_async(&self) -> anyhow::Result> { + pub async fn synced_best_tip(&self) -> anyhow::Result> { let mut res = self - .grapql_query_async("query { daemonStatus { syncStatus, stateHash } }") + .grapql_query("query { daemonStatus { syncStatus, stateHash } }") .await?; let data = &mut res["data"]["daemonStatus"]; if data["syncStatus"].as_str() == Some("SYNCED") { @@ -348,7 +327,7 @@ impl OcamlNode { // Only `exec` function should be exposed and instead of this, we // should have a step to query graphql and assert response as a part // of that step. - pub async fn grapql_query_async(&self, query: &str) -> anyhow::Result { + pub async fn grapql_query(&self, query: &str) -> anyhow::Result { let client = reqwest::Client::new(); let response = client .post(self.graphql_addr()) @@ -362,24 +341,6 @@ impl OcamlNode { Ok(response.json().await?) } - // TODO(binier): shouldn't be publically accessible. - // - // Only `exec` function should be exposed and instead of this, we - // should have a step to query graphql and assert response as a part - // of that step. - pub fn grapql_query(&self, query: &str) -> anyhow::Result { - let client = reqwest::blocking::Client::new(); - let response = client - .post(self.graphql_addr()) - .json(&{ - serde_json::json!({ - "query": query - }) - }) - .send()?; - - Ok(response.json()?) - } async fn wait_for_p2p(&self, timeout: Duration) -> anyhow::Result<()> { let port = self.libp2p_port; @@ -408,7 +369,7 @@ impl OcamlNode { loop { interval.tick().await; if self - .synced_best_tip_async() + .synced_best_tip() .await .map_or(false, |tip| tip.is_some()) { diff --git a/node/testing/src/scenarios/multi_node/connection_discovery.rs b/node/testing/src/scenarios/multi_node/connection_discovery.rs index b7d4685394..a0a0754c54 100644 --- a/node/testing/src/scenarios/multi_node/connection_discovery.rs +++ b/node/testing/src/scenarios/multi_node/connection_discovery.rs @@ -1,275 +1,210 @@ use std::time::Duration; -use node::{ - event_source::Event, - p2p::{connection::P2pConnectionState, P2pConnectionEvent, P2pEvent, P2pPeerStatus}, -}; -use tokio::time::Instant; - use crate::{ - node::{DaemonJson, OcamlNodeTestingConfig, OcamlStep, RustNodeTestingConfig}, + cluster::{ClusterNodeId, ClusterOcamlNodeId}, + node::{OcamlNodeTestingConfig, RustNodeTestingConfig}, scenario::{ListenerNode, ScenarioStep}, - scenarios::{ - connection_finalized_event, get_peers_iter, wait_for_connection_established, ClusterRunner, - Driver, PEERS_QUERY, - }, + scenarios::{get_peers_iter, ClusterRunner, RunCfg, PEERS_QUERY}, +}; +use anyhow::Context; +use node::{ + p2p::{identify::P2pIdentifyAction, peer::P2pPeerAction, PeerId}, + ActionKind, P2pAction, }; +use tokio::time::sleep; /// Ensure that Rust node can pass information about peers when used as a seed node. +/// 1. Create rust seed node and wait for it to be ready +/// 2. Create 2 Ocaml nodes with rust seed node as initial peer +/// 3. Check that Ocaml nodes know each other via rust seed node #[derive(documented::Documented, Default, Clone, Copy)] pub struct RustNodeAsSeed; impl RustNodeAsSeed { pub async fn run(self, mut runner: ClusterRunner<'_>) { + std::env::set_var("OPENMINA_DISCOVERY_FILTER_ADDR", "false"); let rust_node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default()); let rust_node_dial_addr = runner.node(rust_node_id).unwrap().dial_addr(); + let rust_peer_id = *rust_node_dial_addr.peer_id(); + wait_for_node_ready(&mut runner, rust_node_id).await; let ocaml_node_config = OcamlNodeTestingConfig { initial_peers: vec![rust_node_dial_addr], - daemon_json: DaemonJson::Custom("/var/lib/coda/config_dc6bf78b.json".to_owned()), - block_producer: None, + ..Default::default() }; let ocaml_node0 = runner.add_ocaml_node(ocaml_node_config.clone()); let ocaml_peer_id0 = runner.ocaml_node(ocaml_node0).unwrap().peer_id(); + // this is needed, to make sure that `ocaml_node0` connects before `ocaml_node1` + sleep(Duration::from_secs(30)).await; + let ocaml_node1 = runner.add_ocaml_node(ocaml_node_config.clone()); let ocaml_peer_id1 = runner.ocaml_node(ocaml_node1).unwrap().peer_id(); - let mut peers = vec![ocaml_peer_id0, ocaml_peer_id1]; - let mut duration = Duration::from_secs(8 * 60); - let mut driver = Driver::new(runner); - while !peers.is_empty() { - // wait for ocaml node to connect - let connected = - wait_for_connection_established(&mut driver, duration, |node_id, peer_id: &_| { - if rust_node_id == node_id { - peers.retain(|peer| peer != peer_id); - true - } else { - false - } - }) + wait_for_ready_connection( + &mut runner, + rust_node_id, + ocaml_peer_id0, + true, + Some(Duration::from_secs(500)), + ) + .await; + wait_for_ready_connection( + &mut runner, + rust_node_id, + ocaml_peer_id1, + true, + Some(Duration::from_secs(300)), + ) + .await; + + let _ = runner + .run(RunCfg::default().timeout(Duration::from_secs(120))) + .await; + + let ocaml_node0_check = + check_ocaml_peers(&mut runner, ocaml_node0, [rust_peer_id, ocaml_peer_id1]) .await - .unwrap(); - assert!(connected); - // TODO fix the assertion - // assert!(matches!( - // &state.p2p.peers.get(&ocaml_peer).unwrap().status, - // P2pPeerStatus::Ready(ready) if ready.is_incoming - // )); - duration = Duration::from_secs(60); - } - - let timeout = Instant::now() + Duration::from_secs(60); - let mut node0_has_node1 = false; - let mut node1_has_node0 = false; - while !node0_has_node1 && !node1_has_node0 && Instant::now() < timeout { - let node0_peers = driver - .inner() - .ocaml_node(ocaml_node0) - .unwrap() - .grapql_query(PEERS_QUERY) - .expect("peers graphql query"); - println!("{}", serde_json::to_string_pretty(&node0_peers).unwrap()); - node0_has_node1 = get_peers_iter(&node0_peers) - .unwrap() - .any(|peer| peer.unwrap().2 == ocaml_peer_id1.to_string()); - - let node1_peers = driver - .inner() - .ocaml_node(ocaml_node1) - .unwrap() - .grapql_query(PEERS_QUERY) - .expect("peers graphql query"); - println!("{}", serde_json::to_string_pretty(&node1_peers).unwrap()); - node1_has_node0 = get_peers_iter(&node1_peers) - .unwrap() - .any(|peer| peer.unwrap().2 == ocaml_peer_id0.to_string()); - - tokio::time::sleep(Duration::from_secs(10)).await; - } + .unwrap_or_default(); + + let ocaml_node1_check = + check_ocaml_peers(&mut runner, ocaml_node1, [rust_peer_id, ocaml_peer_id0]) + .await + .unwrap_or_default(); + + assert!(ocaml_node0_check, "OCaml node 0 doesn't have valid peers"); + assert!(ocaml_node1_check, "OCaml node 1 doesn't have valid peers"); + + let has_peer_in_routing_table = + check_kademlia_entries(&mut runner, rust_node_id, [ocaml_peer_id0, ocaml_peer_id1]) + .unwrap_or_default(); assert!( - node0_has_node1, - "ocaml node0 should have node1 as its peers" - ); - assert!( - node1_has_node0, - "ocaml node1 should have node0 as its peers" + has_peer_in_routing_table, + "Peers not found in rust node's routing table" ); - - // TODO: check known peers - // let state = driver.inner().node(rust_node_id).unwrap().state(); - // assert!( - // state.p2p.kademlia.known_peers.contains_key(&ocaml_peer_id0), - // "kademlia in rust seed statemachine should know ocaml node0" - // ); - // assert!( - // state.p2p.kademlia.known_peers.contains_key(&ocaml_peer_id1), - // "kademlia in rust seed statemachine should know ocaml node1" - // ); } } /// Test Rust node peer discovery when OCaml node connects to it +/// 1. Create rust node and wait for it to be ready +/// 2. Create OCaml node with rust node as initial peer +/// 3. Check that OCaml node connects to rust node #[derive(documented::Documented, Default, Clone, Copy)] pub struct OCamlToRust; impl OCamlToRust { pub async fn run(self, mut runner: ClusterRunner<'_>) { + std::env::set_var("OPENMINA_DISCOVERY_FILTER_ADDR", "false"); let rust_node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default()); let rust_node_dial_addr = runner.node(rust_node_id).unwrap().dial_addr(); + let rust_peer_id = *rust_node_dial_addr.peer_id(); + wait_for_node_ready(&mut runner, rust_node_id).await; let ocaml_node_config = OcamlNodeTestingConfig { initial_peers: vec![rust_node_dial_addr], - daemon_json: DaemonJson::Custom("/var/lib/coda/config_dc6bf78b.json".to_owned()), - block_producer: None, + ..Default::default() }; let ocaml_node = runner.add_ocaml_node(ocaml_node_config.clone()); let ocaml_peer_id = runner.ocaml_node(ocaml_node).unwrap().peer_id(); - let mut driver = Driver::new(runner); - - // wait for ocaml node to connect - let connected = driver - .wait_for( - Duration::from_secs(5 * 60), - connection_finalized_event(|_, peer| peer == &ocaml_peer_id), - ) + wait_for_ready_connection( + &mut runner, + rust_node_id, + ocaml_peer_id, + true, + Some(Duration::from_secs(300)), + ) + .await; + + wait_for_identify( + &mut runner, + rust_node_id, + ocaml_peer_id, + "github.com/codaprotocol/coda/tree/master/src/app/libp2p_helper", + ) + .await; + + let ocaml_check = check_ocaml_peers(&mut runner, ocaml_node, [rust_peer_id]) .await - .unwrap() - .expect("expected connected event"); - // execute it - let state = driver.exec_even_step(connected).await.unwrap().unwrap(); - // check that now there is an outgoing connection to the ocaml peer - assert!(matches!( - &state.p2p.get_peer(&ocaml_peer_id).unwrap().status, - P2pPeerStatus::Ready(ready) if ready.is_incoming - )); - - // TODO: wait for identify message - // let identify = driver - // .wait_for( - // Duration::from_secs(5 * 60), - // identify_event(ocaml_peer_id.clone().into()), - // ) - // .await - // .unwrap() - // .expect("expected connected event"); - // // execute it - // let state = driver.exec_even_step(identify).await.unwrap().unwrap(); - // TODO: check that the peer address is added to kademlia + .expect("Error querying graphql"); + + assert!(ocaml_check, "OCaml node doesn't have rust as peer"); } } /// Tests Rust node peer discovery when it connects to OCaml node +/// 1. Create Rust node and wait for it to be ready +/// 2. Create OCaml node and wait for it to be ready +/// 3. Connect rust node to Ocaml node +/// 4. Check that it is connected +/// 5. Check for kademlia and identify #[derive(documented::Documented, Default, Clone, Copy)] pub struct RustToOCaml; impl RustToOCaml { pub async fn run(self, mut runner: ClusterRunner<'_>) { + std::env::set_var("OPENMINA_DISCOVERY_FILTER_ADDR", "false"); let rust_node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default()); + let rust_peer_id = runner.node(rust_node_id).expect("Node not found").peer_id(); + wait_for_node_ready(&mut runner, rust_node_id).await; - let ocaml_seed_config = OcamlNodeTestingConfig { - initial_peers: Vec::new(), - daemon_json: DaemonJson::Custom("/var/lib/coda/config_dc6bf78b.json".to_owned()), - block_producer: None, - }; + let ocaml_seed_config = OcamlNodeTestingConfig::default(); let seed_node = runner.add_ocaml_node(ocaml_seed_config); let seed_peer_id = runner.ocaml_node(seed_node).unwrap().peer_id(); - runner - .exec_step(ScenarioStep::Ocaml { - node_id: seed_node, - step: OcamlStep::WaitReady { - timeout: Duration::from_secs(5 * 60), - }, - }) - .await - .unwrap(); - - let mut driver = Driver::new(runner); + runner.wait_for_ocaml(seed_node).await; - driver + runner .exec_step(ScenarioStep::ConnectNodes { dialer: rust_node_id, listener: ListenerNode::Ocaml(seed_node), }) .await - .unwrap(); - - // wait for conection finalize event - let connected = driver - .wait_for( - Duration::from_secs(5), - connection_finalized_event(|_, peer| peer == &seed_peer_id), - ) + .expect("Error connecting nodes"); + + wait_for_ready_connection(&mut runner, rust_node_id, seed_peer_id, false, None).await; + wait_for_identify( + &mut runner, + rust_node_id, + seed_peer_id, + "github.com/codaprotocol/coda/tree/master/src/app/libp2p_helper", + ) + .await; + + let ocaml_has_rust_peer = check_ocaml_peers(&mut runner, seed_node, [rust_peer_id]) .await - .unwrap() - .expect("expected connected event"); - // execute it - let state = driver.exec_even_step(connected).await.unwrap().unwrap(); - // check that now there is an outgoing connection to the ocaml peer - assert!(matches!( - &state.p2p.get_peer(&seed_peer_id).unwrap().status, - P2pPeerStatus::Ready(ready) if !ready.is_incoming - )); - - // wait for kademlia to add the ocaml peer - // let kad_add_rounte = driver.wait_for(Duration::from_secs(1), |_, event, _| { - // matches!(event, Event::P2p(P2pEvent::Discovery(P2pDiscoveryEvent::AddRoute(peer, addresses))) - // if peer == &seed_peer_id && addresses.iter().any(match_addr_with_port_and_peer_id(8302, seed_peer_id.clone().into())) - // ) - // }).await.unwrap().expect("expected add route event"); - // let state = driver - // .exec_even_step(kad_add_rounte) - // .await - // .unwrap() - // .unwrap(); - // assert!( - // state - // .p2p - // .kademlia - // .routes - // .get(&seed_peer_id.clone().into()) - // .map_or(false, |l| !l.is_empty()), - // "kademlia should know ocaml node's addresses" - // ); + .unwrap_or_default(); + assert!(ocaml_has_rust_peer, "Ocaml doesn't have rust node"); } } /// Tests Rust node peer discovery when OCaml node is connected to it via an OCaml seed node. +/// 1. Create Rust node and wait for it to be ready +/// 2. Create OCaml seed node and wait for it to be ready +/// 3. Connect rust node to OCaml seed node +/// 4. Create OCaml node and connect to OCaml seed node +/// 5. Check that OCaml node connects to rust node via seed #[derive(documented::Documented, Default, Clone, Copy)] pub struct OCamlToRustViaSeed; impl OCamlToRustViaSeed { pub async fn run(self, mut runner: ClusterRunner<'_>) { + std::env::set_var("OPENMINA_DISCOVERY_FILTER_ADDR", "false"); let rust_node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default()); + wait_for_node_ready(&mut runner, rust_node_id).await; - let ocaml_seed_config = OcamlNodeTestingConfig { - initial_peers: Vec::new(), - daemon_json: DaemonJson::Custom("/var/lib/coda/config_dc6bf78b.json".to_owned()), - block_producer: None, - }; - + let ocaml_seed_config = OcamlNodeTestingConfig::default(); let seed_node = runner.add_ocaml_node(ocaml_seed_config.clone()); let (seed_peer_id, seed_addr) = runner .ocaml_node(seed_node) .map(|node| (node.peer_id(), node.dial_addr())) .unwrap(); - runner - .exec_step(ScenarioStep::Ocaml { - node_id: seed_node, - step: OcamlStep::WaitReady { - timeout: Duration::from_secs(5 * 60), - }, - }) - .await - .unwrap(); + runner.wait_for_ocaml(seed_node).await; runner .exec_step(ScenarioStep::ConnectNodes { @@ -279,122 +214,54 @@ impl OCamlToRustViaSeed { .await .unwrap(); - let mut driver = Driver::new(runner); - - let connected = driver - .wait_for( - Duration::from_secs(5), - connection_finalized_event(|_, peer| peer == &seed_peer_id), - ) - .await - .unwrap() - .expect("expected connected event"); - - let state = driver.exec_even_step(connected).await.unwrap().unwrap(); - assert!(matches!( - &state.p2p.get_peer(&seed_peer_id).unwrap().status, - P2pPeerStatus::Ready(ready) if !ready.is_incoming - )); + wait_for_ready_connection(&mut runner, rust_node_id, seed_peer_id, false, None).await; - let ocaml_node = driver.inner_mut().add_ocaml_node(OcamlNodeTestingConfig { + let ocaml_node = runner.add_ocaml_node(OcamlNodeTestingConfig { initial_peers: vec![seed_addr], ..ocaml_seed_config }); - let ocaml_peer_id = driver.inner().ocaml_node(ocaml_node).unwrap().peer_id(); - - driver - .exec_step(ScenarioStep::ManualEvent { - node_id: rust_node_id, - event: Box::new(Event::P2p(node::p2p::P2pEvent::Connection( - P2pConnectionEvent::Closed(seed_peer_id), - ))), - }) - .await - .unwrap(); - assert!(matches!( - &driver - .inner() - .node(rust_node_id) - .unwrap() - .state() - .p2p - .get_peer(&seed_peer_id) - .unwrap() - .status, - P2pPeerStatus::Disconnected { .. } - )); - - let connected = driver - .wait_for(Duration::from_secs(5 * 60), |_, event, _| { - matches!( - event, - Event::P2p(node::p2p::P2pEvent::Connection( - P2pConnectionEvent::Finalized(peer, res), - )) - if peer == &ocaml_peer_id && res.is_ok() - ) - }) - .await - .unwrap() - .expect("expected connected event"); - - let state = driver.exec_even_step(connected).await.unwrap().unwrap(); - assert!(matches!( - &state.p2p.get_peer(&ocaml_peer_id).unwrap().status, - P2pPeerStatus::Ready(ready) if ready.is_incoming - )); + let ocaml_peer_id = runner.ocaml_node(ocaml_node).unwrap().peer_id(); + + runner.wait_for_ocaml(ocaml_node).await; + wait_for_ready_connection(&mut runner, rust_node_id, ocaml_peer_id, true, None).await; } } /// Tests Rust node peer discovery when it connects to OCaml node via an OCaml seed node. +/// 1. Create rust node and wait for it to be ready +/// 2. Create OCaml seed node and wait for it to be ready +/// 3. Create OCaml node with OCaml seed as initial peer +/// 4. Wait for OCaml node to be ready +/// 5. Connect rust node to OCaml seed node +/// 6. Check that rust node connects to OCaml node via OCaml seed node #[derive(documented::Documented, Default, Clone, Copy)] pub struct RustToOCamlViaSeed; impl RustToOCamlViaSeed { pub async fn run(self, mut runner: ClusterRunner<'_>) { + std::env::set_var("OPENMINA_DISCOVERY_FILTER_ADDR", "false"); let rust_node_id = runner.add_rust_node(RustNodeTestingConfig::devnet_default()); + wait_for_node_ready(&mut runner, rust_node_id).await; - let ocaml_seed_config = OcamlNodeTestingConfig { - initial_peers: Vec::new(), - daemon_json: DaemonJson::Custom("/var/lib/coda/config_dc6bf78b.json".to_owned()), - block_producer: None, - }; + let ocaml_seed_config = OcamlNodeTestingConfig::default(); let seed_node = runner.add_ocaml_node(ocaml_seed_config.clone()); + runner.wait_for_ocaml(seed_node).await; + let (seed_peer_id, seed_addr) = runner .ocaml_node(seed_node) .map(|node| (node.peer_id(), node.dial_addr())) .unwrap(); - tokio::time::sleep(Duration::from_secs(60)).await; - let ocaml_node = runner.add_ocaml_node(OcamlNodeTestingConfig { initial_peers: vec![seed_addr], ..ocaml_seed_config }); + let ocaml_peer_id = runner.ocaml_node(ocaml_node).unwrap().peer_id(); + runner.wait_for_ocaml(ocaml_node).await; - let wait_step = OcamlStep::WaitReady { - timeout: Duration::from_secs(60), - }; - runner - .exec_step(ScenarioStep::Ocaml { - node_id: seed_node, - step: wait_step.clone(), - }) - .await - .unwrap(); runner - .exec_step(ScenarioStep::Ocaml { - node_id: ocaml_node, - step: wait_step, - }) - .await - .unwrap(); - - let mut driver = Driver::new(runner); - - driver .exec_step(ScenarioStep::ConnectNodes { dialer: rust_node_id, listener: ListenerNode::Ocaml(seed_node), @@ -402,83 +269,121 @@ impl RustToOCamlViaSeed { .await .unwrap(); - let connected = driver - .wait_for( - Duration::from_secs(5), - connection_finalized_event(|_, peer| peer == &seed_peer_id), - ) - .await - .unwrap() - .expect("expected connected event"); - - let state = driver.exec_even_step(connected).await.unwrap().unwrap(); - assert!(matches!( - &state.p2p.get_peer(&seed_peer_id).unwrap().status, - P2pPeerStatus::Ready(ready) if !ready.is_incoming - )); - - let timeout = std::time::Instant::now() + Duration::from_secs(3 * 60); - let mut found = false; - while !found && std::time::Instant::now() < timeout { - let mut steps = Vec::new(); - for (node_id, state, events) in driver.inner_mut().pending_events(true) { - for (_, event) in events { - match event { - Event::P2p(P2pEvent::Connection(P2pConnectionEvent::Finalized( - peer, - Ok(()), - ))) if peer == &ocaml_peer_id => { - if let Some(peer_state) = &state.p2p.get_peer(peer) { - let status = &peer_state.status; - if let P2pPeerStatus::Connecting(P2pConnectionState::Incoming(..)) = - status - { - steps.push(ScenarioStep::ManualEvent { - node_id, - event: Box::new(Event::P2p(P2pEvent::Connection( - P2pConnectionEvent::Closed(*peer), - ))), - }); - } else { - steps.push(ScenarioStep::Event { - node_id, - event: event.to_string(), - }); - found = true; - } - } - } - _ => { - steps.push(ScenarioStep::Event { - node_id, - event: event.to_string(), - }); - } - } - } - } - for step in steps { - driver.exec_step(step).await.unwrap(); - } - if !found { - driver.idle(Duration::from_millis(10)).await.unwrap(); - } - } - - let _p2p = &driver.inner().node(rust_node_id).unwrap().state().p2p; - - // assert!( - // p2p.kademlia.known_peers.contains_key(&seed_peer_id), - // "should know seed node" - // ); - // assert!( - // p2p.kademlia.known_peers.contains_key(&ocaml_peer_id), - // "should know ocaml node" - // ); - - // assert!(matches!( - // &p2p.peers.get(&ocaml_peer_id).expect("ocaml node should be connected").status, - // P2pPeerStatus::Ready(ready) if !ready.is_incoming - // )); + wait_for_ready_connection(&mut runner, rust_node_id, seed_peer_id, false, None).await; + wait_for_ready_connection(&mut runner, rust_node_id, ocaml_peer_id, false, None).await; } } + +pub async fn wait_for_node_ready(runner: &mut ClusterRunner<'_>, node_id: ClusterNodeId) { + runner + .run(RunCfg::default().action_handler(move |id, _, _, action| { + node_id == id && matches!(action.action().kind(), ActionKind::P2pInitializeInitialize) + })) + .await + .expect("Node not ready") +} + +pub async fn wait_for_identify( + runner: &mut ClusterRunner<'_>, + node_id: ClusterNodeId, + connecting_peer_id: PeerId, + agent_version: &str, +) { + let agent_version = agent_version.to_owned(); + runner + .run( + RunCfg::default() + .action_handler(move |id, _, _, action| { + id == node_id + && matches!( + action.action(), + node::Action::P2p(P2pAction::Identify(P2pIdentifyAction::UpdatePeerInformation { + peer_id, + info + })) if peer_id == &connecting_peer_id && info.agent_version == Some(agent_version.to_string()) + ) + }), + ) + .await + .expect("Identify not exchanged"); +} + +async fn wait_for_ready_connection( + runner: &mut ClusterRunner<'_>, + node_id: ClusterNodeId, + connecting_peer_id: PeerId, + incoming_: bool, + duration: Option, +) { + runner + .run( + RunCfg::default() + .timeout(duration.unwrap_or(Duration::from_secs(60))) + .action_handler(move |id, _, _, action| { + id == node_id + && matches!( + action.action(), + &node::Action::P2p(P2pAction::Peer(P2pPeerAction::Ready { + peer_id, + incoming + })) if peer_id == connecting_peer_id && incoming == incoming_ + ) + }), + ) + .await + .expect("Nodes not connected"); +} + +async fn check_ocaml_peers( + runner: &mut ClusterRunner<'_>, + node_id: ClusterOcamlNodeId, + peer_ids: A, +) -> anyhow::Result +where + A: IntoIterator, +{ + let data = runner + .ocaml_node(node_id) + .expect("OCaml node not found") + .grapql_query(PEERS_QUERY) + .await?; + + let peers = get_peers_iter(&data) + .with_context(|| "Failed to convert graphql response")? + .flatten() + .map(|peer| peer.2.to_owned()) + .collect::>(); + + Ok(peer_ids + .into_iter() + .all(|peer_id| peers.contains(&peer_id.to_libp2p_string()))) +} + +pub fn check_kademlia_entries( + runner: &mut ClusterRunner<'_>, + node_id: ClusterNodeId, + peer_ids: A, +) -> anyhow::Result +where + A: IntoIterator, +{ + let table = &runner + .node(node_id) + .with_context(|| "Node not found")? + .state() + .p2p + .ready() + .with_context(|| "P2p state not ready")? + .network + .scheduler + .discovery_state() + .with_context(|| "Discovery state not ready")? + .routing_table; + + Ok(peer_ids.into_iter().all(|peer_id| { + table + .look_up(&peer_id.into()) + .map(|entry| entry.peer_id == peer_id) + .unwrap_or_default() + })) +} diff --git a/node/testing/src/scenarios/p2p/kademlia.rs b/node/testing/src/scenarios/p2p/kademlia.rs index f8c18d7ca2..70793fb1cc 100644 --- a/node/testing/src/scenarios/p2p/kademlia.rs +++ b/node/testing/src/scenarios/p2p/kademlia.rs @@ -1,293 +1,83 @@ -use std::{net::Ipv4Addr, time::Duration}; - -use libp2p::{ - futures::StreamExt, - identity::Keypair, - pnet::PreSharedKey, - swarm::{NetworkBehaviour, SwarmEvent}, - Transport, -}; -use multiaddr::{multiaddr, Multiaddr}; -use node::p2p::{ - connection::outgoing::{P2pConnectionOutgoingInitLibp2pOpts, P2pConnectionOutgoingInitOpts}, - identity::SecretKey, - webrtc::Host, -}; +use node::ActionKind; use crate::{ + cluster::ClusterNodeId, node::RustNodeTestingConfig, scenario::ListenerNode, - scenarios::{trace_steps, wait_for_nodes_listening_on_localhost, ClusterRunner, Driver}, + scenarios::{ + multi_node::connection_discovery::{ + check_kademlia_entries, wait_for_identify, wait_for_node_ready, + }, + ClusterRunner, RunCfg, + }, }; -const LOCALHOST: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1); - -/// Incoming FIND_NODE request test. -#[derive(documented::Documented, Default, Clone, Copy)] -pub struct IncomingFindNode; - -impl IncomingFindNode { - pub async fn run(self, runner: ClusterRunner<'_>) { - let mut driver = Driver::new(runner); - let (node1, peer_id1) = driver.add_rust_node( - RustNodeTestingConfig::devnet_default().initial_peers( - (0..100) - .map(|n| { - let peer_id = SecretKey::rand().public_key().peer_id(); - let port = 12000 + n; - let host = Host::Ipv4([127, 0, 0, 1].into()); - ListenerNode::Custom(P2pConnectionOutgoingInitOpts::LibP2P( - P2pConnectionOutgoingInitLibp2pOpts { - peer_id, - host, - port, - }, - )) - }) - .collect(), - ), - ); - - let addr = format!( - "/ip4/127.0.0.1/tcp/{}/p2p/{}", - driver - .inner() - .node(node1) - .unwrap() - .state() - .p2p - .config() - .libp2p_port - .unwrap(), - peer_id1.to_libp2p_string(), - ) - .parse::() - .unwrap(); - - assert!( - wait_for_nodes_listening_on_localhost(&mut driver, Duration::from_secs(30), [node1]) - .await - .unwrap(), - "node should be listening" - ); - - let identity_key = - Keypair::ed25519_from_bytes([0xba; 32]).expect("secret key bytes must be valid"); - - let handle = tokio::spawn(async move { - let mut fake_peer = fake_kad_peer(identity_key, None).unwrap(); - fake_peer.dial(addr.clone()).unwrap(); - fake_peer - .behaviour_mut() - .kademlia - .add_address(&peer_id1.into(), addr); - loop { - let next = fake_peer.next().await.unwrap(); - println!("<<< {next:?}"); - match next { - SwarmEvent::ConnectionEstablished { peer_id: _, .. } => { - fake_peer.behaviour_mut().kademlia.bootstrap().unwrap(); - } - SwarmEvent::Behaviour(Event::Kademlia( - libp2p::kad::Event::OutboundQueryProgressed { stats, .. }, - )) => { - return (stats.num_successes() >= 1) - .then_some(()) - .ok_or(format!("incorrect query stats: {stats:?}")); - } - _ => {} - } - } - }); - - tokio::select! { - res = trace_steps(driver.inner_mut()) => { - panic!("statemachine finished unexpectedly: {res:?}"); - } - res = tokio::time::timeout(Duration::from_secs(20), handle) => { - let res = res.expect("timeout waiting for kad query result"); - if let Err(err) = res { - panic!("error from peer: {err}"); - } - } - } - } -} - /// Kademlia bootstrap test. +/// 1. Create seed node and wait for it to be ready +/// 2. Create NUM nodes that will connect to seed node but not perform further discovery +/// 3. Check that seed nodes has all nodes in it's routing table +/// 4. Create new node with only seed node as peer +/// 5. Wait for new node to bootstrap +/// 6. Check that new node has all peers in it table #[derive(documented::Documented, Default, Clone, Copy)] pub struct KademliaBootstrap; impl KademliaBootstrap { - pub async fn run(self, runner: ClusterRunner<'_>) { - const NUM: u8 = 10; - let identity_key = - Keypair::ed25519_from_bytes([0xba; 32]).expect("secret key bytes must be valid"); - let peer_id = identity_key.public().to_peer_id(); - - let mut fake_peer = fake_kad_peer(identity_key, Some(13000)).unwrap(); - - for n in 1..NUM + 1 { - let mut bytes = [0; 32]; - bytes[0] = n; - let peer_id = SecretKey::from_bytes(bytes).public_key().peer_id(); - fake_peer.behaviour_mut().kademlia.add_address( - &peer_id.into(), - multiaddr!(Ip4(LOCALHOST), Tcp(12000 + (n as u16))), - ); + pub async fn run(self, mut runner: ClusterRunner<'_>) { + std::env::set_var("OPENMINA_DISCOVERY_FILTER_ADDR", "false"); + const NUM: u8 = 16; + + let seed_node = runner.add_rust_node(RustNodeTestingConfig::devnet_default()); + wait_for_node_ready(&mut runner, seed_node).await; + let mut nodes = vec![]; + + let config = RustNodeTestingConfig { + initial_peers: vec![ListenerNode::Rust(seed_node)], + ..RustNodeTestingConfig::devnet_default() + }; + std::env::set_var("OPENMINA_DISCOVERY_FILTER_ADDR", "true"); + for _ in 0..NUM { + let node_id = runner.add_rust_node(config.clone()); + let peer_id = runner.node(node_id).expect("Node not found").peer_id(); + + nodes.push((node_id, peer_id)); + + wait_for_bootstrap(&mut runner, node_id).await; + wait_for_identify(&mut runner, seed_node, peer_id, "openmina").await; } - let mut driver = Driver::new(runner); - - let (node1, _peer_id1) = - driver.add_rust_node(RustNodeTestingConfig::devnet_default().initial_peers( - FromIterator::from_iter([ListenerNode::Custom( - P2pConnectionOutgoingInitOpts::LibP2P(P2pConnectionOutgoingInitLibp2pOpts { - peer_id: peer_id.into(), - host: Host::Ipv4(LOCALHOST), - port: 13000, - }), - )]), - )); - + let peer_ids = nodes.iter().map(|node| node.1); + let seed_has_peers = + check_kademlia_entries(&mut runner, seed_node, peer_ids.clone()).unwrap_or_default(); assert!( - wait_for_nodes_listening_on_localhost(&mut driver, Duration::from_secs(30), [node1]) - .await - .unwrap(), - "node should be listening" + seed_has_peers, + "Seed doesn't have all peers in it's routing table" ); - // wait for listener to be ready - tokio::time::timeout( - Duration::from_secs(1), - (&mut fake_peer) - .any(|event| async move { matches!(event, SwarmEvent::NewListenAddr { .. }) }), - ) - .await - .expect("should be listening"); - - let handle = tokio::spawn(async move { - loop { - let next = fake_peer.next().await.unwrap(); - println!("<<< {next:?}"); - match next { - SwarmEvent::ConnectionEstablished { .. } => {} - SwarmEvent::Behaviour(Event::Kademlia( - libp2p::kad::Event::InboundRequest { .. }, - )) => { - // return Result::<_, String>::Ok(()); - } - _ => {} - } - } - }); + std::env::set_var("OPENMINA_DISCOVERY_FILTER_ADDR", "true"); + let new_node = runner.add_rust_node(config); + let new_node_peer_id = runner.node(new_node).expect("Not note found").peer_id(); - tokio::select! { - res = trace_steps(driver.inner_mut()) => { - panic!("statemachine finished unexpectedly: {res:?}"); - } - res = tokio::time::timeout(Duration::from_secs(20000), handle) => { - let res = res.expect("timeout waiting for kad query result"); - if let Err(err) = res { - panic!("error from peer: {err}"); - } - } - } + wait_for_bootstrap(&mut runner, new_node).await; + wait_for_identify(&mut runner, seed_node, new_node_peer_id, "openmina").await; + let new_has_peers = + check_kademlia_entries(&mut runner, seed_node, peer_ids).unwrap_or_default(); + assert!( + new_has_peers, + "Node doesn't have all peers in it's routing table" + ); } } -fn fake_kad_peer( - identity_key: Keypair, - port: Option, -) -> anyhow::Result> { - let psk = PreSharedKey::new(openmina_core::DEVNET_CHAIN_ID.preshared_key()); - let _identify = libp2p::identify::Behaviour::new(libp2p::identify::Config::new( - "ipfs/0.1.0".to_string(), - identity_key.public(), - )); - - let peer_id = identity_key.public().to_peer_id(); - println!("======== peer_id: {peer_id}"); - println!( - "======== peer_id bytes: {}", - hex::encode(peer_id.to_bytes()) - ); - let kad_config = { - let mut c = libp2p::kad::Config::default(); - c.set_protocol_names(vec![libp2p::StreamProtocol::new("/coda/kad/1.0.0")]); - c - }; - let mut kademlia = libp2p::kad::Behaviour::with_config( - peer_id, - libp2p::kad::store::MemoryStore::new(peer_id), - kad_config, - ); - - if port.is_some() { - kademlia.set_mode(Some(libp2p::kad::Mode::Server)); - } - - let behaviour = Behaviour { - // identify, - kademlia, - }; - - let swarm = libp2p::SwarmBuilder::with_existing_identity(identity_key) - .with_tokio() - .with_other_transport(|key| { - let noise_config = libp2p::noise::Config::new(key).unwrap(); - let mut yamux_config = libp2p::yamux::Config::default(); - - yamux_config.set_protocol_name("/coda/yamux/1.0.0"); - - let mut base_transport = libp2p::tcp::tokio::Transport::new( - libp2p::tcp::Config::default() - .nodelay(true) - .port_reuse(true), - ); - - if let Some(port) = port { - base_transport - .listen_on( - libp2p::core::transport::ListenerId::next(), - multiaddr!(Ip4([127, 0, 0, 1]), Tcp(port)), - ) - .expect("listen"); - } - - base_transport - .and_then(move |socket, _| libp2p::pnet::PnetConfig::new(psk).handshake(socket)) - .upgrade(libp2p::core::upgrade::Version::V1) - .authenticate(noise_config) - .multiplex(yamux_config) - .timeout(Duration::from_secs(60)) - })? - .with_dns()? - .with_behaviour(|_| behaviour)? - .with_swarm_config(|config| { - config.with_idle_connection_timeout(Duration::from_millis(1000)) - }) - .build(); - - //swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server)); - - Ok(swarm) -} - -#[derive(NetworkBehaviour)] -#[behaviour(to_swarm = "Event")] -pub struct Behaviour { - // pub gossipsub: gossipsub::Behaviour, - // pub rpc: RpcBehaviour, - // pub identify: libp2p::identify::Behaviour, - pub kademlia: libp2p::kad::Behaviour, -} - -#[derive(Debug, derive_more::From)] -pub enum Event { - // Identify(IdentifyEvent), - // Gossipsub(gossipsub::Event), - // Rpc((PeerId, RpcEvent)), - // Identify(libp2p::identify::Event), - Kademlia(libp2p::kad::Event), +async fn wait_for_bootstrap(runner: &mut ClusterRunner<'_>, node_id: ClusterNodeId) { + runner + .run(RunCfg::default().action_handler(move |node, _, _, action| { + node == node_id + && matches!( + action.action().kind(), + ActionKind::P2pNetworkKademliaBootstrapFinished + ) + })) + .await + .expect("Node failed to bootstrap") } diff --git a/node/testing/tests/connection_discovery.rs b/node/testing/tests/connection_discovery.rs deleted file mode 100644 index 9d9906193e..0000000000 --- a/node/testing/tests/connection_discovery.rs +++ /dev/null @@ -1,21 +0,0 @@ -#![cfg(all(not(feature = "p2p-webrtc"), feature = "p2p-libp2p"))] - -use openmina_node_testing::scenarios::multi_node::connection_discovery::{ - OCamlToRust, OCamlToRustViaSeed, RustNodeAsSeed, RustToOCaml, RustToOCamlViaSeed, -}; - -mod common; - -scenario_test!(rust_to_ocaml, RustToOCaml, RustToOCaml); -scenario_test!(ocaml_to_rust, OCamlToRust, OCamlToRust); -scenario_test!( - rust_to_ocaml_via_seed, - RustToOCamlViaSeed, - RustToOCamlViaSeed -); -scenario_test!( - ocaml_to_rust_via_seed, - OCamlToRustViaSeed, - OCamlToRustViaSeed -); -scenario_test!(rust_as_seed, RustNodeAsSeed, RustNodeAsSeed); diff --git a/node/testing/tests/connection_discovery_ocaml_to_rust.rs b/node/testing/tests/connection_discovery_ocaml_to_rust.rs new file mode 100644 index 0000000000..0481f3eeb4 --- /dev/null +++ b/node/testing/tests/connection_discovery_ocaml_to_rust.rs @@ -0,0 +1,7 @@ +#![cfg(all(not(feature = "p2p-webrtc"), feature = "p2p-libp2p"))] + +use openmina_node_testing::scenarios::multi_node::connection_discovery::OCamlToRust; + +mod common; + +scenario_test!(ocaml_to_rust, OCamlToRust, OCamlToRust); diff --git a/node/testing/tests/connection_discovery_ocaml_to_rust_via_seed.rs b/node/testing/tests/connection_discovery_ocaml_to_rust_via_seed.rs new file mode 100644 index 0000000000..e7d197c110 --- /dev/null +++ b/node/testing/tests/connection_discovery_ocaml_to_rust_via_seed.rs @@ -0,0 +1,11 @@ +#![cfg(all(not(feature = "p2p-webrtc"), feature = "p2p-libp2p"))] + +use openmina_node_testing::scenarios::multi_node::connection_discovery::OCamlToRustViaSeed; + +mod common; + +scenario_test!( + ocaml_to_rust_via_seed, + OCamlToRustViaSeed, + OCamlToRustViaSeed +); diff --git a/node/testing/tests/connection_discovery_rust_as_seed.rs b/node/testing/tests/connection_discovery_rust_as_seed.rs new file mode 100644 index 0000000000..cd217861e6 --- /dev/null +++ b/node/testing/tests/connection_discovery_rust_as_seed.rs @@ -0,0 +1,7 @@ +#![cfg(all(not(feature = "p2p-webrtc"), feature = "p2p-libp2p"))] + +use openmina_node_testing::scenarios::multi_node::connection_discovery::RustNodeAsSeed; + +mod common; + +scenario_test!(rust_as_seed, RustNodeAsSeed, RustNodeAsSeed); diff --git a/node/testing/tests/connection_discovery_rust_to_ocaml.rs b/node/testing/tests/connection_discovery_rust_to_ocaml.rs new file mode 100644 index 0000000000..16f165a5f3 --- /dev/null +++ b/node/testing/tests/connection_discovery_rust_to_ocaml.rs @@ -0,0 +1,7 @@ +#![cfg(all(not(feature = "p2p-webrtc"), feature = "p2p-libp2p"))] + +use openmina_node_testing::scenarios::multi_node::connection_discovery::RustToOCaml; + +mod common; + +scenario_test!(rust_to_ocaml, RustToOCaml, RustToOCaml); diff --git a/node/testing/tests/connection_discovery_rust_to_ocaml_via_seed.rs b/node/testing/tests/connection_discovery_rust_to_ocaml_via_seed.rs new file mode 100644 index 0000000000..6049b48c3c --- /dev/null +++ b/node/testing/tests/connection_discovery_rust_to_ocaml_via_seed.rs @@ -0,0 +1,11 @@ +#![cfg(all(not(feature = "p2p-webrtc"), feature = "p2p-libp2p"))] + +use openmina_node_testing::scenarios::multi_node::connection_discovery::RustToOCamlViaSeed; + +mod common; + +scenario_test!( + rust_to_ocaml_via_seed, + RustToOCamlViaSeed, + RustToOCamlViaSeed +); diff --git a/node/testing/tests/p2p_kad.rs b/node/testing/tests/p2p_kad.rs index bc29549435..44f3b5cb76 100644 --- a/node/testing/tests/p2p_kad.rs +++ b/node/testing/tests/p2p_kad.rs @@ -1,7 +1,5 @@ -use openmina_node_testing::scenarios::p2p::kademlia::{IncomingFindNode, KademliaBootstrap}; +use openmina_node_testing::scenarios::p2p::kademlia::KademliaBootstrap; mod common; -scenario_test!(incoming_find_node, IncomingFindNode, IncomingFindNode); - scenario_test!(kademlia_bootstrap, KademliaBootstrap, KademliaBootstrap);