-
Notifications
You must be signed in to change notification settings - Fork 1.5k
staking miner: Check the queue one last time before submission #4819
Changes from all commits
feb0a7b
633ee73
4c2dbd6
4d9644e
21546b1
00ff014
54f3bf1
ecfe004
5fddbbe
b760a30
1d08111
69a3096
32c877a
9ea9f99
e87406d
1a5d18b
c94aed1
92de8a0
756694a
5488de1
b3b9a58
b3740f5
359a469
fabfd6e
c633e44
2e638d5
5946494
0d8aca7
be2f3fc
51c4140
deeee60
e42aca8
19fc667
75d9698
e3ebf7c
64c4a33
35f26ff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,12 +16,16 @@ | |
|
|
||
| //! The monitor command. | ||
|
|
||
| use crate::{prelude::*, rpc::*, signer::Signer, Error, MonitorConfig, SharedRpcClient}; | ||
| use crate::{ | ||
| prelude::*, rpc::*, signer::Signer, Error, MonitorConfig, SharedRpcClient, SubmissionStrategy, | ||
| }; | ||
| use codec::Encode; | ||
| use jsonrpsee::core::Error as RpcError; | ||
| use sc_transaction_pool_api::TransactionStatus; | ||
| use sp_core::storage::StorageKey; | ||
| use sp_runtime::Perbill; | ||
| use tokio::sync::mpsc; | ||
| use EPM::{signed::SubmissionIndicesOf, SignedSubmissionOf}; | ||
|
|
||
| /// Ensure that now is the signed phase. | ||
| async fn ensure_signed_phase<T: EPM::Config, B: BlockT<Hash = Hash>>( | ||
|
|
@@ -43,21 +47,70 @@ async fn ensure_signed_phase<T: EPM::Config, B: BlockT<Hash = Hash>>( | |
| } | ||
|
|
||
| /// Ensure that our current `us` have not submitted anything previously. | ||
| async fn ensure_no_previous_solution< | ||
| T: EPM::Config + frame_system::Config<AccountId = AccountId>, | ||
| B: BlockT, | ||
| >( | ||
| ext: &mut Ext, | ||
| async fn ensure_no_previous_solution<T, B>( | ||
| rpc: &SharedRpcClient, | ||
| at: Hash, | ||
| us: &AccountId, | ||
| ) -> Result<(), Error<T>> | ||
| where | ||
| T: EPM::Config + frame_system::Config<AccountId = AccountId, Hash = Hash>, | ||
| B: BlockT, | ||
| { | ||
| let indices_key = StorageKey(EPM::SignedSubmissionIndices::<T>::hashed_key().to_vec()); | ||
|
|
||
| let indices: SubmissionIndicesOf<T> = rpc | ||
| .get_storage_and_decode(&indices_key, Some(at)) | ||
| .await | ||
| .map_err::<Error<T>, _>(Into::into)? | ||
| .unwrap_or_default(); | ||
|
|
||
| for (_score, idx) in indices { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I kinda wanted to rewrite this with
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. it's possible but then we need to convert the futures to a stream or batch them together. Something like FuturesOrdered or a JSON-RPC batch request might make this a little bit nicer to read. |
||
| let key = StorageKey(EPM::SignedSubmissionsMap::<T>::hashed_key_for(idx)); | ||
|
|
||
| if let Some(submission) = rpc | ||
| .get_storage_and_decode::<SignedSubmissionOf<T>>(&key, Some(at)) | ||
| .await | ||
| .map_err::<Error<T>, _>(Into::into)? | ||
| { | ||
| if &submission.who == us { | ||
| return Err(Error::AlreadySubmitted) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| /// Reads all current solutions and checks the scores according to the `SubmissionStrategy`. | ||
| async fn ensure_no_better_solution<T: EPM::Config, B: BlockT>( | ||
niklasad1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| rpc: &SharedRpcClient, | ||
| at: Hash, | ||
| score: sp_npos_elections::ElectionScore, | ||
| strategy: SubmissionStrategy, | ||
| ) -> Result<(), Error<T>> { | ||
| use EPM::signed::SignedSubmissions; | ||
| ext.execute_with(|| { | ||
| if <SignedSubmissions<T>>::get().iter().any(|ss| &ss.who == us) { | ||
| Err(Error::AlreadySubmitted) | ||
| } else { | ||
| Ok(()) | ||
| let epsilon = match strategy { | ||
emostov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // don't care about current scores. | ||
| SubmissionStrategy::Always => return Ok(()), | ||
| SubmissionStrategy::IfLeading => Perbill::zero(), | ||
| SubmissionStrategy::ClaimBetterThan(epsilon) => epsilon, | ||
| }; | ||
|
|
||
| let indices_key = StorageKey(EPM::SignedSubmissionIndices::<T>::hashed_key().to_vec()); | ||
|
|
||
| let indices: SubmissionIndicesOf<T> = rpc | ||
| .get_storage_and_decode(&indices_key, Some(at)) | ||
| .await | ||
| .map_err::<Error<T>, _>(Into::into)? | ||
| .unwrap_or_default(); | ||
|
|
||
| // BTreeMap is ordered, take last to get the max score. | ||
| if let Some(curr_max_score) = indices.into_iter().last().map(|(s, _)| s) { | ||
emostov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if !score.strict_threshold_better(curr_max_score, epsilon) { | ||
| return Err(Error::StrategyNotSatisfied) | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { | ||
|
|
@@ -131,39 +184,52 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { | |
| config: MonitorConfig, | ||
| ) { | ||
|
|
||
| async fn flatten<T>( | ||
emostov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| handle: tokio::task::JoinHandle<Result<T, StakingMinerError>> | ||
| ) -> Result<T, StakingMinerError> { | ||
| match handle.await { | ||
| Ok(Ok(result)) => Ok(result), | ||
| Ok(Err(err)) => Err(err), | ||
| Err(err) => panic!("tokio spawn task failed; kill task: {:?}", err), | ||
| } | ||
| } | ||
|
|
||
| let hash = at.hash(); | ||
| log::trace!(target: LOG_TARGET, "new event at #{:?} ({:?})", at.number, hash); | ||
|
|
||
| // if the runtime version has changed, terminate. | ||
| // block on this because if this fails there is no way to recover from | ||
| // that error i.e, upgrade/downgrade required. | ||
| if let Err(err) = crate::check_versions::<Runtime>(&rpc).await { | ||
emostov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| let _ = tx.send(err.into()); | ||
| return; | ||
| } | ||
|
|
||
| // we prefer doing this check before fetching anything into a remote-ext. | ||
| if ensure_signed_phase::<Runtime, Block>(&rpc, hash).await.is_err() { | ||
| log::debug!(target: LOG_TARGET, "phase closed, not interested in this block at all."); | ||
| let rpc1 = rpc.clone(); | ||
| let rpc2 = rpc.clone(); | ||
| let account = signer.account.clone(); | ||
|
|
||
| let signed_phase_fut = tokio::spawn(async move { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I assume we do this here to prevent heavy network usage. I think it's fine for now, but in the future I think we should look into only doing it right before submitting. Even if it ends up with a lot of wasted network usage it should be fine as long as we are not executing this function right before the signed phase, since that is when we want to be really fast to get the solution in. But if the fear is that this will be executing at the end of the signed phase, I think it's fine because we will have a while until the next signed phase, so the extra network usage won't slow itself down. (Assuming it's just traffic to a node on the local network)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yeah
I'm just afraid that it will flood the JSON-RPC client and it can't keep up with all pending calls, because fetching the entire pallet data is quite large (even if the future is dropped the client has to process the responses to check whether it's a valid call). This will occur on every block if we try to do everything in parallel. Thus, I think we could try to make a subscription to the |
||
| ensure_signed_phase::<Runtime, Block>(&rpc1, hash).await | ||
| }); | ||
|
|
||
| let no_prev_sol_fut = tokio::spawn(async move { | ||
| ensure_no_previous_solution::<Runtime, Block>(&rpc2, hash, &account).await | ||
| }); | ||
|
|
||
| // Run the calls in parallel and return once all has completed or any failed. | ||
| if let Err(err) = tokio::try_join!(flatten(signed_phase_fut), flatten(no_prev_sol_fut)) { | ||
| log::debug!(target: LOG_TARGET, "Skipping block {}; {}", at.number, err); | ||
| return; | ||
| } | ||
|
|
||
| // grab an externalities without staking, just the election snapshot. | ||
| let mut ext = match crate::create_election_ext::<Runtime, Block>( | ||
| rpc.clone(), | ||
| Some(hash), | ||
| vec![], | ||
| ).await { | ||
| let mut ext = match crate::create_election_ext::<Runtime, Block>(rpc.clone(), Some(hash), vec![]).await { | ||
| Ok(ext) => ext, | ||
| Err(err) => { | ||
| let _ = tx.send(err); | ||
| log::debug!(target: LOG_TARGET, "Skipping block {}; {}", at.number, err); | ||
| return; | ||
| } | ||
| }; | ||
|
|
||
| if ensure_no_previous_solution::<Runtime, Block>(&mut ext, &signer.account).await.is_err() { | ||
| log::debug!(target: LOG_TARGET, "We already have a solution in this phase, skipping."); | ||
| return; | ||
| } | ||
|
|
||
| // mine a solution, and run feasibility check on it as well. | ||
| let raw_solution = match crate::mine_with::<Runtime>(&config.solver, &mut ext, true) { | ||
| Ok(r) => r, | ||
|
|
@@ -173,7 +239,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { | |
| } | ||
| }; | ||
|
|
||
| log::info!(target: LOG_TARGET, "mined solution with {:?}", &raw_solution.score); | ||
| let score = raw_solution.score; | ||
| log::info!(target: LOG_TARGET, "mined solution with {:?}", score); | ||
|
|
||
| let nonce = match crate::get_account_info::<Runtime>(&rpc, &signer.account, Some(hash)).await { | ||
| Ok(maybe_account) => { | ||
|
|
@@ -200,6 +267,25 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { | |
| let extrinsic = ext.execute_with(|| create_uxt(raw_solution, signer.clone(), nonce, tip, era)); | ||
| let bytes = sp_core::Bytes(extrinsic.encode()); | ||
|
|
||
| let rpc1 = rpc.clone(); | ||
| let rpc2 = rpc.clone(); | ||
|
|
||
| let ensure_no_better_fut = tokio::spawn(async move { | ||
| ensure_no_better_solution::<Runtime, Block>(&rpc1, hash, score, config.submission_strategy).await | ||
| }); | ||
|
|
||
| let ensure_signed_phase_fut = tokio::spawn(async move { | ||
| ensure_signed_phase::<Runtime, Block>(&rpc2, hash).await | ||
| }); | ||
|
|
||
| // Run the calls in parallel and return once all has completed or any failed. | ||
| if tokio::try_join!( | ||
| flatten(ensure_no_better_fut), | ||
| flatten(ensure_signed_phase_fut), | ||
| ).is_err() { | ||
| return; | ||
| } | ||
|
|
||
| let mut tx_subscription = match rpc.watch_extrinsic(&bytes).await { | ||
| Ok(sub) => sub, | ||
| Err(RpcError::RestartNeeded(e)) => { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.