Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use std::pin::Pin;

use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn};
use log::{warn, error};
Expand Down Expand Up @@ -242,20 +243,26 @@ impl<P: 'static, E: 'static, SP: 'static> RelayChainContext for ApiContext<P, E,
SP: Spawn + Clone + Send + Sync
{
type Error = String;
type FutureEgress = Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Unpin + Send>;
type FutureEgress = Pin<Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Send>>;

fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
// TODO: https://github.com/paritytech/polkadot/issues/253
//
// Fetch ingress and accumulate all unrounted egress
let _session = self.network.instantiate_leaf_work(LeafWorkParams {
local_session_key: None,
parent_hash: self.parent_hash,
authorities: self.validators.clone(),
})
.map_err(|e| format!("unable to instantiate validation session: {:?}", e));

Box::new(future::ok(ConsolidatedIngress(Vec::new())))
let network = self.network.clone();
let parent_hash = self.parent_hash;
let authorities = self.validators.clone();

async move {
// TODO: https://github.com/paritytech/polkadot/issues/253
//
// Fetch ingress and accumulate all unrounted egress
let _session = network.instantiate_leaf_work(LeafWorkParams {
local_session_key: None,
parent_hash,
authorities,
})
.map_err(|e| format!("unable to instantiate validation session: {:?}", e));

Ok(ConsolidatedIngress(Vec::new()))
}.boxed()
}
}

Expand Down Expand Up @@ -425,7 +432,7 @@ impl<P, E> Worker for CollationNode<P, E> where
);

let exit = inner_exit_2.clone();
tokio::spawn(future::select(res, exit).map(drop));
tokio::spawn(future::select(res.boxed(), exit).map(drop));
})
});

Expand Down
34 changes: 13 additions & 21 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ pub mod gossip;
use codec::{Decode, Encode};
use futures::channel::{oneshot, mpsc};
use futures::prelude::*;
use futures::future::Either;
use polkadot_primitives::{Block, Hash, Header};
use polkadot_primitives::parachain::{
Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock,
Expand Down Expand Up @@ -837,25 +836,6 @@ impl PolkadotProtocol {
debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}",
relay_parent, collation.info.parachain_index);

let res = match self.availability_store {
Some(ref availability_store) => {
let availability_store_cloned = availability_store.clone();
let collation_cloned = collation.clone();
Either::Left((async move {
let _ = availability_store_cloned.make_available(av_store::Data {
relay_parent,
parachain_id: collation_cloned.info.parachain_index,
block_data: collation_cloned.pov.block_data.clone(),
outgoing_queues: Some(outgoing_targeted.clone().into()),
}).await;
}
)
.boxed()
)
}
None => Either::Right(futures::future::ready(())),
};

for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) {
match self.validators.get(&primary) {
Some(who) => {
Expand All @@ -871,7 +851,19 @@ impl PolkadotProtocol {
}
}

res
let availability_store = self.availability_store.clone();
let collation_cloned = collation.clone();

async move {
if let Some(availability_store) = availability_store {
let _ = availability_store.make_available(av_store::Data {
relay_parent,
parachain_id: collation_cloned.info.parachain_index,
block_data: collation_cloned.pov.block_data.clone(),
outgoing_queues: Some(outgoing_targeted.clone().into()),
}).await;
}
}
}

/// Give the network protocol a handle to an availability store, used for
Expand Down
5 changes: 3 additions & 2 deletions network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ use log::{debug, trace};
use std::collections::{HashMap, HashSet};
use std::io;
use std::sync::Arc;
use std::pin::Pin;

use crate::validation::{self, LeafWorkDataFetcher, Executor};
use crate::validation::{LeafWorkDataFetcher, Executor};
use crate::NetworkService;

/// Compute the gossip topic for attestations on the given parent hash.
Expand Down Expand Up @@ -232,7 +233,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
E: Future<Output=()> + Clone + Send + 'static,
{
type Error = io::Error;
type FetchValidationProof = validation::PoVReceiver;
type FetchValidationProof = Pin<Box<dyn Future<Output = Result<PoVBlock, io::Error>> + Send>>;

// We have fetched from a collator and here the receipt should have been already formed.
fn local_collation(
Expand Down
91 changes: 31 additions & 60 deletions network/src/tests/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::pin::Pin;
use std::task::{Poll, Context};
use futures::{prelude::*, channel::mpsc};
use futures::{prelude::*, channel::mpsc, future::{select, Either}};
use codec::Encode;

use super::{TestContext, TestChainContext};
Expand All @@ -66,77 +66,48 @@ fn clone_gossip(n: &TopicNotification) -> TopicNotification {
}
}

struct GossipRouter {
incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>,
incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
outgoing: Vec<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
messages: Vec<(Hash, TopicNotification)>,
}

impl GossipRouter {
fn add_message(&mut self, topic: Hash, message: TopicNotification) {
self.outgoing.retain(|&(ref o_topic, ref sender)| {
o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok()
});
self.messages.push((topic, message));
}

fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender<TopicNotification>) {
for message in self.messages.iter()
.filter(|&&(ref t, _)| t == &topic)
.map(|&(_, ref msg)| clone_gossip(msg))
{
if let Err(_) = sender.unbounded_send(message) { return }
}

self.outgoing.push((topic, sender));
}
}

impl Future for GossipRouter {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);

loop {
match Pin::new(&mut this.incoming_messages).poll_next(cx) {
Poll::Ready(Some((topic, message))) => this.add_message(topic, message),
Poll::Ready(None) => panic!("ended early."),
Poll::Pending => break,
}
}

loop {
match Pin::new(&mut this.incoming_streams).poll_next(cx) {
Poll::Ready(Some((topic, sender))) => this.add_outgoing(topic, sender),
Poll::Ready(None) => panic!("ended early."),
Poll::Pending => break,
}
async fn gossip_router(
mut incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>,
mut incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<TopicNotification>)>
) {
let mut outgoing: Vec<(Hash, mpsc::UnboundedSender<TopicNotification>)> = Vec::new();
let mut messages = Vec::new();

loop {
match select(incoming_messages.next(), incoming_streams.next()).await {
Either::Left((Some((topic, message)), _)) => {
outgoing.retain(|&(ref o_topic, ref sender)| {
o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok()
});
messages.push((topic, message));
},
Either::Right((Some((topic, sender)), _)) => {
for message in messages.iter()
.filter(|&&(ref t, _)| t == &topic)
.map(|&(_, ref msg)| clone_gossip(msg))
{
if let Err(_) = sender.unbounded_send(message) { return }
}

outgoing.push((topic, sender));
},
Either::Left((None, _)) | Either::Right((None, _)) => panic!("ended early.")
}

Poll::Pending
}
}


#[derive(Clone)]
struct GossipHandle {
send_message: mpsc::UnboundedSender<(Hash, TopicNotification)>,
send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
}

fn make_gossip() -> (GossipRouter, GossipHandle) {
fn make_gossip() -> (impl Future<Output = ()>, GossipHandle) {
let (message_tx, message_rx) = mpsc::unbounded();
let (listener_tx, listener_rx) = mpsc::unbounded();

(
GossipRouter {
incoming_messages: message_rx,
incoming_streams: listener_rx,
outgoing: Vec::new(),
messages: Vec::new(),
},
gossip_router(message_rx, listener_rx),
GossipHandle { send_message: message_tx, send_listener: listener_tx },
)
}
Expand Down Expand Up @@ -344,7 +315,7 @@ type TestValidationNetwork = crate::validation::ValidationNetwork<
>;

struct Built {
gossip: GossipRouter,
gossip: Pin<Box<dyn Future<Output = ()>>>,
api_handle: Arc<Mutex<ApiData>>,
networks: Vec<TestValidationNetwork>,
}
Expand Down Expand Up @@ -377,7 +348,7 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built {
let networks: Vec<_> = networks.collect();

Built {
gossip: gossip_router,
gossip: gossip_router.boxed(),
api_handle,
networks,
}
Expand Down
Loading