diff --git a/Cargo.lock b/Cargo.lock index 8cc90c725e7..55466ad20ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -597,6 +597,9 @@ name = "bitflags" version = "2.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394" +dependencies = [ + "serde", +] [[package]] name = "bitvec" @@ -2039,7 +2042,8 @@ dependencies = [ "rand 0.8.5", "regex", "reopen", - "rocksdb 0.23.0", + "rocksdb 0.24.0", + "ron", "rust_decimal", "rust_decimal_macros", "serde", @@ -2049,6 +2053,7 @@ dependencies = [ "tempfile", "tenderdash-abci", "thiserror 1.0.69", + "time", "tokio", "tokio-util", "tracing", @@ -5337,6 +5342,20 @@ dependencies = [ "librocksdb-sys", ] +[[package]] +name = "ron" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd490c5b18261893f14449cbd28cb9c0b637aebf161cd77900bfdedaff21ec32" +dependencies = [ + "bitflags 2.9.4", + "once_cell", + "serde", + "serde_derive", + "typeid", + "unicode-ident", +] + [[package]] name = "rpassword" version = "7.4.0" @@ -6431,6 +6450,7 @@ dependencies = [ "hex", "lhash", "semver", + "serde_json", "tenderdash-proto", "thiserror 2.0.16", "tokio", @@ -7166,6 +7186,12 @@ dependencies = [ "utf-8", ] +[[package]] +name = "typeid" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" + [[package]] name = "typenum" version = "1.18.0" diff --git a/packages/rs-drive-abci/.env.testnet b/packages/rs-drive-abci/.env.testnet index dccf681adf6..7d7c361c562 100644 --- a/packages/rs-drive-abci/.env.testnet +++ b/packages/rs-drive-abci/.env.testnet @@ -33,21 +33,21 @@ CORE_CHECK_TX_JSON_RPC_PASSWORD=password INITIAL_CORE_CHAINLOCKED_HEIGHT=1243 # https://github.com/dashevo/dashcore-lib/blob/286c33a9d29d33f05d874c47a9b33764a0be0cf1/lib/constants/index.js#L42-L57 -VALIDATOR_SET_QUORUM_TYPE=llmq_25_67 
-VALIDATOR_SET_QUORUM_SIZE=25 +VALIDATOR_SET_QUORUM_TYPE=6 +VALIDATOR_SET_QUORUM_SIZE=100 VALIDATOR_SET_QUORUM_WINDOW=24 VALIDATOR_SET_QUORUM_ACTIVE_SIGNERS=24 VALIDATOR_SET_QUORUM_ROTATION=false VALIDATOR_SET_ROTATION_BLOCK_COUNT=64 -CHAIN_LOCK_QUORUM_TYPE=llmq_50_60 -CHAIN_LOCK_QUORUM_SIZE=50 +CHAIN_LOCK_QUORUM_TYPE=1 +CHAIN_LOCK_QUORUM_SIZE=400 CHAIN_LOCK_QUORUM_WINDOW=24 CHAIN_LOCK_QUORUM_ACTIVE_SIGNERS=24 CHAIN_LOCK_QUORUM_ROTATION=false -INSTANT_LOCK_QUORUM_TYPE=llmq_60_75 -INSTANT_LOCK_QUORUM_SIZE=50 +INSTANT_LOCK_QUORUM_TYPE=5 +INSTANT_LOCK_QUORUM_SIZE=60 INSTANT_LOCK_QUORUM_WINDOW=288 INSTANT_LOCK_QUORUM_ACTIVE_SIGNERS=32 INSTANT_LOCK_QUORUM_ROTATION=true @@ -77,9 +77,9 @@ MASTERNODE_REWARD_SHARES_SECOND_PUBLIC_KEY=02bf55f97f189895da29824781053140ee66b WITHDRAWALS_MASTER_PUBLIC_KEY=027057cdf58628635ef7b75e6b6c90dd996a16929cd68130e16b9328d429e5e03a WITHDRAWALS_SECOND_PUBLIC_KEY=022084d827fea4823a69aa7c8d3e02fe780eaa0ef1e5e9841af395ba7e40465ab6 -EPOCH_TIME_LENGTH_S=788400 +EPOCH_TIME_LENGTH_S=3600 -CHAIN_ID=devnet +CHAIN_ID=dash-testnet-51 BLOCK_SPACING_MS=5000 TOKIO_CONSOLE_ENABLED=false diff --git a/packages/rs-drive-abci/Cargo.toml b/packages/rs-drive-abci/Cargo.toml index def4a93cae2..a326d4d5c83 100644 --- a/packages/rs-drive-abci/Cargo.toml +++ b/packages/rs-drive-abci/Cargo.toml @@ -15,6 +15,7 @@ license = "MIT" [dependencies] arc-swap = "1.7.0" bincode = { version = "=2.0.0-rc.3", features = ["serde"] } +base64 = { version = "0.22.1", optional = true } ciborium = { version = "0.2.2" } chrono = "0.4.35" serde = { version = "1.0.219", features = ["derive"] } @@ -54,6 +55,11 @@ tracing-subscriber = { version = "0.3.16", default-features = false, features = tenderdash-abci = { git = "https://github.com/dashpay/rs-tenderdash-abci", tag = "v1.5.0-dev.2", features = [ "grpc", ] } +time = { version = "0.3", optional = true, features = [ + "macros", + "formatting", + "serde-human-readable", +] } lazy_static = "1.4.0" itertools = { version = "0.13" } @@ -75,9 
+81,9 @@ tokio = { version = "1.40", features = [ tokio-util = { version = "0.7" } derive_more = { version = "1.0", features = ["from", "deref", "deref_mut"] } async-trait = "0.1.77" +ron = { version = "0.12", optional = true } console-subscriber = { version = "0.4", optional = true } bls-signatures = { git = "https://github.com/dashpay/bls-signatures", rev = "0842b17583888e8f46c252a4ee84cdfd58e0546f", optional = true } - [dev-dependencies] bs58 = { version = "0.5.0" } base64 = "0.22.1" @@ -104,7 +110,7 @@ bls-signatures = { git = "https://github.com/dashpay/bls-signatures", rev = "084 mockall = { version = "0.13" } # For tests of grovedb verify -rocksdb = { version = "0.23.0" } +rocksdb = { version = "0.24.0" } integer-encoding = { version = "4.0.0" } [features] @@ -113,7 +119,8 @@ mocks = ["mockall", "drive/fixtures-and-mocks", "bls-signatures"] console = ["console-subscriber", "tokio/tracing"] testing-config = [] grovedbg = ["drive/grovedbg"] - +# `abci-server replay` command +replay = ["dep:ron", "dep:time", "tenderdash-abci/serde"] [[bin]] name = "drive-abci" path = "src/main.rs" diff --git a/packages/rs-drive-abci/src/abci/handler/process_proposal.rs b/packages/rs-drive-abci/src/abci/handler/process_proposal.rs index e10a60aef0d..9a8c3cd9c35 100644 --- a/packages/rs-drive-abci/src/abci/handler/process_proposal.rs +++ b/packages/rs-drive-abci/src/abci/handler/process_proposal.rs @@ -33,12 +33,11 @@ where if let Some(block_execution_context) = block_execution_context_guard.as_mut() { // We are already in a block, or in init chain. 
// This only makes sense if we were the proposer unless we are at a future round - if block_execution_context.block_state_info().round() != (request.round as u32) { + let block_state_info = block_execution_context.block_state_info(); + if block_state_info.round() != (request.round as u32) { // We were not the proposer, and we should process something new drop_block_execution_context = true; - } else if let Some(current_block_hash) = - block_execution_context.block_state_info().block_hash() - { + } else if let Some(current_block_hash) = block_state_info.block_hash() { // There is also the possibility that this block already came in, but tenderdash crashed // Now tenderdash is sending it again if let Some(proposal_info) = block_execution_context.proposer_results() { @@ -69,6 +68,15 @@ where } else { // We are getting a different block hash for a block of the same round // This is a terrible issue + tracing::error!( + method = "process_proposal", + block_state_info = ?block_state_info, + "received a process proposal request twice with different hash for height {}/round {}: existing hash {:?}, new hash {:?}", + request.height, + request.round, + current_block_hash, + request.hash, + ); Err(Error::Abci(AbciError::BadRequest( "received a process proposal request twice with different hash".to_string(), )))?; diff --git a/packages/rs-drive-abci/src/execution/engine/run_block_proposal/v0/mod.rs b/packages/rs-drive-abci/src/execution/engine/run_block_proposal/v0/mod.rs index e8ee8308050..7c829b5043f 100644 --- a/packages/rs-drive-abci/src/execution/engine/run_block_proposal/v0/mod.rs +++ b/packages/rs-drive-abci/src/execution/engine/run_block_proposal/v0/mod.rs @@ -399,6 +399,7 @@ where tracing::trace!( method = "run_block_proposal_v0", app_hash = hex::encode(root_hash), + block_hash = hex::encode(block_proposal.block_hash.unwrap_or_default()), platform_state_fingerprint = hex::encode( block_execution_context .block_platform_state() diff --git 
a/packages/rs-drive-abci/src/execution/storage/fetch_platform_state/mod.rs b/packages/rs-drive-abci/src/execution/storage/fetch_platform_state/mod.rs index f2537eca34f..1955ac5dc2f 100644 --- a/packages/rs-drive-abci/src/execution/storage/fetch_platform_state/mod.rs +++ b/packages/rs-drive-abci/src/execution/storage/fetch_platform_state/mod.rs @@ -1,3 +1,5 @@ +//! Fetches execution state from grovedb storage + use crate::error::execution::ExecutionError; use crate::error::Error; use crate::platform_types::platform::Platform; diff --git a/packages/rs-drive-abci/src/execution/storage/mod.rs b/packages/rs-drive-abci/src/execution/storage/mod.rs index 54302920e5d..92c2b2417dc 100644 --- a/packages/rs-drive-abci/src/execution/storage/mod.rs +++ b/packages/rs-drive-abci/src/execution/storage/mod.rs @@ -1,2 +1,2 @@ -mod fetch_platform_state; +pub mod fetch_platform_state; mod store_platform_state; diff --git a/packages/rs-drive-abci/src/lib.rs b/packages/rs-drive-abci/src/lib.rs index a81b24d0ef0..bc696fcd8b0 100644 --- a/packages/rs-drive-abci/src/lib.rs +++ b/packages/rs-drive-abci/src/lib.rs @@ -50,5 +50,10 @@ pub mod query; /// Various utils pub mod utils; +/// Replay captured ABCI requests against drive-abci +#[cfg(feature = "replay")] +pub mod replay; /// Drive server pub mod server; +/// Verification helpers +pub mod verify; diff --git a/packages/rs-drive-abci/src/main.rs b/packages/rs-drive-abci/src/main.rs index fdb1b9e23c7..5f8e332301a 100644 --- a/packages/rs-drive-abci/src/main.rs +++ b/packages/rs-drive-abci/src/main.rs @@ -1,13 +1,15 @@ //! Main server process for RS-Drive-ABCI //! //! RS-Drive-ABCI server starts a single-threaded server and listens to connections from Tenderdash. 
+#[cfg(feature = "replay")] +use drive_abci::replay::{self, ReplayArgs}; +use drive_abci::verify::verify_grovedb; use clap::{Parser, Subcommand}; use dapi_grpc::platform::v0::get_status_request; use dapi_grpc::platform::v0::get_status_request::GetStatusRequestV0; use dapi_grpc::platform::v0::platform_client::PlatformClient; use dapi_grpc::tonic::transport::Uri; -use dpp::version::PlatformVersion; use drive_abci::config::{FromEnv, PlatformConfig}; use drive_abci::core::wait_for_core_to_sync::v0::wait_for_core_to_sync_v0; use drive_abci::logging::{LogBuilder, LogConfig, LogDestination, Loggers}; @@ -16,7 +18,6 @@ use drive_abci::platform_types::platform::Platform; use drive_abci::rpc::core::DefaultCoreRPC; use drive_abci::{logging, server}; use itertools::Itertools; -use std::fs::remove_file; #[cfg(all(tokio_unstable, feature = "console"))] use std::net::SocketAddr; use std::path::PathBuf; @@ -63,6 +64,11 @@ enum Commands { /// Print current software version #[command()] Version, + + /// Replay ABCI requests captured from drive-abci logs. + #[cfg(feature = "replay")] + #[command()] + Replay(ReplayArgs), } /// Server that accepts connections from Tenderdash, and @@ -151,8 +157,13 @@ impl Cli { } Commands::Config => dump_config(&config)?, Commands::Status => runtime.block_on(check_status(&config))?, - Commands::Verify => verify_grovedb(&config.db_path, true)?, + Commands::Verify => drive_abci::verify::run(&config, true)?, Commands::Version => print_version(), + #[cfg(feature = "replay")] + Commands::Replay(args) => { + replay::run(config, args, cancel.clone()).map_err(|e| e.to_string())?; + return Ok(()); + } }; Ok(()) @@ -331,62 +342,6 @@ async fn check_status(config: &PlatformConfig) -> Result<(), String> { .map_err(|e| format!("can't request status: {e}")) } -/// Verify GroveDB integrity. 
-/// -/// This function will execute GroveDB integrity checks if one of the following conditions is met: -/// - `force` is `true` -/// - file `.fsck` in `config.db_path` exists -/// -/// After successful verification, .fsck file is removed. -fn verify_grovedb(db_path: &PathBuf, force: bool) -> Result<(), String> { - let fsck = PathBuf::from(db_path).join(".fsck"); - - if !force { - if !fsck.exists() { - return Ok(()); - } - tracing::info!( - "found {} file, starting grovedb verification", - fsck.display() - ); - } - - let grovedb = drive::grovedb::GroveDb::open(db_path).expect("open grovedb"); - //todo: get platform version instead of taking latest - let result = grovedb - .visualize_verify_grovedb( - None, - true, - true, - &PlatformVersion::latest().drive.grove_version, - ) - .map_err(|e| e.to_string()); - - match result { - Ok(data) => { - for result in data { - tracing::warn!(?result, "grovedb verification") - } - tracing::info!("grovedb verification finished"); - - if fsck.exists() { - if let Err(e) = remove_file(&fsck) { - tracing::warn!( - error = ?e, - path =fsck.display().to_string(), - "grovedb verification: cannot remove .fsck file: please remove it manually to avoid running verification again", - ); - } - } - Ok(()) - } - Err(e) => { - tracing::error!("grovedb verification failed: {}", e); - Err(e) - } - } -} - /// Print current software version. fn print_version() { println!("{}", env!("CARGO_PKG_VERSION")); diff --git a/packages/rs-drive-abci/src/replay/cli.rs b/packages/rs-drive-abci/src/replay/cli.rs new file mode 100644 index 00000000000..b9e5963bfd0 --- /dev/null +++ b/packages/rs-drive-abci/src/replay/cli.rs @@ -0,0 +1,85 @@ +use clap::Args; +use std::path::PathBuf; + +/// Replay ABCI requests captured from drive-abci logs. +#[derive(Debug, Args, Clone)] +#[command(about, long_about)] +pub struct ReplayArgs { + /// Path to the GroveDB database that should be used for execution. + /// Defaults to the path from drive-abci configuration. 
+ #[arg(long, value_hint = clap::ValueHint::DirPath)] + pub db_path: Option, + + /// drive-abci JSON log that contains TRACE level "received ABCI request" entries. + /// Relevant requests will be extracted and replayed chronologically. + /// Other log entries are ignored. + #[arg(long = "log", value_hint = clap::ValueHint::FilePath)] + pub log: PathBuf, + + /// Log progress information at INFO level after each finalize_block. + #[arg(short, long)] + pub progress: bool, + + /// Skip replaying specific log entries by their line numbers (supports ranges and comma lists). + #[arg( + long = "skip", + value_name = "LINE[-LINE]", + value_delimiter = ',', + value_parser = parse_skip_selector + )] + pub skip: Vec, + + /// Stop replay after reaching this block height (inclusive). + #[arg(long, value_name = "HEIGHT")] + pub stop_height: Option, +} + +/// Selector used by `--skip` flag to filter log line numbers. +#[derive(Debug, Clone, Copy)] +pub enum SkipSelector { + Line(usize), + Range { start: usize, end: usize }, +} + +impl SkipSelector { + pub fn matches(&self, line: usize) -> bool { + match self { + SkipSelector::Line(target) => line == *target, + SkipSelector::Range { start, end } => (*start..=*end).contains(&line), + } + } +} + +pub fn parse_skip_selector(raw: &str) -> Result { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return Err("skip value cannot be empty".to_string()); + } + + if let Some((start, end)) = trimmed.split_once('-') { + let start_line = parse_line_number(start)?; + let end_line = parse_line_number(end)?; + if start_line > end_line { + return Err(format!( + "invalid skip range {}-{} (start must be <= end)", + start_line, end_line + )); + } + return Ok(SkipSelector::Range { + start: start_line, + end: end_line, + }); + } + + let line = parse_line_number(trimmed)?; + Ok(SkipSelector::Line(line)) +} + +fn parse_line_number(raw: &str) -> Result { + raw.trim().parse::().map_err(|_| { + format!( + "invalid skip target '{}'; expected positive 
line number", + raw.trim() + ) + }) +} diff --git a/packages/rs-drive-abci/src/replay/log_ingest.rs b/packages/rs-drive-abci/src/replay/log_ingest.rs new file mode 100644 index 00000000000..2ec22af2eb9 --- /dev/null +++ b/packages/rs-drive-abci/src/replay/log_ingest.rs @@ -0,0 +1,1212 @@ +use crate::replay::{LoadedRequest, ReplayItem}; +use hex::encode as hex_encode; +use serde::de::value; +use serde::de::Error as DeError; +use serde::de::IgnoredAny; +use serde::de::{DeserializeOwned, DeserializeSeed, EnumAccess, IntoDeserializer, VariantAccess}; +use serde::forward_to_deserialize_any; +use serde::{Deserialize, Deserializer}; +use std::collections::BTreeMap; +use std::fmt; +use std::fs::File; +use std::io::{BufRead, BufReader, Lines}; +use std::path::{Path, PathBuf}; +use tenderdash_abci::proto::serializers::timestamp::to_rfc3339_nanos; +use time::OffsetDateTime; + +pub struct LogRequestStream { + path: PathBuf, + lines: Lines>, + line_number: usize, + buffered: Option, +} + +impl LogRequestStream { + pub fn open(path: &Path) -> Result> { + let file = File::open(path)?; + let canonical = path.canonicalize().unwrap_or_else(|_| path.to_path_buf()); + Ok(Self { + path: canonical, + lines: BufReader::new(file).lines(), + line_number: 0, + buffered: None, + }) + } + + pub fn path(&self) -> &Path { + &self.path + } + + pub fn peek(&mut self) -> Result, Box> { + if self.buffered.is_none() { + self.buffered = self.read_next_request()?; + } + Ok(self.buffered.as_ref()) + } + + pub fn next_item(&mut self) -> Result, Box> { + if let Some(item) = self.buffered.take() { + return Ok(Some(item)); + } + self.read_next_request() + } + + pub fn skip_processed_entries( + &mut self, + max_height: u64, + ) -> Result> { + let mut skipped = 0usize; + loop { + let Some(item) = self.peek()? 
else { + break; + }; + + let should_skip = match item.request.block_height() { + Some(height) => height <= max_height, + None => false, + }; + + if should_skip { + let _ = self.next_item()?; + skipped += 1; + } else { + break; + } + } + + Ok(skipped) + } + + fn read_next_request(&mut self) -> Result, Box> { + while let Some(line_result) = self.lines.next() { + let line = line_result?; + self.line_number += 1; + if line.trim().is_empty() { + continue; + } + + let entry: LogEntry = match serde_json::from_str(&line) { + Ok(entry) => entry, + Err(err) => { + tracing::debug!( + "skipping malformed log line {}:{} ({:?})", + self.path.display(), + self.line_number, + err + ); + continue; + } + }; + + let LogEntry { + timestamp, + fields, + span, + } = entry; + + let Some(fields) = fields else { + continue; + }; + + if fields.message.as_deref() != Some("received ABCI request") { + continue; + } + + let Some(raw_request) = fields.request.as_deref() else { + continue; + }; + + let endpoint = span.and_then(|s| s.endpoint); + + match parse_logged_request(raw_request) { + Ok(Some(loaded)) => { + return Ok(Some(ReplayItem::from_log( + self.path(), + self.line_number, + timestamp, + endpoint, + loaded, + ))); + } + Ok(None) => { + // tracing::trace!( + // "skipping replay for ABCI request {}:{} (no-op variant)", + // self.path.display(), + // self.line_number + // ); + } + Err(err) => { + tracing::warn!( + "failed to parse ABCI request from {}:{} ({})", + self.path.display(), + self.line_number, + err + ); + } + } + } + + Ok(None) + } +} + +#[derive(Debug, Deserialize)] +struct LogEntry { + timestamp: Option, + fields: Option, + span: Option, +} + +#[derive(Debug, Deserialize)] +struct LogFields { + message: Option, + request: Option, +} + +#[derive(Debug, Deserialize)] +struct LogSpan { + endpoint: Option, +} + +fn parse_logged_request(raw: &str) -> Result, Box> { + let parser = DebugValueParser::new(raw); + let parsed = parser + .parse() + .map_err(|err| format!("failed to 
parse request payload: {}", err))?; + convert_parsed_request(parsed) +} + +fn convert_parsed_request( + value: ParsedValue, +) -> Result, Box> { + match value { + ParsedValue::Tagged { tag, value } if tag == "Request" => { + let mut object = value.into_object()?; + let inner = object + .remove("value") + .ok_or_else(|| "request log entry missing value field".to_string())?; + parse_request_variant(inner) + } + other => parse_request_variant(other), + } +} + +fn parse_request_variant( + value: ParsedValue, +) -> Result, Box> { + let (tag, inner) = match value { + ParsedValue::Tagged { tag, value } => (tag, *value), + _ => return Err("request log entry is missing request variant tag".into()), + }; + + let normalized = tag.trim_start_matches("Request"); + + match normalized { + "InitChain" => deserialize_to_loaded(inner, LoadedRequest::InitChain).map(Some), + "Info" => deserialize_to_loaded(inner, LoadedRequest::Info).map(Some), + "PrepareProposal" => deserialize_to_loaded(inner, LoadedRequest::Prepare).map(Some), + "ProcessProposal" => deserialize_to_loaded(inner, LoadedRequest::Process).map(Some), + "FinalizeBlock" => deserialize_to_loaded(inner, LoadedRequest::Finalize).map(Some), + "ExtendVote" => deserialize_to_loaded(inner, LoadedRequest::ExtendVote).map(Some), + "VerifyVoteExtension" => { + deserialize_to_loaded(inner, LoadedRequest::VerifyVoteExtension).map(Some) + } + "Flush" => Ok(None), + other => Err(format!("unsupported request variant {}", other).into()), + } +} + +pub struct DebugValueParser<'a> { + input: &'a [u8], + pos: usize, +} + +impl<'a> DebugValueParser<'a> { + fn new(raw: &'a str) -> Self { + Self { + input: raw.as_bytes(), + pos: 0, + } + } + + fn parse(mut self) -> Result { + let value = self.parse_value()?; + self.skip_ws(); + if self.pos != self.input.len() { + Err(self.error("unexpected trailing characters")) + } else { + Ok(value) + } + } + + fn parse_value(&mut self) -> Result { + self.skip_ws(); + + if self.consume("Some(") { + let value 
= self.parse_value()?; + self.skip_ws(); + self.expect(')')?; + return Ok(value); + } + + if self.consume("None") { + return Ok(ParsedValue::Null); + } + + if self.consume("true") { + return Ok(ParsedValue::Bool(true)); + } + + if self.consume("false") { + return Ok(ParsedValue::Bool(false)); + } + + match self.peek_char() { + Some('{') => return self.parse_object(None), + Some('[') => return self.parse_array(), + Some('"') => return self.parse_string(), + Some(ch) if ch == '-' || ch.is_ascii_digit() => return self.parse_number(), + Some(_) => {} + None => return Err(self.error("unexpected end of input")), + } + + let ident = self.parse_identifier()?; + self.skip_ws(); + match self.peek_char() { + Some('{') => self.parse_object(Some(ident)), + Some('(') => { + self.pos += 1; + let value = self.parse_value()?; + self.skip_ws(); + self.expect(')')?; + Ok(ParsedValue::Tagged { + tag: ident, + value: Box::new(value), + }) + } + _ => Ok(normalize_identifier_value(ident)), + } + } + + fn parse_object(&mut self, tag: Option) -> Result { + self.expect('{')?; + let mut map = BTreeMap::new(); + loop { + self.skip_ws(); + if self.peek_char() == Some('}') { + self.pos += 1; + break; + } + let key = normalize_field_key(self.parse_identifier()?); + self.skip_ws(); + self.expect(':')?; + let value = self.parse_value()?; + map.insert(key, value); + self.skip_ws(); + if self.peek_char() == Some(',') { + self.pos += 1; + } + } + + let object = ParsedValue::Object(map); + if let Some(tag) = tag { + ParsedValue::from_tagged(tag, object) + } else { + Ok(object) + } + } + + fn parse_array(&mut self) -> Result { + self.expect('[')?; + let mut values = Vec::new(); + loop { + self.skip_ws(); + if self.peek_char() == Some(']') { + self.pos += 1; + break; + } + let value = self.parse_value()?; + values.push(value); + self.skip_ws(); + if self.peek_char() == Some(',') { + self.pos += 1; + } + } + Ok(ParsedValue::Array(values)) + } + + fn parse_string(&mut self) -> Result { + 
self.expect('"')?; + let mut output = String::new(); + while let Some(ch) = self.next_char() { + match ch { + '"' => return Ok(ParsedValue::String(output)), + '\\' => { + let escaped = self + .next_char() + .ok_or_else(|| self.error("unterminated escape sequence"))?; + let translated = match escaped { + '"' => '"', + '\\' => '\\', + 'n' => '\n', + 'r' => '\r', + 't' => '\t', + other => other, + }; + output.push(translated); + } + other => output.push(other), + } + } + Err(self.error("unterminated string literal")) + } + + fn parse_number(&mut self) -> Result { + let start = self.pos; + if self.peek_char() == Some('-') { + self.pos += 1; + } + while matches!(self.peek_char(), Some(ch) if ch.is_ascii_digit()) { + self.pos += 1; + } + let number = std::str::from_utf8(&self.input[start..self.pos]) + .map_err(|_| self.error("invalid number encoding"))?; + Ok(ParsedValue::Number(number.to_string())) + } + + fn parse_identifier(&mut self) -> Result { + self.skip_ws(); + let start = self.pos; + while let Some(ch) = self.peek_char() { + if ch.is_alphanumeric() || ch == '_' || ch == '#' { + self.pos += 1; + } else { + break; + } + } + if self.pos == start { + Err(self.error("expected identifier")) + } else { + Ok(String::from_utf8_lossy(&self.input[start..self.pos]).to_string()) + } + } + + fn expect(&mut self, expected: char) -> Result<(), ParseError> { + self.skip_ws(); + match self.peek_char() { + Some(ch) if ch == expected => { + self.pos += 1; + Ok(()) + } + _ => Err(self.error(format!("expected '{}'", expected))), + } + } + + fn consume(&mut self, expected: &str) -> bool { + if self.input[self.pos..].starts_with(expected.as_bytes()) { + self.pos += expected.len(); + true + } else { + false + } + } + + fn peek_char(&self) -> Option { + self.input.get(self.pos).copied().map(|b| b as char) + } + + fn next_char(&mut self) -> Option { + let ch = self.peek_char()?; + self.pos += 1; + Some(ch) + } + + fn skip_ws(&mut self) { + while matches!(self.peek_char(), Some(ch) if 
ch.is_whitespace()) { + self.pos += 1; + } + } + + fn error>(&self, msg: T) -> ParseError { + ParseError { + message: msg.into(), + } + } +} + +#[derive(Debug, Clone)] +enum ParsedValue { + Null, + Bool(bool), + Number(String), + String(String), + Array(Vec), + Object(BTreeMap), + Tagged { + tag: String, + value: Box, + }, +} + +impl ParsedValue { + fn into_object(self) -> Result, Box> { + match self { + ParsedValue::Object(map) => Ok(map), + ParsedValue::Tagged { value, .. } => value.into_object(), + other => Err(format!("expected object but found {:?}", other).into()), + } + } + + fn to_string_value(&self) -> Result> { + match self { + ParsedValue::String(value) => Ok(value.clone()), + ParsedValue::Number(value) => Ok(value.clone()), + ParsedValue::Tagged { value, .. } => value.to_string_value(), + other => Err(format!("expected string but found {:?}", other).into()), + } + } + + fn from_tagged(tag: String, value: ParsedValue) -> Result { + match normalize_type_tag(&tag) { + "Timestamp" => Ok(ParsedValue::String(format_timestamp(value)?)), + "Duration" => Ok(ParsedValue::String(format_duration(value)?)), + "VoteExtension" => Ok(ParsedValue::Tagged { + tag, + value: Box::new(normalize_vote_extension(value)?), + }), + _ => Ok(ParsedValue::Tagged { + tag, + value: Box::new(value), + }), + } + } +} + +fn normalize_type_tag(tag: &str) -> &str { + tag.rsplit("::").next().unwrap_or(tag) +} + +fn normalize_identifier_value(token: String) -> ParsedValue { + let trimmed = token.trim(); + if let Some(suffix) = trimmed.strip_prefix("ConsensusVersion") { + if !suffix.is_empty() && suffix.chars().all(|ch| ch.is_ascii_digit()) { + return ParsedValue::Number(suffix.to_string()); + } + } + if is_enum_variant_identifier(trimmed) { + return ParsedValue::Tagged { + tag: "EnumVariant".to_string(), + value: Box::new(ParsedValue::String(trimmed.to_string())), + }; + } + + ParsedValue::String(trimmed.to_string()) +} + +fn is_enum_variant_identifier(value: &str) -> bool { + let mut chars 
= value.chars(); + let first = match chars.next() { + Some(ch) => ch, + None => return false, + }; + if !first.is_ascii_uppercase() { + return false; + } + chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_') +} + +fn normalize_field_key(key: String) -> String { + key.strip_prefix("r#").unwrap_or(&key).to_string() +} + +fn normalize_vote_extension(value: ParsedValue) -> Result { + let mut map = value.into_object()?; + if let Some(field) = map.get_mut("type") { + convert_enum_variant(field, vote_extension_type_from_name)?; + } + Ok(ParsedValue::Object(map)) +} + +fn convert_enum_variant(field: &mut ParsedValue, resolver: F) -> Result<(), ParseError> +where + F: Fn(&str) -> Option, +{ + if let ParsedValue::Tagged { tag, value } = field { + if tag == "EnumVariant" { + let name = value + .to_string_value() + .map_err(|err| ParseError::new(err.to_string()))?; + if let Some(num) = resolver(&name) { + *field = ParsedValue::Number(num.to_string()); + } else { + *field = ParsedValue::String(name); + } + } + } + Ok(()) +} + +fn vote_extension_type_from_name(name: &str) -> Option { + use tenderdash_abci::proto::types::VoteExtensionType; + + let normalized = camel_to_upper_snake(name); + VoteExtensionType::from_str_name(normalized.as_str()).map(|value| value as i32) +} + +fn camel_to_upper_snake(value: &str) -> String { + let mut out = String::new(); + for (idx, ch) in value.chars().enumerate() { + if ch.is_ascii_uppercase() && idx != 0 { + out.push('_'); + } + out.push(ch.to_ascii_uppercase()); + } + out +} + +#[derive(Debug)] +struct ParseError { + message: String, +} + +impl fmt::Display for ParseError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.message) + } +} + +impl std::error::Error for ParseError {} + +impl ParseError { + fn new>(msg: T) -> Self { + Self { + message: msg.into(), + } + } +} + +impl From> for ParseError { + fn from(err: Box) -> Self { + ParseError::new(err.to_string()) + } +} + +struct TaggedEnumAccess { + 
tag: String, + value: ParsedValue, +} + +impl<'de> EnumAccess<'de> for TaggedEnumAccess { + type Error = value::Error; + type Variant = VariantDeserializer; + + fn variant_seed(self, seed: V) -> Result<(V::Value, Self::Variant), Self::Error> + where + V: DeserializeSeed<'de>, + { + let variant = seed.deserialize(self.tag.into_deserializer())?; + Ok((variant, VariantDeserializer { value: self.value })) + } +} + +struct VariantDeserializer { + value: ParsedValue, +} + +impl<'de> VariantAccess<'de> for VariantDeserializer { + type Error = value::Error; + + fn unit_variant(self) -> Result<(), Self::Error> { + IgnoredAny::deserialize(self.value.into_deserializer()).map(|_| ()) + } + + fn newtype_variant_seed(self, seed: T) -> Result + where + T: DeserializeSeed<'de>, + { + let normalized = match self.value { + ParsedValue::Tagged { tag, value: inner } if tag == "EnumVariant" => { + ParsedValue::String(inner.to_string_value().map_err(value::Error::custom)?) + } + other => other, + }; + seed.deserialize(normalized) + } + + fn tuple_variant(self, len: usize, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + Deserializer::deserialize_tuple(self.value, len, visitor) + } + + fn struct_variant( + self, + fields: &'static [&'static str], + visitor: V, + ) -> Result + where + V: serde::de::Visitor<'de>, + { + Deserializer::deserialize_struct(self.value, "", fields, visitor) + } +} + +impl<'de> IntoDeserializer<'de, value::Error> for ParsedValue { + type Deserializer = Self; + + fn into_deserializer(self) -> Self { + self + } +} + +impl<'de> Deserializer<'de> for ParsedValue { + type Error = value::Error; + + fn deserialize_any(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + match self { + ParsedValue::Null => visitor.visit_unit(), + ParsedValue::Bool(value) => visitor.visit_bool(value), + ParsedValue::Number(value) => { + if let Ok(int) = value.parse::() { + visitor.visit_i64(int) + } else if let Ok(uint) = value.parse::() { + 
visitor.visit_u64(uint) + } else if let Ok(float) = value.parse::() { + visitor.visit_f64(float) + } else { + visitor.visit_string(value) + } + } + ParsedValue::String(value) => visitor.visit_string(value), + ParsedValue::Array(values) => { + let iter = values.into_iter().map(|v| v.into_deserializer()); + let mut seq = value::SeqDeserializer::new(iter); + let output = visitor.visit_seq(&mut seq)?; + seq.end()?; + Ok(output) + } + ParsedValue::Object(map) => { + let iter = map + .into_iter() + .map(|(k, v)| (k.into_deserializer(), v.into_deserializer())); + let mut map_deserializer = value::MapDeserializer::new(iter); + let output = visitor.visit_map(&mut map_deserializer)?; + map_deserializer.end()?; + Ok(output) + } + ParsedValue::Tagged { value, .. } => value.deserialize_any(visitor), + } + } + + fn deserialize_bool(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + match self { + ParsedValue::Bool(value) => visitor.visit_bool(value), + ParsedValue::String(value) => match value.to_lowercase().as_str() { + "true" => visitor.visit_bool(true), + "false" => visitor.visit_bool(false), + _ => Err(DeError::custom("invalid boolean value")), + }, + ParsedValue::Tagged { value, .. } => value.deserialize_bool(visitor), + other => Err(DeError::custom(format!("expected bool got {:?}", other))), + } + } + + fn deserialize_i64(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + match self { + ParsedValue::Number(value) | ParsedValue::String(value) => { + let parsed = value.parse::().map_err(DeError::custom)?; + visitor.visit_i64(parsed) + } + ParsedValue::Tagged { value, .. 
} => value.deserialize_i64(visitor), + other => Err(DeError::custom(format!("expected i64 got {:?}", other))), + } + } + + fn deserialize_u64(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + match self { + ParsedValue::Number(value) | ParsedValue::String(value) => { + let parsed = value.parse::().map_err(DeError::custom)?; + visitor.visit_u64(parsed) + } + ParsedValue::Tagged { value, .. } => value.deserialize_u64(visitor), + other => Err(DeError::custom(format!("expected u64 got {:?}", other))), + } + } + + fn deserialize_f32(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + match self { + ParsedValue::Number(value) | ParsedValue::String(value) => { + let parsed = value.parse::().map_err(DeError::custom)?; + visitor.visit_f32(parsed) + } + ParsedValue::Tagged { value, .. } => value.deserialize_f32(visitor), + other => Err(DeError::custom(format!("expected f32 got {:?}", other))), + } + } + + fn deserialize_f64(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + match self { + ParsedValue::Number(value) | ParsedValue::String(value) => { + let parsed = value.parse::().map_err(DeError::custom)?; + visitor.visit_f64(parsed) + } + ParsedValue::Tagged { value, .. } => value.deserialize_f64(visitor), + other => Err(DeError::custom(format!("expected f64 got {:?}", other))), + } + } + + fn deserialize_string(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + match self { + ParsedValue::String(value) => visitor.visit_string(value), + ParsedValue::Number(value) => visitor.visit_string(value), + ParsedValue::Bool(value) => visitor.visit_string(value.to_string()), + ParsedValue::Array(values) => { + let hex = array_to_hex(values)?; + visitor.visit_string(hex) + } + ParsedValue::Tagged { value, .. 
} => value.deserialize_string(visitor), + other => Err(DeError::custom(format!("expected string got {:?}", other))), + } + } + + fn deserialize_str(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_string(visitor) + } + + fn deserialize_option(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + match self { + ParsedValue::Null => visitor.visit_none(), + ParsedValue::Tagged { value, .. } => value.deserialize_option(visitor), + other => visitor.visit_some(other.into_deserializer()), + } + } + + fn deserialize_seq(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + match self { + ParsedValue::Array(values) => { + let iter = values.into_iter().map(|v| v.into_deserializer()); + let mut seq = value::SeqDeserializer::new(iter); + let output = visitor.visit_seq(&mut seq)?; + seq.end()?; + Ok(output) + } + ParsedValue::Tagged { value, .. } => value.deserialize_seq(visitor), + other => Err(DeError::custom(format!( + "expected sequence got {:?}", + other + ))), + } + } + + fn deserialize_map(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + match self { + ParsedValue::Object(map) => { + let iter = map + .into_iter() + .map(|(k, v)| (k.into_deserializer(), v.into_deserializer())); + let mut map_deserializer = value::MapDeserializer::new(iter); + let output = visitor.visit_map(&mut map_deserializer)?; + map_deserializer.end()?; + Ok(output) + } + ParsedValue::Tagged { value, .. 
} => value.deserialize_map(visitor), + other => Err(DeError::custom(format!("expected map got {:?}", other))), + } + } + + fn deserialize_enum( + self, + _name: &'static str, + _variants: &'static [&'static str], + visitor: V, + ) -> Result + where + V: serde::de::Visitor<'de>, + { + match self { + ParsedValue::String(tag) | ParsedValue::Number(tag) => { + visitor.visit_enum(TaggedEnumAccess { + tag, + value: ParsedValue::Null, + }) + } + ParsedValue::Tagged { tag, value } => { + visitor.visit_enum(TaggedEnumAccess { tag, value: *value }) + } + ParsedValue::Object(map) => { + if map.len() == 1 { + let (tag, value) = map.into_iter().next().unwrap(); + visitor.visit_enum(TaggedEnumAccess { tag, value }) + } else { + Err(value::Error::custom(format!( + "invalid enum representation {:?}", + map + ))) + } + } + other => Err(value::Error::custom(format!( + "invalid enum representation {:?}", + other + ))), + } + } + + fn deserialize_bytes(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + match self { + ParsedValue::Array(values) => { + let mut buffer = Vec::with_capacity(values.len()); + for value in values { + let byte = match value { + ParsedValue::Number(s) | ParsedValue::String(s) => { + s.parse::().map_err(DeError::custom)? + } + other => { + return Err(DeError::custom(format!("expected byte got {:?}", other))); + } + }; + buffer.push(byte); + } + visitor.visit_byte_buf(buffer) + } + ParsedValue::String(value) => visitor.visit_byte_buf(value.into_bytes()), + ParsedValue::Tagged { value, .. 
} => value.deserialize_bytes(visitor), + other => Err(DeError::custom(format!("expected bytes got {:?}", other))), + } + } + + fn deserialize_byte_buf(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_bytes(visitor) + } + + fn deserialize_identifier(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_string(visitor) + } + + fn deserialize_ignored_any(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + visitor.visit_unit() + } + + fn deserialize_unit(self, visitor: V) -> Result + where + V: serde::de::Visitor<'de>, + { + match self { + ParsedValue::Null => visitor.visit_unit(), + ParsedValue::Tagged { value, .. } => value.deserialize_unit(visitor), + other => Err(DeError::custom(format!("expected unit got {:?}", other))), + } + } + + fn deserialize_struct( + self, + _name: &'static str, + _fields: &'static [&'static str], + visitor: V, + ) -> Result + where + V: serde::de::Visitor<'de>, + { + self.deserialize_map(visitor) + } + + forward_to_deserialize_any! 
{ + char i8 i16 i32 i128 u8 u16 u32 u128 + unit_struct newtype_struct tuple tuple_struct + } +} + +fn format_timestamp(value: ParsedValue) -> Result { + let (seconds, nanos) = parse_seconds_nanos(value, "Timestamp")?; + if !(-999_999_999..=999_999_999).contains(&nanos) { + return Err(ParseError::new("timestamp nanos out of range")); + } + let total = seconds + .checked_mul(1_000_000_000) + .and_then(|base| base.checked_add(nanos)) + .ok_or_else(|| ParseError::new("timestamp overflow"))?; + let datetime = OffsetDateTime::from_unix_timestamp_nanos(total) + .map_err(|_| ParseError::new("invalid timestamp"))?; + Ok(to_rfc3339_nanos(datetime)) +} + +fn format_duration(value: ParsedValue) -> Result { + let (seconds, nanos) = parse_seconds_nanos(value, "Duration")?; + if seconds < 0 || nanos < 0 { + return Err(ParseError::new("duration cannot be negative")); + } + if nanos >= 1_000_000_000 { + return Err(ParseError::new("duration nanos out of range")); + } + let total = seconds + .checked_mul(1_000_000_000) + .and_then(|base| base.checked_add(nanos)) + .ok_or_else(|| ParseError::new("duration overflow"))?; + Ok(total.to_string()) +} + +fn parse_seconds_nanos(value: ParsedValue, kind: &str) -> Result<(i128, i128), ParseError> { + let mut map = match value { + ParsedValue::Object(map) => map, + ParsedValue::Tagged { value, .. 
} => return parse_seconds_nanos(*value, kind), + other => { + return Err(ParseError::new(format!( + "expected {} object, got {:?}", + kind, other + ))); + } + }; + let seconds = parse_number_field(map.remove("seconds"), "seconds")?; + let nanos = parse_number_field(map.remove("nanos"), "nanos")?; + Ok((seconds, nanos)) +} + +fn parse_number_field(value: Option, field: &str) -> Result { + match value { + Some(value) => parse_number_value(value, field), + None => Ok(0), + } +} + +fn parse_number_value(value: ParsedValue, field: &str) -> Result { + match value { + ParsedValue::Number(num) | ParsedValue::String(num) => num + .parse::() + .map_err(|_| ParseError::new(format!("invalid number for {}", field))), + ParsedValue::Bool(value) => Ok(if value { 1 } else { 0 }), + ParsedValue::Tagged { value, .. } => parse_number_value(*value, field), + ParsedValue::Object(map) => Err(ParseError::new(format!( + "expected numeric value for {} but got {:?}", + field, + ParsedValue::Object(map) + ))), + other => Err(ParseError::new(format!( + "expected numeric value for {} but got {:?}", + field, other + ))), + } +} + +fn array_to_hex(values: Vec) -> Result { + let mut bytes = Vec::with_capacity(values.len()); + for value in values { + let byte = match value { + ParsedValue::Number(num) | ParsedValue::String(num) => { + let parsed = num.parse::().map_err(value::Error::custom)?; + if !(0..=255).contains(&parsed) { + return Err(value::Error::custom("byte out of range")); + } + parsed as u8 + } + other => { + return Err(value::Error::custom(format!( + "expected byte got {:?}", + other + ))); + } + }; + bytes.push(byte); + } + Ok(hex_encode(bytes)) +} +fn deserialize_to_loaded( + value: ParsedValue, + wrap: F, +) -> Result> +where + T: DeserializeOwned, + F: FnOnce(T) -> LoadedRequest, +{ + let request = T::deserialize(value)?; + Ok(wrap(request)) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn parse_from_log(raw: &str) -> LoadedRequest { + parse_logged_request(raw) + 
.expect("log line should parse") + .expect("request should not be filtered out") + } + + #[test] + fn parses_info_requests_from_testnet_log() { + let request_one = parse_from_log(TESTNET_INFO_ONE); + assert!(matches!(request_one, LoadedRequest::Info(_))); + } + + #[test] + fn parses_init_chain_from_testnet_log() { + let request = parse_from_log(TESTNET_INIT_CHAIN); + assert!(matches!(request, LoadedRequest::InitChain(_))); + } + + #[test] + fn parses_process_proposal_samples_from_testnet_log() { + let request_one = parse_from_log(TESTNET_PROCESS_PROPOSAL_ONE); + let request_two = parse_from_log(TESTNET_PROCESS_PROPOSAL_TWO); + assert!(matches!(request_one, LoadedRequest::Process(_))); + assert!(matches!(request_two, LoadedRequest::Process(_))); + } + + #[test] + fn parses_finalize_block_samples_from_testnet_log() { + let request_one = parse_from_log(TESTNET_FINALIZE_BLOCK_ONE); + let request_two = parse_from_log(TESTNET_FINALIZE_BLOCK_TWO); + assert!(matches!(request_one, LoadedRequest::Finalize(_))); + assert!(matches!(request_two, LoadedRequest::Finalize(_))); + } + + const TESTNET_INFO_ONE: &str = r#"Request { value: Some(Info(RequestInfo { version: "1.5.1", block_version: 14, p2p_version: 10, abci_version: "1.3.0" })) }"#; + + const TESTNET_INIT_CHAIN: &str = r#"Request { value: Some(InitChain(RequestInitChain { time: Some(Timestamp { seconds: 1764074708, nanos: 919000000 }), chain_id: "dash-testnet-51", consensus_params: Some(ConsensusParams { block: Some(BlockParams { max_bytes: 2097152, max_gas: 57631392000 }), evidence: Some(EvidenceParams { max_age_num_blocks: 100000, max_age_duration: Some(Duration { seconds: 172800, nanos: 0 }), max_bytes: 512000 }), validator: Some(ValidatorParams { pub_key_types: ["bls12381"], voting_power_threshold: None }), version: Some(VersionParams { app_version: 1, consensus_version: ConsensusVersion0 }), synchrony: Some(SynchronyParams { message_delay: Some(Duration { seconds: 32, nanos: 0 }), precision: Some(Duration { seconds: 
0, nanos: 500000000 }) }), timeout: Some(TimeoutParams { propose: Some(Duration { seconds: 30, nanos: 0 }), propose_delta: Some(Duration { seconds: 1, nanos: 0 }), vote: Some(Duration { seconds: 2, nanos: 0 }), vote_delta: Some(Duration { seconds: 0, nanos: 500000000 }) }), abci: Some(AbciParams { recheck_tx: true }) }), validator_set: None, app_state_bytes: [], initial_height: 1, initial_core_height: 0 })) }"#; + + const TESTNET_PROCESS_PROPOSAL_ONE: &str = r#"Request { value: Some(ProcessProposal(RequestProcessProposal { txs: [], proposed_last_commit: Some(CommitInfo { round: 0, quorum_hash: [], block_signature: [], threshold_vote_extensions: [] }), misbehavior: [], hash: [8, 250, 2, 194, 126, 192, 57, 11, 163, 1, 228, 252, 126, 61, 126, 173, 179, 80, 200, 25, 62, 62, 98, 160, 147, 104, 151, 6, 227, 162, 11, 250], height: 1, round: 0, time: Some(Timestamp { seconds: 1721353209, nanos: 0 }), next_validators_hash: [67, 141, 67, 156, 14, 170, 45, 148, 50, 130, 83, 98, 80, 9, 168, 59, 20, 74, 245, 208, 222, 120, 248, 14, 0, 20, 181, 247, 167, 138, 75, 141], core_chain_locked_height: 1090319, core_chain_lock_update: Some(CoreChainLock { core_block_height: 1090319, core_block_hash: [188, 208, 158, 250, 148, 183, 195, 57, 139, 20, 13, 219, 224, 167, 123, 230, 49, 159, 25, 154, 160, 12, 51, 34, 68, 102, 209, 84, 48, 1, 0, 0], signature: [149, 121, 58, 158, 78, 248, 78, 170, 243, 117, 235, 255, 251, 46, 2, 74, 179, 31, 158, 246, 193, 93, 23, 193, 136, 122, 196, 15, 108, 247, 172, 6, 169, 101, 215, 149, 150, 32, 35, 97, 252, 255, 134, 112, 170, 172, 13, 222, 3, 228, 106, 52, 44, 158, 242, 165, 19, 40, 253, 34, 104, 130, 246, 229, 84, 55, 221, 223, 40, 152, 72, 43, 159, 46, 78, 5, 1, 158, 149, 145, 239, 220, 27, 196, 104, 176, 16, 54, 90, 38, 68, 184, 43, 57, 17, 245] }), proposer_pro_tx_hash: [5, 182, 135, 151, 131, 68, 250, 36, 51, 178, 170, 153, 212, 31, 100, 62, 45, 133, 129, 167, 137, 205, 194, 48, 132, 136, 156, 236, 165, 36, 78, 168], proposed_app_version: 1, 
version: Some(Consensus { block: 14, app: 1 }), quorum_hash: [0, 0, 0, 175, 88, 108, 85, 57, 26, 90, 172, 207, 12, 99, 142, 240, 100, 211, 71, 145, 227, 27, 243, 154, 158, 173, 160, 199, 112, 203, 8, 120] })) }"#; + + const TESTNET_PROCESS_PROPOSAL_TWO: &str = r#" + Request { value: Some(ProcessProposal(RequestProcessProposal { + txs: [ + [2, 0, 45, 78, 164, 90, 178, 144, 237, 162, 36, 16, 179, 136, 18, 243, 44, 66, + 117, 63, 224, 169, 203, 74, 197, 21, 20, 112, 113, 175, 43, 58, 235, 155, 1, 0, + 0, 0, 243, 207, 138, 132, 145, 226, 98, 94, 28, 101, 1, 87, 179, 136, 251, 237, + 65, 245, 65, 210, 208, 250, 92, 95, 42, 14, 220, 187, 116, 60, 109, 126, 5, 14, + 99, 111, 110, 116, 97, 99, 116, 82, 101, 113, 117, 101, 115, 116, 162, 161, 180, + 172, 111, 239, 34, 234, 42, 26, 104, 232, 18, 54, 68, 179, 87, 135, 95, 107, 65, + 44, 24, 16, 146, 129, 193, 70, 231, 178, 113, 188, 218, 18, 242, 101, 39, 134, + 235, 177, 130, 11, 124, 221, 128, 91, 176, 191, 225, 147, 222, 242, 151, 236, + 236, 119, 26, 9, 32, 3, 155, 51, 167, 142, 6, 16, 97, 99, 99, 111, 117, 110, 116, + 82, 101, 102, 101, 114, 101, 110, 99, 101, 3, 252, 27, 90, 108, 236, 21, 101, 110, + 99, 114, 121, 112, 116, 101, 100, 65, 99, 99, 111, 117, 110, 116, 76, 97, 98, 101, + 108, 10, 48, 248, 254, 160, 126, 247, 196, 142, 207, 10, 98, 88, 135, 178, 200, + 37, 183, 104, 217, 69, 81, 239, 30, 184, 149, 165, 180, 206, 62, 82, 172, 165, + 148, 122, 215, 182, 102, 159, 96, 57, 23, 138, 78, 41, 202, 192, 88, 118, 155, + 18, 101, 110, 99, 114, 121, 112, 116, 101, 100, 80, 117, 98, 108, 105, 99, 75, 101, + 121, 10, 96, 119, 70, 51, 109, 72, 189, 126, 141, 103, 114, 93, 25, 171, 156, 67, + 231, 69, 47, 111, 71, 201, 112, 14, 39, 163, 85, 103, 129, 55, 72, 255, 60, 11, 84, + 118, 32, 32, 59, 37, 45, 157, 214, 105, 54, 239, 69, 2, 113, 133, 8, 59, 41, 161, + 75, 22, 61, 13, 211, 49, 222, 72, 61, 1, 175, 251, 138, 74, 218, 228, 202, 199, 1, + 212, 219, 106, 81, 12, 123, 204, 181, 33, 215, 246, 216, 4, 168, 10, 128, 39, 13, 
+ 98, 209, 118, 32, 60, 184, 17, 114, 101, 99, 105, 112, 105, 101, 110, 116, 75, 101, + 121, 73, 110, 100, 101, 120, 3, 0, 14, 115, 101, 110, 100, 101, 114, 75, 101, 121, + 73, 110, 100, 101, 120, 3, 2, 8, 116, 111, 85, 115, 101, 114, 73, 100, 16, 97, 76, + 52, 201, 139, 195, 240, 166, 24, 149, 31, 14, 97, 49, 5, 152, 160, 173, 142, 194, + 37, 0, 127, 31, 0, 31, 221, 247, 231, 178, 146, 240, 0, 0, 1, 65, 32, 27, 157, 91, + 5, 79, 181, 46, 205, 236, 51, 129, 157, 4, 6, 188, 236, 188, 172, 164, 231, 164, + 44, 201, 66, 104, 173, 34, 228, 80, 60, 212, 197, 5, 47, 41, 78, 154, 99, 29, 118, + 227, 86, 165, 172, 247, 43, 246, 245, 194, 84, 41, 198, 99, 130, 36, 116, 66, 187, + 56, 25, 160, 87, 213, 138], + [2, 0, 45, 78, 164, 90, 178, 144, 237, 162, 36, 16, 179, 136, 18, 243, 44, 66, + 117, 63, 224, 169, 203, 74, 197, 21, 20, 112, 113, 175, 43, 58, 235, 155, 1, 0, + 0, 0, 23, 72, 90, 243, 202, 126, 196, 210, 19, 54, 234, 23, 177, 178, 215, 167, + 129, 47, 66, 193, 218, 34, 147, 66, 196, 207, 97, 224, 245, 71, 183, 9, 6, 14, 99, + 111, 110, 116, 97, 99, 116, 82, 101, 113, 117, 101, 115, 116, 162, 161, 180, 172, + 111, 239, 34, 234, 42, 26, 104, 232, 18, 54, 68, 179, 87, 135, 95, 107, 65, 44, + 24, 16, 146, 129, 193, 70, 231, 178, 113, 188, 112, 237, 137, 163, 235, 139, 117, + 158, 116, 242, 14, 29, 130, 74, 80, 169, 171, 187, 92, 209, 189, 71, 123, 134, 23, + 239, 26, 77, 73, 181, 204, 96, 6, 16, 97, 99, 99, 111, 117, 110, 116, 82, 101, 102, + 101, 114, 101, 110, 99, 101, 3, 252, 8, 49, 150, 10, 21, 101, 110, 99, 114, 121, + 112, 116, 101, 100, 65, 99, 99, 111, 117, 110, 116, 76, 97, 98, 101, 108, 10, 48, + 164, 30, 75, 218, 213, 182, 166, 107, 190, 19, 180, 130, 228, 190, 22, 103, 105, + 179, 192, 237, 213, 2, 126, 237, 158, 129, 98, 101, 236, 82, 40, 147, 165, 129, + 70, 0, 29, 52, 143, 160, 58, 10, 236, 15, 56, 144, 224, 186, 18, 101, 110, 99, 114, + 121, 112, 116, 101, 100, 80, 117, 98, 108, 105, 99, 75, 101, 121, 10, 96, 215, 58, + 88, 42, 120, 211, 175, 41, 107, 
102, 20, 249, 7, 50, 252, 46, 247, 204, 180, 39, + 25, 7, 21, 243, 52, 77, 95, 48, 207, 57, 229, 118, 30, 133, 144, 91, 233, 3, 49, + 164, 44, 194, 138, 30, 156, 11, 130, 80, 67, 117, 154, 30, 173, 134, 89, 38, 96, 2, + 198, 229, 126, 157, 193, 7, 88, 16, 40, 3, 55, 187, 56, 168, 63, 160, 1, 199, 22, + 54, 254, 6, 57, 104, 83, 86, 253, 16, 68, 66, 59, 111, 88, 37, 30, 49, 169, 180, + 17, 114, 101, 99, 105, 112, 105, 101, 110, 116, 75, 101, 121, 73, 110, 100, 101, + 120, 3, 0, 14, 115, 101, 110, 100, 101, 114, 75, 101, 121, 73, 110, 100, 101, 120, + 3, 2, 8, 116, 111, 85, 115, 101, 114, 73, 100, 16, 196, 168, 68, 208, 100, 228, + 139, 149, 204, 118, 244, 251, 67, 150, 233, 2, 230, 196, 230, 251, 123, 37, 91, + 103, 36, 82, 228, 68, 124, 146, 62, 106, 0, 0, 1, 65, 31, 179, 111, 38, 201, 227, + 221, 204, 150, 217, 184, 31, 64, 226, 82, 115, 119, 176, 0, 222, 67, 184, 189, 4, + 188, 81, 57, 153, 140, 156, 32, 175, 129, 38, 61, 58, 107, 95, 142, 225, 152, 199, + 186, 84, 188, 96, 9, 117, 65, 129, 190, 57, 24, 19, 206, 160, 123, 156, 141, 244, + 247, 131, 25, 195, 240], + ], + proposed_last_commit: Some(CommitInfo { + round: 0, + quorum_hash: [0, 0, 1, 140, 150, 8, 55, 78, 73, 90, 22, 124, 90, 202, 73, 150, 16, + 239, 91, 176, 76, 16, 181, 122, 236, 22, 197, 148, 202, 119, 77, 210], + block_signature: [167, 217, 185, 35, 134, 188, 204, 114, 135, 31, 147, 221, 25, 172, + 21, 164, 212, 143, 70, 44, 40, 154, 69, 16, 86, 203, 154, 15, 178, + 35, 99, 179, 48, 33, 97, 122, 88, 145, 128, 177, 217, 39, 247, 107, + 67, 76, 68, 229, 8, 94, 197, 196, 13, 137, 165, 213, 181, 9, 147, + 68, 147, 173, 38, 105, 250, 176, 184, 88, 221, 229, 33, 180, 241, + 186, 127, 206, 105, 27, 124, 46, 191, 61, 110, 60, 237, 9, 83, 133, + 218, 103, 111, 218, 252, 150, 64, 84], + threshold_vote_extensions: [], + }), + misbehavior: [], + hash: [103, 112, 201, 57, 9, 25, 69, 8, 44, 216, 190, 152, 108, 87, 2, 168, 213, 242, + 101, 49, 254, 70, 159, 120, 184, 44, 135, 82, 71, 117, 194, 8], + height: 1763, 
+ round: 0, + time: Some(Timestamp { seconds: 1724858882, nanos: 478000000 }), + next_validators_hash: [29, 218, 86, 252, 12, 33, 231, 178, 231, 204, 7, 223, 50, 81, 89, + 121, 242, 146, 120, 127, 206, 60, 181, 255, 207, 242, 41, 32, 40, + 153, 108, 65], + core_chain_locked_height: 1092300, + core_chain_lock_update: None, + proposer_pro_tx_hash: [136, 37, 27, 212, 177, 36, 239, 235, 135, 83, 125, 234, 190, 236, + 84, 246, 200, 245, 117, 244, 223, 129, 241, 12, 245, 232, 238, + 160, 115, 9, 43, 111], + proposed_app_version: 1, + version: Some(Consensus { block: 14, app: 1 }), + quorum_hash: [0, 0, 1, 140, 150, 8, 55, 78, 73, 90, 22, 124, 90, 202, 73, 150, 16, 239, + 91, 176, 76, 16, 181, 122, 236, 22, 197, 148, 202, 119, 77, 210], + })) } + "#; + + const TESTNET_FINALIZE_BLOCK_ONE: &str = r#"Request { value: Some(FinalizeBlock(RequestFinalizeBlock { commit: Some(CommitInfo { round: 0, quorum_hash: [0, 0, 0, 175, 88, 108, 85, 57, 26, 90, 172, 207, 12, 99, 142, 240, 100, 211, 71, 145, 227, 27, 243, 154, 158, 173, 160, 199, 112, 203, 8, 120], block_signature: [135, 31, 49, 75, 118, 108, 104, 68, 227, 56, 171, 253, 41, 152, 98, 72, 166, 144, 178, 146, 18, 67, 56, 20, 130, 110, 158, 62, 6, 77, 138, 57, 113, 138, 206, 99, 241, 112, 245, 11, 237, 77, 36, 214, 24, 174, 166, 1, 21, 165, 233, 228, 11, 201, 201, 241, 5, 23, 224, 16, 178, 200, 142, 7, 105, 222, 229, 179, 82, 115, 216, 243, 16, 48, 204, 162, 100, 154, 25, 120, 1, 85, 170, 219, 228, 3, 170, 60, 81, 141, 68, 116, 22, 7, 239, 249], threshold_vote_extensions: [] }), misbehavior: [], hash: [8, 250, 2, 194, 126, 192, 57, 11, 163, 1, 228, 252, 126, 61, 126, 173, 179, 80, 200, 25, 62, 62, 98, 160, 147, 104, 151, 6, 227, 162, 11, 250], height: 1, round: 0, block: Some(Block { header: Some(Header { version: Some(Consensus { block: 14, app: 1 }), chain_id: "dash-testnet-51", height: 1, time: Some(Timestamp { seconds: 1721353209, nanos: 0 }), last_block_id: Some(BlockId { hash: [], part_set_header: Some(PartSetHeader { 
total: 0, hash: [] }), state_id: [] }), last_commit_hash: [227, 176, 196, 66, 152, 252, 28, 20, 154, 251, 244, 200, 153, 111, 185, 36, 39, 174, 65, 228, 100, 155, 147, 76, 164, 149, 153, 27, 120, 82, 184, 85], data_hash: [227, 176, 196, 66, 152, 252, 28, 20, 154, 251, 244, 200, 153, 111, 185, 36, 39, 174, 65, 228, 100, 155, 147, 76, 164, 149, 153, 27, 120, 82, 184, 85], validators_hash: [151, 116, 141, 147, 83, 105, 231, 87, 207, 88, 182, 176, 208, 225, 89, 251, 214, 179, 23, 205, 112, 160, 49, 250, 160, 132, 171, 97, 31, 189, 74, 77], next_validators_hash: [67, 141, 67, 156, 14, 170, 45, 148, 50, 130, 83, 98, 80, 9, 168, 59, 20, 74, 245, 208, 222, 120, 248, 14, 0, 20, 181, 247, 167, 138, 75, 141], consensus_hash: [180, 208, 169, 232, 73, 81, 202, 221, 119, 226, 150, 68, 128, 3, 115, 113, 118, 178, 89, 72, 173, 246, 104, 172, 110, 192, 105, 38, 200, 31, 172, 134], next_consensus_hash: [180, 208, 169, 232, 73, 81, 202, 221, 119, 226, 150, 68, 128, 3, 115, 113, 118, 178, 89, 72, 173, 246, 104, 172, 110, 192, 105, 38, 200, 31, 172, 134], app_hash: [191, 12, 203, 156, 160, 113, 186, 1, 174, 110, 103, 160, 192, 144, 249, 120, 3, 210, 109, 86, 214, 117, 220, 213, 19, 23, 129, 203, 202, 200, 236, 143], results_hash: [227, 176, 196, 66, 152, 252, 28, 20, 154, 251, 244, 200, 153, 111, 185, 36, 39, 174, 65, 228, 100, 155, 147, 76, 164, 149, 153, 27, 120, 82, 184, 85], evidence_hash: [227, 176, 196, 66, 152, 252, 28, 20, 154, 251, 244, 200, 153, 111, 185, 36, 39, 174, 65, 228, 100, 155, 147, 76, 164, 149, 153, 27, 120, 82, 184, 85], proposed_app_version: 1, proposer_pro_tx_hash: [5, 182, 135, 151, 131, 68, 250, 36, 51, 178, 170, 153, 212, 31, 100, 62, 45, 133, 129, 167, 137, 205, 194, 48, 132, 136, 156, 236, 165, 36, 78, 168], core_chain_locked_height: 1090319 }), data: Some(Data { txs: [] }), evidence: Some(EvidenceList { evidence: [] }), last_commit: Some(Commit { height: 0, round: 0, block_id: Some(BlockId { hash: [], part_set_header: Some(PartSetHeader { total: 0, hash: 
[] }), state_id: [] }), quorum_hash: [], threshold_block_signature: [], threshold_vote_extensions: [] }), core_chain_lock: Some(CoreChainLock { core_block_height: 1090319, core_block_hash: [188, 208, 158, 250, 148, 183, 195, 57, 139, 20, 13, 219, 224, 167, 123, 230, 49, 159, 25, 154, 160, 12, 51, 34, 68, 102, 209, 84, 48, 1, 0, 0], signature: [149, 121, 58, 158, 78, 248, 78, 170, 243, 117, 235, 255, 251, 46, 2, 74, 179, 31, 158, 246, 193, 93, 23, 193, 136, 122, 196, 15, 108, 247, 172, 6, 169, 101, 215, 149, 150, 32, 35, 97, 252, 255, 134, 112, 170, 172, 13, 222, 3, 228, 106, 52, 44, 158, 242, 165, 19, 40, 253, 34, 104, 130, 246, 229, 84, 55, 221, 223, 40, 152, 72, 43, 159, 46, 78, 5, 1, 158, 149, 145, 239, 220, 27, 196, 104, 176, 16, 54, 90, 38, 68, 184, 43, 57, 17, 245] }) }), block_id: Some(BlockId { hash: [8, 250, 2, 194, 126, 192, 57, 11, 163, 1, 228, 252, 126, 61, 126, 173, 179, 80, 200, 25, 62, 62, 98, 160, 147, 104, 151, 6, 227, 162, 11, 250], part_set_header: Some(PartSetHeader { total: 1, hash: [67, 44, 224, 208, 150, 55, 48, 6, 49, 94, 42, 158, 112, 26, 240, 103, 185, 51, 202, 165, 106, 78, 124, 206, 99, 140, 58, 172, 22, 209, 227, 68] }), state_id: [209, 145, 66, 195, 68, 86, 204, 201, 247, 175, 60, 157, 38, 207, 138, 107, 56, 77, 204, 254, 146, 83, 246, 110, 152, 76, 39, 22, 153, 243, 181, 131] }) })) }"#; + + const TESTNET_FINALIZE_BLOCK_TWO: &str = r#"Request { value: Some(FinalizeBlock(RequestFinalizeBlock { commit: Some(CommitInfo { round: 1, quorum_hash: [0, 0, 0, 217, 55, 192, 219, 26, 45, 97, 224, 140, 152, 79, 68, 251, 208, 148, 3, 190, 152, 206, 230, 126, 107, 37, 117, 76, 217, 104, 133, 231], block_signature: [151, 132, 224, 84, 5, 251, 44, 38, 191, 110, 146, 82, 255, 249, 7, 150, 50, 168, 38, 237, 69, 227, 135, 55, 144, 47, 4, 155, 3, 138, 102, 99, 154, 133, 217, 187, 169, 154, 174, 115, 139, 173, 105, 25, 62, 73, 129, 87, 7, 28, 172, 121, 137, 254, 243, 31, 71, 185, 143, 151, 213, 225, 44, 199, 70, 25, 157, 230, 75, 62, 81, 254, 217, 86, 
171, 220, 120, 188, 69, 24, 109, 222, 98, 224, 102, 201, 145, 82, 53, 13, 228, 150, 0, 137, 116, 16], threshold_vote_extensions: [] }), misbehavior: [], hash: [98, 147, 39, 112, 110, 207, 177, 37, 173, 61, 20, 160, 95, 228, 179, 90, 184, 83, 202, 86, 64, 254, 53, 133, 57, 32, 50, 21, 253, 32, 16, 172], height: 2, round: 1, block: Some(Block { header: Some(Header { version: Some(Consensus { block: 14, app: 1 }), chain_id: "dash-testnet-51", height: 2, time: Some(Timestamp { seconds: 1724575888, nanos: 328000000 }), last_block_id: Some(BlockId { hash: [8, 250, 2, 194, 126, 192, 57, 11, 163, 1, 228, 252, 126, 61, 126, 173, 179, 80, 200, 25, 62, 62, 98, 160, 147, 104, 151, 6, 227, 162, 11, 250], part_set_header: Some(PartSetHeader { total: 1, hash: [67, 44, 224, 208, 150, 55, 48, 6, 49, 94, 42, 158, 112, 26, 240, 103, 185, 51, 202, 165, 106, 78, 124, 206, 99, 140, 58, 172, 22, 209, 227, 68] }), state_id: [209, 145, 66, 195, 68, 86, 204, 201, 247, 175, 60, 157, 38, 207, 138, 107, 56, 77, 204, 254, 146, 83, 246, 110, 152, 76, 39, 22, 153, 243, 181, 131] }), last_commit_hash: [125, 172, 60, 185, 43, 201, 215, 188, 72, 11, 207, 101, 109, 98, 7, 127, 216, 155, 4, 101, 230, 18, 156, 15, 99, 93, 122, 164, 178, 27, 60, 194], data_hash: [227, 176, 196, 66, 152, 252, 28, 20, 154, 251, 244, 200, 153, 111, 185, 36, 39, 174, 65, 228, 100, 155, 147, 76, 164, 149, 153, 27, 120, 82, 184, 85], validators_hash: [67, 141, 67, 156, 14, 170, 45, 148, 50, 130, 83, 98, 80, 9, 168, 59, 20, 74, 245, 208, 222, 120, 248, 14, 0, 20, 181, 247, 167, 138, 75, 141], next_validators_hash: [67, 141, 67, 156, 14, 170, 45, 148, 50, 130, 83, 98, 80, 9, 168, 59, 20, 74, 245, 208, 222, 120, 248, 14, 0, 20, 181, 247, 167, 138, 75, 141], consensus_hash: [180, 208, 169, 232, 73, 81, 202, 221, 119, 226, 150, 68, 128, 3, 115, 113, 118, 178, 89, 72, 173, 246, 104, 172, 110, 192, 105, 38, 200, 31, 172, 134], next_consensus_hash: [180, 208, 169, 232, 73, 81, 202, 221, 119, 226, 150, 68, 128, 3, 115, 113, 118, 178, 
89, 72, 173, 246, 104, 172, 110, 192, 105, 38, 200, 31, 172, 134], app_hash: [77, 189, 142, 170, 172, 111, 216, 169, 207, 157, 71, 134, 71, 149, 189, 31, 53, 102, 178, 80, 54, 238, 37, 128, 18, 176, 16, 200, 2, 126, 109, 200], results_hash: [227, 176, 196, 66, 152, 252, 28, 20, 154, 251, 244, 200, 153, 111, 185, 36, 39, 174, 65, 228, 100, 155, 147, 76, 164, 149, 153, 27, 120, 82, 184, 85], evidence_hash: [227, 176, 196, 66, 152, 252, 28, 20, 154, 251, 244, 200, 153, 111, 185, 36, 39, 174, 65, 228, 100, 155, 147, 76, 164, 149, 153, 27, 120, 82, 184, 85], proposed_app_version: 1, proposer_pro_tx_hash: [20, 61, 205, 106, 107, 118, 132, 253, 224, 30, 136, 161, 14, 93, 101, 222, 154, 41, 36, 76, 94, 205, 88, 109, 20, 163, 66, 101, 112, 37, 241, 19], core_chain_locked_height: 1090320 }), data: Some(Data { txs: [] }), evidence: Some(EvidenceList { evidence: [] }), last_commit: Some(Commit { height: 1, round: 0, block_id: Some(BlockId { hash: [8, 250, 2, 194, 126, 192, 57, 11, 163, 1, 228, 252, 126, 61, 126, 173, 179, 80, 200, 25, 62, 62, 98, 160, 147, 104, 151, 6, 227, 162, 11, 250], part_set_header: Some(PartSetHeader { total: 1, hash: [67, 44, 224, 208, 150, 55, 48, 6, 49, 94, 42, 158, 112, 26, 240, 103, 185, 51, 202, 165, 106, 78, 124, 206, 99, 140, 58, 172, 22, 209, 227, 68] }), state_id: [209, 145, 66, 195, 68, 86, 204, 201, 247, 175, 60, 157, 38, 207, 138, 107, 56, 77, 204, 254, 146, 83, 246, 110, 152, 76, 39, 22, 153, 243, 181, 131] }), quorum_hash: [0, 0, 0, 175, 88, 108, 85, 57, 26, 90, 172, 207, 12, 99, 142, 240, 100, 211, 71, 145, 227, 27, 243, 154, 158, 173, 160, 199, 112, 203, 8, 120], threshold_block_signature: [135, 31, 49, 75, 118, 108, 104, 68, 227, 56, 171, 253, 41, 152, 98, 72, 166, 144, 178, 146, 18, 67, 56, 20, 130, 110, 158, 62, 6, 77, 138, 57, 113, 138, 206, 99, 241, 112, 245, 11, 237, 77, 36, 214, 24, 174, 166, 1, 21, 165, 233, 228, 11, 201, 201, 241, 5, 23, 224, 16, 178, 200, 142, 7, 105, 222, 229, 179, 82, 115, 216, 243, 16, 48, 204, 162, 100, 
154, 25, 120, 1, 85, 170, 219, 228, 3, 170, 60, 81, 141, 68, 116, 22, 7, 239, 249], threshold_vote_extensions: [] }), core_chain_lock: Some(CoreChainLock { core_block_height: 1090320, core_block_hash: [26, 216, 69, 45, 191, 148, 30, 6, 168, 162, 186, 137, 78, 18, 75, 156, 96, 37, 84, 103, 102, 16, 247, 253, 135, 49, 45, 177, 223, 0, 0, 0], signature: [180, 194, 224, 93, 153, 74, 203, 51, 184, 6, 210, 221, 105, 118, 214, 63, 153, 251, 150, 114, 108, 127, 204, 60, 218, 225, 67, 219, 1, 243, 242, 197, 83, 108, 245, 71, 67, 13, 18, 17, 65, 133, 148, 4, 209, 211, 188, 233, 24, 183, 215, 105, 227, 42, 217, 128, 60, 80, 202, 225, 220, 209, 240, 9, 200, 195, 48, 13, 154, 13, 142, 216, 112, 83, 180, 84, 254, 238, 233, 201, 141, 58, 116, 171, 125, 110, 118, 14, 225, 89, 66, 58, 3, 9, 42, 179] }) }), block_id: Some(BlockId { hash: [98, 147, 39, 112, 110, 207, 177, 37, 173, 61, 20, 160, 95, 228, 179, 90, 184, 83, 202, 86, 64, 254, 53, 133, 57, 32, 50, 21, 253, 32, 16, 172], part_set_header: Some(PartSetHeader { total: 1, hash: [200, 235, 186, 216, 169, 238, 109, 8, 187, 66, 3, 161, 186, 230, 247, 3, 129, 181, 218, 148, 158, 153, 68, 17, 111, 14, 75, 230, 202, 104, 255, 47] }), state_id: [45, 73, 190, 78, 51, 91, 154, 204, 110, 181, 196, 96, 54, 5, 235, 82, 94, 133, 4, 141, 52, 209, 253, 137, 36, 137, 200, 161, 182, 225, 218, 147] }) })) }"#; +} diff --git a/packages/rs-drive-abci/src/replay/mod.rs b/packages/rs-drive-abci/src/replay/mod.rs new file mode 100644 index 00000000000..942dd0896cb --- /dev/null +++ b/packages/rs-drive-abci/src/replay/mod.rs @@ -0,0 +1,310 @@ +mod cli; +mod log_ingest; +mod runner; + +use crate::abci::app::FullAbciApplication; +use crate::config::PlatformConfig; +use crate::platform_types::platform::Platform; +use crate::platform_types::platform_state::v0::PlatformStateV0Methods; +use crate::rpc::core::DefaultCoreRPC; +use crate::verify; +use cli::SkipSelector; +use dpp::block::extended_block_info::v0::ExtendedBlockInfoV0Getters; +use 
log_ingest::LogRequestStream; +use runner::{ + ensure_db_directory, execute_request, log_last_committed_block, stop_height_reached, + LoadedRequest, ProgressReporter, ReplayItem, +}; +use std::error::Error; +use std::path::PathBuf; +use tokio_util::sync::CancellationToken; + +pub use cli::ReplayArgs; + +/// Replay ABCI requests captured in drive-abci JSON logs. +pub fn run( + mut config: PlatformConfig, + args: ReplayArgs, + cancel: CancellationToken, +) -> Result<(), Box> { + let db_path = args + .db_path + .clone() + .unwrap_or_else(|| config.db_path.clone()); + config.db_path = db_path; + let db_was_created = ensure_db_directory(&config.db_path)?; + + tracing::info!("running database verification before replay"); + verify::run(&config, true).map_err(|e| format!("verification failed before replay: {}", e))?; + + let mut stream = LogRequestStream::open(&args.log)?; + if stream.peek()?.is_some() { + tracing::info!("streaming ABCI requests from log {}", args.log.display()); + } else { + return Err(format!( + "no supported ABCI requests found in log {}; provide --log with relevant inputs", + args.log.display() + ) + .into()); + } + + if db_was_created { + let mut first_is_init_chain = false; + advance_stream(&mut stream, None, &args.skip)?; + if let Some(item) = stream.peek()? 
{ + first_is_init_chain = matches!(item.request, LoadedRequest::InitChain(_)); + } + + if !first_is_init_chain { + return Err( + "database path did not exist; first replayed request must be init_chain".into(), + ); + } + } + + let core_rpc = DefaultCoreRPC::open( + config.core.consensus_rpc.url().as_str(), + config.core.consensus_rpc.username.clone(), + config.core.consensus_rpc.password.clone(), + )?; + + let platform: Platform = + Platform::open_with_client(&config.db_path, Some(config.clone()), core_rpc, None)?; + log_last_committed_block(&platform); + let mut known_height = platform + .state + .load() + .last_committed_block_info() + .as_ref() + .map(|info| info.basic_info().height); + + if let Some(limit) = args.stop_height { + if let Some(current) = known_height { + if current >= limit { + tracing::info!( + "current platform height {} is already at or above stop height {}; ending replay", + current, + limit + ); + return Ok(()); + } + } + } + + let app = FullAbciApplication::new(&platform); + let mut progress = if args.progress { + Some(ProgressReporter::new(args.stop_height)) + } else { + None + }; + let mut cancelled = false; + let mut validator = RequestSequenceValidator::new(stream.path().to_path_buf()); + let mut executed = 0usize; + loop { + if cancel.is_cancelled() { + tracing::info!( + "cancellation requested; stopping replay for log {}", + stream.path().display() + ); + cancelled = true; + break; + } + if stop_height_reached(args.stop_height, known_height) { + tracing::info!( + "stop height {} reached; stopping replay for log {}", + args.stop_height.unwrap(), + stream.path().display() + ); + break; + } + advance_stream(&mut stream, known_height, &args.skip)?; + let Some(item) = stream.next_item()? 
else { + break; + }; + validator.observe(&item)?; + let committed = execute_request(&app, item, progress.as_mut())?; + update_known_height(&mut known_height, committed); + executed += 1; + } + validator.finish()?; + tracing::info!( + "replayed {} ABCI requests from log {}", + executed, + stream.path().display() + ); + + if cancelled { + tracing::info!("replay interrupted by cancellation"); + } + + Ok(()) +} + +fn advance_stream( + stream: &mut LogRequestStream, + known_height: Option<u64>, + skip: &[SkipSelector], +) -> Result<(), Box<dyn Error>> { + if let Some(height) = known_height { + let skipped = stream.skip_processed_entries(height)?; + if skipped > 0 { + tracing::info!( + "skipped {} ABCI requests already applied (height <= {}) in log {}", + skipped, + height, + stream.path().display() + ); + } + } + drain_skipped_entries(stream, skip)?; + Ok(()) +} + +fn drain_skipped_entries( + stream: &mut LogRequestStream, + skip: &[SkipSelector], +) -> Result<(), Box<dyn Error>> { + if skip.is_empty() { + return Ok(()); + } + + loop { + let Some(item) = stream.peek()?
else { + break; + }; + + if should_skip_item(item, skip) { + let description = item.describe(); + if stream.next_item()?.is_none() { + break; + } + tracing::info!("skipping request {} due to --skip", description); + continue; + } + + break; + } + + Ok(()) +} + +fn should_skip_item(item: &ReplayItem, skip: &[SkipSelector]) -> bool { + let line = item.source.line(); + skip.iter().any(|selector| selector.matches(line)) +} + +fn update_known_height(current: &mut Option<u64>, new_height: Option<u64>) { + if let Some(height) = new_height { + match current { + Some(existing) if height <= *existing => {} + _ => *current = Some(height), + } + } +} + +struct RequestSequenceValidator { + path: PathBuf, + last_height: Option<u64>, + saw_process: bool, + saw_finalize: bool, +} + +impl RequestSequenceValidator { + fn new(path: PathBuf) -> Self { + Self { + path, + last_height: None, + saw_process: false, + saw_finalize: false, + } + } + + fn observe(&mut self, item: &ReplayItem) -> Result<(), Box<dyn Error>> { + let height = match item.request.block_height() { + Some(height) => height, + None => return Ok(()), + }; + + match &item.request { + LoadedRequest::Process(_) => self.record_process(height, &item.describe()), + LoadedRequest::Finalize(_) => self.record_finalize(height, &item.describe()), + _ => Ok(()), + } + } + + fn record_process(&mut self, height: u64, origin: &str) -> Result<(), Box<dyn Error>> { + self.bump_height(height, origin)?; + self.saw_process = true; + Ok(()) + } + + fn record_finalize(&mut self, height: u64, origin: &str) -> Result<(), Box<dyn Error>> { + self.bump_height(height, origin)?; + if !self.saw_process { + return Err(format!( + "log {} contains finalize_block before process_proposal at height {} ({})", + self.path.display(), + height, + origin + ) + .into()); + } + self.saw_finalize = true; + Ok(()) + } + + fn bump_height(&mut self, height: u64, origin: &str) -> Result<(), Box<dyn Error>> { + match self.last_height { + Some(last) if height < last => Err(format!( + "log {} has out-of-order height {} before
{} ({})", + self.path.display(), + last, + height, + origin + ) + .into()), + Some(last) if height == last => Ok(()), + Some(last) => { + if !self.saw_process || !self.saw_finalize { + return Err(format!( + "log {} missing process/finalize pair for height {} before {}", + self.path.display(), + last, + origin + ) + .into()); + } + if height != last + 1 { + return Err(format!( + "log {} skipped heights ({} -> {}) before {}", + self.path.display(), + last, + height, + origin + ) + .into()); + } + self.last_height = Some(height); + self.saw_process = false; + self.saw_finalize = false; + Ok(()) + } + None => { + self.last_height = Some(height); + Ok(()) + } + } + } + + fn finish(&self) -> Result<(), Box> { + if self.last_height.is_some() && (!self.saw_process || !self.saw_finalize) { + return Err(format!( + "log {} ended before height {} had both process_proposal and finalize_block", + self.path.display(), + self.last_height.unwrap() + ) + .into()); + } + Ok(()) + } +} diff --git a/packages/rs-drive-abci/src/replay/runner.rs b/packages/rs-drive-abci/src/replay/runner.rs new file mode 100644 index 00000000000..2aca5ead2ac --- /dev/null +++ b/packages/rs-drive-abci/src/replay/runner.rs @@ -0,0 +1,552 @@ +use crate::abci::app::FullAbciApplication; +use crate::platform_types::platform::Platform; +use crate::platform_types::platform_state::v0::PlatformStateV0Methods; +use dpp::block::extended_block_info::v0::ExtendedBlockInfoV0Getters; +use dpp::version::PlatformVersion; +use hex::ToHex; +use std::collections::VecDeque; +use std::convert::TryFrom; +use std::error::Error; +use std::fmt::Write as _; +use std::fs; +use std::path::{Path, PathBuf}; +use std::time::{Duration, Instant}; +use tenderdash_abci::proto::abci::{ + response_process_proposal, response_verify_vote_extension, RequestExtendVote, + RequestFinalizeBlock, RequestInfo, RequestInitChain, RequestPrepareProposal, + RequestProcessProposal, RequestVerifyVoteExtension, +}; +use tenderdash_abci::Application; + 
+#[derive(Debug, Clone)] +pub(super) struct ReplayItem { + pub(super) source: ReplaySource, + pub(super) request: LoadedRequest, +} + +impl ReplayItem { + pub(super) fn from_log( + path: &Path, + line: usize, + timestamp: Option<String>, + endpoint: Option<String>, + request: LoadedRequest, + ) -> Self { + Self { + source: ReplaySource { + path: path.to_path_buf(), + line, + timestamp, + endpoint, + }, + request, + } + } + + pub(super) fn describe(&self) -> String { + self.source.describe() + } +} + +#[derive(Debug, Clone)] +pub(super) struct ReplaySource { + path: PathBuf, + line: usize, + timestamp: Option<String>, + endpoint: Option<String>, +} + +impl ReplaySource { + fn describe(&self) -> String { + let mut out = format!("{}:{}", self.path.display(), self.line); + if let Some(endpoint) = &self.endpoint { + write!(&mut out, " {}", endpoint).ok(); + } + if let Some(ts) = &self.timestamp { + write!(&mut out, " @{}", ts).ok(); + } + out + } + + pub(super) fn line(&self) -> usize { + self.line + } +} + +#[derive(Debug, Clone)] +pub(super) enum LoadedRequest { + InitChain(RequestInitChain), + Info(RequestInfo), + Prepare(RequestPrepareProposal), + Process(RequestProcessProposal), + Finalize(RequestFinalizeBlock), + ExtendVote(RequestExtendVote), + VerifyVoteExtension(RequestVerifyVoteExtension), +} + +impl LoadedRequest { + pub(super) fn block_height(&self) -> Option<u64> { + fn to_height(value: i64) -> Option<u64> { + u64::try_from(value).ok() + } + + match self { + LoadedRequest::InitChain(_) => Some(0), + LoadedRequest::Info(_) => None, + LoadedRequest::Prepare(request) => to_height(request.height), + LoadedRequest::Process(request) => to_height(request.height), + LoadedRequest::Finalize(request) => to_height(request.height), + LoadedRequest::ExtendVote(request) => to_height(request.height), + LoadedRequest::VerifyVoteExtension(request) => to_height(request.height), + } + } +} + +pub(super) fn ensure_db_directory(path: &Path) -> Result<bool, Box<dyn Error>> { + if path.exists() { + if !path.is_dir() { + return
Err(format!("{} exists but is not a directory", path.display()).into()); + } + return Ok(false); + } + + fs::create_dir_all(path)?; + tracing::info!( + "database directory {} was missing; created a fresh instance (expect InitChain)", + path.display() + ); + Ok(true) +} + +pub(super) fn execute_request( + app: &FullAbciApplication, + item: ReplayItem, + progress: Option<&mut ProgressReporter>, +) -> Result, Box> +where + C: crate::rpc::core::CoreRPCLike, +{ + let origin = item.describe(); + let mut committed_height = None; + match item.request { + LoadedRequest::InitChain(request) => { + tracing::debug!("executing init_chain from {}", origin); + let response = app.init_chain(request).map_err(|err| { + logged_error(format!("init_chain failed for {}: {:?}", origin, err)) + })?; + tracing::info!( + "init_chain result ({}): app_hash=0x{}, validator_updates={}", + origin, + hex::encode(&response.app_hash), + response + .validator_set_update + .as_ref() + .map(|v| v.validator_updates.len()) + .unwrap_or(0) + ); + committed_height = app + .platform + .state + .load() + .last_committed_block_info() + .as_ref() + .map(|info| info.basic_info().height); + } + LoadedRequest::Info(request) => { + tracing::debug!("executing info from {}", origin); + let response = app + .info(request) + .map_err(|err| logged_error(format!("info failed for {}: {:?}", origin, err)))?; + tracing::info!( + "info result ({}): last_block_height={}, last_block_app_hash=0x{}", + origin, + response.last_block_height, + hex::encode(response.last_block_app_hash) + ); + } + LoadedRequest::Prepare(request) => { + let height = request.height; + let context = format_height_round(Some(height), Some(request.round)); + tracing::debug!( + "executing prepare_proposal from {} (height={})", + origin, + height + ); + let response = app.prepare_proposal(request).map_err(|err| { + logged_error(format!( + "prepare_proposal failed for {}{}: {:?}", + origin, context, err + )) + })?; + tracing::debug!( + "prepare_proposal 
result ({}): height={}, app_hash=0x{}, tx_results={}, tx_records={}", + origin, + height, + response.app_hash.encode_hex::(), + response.tx_results.len(), + response.tx_records.len() + ); + } + LoadedRequest::Process(request) => { + let height = request.height; + let context = format_height_round(Some(height), Some(request.round)); + tracing::debug!( + "executing process_proposal from {} (height={})", + origin, + height + ); + let response = app.process_proposal(request).map_err(|err| { + logged_error(format!( + "process_proposal failed for {}{}: {:?}", + origin, context, err + )) + })?; + let status = response_process_proposal::ProposalStatus::try_from(response.status) + .unwrap_or(response_process_proposal::ProposalStatus::Unknown); + tracing::debug!( + "process_proposal result ({}): status={:?}, height={}, app_hash=0x{}, tx_results={}, events={}", + origin, + status, + height, + hex::encode(response.app_hash), + response.tx_results.len(), + response.events.len() + ); + } + LoadedRequest::Finalize(request) => { + let height = request.height; + let round = request.round; + let context = format_height_round(Some(height), Some(round)); + tracing::debug!( + "executing finalize_block from {} (height={}, round={})", + origin, + height, + round + ); + let expected = extract_expected_app_hash(&request); + + let response = app.finalize_block(request).map_err(|err| { + logged_error(format!( + "finalize_block failed for {}{}: {:?}", + origin, context, err + )) + })?; + let actual_hash = app + .platform + .state + .load() + .last_committed_block_app_hash() + .map(|hash| hash.to_vec()); + let grove_hash = app + .platform + .drive + .grove + .root_hash( + None, + &app.platform + .state + .load() + .current_platform_version()? 
+ .drive + .grove_version, + ) + .unwrap() + .unwrap_or_default(); + let actual_hex = actual_hash + .as_ref() + .map(hex::encode) + .unwrap_or_else(|| "unknown".to_string()); + tracing::debug!( + "finalize_block result ({}): height={}, retain_height={}, state_app_hash=0x{}, grove_root=0x{}", + origin, + height, + response.retain_height, + actual_hex, + hex::encode(grove_hash) + ); + match (expected.as_ref(), actual_hash.as_ref()) { + (Some(expected_hash), Some(actual_hash)) => { + if expected_hash != actual_hash { + return Err(logged_error(format!( + "app_hash mismatch for {}{}: expected 0x{}, got 0x{}", + origin, + context, + hex::encode(expected_hash), + actual_hex + ))); + } + if expected_hash != &grove_hash { + return Err(logged_error(format!( + "grovedb root mismatch for {}{}: expected 0x{}, grove 0x{}", + origin, + context, + hex::encode(expected_hash), + hex::encode(grove_hash) + ))); + } + } + (Some(_), None) => { + return Err(logged_error(format!( + "finalize_block executed for {}{} but platform state did not expose app_hash", + origin, context + ))); + } + (None, Some(_)) => { + tracing::warn!( + "could not extract expected app_hash from finalize_block request {}{}", + origin, + context + ); + } + (None, None) => { + tracing::warn!( + "could not extract expected app_hash from request or state for {}{}", + origin, + context + ); + } + } + if let Some(reporter) = progress { + if let Ok(block_height) = u64::try_from(height) { + if let Some(hash) = actual_hash.as_deref().or(expected.as_deref()) { + reporter.record(block_height, hash); + } + } + } + committed_height = u64::try_from(height).ok(); + } + LoadedRequest::ExtendVote(request) => { + let context = format_height_round(Some(request.height), Some(request.round)); + tracing::debug!( + "executing extend_vote from {} (height={}, round={})", + origin, + request.height, + request.round + ); + let response = app.extend_vote(request).map_err(|err| { + logged_error(format!( + "extend_vote failed for {}{}: {:?}", 
+ origin, context, err + )) + })?; + let total_bytes: usize = response + .vote_extensions + .iter() + .map(|ext| ext.extension.len()) + .sum(); + tracing::debug!( + "extend_vote result ({}): vote_extensions={}, total_extension_bytes={}", + origin, + response.vote_extensions.len(), + total_bytes + ); + } + LoadedRequest::VerifyVoteExtension(request) => { + let context = format_height_round(Some(request.height), Some(request.round)); + tracing::debug!( + "executing verify_vote_extension from {} (height={}, round={})", + origin, + request.height, + request.round + ); + let response = app.verify_vote_extension(request).map_err(|err| { + logged_error(format!( + "verify_vote_extension failed for {}{}: {:?}", + origin, context, err + )) + })?; + let status = response_verify_vote_extension::VerifyStatus::try_from(response.status) + .unwrap_or(response_verify_vote_extension::VerifyStatus::Unknown); + tracing::debug!( + "verify_vote_extension result ({}): status={:?}", + origin, + status + ); + } + } + + Ok(committed_height) +} + +const PROGRESS_MIN_INTERVAL: Duration = Duration::from_secs(1); +const PROGRESS_WINDOW_SHORT: Duration = Duration::from_secs(60); +const PROGRESS_WINDOW_LONG: Duration = Duration::from_secs(300); + +pub(super) struct ProgressReporter { + history: VecDeque<(Instant, u64)>, + last_emit: Option, + stop_height: Option, +} + +impl ProgressReporter { + pub(super) fn new(stop_height: Option) -> Self { + Self { + history: VecDeque::new(), + last_emit: None, + stop_height, + } + } + + pub(super) fn record(&mut self, height: u64, app_hash: &[u8]) { + let now = Instant::now(); + self.history.push_back((now, height)); + self.trim_history(now); + + if let Some(last) = self.last_emit { + if now.duration_since(last) < PROGRESS_MIN_INTERVAL { + return; + } + } + + let rate_1m = self.rate_per_minute(now, PROGRESS_WINDOW_SHORT); + let rate_5m = self.rate_per_minute(now, PROGRESS_WINDOW_LONG); + let eta = Self::format_eta(self.eta(rate_5m, height)); + tracing::info!( 
+ height, + app_hash = Self::short_hash(app_hash), + rate_1m = Self::format_rate(rate_1m), + rate_5m = Self::format_rate(rate_5m), + eta, + "block processed", + ); + self.last_emit = Some(now); + } + + fn trim_history(&mut self, now: Instant) { + while let Some((ts, _)) = self.history.front() { + if now.duration_since(*ts) > PROGRESS_WINDOW_LONG { + self.history.pop_front(); + } else { + break; + } + } + } + + fn rate_per_minute(&self, now: Instant, window: Duration) -> Option<f64> { + let latest = self.history.back()?; + let start = self + .history + .iter() + .find(|(ts, _)| now.duration_since(*ts) <= window)?; + let elapsed = now.duration_since(start.0).as_secs_f64(); + if elapsed <= 0.0 { + return None; + } + let delta = latest.1.saturating_sub(start.1) as f64; + Some(delta / elapsed * 60.0) + } + + fn format_rate(rate: Option<f64>) -> String { + rate.map(|value| format!("{:.2} blk/min", value)) + .unwrap_or_else(|| "n/a".to_string()) + } + + fn eta(&self, rate_5m: Option<f64>, current_height: u64) -> Option<Duration> { + let target = self.stop_height?; + if current_height >= target { + return Some(Duration::from_secs(0)); + } + let rate = rate_5m?; + if rate <= 0.0 { + return None; + } + let remaining = target.saturating_sub(current_height) as f64; + let seconds = (remaining / rate * 60.0).round(); + if seconds.is_finite() && seconds >= 0.0 { + Some(Duration::from_secs(seconds as u64)) + } else { + None + } + } + + fn format_eta(duration: Option<Duration>) -> String { + duration + .map(|d| { + let total = d.as_secs(); + let hours = total / 3600; + let minutes = (total % 3600) / 60; + let seconds = total % 60; + if hours > 0 { + format!("{:02}h{:02}m{:02}s", hours, minutes, seconds) + } else if minutes > 0 { + format!("{:02}m{:02}s", minutes, seconds) + } else { + format!("{:02}s", seconds) + } + }) + .unwrap_or_else(|| "n/a".to_string()) + } + + fn short_hash(app_hash: &[u8]) -> String { + let hex = hex::encode(app_hash); + let end = hex.len().min(10); + hex[..end].to_string() + } +} + +fn
logged_error(message: String) -> Box { + tracing::error!("{}", message); + message.into() +} + +pub(super) fn log_last_committed_block(platform: &Platform) +where + C: crate::rpc::core::CoreRPCLike, +{ + let platform_state = platform.state.load(); + let grove_version = &platform_state + .current_platform_version() + .unwrap_or(PlatformVersion::latest()) + .drive + .grove_version; + let grove_hash = platform + .drive + .grove + .root_hash(None, grove_version) + .unwrap() + .unwrap_or_default(); + + if let Some(info) = platform_state.last_committed_block_info() { + let app_hash = info.app_hash(); + let basic_info = info.basic_info(); + tracing::info!( + height = basic_info.height, + round = info.round(), + core_height = basic_info.core_height, + block_id_hash = %hex::encode(info.block_id_hash()), + app_hash = %hex::encode(app_hash), + grove_hash = %hex::encode(grove_hash), + "Platform state at last committed block", + ); + } else { + tracing::info!("last_committed_block: None"); + } +} + +pub(super) fn stop_height_reached(limit: Option, known_height: Option) -> bool { + matches!((limit, known_height), (Some(limit), Some(height)) if height >= limit) +} + +fn extract_expected_app_hash(request: &RequestFinalizeBlock) -> Option> { + request + .block + .as_ref() + .and_then(|block| block.header.as_ref()) + .map(|header| header.app_hash.clone()) +} + +fn format_height_round(height: Option, round: Option) -> String { + let mut parts = Vec::new(); + if let Some(h) = height { + parts.push(format!("height={}", h)); + } + if let Some(r) = round { + parts.push(format!("round={}", r)); + } + + if parts.is_empty() { + String::new() + } else { + format!(" ({})", parts.join(", ")) + } +} diff --git a/packages/rs-drive-abci/src/verify/mod.rs b/packages/rs-drive-abci/src/verify/mod.rs new file mode 100644 index 00000000000..fb529bbf548 --- /dev/null +++ b/packages/rs-drive-abci/src/verify/mod.rs @@ -0,0 +1,140 @@ +use crate::config::PlatformConfig; +use 
crate::platform_types::platform::Platform; +use crate::platform_types::platform_state::v0::PlatformStateV0Methods; +use crate::platform_types::platform_state::PlatformState; +use crate::rpc::core::DefaultCoreRPC; +use dpp::version::PlatformVersion; +use drive::drive::Drive; +use std::fs::remove_file; +use std::path::PathBuf; + +/// Run all verification steps: +/// - GroveDB integrity +/// - platform state app hash vs GroveDB root +/// - configuration consistency with stored protocol version +pub fn run(config: &PlatformConfig, force: bool) -> Result<(), String> { + verify_grovedb(&config.db_path, force)?; + + let (drive, maybe_platform_version) = + Drive::open(&config.db_path, Some(config.drive.clone())).map_err(|e| e.to_string())?; + + let platform_version = maybe_platform_version.unwrap_or_else(PlatformVersion::latest); + + let platform_state = + Platform::<DefaultCoreRPC>::fetch_platform_state(&drive, None, platform_version) + .map_err(|e| e.to_string())? + .ok_or_else(|| "platform state missing from database".to_string())?; + + verify_app_hash_matches_grove_root(&drive, &platform_state, platform_version)?; + verify_config_matches_db(config, &drive)?; + + Ok(()) +} + +/// Verify GroveDB integrity. +/// +/// This function will execute GroveDB integrity checks if one of the following conditions is met: +/// - `force` is `true` +/// - file `.fsck` in `db_path` exists +/// +/// After successful verification, .fsck file is removed.
+pub fn verify_grovedb(db_path: &PathBuf, force: bool) -> Result<(), String> { + let fsck = PathBuf::from(db_path).join(".fsck"); + + if !force { + if !fsck.exists() { + return Ok(()); + } + tracing::info!( + "found {} file, starting grovedb verification", + fsck.display() + ); + } + + let grovedb = drive::grovedb::GroveDb::open(db_path).expect("open grovedb"); + // TODO: fetch platform version instead of taking latest + let result = grovedb + .visualize_verify_grovedb( + None, + true, + true, + &PlatformVersion::latest().drive.grove_version, + ) + .map_err(|e| e.to_string()); + + match result { + Ok(data) => { + for result in data { + tracing::warn!(?result, "grovedb verification") + } + tracing::info!("grovedb verification finished"); + + if fsck.exists() { + if let Err(e) = remove_file(&fsck) { + tracing::warn!( + error = ?e, + path =fsck.display().to_string(), + "grovedb verification: cannot remove .fsck file: please remove it manually to avoid running verification again", + ); + } + } + Ok(()) + } + Err(e) => { + tracing::error!("grovedb verification failed: {}", e); + Err(e) + } + } +} + +fn verify_app_hash_matches_grove_root( + drive: &Drive, + platform_state: &PlatformState, + platform_version: &PlatformVersion, +) -> Result<(), String> { + let app_hash = platform_state + .last_committed_block_app_hash() + .ok_or_else(|| "platform state missing last committed app hash".to_string())?; + + let grove_root = drive + .grove + .root_hash(None, &platform_version.drive.grove_version) + .unwrap() + .map_err(|e| e.to_string())?; + + if grove_root != app_hash { + return Err(format!( + "app hash mismatch: platform state 0x{} vs grovedb root 0x{}", + hex::encode(app_hash), + hex::encode(grove_root) + )); + } + + tracing::info!("platform state app hash matches grovedb root hash"); + Ok(()) +} + +fn verify_config_matches_db(_config: &PlatformConfig, drive: &Drive) -> Result<(), String> { + let stored_protocol_version = drive + .fetch_current_protocol_version(None) + 
.map_err(|e| e.to_string())?; + + if let Some(stored) = stored_protocol_version { + let binary_supported = PlatformVersion::latest().protocol_version; + if stored > binary_supported { + return Err(format!( + "database protocol version {} is newer than binary supports {}", + stored, binary_supported + )); + } + tracing::info!( + "protocol version in DB {} is compatible with binary {}", + stored, + binary_supported + ); + } else { + tracing::warn!("no protocol version found in DB to compare with binary support"); + } + + Ok(()) +} diff --git a/packages/rs-drive/src/drive/votes/cleanup/remove_all_votes_given_by_identities/v0/mod.rs b/packages/rs-drive/src/drive/votes/cleanup/remove_all_votes_given_by_identities/v0/mod.rs index 2f2346d88b6..e0a125b1a57 100644 --- a/packages/rs-drive/src/drive/votes/cleanup/remove_all_votes_given_by_identities/v0/mod.rs +++ b/packages/rs-drive/src/drive/votes/cleanup/remove_all_votes_given_by_identities/v0/mod.rs @@ -138,7 +138,8 @@ impl Drive { deletion_batch, &mut vec![], &platform_version.drive, - )?; + ) + .inspect_err(|err| tracing::error!(?err, "vote deletion batch failed"))?; } Ok(())