bucket_transactions is now fee_to_weight aware
bucket_transactions now returns the underlying txs
antiochp committed Apr 17, 2019
1 parent b403ccb commit 26e522e
Showing 4 changed files with 139 additions and 73 deletions.
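The headline change in core/src/core/transaction.rs converts Weighting::AsLimitedTransaction from a struct variant to a tuple variant, shortening every construction and match site. A minimal standalone sketch of the two forms (illustration only, not part of the diff; the single-variant enums here stand in for the full Weighting enum, and 40_000 is an arbitrary illustrative weight):

    // Struct variant (before): the field is named at every use site.
    enum WeightingOld {
        AsLimitedTransaction { max_weight: usize },
    }

    // Tuple variant (after): positional and terser.
    enum WeightingNew {
        AsLimitedTransaction(usize),
    }

    fn main() {
        let old = WeightingOld::AsLimitedTransaction { max_weight: 40_000 };
        let new = WeightingNew::AsLimitedTransaction(40_000);
        let limit_old = match old {
            WeightingOld::AsLimitedTransaction { max_weight } => max_weight,
        };
        let limit_new = match new {
            WeightingNew::AsLimitedTransaction(w) => w,
        };
        assert_eq!(limit_old, limit_new);
    }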
13 changes: 8 additions & 5 deletions core/src/core/transaction.rs
@@ -387,10 +387,7 @@ pub enum Weighting {
AsTransaction,
/// Tx representing a tx with artificially limited max_weight.
/// This is used when selecting mineable txs from the pool.
AsLimitedTransaction {
/// The maximum (block) weight that we will allow.
max_weight: usize,
},
AsLimitedTransaction(usize),
/// Tx represents a block (max block weight).
AsBlock,
/// No max weight limit (skip the weight check).
@@ -628,7 +625,7 @@ impl TransactionBody {
//
let max_weight = match weighting {
Weighting::AsTransaction => global::max_block_weight().saturating_sub(coinbase_weight),
Weighting::AsLimitedTransaction { max_weight } => {
Weighting::AsLimitedTransaction(max_weight) => {
min(global::max_block_weight(), max_weight).saturating_sub(coinbase_weight)
}
Weighting::AsBlock => global::max_block_weight(),
@@ -964,6 +961,12 @@ impl Transaction {
Ok(())
}

/// Can be used to compare txs by their fee/weight ratio.
/// Don't use these values for anything else though, due to the precision multiplier.
pub fn fee_to_weight(&self) -> u64 {
self.fee() * 1_000 / self.tx_weight() as u64
}

/// Calculate transaction weight
pub fn tx_weight(&self) -> usize {
self.body.body_weight()
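The precision-multiplier caveat on fee_to_weight() is worth spelling out: bare integer division collapses most fee/weight ratios to 0 or 1, so the fee is scaled by 1_000 first. A quick standalone check (plain integers, not the real Transaction API):

    fn main() {
        let (fee, weight): (u64, u64) = (9, 8);
        // Bare integer division destroys the ordering information.
        assert_eq!(fee / weight, 1);
        // Scaling by 1_000 first keeps three decimal digits of precision.
        assert_eq!(fee * 1_000 / weight, 1_125);
        // As the doc comment warns, the result is only good for comparing txs:
        // it is not a fee, and it would overflow for fees above u64::MAX / 1_000.
    }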
127 changes: 89 additions & 38 deletions pool/src/pool.rs
@@ -116,35 +116,27 @@ impl Pool {

/// Take pool transactions, filtering and ordering them in a way that's
/// appropriate to put in a mined block. Aggregates chains of dependent
/// transactions, orders by fee over weight and ensures to total weight
/// doesn't exceed block limits.
/// transactions, orders by fee over weight and ensures the total weight
/// does not exceed the provided max_weight (miner-defined block weight).
pub fn prepare_mineable_transactions(
&self,
max_weight: usize,
) -> Result<Vec<Transaction>, PoolError> {
let header = self.blockchain.chain_head()?;
let mut tx_buckets = self.bucket_transactions(max_weight);

// At this point we know that all "buckets" are valid and that
// there are no dependencies between them.
// This allows us to arbitrarily sort them and filter them safely.
let weighting = Weighting::AsLimitedTransaction(max_weight);

// Sort them by fees over weight, multiplying by 1000 to keep some precision
// don't think we'll ever see a >max_u64/1000 fee transaction.
// We want to select the txs with highest fee per unit of weight first.
tx_buckets.sort_unstable_by_key(|tx| tx.fee() * 1000 / tx.tx_weight() as u64);
// Sort the txs in the pool via the "bucket" logic to -
// * maintain dependency ordering
// * maximize cut-through
// * maximize overall fees
let txs = self.bucket_transactions(weighting);

// Iteratively apply the txs to the current chain state,
// rejecting any that do not result in a valid state.
// Verify these txs produce an aggregated tx below max tx weight.
// Verify these txs produce an aggregated tx below max_weight.
// Return a vec of all the valid txs.
let txs = self.validate_raw_txs(
&tx_buckets,
None,
&header,
Weighting::AsLimitedTransaction { max_weight },
)?;
Ok(txs)
let header = self.blockchain.chain_head()?;
let valid_txs = self.validate_raw_txs(&txs, None, &header, weighting)?;
Ok(valid_txs)
}

pub fn all_transactions(&self) -> Vec<Transaction> {
@@ -310,11 +302,14 @@ impl Pool {
Ok(())
}

// Group dependent transactions in buckets (aggregated txs).
// Each bucket is independent from the others. Relies on the entries
// vector having parent transactions first (should always be the case).
fn bucket_transactions(&self, max_weight: usize) -> Vec<Transaction> {
let mut tx_buckets = vec![];
/// Buckets consist of a vec of txs and track the aggregated fee_to_weight and a "depth".
/// The depth is incremented if a bucket necessarily depends on an earlier bucket.
/// We aggregate (cut-through) dependent transactions within a bucket *unless* adding a tx
/// would reduce the aggregate fee_to_weight, in which case we start a new bucket (and increment the depth).
/// We can then sort the buckets by depth and fee_to_weight to preserve dependency ordering and maximize
/// both cut-through and overall fees.
fn bucket_transactions(&self, weighting: Weighting) -> Vec<Transaction> {
let mut tx_buckets: Vec<Bucket> = Vec::new();
let mut output_commits = HashMap::new();
let mut rejected = HashSet::new();

@@ -358,25 +353,27 @@ impl Pool {
// This is the common case for non 0-conf txs in the txpool.
// We assume the tx is valid here as we validated it on the way into the txpool.
insert_pos = Some(tx_buckets.len());
tx_buckets.push(entry.tx.clone());
tx_buckets.push(Bucket::new(entry.tx.clone()));
}
Some(pos) => {
// We found a single parent tx, so aggregate in the bucket
// if the aggregate tx is a valid tx.
// Otherwise discard and let the next block pick this tx up.
let current = tx_buckets[pos].clone();
if let Ok(agg_tx) = transaction::aggregate(vec![current, entry.tx.clone()]) {
if agg_tx
.validate(
Weighting::AsLimitedTransaction { max_weight },
self.verifier_cache.clone(),
)
.is_ok()
{
tx_buckets[pos] = agg_tx;
let bucket = &tx_buckets[pos];

if let Ok(new_bucket) = bucket.aggregate_with_tx(
entry.tx.clone(),
weighting,
self.verifier_cache.clone(),
) {
if new_bucket.fee_to_weight >= bucket.fee_to_weight {
// Only aggregate if it would not reduce the fee_to_weight ratio.
tx_buckets[pos] = new_bucket;
} else {
// Aggregated tx is not valid so discard this new tx.
is_rejected = true;
// Otherwise put it in its own bucket at the end.
// Note we increment the depth here to track the dependency.
tx_buckets
.push(Bucket::new_with_depth(entry.tx.clone(), bucket.depth + 1));
}
} else {
// Aggregation failed so discard this new tx.
@@ -397,7 +394,20 @@
}
}
}

// Now sort them by depth (ascending) to maintain dependency ordering.
// And then sort them (within each depth) by fee_to_weight descending.
// Txs with no dependencies will be toward the start of the vec.
// Txs with a big chain of dependencies will be toward the end of the vec.
// While satisfying the "depth" ordering we then sort txs such that those with
// a high fee_to_weight come first.
tx_buckets.sort_unstable_by_key(|x| (x.depth, -(x.fee_to_weight as i64)));

tx_buckets
.into_iter()
.map(|x| x.raw_txs)
.flatten()
.collect()
}

pub fn find_matching_transactions(&self, kernels: &[TxKernel]) -> Vec<Transaction> {
@@ -441,3 +451,44 @@ impl Pool {
self.entries.is_empty()
}
}

struct Bucket {
raw_txs: Vec<Transaction>,
fee_to_weight: u64,
depth: u8,
}

impl Bucket {
fn new(tx: Transaction) -> Bucket {
Bucket {
fee_to_weight: tx.fee_to_weight(),
raw_txs: vec![tx.clone()],
depth: 0,
}
}

fn new_with_depth(tx: Transaction, depth: u8) -> Bucket {
Bucket {
fee_to_weight: tx.fee_to_weight(),
raw_txs: vec![tx.clone()],
depth,
}
}

fn aggregate_with_tx(
&self,
new_tx: Transaction,
weighting: Weighting,
verifier_cache: Arc<RwLock<dyn VerifierCache>>,
) -> Result<Bucket, PoolError> {
let mut raw_txs = self.raw_txs.clone();
raw_txs.push(new_tx);
let agg_tx = transaction::aggregate(raw_txs.clone())?;
agg_tx.validate(weighting, verifier_cache)?;
Ok(Bucket {
fee_to_weight: agg_tx.fee_to_weight(),
raw_txs,
depth: self.depth,
})
}
}
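Since prepare_mineable_transactions now feeds these buckets straight into validation, the (depth, fee_to_weight) sort decides block inclusion order. A standalone sketch of the sort-and-flatten step with toy buckets (string ids stand in for real transactions; the field types mirror the Bucket struct above):

    struct Bucket {
        raw_txs: Vec<&'static str>, // tx ids stand in for Transaction
        fee_to_weight: u64,
        depth: u8,
    }

    fn main() {
        let mut tx_buckets = vec![
            Bucket { raw_txs: vec!["tx_a"], fee_to_weight: 875, depth: 0 },
            Bucket { raw_txs: vec!["tx_b", "tx_b_child"], fee_to_weight: 1_125, depth: 0 },
            Bucket { raw_txs: vec!["tx_b_grandchild"], fee_to_weight: 2_000, depth: 1 },
        ];

        // Same key as bucket_transactions: depth ascending, then fee_to_weight descending.
        tx_buckets.sort_unstable_by_key(|x| (x.depth, -(x.fee_to_weight as i64)));

        let txs: Vec<_> = tx_buckets.into_iter().flat_map(|x| x.raw_txs).collect();

        // tx_b_grandchild sorts last despite the highest ratio: its depth of 1
        // records a dependency on a depth 0 bucket, so dependency order holds.
        assert_eq!(txs, ["tx_b", "tx_b_child", "tx_a", "tx_b_grandchild"]);
    }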
24 changes: 15 additions & 9 deletions pool/tests/block_building.rs
@@ -84,13 +84,13 @@ fn test_transaction_pool_block_building() {

// Add the three root txs to the pool.
write_pool
.add_to_pool(test_source(), root_tx_1, false, &header)
.add_to_pool(test_source(), root_tx_1.clone(), false, &header)
.unwrap();
write_pool
.add_to_pool(test_source(), root_tx_2, false, &header)
.add_to_pool(test_source(), root_tx_2.clone(), false, &header)
.unwrap();
write_pool
.add_to_pool(test_source(), root_tx_3, false, &header)
.add_to_pool(test_source(), root_tx_3.clone(), false, &header)
.unwrap();

// Now add the two child txs to the pool.
@@ -104,15 +104,21 @@
assert_eq!(write_pool.total_size(), 5);
}

let txs = {
let read_pool = pool.read();
read_pool.prepare_mineable_transactions().unwrap()
};
// children should have been aggregated into parents
assert_eq!(txs.len(), 3);
let txs = pool.read().prepare_mineable_transactions().unwrap();

let block = add_block(header, txs, &mut chain);

// Check the block contains what we expect.
assert_eq!(block.inputs().len(), 4);
assert_eq!(block.outputs().len(), 4);
assert_eq!(block.kernels().len(), 6);

assert!(block.kernels().contains(&root_tx_1.kernels()[0]));
assert!(block.kernels().contains(&root_tx_2.kernels()[0]));
assert!(block.kernels().contains(&root_tx_3.kernels()[0]));
assert!(block.kernels().contains(&child_tx_1.kernels()[0]));
assert!(block.kernels().contains(&child_tx_2.kernels()[0]));

// Now reconcile the transaction pool with the new block
// and check the resulting contents of the pool are what we expect.
{
48 changes: 27 additions & 21 deletions pool/tests/block_max_weight.rs
@@ -88,10 +88,25 @@ fn test_block_building_max_weight() {
test_transaction(&keychain, vec![290], vec![280, 4]),
];

// Fees and weights of our original txs in insert order.
assert_eq!(
txs.iter().map(|x| x.fee()).collect::<Vec<_>>(),
[9, 8, 1, 7, 6]
);
assert_eq!(
txs.iter().map(|x| x.tx_weight()).collect::<Vec<_>>(),
[8, 8, 4, 8, 8]
);
assert_eq!(
txs.iter().map(|x| x.fee_to_weight()).collect::<Vec<_>>(),
[1125, 1000, 250, 875, 750]
);

// Populate our txpool with the txs.
{
let mut write_pool = pool.write();
for tx in txs {
write_pool
.add_to_pool(test_source(), tx, false, &header)
.unwrap();
@@ -101,35 +116,26 @@
// Check we added them all to the txpool successfully.
assert_eq!(pool.read().total_size(), 5);

// Prepare some "mineable txs" from the txpool.
// Prepare some "mineable" txs from the txpool.
// Note: We cannot fit all the txs from the txpool into a block.
let txs = pool.read().prepare_mineable_transactions().unwrap();

// Check resulting tx aggregation is what we expect.
// We expect to produce 2 aggregated txs based on txpool contents.
assert_eq!(txs.len(), 2);

// Check the tx we built is the aggregation of the correct set of underlying txs.
// We included 4 out of the 5 txs here.
assert_eq!(txs[0].kernels().len(), 1);
assert_eq!(txs[1].kernels().len(), 2);

// Check our weights after aggregation.
assert_eq!(txs[0].inputs().len(), 1);
assert_eq!(txs[0].outputs().len(), 1);
assert_eq!(txs[0].kernels().len(), 1);
assert_eq!(txs[0].tx_weight_as_block(), 25);

assert_eq!(txs[1].inputs().len(), 1);
assert_eq!(txs[1].outputs().len(), 3);
assert_eq!(txs[1].kernels().len(), 2);
assert_eq!(txs[1].tx_weight_as_block(), 70);
// Fees and weights of the "mineable" txs.
assert_eq!(txs.iter().map(|x| x.fee()).collect::<Vec<_>>(), [9, 8, 7]);
assert_eq!(
txs.iter().map(|x| x.tx_weight()).collect::<Vec<_>>(),
[8, 8, 8]
);
assert_eq!(
txs.iter().map(|x| x.fee_to_weight()).collect::<Vec<_>>(),
[1125, 1000, 875]
);

let block = add_block(header, txs, &mut chain);

// Check contents of the block itself (including coinbase reward).
assert_eq!(block.inputs().len(), 2);
assert_eq!(block.outputs().len(), 5);
assert_eq!(block.outputs().len(), 6);
assert_eq!(block.kernels().len(), 4);

// Now reconcile the transaction pool with the new block
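For reference, the expected ratios asserted in this test follow directly from fee * 1_000 / weight; a quick standalone check of the five (fee, weight) pairs:

    fn main() {
        let txs: [(u64, u64); 5] = [(9, 8), (8, 8), (1, 4), (7, 8), (6, 8)];
        let ratios: Vec<u64> = txs.iter().map(|&(f, w)| f * 1_000 / w).collect();
        assert_eq!(ratios, [1_125, 1_000, 250, 875, 750]);
        // Sorted descending (1_125, 1_000, 875, 750, 250), the block takes the top
        // three before max_weight is hit, leaving the 750 and 250 txs for later.
    }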
