diff --git a/Cargo.lock b/Cargo.lock index 98bfea42fb20..c96f0b2d5e47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -519,6 +519,24 @@ dependencies = [ "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "console_error_panic_hook" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "wasm-bindgen 0.2.55 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "console_log" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "web-sys 0.3.32 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "const-random" version = "0.1.6" @@ -960,15 +978,6 @@ dependencies = [ "libc 0.2.65 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "exit-future" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "exit-future" version = "0.2.0" @@ -1386,14 +1395,6 @@ dependencies = [ "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "futures01" -version = "0.1.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "fxhash" version = "0.2.1" @@ -3509,7 +3510,6 @@ dependencies = [ "derive_more 0.99.2 (registry+https://github.com/rust-lang/crates.io-index)", "exit-future 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "futures01 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "kvdb 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "kvdb-memorydb 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "kvdb-rocksdb 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3528,27 +3528,35 @@ dependencies = [ "sp-consensus 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)", "sp-core 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)", "sp-runtime 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)", - "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "polkadot-cli" version = "0.7.9" dependencies = [ + "console_error_panic_hook 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "console_log 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "js-sys 0.3.32 (registry+https://github.com/rust-lang/crates.io-index)", + "kvdb-memorydb 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libp2p 0.13.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "polkadot-service 0.7.9", "sc-cli 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)", + "sc-network 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)", + "sc-service 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)", "structopt 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", + "wasm-bindgen 0.2.55 (registry+https://github.com/rust-lang/crates.io-index)", + "wasm-bindgen-futures 0.3.27 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "polkadot-collator" version = "0.7.9" dependencies = [ - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures-timer 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3564,7 +3572,7 @@ dependencies = [ "sp-consensus 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)", "sp-core 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)", "sp-keyring 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)", - "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3593,7 +3601,6 @@ version = "0.7.9" dependencies = [ "arrayvec 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "exit-future 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3734,7 +3741,6 @@ dependencies = [ name = "polkadot-service" version = "0.7.9" dependencies = [ - "exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3789,11 +3795,9 @@ dependencies = [ name = "polkadot-validation" version = "0.7.9" dependencies = [ - "async-std 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "bitvec 0.15.2 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", "exit-future 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "futures-timer 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3821,8 +3825,7 @@ dependencies = [ "sp-timestamp 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)", "sp-transaction-pool-api 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)", "sp-trie 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)", - "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -4434,7 +4437,7 @@ dependencies = [ "sp-state-machine 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)", "structopt 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -5999,7 +6002,7 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.2" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -6073,8 +6076,6 @@ version = "0.2.0-alpha.6" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures-util-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-sync 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -6952,6 +6953,8 @@ dependencies = [ "checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9" "checksum clear_on_drop 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "97276801e127ffb46b66ce23f35cc96bd454fa311294bced4bbace7baa8b1d17" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +"checksum console_error_panic_hook 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "b8d976903543e0c48546a91908f21588a680a8c8f984df9a5d69feccb2b2a211" +"checksum console_log 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1e7871d2947441b0fdd8e2bd1ce2a2f75304f896582c0d572162d48290683c48" "checksum const-random 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "7b641a8c9867e341f3295564203b1c250eb8ce6cb6126e007941f78c4d2ed7fe" "checksum const-random-macro 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c750ec12b83377637110d5a57f5ae08e895b06c4b16e2bdbf1a94ef717428c59" "checksum constant_time_eq 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "995a44c877f9212528ccc74b21a232f66ad69001e40ede5bcee2ac9ef2657120" @@ -7001,7 +7004,6 @@ dependencies = [ "checksum erased-serde 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3beee4bc16478a1b26f2e80ad819a52d24745e292f521a63c16eea5f74b7eb60" "checksum errno 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "c2a071601ed01b988f896ab14b95e67335d1eeb50190932a1320f7fe3cadc84e" "checksum errno-dragonfly 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "14ca354e36190500e1e1fb267c647932382b54053c50b14970856c0b00a35067" -"checksum exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d8013f441e38e31c670e7f34ec8f1d5d3a2bd9d303c1ff83976ca886005e8f48" "checksum exit-future 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e43f2f1833d64e33f15592464d6fdd70f349dda7b1a53088eb83cd94014008c5" "checksum faerie 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f902f2af041f6c7177a2a04f805687cdc71e69c7cbef059a2755d8923f4cd7a8" "checksum failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "f8273f13c977665c5db7eb2b99ae520952fe5ac831ae4cd09d80c4c7042b5ed9" @@ -7046,7 +7048,6 @@ dependencies = [ "checksum futures-timer 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a1de7508b218029b0f01662ed8f61b1c964b3ae99d6f25462d0f55a595109df6" "checksum futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c0d66274fb76985d3c62c886d1da7ac4c0903a8c9f754e8fe0f35a6a6cc39e76" "checksum futures-util-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)" = "5ce968633c17e5f97936bd2797b6e38fb56cf16a7422319f7ec2e30d3c470e8d" -"checksum futures01 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)" = "7ef8cbbf52909170053540c6c05a62433ddb60662dabee714e2a882caa864f22" "checksum fxhash 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" "checksum gcc 0.3.55 (registry+https://github.com/rust-lang/crates.io-index)" = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" "checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec" @@ -7418,7 +7419,7 @@ dependencies = [ "checksum tiny-keccak 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d8a021c69bb74a44ccedb824a046447e2c84a01df9e5c20779750acb38e11b2" "checksum tiny-keccak 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2953ca5148619bc99695c1274cb54c5275bbb913c6adad87e72eaf8db9787f69" "checksum tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)" = "5a09c0b5bb588872ab2f09afa13ee6e9dac11e10a0ec9e8e3ba39a5a5d530af6" -"checksum tokio 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "2e765bf9f550bd9b8a970633ca3b56b8120c4b6c5dcbe26a93744cb02fee4b17" +"checksum tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "bcced6bb623d4bff3739c176c415f13c418f426395c169c9c3cd9a492c715b16" "checksum tokio-buf 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fb220f46c53859a4b7ec083e41dec9778ff0b1851c0942b211edb89e0ccdc46" "checksum tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5c501eceaf96f0e1793cf26beb63da3d11c738c4a943fdf3746d81d64684c39f" "checksum tokio-codec 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)" = "9f5d22fd1e84bd4045d28813491cb7d7caae34d45c80517c2213f09a85e8787a" diff --git a/availability-store/Cargo.toml b/availability-store/Cargo.toml index 1026e2aa2134..369906ec8379 100644 --- a/availability-store/Cargo.toml +++ b/availability-store/Cargo.toml @@ -12,9 +12,8 @@ polkadot-runtime = { path = "../runtime" } parking_lot = "0.9.0" derive_more = "0.99" log = "0.4.8" -futures01 = "0.1.17" -futures = { package = "futures", version = "0.3.1", features = ["compat"] } -tokio = "0.1.7" +futures = "0.3.1" +tokio = { version = "0.2.4", features = ["rt-core"] } exit-future = "0.2.0" codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] } sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } diff --git a/availability-store/src/lib.rs b/availability-store/src/lib.rs index 7ccd87001333..730b10052f6a 100644 --- a/availability-store/src/lib.rs +++ b/availability-store/src/lib.rs @@ -23,7 +23,7 @@ #![warn(missing_docs)] use futures::prelude::*; -use futures::channel::{mpsc, oneshot}; +use futures::{channel::{mpsc, oneshot}, task::Spawn}; use keystore::KeyStorePtr; use polkadot_primitives::{ Hash, Block, @@ -38,7 +38,7 @@ use client::{ BlockchainEvents, BlockBody, }; use sp_api::ApiExt; - +use std::pin::Pin; use log::warn; use std::sync::Arc; @@ -58,10 +58,7 @@ use worker::{ use store::{Store as InnerStore}; /// Abstraction over an executor that lets you spawn tasks in the background. -pub(crate) type TaskExecutor = - Arc + Send> - > + Send + Sync>; +pub(crate) type TaskExecutor = Arc; const LOG_TARGET: &str = "availability"; @@ -110,7 +107,7 @@ pub trait ProvideGossipMessages { fn gossip_messages_for( &self, topic: Hash, - ) -> Box + Send + Unpin>; + ) -> Pin + Send>>; /// Gossip an erasure chunk message. fn gossip_erasure_chunk( @@ -155,6 +152,7 @@ impl Store { /// /// Creating a store among other things starts a background worker thread which /// handles most of the write operations to the storage. + #[cfg(not(target_os = "unknown"))] pub fn new(config: Config, gossip: PGM) -> io::Result where PGM: ProvideGossipMessages + Send + Sync + Clone + 'static { diff --git a/availability-store/src/store.rs b/availability-store/src/store.rs index 5458a64b1712..43a1d898b02b 100644 --- a/availability-store/src/store.rs +++ b/availability-store/src/store.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +#[cfg(not(target_os = "unknown"))] use kvdb_rocksdb::{Database, DatabaseConfig}; use kvdb::{KeyValueDB, DBTransaction}; use codec::{Encode, Decode}; @@ -82,6 +83,7 @@ fn erasure_roots_in_relay_chain_block_key(relay_block: &Hash) -> Vec { impl Store { /// Create a new `Store` with given condig on disk. + #[cfg(not(target_os = "unknown"))] pub(super) fn new(config: Config) -> io::Result { let mut db_config = DatabaseConfig::with_columns(Some(columns::NUM_COLUMNS)); diff --git a/availability-store/src/worker.rs b/availability-store/src/worker.rs index c0ee49959a44..10a879582ec0 100644 --- a/availability-store/src/worker.rs +++ b/availability-store/src/worker.rs @@ -37,11 +37,10 @@ use polkadot_primitives::parachain::{ CandidateReceipt, ParachainHost, ValidatorId, ValidatorPair, AvailableMessages, BlockData, ErasureChunk, }; -use futures::channel::{mpsc, oneshot}; -use futures::{FutureExt, Sink, SinkExt, TryFutureExt, StreamExt, future::select}; +use futures::{prelude::*, future::select, channel::{mpsc, oneshot}, task::SpawnExt}; use keystore::KeyStorePtr; -use tokio::runtime::current_thread::{Handle, Runtime as LocalRuntime}; +use tokio::runtime::{Handle, Runtime as LocalRuntime}; use crate::{LOG_TARGET, Data, TaskExecutor, ProvideGossipMessages, erasure_coding_topic}; use crate::store::Store; @@ -308,7 +307,7 @@ where // Called on startup of the worker to register listeners for all awaited chunks. fn register_listeners( &mut self, - runtime_handle: &mut Handle, + runtime_handle: &Handle, sender: &mut mpsc::UnboundedSender, ) { if let Some(awaited_chunks) = self.availability_store.awaited_chunks() { @@ -327,7 +326,7 @@ where fn register_chunks_listener( &mut self, - runtime_handle: &mut Handle, + runtime_handle: &Handle, sender: &mut mpsc::UnboundedSender, relay_parent: Hash, erasure_root: Hash, @@ -354,18 +353,14 @@ where self.registered_gossip_streams.insert(topic, signal); - let _ = runtime_handle.spawn( - select(fut.boxed(), exit) - .map(|_| Ok(())) - .compat() - ); + let _ = runtime_handle.spawn(select(fut.boxed(), exit).map(drop)); Ok(()) } fn on_parachain_blocks_received( &mut self, - runtime_handle: &mut Handle, + runtime_handle: &Handle, sender: &mut mpsc::UnboundedSender, relay_parent: Hash, blocks: Vec<(CandidateReceipt, Option<(BlockData, AvailableMessages)>)>, @@ -450,7 +445,7 @@ where // we don't have that piece, and then it registers a listener. fn on_listen_for_chunks_received( &mut self, - runtime_handle: &mut Handle, + runtime_handle: &Handle, sender: &mut mpsc::UnboundedSender, relay_parent: Hash, candidate_hash: Hash, @@ -496,11 +491,11 @@ where let mut runtime = LocalRuntime::new()?; let mut sender = worker.sender.clone(); - let mut runtime_handle = runtime.handle(); + let runtime_handle = runtime.handle().clone(); // On startup, registers listeners (gossip streams) for all // (relay_parent, erasure-root, i) in the awaited frontier. - worker.register_listeners(&mut runtime_handle, &mut sender); + worker.register_listeners(runtime.handle(), &mut sender); let process_notification = async move { while let Some(msg) = receiver.next().await { @@ -525,7 +520,7 @@ where } = msg; let res = worker.on_listen_for_chunks_received( - &mut runtime_handle, + &runtime_handle, &mut sender, relay_parent, candidate_hash, @@ -545,7 +540,7 @@ where } = msg; let res = worker.on_parachain_blocks_received( - &mut runtime_handle, + &runtime_handle, &mut sender, relay_parent, blocks, @@ -589,15 +584,9 @@ where }; - runtime.spawn( - futures::future::select(process_notification.boxed(), exit.clone()) - .map(|_| Ok(())) - .compat() - ); + runtime.spawn(select(process_notification.boxed(), exit.clone()).map(drop)); - if let Err(e) = runtime.block_on(exit.unit_error().compat()) { - warn!(target: LOG_TARGET, "Availability worker error {:?}", e); - } + runtime.block_on(exit); info!(target: LOG_TARGET, "Availability worker exiting"); @@ -771,9 +760,9 @@ impl AvailabilityBlockImport { let prune_available = select( prune_unneeded_availability(client.clone(), to_worker.clone()).boxed(), exit.clone() - ).map(|_| Ok(())).compat(); + ).map(drop); - if let Err(_) = thread_pool.execute(Box::new(prune_available)) { + if let Err(_) = thread_pool.spawn(Box::new(prune_available)) { error!(target: LOG_TARGET, "Failed to spawn availability pruning task"); exit_signal = None; } @@ -806,6 +795,7 @@ mod tests { use std::time::Duration; use futures::{stream, channel::mpsc, Stream}; use std::sync::{Arc, Mutex}; + use std::pin::Pin; use tokio::runtime::Runtime; // Just contains topic->channel mapping to give to outer code on `gossip_messages_for` calls. @@ -815,11 +805,11 @@ mod tests { impl ProvideGossipMessages for TestGossipMessages { fn gossip_messages_for(&self, topic: Hash) - -> Box + Send + Unpin> + -> Pin + Send>> { match self.messages.lock().unwrap().remove(&topic) { - Some(receiver) => Box::new(receiver), - None => Box::new(stream::iter(vec![])), + Some(receiver) => receiver.boxed(), + None => stream::iter(vec![]).boxed(), } } @@ -890,7 +880,7 @@ mod tests { // chunk topics. handle.sender.unbounded_send(msg).unwrap(); - runtime.block_on(r.unit_error().boxed().compat()).unwrap().unwrap().unwrap(); + runtime.block_on(r).unwrap().unwrap(); // Make sure that at this point we are waiting for the appropriate chunk. assert_eq!( @@ -992,7 +982,7 @@ mod tests { handle.sender.unbounded_send(listen_msg_2).unwrap(); - runtime.block_on(r2.unit_error().boxed().compat()).unwrap().unwrap().unwrap(); + runtime.block_on(r2).unwrap().unwrap(); // The gossip sender for this topic left intact => listener not registered. assert!(messages.messages.lock().unwrap().contains_key(&topic_2)); @@ -1008,7 +998,7 @@ mod tests { }); handle.sender.unbounded_send(listen_msg_1).unwrap(); - runtime.block_on(r1.unit_error().boxed().compat()).unwrap().unwrap().unwrap(); + runtime.block_on(r1).unwrap().unwrap(); // The gossip sender taken => listener registered. assert!(!messages.messages.lock().unwrap().contains_key(&topic_1)); diff --git a/cli/Cargo.toml b/cli/Cargo.toml index a4014b9dc485..0d715712dd5c 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -5,6 +5,9 @@ authors = ["Parity Technologies "] description = "Polkadot node implementation in Rust." edition = "2018" +[lib] +crate-type = ["cdylib", "rlib"] + [dependencies] log = "0.4.8" tokio = "0.1.22" @@ -14,6 +17,28 @@ structopt = "0.3.4" cli = { package = "sc-cli", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } service = { package = "polkadot-service", path = "../service" } +libp2p = { version = "0.13.0", default-features = false, optional = true } +wasm-bindgen = { version = "0.2.45", optional = true } +wasm-bindgen-futures = { version = "0.3.22", optional = true } +console_log = { version = "0.1.2", optional = true } +console_error_panic_hook = { version = "0.1.1", optional = true } +js-sys = { version = "0.3.22", optional = true } +kvdb-memorydb = { version = "0.1.1", optional = true } +substrate-service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", optional = true, default-features = false } +substrate-network = { package = "sc-network", git = "https://github.com/paritytech/substrate", branch = "polkadot-master", optional = true } + [features] -default = [ "wasmtime" ] +default = [ "wasmtime", "rocksdb" ] wasmtime = [ "cli/wasmtime" ] +rocksdb = [ "service/rocksdb" ] +browser = [ + "libp2p", + "wasm-bindgen", + "console_error_panic_hook", + "wasm-bindgen-futures", + "console_log", + "js-sys", + "kvdb-memorydb", + "substrate-service", + "substrate-network" +] diff --git a/cli/browser-demo/.gitignore b/cli/browser-demo/.gitignore new file mode 100644 index 000000000000..0c6117d9fb83 --- /dev/null +++ b/cli/browser-demo/.gitignore @@ -0,0 +1 @@ +pkg \ No newline at end of file diff --git a/cli/browser-demo/README.md b/cli/browser-demo/README.md new file mode 100644 index 000000000000..2ff1cc54f5db --- /dev/null +++ b/cli/browser-demo/README.md @@ -0,0 +1,9 @@ +# How to run this demo + +```sh +cargo install wasm-pack # If necessary + +wasm-pack build --target web --out-dir ./browser-demo/pkg --no-typescript --release ./.. -- --no-default-features --features "browser" + +xdg-open index.html +``` diff --git a/cli/browser-demo/build.sh b/cli/browser-demo/build.sh new file mode 100755 index 000000000000..059ed9fe423b --- /dev/null +++ b/cli/browser-demo/build.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env sh +wasm-pack build --target web --out-dir ./browser-demo/pkg --no-typescript --release ./.. -- --no-default-features --features "browser" +python -m http.server 8000 diff --git a/cli/browser-demo/favicon.png b/cli/browser-demo/favicon.png new file mode 100644 index 000000000000..5e36153e4df3 Binary files /dev/null and b/cli/browser-demo/favicon.png differ diff --git a/cli/browser-demo/index.html b/cli/browser-demo/index.html new file mode 100644 index 000000000000..ee20166f896e --- /dev/null +++ b/cli/browser-demo/index.html @@ -0,0 +1,39 @@ + + + + + Polkadot node + + + + + diff --git a/cli/browser-demo/ws.js b/cli/browser-demo/ws.js new file mode 100644 index 000000000000..fa7a499a8a7a --- /dev/null +++ b/cli/browser-demo/ws.js @@ -0,0 +1,148 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +export default () => { + return { + dial: dial, + listen_on: (addr) => { + let err = new Error("Listening on WebSockets is not possible from within a browser"); + err.name = "NotSupportedError"; + throw err; + }, + }; +} + +/// Turns a string multiaddress into a WebSockets string URL. +// TODO: support dns addresses as well +const multiaddr_to_ws = (addr) => { + let parsed = addr.match(/^\/(ip4|ip6|dns4|dns6)\/(.*?)\/tcp\/(.*?)\/(ws|wss|x-parity-ws\/(.*)|x-parity-wss\/(.*))$/); + let proto = 'wss'; + if (parsed[4] == 'ws' || parsed[4] == 'x-parity-ws') { + proto = 'ws'; + } + let url = decodeURIComponent(parsed[5] || parsed[6] || ''); + if (parsed != null) { + if (parsed[1] == 'ip6') { + return proto + "://[" + parsed[2] + "]:" + parsed[3] + url; + } else { + return proto + "://" + parsed[2] + ":" + parsed[3] + url; + } + } + + let err = new Error("Address not supported: " + addr); + err.name = "NotSupportedError"; + throw err; +} + +// Attempt to dial a multiaddress. +const dial = (addr) => { + let ws = new WebSocket(multiaddr_to_ws(addr)); + let reader = read_queue(); + + return new Promise((resolve, reject) => { + // TODO: handle ws.onerror properly after dialing has happened + ws.onerror = (ev) => reject(ev); + ws.onmessage = (ev) => reader.inject_blob(ev.data); + ws.onclose = () => reader.inject_eof(); + ws.onopen = () => resolve({ + read: (function*() { while(ws.readyState == 1) { yield reader.next(); } })(), + write: (data) => { + if (ws.readyState == 1) { + ws.send(data); + return promise_when_ws_finished(ws); + } else { + return Promise.reject("WebSocket is closed"); + } + }, + shutdown: () => {}, + close: () => ws.close() + }); + }); +} + +// Takes a WebSocket object and returns a Promise that resolves when bufferedAmount is 0. +const promise_when_ws_finished = (ws) => { + if (ws.bufferedAmount == 0) { + return Promise.resolve(); + } + + return new Promise((resolve, reject) => { + setTimeout(function check() { + if (ws.bufferedAmount == 0) { + resolve(); + } else { + setTimeout(check, 100); + } + }, 2); + }) +} + +// Creates a queue reading system. +const read_queue = () => { + // State of the queue. + let state = { + // Array of promises resolving to `ArrayBuffer`s, that haven't been transmitted back with + // `next` yet. + queue: new Array(), + // If `resolve` isn't null, it is a "resolve" function of a promise that has already been + // returned by `next`. It should be called with some data. + resolve: null, + }; + + return { + // Inserts a new Blob in the queue. + inject_blob: (blob) => { + if (state.resolve != null) { + var resolve = state.resolve; + state.resolve = null; + + var reader = new FileReader(); + reader.addEventListener("loadend", () => resolve(reader.result)); + reader.readAsArrayBuffer(blob); + } else { + state.queue.push(new Promise((resolve, reject) => { + var reader = new FileReader(); + reader.addEventListener("loadend", () => resolve(reader.result)); + reader.readAsArrayBuffer(blob); + })); + } + }, + + // Inserts an EOF message in the queue. + inject_eof: () => { + if (state.resolve != null) { + var resolve = state.resolve; + state.resolve = null; + resolve(null); + } else { + state.queue.push(Promise.resolve(null)); + } + }, + + // Returns a Promise that yields the next entry as an ArrayBuffer. + next: () => { + if (state.queue.length != 0) { + return state.queue.shift(0); + } else { + if (state.resolve !== null) + throw "Internal error: already have a pending promise"; + return new Promise((resolve, reject) => { + state.resolve = resolve; + }); + } + } + }; +}; diff --git a/cli/src/browser.rs b/cli/src/browser.rs new file mode 100644 index 000000000000..9549fc607d80 --- /dev/null +++ b/cli/src/browser.rs @@ -0,0 +1,169 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use crate::ChainSpec; +use futures01::{prelude::*, sync::oneshot, sync::mpsc}; +use libp2p::wasm_ext; +use log::{debug, info}; +use std::sync::Arc; +use service::{AbstractService, Roles as ServiceRoles}; +use substrate_service::{RpcSession, Configuration, config::DatabaseConfig}; +use wasm_bindgen::prelude::*; + +/// Starts the client. +/// +/// You must pass a libp2p transport that supports . +#[wasm_bindgen] +pub fn start_client(wasm_ext: wasm_ext::ffi::Transport) -> Result { + start_inner(wasm_ext) + .map_err(|err| JsValue::from_str(&err.to_string())) +} + +fn start_inner(wasm_ext: wasm_ext::ffi::Transport) -> Result> { + console_error_panic_hook::set_once(); + console_log::init_with_level(log::Level::Info); + + // Build the configuration to pass to the service. + let config = { + let wasm_ext = wasm_ext::ExtTransport::new(wasm_ext); + let chain_spec = ChainSpec::Kusama.load().map_err(|e| format!("{:?}", e))?; + let mut config = Configuration::::default_with_spec_and_base_path(chain_spec, None); + config.network.transport = substrate_network::config::TransportConfig::Normal { + wasm_external_transport: Some(wasm_ext.clone()), + allow_private_ipv4: true, + enable_mdns: false, + }; + config.telemetry_external_transport = Some(wasm_ext); + config.roles = ServiceRoles::LIGHT; + config.name = "Browser node".to_string(); + config.database = { + let db = Arc::new(kvdb_memorydb::create(10)); + DatabaseConfig::Custom(db) + }; + config.keystore_path = Some(std::path::PathBuf::from("/")); + config + }; + + info!("Polkadot browser node"); + info!(" version {}", config.full_version()); + info!(" by Parity Technologies, 2017-2019"); + info!("Chain specification: {}", config.chain_spec.name()); + if config.chain_spec.name().starts_with("Kusama") { + info!("----------------------------"); + info!("This chain is not in any way"); + info!(" endorsed by the "); + info!(" KUSAMA FOUNDATION "); + info!("----------------------------"); + } + info!("Node name: {}", config.name); + info!("Roles: {:?}", config.roles); + + // Create the service. This is the most heavy initialization step. + let mut service = service::new_light(config).map_err(|e| format!("{:?}", e))?; + + // We now dispatch a background task responsible for processing the service. + // + // The main action performed by the code below consists in polling the service with + // `service.poll()`. + // The rest consists in handling RPC requests. + let (rpc_send_tx, mut rpc_send_rx) = mpsc::unbounded::(); + wasm_bindgen_futures::spawn_local(futures01::future::poll_fn(move || { + loop { + match rpc_send_rx.poll() { + Ok(Async::Ready(Some(message))) => { + let fut = service.rpc_query(&message.session, &message.rpc_json); + let _ = message.send_back.send(Box::new(fut)); + }, + Ok(Async::NotReady) => break, + Err(_) | Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + } + } + + loop { + match service.poll().map_err(|_| ())? { + Async::Ready(()) => return Ok(Async::Ready(())), + Async::NotReady => break + } + } + + Ok(Async::NotReady) + })); + + Ok(Client { + rpc_send_tx, + }) +} + +/// A running client. +#[wasm_bindgen] +pub struct Client { + rpc_send_tx: mpsc::UnboundedSender, +} + +struct RpcMessage { + rpc_json: String, + session: RpcSession, + send_back: oneshot::Sender, Error = ()>>>, +} + +#[wasm_bindgen] +impl Client { + /// Allows starting an RPC request. Returns a `Promise` containing the result of that request. + #[wasm_bindgen(js_name = "rpcSend")] + pub fn rpc_send(&mut self, rpc: &str) -> js_sys::Promise { + let rpc_session = RpcSession::new(mpsc::channel(1).0); + let (tx, rx) = oneshot::channel(); + let _ = self.rpc_send_tx.unbounded_send(RpcMessage { + rpc_json: rpc.to_owned(), + session: rpc_session, + send_back: tx, + }); + let fut = rx + .map_err(|_| ()) + .and_then(|fut| fut) + .map(|s| JsValue::from_str(&s.unwrap_or(String::new()))) + .map_err(|_| JsValue::NULL); + wasm_bindgen_futures::future_to_promise(fut) + } + + /// Subscribes to an RPC pubsub endpoint. + #[wasm_bindgen(js_name = "rpcSubscribe")] + pub fn rpc_subscribe(&mut self, rpc: &str, callback: js_sys::Function) { + let (tx, rx) = mpsc::channel(4); + let rpc_session = RpcSession::new(tx); + let (fut_tx, fut_rx) = oneshot::channel(); + let _ = self.rpc_send_tx.unbounded_send(RpcMessage { + rpc_json: rpc.to_owned(), + session: rpc_session.clone(), + send_back: fut_tx, + }); + let fut_rx = fut_rx + .map_err(|_| ()) + .and_then(|fut| fut); + wasm_bindgen_futures::spawn_local(fut_rx.then(|_| Ok(()))); + wasm_bindgen_futures::spawn_local(rx.for_each(move |s| { + match callback.call1(&callback, &JsValue::from_str(&s)) { + Ok(_) => Ok(()), + Err(_) => Err(()), + } + }).then(move |v| { + // We need to keep `rpc_session` alive. + debug!("RPC subscription has ended"); + drop(rpc_session); + v + })); + } +} diff --git a/cli/src/lib.rs b/cli/src/lib.rs index 780af01f2b23..30630a947cbc 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -20,26 +20,27 @@ #![warn(unused_extern_crates)] mod chain_spec; +#[cfg(feature = "browser")] +mod browser; use chain_spec::ChainSpec; -use futures::{Future, FutureExt, TryFutureExt, future::select, channel::oneshot, compat::Future01CompatExt}; +use futures::{ + Future, FutureExt, TryFutureExt, future::select, channel::oneshot, compat::Future01CompatExt, + task::Spawn +}; use tokio::runtime::Runtime; -use std::sync::Arc; use log::{info, error}; use structopt::StructOpt; pub use service::{ AbstractService, CustomConfiguration, ProvideRuntimeApi, CoreApi, ParachainHost, + WrappedExecutor }; pub use cli::{VersionInfo, IntoExit, NoCustom}; pub use cli::{display_role, error}; -type BoxedFuture = Box + Send>; -/// Abstraction over an executor that lets you spawn tasks in the background. -pub type TaskExecutor = Arc + Send + Sync>; - fn load_spec(id: &str) -> Result, String> { Ok(match ChainSpec::from(id) { Some(spec) => Some(spec.load()?), @@ -62,13 +63,14 @@ pub trait Worker: IntoExit { fn configuration(&self) -> service::CustomConfiguration { Default::default() } /// Do work and schedule exit. - fn work(self, service: &S, executor: TaskExecutor) -> Self::Work + fn work(self, service: &S, spawner: SP) -> Self::Work where S: AbstractService, SC: service::SelectChain + 'static, B: service::Backend + 'static, - CE: service::CallExecutor + Clone + Send + Sync + 'static; + CE: service::CallExecutor + Clone + Send + Sync + 'static, + SP: Spawn + Clone + Send + Sync + 'static; } #[derive(Debug, StructOpt, Clone)] @@ -147,8 +149,13 @@ pub fn run(worker: W, version: cli::VersionInfo) -> error::Result<()> where cli::ParseAndPrepare::RevertChain(cmd) => cmd.run_with_builder::<(), _, _, _, _, _>(|config| Ok(service::new_chain_ops(config)?), load_spec), cli::ParseAndPrepare::CustomCommand(PolkadotSubCommands::ValidationWorker(args)) => { - service::run_validation_worker(&args.mem_id)?; - Ok(()) + if cfg!(feature = "browser") { + Err(error::Error::Input("Cannot run validation worker in browser".into())) + } else { + #[cfg(not(feature = "browser"))] + service::run_validation_worker(&args.mem_id)?; + Ok(()) + } } } } @@ -180,7 +187,7 @@ fn run_until_exit( // but we need to keep holding a reference to the global telemetry guard let _telemetry = service.telemetry(); - let work = worker.work(&service, Arc::new(executor)); + let work = worker.work(&service, WrappedExecutor(executor)); let service = service .map_err(|err| error!("Error while running Service: {}", err)) .compat(); diff --git a/collator/Cargo.toml b/collator/Cargo.toml index 59b04059d4e6..01a27a5c73aa 100644 --- a/collator/Cargo.toml +++ b/collator/Cargo.toml @@ -6,8 +6,7 @@ description = "Collator node implementation" edition = "2018" [dependencies] -futures01 = { package = "futures", version = "0.1.17" } -futures = { version = "0.3.1", features = ["compat"] } +futures = "0.3.1" client = { package = "sc-client", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } client-api = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } primitives = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } @@ -20,7 +19,7 @@ polkadot-network = { path = "../network" } polkadot-validation = { path = "../validation" } polkadot-service = { path = "../service" } log = "0.4.8" -tokio = "0.1.22" +tokio = "0.2.1" futures-timer = "1.0" [dev-dependencies] diff --git a/collator/src/lib.rs b/collator/src/lib.rs index 324fce743a06..bfc5c346d46a 100644 --- a/collator/src/lib.rs +++ b/collator/src/lib.rs @@ -49,11 +49,7 @@ use std::fmt; use std::sync::Arc; use std::time::Duration; -use futures::{ - future, Future, Stream, FutureExt, TryFutureExt, StreamExt, - compat::{Future01CompatExt, Stream01CompatExt} -}; -use futures01::{Future as _}; +use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn}; use log::{warn, error}; use client::BlockchainEvents; use primitives::{Pair, Blake2Hasher}; @@ -71,7 +67,7 @@ use polkadot_network::validation::{LeafWorkParams, ValidationNetwork}; use polkadot_network::{PolkadotNetworkService, PolkadotProtocol}; use polkadot_runtime::RuntimeApi; -pub use polkadot_cli::{VersionInfo, TaskExecutor}; +pub use polkadot_cli::VersionInfo; pub use polkadot_network::validation::Incoming; pub use polkadot_validation::SignedStatement; pub use polkadot_primitives::parachain::CollatorId; @@ -83,7 +79,7 @@ const COLLATION_TIMEOUT: Duration = Duration::from_secs(30); pub trait Network: Send + Sync { /// Convert the given `CollatorId` to a `PeerId`. fn collator_id_to_peer_id(&self, collator_id: CollatorId) -> - Box> + Unpin + Send>; + Box> + Send>; /// Create a `Stream` of checked statements for the given `relay_parent`. /// @@ -93,26 +89,19 @@ pub trait Network: Send + Sync { fn checked_statements(&self, relay_parent: Hash) -> Box>; } -impl Network for ValidationNetwork where +impl Network for ValidationNetwork where P: 'static + Send + Sync, E: 'static + Send + Sync, + SP: 'static + Spawn + Clone + Send + Sync, { fn collator_id_to_peer_id(&self, collator_id: CollatorId) -> - Box> + Unpin + Send> + Box> + Send> { - Box::new( - Self::collator_id_to_peer_id(self, collator_id) - .compat() - .map(|res| res.ok().and_then(|id| id)) - ) + Box::new(Self::collator_id_to_peer_id(self, collator_id)) } fn checked_statements(&self, relay_parent: Hash) -> Box> { - Box::new( - Self::checked_statements(self, relay_parent) - .compat() - .filter_map(|item| future::ready(item.ok())) - ) + Box::new(Self::checked_statements(self, relay_parent)) } } @@ -147,15 +136,16 @@ pub trait BuildParachainContext { type ParachainContext: self::ParachainContext; /// Build the `ParachainContext`. - fn build( + fn build( self, client: Arc>, - task_executor: TaskExecutor, + spawner: SP, network: Arc, ) -> Result where B: client_api::backend::Backend + 'static, - E: client::CallExecutor + Clone + Send + Sync + 'static; + E: client::CallExecutor + Clone + Send + Sync + 'static, + SP: Spawn + Clone + Send + Sync + 'static; } /// Parachain context needed for collation. @@ -239,16 +229,17 @@ pub async fn collate( } /// Polkadot-api context. -struct ApiContext { - network: Arc>, +struct ApiContext { + network: Arc>, parent_hash: Hash, validators: Vec, } -impl RelayChainContext for ApiContext where +impl RelayChainContext for ApiContext where P: ProvideRuntimeApi + Send + Sync, P::Api: ParachainHost, E: futures::Future + Clone + Send + Sync + 'static, + SP: Spawn + Clone + Send + Sync { type Error = String; type FutureEgress = Box> + Unpin + Send>; @@ -262,7 +253,6 @@ impl RelayChainContext for ApiContext where parent_hash: self.parent_hash, authorities: self.validators.clone(), }) - .compat() .map_err(|e| format!("unable to instantiate validation session: {:?}", e)); Box::new(future::ok(ConsolidatedIngress(Vec::new()))) @@ -302,7 +292,7 @@ impl Worker for CollationNode where config } - fn work(self, service: &S, task_executor: TaskExecutor) -> Self::Work + fn work(self, service: &S, spawner: SP) -> Self::Work where S: AbstractService< Block = Block, @@ -314,7 +304,8 @@ impl Worker for CollationNode where >, SC: polkadot_service::SelectChain + 'static, B: client_api::backend::Backend + 'static, - CE: client::CallExecutor + Clone + Send + Sync + 'static + CE: client::CallExecutor + Clone + Send + Sync + 'static, + SP: Spawn + Clone + Send + Sync + 'static, { let CollationNode { build_parachain_context, exit, para_id, key } = self; let client = service.client(); @@ -356,12 +347,12 @@ impl Worker for CollationNode where exit.clone(), message_validator, client.clone(), - task_executor.clone(), + spawner.clone(), )); let parachain_context = match build_parachain_context.build( client.clone(), - task_executor, + spawner, validation_network.clone(), ) { Ok(ctx) => ctx, @@ -433,13 +424,13 @@ impl Worker for CollationNode where outgoing, ); - let exit = inner_exit_2.clone().unit_error().compat(); - tokio::spawn(res.select(exit).then(|_| Ok(()))); + let exit = inner_exit_2.clone(); + tokio::spawn(future::select(res, exit).map(drop)); }) }); future::Either::Right(collation_work) - }).map(|_| Ok::<_, ()>(())); + }); let deadlined = future::select( work, @@ -456,7 +447,7 @@ impl Worker for CollationNode where let future = future::select( silenced, inner_exit.clone() - ).map(|_| Ok::<_, ()>(())).compat(); + ).map(drop); tokio::spawn(future); future::ready(()) diff --git a/network/Cargo.toml b/network/Cargo.toml index 0cf3b16edbe7..0718898dab14 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -16,8 +16,7 @@ codec = { package = "parity-scale-codec", version = "1.1.0", default-features = sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } -futures = "0.1" -futures03 = { package = "futures", version = "0.3.1", features = ["compat"] } +futures = "0.3.1" log = "0.4.8" exit-future = "0.2.0" sc-client = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } diff --git a/network/src/collator_pool.rs b/network/src/collator_pool.rs index 01f13378e559..6e6274574b8f 100644 --- a/network/src/collator_pool.rs +++ b/network/src/collator_pool.rs @@ -20,7 +20,7 @@ use codec::{Encode, Decode}; use polkadot_primitives::Hash; use polkadot_primitives::parachain::{CollatorId, Id as ParaId, Collation}; use sc_network::PeerId; -use futures::sync::oneshot; +use futures::channel::oneshot; use std::collections::hash_map::{HashMap, Entry}; use std::time::{Duration, Instant}; @@ -230,7 +230,7 @@ mod tests { use polkadot_primitives::parachain::{ CandidateReceipt, BlockData, PoVBlock, HeadData, ConsolidatedIngress, }; - use futures::Future; + use futures::executor::block_on; fn make_pov(block_data: Vec) -> PoVBlock { PoVBlock { @@ -293,8 +293,8 @@ mod tests { pov: make_pov(vec![4, 5, 6]), }); - rx1.wait().unwrap(); - rx2.wait().unwrap(); + block_on(rx1).unwrap(); + block_on(rx2).unwrap(); assert_eq!(pool.collators.get(&primary).map(|ids| &ids.1).unwrap(), &peer_id); } @@ -324,7 +324,7 @@ mod tests { let (tx, rx) = oneshot::channel(); pool.await_collation(relay_parent, para_id, tx); - rx.wait().unwrap(); + block_on(rx).unwrap(); } #[test] diff --git a/network/src/lib.rs b/network/src/lib.rs index f73dae2e7c5d..312e0ed69d2a 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -26,10 +26,9 @@ pub mod validation; pub mod gossip; use codec::{Decode, Encode}; -use futures::sync::oneshot; -use futures::future::Either; +use futures::channel::{oneshot, mpsc}; use futures::prelude::*; -use futures03::{channel::mpsc, compat::{Compat, Stream01CompatExt}, FutureExt, StreamExt, TryFutureExt}; +use futures::future::Either; use polkadot_primitives::{Block, Hash, Header}; use polkadot_primitives::parachain::{ Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock, @@ -48,6 +47,8 @@ use self::local_collations::LocalCollations; use log::{trace, debug, warn}; use std::collections::{HashMap, HashSet}; +use std::pin::Pin; +use std::task::{Context as PollContext, Poll}; use crate::gossip::{POLKADOT_ENGINE_ID, GossipMessage, ErasureChunkMessage}; @@ -112,23 +113,18 @@ impl av_store::ProvideGossipMessages for AvailabilityNetworkShim where T: NetworkService { fn gossip_messages_for(&self, topic: Hash) - -> Box + Unpin + Send> + -> Pin + Send>> { - Box::new(self.0.gossip_messages_for(topic) - .compat() - .filter_map(|msg| async move { + self.0.gossip_messages_for(topic) + .filter_map(|(msg, _)| async move { match msg { - Ok(msg) => match msg.0 { - GossipMessage::ErasureChunk(chunk) => { - Some((chunk.relay_parent, chunk.candidate_hash, chunk.chunk)) - }, - _ => None, - } + GossipMessage::ErasureChunk(chunk) => { + Some((chunk.relay_parent, chunk.candidate_hash, chunk.chunk)) + }, _ => None, } }) .boxed() - ) } fn gossip_erasure_chunk( @@ -170,7 +166,7 @@ impl NetworkService for PolkadotNetworkService { Err(_) => mpsc::unbounded().1, // return empty channel. }; - GossipMessageStream::new(Box::new(Compat::new(topic_stream.map(Ok)))) + GossipMessageStream::new(topic_stream.boxed()) } fn gossip_message(&self, topic: Hash, message: GossipMessage) { @@ -213,12 +209,12 @@ impl GossipService for consensus_gossip::ConsensusGossip { /// A stream of gossip messages and an optional sender for a topic. pub struct GossipMessageStream { - topic_stream: Box + Send>, + topic_stream: Pin + Send>>, } impl GossipMessageStream { /// Create a new instance with the given topic stream. - pub fn new(topic_stream: Box + Send>) -> Self { + pub fn new(topic_stream: Pin + Send>>) -> Self { Self { topic_stream, } @@ -227,18 +223,20 @@ impl GossipMessageStream { impl Stream for GossipMessageStream { type Item = (GossipMessage, Option); - type Error = (); - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next(self: Pin<&mut Self>, cx: &mut PollContext) -> Poll> { + let this = Pin::into_inner(self); + loop { - let msg = match futures::try_ready!(self.topic_stream.poll()) { - Some(msg) => msg, - None => return Ok(Async::Ready(None)), + let msg = match Pin::new(&mut this.topic_stream).poll_next(cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, }; debug!(target: "validation", "Processing statement for live validation leaf-work"); if let Ok(gmsg) = GossipMessage::decode(&mut &msg.message[..]) { - return Ok(Async::Ready(Some((gmsg, msg.sender)))) + return Poll::Ready(Some((gmsg, msg.sender))) } } } @@ -835,7 +833,7 @@ impl PolkadotProtocol { targets: HashSet, collation: Collation, outgoing_targeted: OutgoingMessages, - ) -> impl futures::future::Future { + ) -> impl Future { debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}", relay_parent, collation.info.parachain_index); @@ -843,7 +841,7 @@ impl PolkadotProtocol { Some(ref availability_store) => { let availability_store_cloned = availability_store.clone(); let collation_cloned = collation.clone(); - Either::A((async move { + Either::Left((async move { let _ = availability_store_cloned.make_available(av_store::Data { relay_parent, parachain_id: collation_cloned.info.parachain_index, @@ -852,13 +850,10 @@ impl PolkadotProtocol { }).await; } ) - .unit_error() .boxed() - .compat() - .then(|_| Ok(())) ) } - None => Either::B(futures::future::ok::<(), ()>(())), + None => Either::Right(futures::future::ready(())), }; for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) { diff --git a/network/src/router.rs b/network/src/router.rs index 79d6fffa9ef8..78922a1307ec 100644 --- a/network/src/router.rs +++ b/network/src/router.rs @@ -34,7 +34,7 @@ use polkadot_primitives::parachain::{ use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement, ErasureChunkMessage}; use futures::prelude::*; -use futures03::{future::FutureExt, TryFutureExt}; +use futures::{task::SpawnExt, future::{ready, select}}; use parking_lot::Mutex; use log::{debug, trace}; @@ -59,14 +59,14 @@ pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash { /// dropped when it is not required anymore. Otherwise, it will stick around in memory /// infinitely. pub(crate) fn checked_statements(network: &N, topic: Hash) -> - impl Stream { + impl Stream { // spin up a task in the background that processes all incoming statements // validation has been done already by the gossip validator. // this will block internally until the gossip messages stream is obtained. network.gossip_messages_for(topic) .filter_map(|msg| match msg.0 { - GossipMessage::Statement(s) => Some(s.signed_statement), - _ => None + GossipMessage::Statement(s) => ready(Some(s.signed_statement)), + _ => ready(None) }) } @@ -101,7 +101,7 @@ impl Router { /// The returned stream will not terminate, so it is required to make sure that the stream is /// dropped when it is not required anymore. Otherwise, it will stick around in memory /// infinitely. - pub(crate) fn checked_statements(&self) -> impl Stream { + pub(crate) fn checked_statements(&self) -> impl Stream { checked_statements(&**self.network(), self.attestation_topic) } @@ -130,7 +130,7 @@ impl Router w P::Api: ParachainHost, N: NetworkService, T: Clone + Executor + Send + 'static, - E: futures03::Future + Clone + Send + Unpin + 'static, + E: Future + Clone + Send + Unpin + 'static, { /// Import a statement whose signature has been checked already. pub(crate) fn import_statement(&self, statement: SignedStatement) { @@ -174,50 +174,54 @@ impl Router w if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) { trace!(target: "validation", "driving statement work to completion"); - let exit = self.fetcher.exit().clone().unit_error().compat(); - let work = work.select2(exit).then(|_| Ok(())); - self.fetcher.executor().spawn(work); + + let work = select(work.boxed(), self.fetcher.exit().clone()) + .map(drop); + let _ = self.fetcher.executor().spawn(work); } } } } fn create_work(&self, candidate_hash: Hash, producer: ParachainWork) - -> impl Future + Send + 'static + -> impl Future + Send + 'static where - D: Future + Send + 'static, + D: Future> + Send + Unpin + 'static, { let table = self.table.clone(); let network = self.network().clone(); let knowledge = self.fetcher.knowledge().clone(); let attestation_topic = self.attestation_topic; let parent_hash = self.parent_hash(); + let api = self.fetcher.api().clone(); - producer.prime(self.fetcher.api().clone()) - .validate() - .boxed() - .compat() - .map(move |validated| { - // store the data before broadcasting statements, so other peers can fetch. - knowledge.lock().note_candidate( + async move { + match producer.prime(api).validate().await { + Ok(validated) => { + // store the data before broadcasting statements, so other peers can fetch. + knowledge.lock().note_candidate( candidate_hash, Some(validated.0.pov_block().clone()), validated.0.outgoing_messages().cloned(), - ); + ); - // propagate the statement. - // consider something more targeted than gossip in the future. - let statement = GossipStatement::new( + // propagate the statement. + // consider something more targeted than gossip in the future. + let statement = GossipStatement::new( parent_hash, match table.import_validated(validated.0) { - None => return, - Some(s) => s, + None => return, + Some(s) => s, } - ); + ); - network.gossip_message(attestation_topic, statement.into()); - }) - .map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e)) + network.gossip_message(attestation_topic, statement.into()); + }, + Err(err) => { + debug!(target: "p_net", "Failed to produce statements: {:?}", err); + } + } + } } } @@ -225,7 +229,7 @@ impl TableRouter for Router wh P::Api: ParachainHost, N: NetworkService, T: Clone + Executor + Send + 'static, - E: futures03::Future + Clone + Send + Unpin + 'static, + E: Future + Clone + Send + 'static, { type Error = io::Error; type FetchValidationProof = validation::PoVReceiver; diff --git a/network/src/tests/mod.rs b/network/src/tests/mod.rs index e78bbfadf857..db15ff438d3a 100644 --- a/network/src/tests/mod.rs +++ b/network/src/tests/mod.rs @@ -33,7 +33,7 @@ use sc_network::{ specialization::NetworkSpecialization, }; -use futures::Future; +use futures::executor::block_on; mod validation; @@ -245,7 +245,7 @@ fn fetches_from_those_with_knowledge() { let pov_block = make_pov(block_data.0); on_message(&mut protocol, &mut ctx, peer_b, Message::PovBlock(2, Some(pov_block.clone()))); drop(protocol); - assert_eq!(recv.wait().unwrap(), pov_block); + assert_eq!(block_on(recv).unwrap(), pov_block); } } diff --git a/network/src/tests/validation.rs b/network/src/tests/validation.rs index 9e5bc7e9370c..fc976f9bdea7 100644 --- a/network/src/tests/validation.rs +++ b/network/src/tests/validation.rs @@ -39,22 +39,23 @@ use sp_runtime::traits::{ApiRef, {Block as BlockT}, ProvideRuntimeApi}; use std::collections::HashMap; use std::sync::Arc; -use futures::{prelude::*, sync::mpsc}; +use std::pin::Pin; +use std::task::{Poll, Context}; +use futures::{prelude::*, channel::mpsc}; use codec::Encode; use super::{TestContext, TestChainContext}; -type TaskExecutor = Arc + Send>> + Send + Sync>; +type TaskExecutor = Arc; #[derive(Clone, Copy)] struct NeverExit; impl Future for NeverExit { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll<(), ()> { - Ok(Async::NotReady) + fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll { + Poll::Pending } } @@ -93,27 +94,28 @@ impl GossipRouter { } impl Future for GossipRouter { - type Item = (); - type Error = (); + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = Pin::into_inner(self); - fn poll(&mut self) -> Poll<(), ()> { loop { - match self.incoming_messages.poll().unwrap() { - Async::Ready(Some((topic, message))) => self.add_message(topic, message), - Async::Ready(None) => panic!("ended early."), - Async::NotReady => break, + match Pin::new(&mut this.incoming_messages).poll_next(cx) { + Poll::Ready(Some((topic, message))) => this.add_message(topic, message), + Poll::Ready(None) => panic!("ended early."), + Poll::Pending => break, } } loop { - match self.incoming_streams.poll().unwrap() { - Async::Ready(Some((topic, sender))) => self.add_outgoing(topic, sender), - Async::Ready(None) => panic!("ended early."), - Async::NotReady => break, + match Pin::new(&mut this.incoming_streams).poll_next(cx) { + Poll::Ready(Some((topic, sender))) => this.add_outgoing(topic, sender), + Poll::Ready(None) => panic!("ended early."), + Poll::Pending => break, } } - Ok(Async::NotReady) + Poll::Pending } } @@ -148,7 +150,7 @@ impl NetworkService for TestNetwork { fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream { let (tx, rx) = mpsc::unbounded(); let _ = self.gossip.send_listener.unbounded_send((topic, tx)); - GossipMessageStream::new(Box::new(rx)) + GossipMessageStream::new(rx.boxed()) } fn gossip_message(&self, topic: Hash, message: GossipMessage) { @@ -417,8 +419,8 @@ impl av_store::ProvideGossipMessages for DummyGossipMessages { fn gossip_messages_for( &self, _topic: Hash - ) -> Box + Send + Unpin> { - Box::new(futures03::stream::empty()) + ) -> Pin + Send>> { + stream::empty().boxed() } fn gossip_erasure_chunk( diff --git a/network/src/validation.rs b/network/src/validation.rs index 25e87559f38d..f6e652fae280 100644 --- a/network/src/validation.rs +++ b/network/src/validation.rs @@ -31,17 +31,19 @@ use polkadot_primitives::parachain::{ }; use futures::prelude::*; -use futures::future::{self, Executor as FutureExecutor}; -use futures::sync::oneshot::{self, Receiver}; -use futures03::{FutureExt as _, TryFutureExt as _}; +use futures::task::SpawnExt; +pub use futures::task::Spawn as Executor; +use futures::channel::oneshot::{self, Receiver}; +use futures::future::{ready, select}; use std::collections::hash_map::{HashMap, Entry}; use std::io; use std::sync::Arc; +use std::pin::Pin; +use std::task::{Poll, Context}; use arrayvec::ArrayVec; use parking_lot::Mutex; -use log::warn; use crate::router::Router; use crate::gossip::{RegisteredMessageValidator, MessageValidationData}; @@ -50,33 +52,6 @@ use super::NetworkService; pub use polkadot_validation::Incoming; -/// An executor suitable for dispatching async consensus tasks. -pub trait Executor { - fn spawn + Send + 'static>(&self, f: F); -} - -/// A wrapped futures::future::Executor. -#[derive(Clone)] -pub struct WrappedExecutor(pub T); - -impl Executor for WrappedExecutor - where T: FutureExecutor + Send + 'static>> -{ - fn spawn + Send + 'static>(&self, f: F) { - if let Err(e) = self.0.execute(Box::new(f)) { - warn!(target: "validation", "could not spawn consensus task: {:?}", e); - } - } -} - -impl Executor for Arc< - dyn futures::future::Executor + Send>> + Send + Sync -> { - fn spawn + Send + 'static>(&self, f: F) { - let _ = FutureExecutor::execute(&**self, Box::new(f)); - } -} - /// Params to instantiate validation work on a block-DAG leaf. pub struct LeafWorkParams { /// The local session key. @@ -124,7 +99,7 @@ impl Clone for ValidationNetwork { impl ValidationNetwork where P: ProvideRuntimeApi + Send + Sync + 'static, P::Api: ParachainHost, - E: Clone + futures03::Future + Send + Sync + 'static, + E: Clone + Future + Send + Sync + 'static, N: NetworkService, T: Clone + Executor + Send + Sync + 'static, { @@ -184,13 +159,17 @@ impl ValidationNetwork where impl ValidationNetwork where N: NetworkService { /// Convert the given `CollatorId` to a `PeerId`. pub fn collator_id_to_peer_id(&self, collator_id: CollatorId) -> - impl Future, Error=()> + Send + impl Future> + Send { - let (send, recv) = oneshot::channel(); - self.network.with_spec(move |spec, _| { - let _ = send.send(spec.collator_id_to_peer_id(&collator_id).cloned()); - }); - recv.map_err(|_| ()) + let network = self.network.clone(); + + async move { + let (send, recv) = oneshot::channel(); + network.with_spec(move |spec, _| { + let _ = send.send(spec.collator_id_to_peer_id(&collator_id).cloned()); + }); + recv.await.ok().and_then(|opt| opt) + } } /// Create a `Stream` of checked statements for the given `relay_parent`. @@ -198,7 +177,7 @@ impl ValidationNetwork where N: NetworkService { /// The returned stream will not terminate, so it is required to make sure that the stream is /// dropped when it is not required anymore. Otherwise, it will stick around in memory /// infinitely. - pub fn checked_statements(&self, relay_parent: Hash) -> impl Stream { + pub fn checked_statements(&self, relay_parent: Hash) -> impl Stream { crate::router::checked_statements(&*self.network, crate::router::attestation_topic(relay_parent)) } } @@ -207,13 +186,13 @@ impl ValidationNetwork where N: NetworkService { impl ParachainNetwork for ValidationNetwork where P: ProvideRuntimeApi + Send + Sync + 'static, P::Api: ParachainHost, - E: Clone + futures03::Future + Send + Sync + Unpin + 'static, + E: Clone + Future + Send + Sync + Unpin + 'static, N: NetworkService, T: Clone + Executor + Send + Sync + 'static, { type Error = String; type TableRouter = Router; - type BuildTableRouter = Box + Send>; + type BuildTableRouter = Box> + Send + Unpin>; fn communication_for( &self, @@ -234,7 +213,7 @@ impl ParachainNetwork for ValidationNetwork where let executor = self.executor.clone(); let work = build_fetcher .map_err(|e| format!("{:?}", e)) - .map(move |fetcher| { + .map_ok(move |fetcher| { let table_router = Router::new( table, fetcher, @@ -243,12 +222,14 @@ impl ParachainNetwork for ValidationNetwork where let table_router_clone = table_router.clone(); let work = table_router.checked_statements() - .for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) }) - .select(exit.clone().unit_error().compat()) - .map(|_| ()) - .map_err(|_| ()); + .for_each(move |msg| { + table_router_clone.import_statement(msg); + ready(()) + }); + + let work = select(work, exit).map(drop); - executor.spawn(work); + let _ = executor.spawn(work); table_router }); @@ -263,27 +244,26 @@ pub struct NetworkDown; /// A future that resolves when a collation is received. pub struct AwaitingCollation { - outer: futures::sync::oneshot::Receiver<::futures::sync::oneshot::Receiver>, - inner: Option<::futures::sync::oneshot::Receiver> + outer: oneshot::Receiver>, + inner: Option> } impl Future for AwaitingCollation { - type Item = Collation; - type Error = NetworkDown; + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = Pin::into_inner(self); - fn poll(&mut self) -> Poll { - if let Some(ref mut inner) = self.inner { - return inner - .poll() - .map_err(|_| NetworkDown) + if let Some(ref mut inner) = this.inner { + return Pin::new(inner).poll(cx).map_err(|_| NetworkDown) } - match self.outer.poll() { - Ok(futures::Async::Ready(inner)) => { - self.inner = Some(inner); - self.poll() + match Pin::new(&mut this.outer).poll(cx) { + Poll::Ready(Ok(inner)) => { + this.inner = Some(inner); + Pin::new(this).poll(cx) }, - Ok(futures::Async::NotReady) => Ok(futures::Async::NotReady), - Err(_) => Err(NetworkDown) + Poll::Ready(Err(_)) => Poll::Ready(Err(NetworkDown)), + Poll::Pending => Poll::Pending, } } } @@ -297,7 +277,7 @@ impl Collators for ValidationNetwork where type Collation = AwaitingCollation; fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation { - let (tx, rx) = ::futures::sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); self.network.with_spec(move |spec, _| { let collation = spec.await_collation(relay_parent, parachain); let _ = tx.send(collation); @@ -375,17 +355,16 @@ pub struct IncomingReceiver { } impl Future for IncomingReceiver { - type Item = Incoming; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - match self.inner.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(i)) => Ok(Async::Ready(Incoming::clone(&*i))), - Err(_) => Err(io::Error::new( + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match Pin::new(&mut Pin::into_inner(self).inner).poll(cx) { + Poll::Ready(Ok(i)) => Poll::Ready(Ok(Incoming::clone(&i))), + Poll::Ready(Err(_)) => Poll::Ready(Err(io::Error::new( io::ErrorKind::Other, "Sending end of channel hung up", - )), + ))), + Poll::Pending => Poll::Pending, } } } @@ -592,24 +571,25 @@ pub struct PoVReceiver { } impl Future for PoVReceiver { - type Item = PoVBlock; - type Error = io::Error; + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = Pin::into_inner(self); - fn poll(&mut self) -> Poll { let map_err = |_| io::Error::new( io::ErrorKind::Other, "Sending end of channel hung up", ); - if let Some(ref mut inner) = self.inner { - return inner.poll().map_err(map_err); + if let Some(ref mut inner) = this.inner { + return Pin::new(inner).poll(cx).map_err(map_err); } - match self.outer.poll().map_err(map_err)? { - Async::Ready(inner) => { - self.inner = Some(inner); - self.poll() + match Pin::new(&mut this.outer).poll(cx).map_err(map_err)? { + Poll::Ready(inner) => { + this.inner = Some(inner); + Pin::new(this).poll(cx) } - Async::NotReady => Ok(Async::NotReady), + Poll::Pending => Poll::Pending, } } } @@ -675,7 +655,7 @@ impl LeafWorkDataFetcher where P::Api: ParachainHost, N: NetworkService, T: Clone + Executor + Send + 'static, - E: futures03::Future + Clone + Send + 'static, + E: Future + Clone + Send + 'static, { /// Fetch PoV block for the given candidate receipt. pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver { @@ -697,7 +677,7 @@ impl LeafWorkDataFetcher where ); let candidate = candidate.clone(); - let (tx, rx) = ::futures::sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); self.network.with_spec(move |spec, ctx| { if let Ok(Some(canon_roots)) = canon_roots { let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash, canon_roots); diff --git a/service/Cargo.toml b/service/Cargo.toml index 391ae7c88b5f..a26a5dd83303 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -8,9 +8,8 @@ edition = "2018" parking_lot = "0.9.0" lazy_static = "1.4.0" log = "0.4.8" -futures = "0.1.29" -futures03 = { package = "futures", version = "0.3.1", features = ["compat"] } -exit-future = "0.1.4" +futures = "0.3.1" +futures01 = { package = "futures", version = "0.1.29" } slog = "2.5.2" hex-literal = "0.2.1" av_store = { package = "polkadot-availability-store", path = "../availability-store" } @@ -45,3 +44,7 @@ authority-discovery = { package = "sc-authority-discovery", git = "https://githu authority-discovery-primitives = { package = "sp-authority-discovery", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } babe = { package = "sc-consensus-babe", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } babe-primitives = { package = "sp-consensus-babe", git = "https://github.com/paritytech/substrate", default-features = false, branch = "polkadot-master" } + +[features] +default = ["rocksdb"] +rocksdb = ["service/rocksdb"] diff --git a/service/src/lib.rs b/service/src/lib.rs index 935e87f2f57a..13c4ad11595e 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -18,7 +18,8 @@ pub mod chain_spec; -use futures::sync::mpsc; +use futures01::sync::mpsc; +use futures::{FutureExt, TryFutureExt, task::{Spawn, SpawnError, FutureObj}}; use client::LongestChain; use std::sync::Arc; use std::time::Duration; @@ -44,8 +45,21 @@ pub use primitives::Blake2Hasher; pub use sp_runtime::traits::ProvideRuntimeApi; pub use sc_network::specialization::NetworkSpecialization; pub use chain_spec::ChainSpec; +#[cfg(not(target_os = "unknown"))] pub use consensus::run_validation_worker; +/// Wrap a futures01 executor as a futures03 spawn. +#[derive(Clone)] +pub struct WrappedExecutor(pub T); + +impl Spawn for WrappedExecutor + where T: futures01::future::Executor + Send + 'static>> +{ + fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> { + self.0.execute(Box::new(future.map(Ok).compat())) + .map_err(|_| SpawnError::shutdown()) + } +} /// Polkadot-specific configuration. pub struct CustomConfiguration { /// Set to `Some` with a collator `CollatorId` and desired parachain @@ -151,11 +165,7 @@ pub fn new_full(config: Configuration) >, ServiceError> { use sc_network::DhtEvent; - use futures03::{ - compat::Stream01CompatExt, - stream::StreamExt, - future::{FutureExt, TryFutureExt}, - }; + use futures::{compat::Stream01CompatExt, stream::StreamExt}; let is_collator = config.custom.collating_for.is_some(); let is_authority = config.roles.is_authority() && !is_collator; @@ -237,12 +247,18 @@ pub fn new_full(config: Configuration) let mut path = PathBuf::from(db_path); path.push("availability"); - av_store::Store::new(::av_store::Config { - cache_size: None, - path, - }, - polkadot_network::AvailabilityNetworkShim(service.network()), - )? + let gossip = polkadot_network::AvailabilityNetworkShim(service.network()); + + #[cfg(not(target_os = "unknown"))] + { + av_store::Store::new(::av_store::Config { + cache_size: None, + path, + }, gossip)? + } + + #[cfg(target_os = "unknown")] + av_store::Store::new_in_memory(gossip) }; { @@ -263,7 +279,7 @@ pub fn new_full(config: Configuration) service.on_exit(), gossip_validator, service.client(), - polkadot_network::validation::WrappedExecutor(service.spawn_task_handle()), + WrappedExecutor(service.spawn_task_handle()), ); let proposer = consensus::ProposerFactory::new( client.clone(), @@ -271,7 +287,7 @@ pub fn new_full(config: Configuration) validation_network.clone(), validation_network, service.transaction_pool(), - Arc::new(service.spawn_task_handle()), + Arc::new(WrappedExecutor(service.spawn_task_handle())), service.keystore(), availability_store.clone(), polkadot_runtime::constants::time::SLOT_DURATION, @@ -287,7 +303,7 @@ pub fn new_full(config: Configuration) let block_import = availability_store.block_import( block_import, client.clone(), - Arc::new(service.spawn_task_handle()), + Arc::new(WrappedExecutor(service.spawn_task_handle())), service.keystore(), )?; diff --git a/src/main.rs b/src/main.rs index 842f70cbe42a..662283d9cc24 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,9 +18,8 @@ #![warn(missing_docs)] -use cli::{AbstractService, VersionInfo, TaskExecutor}; -use futures::channel::oneshot; -use futures::{future, FutureExt}; +use cli::{AbstractService, VersionInfo}; +use futures::{channel::oneshot, future, FutureExt, task::Spawn}; use std::cell::RefCell; @@ -33,6 +32,7 @@ impl cli::IntoExit for Worker { let (exit_send, exit) = oneshot::channel(); let exit_send_cell = RefCell::new(Some(exit_send)); + #[cfg(not(target_os = "unknown"))] ctrlc::set_handler(move || { if let Some(exit_send) = exit_send_cell.try_borrow_mut().expect("signal handler not reentrant; qed").take() { exit_send.send(()).expect("Error sending exit notification"); @@ -45,13 +45,14 @@ impl cli::IntoExit for Worker { impl cli::Worker for Worker { type Work = ::Exit; - fn work(self, _: &S, _: TaskExecutor) -> Self::Work + fn work(self, _: &S, _: SP) -> Self::Work where S: AbstractService, SC: service::SelectChain + 'static, B: service::Backend + 'static, - CE: service::CallExecutor + Clone + Send + Sync + 'static { + CE: service::CallExecutor + Clone + Send + Sync + 'static, + SP: Spawn + Clone + Send + Sync + 'static { use cli::IntoExit; self.into_exit() } diff --git a/test-parachains/adder/collator/src/main.rs b/test-parachains/adder/collator/src/main.rs index fe3e9d74731c..6c4823b5eb5f 100644 --- a/test-parachains/adder/collator/src/main.rs +++ b/test-parachains/adder/collator/src/main.rs @@ -29,11 +29,9 @@ use primitives::{ HeadData, BlockData, Id as ParaId, Message, OutgoingMessages, Status as ParachainStatus, }, }; -use collator::{ - InvalidHead, ParachainContext, VersionInfo, Network, BuildParachainContext, TaskExecutor, -}; +use collator::{InvalidHead, ParachainContext, VersionInfo, Network, BuildParachainContext}; use parking_lot::Mutex; -use futures::future::{Ready, ok, err}; +use futures::{future::{Ready, ok, err}, task::Spawn}; const GENESIS: AdderHead = AdderHead { number: 0, @@ -108,15 +106,16 @@ impl ParachainContext for AdderContext { impl BuildParachainContext for AdderContext { type ParachainContext = Self; - fn build( + fn build( self, _: Arc>, - _: TaskExecutor, + _: SP, network: Arc, ) -> Result where B: client_api::backend::Backend + 'static, - E: client::CallExecutor + Clone + Send + Sync + 'static + E: client::CallExecutor + Clone + Send + Sync + 'static, + SP: Spawn + Clone + Send + Sync + 'static, { Ok(Self { _network: Some(network), ..self }) } diff --git a/validation/Cargo.toml b/validation/Cargo.toml index 560f46b143f7..de4417fa757c 100644 --- a/validation/Cargo.toml +++ b/validation/Cargo.toml @@ -5,16 +5,13 @@ authors = ["Parity Technologies "] edition = "2018" [dependencies] -futures = "0.1.17" -futures03 = { package = "futures", version = "0.3.1", features = ["compat"] } +futures = "0.3.1" futures-timer = "2.0" -async-std = { version = "1.0.1", features = ["unstable"] } parking_lot = "0.9.0" -tokio = "0.1.22" +tokio = { version = "0.2.4", features = ["rt-core", "blocking"] } derive_more = "0.14.1" log = "0.4.8" exit-future = "0.2.0" -tokio-executor = { version = "0.2.0-alpha.6", features = ["blocking"] } codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] } availability_store = { package = "polkadot-availability-store", path = "../availability-store" } parachain = { package = "polkadot-parachain", path = "../parachain" } diff --git a/validation/src/attestation_service.rs b/validation/src/attestation_service.rs index 4875cb740643..6eb864726aa4 100644 --- a/validation/src/attestation_service.rs +++ b/validation/src/attestation_service.rs @@ -23,15 +23,14 @@ /// such as candidate verification while performing event-driven work /// on a local event loop. -use std::{thread, time::{Duration, Instant}, sync::Arc}; +use std::{thread, time::Duration, sync::Arc}; use client::{BlockchainEvents, BlockBody}; use sp_blockchain::HeaderBackend; use block_builder::BlockBuilderApi; use consensus::SelectChain; use futures::prelude::*; -use futures03::{TryStreamExt as _, StreamExt as _, FutureExt as _, TryFutureExt as _}; -use log::error; +use futures::{future::{ready, select}, task::{Spawn, SpawnExt}}; use polkadot_primitives::Block; use polkadot_primitives::parachain::ParachainHost; use runtime_primitives::traits::{ProvideRuntimeApi}; @@ -39,12 +38,12 @@ use babe_primitives::BabeApi; use keystore::KeyStorePtr; use sp_api::ApiExt; -use tokio::{timer::Interval, runtime::current_thread::Runtime as LocalRuntime}; -use log::{warn, debug}; +use tokio::{runtime::Runtime as LocalRuntime}; +use log::{warn, error}; use super::{Network, Collators}; -type TaskExecutor = Arc + Send>> + Send + Sync>; +type TaskExecutor = Arc; /// Parachain candidate attestation service handle. pub(crate) struct ServiceHandle { @@ -62,8 +61,8 @@ pub(crate) fn start( max_block_data_size: Option, ) -> ServiceHandle where - C: Collators + Send + Sync + 'static, - ::Future: Send + 'static, + C: Collators + Send + Sync + Unpin + 'static, + C::Collation: Send + Unpin + 'static, P: BlockchainEvents + BlockBody, P: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static, P::Api: ParachainHost + @@ -72,10 +71,9 @@ pub(crate) fn start( ApiExt, N: Network + Send + Sync + 'static, N::TableRouter: Send + 'static, - ::Future: Send + 'static, + N::BuildTableRouter: Send + Unpin + 'static, SC: SelectChain + 'static, { - const TIMER_DELAY: Duration = Duration::from_secs(5); const TIMER_INTERVAL: Duration = Duration::from_secs(30); let (signal, exit) = ::exit_future::signal(); @@ -87,8 +85,7 @@ pub(crate) fn start( let keystore = keystore.clone(); - client.import_notification_stream() - .map(|v| Ok::<_, ()>(v)).compat() + let notifications = client.import_notification_stream() .for_each(move |notification| { let parent_hash = notification.hash; if notification.is_new_best { @@ -105,43 +102,35 @@ pub(crate) fn start( ); } } - Ok(()) - }) - .select(exit.clone().unit_error().compat()) - .then(|_| Ok(())) + ready(()) + }); + + select(notifications, exit.clone()) }; let prune_old_sessions = { let select_chain = select_chain.clone(); - let interval = Interval::new( - Instant::now() + TIMER_DELAY, - TIMER_INTERVAL, - ); - - interval + let interval = crate::interval(TIMER_INTERVAL) .for_each(move |_| match select_chain.leaves() { Ok(leaves) => { parachain_validation.retain(|h| leaves.contains(h)); - Ok(()) + ready(()) } Err(e) => { warn!("Error fetching leaves from client: {:?}", e); - Ok(()) + ready(()) } - }) - .map_err(|e| warn!("Timer error {:?}", e)) - .select(exit.clone().unit_error().compat()) - .then(|_| Ok(())) + }); + + select(interval, exit.clone()).map(|_| ()) }; runtime.spawn(notifications); - if let Err(_) = thread_pool.execute(Box::new(prune_old_sessions)) { + if let Err(_) = thread_pool.spawn(prune_old_sessions) { error!("Failed to spawn old sessions pruning task"); } - if let Err(e) = runtime.block_on(exit.unit_error().compat()) { - debug!("BFT event loop error {:?}", e); - } + runtime.block_on(exit); }); ServiceHandle { diff --git a/validation/src/collation.rs b/validation/src/collation.rs index e44e140c5c56..cce4d50ff9cb 100644 --- a/validation/src/collation.rs +++ b/validation/src/collation.rs @@ -32,6 +32,8 @@ use parachain::{wasm_executor::{self, ExternalitiesError, ExecutionMode}, Messag use trie::TrieConfiguration; use futures::prelude::*; use log::debug; +use std::task::{Poll, Context}; +use std::pin::Pin; /// Encapsulates connections to collators and allows collation on any parachain. /// @@ -40,7 +42,7 @@ pub trait Collators: Clone { /// Errors when producing collations. type Error: std::fmt::Debug; /// A full collation. - type Collation: IntoFuture; + type Collation: Future>; /// Collate on a specific parachain, building on a given relay chain parent hash. /// @@ -63,7 +65,7 @@ pub struct CollationFetch { relay_parent_hash: Hash, relay_parent: BlockId, collators: C, - live_fetch: Option<::Future>, + live_fetch: Option, client: Arc

, max_block_data_size: Option, } @@ -99,41 +101,50 @@ impl CollationFetch { } } -impl Future for CollationFetch - where P::Api: ParachainHost, +impl Future for CollationFetch + where + P::Api: ParachainHost, + C: Collators + Unpin, + P: ProvideRuntimeApi, + ::Collation: Unpin, { - type Item = (Collation, OutgoingMessages, Balance); - type Error = C::Error; + type Output = Result<(Collation, OutgoingMessages, Balance),C::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = Pin::into_inner(self); - fn poll(&mut self) -> Poll<(Collation, OutgoingMessages, Balance), C::Error> { loop { let collation = { - let parachain = self.parachain.clone(); - let (r, c) = (self.relay_parent_hash, &self.collators); - let poll = self.live_fetch - .get_or_insert_with(move || c.collate(parachain, r).into_future()) - .poll(); + let parachain = this.parachain.clone(); + let (r, c) = (this.relay_parent_hash, &this.collators); + + let future = this.live_fetch + .get_or_insert_with(move || c.collate(parachain, r)); - futures::try_ready!(poll) + match Pin::new(future).poll(cx) { + Poll::Ready(Ok(c)) => c, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => return Poll::Pending + } }; let res = validate_collation( - &*self.client, - &self.relay_parent, + &*this.client, + &this.relay_parent, &collation, - self.max_block_data_size, + this.max_block_data_size, ); match res { Ok((messages, fees)) => { - return Ok(Async::Ready((collation, messages, fees))) + return Poll::Ready(Ok((collation, messages, fees))) } Err(e) => { debug!("Failed to validate parachain due to API error: {}", e); // just continue if we got a bad collation or failed to validate - self.live_fetch = None; - self.collators.note_bad_collator(collation.info.collator) + this.live_fetch = None; + this.collators.note_bad_collator(collation.info.collator) } } } diff --git a/validation/src/error.rs b/validation/src/error.rs index 8160a0b64f56..ea29e1dff740 100644 --- a/validation/src/error.rs +++ b/validation/src/error.rs @@ -46,9 +46,7 @@ pub enum Error { Timer(std::io::Error), #[display(fmt = "Failed to compute deadline of now + {:?}", _0)] DeadlineComputeFailure(std::time::Duration), - /// Unable to dispatch agreement future - #[display(fmt = "Unable to dispatch agreement future: {:?}", _0)] - Executor(futures::future::ExecuteErrorKind), + Join(tokio::task::JoinError) } impl std::error::Error for Error { diff --git a/validation/src/lib.rs b/validation/src/lib.rs index dc677027836c..23cf1de6b2ae 100644 --- a/validation/src/lib.rs +++ b/validation/src/lib.rs @@ -56,12 +56,11 @@ use polkadot_primitives::parachain::{ use primitives::Pair; use runtime_primitives::traits::{ProvideRuntimeApi, DigestFor}; use futures_timer::Delay; -use async_std::stream::{interval, Interval}; use txpool_api::{TransactionPool, InPoolTransaction}; use attestation_service::ServiceHandle; use futures::prelude::*; -use futures03::{future::{self, Either}, FutureExt, StreamExt, TryFutureExt}; +use futures::{future::{self, Either, select, ready}, stream::unfold, task::{Spawn, SpawnExt}}; use collation::CollationFetch; use dynamic_inclusion::DynamicInclusion; use inherents::InherentData; @@ -70,10 +69,13 @@ use log::{info, debug, warn, trace, error}; use keystore::KeyStorePtr; use sp_api::ApiExt; -type TaskExecutor = - Arc< - dyn futures::future::Executor + Send>> - + Send + Sync>; +type TaskExecutor = Arc; + +fn interval(duration: Duration) -> impl Stream + Send + Unpin { + unfold((), move |_| { + futures_timer::Delay::new(duration).map(|_| Some(((), ()))) + }).map(drop) +} pub use self::collation::{ validate_collation, validate_incoming, message_queue_root, egress_roots, Collators, @@ -84,6 +86,8 @@ pub use self::shared_table::{ SharedTable, ParachainWork, PrimedParachainWork, Validated, Statement, SignedStatement, GenericStatement, }; + +#[cfg(not(target_os = "unknown"))] pub use parachain::wasm_executor::{run_worker as run_validation_worker}; mod attestation_service; @@ -107,7 +111,7 @@ pub trait TableRouter: Clone { /// Errors when fetching data from the network. type Error: std::fmt::Debug; /// Future that resolves when candidate data is fetched. - type FetchValidationProof: IntoFuture; + type FetchValidationProof: Future>; /// Call with local candidate data. This will make the data available on the network, /// and sign, import, and broadcast a statement about the candidate. @@ -134,7 +138,7 @@ pub trait Network { /// The future used for asynchronously building the table router. /// This should not fail. - type BuildTableRouter: IntoFuture; + type BuildTableRouter: Future>; /// Instantiate a table router using the given shared table. /// Also pass through any outgoing messages to be broadcast to peers. @@ -273,13 +277,13 @@ struct ParachainValidation { } impl ParachainValidation where - C: Collators + Send + 'static, + C: Collators + Send + Unpin + 'static, N: Network, P: ProvideRuntimeApi + HeaderBackend + BlockBody + Send + Sync + 'static, P::Api: ParachainHost + BlockBuilderApi + ApiExt, - ::Future: Send + 'static, + C::Collation: Send + Unpin + 'static, N::TableRouter: Send + 'static, - ::Future: Send + 'static, + N::BuildTableRouter: Unpin + Send + 'static, { /// Get an attestation table for given parent hash. /// @@ -400,7 +404,7 @@ impl ParachainValidation where max_block_data_size, ); - collation_work.then(move |result| match result { + collation_work.map(move |result| match result { Ok((collation, outgoing_targeted, fees_charged)) => { match produce_receipt_and_chunks( authorities_num, @@ -427,10 +431,9 @@ impl ParachainValidation where } .unit_error() .boxed() - .compat() .then(move |_| { router.local_collation(collation, receipt, outgoing_targeted, (local_id, &chunks)); - Ok(()) + ready(()) }); @@ -449,18 +452,16 @@ impl ParachainValidation where }) }; - let cancellable_work = build_router - .into_future() + let router = build_router + .map_ok(with_router) .map_err(|e| { warn!(target: "validation" , "Failed to build table router: {:?}", e); - }) - .and_then(with_router) - .then(|_| Ok(())) - .select(exit.unit_error().compat()) - .then(|_| Ok(())); + }); + + let cancellable_work = select(exit, router).map(drop); // spawn onto thread pool. - if self.handle.execute(Box::new(cancellable_work)).is_err() { + if self.handle.spawn(cancellable_work).is_err() { error!("Failed to spawn cancellable work task"); } } @@ -485,8 +486,8 @@ pub struct ProposerFactory { } impl ProposerFactory where - C: Collators + Send + Sync + 'static, - ::Future: Send + 'static, + C: Collators + Send + Sync + Unpin + 'static, + C::Collation: Send + Unpin + 'static, P: BlockchainEvents + BlockBody, P: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static, P::Api: ParachainHost + @@ -495,7 +496,7 @@ impl ProposerFactory where ApiExt, N: Network + Send + Sync + 'static, N::TableRouter: Send + 'static, - ::Future: Send + 'static, + N::BuildTableRouter: Send + Unpin + 'static, TxPool: TransactionPool, SC: SelectChain + 'static, { @@ -543,7 +544,7 @@ impl ProposerFactory where } impl consensus::Environment for ProposerFactory where - C: Collators + Send + 'static, + C: Collators + Send + Unpin + 'static, N: Network, TxPool: TransactionPool + 'static, P: ProvideRuntimeApi + HeaderBackend + BlockBody + Send + Sync + 'static, @@ -551,9 +552,9 @@ impl consensus::Environment for ProposerFactory + BabeApi + ApiExt, - ::Future: Send + 'static, + C::Collation: Send + Unpin + 'static, N::TableRouter: Send + 'static, - ::Future: Send + 'static, + N::BuildTableRouter: Send + Unpin + 'static, SC: SelectChain, { type Proposer = Proposer; @@ -649,7 +650,7 @@ impl consensus::Proposer for Proposer where let timing = ProposalTiming { minimum: delay_future, - attempt_propose: interval(ATTEMPT_PROPOSE_EVERY), + attempt_propose: Box::new(interval(ATTEMPT_PROPOSE_EVERY)), enough_candidates: Delay::new(enough_candidates), dynamic_inclusion, last_included: initial_included, @@ -690,7 +691,7 @@ fn current_timestamp() -> u64 { struct ProposalTiming { minimum: Option, - attempt_propose: Interval, + attempt_propose: Box + Send + Unpin>, dynamic_inclusion: DynamicInclusion, enough_candidates: Delay, last_included: usize, @@ -746,7 +747,7 @@ enum CreateProposalState { /// Represents the state when we switch from pending to fired. Switching, /// Block proposing has fired. - Fired(tokio_executor::blocking::Blocking>), + Fired(tokio::task::JoinHandle>), } /// Inner data of the create proposal. @@ -858,7 +859,7 @@ impl CreateProposalData where } } -impl futures03::Future for CreateProposal where +impl Future for CreateProposal where TxPool: TransactionPool + 'static, C: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static, C::Api: ParachainHost + BlockBuilderApi + ApiExt, @@ -892,18 +893,22 @@ impl futures03::Future for CreateProposal where thus Switching will never be reachable here; qed" ), CreateProposalState::Fired(mut future) => { - let ret = Pin::new(&mut future).poll(cx); + let ret = Pin::new(&mut future) + .poll(cx) + .map(|res| res.map_err(Error::Join).and_then(|res| res)); self.state = CreateProposalState::Fired(future); return ret }, }; // 2. propose - let mut future = tokio_executor::blocking::run(move || { + let mut future = tokio::task::spawn_blocking(move || { let proposed_candidates = data.table.proposed_set(); data.propose_with(proposed_candidates) }); - let polled = Pin::new(&mut future).poll(cx); + let polled = Pin::new(&mut future) + .poll(cx) + .map(|res| res.map_err(Error::Join).and_then(|res| res)); self.state = CreateProposalState::Fired(future); polled diff --git a/validation/src/shared_table/includable.rs b/validation/src/shared_table/includable.rs index 873c3af94c40..1b74abcf4329 100644 --- a/validation/src/shared_table/includable.rs +++ b/validation/src/shared_table/includable.rs @@ -19,7 +19,9 @@ use std::collections::HashMap; use futures::prelude::*; -use futures::sync::oneshot; +use futures::channel::oneshot; +use std::pin::Pin; +use std::task::{Poll, Context}; use polkadot_primitives::Hash; @@ -95,17 +97,17 @@ impl IncludabilitySender { pub struct Includable(oneshot::Receiver<()>); impl Future for Includable { - type Item = (); - type Error = oneshot::Canceled; + type Output = Result<(), oneshot::Canceled>; - fn poll(&mut self) -> Poll<(), oneshot::Canceled> { - self.0.poll() + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + Pin::new(&mut Pin::into_inner(self).0).poll(cx) } } #[cfg(test)] mod tests { use super::*; + use futures::executor::block_on; #[test] fn it_works() { @@ -132,6 +134,6 @@ mod tests { sender.update_candidate(hash1, true); assert!(sender.is_complete()); - recv.wait().unwrap(); + block_on(recv).unwrap(); } } diff --git a/validation/src/shared_table/mod.rs b/validation/src/shared_table/mod.rs index 62db1f89e49e..e0ce64201b1e 100644 --- a/validation/src/shared_table/mod.rs +++ b/validation/src/shared_table/mod.rs @@ -139,7 +139,7 @@ impl SharedTableInner { statement: table::SignedStatement, max_block_data_size: Option, ) -> Option::Future, + R::FetchValidationProof >> { let summary = self.table.import_statement(context, statement)?; self.update_trackers(&summary.candidate, context); @@ -172,7 +172,7 @@ impl SharedTableInner { None } Some(candidate) => { - let fetch = router.fetch_pov_block(candidate).into_future(); + let fetch = router.fetch_pov_block(candidate); Some(Work { candidate_receipt: candidate.clone(), @@ -267,13 +267,13 @@ pub struct ParachainWork { max_block_data_size: Option, } -impl ParachainWork { +impl ParachainWork { /// Prime the parachain work with an API reference for extracting /// chain information. pub fn prime(self, api: Arc

) -> PrimedParachainWork< Fetch, - impl Send + FnMut(&BlockId, &PoVBlock, &CandidateReceipt) -> Result<(OutgoingMessages, ErasureChunk), ()>, + impl Send + FnMut(&BlockId, &PoVBlock, &CandidateReceipt) -> Result<(OutgoingMessages, ErasureChunk), ()> + Unpin, > where P: Send + Sync + 'static, @@ -326,14 +326,13 @@ pub struct PrimedParachainWork { impl PrimedParachainWork where - Fetch: Future, - F: FnMut(&BlockId, &PoVBlock, &CandidateReceipt) -> Result<(OutgoingMessages, ErasureChunk), ()>, + Fetch: Future> + Unpin, + F: FnMut(&BlockId, &PoVBlock, &CandidateReceipt) -> Result<(OutgoingMessages, ErasureChunk), ()> + Unpin, Err: From<::std::io::Error>, { pub async fn validate(mut self) -> Result<(Validated, Option), Err> { - use futures03::compat::Future01CompatExt; let candidate = &self.inner.work.candidate_receipt; - let pov_block = self.inner.work.fetch.compat().await?; + let pov_block = self.inner.work.fetch.await?; let validation_res = (self.validate)( &BlockId::hash(self.inner.relay_parent), @@ -439,7 +438,7 @@ impl SharedTable { router: &R, statement: table::SignedStatement, ) -> Option::Future, + R::FetchValidationProof, >> { self.inner.lock().import_remote_statement(&*self.context, router, statement, self.max_block_data_size) } @@ -455,7 +454,7 @@ impl SharedTable { R: TableRouter, I: IntoIterator, U: ::std::iter::FromIterator::Future, + R::FetchValidationProof, >>>, { let mut inner = self.inner.lock(); @@ -575,8 +574,9 @@ mod tests { use polkadot_primitives::parachain::{AvailableMessages, BlockData, ConsolidatedIngress, Collation}; use polkadot_erasure_coding::{self as erasure}; use availability_store::ProvideGossipMessages; - - use futures::{future}; + use futures::future; + use futures::executor::block_on; + use std::pin::Pin; fn pov_block_with_data(data: Vec) -> PoVBlock { PoVBlock { @@ -592,8 +592,8 @@ mod tests { fn gossip_messages_for( &self, _topic: Hash - ) -> Box + Unpin + Send> { - Box::new(futures03::stream::empty()) + ) -> Pin + Send>> { + futures::stream::empty().boxed() } fn gossip_erasure_chunk( @@ -609,7 +609,7 @@ mod tests { struct DummyRouter; impl TableRouter for DummyRouter { type Error = ::std::io::Error; - type FetchValidationProof = future::FutureResult; + type FetchValidationProof = future::Ready>; fn local_collation( &self, @@ -766,7 +766,7 @@ mod tests { n_validators as u32, ).unwrap(); - let producer: ParachainWork> = ParachainWork { + let producer: ParachainWork>> = ParachainWork { work: Work { candidate_receipt: candidate, fetch: future::ok(pov_block.clone()), @@ -777,7 +777,7 @@ mod tests { max_block_data_size: None, }; - let validated = futures03::executor::block_on(producer.prime_with(|_, _, _| Ok(( + let validated = block_on(producer.prime_with(|_, _, _| Ok(( OutgoingMessages { outgoing_messages: Vec::new() }, ErasureChunk { chunk: vec![1, 2, 3], @@ -841,7 +841,7 @@ mod tests { max_block_data_size: None, }; - let validated = futures03::executor::block_on(producer.prime_with(|_, _, _| Ok(( + let validated = block_on(producer.prime_with(|_, _, _| Ok(( OutgoingMessages { outgoing_messages: Vec::new() }, ErasureChunk { chunk: chunks[local_index].clone(),