diff --git a/collator/src/lib.rs b/collator/src/lib.rs index bfc5c346d46a..20a17a216691 100644 --- a/collator/src/lib.rs +++ b/collator/src/lib.rs @@ -48,6 +48,7 @@ use std::collections::HashSet; use std::fmt; use std::sync::Arc; use std::time::Duration; +use std::pin::Pin; use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn}; use log::{warn, error}; @@ -242,20 +243,26 @@ impl RelayChainContext for ApiContext> + Unpin + Send>; + type FutureEgress = Pin> + Send>>; fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress { - // TODO: https://github.com/paritytech/polkadot/issues/253 - // - // Fetch ingress and accumulate all unrounted egress - let _session = self.network.instantiate_leaf_work(LeafWorkParams { - local_session_key: None, - parent_hash: self.parent_hash, - authorities: self.validators.clone(), - }) - .map_err(|e| format!("unable to instantiate validation session: {:?}", e)); - - Box::new(future::ok(ConsolidatedIngress(Vec::new()))) + let network = self.network.clone(); + let parent_hash = self.parent_hash; + let authorities = self.validators.clone(); + + async move { + // TODO: https://github.com/paritytech/polkadot/issues/253 + // + // Fetch ingress and accumulate all unrounted egress + let _session = network.instantiate_leaf_work(LeafWorkParams { + local_session_key: None, + parent_hash, + authorities, + }) + .map_err(|e| format!("unable to instantiate validation session: {:?}", e)); + + Ok(ConsolidatedIngress(Vec::new())) + }.boxed() } } @@ -425,7 +432,7 @@ impl Worker for CollationNode where ); let exit = inner_exit_2.clone(); - tokio::spawn(future::select(res, exit).map(drop)); + tokio::spawn(future::select(res.boxed(), exit).map(drop)); }) }); diff --git a/network/src/lib.rs b/network/src/lib.rs index 312e0ed69d2a..65f7315b3e99 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -28,7 +28,6 @@ pub mod gossip; use codec::{Decode, Encode}; use futures::channel::{oneshot, mpsc}; use futures::prelude::*; -use futures::future::Either; use polkadot_primitives::{Block, Hash, Header}; use polkadot_primitives::parachain::{ Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock, @@ -837,25 +836,6 @@ impl PolkadotProtocol { debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}", relay_parent, collation.info.parachain_index); - let res = match self.availability_store { - Some(ref availability_store) => { - let availability_store_cloned = availability_store.clone(); - let collation_cloned = collation.clone(); - Either::Left((async move { - let _ = availability_store_cloned.make_available(av_store::Data { - relay_parent, - parachain_id: collation_cloned.info.parachain_index, - block_data: collation_cloned.pov.block_data.clone(), - outgoing_queues: Some(outgoing_targeted.clone().into()), - }).await; - } - ) - .boxed() - ) - } - None => Either::Right(futures::future::ready(())), - }; - for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) { match self.validators.get(&primary) { Some(who) => { @@ -871,7 +851,19 @@ impl PolkadotProtocol { } } - res + let availability_store = self.availability_store.clone(); + let collation_cloned = collation.clone(); + + async move { + if let Some(availability_store) = availability_store { + let _ = availability_store.make_available(av_store::Data { + relay_parent, + parachain_id: collation_cloned.info.parachain_index, + block_data: collation_cloned.pov.block_data.clone(), + outgoing_queues: Some(outgoing_targeted.clone().into()), + }).await; + } + } } /// Give the network protocol a handle to an availability store, used for diff --git a/network/src/router.rs b/network/src/router.rs index 78922a1307ec..ae1667bab7fa 100644 --- a/network/src/router.rs +++ b/network/src/router.rs @@ -41,8 +41,9 @@ use log::{debug, trace}; use std::collections::{HashMap, HashSet}; use std::io; use std::sync::Arc; +use std::pin::Pin; -use crate::validation::{self, LeafWorkDataFetcher, Executor}; +use crate::validation::{LeafWorkDataFetcher, Executor}; use crate::NetworkService; /// Compute the gossip topic for attestations on the given parent hash. @@ -232,7 +233,7 @@ impl TableRouter for Router wh E: Future + Clone + Send + 'static, { type Error = io::Error; - type FetchValidationProof = validation::PoVReceiver; + type FetchValidationProof = Pin> + Send>>; // We have fetched from a collator and here the receipt should have been already formed. fn local_collation( diff --git a/network/src/tests/validation.rs b/network/src/tests/validation.rs index fc976f9bdea7..463ab50d599e 100644 --- a/network/src/tests/validation.rs +++ b/network/src/tests/validation.rs @@ -41,7 +41,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::pin::Pin; use std::task::{Poll, Context}; -use futures::{prelude::*, channel::mpsc}; +use futures::{prelude::*, channel::mpsc, future::{select, Either}}; use codec::Encode; use super::{TestContext, TestChainContext}; @@ -66,77 +66,48 @@ fn clone_gossip(n: &TopicNotification) -> TopicNotification { } } -struct GossipRouter { - incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>, - incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender)>, - outgoing: Vec<(Hash, mpsc::UnboundedSender)>, - messages: Vec<(Hash, TopicNotification)>, -} - -impl GossipRouter { - fn add_message(&mut self, topic: Hash, message: TopicNotification) { - self.outgoing.retain(|&(ref o_topic, ref sender)| { - o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok() - }); - self.messages.push((topic, message)); - } - - fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender) { - for message in self.messages.iter() - .filter(|&&(ref t, _)| t == &topic) - .map(|&(_, ref msg)| clone_gossip(msg)) - { - if let Err(_) = sender.unbounded_send(message) { return } - } - - self.outgoing.push((topic, sender)); - } -} - -impl Future for GossipRouter { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = Pin::into_inner(self); - - loop { - match Pin::new(&mut this.incoming_messages).poll_next(cx) { - Poll::Ready(Some((topic, message))) => this.add_message(topic, message), - Poll::Ready(None) => panic!("ended early."), - Poll::Pending => break, - } - } - - loop { - match Pin::new(&mut this.incoming_streams).poll_next(cx) { - Poll::Ready(Some((topic, sender))) => this.add_outgoing(topic, sender), - Poll::Ready(None) => panic!("ended early."), - Poll::Pending => break, - } +async fn gossip_router( + mut incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>, + mut incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender)> +) { + let mut outgoing: Vec<(Hash, mpsc::UnboundedSender)> = Vec::new(); + let mut messages = Vec::new(); + + loop { + match select(incoming_messages.next(), incoming_streams.next()).await { + Either::Left((Some((topic, message)), _)) => { + outgoing.retain(|&(ref o_topic, ref sender)| { + o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok() + }); + messages.push((topic, message)); + }, + Either::Right((Some((topic, sender)), _)) => { + for message in messages.iter() + .filter(|&&(ref t, _)| t == &topic) + .map(|&(_, ref msg)| clone_gossip(msg)) + { + if let Err(_) = sender.unbounded_send(message) { return } + } + + outgoing.push((topic, sender)); + }, + Either::Left((None, _)) | Either::Right((None, _)) => panic!("ended early.") } - - Poll::Pending } } - #[derive(Clone)] struct GossipHandle { send_message: mpsc::UnboundedSender<(Hash, TopicNotification)>, send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender)>, } -fn make_gossip() -> (GossipRouter, GossipHandle) { +fn make_gossip() -> (impl Future, GossipHandle) { let (message_tx, message_rx) = mpsc::unbounded(); let (listener_tx, listener_rx) = mpsc::unbounded(); ( - GossipRouter { - incoming_messages: message_rx, - incoming_streams: listener_rx, - outgoing: Vec::new(), - messages: Vec::new(), - }, + gossip_router(message_rx, listener_rx), GossipHandle { send_message: message_tx, send_listener: listener_tx }, ) } @@ -344,7 +315,7 @@ type TestValidationNetwork = crate::validation::ValidationNetwork< >; struct Built { - gossip: GossipRouter, + gossip: Pin>>, api_handle: Arc>, networks: Vec, } @@ -377,7 +348,7 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built { let networks: Vec<_> = networks.collect(); Built { - gossip: gossip_router, + gossip: gossip_router.boxed(), api_handle, networks, } diff --git a/network/src/validation.rs b/network/src/validation.rs index f6e652fae280..1a7da20e6495 100644 --- a/network/src/validation.rs +++ b/network/src/validation.rs @@ -33,14 +33,13 @@ use polkadot_primitives::parachain::{ use futures::prelude::*; use futures::task::SpawnExt; pub use futures::task::Spawn as Executor; -use futures::channel::oneshot::{self, Receiver}; +use futures::channel::oneshot; use futures::future::{ready, select}; use std::collections::hash_map::{HashMap, Entry}; use std::io; use std::sync::Arc; use std::pin::Pin; -use std::task::{Poll, Context}; use arrayvec::ArrayVec; use parking_lot::Mutex; @@ -242,47 +241,30 @@ impl ParachainNetwork for ValidationNetwork where #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct NetworkDown; -/// A future that resolves when a collation is received. -pub struct AwaitingCollation { - outer: oneshot::Receiver>, - inner: Option> -} - -impl Future for AwaitingCollation { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = Pin::into_inner(self); - - if let Some(ref mut inner) = this.inner { - return Pin::new(inner).poll(cx).map_err(|_| NetworkDown) - } - match Pin::new(&mut this.outer).poll(cx) { - Poll::Ready(Ok(inner)) => { - this.inner = Some(inner); - Pin::new(this).poll(cx) - }, - Poll::Ready(Err(_)) => Poll::Ready(Err(NetworkDown)), - Poll::Pending => Poll::Pending, - } - } -} - impl Collators for ValidationNetwork where P: ProvideRuntimeApi + Send + Sync + 'static, P::Api: ParachainHost, N: NetworkService, { type Error = NetworkDown; - type Collation = AwaitingCollation; + type Collation = Pin> + Send>>; fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation { let (tx, rx) = oneshot::channel(); - self.network.with_spec(move |spec, _| { - let collation = spec.await_collation(relay_parent, parachain); - let _ = tx.send(collation); - }); - AwaitingCollation{outer: rx, inner: None} + let network = self.network.clone(); + + // A future that resolves when a collation is received. + async move { + network.with_spec(move |spec, _| { + let collation = spec.await_collation(relay_parent, parachain); + let _ = tx.send(collation); + }); + + rx.await + .map_err(|_| NetworkDown)? + .await + .map_err(|_| NetworkDown) + }.boxed() } @@ -348,27 +330,6 @@ impl Knowledge { } } -/// receiver for incoming data. -#[derive(Clone)] -pub struct IncomingReceiver { - inner: future::Shared> -} - -impl Future for IncomingReceiver { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - match Pin::new(&mut Pin::into_inner(self).inner).poll(cx) { - Poll::Ready(Ok(i)) => Poll::Ready(Ok(Incoming::clone(&i))), - Poll::Ready(Err(_)) => Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - "Sending end of channel hung up", - ))), - Poll::Pending => Poll::Pending, - } - } -} - /// A current validation leaf-work instance #[derive(Clone)] pub(crate) struct LiveValidationLeaf { @@ -564,36 +525,6 @@ impl LiveValidationLeaves { } } -/// Receiver for block data. -pub struct PoVReceiver { - outer: Receiver>, - inner: Option> -} - -impl Future for PoVReceiver { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = Pin::into_inner(self); - - let map_err = |_| io::Error::new( - io::ErrorKind::Other, - "Sending end of channel hung up", - ); - - if let Some(ref mut inner) = this.inner { - return Pin::new(inner).poll(cx).map_err(map_err); - } - match Pin::new(&mut this.outer).poll(cx).map_err(map_err)? { - Poll::Ready(inner) => { - this.inner = Some(inner); - Pin::new(this).poll(cx) - } - Poll::Pending => Poll::Pending, - } - } -} - /// Can fetch data for a given validation leaf-work instance. pub struct LeafWorkDataFetcher { network: Arc, @@ -658,9 +589,14 @@ impl LeafWorkDataFetcher where E: Future + Clone + Send + 'static, { /// Fetch PoV block for the given candidate receipt. - pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver { + pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) + -> Pin> + Send>> { + let parachain = candidate.parachain_index; let parent_hash = self.parent_hash; + let network = self.network.clone(); + let candidate = candidate.clone(); + let (tx, rx) = oneshot::channel(); let canon_roots = self.api.runtime_api().ingress( &BlockId::hash(parent_hash), @@ -676,15 +612,24 @@ impl LeafWorkDataFetcher where ) ); - let candidate = candidate.clone(); - let (tx, rx) = oneshot::channel(); - self.network.with_spec(move |spec, ctx| { - if let Ok(Some(canon_roots)) = canon_roots { - let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash, canon_roots); - let _ = tx.send(inner_rx); - } - }); - PoVReceiver { outer: rx, inner: None } + async move { + network.with_spec(move |spec, ctx| { + if let Ok(Some(canon_roots)) = canon_roots { + let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash, canon_roots); + let _ = tx.send(inner_rx); + } + }); + + let map_err = |_| io::Error::new( + io::ErrorKind::Other, + "Sending end of channel hung up", + ); + + rx.await + .map_err(map_err)? + .await + .map_err(map_err) + }.boxed() } } diff --git a/validation/Cargo.toml b/validation/Cargo.toml index f7c7636e3d5b..b4d1400fcfd1 100644 --- a/validation/Cargo.toml +++ b/validation/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" futures = "0.3.1" futures-timer = "2.0" parking_lot = "0.9.0" -tokio = { version = "0.2.4", features = ["rt-core", "blocking"] } +tokio = { version = "0.2.4", features = ["rt-core"] } derive_more = "0.14.1" log = "0.4.8" exit-future = "0.2.0" diff --git a/validation/src/collation.rs b/validation/src/collation.rs index cce4d50ff9cb..1acb23591ee9 100644 --- a/validation/src/collation.rs +++ b/validation/src/collation.rs @@ -32,8 +32,6 @@ use parachain::{wasm_executor::{self, ExternalitiesError, ExecutionMode}, Messag use trie::TrieConfiguration; use futures::prelude::*; use log::debug; -use std::task::{Poll, Context}; -use std::pin::Pin; /// Encapsulates connections to collators and allows collation on any parachain. /// @@ -58,94 +56,41 @@ pub trait Collators: Clone { } /// A future which resolves when a collation is available. -/// -/// This future is fused. -pub struct CollationFetch { +pub async fn collation_fetch( parachain: ParaId, relay_parent_hash: Hash, - relay_parent: BlockId, collators: C, - live_fetch: Option, client: Arc

, max_block_data_size: Option, -} - -impl CollationFetch { - /// Create a new collation fetcher for the given chain. - pub fn new( - parachain: ParaId, - relay_parent_hash: Hash, - collators: C, - client: Arc

, - max_block_data_size: Option, - ) -> Self { - CollationFetch { - relay_parent: BlockId::hash(relay_parent_hash), - relay_parent_hash, - collators, - client, - parachain, - live_fetch: None, - max_block_data_size, - } - } - - /// Access the underlying relay parent hash. - pub fn relay_parent(&self) -> Hash { - self.relay_parent_hash - } - - /// Access the local parachain ID. - pub fn parachain(&self) -> ParaId { - self.parachain - } -} - -impl Future for CollationFetch +) -> Result<(Collation, OutgoingMessages, Balance),C::Error> where P::Api: ParachainHost, C: Collators + Unpin, P: ProvideRuntimeApi, ::Collation: Unpin, { - type Output = Result<(Collation, OutgoingMessages, Balance),C::Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let this = Pin::into_inner(self); - - loop { - let collation = { - let parachain = this.parachain.clone(); - let (r, c) = (this.relay_parent_hash, &this.collators); + let relay_parent = BlockId::hash(relay_parent_hash); - let future = this.live_fetch - .get_or_insert_with(move || c.collate(parachain, r)); - - match Pin::new(future).poll(cx) { - Poll::Ready(Ok(c)) => c, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => return Poll::Pending - } - }; + loop { + let collation = collators.collate(parachain, relay_parent_hash) + .await?; - let res = validate_collation( - &*this.client, - &this.relay_parent, - &collation, - this.max_block_data_size, - ); + let res = validate_collation( + &*client, + &relay_parent, + &collation, + max_block_data_size, + ); - match res { - Ok((messages, fees)) => { - return Poll::Ready(Ok((collation, messages, fees))) - } - Err(e) => { - debug!("Failed to validate parachain due to API error: {}", e); + match res { + Ok((messages, fees)) => { + return Ok((collation, messages, fees)) + } + Err(e) => { + debug!("Failed to validate parachain due to API error: {}", e); - // just continue if we got a bad collation or failed to validate - this.live_fetch = None; - this.collators.note_bad_collator(collation.info.collator) - } + // just continue if we got a bad collation or failed to validate + collators.note_bad_collator(collation.info.collator) } } } diff --git a/validation/src/lib.rs b/validation/src/lib.rs index b779ccbc9851..3412dab26a11 100644 --- a/validation/src/lib.rs +++ b/validation/src/lib.rs @@ -34,8 +34,6 @@ use std::{ pin::Pin, sync::Arc, time::{self, Duration, Instant}, - task::{Poll, Context}, - mem, }; use babe_primitives::BabeApi; @@ -60,8 +58,8 @@ use txpool_api::{TransactionPool, InPoolTransaction}; use attestation_service::ServiceHandle; use futures::prelude::*; -use futures::{future::{self, Either, select, ready}, stream::unfold, task::{Spawn, SpawnExt}}; -use collation::CollationFetch; +use futures::{future::{select, ready}, stream::unfold, task::{Spawn, SpawnExt}}; +use collation::collation_fetch; use dynamic_inclusion::DynamicInclusion; use inherents::InherentData; use sp_timestamp::TimestampInherentData; @@ -396,7 +394,7 @@ impl ParachainValidation where let with_router = move |router: N::TableRouter| { // fetch a local collation from connected collators. - let collation_work = CollationFetch::new( + let collation_work = collation_fetch( validation_para, relay_parent, collators, @@ -611,14 +609,13 @@ impl consensus::Proposer for Proposer where C::Api: ParachainHost + BlockBuilderApi + ApiExt, { type Error = Error; - type Create = Either, future::Ready>>; + type Create = Pin> + Send>>; fn propose(&mut self, inherent_data: InherentData, inherent_digests: DigestFor, max_duration: Duration, ) -> Self::Create { - const ATTEMPT_PROPOSE_EVERY: Duration = Duration::from_millis(100); const SLOT_DURATION_DENOMINATOR: u64 = 3; // wait up to 1/3 of the slot for candidates. let initial_included = self.tracker.table.includable_count(); @@ -630,56 +627,59 @@ impl consensus::Proposer for Proposer where Duration::from_millis(self.slot_duration / SLOT_DURATION_DENOMINATOR), ); - let enough_candidates = dynamic_inclusion.acceptable_in( - now, - initial_included, - ).unwrap_or_else(|| Duration::from_millis(1)); - - let believed_timestamp = match inherent_data.timestamp_inherent_data() { - Ok(timestamp) => timestamp, - Err(e) => return Either::Right(future::err(Error::InherentError(e))), - }; - - // set up delay until next allowed timestamp. - let current_timestamp = current_timestamp(); - let delay_future = if current_timestamp >= believed_timestamp { - None - } else { - Some(Delay::new(Duration::from_millis (current_timestamp - believed_timestamp))) - }; - - let timing = ProposalTiming { - minimum: delay_future, - attempt_propose: Box::new(interval(ATTEMPT_PROPOSE_EVERY)), - enough_candidates: Delay::new(enough_candidates), - dynamic_inclusion, - last_included: initial_included, - }; - - let deadline_diff = max_duration - max_duration / 3; - let deadline = match Instant::now().checked_add(deadline_diff) { - None => return Either::Right( - future::err(Error::DeadlineComputeFailure(deadline_diff)), - ), - Some(d) => d, - }; - - Either::Left(CreateProposal { - state: CreateProposalState::Pending(CreateProposalData { - parent_hash: self.parent_hash.clone(), - parent_number: self.parent_number.clone(), - parent_id: self.parent_id.clone(), - client: self.client.clone(), - transaction_pool: self.transaction_pool.clone(), - table: self.tracker.table.clone(), + let parent_hash = self.parent_hash.clone(); + let parent_number = self.parent_number.clone(); + let parent_id = self.parent_id.clone(); + let client = self.client.clone(); + let transaction_pool = self.transaction_pool.clone(); + let table = self.tracker.table.clone(); + + async move { + let enough_candidates = dynamic_inclusion.acceptable_in( + now, + initial_included, + ).unwrap_or_else(|| Duration::from_millis(1)); + + let believed_timestamp = match inherent_data.timestamp_inherent_data() { + Ok(timestamp) => timestamp, + Err(e) => return Err(Error::InherentError(e)), + }; + + let deadline_diff = max_duration - max_duration / 3; + let deadline = match Instant::now().checked_add(deadline_diff) { + None => return Err(Error::DeadlineComputeFailure(deadline_diff)), + Some(d) => d, + }; + + let data = CreateProposalData { + parent_hash, + parent_number, + parent_id, + client, + transaction_pool, + table, believed_minimum_timestamp: believed_timestamp, - timing, inherent_data: Some(inherent_data), inherent_digests, // leave some time for the proposal finalisation deadline, + }; + + // set up delay until next allowed timestamp. + let current_timestamp = current_timestamp(); + if current_timestamp < believed_timestamp { + Delay::new(Duration::from_millis(current_timestamp - believed_timestamp)) + .await; + } + + Delay::new(enough_candidates).await; + + tokio_executor::blocking::run(move || { + let proposed_candidates = data.table.proposed_set(); + data.propose_with(proposed_candidates) }) - }) + .await + }.boxed() } } @@ -689,67 +689,6 @@ fn current_timestamp() -> u64 { .as_millis() as u64 } -struct ProposalTiming { - minimum: Option, - attempt_propose: Box + Send + Unpin>, - dynamic_inclusion: DynamicInclusion, - enough_candidates: Delay, - last_included: usize, -} - -impl ProposalTiming { - // whether it's time to attempt a proposal. - // shouldn't be called outside of the context of a task. - fn poll(&mut self, cx: &mut Context, included: usize) -> Poll<()> { - // first drain from the interval so when the minimum delay is up - // we don't have any notifications built up. - // - // this interval is just meant to produce periodic task wakeups - // that lead to the `dynamic_inclusion` getting updated as necessary. - while let Poll::Ready(x) = self.attempt_propose.poll_next_unpin(cx) { - x.expect("timer still alive; intervals never end; qed"); - } - - // wait until the minimum time has passed. - if let Some(mut minimum) = self.minimum.take() { - if let Poll::Pending = minimum.poll_unpin(cx) { - self.minimum = Some(minimum); - return Poll::Pending; - } - } - - if included == self.last_included { - return self.enough_candidates.poll_unpin(cx); - } - - // the amount of includable candidates has changed. schedule a wakeup - // if it's not sufficient anymore. - match self.dynamic_inclusion.acceptable_in(Instant::now(), included) { - Some(instant) => { - self.last_included = included; - self.enough_candidates.reset(Instant::now() + instant); - self.enough_candidates.poll_unpin(cx) - } - None => Poll::Ready(()), - } - } -} - -/// Future which resolves upon the creation of a proposal. -pub struct CreateProposal { - state: CreateProposalState, -} - -/// Current status of the proposal future. -enum CreateProposalState { - /// Pending inclusion, with given proposal data. - Pending(CreateProposalData), - /// Represents the state when we switch from pending to fired. - Switching, - /// Block proposing has fired. - Fired(tokio_executor::blocking::Blocking>), -} - /// Inner data of the create proposal. struct CreateProposalData { parent_hash: Hash, @@ -758,7 +697,6 @@ struct CreateProposalData { client: Arc, transaction_pool: Arc, table: Arc, - timing: ProposalTiming, believed_minimum_timestamp: u64, inherent_data: Option, inherent_digests: DigestFor, @@ -859,58 +797,6 @@ impl CreateProposalData where } } -impl Future for CreateProposal where - TxPool: TransactionPool + 'static, - C: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static, - C::Api: ParachainHost + BlockBuilderApi + ApiExt, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let mut state = CreateProposalState::Switching; - mem::swap(&mut state, &mut self.state); - - // 1. try to propose if we have enough includable candidates and other - // delays have concluded. - let data = match state { - CreateProposalState::Pending(mut data) => { - let included = data.table.includable_count(); - match data.timing.poll(cx, included) { - Poll::Pending => { - self.state = CreateProposalState::Pending(data); - return Poll::Pending - }, - Poll::Ready(()) => (), - } - - data - }, - CreateProposalState::Switching => - unreachable!( - "State Switching are only created on call, \ - and immediately swapped out; \ - the data being read is from state; \ - thus Switching will never be reachable here; qed" - ), - CreateProposalState::Fired(mut future) => { - let ret = Pin::new(&mut future).poll(cx); - self.state = CreateProposalState::Fired(future); - return ret - }, - }; - - // 2. propose - let mut future = tokio_executor::blocking::run(move || { - let proposed_candidates = data.table.proposed_set(); - data.propose_with(proposed_candidates) - }); - let polled = Pin::new(&mut future).poll(cx); - self.state = CreateProposalState::Fired(future); - - polled - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/validation/src/shared_table/includable.rs b/validation/src/shared_table/includable.rs index 1b74abcf4329..fa94578adf9d 100644 --- a/validation/src/shared_table/includable.rs +++ b/validation/src/shared_table/includable.rs @@ -17,16 +17,13 @@ //! Implements a future which resolves when all of the candidates referenced are includable. use std::collections::HashMap; - -use futures::prelude::*; use futures::channel::oneshot; -use std::pin::Pin; -use std::task::{Poll, Context}; - use polkadot_primitives::Hash; /// Track includability of a set of candidates, -pub(super) fn track>(candidates: I) -> (IncludabilitySender, Includable) { +pub(super) fn track>(candidates: I) + -> (IncludabilitySender, oneshot::Receiver<()>) { + let (tx, rx) = oneshot::channel(); let tracking: HashMap<_, _> = candidates.into_iter().collect(); let includable_count = tracking.values().filter(|x| **x).count(); @@ -39,10 +36,7 @@ pub(super) fn track>(candidates: I) -> (Inclu sender.try_complete(); - ( - sender, - Includable(rx), - ) + (sender, rx) } /// The sending end of the includability sender. @@ -93,17 +87,6 @@ impl IncludabilitySender { } } -/// Future that resolves when all the candidates within are includable. -pub struct Includable(oneshot::Receiver<()>); - -impl Future for Includable { - type Output = Result<(), oneshot::Canceled>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - Pin::new(&mut Pin::into_inner(self).0).poll(cx) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/validation/src/shared_table/mod.rs b/validation/src/shared_table/mod.rs index e0ce64201b1e..3784aa7e1c16 100644 --- a/validation/src/shared_table/mod.rs +++ b/validation/src/shared_table/mod.rs @@ -30,6 +30,7 @@ use polkadot_primitives::parachain::{ use parking_lot::Mutex; use futures::prelude::*; +use futures::channel::oneshot; use log::{warn, debug}; use bitvec::bitvec; @@ -40,7 +41,6 @@ use runtime_primitives::traits::ProvideRuntimeApi; mod includable; -pub use self::includable::Includable; pub use table::{SignedStatement, Statement}; pub use table::generic::Statement as GenericStatement; @@ -543,7 +543,7 @@ impl SharedTable { } /// Track includability of a given set of candidate hashes. - pub fn track_includability(&self, iterable: I) -> Includable + pub fn track_includability(&self, iterable: I) -> oneshot::Receiver<()> where I: IntoIterator { let mut inner = self.inner.lock();