Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,084 changes: 994 additions & 1,090 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ edition = "2018"
[dependencies]
cli = { package = "polkadot-cli", path = "cli" }
futures = "0.3.1"
ctrlc = { version = "3.1.3", features = ["termination"] }
service = { package = "polkadot-service", path = "service" }

[build-dependencies]
Expand Down
99 changes: 44 additions & 55 deletions collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,8 @@ pub trait Network: Send + Sync {
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement>>;
}

impl<P, E, SP> Network for ValidationNetwork<P, E, SP> where
impl<P, SP> Network for ValidationNetwork<P, SP> where
P: 'static + Send + Sync,
E: 'static + Send + Sync,
SP: 'static + Spawn + Clone + Send + Sync,
{
fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
Expand Down Expand Up @@ -239,16 +238,15 @@ pub async fn collate<R, P>(
}

/// Polkadot-api context.
struct ApiContext<P, E, SP> {
network: Arc<ValidationNetwork<P, E, SP>>,
struct ApiContext<P, SP> {
network: Arc<ValidationNetwork<P, SP>>,
parent_hash: Hash,
validators: Vec<ValidatorId>,
}

impl<P: 'static, E: 'static, SP: 'static> RelayChainContext for ApiContext<P, E, SP> where
impl<P: 'static, SP: 'static> RelayChainContext for ApiContext<P, SP> where
P: ProvideRuntimeApi<Block> + Send + Sync,
P::Api: ParachainHost<Block>,
E: futures::Future<Output=()> + Clone + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync
{
type Error = String;
Expand Down Expand Up @@ -276,13 +274,12 @@ impl<P: 'static, E: 'static, SP: 'static> RelayChainContext for ApiContext<P, E,
}

/// Run the collator node using the given `service`.
fn run_collator_node<S, E, P, Extrinsic>(
fn run_collator_node<S, P, Extrinsic>(
service: S,
exit: E,
para_id: ParaId,
key: Arc<CollatorPair>,
build_parachain_context: P,
) -> polkadot_cli::error::Result<()>
) -> Result<S, polkadot_service::Error>
where
S: AbstractService<Block = service::Block, NetworkSpecialization = service::PolkadotProtocol>,
sc_client::Client<S::Backend, S::CallExecutor, service::Block, S::RuntimeApi>: ProvideRuntimeApi<Block>,
Expand All @@ -301,7 +298,6 @@ fn run_collator_node<S, E, P, Extrinsic>(
S::CallExecutor: service::CallExecutor<service::Block>,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
S::SelectChain: service::SelectChain<service::Block>,
E: futures::Future<Output=()> + Clone + Unpin + Send + Sync + 'static,
P: BuildParachainContext,
P::ParachainContext: Send + 'static,
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
Expand All @@ -315,7 +311,7 @@ fn run_collator_node<S, E, P, Extrinsic>(
let select_chain = if let Some(select_chain) = service.select_chain() {
select_chain
} else {
return Err(polkadot_cli::error::Error::Other("The node cannot work because it can't select chain.".into()))
return Err("The node cannot work because it can't select chain.".into())
};

let is_known = move |block_hash: &Hash| {
Expand Down Expand Up @@ -345,7 +341,6 @@ fn run_collator_node<S, E, P, Extrinsic>(

let validation_network = Arc::new(ValidationNetwork::new(
message_validator,
exit.clone(),
client.clone(),
spawner.clone(),
));
Expand All @@ -357,11 +352,10 @@ fn run_collator_node<S, E, P, Extrinsic>(
) {
Ok(ctx) => ctx,
Err(()) => {
return Err(polkadot_cli::error::Error::Other("Could not build the parachain context!".into()))
return Err("Could not build the parachain context!".into())
}
};

let inner_exit = exit.clone();
let work = async move {
let mut notification_stream = client.import_notification_stream();

Expand All @@ -385,7 +379,6 @@ fn run_collator_node<S, E, P, Extrinsic>(
let key = key.clone();
let parachain_context = parachain_context.clone();
let validation_network = validation_network.clone();
let inner_exit_2 = inner_exit.clone();

let work = future::lazy(move |_| async move {
let api = client.runtime_api();
Expand Down Expand Up @@ -425,8 +418,7 @@ fn run_collator_node<S, E, P, Extrinsic>(
outgoing,
);

let exit = inner_exit_2.clone();
tokio::spawn(future::select(res.boxed(), exit));
tokio::spawn(res.boxed());
});
}
future::ok(())
Expand All @@ -444,19 +436,15 @@ fn run_collator_node<S, E, P, Extrinsic>(
}
});

let future = future::select(
silenced,
inner_exit.clone()
).map(drop);
let future = silenced.map(drop);

tokio::spawn(future);
}
}.boxed();

service.spawn_essential_task("collation", work);

// NOTE: this is not ideal as we should only provide the service
sc_cli::run_service_until_exit(Configuration::default(), |_config| Ok(service))
Ok(service)
}

fn compute_targets(para_id: ParaId, session_keys: &[ValidatorId], roster: DutyRoster) -> HashSet<ValidatorId> {
Expand All @@ -472,53 +460,54 @@ fn compute_targets(para_id: ParaId, session_keys: &[ValidatorId], roster: DutyRo
/// Run a collator node with the given `RelayChainContext` and `ParachainContext`
/// build by the given `BuildParachainContext` and arguments to the underlying polkadot node.
///
/// Provide a future which resolves when the node should exit.
/// This function blocks until done.
pub fn run_collator<P, E>(
pub fn run_collator<P>(
build_parachain_context: P,
para_id: ParaId,
exit: E,
key: Arc<CollatorPair>,
config: Configuration,
) -> polkadot_cli::error::Result<()> where
P: BuildParachainContext,
P::ParachainContext: Send + 'static,
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
E: futures::Future<Output = ()> + Unpin + Send + Clone + Sync + 'static,
{
match (config.expect_chain_spec().is_kusama(), config.roles) {
(true, Roles::LIGHT) =>
run_collator_node(
service::kusama_new_light(config, Some((key.public(), para_id)))?,
exit,
para_id,
key,
build_parachain_context,
),
sc_cli::run_service_until_exit(config, |config| {
run_collator_node(
service::kusama_new_light(config, Some((key.public(), para_id)))?,
para_id,
key,
build_parachain_context,
)
}),
(true, _) =>
run_collator_node(
service::kusama_new_full(config, Some((key.public(), para_id)), None, false, 6000)?,
exit,
para_id,
key,
build_parachain_context,
),
sc_cli::run_service_until_exit(config, |config| {
run_collator_node(
service::kusama_new_full(config, Some((key.public(), para_id)), None, false, 6000)?,
para_id,
key,
build_parachain_context,
)
}),
(false, Roles::LIGHT) =>
run_collator_node(
service::polkadot_new_light(config, Some((key.public(), para_id)))?,
exit,
para_id,
key,
build_parachain_context,
),
sc_cli::run_service_until_exit(config, |config| {
run_collator_node(
service::polkadot_new_light(config, Some((key.public(), para_id)))?,
para_id,
key,
build_parachain_context,
)
}),
(false, _) =>
run_collator_node(
service::polkadot_new_full(config, Some((key.public(), para_id)), None, false, 6000)?,
exit,
para_id,
key,
build_parachain_context,
),
sc_cli::run_service_until_exit(config, |config| {
run_collator_node(
service::polkadot_new_full(config, Some((key.public(), para_id)), None, false, 6000)?,
para_id,
key,
build_parachain_context,
)
}),
}
}

Expand Down
23 changes: 10 additions & 13 deletions network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement,
use sp_api::ProvideRuntimeApi;

use futures::prelude::*;
use futures::{task::SpawnExt, future::{ready, select}};
use futures::{task::SpawnExt, future::ready};
use parking_lot::Mutex;
use log::{debug, trace};

Expand Down Expand Up @@ -73,18 +73,18 @@ pub(crate) fn checked_statements<N: NetworkService>(network: &N, topic: Hash) ->
}

/// Table routing implementation.
pub struct Router<P, E, T> {
pub struct Router<P, T> {
table: Arc<SharedTable>,
attestation_topic: Hash,
fetcher: LeafWorkDataFetcher<P, E, T>,
fetcher: LeafWorkDataFetcher<P, T>,
deferred_statements: Arc<Mutex<DeferredStatements>>,
message_validator: RegisteredMessageValidator,
}

impl<P, E, T> Router<P, E, T> {
impl<P, T> Router<P, T> {
pub(crate) fn new(
table: Arc<SharedTable>,
fetcher: LeafWorkDataFetcher<P, E, T>,
fetcher: LeafWorkDataFetcher<P, T>,
message_validator: RegisteredMessageValidator,
) -> Self {
let parent_hash = fetcher.parent_hash();
Expand Down Expand Up @@ -116,7 +116,7 @@ impl<P, E, T> Router<P, E, T> {
}
}

impl<P, E: Clone, T: Clone> Clone for Router<P, E, T> {
impl<P, T: Clone> Clone for Router<P, T> {
fn clone(&self) -> Self {
Router {
table: self.table.clone(),
Expand All @@ -128,10 +128,9 @@ impl<P, E: Clone, T: Clone> Clone for Router<P, E, T> {
}
}

impl<P: ProvideRuntimeApi<Block> + Send + Sync + 'static, E, T> Router<P, E, T> where
impl<P: ProvideRuntimeApi<Block> + Send + Sync + 'static, T> Router<P, T> where
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
T: Clone + Executor + Send + 'static,
E: Future<Output=()> + Clone + Send + Unpin + 'static,
{
/// Import a statement whose signature has been checked already.
pub(crate) fn import_statement(&self, statement: SignedStatement) {
Expand Down Expand Up @@ -176,8 +175,7 @@ impl<P: ProvideRuntimeApi<Block> + Send + Sync + 'static, E, T> Router<P, E, T>
if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) {
trace!(target: "validation", "driving statement work to completion");

let work = select(work.boxed(), self.fetcher.exit().clone())
.map(drop);
let work = work.boxed().map(drop);
let _ = self.fetcher.executor().spawn(work);
}
}
Expand Down Expand Up @@ -226,10 +224,9 @@ impl<P: ProvideRuntimeApi<Block> + Send + Sync + 'static, E, T> Router<P, E, T>
}
}

impl<P: ProvideRuntimeApi<Block> + Send, E, T> TableRouter for Router<P, E, T> where
impl<P: ProvideRuntimeApi<Block> + Send, T> TableRouter for Router<P, T> where
P::Api: ParachainHost<Block>,
T: Clone + Executor + Send + 'static,
E: Future<Output=()> + Clone + Send + 'static,
{
type Error = io::Error;
type FetchValidationProof = Pin<Box<dyn Future<Output = Result<PoVBlock, io::Error>> + Send>>;
Expand Down Expand Up @@ -283,7 +280,7 @@ impl<P: ProvideRuntimeApi<Block> + Send, E, T> TableRouter for Router<P, E, T> w
}
}

impl<P, E, T> Drop for Router<P, E, T> {
impl<P, T> Drop for Router<P, T> {
fn drop(&mut self) {
let parent_hash = self.parent_hash();
self.network().with_spec(move |spec, _| { spec.remove_validation_session(parent_hash); });
Expand Down
Loading