diff --git a/components/chainhook-cli/src/cli/mod.rs b/components/chainhook-cli/src/cli/mod.rs index 90fa46d4..71c53cc6 100644 --- a/components/chainhook-cli/src/cli/mod.rs +++ b/components/chainhook-cli/src/cli/mod.rs @@ -1,8 +1,8 @@ use crate::block::DigestingCommand; use crate::config::generator::generate_config; use crate::config::Config; -use crate::scan::bitcoin::scan_bitcoin_chain_with_predicate_via_http; -use crate::scan::stacks::scan_stacks_chain_with_predicate; +use crate::scan::bitcoin::scan_bitcoin_chainstate_via_http_using_predicate; +use crate::scan::stacks::scan_stacks_chainstate_via_csv_using_predicate; use crate::service::Service; use chainhook_event_observer::bitcoincore_rpc::{Auth, Client, RpcApi}; @@ -535,10 +535,32 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> { } }; - scan_bitcoin_chain_with_predicate_via_http(predicate_spec, &config, &ctx).await?; + scan_bitcoin_chainstate_via_http_using_predicate( + predicate_spec, + &config, + &ctx, + ) + .await?; } ChainhookFullSpecification::Stacks(predicate) => { - scan_stacks_chain_with_predicate(predicate, &mut config, &ctx).await?; + let predicate_spec = match predicate + .into_selected_network_specification(&config.network.stacks_network) + { + Ok(predicate) => predicate, + Err(e) => { + return Err(format!( + "Specification missing for network {:?}: {e}", + config.network.bitcoin_network + )); + } + }; + + scan_stacks_chainstate_via_csv_using_predicate( + predicate_spec, + &mut config, + &ctx, + ) + .await?; } } } diff --git a/components/chainhook-cli/src/scan/stacks.rs b/components/chainhook-cli/src/scan/stacks.rs index a8185d9a..1fd71736 100644 --- a/components/chainhook-cli/src/scan/stacks.rs +++ b/components/chainhook-cli/src/scan/stacks.rs @@ -9,33 +9,31 @@ use crate::{ config::Config, }; use chainhook_event_observer::{ - chainhooks::stacks::{ - handle_stacks_hook_action, StacksChainhookOccurrence, StacksTriggerChainhook, + chainhooks::{ + stacks::evaluate_stacks_chainhook_on_blocks, }, - utils::{file_append, send_request, AbstractStacksBlock}, + indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer}, + utils::Context, }; use chainhook_event_observer::{ chainhooks::{ - stacks::evaluate_stacks_chainhook_on_blocks, types::StacksChainhookFullSpecification, + stacks::{handle_stacks_hook_action, StacksChainhookOccurrence, StacksTriggerChainhook}, + types::StacksChainhookSpecification, }, - indexer::{self, stacks::standardize_stacks_serialized_block_header, Indexer}, - utils::Context, + utils::{file_append, send_request, AbstractStacksBlock}, }; use chainhook_types::BlockIdentifier; -pub async fn scan_stacks_chain_with_predicate( - predicate: StacksChainhookFullSpecification, +pub async fn scan_stacks_chainstate_via_csv_using_predicate( + predicate_spec: StacksChainhookSpecification, config: &mut Config, ctx: &Context, -) -> Result<(), String> { - let selected_predicate = - predicate.into_selected_network_specification(&config.network.stacks_network)?; - - let start_block = match selected_predicate.start_block { +) -> Result { + let start_block = match predicate_spec.start_block { Some(start_block) => start_block, None => { return Err( - "Chainhook specification must include fields 'start_block' and 'end_block' when using the scan command" + "Chainhook specification must include fields 'start_block' when using the scan command" .into(), ); } @@ -105,7 +103,7 @@ pub async fn scan_stacks_chain_with_predicate( continue; } - if let Some(end_block) = selected_predicate.end_block { + if let Some(end_block) = predicate_spec.end_block { if block_identifier.index > end_block { break; } @@ -137,7 +135,10 @@ pub async fn scan_stacks_chain_with_predicate( ctx.expect_logger(), "Starting predicate evaluation on Stacks blocks" ); - for (_block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) { + let mut last_block_scanned = BlockIdentifier::default(); + let mut err_count = 0; + for (block_identifier, _parent_block_identifier, blob) in canonical_fork.drain(..) { + last_block_scanned = block_identifier; blocks_scanned += 1; let block_data = match indexer::stacks::standardize_stacks_serialized_block( &indexer.config, @@ -154,13 +155,13 @@ pub async fn scan_stacks_chain_with_predicate( let blocks: Vec<&dyn AbstractStacksBlock> = vec![&block_data]; - let hits_per_blocks = evaluate_stacks_chainhook_on_blocks(blocks, &selected_predicate, ctx); + let hits_per_blocks = evaluate_stacks_chainhook_on_blocks(blocks, &predicate_spec, ctx); if hits_per_blocks.is_empty() { continue; } let trigger = StacksTriggerChainhook { - chainhook: &selected_predicate, + chainhook: &predicate_spec, apply: hits_per_blocks, rollback: vec![], }; @@ -170,24 +171,29 @@ pub async fn scan_stacks_chain_with_predicate( } Ok(action) => { actions_triggered += 1; - match action { - StacksChainhookOccurrence::Http(request) => { - send_request(request, &ctx).await; - } - StacksChainhookOccurrence::File(path, bytes) => { - file_append(path, bytes, &ctx); - } + let res = match action { + StacksChainhookOccurrence::Http(request) => send_request(request, &ctx).await, + StacksChainhookOccurrence::File(path, bytes) => file_append(path, bytes, &ctx), StacksChainhookOccurrence::Data(_payload) => unreachable!(), + }; + if res.is_err() { + err_count += 1; + } else { + err_count = 0; } } } + // We abort after 3 consecutive errors + if err_count >= 3 { + return Err(format!("Scan aborted (consecutive action errors >= 3)")); + } } info!( ctx.expect_logger(), "{blocks_scanned} blocks scanned, {actions_triggered} actions triggered" ); - Ok(()) + Ok(last_block_scanned) } async fn download_dataset_if_required(config: &mut Config, ctx: &Context) -> bool { diff --git a/components/chainhook-cli/src/service/mod.rs b/components/chainhook-cli/src/service/mod.rs index 744c3602..921f7ad5 100644 --- a/components/chainhook-cli/src/service/mod.rs +++ b/components/chainhook-cli/src/service/mod.rs @@ -1,30 +1,21 @@ use crate::config::Config; -use crate::scan::bitcoin::scan_bitcoin_chain_with_predicate_via_http; - - -use chainhook_event_observer::chainhooks::types::{ - ChainhookConfig, ChainhookFullSpecification, -}; +use crate::scan::bitcoin::scan_bitcoin_chainstate_via_http_using_predicate; +use crate::scan::stacks::scan_stacks_chainstate_via_csv_using_predicate; +use chainhook_event_observer::chainhooks::types::{ChainhookConfig, ChainhookFullSpecification}; use chainhook_event_observer::observer::{start_event_observer, ApiKey, ObserverEvent}; -use chainhook_event_observer::utils::{Context}; +use chainhook_event_observer::utils::Context; use chainhook_event_observer::{ - chainhooks::stacks::{ - evaluate_stacks_predicate_on_transaction, handle_stacks_hook_action, - StacksChainhookOccurrence, StacksTriggerChainhook, - }, chainhooks::types::ChainhookSpecification, }; use chainhook_types::{ - BitcoinBlockSignaling, BlockIdentifier, StacksBlockData, StacksBlockMetadata, StacksChainEvent, - StacksTransactionData, + BitcoinBlockSignaling, StacksBlockData, StacksChainEvent, }; use redis::{Commands, Connection}; -use std::collections::{HashMap}; -use std::sync::mpsc::channel; +use std::sync::mpsc::channel; pub const DEFAULT_INGESTION_PORT: u16 = 20455; pub const DEFAULT_CONTROL_PORT: u16 = 20456; @@ -191,129 +182,30 @@ impl Service { ); } match chainhook { - ChainhookSpecification::Stacks(stacks_hook) => { - // Retrieve highest block height stored - let tip_height: u64 = redis_con.get(&format!("stx:tip")).unwrap_or(1); - - let start_block = stacks_hook.start_block.unwrap_or(1); // TODO(lgalabru): handle STX hooks and genesis block :s - let end_block = stacks_hook.end_block.unwrap_or(tip_height); // TODO(lgalabru): handle STX hooks and genesis block :s - + ChainhookSpecification::Stacks(predicate_spec) => { + let end_block = match scan_stacks_chainstate_via_csv_using_predicate( + predicate_spec, + &mut self.config, + &self.ctx, + ) + .await + { + Ok(end_block) => end_block, + Err(e) => { + error!( + self.ctx.expect_logger(), + "Unable to evaluate predicate on Bitcoin chainstate: {e}", + ); + continue; + } + }; info!( self.ctx.expect_logger(), - "Processing Stacks chainhook {}, will scan blocks [{}; {}]", - stacks_hook.uuid, - start_block, - end_block + "Stacks chainstate scan completed up to block: {}", end_block.index ); - let mut total_hits = 0; - for cursor in start_block..=end_block { - debug!( - self.ctx.expect_logger(), - "Evaluating predicate #{} on block #{}", - stacks_hook.uuid, - cursor - ); - let ( - block_identifier, - parent_block_identifier, - timestamp, - transactions, - metadata, - ) = { - let payload: Vec = redis_con - .hget( - &format!("stx:{}", cursor), - &[ - "block_identifier", - "parent_block_identifier", - "timestamp", - "transactions", - "metadata", - ], - ) - .expect("unable to retrieve tip height"); - if payload.len() != 5 { - warn!(self.ctx.expect_logger(), "Chain still being processed, please retry in a few minutes"); - continue; - } - ( - serde_json::from_str::(&payload[0]) - .unwrap(), - serde_json::from_str::(&payload[1]) - .unwrap(), - serde_json::from_str::(&payload[2]).unwrap(), - serde_json::from_str::>( - &payload[3], - ) - .unwrap(), - serde_json::from_str::(&payload[4]) - .unwrap(), - ) - }; - let mut hits = vec![]; - for tx in transactions.iter() { - if evaluate_stacks_predicate_on_transaction( - &tx, - &stacks_hook, - &self.ctx, - ) { - debug!( - self.ctx.expect_logger(), - "Action #{} triggered by transaction {} (block #{})", - stacks_hook.uuid, - tx.transaction_identifier.hash, - cursor - ); - hits.push(tx); - total_hits += 1; - } - } - - if hits.len() > 0 { - let block = StacksBlockData { - block_identifier, - parent_block_identifier, - timestamp, - transactions: vec![], - metadata, - }; - let trigger = StacksTriggerChainhook { - chainhook: &stacks_hook, - apply: vec![(hits, &block)], - rollback: vec![], - }; - - let proofs = HashMap::new(); - match handle_stacks_hook_action(trigger, &proofs, &self.ctx) { - Err(e) => { - info!( - self.ctx.expect_logger(), - "unable to handle action {}", e - ); - } - Ok(StacksChainhookOccurrence::Http(request)) => { - if let Err(e) = - hiro_system_kit::nestable_block_on(request.send()) - { - error!( - self.ctx.expect_logger(), - "unable to perform action {}", e - ); - } - } - Ok(_) => { - error!( - self.ctx.expect_logger(), - "action not supported" - ); - } - } - } - } - info!(self.ctx.expect_logger(), "Stacks chainhook {} scan completed: action triggered by {} transactions", stacks_hook.uuid, total_hits); } ChainhookSpecification::Bitcoin(predicate_spec) => { - match scan_bitcoin_chain_with_predicate_via_http( + match scan_bitcoin_chainstate_via_http_using_predicate( predicate_spec, &self.config, &self.ctx, @@ -322,7 +214,7 @@ impl Service { { Ok(_) => {} Err(e) => { - info!( + error!( self.ctx.expect_logger(), "Unable to evaluate predicate on Bitcoin chainstate: {e}", );