Skip to content

Commit

Permalink
fix: boot sequence, logs, format
Browse files Browse the repository at this point in the history
  • Loading branch information
Ludo Galabru committed Jul 24, 2023
1 parent 90af1de commit d03f851
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 47 deletions.
8 changes: 1 addition & 7 deletions components/hord-cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,13 +726,7 @@ async fn handle_command(opts: Opts, ctx: &Context) -> Result<(), String> {
let mut hord_config = config.get_hord_config();
hord_config.network_thread_max = cmd.network_threads;

rebuild_rocks_db(
&config,
cmd.start_block,
cmd.end_block,
&ctx,
)
.await?
rebuild_rocks_db(&config, cmd.start_block, cmd.end_block, &ctx).await?
}
RepairCommand::Transfers(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
Expand Down
64 changes: 41 additions & 23 deletions components/hord-cli/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use std::{

use chainhook_sdk::{
indexer::bitcoin::{
build_http_client, download_block_with_retry, retrieve_block_hash_with_retry, try_download_block_bytes_with_retry, parse_fetched_block, download_block, parse_downloaded_block,
build_http_client, download_block, download_block_with_retry, parse_downloaded_block,
parse_fetched_block, retrieve_block_hash_with_retry, try_download_block_bytes_with_retry,
},
types::{
BitcoinBlockData, BlockIdentifier, OrdinalInscriptionRevealData,
Expand All @@ -21,18 +22,21 @@ use rand::{thread_rng, Rng};

use rocksdb::DB;
use rusqlite::{Connection, OpenFlags, ToSql, Transaction};
use tokio::task::JoinSet;
use std::io::Cursor;
use std::io::{Read, Write};
use threadpool::ThreadPool;
use tokio::task::JoinSet;

use chainhook_sdk::{
indexer::bitcoin::BitcoinBlockFullBreakdown, observer::BitcoinConfig, utils::Context,
};

use crate::{hord::{self, HordConfig}, config::Config};
use crate::hord::{new_traversals_lazy_cache, update_hord_db_and_augment_bitcoin_block};
use crate::ord::{height::Height, sat::Sat};
use crate::{
config::Config,
hord::{self, HordConfig},
};

fn get_default_hord_db_file_path(base_dir: &PathBuf) -> PathBuf {
let mut destination_path = base_dir.clone();
Expand Down Expand Up @@ -973,6 +977,10 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
};
let _ = block_data_tx.send(res);
});
// TODO: remove this join?
if block_height >= ordinal_computing_height {
let _ = retrieve_block_data_pool.join();
}
}
let res = retrieve_block_data_pool.join();
res
Expand All @@ -994,6 +1002,9 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
block_data,
)));
});
if block_height >= ordinal_computing_height {
let _ = compress_block_data_pool.join();
}
}
let res = compress_block_data_pool.join();
res
Expand All @@ -1018,10 +1029,15 @@ pub async fn fetch_and_cache_blocks_in_hord_db(

// Should we start look for inscriptions data in blocks?
if raw_block.height as u64 >= ordinal_computing_height {
if cursor == 0 {
if (cursor as u64) < ordinal_computing_height {
cursor = raw_block.height;
}
ctx.try_log(|logger| slog::info!(logger, "Queueing compacted block #{block_height}",));
ctx.try_log(|logger| {
slog::info!(
logger,
"Queueing compacted block #{block_height} (#{cursor})",
)
});
// Is the action of processing a block allows us
// to process more blocks present in the inbox?
inbox.insert(raw_block.height, raw_block);
Expand Down Expand Up @@ -1871,7 +1887,6 @@ pub async fn rebuild_rocks_db(
let number_of_blocks_to_process = end_block - start_block + 1;
let (block_req_lim, block_process_lim) = (128, 128);


let (block_data_tx, block_data_rx) = crossbeam_channel::bounded(block_req_lim);
let compress_block_data_pool = ThreadPool::new(hord_config.ingestion_thread_max);
let (block_compressed_tx, block_compressed_rx) = crossbeam_channel::bounded(block_process_lim);
Expand Down Expand Up @@ -1902,7 +1917,7 @@ pub async fn rebuild_rocks_db(

let _ = hiro_system_kit::thread_named("Block data compression")
.spawn(move || {
while let Ok(Some(block_bytes)) = block_data_rx.recv() {
while let Ok(Some(block_bytes)) = block_data_rx.recv() {
let block_compressed_tx_moved = block_compressed_tx.clone();
compress_block_data_pool.execute(move || {
let block_data = parse_downloaded_block(block_bytes).unwrap();
Expand All @@ -1919,43 +1934,46 @@ pub async fn rebuild_rocks_db(
let res = compress_block_data_pool.join();
res
})
.expect("unable to spawn thread");
.expect("unable to spawn thread");

let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let blocks_db_rw = open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let cloned_ctx = ctx.clone();
let ingestion_thread = hiro_system_kit::thread_named("Block data ingestion")
.spawn(move || {
let mut blocks_stored = 0;
let mut num_writes = 0;

while let Ok(Some((block_height, compacted_block, _raw_block))) = block_compressed_rx.recv() {

while let Ok(Some((block_height, compacted_block, _raw_block))) =
block_compressed_rx.recv()
{
insert_entry_in_blocks(block_height, &compacted_block, &blocks_db_rw, &cloned_ctx);
blocks_stored += 1;
num_writes += 1;

// In the context of ordinals, we're constrained to process blocks sequentially
// Blocks are processed by a threadpool and could be coming out of order.
// Inbox block for later if the current block is not the one we should be
// processing.

// Should we start look for inscriptions data in blocks?
cloned_ctx.try_log(|logger| slog::info!(logger, "Storing compacted block #{block_height}",));

cloned_ctx.try_log(|logger| {
slog::info!(logger, "Storing compacted block #{block_height}",)
});

if blocks_stored == number_of_blocks_to_process {
cloned_ctx.try_log(|logger| {
slog::info!(
logger,
"Local block storage successfully seeded with #{blocks_stored} blocks"
)
});

// match guard.report().build() {
// Ok(report) => {
// ctx.try_log(|logger| {
// slog::info!(logger, "Generating report");
// });

// let file = std::fs::File::create("hord-perf.svg").unwrap();
// report.flamegraph(file).unwrap();
// }
Expand All @@ -1966,7 +1984,7 @@ pub async fn rebuild_rocks_db(
// }
// }
}

if num_writes % 128 == 0 {
cloned_ctx.try_log(|logger| {
slog::info!(logger, "Flushing DB to disk ({num_writes} inserts)");
Expand All @@ -1979,20 +1997,20 @@ pub async fn rebuild_rocks_db(
num_writes = 0;
}
}

if let Err(e) = blocks_db_rw.flush() {
cloned_ctx.try_log(|logger| {
slog::error!(logger, "{}", e.to_string());
});
}
()
}).expect("unable to spawn thread");
})
.expect("unable to spawn thread");

while let Some(res) = set.join_next().await {
let block = res.unwrap().unwrap();

let _ = block_data_tx
.send(Some(block));
let _ = block_data_tx.send(Some(block));

if let Some(block_height) = block_heights.pop_front() {
let config = moved_config.clone();
Expand Down
33 changes: 18 additions & 15 deletions components/hord-cli/src/hord/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ pub fn update_hord_db_and_augment_bitcoin_block(
&inner_ctx,
)?;

if !any_inscription_revealed && !any_inscription_transferred {
return Ok(());
}

if discard_changes {
ctx.try_log(|logger| {
slog::info!(
Expand All @@ -462,22 +466,21 @@ pub fn update_hord_db_and_augment_bitcoin_block(
)
});
}
if any_inscription_revealed || any_inscription_transferred {
let inscriptions_revealed = get_inscriptions_revealed_in_block(&new_block)
.iter()
.map(|d| d.inscription_number.to_string())
.collect::<Vec<String>>();

ctx.try_log(|logger| {
slog::info!(
logger,
"Block #{} processed through hord, revealing {} inscriptions [{}]",
new_block.block_identifier.index,
inscriptions_revealed.len(),
inscriptions_revealed.join(", ")
)
});
}
let inscriptions_revealed = get_inscriptions_revealed_in_block(&new_block)
.iter()
.map(|d| d.inscription_number.to_string())
.collect::<Vec<String>>();

ctx.try_log(|logger| {
slog::info!(
logger,
"Block #{} processed through hord, revealing {} inscriptions [{}]",
new_block.block_identifier.index,
inscriptions_revealed.len(),
inscriptions_revealed.join(", ")
)
});
Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions components/hord-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::config::{Config, PredicatesApi, PredicatesApiConfig};
use crate::db::{
find_all_inscriptions_in_block, format_satpoint_to_watch, insert_entry_in_locations,
open_readwrite_hord_db_conn, open_readwrite_hord_dbs, parse_satpoint_to_watch,
remove_entries_from_locations_at_block_height, rebuild_rocks_db,
rebuild_rocks_db, remove_entries_from_locations_at_block_height,
};
use crate::hord::{
new_traversals_lazy_cache, revert_hord_db_with_augmented_bitcoin_block, should_sync_hord_db,
Expand Down Expand Up @@ -152,7 +152,7 @@ impl Service {
})
.expect("unable to spawn thread");

rebuild_rocks_db(&self.config, 420000, 767420, &self.ctx).await?;
// rebuild_rocks_db(&self.config, 420000, 767420, &self.ctx).await?;

while let Some((start_block, end_block)) = should_sync_hord_db(&self.config, &self.ctx)? {
if start_block == 0 {
Expand Down

0 comments on commit d03f851

Please sign in to comment.