Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Second (control) mempool implementation #885

Merged
merged 4 commits into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use futures::{FutureExt, StreamExt};
use nimiq_primitives::account::AccountType;
use parking_lot::RwLock;
use tokio::sync::broadcast::{channel as broadcast, Sender as BroadcastSender};
use tokio::time::Sleep;
use tokio_stream::wrappers::BroadcastStream;

use nimiq_blockchain::{AbstractBlockchain, Blockchain};
use nimiq_database::Environment;
use nimiq_mempool::mempool::TransactionTopic;
use nimiq_mempool::mempool::{ControlTransactionTopic, TransactionTopic};
use nimiq_network_interface::network::Network;
use nimiq_transaction::Transaction;

Expand Down Expand Up @@ -42,6 +43,9 @@ impl<N: Network> Clone for ConsensusProxy<N> {

impl<N: Network> ConsensusProxy<N> {
pub async fn send_transaction(&self, tx: Transaction) -> Result<(), N::Error> {
if tx.sender_type == AccountType::Staking || tx.recipient_type == AccountType::Staking {
return self.network.publish::<ControlTransactionTopic>(tx).await;
}
self.network.publish::<TransactionTopic>(tx).await
sisou marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down
2 changes: 2 additions & 0 deletions lib/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,11 +696,13 @@ impl ClientConfigBuilder {
pub fn mempool(
&mut self,
size_limit: usize,
control_size_limit: usize,
filter_rules: MempoolRules,
filter_limit: usize,
) -> &mut Self {
self.mempool = Some(MempoolConfig {
size_limit,
control_size_limit,
filter_rules,
filter_limit,
});
Expand Down
4 changes: 4 additions & 0 deletions lib/src/config/config_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ pub struct DatabaseSettings {
pub struct MempoolSettings {
pub filter: Option<MempoolFilterSettings>,
pub size_limit: Option<usize>,
pub control_size_limit: Option<usize>,
pub blacklist_limit: Option<usize>,
}

Expand Down Expand Up @@ -427,6 +428,9 @@ impl From<MempoolSettings> for MempoolConfig {
fn from(mempool: MempoolSettings) -> Self {
Self {
size_limit: mempool.size_limit.unwrap_or(Mempool::DEFAULT_SIZE_LIMIT),
control_size_limit: mempool
.control_size_limit
.unwrap_or(Mempool::DEFAULT_CONTROL_SIZE_LIMIT),
filter_limit: mempool
.blacklist_limit
.unwrap_or(MempoolFilter::DEFAULT_BLACKLIST_SIZE),
Expand Down
5 changes: 4 additions & 1 deletion mempool/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use crate::mempool::Mempool;
/// Struct defining a Mempool configuration
#[derive(Debug, Clone)]
pub struct MempoolConfig {
/// Total size limit of transactions in the mempool (bytes)
/// Total size limit of transactions in the regular mempool (bytes)
pub size_limit: usize,
/// Total size limit of transactions in the control mempool (bytes)
pub control_size_limit: usize,
/// Mempool filter rules
pub filter_rules: MempoolRules,
/// Mempool filter limit or size
Expand All @@ -16,6 +18,7 @@ impl Default for MempoolConfig {
fn default() -> MempoolConfig {
MempoolConfig {
size_limit: Mempool::DEFAULT_SIZE_LIMIT,
control_size_limit: Mempool::DEFAULT_CONTROL_SIZE_LIMIT,
filter_rules: MempoolRules::default(),
filter_limit: MempoolFilter::DEFAULT_BLACKLIST_SIZE,
}
Expand Down
22 changes: 14 additions & 8 deletions mempool/src/executor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};
use std::sync::Arc;
Expand All @@ -8,17 +9,17 @@ use futures::{ready, stream::BoxStream, StreamExt};
use parking_lot::{RwLock, RwLockUpgradableReadGuard};

use nimiq_blockchain::Blockchain;
use nimiq_network_interface::network::{MsgAcceptance, Network};
use nimiq_network_interface::network::{MsgAcceptance, Network, Topic};
use nimiq_primitives::networks::NetworkId;
use nimiq_transaction::Transaction;

use crate::filter::MempoolFilter;
use crate::mempool::{MempoolState, TransactionTopic};
use crate::mempool_state::MempoolState;
use crate::verify::{verify_tx, VerifyErr};

const CONCURRENT_VERIF_TASKS: u32 = 1000;

pub(crate) struct MempoolExecutor<N: Network> {
pub(crate) struct MempoolExecutor<N: Network, T: Topic + Unpin + Sync> {
// Blockchain reference
blockchain: Arc<RwLock<Blockchain>>,

Expand All @@ -31,37 +32,42 @@ pub(crate) struct MempoolExecutor<N: Network> {
// Ongoing verification tasks counter
verification_tasks: Arc<AtomicU32>,

// Reference to the network, to alow for message validation
// Reference to the network, to allow for message validation
network: Arc<N>,

// Network ID, used for tx verification
network_id: Arc<NetworkId>,

// Transaction stream that is used to listen to transactions from the network
txn_stream: BoxStream<'static, (Transaction, <N as Network>::PubsubId)>,

// Phantom data for the unused type T
_phantom: PhantomData<T>,
}

impl<N: Network> MempoolExecutor<N> {
impl<N: Network, T: Topic + Unpin + Sync> MempoolExecutor<N, T> {
pub fn new(
blockchain: Arc<RwLock<Blockchain>>,
state: Arc<RwLock<MempoolState>>,
filter: Arc<RwLock<MempoolFilter>>,
network: Arc<N>,
txn_stream: BoxStream<'static, (Transaction, <N as Network>::PubsubId)>,
verification_tasks: Arc<AtomicU32>,
) -> Self {
Self {
blockchain: blockchain.clone(),
state,
filter,
network,
network_id: Arc::new(blockchain.read().network_id),
verification_tasks: Arc::new(AtomicU32::new(0)),
verification_tasks,
txn_stream,
_phantom: PhantomData,
}
}
}

impl<N: Network> Future for MempoolExecutor<N> {
impl<N: Network, T: Topic + Unpin + Sync> Future for MempoolExecutor<N, T> {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down Expand Up @@ -103,7 +109,7 @@ impl<N: Network> Future for MempoolExecutor<N> {
}
};

network.validate_message::<TransactionTopic>(pubsub_id, acceptance);
network.validate_message::<T>(pubsub_id, acceptance);

tasks_count.fetch_sub(1, AtomicOrdering::SeqCst);
});
Expand Down
6 changes: 6 additions & 0 deletions mempool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
#[macro_use]
extern crate log;

/// Mempool transaction module
mod mempool_transactions;

/// Mempool state module
mod mempool_state;

/// Mempool config module
pub mod config;
/// Mempool executor module
Expand Down
Loading