Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e457a29
skeleton of collators object
rphmeier Jul 9, 2018
b04aa4e
awaiting and handling collations. rename `collators` to CollationPool
rphmeier Jul 10, 2018
e9a3ace
add some tests
rphmeier Jul 10, 2018
c152a0c
add tests
rphmeier Jul 10, 2018
352565c
implement Collators trait for ConsensusNetwork
rphmeier Jul 10, 2018
11875ec
plug collators into main polkadot-network
rphmeier Jul 10, 2018
0ba2c70
Merge branch 'master' into rh-collator-protocol
rphmeier Jul 10, 2018
d236663
ignore collator role message
rphmeier Jul 11, 2018
bd70053
add a couple more tests
rphmeier Jul 11, 2018
2a03637
garbage collection for collations
rphmeier Jul 11, 2018
3d1f9e8
extract session-key tracking from consensus
rphmeier Jul 13, 2018
531fc54
add local_collations.rs
rphmeier Jul 17, 2018
5bd4ba5
Merge branch 'master' into rh-collator-push
rphmeier Jul 17, 2018
c07c35a
finish polish of local_collations
rphmeier Jul 17, 2018
86b2678
integrate local_collations into network layer
rphmeier Jul 17, 2018
c6daf24
introduce API for adding local collations
rphmeier Jul 17, 2018
96a7093
mostly finish collator implementation pending service fix
rphmeier Jul 17, 2018
0ee831e
Merge remote-tracking branch 'upstream/master' into rh-collator-push
rphmeier Jul 17, 2018
1eaa1bc
Specialized network()
arkpar Jul 17, 2018
398b29c
push collations to the network
rphmeier Jul 17, 2018
7645d50
grumbles
rphmeier Jul 18, 2018
7a1d194
substrate-service has custom configuration
rphmeier Jul 18, 2018
2382cbb
initialize network in collator mode as necessary
rphmeier Jul 18, 2018
5866f7e
Merge remote-tracking branch 'upstream/master' into rh-collator-push
rphmeier Jul 18, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions polkadot/collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ polkadot-primitives = { path = "../primitives", version = "0.1" }
polkadot-cli = { path = "../cli" }
log = "0.4"
ed25519 = { path = "../../substrate/ed25519" }
tokio = "0.1.7"
89 changes: 58 additions & 31 deletions polkadot/collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ extern crate substrate_client as client;
extern crate substrate_codec as codec;
extern crate substrate_primitives as primitives;
extern crate ed25519;
extern crate tokio;

extern crate polkadot_api;
extern crate polkadot_cli;
Expand All @@ -58,16 +59,20 @@ extern crate polkadot_primitives;
#[macro_use]
extern crate log;

use std::collections::{BTreeSet, BTreeMap};
use std::collections::{BTreeSet, BTreeMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

use futures::{future, stream, Stream, Future, IntoFuture};
use client::BlockchainEvents;
use polkadot_api::PolkadotApi;
use polkadot_primitives::BlockId;
use polkadot_primitives::parachain::{self, BlockData, HeadData, ConsolidatedIngress, Collation, Message, Id as ParaId};
use polkadot_primitives::{BlockId, SessionKey};
use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId};
use polkadot_cli::{ServiceComponents, Service};
use polkadot_cli::Worker;
use tokio::timer::Deadline;

const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);

/// Parachain context needed for collation.
///
Expand Down Expand Up @@ -183,7 +188,7 @@ pub fn collate<'a, R, P>(
struct ApiContext;

impl RelayChainContext for ApiContext {
type Error = ();
type Error = ::polkadot_api::Error;
type FutureEgress = Result<Vec<Vec<Message>>, Self::Error>;

fn routing_parachains(&self) -> BTreeSet<ParaId> {
Expand Down Expand Up @@ -217,35 +222,48 @@ impl<P, E> Worker for CollationNode<P, E> where
let CollationNode { parachain_context, exit, para_id, key } = self;
let client = service.client();
let api = service.api();
let network = service.network();

let work = client.import_notification_stream()
.and_then(move |notification| {
let id = BlockId::hash(notification.hash);

match api.parachain_head(&id, para_id) {
Ok(Some(last_head)) => {
let collation_work = collate(
para_id,
HeadData(last_head),
ApiContext,
parachain_context.clone(),
key.clone(),
).map(Some);

future::Either::A(collation_work)
}
Ok(None) => {
info!("Parachain {:?} appears to be inactive. Cannot collate.", id);
future::Either::B(future::ok(None))
}
Err(e) => {
warn!("Could not collate for parachain {:?}: {:?}", id, e);
future::Either::B(future::ok(None)) // returning error would shut down the collation node
}
}
})
.for_each(|_collation: Option<Collation>| {
// TODO: import into network.
.for_each(move |notification| {
let relay_parent = notification.hash;
let id = BlockId::hash(relay_parent);

let network = network.clone();
let api = api.clone();
let key = key.clone();
let parachain_context = parachain_context.clone();

let work = future::lazy(move || {
let last_head = match api.parachain_head(&id, para_id)? {
Some(last_head) => last_head,
None => return Ok(()),
};

let targets = compute_targets(
para_id,
api.session_keys(&id)?.as_slice(),
api.duty_roster(&id)?,
);

collate(
para_id,
HeadData(last_head),
ApiContext,
parachain_context,
key,
).map(|collation| {
network.with_spec(|spec, ctx| spec.add_local_collation(
ctx,
relay_parent,
targets,
collation,
))
})
});
let deadlined = Deadline::new(work, Instant::now() + COLLATION_TIMEOUT);

tokio::spawn(deadlined.map_err(|e| warn!("Collation failure: {}", e)));
Ok(())
});

Expand All @@ -254,6 +272,15 @@ impl<P, E> Worker for CollationNode<P, E> where
}
}

fn compute_targets(para_id: ParaId, session_keys: &[SessionKey], roster: DutyRoster) -> HashSet<SessionKey> {
use polkadot_primitives::parachain::Chain;

roster.validator_duty.iter().enumerate()
.filter(|&(_, ref c)| c == &Chain::Parachain(para_id))
.filter_map(|(i, _)| session_keys.get(i))
.collect()
}

/// Run a collator node with the given `RelayChainContext` and `ParachainContext` and
/// arguments to the underlying polkadot node.
///
Expand Down
1 change: 0 additions & 1 deletion polkadot/network/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> Network for ConsensusNetwork<P
knowledge,
parent_hash,
local_session_key,
session_keys: Default::default(),
});

MessageProcessTask {
Expand Down
Loading