Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ substrate-client = { git = "https://github.com/paritytech/substrate" }
substrate-primitives = { git = "https://github.com/paritytech/substrate" }
substrate-executor = { git = "https://github.com/paritytech/substrate" }
substrate-state-machine = { git = "https://github.com/paritytech/substrate" }
log = "0.3"

[dev-dependencies]
substrate-keyring = { git = "https://github.com/paritytech/substrate" }
41 changes: 30 additions & 11 deletions api/src/full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,33 @@ impl<B: LocalBackend<Block, KeccakHasher, RlpCodec>> PolkadotApi for Client<B, L

fn evaluate_block(&self, at: &BlockId, block: Block) -> Result<bool> {
use substrate_executor::error::ErrorKind as ExecErrorKind;
use codec::{Decode, Encode};
use runtime::Block as RuntimeBlock;
use codec::Encode;
use state_machine::ExecutionManager;
use client::CallExecutor;

let parent = at;
let res = self.state_at(&parent).map_err(Error::from).and_then(|state| {
let mut overlay = Default::default();
let execution_manager = || ExecutionManager::Both(|wasm_result, native_result| {
warn!("Consensus error between wasm and native runtime execution at block {:?}", at);
warn!(" While executing block {:?}", (block.header.number, block.header.hash()));
warn!(" Native result {:?}", native_result);
warn!(" Wasm result {:?}", wasm_result);
wasm_result
});
let (r, _) = self.executor().call_at_state(
&state,
&mut overlay,
"execute_block",
&block.encode(),
execution_manager()
)?;

Ok(r)
});

let encoded = block.encode();
let runtime_block = match RuntimeBlock::decode(&mut &encoded[..]) {
Some(x) => x,
None => return Ok(false),
};

let res = with_runtime!(self, at, || ::runtime::Executive::execute_block(runtime_block));
match res {
Ok(()) => Ok(true),
Ok(_) => Ok(true),
Err(err) => match err.kind() {
&ErrorKind::Executor(ExecErrorKind::Runtime) => Ok(false),
_ => Err(err)
Expand Down Expand Up @@ -148,8 +163,10 @@ impl<B: LocalBackend<Block, KeccakHasher, RlpCodec>> PolkadotApi for Client<B, L
fn inherent_extrinsics(&self, at: &BlockId, inherent_data: InherentData) -> Result<Vec<UncheckedExtrinsic>> {
use codec::{Encode, Decode};

let runtime_version = self.runtime_version_at(at)?;

with_runtime!(self, at, || {
let extrinsics = ::runtime::inherent_extrinsics(inherent_data);
let extrinsics = ::runtime::inherent_extrinsics(inherent_data, runtime_version);
extrinsics.into_iter()
.map(|x| x.encode()) // get encoded representation
.map(|x| Decode::decode(&mut &x[..])) // get byte-vec equivalent to extrinsic
Expand Down Expand Up @@ -229,6 +246,7 @@ mod tests {

assert_eq!(block.header.number, 1);
assert!(block.header.extrinsics_root != Default::default());
assert!(client.evaluate_block(&id, block).unwrap());
}

#[test]
Expand All @@ -251,6 +269,7 @@ mod tests {

assert_eq!(block.header.number, 1);
assert!(block.header.extrinsics_root != Default::default());
assert!(client.evaluate_block(&id, block).unwrap());
}

#[test]
Expand Down
3 changes: 3 additions & 0 deletions api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ extern crate substrate_state_machine as state_machine;
#[macro_use]
extern crate error_chain;

#[macro_use]
extern crate log;

#[cfg(test)]
extern crate substrate_keyring as keyring;

Expand Down
82 changes: 56 additions & 26 deletions consensus/src/offline_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@ use polkadot_primitives::AccountId;
use std::collections::HashMap;
use std::time::{Instant, Duration};

// time before we report a validator.
const REPORT_TIME: Duration = Duration::from_secs(60 * 5);

struct Observed {
last_round_end: Instant,
offline_since: Instant,
}

#[derive(Eq, PartialEq)]
enum Activity {
Offline,
StillOffline(Duration),
Online,
}

impl Observed {
fn new() -> Observed {
let now = Instant::now();
Expand All @@ -38,31 +42,32 @@ impl Observed {
}
}

fn note_round_end(&mut self, was_online: bool) {
let now = Instant::now();

fn note_round_end(&mut self, now: Instant, was_online: Option<bool>) {
self.last_round_end = now;
if was_online {
if let Some(false) = was_online {
self.offline_since = now;
}
}

fn is_active(&self) -> bool {
/// Returns what we have observed about the online/offline state of the validator.
fn activity(&self) -> Activity {
// can happen if clocks are not monotonic
if self.offline_since > self.last_round_end { return true }
self.last_round_end.duration_since(self.offline_since) < REPORT_TIME
if self.offline_since > self.last_round_end { return Activity::Online }
if self.offline_since == self.last_round_end { return Activity::Offline }
Activity::StillOffline(self.last_round_end.duration_since(self.offline_since))
}
}

/// Tracks offline validators and can issue a report for those offline.
pub struct OfflineTracker {
observed: HashMap<AccountId, Observed>,
block_instant: Instant,
}

impl OfflineTracker {
/// Create a new tracker.
pub fn new() -> Self {
OfflineTracker { observed: HashMap::new() }
OfflineTracker { observed: HashMap::new(), block_instant: Instant::now() }
}

/// Note new consensus is starting with the given set of validators.
Expand All @@ -71,23 +76,33 @@ impl OfflineTracker {

let set: HashSet<_> = validators.iter().cloned().collect();
self.observed.retain(|k, _| set.contains(k));

self.block_instant = Instant::now();
}

/// Note that a round has ended.
pub fn note_round_end(&mut self, validator: AccountId, was_online: bool) {
self.observed.entry(validator)
.or_insert_with(Observed::new)
.note_round_end(was_online);
self.observed.entry(validator).or_insert_with(Observed::new);
for (val, obs) in self.observed.iter_mut() {
obs.note_round_end(
self.block_instant,
if val == &validator {
Some(was_online)
} else {
None
}
)
}
}

/// Generate a vector of indices for offline account IDs.
pub fn reports(&self, validators: &[AccountId]) -> Vec<u32> {
validators.iter()
.enumerate()
.filter_map(|(i, v)| if self.is_online(v) {
None
} else {
.filter_map(|(i, v)| if self.is_known_offline_now(v) {
Some(i as u32)
} else {
None
})
.collect()
}
Expand All @@ -101,13 +116,15 @@ impl OfflineTracker {
};

// we must think all validators reported externally are offline.
let thinks_online = self.is_online(v);
!thinks_online
self.is_known_offline_now(v)
})
}

fn is_online(&self, v: &AccountId) -> bool {
self.observed.get(v).map(Observed::is_active).unwrap_or(true)
/// Rwturns true only if we have seen the validator miss the last round. For further
/// rounds where we can't say for sure that they're still offline, we give them the
/// benefit of the doubt.
fn is_known_offline_now(&self, v: &AccountId) -> bool {
self.observed.get(v).map(|o| o.activity() == Activity::Offline).unwrap_or(false)
}
}

Expand All @@ -121,17 +138,30 @@ mod tests {
let v = [0; 32].into();
let v2 = [1; 32].into();
let v3 = [2; 32].into();
tracker.note_new_block(&[v, v2, v3]);
tracker.note_round_end(v, true);
tracker.note_round_end(v2, true);
tracker.note_round_end(v3, true);
assert_eq!(tracker.reports(&[v, v2, v3]), vec![0u32; 0]);

tracker.note_new_block(&[v, v2, v3]);
tracker.note_round_end(v, true);
tracker.note_round_end(v2, false);
tracker.note_round_end(v3, true);
assert_eq!(tracker.reports(&[v, v2, v3]), vec![1]);

let slash_time = REPORT_TIME + Duration::from_secs(5);
tracker.observed.get_mut(&v).unwrap().offline_since -= slash_time;
tracker.observed.get_mut(&v2).unwrap().offline_since -= slash_time;
tracker.note_new_block(&[v, v2, v3]);
tracker.note_round_end(v, false);
assert_eq!(tracker.reports(&[v, v2, v3]), vec![0]);

assert_eq!(tracker.reports(&[v, v2, v3]), vec![0, 1]);
tracker.note_new_block(&[v, v2, v3]);
tracker.note_round_end(v, false);
tracker.note_round_end(v2, true);
tracker.note_round_end(v3, false);
assert_eq!(tracker.reports(&[v, v2, v3]), vec![0, 2]);

tracker.note_new_block(&[v, v3]);
tracker.note_new_block(&[v, v2]);
tracker.note_round_end(v, false);
assert_eq!(tracker.reports(&[v, v2, v3]), vec![0]);
}
}
51 changes: 22 additions & 29 deletions consensus/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,32 +62,26 @@ fn start_bft<F, C>(
const DELAY_UNTIL: Duration = Duration::from_millis(5000);

let mut handle = LocalThreadHandle::current();
let work = Delay::new(Instant::now() + DELAY_UNTIL)
.then(move |res| {
if let Err(e) = res {
warn!(target: "bft", "Failed to force delay of consensus: {:?}", e);
}
match bft_service.build_upon(&header) {
Ok(Some(bft_work)) => {
// do not poll work for some amount of time.
let work = Delay::new(Instant::now() + DELAY_UNTIL).then(move |res| {
if let Err(e) = res {
warn!(target: "bft", "Failed to force delay of consensus: {:?}", e);
}

match bft_service.build_upon(&header) {
Ok(maybe_bft_work) => {
if maybe_bft_work.is_some() {
debug!(target: "bft", "Starting agreement. After forced delay for {:?}",
DELAY_UNTIL);
}
debug!(target: "bft", "Starting agreement. After forced delay for {:?}",
DELAY_UNTIL);

maybe_bft_work
}
Err(e) => {
warn!(target: "bft", "BFT agreement error: {}", e);
None
}
bft_work
});
if let Err(e) = handle.spawn_local(Box::new(work)) {
warn!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e);
}
})
.map(|_| ());

if let Err(e) = handle.spawn_local(Box::new(work)) {
debug!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e);
}
}
Ok(None) => trace!(target: "bft", "Could not start agreement on top of {}", header.hash()),
Err(e) => warn!(target: "bft", "BFT agreement error: {}", e),
}
}

// creates a task to prune redundant entries in availability store upon block finalization
Expand Down Expand Up @@ -198,6 +192,7 @@ impl Service {

client.import_notification_stream().for_each(move |notification| {
if notification.is_new_best {
trace!(target: "bft", "Attempting to start new consensus round after import notification of {:?}", notification.hash);
start_bft(notification.header, bft_service.clone());
}
Ok(())
Expand All @@ -221,14 +216,12 @@ impl Service {
let c = client.clone();
let s = bft_service.clone();

interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
interval.map_err(|e| debug!(target: "bft", "Timer error: {:?}", e)).for_each(move |_| {
if let Ok(best_block) = c.best_block_header() {
let hash = best_block.hash();
let last_agreement = s.last_agreement();
let can_build_upon = last_agreement
.map_or(true, |x| !x.live || x.parent_hash != hash);
if hash == prev_best && can_build_upon {
debug!("Starting consensus round after a timeout");

if hash == prev_best {
debug!(target: "bft", "Starting consensus round after a timeout");
start_bft(best_block, s.clone());
}
prev_best = hash;
Expand Down
2 changes: 1 addition & 1 deletion primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,4 @@ pub struct InherentData {
pub parachain_heads: Vec<::parachain::CandidateReceipt>,
/// Indices of offline validators.
pub offline_indices: Vec<u32>,
}
}
2 changes: 1 addition & 1 deletion runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ pub mod api {
apply_extrinsic => |extrinsic| super::Executive::apply_extrinsic(extrinsic),
execute_block => |block| super::Executive::execute_block(block),
finalise_block => |()| super::Executive::finalise_block(),
inherent_extrinsics => |inherent| super::inherent_extrinsics(inherent),
inherent_extrinsics => |(inherent, version)| super::inherent_extrinsics(inherent, version),
validator_count => |()| super::Session::validator_count(),
validators => |()| super::Session::validators()
);
Expand Down
5 changes: 3 additions & 2 deletions runtime/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ use runtime_primitives::traits::{Checkable, AuxLookup};
use timestamp::Call as TimestampCall;
use parachains::Call as ParachainsCall;
use session::Call as SessionCall;
use version::RuntimeVersion;

/// Produces the list of inherent extrinsics.
pub fn inherent_extrinsics(data: ::primitives::InherentData) -> Vec<UncheckedExtrinsic> {
pub fn inherent_extrinsics(data: ::primitives::InherentData, runtime_version: RuntimeVersion) -> Vec<UncheckedExtrinsic> {
let make_inherent = |function| UncheckedExtrinsic::new(
Extrinsic {
signed: Default::default(),
Expand All @@ -39,7 +40,7 @@ pub fn inherent_extrinsics(data: ::primitives::InherentData) -> Vec<UncheckedExt
make_inherent(Call::Parachains(ParachainsCall::set_heads(data.parachain_heads))),
];

if !data.offline_indices.is_empty() {
if !data.offline_indices.is_empty() && runtime_version.spec_version == 4 {
inherent.push(make_inherent(
Call::Session(SessionCall::note_offline(data.offline_indices))
));
Expand Down