Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

93 changes: 93 additions & 0 deletions core/src/bigtable_upload_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use solana_ledger::blockstore::Blockstore;
use solana_runtime::commitment::BlockCommitmentCache;
use std::{
sync::atomic::{AtomicBool, Ordering},
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
};
use tokio::runtime;

// Delay uploading the largest confirmed root for this many slots. This is done in an attempt to
// ensure that the `CacheBlockTimeService` has had enough time to add the block time for the root
// before it's uploaded to BigTable.
//
// A more direct connection between CacheBlockTimeService and BigTableUploadService would be
// preferable...
const LARGEST_CONFIRMED_ROOT_UPLOAD_DELAY: usize = 100;

pub struct BigTableUploadService {
thread: JoinHandle<()>,
}

impl BigTableUploadService {
pub fn new(
runtime_handle: runtime::Handle,
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
exit: Arc<AtomicBool>,
) -> Self {
info!("Starting BigTable upload service");
let thread = Builder::new()
.name("bigtable-upload".to_string())
.spawn(move || {
Self::run(
runtime_handle,
bigtable_ledger_storage,
blockstore,
block_commitment_cache,
exit,
)
})
.unwrap();

Self { thread }
}

fn run(
runtime: runtime::Handle,
bigtable_ledger_storage: solana_storage_bigtable::LedgerStorage,
blockstore: Arc<Blockstore>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
exit: Arc<AtomicBool>,
) {
let mut start_slot = 0;
loop {
if exit.load(Ordering::Relaxed) {
break;
}

let end_slot = block_commitment_cache
.read()
.unwrap()
.highest_confirmed_root()
.saturating_sub(LARGEST_CONFIRMED_ROOT_UPLOAD_DELAY as u64);

if end_slot <= start_slot {
std::thread::sleep(std::time::Duration::from_secs(1));
continue;
}

let result = runtime.block_on(solana_ledger::bigtable_upload::upload_confirmed_blocks(
blockstore.clone(),
bigtable_ledger_storage.clone(),
start_slot,
Some(end_slot),
true,
exit.clone(),
));

match result {
Ok(()) => start_slot = end_slot,
Err(err) => {
warn!("bigtable: upload_confirmed_blocks: {}", err);
std::thread::sleep(std::time::Duration::from_secs(2));
}
}
}
}

pub fn join(self) -> thread::Result<()> {
self.thread.join()
}
}
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
pub mod accounts_background_service;
pub mod accounts_hash_verifier;
pub mod banking_stage;
pub mod bigtable_upload_service;
pub mod broadcast_stage;
pub mod cluster_info_vote_listener;
pub mod commitment_service;
Expand Down
1 change: 1 addition & 0 deletions core/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub struct JsonRpcConfig {
pub faucet_addr: Option<SocketAddr>,
pub health_check_slot_distance: u64,
pub enable_bigtable_ledger_storage: bool,
pub enable_bigtable_ledger_upload: bool,
}

#[derive(Clone)]
Expand Down
51 changes: 36 additions & 15 deletions core/src/rpc_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
//! The `rpc_service` module implements the Solana JSON RPC service.

use crate::{cluster_info::ClusterInfo, rpc::*, rpc_health::*, validator::ValidatorExit};
use crate::{
bigtable_upload_service::BigTableUploadService, cluster_info::ClusterInfo, rpc::*,
rpc_health::*, validator::ValidatorExit,
};
use jsonrpc_core::MetaIoHandler;
use jsonrpc_http_server::{
hyper, AccessControlAllowOrigin, CloseHandle, DomainsValidation, RequestMiddleware,
Expand Down Expand Up @@ -260,20 +263,37 @@ impl JsonRpcService {
.build()
.expect("Runtime");

let bigtable_ledger_storage = if config.enable_bigtable_ledger_storage {
runtime
.block_on(solana_storage_bigtable::LedgerStorage::new(false))
.map(|x| {
info!("BigTable ledger storage initialized");
Some(x)
})
.unwrap_or_else(|err| {
error!("Failed to initialize BigTable ledger storage: {:?}", err);
None
})
} else {
None
};
let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false));

let (bigtable_ledger_storage, _bigtable_ledger_upload_service) =
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's slightly awkward that BigTableUploadService is plumbed into RpcService like this. Moving it up a level into Validator would require moving the tokio Runtime and solana_storage_bigtable::LedgerStorage::new as well and I didn't like that code churn in this PR. That's my excuse at least

if config.enable_bigtable_ledger_storage || config.enable_bigtable_ledger_upload {
runtime
.block_on(solana_storage_bigtable::LedgerStorage::new(
!config.enable_bigtable_ledger_upload,
))
.map(|bigtable_ledger_storage| {
info!("BigTable ledger storage initialized");

let bigtable_ledger_upload_service = Arc::new(BigTableUploadService::new(
runtime.handle().clone(),
bigtable_ledger_storage.clone(),
blockstore.clone(),
block_commitment_cache.clone(),
exit_bigtable_ledger_upload_service.clone(),
));

(
Some(bigtable_ledger_storage),
Some(bigtable_ledger_upload_service),
)
})
.unwrap_or_else(|err| {
error!("Failed to initialize BigTable ledger storage: {:?}", err);
(None, None)
})
} else {
(None, None)
};

let (request_processor, receiver) = JsonRpcRequestProcessor::new(
config,
Expand Down Expand Up @@ -341,6 +361,7 @@ impl JsonRpcService {
close_handle_sender.send(server.close_handle()).unwrap();
server.wait();
exit_send_transaction_service.store(true, Ordering::Relaxed);
exit_bigtable_ledger_upload_service.store(true, Ordering::Relaxed);
})
.unwrap();

Expand Down
Loading