Skip to content

Commit

Permalink
Implementation of second mempool
Browse files Browse the repository at this point in the history
- Introduced a second mempool, used for control transactions
- When blocks are produced, first we try to include control txns
- If there is space left, we include regular txns
- This means that control txns have a higher priority over reg txns.
  • Loading branch information
viquezclaudio committed Jul 5, 2022
1 parent 95ee119 commit 703b0bb
Show file tree
Hide file tree
Showing 6 changed files with 787 additions and 125 deletions.
6 changes: 5 additions & 1 deletion consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use futures::{FutureExt, StreamExt};
use nimiq_primitives::account::AccountType;
use parking_lot::RwLock;
use tokio::sync::broadcast::{channel as broadcast, Sender as BroadcastSender};
use tokio::time::Sleep;
use tokio_stream::wrappers::BroadcastStream;

use nimiq_blockchain::{AbstractBlockchain, Blockchain};
use nimiq_database::Environment;
use nimiq_mempool::mempool::TransactionTopic;
use nimiq_mempool::mempool::{ControlTransactionTopic, TransactionTopic};
use nimiq_network_interface::network::Network;
use nimiq_transaction::Transaction;

Expand Down Expand Up @@ -42,6 +43,9 @@ impl<N: Network> Clone for ConsensusProxy<N> {

impl<N: Network> ConsensusProxy<N> {
pub async fn send_transaction(&self, tx: Transaction) -> Result<(), N::Error> {
if tx.sender_type == AccountType::Staking || tx.recipient_type == AccountType::Staking {
return self.network.publish::<ControlTransactionTopic>(tx).await;
}
self.network.publish::<TransactionTopic>(tx).await
}

Expand Down
101 changes: 99 additions & 2 deletions mempool/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use nimiq_primitives::networks::NetworkId;
use nimiq_transaction::Transaction;

use crate::filter::MempoolFilter;
use crate::mempool::{MempoolState, TransactionTopic};
use crate::mempool::{ControlTransactionTopic, MempoolState, TransactionTopic};
use crate::verify::{verify_tx, VerifyErr};

const CONCURRENT_VERIF_TASKS: u32 = 1000;
Expand All @@ -31,7 +31,7 @@ pub(crate) struct MempoolExecutor<N: Network> {
// Ongoing verification tasks counter
verification_tasks: Arc<AtomicU32>,

// Reference to the network, to alow for message validation
// Reference to the network, to allow for message validation
network: Arc<N>,

// Network ID, used for tx verification
Expand Down Expand Up @@ -114,3 +114,100 @@ impl<N: Network> Future for MempoolExecutor<N> {
Poll::Ready(())
}
}

pub(crate) struct ControlMempoolExecutor<N: Network> {
// Blockchain reference
blockchain: Arc<RwLock<Blockchain>>,

// The mempool state: the data structure where the transactions are stored
state: Arc<RwLock<MempoolState>>,

// Mempool filter
filter: Arc<RwLock<MempoolFilter>>,

// Ongoing verification tasks counter
verification_tasks: Arc<AtomicU32>,

// Reference to the network, to allow for message validation
network: Arc<N>,

// Network ID, used for tx verification
network_id: Arc<NetworkId>,

// Transaction stream that is used to listen to transactions from the network
txn_stream: BoxStream<'static, (Transaction, <N as Network>::PubsubId)>,
}

impl<N: Network> ControlMempoolExecutor<N> {
pub fn new(
blockchain: Arc<RwLock<Blockchain>>,
state: Arc<RwLock<MempoolState>>,
filter: Arc<RwLock<MempoolFilter>>,
network: Arc<N>,
txn_stream: BoxStream<'static, (Transaction, <N as Network>::PubsubId)>,
) -> Self {
Self {
blockchain: blockchain.clone(),
state,
filter,
network,
network_id: Arc::new(blockchain.read().network_id),
verification_tasks: Arc::new(AtomicU32::new(0)),
txn_stream,
}
}
}

impl<N: Network> Future for ControlMempoolExecutor<N> {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
while let Some((tx, pubsub_id)) = ready!(self.txn_stream.as_mut().poll_next_unpin(cx)) {
if self.verification_tasks.fetch_add(0, AtomicOrdering::SeqCst)
>= CONCURRENT_VERIF_TASKS
{
log::debug!("Reached the max number of verification tasks");
continue;
}

let blockchain = Arc::clone(&self.blockchain);
let mempool_state = Arc::clone(&self.state);
let filter = Arc::clone(&self.filter);
let tasks_count = Arc::clone(&self.verification_tasks);
let network_id = Arc::clone(&self.network_id);
let network = Arc::clone(&self.network);

// Spawn the transaction verification task
tokio::task::spawn(async move {
tasks_count.fetch_add(1, AtomicOrdering::SeqCst);

// Verifying and pushing the TX in a separate scope to drop the lock that is returned by
// the verify_tx function immediately
let acceptance = {
let verify_tx_ret =
verify_tx(&tx, blockchain, network_id, &mempool_state, filter).await;

match verify_tx_ret {
Ok(mempool_state_lock) => {
RwLockUpgradableReadGuard::upgrade(mempool_state_lock).put(&tx);
MsgAcceptance::Accept
}
// Reject the message if signature verification fails or transaction is invalid
// for current validation window
Err(VerifyErr::InvalidSignature) => MsgAcceptance::Reject,
Err(VerifyErr::InvalidTxWindow) => MsgAcceptance::Reject,
Err(_) => MsgAcceptance::Ignore,
}
};

network.validate_message::<ControlTransactionTopic>(pubsub_id, acceptance);

tasks_count.fetch_sub(1, AtomicOrdering::SeqCst);
});
}

// We have exited the loop, so poll_next() must have returned Poll::Ready(None).
// Thus, we terminate the executor future.
Poll::Ready(())
}
}
Loading

0 comments on commit 703b0bb

Please sign in to comment.