diff --git a/frame/staking/src/lib.rs b/frame/staking/src/lib.rs index 3ea66e937e83c..43a2abcf6ddbc 100644 --- a/frame/staking/src/lib.rs +++ b/frame/staking/src/lib.rs @@ -345,7 +345,7 @@ macro_rules! log { ($level:tt, $patter:expr $(, $values:expr)* $(,)?) => { frame_support::debug::$level!( target: crate::LOG_TARGET, - $patter $(, $values)* + concat!("💸 ", $patter) $(, $values)* ) }; } @@ -1345,14 +1345,14 @@ decl_module! { ElectionStatus::::Open(now) ); add_weight(0, 1, 0); - log!(info, "💸 Election window is Open({:?}). Snapshot created", now); + log!(info, "Election window is Open({:?}). Snapshot created", now); } else { - log!(warn, "💸 Failed to create snapshot at {:?}.", now); + log!(warn, "Failed to create snapshot at {:?}.", now); } } } } else { - log!(warn, "💸 Estimating next session change failed."); + log!(warn, "Estimating next session change failed."); } add_weight(0, 0, T::NextNewSession::weight(now)) } @@ -1363,22 +1363,48 @@ decl_module! { consumed_weight } - /// Check if the current block number is the one at which the election window has been set - /// to open. If so, it runs the offchain worker code. + /// Offchain worker logic. fn offchain_worker(now: T::BlockNumber) { - use offchain_election::{set_check_offchain_execution_status, compute_offchain_election}; + use offchain_election::{ + set_check_offchain_execution_status, compute_save_and_submit, + restore_or_compute_then_submit, OFFCHAIN_REPEAT, + }; + // ensure that the OCW does not run again within `OFFCHAIN_REPEAT` (5) blocks of its last run.
+ let threshold: T::BlockNumber = OFFCHAIN_REPEAT.into(); - if Self::era_election_status().is_open_at(now) { - let offchain_status = set_check_offchain_execution_status::(now); - if let Err(why) = offchain_status { - log!(warn, "💸 skipping offchain worker in open election window due to [{}]", why); - } else { - if let Err(e) = compute_offchain_election::() { - log!(error, "💸 Error in election offchain worker: {:?}", e); - } else { - log!(debug, "💸 Executed offchain worker thread without errors."); + let election_status = Self::era_election_status(); + + log!( + trace, + "ocw at {:?}, election status = {:?}, queued_score = {:?}, offchain solution: {:?}", + now, + election_status, + Self::queued_score(), + offchain_election::get_solution::() + .map(|call| ( + offchain_election::ensure_solution_is_recent(call.clone()).is_ok(), + call.encode().len(), + )) + ); + match Self::era_election_status() { + ElectionStatus::Open(opened) if opened == now => { + // If era election status is open at the current block, mine a new solution + // then save and submit it. + let initial_output = set_check_offchain_execution_status::(now, threshold) + .and_then(|_| compute_save_and_submit::()); + log!(info, "initial OCW output at {:?} = {:?}", now, initial_output); + }, + ElectionStatus::Open(opened) if now > opened => { + if !QueuedScore::exists() { + // If the election window is open, and we don't have a queued solution, + // constantly try to challenge it by either resubmitting a saved solution, + // or mining a new one (just in the case that the previous was skipped). 
+ let resubmit_output = set_check_offchain_execution_status::(now, threshold) + .and_then(|_| restore_or_compute_then_submit::()); + log!(info, "resubmit OCW output at {:?} = {:?}", now, resubmit_output); } - } + }, + _ => {} } } @@ -2306,7 +2332,7 @@ impl Module { { log!( warn, - "💸 Snapshot size too big [{} <> {}][{} <> {}].", + "Snapshot size too big [{} <> {}][{} <> {}].", num_validators, MAX_VALIDATORS, num_nominators, @@ -2626,7 +2652,7 @@ impl Module { validator_at, ).map_err(|e| { // log the error since it is not propagated into the runtime error. - log!(warn, "💸 un-compacting solution failed due to {:?}", e); + log!(warn, "un-compacting solution failed due to {:?}", e); Error::::OffchainElectionBogusCompact })?; @@ -2641,7 +2667,7 @@ impl Module { // all of the indices must map to either a validator or a nominator. If this is ever // not the case, then the locking system of staking is most likely faulty, or we // have bigger problems. - log!(error, "💸 detected an error in the staking locking and snapshot."); + log!(error, "detected an error in the staking locking and snapshot."); // abort. return Err(Error::::OffchainElectionBogusNominator.into()); } @@ -2701,7 +2727,8 @@ impl Module { let exposures = Self::collect_exposure(supports); log!( info, - "💸 A better solution (with compute {:?} and score {:?}) has been validated and stored on chain.", + "A better solution (with compute {:?} and score {:?}) has been validated and stored \ + on chain.", compute, submitted_score, ); @@ -2901,7 +2928,7 @@ impl Module { log!( info, - "💸 new validator set of size {:?} has been elected via {:?} for era {:?}", + "new validator set of size {:?} has been elected via {:?} for era {:?}", elected_stashes.len(), compute, current_era, @@ -2957,7 +2984,7 @@ impl Module { .map_err(|_| log!( error, - "💸 on-chain phragmen is failing due to a problem in the result. This must be a bug." + "on-chain phragmen is failing due to a problem in the result. This must be a bug." 
) ) .ok()?; @@ -3025,7 +3052,7 @@ impl Module { // If we don't have enough candidates, nothing to do. log!( warn, - "💸 Chain does not have enough staking candidates to operate. Era {:?}.", + "Chain does not have enough staking candidates to operate. Era {:?}.", Self::current_era() ); None @@ -3482,13 +3509,13 @@ impl frame_support::unsigned::ValidateUnsigned for Module { let invalid = to_invalid(error_with_post_info); log!( debug, - "💸 validate unsigned pre dispatch checks failed due to error #{:?}.", + "validate unsigned pre dispatch checks failed due to error #{:?}.", invalid, ); return invalid.into(); } - log!(debug, "💸 validateUnsigned succeeded for a solution at era {}.", era); + log!(debug, "validateUnsigned succeeded for a solution at era {}.", era); ValidTransaction::with_tag_prefix("StakingOffchain") // The higher the score[0], the better a solution is. diff --git a/frame/staking/src/offchain_election.rs b/frame/staking/src/offchain_election.rs index 4f80d75086e7e..32b940715a9ab 100644 --- a/frame/staking/src/offchain_election.rs +++ b/frame/staking/src/offchain_election.rs @@ -29,18 +29,17 @@ use sp_npos_elections::{ ExtendedBalance, CompactSolution, }; use sp_runtime::{ - offchain::storage::StorageValueRef, traits::TrailingZeroInput, RuntimeDebug, + offchain::storage::StorageValueRef, traits::TrailingZeroInput, }; -use sp_std::{convert::TryInto, prelude::*}; +use sp_std::{convert::TryInto, prelude::*, fmt::Debug}; /// Error types related to the offchain election machinery. -#[derive(RuntimeDebug)] +#[derive(Debug)] +#[cfg_attr(feature = "std", derive(PartialEq, Eq))] pub enum OffchainElectionError { /// election returned None. This means less candidate that minimum number of needed /// validators were present. The chain is in trouble and not much that we can do about it. ElectionFailed, - /// Submission to the transaction pool failed. - PoolSubmissionFailed, /// The snapshot data is not available. SnapshotUnavailable, /// Error from npos-election crate. 
This usually relates to compact operation. @@ -49,6 +48,20 @@ pub enum OffchainElectionError { InvalidWinner, /// A nominator is not available in the snapshot. NominatorSnapshotCorrupt, + /// Failed to write some data to the offchain worker storage. + DBWriteFailed, + /// An offchain thread was not executed because a fork was detected (executed with a block + /// number less than the last recorded one). + Fork, + /// An offchain thread was not executed because the last executed one is too recent (no more + /// than `OFFCHAIN_REPEAT` blocks ago). + TooRecent, + /// An unreachable state was reached. Should never happen. + Unreachable, + /// Submission to the transaction pool failed. + PoolSubmissionFailed, + /// The stored solution belongs to an old era and cannot be used. + SolutionOld, } impl From for OffchainElectionError { @@ -73,42 +86,109 @@ pub(crate) const DEFAULT_LONGEVITY: u64 = 25; /// Returns `Ok(())` if offchain worker should happen, `Err(reason)` otherwise. pub(crate) fn set_check_offchain_execution_status( now: T::BlockNumber, -) -> Result<(), &'static str> { + threshold: T::BlockNumber, +) -> Result<(), OffchainElectionError> { let storage = StorageValueRef::persistent(&OFFCHAIN_HEAD_DB); - let threshold = T::BlockNumber::from(OFFCHAIN_REPEAT); let mutate_stat = - storage.mutate::<_, &'static str, _>(|maybe_head: Option>| { + storage.mutate(|maybe_head: Option>| { match maybe_head { - Some(Some(head)) if now < head => Err("fork."), + Some(Some(head)) if now < head => Err(OffchainElectionError::Fork), Some(Some(head)) if now >= head && now <= head + threshold => { - Err("recently executed.") + Err(OffchainElectionError::TooRecent) } Some(Some(head)) if now > head + threshold => { - // we can run again now. Write the new head. + // we can allow again now. Write the new head. Ok(now) } _ => { - // value doesn't exists. Probably this node just booted up. Write, and run + // value doesn't exist. Probably this node just booted up. Write, and allow.
Ok(now) } } - }); + }); + crate::log!(trace, "attempting to acquire the OCW lock at {:?} = {:?}", now, mutate_stat); match mutate_stat { // all good Ok(Ok(_)) => Ok(()), // failed to write. - Ok(Err(_)) => Err("failed to write to offchain db."), + Ok(Err(_)) => Err(OffchainElectionError::DBWriteFailed), // fork etc. Err(why) => Err(why), } } +/// Storage path for the solution `call`. +pub(crate) const OFFCHAIN_QUEUED_CALL: &[u8] = b"parity/staking-election/call"; + +/// Save a given call to the OCW storage. +pub(crate) fn save_solution(call: Call) -> Result<(), OffchainElectionError> { + let storage = StorageValueRef::persistent(&OFFCHAIN_QUEUED_CALL); + // in all cases, just write the new value regardless of the old one, if any. + let set_outcome = storage.mutate::<_, OffchainElectionError, _>(|_| Ok(call)); + + match set_outcome { + Ok(Ok(_)) => Ok(()), + // failed to write. + Ok(Err(_)) => Err(OffchainElectionError::DBWriteFailed), + _ => { + // Defensive only: should not happen. Inner mutate closure always returns ok. + Err(OffchainElectionError::Unreachable) + } + } +} + +/// Get a saved OCW solution, if it exists. +pub(crate) fn get_solution() -> Option> { + StorageValueRef::persistent(&OFFCHAIN_QUEUED_CALL).get().flatten() +} + +/// Submit a given solution as an unsigned transaction. +pub(crate) fn submit_solution(call: Call) -> Result<(), OffchainElectionError> { + SubmitTransaction::>::submit_unsigned_transaction(call.into()) + .map_err(|_| OffchainElectionError::PoolSubmissionFailed) +} + +/// Ensure that the given solution call belongs to the current era. +/// +/// Returns `Ok(call)` if it belongs to the current era.
+pub(crate) fn ensure_solution_is_recent( + call: Call, +) -> Result, OffchainElectionError> { + let current_era = >::current_era().unwrap_or_default(); + match call { + Call::submit_election_solution_unsigned(_, _, _, era, _) if era == current_era => Ok(call), + _ => Err(OffchainElectionError::SolutionOld), + } +} + +/// Compute a new solution and save it to the OCW storage. +pub(crate) fn compute_and_save() -> Result, OffchainElectionError> { + let call = compute_offchain_election::()?; + save_solution::(call.clone())?; + Ok(call) +} + +/// Restore an old solution if it exists, else compute a new one and save it; finally submit it. +pub(crate) fn restore_or_compute_then_submit() -> Result<(), OffchainElectionError> { + let call = match get_solution::() { + Some(call) => ensure_solution_is_recent(call)?, + None => compute_and_save::()?, + }; + submit_solution::(call) +} + +/// Compute the solution, save it, and submit it. +pub(crate) fn compute_save_and_submit() -> Result<(), OffchainElectionError> { + let call = compute_and_save::()?; + submit_solution::(call) +} + /// The internal logic of the offchain worker of this module. This runs the phragmen election, -/// compacts and reduces the solution, computes the score and submits it back to the chain as an -/// unsigned transaction, without any signature. -pub(crate) fn compute_offchain_election() -> Result<(), OffchainElectionError> { +/// compacts and reduces the solution, computes the score and returns a call that can be submitted back +/// to the chain. +pub(crate) fn compute_offchain_election() -> Result, OffchainElectionError> { let iters = get_balancing_iters::(); // compute raw solution. Note that we use `OffchainAccuracy`.
let ElectionResult { @@ -127,7 +207,7 @@ pub(crate) fn compute_offchain_election() -> Result<(), OffchainElect crate::log!( info, - "💸 prepared a seq-phragmen solution with {} balancing iterations and score {:?}", + "prepared a seq-phragmen solution with {} balancing iterations and score {:?}", iters, score, ); @@ -135,17 +215,13 @@ pub(crate) fn compute_offchain_election() -> Result<(), OffchainElect // defensive-only: current era can never be none except genesis. let current_era = >::current_era().unwrap_or_default(); - // send it. - let call = Call::submit_election_solution_unsigned( + Ok(Call::submit_election_solution_unsigned( winners, compact, score, current_era, size, - ).into(); - - SubmitTransaction::>::submit_unsigned_transaction(call) - .map_err(|_| OffchainElectionError::PoolSubmissionFailed) + )) } /// Get a random number of iterations to run the balancing. @@ -252,7 +328,7 @@ pub fn maximum_compact_len( /// Thus, we reside to stripping away some voters. This means only changing the `compact` struct. /// /// Note that the solution is already computed, and the winners are elected based on the merit of -/// teh entire stake in the system. Nonetheless, some of the voters will be removed further down the +/// the entire stake in the system. Nonetheless, some of the voters will be removed further down the /// line. /// /// Indeed, the score must be computed **after** this step. 
If this step reduces the score too much, @@ -284,7 +360,7 @@ where if compact.remove_voter(index) { crate::log!( trace, - "💸 removed a voter at index {} with stake {:?} from compact to reduce the size", + "removed a voter at index {} with stake {:?} from compact to reduce the size", index, _stake, ); @@ -298,7 +374,7 @@ where crate::log!( warn, - "💸 {} nominators out of {} had to be removed from compact solution due to size limits.", + "{} nominators out of {} had to be removed from compact solution due to size limits.", removed, compact.voter_count() + removed, ); @@ -308,7 +384,7 @@ where // nada, return as-is crate::log!( info, - "💸 Compact solution did not get trimmed due to block weight limits.", + "Compact solution did not get trimmed due to block weight limits.", ); Ok(compact) } @@ -390,7 +466,7 @@ pub fn prepare_submission( let maximum_allowed_voters = maximum_compact_len::(winners.len() as u32, size, maximum_weight); - crate::log!(debug, "💸 Maximum weight = {:?} // current weight = {:?} // maximum voters = {:?} // current votes = {:?}", + crate::log!(debug, "Maximum weight = {:?} // current weight = {:?} // maximum voters = {:?} // current votes = {:?}", maximum_weight, T::WeightInfo::submit_solution_better( size.validators.into(), diff --git a/frame/staking/src/tests.rs b/frame/staking/src/tests.rs index 1f5e2a48888a5..5c08cde732379 100644 --- a/frame/staking/src/tests.rs +++ b/frame/staking/src/tests.rs @@ -4137,35 +4137,36 @@ mod offchain_election { ext.execute_with(|| { use offchain_election::OFFCHAIN_HEAD_DB; use sp_runtime::offchain::storage::StorageValueRef; + use crate::offchain_election::{OffchainElectionError, OFFCHAIN_REPEAT}; let storage = StorageValueRef::persistent(&OFFCHAIN_HEAD_DB); run_to_block(12); // first run -- ok assert_eq!( - offchain_election::set_check_offchain_execution_status::(12), + offchain_election::set_check_offchain_execution_status::(12, OFFCHAIN_REPEAT.into()), Ok(()), ); assert_eq!(storage.get::().unwrap().unwrap(), 
12); // re-execute after the next. not allowed. assert_eq!( - offchain_election::set_check_offchain_execution_status::(13), - Err("recently executed."), + offchain_election::set_check_offchain_execution_status::(13, OFFCHAIN_REPEAT.into()), + Err(OffchainElectionError::TooRecent), ); // a fork like situation -- re-execute 10, 11, 12. But it won't go through. assert_eq!( - offchain_election::set_check_offchain_execution_status::(10), - Err("fork."), + offchain_election::set_check_offchain_execution_status::(10, OFFCHAIN_REPEAT.into()), + Err(OffchainElectionError::Fork), ); assert_eq!( - offchain_election::set_check_offchain_execution_status::(11), - Err("fork."), + offchain_election::set_check_offchain_execution_status::(11, OFFCHAIN_REPEAT.into()), + Err(OffchainElectionError::Fork), ); assert_eq!( - offchain_election::set_check_offchain_execution_status::(12), - Err("recently executed."), + offchain_election::set_check_offchain_execution_status::(12, OFFCHAIN_REPEAT.into()), + Err(OffchainElectionError::TooRecent), ); }) }