Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions parity/light_helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,5 @@
//! Utilities and helpers for the light client.

mod epoch_fetch;
mod queue_cull;

pub use self::epoch_fetch::EpochFetch;
pub use self::queue_cull::QueueCull;
105 changes: 0 additions & 105 deletions parity/light_helpers/queue_cull.rs

This file was deleted.

11 changes: 0 additions & 11 deletions parity/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,17 +295,6 @@ fn execute_light_impl<Cr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq
// spin up event loop
let runtime = Runtime::with_default_thread_count();

// queue cull service.
let queue_cull = Arc::new(::light_helpers::QueueCull {
client: client.clone(),
sync: light_sync.clone(),
on_demand: on_demand.clone(),
txq: txq.clone(),
executor: runtime.executor(),
});

service.register_handler(queue_cull).map_err(|e| format!("Error attaching service: {:?}", e))?;

// start the network.
light_sync.start_network();

Expand Down
44 changes: 42 additions & 2 deletions rpc/src/v1/helpers/light_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

//! Helpers for fetching blockchain data either from the light client or the network.

use std::cmp;
use std::clone::Clone;
use std::cmp;
use std::collections::BTreeMap;
use std::sync::Arc;

use types::basic_account::BasicAccount;
Expand Down Expand Up @@ -48,7 +49,6 @@ use ethereum_types::{Address, U256};
use hash::H256;
use parking_lot::{Mutex, RwLock};
use fastmap::H256FastMap;
use std::collections::BTreeMap;
use types::transaction::{Action, Transaction as EthTransaction, PendingTransaction, SignedTransaction, LocalizedTransaction};

use v1::helpers::{CallRequest as CallRequestHelper, errors, dispatch};
Expand Down Expand Up @@ -522,6 +522,46 @@ where
}))
}

/// Helper to cull the `light` transaction queue of mined transactions
pub fn light_cull(&self, txq: Arc<RwLock<TransactionQueue>>) -> impl Future <Item = (), Error = Error> + Send {
Comment thread
niklasad1 marked this conversation as resolved.
let senders = txq.read().queued_senders();
if senders.is_empty() {
return Either::B(future::err(errors::internal("No pending local transactions", "")));
}

let sync = self.sync.clone();
let on_demand = self.on_demand.clone();
let best_header = self.client.best_block_header();
let start_nonce = self.client.engine().account_start_nonce(best_header.number());

let account_request = sync.with_context(move |ctx| {
// fetch the nonce of each sender in the queue.
let nonce_reqs = senders.iter()
.map(|&address| request::Account { header: best_header.clone().into(), address })
.collect::<Vec<_>>();

// when they come in, update each sender to the new nonce.
on_demand.request(ctx, nonce_reqs)
.expect(NO_INVALID_BACK_REFS_PROOF)
.map(move |accs| {
let mut txq = txq.write();
accs.into_iter()
.map(|maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce))
.zip(senders)
.for_each(|(nonce, addr)| {
txq.cull(addr, nonce);
});
})
.map_err(errors::on_demand_error)
});

if let Some(fut) = account_request {
Either::A(fut)
} else {
Either::B(future::err(errors::network_disabled()))
}
}

fn send_requests<T, F>(&self, reqs: Vec<OnDemandRequest>, parse_response: F) -> impl Future<Item = T, Error = Error> + Send where
F: FnOnce(Vec<OnDemandResponse>) -> T + Send + 'static,
T: Send + 'static,
Expand Down
15 changes: 11 additions & 4 deletions rpc/src/v1/impls/light/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,15 +420,22 @@ where
}

fn transaction_by_hash(&self, hash: H256) -> BoxFuture<Option<Transaction>> {
{
let tx_queue = self.transaction_queue.read();
if let Some(tx) = tx_queue.get(&hash) {
let in_txqueue = self.transaction_queue.read().get(&hash).is_some();

// The transaction is in the `local txqueue` then fetch the latest state from the network and attempt
// to cull the transaction queue.
if in_txqueue {
// Note, this will block (relies on HTTP timeout) to make sure `cull` will finish to avoid having to call
// `eth_getTransactionByHash` more than once to ensure the `txqueue` is up to `date` when it is called
if let Err(e) = self.fetcher().light_cull(self.transaction_queue.clone()).wait() {
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably the most controversial thing about this PR.

The light_cull could be spawned in an executor but then we would most likely not get result in this call (accessing an atomic variable is much faster than a network request) and we would have to call this at least one more time to get the result!

debug!(target: "cull", "failed because of: {:?}", e);
}
if let Some(tx) = self.transaction_queue.read().get(&hash) {
return Box::new(future::ok(Some(Transaction::from_pending(
tx.clone(),
))));
}
}

Box::new(self.fetcher().transaction_by_hash(hash).map(|x| x.map(|(tx, _)| tx)))
}

Expand Down