bucket_transactions is now fee_to_weight aware #2758

Merged · 3 commits · Apr 30, 2019
13 changes: 8 additions & 5 deletions core/src/core/transaction.rs
@@ -387,10 +387,7 @@ pub enum Weighting {
AsTransaction,
/// Tx representing a tx with artificially limited max_weight.
/// This is used when selecting mineable txs from the pool.
AsLimitedTransaction {
/// The maximum (block) weight that we will allow.
max_weight: usize,
},
AsLimitedTransaction(usize),
/// Tx represents a block (max block weight).
AsBlock,
/// No max weight limit (skip the weight check).
@@ -628,7 +625,7 @@ impl TransactionBody {
//
let max_weight = match weighting {
Weighting::AsTransaction => global::max_block_weight().saturating_sub(coinbase_weight),
Weighting::AsLimitedTransaction { max_weight } => {
Weighting::AsLimitedTransaction(max_weight) => {
min(global::max_block_weight(), max_weight).saturating_sub(coinbase_weight)
}
Weighting::AsBlock => global::max_block_weight(),
@@ -964,6 +961,12 @@ impl Transaction {
Ok(())
}

/// Can be used to compare txs by their fee/weight ratio.
/// Don't use these values for anything else though due to precision multiplier.
pub fn fee_to_weight(&self) -> u64 {
self.fee() * 1_000 / self.tx_weight() as u64
}

/// Calculate transaction weight
pub fn tx_weight(&self) -> usize {
self.body.body_weight()
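The new fee_to_weight helper is plain integer ratio math, scaled by 1,000 before the division to retain some precision. A minimal standalone sketch of the same arithmetic, assuming bare u64 fee and weight values rather than grin's Transaction type (the pairs mirror the block_max_weight test further down):

```rust
// Sketch of the fee_to_weight math on bare u64 values (illustrative only).
fn fee_to_weight(fee: u64, weight: u64) -> u64 {
    // Multiply by 1_000 before dividing to keep some precision
    // through the integer division.
    fee * 1_000 / weight
}

fn main() {
    // Fee/weight pairs mirroring the block_max_weight test below.
    assert_eq!(fee_to_weight(9, 8), 1125);
    assert_eq!(fee_to_weight(8, 8), 1000);
    assert_eq!(fee_to_weight(1, 4), 250);
    assert_eq!(fee_to_weight(7, 8), 875);
    assert_eq!(fee_to_weight(6, 8), 750);
}
```

As the doc comment warns, these scaled values are only meaningful for comparing txs against each other, not as absolute fee rates.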
115 changes: 77 additions & 38 deletions pool/src/pool.rs
@@ -26,6 +26,7 @@ use self::util::RwLock;
use crate::types::{BlockChain, PoolEntry, PoolError};
use grin_core as core;
use grin_util as util;
use std::cmp::Reverse;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

@@ -116,35 +117,27 @@ impl Pool {

/// Take pool transactions, filtering and ordering them in a way that's
/// appropriate to put in a mined block. Aggregates chains of dependent
/// transactions, orders by fee over weight and ensures to total weight
/// doesn't exceed block limits.
/// transactions, orders by fee over weight and ensures the total weight
/// does not exceed the provided max_weight (miner defined block weight).
pub fn prepare_mineable_transactions(
&self,
max_weight: usize,
) -> Result<Vec<Transaction>, PoolError> {
let header = self.blockchain.chain_head()?;
let mut tx_buckets = self.bucket_transactions(max_weight);

// At this point we know that all "buckets" are valid and that
// there are no dependencies between them.
// This allows us to arbitrarily sort them and filter them safely.
let weighting = Weighting::AsLimitedTransaction(max_weight);

// Sort them by fees over weight, multiplying by 1000 to keep some precision
// don't think we'll ever see a >max_u64/1000 fee transaction.
// We want to select the txs with highest fee per unit of weight first.
tx_buckets.sort_unstable_by_key(|tx| tx.fee() * 1000 / tx.tx_weight() as u64);
// Sort the txs in the pool via the "bucket" logic to -
// * maintain dependency ordering
// * maximize cut-through
// * maximize overall fees
let txs = self.bucket_transactions(weighting);

// Iteratively apply the txs to the current chain state,
// rejecting any that do not result in a valid state.
// Verify these txs produce an aggregated tx below max tx weight.
// Verify these txs produce an aggregated tx below max_weight.
// Return a vec of all the valid txs.
let txs = self.validate_raw_txs(
&tx_buckets,
None,
&header,
Weighting::AsLimitedTransaction { max_weight },
)?;
Ok(txs)
let header = self.blockchain.chain_head()?;
let valid_txs = self.validate_raw_txs(&txs, None, &header, weighting)?;
Ok(valid_txs)
}

pub fn all_transactions(&self) -> Vec<Transaction> {
@@ -310,11 +303,15 @@ impl Pool {
Ok(())
}

// Group dependent transactions in buckets (aggregated txs).
// Each bucket is independent from the others. Relies on the entries
// vector having parent transactions first (should always be the case).
fn bucket_transactions(&self, max_weight: usize) -> Vec<Transaction> {
let mut tx_buckets = vec![];
/// Buckets consist of a vec of txs and track the aggregate fee_to_weight.
/// We aggregate (cut-through) dependent transactions within a bucket *unless* adding a tx
/// would reduce the aggregate fee_to_weight, in which case we start a new bucket.
/// Note this new bucket will by definition have a lower fee_to_weight than the bucket
/// containing the tx it depends on.
/// Sorting the buckets by fee_to_weight will therefore preserve dependency ordering,
/// maximizing both cut-through and overall fees.
fn bucket_transactions(&self, weighting: Weighting) -> Vec<Transaction> {
let mut tx_buckets: Vec<Bucket> = Vec::new();
let mut output_commits = HashMap::new();
let mut rejected = HashSet::new();

@@ -358,25 +355,27 @@ impl Pool {
// This is the common case for non 0-conf txs in the txpool.
// We assume the tx is valid here as we validated it on the way into the txpool.
insert_pos = Some(tx_buckets.len());
tx_buckets.push(entry.tx.clone());
tx_buckets.push(Bucket::new(entry.tx.clone()));
}
Some(pos) => {
// We found a single parent tx, so aggregate in the bucket
// if the aggregate tx is a valid tx.
// Otherwise discard and let the next block pick this tx up.
let current = tx_buckets[pos].clone();
if let Ok(agg_tx) = transaction::aggregate(vec![current, entry.tx.clone()]) {
if agg_tx
.validate(
Weighting::AsLimitedTransaction { max_weight },
self.verifier_cache.clone(),
)
.is_ok()
{
tx_buckets[pos] = agg_tx;
let bucket = &tx_buckets[pos];

if let Ok(new_bucket) = bucket.aggregate_with_tx(
entry.tx.clone(),
weighting,
self.verifier_cache.clone(),
) {
if new_bucket.fee_to_weight >= bucket.fee_to_weight {
// Only aggregate if it would not reduce the fee_to_weight ratio.
tx_buckets[pos] = new_bucket;
} else {
// Aggregated tx is not valid so discard this new tx.
is_rejected = true;
// Otherwise put it in its own bucket at the end.
// Note: This bucket will have a lower fee_to_weight
// than the bucket it depends on.
tx_buckets.push(Bucket::new(entry.tx.clone()));
}
} else {
// Aggregation failed so discard this new tx.
Member: Out of curiosity, how frequently does this happen, and how?

Member (Author): The aggregation failure?

Basically the only way this would fail is if the aggregated tx ended up being too big to fit in a block. Say we attempted two huge transactions that in aggregate were larger than our block weight limit. This should happen only rarely.

The only other way this could happen is if the pool got itself into a bad state somehow, with a double spend or something similar in there. This should never happen in practice, as we check for consistency at various stages of the tx pool lifecycle.
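To make the first failure mode concrete, here is a toy illustration with made-up weights (not grin's actual weight accounting): each tx fits under the block weight limit on its own, but the aggregate does not, so validation rejects it and the new tx is left for a later block.

```rust
// Toy numbers only: two txs that individually fit under the limit
// but whose aggregate exceeds it and would fail validation.
fn main() {
    let max_block_weight = 40u64;
    let (tx1_weight, tx2_weight) = (25u64, 25u64);
    assert!(tx1_weight <= max_block_weight);
    assert!(tx2_weight <= max_block_weight);
    // Aggregate weight is roughly additive (cut-through aside), so the
    // combined tx is too big and the second tx waits for a later block.
    assert!(tx1_weight + tx2_weight > max_block_weight);
}
```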

@@ -397,7 +396,17 @@
}
}
}

// Sort them by fee_to_weight (descending).
// Txs with no dependencies will be toward the start of the vec.
// Txs with a big chain of dependencies will be toward the end of the vec.
tx_buckets.sort_unstable_by_key(|x| Reverse(x.fee_to_weight));

tx_buckets
.into_iter()
.map(|x| x.raw_txs)
.flatten()
.collect()
}

pub fn find_matching_transactions(&self, kernels: &[TxKernel]) -> Vec<Transaction> {
@@ -441,3 +450,33 @@ impl Pool {
self.entries.is_empty()
}
}

struct Bucket {
raw_txs: Vec<Transaction>,
fee_to_weight: u64,
}

impl Bucket {
fn new(tx: Transaction) -> Bucket {
Bucket {
fee_to_weight: tx.fee_to_weight(),
raw_txs: vec![tx.clone()],
}
}

fn aggregate_with_tx(
&self,
new_tx: Transaction,
weighting: Weighting,
verifier_cache: Arc<RwLock<dyn VerifierCache>>,
) -> Result<Bucket, PoolError> {
let mut raw_txs = self.raw_txs.clone();
raw_txs.push(new_tx);
let agg_tx = transaction::aggregate(raw_txs.clone())?;
agg_tx.validate(weighting, verifier_cache)?;
Ok(Bucket {
fee_to_weight: agg_tx.fee_to_weight(),
raw_txs: raw_txs,
})
}
}
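Taken together, the bucketing policy can be sketched independently of the pool types. The following simplified model reduces a tx to a (fee, weight) pair and a dependency to an optional parent bucket index, and omits validation entirely; names and structure are illustrative, not grin's actual implementation.

```rust
use std::cmp::Reverse;

// Simplified stand-in for a pool entry (illustrative, not grin's types).
struct SimpleTx {
    fee: u64,
    weight: u64,
    parent: Option<usize>, // index of the bucket this tx depends on, if any
}

fn fee_to_weight(fee: u64, weight: u64) -> u64 {
    fee * 1_000 / weight
}

// Returns (fee_to_weight, member tx indices) per bucket, sorted descending.
fn bucket(txs: &[SimpleTx]) -> Vec<(u64, Vec<usize>)> {
    // Each bucket tracks (total fee, total weight, member tx indices).
    let mut buckets: Vec<(u64, u64, Vec<usize>)> = Vec::new();
    for (i, tx) in txs.iter().enumerate() {
        match tx.parent {
            // Aggregate into the parent bucket only if doing so does not
            // reduce the bucket's aggregate fee_to_weight.
            Some(p) if p < buckets.len()
                && fee_to_weight(buckets[p].0 + tx.fee, buckets[p].1 + tx.weight)
                    >= fee_to_weight(buckets[p].0, buckets[p].1) =>
            {
                buckets[p].0 += tx.fee;
                buckets[p].1 += tx.weight;
                buckets[p].2.push(i);
            }
            // No parent, or the ratio would drop: start a new bucket. A child
            // split off here necessarily has a lower ratio than its parent's
            // bucket, so the descending sort keeps it after its parent.
            _ => buckets.push((tx.fee, tx.weight, vec![i])),
        }
    }
    let mut out: Vec<(u64, Vec<usize>)> = buckets
        .into_iter()
        .map(|(fee, weight, members)| (fee_to_weight(fee, weight), members))
        .collect();
    out.sort_unstable_by_key(|b| Reverse(b.0));
    out
}

fn main() {
    // A low-fee child of a high-fee parent would drag the ratio down
    // (10 * 1_000 / 16 = 625 < 1125), so it lands in its own bucket
    // and sorts after its parent, preserving dependency order.
    let txs = [
        SimpleTx { fee: 9, weight: 8, parent: None },    // ratio 1125
        SimpleTx { fee: 1, weight: 8, parent: Some(0) }, // ratio 125
    ];
    assert_eq!(bucket(&txs), vec![(1125, vec![0]), (125, vec![1])]);
}
```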
24 changes: 15 additions & 9 deletions pool/tests/block_building.rs
@@ -84,13 +84,13 @@ fn test_transaction_pool_block_building() {

// Add the three root txs to the pool.
write_pool
.add_to_pool(test_source(), root_tx_1, false, &header)
.add_to_pool(test_source(), root_tx_1.clone(), false, &header)
.unwrap();
write_pool
.add_to_pool(test_source(), root_tx_2, false, &header)
.add_to_pool(test_source(), root_tx_2.clone(), false, &header)
.unwrap();
write_pool
.add_to_pool(test_source(), root_tx_3, false, &header)
.add_to_pool(test_source(), root_tx_3.clone(), false, &header)
.unwrap();

// Now add the two child txs to the pool.
@@ -104,15 +104,21 @@
assert_eq!(write_pool.total_size(), 5);
}

let txs = {
let read_pool = pool.read();
read_pool.prepare_mineable_transactions().unwrap()
};
// children should have been aggregated into parents
assert_eq!(txs.len(), 3);
let txs = pool.read().prepare_mineable_transactions().unwrap();

let block = add_block(header, txs, &mut chain);

// Check the block contains what we expect.
assert_eq!(block.inputs().len(), 4);
assert_eq!(block.outputs().len(), 4);
assert_eq!(block.kernels().len(), 6);

assert!(block.kernels().contains(&root_tx_1.kernels()[0]));
assert!(block.kernels().contains(&root_tx_2.kernels()[0]));
assert!(block.kernels().contains(&root_tx_3.kernels()[0]));
assert!(block.kernels().contains(&child_tx_1.kernels()[0]));
assert!(block.kernels().contains(&child_tx_2.kernels()[0]));

// Now reconcile the transaction pool with the new block
// and check the resulting contents of the pool are what we expect.
{
48 changes: 27 additions & 21 deletions pool/tests/block_max_weight.rs
@@ -88,10 +88,25 @@ fn test_block_building_max_weight() {
test_transaction(&keychain, vec![290], vec![280, 4]),
];

// Fees and weights of our original txs in insert order.
assert_eq!(
txs.iter().map(|x| x.fee()).collect::<Vec<_>>(),
[9, 8, 1, 7, 6]
);
assert_eq!(
txs.iter().map(|x| x.tx_weight()).collect::<Vec<_>>(),
[8, 8, 4, 8, 8]
);
assert_eq!(
txs.iter().map(|x| x.fee_to_weight()).collect::<Vec<_>>(),
[1125, 1000, 250, 875, 750]
);

// Populate our txpool with the txs.
{
let mut write_pool = pool.write();
for tx in txs {
println!("***** {}", tx.fee_to_weight());
write_pool
.add_to_pool(test_source(), tx, false, &header)
.unwrap();
@@ -101,35 +116,26 @@
// Check we added them all to the txpool successfully.
assert_eq!(pool.read().total_size(), 5);

// Prepare some "mineable txs" from the txpool.
// Prepare some "mineable" txs from the txpool.
// Note: We cannot fit all the txs from the txpool into a block.
let txs = pool.read().prepare_mineable_transactions().unwrap();

// Check resulting tx aggregation is what we expect.
// We expect to produce 2 aggregated txs based on txpool contents.
assert_eq!(txs.len(), 2);

// Check the tx we built is the aggregation of the correct set of underlying txs.
// We included 4 out of the 5 txs here.
assert_eq!(txs[0].kernels().len(), 1);
assert_eq!(txs[1].kernels().len(), 2);

// Check our weights after aggregation.
assert_eq!(txs[0].inputs().len(), 1);
assert_eq!(txs[0].outputs().len(), 1);
assert_eq!(txs[0].kernels().len(), 1);
assert_eq!(txs[0].tx_weight_as_block(), 25);

assert_eq!(txs[1].inputs().len(), 1);
assert_eq!(txs[1].outputs().len(), 3);
assert_eq!(txs[1].kernels().len(), 2);
assert_eq!(txs[1].tx_weight_as_block(), 70);
// Fees and weights of the "mineable" txs.
assert_eq!(txs.iter().map(|x| x.fee()).collect::<Vec<_>>(), [9, 8, 7]);
assert_eq!(
txs.iter().map(|x| x.tx_weight()).collect::<Vec<_>>(),
[8, 8, 8]
);
assert_eq!(
txs.iter().map(|x| x.fee_to_weight()).collect::<Vec<_>>(),
[1125, 1000, 875]
);

let block = add_block(header, txs, &mut chain);

// Check contents of the block itself (including coinbase reward).
assert_eq!(block.inputs().len(), 2);
assert_eq!(block.outputs().len(), 5);
assert_eq!(block.outputs().len(), 6);
assert_eq!(block.kernels().len(), 4);

// Now reconcile the transaction pool with the new block
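For intuition, the selection this test exercises amounts to walking the buckets in descending fee_to_weight order and stopping once the weight budget is exhausted. A rough sketch, using a hypothetical budget chosen so that exactly three txs fit (in the real code the cutoff comes from validate_raw_txs under the miner's max_weight, not a take_while):

```rust
// Greedy selection sketch: descending fee_to_weight, capped total weight.
fn main() {
    // (fee, weight) of the five pool txs, already sorted by fee_to_weight
    // descending: ratios 1125, 1000, 875, 750, 250.
    let sorted = [(9u64, 8u64), (8, 8), (7, 8), (6, 8), (1, 4)];
    let max_weight = 24; // hypothetical budget admitting exactly three txs
    let mut total = 0;
    let picked: Vec<u64> = sorted
        .iter()
        .take_while(|(_, w)| {
            total += w;
            total <= max_weight
        })
        .map(|(fee, _)| *fee)
        .collect();
    // Matches the mineable fees asserted above: [9, 8, 7].
    assert_eq!(picked, vec![9, 8, 7]);
}
```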