chore: migration commands

Ludo Galabru committed Apr 9, 2023
1 parent 43d37d7 commit a88cec4
Showing 2 changed files with 155 additions and 24 deletions.
136 changes: 117 additions & 19 deletions components/chainhook-cli/src/cli/mod.rs
@@ -13,7 +13,8 @@ use chainhook_event_observer::chainhooks::types::{
StacksPrintEventBasedPredicate,
};
use chainhook_event_observer::hord::db::{
delete_data_in_hord_db, fetch_and_cache_blocks_in_hord_db, find_block_at_block_height_sqlite,
delete_blocks_in_block_range_sqlite, delete_data_in_hord_db, fetch_and_cache_blocks_in_hord_db,
find_block_at_block_height, find_block_at_block_height_sqlite,
find_inscriptions_at_wached_outpoint, find_last_block_inserted, initialize_hord_db,
insert_entry_in_blocks, open_readonly_hord_db_conn, open_readonly_hord_db_conn_rocks_db,
open_readwrite_hord_db_conn, open_readwrite_hord_db_conn_rocks_db,
@@ -202,6 +203,12 @@ enum DbCommand {
/// Patch DB
#[clap(name = "patch", bin_name = "patch")]
Patch(PatchHordDbCommand),
/// Check integrity
#[clap(name = "check", bin_name = "check")]
Check(CheckHordDbCommand),
/// Migrate legacy database and start the service
#[clap(name = "init", bin_name = "init")]
Init(InitHordDbCommand),
}

#[derive(Subcommand, PartialEq, Clone, Debug)]
@@ -300,6 +307,22 @@ struct PatchHordDbCommand {
pub config_path: Option<String>,
}

#[derive(Parser, PartialEq, Clone, Debug)]
struct CheckHordDbCommand {
/// Load config file path
#[clap(long = "config-path")]
pub config_path: Option<String>,
}

#[derive(Parser, PartialEq, Clone, Debug)]
struct InitHordDbCommand {
/// Load config file path
#[clap(long = "config-path")]
pub config_path: Option<String>,
/// Number of networking threads
pub network_threads: usize,
}

pub fn main() {
let logger = hiro_system_kit::log::setup_logger();
let _guard = hiro_system_kit::log::setup_global_logger(logger.clone());
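For orientation, here is a minimal, self-contained sketch of how the new `init` arguments parse. It assumes clap's derive API (v3-era, as the `#[clap(...)]` attributes above suggest); `network_threads` carries no attribute, so clap treats it as a required positional argument. The `chainhook hord db init ...` invocation shape is inferred from the `Command::Hord(HordCommand::Db(..))` match arm below.

```rust
use clap::Parser;

#[derive(Parser, Debug)]
struct InitHordDbCommand {
    /// Load config file path
    #[clap(long = "config-path")]
    config_path: Option<String>,
    /// Number of networking threads (positional: no long/short attribute)
    network_threads: usize,
}

fn main() {
    // Hypothetical invocation: `chainhook hord db init --config-path ./Chainhook.toml 8`
    let cmd = InitHordDbCommand::parse_from(["init", "--config-path", "./Chainhook.toml", "8"]);
    assert_eq!(cmd.network_threads, 8);
    assert_eq!(cmd.config_path.as_deref(), Some("./Chainhook.toml"));
}
```

Note that the `init` handler below parses `network_threads` but never reads it; the catch-up call passes a hardcoded 10 to `perform_hord_db_update`.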
@@ -554,6 +577,65 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
}
},
Command::Hord(HordCommand::Db(subcmd)) => match subcmd {
DbCommand::Init(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;

let sqlite_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;

// Migrate if required
if find_block_at_block_height_sqlite(1, &sqlite_db_conn_rw).is_some() {
let blocks_db =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;

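// NB: hardcoded upper bound; presumably the Bitcoin chain tip height around the time this migration was written.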
for i in 0..783986 {
match find_block_at_block_height_sqlite(i, &sqlite_db_conn_rw) {
Some(block) => {
insert_entry_in_blocks(i, &block, &blocks_db, &ctx);
info!(ctx.expect_logger(), "Block #{} inserted", i);
}
None => {
info!(ctx.expect_logger(), "Block #{} missing", i);
}
}
}

let _ = blocks_db.flush();

delete_blocks_in_block_range_sqlite(0, 783986, &sqlite_db_conn_rw, &ctx);
}

// Sync
for _ in 0..5 {
if let Some((start_block, end_block)) = should_sync_hord_db(&config, &ctx)? {
if start_block == 0 {
info!(
ctx.expect_logger(),
"Initializing hord indexing from block #{}", start_block
);
} else {
info!(
ctx.expect_logger(),
"Resuming hord indexing from block #{}", start_block
);
}
perform_hord_db_update(
start_block,
end_block,
10,
&config,
&ctx,
)
.await?;
} else {
info!(ctx.expect_logger(), "Hord database up to date");
}
}

// Start node
let mut service = Service::new(config, ctx);
return service.run(vec![]).await;
}
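The migration gate above keys off a single probe: if the SQLite store still answers for block #1, the legacy data is assumed not yet migrated. A self-contained sketch of that probe shape (the single-column `blocks` schema here is hypothetical; the real table is wider):

```rust
use rusqlite::{Connection, OptionalExtension};

// Mirrors the shape of find_block_at_block_height_sqlite(1, ..):
// Some(..) while legacy rows are still present, None once migrated.
fn legacy_data_present(conn: &Connection) -> rusqlite::Result<bool> {
    let row: Option<i64> = conn
        .query_row("SELECT id FROM blocks WHERE id = 1", [], |r| r.get(0))
        .optional()?;
    Ok(row.is_some())
}
```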
DbCommand::Sync(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
if let Some((start_block, end_block)) = should_sync_hord_db(&config, &ctx)? {
Expand Down Expand Up @@ -583,19 +665,20 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
DbCommand::Rewrite(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
// Delete data, if any
let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let inscriptions_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;

delete_data_in_hord_db(
cmd.start_block,
cmd.end_block,
&blocks_db_rw,
&inscriptions_db_conn_rw,
&ctx,
)?;

{
let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;
let inscriptions_db_conn_rw =
open_readwrite_hord_db_conn(&config.expected_cache_path(), &ctx)?;

delete_data_in_hord_db(
cmd.start_block,
cmd.end_block,
&blocks_db_rw,
&inscriptions_db_conn_rw,
&ctx,
)?;
}
// Update data
perform_hord_db_update(
cmd.start_block,
@@ -606,6 +689,23 @@ async fn handle_command(opts: Opts, ctx: Context) -> Result<(), String> {
)
.await?;
}
DbCommand::Check(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
// Scan the blocks DB for missing entries
{
let blocks_db_rw =
open_readwrite_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)?;

let mut missing_blocks = vec![];
for i in 1..=780000 {
if find_block_at_block_height(i, &blocks_db_rw).is_none() {
println!("Missing block {i}");
missing_blocks.push(i);
}
}
println!("{:?}", missing_blocks);
}
}
DbCommand::Drop(cmd) => {
let config = Config::default(false, false, false, &cmd.config_path)?;
let blocks_db =
Expand Down Expand Up @@ -664,13 +764,11 @@ pub fn should_sync_hord_db(config: &Config, ctx: &Context) -> Result<Option<(u64
}
};

let start_block = match open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx) {
let start_block = match open_readonly_hord_db_conn_rocks_db(&config.expected_cache_path(), &ctx)
{
Ok(blocks_db) => find_last_block_inserted(&blocks_db) as u64,
Err(err) => {
warn!(
ctx.expect_logger(),
"{}", err
);
warn!(ctx.expect_logger(), "{}", err);
0
}
};
43 changes: 38 additions & 5 deletions components/chainhook-event-observer/src/hord/db/mod.rs
@@ -345,9 +345,9 @@ pub fn open_readonly_hord_db_conn_rocks_db(
let path = get_default_hord_db_file_path_rocks_db(&base_dir);
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
opts.set_max_open_files(1000);
let db = DB::open_for_read_only(&opts, path, false).map_err(|e| format!("unable to open blocks_db: {}", e.to_string()))?;
let db = DB::open_for_read_only(&opts, path, false)
.map_err(|e| format!("unable to open blocks_db: {}", e.to_string()))?;
Ok(db)
}

@@ -358,9 +358,9 @@ pub fn open_readwrite_hord_db_conn_rocks_db(
let path = get_default_hord_db_file_path_rocks_db(&base_dir);
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
opts.set_max_open_files(1000);
let db = DB::open(&opts, path).map_err(|e| format!("unable to open blocks_db: {}", e.to_string()))?;
let db = DB::open(&opts, path)
.map_err(|e| format!("unable to open blocks_db: {}", e.to_string()))?;
Ok(db)
}
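Both openers now enable LZ4 compression and cap open file handles. One non-obvious detail worth calling out: in rust-rocksdb, the third argument to `open_for_read_only` is `error_if_log_file_exist`. A hedged restatement for context outside the diff:

```rust
use rocksdb::{DBCompressionType, Options, DB};

fn open_blocks_db_readonly(path: &std::path::Path) -> Result<DB, String> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.set_compression_type(DBCompressionType::Lz4); // new in this commit
    opts.set_max_open_files(1000);
    // `false`: don't fail if a write-ahead log is present; its unflushed
    // contents are simply not visible to a read-only handle.
    DB::open_for_read_only(&opts, path, false)
        .map_err(|e| format!("unable to open blocks_db: {}", e))
}
```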

@@ -437,6 +437,20 @@ pub fn delete_blocks_in_block_range(
}
}

pub fn delete_blocks_in_block_range_sqlite(
start_block: u32,
end_block: u32,
rw_hord_db_conn: &Connection,
ctx: &Context,
) {
if let Err(e) = rw_hord_db_conn.execute(
"DELETE FROM blocks WHERE id >= ?1 AND id <= ?2",
rusqlite::params![&start_block, &end_block],
) {
ctx.try_log(|logger| slog::error!(logger, "{}", e.to_string()));
}
}
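A hedged usage sketch: the `init` migration in chainhook-cli calls this helper to drop SQLite rows once they have been copied into RocksDB. The in-memory database and single-column schema below are for illustration only:

```rust
use rusqlite::Connection;

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open_in_memory()?;
    conn.execute_batch("CREATE TABLE blocks (id INTEGER PRIMARY KEY, payload BLOB);")?;

    // Equivalent of the migration's cleanup call:
    // delete_blocks_in_block_range_sqlite(0, 783986, &conn, &ctx)
    conn.execute(
        "DELETE FROM blocks WHERE id >= ?1 AND id <= ?2",
        rusqlite::params![0u32, 783_986u32],
    )?;
    Ok(())
}
```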

pub fn store_new_inscription(
inscription_data: &OrdinalInscriptionRevealData,
block_identifier: &BlockIdentifier,
@@ -722,11 +736,12 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
let mut blocks_stored = 0;
let mut cursor = 1 + start_block as usize;
let mut inbox = HashMap::new();
let mut num_writes = 0;

while let Ok(Some((block_height, compacted_block, raw_block))) = block_compressed_rx.recv() {
insert_entry_in_blocks(block_height, &compacted_block, &blocks_db_rw, &ctx);
blocks_stored += 1;

num_writes += 1;
inbox.insert(raw_block.height, raw_block);

// In the context of ordinals, we're constrained to process blocks sequentially
@@ -776,6 +791,24 @@ pub async fn fetch_and_cache_blocks_in_hord_db(
});
return Ok(());
}

if num_writes > 5000 {
ctx.try_log(|logger| {
slog::info!(logger, "Flushing DB to disk...");
});
if let Err(e) = blocks_db_rw.flush() {
ctx.try_log(|logger| {
slog::error!(logger, "{}", e.to_string());
});
}
num_writes = 0;
}
}

if let Err(e) = blocks_db_rw.flush() {
ctx.try_log(|logger| {
slog::error!(logger, "{}", e.to_string());
});
}

retrieve_block_hash_pool.join();
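The `num_writes` counter added above throttles durability work: RocksDB buffers writes in memtables, so an explicit `flush()` every 5,000 inserts bounds how much indexing is lost on a crash, and the final flush persists the tail. A self-contained sketch of the same pattern (function name hypothetical, threshold taken from the diff):

```rust
use rocksdb::DB;

fn write_batched(db: &DB, kvs: &[(Vec<u8>, Vec<u8>)]) -> Result<(), rocksdb::Error> {
    const FLUSH_INTERVAL: usize = 5_000; // matches the `num_writes > 5000` check
    let mut num_writes = 0;
    for (k, v) in kvs {
        db.put(k, v)?;
        num_writes += 1;
        if num_writes >= FLUSH_INTERVAL {
            db.flush()?;
            num_writes = 0;
        }
    }
    db.flush() // persist whatever remains buffered
}
```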
