Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 140 additions & 76 deletions ethcore/src/engines/authority_round/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@

use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::{cmp, fmt};
use std::iter::FromIterator;
use std::iter::{self, FromIterator};
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::sync::atomic::{AtomicU64, AtomicBool, Ordering as AtomicOrdering};
use std::sync::{Weak, Arc};
use std::time::{UNIX_EPOCH, SystemTime, Duration};
use std::u64;

use block::*;
use bytes::Bytes;
Expand All @@ -31,7 +32,7 @@ use engines::{Engine, Seal, SealingState, EngineError, ConstructedVerifier};
use engines::block_reward;
use engines::block_reward::{BlockRewardContract, RewardKind};
use error::{Error, ErrorKind, BlockError};
use ethjson;
use ethjson::{spec::StepDuration};
use machine::{AuxiliaryData, Call, EthereumMachine};
use hash::keccak;
use super::signer::EngineSigner;
Expand Down Expand Up @@ -61,12 +62,13 @@ pub type RandomnessPhaseError = randomness::PhaseError;

/// `AuthorityRound` params.
pub struct AuthorityRoundParams {
/// Time to wait before next block or authority switching,
/// in seconds.
/// A map defining intervals of blocks with the given times (in seconds) to wait before next
/// block or authority switching. The keys in the map are timestamps of starting blocks of those
/// periods. The entry at `0` should be defined.
///
/// Deliberately typed as u16 as too high of a value leads
/// to slow block issuance.
pub step_duration: u16,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to keep the step_duration parameter as well for backward compatibility.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That should still be accepted: It can now either be a number, like before, or a map.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see, OK.

/// Wait times (durations) are deliberately typed as `u16` since larger values lead to slow
/// block issuance.
pub step_durations: BTreeMap<u64, u16>,
/// Starting step,
pub start_step: Option<u64>,
/// Valid validators.
Expand Down Expand Up @@ -101,11 +103,24 @@ const U16_MAX: usize = ::std::u16::MAX as usize;

impl From<ethjson::spec::AuthorityRoundParams> for AuthorityRoundParams {
fn from(p: ethjson::spec::AuthorityRoundParams) -> Self {
let mut step_duration_usize: usize = p.step_duration.into();
if step_duration_usize > U16_MAX {
step_duration_usize = U16_MAX;
warn!(target: "engine", "step_duration is too high ({}), setting it to {}", step_duration_usize, U16_MAX);
}
let map_step_duration = |u: ethjson::uint::Uint| {
let mut step_duration_usize: usize = u.into();
if step_duration_usize == 0 {
panic!("AuthorityRoundParams: step duration cannot be 0");
}
if step_duration_usize > U16_MAX {
step_duration_usize = U16_MAX;
warn!(target: "engine", "step duration is too high ({}), setting it to {}", step_duration_usize, U16_MAX);
}
step_duration_usize as u16
};
let step_durations: BTreeMap<u64, u16> = match p.step_duration {
StepDuration::Single(u) => iter::once((0, map_step_duration(u))).collect(),
StepDuration::Transitions(tr) => {
tr.into_iter().map(|(timestamp, u)| (timestamp.into(), map_step_duration(u))).collect()
}
};

let transition_block_num = p.block_reward_contract_transition.map_or(0, Into::into);
let mut br_transitions: BTreeMap<_, _> = p.block_reward_contract_transitions
.unwrap_or_default()
Expand All @@ -130,8 +145,9 @@ impl From<ethjson::spec::AuthorityRoundParams> for AuthorityRoundParams {
BlockRewardContract::new_from_address(address.into())
);
}

AuthorityRoundParams {
step_duration: step_duration_usize as u16,
step_durations,
validators: new_validator_set(p.validators),
start_step: p.start_step.map(Into::into),
validate_score_transition: p.validate_score_transition.map_or(0, Into::into),
Expand All @@ -150,52 +166,93 @@ impl From<ethjson::spec::AuthorityRoundParams> for AuthorityRoundParams {
}
}

// Helper for managing the step.
/// Helper for managing the step.
#[derive(Debug)]
struct Step {
calibrate: bool, // whether calibration is enabled.
inner: AtomicUsize,
duration: u16,
inner: AtomicU64,
/// Planned durations of steps.
durations: BTreeMap<u64, u16>,
}

impl Step {
fn load(&self) -> u64 { self.inner.load(AtomicOrdering::SeqCst) as u64 }
fn load(&self) -> u64 { self.inner.load(AtomicOrdering::SeqCst) }

fn duration_remaining(&self) -> Duration {
self.opt_duration_remaining().unwrap_or_else(|| {
let ctr = self.load();
error!(target: "engine", "Step counter under- or overflow: {}, aborting", ctr);
panic!("step counter under- or overflow: {}", ctr)
})
}

fn opt_duration_remaining(&self) -> Option<Duration> {
let now = unix_now();
let expected_seconds = self.load()
.checked_add(1)
.and_then(|ctr| ctr.checked_mul(self.duration as u64))
.map(Duration::from_secs);

match expected_seconds {
Some(step_end) if step_end > now => step_end - now,
Some(_) => Duration::from_secs(0),
None => {
let ctr = self.load();
error!(target: "engine", "Step counter is too high: {}, aborting", ctr);
panic!("step counter is too high: {}", ctr)
},
let mut prev_dur = u64::from(self.durations[&0]);
let mut prev_step = 0u64;
let mut prev_time = 0u64;
let next_step = self.load().checked_add(1)?;
for (time, dur) in self.durations.iter().skip(1) {
let step_diff = time.checked_add(prev_dur)?.checked_sub(1)?.checked_sub(prev_time)?.checked_div(prev_dur)?;
let step = prev_step.checked_add(step_diff)?;
if step >= next_step {
break;
}
prev_step = step;
prev_time = step_diff.checked_mul(prev_dur)?.checked_add(prev_time)?;
prev_dur = u64::from(*dur);
}
let time = prev_time.checked_add(next_step.checked_sub(prev_step)?.checked_mul(prev_dur)?)?;
let step_end = Duration::from_secs(time);
if step_end > now {
Some(step_end - now)
} else {
Some(Duration::from_secs(0))
}

}

/// Increments the step number.
///
/// Panics if the new step number is `u64::MAX`.
fn increment(&self) {
use std::usize;
// fetch_add won't panic on overflow but will rather wrap
// around, leading to zero as the step counter, which might
// lead to unexpected situations, so it's better to shut down.
if self.inner.fetch_add(1, AtomicOrdering::SeqCst) == usize::MAX {
error!(target: "engine", "Step counter is too high: {}, aborting", usize::MAX);
panic!("step counter is too high: {}", usize::MAX);
if self.inner.fetch_add(1, AtomicOrdering::SeqCst) == u64::MAX {
error!(target: "engine", "Step counter is too high: {}, aborting", u64::MAX);
panic!("step counter is too high: {}", u64::MAX);
}

}

fn calibrate(&self) {
if self.calibrate {
let new_step = unix_now().as_secs() / (self.duration as u64);
self.inner.store(new_step as usize, AtomicOrdering::SeqCst);
if self.opt_calibrate().is_none() {
let ctr = self.load();
error!(target: "engine", "Step counter under- or overflow: {}, aborting", ctr);
panic!("step counter under- or overflow: {}", ctr)
}
}
}

fn opt_calibrate(&self) -> Option<()> {
let now = unix_now().as_secs();
let mut prev_dur = u64::from(self.durations[&0]);
let mut prev_step = 0u64;
let mut prev_time = 0u64;
for (time, dur) in self.durations.range(..now).skip(1) {
let step_diff = time.checked_add(prev_dur)?.checked_sub(1)?.checked_sub(prev_time)?.checked_div(prev_dur)?;
let step = prev_step.checked_add(step_diff)?;
let next_time = step_diff.checked_mul(prev_dur)?.checked_add(prev_time)?;
if next_time >= now {
break;
}
prev_time = next_time;
prev_step = step;
prev_dur = u64::from(*dur);
}
let new_step = (now.checked_sub(prev_time)? / prev_dur).checked_add(prev_step)?;
self.inner.store(new_step, AtomicOrdering::SeqCst);
Some(())
}

fn check_future(&self, given: u64) -> Result<(), Option<OutOfBounds<u64>>> {
Expand All @@ -215,7 +272,7 @@ impl Step {
Err(None)
// wait a bit for blocks in near future
} else if given > current {
let d = self.duration as u64;
let d = *self.durations.range(..=current).last().expect("Duration map has at least a 0 entry.").1 as u64;
Err(Some(OutOfBounds {
min: None,
max: Some(d * current),
Expand Down Expand Up @@ -700,23 +757,25 @@ impl<'a, A: ?Sized, B> Deref for CowLike<'a, A, B> where B: AsRef<A> {
impl AuthorityRound {
/// Create a new instance of AuthorityRound engine.
pub fn new(our_params: AuthorityRoundParams, machine: EthereumMachine) -> Result<Arc<Self>, Error> {
if our_params.step_duration == 0 {
error!(target: "engine", "Authority Round step duration can't be zero, aborting");
panic!("authority_round: step duration can't be zero")
if !our_params.step_durations.contains_key(&0) {
error!(target: "engine", "Authority Round step 0 duration is undefined, aborting");
panic!("authority_round: step 0 duration is undefined")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"step 0" -> "timestamp 0"

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure; timestamp 0 is always also step 0, so this is the duration of step 0.

}
if our_params.step_durations.values().any(|v| *v == 0) {
panic!("authority_round: step duration cannot be 0");
}
let should_timeout = our_params.start_step.is_none();
let initial_step = our_params.start_step.unwrap_or_else(|| (unix_now().as_secs() / (our_params.step_duration as u64)));
let initial_step = our_params.start_step.unwrap_or(0);
let step = Step {
inner: AtomicU64::new(initial_step),
calibrate: our_params.start_step.is_none(),
durations: our_params.step_durations.clone(),
};
step.calibrate();
let engine = Arc::new(
AuthorityRound {
transition_service: IoService::<()>::start()?,
step: Arc::new(PermissionedStep {
inner: Step {
inner: AtomicUsize::new(initial_step as usize),
calibrate: our_params.start_step.is_none(),
duration: our_params.step_duration,
},
can_propose: AtomicBool::new(true),
}),
step: Arc::new(PermissionedStep { inner: step, can_propose: AtomicBool::new(true) }),
client: Arc::new(RwLock::new(None)),
signer: RwLock::new(None),
validators: our_params.validators,
Expand Down Expand Up @@ -956,8 +1015,10 @@ impl IoHandler<()> for TransitionHandler {
}
}

let next_run_at = AsMillis::as_millis(&self.step.inner.duration_remaining()) >> 2;
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at))
let next_run_at = Duration::from_millis(
AsMillis::as_millis(&self.step.inner.duration_remaining()) >> 2
);
io.register_timer_once(ENGINE_TIMEOUT_TOKEN, next_run_at)
.unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e))
}
}
Expand Down Expand Up @@ -1253,7 +1314,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
self.validators.on_epoch_begin(first, &header, &mut call)
}

/// Apply the block reward on finalisation of the block.
/// Applies the block reward on finalisation of the block.
fn on_close_block(&self, block: &mut ExecutedBlock) -> Result<(), Error> {
let mut beneficiaries = Vec::new();

Expand Down Expand Up @@ -1318,7 +1379,12 @@ impl Engine<EthereumMachine> for AuthorityRound {
// Genesis is never a new block, but might as well check.
let header = block.header().clone();
let first = header.number() == 0;

let opt_signer = self.signer.read();
let signer = match opt_signer.as_ref() {
Some(signer) => signer,
None => return Ok(Vec::new()), // We are not a validator, so we shouldn't call the contracts.
};
let our_addr = signer.address();
let client = self.client.read().as_ref().and_then(|weak| weak.upgrade()).ok_or_else(|| {
debug!(target: "engine", "Unable to prepare block: missing client ref.");
EngineError::RequiresClient
Expand All @@ -1331,14 +1397,8 @@ impl Engine<EthereumMachine> for AuthorityRound {
full_client.call_contract(BlockId::Latest, to, data).map_err(|e| format!("{}", e))
};

let opt_signer = self.signer.read();
let signer = match opt_signer.as_ref() {
Some(signer) => signer,
None => return Ok(Vec::new()), // We are not a validator, so we shouldn't call the contracts.
};

// Our current account nonce. The transactions must have consecutive nonces, starting with this one.
let mut tx_nonce = block.state.nonce(&signer.address())?;
let mut tx_nonce = block.state.nonce(&our_addr)?;
let mut transactions = Vec::new();

// Creates and signs a transaction with the given contract call.
Expand All @@ -1352,7 +1412,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
if let Some(contract_addr) = self.randomness_contract_address {
let mut contract = util::BoundContract::bind(&*client, BlockId::Latest, contract_addr);
// TODO: How should these errors be handled?
let phase = randomness::RandomnessPhase::load(&contract, signer.address())
let phase = randomness::RandomnessPhase::load(&contract, our_addr)
.map_err(EngineError::RandomnessLoadError)?;
let mut rng = ::rand::OsRng::new()?;
if let Some(data) = phase.advance(&contract, &mut rng, signer.as_ref())
Expand Down Expand Up @@ -1703,7 +1763,7 @@ impl Engine<EthereumMachine> for AuthorityRound {
mod tests {
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
use hash::keccak;
use accounts::AccountProvider;
use ethereum_types::{Address, H520, H256, U256};
Expand All @@ -1726,7 +1786,7 @@ mod tests {
F: FnOnce(&mut AuthorityRoundParams),
{
let mut params = AuthorityRoundParams {
step_duration: 1,
step_durations: [(0, 1)].to_vec().into_iter().collect(),
start_step: Some(1),
validators: Box::new(TestSet::default()),
validate_score_transition: 0,
Expand Down Expand Up @@ -2026,31 +2086,35 @@ mod tests {
#[should_panic(expected="counter is too high")]
fn test_counter_increment_too_high() {
use super::Step;
use std::sync::atomic::AtomicU16;

let step = Step {
calibrate: false,
inner: AtomicUsize::new(::std::usize::MAX),
duration: 1,
inner: AtomicU64::new(::std::u64::MAX),
durations: [(0, 1)].to_vec().into_iter().collect(),
};
step.increment();
}

#[test]
#[should_panic(expected="counter is too high")]
#[should_panic(expected="step counter under- or overflow")]
fn test_counter_duration_remaining_too_high() {
use super::Step;
use std::sync::atomic::AtomicU16;

let step = Step {
calibrate: false,
inner: AtomicUsize::new(::std::usize::MAX),
duration: 1,
inner: AtomicU64::new(::std::u64::MAX),
durations: [(0, 1)].to_vec().into_iter().collect(),
};
step.duration_remaining();
}

#[test]
#[should_panic(expected="authority_round: step duration can't be zero")]
#[should_panic(expected="authority_round: step duration cannot be 0")]
fn test_step_duration_zero() {
aura(|params| {
params.step_duration = 0;
params.step_durations = [(0, 0)].to_vec().into_iter().collect();;
});
}

Expand Down Expand Up @@ -2442,7 +2506,7 @@ mod tests {
#[test]
fn test_empty_steps() {
let engine = aura(|p| {
p.step_duration = 4;
p.step_durations = [(0, 4)].to_vec().into_iter().collect();
p.empty_steps_transition = 0;
p.maximum_empty_steps = 0;
});
Expand Down Expand Up @@ -2476,7 +2540,7 @@ mod tests {
let (_spec, tap, accounts) = setup_empty_steps();
let engine = aura(|p| {
p.validators = Box::new(SimpleList::new(accounts.clone()));
p.step_duration = 4;
p.step_durations = [(0, 4)].to_vec().into_iter().collect();
p.empty_steps_transition = 0;
p.maximum_empty_steps = 0;
});
Expand Down Expand Up @@ -2513,7 +2577,7 @@ mod tests {
let (_spec, tap, accounts) = setup_empty_steps();
let engine = aura(|p| {
p.validators = Box::new(SimpleList::new(accounts.clone()));
p.step_duration = 4;
p.step_durations = [(0, 4)].to_vec().into_iter().collect();
p.empty_steps_transition = 0;
p.maximum_empty_steps = 0;
});
Expand Down
Loading