From 675fd1eaf1fecef46ace6d426affc6885a2a0ae6 Mon Sep 17 00:00:00 2001
From: Michael Vines
Date: Thu, 3 Sep 2020 11:30:21 -0700
Subject: [PATCH 1/9] Add --enable-bigtable-ledger-upload flag

---
 core/src/rpc.rs         |  1 +
 core/src/rpc_service.rs | 32 ++++++++++++++++++--------------
 validator/src/main.rs   |  8 ++++++++
 3 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/core/src/rpc.rs b/core/src/rpc.rs
index 58e53cef5f3..3a944245f49 100644
--- a/core/src/rpc.rs
+++ b/core/src/rpc.rs
@@ -87,6 +87,7 @@ pub struct JsonRpcConfig {
     pub faucet_addr: Option<SocketAddr>,
     pub health_check_slot_distance: u64,
     pub enable_bigtable_ledger_storage: bool,
+    pub enable_bigtable_ledger_upload: bool,
 }
 
 #[derive(Clone)]
diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs
index 5f29052870a..c3fa2707932 100644
--- a/core/src/rpc_service.rs
+++ b/core/src/rpc_service.rs
@@ -272,20 +272,24 @@ impl JsonRpcService {
             .build()
             .expect("Runtime");
 
-        let bigtable_ledger_storage = if config.enable_bigtable_ledger_storage {
-            runtime
-                .block_on(solana_storage_bigtable::LedgerStorage::new(false))
-                .map(|x| {
-                    info!("BigTable ledger storage initialized");
-                    Some(x)
-                })
-                .unwrap_or_else(|err| {
-                    error!("Failed to initialize BigTable ledger storage: {:?}", err);
-                    None
-                })
-        } else {
-            None
-        };
+        let bigtable_ledger_storage =
+            if config.enable_bigtable_ledger_storage || config.enable_bigtable_ledger_upload {
+                runtime
+                    .block_on(solana_storage_bigtable::LedgerStorage::new(
+                        config.enable_bigtable_ledger_upload,
+                    ))
+                    .map(|x| {
+                        info!("BigTable ledger storage initialized");
+                        Some(x)
+                    })
+                    .unwrap_or_else(|err| {
+                        error!("Failed to initialize BigTable ledger storage: {:?}", err);
+                        None
+                    })
+            } else {
+                None
+            };
+
         let request_processor = JsonRpcRequestProcessor::new(
             config,
             bank_forks.clone(),
diff --git a/validator/src/main.rs b/validator/src/main.rs
index b36624baab1..b366636beb2 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -653,6 +653,13 @@ pub fn main() {
                 .help("Fetch historical transaction info from a BigTable instance \
                        as a fallback to local ledger data"),
         )
+        .arg(
+            Arg::with_name("enable_bigtable_ledger_upload")
+                .long("enable-bigtable-ledger-upload")
+                .requires("enable_rpc_transaction_history")
+                .takes_value(false)
+                .help("Upload new confirmed blocks into a BigTable instance"),
+        )
         .arg(
             Arg::with_name("health_check_slot_distance")
                 .long("health-check-slot-distance")
@@ -970,6 +977,7 @@ pub fn main() {
         enable_rpc_transaction_history: matches.is_present("enable_rpc_transaction_history"),
         enable_bigtable_ledger_storage: matches
             .is_present("enable_rpc_bigtable_ledger_storage"),
+        enable_bigtable_ledger_upload: matches.is_present("enable_bigtable_ledger_upload"),
         identity_pubkey: identity_keypair.pubkey(),
         faucet_addr: matches.value_of("rpc_faucet_addr").map(|address| {
             solana_net_utils::parse_host_port(address).expect("failed to parse faucet address")

From 79622f665745ba304f4b3bcdea060e9b21c17abd Mon Sep 17 00:00:00 2001
From: Michael Vines
Date: Thu, 3 Sep 2020 19:39:16 -0700
Subject: [PATCH 2/9] Relocate BigTable uploader to ledger/ crate

---
 ledger-tool/src/bigtable.rs   | 203 ++-------------------------------
 ledger/Cargo.toml             |   3 +
 ledger/src/bigtable_upload.rs | 206 ++++++++++++++++++++++++++++++++++
 ledger/src/lib.rs             |   1 +
 4 files changed, 219 insertions(+), 194 deletions(-)
 create mode 100644 ledger/src/bigtable_upload.rs

diff --git a/ledger-tool/src/bigtable.rs b/ledger-tool/src/bigtable.rs
index 3f137f1d3e1..725789922eb 100644
--- a/ledger-tool/src/bigtable.rs
+++ b/ledger-tool/src/bigtable.rs
@@ -1,23 +1,14 @@
 /// The `bigtable` subcommand
 use clap::{value_t, value_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand};
-use log::*;
 use solana_clap_utils::{
     input_parsers::pubkey_of,
     input_validators::{is_slot, is_valid_pubkey},
 };
 use solana_cli::display::println_transaction;
 use solana_ledger::{blockstore::Blockstore, blockstore_db::AccessType};
-use solana_measure::measure::Measure;
 use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
 use solana_transaction_status::UiTransactionEncoding;
-use std::{collections::HashSet, path::Path, process::exit, result::Result, time::Duration};
-use tokio::time::delay_for;
-
-// Attempt to upload this many blocks in parallel
-const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32;
-
-// Read up to this many blocks from blockstore before blocking on the upload process
-const BLOCK_READ_AHEAD_DEPTH: usize = NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2;
+use std::{path::Path, process::exit, result::Result, sync::Arc};
 
 async fn upload(
     blockstore: Blockstore,
@@ -25,194 +16,18 @@
     ending_slot: Option<Slot>,
     allow_missing_metadata: bool,
 ) -> Result<(), Box<dyn std::error::Error>> {
-    let mut measure = Measure::start("entire upload");
-
     let bigtable = solana_storage_bigtable::LedgerStorage::new(false)
         .await
         .map_err(|err| format!("Failed to connect to storage: {:?}", err))?;
 
-    info!("Loading ledger slots...");
-    let blockstore_slots: Vec<_> = blockstore
-        .slot_meta_iterator(starting_slot)
-        .map_err(|err| {
-            format!(
-                "Failed to load entries starting from slot {}: {:?}",
-                starting_slot, err
-            )
-        })?
-        .filter_map(|(slot, _slot_meta)| {
-            if let Some(ending_slot) = &ending_slot {
-                if slot > *ending_slot {
-                    return None;
-                }
-            }
-            Some(slot)
-        })
-        .collect();
-
-    if blockstore_slots.is_empty() {
-        info!("Ledger has no slots in the specified range");
-        return Ok(());
-    }
-    info!(
-        "Found {} slots in the range ({}, {})",
-        blockstore_slots.len(),
-        blockstore_slots.first().unwrap(),
-        blockstore_slots.last().unwrap()
-    );
-
-    let mut blockstore_slots_with_no_confirmed_block = HashSet::new();
-
-    // Gather the blocks that are already present in bigtable, by slot
-    let bigtable_slots = {
-        let mut bigtable_slots = vec![];
-        let first_blockstore_slot = *blockstore_slots.first().unwrap();
-        let last_blockstore_slot = *blockstore_slots.last().unwrap();
-        info!(
-            "Loading list of bigtable blocks between slots {} and {}...",
-            first_blockstore_slot, last_blockstore_slot
-        );
-
-        let mut start_slot = *blockstore_slots.first().unwrap();
-        while start_slot <= last_blockstore_slot {
-            let mut next_bigtable_slots = loop {
-                match bigtable.get_confirmed_blocks(start_slot, 1000).await {
-                    Ok(slots) => break slots,
-                    Err(err) => {
-                        error!("get_confirmed_blocks for {} failed: {:?}", start_slot, err);
-                        // Consider exponential backoff...
-                        delay_for(Duration::from_secs(2)).await;
-                    }
-                }
-            };
-            if next_bigtable_slots.is_empty() {
-                break;
-            }
-            bigtable_slots.append(&mut next_bigtable_slots);
-            start_slot = bigtable_slots.last().unwrap() + 1;
-        }
-        bigtable_slots
-            .into_iter()
-            .filter(|slot| *slot <= last_blockstore_slot)
-            .collect::<Vec<_>>()
-    };
-
-    // The blocks that still need to be uploaded is the difference between what's already in the
-    // bigtable and what's in blockstore...
-    let blocks_to_upload = {
-        let blockstore_slots = blockstore_slots.iter().cloned().collect::<HashSet<_>>();
-        let bigtable_slots = bigtable_slots.into_iter().collect::<HashSet<_>>();
-
-        let mut blocks_to_upload = blockstore_slots
-            .difference(&blockstore_slots_with_no_confirmed_block)
-            .cloned()
-            .collect::<HashSet<_>>()
-            .difference(&bigtable_slots)
-            .cloned()
-            .collect::<Vec<_>>();
-        blocks_to_upload.sort();
-        blocks_to_upload
-    };
-
-    if blocks_to_upload.is_empty() {
-        info!("No blocks need to be uploaded to bigtable");
-        return Ok(());
-    }
-    info!(
-        "{} blocks to be uploaded to the bucket in the range ({}, {})",
-        blocks_to_upload.len(),
-        blocks_to_upload.first().unwrap(),
-        blocks_to_upload.last().unwrap()
-    );
-
-    // Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
-    let (_loader_thread, receiver) = {
-        let (sender, receiver) = std::sync::mpsc::sync_channel(BLOCK_READ_AHEAD_DEPTH);
-        (
-            std::thread::spawn(move || {
-                let mut measure = Measure::start("block loader thread");
-                for (i, slot) in blocks_to_upload.iter().enumerate() {
-                    let _ = match blockstore.get_confirmed_block(
-                        *slot,
-                        Some(solana_transaction_status::UiTransactionEncoding::Base64),
-                    ) {
-                        Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))),
-                        Err(err) => {
-                            warn!(
-                                "Failed to get load confirmed block from slot {}: {:?}",
-                                slot, err
-                            );
-                            sender.send((*slot, None))
-                        }
-                    };
-
-                    if i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 {
-                        info!(
-                            "{}% of blocks processed ({}/{})",
-                            i * 100 / blocks_to_upload.len(),
-                            i,
-                            blocks_to_upload.len()
-                        );
-                    }
-                }
-                measure.stop();
-                info!("{} to load {} blocks", measure, blocks_to_upload.len());
-            }),
-            receiver,
-        )
-    };
-
-    let mut failures = 0;
-    use futures::stream::StreamExt;
-
-    let mut stream =
-        tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
-
-    while let Some(blocks) = stream.next().await {
-        let mut measure_upload = Measure::start("Upload");
-        let mut num_blocks = blocks.len();
-        info!("Preparing the next {} blocks for upload", num_blocks);
-
-        let uploads = blocks.into_iter().filter_map(|(slot, block)| match block {
-            None => {
-                blockstore_slots_with_no_confirmed_block.insert(slot);
-                num_blocks -= 1;
-                None
-            }
-            Some(confirmed_block) => {
-                if confirmed_block
-                    .transactions
-                    .iter()
-                    .any(|transaction| transaction.meta.is_none())
-                {
-                    if allow_missing_metadata {
-                        info!("Transaction metadata missing from slot {}", slot);
-                    } else {
-                        panic!("Transaction metadata missing from slot {}", slot);
-                    }
-                }
-                Some(bigtable.upload_confirmed_block(slot, confirmed_block))
-            }
-        });
-
-        for result in futures::future::join_all(uploads).await {
-            if result.is_err() {
-                error!("upload_confirmed_block() failed: {:?}", result.err());
-                failures += 1;
-            }
-        }
-
-        measure_upload.stop();
-        info!("{} for {} blocks", measure_upload, num_blocks);
-    }
-
-    measure.stop();
-    info!("{}", measure);
-    if failures > 0 {
-        Err(format!("Incomplete upload, {} operations failed", failures).into())
-    } else {
-        Ok(())
-    }
+    solana_ledger::bigtable_upload::upload_confirmed_blocks(
+        Arc::new(blockstore),
+        bigtable,
+        starting_slot,
+        ending_slot,
+        allow_missing_metadata,
+    )
+    .await
 }
 
 async fn first_available_block() -> Result<(), Box<dyn std::error::Error>> {
diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml
index 115c125aa96..8ff86465d95 100644
--- a/ledger/Cargo.toml
+++ b/ledger/Cargo.toml
@@ -21,6 +21,8 @@ sha2 = "0.8.2"
 flate2 = "1.0.14"
 zstd = "0.5.1"
 fs_extra = "1.1.0"
+futures = "0.3.5"
+futures-util = "0.3.5"
 itertools = "0.9.0"
 libc = "0.2.70"
"0.2.70" log = { version = "0.4.8" } @@ -44,6 +46,7 @@ solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "1.2.27" } solana-runtime = { path = "../runtime", version = "1.2.27" } solana-sdk = { path = "../sdk", version = "1.2.27" } solana-stake-program = { path = "../programs/stake", version = "1.2.27" } +solana-storage-bigtable = { path = "../storage-bigtable", version = "1.2.27" } solana-vote-program = { path = "../programs/vote", version = "1.2.27" } symlink = "0.1.0" tar = "0.4.28" diff --git a/ledger/src/bigtable_upload.rs b/ledger/src/bigtable_upload.rs new file mode 100644 index 00000000000..fe2356ca490 --- /dev/null +++ b/ledger/src/bigtable_upload.rs @@ -0,0 +1,206 @@ +use crate::blockstore::Blockstore; +use log::*; +use solana_measure::measure::Measure; +use solana_sdk::clock::Slot; +use std::{collections::HashSet, result::Result, sync::Arc, time::Duration}; +use tokio::time::delay_for; + +// Attempt to upload this many blocks in parallel +const NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL: usize = 32; + +// Read up to this many blocks from blockstore before blocking on the upload process +const BLOCK_READ_AHEAD_DEPTH: usize = NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL * 2; + +pub async fn upload_confirmed_blocks( + blockstore: Arc, + bigtable: solana_storage_bigtable::LedgerStorage, + starting_slot: Slot, + ending_slot: Option, + allow_missing_metadata: bool, +) -> Result<(), Box> { + let mut measure = Measure::start("entire upload"); + + info!("Loading ledger slots..."); + let blockstore_slots: Vec<_> = blockstore + .slot_meta_iterator(starting_slot) + .map_err(|err| { + format!( + "Failed to load entries starting from slot {}: {:?}", + starting_slot, err + ) + })? + .filter_map(|(slot, _slot_meta)| { + if let Some(ending_slot) = &ending_slot { + if slot > *ending_slot { + return None; + } + } + Some(slot) + }) + .collect(); + + if blockstore_slots.is_empty() { + info!("Ledger has no slots in the specified range"); + return Ok(()); + } + + info!( + "Found {} slots in the range ({}, {})", + blockstore_slots.len(), + blockstore_slots.first().unwrap(), + blockstore_slots.last().unwrap() + ); + + let mut blockstore_slots_with_no_confirmed_block = HashSet::new(); + + // Gather the blocks that are already present in bigtable, by slot + let bigtable_slots = { + let mut bigtable_slots = vec![]; + let first_blockstore_slot = *blockstore_slots.first().unwrap(); + let last_blockstore_slot = *blockstore_slots.last().unwrap(); + info!( + "Loading list of bigtable blocks between slots {} and {}...", + first_blockstore_slot, last_blockstore_slot + ); + + let mut start_slot = *blockstore_slots.first().unwrap(); + while start_slot <= last_blockstore_slot { + let mut next_bigtable_slots = loop { + match bigtable.get_confirmed_blocks(start_slot, 1000).await { + Ok(slots) => break slots, + Err(err) => { + error!("get_confirmed_blocks for {} failed: {:?}", start_slot, err); + // Consider exponential backoff... + delay_for(Duration::from_secs(2)).await; + } + } + }; + if next_bigtable_slots.is_empty() { + break; + } + bigtable_slots.append(&mut next_bigtable_slots); + start_slot = bigtable_slots.last().unwrap() + 1; + } + bigtable_slots + .into_iter() + .filter(|slot| *slot <= last_blockstore_slot) + .collect::>() + }; + + // The blocks that still need to be uploaded is the difference between what's already in the + // bigtable and what's in blockstore... 
+    let blocks_to_upload = {
+        let blockstore_slots = blockstore_slots.iter().cloned().collect::<HashSet<_>>();
+        let bigtable_slots = bigtable_slots.into_iter().collect::<HashSet<_>>();
+
+        let mut blocks_to_upload = blockstore_slots
+            .difference(&blockstore_slots_with_no_confirmed_block)
+            .cloned()
+            .collect::<HashSet<_>>()
+            .difference(&bigtable_slots)
+            .cloned()
+            .collect::<Vec<_>>();
+        blocks_to_upload.sort();
+        blocks_to_upload
+    };
+
+    if blocks_to_upload.is_empty() {
+        info!("No blocks need to be uploaded to bigtable");
+        return Ok(());
+    }
+    info!(
+        "{} blocks to be uploaded to the bucket in the range ({}, {})",
+        blocks_to_upload.len(),
+        blocks_to_upload.first().unwrap(),
+        blocks_to_upload.last().unwrap()
+    );
+
+    // Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
+    let (_loader_thread, receiver) = {
+        let (sender, receiver) = std::sync::mpsc::sync_channel(BLOCK_READ_AHEAD_DEPTH);
+        (
+            std::thread::spawn(move || {
+                let mut measure = Measure::start("block loader thread");
+                for (i, slot) in blocks_to_upload.iter().enumerate() {
+                    let _ = match blockstore.get_confirmed_block(
+                        *slot,
+                        Some(solana_transaction_status::UiTransactionEncoding::Base64),
+                    ) {
+                        Ok(confirmed_block) => sender.send((*slot, Some(confirmed_block))),
+                        Err(err) => {
+                            warn!(
+                                "Failed to get load confirmed block from slot {}: {:?}",
+                                slot, err
+                            );
+                            sender.send((*slot, None))
+                        }
+                    };
+
+                    if i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 {
+                        info!(
+                            "{}% of blocks processed ({}/{})",
+                            i * 100 / blocks_to_upload.len(),
+                            i,
+                            blocks_to_upload.len()
+                        );
+                    }
+                }
+                measure.stop();
+                info!("{} to load {} blocks", measure, blocks_to_upload.len());
+            }),
+            receiver,
+        )
+    };
+
+    let mut failures = 0;
+    use futures::stream::StreamExt;
+
+    let mut stream =
+        tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
+
+    while let Some(blocks) = stream.next().await {
+        let mut measure_upload = Measure::start("Upload");
+        let mut num_blocks = blocks.len();
+        info!("Preparing the next {} blocks for upload", num_blocks);
+
+        let uploads = blocks.into_iter().filter_map(|(slot, block)| match block {
+            None => {
+                blockstore_slots_with_no_confirmed_block.insert(slot);
+                num_blocks -= 1;
+                None
+            }
+            Some(confirmed_block) => {
+                if confirmed_block
+                    .transactions
+                    .iter()
+                    .any(|transaction| transaction.meta.is_none())
+                {
+                    if allow_missing_metadata {
+                        info!("Transaction metadata missing from slot {}", slot);
+                    } else {
+                        panic!("Transaction metadata missing from slot {}", slot);
+                    }
+                }
+                Some(bigtable.upload_confirmed_block(slot, confirmed_block))
+            }
+        });
+
+        for result in futures::future::join_all(uploads).await {
+            if result.is_err() {
+                error!("upload_confirmed_block() failed: {:?}", result.err());
+                failures += 1;
+            }
+        }
+
+        measure_upload.stop();
+        info!("{} for {} blocks", measure_upload, num_blocks);
+    }
+
+    measure.stop();
+    info!("{}", measure);
+    if failures > 0 {
+        Err(format!("Incomplete upload, {} operations failed", failures).into())
+    } else {
+        Ok(())
+    }
+}
diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs
index 7f40488d8d7..48cfb32b6b9 100644
--- a/ledger/src/lib.rs
+++ b/ledger/src/lib.rs
@@ -1,5 +1,6 @@
 pub mod bank_forks;
 pub mod bank_forks_utils;
+pub mod bigtable_upload;
 pub mod block_error;
 #[macro_use]
 pub mod blockstore;

From 8b0f5c7f78c86156d7125fae86902800349ae764 Mon Sep 17 00:00:00 2001
From: Michael Vines
Date: Thu, 3 Sep 2020 19:39:05 -0700
Subject: [PATCH 3/9] Add BigTableUploadService

---
 core/src/bigtable_upload_service.rs | 83 +++++++++++++++++++++++++++++
 core/src/lib.rs                     |  1 +
 core/src/rpc_service.rs             | 32 ++++++++---
 ledger/Cargo.toml                   |  1 +
 ledger/src/bigtable_upload.rs       | 11 ++--
 5 files changed, 116 insertions(+), 12 deletions(-)
 create mode 100644 core/src/bigtable_upload_service.rs

diff --git a/core/src/bigtable_upload_service.rs b/core/src/bigtable_upload_service.rs
new file mode 100644
index 00000000000..1d423b14869
--- /dev/null
+++ b/core/src/bigtable_upload_service.rs
@@ -0,0 +1,83 @@
+use solana_ledger::blockstore::Blockstore;
+use solana_runtime::commitment::BlockCommitmentCache;
+use std::{
+    sync::atomic::{AtomicBool, Ordering},
+    sync::{Arc, RwLock},
+    thread::{self, Builder, JoinHandle},
+};
+use tokio::runtime;
+
+pub struct BigTableUploadService {
+    thread: JoinHandle<()>,
+}
+
+impl BigTableUploadService {
+    pub fn new(
+        runtime_handle: runtime::Handle,
+        bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
+        blockstore: Arc<Blockstore>,
+        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
+        exit: Arc<AtomicBool>,
+    ) -> Self {
+        info!("Starting BigTable upload service");
+        let thread = Builder::new()
+            .name("bigtable-upload".to_string())
+            .spawn(move || {
+                Self::run(
+                    runtime_handle,
+                    bigtable_ledger_storage,
+                    blockstore,
+                    block_commitment_cache,
+                    exit,
+                )
+            })
+            .unwrap();
+
+        Self { thread }
+    }
+
+    fn run(
+        runtime: runtime::Handle,
+        bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
+        blockstore: Arc<Blockstore>,
+        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
+        exit: Arc<AtomicBool>,
+    ) {
+        let mut starting_slot = 0;
+        loop {
+            if exit.load(Ordering::Relaxed) {
+                break;
+            }
+
+            let max_confirmed_root = block_commitment_cache
+                .read()
+                .unwrap()
+                .highest_confirmed_root();
+
+            if max_confirmed_root == starting_slot {
+                std::thread::sleep(std::time::Duration::from_secs(1));
+                continue;
+            }
+
+            let result = runtime.block_on(solana_ledger::bigtable_upload::upload_confirmed_blocks(
+                blockstore.clone(),
+                bigtable_ledger_storage.clone(),
+                starting_slot,
+                Some(max_confirmed_root),
+                true,
+            ));
+
+            match result {
+                Ok(()) => starting_slot = max_confirmed_root,
+                Err(err) => {
+                    warn!("bigtable: upload_confirmed_blocks: {}", err);
+                    std::thread::sleep(std::time::Duration::from_secs(2));
+                }
+            }
+        }
+    }
+
+    pub fn join(self) -> thread::Result<()> {
+        self.thread.join()
+    }
+}
diff --git a/core/src/lib.rs b/core/src/lib.rs
index a7dde58607c..f02ed951db2 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -8,6 +8,7 @@
 pub mod accounts_background_service;
 pub mod accounts_hash_verifier;
 pub mod banking_stage;
+pub mod bigtable_upload_service;
 pub mod broadcast_stage;
 pub mod cluster_info_vote_listener;
 pub mod commitment;
diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs
index c3fa2707932..351c399f694 100644
--- a/core/src/rpc_service.rs
+++ b/core/src/rpc_service.rs
@@ -1,9 +1,10 @@
 //! The `rpc_service` module implements the Solana JSON RPC service.
 
 use crate::{
-    cluster_info::ClusterInfo, commitment::BlockCommitmentCache, poh_recorder::PohRecorder, rpc::*,
-    rpc_health::*, send_transaction_service::LeaderInfo,
-    send_transaction_service::SendTransactionService, validator::ValidatorExit,
+    bigtable_upload_service::BigTableUploadService, cluster_info::ClusterInfo,
+    commitment::BlockCommitmentCache, poh_recorder::PohRecorder, rpc::*, rpc_health::*,
+    send_transaction_service::LeaderInfo, send_transaction_service::SendTransactionService,
+    validator::ValidatorExit,
 };
 use jsonrpc_core::MetaIoHandler;
 use jsonrpc_http_server::{
@@ -272,22 +273,36 @@ impl JsonRpcService {
             .build()
             .expect("Runtime");
 
-        let bigtable_ledger_storage =
+        let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false));
+
+        let (bigtable_ledger_storage, _bigtable_ledger_upload_service) =
             if config.enable_bigtable_ledger_storage || config.enable_bigtable_ledger_upload {
                 runtime
                     .block_on(solana_storage_bigtable::LedgerStorage::new(
                         config.enable_bigtable_ledger_upload,
                     ))
-                    .map(|x| {
+                    .map(|bigtable_ledger_storage| {
                         info!("BigTable ledger storage initialized");
-                        Some(x)
+
+                        let bigtable_ledger_upload_service = Arc::new(BigTableUploadService::new(
+                            runtime.handle().clone(),
+                            bigtable_ledger_storage.clone(),
+                            blockstore.clone(),
+                            block_commitment_cache.clone(),
+                            exit_bigtable_ledger_upload_service.clone(),
+                        ));
+
+                        (
+                            Some(bigtable_ledger_storage),
+                            Some(bigtable_ledger_upload_service),
+                        )
                     })
                     .unwrap_or_else(|err| {
                         error!("Failed to initialize BigTable ledger storage: {:?}", err);
-                        None
+                        (None, None)
                     })
             } else {
-                None
+                (None, None)
             };
 
         let request_processor = JsonRpcRequestProcessor::new(
@@ -349,6 +364,7 @@ impl JsonRpcService {
                 close_handle_sender.send(server.close_handle()).unwrap();
                 server.wait();
                 exit_send_transaction_service.store(true, Ordering::Relaxed);
+                exit_bigtable_ledger_upload_service.store(true, Ordering::Relaxed);
             })
             .unwrap();
 
diff --git a/ledger/Cargo.toml b/ledger/Cargo.toml
index 8ff86465d95..f8b687c473f 100644
--- a/ledger/Cargo.toml
+++ b/ledger/Cargo.toml
@@ -53,6 +53,7 @@ tar = "0.4.28"
 thiserror = "1.0"
 tempfile = "3.1.0"
 lazy_static = "1.4.0"
+tokio = { version = "0.2.22", features = ["full"] }
 trees = "0.2.1"
 
 [dependencies.rocksdb]
diff --git a/ledger/src/bigtable_upload.rs b/ledger/src/bigtable_upload.rs
index fe2356ca490..fe347a1e98f 100644
--- a/ledger/src/bigtable_upload.rs
+++ b/ledger/src/bigtable_upload.rs
@@ -20,7 +20,7 @@ pub async fn upload_confirmed_blocks(
 ) -> Result<(), Box<dyn std::error::Error>> {
     let mut measure = Measure::start("entire upload");
 
-    info!("Loading ledger slots...");
+    info!("Loading ledger slots starting at {}...", starting_slot);
     let blockstore_slots: Vec<_> = blockstore
         .slot_meta_iterator(starting_slot)
         .map_err(|err| {
@@ -40,8 +40,11 @@ pub async fn upload_confirmed_blocks(
         .collect();
 
     if blockstore_slots.is_empty() {
-        info!("Ledger has no slots in the specified range");
-        return Ok(());
+        return Err(format!(
+            "Ledger has no slots from {} to {:?}",
+            starting_slot, ending_slot
+        )
+        .into());
     }
 
     info!(
@@ -136,7 +139,7 @@ pub async fn upload_confirmed_blocks(
                         }
                     };
 
-                    if i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 {
+                    if i > 0 && i % NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL == 0 {
                         info!(
                             "{}% of blocks processed ({}/{})",
                             i * 100 / blocks_to_upload.len(),

From 652f6f4b2443ce2b5f4c04858734722eb98885f2 Mon Sep 17 00:00:00 2001
From: Michael Vines
Date: Fri, 4 Sep 2020 10:01:00 -0700
Subject: [PATCH 4/9] Add exit flag for bigtable upload operations

---
 core/src/bigtable_upload_service.rs |  1 +
 ledger-tool/src/bigtable.rs         |  8 +++++++-
 ledger/src/bigtable_upload.rs       | 21 ++++++++++++++++++++-
 3 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/core/src/bigtable_upload_service.rs b/core/src/bigtable_upload_service.rs
index 1d423b14869..b8a445504b5 100644
--- a/core/src/bigtable_upload_service.rs
+++ b/core/src/bigtable_upload_service.rs
@@ -65,6 +65,7 @@ impl BigTableUploadService {
                 starting_slot,
                 Some(max_confirmed_root),
                 true,
+                exit.clone(),
             ));
 
             match result {
diff --git a/ledger-tool/src/bigtable.rs b/ledger-tool/src/bigtable.rs
index 725789922eb..eff4e2b65b8 100644
--- a/ledger-tool/src/bigtable.rs
+++ b/ledger-tool/src/bigtable.rs
@@ -8,7 +8,12 @@ use solana_cli::display::println_transaction;
 use solana_ledger::{blockstore::Blockstore, blockstore_db::AccessType};
 use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
 use solana_transaction_status::UiTransactionEncoding;
-use std::{path::Path, process::exit, result::Result, sync::Arc};
+use std::{
+    path::Path,
+    process::exit,
+    result::Result,
+    sync::{atomic::AtomicBool, Arc},
+};
 
 async fn upload(
     blockstore: Blockstore,
@@ -26,6 +31,7 @@ async fn upload(
         starting_slot,
         ending_slot,
         allow_missing_metadata,
+        Arc::new(AtomicBool::new(false)),
     )
     .await
 }
diff --git a/ledger/src/bigtable_upload.rs b/ledger/src/bigtable_upload.rs
index fe347a1e98f..4d42664fc72 100644
--- a/ledger/src/bigtable_upload.rs
+++ b/ledger/src/bigtable_upload.rs
@@ -2,7 +2,15 @@ use crate::blockstore::Blockstore;
 use log::*;
 use solana_measure::measure::Measure;
 use solana_sdk::clock::Slot;
-use std::{collections::HashSet, result::Result, sync::Arc, time::Duration};
+use std::{
+    collections::HashSet,
+    result::Result,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc,
+    },
+    time::Duration,
+};
 use tokio::time::delay_for;
 
 // Attempt to upload this many blocks in parallel
@@ -17,6 +25,7 @@ pub async fn upload_confirmed_blocks(
     starting_slot: Slot,
     ending_slot: Option<Slot>,
     allow_missing_metadata: bool,
+    exit: Arc<AtomicBool>,
 ) -> Result<(), Box<dyn std::error::Error>> {
     let mut measure = Measure::start("entire upload");
 
@@ -120,11 +129,17 @@ pub async fn upload_confirmed_blocks(
 
     // Load the blocks out of blockstore in a separate thread to allow for concurrent block uploading
     let (_loader_thread, receiver) = {
+        let exit = exit.clone();
+
         let (sender, receiver) = std::sync::mpsc::sync_channel(BLOCK_READ_AHEAD_DEPTH);
         (
             std::thread::spawn(move || {
                 let mut measure = Measure::start("block loader thread");
                 for (i, slot) in blocks_to_upload.iter().enumerate() {
+                    if exit.load(Ordering::Relaxed) {
+                        break;
+                    }
+
                     let _ = match blockstore.get_confirmed_block(
                         *slot,
                         Some(solana_transaction_status::UiTransactionEncoding::Base64),
@@ -162,6 +177,10 @@ pub async fn upload_confirmed_blocks(
         tokio::stream::iter(receiver.into_iter()).chunks(NUM_BLOCKS_TO_UPLOAD_IN_PARALLEL);
 
     while let Some(blocks) = stream.next().await {
+        if exit.load(Ordering::Relaxed) {
+            break;
+        }
+
         let mut measure_upload = Measure::start("Upload");
         let mut num_blocks = blocks.len();
         info!("Preparing the next {} blocks for upload", num_blocks);

From d08fb6e97c314061f81b47f7206414e7544f9a6d Mon Sep 17 00:00:00 2001
From: Michael Vines
Date: Fri, 4 Sep 2020 11:19:49 -0700
Subject: [PATCH 5/9] Cargo.lock

---
 Cargo.lock | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/Cargo.lock b/Cargo.lock
index abd2ebde01b..9bcae7b201b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3871,6 +3871,8 @@ dependencies = [
  "ed25519-dalek",
  "flate2",
  "fs_extra",
+ "futures 0.3.5",
+ "futures-util",
  "itertools 0.9.0",
"lazy_static", "libc", @@ -3897,12 +3899,14 @@ dependencies = [ "solana-runtime", "solana-sdk 1.2.27", "solana-stake-program", + "solana-storage-bigtable", "solana-transaction-status", "solana-vote-program", "symlink", "tar", "tempfile", "thiserror", + "tokio 0.2.22", "trees", "zstd", ] From 2f116e62fd8f86061c53592c481fabf03883f393 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Fri, 4 Sep 2020 18:49:48 +0000 Subject: [PATCH 6/9] Adapt to v1.2 --- core/src/bigtable_upload_service.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/bigtable_upload_service.rs b/core/src/bigtable_upload_service.rs index b8a445504b5..74709ab5088 100644 --- a/core/src/bigtable_upload_service.rs +++ b/core/src/bigtable_upload_service.rs @@ -1,5 +1,5 @@ +use crate::commitment::BlockCommitmentCache; use solana_ledger::blockstore::Blockstore; -use solana_runtime::commitment::BlockCommitmentCache; use std::{ sync::atomic::{AtomicBool, Ordering}, sync::{Arc, RwLock}, @@ -49,12 +49,12 @@ impl BigTableUploadService { break; } - let max_confirmed_root = block_commitment_cache + let largest_confirmed_root = block_commitment_cache .read() .unwrap() - .highest_confirmed_root(); + .largest_confirmed_root(); - if max_confirmed_root == starting_slot { + if largest_confirmed_root == starting_slot { std::thread::sleep(std::time::Duration::from_secs(1)); continue; } @@ -63,13 +63,13 @@ impl BigTableUploadService { blockstore.clone(), bigtable_ledger_storage.clone(), starting_slot, - Some(max_confirmed_root), + Some(largest_confirmed_root), true, exit.clone(), )); match result { - Ok(()) => starting_slot = max_confirmed_root, + Ok(()) => starting_slot = largest_confirmed_root, Err(err) => { warn!("bigtable: upload_confirmed_blocks: {}", err); std::thread::sleep(std::time::Duration::from_secs(2)); From 4d4649a06ffdd7d81d88e2b875a118ffe8695f9f Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Fri, 4 Sep 2020 13:09:33 -0700 Subject: [PATCH 7/9] Remove dead code --- ledger/src/bigtable_upload.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/ledger/src/bigtable_upload.rs b/ledger/src/bigtable_upload.rs index 4d42664fc72..9d4120ef59b 100644 --- a/ledger/src/bigtable_upload.rs +++ b/ledger/src/bigtable_upload.rs @@ -63,8 +63,6 @@ pub async fn upload_confirmed_blocks( blockstore_slots.last().unwrap() ); - let mut blockstore_slots_with_no_confirmed_block = HashSet::new(); - // Gather the blocks that are already present in bigtable, by slot let bigtable_slots = { let mut bigtable_slots = vec![]; @@ -106,9 +104,6 @@ pub async fn upload_confirmed_blocks( let bigtable_slots = bigtable_slots.into_iter().collect::>(); let mut blocks_to_upload = blockstore_slots - .difference(&blockstore_slots_with_no_confirmed_block) - .cloned() - .collect::>() .difference(&bigtable_slots) .cloned() .collect::>(); @@ -187,7 +182,6 @@ pub async fn upload_confirmed_blocks( let uploads = blocks.into_iter().filter_map(|(slot, block)| match block { None => { - blockstore_slots_with_no_confirmed_block.insert(slot); num_blocks -= 1; None } From 7bb8996e2d9eb58128d1f64d966ea8c924765d51 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Fri, 4 Sep 2020 21:43:11 +0000 Subject: [PATCH 8/9] Request correct access --- core/src/rpc_service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 351c399f694..b189712a284 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -279,7 +279,7 @@ impl JsonRpcService { if 
                 runtime
                     .block_on(solana_storage_bigtable::LedgerStorage::new(
-                        config.enable_bigtable_ledger_upload,
+                        !config.enable_bigtable_ledger_upload,
                     ))
                     .map(|bigtable_ledger_storage| {
                         info!("BigTable ledger storage initialized");

From 73f98b6f706e309650acf8f87f047eba13b7b891 Mon Sep 17 00:00:00 2001
From: Michael Vines
Date: Fri, 4 Sep 2020 14:54:38 -0700
Subject: [PATCH 9/9] Add LARGEST_CONFIRMED_ROOT_UPLOAD_DELAY

---
 core/src/bigtable_upload_service.rs | 23 ++++++++++++++++-------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/core/src/bigtable_upload_service.rs b/core/src/bigtable_upload_service.rs
index 74709ab5088..9184226a358 100644
--- a/core/src/bigtable_upload_service.rs
+++ b/core/src/bigtable_upload_service.rs
@@ -7,6 +7,14 @@ use std::{
 };
 use tokio::runtime;
 
+// Delay uploading the largest confirmed root for this many slots. This is done in an attempt to
+// ensure that the `CacheBlockTimeService` has had enough time to add the block time for the root
+// before it's uploaded to BigTable.
+//
+// A more direct connection between CacheBlockTimeService and BigTableUploadService would be
+// preferable...
+const LARGEST_CONFIRMED_ROOT_UPLOAD_DELAY: usize = 100;
+
 pub struct BigTableUploadService {
     thread: JoinHandle<()>,
 }
@@ -43,18 +51,19 @@ impl BigTableUploadService {
         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
         exit: Arc<AtomicBool>,
     ) {
-        let mut starting_slot = 0;
+        let mut start_slot = 0;
         loop {
             if exit.load(Ordering::Relaxed) {
                 break;
             }
 
-            let largest_confirmed_root = block_commitment_cache
+            let end_slot = block_commitment_cache
                 .read()
                 .unwrap()
-                .largest_confirmed_root();
+                .largest_confirmed_root()
+                .saturating_sub(LARGEST_CONFIRMED_ROOT_UPLOAD_DELAY as u64);
 
-            if largest_confirmed_root == starting_slot {
+            if end_slot <= start_slot {
                 std::thread::sleep(std::time::Duration::from_secs(1));
                 continue;
             }
@@ -62,14 +71,14 @@ impl BigTableUploadService {
             let result = runtime.block_on(solana_ledger::bigtable_upload::upload_confirmed_blocks(
                 blockstore.clone(),
                 bigtable_ledger_storage.clone(),
-                starting_slot,
-                Some(largest_confirmed_root),
+                start_slot,
+                Some(end_slot),
                 true,
                 exit.clone(),
             ));
 
             match result {
-                Ok(()) => starting_slot = largest_confirmed_root,
+                Ok(()) => start_slot = end_slot,
                 Err(err) => {
                     warn!("bigtable: upload_confirmed_blocks: {}", err);
                     std::thread::sleep(std::time::Duration::from_secs(2));