Skip to content

Commit

Permalink
feat: streamline processors
Browse files Browse the repository at this point in the history
  • Loading branch information
Ludo Galabru committed Aug 2, 2023
1 parent c0991c5 commit 13421db
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 30 deletions.
14 changes: 8 additions & 6 deletions components/hord-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use crate::archive::download_ordinals_dataset_if_required;
use crate::config::generator::generate_config;
use crate::config::Config;
use crate::core::pipeline::download_and_pipeline_blocks;
use crate::core::pipeline::processors::start_ordinals_number_processor;
use crate::core::pipeline::processors::block_ingestion::start_block_ingestion_processor;
use crate::core::pipeline::processors::start_inscription_indexing_processor;
use crate::core::{self};
use crate::scan::bitcoin::scan_bitcoin_chainstate_via_rpc_using_predicate;
use crate::service::Service;
Expand Down Expand Up @@ -622,12 +623,14 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let mut hord_config = config.get_hord_config();
hord_config.network_thread_max = cmd.network_threads;

let blocks_post_processor = start_block_ingestion_processor(&config, ctx, None);

download_and_pipeline_blocks(
&config,
cmd.start_block,
cmd.end_block,
hord_config.first_inscription_height,
None,
Some(&blocks_post_processor),
&ctx,
)
.await?
Expand All @@ -637,19 +640,18 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let mut hord_config = config.get_hord_config();
hord_config.network_thread_max = cmd.network_threads;

let (tx, handle) = start_ordinals_number_processor(&config, ctx, None);
let blocks_post_processor =
start_inscription_indexing_processor(&config, ctx, None);

download_and_pipeline_blocks(
&config,
cmd.start_block,
cmd.end_block,
hord_config.first_inscription_height,
Some(tx),
Some(&blocks_post_processor),
&ctx,
)
.await?;

let _ = handle.join();
}
RepairCommand::Transfers(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
Expand Down
35 changes: 31 additions & 4 deletions components/hord-cli/src/core/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use chainhook_sdk::types::BitcoinBlockData;
use chainhook_sdk::utils::Context;
use crossbeam_channel::bounded;
use std::collections::{HashMap, VecDeque};
use std::thread::sleep;
use std::thread::{sleep, JoinHandle};
use std::time::Duration;
use tokio::task::JoinSet;

Expand All @@ -18,12 +18,27 @@ use chainhook_sdk::indexer::bitcoin::{

use super::parse_ordinals_and_standardize_block;

pub enum PostProcessorCommand {
ProcessBlocks(Vec<(BitcoinBlockData, LazyBlock)>),
Terminate,
}

pub enum PostProcessorEvent {
EmptyQueue,
}

pub struct PostProcessorController {
pub commands_tx: crossbeam_channel::Sender<PostProcessorCommand>,
pub events_rx: crossbeam_channel::Receiver<PostProcessorEvent>,
pub thread_handle: JoinHandle<()>,
}

pub async fn download_and_pipeline_blocks(
config: &Config,
start_block: u64,
end_block: u64,
start_sequencing_blocks_at_height: u64,
blocks_post_processor: Option<crossbeam_channel::Sender<Vec<(BitcoinBlockData, LazyBlock)>>>,
blocks_post_processor: Option<&PostProcessorController>,
ctx: &Context,
) -> Result<(), String> {
// let guard = pprof::ProfilerGuardBuilder::default()
Expand Down Expand Up @@ -111,6 +126,10 @@ pub async fn download_and_pipeline_blocks(

let cloned_ctx = ctx.clone();

let post_processor_commands_tx = blocks_post_processor
.as_ref()
.and_then(|p| Some(p.commands_tx.clone()));

let storage_thread = hiro_system_kit::thread_named("Ordered blocks dispatcher")
.spawn(move || {
let mut inbox = HashMap::new();
Expand Down Expand Up @@ -151,8 +170,8 @@ pub async fn download_and_pipeline_blocks(
inbox_cursor += 1;
}
if !chunk.is_empty() {
if let Some(ref tx) = blocks_post_processor {
let _ = tx.send(chunk);
if let Some(ref blocks_tx) = post_processor_commands_tx {
let _ = blocks_tx.send(PostProcessorCommand::ProcessBlocks(chunk));
}
} else {
if blocks_processed == number_of_blocks_to_process {
Expand Down Expand Up @@ -200,6 +219,14 @@ pub async fn download_and_pipeline_blocks(
let _ = handle.join();
}

if let Some(post_processor) = blocks_post_processor {
loop {
if let Ok(PostProcessorEvent::EmptyQueue) = post_processor.events_rx.recv() {
break;
}
}
}

let _ = storage_thread.join();
let _ = set.shutdown();

Expand Down
104 changes: 104 additions & 0 deletions components/hord-cli/src/core/pipeline/processors/block_ingestion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use std::{
sync::mpsc::Sender,
thread::{sleep, JoinHandle},
time::Duration,
};

use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
use crossbeam_channel::TryRecvError;

use crate::{
config::Config,
core::pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
db::{insert_entry_in_blocks, open_readwrite_hord_db_conn_rocks_db},
};

pub fn start_block_ingestion_processor(
config: &Config,
ctx: &Context,
_post_processor: Option<Sender<BitcoinBlockData>>,
) -> PostProcessorController {
let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
let (events_tx, events_rx) = crossbeam_channel::unbounded::<PostProcessorEvent>();

let config = config.clone();
let ctx = ctx.clone();
let handle: JoinHandle<()> = hiro_system_kit::thread_named("Processor Runloop")
.spawn(move || {
let mut num_writes = 0;
let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx).unwrap();

let mut empty_cycles = 0;

loop {
let blocks = match commands_rx.try_recv() {
Ok(PostProcessorCommand::ProcessBlocks(blocks)) => blocks,
Ok(PostProcessorCommand::Terminate) => break,
Err(e) => match e {
TryRecvError::Empty => {
empty_cycles += 1;

if empty_cycles == 30 {
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
}
sleep(Duration::from_secs(1));
if empty_cycles > 120 {
break;
}
continue;
}
_ => {
break;
}
},
};

info!(ctx.expect_logger(), "Storing {} blocks", blocks.len());

for (block, compacted_block) in blocks.into_iter() {
insert_entry_in_blocks(
block.block_identifier.index as u32,
&compacted_block,
&blocks_db_rw,
&ctx,
);
num_writes += 1;
}

// Early return
if num_writes % 128 == 0 {
ctx.try_log(|logger| {
info!(logger, "Flushing DB to disk ({num_writes} inserts)");
});
if let Err(e) = blocks_db_rw.flush() {
ctx.try_log(|logger| {
error!(logger, "{}", e.to_string());
});
}
num_writes = 0;
continue;
}

// Write blocks to disk, before traversals
if let Err(e) = blocks_db_rw.flush() {
ctx.try_log(|logger| {
error!(logger, "{}", e.to_string());
});
}
}

if let Err(e) = blocks_db_rw.flush() {
ctx.try_log(|logger| {
error!(logger, "{}", e.to_string());
});
}
})
.expect("unable to spawn thread");

PostProcessorController {
commands_tx,
events_rx,
thread_handle: handle,
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,32 @@
use std::{
sync::{mpsc::Sender, Arc},
thread::JoinHandle,
thread::{sleep, JoinHandle},
time::Duration,
};

use chainhook_sdk::{types::BitcoinBlockData, utils::Context};
use crossbeam_channel::TryRecvError;

use crate::{
config::Config,
core::{new_traversals_lazy_cache, protocol::sequencing::process_blocks},
core::{
new_traversals_lazy_cache,
pipeline::{PostProcessorCommand, PostProcessorController, PostProcessorEvent},
protocol::sequencing::process_blocks,
},
db::{
insert_entry_in_blocks, open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db,
InscriptionHeigthHint, LazyBlock,
InscriptionHeigthHint,
},
};

pub fn start_ordinals_number_processor(
pub fn start_inscription_indexing_processor(
config: &Config,
ctx: &Context,
post_processor: Option<Sender<BitcoinBlockData>>,
) -> (
crossbeam_channel::Sender<Vec<(BitcoinBlockData, LazyBlock)>>,
JoinHandle<()>,
) {
let (tx, rx) = crossbeam_channel::bounded::<Vec<(BitcoinBlockData, LazyBlock)>>(1);
) -> PostProcessorController {
let (commands_tx, commands_rx) = crossbeam_channel::bounded::<PostProcessorCommand>(2);
let (events_tx, events_rx) = crossbeam_channel::unbounded::<PostProcessorEvent>();

let config = config.clone();
let ctx = ctx.clone();
Expand All @@ -41,16 +45,39 @@ pub fn start_ordinals_number_processor(
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx).unwrap();

let mut inscription_height_hint = InscriptionHeigthHint::new();
let mut empty_cycles = 0;

loop {
let blocks_to_process = match commands_rx.try_recv() {
Ok(PostProcessorCommand::ProcessBlocks(blocks)) => blocks,
Ok(PostProcessorCommand::Terminate) => break,
Err(e) => match e {
TryRecvError::Empty => {
empty_cycles += 1;

if empty_cycles == 30 {
let _ = events_tx.send(PostProcessorEvent::EmptyQueue);
}
sleep(Duration::from_secs(1));
if empty_cycles > 120 {
break;
}
continue;
}
_ => {
break;
}
},
};

while let Ok(raw_blocks) = rx.recv() {
info!(
ctx.expect_logger(),
"Processing {} blocks",
raw_blocks.len()
blocks_to_process.len()
);

let mut blocks = vec![];
for (block, compacted_block) in raw_blocks.into_iter() {
for (block, compacted_block) in blocks_to_process.into_iter() {
insert_entry_in_blocks(
block.block_identifier.index as u32,
&compacted_block,
Expand Down Expand Up @@ -118,5 +145,9 @@ pub fn start_ordinals_number_processor(
})
.expect("unable to spawn thread");

(tx, handle)
PostProcessorController {
commands_tx,
events_rx,
thread_handle: handle,
}
}
3 changes: 2 additions & 1 deletion components/hord-cli/src/core/pipeline/processors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod block_ingestion;
pub mod inscription_indexing;

pub use inscription_indexing::start_ordinals_number_processor;
pub use inscription_indexing::start_inscription_indexing_processor;
10 changes: 4 additions & 6 deletions components/hord-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod runloops;
use crate::cli::fetch_and_standardize_block;
use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
use crate::core::pipeline::download_and_pipeline_blocks;
use crate::core::pipeline::processors::start_ordinals_number_processor;
use crate::core::pipeline::processors::start_inscription_indexing_processor;
use crate::core::protocol::sequencing::{
update_hord_db_and_augment_bitcoin_block_v3,
update_storage_and_augment_bitcoin_block_with_inscription_transfer_data_tx,
Expand Down Expand Up @@ -93,8 +93,8 @@ impl Service {
// Start predicate processor
let (tx_replayer, rx_replayer) = channel();

let (tx, handle) =
start_ordinals_number_processor(&self.config, &self.ctx, Some(tx_replayer));
let blocks_post_processor =
start_inscription_indexing_processor(&self.config, &self.ctx, Some(tx_replayer));

let mut moved_event_observer_config = event_observer_config.clone();
let moved_ctx = self.ctx.clone();
Expand Down Expand Up @@ -141,13 +141,11 @@ impl Service {
start_block,
end_block,
hord_config.first_inscription_height,
Some(tx.clone()),
Some(&blocks_post_processor),
&self.ctx,
)
.await?;
}

let _ = handle.join();
}

// Bitcoin scan operation threadpool
Expand Down

0 comments on commit 13421db

Please sign in to comment.