Skip to content

Commit

Permalink
Dandelion++ Rewrite (#2628)
Browse files Browse the repository at this point in the history
* reworked the dandelion rewrite (dandelion++)

* fall back to fluff/broadcast if we cannot stem the tx for any reason

* rework stem vs fluff logic during accepting tx

* cleanup docs

* add is_stem to logging

* cleanup

* rustfmt

* cleanup monitor and logging

* rework dandelion monitor to use simple cutoff for aggregation

* transition to next epoch *after* processing tx
so we fluff final outstanding txs

* fluff all txs in stempool if any are older than 30s
aggressively aggregate when we can

* fix rebase onto 1.1.0

* default config comments for Dandelion

* fix code to reflect our tests - fall back to txpool on stempool error

* log fluff and expire errors in dandelion monitor

* cleanup

* fix off by one

* cleanup

* cleanup

* various fixes

* one less clone

* cleanup
  • Loading branch information
antiochp authored Mar 20, 2019
1 parent 16487a3 commit a2adf2d
Show file tree
Hide file tree
Showing 12 changed files with 333 additions and 372 deletions.
13 changes: 7 additions & 6 deletions config/src/comments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,28 +141,29 @@ fn comments() -> HashMap<String, String> {
);

retval.insert(
"relay_secs".to_string(),
"epoch_secs".to_string(),
"
#dandelion relay time (choose new relay peer every n secs)
#dandelion epoch duration
"
.to_string(),
);

retval.insert(
"embargo_secs".to_string(),
"aggregation_secs".to_string(),
"
#fluff and broadcast after embargo expires if tx not seen on network
#dandelion aggregation period in secs
"
.to_string(),
);

retval.insert(
"patience_secs".to_string(),
"embargo_secs".to_string(),
"
#run dandelion stem/fluff processing every n secs (stem tx aggregation in this window)
#fluff and broadcast after embargo expires if tx not seen on network
"
.to_string(),
);

retval.insert(
"stem_probability".to_string(),
"
Expand Down
7 changes: 7 additions & 0 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use crate::util::{Mutex, RwLock};
use std::fmt;
use std::fs::File;
use std::net::{Shutdown, TcpStream};
use std::sync::Arc;
Expand Down Expand Up @@ -54,6 +55,12 @@ pub struct Peer {
connection: Option<Mutex<conn::Tracker>>,
}

// Concise Debug output for a Peer: delegate to the wrapped `PeerInfo`
// rather than dumping every field (tracking meters, connection, etc.).
impl fmt::Debug for Peer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Peer({:?})", &self.info)
}
}

impl Peer {
// Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, adapter: Arc<dyn NetAdapter>) -> Peer {
Expand Down
55 changes: 0 additions & 55 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ pub struct Peers {
pub adapter: Arc<dyn ChainAdapter>,
store: PeerStore,
peers: RwLock<HashMap<PeerAddr, Arc<Peer>>>,
dandelion_relay: RwLock<Option<(i64, Arc<Peer>)>>,
config: P2PConfig,
}

Expand All @@ -48,7 +47,6 @@ impl Peers {
store,
config,
peers: RwLock::new(HashMap::new()),
dandelion_relay: RwLock::new(None),
}
}

Expand Down Expand Up @@ -87,39 +85,6 @@ impl Peers {
self.save_peer(&peer_data)
}

// Update the dandelion relay.
// Prefer the peer configured via `dandelion_peer` if it is among our
// outgoing connections; otherwise fall back to a random outgoing peer.
pub fn update_dandelion_relay(&self) {
let peers = self.outgoing_connected_peers();

let peer = &self
.config
.dandelion_peer
.and_then(|ip| peers.iter().find(|x| x.info.addr == ip))
.or(thread_rng().choose(&peers));

match peer {
Some(peer) => self.set_dandelion_relay(peer),
// No suitable outgoing peer found: leave any existing relay untouched.
None => debug!("Could not update dandelion relay"),
}
}

// Record `peer` as our current Dandelion relay, replacing any previous
// relay, paired with the current timestamp so callers can detect staleness.
fn set_dandelion_relay(&self, peer: &Arc<Peer>) {
// Clear the map and add new relay
let dandelion_relay = &self.dandelion_relay;
dandelion_relay
.write()
.replace((Utc::now().timestamp(), peer.clone()));
debug!(
"Successfully updated Dandelion relay to: {}",
peer.info.addr
);
}

// Get the dandelion relay.
// Returns a clone of the current (timestamp, relay peer) pair, if any is set.
pub fn get_dandelion_relay(&self) -> Option<(i64, Arc<Peer>)> {
self.dandelion_relay.read().clone()
}

pub fn is_known(&self, addr: PeerAddr) -> bool {
self.peers.read().contains_key(&addr)
}
Expand Down Expand Up @@ -335,26 +300,6 @@ impl Peers {
);
}

/// Relays the provided stem transaction to our single stem peer.
/// If no relay is currently set we attempt to select one first.
/// Returns `Error::NoDandelionRelay` if no relay could be established;
/// the caller is expected to "fluff" (broadcast) the tx in that case.
pub fn relay_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
self.get_dandelion_relay()
.or_else(|| {
debug!("No dandelion relay, updating.");
self.update_dandelion_relay();
self.get_dandelion_relay()
})
// If this still yields no relay, let the caller handle it as they see fit.
// The caller will "fluff" at this point as the stem phase is finished.
.ok_or(Error::NoDandelionRelay)
.map(|(_, relay)| {
if relay.is_connected() {
// Send failures are logged but deliberately swallowed:
// stemming is best-effort here.
if let Err(e) = relay.send_stem_transaction(tx) {
debug!("Error sending stem transaction to peer relay: {:?}", e);
}
}
})
}

/// Broadcasts the provided transaction to PEER_PREFERRED_COUNT of our
/// peers. We may be connected to PEER_MAX_COUNT peers so we only
/// want to broadcast to a random subset of peers.
Expand Down
3 changes: 2 additions & 1 deletion pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ mod pool;
pub mod transaction_pool;
pub mod types;

pub use crate::pool::Pool;
pub use crate::transaction_pool::TransactionPool;
pub use crate::types::{
BlockChain, DandelionConfig, PoolAdapter, PoolConfig, PoolEntryState, PoolError, TxSource,
BlockChain, DandelionConfig, PoolAdapter, PoolConfig, PoolEntry, PoolError, TxSource,
};
37 changes: 5 additions & 32 deletions pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use self::core::core::{
Block, BlockHeader, BlockSums, Committed, Transaction, TxKernel, Weighting,
};
use self::util::RwLock;
use crate::types::{BlockChain, PoolEntry, PoolEntryState, PoolError};
use crate::types::{BlockChain, PoolEntry, PoolError};
use grin_core as core;
use grin_util as util;
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -139,7 +139,7 @@ impl Pool {
// Verify these txs produce an aggregated tx below max tx weight.
// Return a vec of all the valid txs.
let txs = self.validate_raw_txs(
tx_buckets,
&tx_buckets,
None,
&header,
Weighting::AsLimitedTransaction { max_weight },
Expand Down Expand Up @@ -167,33 +167,6 @@ impl Pool {
Ok(Some(tx))
}

// Validate `txs` (optionally aggregated with `extra_tx`) against the
// chain state at `header` with no weight limit, returning only those
// txs that remain individually valid.
pub fn select_valid_transactions(
&self,
txs: Vec<Transaction>,
extra_tx: Option<Transaction>,
header: &BlockHeader,
) -> Result<Vec<Transaction>, PoolError> {
let valid_txs = self.validate_raw_txs(txs, extra_tx, header, Weighting::NoLimit)?;
Ok(valid_txs)
}

// Return clones of all pool transactions whose entry is in the given state.
pub fn get_transactions_in_state(&self, state: PoolEntryState) -> Vec<Transaction> {
self.entries
.iter()
.filter(|x| x.state == state)
.map(|x| x.tx.clone())
.collect::<Vec<_>>()
}

// Transition the specified pool entries to the new state.
// Linear scan over entries per tx (O(n*m)); entries whose tx is not
// present in `txs` are left unchanged.
pub fn transition_to_state(&mut self, txs: &[Transaction], state: PoolEntryState) {
for x in &mut self.entries {
if txs.contains(&x.tx) {
x.state = state;
}
}
}

// Aggregate this new tx with all existing txs in the pool.
// If we can validate the aggregated tx against the current chain state
// then we can safely add the tx to the pool.
Expand Down Expand Up @@ -267,9 +240,9 @@ impl Pool {
Ok(new_sums)
}

fn validate_raw_txs(
pub fn validate_raw_txs(
&self,
txs: Vec<Transaction>,
txs: &[Transaction],
extra_tx: Option<Transaction>,
header: &BlockHeader,
weighting: Weighting,
Expand All @@ -289,7 +262,7 @@ impl Pool {

// We know the tx is valid if the entire aggregate tx is valid.
if self.validate_raw_tx(&agg_tx, header, weighting).is_ok() {
valid_txs.push(tx);
valid_txs.push(tx.clone());
}
}

Expand Down
36 changes: 13 additions & 23 deletions pool/src/transaction_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ use self::core::core::verifier_cache::VerifierCache;
use self::core::core::{transaction, Block, BlockHeader, Transaction, Weighting};
use self::util::RwLock;
use crate::pool::Pool;
use crate::types::{
BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolEntryState, PoolError, TxSource,
};
use crate::types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolError, TxSource};
use chrono::prelude::*;
use grin_core as core;
use grin_util as util;
Expand Down Expand Up @@ -76,13 +74,10 @@ impl TransactionPool {
self.blockchain.chain_head()
}

// Add tx to stempool (passing in all txs from txpool to validate against).
fn add_to_stempool(&mut self, entry: PoolEntry, header: &BlockHeader) -> Result<(), PoolError> {
// Add tx to stempool (passing in all txs from txpool to validate against).
self.stempool
.add_to_pool(entry, self.txpool.all_transactions(), header)?;

// Note: we do not notify the adapter here,
// we let the dandelion monitor handle this.
Ok(())
}

Expand Down Expand Up @@ -124,8 +119,6 @@ impl TransactionPool {
let txpool_tx = self.txpool.all_transactions_aggregate()?;
self.stempool.reconcile(txpool_tx, header)?;
}

self.adapter.tx_accepted(&entry.tx);
Ok(())
}

Expand Down Expand Up @@ -159,28 +152,25 @@ impl TransactionPool {
self.blockchain.verify_coinbase_maturity(&tx)?;

let entry = PoolEntry {
state: PoolEntryState::Fresh,
src,
tx_at: Utc::now(),
tx,
};

// If we are in "stem" mode then check if this is a new tx or if we have seen it before.
// If new tx - add it to our stempool.
// If we have seen any of the kernels before then fallback to fluff,
// adding directly to txpool.
if stem
&& self
.stempool
.find_matching_transactions(entry.tx.kernels())
.is_empty()
// If not stem then we are fluff.
// If this is a stem tx then attempt to stem.
// Any problems during stem, fallback to fluff.
if !stem
|| self
.add_to_stempool(entry.clone(), header)
.and_then(|_| self.adapter.stem_tx_accepted(&entry.tx))
.is_err()
{
self.add_to_stempool(entry, header)?;
return Ok(());
self.add_to_txpool(entry.clone(), header)?;
self.add_to_reorg_cache(entry.clone());
self.adapter.tx_accepted(&entry.tx);
}

self.add_to_txpool(entry.clone(), header)?;
self.add_to_reorg_cache(entry);
Ok(())
}

Expand Down
Loading

0 comments on commit a2adf2d

Please sign in to comment.