diff --git a/Cargo.lock b/Cargo.lock index 3c5128bdef422..0280232818aa1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -182,6 +182,11 @@ name = "constant_time_eq" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "crossbeam" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "crossbeam" version = "0.3.2" @@ -949,6 +954,18 @@ dependencies = [ "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "multiqueue" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "smallvec 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "net2" version = "0.2.31" @@ -993,6 +1010,11 @@ dependencies = [ "winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "owning_ref" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "owning_ref" version = "0.3.3" @@ -1021,6 +1043,16 @@ dependencies = [ "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "parking_lot" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "owning_ref 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot_core 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "thread-id 3.2.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "parking_lot" version = "0.4.8" @@ -1120,9 +1152,9 @@ dependencies = [ "hex-literal 0.1.0 
(registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "polkadot-executor 0.1.0", - "polkadot-keystore 0.1.0", "polkadot-primitives 0.1.0", "polkadot-runtime 0.1.0", + "polkadot-service 0.1.0", "substrate-client 0.1.0", "substrate-codec 0.1.0", "substrate-executor 0.1.0", @@ -1157,8 +1189,12 @@ dependencies = [ "polkadot-statement-table 0.1.0", "polkadot-transaction-pool 0.1.0", "substrate-bft 0.1.0", + "substrate-client 0.1.0", "substrate-codec 0.1.0", + "substrate-keyring 0.1.0", + "substrate-network 0.1.0", "substrate-primitives 0.1.0", + "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1226,6 +1262,32 @@ dependencies = [ "substrate-runtime-support 0.1.0", ] +[[package]] +name = "polkadot-service" +version = "0.1.0" +dependencies = [ + "ed25519 0.1.0", + "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "polkadot-api 0.1.0", + "polkadot-consensus 0.1.0", + "polkadot-executor 0.1.0", + "polkadot-keystore 0.1.0", + "polkadot-primitives 0.1.0", + "polkadot-runtime 0.1.0", + "polkadot-transaction-pool 0.1.0", + "substrate-client 0.1.0", + "substrate-codec 0.1.0", + "substrate-executor 0.1.0", + "substrate-keyring 0.1.0", + "substrate-network 0.1.0", + "substrate-primitives 0.1.0", + "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "polkadot-statement-table" version = "0.1.0" @@ -1553,6 +1615,11 @@ name = "smallvec" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" 
+[[package]] +name = "smallvec" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "smallvec" version = "0.4.4" @@ -1615,6 +1682,7 @@ dependencies = [ "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "multiqueue 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-bft 0.1.0", "substrate-codec 0.1.0", @@ -1682,11 +1750,14 @@ name = "substrate-network" version = "0.1.0" dependencies = [ "bitflags 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "ed25519 0.1.0", "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethcore-io 1.9.0 (git+https://github.com/paritytech/parity.git)", "ethcore-network 1.9.0 (git+https://github.com/paritytech/parity.git)", + "futures 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", + "multiqueue 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1882,6 +1953,16 @@ dependencies = [ "unicode-width 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "thread-id" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_syscall 0.1.37 
(registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "thread_local" version = "0.3.5" @@ -2195,6 +2276,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum clap 2.29.4 (registry+https://github.com/rust-lang/crates.io-index)" = "7b8f59bcebcfe4269b09f71dab0da15b355c75916a8f975d3876ce81561893ee" "checksum coco 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c06169f5beb7e31c7c67ebf5540b8b472d23e3eade3b2ec7d1f5b504a85f91bd" "checksum constant_time_eq 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8ff012e225ce166d4422e0e78419d901719760f62ae2b7969ca6b564d1b54a9e" +"checksum crossbeam 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)" = "bd66663db5a988098a89599d4857919b3acf7f61402e61365acfd3919857b9be" "checksum crossbeam 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "24ce9782d4d5c53674646a6a4c1863a21a8fc0cb649b3c94dfc16e45071dea19" "checksum crunchy 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "a2f4a431c5c9f662e1200b7c7f02c34e91361150e382089a8f2dec3ba680cbda" "checksum difference 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3304d19798a8e067e48d8e69b2c37f0b5e9b4e462504ad9e27e9f3fce02bba8" @@ -2266,15 +2348,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum mime 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e2e00e17be181010a91dbfefb01660b17311059dc8c7f48b9017677721e732bd" "checksum mio 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)" = "7da01a5e23070d92d99b1ecd1cd0af36447c6fd44b0fe283c2db199fa136724f" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" +"checksum multiqueue 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4059673f3516669cbf7ebb448cb37171559ed22e6d8bc79cf0cf9394cf9e73fd" "checksum net2 0.2.31 
(registry+https://github.com/rust-lang/crates.io-index)" = "3a80f842784ef6c9a958b68b7516bc7e35883c614004dd94959a4dca1b716c09" "checksum nodrop 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "9a2228dca57108069a5262f2ed8bd2e82496d2e074a06d1ccc7ce1687b6ae0a2" "checksum num-traits 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)" = "9936036cc70fe4a8b2d338ab665900323290efb03983c86cbe235ae800ad8017" "checksum num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c51a3322e4bca9d212ad9a158a02abc6934d005490c054a2778df73a70aa0a30" "checksum odds 0.2.26 (registry+https://github.com/rust-lang/crates.io-index)" = "4eae0151b9dacf24fcc170d9995e511669a082856a91f958a2fe380bfab3fb22" "checksum ole32-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5d2c49021782e5233cd243168edfa8037574afed4eba4bbaf538b3d8d1789d8c" +"checksum owning_ref 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "9d52571ddcb42e9c900c901a18d8d67e393df723fcd51dd59c5b1a85d0acb6cc" "checksum owning_ref 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "cdf84f41639e037b484f93433aa3897863b561ed65c6e59c7073d7c561710f37" "checksum parity-wasm 0.15.4 (registry+https://github.com/rust-lang/crates.io-index)" = "235801e9531998c4bb307f4ea6833c9f40a4cf132895219ac8c2cd25a9b310f7" "checksum parity-wordlist 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d0dec124478845b142f68b446cbee953d14d4b41f1bc0425024417720dce693" +"checksum parking_lot 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "fa12d706797d42551663426a45e2db2e0364bd1dbf6aeada87e89c5f981f43e9" "checksum parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "149d8f5b97f3c1133e3cfcd8886449959e856b557ff281e292b733d7c69e005e" "checksum parking_lot 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3e7f7c9857874e54afeb950eebeae662b1e51a2493666d2ea4c0a5d91dcf0412" "checksum parking_lot_core 0.2.10 
(registry+https://github.com/rust-lang/crates.io-index)" = "9f35048d735bb93dd115a0030498785971aab3234d311fbe273d020084d26bd8" @@ -2319,6 +2404,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum slab 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fdeff4cd9ecff59ec7e3744cbca73dfe5ac35c2aedb2cfba8a1c715a18912e9d" "checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" +"checksum smallvec 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8266519bc1d17d0b5b16f6c21295625d562841c708f6376f49028a43e9c11e" "checksum smallvec 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ee4f357e8cd37bf8822e1b964e96fd39e2cb5a0424f8aaa284ccaccc2162411c" "checksum smallvec 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "44db0ecb22921ef790d17ae13a3f6d15784183ff5f2a01aa32098c7498d2b4b9" "checksum snappy 0.1.0 (git+https://github.com/paritytech/rust-snappy)" = "" @@ -2333,6 +2419,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum tempdir 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "f73eebdb68c14bcb24aef74ea96079830e7fa7b31a6106e42ea7ee887c1e134e" "checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096" "checksum textwrap 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c0b59b6b4b44d867f1370ef1bd91bfb262bf07bf0ae65c202ea2fbc16153b693" +"checksum thread-id 3.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2af4d6289a69a35c4d3aea737add39685f2784122c28119a7713165a63d68c9d" "checksum thread_local 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = 
"279ef31c19ededf577bfd12dfae728040a21f635b06a24cd670ff510edd38963" "checksum time 0.1.39 (registry+https://github.com/rust-lang/crates.io-index)" = "a15375f1df02096fb3317256ce2cee6a1f42fc84ea5ad5fc8c421cfe40c73098" "checksum tiny-keccak 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3e9241752647ca572f12c9b520a5d360d9099360c527770647e694001646a1d0" diff --git a/Cargo.toml b/Cargo.toml index d51455bd543a3..b4d18443da82c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "polkadot/statement-table", "polkadot/transaction-pool", "polkadot/validator", + "polkadot/service", "substrate/bft", "substrate/client", "substrate/codec", diff --git a/demo/cli/src/lib.rs b/demo/cli/src/lib.rs index 8c0a3711abcb1..63ee1e3811506 100644 --- a/demo/cli/src/lib.rs +++ b/demo/cli/src/lib.rs @@ -41,6 +41,7 @@ extern crate log; pub mod error; +use std::sync::Arc; use codec::Slicable; use demo_runtime::genesismap::{additional_storage_with_genesis, GenesisConfig}; use client::genesis; @@ -98,7 +99,7 @@ pub fn run(args: I) -> error::Result<()> where storage.extend(additional_storage_with_genesis(&block)); (primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) }; - let client = client::new_in_mem(executor, prepare_genesis)?; + let client = Arc::new(client::new_in_mem(executor, prepare_genesis)?); let address = "127.0.0.1:9933".parse().unwrap(); let handler = rpc::rpc_handler(client); diff --git a/polkadot/cli/Cargo.toml b/polkadot/cli/Cargo.toml index 37d920c3c00be..232c105f6f7bd 100644 --- a/polkadot/cli/Cargo.toml +++ b/polkadot/cli/Cargo.toml @@ -23,4 +23,4 @@ substrate-rpc-servers = { path = "../../substrate/rpc-servers" } polkadot-primitives = { path = "../primitives" } polkadot-executor = { path = "../executor" } polkadot-runtime = { path = "../runtime" } -polkadot-keystore = { path = "../keystore" } +polkadot-service = { path = "../service" } 
diff --git a/polkadot/cli/src/error.rs b/polkadot/cli/src/error.rs index 6c9e22cd55ad0..d7f6afca49b87 100644 --- a/polkadot/cli/src/error.rs +++ b/polkadot/cli/src/error.rs @@ -22,15 +22,11 @@ error_chain! { foreign_links { Io(::std::io::Error) #[doc="IO error"]; Cli(::clap::Error) #[doc="CLI error"]; + Service(::service::Error) #[doc="Polkadot service error"]; } links { Client(client::error::Error, client::error::ErrorKind) #[doc="Client error"]; } errors { - /// Key store errors - Keystore(e: ::keystore::Error) { - description("Keystore error"), - display("Keystore error: {:?}", e), - } } } diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index c5e34af10a4ba..5c0492af01f0d 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -30,10 +30,8 @@ extern crate substrate_rpc_servers as rpc; extern crate polkadot_primitives; extern crate polkadot_executor; extern crate polkadot_runtime; -extern crate polkadot_keystore as keystore; +extern crate polkadot_service as service; -#[macro_use] -extern crate hex_literal; #[macro_use] extern crate clap; #[macro_use] @@ -45,11 +43,6 @@ pub mod error; use std::path::{Path, PathBuf}; -use codec::Slicable; -use polkadot_runtime::genesismap::{additional_storage_with_genesis, GenesisConfig}; -use client::genesis; -use keystore::Store as Keystore; - /// Parse command line arguments and start the node. 
/// /// IANA unassigned port ranges that we could use: @@ -69,52 +62,33 @@ pub fn run(args: I) -> error::Result<()> where let log_pattern = matches.value_of("log").unwrap_or(""); init_logger(log_pattern); - // Create client - let executor = polkadot_executor::Executor::new(); - let mut storage = Default::default(); - let god_key = hex!["3d866ec8a9190c8343c2fc593d21d8a6d0c5c4763aaab2349de3a6111d64d124"]; - - let genesis_config = GenesisConfig { - validators: vec![god_key.clone()], - authorities: vec![god_key.clone()], - balances: vec![(god_key.clone(), 1u64 << 63)].into_iter().collect(), - block_time: 5, // 5 second block time. - session_length: 720, // that's 1 hour per session. - sessions_per_era: 24, // 24 hours per era. - bonding_duration: 90, // 90 days per bond. - approval_ratio: 667, // 66.7% approvals required for legislation. - }; - - let prepare_genesis = || { - storage = genesis_config.genesis_map(); - let block = genesis::construct_genesis_block(&storage); - storage.extend(additional_storage_with_genesis(&block)); - (primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) - }; + let mut config = service::Configuration::default(); - let keystore_path = matches.value_of("keystore") + config.keystore_path = matches.value_of("keystore") .map(|x| Path::new(x).to_owned()) - .unwrap_or_else(default_keystore_path); - - let _keystore = Keystore::open(keystore_path).map_err(::error::ErrorKind::Keystore)?; - let client = client::new_in_mem(executor, prepare_genesis)?; - - let address = "127.0.0.1:9933".parse().unwrap(); - let handler = rpc::rpc_handler(client); - let server = rpc::start_http(&address, handler)?; + .unwrap_or_else(default_keystore_path) + .to_string_lossy() + .into(); + let mut role = service::Role::FULL; if let Some(_) = matches.subcommand_matches("collator") { info!("Starting collator."); - server.wait(); - return Ok(()); + role = 
service::Role::COLLATOR; } - - if let Some(_) = matches.subcommand_matches("validator") { + else if let Some(_) = matches.subcommand_matches("validator") { info!("Starting validator."); - server.wait(); - return Ok(()); + role = service::Role::VALIDATOR; } + config.roles = role; + + let service = service::Service::new(config)?; + + let address = "127.0.0.1:9933".parse().unwrap(); + let handler = rpc::rpc_handler(service.client()); + let server = rpc::start_http(&address, handler)?; + + server.wait(); println!("No command given.\n"); let _ = clap::App::from_yaml(yaml).print_long_help(); diff --git a/polkadot/consensus/Cargo.toml b/polkadot/consensus/Cargo.toml index b7ccbc6534b04..df49b2a1cfdb8 100644 --- a/polkadot/consensus/Cargo.toml +++ b/polkadot/consensus/Cargo.toml @@ -18,3 +18,8 @@ polkadot-transaction-pool = { path = "../transaction-pool" } substrate-bft = { path = "../../substrate/bft" } substrate-codec = { path = "../../substrate/codec" } substrate-primitives = { path = "../../substrate/primitives" } +substrate-network = { path = "../../substrate/network" } + +tokio-core = "0.1.12" +substrate-keyring = { path = "../../substrate/keyring" } +substrate-client = { path = "../../substrate/client" } diff --git a/polkadot/consensus/src/error.rs b/polkadot/consensus/src/error.rs index 4d1387d17e1d4..38ba4ab60716f 100644 --- a/polkadot/consensus/src/error.rs +++ b/polkadot/consensus/src/error.rs @@ -48,6 +48,10 @@ error_chain! 
{ ::MAX_TRANSACTIONS_SIZE, ::MAX_TRANSACTIONS_SIZE.saturating_sub(*size) ), } + Executor(e: ::futures::future::ExecuteErrorKind) { + description("Unable to dispatch agreement future"), + display("Unable to dispatch agreement future: {:?}", e), + } } } diff --git a/polkadot/consensus/src/lib.rs b/polkadot/consensus/src/lib.rs index a5c34e30ee406..8ee93bafa81ef 100644 --- a/polkadot/consensus/src/lib.rs +++ b/polkadot/consensus/src/lib.rs @@ -41,10 +41,14 @@ extern crate polkadot_transaction_pool as transaction_pool; extern crate substrate_bft as bft; extern crate substrate_codec as codec; extern crate substrate_primitives as primitives; +extern crate substrate_network; + +extern crate tokio_core; +extern crate substrate_keyring; +extern crate substrate_client as client; #[macro_use] extern crate error_chain; - #[macro_use] extern crate log; @@ -67,8 +71,10 @@ use futures::future; use parking_lot::Mutex; pub use self::error::{ErrorKind, Error}; +pub use service::Service; mod error; +mod service; // block size limit. const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024; @@ -83,7 +89,7 @@ pub trait TableRouter { type FetchExtrinsic: IntoFuture; /// Note local candidate data. - fn local_candidate_data(&self, block_data: BlockData, extrinsic: Extrinsic); + fn local_candidate_data(&self, hash: Hash, block_data: BlockData, extrinsic: Extrinsic); /// Fetch block data for a specific candidate. fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate; diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs new file mode 100644 index 0000000000000..288f7a7d839a5 --- /dev/null +++ b/polkadot/consensus/src/service.rs @@ -0,0 +1,233 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. 
+ +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Consensus service. + +/// Consensus service. A long runnung service that manages BFT agreement and parachain +/// candidate agreement over the network. + +use std::thread; +use std::sync::Arc; +use futures::{future, Future, Stream, Sink, Async, Canceled}; +use parking_lot::Mutex; +use substrate_network as net; +use tokio_core::reactor; +use client::BlockchainEvents; +use substrate_keyring::Keyring; +use primitives::{Hash, AuthorityId}; +use primitives::block::{Id as BlockId, HeaderHash, Header}; +use polkadot_primitives::parachain::{BlockData, Extrinsic, CandidateReceipt}; +use polkadot_api::PolkadotApi; +use bft::{self, BftService}; +use transaction_pool::TransactionPool; +use ed25519; +use super::{TableRouter, SharedTable, ProposerFactory}; +use error::Error; + +struct BftSink { + network: Arc, + _e: ::std::marker::PhantomData, +} + +fn process_message(msg: net::BftMessage, authorities: &[AuthorityId], parent_hash: HeaderHash) -> Result { + Ok(match msg { + net::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c { + net::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({ + let proposal = bft::generic::LocalizedProposal { + round_number: proposal.round_number as usize, + proposal: proposal.proposal, + digest: proposal.digest, + sender: proposal.sender, + digest_signature: 
ed25519::LocalizedSignature { + signature: proposal.digest_signature, + signer: ed25519::Public(proposal.sender), + }, + full_signature: ed25519::LocalizedSignature { + signature: proposal.full_signature, + signer: ed25519::Public(proposal.sender), + } + }; + bft::check_proposal(authorities, &parent_hash, &proposal)?; + proposal + }), + net::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({ + let vote = bft::generic::LocalizedVote { + sender: vote.sender, + signature: ed25519::LocalizedSignature { + signature: vote.signature, + signer: ed25519::Public(vote.sender), + }, + vote: match vote.vote { + net::ConsensusVote::Prepare(r, h) => bft::generic::Vote::Prepare(r as usize, h), + net::ConsensusVote::Commit(r, h) => bft::generic::Vote::Commit(r as usize, h), + net::ConsensusVote::AdvanceRound(r) => bft::generic::Vote::AdvanceRound(r as usize), + } + }; + bft::check_vote(authorities, &parent_hash, &vote)?; + vote + }), + }), + net::BftMessage::Auxiliary(a) => { + let justification = bft::UncheckedJustification::from(a); + // TODO: get proper error + let justification: Result<_, bft::Error> = bft::check_prepare_justification(authorities, parent_hash, justification) + .map_err(|_| bft::ErrorKind::InvalidJustification.into()); + bft::generic::Communication::Auxiliary(justification?) + }, + }) +} + +impl Sink for BftSink { + type SinkItem = bft::Communication; + // TODO: replace this with the ! 
type when that's stabilized + type SinkError = E; + + fn start_send(&mut self, message: bft::Communication) -> ::futures::StartSend { + let network_message = match message { + bft::generic::Communication::Consensus(c) => net::BftMessage::Consensus(match c { + bft::generic::LocalizedMessage::Propose(proposal) => net::SignedConsensusMessage::Propose(net::SignedConsensusProposal { + round_number: proposal.round_number as u32, + proposal: proposal.proposal, + digest: proposal.digest, + sender: proposal.sender, + digest_signature: proposal.digest_signature.signature, + full_signature: proposal.full_signature.signature, + }), + bft::generic::LocalizedMessage::Vote(vote) => net::SignedConsensusMessage::Vote(net::SignedConsensusVote { + sender: vote.sender, + signature: vote.signature.signature, + vote: match vote.vote { + bft::generic::Vote::Prepare(r, h) => net::ConsensusVote::Prepare(r as u32, h), + bft::generic::Vote::Commit(r, h) => net::ConsensusVote::Commit(r as u32, h), + bft::generic::Vote::AdvanceRound(r) => net::ConsensusVote::AdvanceRound(r as u32), + } + }), + }), + bft::generic::Communication::Auxiliary(justification) => net::BftMessage::Auxiliary(justification.uncheck().into()), + }; + self.network.send_bft_message(network_message); + Ok(::futures::AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> ::futures::Poll<(), E> { + Ok(Async::Ready(())) + } +} + +/// Consensus service. Starts working when created. +pub struct Service { + thread: Option>, +} + +struct Network(Arc); + +impl Service { + /// Create and start a new instance. 
+ pub fn new(client: Arc, network: Arc, transaction_pool: Arc>, best_header: &Header) -> Service + where C: BlockchainEvents + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static + { + + let best_header = best_header.clone(); + let thread = thread::spawn(move || { + let mut core = reactor::Core::new().expect("tokio::Core could not be created"); + let key = Arc::new(Keyring::One.into()); + let factory = ProposerFactory { + client: client.clone(), + transaction_pool: transaction_pool.clone(), + network: Network(network.clone()), + }; + let bft_service = BftService::new(client.clone(), key, factory); + let build_bft = |header: &Header| -> Result<_, Error> { + let hash = header.hash(); + let authorities = client.authorities(&BlockId::Hash(hash))?; + let input = network.bft_messages() + .filter_map(move |message| { + process_message(message, &authorities, hash.clone()) + .map_err(|e| debug!("Message validation failed: {:?}", e)) + .ok() + }) + .map_err(|_| bft::InputStreamConcluded.into()); + let output = BftSink { network: network.clone(), _e: Default::default() }; + Ok(bft_service.build_upon(&header, input, output)?) + }; + // Kickstart BFT agreement on start. 
+ if let Err(e) = build_bft(&best_header) + .map_err(|e| debug!("Error creating initial BFT agreement: {:?}", e)) + .and_then(|bft| core.run(bft)) + { + debug!("Error starting initial BFT agreement: {:?}", e); + } + let bft = client.import_notification_stream().and_then(|notification| { + build_bft(¬ification.header).map_err(|e| debug!("BFT agreement error: {:?}", e)) + }).for_each(|f| f); + if let Err(e) = core.run(bft) { + debug!("BFT event loop error {:?}", e); + } + }); + Service { + thread: Some(thread) + } + } +} + +impl Drop for Service { + fn drop(&mut self) { + if let Some(thread) = self.thread.take() { + thread.join().expect("The service thread has panicked"); + } + } +} + +impl super::Network for Network { + type TableRouter = Router; + fn table_router(&self, _table: Arc) -> Self::TableRouter { + Router { + network: self.0.clone() + } + } +} + +type FetchCandidateAdapter = future::Map) -> BlockData>; + +struct Router { + network: Arc, +} + +impl Router { + fn fetch_candidate_adapter(data: Vec) -> BlockData { + BlockData(data) + } +} + +impl TableRouter for Router { + type Error = Canceled; + type FetchCandidate = FetchCandidateAdapter; + type FetchExtrinsic = future::FutureResult; + + fn local_candidate_data(&self, hash: Hash, block_data: BlockData, _extrinsic: Extrinsic) { + let data = block_data.0; + self.network.set_local_candidate(Some((hash, data))) + } + + fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate { + let hash = candidate.hash(); + self.network.fetch_candidate(&hash).map(Self::fetch_candidate_adapter) + } + + fn fetch_extrinsic_data(&self, _candidate: &CandidateReceipt) -> Self::FetchExtrinsic { + future::ok(Extrinsic) + } +} diff --git a/polkadot/service/Cargo.toml b/polkadot/service/Cargo.toml new file mode 100644 index 0000000000000..a5361e59e2034 --- /dev/null +++ b/polkadot/service/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "polkadot-service" +version = "0.1.0" +authors = ["Parity Technologies "] + 
+[dependencies] +futures = "0.1.17" +parking_lot = "0.4" +tokio-timer = "0.1.2" +ed25519 = { path = "../../substrate/ed25519" } +error-chain = "0.11" +log = "0.4" +tokio-core = "0.1.12" +polkadot-primitives = { path = "../primitives" } +polkadot-runtime = { path = "../runtime" } +polkadot-consensus = { path = "../consensus" } +polkadot-executor = { path = "../executor" } +polkadot-api = { path = "../api" } +polkadot-transaction-pool = { path = "../transaction-pool" } +polkadot-keystore = { path = "../keystore" } +substrate-primitives = { path = "../../substrate/primitives" } +substrate-network = { path = "../../substrate/network" } +substrate-client = { path = "../../substrate/client" } +substrate-keyring = { path = "../../substrate/keyring" } +substrate-codec = { path = "../../substrate/codec" } +substrate-executor = { path = "../../substrate/executor" } diff --git a/polkadot/service/src/config.rs b/polkadot/service/src/config.rs new file mode 100644 index 0000000000000..6b87d0e49bacf --- /dev/null +++ b/polkadot/service/src/config.rs @@ -0,0 +1,46 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see .? + +//! Service configuration. + +use transaction_pool; +pub use network::Role; +pub use network::NetworkConfiguration; + +/// Service configuration. +pub struct Configuration { + /// Node roles. 
+ pub roles: Role, + /// Transaction pool configuration. + pub transaction_pool: transaction_pool::Options, + /// Network configuration. + pub network: NetworkConfiguration, + /// Path to key files. + pub keystore_path: String, + // TODO: add more network, client, tx pool configuration options +} + +impl Default for Configuration { + fn default() -> Configuration { + Configuration { + roles: Role::FULL, + transaction_pool: Default::default(), + network: Default::default(), + keystore_path: Default::default(), + } + } +} + diff --git a/polkadot/service/src/error.rs b/polkadot/service/src/error.rs new file mode 100644 index 0000000000000..58bd8d633bcf0 --- /dev/null +++ b/polkadot/service/src/error.rs @@ -0,0 +1,35 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Errors that can occur during the service operation. + +use client; +use network; + +error_chain! 
{ + links { + Client(client::error::Error, client::error::ErrorKind) #[doc="Client error"]; + Network(network::error::Error, network::error::ErrorKind) #[doc="Network error"]; + } + + errors { + /// Key store errors + Keystore(e: ::keystore::Error) { + description("Keystore error"), + display("Keystore error: {:?}", e), + } + } +} diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs new file mode 100644 index 0000000000000..6bf972878d208 --- /dev/null +++ b/polkadot/service/src/lib.rs @@ -0,0 +1,216 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Polkadot service. Starts a thread that spins the network, the client and the transaction pool. +//! Manages communication between them. 
+ +extern crate futures; +extern crate ed25519; +extern crate parking_lot; +extern crate tokio_timer; +extern crate polkadot_primitives; +extern crate polkadot_runtime; +extern crate polkadot_executor; +extern crate polkadot_api; +extern crate polkadot_consensus as consensus; +extern crate polkadot_transaction_pool as transaction_pool; +extern crate polkadot_keystore as keystore; +extern crate substrate_primitives as primitives; +extern crate substrate_network as network; +extern crate substrate_codec as codec; +extern crate substrate_executor; + +extern crate tokio_core; +extern crate substrate_keyring; +extern crate substrate_client as client; + +#[macro_use] +extern crate error_chain; +#[macro_use] +extern crate log; + +mod error; +mod config; + +use std::sync::Arc; +use std::thread; +use futures::prelude::*; +use parking_lot::Mutex; +use tokio_core::reactor::Core; +use codec::Slicable; +use primitives::block::{Id as BlockId, TransactionHash}; +use transaction_pool::TransactionPool; +use substrate_keyring::Keyring; +use substrate_executor::NativeExecutor; +use polkadot_executor::Executor as LocalDispatch; +use polkadot_primitives::AccountId; +use keystore::Store as Keystore; +use polkadot_api::PolkadotApi; +use polkadot_runtime::genesismap::{additional_storage_with_genesis, GenesisConfig}; +use client::{genesis, BlockchainEvents}; +use client::in_mem::Backend as InMemory; +use network::ManageNetwork; + +pub use self::error::{ErrorKind, Error}; +pub use config::{Configuration, Role}; + +type Client = client::Client>; + + +/// Polkadot service. 
+pub struct Service { + thread: Option>, + client: Arc, + network: Arc, + _consensus: Option, +} + +struct TransactionPoolAdapter { + pool: Arc>, + client: Arc, +} + +impl network::TransactionPool for TransactionPoolAdapter { + fn transactions(&self) -> Vec<(TransactionHash, Vec)> { + let best_block = match self.client.info() { + Ok(info) => info.chain.best_hash, + Err(e) => { + debug!("Error getting best block: {:?}", e); + return Vec::new(); + } + }; + let id = self.client.check_id(BlockId::Hash(best_block)).expect("Best block is always valid; qed."); + let ready = transaction_pool::Ready::create(id, &*self.client); + self.pool.lock().pending(ready).map(|t| { + let hash = ::primitives::Hash::from(&t.hash()[..]); + let tx = codec::Slicable::encode(t.as_transaction()); + (hash, tx) + }).collect() + } + + fn import(&self, transaction: &[u8]) -> Option { + if let Some(tx) = codec::Slicable::decode(&mut &transaction[..]) { + match self.pool.lock().import(tx) { + Ok(t) => Some(t.hash()[..].into()), + Err(e) => match *e.kind() { + transaction_pool::ErrorKind::AlreadyImported(hash) => Some(hash[..].into()), + _ => { + debug!("Error adding transaction to the pool: {:?}", e); + None + }, + } + } + } else { + debug!("Error decoding transaction"); + None + } + } +} + +impl Service { + /// Creates and register protocol with the network service + pub fn new(config: Configuration) -> Result { + // Create client + let executor = polkadot_executor::Executor::new(); + let mut storage = Default::default(); + let key: AccountId = Keyring::One.into(); + + let genesis_config = GenesisConfig { + validators: vec![key.clone()], + authorities: vec![key.clone()], + balances: vec![(Keyring::One.into(), 1u64 << 63), (Keyring::Two.into(), 1u64 << 63)].into_iter().collect(), + block_time: 5, // 5 second block time. + session_length: 720, // that's 1 hour per session. + sessions_per_era: 24, // 24 hours per era. + bonding_duration: 90, // 90 days per bond. 
+ approval_ratio: 667, // 66.7% approvals required for legislation. + }; + let prepare_genesis = || { + storage = genesis_config.genesis_map(); + let block = genesis::construct_genesis_block(&storage); + storage.extend(additional_storage_with_genesis(&block)); + (primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect()) + }; + + let _keystore = Keystore::open(config.keystore_path.into()).map_err(::error::ErrorKind::Keystore)?; + let client = Arc::new(client::new_in_mem(executor, prepare_genesis)?); + let best_header = client.header(&BlockId::Hash(client.info()?.chain.best_hash))?.expect("Best header always exists; qed"); + info!("Starting Polkadot. Best block is #{}", best_header.number); + let transaction_pool = Arc::new(Mutex::new(TransactionPool::new(config.transaction_pool))); + let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { + pool: transaction_pool.clone(), + client: client.clone(), + }); + let network_params = network::Params { + config: network::ProtocolConfig { + roles: config.roles, + }, + network_config: config.network, + chain: client.clone(), + transaction_pool: transaction_pool_adapter, + }; + let network = network::Service::new(network_params)?; + + // Spin consensus service if configured + let consensus_service = if config.roles & Role::VALIDATOR == Role::VALIDATOR { + Some(consensus::Service::new(client.clone(), network.clone(), transaction_pool, &best_header)) + } else { + None + }; + + let thread_client = client.clone(); + let thread_network = network.clone(); + let thread = thread::spawn(move || { + thread_network.start_network(); + let mut core = Core::new().expect("tokio::Core could not be created"); + let events = thread_client.import_notification_stream().for_each(|notification| { + thread_network.on_block_imported(¬ification.header); + Ok(()) + }); + if let Err(e) = core.run(events) { + debug!("Polkadot service event loop 
shutdown with {:?}", e); + } + debug!("Polkadot service shutdown"); + }); + Ok(Service { + thread: Some(thread), + client: client.clone(), + network: network.clone(), + _consensus: consensus_service, + }) + } + + /// Get shared client instance. + pub fn client(&self) -> Arc { + self.client.clone() + } + + /// Get shared network instance. + pub fn network(&self) -> Arc { + self.network.clone() + } +} + +impl Drop for Service { + fn drop(&mut self) { + self.client.stop_notifications(); + self.network.stop_network(); + if let Some(thread) = self.thread.take() { + thread.join().expect("The service thread has panicked"); + } + } +} + diff --git a/polkadot/transaction-pool/src/lib.rs b/polkadot/transaction-pool/src/lib.rs index 4ab327bd18ade..3864852f8c1fe 100644 --- a/polkadot/transaction-pool/src/lib.rs +++ b/polkadot/transaction-pool/src/lib.rs @@ -60,6 +60,11 @@ error_chain! { description("Transaction had bad signature."), display("Transaction had bad signature."), } + /// Attempted to queue a transaction that is already in the pool. + AlreadyImported(hash: TransactionHash) { + description("Transaction is already in the pool."), + display("Transaction {:?} is already in the pool.", hash), + } /// Import error. Import(err: Box<::std::error::Error + Send>) { description("Error importing transaction"), @@ -257,8 +262,15 @@ impl TransactionPool { let verified = VerifiedTransaction::create(tx, insertion_index)?; // TODO: just use a foreign link when the error type is made public. + let hash = verified.hash.clone(); self.inner.import(verified) - .map_err(|e| ErrorKind::Import(Box::new(e))) + .map_err(|e| + match e { + // TODO: make error types public in transaction_pool. 
For now just treat all errors as AlradyImported + _ => ErrorKind::AlreadyImported(hash), + // transaction_pool::error::AlreadyImported(h) => ErrorKind::AlreadyImported(h), + // e => ErrorKind::Import(Box::new(e)), + }) .map_err(Into::into) } diff --git a/substrate/bft/src/error.rs b/substrate/bft/src/error.rs index d248b217ff556..22baeece09b00 100644 --- a/substrate/bft/src/error.rs +++ b/substrate/bft/src/error.rs @@ -42,10 +42,22 @@ error_chain! { display("Unable to create block proposal."), } - /// Error dispatching the agreement future onto the executor. - Executor(e: ::futures::future::ExecuteErrorKind) { - description("Unable to dispatch agreement future"), - display("Unable to dispatch agreement future: {:?}", e), + /// Error checking signature + InvalidSignature(s: ::ed25519::Signature, a: ::primitives::AuthorityId) { + description("Message signature is invalid"), + display("Message signature {:?} by {:?} is invalid.", s, a), + } + + /// Account is not an authority. + InvalidAuthority(a: ::primitives::AuthorityId) { + description("Message sender is not a valid authority"), + display("Message sender {:?} is not a valid authority.", a), + } + + /// Justification requirements not met. + InvalidJustification { + description("Invalid justification"), + display("Invalid justification."), } /// Some other error. diff --git a/substrate/bft/src/lib.rs b/substrate/bft/src/lib.rs index f9d870799d38c..110d14ee1b329 100644 --- a/substrate/bft/src/lib.rs +++ b/substrate/bft/src/lib.rs @@ -41,8 +41,7 @@ use primitives::bft::{Message as PrimitiveMessage, Action as PrimitiveAction, Ju use primitives::block::{Block, Id as BlockId, Header, HeaderHash}; use primitives::AuthorityId; -use futures::{stream, task, Async, Sink, Future, IntoFuture}; -use futures::future::Executor; +use futures::{task, Async, Stream, Sink, Future, IntoFuture}; use futures::sync::oneshot; use tokio_timer::Timer; use parking_lot::Mutex; @@ -219,34 +218,25 @@ impl generic::Context for BftInstance

{ } } -type Input = stream::Empty; - -// "black hole" output sink. -struct Output(::std::marker::PhantomData); - -impl Sink for Output { - type SinkItem = Communication; - type SinkError = E; - - fn start_send(&mut self, _item: Communication) -> ::futures::StartSend { - Ok(::futures::AsyncSink::Ready) - } - - fn poll_complete(&mut self) -> ::futures::Poll<(), E> { - Ok(Async::Ready(())) - } -} - /// A future that resolves either when canceled (witnessing a block from the network at same height) /// or when agreement completes. -pub struct BftFuture { - inner: generic::Agreement, Input, Output>, +pub struct BftFuture where + P: Proposer, + InStream: Stream, + OutSink: Sink, +{ + inner: generic::Agreement, InStream, OutSink>, cancel: Arc, send_task: Option>, import: Arc, } -impl Future for BftFuture { +impl Future for BftFuture where + P: Proposer, + I: BlockImport, + InStream: Stream, + OutSink: Sink, +{ type Item = (); type Error = (); @@ -274,7 +264,11 @@ impl Future for BftFuture { } } -impl Drop for BftFuture { +impl Drop for BftFuture where + P: Proposer, + InStream: Stream, + OutSink: Sink, +{ fn drop(&mut self) { // TODO: have a trait member to pass misbehavior reports into. let misbehavior = self.inner.drain_misbehavior().collect::>(); @@ -304,9 +298,8 @@ impl Drop for AgreementHandle { /// The BftService kicks off the agreement process on top of any blocks it /// is notified of. -pub struct BftService { +pub struct BftService { client: Arc, - executor: E, live_agreements: Mutex>, timer: Timer, round_timeout_multiplier: u64, @@ -314,17 +307,33 @@ pub struct BftService { factory: P, } -impl BftService +impl BftService where P: ProposerFactory, - E: Executor>, I: BlockImport + Authorities, { + + /// Create a new service instance. 
+ pub fn new(client: Arc, key: Arc, factory: P) -> BftService { + BftService { + client: client, + live_agreements: Mutex::new(HashMap::new()), + timer: Timer::default(), + round_timeout_multiplier: 4, + key: key, // TODO: key changing over time. + factory: factory, + } + } + /// Signal that a valid block with the given header has been imported. /// /// If the local signing key is an authority, this will begin the consensus process to build a /// block on top of it. If the executor fails to run the future, an error will be returned. - pub fn build_upon(&self, header: &Header) -> Result<(), P::Error> { + pub fn build_upon(&self, header: &Header, input: InStream, output: OutSink) + -> Result::Proposer, I, InStream, OutSink>, P::Error> where + InStream: Stream::Proposer as Proposer>::Error>, + OutSink: Sink::Proposer as Proposer>::Error>, + { let hash = header.hash(); let mut _preempted_consensus = None; // defers drop of live to the end. @@ -337,7 +346,7 @@ impl BftService if !authorities.contains(&local_id) { self.live_agreements.lock().remove(&header.parent_hash); - return Ok(()) + Err(From::from(ErrorKind::InvalidAuthority(local_id)))?; } let proposer = self.factory.init(header, &authorities, self.key.clone())?; @@ -355,25 +364,18 @@ impl BftService bft_instance, n, max_faulty, - stream::empty(), - Output(Default::default()), + input, + output, ); let cancel = Arc::new(AtomicBool::new(false)); let (tx, rx) = oneshot::channel(); - self.executor.execute(BftFuture { - inner: agreement, - cancel: cancel.clone(), - send_task: Some(tx), - import: self.client.clone(), - }).map_err(|e| e.kind()).map_err(ErrorKind::Executor).map_err(Error::from)?; - { let mut live = self.live_agreements.lock(); live.insert(hash, AgreementHandle { task: Some(rx), - cancel, + cancel: cancel.clone(), }); // cancel any agreements attempted to build upon this block's parent @@ -381,7 +383,12 @@ impl BftService _preempted_consensus = live.remove(&header.parent_hash); } - Ok(()) + Ok(BftFuture { 
+ inner: agreement, + cancel: cancel, + send_task: Some(tx), + import: self.client.clone(), + }) } } @@ -391,9 +398,16 @@ pub fn max_faulty_of(n: usize) -> usize { n.saturating_sub(1) / 3 } +/// Given a total number of authorities, yield the minimum required signatures. +/// This will always be over 2/3. +pub fn bft_threshold(n: usize) -> usize { + n - max_faulty_of(n) +} + fn check_justification_signed_message(authorities: &[AuthorityId], message: &[u8], just: UncheckedJustification) -> Result { + // TODO: return additional error information. just.check(authorities.len() - max_faulty_of(authorities.len()), |_, _, sig| { let auth_id = sig.signer.0; if !authorities.contains(&auth_id) { return None } @@ -436,6 +450,58 @@ pub fn check_prepare_justification(authorities: &[AuthorityId], parent: HeaderHa check_justification_signed_message(authorities, &message[..], just) } +/// Check proposal message signatures and authority. +/// Provide all valid authorities. +pub fn check_proposal( + authorities: &[AuthorityId], + parent_hash: &HeaderHash, + propose: &::generic::LocalizedProposal) + -> Result<(), Error> +{ + if !authorities.contains(&propose.sender) { + return Err(ErrorKind::InvalidAuthority(propose.sender.into()).into()); + } + + let action_header = PrimitiveAction::ProposeHeader(propose.round_number as u32, propose.digest.clone()); + let action_propose = PrimitiveAction::Propose(propose.round_number as u32, propose.proposal.clone()); + check_action(action_header, parent_hash, &propose.digest_signature)?; + check_action(action_propose, parent_hash, &propose.full_signature) +} + +/// Check vote message signatures and authority. +/// Provide all valid authorities. 
+pub fn check_vote( + authorities: &[AuthorityId], + parent_hash: &HeaderHash, + vote: &::generic::LocalizedVote) + -> Result<(), Error> +{ + if !authorities.contains(&vote.sender) { + return Err(ErrorKind::InvalidAuthority(vote.sender.into()).into()); + } + + let action = match vote.vote { + ::generic::Vote::Prepare(r, h) => PrimitiveAction::Prepare(r as u32, h), + ::generic::Vote::Commit(r, h) => PrimitiveAction::Commit(r as u32, h), + ::generic::Vote::AdvanceRound(r) => PrimitiveAction::AdvanceRound(r as u32), + }; + check_action(action, parent_hash, &vote.signature) +} + +fn check_action(action: PrimitiveAction, parent_hash: &HeaderHash, sig: &LocalizedSignature) -> Result<(), Error> { + let primitive = PrimitiveMessage { + parent: parent_hash.clone(), + action, + }; + + let message = Slicable::encode(&primitive); + if ed25519::verify_strong(&sig.signature, &message, &sig.signer) { + Ok(()) + } else { + Err(ErrorKind::InvalidSignature(sig.signature.into(), sig.signer.clone().into()).into()) + } +} + /// Sign a BFT message with the given key. pub fn sign_message(message: Message, key: &ed25519::Pair, parent_hash: HeaderHash) -> LocalizedMessage { let signer = key.public(); @@ -489,8 +555,10 @@ mod tests { use super::*; use std::collections::HashSet; use primitives::block; - use self::tokio_core::reactor::{Core, Handle}; + use self::tokio_core::reactor::{Core}; use self::keyring::Keyring; + use futures::stream; + use futures::future::Executor; extern crate substrate_keyring as keyring; extern crate tokio_core; @@ -512,6 +580,22 @@ mod tests { } } + // "black hole" output sink. 
+ struct Output(::std::marker::PhantomData); + + impl Sink for Output { + type SinkItem = Communication; + type SinkError = E; + + fn start_send(&mut self, _item: Communication) -> ::futures::StartSend { + Ok(::futures::AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> ::futures::Poll<(), E> { + Ok(Async::Ready(())) + } + } + struct DummyFactory; struct DummyProposer(block::Number); @@ -543,12 +627,11 @@ mod tests { fn import_misbehavior(&self, _misbehavior: Vec<(AuthorityId, Misbehavior)>) {} } - fn make_service(client: FakeClient, handle: Handle) - -> BftService + fn make_service(client: FakeClient) + -> BftService { BftService { client: Arc::new(client), - executor: handle, live_agreements: Mutex::new(HashMap::new()), timer: Timer::default(), round_timeout_multiplier: 4, @@ -578,7 +661,7 @@ mod tests { let mut core = Core::new().unwrap(); - let service = make_service(client, core.handle()); + let service = make_service(client); let first = Header::from_block_number(2); let first_hash = first.hash(); @@ -587,16 +670,18 @@ mod tests { second.parent_hash = first_hash; let second_hash = second.hash(); - service.build_upon(&first).unwrap(); + let bft = service.build_upon(&first, stream::empty(), Output(Default::default())).unwrap(); assert!(service.live_agreements.lock().contains_key(&first_hash)); // turn the core so the future gets polled and sends its task to the // service. otherwise it deadlocks. 
+ core.handle().execute(bft).unwrap(); core.turn(Some(::std::time::Duration::from_millis(100))); - service.build_upon(&second).unwrap(); + let bft = service.build_upon(&second, stream::empty(), Output(Default::default())).unwrap(); assert!(!service.live_agreements.lock().contains_key(&first_hash)); assert!(service.live_agreements.lock().contains_key(&second_hash)); + core.handle().execute(bft).unwrap(); core.turn(Some(::std::time::Duration::from_millis(100))); } @@ -671,4 +756,69 @@ mod tests { assert!(check_justification(&authorities, parent_hash, unchecked).is_err()); } + + #[test] + fn propose_check_works() { + let parent_hash = Default::default(); + + let authorities = vec![ + Keyring::Alice.to_raw_public(), + Keyring::Eve.to_raw_public(), + ]; + + let block = Block { + header: Header::from_block_number(1), + transactions: Default::default() + }; + + let proposal = sign_message(::generic::Message::Propose(1, block.clone()), &Keyring::Alice.pair(), parent_hash);; + if let ::generic::LocalizedMessage::Propose(proposal) = proposal { + assert!(check_proposal(&authorities, &parent_hash, &proposal).is_ok()); + let mut invalid_round = proposal.clone(); + invalid_round.round_number = 0; + assert!(check_proposal(&authorities, &parent_hash, &invalid_round).is_err()); + let mut invalid_digest = proposal.clone(); + invalid_digest.digest = [0xfe; 32].into(); + assert!(check_proposal(&authorities, &parent_hash, &invalid_digest).is_err()); + } else { + assert!(false); + } + + // Not an authority + let proposal = sign_message(::generic::Message::Propose(1, block), &Keyring::Bob.pair(), parent_hash);; + if let ::generic::LocalizedMessage::Propose(proposal) = proposal { + assert!(check_proposal(&authorities, &parent_hash, &proposal).is_err()); + } else { + assert!(false); + } + } + + #[test] + fn vote_check_works() { + let parent_hash = Default::default(); + let hash = [0xff; 32].into(); + + let authorities = vec![ + Keyring::Alice.to_raw_public(), + 
Keyring::Eve.to_raw_public(), + ]; + + let vote = sign_message(::generic::Message::Vote(::generic::Vote::Prepare(1, hash)), &Keyring::Alice.pair(), parent_hash);; + if let ::generic::LocalizedMessage::Vote(vote) = vote { + assert!(check_vote(&authorities, &parent_hash, &vote).is_ok()); + let mut invalid_sender = vote.clone(); + invalid_sender.signature.signer = Keyring::Eve.into(); + assert!(check_vote(&authorities, &parent_hash, &invalid_sender).is_err()); + } else { + assert!(false); + } + + // Not an authority + let vote = sign_message(::generic::Message::Vote(::generic::Vote::Prepare(1, hash)), &Keyring::Bob.pair(), parent_hash);; + if let ::generic::LocalizedMessage::Vote(vote) = vote { + assert!(check_vote(&authorities, &parent_hash, &vote).is_err()); + } else { + assert!(false); + } + } } diff --git a/substrate/client/Cargo.toml b/substrate/client/Cargo.toml index 879e05e3c9442..e398af6743ab1 100644 --- a/substrate/client/Cargo.toml +++ b/substrate/client/Cargo.toml @@ -9,6 +9,7 @@ log = "0.3" parking_lot = "0.4" triehash = "0.1" hex-literal = "0.1" +multiqueue = "0.3" ed25519 = { path = "../ed25519" } substrate-bft = { path = "../bft" } substrate-codec = { path = "../codec" } diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index 402a67ef2a776..ecdb3690621c1 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -16,6 +16,8 @@ //! Substrate Client +use multiqueue; +use parking_lot::Mutex; use primitives::{self, block, AuthorityId}; use primitives::block::Id as BlockId; use primitives::storage::{StorageKey, StorageData}; @@ -27,11 +29,24 @@ use backend::{self, BlockImportOperation}; use blockchain::{self, Info as ChainInfo, Backend as ChainBackend}; use {error, in_mem, block_builder, runtime_io, bft}; +/// Type that implements `futures::Stream` of block import events. +pub type BlockchainEventStream = multiqueue::BroadcastFutReceiver; + +//TODO: The queue is preallocated in multiqueue. 
Make it unbounded +const NOTIFICATION_QUEUE_SIZE: u64 = 1 << 16; + /// Polkadot Client -#[derive(Debug)] pub struct Client where B: backend::Backend { backend: B, executor: E, + import_notification_sink: Mutex>, + import_notification_stream: Mutex>, +} + +/// A source of blockchain evenets. +pub trait BlockchainEvents { + /// Get block import event stream. + fn import_notification_stream(&self) -> BlockchainEventStream; } /// Client info @@ -82,6 +97,34 @@ pub enum BlockStatus { Unknown, } +/// Block data origin. +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum BlockOrigin { + /// Genesis block built into the client. + Genesis, + /// Block is part of the initial sync with the network. + NetworkInitialSync, + /// Block was broadcasted on the network. + NetworkBroadcast, + /// Block that was received from the network and validated in the consensus process. + ConsensusBroadcast, + /// Block that was collated by this node. + Own, + /// Block was imported from a file. + File, +} + +/// Summary of an imported block +#[derive(Clone, Debug)] +pub struct BlockImportNotification { + /// Imported block origin. + pub origin: BlockOrigin, + /// Imported block header. + pub header: block::Header, + /// Is this the new best block. + pub is_new_best: bool, +} + /// A header paired with a justification which has already been checked. 
#[derive(Debug, PartialEq, Eq, Clone)] pub struct JustifiedHeader { @@ -122,6 +165,7 @@ impl Client where where F: FnOnce() -> (block::Header, Vec<(Vec, Vec)>) { + let (sink, stream) = multiqueue::broadcast_fut_queue(NOTIFICATION_QUEUE_SIZE); if backend.blockchain().header(BlockId::Number(0))?.is_none() { trace!("Empty database, writing genesis block"); let (genesis_header, genesis_store) = build_genesis(); @@ -133,6 +177,8 @@ impl Client where Ok(Client { backend, executor, + import_notification_sink: Mutex::new(sink), + import_notification_stream: Mutex::new(stream), }) } @@ -164,6 +210,13 @@ impl Client where self.executor.clone() } + /// Close notification streams. + pub fn stop_notifications(&self) { + let (sink, stream) = multiqueue::broadcast_fut_queue(NOTIFICATION_QUEUE_SIZE); + *self.import_notification_sink.lock() = sink; + *self.import_notification_stream.lock() = stream; + } + /// Get the current set of authorities from storage. pub fn authorities_at(&self, id: &BlockId) -> error::Result> { let state = self.state_at(id)?; @@ -236,6 +289,7 @@ impl Client where /// Queue a block for import. 
pub fn import_block( &self, + origin: BlockOrigin, header: JustifiedHeader, body: Option, ) -> error::Result { @@ -261,9 +315,21 @@ impl Client where let is_new_best = header.number == self.backend.blockchain().info()?.best_number + 1; trace!("Imported {}, (#{}), best={}", block::HeaderHash::from(header.blake2_256()), header.number, is_new_best); - transaction.set_block_data(header, body, Some(justification.uncheck().into()), is_new_best)?; + transaction.set_block_data(header.clone(), body, Some(justification.uncheck().into()), is_new_best)?; transaction.set_storage(overlay.drain())?; self.backend.commit_operation(transaction)?; + + if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own || origin == BlockOrigin::ConsensusBroadcast { + let notification = BlockImportNotification { + origin: origin, + header: header, + is_new_best: is_new_best, + }; + if let Err(e) = self.import_notification_sink.lock().try_send(notification) { + warn!("Error queueing block import notification: {:?}", e); + } + } + Ok(ImportResult::Queued) } @@ -335,7 +401,7 @@ impl bft::BlockImport for Client justification, }; - let _ = self.import_block(justified_header, Some(block.transactions)); + let _ = self.import_block(BlockOrigin::Genesis, justified_header, Some(block.transactions)); } } @@ -350,6 +416,18 @@ impl bft::Authorities for Client } } +impl BlockchainEvents for Client + where + B: backend::Backend, + E: state_machine::CodeExecutor, + error::Error: From<::Error> +{ + /// Get block import event stream. 
+ fn import_notification_stream(&self) -> BlockchainEventStream { + self.import_notification_stream.lock().add_stream() + } +} + #[cfg(test)] mod tests { use super::*; @@ -455,7 +533,7 @@ mod tests { let justification = justify(&block.header); let justified = client.check_justification(block.header, justification).unwrap(); - client.import_block(justified, Some(block.transactions)).unwrap(); + client.import_block(BlockOrigin::Own, justified, Some(block.transactions)).unwrap(); assert_eq!(client.info().unwrap().chain.best_number, 1); assert_eq!(client.using_environment(|| test_runtime::system::latest_block_hash()).unwrap(), client.block_hash(1).unwrap().unwrap()); @@ -499,7 +577,7 @@ mod tests { let justification = justify(&block.header); let justified = client.check_justification(block.header, justification).unwrap(); - client.import_block(justified, Some(block.transactions)).unwrap(); + client.import_block(BlockOrigin::Own, justified, Some(block.transactions)).unwrap(); assert_eq!(client.info().unwrap().chain.best_number, 1); assert!(client.state_at(&BlockId::Number(1)).unwrap() != client.state_at(&BlockId::Number(0)).unwrap()); diff --git a/substrate/client/src/lib.rs b/substrate/client/src/lib.rs index 6054808f9b433..a5483b8c1398d 100644 --- a/substrate/client/src/lib.rs +++ b/substrate/client/src/lib.rs @@ -31,6 +31,7 @@ extern crate ed25519; extern crate triehash; extern crate parking_lot; +extern crate multiqueue; #[cfg(test)] #[macro_use] extern crate hex_literal; #[macro_use] extern crate error_chain; #[macro_use] extern crate log; @@ -43,5 +44,6 @@ pub mod genesis; pub mod block_builder; mod client; -pub use client::{Client, ClientInfo, CallResult, ImportResult, BlockStatus, new_in_mem}; +pub use client::{Client, ClientInfo, CallResult, ImportResult, + BlockStatus, BlockOrigin, new_in_mem, BlockchainEventStream, BlockchainEvents}; pub use blockchain::Info as ChainInfo; diff --git a/substrate/network/Cargo.toml b/substrate/network/Cargo.toml index 
2c07934052605..fec62305871ec 100644 --- a/substrate/network/Cargo.toml +++ b/substrate/network/Cargo.toml @@ -16,13 +16,17 @@ bitflags = "1.0" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" +futures = "0.1.17" +multiqueue = "0.3" ethcore-network = { git = "https://github.com/paritytech/parity.git" } ethcore-io = { git = "https://github.com/paritytech/parity.git" } +ed25519 = { path = "../../substrate/ed25519" } substrate-primitives = { path = "../../substrate/primitives" } substrate-client = { path = "../../substrate/client" } substrate-state-machine = { path = "../../substrate/state-machine" } substrate-serializer = { path = "../../substrate/serializer" } substrate-runtime-support = { path = "../../substrate/runtime-support" } +substrate-bft = { path = "../../substrate/bft" } [dev-dependencies] substrate-test-runtime = { path = "../test-runtime" } diff --git a/substrate/network/src/chain.rs b/substrate/network/src/chain.rs index 1a51797f10b37..10d7a0c7d5a79 100644 --- a/substrate/network/src/chain.rs +++ b/substrate/network/src/chain.rs @@ -16,15 +16,15 @@ //! Blockchain access trait -use client::{self, Client as PolkadotClient, ImportResult, ClientInfo, BlockStatus}; +use client::{self, Client as PolkadotClient, ImportResult, ClientInfo, BlockStatus, BlockOrigin}; use client::error::Error; use state_machine; use primitives::block::{self, Id as BlockId}; use primitives::bft::Justification; pub trait Client: Send + Sync { - /// Given a hash return a header - fn import(&self, header: block::Header, justification: Justification, body: Option) -> Result; + /// Import a new block. Parent is supposed to be existing in the blockchain. + fn import(&self, is_best: bool, header: block::Header, justification: Justification, body: Option) -> Result; /// Get blockchain info. 
fn info(&self) -> Result; @@ -50,10 +50,11 @@ impl Client for PolkadotClient where E: state_machine::CodeExecutor + Send + Sync + 'static, Error: From<<::State as state_machine::backend::Backend>::Error>, { - fn import(&self, header: block::Header, justification: Justification, body: Option) -> Result { + fn import(&self, is_best: bool, header: block::Header, justification: Justification, body: Option) -> Result { // TODO: defer justification check. let justified_header = self.check_justification(header, justification.into())?; - (self as &PolkadotClient).import_block(justified_header, body) + let origin = if is_best { BlockOrigin::NetworkBroadcast } else { BlockOrigin::NetworkInitialSync }; + (self as &PolkadotClient).import_block(origin, justified_header, body) } fn info(&self) -> Result { diff --git a/substrate/network/src/config.rs b/substrate/network/src/config.rs index e269d3cd511ff..7e21a5ded3b02 100644 --- a/substrate/network/src/config.rs +++ b/substrate/network/src/config.rs @@ -14,11 +14,12 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see .? -use service::Role; +pub use service::Role; /// Protocol configuration #[derive(Clone)] pub struct ProtocolConfig { + /// Assigned roles. pub roles: Role, } diff --git a/substrate/network/src/consensus.rs b/substrate/network/src/consensus.rs new file mode 100644 index 0000000000000..eb2a1e51a40e1 --- /dev/null +++ b/substrate/network/src/consensus.rs @@ -0,0 +1,203 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+ +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see .? + +//! Consensus related bits of the network service. + +use std::collections::HashMap; +use multiqueue; +use futures::sync::oneshot; +use io::SyncIo; +use protocol::Protocol; +use network::PeerId; +use primitives::Hash; +use message::{self, Message}; +use runtime_support::Hashable; + +//TODO: The queue is preallocated in multiqueue. Make it unbounded +const QUEUE_SIZE: u64 = 1 << 16; + +struct CandidateRequest { + id: message::RequestId, + completion: oneshot::Sender>, +} + +struct PeerConsensus { + candidate_fetch: Option, + candidate_available: Option, +} + +/// Consensus network protocol handler. Manages statements and candidate requests. +pub struct Consensus { + peers: HashMap, + our_candidate: Option<(Hash, Vec)>, + statement_sink: multiqueue::BroadcastFutSender, + statement_stream: multiqueue::BroadcastFutReceiver, + bft_message_sink: multiqueue::BroadcastFutSender, + bft_message_stream: multiqueue::BroadcastFutReceiver, +} + +impl Consensus { + /// Create a new instance. + pub fn new() -> Consensus { + let (statement_sink, statement_stream) = multiqueue::broadcast_fut_queue(QUEUE_SIZE); + let (bft_sink, bft_stream) = multiqueue::broadcast_fut_queue(QUEUE_SIZE); + Consensus { + peers: HashMap::new(), + our_candidate: None, + statement_sink: statement_sink, + statement_stream: statement_stream, + bft_message_sink: bft_sink, + bft_message_stream: bft_stream, + } + } + + /// Closes all notification streams. 
+ pub fn restart(&mut self) { + let (statement_sink, statement_stream) = multiqueue::broadcast_fut_queue(QUEUE_SIZE); + let (bft_sink, bft_stream) = multiqueue::broadcast_fut_queue(QUEUE_SIZE); + self.statement_sink = statement_sink; + self.statement_stream = statement_stream; + self.bft_message_sink = bft_sink; + self.bft_message_stream = bft_stream; + } + + /// Handle new connected peer. + pub fn new_peer(&mut self, _io: &mut SyncIo, _protocol: &Protocol, peer_id: PeerId, roles: &[message::Role]) { + if roles.iter().any(|r| *r == message::Role::Validator) { + trace!(target:"sync", "Registering validator {}", peer_id); + self.peers.insert(peer_id, PeerConsensus { + candidate_fetch: None, + candidate_available: None, + }); + } + } + + pub fn on_statement(&mut self, peer_id: PeerId, statement: message::Statement) { + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + // TODO: validate signature? + match &statement.statement { + &message::UnsignedStatement::Candidate(ref receipt) => peer.candidate_available = Some(Hash::from(receipt.blake2_256())), + &message::UnsignedStatement::Available(ref hash) => peer.candidate_available = Some(*hash), + &message::UnsignedStatement::Valid(_) | &message::UnsignedStatement::Invalid(_) => (), + } + if let Err(e) = self.statement_sink.try_send(statement) { + trace!(target:"sync", "Error broadcasting statement notification: {:?}", e); + } + } else { + trace!(target:"sync", "Ignored statement from unregistered peer {}", peer_id); + } + } + + pub fn statements(&self) -> multiqueue::BroadcastFutReceiver{ + self.statement_stream.add_stream() + } + + pub fn on_bft_message(&mut self, peer_id: PeerId, message: message::BftMessage) { + if self.peers.contains_key(&peer_id) { + // TODO: validate signature? 
+ if let Err(e) = self.bft_message_sink.try_send(message) { + trace!(target:"sync", "Error broadcasting BFT message notification: {:?}", e); + } + } else { + trace!(target:"sync", "Ignored BFT statement from unregistered peer {}", peer_id); + } + } + + pub fn bft_messages(&self) -> multiqueue::BroadcastFutReceiver{ + self.bft_message_stream.add_stream() + } + + pub fn fetch_candidate(&mut self, io: &mut SyncIo, protocol: &Protocol, hash: &Hash) -> oneshot::Receiver> { + // Request from the first peer that has it available. + // TODO: random peer selection. + trace!(target:"sync", "Trying to fetch candidate {:?}", hash); + let (sender, receiver) = oneshot::channel(); + if let Some((peer_id, ref mut peer)) = self.peers.iter_mut() + .find(|&(_, ref peer)| peer.candidate_fetch.is_none() && peer.candidate_available.as_ref().map_or(false, |h| h == hash)) { + + trace!(target:"sync", "Fetching candidate from {}", peer_id); + let id = 0; //TODO: generate unique id + peer.candidate_fetch = Some(CandidateRequest { + id: id, + completion: sender, + }); + let request = message::CandidateRequest { + id: id, + hash: *hash, + }; + protocol.send_message(io, *peer_id, Message::CandidateRequest(request)); + } + // If no peer found `sender` is dropped and `receiver` is canceled immediately. + return receiver; + } + + pub fn send_statement(&mut self, io: &mut SyncIo, protocol: &Protocol, statement: message::Statement) { + // Broadcast statement to all validators. + trace!(target:"sync", "Broadcasting statement {:?}", statement); + for peer in self.peers.keys() { + protocol.send_message(io, *peer, Message::Statement(statement.clone())); + } + } + + pub fn send_bft_message(&mut self, io: &mut SyncIo, protocol: &Protocol, message: message::BftMessage) { + // Broadcast message to all validators.
+ trace!(target:"sync", "Broadcasting BFT message {:?}", message); + for peer in self.peers.keys() { + protocol.send_message(io, *peer, Message::BftMessage(message.clone())); + } + } + + pub fn set_local_candidate(&mut self, candidate: Option<(Hash, Vec)>) { + trace!(target:"sync", "Set local candidate to {:?}", candidate.as_ref().map(|&(h, _)| h)); + self.our_candidate = candidate; + } + + pub fn on_candidate_request(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, request: message::CandidateRequest) { + let response = match self.our_candidate { + Some((ref hash, ref data)) if *hash == request.hash => Some(data.clone()), + _ => None, + }; + let msg = message::CandidateResponse { + id: request.id, + data: response, + }; + protocol.send_message(io, peer_id, Message::CandidateResponse(msg)); + } + + pub fn on_candidate_response(&mut self, io: &mut SyncIo, _protocol: &Protocol, peer_id: PeerId, response: message::CandidateResponse) { + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + if let Some(request) = peer.candidate_fetch.take() { + if response.id == request.id { + if let Some(data) = response.data { + if let Err(e) = request.completion.send(data) { + trace!(target:"sync", "Error sending candidate data notification: {:?}", e); + } + } + } else { + trace!(target:"sync", "Unexpected candidate response from {}", peer_id); + io.disable_peer(peer_id); + } + } else { + trace!(target:"sync", "Unexpected candidate response from {}", peer_id); + io.disable_peer(peer_id); + } + } + } + + pub fn peer_disconnected(&mut self, _io: &mut SyncIo, _protocol: &Protocol, peer_id: PeerId) { + self.peers.remove(&peer_id); + } +} diff --git a/substrate/network/src/error.rs b/substrate/network/src/error.rs index 9583a29861d01..120cfe0b4f358 100644 --- a/substrate/network/src/error.rs +++ b/substrate/network/src/error.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see .? +//! 
Polkadot service possible errors. + use network::Error as NetworkError; use client; @@ -23,7 +25,7 @@ error_chain! { } links { - Client(client::error::Error, client::error::ErrorKind); + Client(client::error::Error, client::error::ErrorKind) #[doc="Client error"]; } errors { diff --git a/substrate/network/src/lib.rs b/substrate/network/src/lib.rs index 71e6ab755f2e3..1c7fe9fa0e4c4 100644 --- a/substrate/network/src/lib.rs +++ b/substrate/network/src/lib.rs @@ -28,8 +28,12 @@ extern crate substrate_state_machine as state_machine; extern crate substrate_serializer as ser; extern crate substrate_client as client; extern crate substrate_runtime_support as runtime_support; +extern crate substrate_bft; extern crate serde; extern crate serde_json; +extern crate futures; +extern crate multiqueue; +extern crate ed25519; #[macro_use] extern crate serde_derive; #[macro_use] extern crate log; #[macro_use] extern crate bitflags; @@ -47,17 +51,21 @@ mod sync; mod protocol; mod io; mod message; -mod error; mod config; mod chain; mod blocks; +mod consensus; +pub mod error; #[cfg(test)] mod test; -pub use service::Service; +pub use service::{Service, FetchFuture, StatementStream, ConsensusService, BftMessageStream, TransactionPool, Params, ManageNetwork}; pub use protocol::{ProtocolStatus}; pub use sync::{Status as SyncStatus, SyncState}; pub use network::{NonReservedPeerMode, ConnectionFilter, ConnectionDirection, NetworkConfiguration}; +pub use message::{Statement, BftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal}; +pub use error::Error; +pub use config::{Role, ProtocolConfig}; // TODO: move it elsewhere fn header_hash(header: &primitives::Header) -> primitives::block::HeaderHash { diff --git a/substrate/network/src/message.rs b/substrate/network/src/message.rs index 793daa444f79e..52c78d95febb1 100644 --- a/substrate/network/src/message.rs +++ b/substrate/network/src/message.rs @@ -16,17 +16,15 @@ //! Network packet message types. 
These get serialized and put into the lower level protocol payload. -use std::borrow::Borrow; -use primitives::AuthorityId; -use primitives::block::{Number as BlockNumber, HeaderHash, Header, Body}; +use primitives::{AuthorityId, Hash}; +use primitives::block::{Number as BlockNumber, HeaderHash, Header, Body, Block}; use primitives::bft::Justification; use service::Role as RoleFlags; +use ed25519; pub type RequestId = u64; type Bytes = Vec; -type Signature = ::primitives::hash::H256; //TODO: - /// Configured node role. #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub enum Role { @@ -40,10 +38,10 @@ pub enum Role { Collator, } -impl From for RoleFlags where T: Borrow<[Role]> { - fn from(roles: T) -> RoleFlags { +impl Role { + /// Convert enum to service flags. + pub fn as_flags(roles: &[Role]) -> RoleFlags { let mut flags = RoleFlags::NONE; - let roles: &[Role] = roles.borrow(); for r in roles { match *r { Role::Full => flags = flags | RoleFlags::FULL, @@ -126,6 +124,95 @@ pub enum Direction { Descending, } +/// A set of transactions. +pub type Transactions = Vec>; + +/// Statements circulated among peers. +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub enum UnsignedStatement { + /// Broadcast by an authority to indicate that this is its candidate for + /// inclusion. + /// + /// Broadcasting two different candidate messages per round is not allowed. + Candidate(Vec), + /// Broadcast by an authority to attest that the candidate with given digest + /// is valid. + Valid(Hash), + /// Broadcast by an authority to attest that the auxiliary data for a candidate + /// with given digest is available. + Available(Hash), + /// Broadcast by an authority to attest that the candidate with given digest + /// is invalid. + Invalid(Hash), +} + +/// A signed statement. +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub struct Statement { + /// The statement. + pub statement: UnsignedStatement, + /// The signature.
+ pub signature: ed25519::Signature, + /// The sender. + pub sender: AuthorityId, +} + +/// Communication that can occur between participants in consensus. +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub enum BftMessage { + /// A consensus message (proposal or vote) + Consensus(SignedConsensusMessage), + /// Auxiliary communication (just proof-of-lock for now). + Auxiliary(Justification), +} + +/// A localized proposal message. Contains two signed pieces of data. +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub struct SignedConsensusProposal { + /// The round number. + pub round_number: u32, + /// The proposal sent. + pub proposal: Block, + /// The digest of the proposal. + pub digest: Hash, + /// The sender of the proposal + pub sender: AuthorityId, + /// The signature on the message (propose, round number, digest) + pub digest_signature: ed25519::Signature, + /// The signature on the message (propose, round number, proposal) + pub full_signature: ed25519::Signature, +} + +/// A localized vote message, including the sender. +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub struct SignedConsensusVote { + /// The message sent. + pub vote: ConsensusVote, + /// The sender of the message + pub sender: AuthorityId, + /// The signature of the message. + pub signature: ed25519::Signature, +} + +/// Votes during a consensus round. +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub enum ConsensusVote { + /// Prepare to vote for proposal with digest D. + Prepare(u32, Hash), + /// Commit to proposal with digest D.. + Commit(u32, Hash), + /// Propose advancement to a new round. + AdvanceRound(u32), +} + +/// A localized message. +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +pub enum SignedConsensusMessage { + /// A proposal. + Propose(SignedConsensusProposal), + /// A vote. 
+ Vote(SignedConsensusVote), +} #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] /// A network message. pub enum Message { @@ -137,6 +224,16 @@ pub enum Message { BlockResponse(BlockResponse), /// Block announce. BlockAnnounce(BlockAnnounce), + /// Transactions. + Transactions(Transactions), + /// Consensus statement. + Statement(Statement), + /// Candidate data request. + CandidateRequest(CandidateRequest), + /// Candidate response. + CandidateResponse(CandidateResponse), + /// BFT Consensus statement. + BftMessage(BftMessage), } #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] @@ -152,7 +249,7 @@ pub struct Status { /// Genesis block hash. pub genesis_hash: HeaderHash, /// Signatue of `best_hash` made with validator address. Required for the validator role. - pub validator_signature: Option, + pub validator_signature: Option, /// Validator address. Required for the validator role. pub validator_id: Option, /// Parachain id. Required for the collator role. @@ -176,6 +273,24 @@ pub struct BlockRequest { pub max: Option, } +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +/// Request candidate block data from a peer. +pub struct CandidateRequest { + /// Unique request id. + pub id: RequestId, + /// Candidate receipt hash. + pub hash: Hash, +} + +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +/// Candidate block data response. +pub struct CandidateResponse { + /// Unique request id. + pub id: RequestId, + /// Candidate data. Empty if the peer does not have the candidate anymore. 
+ pub data: Option>, +} + #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] /// Response to `BlockRequest` pub struct BlockResponse { diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index 980bf441f5c2a..35f2ca0549788 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -18,14 +18,18 @@ use std::collections::{HashMap, HashSet, BTreeMap}; use std::{mem, cmp}; use std::sync::Arc; use std::time; -use parking_lot::RwLock; +use parking_lot::{RwLock, Mutex}; +use multiqueue; +use futures::sync::oneshot; use serde_json; use primitives::block::{HeaderHash, TransactionHash, Number as BlockNumber, Header, Id as BlockId}; +use primitives::Hash; use network::{PeerId, NodeId}; use message::{self, Message}; -use sync::{ChainSync, Status as SyncStatus}; -use service::Role; +use sync::{ChainSync, Status as SyncStatus, SyncState}; +use consensus::Consensus; +use service::{Role, TransactionPool}; use config::ProtocolConfig; use chain::Client; use io::SyncIo; @@ -44,10 +48,12 @@ pub struct Protocol { chain: Arc, genesis_hash: HeaderHash, sync: RwLock, - /// All connected peers + consensus: Mutex, + // All connected peers peers: RwLock>, - /// Connected peers pending Status message. + // Connected peers pending Status message. handshaking_peers: RwLock>, + transaction_pool: Arc, } /// Syncing status and statistics @@ -75,8 +81,8 @@ struct Peer { block_request: Option, /// Request timestamp request_timestamp: Option, - /// Holds a set of transactions recently sent to this peer to avoid spamming. - _last_sent_transactions: HashSet, + /// Holds a set of transactions known to this peer. + known_transactions: HashSet, /// Request counter, next_request_id: message::RequestId, } @@ -104,15 +110,17 @@ pub struct TransactionStats { impl Protocol { /// Create a new instance. 
- pub fn new(config: ProtocolConfig, chain: Arc) -> error::Result { + pub fn new(config: ProtocolConfig, chain: Arc, transaction_pool: Arc) -> error::Result { let info = chain.info()?; let protocol = Protocol { config: config, chain: chain, genesis_hash: info.chain.genesis_hash, sync: RwLock::new(ChainSync::new(&info)), + consensus: Mutex::new(Consensus::new()), peers: RwLock::new(HashMap::new()), handshaking_peers: RwLock::new(HashMap::new()), + transaction_pool: transaction_pool, }; Ok(protocol) } @@ -168,7 +176,12 @@ impl Protocol { }, Message::BlockAnnounce(announce) => { self.on_block_announce(io, peer_id, announce); - } + }, + Message::Statement(s) => self.on_statement(io, peer_id, s), + Message::CandidateRequest(r) => self.on_candidate_request(io, peer_id, r), + Message::CandidateResponse(r) => self.on_candidate_response(io, peer_id, r), + Message::BftMessage(m) => self.on_bft_message(io, peer_id, m), + Message::Transactions(m) => self.on_transactions(io, peer_id, m), } } @@ -209,11 +222,12 @@ impl Protocol { peers.remove(&peer).is_some() }; if removed { + self.consensus.lock().peer_disconnected(io, self, peer); self.sync.write().peer_disconnected(io, self, peer); } } - pub fn on_block_request(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest) { + fn on_block_request(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest) { trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", request.id, peer, request.from, request.to, request.max); let mut blocks = Vec::new(); let mut id = match request.from { @@ -264,12 +278,63 @@ impl Protocol { self.send_message(io, peer, Message::BlockResponse(response)) } - pub fn on_block_response(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest, response: message::BlockResponse) { + fn on_block_response(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest, response: message::BlockResponse) { // TODO: validate response trace!(target: "sync", 
"BlockResponse {} from {} with {} blocks", response.id, peer, response.blocks.len()); self.sync.write().on_block_data(io, self, peer, request, response); } + fn on_candidate_request(&self, io: &mut SyncIo, peer: PeerId, request: message::CandidateRequest) { + trace!(target: "sync", "CandidateRequest {} from {} for {}", request.id, peer, request.hash); + self.consensus.lock().on_candidate_request(io, self, peer, request); + } + + fn on_candidate_response(&self, io: &mut SyncIo, peer: PeerId, response: message::CandidateResponse) { + trace!(target: "sync", "CandidateResponse {} from {} with {:?} bytes", response.id, peer, response.data.as_ref().map(|d| d.len())); + self.consensus.lock().on_candidate_response(io, self, peer, response); + } + + fn on_statement(&self, _io: &mut SyncIo, peer: PeerId, statement: message::Statement) { + trace!(target: "sync", "Statement from {}: {:?}", peer, statement); + self.consensus.lock().on_statement(peer, statement); + } + + fn on_bft_message(&self, _io: &mut SyncIo, peer: PeerId, message: message::BftMessage) { + trace!(target: "sync", "BFT message from {}: {:?}", peer, message); + self.consensus.lock().on_bft_message(peer, message); + } + + /// See `ConsensusService` trait. + pub fn send_bft_message(&self, io: &mut SyncIo, message: message::BftMessage) { + self.consensus.lock().send_bft_message(io, self, message) + } + + /// See `ConsensusService` trait. + pub fn bft_messages(&self) -> multiqueue::BroadcastFutReceiver { + self.consensus.lock().bft_messages() + } + + /// See `ConsensusService` trait. + pub fn statements(&self) -> multiqueue::BroadcastFutReceiver { + self.consensus.lock().statements() + } + + /// See `ConsensusService` trait. + pub fn fetch_candidate(&self, io: &mut SyncIo, hash: &Hash) -> oneshot::Receiver> { + self.consensus.lock().fetch_candidate(io, self, hash) + } + + /// See `ConsensusService` trait. 
+ pub fn send_statement(&self, io: &mut SyncIo, statement: message::Statement) { + self.consensus.lock().send_statement(io, self, statement) + } + + /// See `ConsensusService` trait. + pub fn set_local_candidate(&self, candidate: Option<(Hash, Vec)>) { + self.consensus.lock().set_local_candidate(candidate) + } + + /// Perform time based maintenance. pub fn tick(&self, io: &mut SyncIo) { self.maintain_peers(io); } @@ -334,12 +399,12 @@ impl Protocol { let peer = Peer { protocol_version: status.version, - roles: status.roles.into(), + roles: message::Role::as_flags(&status.roles), best_hash: status.best_hash, best_number: status.best_number, block_request: None, request_timestamp: None, - _last_sent_transactions: HashSet::new(), + known_transactions: HashSet::new(), next_request_id: 0, }; peers.insert(peer_id.clone(), peer); @@ -347,6 +412,42 @@ impl Protocol { debug!(target: "sync", "Connected {} {}", peer_id, io.peer_info(peer_id)); } self.sync.write().new_peer(io, self, peer_id); + self.consensus.lock().new_peer(io, self, peer_id, &status.roles); + } + + /// Called when peer sends us new transactions + fn on_transactions(&self, _io: &mut SyncIo, peer_id: PeerId, transactions: message::Transactions) { + // Accept transactions only when fully synced + if self.sync.read().status().state != SyncState::Idle { + trace!(target: "sync", "{} Ignoring transactions while syncing", peer_id); + return; + } + trace!(target: "sync", "Received {} transactions from {}", peer_id, transactions.len()); + let mut peers = self.peers.write(); + if let Some(ref mut peer) = peers.get_mut(&peer_id) { + for t in transactions { + if let Some(hash) = self.transaction_pool.import(&t) { + peer.known_transactions.insert(hash); + } + } + } + } + + /// Propagate ready transactions to connected peers. + pub fn propagate_transactions(&self, io: &mut SyncIo, transactions: &[(TransactionHash, Vec)]) { + // Propagate transactions only when fully synced + if self.sync.read().status().state != SyncState::Idle { +
return; + } + let mut peers = self.peers.write(); + for (peer_id, ref mut peer) in peers.iter_mut() { + let to_send: Vec<_> = transactions.iter().filter_map(|&(hash, ref t)| + if peer.known_transactions.insert(hash.clone()) { Some(t.clone()) } else { None }).collect(); + if !to_send.is_empty() { + trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), peer_id); + self.send_message(io, *peer_id, Message::Transactions(to_send)); + } + } } /// Send Status message @@ -373,6 +474,7 @@ impl Protocol { sync.clear(); peers.clear(); handshaking_peers.clear(); + self.consensus.lock().restart(); } pub fn on_block_announce(&self, io: &mut SyncIo, peer_id: PeerId, announce: message::BlockAnnounce) { @@ -384,9 +486,6 @@ impl Protocol { self.sync.write().update_chain_info(&header); } - pub fn on_new_transactions(&self) { - } - pub fn transactions_stats(&self) -> BTreeMap { BTreeMap::new() } diff --git a/substrate/network/src/service.rs b/substrate/network/src/service.rs index e3832249db11b..1688d1e717057 100644 --- a/substrate/network/src/service.rs +++ b/substrate/network/src/service.rs @@ -17,25 +17,42 @@ use std::sync::Arc; use std::collections::{BTreeMap}; use std::io; +use multiqueue; +use futures::sync::oneshot; use network::{NetworkProtocolHandler, NetworkService, NetworkContext, HostInfo, PeerId, ProtocolId, NetworkConfiguration , NonReservedPeerMode, ErrorKind}; use primitives::block::{TransactionHash, Header}; +use primitives::Hash; use core_io::{TimerToken}; use io::NetSyncIo; use protocol::{Protocol, ProtocolStatus, PeerInfo as ProtocolPeerInfo, TransactionStats}; use config::{ProtocolConfig}; use error::Error; use chain::Client; +use message::{Statement, BftMessage}; /// Polkadot devp2p protocol id pub const DOT_PROTOCOL_ID: ProtocolId = *b"dot"; +/// Type that represents fetch completion future. +pub type FetchFuture = oneshot::Receiver>; +/// Type that represents statement stream. 
+pub type StatementStream = multiqueue::BroadcastFutReceiver; +/// Type that represents bft messages stream. +pub type BftMessageStream = multiqueue::BroadcastFutReceiver; + bitflags! { + /// Node roles bitmask. pub struct Role: u32 { + /// No network. const NONE = 0b00000000; + /// Full node, does not participate in consensus. const FULL = 0b00000001; + /// Light client node. const LIGHT = 0b00000010; + /// Act as a validator. const VALIDATOR = 0b00000100; + /// Act as a collator. const COLLATOR = 0b00001000; } } @@ -52,6 +69,39 @@ pub trait SyncProvider: Send + Sync { fn transactions_stats(&self) -> BTreeMap; } +/// Transaction pool interface +pub trait TransactionPool: Send + Sync { + /// Get transactions from the pool that are ready to be propagated. + fn transactions(&self) -> Vec<(TransactionHash, Vec)>; + /// Import a transaction into the pool. + fn import(&self, transaction: &[u8]) -> Option; +} + +/// ConsensusService +pub trait ConsensusService: Send + Sync { + /// Get statement stream. + fn statements(&self) -> multiqueue::BroadcastFutReceiver; + /// Send out a statement. + fn send_statement(&self, statement: Statement); + /// Maintain connectivity to given addresses. + fn connect_to_authorities(&self, addresses: &[String]); + /// Fetch candidate. + fn fetch_candidate(&self, hash: &Hash) -> oneshot::Receiver>; + /// Note local candidate. Accepts candidate receipt hash and candidate data. + /// Pass `None` to clear the candidate. + fn set_local_candidate(&self, candidate: Option<(Hash, Vec)>); + + /// Get BFT message stream. + fn bft_messages(&self) -> multiqueue::BroadcastFutReceiver; + /// Send out a BFT message. + fn send_bft_message(&self, message: BftMessage); +} + +/// devp2p Protocol handler +struct ProtocolHandler { + protocol: Protocol, +} + /// Peer connection information #[derive(Debug)] pub struct PeerInfo { @@ -77,6 +127,8 @@ pub struct Params { pub network_config: NetworkConfiguration, /// Polkadot relay chain access point.
pub chain: Arc, + /// Transaction pool. + pub transaction_pool: Arc, } /// Polkadot network service. Handles network IO and manages connectivity. @@ -90,13 +142,11 @@ pub struct Service { impl Service { /// Creates and register protocol with the network service pub fn new(params: Params) -> Result, Error> { - let service = NetworkService::new(params.network_config.clone(), None)?; - let sync = Arc::new(Service { network: service, handler: Arc::new(ProtocolHandler { - protocol: Protocol::new(params.config, params.chain.clone())?, + protocol: Protocol::new(params.config, params.chain.clone(), params.transaction_pool)?, }), }); @@ -109,8 +159,10 @@ impl Service { } /// Called when new transactons are imported by the client. - pub fn on_new_transactions(&self) { - self.handler.protocol.on_new_transactions() + pub fn on_new_transactions(&self, transactions: &[(TransactionHash, Vec)]) { + self.network.with_context(DOT_PROTOCOL_ID, |context| { + self.handler.protocol.propagate_transactions(&mut NetSyncIo::new(context), transactions); + }); } fn start(&self) { @@ -130,6 +182,12 @@ impl Service { } } +impl Drop for Service { + fn drop(&mut self) { + self.stop(); + } +} + impl SyncProvider for Service { /// Get sync status fn status(&self) -> ProtocolStatus { @@ -168,9 +226,41 @@ impl SyncProvider for Service { } } -struct ProtocolHandler { - /// Protocol handler - protocol: Protocol, +/// ConsensusService +impl ConsensusService for Service { + fn statements(&self) -> multiqueue::BroadcastFutReceiver { + self.handler.protocol.statements() + } + + fn connect_to_authorities(&self, _addresses: &[String]) { + //TODO: implement me + } + + fn fetch_candidate(&self, hash: &Hash) -> oneshot::Receiver> { + self.network.with_context_eval(DOT_PROTOCOL_ID, |context| { + self.handler.protocol.fetch_candidate(&mut NetSyncIo::new(context), hash) + }).expect("DOT Service is registered") + } + + fn send_statement(&self, statement: Statement) { + self.network.with_context(DOT_PROTOCOL_ID, 
|context| { + self.handler.protocol.send_statement(&mut NetSyncIo::new(context), statement); + }); + } + + fn set_local_candidate(&self, candidate: Option<(Hash, Vec)>) { + self.handler.protocol.set_local_candidate(candidate) + } + + fn bft_messages(&self) -> BftMessageStream { + self.handler.protocol.bft_messages() + } + + fn send_bft_message(&self, message: BftMessage) { + self.network.with_context(DOT_PROTOCOL_ID, |context| { + self.handler.protocol.send_bft_message(&mut NetSyncIo::new(context), message); + }); + } } impl NetworkProtocolHandler for ProtocolHandler { diff --git a/substrate/network/src/sync.rs b/substrate/network/src/sync.rs index c952bd5f2e44a..60640664df364 100644 --- a/substrate/network/src/sync.rs +++ b/substrate/network/src/sync.rs @@ -84,9 +84,13 @@ impl ChainSync { } } + fn best_seen_block(&self) -> Option { + self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number) + } + /// Returns sync status pub fn status(&self) -> Status { - let best_seen = self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number); + let best_seen = self.best_seen_block(); let state = match &best_seen { &Some(n) if n > self.best_queued_number && n - self.best_queued_number > 5 => SyncState::Downloading, _ => SyncState::Idle, @@ -97,6 +101,7 @@ impl ChainSync { } } + /// Handle new connected peer. pub fn new_peer(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId) { if let Some(info) = protocol.peer_info(peer_id) { match (protocol.chain().block_status(&BlockId::Hash(info.best_hash)), info.best_number) { @@ -211,6 +216,7 @@ impl ChainSync { vec![] }; + let best_seen = self.best_seen_block(); // Blocks in the response/drain should be in ascending order. 
for block in new_blocks { let origin = block.origin; @@ -220,7 +226,9 @@ impl ChainSync { let number = header.number; let hash = header_hash(&header); let parent = header.parent_hash; + let is_best = best_seen.as_ref().map_or(false, |n| number >= *n); let result = protocol.chain().import( + is_best, header, justification, block.body diff --git a/substrate/network/src/test/mod.rs b/substrate/network/src/test/mod.rs index 8aac22f16ee7d..721640f9882aa 100644 --- a/substrate/network/src/test/mod.rs +++ b/substrate/network/src/test/mod.rs @@ -19,14 +19,15 @@ mod sync; use std::collections::{VecDeque, HashSet, HashMap}; use std::sync::Arc; use parking_lot::RwLock; -use client::{self, genesis}; +use client::{self, genesis, BlockOrigin}; use client::block_builder::BlockBuilder; -use primitives::block::Id as BlockId; +use primitives::block::{Id as BlockId, TransactionHash}; use primitives; use executor; use io::SyncIo; use protocol::Protocol; use config::ProtocolConfig; +use service::TransactionPool; use network::{PeerId, SessionInfo, Error as NetworkError}; use test_runtime::genesismap::{GenesisConfig, additional_storage_with_genesis}; use runtime_support::Hashable; @@ -190,7 +191,7 @@ impl Peer { trace!("Generating {}, (#{})", primitives::block::HeaderHash::from(block.header.blake2_256()), block.header.number); let justification = Self::justify(&block.header); let justified = self.client.check_justification(block.header, justification).unwrap(); - self.client.import_block(justified, Some(block.transactions)).unwrap(); + self.client.import_block(BlockOrigin::File, justified, Some(block.transactions)).unwrap(); } } @@ -215,6 +216,18 @@ impl Peer { } } +struct EmptyTransactionPool; + +impl TransactionPool for EmptyTransactionPool { + fn transactions(&self) -> Vec<(TransactionHash, Vec)> { + Vec::new() + } + + fn import(&self, _transaction: &[u8]) -> Option { + None + } +} + pub struct TestNet { pub peers: Vec>, pub started: bool, @@ -248,7 +261,8 @@ impl TestNet { for _ in 
0..n { let client = Arc::new(client::new_in_mem(Executor::new(), Self::prepare_genesis).unwrap()); - let sync = Protocol::new(config.clone(), client.clone()).unwrap(); + let tx_pool = Arc::new(EmptyTransactionPool); + let sync = Protocol::new(config.clone(), client.clone(), tx_pool).unwrap(); net.peers.push(Arc::new(Peer { sync: sync, client: client, diff --git a/substrate/network/test/mod.rs b/substrate/network/test/mod.rs deleted file mode 100644 index 43f3abb09510a..0000000000000 --- a/substrate/network/test/mod.rs +++ /dev/null @@ -1,254 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . 
- -mod sync; - -use std::collections::{VecDeque, HashSet, HashMap}; -use std::sync::Arc; -use parking_lot::RwLock; -use client::{self, BlockId}; -use substrate_executor as executor; -use io::SyncIo; -use protocol::Protocol; -use config::ProtocolConfig; -use network::{PeerId, SessionInfo, Error as NetworkError}; - -pub struct TestIo<'p> { - pub queue: &'p RwLock>, - pub sender: Option, - pub to_disconnect: HashSet, - pub packets: Vec, - pub peers_info: HashMap, -} - -impl<'p> TestIo<'p> where { - pub fn new(queue: &'p RwLock>, sender: Option) -> TestIo<'p> { - TestIo { - queue: queue, - sender: sender, - to_disconnect: HashSet::new(), - packets: Vec::new(), - peers_info: HashMap::new(), - } - } -} - -impl<'p> Drop for TestIo<'p> { - fn drop(&mut self) { - self.queue.write().extend(self.packets.drain(..)); - } -} - -impl<'p> SyncIo for TestIo<'p> { - fn disable_peer(&mut self, peer_id: PeerId) { - self.disconnect_peer(peer_id); - } - - fn disconnect_peer(&mut self, peer_id: PeerId) { - self.to_disconnect.insert(peer_id); - } - - fn is_expired(&self) -> bool { - false - } - - fn send(&mut self, peer_id: PeerId, data: Vec) -> Result<(), NetworkError> { - self.packets.push(TestPacket { - data: data, - recipient: peer_id, - }); - Ok(()) - } - - fn peer_info(&self, peer_id: PeerId) -> String { - self.peers_info.get(&peer_id) - .cloned() - .unwrap_or_else(|| peer_id.to_string()) - } - - fn peer_session_info(&self, _peer_id: PeerId) -> Option { - None - } -} - -/// Mocked subprotocol packet -pub struct TestPacket { - pub data: Vec, - pub recipient: PeerId, -} - -pub struct Peer { - pub chain: Arc>, - pub sync: Protocol, - pub queue: RwLock>, -} - -impl Peer { - /// Called after blockchain has been populated to updated current state. - fn start(&self) { - // Update the sync state to the lates chain state. 
- let info = self.chain.info().expect("In-mem chain does not fail"); - let header = self.chain.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap(); - self.sync.on_block_imported(&header); - } - - /// Called on connection to other indicated peer. - fn on_connect(&self, other: PeerId) { - self.sync.on_peer_connected(&mut TestIo::new(&self.queue, Some(other)), other); - } - - /// Called on disconnect from other indicated peer. - fn on_disconnect(&self, other: PeerId) { - let mut io = TestIo::new(&self.queue, Some(other)); - self.sync.on_peer_disconnected(&mut io, other); - } - - /// Receive a message from another peer. Return a set of peers to disconnect. - fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet { - let mut io = TestIo::new(&self.queue, Some(from)); - self.sync.handle_packet(&mut io, from, &msg.data); - self.flush(); - io.to_disconnect.clone() - } - - /// Produce the next pending message to send to another peer. - fn pending_message(&self) -> Option { - self.flush(); - self.queue.write().pop_front() - } - - /// Whether this peer is done syncing (has no messages to send). - fn is_done(&self) -> bool { - self.queue.read().is_empty() - } - - /// Execute a "sync step". This is called for each peer after it sends a packet. - fn sync_step(&self) { - self.flush(); - self.sync.tick(&mut TestIo::new(&self.queue, None)); - } - - /// Restart sync for a peer. 
- fn restart_sync(&self) { - self.sync.abort(); - } - - fn flush(&self) { - } -} - -pub struct TestNet { - pub peers: Vec>, - pub started: bool, - pub disconnect_events: Vec<(PeerId, PeerId)>, //disconnected (initiated by, to) -} - -impl TestNet { - pub fn new(n: usize) -> Self { - Self::new_with_config(n, ProtocolConfig::default()) - } - - pub fn new_with_config(n: usize, config: ProtocolConfig) -> Self { - let mut net = TestNet { - peers: Vec::new(), - started: false, - disconnect_events: Vec::new(), - }; - for _ in 0..n { - let chain = Arc::new(client::new_in_mem(executor::executor()).unwrap()); - let sync = Protocol::new(config.clone(), chain.clone()).unwrap(); - net.peers.push(Arc::new(Peer { - sync: sync, - chain: chain, - queue: RwLock::new(VecDeque::new()), - })); - } - net - } - - pub fn peer(&self, i: usize) -> &Peer { - &self.peers[i] - } - - pub fn start(&mut self) { - if self.started { - return; - } - for peer in 0..self.peers.len() { - self.peers[peer].start(); - for client in 0..self.peers.len() { - if peer != client { - self.peers[peer].on_connect(client as PeerId); - } - } - } - self.started = true; - } - - pub fn sync_step(&mut self) { - for peer in 0..self.peers.len() { - let packet = self.peers[peer].pending_message(); - if let Some(packet) = packet { - let disconnecting = { - let recipient = packet.recipient; - trace!("--- {} -> {} ---", peer, recipient); - let to_disconnect = self.peers[recipient].receive_message(peer as PeerId, packet); - for d in &to_disconnect { - // notify this that disconnecting peers are disconnecting - self.peers[recipient].on_disconnect(*d as PeerId); - self.disconnect_events.push((peer, *d)); - } - to_disconnect - }; - for d in &disconnecting { - // notify other peers that this peer is disconnecting - self.peers[*d].on_disconnect(peer as PeerId); - } - } - - self.sync_step_peer(peer); - } - } - - pub fn sync_step_peer(&mut self, peer_num: usize) { - self.peers[peer_num].sync_step(); - } - - pub fn restart_peer(&mut 
self, i: usize) { - self.peers[i].restart_sync(); - } - - pub fn sync(&mut self) -> u32 { - self.start(); - let mut total_steps = 0; - while !self.done() { - self.sync_step(); - total_steps += 1; - } - total_steps - } - - pub fn sync_steps(&mut self, count: usize) { - self.start(); - for _ in 0..count { - self.sync_step(); - } - } - - pub fn done(&self) -> bool { - self.peers.iter().all(|p| p.is_done()) - } -} diff --git a/substrate/rpc/src/state/mod.rs b/substrate/rpc/src/state/mod.rs index b8977f30859ae..ce4b0a8b91149 100644 --- a/substrate/rpc/src/state/mod.rs +++ b/substrate/rpc/src/state/mod.rs @@ -21,6 +21,7 @@ mod error; #[cfg(test)] mod tests; +use std::sync::Arc; use client::{self, Client}; use primitives::block; use primitives::storage::{StorageKey, StorageData}; @@ -41,16 +42,16 @@ build_rpc_trait! { } } -impl StateApi for Client where +impl StateApi for Arc> where B: client::backend::Backend + Send + Sync + 'static, E: state_machine::CodeExecutor + Send + Sync + 'static, client::error::Error: From<<::State as state_machine::backend::Backend>::Error>, { fn storage(&self, key: StorageKey, block: block::HeaderHash) -> Result { - Ok(self.storage(&block::Id::Hash(block), &key)?) + Ok(self.as_ref().storage(&block::Id::Hash(block), &key)?) 
} fn call(&self, method: String, data: Vec, block: block::HeaderHash) -> Result> { - Ok(self.call(&block::Id::Hash(block), &method, &data)?.return_data) + Ok(self.as_ref().call(&block::Id::Hash(block), &method, &data)?.return_data) } } diff --git a/substrate/rpc/src/state/tests.rs b/substrate/rpc/src/state/tests.rs index 9bae8b1cd5610..3babe02c5c1d8 100644 --- a/substrate/rpc/src/state/tests.rs +++ b/substrate/rpc/src/state/tests.rs @@ -30,7 +30,7 @@ fn should_return_storage() { digest: Default::default(), }; - let client = client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap(); + let client = Arc::new(client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap()); let genesis_hash = test_genesis_block.blake2_256().into(); assert_matches!( @@ -51,7 +51,7 @@ fn should_call_contract() { digest: Default::default(), }; - let client = client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap(); + let client = Arc::new(client::new_in_mem(executor::WasmExecutor, || (test_genesis_block.clone(), vec![])).unwrap()); let genesis_hash = test_genesis_block.blake2_256().into(); assert_matches!(