Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e457a29
skeleton of collators object
rphmeier Jul 9, 2018
b04aa4e
awaiting and handling collations. rename `collators` to CollationPool
rphmeier Jul 10, 2018
e9a3ace
add some tests
rphmeier Jul 10, 2018
c152a0c
add tests
rphmeier Jul 10, 2018
352565c
implement Collators trait for ConsensusNetwork
rphmeier Jul 10, 2018
11875ec
plug collators into main polkadot-network
rphmeier Jul 10, 2018
0ba2c70
Merge branch 'master' into rh-collator-protocol
rphmeier Jul 10, 2018
d236663
ignore collator role message
rphmeier Jul 11, 2018
bd70053
add a couple more tests
rphmeier Jul 11, 2018
2a03637
garbage collection for collations
rphmeier Jul 11, 2018
3d1f9e8
extract session-key tracking from consensus
rphmeier Jul 13, 2018
531fc54
add local_collations.rs
rphmeier Jul 17, 2018
5bd4ba5
Merge branch 'master' into rh-collator-push
rphmeier Jul 17, 2018
c07c35a
finish polish of local_collations
rphmeier Jul 17, 2018
86b2678
integrate local_collations into network layer
rphmeier Jul 17, 2018
c6daf24
introduce API for adding local collations
rphmeier Jul 17, 2018
96a7093
mostly finish collator implementation pending service fix
rphmeier Jul 17, 2018
0ee831e
Merge remote-tracking branch 'upstream/master' into rh-collator-push
rphmeier Jul 17, 2018
1eaa1bc
Specialized network()
arkpar Jul 17, 2018
398b29c
push collations to the network
rphmeier Jul 17, 2018
7645d50
grumbles
rphmeier Jul 18, 2018
7a1d194
substrate-service has custom configuration
rphmeier Jul 18, 2018
2382cbb
initialize network in collator mode as necessary
rphmeier Jul 18, 2018
5866f7e
Merge remote-tracking branch 'upstream/master' into rh-collator-push
rphmeier Jul 18, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions polkadot/cli/src/cli.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ args:
value_name: KEY
help: Specify node secret key (64-character hex string)
takes_value: true
- collator:
long: collator
help: Enable collator mode
takes_value: false
- validator:
long: validator
help: Enable validator mode
Expand Down
24 changes: 13 additions & 11 deletions polkadot/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ pub use client::error::Error as ClientError;
pub use client::backend::Backend as ClientBackend;
pub use state_machine::Backend as StateMachineBackend;
pub use polkadot_primitives::Block as PolkadotBlock;
pub use service::{Components as ServiceComponents, Service};
pub use service::{Components as ServiceComponents, Service, CustomConfiguration};

use std::io::{self, Write, Read, stdin, stdout};
use std::fs::File;
Expand Down Expand Up @@ -134,11 +134,16 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf {
pub trait Worker {
/// A future that resolves when the work is done or the node should exit.
/// This will be run on a tokio runtime.
type Work: Future<Item=(),Error=()>;
type Work: Future<Item=(),Error=()> + Send + 'static;

/// An exit scheduled for the future.
type Exit: Future<Item=(),Error=()> + Send + 'static;

/// Return configuration for the polkadot node.
// TODO: make this the full configuration, so embedded nodes don't need
// string CLI args
fn configuration(&self) -> CustomConfiguration { Default::default() }

/// Don't work, but schedule an exit.
fn exit_only(self) -> Self::Exit;

Expand Down Expand Up @@ -217,13 +222,7 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
};

let role =
if matches.is_present("collator") {
info!("Starting collator");
// TODO [rob]: collation node implementation
// This isn't a thing. Different parachains will have their own collator executables and
// maybe link to libpolkadot to get a light-client.
service::Roles::LIGHT
} else if matches.is_present("light") {
if matches.is_present("light") {
info!("Starting (light)");
config.execution_strategy = service::ExecutionStrategy::NativeWhenPossible;
service::Roles::LIGHT
Expand Down Expand Up @@ -262,9 +261,10 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
config.network.net_config_path = config.network.config_path.clone();

let port = match matches.value_of("port") {
Some(port) => port.parse().expect("Invalid p2p port value specified."),
Some(port) => port.parse().map_err(|_| "Invalid p2p port value specified.")?,
None => 30333,
};

config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port));
config.network.public_address = None;
config.network.client_version = format!("parity-polkadot/{}", crate_version!());
Expand All @@ -275,6 +275,8 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
};
}

config.custom = worker.configuration();

config.keys = matches.values_of("key").unwrap_or_default().map(str::to_owned).collect();
if matches.is_present("dev") {
config.keys.push("Alice".into());
Expand Down Expand Up @@ -494,7 +496,7 @@ fn run_until_exit<C, W>(
)
};

let _ = worker.work(&service).wait();
let _ = runtime.block_on(worker.work(&service));
exit_send.fire();
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions polkadot/collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ polkadot-primitives = { path = "../primitives", version = "0.1" }
polkadot-cli = { path = "../cli" }
log = "0.4"
ed25519 = { path = "../../substrate/ed25519" }
tokio = "0.1.7"
127 changes: 93 additions & 34 deletions polkadot/collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ extern crate substrate_client as client;
extern crate substrate_codec as codec;
extern crate substrate_primitives as primitives;
extern crate ed25519;
extern crate tokio;

extern crate polkadot_api;
extern crate polkadot_cli;
Expand All @@ -58,16 +59,20 @@ extern crate polkadot_primitives;
#[macro_use]
extern crate log;

use std::collections::{BTreeSet, BTreeMap};
use std::collections::{BTreeSet, BTreeMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

use futures::{future, stream, Stream, Future, IntoFuture};
use client::BlockchainEvents;
use polkadot_api::PolkadotApi;
use polkadot_primitives::BlockId;
use polkadot_primitives::parachain::{self, BlockData, HeadData, ConsolidatedIngress, Collation, Message, Id as ParaId};
use polkadot_cli::{ServiceComponents, Service};
use polkadot_primitives::{AccountId, BlockId, SessionKey};
use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId};
use polkadot_cli::{ServiceComponents, Service, CustomConfiguration};
use polkadot_cli::Worker;
use tokio::timer::Deadline;

const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);

/// Parachain context needed for collation.
///
Expand Down Expand Up @@ -99,6 +104,11 @@ pub trait RelayChainContext {
fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress;
}

fn key_to_account_id(key: &ed25519::Pair) -> AccountId {
let pubkey_bytes: [u8; 32] = key.public().into();
pubkey_bytes.into()
}

/// Collate the necessary ingress queue using the given context.
pub fn collate_ingress<'a, R>(relay_context: R)
-> impl Future<Item=ConsolidatedIngress, Error=R::Error> + 'a
Expand Down Expand Up @@ -159,11 +169,10 @@ pub fn collate<'a, R, P>(

let block_data_hash = block_data.hash();
let signature = key.sign(&block_data_hash.0[..]).into();
let pubkey_bytes: [u8; 32] = key.public().into();

let receipt = parachain::CandidateReceipt {
parachain_index: local_id,
collator: pubkey_bytes.into(),
collator: key_to_account_id(&*key),
signature,
head_data,
balance_uploads: Vec::new(),
Expand All @@ -183,7 +192,7 @@ pub fn collate<'a, R, P>(
struct ApiContext;

impl RelayChainContext for ApiContext {
type Error = ();
type Error = ::polkadot_api::Error;
type FutureEgress = Result<Vec<Vec<Message>>, Self::Error>;

fn routing_parachains(&self) -> BTreeSet<ParaId> {
Expand All @@ -203,12 +212,21 @@ struct CollationNode<P, E> {
}

impl<P, E> Worker for CollationNode<P, E> where
P: ParachainContext + 'static,
P: ParachainContext + Send + 'static,
E: Future<Item=(),Error=()> + Send + 'static
{
type Work = Box<Future<Item=(),Error=()>>;
type Work = Box<Future<Item=(),Error=()> + Send>;
type Exit = E;

fn configuration(&self) -> CustomConfiguration {
let mut config = CustomConfiguration::default();
config.collating_for = Some((
key_to_account_id(&*self.key),
self.para_id.clone(),
));
config
}

fn exit_only(self) -> Self::Exit {
self.exit
}
Expand All @@ -217,35 +235,66 @@ impl<P, E> Worker for CollationNode<P, E> where
let CollationNode { parachain_context, exit, para_id, key } = self;
let client = service.client();
let api = service.api();
let network = service.network();

let work = client.import_notification_stream()
.and_then(move |notification| {
let id = BlockId::hash(notification.hash);

match api.parachain_head(&id, para_id) {
Ok(Some(last_head)) => {
let collation_work = collate(
para_id,
HeadData(last_head),
ApiContext,
parachain_context.clone(),
key.clone(),
).map(Some);

future::Either::A(collation_work)
}
Ok(None) => {
info!("Parachain {:?} appears to be inactive. Cannot collate.", id);
future::Either::B(future::ok(None))
.for_each(move |notification| {
macro_rules! try_fr {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this macro inlined? It doesn't need any of the context of the closure, or?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's also only used in the closure, so I felt it was better to limit it there

($e:expr) => {
match $e {
Ok(x) => x,
Err(e) => return future::Either::A(future::err(e)),
}
}
}

let relay_parent = notification.hash;
let id = BlockId::hash(relay_parent);

let network = network.clone();
let api = api.clone();
let key = key.clone();
let parachain_context = parachain_context.clone();

let work = future::lazy(move || {
let last_head = match try_fr!(api.parachain_head(&id, para_id)) {
Some(last_head) => last_head,
None => return future::Either::A(future::ok(())),
};

let targets = compute_targets(
para_id,
try_fr!(api.session_keys(&id)).as_slice(),
try_fr!(api.duty_roster(&id)),
);

let collation_work = collate(
para_id,
HeadData(last_head),
ApiContext,
parachain_context,
key,
).map(move |collation| {
network.with_spec(|spec, ctx| spec.add_local_collation(
ctx,
relay_parent,
targets,
collation,
));
});

future::Either::B(collation_work)
});
let deadlined = Deadline::new(work, Instant::now() + COLLATION_TIMEOUT);
let silenced = deadlined.then(|res| match res {
Ok(()) => Ok(()),
Err(e) => {
warn!("Could not collate for parachain {:?}: {:?}", id, e);
future::Either::B(future::ok(None)) // returning error would shut down the collation node
warn!("Collation failure: {}", e);
Ok(())
}
}
})
.for_each(|_collation: Option<Collation>| {
// TODO: import into network.
});

tokio::spawn(silenced);
Ok(())
});

Expand All @@ -254,6 +303,16 @@ impl<P, E> Worker for CollationNode<P, E> where
}
}

fn compute_targets(para_id: ParaId, session_keys: &[SessionKey], roster: DutyRoster) -> HashSet<SessionKey> {
use polkadot_primitives::parachain::Chain;

roster.validator_duty.iter().enumerate()
.filter(|&(_, c)| c == &Chain::Parachain(para_id))
.filter_map(|(i, _)| session_keys.get(i))
.cloned()
.collect()
}

/// Run a collator node with the given `RelayChainContext` and `ParachainContext` and
/// arguments to the underlying polkadot node.
///
Expand All @@ -266,7 +325,7 @@ pub fn run_collator<P, E>(
key: Arc<ed25519::Pair>,
args: Vec<::std::ffi::OsString>
) -> polkadot_cli::error::Result<()> where
P: ParachainContext + 'static,
P: ParachainContext + Send + 'static,
E: IntoFuture<Item=(),Error=()>,
E::Future: Send + 'static,
{
Expand Down
1 change: 0 additions & 1 deletion polkadot/network/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> Network for ConsensusNetwork<P
knowledge,
parent_hash,
local_session_key,
session_keys: Default::default(),
});

MessageProcessTask {
Expand Down
Loading