From 99fa56d76f4de8ace6d507b165205e7a3feb0923 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sat, 30 Mar 2019 10:46:58 +0100 Subject: [PATCH 1/4] fix(light cull): poll light cull instead of timer --- parity/light_helpers/mod.rs | 2 - parity/light_helpers/queue_cull.rs | 105 ----------------------------- parity/run.rs | 11 --- rpc/src/v1/helpers/errors.rs | 9 +++ rpc/src/v1/helpers/light_fetch.rs | 43 +++++++++++- rpc/src/v1/impls/light/eth.rs | 17 +++-- 6 files changed, 63 insertions(+), 124 deletions(-) delete mode 100644 parity/light_helpers/queue_cull.rs diff --git a/parity/light_helpers/mod.rs b/parity/light_helpers/mod.rs index 9a9bbf2cd87..843dd419d4c 100644 --- a/parity/light_helpers/mod.rs +++ b/parity/light_helpers/mod.rs @@ -17,7 +17,5 @@ //! Utilities and helpers for the light client. mod epoch_fetch; -mod queue_cull; pub use self::epoch_fetch::EpochFetch; -pub use self::queue_cull::QueueCull; diff --git a/parity/light_helpers/queue_cull.rs b/parity/light_helpers/queue_cull.rs deleted file mode 100644 index 693d8f93cff..00000000000 --- a/parity/light_helpers/queue_cull.rs +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright 2015-2019 Parity Technologies (UK) Ltd. -// This file is part of Parity Ethereum. - -// Parity Ethereum is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Parity Ethereum is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Parity Ethereum. If not, see . - -//! Service for culling the light client's transaction queue. - -use std::sync::Arc; -use std::time::Duration; - -use ethcore::client::ClientIoMessage; -use sync::{LightSync, LightNetworkDispatcher}; -use io::{IoContext, IoHandler, TimerToken}; - -use light::client::LightChainClient; -use light::on_demand::{request, OnDemand, OnDemandRequester}; -use light::TransactionQueue; - -use futures::{future, Future}; - -use parity_runtime::Executor; - -use parking_lot::RwLock; - -// Attepmt to cull once every 10 minutes. -const TOKEN: TimerToken = 1; -const TIMEOUT: Duration = Duration::from_secs(60 * 10); - -// But make each attempt last only 9 minutes -const PURGE_TIMEOUT: Duration = Duration::from_secs(60 * 9); - -/// Periodically culls the transaction queue of mined transactions. -pub struct QueueCull { - /// A handle to the client, for getting the latest block header. - pub client: Arc, - /// A handle to the sync service. - pub sync: Arc, - /// The on-demand request service. - pub on_demand: Arc, - /// The transaction queue. - pub txq: Arc>, - /// Event loop executor. - pub executor: Executor, -} - -impl IoHandler for QueueCull { - fn initialize(&self, io: &IoContext) { - io.register_timer(TOKEN, TIMEOUT).expect("Error registering timer"); - } - - fn timeout(&self, _io: &IoContext, timer: TimerToken) { - if timer != TOKEN { return } - - let senders = self.txq.read().queued_senders(); - if senders.is_empty() { return } - - let (sync, on_demand, txq) = (self.sync.clone(), self.on_demand.clone(), self.txq.clone()); - let best_header = self.client.best_block_header(); - let start_nonce = self.client.engine().account_start_nonce(best_header.number()); - - info!(target: "cull", "Attempting to cull queued transactions from {} senders.", senders.len()); - self.executor.spawn_with_timeout(move || { - let maybe_fetching = sync.with_context(move |ctx| { - // fetch the nonce of each sender in the queue. - let nonce_reqs = senders.iter() - .map(|&address| request::Account { header: best_header.clone().into(), address: address }) - .collect::>(); - - // when they come in, update each sender to the new nonce. - on_demand.request(ctx, nonce_reqs) - .expect("No back-references; therefore all back-references are valid; qed") - .map(move |accs| { - let txq = txq.write(); - let _ = accs.into_iter() - .map(|maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce)) - .zip(senders) - .fold(txq, |mut txq, (nonce, addr)| { - txq.cull(addr, nonce); - txq - }); - }) - .map_err(|_| debug!(target: "cull", "OnDemand prematurely closed channel.")) - }); - - match maybe_fetching { - Some(fut) => future::Either::A(fut), - None => { - debug!(target: "cull", "Unable to acquire network context; qed"); - future::Either::B(future::ok(())) - }, - } - }, PURGE_TIMEOUT, || {}) - } -} diff --git a/parity/run.rs b/parity/run.rs index 8160c930d52..b6f8c53a019 100644 --- a/parity/run.rs +++ b/parity/run.rs @@ -295,17 +295,6 @@ fn execute_light_impl(cmd: RunCmd, logger: Arc, on_client_rq // spin up event loop let runtime = Runtime::with_default_thread_count(); - // queue cull service. - let queue_cull = Arc::new(::light_helpers::QueueCull { - client: client.clone(), - sync: light_sync.clone(), - on_demand: on_demand.clone(), - txq: txq.clone(), - executor: runtime.executor(), - }); - - service.register_handler(queue_cull).map_err(|e| format!("Error attaching service: {:?}", e))?; - // start the network. light_sync.start_network(); diff --git a/rpc/src/v1/helpers/errors.rs b/rpc/src/v1/helpers/errors.rs index c04374d6562..2a8285b93c2 100644 --- a/rpc/src/v1/helpers/errors.rs +++ b/rpc/src/v1/helpers/errors.rs @@ -57,6 +57,7 @@ mod codes { pub const FETCH_ERROR: i64 = -32060; pub const NO_LIGHT_PEERS: i64 = -32065; pub const NO_PEERS: i64 = -32066; + pub const NO_LIGHT_TRANSACTIONS: i64 = -32066; pub const DEPRECATED: i64 = -32070; pub const EXPERIMENTAL_RPC: i64 = -32071; pub const CANNOT_RESTART: i64 = -32080; @@ -504,6 +505,14 @@ pub fn no_light_peers() -> Error { } } +pub fn no_light_transactions() -> Error { + Error { + code: ErrorCode::ServerError(codes::NO_LIGHT_TRANSACTIONS), + message: "No pending transactions in the light transaction queue".into(), + data: None, + } +} + pub fn deprecated, T: Into>>(message: T) -> Error { Error { code: ErrorCode::ServerError(codes::DEPRECATED), diff --git a/rpc/src/v1/helpers/light_fetch.rs b/rpc/src/v1/helpers/light_fetch.rs index 540772a5d11..94216299569 100644 --- a/rpc/src/v1/helpers/light_fetch.rs +++ b/rpc/src/v1/helpers/light_fetch.rs @@ -16,8 +16,9 @@ //! Helpers for fetching blockchain data either from the light client or the network. -use std::cmp; use std::clone::Clone; +use std::cmp; +use std::collections::BTreeMap; use std::sync::Arc; use types::basic_account::BasicAccount; @@ -48,7 +49,6 @@ use ethereum_types::{Address, U256}; use hash::H256; use parking_lot::{Mutex, RwLock}; use fastmap::H256FastMap; -use std::collections::BTreeMap; use types::transaction::{Action, Transaction as EthTransaction, PendingTransaction, SignedTransaction, LocalizedTransaction}; use v1::helpers::{CallRequest as CallRequestHelper, errors, dispatch}; @@ -522,6 +522,45 @@ where })) } + pub fn light_cull(&self, txq: Arc>) -> impl Future + Send { + let senders = txq.read().queued_senders(); + if senders.is_empty() { + return Either::B(future::err(errors::no_light_transactions())); + } + + let sync = self.sync.clone(); + let on_demand = self.on_demand.clone(); + let best_header = self.client.best_block_header(); + let start_nonce = self.client.engine().account_start_nonce(best_header.number()); + + let account_request = sync.with_context(move |ctx| { + // fetch the nonce of each sender in the queue. + let nonce_reqs = senders.iter() + .map(|&address| request::Account { header: best_header.clone().into(), address }) + .collect::>(); + + // when they come in, update each sender to the new nonce. + on_demand.request(ctx, nonce_reqs) + .expect("No back-references; therefore all back-references are valid; qed") + .map(move |accs| { + let mut txq = txq.write(); + accs.into_iter() + .map(|maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce)) + .zip(senders) + .for_each(|(nonce, addr)| { + txq.cull(addr, nonce); + }); + }) + .map_err(errors::on_demand_error) + }); + + if let Some(fut) = account_request { + Either::A(fut) + } else { + Either::B(future::err(errors::network_disabled())) + } + } + fn send_requests(&self, reqs: Vec, parse_response: F) -> impl Future + Send where F: FnOnce(Vec) -> T + Send + 'static, T: Send + 'static, diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index 73e2b99c61d..5abb2d16593 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -420,15 +420,24 @@ where } fn transaction_by_hash(&self, hash: H256) -> BoxFuture> { - { - let tx_queue = self.transaction_queue.read(); - if let Some(tx) = tx_queue.get(&hash) { + let in_txqueue = self.transaction_queue.read().get(&hash).is_some(); + + // The transaction is in the `local txqueue` + // Check if the nonce on the accounts are still fresh and return result from + // the `local txqueue` if it is still fresh. + // Otherwise make a network request to fetch the transaction from the network + if in_txqueue { + // Note, this will block (relies on HTTP timeout) to make sure `cull` will finish to avoid having to call + // `eth_getTransactionByHash` more than once to ensure the `txqueue` is up to `date` when it is called + if let Err(e) = self.fetcher().light_cull(self.transaction_queue.clone()).wait() { + debug!(target: "cull", "failed because of: {:?}", e); + } + if let Some(tx) = self.transaction_queue.read().get(&hash) { return Box::new(future::ok(Some(Transaction::from_pending( tx.clone(), )))); } } - Box::new(self.fetcher().transaction_by_hash(hash).map(|x| x.map(|(tx, _)| tx))) } From 23d41cb8817201f847aafaeb5adf0a2dec67609d Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 3 Apr 2019 14:46:53 +0200 Subject: [PATCH 2/4] fix(grumbles): remove error + updated docs --- Cargo.lock | 2 +- rpc/src/v1/helpers/errors.rs | 9 --------- rpc/src/v1/helpers/light_fetch.rs | 3 ++- rpc/src/v1/impls/light/eth.rs | 8 ++++---- 4 files changed, 7 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6dff31936f8..a56d1b663fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1152,8 +1152,8 @@ dependencies = [ "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "macros 0.1.0", "parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "parity-runtime 0.1.0", + "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "rlp 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/rpc/src/v1/helpers/errors.rs b/rpc/src/v1/helpers/errors.rs index 2a8285b93c2..c04374d6562 100644 --- a/rpc/src/v1/helpers/errors.rs +++ b/rpc/src/v1/helpers/errors.rs @@ -57,7 +57,6 @@ mod codes { pub const FETCH_ERROR: i64 = -32060; pub const NO_LIGHT_PEERS: i64 = -32065; pub const NO_PEERS: i64 = -32066; - pub const NO_LIGHT_TRANSACTIONS: i64 = -32066; pub const DEPRECATED: i64 = -32070; pub const EXPERIMENTAL_RPC: i64 = -32071; pub const CANNOT_RESTART: i64 = -32080; @@ -505,14 +504,6 @@ pub fn no_light_peers() -> Error { } } -pub fn no_light_transactions() -> Error { - Error { - code: ErrorCode::ServerError(codes::NO_LIGHT_TRANSACTIONS), - message: "No pending transactions in the light transaction queue".into(), - data: None, - } -} - pub fn deprecated, T: Into>>(message: T) -> Error { Error { code: ErrorCode::ServerError(codes::DEPRECATED), diff --git a/rpc/src/v1/helpers/light_fetch.rs b/rpc/src/v1/helpers/light_fetch.rs index 94216299569..1aa9f4ffd8e 100644 --- a/rpc/src/v1/helpers/light_fetch.rs +++ b/rpc/src/v1/helpers/light_fetch.rs @@ -522,10 +522,11 @@ where })) } + /// Helper to cull the `light` transaction queue of mined transactions pub fn light_cull(&self, txq: Arc>) -> impl Future + Send { let senders = txq.read().queued_senders(); if senders.is_empty() { - return Either::B(future::err(errors::no_light_transactions())); + return Either::B(future::err(errors::internal("No pending local transactions", ""))); } let sync = self.sync.clone(); diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index 5abb2d16593..31a5b86ecdb 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -422,10 +422,10 @@ where fn transaction_by_hash(&self, hash: H256) -> BoxFuture> { let in_txqueue = self.transaction_queue.read().get(&hash).is_some(); - // The transaction is in the `local txqueue` - // Check if the nonce on the accounts are still fresh and return result from - // the `local txqueue` if it is still fresh. - // Otherwise make a network request to fetch the transaction from the network + // The transaction is in the `local txqueue` then fetch the latest state from the network and attempt + // to cull the transaction queue. + // + // If it fails return the local transaction if in_txqueue { // Note, this will block (relies on HTTP timeout) to make sure `cull` will finish to avoid having to call // `eth_getTransactionByHash` more than once to ensure the `txqueue` is up to `date` when it is called From bcb38a2ed8963e5070ffaaba902adbed03d3e7fe Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 3 Apr 2019 14:58:09 +0200 Subject: [PATCH 3/4] fix(on-demand request): `expect()` reason --- rpc/src/v1/helpers/light_fetch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/src/v1/helpers/light_fetch.rs b/rpc/src/v1/helpers/light_fetch.rs index 1aa9f4ffd8e..92b7f2ac0bc 100644 --- a/rpc/src/v1/helpers/light_fetch.rs +++ b/rpc/src/v1/helpers/light_fetch.rs @@ -542,7 +542,7 @@ where // when they come in, update each sender to the new nonce. on_demand.request(ctx, nonce_reqs) - .expect("No back-references; therefore all back-references are valid; qed") + .expect(NO_INVALID_BACK_REFS_PROOF) .map(move |accs| { let mut txq = txq.write(); accs.into_iter() From bef325a54e38dcedf6337b0cf4da42f5fc78ae66 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Wed, 3 Apr 2019 16:28:38 +0200 Subject: [PATCH 4/4] docs(remove misleading info) --- rpc/src/v1/impls/light/eth.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs index 31a5b86ecdb..6467bfbc78b 100644 --- a/rpc/src/v1/impls/light/eth.rs +++ b/rpc/src/v1/impls/light/eth.rs @@ -424,8 +424,6 @@ where // The transaction is in the `local txqueue` then fetch the latest state from the network and attempt // to cull the transaction queue. - // - // If it fails return the local transaction if in_txqueue { // Note, this will block (relies on HTTP timeout) to make sure `cull` will finish to avoid having to call // `eth_getTransactionByHash` more than once to ensure the `txqueue` is up to `date` when it is called