Skip to content

Commit

Permalink
bucket_transactions is now fee_to_weight aware (#2758)
Browse files Browse the repository at this point in the history
* bucket_transactions is now fee_to_weight aware
bucket_transactions now returns the underlying txs

* cleanup

* simplify pool bucket sorting, no need for depth sort key
  • Loading branch information
antiochp authored Apr 30, 2019
1 parent 3d0f9bd commit b2b96f3
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 73 deletions.
13 changes: 8 additions & 5 deletions core/src/core/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,10 +387,7 @@ pub enum Weighting {
AsTransaction,
/// Tx representing a tx with artificially limited max_weight.
/// This is used when selecting mineable txs from the pool.
AsLimitedTransaction {
/// The maximum (block) weight that we will allow.
max_weight: usize,
},
AsLimitedTransaction(usize),
/// Tx represents a block (max block weight).
AsBlock,
/// No max weight limit (skip the weight check).
Expand Down Expand Up @@ -628,7 +625,7 @@ impl TransactionBody {
//
let max_weight = match weighting {
Weighting::AsTransaction => global::max_block_weight().saturating_sub(coinbase_weight),
Weighting::AsLimitedTransaction { max_weight } => {
Weighting::AsLimitedTransaction(max_weight) => {
min(global::max_block_weight(), max_weight).saturating_sub(coinbase_weight)
}
Weighting::AsBlock => global::max_block_weight(),
Expand Down Expand Up @@ -964,6 +961,12 @@ impl Transaction {
Ok(())
}

/// Can be used to compare txs by their fee/weight ratio.
/// Don't use these values for anything else though due to precision multiplier.
pub fn fee_to_weight(&self) -> u64 {
self.fee() * 1_000 / self.tx_weight() as u64
}

/// Calculate transaction weight
pub fn tx_weight(&self) -> usize {
self.body.body_weight()
Expand Down
115 changes: 77 additions & 38 deletions pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use self::util::RwLock;
use crate::types::{BlockChain, PoolEntry, PoolError};
use grin_core as core;
use grin_util as util;
use std::cmp::Reverse;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

Expand Down Expand Up @@ -116,35 +117,27 @@ impl Pool {

/// Take pool transactions, filtering and ordering them in a way that's
/// appropriate to put in a mined block. Aggregates chains of dependent
/// transactions, orders by fee over weight and ensures to total weight
/// doesn't exceed block limits.
/// transactions, orders by fee over weight and ensures the total weight
/// does not exceed the provided max_weight (miner defined block weight).
pub fn prepare_mineable_transactions(
&self,
max_weight: usize,
) -> Result<Vec<Transaction>, PoolError> {
let header = self.blockchain.chain_head()?;
let mut tx_buckets = self.bucket_transactions(max_weight);

// At this point we know that all "buckets" are valid and that
// there are no dependencies between them.
// This allows us to arbitrarily sort them and filter them safely.
let weighting = Weighting::AsLimitedTransaction(max_weight);

// Sort them by fees over weight, multiplying by 1000 to keep some precision
// don't think we'll ever see a >max_u64/1000 fee transaction.
// We want to select the txs with highest fee per unit of weight first.
tx_buckets.sort_unstable_by_key(|tx| tx.fee() * 1000 / tx.tx_weight() as u64);
// Sort the txs in the pool via the "bucket" logic to -
// * maintain dependency ordering
// * maximize cut-through
// * maximize overall fees
let txs = self.bucket_transactions(weighting);

// Iteratively apply the txs to the current chain state,
// rejecting any that do not result in a valid state.
// Verify these txs produce an aggregated tx below max tx weight.
// Verify these txs produce an aggregated tx below max_weight.
// Return a vec of all the valid txs.
let txs = self.validate_raw_txs(
&tx_buckets,
None,
&header,
Weighting::AsLimitedTransaction { max_weight },
)?;
Ok(txs)
let header = self.blockchain.chain_head()?;
let valid_txs = self.validate_raw_txs(&txs, None, &header, weighting)?;
Ok(valid_txs)
}

pub fn all_transactions(&self) -> Vec<Transaction> {
Expand Down Expand Up @@ -310,11 +303,15 @@ impl Pool {
Ok(())
}

// Group dependent transactions in buckets (aggregated txs).
// Each bucket is independent from the others. Relies on the entries
// vector having parent transactions first (should always be the case).
fn bucket_transactions(&self, max_weight: usize) -> Vec<Transaction> {
let mut tx_buckets = vec![];
/// Buckets consist of a vec of txs and track the aggregate fee_to_weight.
/// We aggregate (cut-through) dependent transactions within a bucket *unless* adding a tx
/// would reduce the aggregate fee_to_weight, in which case we start a new bucket.
/// Note this new bucket will by definition have a lower fee_to_weight than the bucket
/// containing the tx it depends on.
/// Sorting the buckets by fee_to_weight will therefore preserve dependency ordering,
/// maximizing both cut-through and overall fees.
fn bucket_transactions(&self, weighting: Weighting) -> Vec<Transaction> {
let mut tx_buckets: Vec<Bucket> = Vec::new();
let mut output_commits = HashMap::new();
let mut rejected = HashSet::new();

Expand Down Expand Up @@ -358,25 +355,27 @@ impl Pool {
// This is the common case for non 0-conf txs in the txpool.
// We assume the tx is valid here as we validated it on the way into the txpool.
insert_pos = Some(tx_buckets.len());
tx_buckets.push(entry.tx.clone());
tx_buckets.push(Bucket::new(entry.tx.clone()));
}
Some(pos) => {
// We found a single parent tx, so aggregate in the bucket
// if the aggregate tx is a valid tx.
// Otherwise discard and let the next block pick this tx up.
let current = tx_buckets[pos].clone();
if let Ok(agg_tx) = transaction::aggregate(vec![current, entry.tx.clone()]) {
if agg_tx
.validate(
Weighting::AsLimitedTransaction { max_weight },
self.verifier_cache.clone(),
)
.is_ok()
{
tx_buckets[pos] = agg_tx;
let bucket = &tx_buckets[pos];

if let Ok(new_bucket) = bucket.aggregate_with_tx(
entry.tx.clone(),
weighting,
self.verifier_cache.clone(),
) {
if new_bucket.fee_to_weight >= bucket.fee_to_weight {
// Only aggregate if it would not reduce the fee_to_weight ratio.
tx_buckets[pos] = new_bucket;
} else {
// Aggregated tx is not valid so discard this new tx.
is_rejected = true;
// Otherwise put it in its own bucket at the end.
// Note: This bucket will have a lower fee_to_weight
// than the bucket it depends on.
tx_buckets.push(Bucket::new(entry.tx.clone()));
}
} else {
// Aggregation failed so discard this new tx.
Expand All @@ -397,7 +396,17 @@ impl Pool {
}
}
}

// Sort them by fee_to_weight (descending).
// Txs with no dependencies will be toward the start of the vec.
// Txs with a big chain of dependencies will be toward the end of the vec.
tx_buckets.sort_unstable_by_key(|x| Reverse(x.fee_to_weight));

tx_buckets
.into_iter()
.map(|x| x.raw_txs)
.flatten()
.collect()
}

pub fn find_matching_transactions(&self, kernels: &[TxKernel]) -> Vec<Transaction> {
Expand Down Expand Up @@ -441,3 +450,33 @@ impl Pool {
self.entries.is_empty()
}
}

struct Bucket {
raw_txs: Vec<Transaction>,
fee_to_weight: u64,
}

impl Bucket {
fn new(tx: Transaction) -> Bucket {
Bucket {
fee_to_weight: tx.fee_to_weight(),
raw_txs: vec![tx.clone()],
}
}

fn aggregate_with_tx(
&self,
new_tx: Transaction,
weighting: Weighting,
verifier_cache: Arc<RwLock<dyn VerifierCache>>,
) -> Result<Bucket, PoolError> {
let mut raw_txs = self.raw_txs.clone();
raw_txs.push(new_tx);
let agg_tx = transaction::aggregate(raw_txs.clone())?;
agg_tx.validate(weighting, verifier_cache)?;
Ok(Bucket {
fee_to_weight: agg_tx.fee_to_weight(),
raw_txs: raw_txs,
})
}
}
24 changes: 15 additions & 9 deletions pool/tests/block_building.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ fn test_transaction_pool_block_building() {

// Add the three root txs to the pool.
write_pool
.add_to_pool(test_source(), root_tx_1, false, &header)
.add_to_pool(test_source(), root_tx_1.clone(), false, &header)
.unwrap();
write_pool
.add_to_pool(test_source(), root_tx_2, false, &header)
.add_to_pool(test_source(), root_tx_2.clone(), false, &header)
.unwrap();
write_pool
.add_to_pool(test_source(), root_tx_3, false, &header)
.add_to_pool(test_source(), root_tx_3.clone(), false, &header)
.unwrap();

// Now add the two child txs to the pool.
Expand All @@ -104,15 +104,21 @@ fn test_transaction_pool_block_building() {
assert_eq!(write_pool.total_size(), 5);
}

let txs = {
let read_pool = pool.read();
read_pool.prepare_mineable_transactions().unwrap()
};
// children should have been aggregated into parents
assert_eq!(txs.len(), 3);
let txs = pool.read().prepare_mineable_transactions().unwrap();

let block = add_block(header, txs, &mut chain);

// Check the block contains what we expect.
assert_eq!(block.inputs().len(), 4);
assert_eq!(block.outputs().len(), 4);
assert_eq!(block.kernels().len(), 6);

assert!(block.kernels().contains(&root_tx_1.kernels()[0]));
assert!(block.kernels().contains(&root_tx_2.kernels()[0]));
assert!(block.kernels().contains(&root_tx_3.kernels()[0]));
assert!(block.kernels().contains(&child_tx_1.kernels()[0]));
assert!(block.kernels().contains(&child_tx_1.kernels()[0]));

// Now reconcile the transaction pool with the new block
// and check the resulting contents of the pool are what we expect.
{
Expand Down
48 changes: 27 additions & 21 deletions pool/tests/block_max_weight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,25 @@ fn test_block_building_max_weight() {
test_transaction(&keychain, vec![290], vec![280, 4]),
];

// Fees and weights of our original txs in insert order.
assert_eq!(
txs.iter().map(|x| x.fee()).collect::<Vec<_>>(),
[9, 8, 1, 7, 6]
);
assert_eq!(
txs.iter().map(|x| x.tx_weight()).collect::<Vec<_>>(),
[8, 8, 4, 8, 8]
);
assert_eq!(
txs.iter().map(|x| x.fee_to_weight()).collect::<Vec<_>>(),
[1125, 1000, 250, 875, 750]
);

// Populate our txpool with the txs.
{
let mut write_pool = pool.write();
for tx in txs {
println!("***** {}", tx.fee_to_weight());
write_pool
.add_to_pool(test_source(), tx, false, &header)
.unwrap();
Expand All @@ -101,35 +116,26 @@ fn test_block_building_max_weight() {
// Check we added them all to the txpool successfully.
assert_eq!(pool.read().total_size(), 5);

// Prepare some "mineable txs" from the txpool.
// Prepare some "mineable" txs from the txpool.
// Note: We cannot fit all the txs from the txpool into a block.
let txs = pool.read().prepare_mineable_transactions().unwrap();

// Check resulting tx aggregation is what we expect.
// We expect to produce 2 aggregated txs based on txpool contents.
assert_eq!(txs.len(), 2);

// Check the tx we built is the aggregation of the correct set of underlying txs.
// We included 4 out of the 5 txs here.
assert_eq!(txs[0].kernels().len(), 1);
assert_eq!(txs[1].kernels().len(), 2);

// Check our weights after aggregation.
assert_eq!(txs[0].inputs().len(), 1);
assert_eq!(txs[0].outputs().len(), 1);
assert_eq!(txs[0].kernels().len(), 1);
assert_eq!(txs[0].tx_weight_as_block(), 25);

assert_eq!(txs[1].inputs().len(), 1);
assert_eq!(txs[1].outputs().len(), 3);
assert_eq!(txs[1].kernels().len(), 2);
assert_eq!(txs[1].tx_weight_as_block(), 70);
// Fees and weights of the "mineable" txs.
assert_eq!(txs.iter().map(|x| x.fee()).collect::<Vec<_>>(), [9, 8, 7]);
assert_eq!(
txs.iter().map(|x| x.tx_weight()).collect::<Vec<_>>(),
[8, 8, 8]
);
assert_eq!(
txs.iter().map(|x| x.fee_to_weight()).collect::<Vec<_>>(),
[1125, 1000, 875]
);

let block = add_block(header, txs, &mut chain);

// Check contents of the block itself (including coinbase reward).
assert_eq!(block.inputs().len(), 2);
assert_eq!(block.outputs().len(), 5);
assert_eq!(block.outputs().len(), 6);
assert_eq!(block.kernels().len(), 4);

// Now reconcile the transaction pool with the new block
Expand Down

0 comments on commit b2b96f3

Please sign in to comment.