diff --git a/Cargo.lock b/Cargo.lock
index 6dff31936f8..a56d1b663fc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1152,8 +1152,8 @@ dependencies = [
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"macros 0.1.0",
"parity-bytes 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-runtime 0.1.0",
+ "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rlp 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/parity/light_helpers/mod.rs b/parity/light_helpers/mod.rs
index 9a9bbf2cd87..843dd419d4c 100644
--- a/parity/light_helpers/mod.rs
+++ b/parity/light_helpers/mod.rs
@@ -17,7 +17,5 @@
//! Utilities and helpers for the light client.
mod epoch_fetch;
-mod queue_cull;
pub use self::epoch_fetch::EpochFetch;
-pub use self::queue_cull::QueueCull;
diff --git a/parity/light_helpers/queue_cull.rs b/parity/light_helpers/queue_cull.rs
deleted file mode 100644
index 693d8f93cff..00000000000
--- a/parity/light_helpers/queue_cull.rs
+++ /dev/null
@@ -1,105 +0,0 @@
-// Copyright 2015-2019 Parity Technologies (UK) Ltd.
-// This file is part of Parity Ethereum.
-
-// Parity Ethereum is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Parity Ethereum is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Parity Ethereum. If not, see .
-
-//! Service for culling the light client's transaction queue.
-
-use std::sync::Arc;
-use std::time::Duration;
-
-use ethcore::client::ClientIoMessage;
-use sync::{LightSync, LightNetworkDispatcher};
-use io::{IoContext, IoHandler, TimerToken};
-
-use light::client::LightChainClient;
-use light::on_demand::{request, OnDemand, OnDemandRequester};
-use light::TransactionQueue;
-
-use futures::{future, Future};
-
-use parity_runtime::Executor;
-
-use parking_lot::RwLock;
-
-// Attepmt to cull once every 10 minutes.
-const TOKEN: TimerToken = 1;
-const TIMEOUT: Duration = Duration::from_secs(60 * 10);
-
-// But make each attempt last only 9 minutes
-const PURGE_TIMEOUT: Duration = Duration::from_secs(60 * 9);
-
-/// Periodically culls the transaction queue of mined transactions.
-pub struct QueueCull {
- /// A handle to the client, for getting the latest block header.
- pub client: Arc,
- /// A handle to the sync service.
- pub sync: Arc,
- /// The on-demand request service.
- pub on_demand: Arc,
- /// The transaction queue.
- pub txq: Arc>,
- /// Event loop executor.
- pub executor: Executor,
-}
-
-impl IoHandler for QueueCull {
- fn initialize(&self, io: &IoContext) {
- io.register_timer(TOKEN, TIMEOUT).expect("Error registering timer");
- }
-
- fn timeout(&self, _io: &IoContext, timer: TimerToken) {
- if timer != TOKEN { return }
-
- let senders = self.txq.read().queued_senders();
- if senders.is_empty() { return }
-
- let (sync, on_demand, txq) = (self.sync.clone(), self.on_demand.clone(), self.txq.clone());
- let best_header = self.client.best_block_header();
- let start_nonce = self.client.engine().account_start_nonce(best_header.number());
-
- info!(target: "cull", "Attempting to cull queued transactions from {} senders.", senders.len());
- self.executor.spawn_with_timeout(move || {
- let maybe_fetching = sync.with_context(move |ctx| {
- // fetch the nonce of each sender in the queue.
- let nonce_reqs = senders.iter()
- .map(|&address| request::Account { header: best_header.clone().into(), address: address })
- .collect::>();
-
- // when they come in, update each sender to the new nonce.
- on_demand.request(ctx, nonce_reqs)
- .expect("No back-references; therefore all back-references are valid; qed")
- .map(move |accs| {
- let txq = txq.write();
- let _ = accs.into_iter()
- .map(|maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce))
- .zip(senders)
- .fold(txq, |mut txq, (nonce, addr)| {
- txq.cull(addr, nonce);
- txq
- });
- })
- .map_err(|_| debug!(target: "cull", "OnDemand prematurely closed channel."))
- });
-
- match maybe_fetching {
- Some(fut) => future::Either::A(fut),
- None => {
- debug!(target: "cull", "Unable to acquire network context; qed");
- future::Either::B(future::ok(()))
- },
- }
- }, PURGE_TIMEOUT, || {})
- }
-}
diff --git a/parity/run.rs b/parity/run.rs
index 8160c930d52..b6f8c53a019 100644
--- a/parity/run.rs
+++ b/parity/run.rs
@@ -295,17 +295,6 @@ fn execute_light_impl(cmd: RunCmd, logger: Arc, on_client_rq
// spin up event loop
let runtime = Runtime::with_default_thread_count();
- // queue cull service.
- let queue_cull = Arc::new(::light_helpers::QueueCull {
- client: client.clone(),
- sync: light_sync.clone(),
- on_demand: on_demand.clone(),
- txq: txq.clone(),
- executor: runtime.executor(),
- });
-
- service.register_handler(queue_cull).map_err(|e| format!("Error attaching service: {:?}", e))?;
-
// start the network.
light_sync.start_network();
diff --git a/rpc/src/v1/helpers/light_fetch.rs b/rpc/src/v1/helpers/light_fetch.rs
index 540772a5d11..92b7f2ac0bc 100644
--- a/rpc/src/v1/helpers/light_fetch.rs
+++ b/rpc/src/v1/helpers/light_fetch.rs
@@ -16,8 +16,9 @@
//! Helpers for fetching blockchain data either from the light client or the network.
-use std::cmp;
use std::clone::Clone;
+use std::cmp;
+use std::collections::BTreeMap;
use std::sync::Arc;
use types::basic_account::BasicAccount;
@@ -48,7 +49,6 @@ use ethereum_types::{Address, U256};
use hash::H256;
use parking_lot::{Mutex, RwLock};
use fastmap::H256FastMap;
-use std::collections::BTreeMap;
use types::transaction::{Action, Transaction as EthTransaction, PendingTransaction, SignedTransaction, LocalizedTransaction};
use v1::helpers::{CallRequest as CallRequestHelper, errors, dispatch};
@@ -522,6 +522,46 @@ where
}))
}
+ /// Helper to cull the `light` transaction queue of mined transactions
+ pub fn light_cull(&self, txq: Arc>) -> impl Future - + Send {
+ let senders = txq.read().queued_senders();
+ if senders.is_empty() {
+ return Either::B(future::err(errors::internal("No pending local transactions", "")));
+ }
+
+ let sync = self.sync.clone();
+ let on_demand = self.on_demand.clone();
+ let best_header = self.client.best_block_header();
+ let start_nonce = self.client.engine().account_start_nonce(best_header.number());
+
+ let account_request = sync.with_context(move |ctx| {
+ // fetch the nonce of each sender in the queue.
+ let nonce_reqs = senders.iter()
+ .map(|&address| request::Account { header: best_header.clone().into(), address })
+ .collect::>();
+
+ // when they come in, update each sender to the new nonce.
+ on_demand.request(ctx, nonce_reqs)
+ .expect(NO_INVALID_BACK_REFS_PROOF)
+ .map(move |accs| {
+ let mut txq = txq.write();
+ accs.into_iter()
+ .map(|maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce))
+ .zip(senders)
+ .for_each(|(nonce, addr)| {
+ txq.cull(addr, nonce);
+ });
+ })
+ .map_err(errors::on_demand_error)
+ });
+
+ if let Some(fut) = account_request {
+ Either::A(fut)
+ } else {
+ Either::B(future::err(errors::network_disabled()))
+ }
+ }
+
fn send_requests(&self, reqs: Vec, parse_response: F) -> impl Future
- + Send where
F: FnOnce(Vec) -> T + Send + 'static,
T: Send + 'static,
diff --git a/rpc/src/v1/impls/light/eth.rs b/rpc/src/v1/impls/light/eth.rs
index 73e2b99c61d..6467bfbc78b 100644
--- a/rpc/src/v1/impls/light/eth.rs
+++ b/rpc/src/v1/impls/light/eth.rs
@@ -420,15 +420,22 @@ where
}
fn transaction_by_hash(&self, hash: H256) -> BoxFuture