From feb0a7b505dc30023cb17ef58d47a8130f1fb65a Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 11 Jan 2022 14:51:54 +0100 Subject: [PATCH 01/10] staking miner: use config for emergency solution Fixes #4678 --- utils/staking-miner/src/emergency_solution.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/utils/staking-miner/src/emergency_solution.rs b/utils/staking-miner/src/emergency_solution.rs index a3847825f5d8..43bfc389cca9 100644 --- a/utils/staking-miner/src/emergency_solution.rs +++ b/utils/staking-miner/src/emergency_solution.rs @@ -18,7 +18,6 @@ use crate::{prelude::*, EmergencySolutionConfig, Error, SharedConfig}; use codec::Encode; -use frame_election_provider_support::SequentialPhragmen; use std::io::Write; macro_rules! emergency_solution_cmd_for { ($runtime:ident) => { paste::paste! { @@ -28,15 +27,15 @@ macro_rules! emergency_solution_cmd_for { ($runtime:ident) => { paste::paste! { config: EmergencySolutionConfig, ) -> Result<(), Error<$crate::[<$runtime _runtime_exports>]::Runtime>> { use $crate::[<$runtime _runtime_exports>]::*; - let mut ext = crate::create_election_ext::(shared.uri.clone(), None, vec![]).await?; + + let mut ext = crate::create_election_ext::(shared.uri.clone(), config.at, vec![]).await?; + let (raw_solution, _witness) = crate::mine_with::(&config.solver, &mut ext, false)?; + ext.execute_with(|| { assert!(EPM::Pallet::::current_phase().is_emergency()); - // NOTE: this internally calls feasibility_check, but we just re-do it here as an easy way - // to get a `ReadySolution`. - let (raw_solution, _) = - >::mine_solution::>()?; log::info!(target: LOG_TARGET, "mined solution with {:?}", &raw_solution.score); + let mut ready_solution = EPM::Pallet::::feasibility_check(raw_solution, EPM::ElectionCompute::Signed)?; // maybe truncate. From 633ee732f47b65a31b037c73bd54b394e042a10e Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 11 Jan 2022 14:54:00 +0100 Subject: [PATCH 02/10] bump jsonrpsee --- Cargo.lock | 91 ++++++++++++++++++++++++-- utils/staking-miner/Cargo.toml | 2 +- utils/staking-miner/src/main.rs | 4 +- utils/staking-miner/src/monitor.rs | 39 +++++++++-- utils/staking-miner/src/rpc_helpers.rs | 6 +- 5 files changed, 125 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e8341c1bcc79..51fb471535d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3018,9 +3018,61 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6373a33d987866ccfe1af4bc11b089dce941764313f9fd8b7cf13fcb51b72dc5" dependencies = [ "jsonrpsee-proc-macros", - "jsonrpsee-types", + "jsonrpsee-types 0.4.1", "jsonrpsee-utils", - "jsonrpsee-ws-client", + "jsonrpsee-ws-client 0.4.1", +] + +[[package]] +name = "jsonrpsee" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "726b6cb76e568aefc4cc127fdb39cb9d92c176f4df0385eaf8053f770351719c" +dependencies = [ + "jsonrpsee-core", + "jsonrpsee-types 0.7.0", + "jsonrpsee-ws-client 0.7.0", +] + +[[package]] +name = "jsonrpsee-client-transport" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bc39096d2bd470ecbd5ed96c8464e2b2c2ef7ec6f8cb9611604255608624773" +dependencies = [ + "futures 0.3.18", + "http", + "jsonrpsee-core", + "jsonrpsee-types 0.7.0", + "pin-project 1.0.8", + "soketto", + "thiserror", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "jsonrpsee-core" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b863e5e86a11bfaf46bb3ab5aba184671bd62058e8e3ab741c3395904c7afbf3" +dependencies = [ + "anyhow", + "arrayvec 0.7.2", + "async-trait", + "beef", + "futures-channel", + "futures-util", + "hyper", + "jsonrpsee-types 0.7.0", + "rustc-hash", + "serde", + "serde_json", + "soketto", + "thiserror", + "tokio", + "tracing", ] [[package]] @@ -3055,6 +3107,20 @@ dependencies = [ "thiserror", ] +[[package]] +name = "jsonrpsee-types" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e169725e476234f3f96079fb9d8a6d00226db602d3fa056f044994239a490d78" +dependencies = [ + "anyhow", + "beef", + "serde", + "serde_json", + "thiserror", + "tracing", +] + [[package]] name = "jsonrpsee-utils" version = "0.4.1" @@ -3063,7 +3129,7 @@ checksum = "0109c4f972058f3b1925b73a17210aff7b63b65967264d0045d15ee88fe84f0c" dependencies = [ "arrayvec 0.7.2", "beef", - "jsonrpsee-types", + "jsonrpsee-types 0.4.1", ] [[package]] @@ -3077,7 +3143,7 @@ dependencies = [ "fnv", "futures 0.3.18", "http", - "jsonrpsee-types", + "jsonrpsee-types 0.4.1", "log", "pin-project 1.0.8", "rustls-native-certs", @@ -3090,6 +3156,17 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "jsonrpsee-ws-client" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c97f67449d58b8d90ad57986d12dacab8fd594759ff64eb5e6b6e84e470db977" +dependencies = [ + "jsonrpsee-client-transport", + "jsonrpsee-core", + "jsonrpsee-types 0.7.0", +] + [[package]] name = "keccak" version = "0.1.0" @@ -7684,7 +7761,7 @@ version = "0.10.0-dev" source = "git+https://github.com/paritytech/substrate?branch=master#3904d77ad2499ab36b242e52fbe23d5cd8773ad9" dependencies = [ "env_logger 0.9.0", - "jsonrpsee", + "jsonrpsee 0.4.1", "log", "parity-scale-codec", "serde", @@ -10003,7 +10080,7 @@ dependencies = [ "frame-election-provider-support", "frame-support", "frame-system", - "jsonrpsee", + "jsonrpsee 0.7.0", "kusama-runtime", "log", "pallet-balances", @@ -10838,7 +10915,7 @@ name = "try-runtime-cli" version = "0.10.0-dev" source = "git+https://github.com/paritytech/substrate?branch=master#3904d77ad2499ab36b242e52fbe23d5cd8773ad9" dependencies = [ - "jsonrpsee", + "jsonrpsee 0.4.1", "log", "parity-scale-codec", "remote-externalities", diff --git a/utils/staking-miner/Cargo.toml b/utils/staking-miner/Cargo.toml index e982dcfde229..bd7b84428055 100644 --- a/utils/staking-miner/Cargo.toml +++ b/utils/staking-miner/Cargo.toml @@ -10,7 +10,7 @@ tokio = { version = "1.14", features = ["macros"] } log = "0.4.11" env_logger = "0.9.0" structopt = "0.3.25" -jsonrpsee = { version = "0.4.1", default-features = false, features = ["ws-client"] } +jsonrpsee = { version = "0.7", default-features = false, features = ["ws-client"] } serde_json = "1.0" serde = "1.0.130" paste = "1.0.6" diff --git a/utils/staking-miner/src/main.rs b/utils/staking-miner/src/main.rs index 95319f9e24a4..7dea1232049e 100644 --- a/utils/staking-miner/src/main.rs +++ b/utils/staking-miner/src/main.rs @@ -40,7 +40,7 @@ pub(crate) use signer::get_account_info; use frame_election_provider_support::NposSolver; use frame_support::traits::Get; -use jsonrpsee::ws_client::{WsClient, WsClientBuilder}; +use jsonrpsee::{core::client::Client as WsClient, ws_client::WsClientBuilder}; use remote_externalities::{Builder, Mode, OnlineConfig}; use sp_npos_elections::ExtendedBalance; use sp_runtime::{traits::Block as BlockT, DeserializeOwned}; @@ -225,7 +225,7 @@ macro_rules! any_runtime_unit { #[derive(frame_support::DebugNoBound, thiserror::Error)] enum Error { Io(#[from] std::io::Error), - JsonRpsee(#[from] jsonrpsee::types::Error), + JsonRpsee(#[from] jsonrpsee::core::Error), RpcHelperError(#[from] rpc_helpers::RpcHelperError), Codec(#[from] codec::Error), Crypto(sp_core::crypto::SecretStringError), diff --git a/utils/staking-miner/src/monitor.rs b/utils/staking-miner/src/monitor.rs index 19259098df53..e5074b34e0e0 100644 --- a/utils/staking-miner/src/monitor.rs +++ b/utils/staking-miner/src/monitor.rs @@ -19,9 +19,11 @@ use crate::{prelude::*, rpc_helpers::*, signer::Signer, Error, MonitorConfig, SharedConfig}; use codec::Encode; use jsonrpsee::{ + core::{ + client::{Client as WsClient, Subscription, SubscriptionClientT}, + Error as RpcError, + }, rpc_params, - types::{traits::SubscriptionClient, Subscription}, - ws_client::WsClient, }; use sc_transaction_pool_api::TransactionStatus; @@ -86,7 +88,22 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { .await .unwrap(); - while let Some(now) = subscription.next().await? { + while let Some(rp) = subscription.next().await { + let now = match rp { + Ok(r) => r, + Err(RpcError::SubscriptionClosed(reason)) => { + log::debug!("[rpc]: subscription closed by the server: {:?}, starting a new one", reason); + continue; + } + Err(e) => { + // NOTE(niklasad1): this should only occur if the response couldn't + // be decoded as `Header`. + log::error!("{:?}", e); + return Err(e.into()); + } + }; + + let hash = now.hash(); log::trace!(target: LOG_TARGET, "new event at #{:?} ({:?})", now.number, hash); @@ -151,7 +168,21 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { } }; - while let Some(status_update) = tx_subscription.next().await? { + while let Some(rp) = tx_subscription.next().await { + let status_update = match rp { + Ok(r) => r, + Err(RpcError::SubscriptionClosed(reason)) => { + log::debug!("[rpc]: subscription closed by the server: {:?}, starting a new one", reason); + continue; + } + Err(e) => { + // NOTE(niklasad1): this should only occur if the response couldn't + // be decoded as `Header`. + log::error!("{:?}", e); + return Err(e.into()); + } + }; + log::trace!(target: LOG_TARGET, "status update {:?}", status_update); match status_update { TransactionStatus::Ready | TransactionStatus::Broadcast(_) | TransactionStatus::Future => continue, diff --git a/utils/staking-miner/src/rpc_helpers.rs b/utils/staking-miner/src/rpc_helpers.rs index 1277564ebd93..153ca0e65c03 100644 --- a/utils/staking-miner/src/rpc_helpers.rs +++ b/utils/staking-miner/src/rpc_helpers.rs @@ -17,12 +17,12 @@ //! Helper method for RPC. use super::*; -use jsonrpsee::types::traits::Client; -pub(crate) use jsonrpsee::types::v2::ParamsSer; +use jsonrpsee::core::client::ClientT; +pub(crate) use jsonrpsee::types::ParamsSer; #[derive(frame_support::DebugNoBound, thiserror::Error)] pub(crate) enum RpcHelperError { - JsonRpsee(#[from] jsonrpsee::types::Error), + JsonRpsee(#[from] jsonrpsee::core::Error), Codec(#[from] codec::Error), } From 4d9644ed26d562117e02969d31ffbf31198dc897 Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 13 Jan 2022 16:27:43 +0100 Subject: [PATCH 03/10] run `monitor_cmd_for` until the connection is closed --- Cargo.lock | 4 ++-- utils/staking-miner/src/monitor.rs | 31 ++++++++++++++++++++---------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fda4ad06b862..379a82ad8545 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3102,11 +3102,11 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6bc39096d2bd470ecbd5ed96c8464e2b2c2ef7ec6f8cb9611604255608624773" dependencies = [ - "futures 0.3.18", + "futures 0.3.19", "http", "jsonrpsee-core", "jsonrpsee-types 0.7.0", - "pin-project 1.0.8", + "pin-project 1.0.10", "soketto", "thiserror", "tokio", diff --git a/utils/staking-miner/src/monitor.rs b/utils/staking-miner/src/monitor.rs index e5074b34e0e0..31f7aa492a6a 100644 --- a/utils/staking-miner/src/monitor.rs +++ b/utils/staking-miner/src/monitor.rs @@ -83,11 +83,20 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { loop { log::info!(target: LOG_TARGET, "subscribing to {:?} / {:?}", sub, unsub); - let mut subscription: Subscription
= client - .subscribe(&sub, None, &unsub) - .await - .unwrap(); + let mut subscription = match client.subscribe::
(&sub, None, &unsub).await { + Ok(sub) => sub, + Err(RpcError::RestartNeeded(e)) => { + log::error!("[rpc] connection closed: {:?}", e); + return Err(RpcError::RestartNeeded(e).into()); + } + Err(e) => { + log::warn!("[rpc] subscription: `{}` failed {:?}; retrying", sub, e); + continue; + } + }; + + // If this fails try to re-establish the subscription again in the next loop iteration. while let Some(rp) = subscription.next().await { let now = match rp { Ok(r) => r, @@ -97,8 +106,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { } Err(e) => { // NOTE(niklasad1): this should only occur if the response couldn't - // be decoded as `Header`. - log::error!("{:?}", e); + // be decoded as `Header` => it's a bug. + log::error!("[rpc]: subscription failed to decode Header {:?}, this is bug please file an issue", e); return Err(e.into()); } }; @@ -155,6 +164,10 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { .await { Ok(sub) => sub, + Err(RpcError::RestartNeeded(e)) => { + log::error!("{:?}", e); + return Err(RpcError::RestartNeeded(e).into()); + } Err(why) => { // This usually happens when we've been busy with mining for a few blocks, and // now we're receiving the subscriptions of blocks in which we were busy. In @@ -172,13 +185,11 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { let status_update = match rp { Ok(r) => r, Err(RpcError::SubscriptionClosed(reason)) => { - log::debug!("[rpc]: subscription closed by the server: {:?}, starting a new one", reason); + log::warn!("[rpc]: subscription closed by the server: {:?}; continuing...", reason); continue; } Err(e) => { - // NOTE(niklasad1): this should only occur if the response couldn't - // be decoded as `Header`. - log::error!("{:?}", e); + log::error!("[rpc]: subscription failed to decode TransactionStatus {:?}, this is a bug please file an issue", e); return Err(e.into()); } }; From 21546b1642a295cf7f521e177b2bce3814235e4f Mon Sep 17 00:00:00 2001 From: Niklas Date: Thu, 13 Jan 2022 19:22:23 +0100 Subject: [PATCH 04/10] new tokio task for submit_and_watch xt --- Cargo.lock | 109 ++++++++++-- utils/staking-miner/Cargo.toml | 4 +- utils/staking-miner/src/main.rs | 2 +- utils/staking-miner/src/monitor.rs | 277 ++++++++++++++++------------- 4 files changed, 254 insertions(+), 138 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 379a82ad8545..2e2a8fc83747 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1335,7 +1335,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1a816186fa68d9e426e3cb4ae4dff1fcd8e4a2c34b781bf7a822574a0d0aac8" dependencies = [ - "sct", + "sct 0.6.1", ] [[package]] @@ -2300,8 +2300,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a1387e07917c711fb4ee4f48ea0adb04a3c9739e53ef85bf43ae1edc2937a8b" dependencies = [ "futures-io", - "rustls", - "webpki", + "rustls 0.19.1", + "webpki 0.21.4", ] [[package]] @@ -2701,11 +2701,11 @@ dependencies = [ "futures-util", "hyper", "log", - "rustls", - "rustls-native-certs", + "rustls 0.19.1", + "rustls-native-certs 0.5.0", "tokio", - "tokio-rustls", - "webpki", + "tokio-rustls 0.22.0", + "webpki 0.21.4", ] [[package]] @@ -3091,6 +3091,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "726b6cb76e568aefc4cc127fdb39cb9d92c176f4df0385eaf8053f770351719c" dependencies = [ + "jsonrpsee-client-transport", "jsonrpsee-core", "jsonrpsee-types 0.7.0", "jsonrpsee-ws-client 0.7.0", @@ -3107,11 +3108,14 @@ dependencies = [ "jsonrpsee-core", "jsonrpsee-types 0.7.0", "pin-project 1.0.10", + "rustls-native-certs 0.6.1", "soketto", "thiserror", "tokio", + "tokio-rustls 0.23.2", "tokio-util", "tracing", + "webpki-roots 0.22.2", ] [[package]] @@ -3208,13 +3212,13 @@ dependencies = [ "jsonrpsee-types 0.4.1", "log", "pin-project 1.0.10", - "rustls-native-certs", + "rustls-native-certs 0.5.0", "serde", "serde_json", "soketto", "thiserror", "tokio", - "tokio-rustls", + "tokio-rustls 0.22.0", "tokio-util", ] @@ -3908,7 +3912,7 @@ dependencies = [ "rw-stream-sink", "soketto", "url 2.2.2", - "webpki-roots", + "webpki-roots 0.21.1", ] [[package]] @@ -8265,8 +8269,20 @@ dependencies = [ "base64", "log", "ring", - "sct", - "webpki", + "sct 0.6.1", + "webpki 0.21.4", +] + +[[package]] +name = "rustls" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d37e5e2290f3e040b594b1a9e04377c2c671f1a1cfd9bfdef82106ac1c113f84" +dependencies = [ + "log", + "ring", + "sct 0.7.0", + "webpki 0.22.0", ] [[package]] @@ -8276,11 +8292,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092" dependencies = [ "openssl-probe", - "rustls", + "rustls 0.19.1", "schannel", "security-framework", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca9ebdfa27d3fc180e42879037b5338ab1c040c06affd00d8338598e7800943" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9" +dependencies = [ + "base64", +] + [[package]] name = "rustversion" version = "1.0.6" @@ -9311,6 +9348,16 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "secrecy" version = "0.8.0" @@ -10963,9 +11010,20 @@ version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" dependencies = [ - "rustls", + "rustls 0.19.1", + "tokio", + "webpki 0.21.4", +] + +[[package]] +name = "tokio-rustls" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a27d5f2b839802bd8267fa19b0530f5a08b9c08cd417976be2a65d130fe1c11b" +dependencies = [ + "rustls 0.20.2", "tokio", - "webpki", + "webpki 0.22.0", ] [[package]] @@ -11778,13 +11836,32 @@ dependencies = [ "untrusted", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "webpki-roots" version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940" dependencies = [ - "webpki", + "webpki 0.21.4", +] + +[[package]] +name = "webpki-roots" +version = "0.22.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552ceb903e957524388c4d3475725ff2c8b7960922063af6ce53c9a43da07449" +dependencies = [ + "webpki 0.22.0", ] [[package]] diff --git a/utils/staking-miner/Cargo.toml b/utils/staking-miner/Cargo.toml index 84d00419da0f..a8686f882963 100644 --- a/utils/staking-miner/Cargo.toml +++ b/utils/staking-miner/Cargo.toml @@ -6,11 +6,11 @@ edition = "2018" [dependencies] codec = { package = "parity-scale-codec", version = "2.0.0" } -tokio = { version = "1.15", features = ["macros"] } +tokio = { version = "1.15", features = ["macros", "rt-multi-thread", "sync"] } log = "0.4.11" env_logger = "0.9.0" structopt = "0.3.25" -jsonrpsee = { version = "0.7", default-features = false, features = ["ws-client"] } +jsonrpsee = { version = "0.7", default-features = false, features = ["ws-client", "client-ws-transport"] } serde_json = "1.0" serde = "1.0.132" paste = "1.0.6" diff --git a/utils/staking-miner/src/main.rs b/utils/staking-miner/src/main.rs index 279b46bfadb7..13a6d822e15d 100644 --- a/utils/staking-miner/src/main.rs +++ b/utils/staking-miner/src/main.rs @@ -602,7 +602,7 @@ async fn main() { let outcome = any_runtime! { match command.clone() { - Command::Monitor(c) => monitor_cmd(&client, shared, c, signer_account).await + Command::Monitor(c) => monitor_cmd(client, shared, c, signer_account).await .map_err(|e| { log::error!(target: LOG_TARGET, "Monitor error: {:?}", e); }), diff --git a/utils/staking-miner/src/monitor.rs b/utils/staking-miner/src/monitor.rs index 31f7aa492a6a..f2b985d19312 100644 --- a/utils/staking-miner/src/monitor.rs +++ b/utils/staking-miner/src/monitor.rs @@ -25,9 +25,10 @@ use jsonrpsee::{ }, rpc_params, }; - use sc_transaction_pool_api::TransactionStatus; use sp_core::storage::StorageKey; +use std::sync::Arc; +use tokio::sync::mpsc; /// Ensure that now is the signed phase. async fn ensure_signed_phase( @@ -68,19 +69,23 @@ async fn ensure_no_previous_solution< macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { /// The monitor command. pub(crate) async fn []( - client: &WsClient, + client: WsClient, shared: SharedConfig, config: MonitorConfig, signer: Signer, ) -> Result<(), Error<$crate::[<$runtime _runtime_exports>]::Runtime>> { use $crate::[<$runtime _runtime_exports>]::*; + let client = Arc::new(client); + let (sub, unsub) = if config.listen == "head" { ("chain_subscribeNewHeads", "chain_unsubscribeNewHeads") } else { ("chain_subscribeFinalizedHeads", "chain_unsubscribeFinalizedHeads") }; + let (tx, mut rx) = mpsc::unbounded_channel::(); + loop { log::info!(target: LOG_TARGET, "subscribing to {:?} / {:?}", sub, unsub); @@ -96,130 +101,164 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { } }; - // If this fails try to re-establish the subscription again in the next loop iteration. - while let Some(rp) = subscription.next().await { - let now = match rp { - Ok(r) => r, - Err(RpcError::SubscriptionClosed(reason)) => { - log::debug!("[rpc]: subscription closed by the server: {:?}, starting a new one", reason); - continue; - } - Err(e) => { - // NOTE(niklasad1): this should only occur if the response couldn't - // be decoded as `Header` => it's a bug. - log::error!("[rpc]: subscription failed to decode Header {:?}, this is bug please file an issue", e); - return Err(e.into()); - } - }; - - - let hash = now.hash(); - log::trace!(target: LOG_TARGET, "new event at #{:?} ({:?})", now.number, hash); - - // if the runtime version has changed, terminate. - crate::check_versions::(client).await?; - - // we prefer doing this check before fetching anything into a remote-ext. - if ensure_signed_phase::(client, hash).await.is_err() { - log::debug!(target: LOG_TARGET, "phase closed, not interested in this block at all."); - continue; - }; - - // grab an externalities without staking, just the election snapshot. - let mut ext = crate::create_election_ext::( - shared.uri.clone(), - Some(hash), - vec![], - ).await?; + let rp = tokio::select! { + Some(rp) = subscription.next() => rp, + Some(err) = rx.recv() => return Err(err.into()), + else => { + log::warn!(target: LOG_TARGET, "subscription to {} terminated. Retrying..", sub); + continue + } + }; - if ensure_no_previous_solution::(&mut ext, &signer.account).await.is_err() { - log::debug!(target: LOG_TARGET, "We already have a solution in this phase, skipping."); + let now = match rp { + Ok(r) => r, + Err(RpcError::SubscriptionClosed(reason)) => { + log::debug!("[rpc]: subscription closed by the server: {:?}, starting a new one", reason); continue; } + Err(e) => { + // NOTE(niklasad1): this should only occur if the response couldn't + // be decoded as `Header` => it's a bug. + log::error!("[rpc]: subscription failed to decode Header {:?}, this is bug please file an issue", e); + return Err(e.into()); + } + }; - // mine a solution, and run feasibility check on it as well. - let (raw_solution, witness) = crate::mine_with::(&config.solver, &mut ext, true)?; - log::info!(target: LOG_TARGET, "mined solution with {:?}", &raw_solution.score); - - let nonce = crate::get_account_info::(client, &signer.account, Some(hash)) - .await? - .map(|i| i.nonce) - .expect(crate::signer::SIGNER_ACCOUNT_WILL_EXIST); - let tip = 0 as Balance; - let period = ::BlockHashCount::get() / 2; - let current_block = now.number.saturating_sub(1); - let era = sp_runtime::generic::Era::mortal(period.into(), current_block.into()); - log::trace!( - target: LOG_TARGET, "transaction mortality: {:?} -> {:?}", - era.birth(current_block.into()), - era.death(current_block.into()), - ); - let extrinsic = ext.execute_with(|| create_uxt(raw_solution, witness, signer.clone(), nonce, tip, era)); - let bytes = sp_core::Bytes(extrinsic.encode()); - - let mut tx_subscription: Subscription< - TransactionStatus<::Hash, ::Hash> - > = match client - .subscribe(&"author_submitAndWatchExtrinsic", rpc_params! { bytes }, "author_unwatchExtrinsic") - .await - { - Ok(sub) => sub, - Err(RpcError::RestartNeeded(e)) => { - log::error!("{:?}", e); - return Err(RpcError::RestartNeeded(e).into()); - } - Err(why) => { - // This usually happens when we've been busy with mining for a few blocks, and - // now we're receiving the subscriptions of blocks in which we were busy. In - // these blocks, we still don't have a solution, so we re-compute a new solution - // and submit it with an outdated `Nonce`, which yields most often `Stale` - // error. NOTE: to improve this overall, and to be able to introduce an array of - // other fancy features, we should make this multi-threaded and do the - // computation outside of this callback. - log::warn!(target: LOG_TARGET, "failing to submit a transaction {:?}. continuing...", why); - continue - } - }; - - while let Some(rp) = tx_subscription.next().await { - let status_update = match rp { - Ok(r) => r, - Err(RpcError::SubscriptionClosed(reason)) => { - log::warn!("[rpc]: subscription closed by the server: {:?}; continuing...", reason); - continue; - } - Err(e) => { - log::error!("[rpc]: subscription failed to decode TransactionStatus {:?}, this is a bug please file an issue", e); - return Err(e.into()); - } - }; + let hash = now.hash(); + log::trace!(target: LOG_TARGET, "new event at #{:?} ({:?})", now.number, hash); + + // if the runtime version has changed, terminate. + crate::check_versions::(&*client).await?; - log::trace!(target: LOG_TARGET, "status update {:?}", status_update); - match status_update { - TransactionStatus::Ready | TransactionStatus::Broadcast(_) | TransactionStatus::Future => continue, - TransactionStatus::InBlock(hash) => { - log::info!(target: LOG_TARGET, "included at {:?}", hash); - let key = StorageKey(frame_support::storage::storage_prefix(b"System",b"Events").to_vec()); - let events = get_storage::::Hash>>, - >(client, rpc_params!{ key, hash }).await?.unwrap_or_default(); - log::info!(target: LOG_TARGET, "events at inclusion {:?}", events); - } - TransactionStatus::Retracted(hash) => { - log::info!(target: LOG_TARGET, "Retracted at {:?}", hash); - } - TransactionStatus::Finalized(hash) => { - log::info!(target: LOG_TARGET, "Finalized at {:?}", hash); - break - } - _ => { - log::warn!(target: LOG_TARGET, "Stopping listen due to other status {:?}", status_update); - break - } - } - }; + // we prefer doing this check before fetching anything into a remote-ext. + if ensure_signed_phase::(&*client, hash).await.is_err() { + log::debug!(target: LOG_TARGET, "phase closed, not interested in this block at all."); + continue; + }; + + // grab an externalities without staking, just the election snapshot. + let mut ext = crate::create_election_ext::( + shared.uri.clone(), + Some(hash), + vec![], + ).await?; + + if ensure_no_previous_solution::(&mut ext, &signer.account).await.is_err() { + log::debug!(target: LOG_TARGET, "We already have a solution in this phase, skipping."); + continue; } - log::warn!(target: LOG_TARGET, "subscription to {} terminated. Retrying..", sub) + // mine a solution, and run feasibility check on it as well. + let (raw_solution, witness) = crate::mine_with::(&config.solver, &mut ext, true)?; + log::info!(target: LOG_TARGET, "mined solution with {:?}", &raw_solution.score); + + let nonce = crate::get_account_info::(&*client, &signer.account, Some(hash)) + .await? + .map(|i| i.nonce) + .expect(crate::signer::SIGNER_ACCOUNT_WILL_EXIST); + let tip = 0 as Balance; + let period = ::BlockHashCount::get() / 2; + let current_block = now.number.saturating_sub(1); + let era = sp_runtime::generic::Era::mortal(period.into(), current_block.into()); + log::trace!( + target: LOG_TARGET, "transaction mortality: {:?} -> {:?}", + era.birth(current_block.into()), + era.death(current_block.into()), + ); + let extrinsic = ext.execute_with(|| create_uxt(raw_solution, witness, signer.clone(), nonce, tip, era)); + let bytes = sp_core::Bytes(extrinsic.encode()); + + let client2 = client.clone(); + let tx2 = tx.clone(); + + // send and watch extrinsic + tokio::spawn(async move { + let mut tx_subscription: Subscription< + TransactionStatus<::Hash, ::Hash> + > = match client2.subscribe( + "author_submitAndWatchExtrinsic", + rpc_params! { bytes }, + "author_unwatchExtrinsic" + ).await { + Ok(sub) => sub, + Err(RpcError::RestartNeeded(e)) => { + log::error!("{:?}", e); + tx2.send(RpcError::RestartNeeded(e)).unwrap(); + return + }, + Err(why) => { + // This usually happens when we've been busy with mining for a few blocks, and + // now we're receiving the subscriptions of blocks in which we were busy. In + // these blocks, we still don't have a solution, so we re-compute a new solution + // and submit it with an outdated `Nonce`, which yields most often `Stale` + // error. NOTE: to improve this overall, and to be able to introduce an array of + // other fancy features, we should make this multi-threaded and do the + // computation outside of this callback. + log::warn!( + target: LOG_TARGET, + "failing to submit a transaction {:?}. continuing...", + why + ); + return; + }, + }; + + while let Some(rp) = tx_subscription.next().await { + let status_update = match rp { + Ok(r) => r, + Err(RpcError::SubscriptionClosed(reason)) => { + log::warn!( + "[rpc]: subscription closed by the server: {:?}; continuing...", + reason + ); + continue + }, + Err(e) => { + log::error!("[rpc]: subscription failed to decode TransactionStatus {:?}, this is a bug please file an issue", e); + tx2.send(e).unwrap(); + return; + }, + }; + + log::trace!(target: LOG_TARGET, "status update {:?}", status_update); + match status_update { + TransactionStatus::Ready | + TransactionStatus::Broadcast(_) | + TransactionStatus::Future => continue, + TransactionStatus::InBlock(hash) => { + log::info!(target: LOG_TARGET, "included at {:?}", hash); + let key = StorageKey( + frame_support::storage::storage_prefix(b"System", b"Events").to_vec(), + ); + + // TODO(niklasad1): what to do if this fails; the task will die here now + let events = get_storage::< + Vec::Hash>>, + >(&*client2, rpc_params! { key, hash }) + .await + .unwrap() + .unwrap_or_default(); + + log::info!(target: LOG_TARGET, "events at inclusion {:?}", events); + }, + TransactionStatus::Retracted(hash) => { + log::info!(target: LOG_TARGET, "Retracted at {:?}", hash); + }, + TransactionStatus::Finalized(hash) => { + log::info!(target: LOG_TARGET, "Finalized at {:?}", hash); + break + }, + _ => { + log::warn!( + target: LOG_TARGET, + "Stopping listen due to other status {:?}", + status_update + ); + break + }, + }; + } + }); } } }}} From 00ff01425ea6cf660ce5631f5f3b922db6113fb5 Mon Sep 17 00:00:00 2001 From: Niklas Date: Fri, 14 Jan 2022 16:14:31 +0100 Subject: [PATCH 05/10] re-use header subscription --- runtime/polkadot/constants/src/lib.rs | 2 +- runtime/polkadot/src/lib.rs | 2 +- utils/staking-miner/README.md | 7 + utils/staking-miner/src/monitor.rs | 323 +++++++++++++++----------- 4 files changed, 196 insertions(+), 138 deletions(-) diff --git a/runtime/polkadot/constants/src/lib.rs b/runtime/polkadot/constants/src/lib.rs index 37c26c62074f..2e48a11a1115 100644 --- a/runtime/polkadot/constants/src/lib.rs +++ b/runtime/polkadot/constants/src/lib.rs @@ -40,7 +40,7 @@ pub mod time { use primitives::v0::{BlockNumber, Moment}; pub const MILLISECS_PER_BLOCK: Moment = 6000; pub const SLOT_DURATION: Moment = MILLISECS_PER_BLOCK; - pub const EPOCH_DURATION_IN_SLOTS: BlockNumber = 4 * HOURS; + pub const EPOCH_DURATION_IN_SLOTS: BlockNumber = 1 * MINUTES; // These time units are defined in number of blocks. pub const MINUTES: BlockNumber = 60_000 / (MILLISECS_PER_BLOCK as BlockNumber); diff --git a/runtime/polkadot/src/lib.rs b/runtime/polkadot/src/lib.rs index 69c419e048c4..e97da07c941a 100644 --- a/runtime/polkadot/src/lib.rs +++ b/runtime/polkadot/src/lib.rs @@ -521,7 +521,7 @@ pallet_staking_reward_curve::build! { parameter_types! { // Six sessions in an era (24 hours). - pub const SessionsPerEra: SessionIndex = 6; + pub const SessionsPerEra: SessionIndex = 2; // 28 eras for unbonding (28 days). pub const BondingDuration: pallet_staking::EraIndex = 28; pub const SlashDeferDuration: pallet_staking::EraIndex = 27; diff --git a/utils/staking-miner/README.md b/utils/staking-miner/README.md index 944f870d6dfc..6355395b1ab1 100644 --- a/utils/staking-miner/README.md +++ b/utils/staking-miner/README.md @@ -59,3 +59,10 @@ docker run --rm -it \ -e URI=wss://your-node:9944 \ staking-miner dry-run ``` + +### Test locally + +1. Modify `EPOCH_DURATION_IN_SLOTS` and `SessionsPerEra` to force an election + more often than once per day. +2. $ polkadot --chain polkadot-dev --tmp --alice --execution Native -lruntime=debug --offchain-worker=Always --ws-port 9999 +3. $ staking-miner --uri ws://localhost:9999 --seed //Alice monitor phrag-mms diff --git a/utils/staking-miner/src/monitor.rs b/utils/staking-miner/src/monitor.rs index f2b985d19312..d52ac749518b 100644 --- a/utils/staking-miner/src/monitor.rs +++ b/utils/staking-miner/src/monitor.rs @@ -67,6 +67,7 @@ async fn ensure_no_previous_solution< } macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { + /// The monitor command. pub(crate) async fn []( client: WsClient, @@ -75,190 +76,240 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { signer: Signer, ) -> Result<(), Error<$crate::[<$runtime _runtime_exports>]::Runtime>> { use $crate::[<$runtime _runtime_exports>]::*; + type StakingMinerError = Error<$crate::[<$runtime _runtime_exports>]::Runtime>; let client = Arc::new(client); - let (sub, unsub) = if config.listen == "head" { ("chain_subscribeNewHeads", "chain_unsubscribeNewHeads") } else { ("chain_subscribeFinalizedHeads", "chain_unsubscribeFinalizedHeads") }; - let (tx, mut rx) = mpsc::unbounded_channel::(); + /// Create a new header subscription + /// + /// Fails if the connection is closed or fails after 10 attempts. + async fn create_header_subscription( + client: &WsClient, + sub: &str, + unsub: &str + ) -> Result, StakingMinerError> { + let mut last_err = None; + for _ in 0..10 { + match client.subscribe(&sub, None, &unsub).await { + Ok(sub) => return Ok(sub), + Err(RpcError::RestartNeeded(e)) => { + log::error!("[rpc] connection closed: {:?}", e); + return Err(RpcError::RestartNeeded(e).into()); + } + Err(e) => { + log::warn!("[rpc] subscription: `{}` failed {:?}; retrying", sub, e); + last_err = Some(e.into()); + continue; + } + }; + }; - loop { - log::info!(target: LOG_TARGET, "subscribing to {:?} / {:?}", sub, unsub); + Err(last_err.expect("looped 10 times must be Some; qed")) + } - let mut subscription = match client.subscribe::
(&sub, None, &unsub).await { - Ok(sub) => sub, - Err(RpcError::RestartNeeded(e)) => { - log::error!("[rpc] connection closed: {:?}", e); - return Err(RpcError::RestartNeeded(e).into()); - } - Err(e) => { - log::warn!("[rpc] subscription: `{}` failed {:?}; retrying", sub, e); - continue; - } - }; + let (tx, mut rx) = mpsc::unbounded_channel::>(); + let mut subscription = create_header_subscription(&*client, sub, unsub).await?; - let rp = tokio::select! { - Some(rp) = subscription.next() => rp, - Some(err) = rx.recv() => return Err(err.into()), + loop { + let at = tokio::select! { + Some(rp) = subscription.next() => { + match rp { + Ok(r) => r, + Err(RpcError::SubscriptionClosed(reason)) => { + log::debug!("[rpc]: subscription closed by the server: {:?}, starting a new one", reason); + continue; + } + Err(e) => { + log::error!("[rpc]: subscription failed to decode Header {:?}, this is bug please file an issue", e); + return Err(e.into()); + } + } + }, + Some(err) = rx.recv() => return Err(err), else => { + log::warn!("[rpc]: restarting header subscription"); + subscription = create_header_subscription(&*client, sub, unsub).await?; log::warn!(target: LOG_TARGET, "subscription to {} terminated. Retrying..", sub); continue } }; - let now = match rp { - Ok(r) => r, - Err(RpcError::SubscriptionClosed(reason)) => { - log::debug!("[rpc]: subscription closed by the server: {:?}, starting a new one", reason); - continue; - } - Err(e) => { - // NOTE(niklasad1): this should only occur if the response couldn't - // be decoded as `Header` => it's a bug. - log::error!("[rpc]: subscription failed to decode Header {:?}, this is bug please file an issue", e); - return Err(e.into()); - } - }; + log::info!(target: LOG_TARGET, "subscribing to {:?} / {:?} at: {}", sub, unsub, at.number()); + + // Spawn task and non-recoverable errors are sent back to the main task + // such as if the connection has been closed. + tokio::spawn(send_and_watch_extrinsic(client.clone(), tx.clone(), at, signer.clone(), shared.clone(), config.clone())); + } + + /// Construct extrinsic at given block and watch it. + async fn send_and_watch_extrinsic( + client: Arc, + tx: mpsc::UnboundedSender, + at: Header, + signer: Signer, + shared: SharedConfig, + config: MonitorConfig, + ) { - let hash = now.hash(); - log::trace!(target: LOG_TARGET, "new event at #{:?} ({:?})", now.number, hash); + let hash = at.hash(); + log::trace!(target: LOG_TARGET, "new event at #{:?} ({:?})", at.number, hash); // if the runtime version has changed, terminate. - crate::check_versions::(&*client).await?; + if let Err(err) = crate::check_versions::(&*client).await { + let _ = tx.send(err.into()); + return; + } // we prefer doing this check before fetching anything into a remote-ext. if ensure_signed_phase::(&*client, hash).await.is_err() { log::debug!(target: LOG_TARGET, "phase closed, not interested in this block at all."); - continue; - }; + return; + } // grab an externalities without staking, just the election snapshot. - let mut ext = crate::create_election_ext::( + let mut ext = match crate::create_election_ext::( shared.uri.clone(), Some(hash), vec![], - ).await?; + ).await { + Ok(ext) => ext, + Err(err) => { + let _ = tx.send(err); + return; + } + }; if ensure_no_previous_solution::(&mut ext, &signer.account).await.is_err() { log::debug!(target: LOG_TARGET, "We already have a solution in this phase, skipping."); - continue; + return; } // mine a solution, and run feasibility check on it as well. - let (raw_solution, witness) = crate::mine_with::(&config.solver, &mut ext, true)?; + let (raw_solution, witness) = match crate::mine_with::(&config.solver, &mut ext, true) { + Ok(r) => r, + Err(err) => { + let _ = tx.send(err.into()); + return; + } + }; + log::info!(target: LOG_TARGET, "mined solution with {:?}", &raw_solution.score); - let nonce = crate::get_account_info::(&*client, &signer.account, Some(hash)) - .await? - .map(|i| i.nonce) - .expect(crate::signer::SIGNER_ACCOUNT_WILL_EXIST); + let nonce = match crate::get_account_info::(&*client, &signer.account, Some(hash)).await { + Ok(maybe_account) => { + let acc = maybe_account.expect(crate::signer::SIGNER_ACCOUNT_WILL_EXIST); + acc.nonce + } + Err(err) => { + let _ = tx.send(err); + return; + } + }; + let tip = 0 as Balance; let period = ::BlockHashCount::get() / 2; - let current_block = now.number.saturating_sub(1); + let current_block = at.number.saturating_sub(1); let era = sp_runtime::generic::Era::mortal(period.into(), current_block.into()); + log::trace!( target: LOG_TARGET, "transaction mortality: {:?} -> {:?}", era.birth(current_block.into()), era.death(current_block.into()), ); + let extrinsic = ext.execute_with(|| create_uxt(raw_solution, witness, signer.clone(), nonce, tip, era)); let bytes = sp_core::Bytes(extrinsic.encode()); - let client2 = client.clone(); - let tx2 = tx.clone(); - - // send and watch extrinsic - tokio::spawn(async move { - let mut tx_subscription: Subscription< - TransactionStatus<::Hash, ::Hash> - > = match client2.subscribe( - "author_submitAndWatchExtrinsic", - rpc_params! { bytes }, - "author_unwatchExtrinsic" - ).await { - Ok(sub) => sub, - Err(RpcError::RestartNeeded(e)) => { - log::error!("{:?}", e); - tx2.send(RpcError::RestartNeeded(e)).unwrap(); - return - }, - Err(why) => { - // This usually happens when we've been busy with mining for a few blocks, and - // now we're receiving the subscriptions of blocks in which we were busy. In - // these blocks, we still don't have a solution, so we re-compute a new solution - // and submit it with an outdated `Nonce`, which yields most often `Stale` - // error. NOTE: to improve this overall, and to be able to introduce an array of - // other fancy features, we should make this multi-threaded and do the - // computation outside of this callback. - log::warn!( - target: LOG_TARGET, - "failing to submit a transaction {:?}. continuing...", - why - ); - return; - }, - }; - - while let Some(rp) = tx_subscription.next().await { - let status_update = match rp { - Ok(r) => r, - Err(RpcError::SubscriptionClosed(reason)) => { - log::warn!( - "[rpc]: subscription closed by the server: {:?}; continuing...", - reason - ); - continue - }, - Err(e) => { - log::error!("[rpc]: subscription failed to decode TransactionStatus {:?}, this is a bug please file an issue", e); - tx2.send(e).unwrap(); - return; - }, - }; - - log::trace!(target: LOG_TARGET, "status update {:?}", status_update); - match status_update { - TransactionStatus::Ready | - TransactionStatus::Broadcast(_) | - TransactionStatus::Future => continue, - TransactionStatus::InBlock(hash) => { - log::info!(target: LOG_TARGET, "included at {:?}", hash); - let key = StorageKey( - frame_support::storage::storage_prefix(b"System", b"Events").to_vec(), - ); - - // TODO(niklasad1): what to do if this fails; the task will die here now - let events = get_storage::< - Vec::Hash>>, - >(&*client2, rpc_params! { key, hash }) - .await - .unwrap() - .unwrap_or_default(); - - log::info!(target: LOG_TARGET, "events at inclusion {:?}", events); - }, - TransactionStatus::Retracted(hash) => { - log::info!(target: LOG_TARGET, "Retracted at {:?}", hash); - }, - TransactionStatus::Finalized(hash) => { - log::info!(target: LOG_TARGET, "Finalized at {:?}", hash); - break - }, - _ => { - log::warn!( - target: LOG_TARGET, - "Stopping listen due to other status {:?}", - status_update - ); - break - }, - }; - } - }); + let mut tx_subscription: Subscription< + TransactionStatus<::Hash, ::Hash> + > = match client.subscribe( + "author_submitAndWatchExtrinsic", + rpc_params! { bytes }, + "author_unwatchExtrinsic" + ).await { + Ok(sub) => sub, + Err(RpcError::RestartNeeded(e)) => { + log::error!("{:?}", e); + let _ = tx.send(RpcError::RestartNeeded(e).into()); + return + }, + Err(why) => { + // This usually happens when we've been busy with mining for a few blocks, and + // now we're receiving the subscriptions of blocks in which we were busy. In + // these blocks, we still don't have a solution, so we re-compute a new solution + // and submit it with an outdated `Nonce`, which yields most often `Stale` + // error. NOTE: to improve this overall, and to be able to introduce an array of + // other fancy features, we should make this multi-threaded and do the + // computation outside of this callback. + log::warn!( + target: LOG_TARGET, + "failing to submit a transaction {:?}. continuing...", + why + ); + return; + }, + }; + + while let Some(rp) = tx_subscription.next().await { + let status_update = match rp { + Ok(r) => r, + Err(RpcError::SubscriptionClosed(reason)) => { + log::warn!( + "[rpc]: subscription closed by the server: {:?}; continuing...", + reason + ); + continue + }, + Err(e) => { + log::error!("[rpc]: subscription failed to decode TransactionStatus {:?}, this is a bug please file an issue", e); + let _ = tx.send(e.into()); + return; + }, + }; + + log::trace!(target: LOG_TARGET, "status update {:?}", status_update); + match status_update { + TransactionStatus::Ready | + TransactionStatus::Broadcast(_) | + TransactionStatus::Future => continue, + TransactionStatus::InBlock(hash) => { + log::info!(target: LOG_TARGET, "included at {:?}", hash); + let key = StorageKey( + frame_support::storage::storage_prefix(b"System", b"Events").to_vec(), + ); + + // TODO(niklasad1): what to do if this fails; the task will die here now + let events = get_storage::< + Vec::Hash>>, + >(&*client, rpc_params! { key, hash }) + .await + .unwrap() + .unwrap_or_default(); + + log::info!(target: LOG_TARGET, "events at inclusion {:?}", events); + }, + TransactionStatus::Retracted(hash) => { + log::info!(target: LOG_TARGET, "Retracted at {:?}", hash); + }, + TransactionStatus::Finalized(hash) => { + log::info!(target: LOG_TARGET, "Finalized at {:?}", hash); + break + }, + _ => { + log::warn!( + target: LOG_TARGET, + "Stopping listen due to other status {:?}", + status_update + ); + break + }, + }; + } } } }}} From ecfe004eb2b06b2b89ec7fa0b4a2aacb389d21aa Mon Sep 17 00:00:00 2001 From: Niklas Date: Mon, 24 Jan 2022 11:02:25 +0100 Subject: [PATCH 06/10] update jsonrpsee + simplify code --- Cargo.lock | 33 +++++------ utils/staking-miner/Cargo.toml | 2 +- utils/staking-miner/src/main.rs | 2 +- utils/staking-miner/src/monitor.rs | 93 +++++++++++++++--------------- 4 files changed, 63 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e9bb59e59962..75e37b57c029 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3070,26 +3070,25 @@ dependencies = [ [[package]] name = "jsonrpsee" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "726b6cb76e568aefc4cc127fdb39cb9d92c176f4df0385eaf8053f770351719c" +checksum = "05fd8cd6c6b1bbd06881d2cf88f1fc83cc36c98f2219090f839115fb4a956cb9" dependencies = [ - "jsonrpsee-client-transport", "jsonrpsee-core", - "jsonrpsee-types 0.7.0", - "jsonrpsee-ws-client 0.7.0", + "jsonrpsee-types 0.8.0", + "jsonrpsee-ws-client 0.8.0", ] [[package]] name = "jsonrpsee-client-transport" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bc39096d2bd470ecbd5ed96c8464e2b2c2ef7ec6f8cb9611604255608624773" +checksum = "3303cdf246e6ab76e2866fb3d9acb6c76a068b1b28bd923a1b7a8122257ad7b5" dependencies = [ "futures 0.3.19", "http", "jsonrpsee-core", - "jsonrpsee-types 0.7.0", + "jsonrpsee-types 0.8.0", "pin-project 1.0.10", "rustls-native-certs 0.6.1", "soketto", @@ -3103,9 +3102,9 @@ dependencies = [ [[package]] name = "jsonrpsee-core" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b863e5e86a11bfaf46bb3ab5aba184671bd62058e8e3ab741c3395904c7afbf3" +checksum = "f220b5a238dc7992b90f1144fbf6eaa585872c9376afe6fe6863ffead6191bf3" dependencies = [ "anyhow", "arrayvec 0.7.2", @@ -3114,7 +3113,7 @@ dependencies = [ "futures-channel", "futures-util", "hyper", - "jsonrpsee-types 0.7.0", + "jsonrpsee-types 0.8.0", "rustc-hash", "serde", "serde_json", @@ -3158,9 +3157,9 @@ dependencies = [ [[package]] name = "jsonrpsee-types" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e169725e476234f3f96079fb9d8a6d00226db602d3fa056f044994239a490d78" +checksum = "c1b3f601bbbe45cd63f5407b6f7d7950e08a7d4f82aa699ff41a4a5e9e54df58" dependencies = [ "anyhow", "beef", @@ -3207,13 +3206,13 @@ dependencies = [ [[package]] name = "jsonrpsee-ws-client" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c97f67449d58b8d90ad57986d12dacab8fd594759ff64eb5e6b6e84e470db977" +checksum = "aff425cee7c779e33920913bc695447416078ee6d119f443f3060feffa4e86b5" dependencies = [ "jsonrpsee-client-transport", "jsonrpsee-core", - "jsonrpsee-types 0.7.0", + "jsonrpsee-types 0.8.0", ] [[package]] @@ -10381,7 +10380,7 @@ dependencies = [ "frame-election-provider-support", "frame-support", "frame-system", - "jsonrpsee 0.7.0", + "jsonrpsee 0.8.0", "kusama-runtime", "log", "pallet-balances", diff --git a/utils/staking-miner/Cargo.toml b/utils/staking-miner/Cargo.toml index a8686f882963..c4124024fe6b 100644 --- a/utils/staking-miner/Cargo.toml +++ b/utils/staking-miner/Cargo.toml @@ -10,7 +10,7 @@ tokio = { version = "1.15", features = ["macros", "rt-multi-thread", "sync"] } log = "0.4.11" env_logger = "0.9.0" structopt = "0.3.25" -jsonrpsee = { version = "0.7", default-features = false, features = ["ws-client", "client-ws-transport"] } +jsonrpsee = { version = "0.8", default-features = false, features = ["ws-client"] } serde_json = "1.0" serde = "1.0.132" paste = "1.0.6" diff --git a/utils/staking-miner/src/main.rs b/utils/staking-miner/src/main.rs index ee9f9363eebc..f911e4e90412 100644 --- a/utils/staking-miner/src/main.rs +++ b/utils/staking-miner/src/main.rs @@ -40,7 +40,7 @@ pub(crate) use signer::get_account_info; use frame_election_provider_support::NposSolver; use frame_support::traits::Get; -use jsonrpsee::{core::client::Client as WsClient, ws_client::WsClientBuilder}; +use jsonrpsee::ws_client::{WsClient, WsClientBuilder}; use remote_externalities::{Builder, Mode, OnlineConfig}; use sp_npos_elections::ExtendedBalance; use sp_runtime::{traits::Block as BlockT, DeserializeOwned}; diff --git a/utils/staking-miner/src/monitor.rs b/utils/staking-miner/src/monitor.rs index d52ac749518b..b7d9c9df8948 100644 --- a/utils/staking-miner/src/monitor.rs +++ b/utils/staking-miner/src/monitor.rs @@ -20,10 +20,11 @@ use crate::{prelude::*, rpc_helpers::*, signer::Signer, Error, MonitorConfig, Sh use codec::Encode; use jsonrpsee::{ core::{ - client::{Client as WsClient, Subscription, SubscriptionClientT}, + client::{Subscription, SubscriptionClientT}, Error as RpcError, }, rpc_params, + ws_client::WsClient, }; use sc_transaction_pool_api::TransactionStatus; use sp_core::storage::StorageKey; @@ -78,64 +79,47 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { use $crate::[<$runtime _runtime_exports>]::*; type StakingMinerError = Error<$crate::[<$runtime _runtime_exports>]::Runtime>; - let client = Arc::new(client); let (sub, unsub) = if config.listen == "head" { ("chain_subscribeNewHeads", "chain_unsubscribeNewHeads") } else { ("chain_subscribeFinalizedHeads", "chain_unsubscribeFinalizedHeads") }; - /// Create a new header subscription - /// - /// Fails if the connection is closed or fails after 10 attempts. - async fn create_header_subscription( - client: &WsClient, - sub: &str, - unsub: &str - ) -> Result, StakingMinerError> { - let mut last_err = None; - for _ in 0..10 { - match client.subscribe(&sub, None, &unsub).await { - Ok(sub) => return Ok(sub), - Err(RpcError::RestartNeeded(e)) => { - log::error!("[rpc] connection closed: {:?}", e); - return Err(RpcError::RestartNeeded(e).into()); - } - Err(e) => { - log::warn!("[rpc] subscription: `{}` failed {:?}; retrying", sub, e); - last_err = Some(e.into()); - continue; - } - }; - }; - - Err(last_err.expect("looped 10 times must be Some; qed")) - } + let mut subscription: Subscription
= client.subscribe(&sub, None, &unsub).await?; - let (tx, mut rx) = mpsc::unbounded_channel::>(); - let mut subscription = create_header_subscription(&*client, sub, unsub).await?; + let client = Arc::new(client); + let (tx, mut rx) = mpsc::unbounded_channel::(); loop { let at = tokio::select! { - Some(rp) = subscription.next() => { - match rp { - Ok(r) => r, - Err(RpcError::SubscriptionClosed(reason)) => { + maybe_rp = subscription.next() => { + match maybe_rp { + Some(Ok(r)) => r, + // Custom `jsonrpsee` message; should not occur. + Some(Err(RpcError::SubscriptionClosed(reason))) => { log::debug!("[rpc]: subscription closed by the server: {:?}, starting a new one", reason); continue; } - Err(e) => { + Some(Err(e)) => { log::error!("[rpc]: subscription failed to decode Header {:?}, this is bug please file an issue", e); return Err(e.into()); } + // The subscription was dropped, should only happen if: + // - the connection was closed. + // - the subscription could not need keep up with the server. + None => { + log::warn!("[rpc]: restarting header subscription"); + subscription = client.subscribe(&sub, None, &unsub).await?; + log::warn!(target: LOG_TARGET, "subscription to {} terminated. Retrying..", sub); + continue + } } }, - Some(err) = rx.recv() => return Err(err), - else => { - log::warn!("[rpc]: restarting header subscription"); - subscription = create_header_subscription(&*client, sub, unsub).await?; - log::warn!(target: LOG_TARGET, "subscription to {} terminated. Retrying..", sub); - continue + maybe_err = rx.recv() => { + match maybe_err { + Some(err) => return Err(err), + None => unreachable!("at least one sender kept in the main loop should always return Some; qed"), + } } }; @@ -143,7 +127,9 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { // Spawn task and non-recoverable errors are sent back to the main task // such as if the connection has been closed. - tokio::spawn(send_and_watch_extrinsic(client.clone(), tx.clone(), at, signer.clone(), shared.clone(), config.clone())); + tokio::spawn( + send_and_watch_extrinsic(client.clone(), tx.clone(), at, signer.clone(), shared.clone(), config.clone()) + ); } /// Construct extrinsic at given block and watch it. @@ -234,7 +220,6 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { ).await { Ok(sub) => sub, Err(RpcError::RestartNeeded(e)) => { - log::error!("{:?}", e); let _ = tx.send(RpcError::RestartNeeded(e).into()); return }, @@ -258,6 +243,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { while let Some(rp) = tx_subscription.next().await { let status_update = match rp { Ok(r) => r, + // Custom `jsonrpsee` message; should not occur. Err(RpcError::SubscriptionClosed(reason)) => { log::warn!( "[rpc]: subscription closed by the server: {:?}; continuing...", @@ -282,14 +268,25 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { let key = StorageKey( frame_support::storage::storage_prefix(b"System", b"Events").to_vec(), ); + let key2 = key.clone(); - // TODO(niklasad1): what to do if this fails; the task will die here now - let events = get_storage::< + let events = match get_storage::< Vec::Hash>>, >(&*client, rpc_params! { key, hash }) - .await - .unwrap() - .unwrap_or_default(); + .await { + Ok(rp) => rp.unwrap_or_default(), + Err(RpcHelperError::JsonRpsee(RpcError::RestartNeeded(e))) => { + let _ = tx.send(RpcError::RestartNeeded(e).into()); + return; + } + // Decoding or other RPC error => just terminate the task. + Err(e) => { + log::warn!(target: LOG_TARGET, "get_storage [key: {:?}, hash: {:?}] failed: {:?}", + key2, hash, e + ); + return; + } + }; log::info!(target: LOG_TARGET, "events at inclusion {:?}", events); }, From 5fddbbec40702c122b079526915cabac8facdbc1 Mon Sep 17 00:00:00 2001 From: Niklas Date: Mon, 24 Jan 2022 11:49:57 +0100 Subject: [PATCH 07/10] revert polkadot runtime changes --- runtime/polkadot/constants/src/lib.rs | 2 +- runtime/polkadot/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/polkadot/constants/src/lib.rs b/runtime/polkadot/constants/src/lib.rs index 2e48a11a1115..37c26c62074f 100644 --- a/runtime/polkadot/constants/src/lib.rs +++ b/runtime/polkadot/constants/src/lib.rs @@ -40,7 +40,7 @@ pub mod time { use primitives::v0::{BlockNumber, Moment}; pub const MILLISECS_PER_BLOCK: Moment = 6000; pub const SLOT_DURATION: Moment = MILLISECS_PER_BLOCK; - pub const EPOCH_DURATION_IN_SLOTS: BlockNumber = 1 * MINUTES; + pub const EPOCH_DURATION_IN_SLOTS: BlockNumber = 4 * HOURS; // These time units are defined in number of blocks. pub const MINUTES: BlockNumber = 60_000 / (MILLISECS_PER_BLOCK as BlockNumber); diff --git a/runtime/polkadot/src/lib.rs b/runtime/polkadot/src/lib.rs index bfcffb6ad2ba..13dd2375e0f9 100644 --- a/runtime/polkadot/src/lib.rs +++ b/runtime/polkadot/src/lib.rs @@ -534,7 +534,7 @@ pallet_staking_reward_curve::build! { parameter_types! { // Six sessions in an era (24 hours). - pub const SessionsPerEra: SessionIndex = 2; + pub const SessionsPerEra: SessionIndex = 6; // 28 eras for unbonding (28 days). pub const BondingDuration: sp_staking::EraIndex = 28; pub const SlashDeferDuration: sp_staking::EraIndex = 27; From 8ea11a3b1c56289d78da790c844e9cfd3cd373d0 Mon Sep 17 00:00:00 2001 From: Niklas Date: Mon, 24 Jan 2022 22:02:02 +0100 Subject: [PATCH 08/10] fix grumbles --- utils/staking-miner/src/monitor.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/utils/staking-miner/src/monitor.rs b/utils/staking-miner/src/monitor.rs index b7d9c9df8948..cb1f42e8c3e9 100644 --- a/utils/staking-miner/src/monitor.rs +++ b/utils/staking-miner/src/monitor.rs @@ -97,20 +97,20 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { Some(Ok(r)) => r, // Custom `jsonrpsee` message; should not occur. Some(Err(RpcError::SubscriptionClosed(reason))) => { - log::debug!("[rpc]: subscription closed by the server: {:?}, starting a new one", reason); + log::warn!(target: LOG_TARGET, "subscription to {} terminated: {:?}. Retrying..", reason, sub); + subscription = client.subscribe(&sub, None, &unsub).await?; continue; } Some(Err(e)) => { - log::error!("[rpc]: subscription failed to decode Header {:?}, this is bug please file an issue", e); + log::error!(target: LOG_TARGET, "subscription failed to decode Header {:?}, this is bug please file an issue", e); return Err(e.into()); } // The subscription was dropped, should only happen if: // - the connection was closed. // - the subscription could not need keep up with the server. None => { - log::warn!("[rpc]: restarting header subscription"); - subscription = client.subscribe(&sub, None, &unsub).await?; log::warn!(target: LOG_TARGET, "subscription to {} terminated. Retrying..", sub); + subscription = client.subscribe(&sub, None, &unsub).await?; continue } } From f4a3b7453fbfa590650fd056fe9f75f8f46c7c68 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Mon, 24 Jan 2022 22:26:57 +0100 Subject: [PATCH 09/10] Update utils/staking-miner/src/monitor.rs --- utils/staking-miner/src/monitor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/staking-miner/src/monitor.rs b/utils/staking-miner/src/monitor.rs index cb1f42e8c3e9..441d2e3235be 100644 --- a/utils/staking-miner/src/monitor.rs +++ b/utils/staking-miner/src/monitor.rs @@ -97,7 +97,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { Some(Ok(r)) => r, // Custom `jsonrpsee` message; should not occur. Some(Err(RpcError::SubscriptionClosed(reason))) => { - log::warn!(target: LOG_TARGET, "subscription to {} terminated: {:?}. Retrying..", reason, sub); + log::warn!(target: LOG_TARGET, "subscription to {} terminated: {:?}. Retrying..", sub, reason); subscription = client.subscribe(&sub, None, &unsub).await?; continue; } From 8ee8595bb19ca4200d477d164513e36225454672 Mon Sep 17 00:00:00 2001 From: Niklas Date: Sat, 5 Feb 2022 13:56:05 +0100 Subject: [PATCH 10/10] grumbles: fix logs + nits --- utils/staking-miner/src/monitor.rs | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/utils/staking-miner/src/monitor.rs b/utils/staking-miner/src/monitor.rs index 441d2e3235be..8d71b242a411 100644 --- a/utils/staking-miner/src/monitor.rs +++ b/utils/staking-miner/src/monitor.rs @@ -95,7 +95,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { maybe_rp = subscription.next() => { match maybe_rp { Some(Ok(r)) => r, - // Custom `jsonrpsee` message; should not occur. + // Custom `jsonrpsee` message sent by the server if the subscription was closed on the server side. Some(Err(RpcError::SubscriptionClosed(reason))) => { log::warn!(target: LOG_TARGET, "subscription to {} terminated: {:?}. Retrying..", sub, reason); subscription = client.subscribe(&sub, None, &unsub).await?; @@ -107,7 +107,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { } // The subscription was dropped, should only happen if: // - the connection was closed. - // - the subscription could not need keep up with the server. + // - the subscription could not keep up with the server. None => { log::warn!(target: LOG_TARGET, "subscription to {} terminated. Retrying..", sub); subscription = client.subscribe(&sub, None, &unsub).await?; @@ -123,8 +123,6 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { } }; - log::info!(target: LOG_TARGET, "subscribing to {:?} / {:?} at: {}", sub, unsub, at.number()); - // Spawn task and non-recoverable errors are sent back to the main task // such as if the connection has been closed. tokio::spawn( @@ -233,8 +231,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { // computation outside of this callback. log::warn!( target: LOG_TARGET, - "failing to submit a transaction {:?}. continuing...", - why + "failing to submit a transaction {:?}. ignore block: {}", + why, at.number ); return; }, @@ -243,16 +241,17 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { while let Some(rp) = tx_subscription.next().await { let status_update = match rp { Ok(r) => r, - // Custom `jsonrpsee` message; should not occur. + // Custom `jsonrpsee` message sent by the server if the subscription was closed on the server side. Err(RpcError::SubscriptionClosed(reason)) => { log::warn!( - "[rpc]: subscription closed by the server: {:?}; continuing...", - reason + target: LOG_TARGET, + "tx subscription closed by the server: {:?}; skip block: {}", + reason, at.number ); - continue + return; }, Err(e) => { - log::error!("[rpc]: subscription failed to decode TransactionStatus {:?}, this is a bug please file an issue", e); + log::error!(target: LOG_TARGET, "subscription failed to decode TransactionStatus {:?}, this is a bug please file an issue", e); let _ = tx.send(e.into()); return; }, @@ -281,8 +280,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { } // Decoding or other RPC error => just terminate the task. Err(e) => { - log::warn!(target: LOG_TARGET, "get_storage [key: {:?}, hash: {:?}] failed: {:?}", - key2, hash, e + log::warn!(target: LOG_TARGET, "get_storage [key: {:?}, hash: {:?}] failed: {:?}; skip block: {}", + key2, hash, e, at.number ); return; }