Skip to content
2 changes: 2 additions & 0 deletions transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ mod listener;
mod options;
mod pool;
mod ready;
mod replace;
mod status;
mod transactions;
mod verifier;
Expand All @@ -96,6 +97,7 @@ pub use self::listener::{Listener, NoopListener};
pub use self::options::Options;
pub use self::pool::{Pool, PendingIterator, UnorderedIterator, Transaction};
pub use self::ready::{Ready, Readiness};
pub use self::replace::{ShouldReplace, ReplaceTransaction};
pub use self::scoring::Scoring;
pub use self::status::{LightStatus, Status};
pub use self::verifier::Verifier;
Expand Down
41 changes: 27 additions & 14 deletions transaction-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use error;
use listener::{Listener, NoopListener};
use options::Options;
use ready::{Ready, Readiness};
use replace::{ShouldReplace, ReplaceTransaction};
use scoring::{self, Scoring, ScoreWithRef};
use status::{LightStatus, Status};
use transactions::{AddResult, Transactions};
Expand Down Expand Up @@ -130,10 +131,12 @@ impl<T, S, L> Pool<T, S, L> where
/// NOTE: The transaction may push out some other transactions from the pool
/// either because of limits (see `Options`) or because `Scoring` decides that the transaction
/// replaces an existing transaction from that sender.
/// If any limit is reached the transaction with the lowest `Score` is evicted to make room.
///
/// If any limit is reached the transaction with the lowest `Score` will be compared with the
/// new transaction via the supplied `ShouldReplace` implementation and may be evicted.
///
/// The `Listener` will be informed on any drops or rejections.
pub fn import(&mut self, transaction: T) -> error::Result<Arc<T>, T::Hash> {
pub fn import(&mut self, transaction: T, replace: &mut ShouldReplace<T>) -> error::Result<Arc<T>, T::Hash> {
let mem_usage = transaction.mem_usage();

if self.by_hash.contains_key(transaction.hash()) {
Expand All @@ -149,8 +152,8 @@ impl<T, S, L> Pool<T, S, L> where
// TODO [ToDr] Most likely move this after the transaction is inserted.
// Avoid using should_replace, but rather use scoring for that.
{
let remove_worst = |s: &mut Self, transaction| {
match s.remove_worst(transaction) {
let mut remove_worst = |s: &mut Self, transaction| {
match s.remove_worst(transaction, replace) {
Err(err) => {
s.listener.rejected(transaction, &err);
Err(err)
Expand Down Expand Up @@ -283,22 +286,32 @@ impl<T, S, L> Pool<T, S, L> where
///
/// Returns `None` in case we couldn't decide if the transaction should replace the worst transaction or not.
/// In such case we will accept the transaction even though it is going to exceed the limit.
fn remove_worst(&mut self, transaction: &Transaction<T>) -> error::Result<Option<Transaction<T>>, T::Hash> {
fn remove_worst(&mut self, transaction: &Transaction<T>, replace: &mut ShouldReplace<T>) -> error::Result<Option<Transaction<T>>, T::Hash> {
let to_remove = match self.worst_transactions.iter().next_back() {
// No elements to remove? and the pool is still full?
None => {
warn!("The pool is full but there are no transactions to remove.");
return Err(error::Error::TooCheapToEnter(transaction.hash().clone(), "unknown".into()))
},
Some(old) => match self.scoring.should_replace(&old.transaction, transaction) {
// We can't decide which of them should be removed, so accept both.
scoring::Choice::InsertNew => None,
// New transaction is better than the worst one so we can replace it.
scoring::Choice::ReplaceOld => Some(old.clone()),
// otherwise fail
scoring::Choice::RejectNew => {
return Err(error::Error::TooCheapToEnter(transaction.hash().clone(), format!("{:#x}", old.score)))
},
Some(old) => {
let txs = &self.transactions;
let get_replace_tx = |tx: &Transaction<T>| {
let sender_txs = txs.get(transaction.sender()).map(|txs| txs.iter().as_slice());
ReplaceTransaction::new(tx.clone(), sender_txs)
};
let old_replace = get_replace_tx(&old.transaction);
let new_replace = get_replace_tx(transaction);
Comment thread
ascjones marked this conversation as resolved.
Outdated

match replace.should_replace(&old_replace, &new_replace) {
// We can't decide which of them should be removed, so accept both.
scoring::Choice::InsertNew => None,
// New transaction is better than the worst one so we can replace it.
scoring::Choice::ReplaceOld => Some(old.clone()),
// otherwise fail
scoring::Choice::RejectNew => {
return Err(error::Error::TooCheapToEnter(transaction.hash().clone(), format!("{:#x}", old.score)))
},
}
},
};

Expand Down
53 changes: 53 additions & 0 deletions transaction-pool/src/replace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2015-2018 Parity Technologies (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

//! When queue limits are reached, decide whether to replace an existing transaction from the pool

use pool::Transaction;
use scoring::Choice;

/// Encapsulates a transaction to be compared, along with pooled transactions from the same sender
pub struct ReplaceTransaction<'a, T> {
/// The transaction to be compared for replacement
pub transaction: Transaction<T>,
Comment thread
ascjones marked this conversation as resolved.
Outdated
/// Other transactions currently in the pool for the same sender
pub pooled_by_sender: Option<&'a [Transaction<T>]>,
}

impl<'a, T> ReplaceTransaction<'a, T> {
/// Creates a new `ReplaceTransaction`
pub fn new(transaction: Transaction<T>, pooled_by_sender: Option<&'a [Transaction<T>]>) -> Self {
ReplaceTransaction {
transaction,
pooled_by_sender,
}
}
}

impl<'a, T> ::std::ops::Deref for ReplaceTransaction<'a, T> {
type Target = Transaction<T>;
fn deref(&self) -> &Self::Target {
&self.transaction
}
}

/// Chooses whether a new transaction should replace an existing transaction if the pool is full.
pub trait ShouldReplace<T> {
/// Decides if `new` should push out `old` transaction from the pool.
///
/// NOTE returning `InsertNew` here can lead to some transactions being accepted above pool limits.
fn should_replace(&mut self, old: &ReplaceTransaction<T>, new: &ReplaceTransaction<T>) -> Choice;
}
7 changes: 0 additions & 7 deletions transaction-pool/src/scoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,12 @@ pub enum Change<T = ()> {
/// - Returned `Score`s should match ordering of `compare` method.
/// - `compare` will be called only within a context of transactions from the same sender.
/// - `choose` may be called even if `compare` returns `Ordering::Equal`
/// - `should_replace` is used to decide if new transaction should push out an old transaction already in the queue.
/// - `Score`s and `compare` should align with `Ready` implementation.
///
/// Example: Natural ordering of Ethereum transactions.
/// - `compare`: compares transaction `nonce` ()
/// - `choose`: compares transactions `gasPrice` (decides if old transaction should be replaced)
/// - `update_scores`: score defined as `gasPrice` if `n==0` and `max(scores[n-1], gasPrice)` if `n>0`
/// - `should_replace`: compares `gasPrice` (decides if transaction from a different sender is more valuable)
///
pub trait Scoring<T>: fmt::Debug {
/// A score of a transaction.
Expand All @@ -98,11 +96,6 @@ pub trait Scoring<T>: fmt::Debug {
/// (i.e. score at index `i` represents transaction at the same index)
fn update_scores(&self, txs: &[Transaction<T>], scores: &mut [Self::Score], change: Change<Self::Event>);

/// Decides if `new` should push out `old` transaction from the pool.
///
/// NOTE returning `InsertNew` here can lead to some transactions being accepted above pool limits.
fn should_replace(&self, old: &T, new: &T) -> Choice;

/// Decides if the transaction should ignore per-sender limit in the pool.
///
/// If you return `true` for given transaction it's going to be accepted even though
Expand Down
14 changes: 8 additions & 6 deletions transaction-pool/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::cmp;
use std::collections::HashMap;

use ethereum_types::{H160 as Sender, U256};
use {pool, scoring, Scoring, Ready, Readiness};
use {pool, scoring, Scoring, ShouldReplace, ReplaceTransaction, Ready, Readiness};
use super::Transaction;

#[derive(Debug, Default)]
Expand Down Expand Up @@ -68,7 +68,13 @@ impl Scoring<Transaction> for DummyScoring {
}
}

fn should_replace(&self, old: &Transaction, new: &Transaction) -> scoring::Choice {
fn should_ignore_sender_limit(&self, _new: &Transaction) -> bool {
self.always_insert
}
}

impl ShouldReplace<Transaction> for DummyScoring {
fn should_replace(&mut self, old: &ReplaceTransaction<Transaction>, new: &ReplaceTransaction<Transaction>) -> scoring::Choice {
if self.always_insert {
scoring::Choice::InsertNew
} else if new.gas_price > old.gas_price {
Expand All @@ -77,10 +83,6 @@ impl Scoring<Transaction> for DummyScoring {
scoring::Choice::RejectNew
}
}

fn should_ignore_sender_limit(&self, _new: &Transaction) -> bool {
self.always_insert
}
}

#[derive(Default)]
Expand Down
Loading