Skip to content

Commit

Permalink
Event callbacks for Network and Chain Events (#2598)
Browse files Browse the repository at this point in the history
* Add hooks for network and chain events. Move logging to an EventLogger
* implement webhooks
* fix failing test
* remove unnecessary 'pub'
* add some metadata to the json payload
* avoid unecessary init
* resolve conflicts
  • Loading branch information
mcdallas authored and ignopeverell committed Feb 26, 2019
1 parent c388e08 commit d560a36
Show file tree
Hide file tree
Showing 8 changed files with 393 additions and 68 deletions.
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ serde_derive = "1"
siphasher = "0.2"
uuid = { version = "0.6", features = ["serde", "v4"] }
log = "0.4"
chrono = "0.4.4"
chrono = { version = "0.4.4", features = ["serde"] }

grin_keychain = { path = "../keychain", version = "1.1.0" }
grin_util = { path = "../util", version = "1.1.0" }
Expand Down
4 changes: 2 additions & 2 deletions core/src/core/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl Hashed for HeaderEntry {
}

/// Block header, fairly standard compared to other blockchains.
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct BlockHeader {
/// Version of the block
pub version: u16,
Expand Down Expand Up @@ -346,7 +346,7 @@ impl BlockHeader {
/// non-explicit, assumed to be deducible from block height (similar to
/// bitcoin's schedule) and expressed as a global transaction fee (added v.H),
/// additive to the total of fees ever collected.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct Block {
/// The header with metadata and commitments to the rest of the data
pub header: BlockHeader,
Expand Down
4 changes: 2 additions & 2 deletions core/src/pow/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl<'de> de::Visitor<'de> for DiffVisitor {
}

/// Block header information pertaining to the proof of work
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct ProofOfWork {
/// Total accumulated difficulty since genesis block
pub total_difficulty: Difficulty,
Expand Down Expand Up @@ -316,7 +316,7 @@ impl ProofOfWork {
/// them at their exact bit size. The resulting bit sequence is padded to be
/// byte-aligned.
///
#[derive(Clone, PartialOrd, PartialEq)]
#[derive(Clone, PartialOrd, PartialEq, Serialize)]
pub struct Proof {
/// Power of 2 used for the size of the cuckoo graph
pub edge_bits: u8,
Expand Down
1 change: 1 addition & 0 deletions servers/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@
pub mod adapters;
pub mod stats;
pub mod types;
pub mod hooks;
109 changes: 47 additions & 62 deletions servers/src/common/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::thread;
use std::time::Instant;

use crate::chain::{self, BlockStatus, ChainAdapter, Options};
use crate::common::hooks::{ChainEvents, NetEvents};
use crate::common::types::{self, ChainValidationMode, ServerConfig, SyncState, SyncStatus};
use crate::core::core::hash::{Hash, Hashed};
use crate::core::core::transaction::Transaction;
Expand All @@ -47,6 +48,7 @@ pub struct NetToChainAdapter {
verifier_cache: Arc<RwLock<dyn VerifierCache>>,
peers: OneTime<Weak<p2p::Peers>>,
config: ServerConfig,
hooks: Vec<Box<dyn NetEvents + Send + Sync>>,
}

impl p2p::ChainAdapter for NetToChainAdapter {
Expand Down Expand Up @@ -86,16 +88,13 @@ impl p2p::ChainAdapter for NetToChainAdapter {
identifier: "?.?.?.?".to_string(),
};

let tx_hash = tx.hash();
let header = self.chain().head_header().unwrap();

debug!(
"Received tx {}, [in/out/kern: {}/{}/{}] going to process.",
tx_hash,
tx.inputs().len(),
tx.outputs().len(),
tx.kernels().len(),
);
for hook in &self.hooks {
hook.on_transaction_received(&tx);
}

let tx_hash = tx.hash();

let res = {
let mut tx_pool = self.tx_pool.write();
Expand All @@ -108,35 +107,24 @@ impl p2p::ChainAdapter for NetToChainAdapter {
}

fn block_received(&self, b: core::Block, addr: PeerAddr, was_requested: bool) -> bool {
debug!(
"Received block {} at {} from {} [in/out/kern: {}/{}/{}] going to process.",
b.hash(),
b.header.height,
addr,
b.inputs().len(),
b.outputs().len(),
b.kernels().len(),
);
for hook in &self.hooks {
hook.on_block_received(&b, &addr);
}

self.process_block(b, addr, was_requested)
}

fn compact_block_received(&self, cb: core::CompactBlock, addr: PeerAddr) -> bool {
let bhash = cb.hash();
debug!(
"Received compact_block {} at {} from {} [out/kern/kern_ids: {}/{}/{}] going to process.",
bhash,
cb.header.height,
addr,
cb.out_full().len(),
cb.kern_full().len(),
cb.kern_ids().len(),
);

let cb_hash = cb.hash();
if cb.kern_ids().is_empty() {
// push the freshly hydrated block through the chain pipeline
match core::Block::hydrate_from(cb, vec![]) {
Ok(block) => self.process_block(block, addr, false),
Ok(block) => {
for hook in &self.hooks {
hook.on_block_received(&block, &addr);
}
self.process_block(block, addr, false)
}
Err(e) => {
debug!("Invalid hydrated block {}: {:?}", cb_hash, e);
return false;
Expand Down Expand Up @@ -170,7 +158,12 @@ impl p2p::ChainAdapter for NetToChainAdapter {
// 3) we hydrate an invalid block (peer sent us a "bad" compact block) - [TBD]

let block = match core::Block::hydrate_from(cb.clone(), txs) {
Ok(block) => block,
Ok(block) => {
for hook in &self.hooks {
hook.on_block_received(&block, &addr);
}
block
}
Err(e) => {
debug!("Invalid hydrated block {}: {:?}", cb.hash(), e);
return false;
Expand Down Expand Up @@ -202,11 +195,9 @@ impl p2p::ChainAdapter for NetToChainAdapter {
}

fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> bool {
let bhash = bh.hash();
debug!(
"Received block header {} at {} from {}, going to process.",
bhash, bh.height, addr,
);
for hook in &self.hooks {
hook.on_header_received(&bh, &addr);
}

// pushing the new block header through the header chain pipeline
// we will go ask for the block if this is a new header
Expand All @@ -215,7 +206,11 @@ impl p2p::ChainAdapter for NetToChainAdapter {
.process_block_header(&bh, self.chain_opts(false));

if let &Err(ref e) = &res {
debug!("Block header {} refused by chain: {:?}", bhash, e.kind());
debug!(
"Block header {} refused by chain: {:?}",
bh.hash(),
e.kind()
);
if e.is_bad_data() {
return false;
} else {
Expand All @@ -240,6 +235,12 @@ impl p2p::ChainAdapter for NetToChainAdapter {
return false;
}

for header in bhs.iter() {
for hook in &self.hooks {
hook.on_header_received(&header, &addr);
}
}

// try to add headers to our header chain
let res = self.chain().sync_block_headers(bhs, self.chain_opts(true));
if let &Err(ref e) = &res {
Expand Down Expand Up @@ -372,6 +373,7 @@ impl NetToChainAdapter {
tx_pool: Arc<RwLock<pool::TransactionPool>>,
verifier_cache: Arc<RwLock<dyn VerifierCache>>,
config: ServerConfig,
hooks: Vec<Box<dyn NetEvents + Send + Sync>>,
) -> NetToChainAdapter {
NetToChainAdapter {
sync_state,
Expand All @@ -380,6 +382,7 @@ impl NetToChainAdapter {
verifier_cache,
peers: OneTime::new(),
config,
hooks,
}
}

Expand Down Expand Up @@ -608,35 +611,13 @@ impl NetToChainAdapter {
pub struct ChainToPoolAndNetAdapter {
tx_pool: Arc<RwLock<pool::TransactionPool>>,
peers: OneTime<Weak<p2p::Peers>>,
hooks: Vec<Box<dyn ChainEvents + Send + Sync>>,
}

impl ChainAdapter for ChainToPoolAndNetAdapter {
fn block_accepted(&self, b: &core::Block, status: BlockStatus, opts: Options) {
match status {
BlockStatus::Reorg => {
warn!(
"block_accepted (REORG!): {:?} at {} (diff: {})",
b.hash(),
b.header.height,
b.header.total_difficulty(),
);
}
BlockStatus::Fork => {
debug!(
"block_accepted (fork?): {:?} at {} (diff: {})",
b.hash(),
b.header.height,
b.header.total_difficulty(),
);
}
BlockStatus::Next => {
debug!(
"block_accepted (head+): {:?} at {} (diff: {})",
b.hash(),
b.header.height,
b.header.total_difficulty(),
);
}
for hook in &self.hooks {
hook.on_block_accepted(b, &status);
}

// not broadcasting blocks received through sync
Expand Down Expand Up @@ -675,10 +656,14 @@ impl ChainAdapter for ChainToPoolAndNetAdapter {

impl ChainToPoolAndNetAdapter {
/// Construct a ChainToPoolAndNetAdapter instance.
pub fn new(tx_pool: Arc<RwLock<pool::TransactionPool>>) -> ChainToPoolAndNetAdapter {
pub fn new(
tx_pool: Arc<RwLock<pool::TransactionPool>>,
hooks: Vec<Box<dyn ChainEvents + Send + Sync>>,
) -> ChainToPoolAndNetAdapter {
ChainToPoolAndNetAdapter {
tx_pool,
peers: OneTime::new(),
hooks: hooks,
}
}

Expand Down
Loading

0 comments on commit d560a36

Please sign in to comment.