Skip to content

Commit

Permalink
Introduce transaction prioritization to mempool
Browse files Browse the repository at this point in the history
This commit allows one to specify a transaction priority
when txns are being addded to the mempool, such that
txns with higher priority are returned first.
This is exposed to the RPC interface
It is also used to prioritize our own unpark transactions
  • Loading branch information
viquezclaudio committed Jul 5, 2022
1 parent 8fae9da commit ca7297e
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 41 deletions.
6 changes: 4 additions & 2 deletions mempool/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ impl<N: Network> Future for MempoolExecutor<N> {

match verify_tx_ret {
Ok(mempool_state_lock) => {
RwLockUpgradableReadGuard::upgrade(mempool_state_lock).put(&tx);
RwLockUpgradableReadGuard::upgrade(mempool_state_lock)
.put(&tx, crate::mempool::TxPriority::MediumPriority);
MsgAcceptance::Accept
}
// Reject the message if signature verification fails or transaction is invalid
Expand Down Expand Up @@ -189,7 +190,8 @@ impl<N: Network> Future for ControlMempoolExecutor<N> {

match verify_tx_ret {
Ok(mempool_state_lock) => {
RwLockUpgradableReadGuard::upgrade(mempool_state_lock).put(&tx);
RwLockUpgradableReadGuard::upgrade(mempool_state_lock)
.put(&tx, crate::mempool::TxPriority::MediumPriority);
MsgAcceptance::Accept
}
// Reject the message if signature verification fails or transaction is invalid
Expand Down
71 changes: 56 additions & 15 deletions mempool/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct ControlTransactionTopic;
impl Topic for ControlTransactionTopic {
type Item = Transaction;

const BUFFER_SIZE: usize = 100;
const BUFFER_SIZE: usize = 1024;
const NAME: &'static str = "Controltransactions";
const VALIDATE: bool = true;
}
Expand Down Expand Up @@ -555,7 +555,7 @@ impl Mempool {
let pending_balance = tx.total_value() + sender_total;

if pending_balance <= sender_balance {
mempool_state.put(tx);
mempool_state.put(tx, TxPriority::MediumPriority);
} else {
debug!(
block_number = block.block_number(),
Expand Down Expand Up @@ -693,7 +693,11 @@ impl Mempool {
}

/// Adds a transaction to the Mempool.
pub async fn add_transaction(&self, transaction: Transaction) -> Result<(), VerifyErr> {
pub async fn add_transaction(
&self,
transaction: Transaction,
tx_priority: Option<TxPriority>,
) -> Result<(), VerifyErr> {
let blockchain = Arc::clone(&self.blockchain);
let mempool_state = Arc::clone(&self.state);
let filter = Arc::clone(&self.filter);
Expand All @@ -703,7 +707,15 @@ impl Mempool {

match verify_tx_ret {
Ok(mempool_state_lock) => {
RwLockUpgradableReadGuard::upgrade(mempool_state_lock).put(&transaction);
if let Some(priority) = tx_priority {
RwLockUpgradableReadGuard::upgrade(mempool_state_lock)
.put(&transaction, priority);
} else {
//If the priority is not provided we default to medium
RwLockUpgradableReadGuard::upgrade(mempool_state_lock)
.put(&transaction, TxPriority::MediumPriority);
}

Ok(())
}
Err(e) => Err(e),
Expand Down Expand Up @@ -862,7 +874,7 @@ impl MempoolTransactions {
self.transactions.get(hash)
}

pub(crate) fn insert(&mut self, tx: &Transaction) -> bool {
pub(crate) fn insert(&mut self, tx: &Transaction, priority: TxPriority) -> bool {
let tx_hash = tx.hash();

if self.transactions.contains_key(&tx_hash) {
Expand All @@ -874,13 +886,15 @@ impl MempoolTransactions {
self.best_transactions.push(
tx_hash.clone(),
BestTxOrder {
priority,
fee_per_byte: tx.fee_per_byte(),
insertion_order: self.tx_counter,
},
);
self.worst_transactions.push(
tx_hash.clone(),
WorstTxOrder {
priority,
fee_per_byte: tx.fee_per_byte(),
insertion_order: self.tx_counter,
},
Expand Down Expand Up @@ -951,7 +965,7 @@ impl MempoolState {
}
}

pub(crate) fn put(&mut self, tx: &Transaction) -> bool {
pub(crate) fn put(&mut self, tx: &Transaction, priority: TxPriority) -> bool {
let tx_hash = tx.hash();

if self.regular_transactions.contains_key(&tx_hash)
Expand All @@ -963,9 +977,11 @@ impl MempoolState {
// If we are adding a stacking transaction we insert it into the control txns container
// Staking txns are control txns
if tx.sender_type == AccountType::Staking || tx.recipient_type == AccountType::Staking {
self.control_transactions.insert(tx);
// Insert it into the control transactions
self.control_transactions.insert(tx, priority);
} else {
self.regular_transactions.insert(tx);
// Regular transaction
self.regular_transactions.insert(tx, priority);
}

// Update the per sender state
Expand Down Expand Up @@ -1171,6 +1187,18 @@ pub(crate) struct SenderPendingState {
pub(crate) txns: HashSet<Blake2bHash>,
}

/// TxPriority that is used when adding transactions into the mempool
/// Higher Priority transactions are returned first from the mempool
#[derive(Copy, Clone, PartialEq)]
pub enum TxPriority {
/// Low Priority transactions
LowPriority = 1,
/// Medium Priority transactions, this is thed default
MediumPriority = 2,
/// High Priority transactions,
HighPriority = 3,
}

/// Ordering in which transactions removed from the mempool to be included in blocks.
/// This is stored on a max-heap, so the greater transaction comes first.
/// Compares by fee per byte (higher first), then by insertion order (lower i.e. older first).
Expand All @@ -1179,6 +1207,7 @@ pub(crate) struct SenderPendingState {
// we might prefer basic transactions over staking contract transactions, etc, etc.
#[derive(PartialEq)]
pub struct BestTxOrder {
priority: TxPriority,
fee_per_byte: f64,
insertion_order: u64,
}
Expand All @@ -1193,9 +1222,14 @@ impl PartialOrd for BestTxOrder {

impl Ord for BestTxOrder {
fn cmp(&self, other: &Self) -> Ordering {
self.fee_per_byte
.partial_cmp(&other.fee_per_byte)
.expect("fees can't be NaN")
(self.priority as u8)
.partial_cmp(&(other.priority as u8))
.expect("TX Priority is required")
.then(
self.fee_per_byte
.partial_cmp(&other.fee_per_byte)
.expect("fees can't be NaN"),
)
.then(self.insertion_order.cmp(&other.insertion_order).reverse())
}
}
Expand All @@ -1205,6 +1239,7 @@ impl Ord for BestTxOrder {
/// Compares by fee per byte (lower first), then by insertion order (higher i.e. newer first).
#[derive(PartialEq)]
pub struct WorstTxOrder {
priority: TxPriority,
fee_per_byte: f64,
insertion_order: u64,
}
Expand All @@ -1219,10 +1254,16 @@ impl PartialOrd for WorstTxOrder {

impl Ord for WorstTxOrder {
fn cmp(&self, other: &Self) -> Ordering {
self.fee_per_byte
.partial_cmp(&other.fee_per_byte)
.expect("fees can't be NaN")
(self.priority as u8)
.partial_cmp(&(other.priority as u8))
.expect("TX Priority is required")
.reverse()
.then(self.insertion_order.cmp(&other.insertion_order))
.then(
self.fee_per_byte
.partial_cmp(&other.fee_per_byte)
.expect("fees can't be NaN")
.reverse()
.then(self.insertion_order.cmp(&other.insertion_order)),
)
}
}
47 changes: 31 additions & 16 deletions mempool/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use nimiq_keys::{
Address, KeyPair as SchnorrKeyPair, PrivateKey as SchnorrPrivateKey,
PublicKey as SchnorrPublicKey, SecureGenerate,
};
use nimiq_mempool::config::MempoolConfig;
use nimiq_mempool::mempool::Mempool;
use nimiq_mempool::{config::MempoolConfig, mempool::TxPriority};
use nimiq_network_mock::{MockHub, MockId, MockNetwork, MockPeerId};
use nimiq_primitives::{networks::NetworkId, policy};
use nimiq_test_log::test;
Expand All @@ -37,7 +37,11 @@ const NUM_TXNS_START_STOP: usize = 100;
pub const ACCOUNT_SECRET_KEY: &str =
"6c9320ac201caf1f8eaa5b05f5d67a9e77826f3f6be266a0ecccc20416dc6587";

pub const VALIDATOR_SECRET_KEY: &str =
"041580cc67e66e9e08b68fd9e4c9deb68737168fbe7488de2638c2e906c2f5ad";

const STAKER_ADDRESS: &str = "NQ20TSB0DFSMUH9C15GQGAGJTTE4D3MA859E";
const VALIDATOR_ADDRESS: &str = "NQ20 TSB0 DFSM UH9C 15GQ GAGJ TTE4 D3MA 859E";

fn ed25519_key_pair(secret_key: &str) -> SchnorrKeyPair {
let priv_key: SchnorrPrivateKey =
Expand Down Expand Up @@ -1551,12 +1555,24 @@ async fn mempool_update_create_staker_twice() {
}

#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))]
async fn mempool_basic_control_tx() {
async fn mempool_basic_prioritization_control_tx() {
let time = Arc::new(OffsetTime::new());
let env = VolatileEnvironment::new(10).unwrap();

let key_pair = ed25519_key_pair(ACCOUNT_SECRET_KEY);
let validator_signing_key = ed25519_key_pair(VALIDATOR_SECRET_KEY);
let address = Address::from_any_str(STAKER_ADDRESS).unwrap();
let validator_address = Address::from_any_str(VALIDATOR_ADDRESS).unwrap();

let unpark = TransactionBuilder::new_unpark_validator(
&key_pair,
validator_address,
&validator_signing_key,
1.try_into().unwrap(),
1,
NetworkId::UnitAlbatross,
)
.unwrap();

// This is the transaction produced in the block
let tx = TransactionBuilder::new_create_staker(
Expand All @@ -1583,35 +1599,34 @@ async fn mempool_basic_control_tx() {
let mock_network = Arc::new(hub.new_network());

// Send txns to mempool
send_control_txn_to_mempool(&mempool, mock_network, mock_id, txns).await;
send_control_txn_to_mempool(&mempool, mock_network, mock_id, txns.clone()).await;

assert_eq!(
mempool.num_transactions(),
1,
"Number of txns in mempool is not what is expected"
);

// Get regular txns from mempool
let (updated_txns, _) = mempool.get_transactions_for_block(10_000);

//We should obtain 0 regular txns since we only have 1 control tx in the mempool
assert_eq!(
updated_txns.len(),
0,
"Number of txns is not what is expected"
);
// Insert unpark with high priority
mempool
.add_transaction(unpark.clone(), Some(TxPriority::HighPriority))
.await
.unwrap();

// Get control txns from mempool
let (updated_txns, _) = mempool.get_control_transactions_for_block(10_000);

//Now we should obtain one control transaction
// Now we should obtain one control transaction
assert_eq!(
updated_txns.len(),
1,
2,
"Number of txns is not what is expected"
);

//Now the mempool should have 0 total txns
// We should obtain the txns in the reversed ordered as the unpark should have been prioritized.
assert_eq!(updated_txns[0], unpark);

// Now the mempool should have 0 total txns
assert_eq!(
mempool.num_transactions(),
0,
Expand Down Expand Up @@ -1875,7 +1890,7 @@ async fn applies_total_tx_size_limits() {
let worst_tx = txns[1].hash::<Blake2bHash>();

for tx in txns {
mempool.add_transaction(tx).await.unwrap();
mempool.add_transaction(tx, None).await.unwrap();
}

let (mempool_txns, _) = mempool.get_transactions_for_block(txns_len);
Expand Down
5 changes: 5 additions & 0 deletions rpc-interface/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ pub trait MempoolInterface {

async fn push_transaction(&mut self, raw_tx: String) -> Result<Blake2bHash, Self::Error>;

async fn push_high_priority_transaction(
&mut self,
raw_tx: String,
) -> Result<Blake2bHash, Self::Error>;

async fn mempool_content(
&mut self,
include_transactions: bool,
Expand Down
23 changes: 21 additions & 2 deletions rpc-server/src/dispatchers/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use async_trait::async_trait;
use beserial::Deserialize;

use nimiq_hash::{Blake2bHash, Hash};
use nimiq_mempool::mempool::Mempool;
use nimiq_mempool::mempool::{Mempool, TxPriority};

use nimiq_rpc_interface::mempool::MempoolInterface;
use nimiq_rpc_interface::types::{HashOrTx, MempoolInfo};
Expand Down Expand Up @@ -33,7 +33,26 @@ impl MempoolInterface for MempoolDispatcher {
Deserialize::deserialize_from_vec(&hex::decode(&raw_tx)?)?;
let txid = tx.hash::<Blake2bHash>();

match self.mempool.add_transaction(tx).await {
match self.mempool.add_transaction(tx, None).await {
Ok(_) => Ok(txid),
Err(e) => Err(Error::MempoolError(e)),
}
}

/// Pushes the given serialized transaction to the local mempool with high priority
async fn push_high_priority_transaction(
&mut self,
raw_tx: String,
) -> Result<Blake2bHash, Self::Error> {
let tx: nimiq_transaction::Transaction =
Deserialize::deserialize_from_vec(&hex::decode(&raw_tx)?)?;
let txid = tx.hash::<Blake2bHash>();

match self
.mempool
.add_transaction(tx, Some(TxPriority::HighPriority))
.await
{
Ok(_) => Ok(txid),
Err(e) => Err(Error::MempoolError(e)),
}
Expand Down
2 changes: 1 addition & 1 deletion spammer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ async fn spam(
let consensus1 = consensus.clone();
let mp = std::sync::Arc::clone(&mempool);
tokio::spawn(async move {
if let Err(e) = mp.add_transaction(tx.clone()).await {
if let Err(e) = mp.add_transaction(tx.clone(), None).await {
log::warn!("Mempool rejected transaction: {:?} - {:#?}", e, tx);
}
if let Err(e) = consensus1.send_transaction(tx).await {
Expand Down
14 changes: 9 additions & 5 deletions validator/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use nimiq_database::{Database, Environment, ReadTransaction, WriteTransaction};
use nimiq_hash::{Blake2bHash, Hash};
use nimiq_keys::{Address, KeyPair as SchnorrKeyPair};
use nimiq_macros::store_waker;
use nimiq_mempool::{config::MempoolConfig, mempool::Mempool};
use nimiq_mempool::{config::MempoolConfig, mempool::Mempool, mempool::TxPriority};
use nimiq_network_interface::network::{Network, PubsubId, Topic};
use nimiq_primitives::coin::Coin;
use nimiq_primitives::policy;
Expand Down Expand Up @@ -629,11 +629,15 @@ impl<TNetwork: Network, TValidatorNetwork: ValidatorNetwork>
.unwrap(); // TODO: Handle transaction creation error
let tx_hash = unpark_transaction.hash();

let cn = self.consensus.clone();
let mempool = self.mempool.clone();
tokio::spawn(async move {
debug!("Sending unpark transaction");
if cn.send_transaction(unpark_transaction).await.is_err() {
error!("Failed to send unpark transaction");
debug!("Adding unpark transaction to mempool");
if mempool
.add_transaction(unpark_transaction, Some(TxPriority::HighPriority))
.await
.is_err()
{
error!("Failed adding unpark transaction into mempool");
}
});

Expand Down

0 comments on commit ca7297e

Please sign in to comment.