Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.
Merged
5 changes: 2 additions & 3 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ pub use verification::queue::QueueInfo as BlockQueueInfo;

use_contract!(registry, "Registry", "res/contracts/registrar.json");

const MAX_TX_QUEUE_SIZE: usize = 4096;
const MAX_ANCIENT_BLOCKS_QUEUE_SIZE: usize = 4096;
// Max number of blocks imported at once.
const MAX_ANCIENT_BLOCKS_TO_IMPORT: usize = 4;
Expand Down Expand Up @@ -760,13 +759,12 @@ impl Client {
tracedb: tracedb,
engine: engine,
pruning: config.pruning.clone(),
config: config,
db: RwLock::new(db.clone()),
state_db: RwLock::new(state_db),
report: RwLock::new(Default::default()),
io_channel: Mutex::new(message_channel),
notify: RwLock::new(Vec::new()),
queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE),
queue_transactions: IoChannelQueue::new(config.transaction_verification_queue_size),
queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE),
queued_ancient_blocks: Default::default(),
ancient_blocks_import_lock: Default::default(),
Expand All @@ -779,6 +777,7 @@ impl Client {
registrar_address,
exit_handler: Mutex::new(None),
importer,
config,
});

// prune old states.
Expand Down
41 changes: 29 additions & 12 deletions ethcore/src/client/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,6 @@ pub enum Mode {
Off,
}

impl Default for Mode {
fn default() -> Self {
Mode::Active
}
}

impl Display for Mode {
fn fmt(&self, f: &mut Formatter) -> Result<(), FmtError> {
match *self {
Expand All @@ -88,7 +82,7 @@ impl Display for Mode {
}

/// Client configuration. Includes configs for all sub-systems.
#[derive(Debug, PartialEq, Default, Clone)]
#[derive(Debug, PartialEq, Clone)]
pub struct ClientConfig {
/// Block queue configuration.
pub queue: QueueConfig,
Expand Down Expand Up @@ -126,8 +120,36 @@ pub struct ClientConfig {
pub history_mem: usize,
/// Check seal valididity on block import
pub check_seal: bool,
/// Maximal number of transactions queued for verification in a separate thread.
pub transaction_verification_queue_size: usize,
}

impl Default for ClientConfig {
fn default() -> Self {
let mb = 1024 * 1024;
ClientConfig {
queue: Default::default(),
blockchain: Default::default(),
tracing: Default::default(),
vm_type: Default::default(),
fat_db: false,
pruning: journaldb::Algorithm::OverlayRecent,
name: "default".into(),
db_cache_size: None,
db_compaction: Default::default(),
db_wal: true,
mode: Mode::Active,
spec_name: "".into(),
verifier_type: VerifierType::Canon,
state_cache_size: 1 * mb,
jump_table_size: 1 * mb,
history: 64,
history_mem: 32 * mb,
check_seal: true,
transaction_verification_queue_size: 8192,
}
}
}
#[cfg(test)]
mod test {
use super::{DatabaseCompactionProfile, Mode};
Expand All @@ -143,9 +165,4 @@ mod test {
assert_eq!(DatabaseCompactionProfile::SSD, "ssd".parse().unwrap());
assert_eq!(DatabaseCompactionProfile::HDD, "hdd".parse().unwrap());
}

#[test]
fn test_mode_default() {
assert_eq!(Mode::default(), Mode::Active);
}
}
6 changes: 0 additions & 6 deletions ethcore/src/verification/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ pub enum VerifierType {
Noop,
}

impl Default for VerifierType {
fn default() -> Self {
VerifierType::Canon
}
}

/// Create a new verifier based on type.
pub fn new<C: BlockInfo + CallContract>(v: VerifierType) -> Box<Verifier<C>> {
match v {
Expand Down
33 changes: 17 additions & 16 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,20 +662,29 @@ impl ChainSync {
None
}
).collect();
let mut peers: Vec<(PeerId, u8)> = confirmed_peers.iter().filter(|&&(peer_id, _)|
self.active_peers.contains(&peer_id)
).map(|v| *v).collect();

random::new().shuffle(&mut peers); //TODO: sort by rating
// prefer peers with higher protocol version
peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2));
trace!(
target: "sync",
"Syncing with peers: {} active, {} confirmed, {} total",
self.active_peers.len(), confirmed_peers.len(), self.peers.len()
);
for (peer_id, _) in peers {
self.sync_peer(io, peer_id, false);

if self.state == SyncState::Waiting {
trace!(target: "sync", "Waiting for the block queue");
} else if self.state == SyncState::SnapshotWaiting {
trace!(target: "sync", "Waiting for the snapshot restoration");
} else {
let mut peers: Vec<(PeerId, u8)> = confirmed_peers.iter().filter(|&&(peer_id, _)|
self.active_peers.contains(&peer_id)
).map(|v| *v).collect();

random::new().shuffle(&mut peers); //TODO: sort by rating
// prefer peers with higher protocol version
peers.sort_by(|&(_, ref v1), &(_, ref v2)| v1.cmp(v2));

for (peer_id, _) in peers {
self.sync_peer(io, peer_id, false);
}
}

if
Expand Down Expand Up @@ -710,14 +719,6 @@ impl ChainSync {
trace!(target: "sync", "Skipping busy peer {}", peer_id);
return;
}
if self.state == SyncState::Waiting {
trace!(target: "sync", "Waiting for the block queue");
return;
}
if self.state == SyncState::SnapshotWaiting {
trace!(target: "sync", "Waiting for the snapshot restoration");
return;
}
(peer.latest_hash.clone(), peer.difficulty.clone(), peer.snapshot_number.as_ref().cloned().unwrap_or(0), peer.snapshot_hash.as_ref().cloned())
} else {
return;
Expand Down
82 changes: 73 additions & 9 deletions miner/src/pool/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use std::{cmp, fmt};
use std::sync::Arc;
use std::sync::atomic::{self, AtomicUsize};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};

use ethereum_types::{H256, U256, Address};
use parking_lot::RwLock;
Expand Down Expand Up @@ -138,6 +138,50 @@ impl CachedPending {
}
}

#[derive(Debug)]
struct RecentlyRejected {
inner: RwLock<HashMap<H256, transaction::Error>>,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious about why you're storing the error here as opposed to using a HashSet. Do you plan to use the error count stats to refine the logic further down?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error is stored here, so that we can return it later (we could return generic RecentlyRejected, but It's more descriptive to have the actual values), remember that Error can look like this: InsufficientGasPrice { got, minimal }, so for every transaction the values might be completely different.

I'm planning to refine the logic a bit as well, especially with regard to cache invalidation (like: "Clear all InsufficientBlanace errors efficiently"), and was considering approaches like Vec<(Error, HashSet)> or Vec<ErrorCode, HashMap<Hash, Error>>, but haven't decided yet, what path to follow. Will run some performance tests, cause maybe HashMap<Hash, Error> and iterating over all entries (or using retain) will be efficient enough.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the explanation! :)

limit: usize,
}

impl RecentlyRejected {
fn new(limit: usize) -> Self {
RecentlyRejected {
limit,
inner: RwLock::new(HashMap::with_capacity(MIN_REJECTED_CACHE_SIZE)),
}
}

fn clear(&self) {
self.inner.write().clear();
}

fn get(&self, hash: &H256) -> Option<transaction::Error> {
self.inner.read().get(hash).cloned()
}

fn insert(&self, hash: H256, err: &transaction::Error) {
if self.inner.read().contains_key(&hash) {
return;
}

let mut inner = self.inner.write();
inner.insert(hash, err.clone());

// clean up
if inner.len() > self.limit {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we never go above the limit the cache entries are never expired, is that ok?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cache is completely cleared on Pool::cull, which in turn should be run on every block.

// randomly remove half of the entries
let to_remove: Vec<_> = inner.keys().take(self.limit / 2).cloned().collect();
for key in to_remove {
inner.remove(&key);
}
}
}
}

/// Minimal size of rejection cache, by default it's equal to queue size.
const MIN_REJECTED_CACHE_SIZE: usize = 2048;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use this to instantiate the hashmap with_capacity()? Not sure how hot this cache is expected to be.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, the cache is used quite extensively, especially when you are running with a small transaction pool.


/// Ethereum Transaction Queue
///
/// Responsible for:
Expand All @@ -150,6 +194,7 @@ pub struct TransactionQueue {
pool: RwLock<Pool>,
options: RwLock<verifier::Options>,
cached_pending: RwLock<CachedPending>,
recently_rejected: RecentlyRejected,
}

impl TransactionQueue {
Expand All @@ -159,11 +204,13 @@ impl TransactionQueue {
verification_options: verifier::Options,
strategy: PrioritizationStrategy,
) -> Self {
let max_count = limits.max_count;
TransactionQueue {
insertion_id: Default::default(),
pool: RwLock::new(txpool::Pool::new(Default::default(), scoring::NonceAndGasPrice(strategy), limits)),
options: RwLock::new(verification_options),
cached_pending: RwLock::new(CachedPending::none()),
recently_rejected: RecentlyRejected::new(cmp::max(MIN_REJECTED_CACHE_SIZE, max_count / 4)),
}
}

Expand Down Expand Up @@ -195,26 +242,42 @@ impl TransactionQueue {
None
}
};

let verifier = verifier::Verifier::new(
client,
options,
self.insertion_id.clone(),
transaction_to_replace,
);

let results = transactions
.into_iter()
.map(|transaction| {
if self.pool.read().find(&transaction.hash()).is_some() {
bail!(transaction::Error::AlreadyImported)
let hash = transaction.hash();

if self.pool.read().find(&hash).is_some() {
return Err(transaction::Error::AlreadyImported);
}

verifier.verify_transaction(transaction)
if let Some(err) = self.recently_rejected.get(&hash) {
trace!(target: "txqueue", "[{:?}] Rejecting recently rejected: {:?}", hash, err);
return Err(err);
}

let imported = verifier
.verify_transaction(transaction)
.and_then(|verified| {
self.pool.write().import(verified).map_err(convert_error)
});

match imported {
Ok(_) => Ok(()),
Err(err) => {
self.recently_rejected.insert(hash, &err);
Err(err)
},
}
})
.map(|result| result.and_then(|verified| {
self.pool.write().import(verified)
.map(|_imported| ())
.map_err(convert_error)
}))
.collect::<Vec<_>>();

// Notify about imported transactions.
Expand Down Expand Up @@ -342,6 +405,7 @@ impl TransactionQueue {

let state_readiness = ready::State::new(client, stale_id, nonce_cap);

self.recently_rejected.clear();
let removed = self.pool.write().cull(None, state_readiness);
debug!(target: "txqueue", "Removed {} stalled transactions. {}", removed, self.status());
}
Expand Down
41 changes: 41 additions & 0 deletions miner/src/pool/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,47 @@ fn should_avoid_verifying_transaction_already_in_pool() {
}

#[test]
fn should_avoid_reverifying_recently_rejected_transactions() {
// given
let txq = TransactionQueue::new(
txpool::Options {
max_count: 1,
max_per_sender: 2,
max_mem_usage: 50
},
verifier::Options {
minimal_gas_price: 1.into(),
block_gas_limit: 1_000_000.into(),
tx_gas_limit: 1_000_000.into(),
},
PrioritizationStrategy::GasPriceOnly,
);

let client = TestClient::new();
let tx1 = Tx::gas_price(10_000).signed().unverified();

let res = txq.import(client.clone(), vec![tx1.clone()]);
assert_eq!(res, vec![Err(transaction::Error::InsufficientBalance {
balance: 0xf67c.into(),
cost: 0xc8458e4.into(),
})]);
assert_eq!(txq.status().status.transaction_count, 0);
assert!(client.was_verification_triggered());

// when
let client = TestClient::new();
let res = txq.import(client.clone(), vec![tx1]);
assert_eq!(res, vec![Err(transaction::Error::InsufficientBalance {
balance: 0xf67c.into(),
cost: 0xc8458e4.into(),
})]);
assert!(!client.was_verification_triggered());

// then
assert_eq!(txq.status().status.transaction_count, 0);
}


fn should_reject_early_in_case_gas_price_is_less_than_min_effective() {
// given
let txq = TransactionQueue::new(
Expand Down
2 changes: 1 addition & 1 deletion parity/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ fn execute_import(cmd: ImportBlockchain) -> Result<(), String> {
algorithm,
cmd.pruning_history,
cmd.pruning_memory,
cmd.check_seal
cmd.check_seal,
);

client_config.queue.verifier_settings = cmd.verifier_settings;
Expand Down
2 changes: 2 additions & 0 deletions parity/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
// fetch service
let fetch = fetch::Client::new().map_err(|e| format!("Error starting fetch client: {:?}", e))?;

let txpool_size = cmd.miner_options.pool_limits.max_count;
// create miner
let miner = Arc::new(Miner::new(
cmd.miner_options,
Expand Down Expand Up @@ -574,6 +575,7 @@ fn execute_impl<Cr, Rr>(cmd: RunCmd, logger: Arc<RotatingLogger>, on_client_rq:
);

client_config.queue.verifier_settings = cmd.verifier_settings;
client_config.transaction_verification_queue_size = ::std::cmp::max(2048, txpool_size / 4);

// set up bootnodes
let mut net_conf = cmd.net_conf;
Expand Down
2 changes: 1 addition & 1 deletion parity/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl SnapshotCommand {
algorithm,
self.pruning_history,
self.pruning_memory,
true
true,
);

let restoration_db_handler = db::restoration_db_handler(&client_path, &client_config);
Expand Down
2 changes: 1 addition & 1 deletion parity/user_defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Default for UserDefaults {
fn default() -> Self {
UserDefaults {
is_first_launch: true,
pruning: Algorithm::default(),
pruning: Algorithm::OverlayRecent,
tracing: false,
fat_db: false,
mode: Mode::Active,
Expand Down
Loading