Skip to content

Commit

Permalink
Introduce transaction prioritization to mempool
Browse files Browse the repository at this point in the history
This commit allows one to specify a transaction priority
when txns are being addded to the mempool, such that
txns with higher priority are returned first.
This is exposed to the RPC interface
It is also used to prioritize our own unpark transactions
  • Loading branch information
viquezclaudio committed Jul 14, 2022
1 parent f75c6fd commit cf35d46
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 44 deletions.
4 changes: 3 additions & 1 deletion mempool/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use nimiq_transaction::Transaction;

use crate::filter::MempoolFilter;
use crate::mempool_state::MempoolState;
use crate::mempool_transactions::TxPriority;
use crate::verify::{verify_tx, VerifyErr};

const CONCURRENT_VERIF_TASKS: u32 = 1000;
Expand Down Expand Up @@ -98,7 +99,8 @@ impl<N: Network, T: Topic + Unpin + Sync> Future for MempoolExecutor<N, T> {

match verify_tx_ret {
Ok(mempool_state_lock) => {
RwLockUpgradableReadGuard::upgrade(mempool_state_lock).put(&tx);
RwLockUpgradableReadGuard::upgrade(mempool_state_lock)
.put(&tx, TxPriority::MediumPriority);
MsgAcceptance::Accept
}
// Reject the message if signature verification fails or transaction is invalid
Expand Down
7 changes: 3 additions & 4 deletions mempool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,22 @@
//! transactions that should be included in a block.
#[macro_use]
extern crate log;

/// Mempool transaction module
mod mempool_transactions;

/// Mempool state module
mod mempool_state;

/// Mempool config module
pub mod config;
/// Mempool executor module
pub mod executor;

/// Mempool filter module
pub mod filter;
/// Main mempool module
pub mod mempool;
/// Mempool metrics
#[cfg(feature = "metrics")]
mod mempool_metrics;
/// Mempool transaction module
pub mod mempool_transactions;
/// Verify transaction module
pub mod verify;
18 changes: 14 additions & 4 deletions mempool/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::filter::{MempoolFilter, MempoolRules};
#[cfg(feature = "metrics")]
use crate::mempool_metrics::MempoolMetrics;
use crate::mempool_state::{EvictionReason, MempoolState};
use crate::mempool_transactions::TxPriority;
use crate::verify::{verify_tx, VerifyErr};

/// Transaction topic for the Mempool to request transactions from the network
Expand All @@ -46,7 +47,7 @@ pub struct ControlTransactionTopic;
impl Topic for ControlTransactionTopic {
type Item = Transaction;

const BUFFER_SIZE: usize = 100;
const BUFFER_SIZE: usize = 1024;
const NAME: &'static str = "Controltransactions";
const VALIDATE: bool = true;
}
Expand Down Expand Up @@ -520,7 +521,8 @@ impl Mempool {
let pending_balance = tx.total_value() + sender_total;

if pending_balance <= sender_balance {
mempool_state.put(tx);
//TODO: This could be improved by re-adding unpark txns with high priority
mempool_state.put(tx, TxPriority::MediumPriority);
} else {
debug!(
block_number = block.block_number(),
Expand Down Expand Up @@ -658,7 +660,11 @@ impl Mempool {
}

/// Adds a transaction to the Mempool.
pub async fn add_transaction(&self, transaction: Transaction) -> Result<(), VerifyErr> {
pub async fn add_transaction(
&self,
transaction: Transaction,
tx_priority: Option<TxPriority>,
) -> Result<(), VerifyErr> {
let blockchain = Arc::clone(&self.blockchain);
let mempool_state = Arc::clone(&self.state);
let filter = Arc::clone(&self.filter);
Expand All @@ -668,7 +674,11 @@ impl Mempool {

match verify_tx_ret {
Ok(mempool_state_lock) => {
RwLockUpgradableReadGuard::upgrade(mempool_state_lock).put(&transaction);
RwLockUpgradableReadGuard::upgrade(mempool_state_lock).put(
&transaction,
tx_priority.unwrap_or(TxPriority::MediumPriority),
);

Ok(())
}
Err(e) => Err(e),
Expand Down
8 changes: 4 additions & 4 deletions mempool/src/mempool_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{

#[cfg(feature = "metrics")]
use crate::mempool_metrics::MempoolMetrics;
use crate::mempool_transactions::MempoolTransactions;
use crate::mempool_transactions::{MempoolTransactions, TxPriority};
use nimiq_hash::{Blake2bHash, Hash};
use nimiq_keys::Address;
use nimiq_primitives::{account::AccountType, coin::Coin};
Expand Down Expand Up @@ -69,7 +69,7 @@ impl MempoolState {
}
}

pub(crate) fn put(&mut self, tx: &Transaction) -> bool {
pub(crate) fn put(&mut self, tx: &Transaction, priority: TxPriority) -> bool {
let tx_hash = tx.hash();

if self.regular_transactions.contains_key(&tx_hash)
Expand All @@ -81,9 +81,9 @@ impl MempoolState {
// If we are adding a stacking transaction we insert it into the control txns container
// Staking txns are control txns
if tx.sender_type == AccountType::Staking || tx.recipient_type == AccountType::Staking {
self.control_transactions.insert(tx);
self.control_transactions.insert(tx, priority);
} else {
self.regular_transactions.insert(tx);
self.regular_transactions.insert(tx, priority);
}

// Update the per sender state
Expand Down
43 changes: 35 additions & 8 deletions mempool/src/mempool_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ use keyed_priority_queue::KeyedPriorityQueue;
use nimiq_hash::{Blake2bHash, Hash};
use nimiq_transaction::Transaction;

/// TxPriority that is used when adding transactions into the mempool
/// Higher Priority transactions are returned first from the mempool
#[derive(Copy, Clone, PartialEq)]
pub enum TxPriority {
/// Low Priority transactions
LowPriority = 1,
/// Medium Priority transactions, this is thed default
MediumPriority = 2,
/// High Priority transactions,
HighPriority = 3,
}

/// Ordering in which transactions removed from the mempool to be included in blocks.
/// This is stored on a max-heap, so the greater transaction comes first.
/// Compares by fee per byte (higher first), then by insertion order (lower i.e. older first).
Expand All @@ -17,6 +29,7 @@ use nimiq_transaction::Transaction;
// we might prefer basic transactions over staking contract transactions, etc, etc.
#[derive(PartialEq)]
pub struct BestTxOrder {
priority: TxPriority,
fee_per_byte: f64,
insertion_order: u64,
}
Expand All @@ -31,9 +44,14 @@ impl PartialOrd for BestTxOrder {

impl Ord for BestTxOrder {
fn cmp(&self, other: &Self) -> Ordering {
self.fee_per_byte
.partial_cmp(&other.fee_per_byte)
.expect("fees can't be NaN")
(self.priority as u8)
.partial_cmp(&(other.priority as u8))
.expect("TX Priority is required")
.then(
self.fee_per_byte
.partial_cmp(&other.fee_per_byte)
.expect("fees can't be NaN"),
)
.then(self.insertion_order.cmp(&other.insertion_order).reverse())
}
}
Expand All @@ -43,6 +61,7 @@ impl Ord for BestTxOrder {
/// Compares by fee per byte (lower first), then by insertion order (higher i.e. newer first).
#[derive(PartialEq)]
pub struct WorstTxOrder {
priority: TxPriority,
fee_per_byte: f64,
insertion_order: u64,
}
Expand All @@ -57,11 +76,17 @@ impl PartialOrd for WorstTxOrder {

impl Ord for WorstTxOrder {
fn cmp(&self, other: &Self) -> Ordering {
self.fee_per_byte
.partial_cmp(&other.fee_per_byte)
.expect("fees can't be NaN")
(self.priority as u8)
.partial_cmp(&(other.priority as u8))
.expect("TX Priority is required")
.reverse()
.then(self.insertion_order.cmp(&other.insertion_order))
.then(
self.fee_per_byte
.partial_cmp(&other.fee_per_byte)
.expect("fees can't be NaN")
.reverse()
.then(self.insertion_order.cmp(&other.insertion_order)),
)
}
}

Expand Down Expand Up @@ -147,7 +172,7 @@ impl MempoolTransactions {
self.transactions.get(hash)
}

pub(crate) fn insert(&mut self, tx: &Transaction) -> bool {
pub(crate) fn insert(&mut self, tx: &Transaction, priority: TxPriority) -> bool {
let tx_hash = tx.hash();

if self.transactions.contains_key(&tx_hash) {
Expand All @@ -159,13 +184,15 @@ impl MempoolTransactions {
self.best_transactions.push(
tx_hash.clone(),
BestTxOrder {
priority,
fee_per_byte: tx.fee_per_byte(),
insertion_order: self.tx_counter,
},
);
self.worst_transactions.push(
tx_hash.clone(),
WorstTxOrder {
priority,
fee_per_byte: tx.fee_per_byte(),
insertion_order: self.tx_counter,
},
Expand Down
47 changes: 31 additions & 16 deletions mempool/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use nimiq_keys::{
Address, KeyPair as SchnorrKeyPair, PrivateKey as SchnorrPrivateKey,
PublicKey as SchnorrPublicKey, SecureGenerate,
};
use nimiq_mempool::config::MempoolConfig;
use nimiq_mempool::mempool::Mempool;
use nimiq_mempool::{config::MempoolConfig, mempool_transactions::TxPriority};
use nimiq_network_mock::{MockHub, MockId, MockNetwork, MockPeerId};
use nimiq_primitives::{networks::NetworkId, policy};
use nimiq_test_log::test;
Expand All @@ -37,7 +37,11 @@ const NUM_TXNS_START_STOP: usize = 100;
pub const ACCOUNT_SECRET_KEY: &str =
"6c9320ac201caf1f8eaa5b05f5d67a9e77826f3f6be266a0ecccc20416dc6587";

pub const VALIDATOR_SECRET_KEY: &str =
"041580cc67e66e9e08b68fd9e4c9deb68737168fbe7488de2638c2e906c2f5ad";

const STAKER_ADDRESS: &str = "NQ20TSB0DFSMUH9C15GQGAGJTTE4D3MA859E";
const VALIDATOR_ADDRESS: &str = "NQ20 TSB0 DFSM UH9C 15GQ GAGJ TTE4 D3MA 859E";

fn ed25519_key_pair(secret_key: &str) -> SchnorrKeyPair {
let priv_key: SchnorrPrivateKey =
Expand Down Expand Up @@ -1551,12 +1555,24 @@ async fn mempool_update_create_staker_twice() {
}

#[test(tokio::test(flavor = "multi_thread", worker_threads = 10))]
async fn mempool_basic_control_tx() {
async fn mempool_basic_prioritization_control_tx() {
let time = Arc::new(OffsetTime::new());
let env = VolatileEnvironment::new(10).unwrap();

let key_pair = ed25519_key_pair(ACCOUNT_SECRET_KEY);
let validator_signing_key = ed25519_key_pair(VALIDATOR_SECRET_KEY);
let address = Address::from_any_str(STAKER_ADDRESS).unwrap();
let validator_address = Address::from_any_str(VALIDATOR_ADDRESS).unwrap();

let unpark = TransactionBuilder::new_unpark_validator(
&key_pair,
validator_address,
&validator_signing_key,
1.try_into().unwrap(),
1,
NetworkId::UnitAlbatross,
)
.unwrap();

// This is the transaction produced in the block
let tx = TransactionBuilder::new_create_staker(
Expand All @@ -1583,35 +1599,34 @@ async fn mempool_basic_control_tx() {
let mock_network = Arc::new(hub.new_network());

// Send txns to mempool
send_control_txn_to_mempool(&mempool, mock_network, mock_id, txns).await;
send_control_txn_to_mempool(&mempool, mock_network, mock_id, txns.clone()).await;

assert_eq!(
mempool.num_transactions(),
1,
"Number of txns in mempool is not what is expected"
);

// Get regular txns from mempool
let (updated_txns, _) = mempool.get_transactions_for_block(10_000);

//We should obtain 0 regular txns since we only have 1 control tx in the mempool
assert_eq!(
updated_txns.len(),
0,
"Number of txns is not what is expected"
);
// Insert unpark with high priority
mempool
.add_transaction(unpark.clone(), Some(TxPriority::HighPriority))
.await
.unwrap();

// Get control txns from mempool
let (updated_txns, _) = mempool.get_control_transactions_for_block(10_000);

//Now we should obtain one control transaction
// Now we should obtain one control transaction
assert_eq!(
updated_txns.len(),
1,
2,
"Number of txns is not what is expected"
);

//Now the mempool should have 0 total txns
// We should obtain the txns in the reversed ordered as the unpark should have been prioritized.
assert_eq!(updated_txns[0], unpark);

// Now the mempool should have 0 total txns
assert_eq!(
mempool.num_transactions(),
0,
Expand Down Expand Up @@ -1875,7 +1890,7 @@ async fn applies_total_tx_size_limits() {
let worst_tx = txns[1].hash::<Blake2bHash>();

for tx in txns {
mempool.add_transaction(tx).await.unwrap();
mempool.add_transaction(tx, None).await.unwrap();
}

let (mempool_txns, _) = mempool.get_transactions_for_block(txns_len);
Expand Down
5 changes: 5 additions & 0 deletions rpc-interface/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ pub trait MempoolInterface {

async fn push_transaction(&mut self, raw_tx: String) -> Result<Blake2bHash, Self::Error>;

async fn push_high_priority_transaction(
&mut self,
raw_tx: String,
) -> Result<Blake2bHash, Self::Error>;

async fn mempool_content(
&mut self,
include_transactions: bool,
Expand Down
22 changes: 21 additions & 1 deletion rpc-server/src/dispatchers/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use beserial::Deserialize;
use nimiq_hash::{Blake2bHash, Hash};
use nimiq_mempool::mempool::Mempool;

use nimiq_mempool::mempool_transactions::TxPriority;
use nimiq_rpc_interface::mempool::MempoolInterface;
use nimiq_rpc_interface::types::{HashOrTx, MempoolInfo};

Expand Down Expand Up @@ -33,7 +34,26 @@ impl MempoolInterface for MempoolDispatcher {
Deserialize::deserialize_from_vec(&hex::decode(&raw_tx)?)?;
let txid = tx.hash::<Blake2bHash>();

match self.mempool.add_transaction(tx).await {
match self.mempool.add_transaction(tx, None).await {
Ok(_) => Ok(txid),
Err(e) => Err(Error::MempoolError(e)),
}
}

/// Pushes the given serialized transaction to the local mempool with high priority
async fn push_high_priority_transaction(
&mut self,
raw_tx: String,
) -> Result<Blake2bHash, Self::Error> {
let tx: nimiq_transaction::Transaction =
Deserialize::deserialize_from_vec(&hex::decode(&raw_tx)?)?;
let txid = tx.hash::<Blake2bHash>();

match self
.mempool
.add_transaction(tx, Some(TxPriority::HighPriority))
.await
{
Ok(_) => Ok(txid),
Err(e) => Err(Error::MempoolError(e)),
}
Expand Down
2 changes: 1 addition & 1 deletion spammer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ async fn spam(
let consensus1 = consensus.clone();
let mp = std::sync::Arc::clone(&mempool);
tokio::spawn(async move {
if let Err(e) = mp.add_transaction(tx.clone()).await {
if let Err(e) = mp.add_transaction(tx.clone(), None).await {
log::warn!("Mempool rejected transaction: {:?} - {:#?}", e, tx);
}
if let Err(e) = consensus1.send_transaction(tx).await {
Expand Down
Loading

0 comments on commit cf35d46

Please sign in to comment.