Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

pub mod chain_spec;

use futures01::sync::mpsc;
use futures::{FutureExt, TryFutureExt, task::{Spawn, SpawnError, FutureObj}};
use client::LongestChain;
use std::sync::Arc;
Expand Down Expand Up @@ -276,8 +275,12 @@ pub fn new_full<Runtime, Dispatch, Extrinsic>(config: Configuration)
Dispatch: NativeExecutionDispatch + 'static,
Extrinsic: RuntimeExtrinsic,
{
use sc_network::DhtEvent;
use futures::{compat::Stream01CompatExt, stream::StreamExt};
use sc_network::Event;
use futures01::Stream;
use futures::{
compat::Stream01CompatExt,
stream::StreamExt,
};

let is_collator = config.custom.collating_for.is_some();
let is_authority = config.roles.is_authority() && !is_collator;
Expand All @@ -301,19 +304,11 @@ pub fn new_full<Runtime, Dispatch, Extrinsic>(config: Configuration)

let (builder, mut import_setup, inherent_data_providers) = new_full_start!(config, Runtime, Dispatch);

// Dht event channel from the network to the authority discovery module. Use
// bounded channel to ensure back-pressure. Authority discovery is triggering one
// event per authority within the current authority set. This estimates the
// authority set size to be somewhere below 10 000 thereby setting the channel
// buffer size to 10 000.
let (dht_event_tx, dht_event_rx) = mpsc::channel::<DhtEvent>(10000);

let service = builder
.with_network_protocol(|config| Ok(PolkadotProtocol::new(config.custom.collating_for.clone())))?
.with_finality_proof_provider(|client, backend|
Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, client)) as _)
)?
.with_dht_event_tx(dht_event_tx)?
.build()?;

let (block_import, link_half, babe_link) = import_setup.take()
Expand Down Expand Up @@ -437,15 +432,20 @@ pub fn new_full<Runtime, Dispatch, Extrinsic>(config: Configuration)
service.spawn_essential_task(babe);

if authority_discovery_enabled {
let future03_dht_event_rx = dht_event_rx.compat()
let network = service.network();
let dht_event_stream = network.event_stream().filter_map(|e| match e {
Event::Dht(e) => Some(e),
_ => None,
});
let future03_dht_event_stream = dht_event_stream.compat()
.map(|x| x.expect("<mpsc::channel::Receiver as Stream> never returns an error; qed"))
.boxed();
let authority_discovery = authority_discovery::AuthorityDiscovery::new(
service.client(),
service.network(),
network,
sentry_nodes,
service.keystore(),
future03_dht_event_rx,
future03_dht_event_stream,
);
let future01_authority_discovery = authority_discovery.map(|x| Ok(x)).compat();

Expand Down