From 98e7e9b21dd68e96f9d41f8b9e5386b1d6f8cf1e Mon Sep 17 00:00:00 2001 From: Ludo Galabru Date: Thu, 3 Aug 2023 09:37:26 +0200 Subject: [PATCH] feat: restore ability to replay transfers --- components/hord-cli/src/cli/mod.rs | 2 +- .../processors/inscription_indexing.rs | 11 +- .../src/core/pipeline/processors/mod.rs | 1 + .../processors/transfers_recomputing.rs | 205 ++++++++++++ components/hord-cli/src/db/mod.rs | 36 +- components/hord-cli/src/service/http_api.rs | 10 +- components/hord-cli/src/service/mod.rs | 316 ++++++------------ 7 files changed, 342 insertions(+), 239 deletions(-) create mode 100644 components/hord-cli/src/core/pipeline/processors/transfers_recomputing.rs diff --git a/components/hord-cli/src/cli/mod.rs b/components/hord-cli/src/cli/mod.rs index 7593bf20..29edbffa 100644 --- a/components/hord-cli/src/cli/mod.rs +++ b/components/hord-cli/src/cli/mod.rs @@ -666,7 +666,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> { RepairCommand::Transfers(cmd) => { let config = Config::default(false, false, false, &cmd.config_path)?; let service = Service::new(config, ctx.clone()); - service.replay_transfers(cmd.start_block, cmd.end_block, None)?; + service.replay_transfers(cmd.start_block, cmd.end_block, None).await?; } }, Command::Db(HordDbCommand::Check(cmd)) => { diff --git a/components/hord-cli/src/core/pipeline/processors/inscription_indexing.rs b/components/hord-cli/src/core/pipeline/processors/inscription_indexing.rs index 7e3eef97..e60d8b25 100644 --- a/components/hord-cli/src/core/pipeline/processors/inscription_indexing.rs +++ b/components/hord-cli/src/core/pipeline/processors/inscription_indexing.rs @@ -7,8 +7,8 @@ use std::{ use chainhook_sdk::{ bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script}, types::{ - BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionCurseType, OrdinalOperation, - TransactionIdentifier, OrdinalInscriptionTransferData, + BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionCurseType, + OrdinalInscriptionTransferData, OrdinalOperation, TransactionIdentifier, }, utils::Context, }; @@ -238,7 +238,7 @@ pub fn re_augment_block_with_ordinals_operations( // Restore inscriptions data let mut inscriptions = find_all_inscriptions_in_block(&block.block_identifier.index, inscriptions_db_conn, ctx); - + let mut should_become_cursed = vec![]; for (tx_index, tx) in block.transactions.iter_mut().enumerate() { for (op_index, operation) in tx.metadata.ordinal_operations.iter_mut().enumerate() { @@ -296,7 +296,10 @@ pub fn re_augment_block_with_ordinals_operations( let OrdinalOperation::InscriptionRevealed(inscription) = tx.metadata.ordinal_operations.remove(op_index) else { continue; }; - tx.metadata.ordinal_operations.insert(op_index, OrdinalOperation::CursedInscriptionRevealed(inscription)); + tx.metadata.ordinal_operations.insert( + op_index, + OrdinalOperation::CursedInscriptionRevealed(inscription), + ); } // TODO: Handle transfers diff --git a/components/hord-cli/src/core/pipeline/processors/mod.rs b/components/hord-cli/src/core/pipeline/processors/mod.rs index 937f1808..b575ad67 100644 --- a/components/hord-cli/src/core/pipeline/processors/mod.rs +++ b/components/hord-cli/src/core/pipeline/processors/mod.rs @@ -1,4 +1,5 @@ pub mod block_ingestion; pub mod inscription_indexing; +pub mod transfers_recomputing; pub use inscription_indexing::start_inscription_indexing_processor; diff --git a/components/hord-cli/src/core/pipeline/processors/transfers_recomputing.rs b/components/hord-cli/src/core/pipeline/processors/transfers_recomputing.rs new file mode 100644 index 00000000..19fd8bcf --- /dev/null +++ b/components/hord-cli/src/core/pipeline/processors/transfers_recomputing.rs @@ -0,0 +1,205 @@ +use std::{ + collections::BTreeMap, + thread::{sleep, JoinHandle}, + time::Duration, +}; + +use chainhook_sdk::{ + bitcoincore_rpc_json::bitcoin::{hashes::hex::FromHex, Address, Network, Script}, + types::{BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionTransferData, OrdinalOperation}, + utils::Context, +}; +use crossbeam_channel::{Sender, TryRecvError}; + +use crate::{ + core::protocol::sequencing::update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx, + db::{ + find_all_inscriptions_in_block, format_satpoint_to_watch, insert_entry_in_locations, + parse_satpoint_to_watch, remove_entries_from_locations_at_block_height, + }, +}; + +use crate::{ + config::Config, + core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent}, + db::open_readwrite_hord_db_conn, +}; + +pub fn start_transfers_recomputing_processor( + config: &Config, + ctx: &Context, + post_processor: Option>, +) -> PostProcessorController { + let (commands_tx, commands_rx) = crossbeam_channel::bounded::(2); + let (events_tx, events_rx) = crossbeam_channel::unbounded::(); + + let config = config.clone(); + let ctx = ctx.clone(); + let handle: JoinHandle<()> = hiro_system_kit::thread_named("Inscription indexing runloop") + .spawn(move || { + let mut inscriptions_db_conn_rw = + open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx).unwrap(); + let mut empty_cycles = 0; + + if let Ok(PostProcessorCommand::Start) = commands_rx.recv() { + info!(ctx.expect_logger(), "Start transfers recomputing runloop"); + } + + loop { + let mut blocks = match commands_rx.try_recv() { + Ok(PostProcessorCommand::ProcessBlocks(_, blocks)) => { + empty_cycles = 0; + blocks + } + Ok(PostProcessorCommand::Terminate) => break, + Ok(PostProcessorCommand::Start) => continue, + Err(e) => match e { + TryRecvError::Empty => { + empty_cycles += 1; + if empty_cycles == 10 { + empty_cycles = 0; + let _ = events_tx.send(PostProcessorEvent::EmptyQueue); + } + sleep(Duration::from_secs(1)); + continue; + } + _ => { + break; + } + }, + }; + + info!(ctx.expect_logger(), "Processing {} blocks", blocks.len()); + + for block in blocks.iter_mut() { + let network = match block.metadata.network { + BitcoinNetwork::Mainnet => Network::Bitcoin, + BitcoinNetwork::Regtest => Network::Regtest, + BitcoinNetwork::Testnet => Network::Testnet, + }; + + info!( + ctx.expect_logger(), + "Cleaning transfers from block {}", block.block_identifier.index + ); + let inscriptions = find_all_inscriptions_in_block( + &block.block_identifier.index, + &inscriptions_db_conn_rw, + &ctx, + ); + info!( + ctx.expect_logger(), + "{} inscriptions retrieved at block {}", + inscriptions.len(), + block.block_identifier.index + ); + let mut operations = BTreeMap::new(); + + let transaction = inscriptions_db_conn_rw.transaction().unwrap(); + + remove_entries_from_locations_at_block_height( + &block.block_identifier.index, + &transaction, + &ctx, + ); + + for (_, entry) in inscriptions.iter() { + let inscription_id = entry.get_inscription_id(); + info!( + ctx.expect_logger(), + "Processing inscription {}", inscription_id + ); + insert_entry_in_locations( + &inscription_id, + block.block_identifier.index, + &entry.transfer_data, + &transaction, + &ctx, + ); + + operations.insert( + entry.transaction_identifier_inscription.clone(), + OrdinalInscriptionTransferData { + inscription_id: entry.get_inscription_id(), + updated_address: None, + satpoint_pre_transfer: format_satpoint_to_watch( + &entry.transaction_identifier_inscription, + entry.inscription_input_index, + 0, + ), + satpoint_post_transfer: format_satpoint_to_watch( + &entry.transfer_data.transaction_identifier_location, + entry.transfer_data.output_index, + entry.transfer_data.inscription_offset_intra_output, + ), + post_transfer_output_value: None, + tx_index: 0, + }, + ); + } + + info!( + ctx.expect_logger(), + "Rewriting transfers for block {}", block.block_identifier.index + ); + + for tx in block.transactions.iter_mut() { + tx.metadata.ordinal_operations.clear(); + if let Some(mut entry) = operations.remove(&tx.transaction_identifier) { + let (_, output_index, _) = + parse_satpoint_to_watch(&entry.satpoint_post_transfer); + + let script_pub_key_hex = + tx.metadata.outputs[output_index].get_script_pubkey_hex(); + let updated_address = match Script::from_hex(&script_pub_key_hex) { + Ok(script) => { + match Address::from_script(&script, network.clone()) { + Ok(address) => Some(address.to_string()), + Err(_e) => None, + } + } + Err(_e) => None, + }; + + entry.updated_address = updated_address; + entry.post_transfer_output_value = + Some(tx.metadata.outputs[output_index].value); + + tx.metadata + .ordinal_operations + .push(OrdinalOperation::InscriptionTransferred(entry)); + } + } + + update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx( + block, + &transaction, + &ctx, + ) + .unwrap(); + + info!( + ctx.expect_logger(), + "Saving supdates for block {}", block.block_identifier.index + ); + transaction.commit().unwrap(); + + info!( + ctx.expect_logger(), + "Transfers in block {} repaired", block.block_identifier.index + ); + + if let Some(ref post_processor) = post_processor { + let _ = post_processor.send(block.clone()); + } + } + } + }) + .expect("unable to spawn thread"); + + PostProcessorController { + commands_tx, + events_rx, + thread_handle: handle, + } +} diff --git a/components/hord-cli/src/db/mod.rs b/components/hord-cli/src/db/mod.rs index b05cb9ca..857be647 100644 --- a/components/hord-cli/src/db/mod.rs +++ b/components/hord-cli/src/db/mod.rs @@ -76,22 +76,6 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection { ) }); } else { - if let Err(e) = conn.execute( - "CREATE TABLE IF NOT EXISTS locations ( - inscription_id TEXT NOT NULL, - block_height INTEGER NOT NULL, - tx_index INTEGER NOT NULL, - outpoint_to_watch TEXT NOT NULL, - offset INTEGER NOT NULL, - UNIQUE(outpoint_to_watch,offset) - )", - [], - ) { - ctx.try_log(|logger| { - warn!(logger, "Unable to create table locations:{}", e.to_string()) - }); - } - if let Err(e) = conn.execute( "CREATE INDEX IF NOT EXISTS index_inscriptions_on_ordinal_number ON inscriptions(ordinal_number);", [], @@ -110,7 +94,22 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection { [], ) { ctx.try_log(|logger| warn!(logger, "{}", e.to_string())); - } + } + } + if let Err(e) = conn.execute( + "CREATE TABLE IF NOT EXISTS locations ( + inscription_id TEXT NOT NULL, + block_height INTEGER NOT NULL, + tx_index INTEGER NOT NULL, + outpoint_to_watch TEXT NOT NULL, + offset INTEGER NOT NULL + )", + [], + ) { + ctx.try_log(|logger| { + warn!(logger, "Unable to create table locations: {}", e.to_string()) + }); + } else { if let Err(e) = conn.execute( "CREATE INDEX IF NOT EXISTS index_locations_on_block_height ON locations(block_height);", [], @@ -128,8 +127,9 @@ pub fn initialize_hord_db(path: &PathBuf, ctx: &Context) -> Connection { [], ) { ctx.try_log(|logger| warn!(logger, "{}", e.to_string())); - } + } } + conn } diff --git a/components/hord-cli/src/service/http_api.rs b/components/hord-cli/src/service/http_api.rs index f044c660..d2fecc73 100644 --- a/components/hord-cli/src/service/http_api.rs +++ b/components/hord-cli/src/service/http_api.rs @@ -133,10 +133,7 @@ fn handle_create_predicate( let predicate_uuid = predicate.get_uuid().to_string(); if let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn(api_config) { - let key: String = format!( - "{}", - ChainhookSpecification::bitcoin_key(&predicate_uuid) - ); + let key: String = format!("{}", ChainhookSpecification::bitcoin_key(&predicate_uuid)); match get_entry_from_predicates_db(&key, &mut predicates_db_conn, &ctx) { Ok(Some(_)) => { return Json(json!({ @@ -172,10 +169,7 @@ fn handle_get_predicate( match open_readwrite_predicates_db_conn(api_config) { Ok(mut predicates_db_conn) => { - let key: String = format!( - "{}", - ChainhookSpecification::bitcoin_key(&predicate_uuid) - ); + let key: String = format!("{}", ChainhookSpecification::bitcoin_key(&predicate_uuid)); let entry = match get_entry_from_predicates_db(&key, &mut predicates_db_conn, &ctx) { Ok(Some((ChainhookSpecification::Stacks(spec), status))) => json!({ "chain": "stacks", diff --git a/components/hord-cli/src/service/mod.rs b/components/hord-cli/src/service/mod.rs index 04aad2b9..2063d57e 100644 --- a/components/hord-cli/src/service/mod.rs +++ b/components/hord-cli/src/service/mod.rs @@ -5,6 +5,7 @@ use crate::cli::fetch_and_standardize_block; use crate::config::{Config, PredicatesApi, PredicatesApiConfig}; use crate::core::pipeline::processors::inscription_indexing::process_blocks; use crate::core::pipeline::processors::start_inscription_indexing_processor; +use crate::core::pipeline::processors::transfers_recomputing::start_transfers_recomputing_processor; use crate::core::pipeline::{download_and_pipeline_blocks, PostProcessorCommand}; use crate::core::protocol::sequencing::update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx; use crate::core::{ @@ -12,10 +13,10 @@ use crate::core::{ revert_hord_db_with_augmented_bitcoin_block, should_sync_hord_db, }; use crate::db::{ - find_all_inscriptions_in_block, format_satpoint_to_watch, insert_entry_in_blocks, - insert_entry_in_locations, open_readwrite_hord_db_conn, open_readwrite_hord_dbs, - parse_satpoint_to_watch, remove_entries_from_locations_at_block_height, InscriptionHeigthHint, - LazyBlock, + find_all_inscriptions_in_block, find_latest_inscription_block_height, format_satpoint_to_watch, + insert_entry_in_blocks, insert_entry_in_locations, open_readonly_hord_db_conn, + open_readwrite_hord_db_conn, open_readwrite_hord_dbs, parse_satpoint_to_watch, + remove_entries_from_locations_at_block_height, InscriptionHeigthHint, LazyBlock, initialize_hord_db, }; use crate::scan::bitcoin::process_block_with_predicates; use crate::service::http_api::{load_predicates_from_redis, start_predicate_api_server}; @@ -36,6 +37,7 @@ use chainhook_sdk::types::{ BitcoinBlockData, BitcoinNetwork, OrdinalInscriptionTransferData, OrdinalOperation, }; use chainhook_sdk::utils::Context; +use crossbeam_channel::unbounded; use redis::{Commands, Connection}; use std::collections::BTreeMap; @@ -64,7 +66,9 @@ impl Service { let hord_config = self.config.get_hord_config(); // Sleep - // std::thread::sleep(std::time::Duration::from_secs(180)); + std::thread::sleep(std::time::Duration::from_secs(600)); + + let _ = initialize_hord_db(&self.config.expected_cache_path(), &self.ctx); // Force rebuild // { @@ -84,72 +88,48 @@ impl Service { // )?; // } - // download_and_pipeline_blocks(&self.config, 767400, 767429, 767400, None, &self.ctx).await?; + let (tx_replayer, rx_replayer) = unbounded(); + let mut moved_event_observer_config = event_observer_config.clone(); + let moved_ctx = self.ctx.clone(); - // Catch-up with chain tip - { - // Start predicate processor - let (tx_replayer, rx_replayer) = channel(); - - let blocks_post_processor = - start_inscription_indexing_processor(&self.config, &self.ctx, Some(tx_replayer)); - - let mut moved_event_observer_config = event_observer_config.clone(); - let moved_ctx = self.ctx.clone(); - - let _ = hiro_system_kit::thread_named("Initial predicate processing") - .spawn(move || { - if let Some(mut chainhook_config) = - moved_event_observer_config.chainhook_config.take() - { - let mut bitcoin_predicates_ref: Vec<&BitcoinChainhookSpecification> = - vec![]; - for bitcoin_predicate in chainhook_config.bitcoin_chainhooks.iter_mut() { - bitcoin_predicates_ref.push(bitcoin_predicate); - } - while let Ok(block) = rx_replayer.recv() { - let future = process_block_with_predicates( - block, - &bitcoin_predicates_ref, - &moved_event_observer_config, - &moved_ctx, - ); - let res = hiro_system_kit::nestable_block_on(future); - if let Err(_) = res { - error!(moved_ctx.expect_logger(), "Initial ingestion failing"); - } + let _ = hiro_system_kit::thread_named("Initial predicate processing") + .spawn(move || { + if let Some(mut chainhook_config) = + moved_event_observer_config.chainhook_config.take() + { + let mut bitcoin_predicates_ref: Vec<&BitcoinChainhookSpecification> = vec![]; + for bitcoin_predicate in chainhook_config.bitcoin_chainhooks.iter_mut() { + bitcoin_predicates_ref.push(bitcoin_predicate); + } + while let Ok(block) = rx_replayer.recv() { + let future = process_block_with_predicates( + block, + &bitcoin_predicates_ref, + &moved_event_observer_config, + &moved_ctx, + ); + let res = hiro_system_kit::nestable_block_on(future); + if let Err(_) = res { + error!(moved_ctx.expect_logger(), "Initial ingestion failing"); } } - }) - .expect("unable to spawn thread"); - - while let Some((start_block, end_block, speed)) = - should_sync_hord_db(&self.config, &self.ctx)? - { - info!( - self.ctx.expect_logger(), - "Indexing inscriptions from block #{start_block} to block #{end_block}" - ); + } + }) + .expect("unable to spawn thread"); - let hord_config = self.config.get_hord_config(); - - download_and_pipeline_blocks( - &self.config, - start_block, - end_block, - hord_config.first_inscription_height, - None, - Some(&blocks_post_processor), - speed, - &self.ctx, - ) - .await?; + let tip = { + let inscriptions_db_conn = + open_readonly_hord_db_conn(&self.config.expected_cache_path(), &self.ctx)?; + match find_latest_inscription_block_height(&inscriptions_db_conn, &self.ctx)? { + Some(height) => height, + None => panic!(), } + }; - let _ = blocks_post_processor - .commands_tx - .send(PostProcessorCommand::Terminate); - } + self.replay_transfers(767430, tip, Some(tx_replayer.clone())).await?; + self.update_state(Some(tx_replayer.clone())).await?; + + // Catch-up with chain tip // Bitcoin scan operation threadpool let (observer_command_tx, observer_command_rx) = channel(); @@ -420,165 +400,85 @@ impl Service { Ok(()) } - pub fn replay_transfers( + pub async fn update_state( &self, - start_block: u64, - end_block: u64, block_post_processor: Option>, ) -> Result<(), String> { - info!(self.ctx.expect_logger(), "Transfers only"); - - let bitcoin_config = BitcoinConfig { - username: self.config.network.bitcoind_rpc_username.clone(), - password: self.config.network.bitcoind_rpc_password.clone(), - rpc_url: self.config.network.bitcoind_rpc_url.clone(), - network: self.config.network.bitcoin_network.clone(), - bitcoin_block_signaling: self.config.network.bitcoin_block_signaling.clone(), - }; - let (tx, rx) = crossbeam_channel::bounded(100); - let moved_ctx = self.ctx.clone(); - hiro_system_kit::thread_named("Block fetch") - .spawn(move || { - let http_client = build_http_client(); - for cursor in start_block..=end_block { - info!(moved_ctx.expect_logger(), "Fetching block {}", cursor); - let future = fetch_and_standardize_block( - &http_client, - cursor, - &bitcoin_config, - &moved_ctx, - ); - - let block = hiro_system_kit::nestable_block_on(future).unwrap(); + // Start predicate processor + let blocks_post_processor = + start_transfers_recomputing_processor(&self.config, &self.ctx, block_post_processor); - let _ = tx.send(block); - } - }) - .unwrap(); - - let mut inscriptions_db_conn_rw = - open_readwrite_hord_db_conn(&self.config.expected_cache_path(), &self.ctx)?; - - while let Ok(mut block) = rx.recv() { - let network = match block.metadata.network { - BitcoinNetwork::Mainnet => Network::Bitcoin, - BitcoinNetwork::Regtest => Network::Regtest, - BitcoinNetwork::Testnet => Network::Testnet, - }; - - info!( - self.ctx.expect_logger(), - "Cleaning transfers from block {}", block.block_identifier.index - ); - let inscriptions = find_all_inscriptions_in_block( - &block.block_identifier.index, - &inscriptions_db_conn_rw, - &self.ctx, - ); + while let Some((start_block, end_block, speed)) = + should_sync_hord_db(&self.config, &self.ctx)? + { info!( self.ctx.expect_logger(), - "{} inscriptions retrieved at block {}", - inscriptions.len(), - block.block_identifier.index + "Indexing inscriptions from block #{start_block} to block #{end_block}" ); - let mut operations = BTreeMap::new(); - - let transaction = inscriptions_db_conn_rw.transaction().unwrap(); - remove_entries_from_locations_at_block_height( - &block.block_identifier.index, - &transaction, + let hord_config = self.config.get_hord_config(); + let first_inscription_height = hord_config.first_inscription_height; + download_and_pipeline_blocks( + &self.config, + start_block, + end_block, + first_inscription_height, + if end_block < first_inscription_height { + Some(&blocks_post_processor) + } else { + None + }, + if end_block < first_inscription_height { + None + } else { + Some(&blocks_post_processor) + }, + speed, &self.ctx, - ); - - for (_, entry) in inscriptions.iter() { - let inscription_id = entry.get_inscription_id(); - info!( - self.ctx.expect_logger(), - "Processing inscription {}", inscription_id - ); - insert_entry_in_locations( - &inscription_id, - block.block_identifier.index, - &entry.transfer_data, - &transaction, - &self.ctx, - ); + ) + .await?; + } - operations.insert( - entry.transaction_identifier_inscription.clone(), - OrdinalInscriptionTransferData { - inscription_id: entry.get_inscription_id(), - updated_address: None, - satpoint_pre_transfer: format_satpoint_to_watch( - &entry.transaction_identifier_inscription, - entry.inscription_input_index, - 0, - ), - satpoint_post_transfer: format_satpoint_to_watch( - &entry.transfer_data.transaction_identifier_location, - entry.transfer_data.output_index, - entry.transfer_data.inscription_offset_intra_output, - ), - post_transfer_output_value: None, - tx_index: 0, - }, - ); - } + let _ = blocks_post_processor + .commands_tx + .send(PostProcessorCommand::Terminate); - info!( - self.ctx.expect_logger(), - "Rewriting transfers for block {}", block.block_identifier.index - ); + Ok(()) + } - for tx in block.transactions.iter_mut() { - tx.metadata.ordinal_operations.clear(); - if let Some(mut entry) = operations.remove(&tx.transaction_identifier) { - let (_, output_index, _) = - parse_satpoint_to_watch(&entry.satpoint_post_transfer); - - let script_pub_key_hex = - tx.metadata.outputs[output_index].get_script_pubkey_hex(); - let updated_address = match Script::from_hex(&script_pub_key_hex) { - Ok(script) => match Address::from_script(&script, network.clone()) { - Ok(address) => Some(address.to_string()), - Err(_e) => None, - }, - Err(_e) => None, - }; - - entry.updated_address = updated_address; - entry.post_transfer_output_value = - Some(tx.metadata.outputs[output_index].value); - - tx.metadata - .ordinal_operations - .push(OrdinalOperation::InscriptionTransferred(entry)); - } - } + pub async fn replay_transfers( + &self, + start_block: u64, + end_block: u64, + block_post_processor: Option>, + ) -> Result<(), String> { + // Start predicate processor + let blocks_post_processor = + start_transfers_recomputing_processor(&self.config, &self.ctx, block_post_processor); - update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx( - &mut block, - &transaction, - &self.ctx, - ) - .unwrap(); + info!( + self.ctx.expect_logger(), + "Indexing inscriptions from block #{start_block} to block #{end_block}" + ); - info!( - self.ctx.expect_logger(), - "Saving supdates for block {}", block.block_identifier.index - ); - transaction.commit().unwrap(); + let hord_config = self.config.get_hord_config(); + let first_inscription_height = hord_config.first_inscription_height; + download_and_pipeline_blocks( + &self.config, + start_block, + end_block, + first_inscription_height, + None, + Some(&blocks_post_processor), + 100, + &self.ctx, + ) + .await?; - info!( - self.ctx.expect_logger(), - "Transfers in block {} repaired", block.block_identifier.index - ); + let _ = blocks_post_processor + .commands_tx + .send(PostProcessorCommand::Terminate); - if let Some(ref tx) = block_post_processor { - let _ = tx.send(block); - } - } Ok(()) } }