Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,13 @@ check-web-wasm:
- time cargo web build -p substrate-trie
- sccache -s

node-exits:
stage: test
<<: *docker-env
except:
- /^v[0-9]+\.[0-9]+.*$/ # i.e. v1.0, v2.1rc1
script:
- ./ci/check_for_exit.sh

#### stage: build

Expand Down
16 changes: 16 additions & 0 deletions ci/check_for_exit.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env bash

# Script that checks that a node exits after `SIGINT` was send.

set -e

cargo build --release
./target/release/substrate --dev &
PID=$!

# Let the chain running for 60 seconds
sleep 60

# Send `SIGINT` and give the process 30 seconds to end
kill -INT $PID
timeout 30 tail --pid=$PID -f /dev/null
17 changes: 11 additions & 6 deletions core/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,15 @@ pub type TaskExecutor = Arc<dyn Executor<Box<dyn Future<Item = (), Error = ()> +
#[derive(Clone)]
pub struct SpawnTaskHandle {
sender: mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send>>,
on_exit: exit_future::Exit,
}

impl Executor<Box<dyn Future<Item = (), Error = ()> + Send>> for SpawnTaskHandle {
fn execute(
&self,
future: Box<dyn Future<Item = (), Error = ()> + Send>
future: Box<dyn Future<Item = (), Error = ()> + Send>,
) -> Result<(), futures::future::ExecuteError<Box<dyn Future<Item = (), Error = ()> + Send>>> {
let future = Box::new(future.select(self.on_exit.clone()).then(|_| Ok(())));
if let Err(err) = self.sender.unbounded_send(future) {
let kind = futures::future::ExecuteErrorKind::Shutdown;
Err(futures::future::ExecuteError::new(kind, err.into_inner()))
Expand Down Expand Up @@ -350,7 +352,7 @@ macro_rules! new_impl {
//light_components.clone(),
system_rpc_tx.clone(),
system_info.clone(),
Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone() }),
Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone(), on_exit: exit.clone() }),
transaction_pool.clone(),
rpc_extensions.clone(),
keystore.clone(),
Expand Down Expand Up @@ -544,22 +546,25 @@ where
}

fn spawn_task(&self, task: impl Future<Item = (), Error = ()> + Send + 'static) {
let task = task.select(self.on_exit()).then(|_| Ok(()));
let _ = self.to_spawn_tx.unbounded_send(Box::new(task));
}

fn spawn_essential_task(&self, task: impl Future<Item = (), Error = ()> + Send + 'static) {
let essential_failed = self.essential_failed.clone();
let essential_task = Box::new(task.map_err(move |_| {
let essential_task = task.map_err(move |_| {
error!("Essential task failed. Shutting down service.");
essential_failed.store(true, Ordering::Relaxed);
}));
});
let task = essential_task.select(self.on_exit()).then(|_| Ok(()));

let _ = self.to_spawn_tx.unbounded_send(essential_task);
let _ = self.to_spawn_tx.unbounded_send(Box::new(task));
}

fn spawn_task_handle(&self) -> SpawnTaskHandle {
SpawnTaskHandle {
sender: self.to_spawn_tx.clone(),
on_exit: self.on_exit(),
}
}

Expand Down Expand Up @@ -589,7 +594,7 @@ where
self.transaction_pool.clone()
}

fn on_exit(&self) -> ::exit_future::Exit {
fn on_exit(&self) -> exit_future::Exit {
self.exit.clone()
}
}
Expand Down
16 changes: 4 additions & 12 deletions node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ macro_rules! new_full_start {
/// concrete types instead.
macro_rules! new_full {
($config:expr) => {{
use futures::Future;
use futures::sync::mpsc;
use network::DhtEvent;

Expand All @@ -118,7 +117,7 @@ macro_rules! new_full {
$config.disable_grandpa
);

let (builder, mut import_setup, inherent_data_providers, mut tasks_to_spawn) = new_full_start!($config);
let (builder, mut import_setup, inherent_data_providers, tasks_to_spawn) = new_full_start!($config);

// Dht event channel from the network to the authority discovery module. Use bounded channel to ensure
// back-pressure. Authority discovery is triggering one event per authority within the current authority set.
Expand All @@ -138,13 +137,7 @@ macro_rules! new_full {
.expect("Link Half and Block Import are present for Full Services or setup failed before. qed");

// spawn any futures that were created in the previous setup steps
for task in tasks_to_spawn.drain(..) {
service.spawn_task(
task.select(service.on_exit())
.map(|_| ())
.map_err(|_| ())
);
}
tasks_to_spawn.into_iter().for_each(|t| service.spawn_task(t));

if is_authority {
let proposer = substrate_basic_authorship::ProposerFactory {
Expand All @@ -170,15 +163,14 @@ macro_rules! new_full {
};

let babe = babe::start_babe(babe_config)?;
let select = babe.select(service.on_exit()).then(|_| Ok(()));
service.spawn_task(Box::new(select));
service.spawn_essential_task(babe);

let authority_discovery = authority_discovery::AuthorityDiscovery::new(
service.client(),
service.network(),
dht_event_rx,
);
service.spawn_task(Box::new(authority_discovery));
service.spawn_task(authority_discovery);
}

let config = grandpa::Config {
Expand Down