Skip to content

Commit

Permalink
Always stem local txs if configured that way (unless explicitly fluff…
Browse files Browse the repository at this point in the history
…ed) (#2876)

* always stem local txs if configured that way (unless explicitly fluff from wallet)
this overrides current epoch behavior for txs coming in via "push-api"
rename "local" to "our" txs

* TxSource is now an enum for type safety.
  • Loading branch information
antiochp authored Jul 4, 2019
1 parent 8277516 commit f4eb3e3
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 75 deletions.
5 changes: 1 addition & 4 deletions api/src/handlers/pool_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,7 @@ impl PoolPushHandler {
.map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)).into())
})
.and_then(move |tx: Transaction| {
let source = pool::TxSource {
debug_name: "push-api".to_string(),
identifier: "?.?.?.?".to_string(),
};
let source = pool::TxSource::PushApi;
info!(
"Pushing transaction {} to pool (inputs: {}, outputs: {}, kernels: {})",
tx.hash(),
Expand Down
8 changes: 8 additions & 0 deletions config/src/comments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ fn comments() -> HashMap<String, String> {
.to_string(),
);

retval.insert(
"always_stem_our_txs".to_string(),
"
#always stem our (pushed via api) txs regardless of stem/fluff epoch (as per Dandelion++ paper)
"
.to_string(),
);

retval.insert(
"[server.p2p_config]".to_string(),
"#test miner wallet URL (burns if this doesn't exist)
Expand Down
4 changes: 2 additions & 2 deletions pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ impl Pool {

fn log_pool_add(&self, entry: &PoolEntry, header: &BlockHeader) {
debug!(
"add_to_pool [{}]: {} ({}) [in/out/kern: {}/{}/{}] pool: {} (at block {})",
"add_to_pool [{}]: {} ({:?}) [in/out/kern: {}/{}/{}] pool: {} (at block {})",
self.name,
entry.tx.hash(),
entry.src.debug_name,
entry.src,
entry.tx.inputs().len(),
entry.tx.outputs().len(),
entry.tx.kernels().len(),
Expand Down
6 changes: 3 additions & 3 deletions pool/src/transaction_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl TransactionPool {
tx.validate(Weighting::AsTransaction, self.verifier_cache.clone())?;

entry.tx = tx;
entry.src.debug_name = "deagg".to_string();
entry.src = TxSource::Deaggregate;
}
}
self.txpool.add_to_pool(entry.clone(), vec![], header)?;
Expand Down Expand Up @@ -169,12 +169,12 @@ impl TransactionPool {
if !stem
|| self
.add_to_stempool(entry.clone(), header)
.and_then(|_| self.adapter.stem_tx_accepted(&entry.tx))
.and_then(|_| self.adapter.stem_tx_accepted(&entry))
.is_err()
{
self.add_to_txpool(entry.clone(), header)?;
self.add_to_reorg_cache(entry.clone());
self.adapter.tx_accepted(&entry.tx);
self.adapter.tx_accepted(&entry);
}

// Transaction passed all the checks but we have to make space for it
Expand Down
71 changes: 47 additions & 24 deletions pool/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,32 @@ const DANDELION_AGGREGATION_SECS: u16 = 30;
/// Dandelion stem probability (stem 90% of the time, fluff 10%).
const DANDELION_STEM_PROBABILITY: u8 = 90;

/// Always stem our (pushed via api) txs?
/// Defaults to true to match the Dandelion++ paper.
/// But can be overridden to allow a node to fluff our txs if desired.
/// If set to false we will stem/fluff our txs as per current epoch.
const DANDELION_ALWAYS_STEM_OUR_TXS: bool = true;

/// Configuration for "Dandelion".
/// Note: shared between p2p and pool.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DandelionConfig {
/// Length of each "epoch".
#[serde(default = "default_dandelion_epoch_secs")]
pub epoch_secs: Option<u16>,
pub epoch_secs: u16,
/// Dandelion embargo timer. Fluff and broadcast individual txs if not seen
/// on network before embargo expires.
#[serde(default = "default_dandelion_embargo_secs")]
pub embargo_secs: Option<u16>,
pub embargo_secs: u16,
/// Dandelion aggregation timer.
#[serde(default = "default_dandelion_aggregation_secs")]
pub aggregation_secs: Option<u16>,
pub aggregation_secs: u16,
/// Dandelion stem probability (stem 90% of the time, fluff 10% etc.)
#[serde(default = "default_dandelion_stem_probability")]
pub stem_probability: Option<u8>,
pub stem_probability: u8,
/// Default to always stem our txs as described in Dandelion++ paper.
#[serde(default = "default_dandelion_always_stem_our_txs")]
pub always_stem_our_txs: bool,
}

impl Default for DandelionConfig {
Expand All @@ -65,24 +74,29 @@ impl Default for DandelionConfig {
embargo_secs: default_dandelion_embargo_secs(),
aggregation_secs: default_dandelion_aggregation_secs(),
stem_probability: default_dandelion_stem_probability(),
always_stem_our_txs: default_dandelion_always_stem_our_txs(),
}
}
}

fn default_dandelion_epoch_secs() -> Option<u16> {
Some(DANDELION_EPOCH_SECS)
fn default_dandelion_epoch_secs() -> u16 {
DANDELION_EPOCH_SECS
}

fn default_dandelion_embargo_secs() -> u16 {
DANDELION_EMBARGO_SECS
}

fn default_dandelion_embargo_secs() -> Option<u16> {
Some(DANDELION_EMBARGO_SECS)
fn default_dandelion_aggregation_secs() -> u16 {
DANDELION_AGGREGATION_SECS
}

fn default_dandelion_aggregation_secs() -> Option<u16> {
Some(DANDELION_AGGREGATION_SECS)
fn default_dandelion_stem_probability() -> u8 {
DANDELION_STEM_PROBABILITY
}

fn default_dandelion_stem_probability() -> Option<u8> {
Some(DANDELION_STEM_PROBABILITY)
fn default_dandelion_always_stem_our_txs() -> bool {
DANDELION_ALWAYS_STEM_OUR_TXS
}

/// Transaction pool configuration
Expand Down Expand Up @@ -145,20 +159,29 @@ pub struct PoolEntry {
pub tx: Transaction,
}

/// Placeholder: the data representing where we heard about a tx from.
///
/// Used to make decisions based on transaction acceptance priority from
/// various sources. For example, a node may want to bypass pool size
/// restrictions when accepting a transaction from a local wallet.
///
/// Most likely this will evolve to contain some sort of network identifier,
/// once we get a better sense of what transaction building might look like.
#[derive(Clone, Debug)]
pub struct TxSource {
/// Human-readable name used for logging and errors.
pub debug_name: String,
/// Unique identifier used to distinguish this peer from others.
pub identifier: String,
#[derive(Clone, Debug, PartialEq)]
pub enum TxSource {
PushApi,
Broadcast,
Fluff,
EmbargoExpired,
Deaggregate,
}

impl TxSource {
/// Convenience fn for checking if this tx was sourced via the push api.
pub fn is_pushed(&self) -> bool {
match self {
TxSource::PushApi => true,
_ => false,
}
}
}

/// Possible errors when interacting with the transaction pool.
Expand Down Expand Up @@ -250,19 +273,19 @@ pub trait BlockChain: Sync + Send {
/// importantly the broadcasting of transactions to our peers.
pub trait PoolAdapter: Send + Sync {
/// The transaction pool has accepted this transaction as valid.
fn tx_accepted(&self, tx: &transaction::Transaction);
fn tx_accepted(&self, entry: &PoolEntry);

/// The stem transaction pool has accepted this transactions as valid.
fn stem_tx_accepted(&self, tx: &transaction::Transaction) -> Result<(), PoolError>;
fn stem_tx_accepted(&self, entry: &PoolEntry) -> Result<(), PoolError>;
}

/// Dummy adapter used as a placeholder for real implementations
#[allow(dead_code)]
pub struct NoopAdapter {}

impl PoolAdapter for NoopAdapter {
fn tx_accepted(&self, _tx: &transaction::Transaction) {}
fn stem_tx_accepted(&self, _tx: &transaction::Transaction) -> Result<(), PoolError> {
fn tx_accepted(&self, _entry: &PoolEntry) {}
fn stem_tx_accepted(&self, _entry: &PoolEntry) -> Result<(), PoolError> {
Ok(())
}
}
5 changes: 1 addition & 4 deletions pool/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,7 @@ where
}

pub fn test_source() -> TxSource {
TxSource {
debug_name: format!("test"),
identifier: format!("127.0.0.1"),
}
TxSource::Broadcast
}

pub fn clean_output_dir(db_root: String) {
Expand Down
6 changes: 4 additions & 2 deletions pool/tests/transaction_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ use self::core::core::{transaction, Block, BlockHeader, Weighting};
use self::core::libtx;
use self::core::pow::Difficulty;
use self::keychain::{ExtKeychain, Keychain};
use self::pool::TxSource;
use self::util::RwLock;
use crate::common::*;
use grin_core as core;
use grin_keychain as keychain;
use grin_pool as pool;
use grin_util as util;
use std::sync::Arc;

Expand Down Expand Up @@ -237,7 +239,7 @@ fn test_the_transaction_pool() {
assert_eq!(write_pool.total_size(), 6);
let entry = write_pool.txpool.entries.last().unwrap();
assert_eq!(entry.tx.kernels().len(), 1);
assert_eq!(entry.src.debug_name, "deagg");
assert_eq!(entry.src, TxSource::Deaggregate);
}

// Check we cannot "double spend" an output spent in a previous block.
Expand Down Expand Up @@ -447,7 +449,7 @@ fn test_the_transaction_pool() {
assert_eq!(write_pool.total_size(), 6);
let entry = write_pool.txpool.entries.last().unwrap();
assert_eq!(entry.tx.kernels().len(), 1);
assert_eq!(entry.src.debug_name, "deagg");
assert_eq!(entry.src, TxSource::Deaggregate);
}

// Check we cannot "double spend" an output spent in a previous block.
Expand Down
19 changes: 8 additions & 11 deletions servers/src/common/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use crate::core::{core, global};
use crate::p2p;
use crate::p2p::types::PeerInfo;
use crate::pool;
use crate::pool::types::DandelionConfig;
use crate::util::OneTime;
use chrono::prelude::*;
use chrono::Duration;
Expand Down Expand Up @@ -97,10 +96,7 @@ impl p2p::ChainAdapter for NetToChainAdapter {
return Ok(true);
}

let source = pool::TxSource {
debug_name: "p2p".to_string(),
identifier: "?.?.?.?".to_string(),
};
let source = pool::TxSource::Broadcast;

let header = self.chain().head_header()?;

Expand Down Expand Up @@ -804,21 +800,22 @@ impl DandelionAdapter for PoolToNetAdapter {
}

impl pool::PoolAdapter for PoolToNetAdapter {
fn tx_accepted(&self, tx: &core::Transaction) {
self.peers().broadcast_transaction(tx);
fn tx_accepted(&self, entry: &pool::PoolEntry) {
self.peers().broadcast_transaction(&entry.tx);
}

fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> {
fn stem_tx_accepted(&self, entry: &pool::PoolEntry) -> Result<(), pool::PoolError> {
// Take write lock on the current epoch.
// We need to be able to update the current relay peer if not currently connected.
let mut epoch = self.dandelion_epoch.write();

// If "stem" epoch attempt to relay the tx to the next Dandelion relay.
// Fallback to immediately fluffing the tx if we cannot stem for any reason.
// If "fluff" epoch then nothing to do right now (fluff via Dandelion monitor).
if epoch.is_stem() {
// If node is configured to always stem our (pushed via api) txs then do so.
if epoch.is_stem() || (entry.src.is_pushed() && epoch.always_stem_our_txs()) {
if let Some(peer) = epoch.relay_peer(&self.peers()) {
match peer.send_stem_transaction(tx) {
match peer.send_stem_transaction(&entry.tx) {
Ok(_) => {
info!("Stemming this epoch, relaying to next peer.");
Ok(())
Expand All @@ -841,7 +838,7 @@ impl pool::PoolAdapter for PoolToNetAdapter {

impl PoolToNetAdapter {
/// Create a new pool to net adapter
pub fn new(config: DandelionConfig) -> PoolToNetAdapter {
pub fn new(config: pool::DandelionConfig) -> PoolToNetAdapter {
PoolToNetAdapter {
peers: OneTime::new(),
dandelion_epoch: Arc::new(RwLock::new(DandelionEpoch::new(config))),
Expand Down
14 changes: 8 additions & 6 deletions servers/src/common/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,8 @@ impl DandelionEpoch {
match self.start_time {
None => true,
Some(start_time) => {
let epoch_secs = self.config.epoch_secs.expect("epoch_secs config missing") as i64;
Utc::now().timestamp().saturating_sub(start_time) > epoch_secs
let epoch_secs = self.config.epoch_secs;
Utc::now().timestamp().saturating_sub(start_time) > epoch_secs as i64
}
}
}
Expand All @@ -511,10 +511,7 @@ impl DandelionEpoch {

// If stem_probability == 90 then we stem 90% of the time.
let mut rng = rand::thread_rng();
let stem_probability = self
.config
.stem_probability
.expect("stem_probability config missing");
let stem_probability = self.config.stem_probability;
self.is_stem = rng.gen_range(0, 100) < stem_probability;

let addr = self.relay_peer.clone().map(|p| p.info.addr);
Expand All @@ -529,6 +526,11 @@ impl DandelionEpoch {
self.is_stem
}

/// Always stem our (pushed via api) txs regardless of stem/fluff epoch?
pub fn always_stem_our_txs(&self) -> bool {
self.config.always_stem_our_txs
}

/// What is our current relay peer?
/// If it is not connected then choose a new one.
pub fn relay_peer(&mut self, peers: &Arc<p2p::Peers>) -> Option<Arc<p2p::Peer>> {
Expand Down
23 changes: 4 additions & 19 deletions servers/src/grin/dandelion_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ fn process_fluff_phase(
return Ok(());
}

let cutoff_secs = dandelion_config
.aggregation_secs
.expect("aggregation secs config missing");
let cutoff_secs = dandelion_config.aggregation_secs;
let cutoff_entries = select_txs_cutoff(&tx_pool.stempool, cutoff_secs);

// If epoch is expired, fluff *all* outstanding entries in stempool.
Expand Down Expand Up @@ -149,12 +147,7 @@ fn process_fluff_phase(
verifier_cache.clone(),
)?;

let src = TxSource {
debug_name: "fluff".to_string(),
identifier: "?.?.?.?".to_string(),
};

tx_pool.add_to_pool(src, agg_tx, false, &header)?;
tx_pool.add_to_pool(TxSource::Fluff, agg_tx, false, &header)?;
Ok(())
}

Expand All @@ -165,10 +158,7 @@ fn process_expired_entries(
// Take a write lock on the txpool for the duration of this processing.
let mut tx_pool = tx_pool.write();

let embargo_secs = dandelion_config
.embargo_secs
.expect("embargo_secs config missing")
+ thread_rng().gen_range(0, 31);
let embargo_secs = dandelion_config.embargo_secs + thread_rng().gen_range(0, 31);
let expired_entries = select_txs_cutoff(&tx_pool.stempool, embargo_secs);

if expired_entries.is_empty() {
Expand All @@ -179,14 +169,9 @@ fn process_expired_entries(

let header = tx_pool.chain_head()?;

let src = TxSource {
debug_name: "embargo_expired".to_string(),
identifier: "?.?.?.?".to_string(),
};

for entry in expired_entries {
let txhash = entry.tx.hash();
match tx_pool.add_to_pool(src.clone(), entry.tx, false, &header) {
match tx_pool.add_to_pool(TxSource::EmbargoExpired, entry.tx, false, &header) {
Ok(_) => info!(
"dand_mon: embargo expired for {}, fluffed successfully.",
txhash
Expand Down

0 comments on commit f4eb3e3

Please sign in to comment.