From d560a36dd6408685a611d1fcb996e768e404ba41 Mon Sep 17 00:00:00 2001 From: Mike Dallas Date: Tue, 26 Feb 2019 22:24:50 +0200 Subject: [PATCH] Event callbacks for Network and Chain Events (#2598) * Add hooks for network and chain events. Move logging to an EventLogger * implement webhooks * fix failing test * remove unnecessary 'pub' * add some metadata to the json payload * avoid unecessary init * resolve conflicts --- core/Cargo.toml | 2 +- core/src/core/block.rs | 4 +- core/src/pow/types.rs | 4 +- servers/src/common.rs | 1 + servers/src/common/adapters.rs | 109 +++++------- servers/src/common/hooks.rs | 305 +++++++++++++++++++++++++++++++++ servers/src/common/types.rs | 29 ++++ servers/src/grin/server.rs | 7 +- 8 files changed, 393 insertions(+), 68 deletions(-) create mode 100644 servers/src/common/hooks.rs diff --git a/core/Cargo.toml b/core/Cargo.toml index a58b3ff370..29079b7511 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -26,7 +26,7 @@ serde_derive = "1" siphasher = "0.2" uuid = { version = "0.6", features = ["serde", "v4"] } log = "0.4" -chrono = "0.4.4" +chrono = { version = "0.4.4", features = ["serde"] } grin_keychain = { path = "../keychain", version = "1.1.0" } grin_util = { path = "../util", version = "1.1.0" } diff --git a/core/src/core/block.rs b/core/src/core/block.rs index 5f1887af04..838d8478a7 100644 --- a/core/src/core/block.rs +++ b/core/src/core/block.rs @@ -168,7 +168,7 @@ impl Hashed for HeaderEntry { } /// Block header, fairly standard compared to other blockchains. -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize)] pub struct BlockHeader { /// Version of the block pub version: u16, @@ -346,7 +346,7 @@ impl BlockHeader { /// non-explicit, assumed to be deducible from block height (similar to /// bitcoin's schedule) and expressed as a global transaction fee (added v.H), /// additive to the total of fees ever collected. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub struct Block { /// The header with metadata and commitments to the rest of the data pub header: BlockHeader, diff --git a/core/src/pow/types.rs b/core/src/pow/types.rs index 49e53a76da..99c97ef120 100644 --- a/core/src/pow/types.rs +++ b/core/src/pow/types.rs @@ -215,7 +215,7 @@ impl<'de> de::Visitor<'de> for DiffVisitor { } /// Block header information pertaining to the proof of work -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize)] pub struct ProofOfWork { /// Total accumulated difficulty since genesis block pub total_difficulty: Difficulty, @@ -316,7 +316,7 @@ impl ProofOfWork { /// them at their exact bit size. The resulting bit sequence is padded to be /// byte-aligned. /// -#[derive(Clone, PartialOrd, PartialEq)] +#[derive(Clone, PartialOrd, PartialEq, Serialize)] pub struct Proof { /// Power of 2 used for the size of the cuckoo graph pub edge_bits: u8, diff --git a/servers/src/common.rs b/servers/src/common.rs index 6aa35e0e2d..a1fca442d2 100644 --- a/servers/src/common.rs +++ b/servers/src/common.rs @@ -17,3 +17,4 @@ pub mod adapters; pub mod stats; pub mod types; +pub mod hooks; diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index ae160267b6..d684fb31fa 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -22,6 +22,7 @@ use std::thread; use std::time::Instant; use crate::chain::{self, BlockStatus, ChainAdapter, Options}; +use crate::common::hooks::{ChainEvents, NetEvents}; use crate::common::types::{self, ChainValidationMode, ServerConfig, SyncState, SyncStatus}; use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::transaction::Transaction; @@ -47,6 +48,7 @@ pub struct NetToChainAdapter { verifier_cache: Arc>, peers: OneTime>, config: ServerConfig, + hooks: Vec>, } impl p2p::ChainAdapter for NetToChainAdapter { @@ -86,16 +88,13 @@ impl p2p::ChainAdapter for NetToChainAdapter { identifier: "?.?.?.?".to_string(), }; - let tx_hash = tx.hash(); let header = self.chain().head_header().unwrap(); - debug!( - "Received tx {}, [in/out/kern: {}/{}/{}] going to process.", - tx_hash, - tx.inputs().len(), - tx.outputs().len(), - tx.kernels().len(), - ); + for hook in &self.hooks { + hook.on_transaction_received(&tx); + } + + let tx_hash = tx.hash(); let res = { let mut tx_pool = self.tx_pool.write(); @@ -108,35 +107,24 @@ impl p2p::ChainAdapter for NetToChainAdapter { } fn block_received(&self, b: core::Block, addr: PeerAddr, was_requested: bool) -> bool { - debug!( - "Received block {} at {} from {} [in/out/kern: {}/{}/{}] going to process.", - b.hash(), - b.header.height, - addr, - b.inputs().len(), - b.outputs().len(), - b.kernels().len(), - ); + for hook in &self.hooks { + hook.on_block_received(&b, &addr); + } + self.process_block(b, addr, was_requested) } fn compact_block_received(&self, cb: core::CompactBlock, addr: PeerAddr) -> bool { - let bhash = cb.hash(); - debug!( - "Received compact_block {} at {} from {} [out/kern/kern_ids: {}/{}/{}] going to process.", - bhash, - cb.header.height, - addr, - cb.out_full().len(), - cb.kern_full().len(), - cb.kern_ids().len(), - ); - let cb_hash = cb.hash(); if cb.kern_ids().is_empty() { // push the freshly hydrated block through the chain pipeline match core::Block::hydrate_from(cb, vec![]) { - Ok(block) => self.process_block(block, addr, false), + Ok(block) => { + for hook in &self.hooks { + hook.on_block_received(&block, &addr); + } + self.process_block(block, addr, false) + } Err(e) => { debug!("Invalid hydrated block {}: {:?}", cb_hash, e); return false; @@ -170,7 +158,12 @@ impl p2p::ChainAdapter for NetToChainAdapter { // 3) we hydrate an invalid block (peer sent us a "bad" compact block) - [TBD] let block = match core::Block::hydrate_from(cb.clone(), txs) { - Ok(block) => block, + Ok(block) => { + for hook in &self.hooks { + hook.on_block_received(&block, &addr); + } + block + } Err(e) => { debug!("Invalid hydrated block {}: {:?}", cb.hash(), e); return false; @@ -202,11 +195,9 @@ impl p2p::ChainAdapter for NetToChainAdapter { } fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> bool { - let bhash = bh.hash(); - debug!( - "Received block header {} at {} from {}, going to process.", - bhash, bh.height, addr, - ); + for hook in &self.hooks { + hook.on_header_received(&bh, &addr); + } // pushing the new block header through the header chain pipeline // we will go ask for the block if this is a new header @@ -215,7 +206,11 @@ impl p2p::ChainAdapter for NetToChainAdapter { .process_block_header(&bh, self.chain_opts(false)); if let &Err(ref e) = &res { - debug!("Block header {} refused by chain: {:?}", bhash, e.kind()); + debug!( + "Block header {} refused by chain: {:?}", + bh.hash(), + e.kind() + ); if e.is_bad_data() { return false; } else { @@ -240,6 +235,12 @@ impl p2p::ChainAdapter for NetToChainAdapter { return false; } + for header in bhs.iter() { + for hook in &self.hooks { + hook.on_header_received(&header, &addr); + } + } + // try to add headers to our header chain let res = self.chain().sync_block_headers(bhs, self.chain_opts(true)); if let &Err(ref e) = &res { @@ -372,6 +373,7 @@ impl NetToChainAdapter { tx_pool: Arc>, verifier_cache: Arc>, config: ServerConfig, + hooks: Vec>, ) -> NetToChainAdapter { NetToChainAdapter { sync_state, @@ -380,6 +382,7 @@ impl NetToChainAdapter { verifier_cache, peers: OneTime::new(), config, + hooks, } } @@ -608,35 +611,13 @@ impl NetToChainAdapter { pub struct ChainToPoolAndNetAdapter { tx_pool: Arc>, peers: OneTime>, + hooks: Vec>, } impl ChainAdapter for ChainToPoolAndNetAdapter { fn block_accepted(&self, b: &core::Block, status: BlockStatus, opts: Options) { - match status { - BlockStatus::Reorg => { - warn!( - "block_accepted (REORG!): {:?} at {} (diff: {})", - b.hash(), - b.header.height, - b.header.total_difficulty(), - ); - } - BlockStatus::Fork => { - debug!( - "block_accepted (fork?): {:?} at {} (diff: {})", - b.hash(), - b.header.height, - b.header.total_difficulty(), - ); - } - BlockStatus::Next => { - debug!( - "block_accepted (head+): {:?} at {} (diff: {})", - b.hash(), - b.header.height, - b.header.total_difficulty(), - ); - } + for hook in &self.hooks { + hook.on_block_accepted(b, &status); } // not broadcasting blocks received through sync @@ -675,10 +656,14 @@ impl ChainAdapter for ChainToPoolAndNetAdapter { impl ChainToPoolAndNetAdapter { /// Construct a ChainToPoolAndNetAdapter instance. - pub fn new(tx_pool: Arc>) -> ChainToPoolAndNetAdapter { + pub fn new( + tx_pool: Arc>, + hooks: Vec>, + ) -> ChainToPoolAndNetAdapter { ChainToPoolAndNetAdapter { tx_pool, peers: OneTime::new(), + hooks: hooks, } } diff --git a/servers/src/common/hooks.rs b/servers/src/common/hooks.rs new file mode 100644 index 0000000000..cf7ced9d2f --- /dev/null +++ b/servers/src/common/hooks.rs @@ -0,0 +1,305 @@ +// Copyright 2019 The Grin Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This module allows to register callbacks on certain events. To add a custom +//! callback simply implement the coresponding trait and add it to the init function + +extern crate hyper; +extern crate tokio; + +use crate::chain::BlockStatus; +use crate::common::types::{ServerConfig, WebHooksConfig}; +use crate::core::core; +use crate::core::core::hash::Hashed; +use futures::future::Future; +use hyper::client::HttpConnector; +use hyper::header::HeaderValue; +use hyper::Client; +use hyper::{Body, Method, Request}; +use serde::Serialize; +use serde_json::{json, to_string}; +use crate::p2p::types::PeerAddr; +use tokio::runtime::Runtime; + +/// Returns the list of event hooks that will be initialized for network events +pub fn init_net_hooks(config: &ServerConfig) -> Vec> { + let mut list: Vec> = Vec::new(); + list.push(Box::new(EventLogger)); + if config.webhook_config.block_received_url.is_some() + || config.webhook_config.tx_received_url.is_some() + || config.webhook_config.header_received_url.is_some() + { + list.push(Box::new(WebHook::from_config(&config.webhook_config))); + } + list +} + +/// Returns the list of event hooks that will be initialized for chain events +pub fn init_chain_hooks(config: &ServerConfig) -> Vec> { + let mut list: Vec> = Vec::new(); + list.push(Box::new(EventLogger)); + if config.webhook_config.block_accepted_url.is_some() { + list.push(Box::new(WebHook::from_config(&config.webhook_config))); + } + list +} + +#[allow(unused_variables)] +/// Trait to be implemented by Network Event Hooks +pub trait NetEvents { + /// Triggers when a new transaction arrives + fn on_transaction_received(&self, tx: &core::Transaction) {} + + /// Triggers when a new block arrives + fn on_block_received(&self, block: &core::Block, addr: &PeerAddr) {} + + /// Triggers when a new block header arrives + fn on_header_received(&self, header: &core::BlockHeader, addr: &PeerAddr) {} +} + +#[allow(unused_variables)] +/// Trait to be implemented by Chain Event Hooks +pub trait ChainEvents { + /// Triggers when a new block is accepted by the chain (might be a Reorg or a Fork) + fn on_block_accepted(&self, block: &core::Block, status: &BlockStatus) {} +} + +/// Basic Logger +struct EventLogger; + +impl NetEvents for EventLogger { + fn on_transaction_received(&self, tx: &core::Transaction) { + debug!( + "Received tx {}, [in/out/kern: {}/{}/{}] going to process.", + tx.hash(), + tx.inputs().len(), + tx.outputs().len(), + tx.kernels().len(), + ); + } + + fn on_block_received(&self, block: &core::Block, addr: &PeerAddr) { + debug!( + "Received block {} at {} from {} [in/out/kern: {}/{}/{}] going to process.", + block.hash(), + block.header.height, + addr, + block.inputs().len(), + block.outputs().len(), + block.kernels().len(), + ); + } + + fn on_header_received(&self, header: &core::BlockHeader, addr: &PeerAddr) { + debug!( + "Received block header {} at {} from {}, going to process.", + header.hash(), + header.height, + addr + ); + } +} + +impl ChainEvents for EventLogger { + fn on_block_accepted(&self, block: &core::Block, status: &BlockStatus) { + match status { + BlockStatus::Reorg => { + warn!( + "block_accepted (REORG!): {:?} at {} (diff: {})", + block.hash(), + block.header.height, + block.header.total_difficulty(), + ); + } + BlockStatus::Fork => { + debug!( + "block_accepted (fork?): {:?} at {} (diff: {})", + block.hash(), + block.header.height, + block.header.total_difficulty(), + ); + } + BlockStatus::Next => { + debug!( + "block_accepted (head+): {:?} at {} (diff: {})", + block.hash(), + block.header.height, + block.header.total_difficulty(), + ); + } + } + } +} + +fn parse_url(value: &Option) -> Option { + match value { + Some(url) => { + let uri: hyper::Uri = match url.parse() { + Ok(value) => value, + Err(_) => panic!("Invalid url : {}", url), + }; + let scheme = uri.scheme_part().map(|s| s.as_str()); + if scheme != Some("http") { + panic!("Invalid url scheme {}, expected 'http'", url) + }; + Some(uri) + } + None => None, + } +} + +/// A struct that holds the hyper/tokio runtime. +struct WebHook { + /// url to POST transaction data when a new transaction arrives from a peer + tx_received_url: Option, + /// url to POST header data when a new header arrives from a peer + header_received_url: Option, + /// url to POST block data when a new block arrives from a peer + block_received_url: Option, + /// url to POST block data when a new block is accepted by our node (might be a reorg or a fork) + block_accepted_url: Option, + /// The hyper client to be used for all requests + client: Client, + /// The tokio event loop + runtime: Runtime, +} + +impl WebHook { + /// Instantiates a Webhook struct + fn new( + tx_received_url: Option, + header_received_url: Option, + block_received_url: Option, + block_accepted_url: Option, + ) -> WebHook { + WebHook { + tx_received_url, + block_received_url, + header_received_url, + block_accepted_url, + client: Client::new(), + runtime: Runtime::new().unwrap(), + } + } + + /// Instantiates a Webhook struct from a configuration file + fn from_config(config: &WebHooksConfig) -> WebHook { + WebHook::new( + parse_url(&config.tx_received_url), + parse_url(&config.header_received_url), + parse_url(&config.block_received_url), + parse_url(&config.block_accepted_url), + ) + } + + fn post(&self, url: hyper::Uri, data: String) { + let mut req = Request::new(Body::from(data)); + *req.method_mut() = Method::POST; + *req.uri_mut() = url.clone(); + req.headers_mut().insert( + hyper::header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + ); + + let future = self + .client + .request(req) + .map(|_res| {}) + .map_err(move |_res| { + warn!("Error sending POST request to {}", url); + }); + + let handle = self.runtime.executor(); + handle.spawn(future); + } + fn make_request(&self, payload: &T, uri: &Option) -> bool { + if let Some(url) = uri { + let payload = match to_string(payload) { + Ok(serialized) => serialized, + Err(_) => { + return false; // print error message + } + }; + self.post(url.clone(), payload); + } + true + } +} + +impl ChainEvents for WebHook { + fn on_block_accepted(&self, block: &core::Block, status: &BlockStatus) { + let status = match status { + BlockStatus::Reorg => "reorg", + BlockStatus::Fork => "fork", + BlockStatus::Next => "head", + }; + let payload = json!({ + "hash": block.header.hash().to_hex(), + "status": status, + "data": block + }); + if !self.make_request(&payload, &self.block_accepted_url) { + error!( + "Failed to serialize block {} at height {}", + block.hash(), + block.header.height + ); + } + } +} + +impl NetEvents for WebHook { + /// Triggers when a new transaction arrives + fn on_transaction_received(&self, tx: &core::Transaction) { + let payload = json!({ + "hash": tx.hash().to_hex(), + "data": tx + }); + if !self.make_request(&payload, &self.tx_received_url) { + error!("Failed to serialize transaction {}", tx.hash()); + } + } + + /// Triggers when a new block arrives + fn on_block_received(&self, block: &core::Block, addr: &PeerAddr) { + let payload = json!({ + "hash": block.header.hash().to_hex(), + "peer": addr, + "data": block + }); + if !self.make_request(&payload, &self.block_received_url) { + error!( + "Failed to serialize block {} at height {}", + block.hash().to_hex(), + block.header.height + ); + } + } + + /// Triggers when a new block header arrives + fn on_header_received(&self, header: &core::BlockHeader, addr: &PeerAddr) { + let payload = json!({ + "hash": header.hash().to_hex(), + "peer": addr, + "data": header + }); + if !self.make_request(&payload, &self.header_received_url) { + error!( + "Failed to serialize header {} at height {}", + header.hash(), + header.height + ); + } + } +} diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index 77852f5a89..6b9ed73cb8 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -169,6 +169,10 @@ pub struct ServerConfig { /// Configuration for the mining daemon #[serde(default)] pub stratum_mining_config: Option, + + /// Configuration for the webhooks that trigger on certain events + #[serde(default)] + pub webhook_config: WebHooksConfig, } impl Default for ServerConfig { @@ -190,6 +194,7 @@ impl Default for ServerConfig { run_tui: Some(true), run_test_miner: Some(false), test_miner_wallet_url: None, + webhook_config: WebHooksConfig::default(), } } } @@ -232,6 +237,30 @@ impl Default for StratumServerConfig { } } +/// Web hooks configuration +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct WebHooksConfig { + /// url to POST transaction data when a new transaction arrives from a peer + pub tx_received_url: Option, + /// url to POST header data when a new header arrives from a peer + pub header_received_url: Option, + /// url to POST block data when a new block arrives from a peer + pub block_received_url: Option, + /// url to POST block data when a new block is accepted by our node (might be a reorg or a fork) + pub block_accepted_url: Option, +} + +impl Default for WebHooksConfig { + fn default() -> WebHooksConfig { + WebHooksConfig { + tx_received_url: None, + header_received_url: None, + block_received_url: None, + block_accepted_url: None, + } + } +} + /// Various status sync can be in, whether it's fast sync or archival. #[derive(Debug, Clone, Copy, Eq, PartialEq)] #[allow(missing_docs)] diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 265067d056..80ee209a87 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -31,6 +31,7 @@ use crate::chain; use crate::common::adapters::{ ChainToPoolAndNetAdapter, NetToChainAdapter, PoolToChainAdapter, PoolToNetAdapter, }; +use crate::common::hooks::{init_chain_hooks, init_net_hooks}; use crate::common::stats::{DiffBlock, DiffStats, PeerStats, ServerStateInfo, ServerStats}; use crate::common::types::{Error, ServerConfig, StratumServerConfig, SyncState, SyncStatus}; use crate::core::core::hash::{Hashed, ZERO_HASH}; @@ -164,7 +165,10 @@ impl Server { let sync_state = Arc::new(SyncState::new()); - let chain_adapter = Arc::new(ChainToPoolAndNetAdapter::new(tx_pool.clone())); + let chain_adapter = Arc::new(ChainToPoolAndNetAdapter::new( + tx_pool.clone(), + init_chain_hooks(&config), + )); let genesis = match config.chain_type { global::ChainTypes::AutomatedTesting => genesis::genesis_dev(), @@ -195,6 +199,7 @@ impl Server { tx_pool.clone(), verifier_cache.clone(), config.clone(), + init_net_hooks(&config), )); let peer_db_env = Arc::new(store::new_named_env(