Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions core/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::cluster_info::ClusterInfo;
use crate::contact_info::ContactInfo;
use crate::packet::PACKET_DATA_SIZE;
use crate::storage_stage::StorageState;
use crate::validator::ValidatorExit;
use crate::version::VERSION;
use bincode::{deserialize, serialize};
use jsonrpc_core::{Error, Metadata, Result};
Expand All @@ -18,7 +19,6 @@ use solana_sdk::signature::Signature;
use solana_sdk::transaction::{self, Transaction};
use solana_vote_api::vote_state::{VoteState, MAX_LOCKOUT_HISTORY};
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::{Duration, Instant};
Expand All @@ -43,7 +43,7 @@ pub struct JsonRpcRequestProcessor {
bank_forks: Arc<RwLock<BankForks>>,
storage_state: StorageState,
config: JsonRpcConfig,
fullnode_exit: Arc<AtomicBool>,
validator_exit: Arc<RwLock<Option<ValidatorExit>>>,
}

impl JsonRpcRequestProcessor {
Expand All @@ -55,13 +55,13 @@ impl JsonRpcRequestProcessor {
storage_state: StorageState,
config: JsonRpcConfig,
bank_forks: Arc<RwLock<BankForks>>,
fullnode_exit: &Arc<AtomicBool>,
validator_exit: &Arc<RwLock<Option<ValidatorExit>>>,
) -> Self {
JsonRpcRequestProcessor {
bank_forks,
storage_state,
config,
fullnode_exit: fullnode_exit.clone(),
validator_exit: validator_exit.clone(),
}
}

Expand Down Expand Up @@ -185,7 +185,9 @@ impl JsonRpcRequestProcessor {
pub fn fullnode_exit(&self) -> Result<bool> {
if self.config.enable_fullnode_exit {
warn!("fullnode_exit request...");
self.fullnode_exit.store(true, Ordering::Relaxed);
if let Some(x) = self.validator_exit.write().unwrap().take() {
x.exit()
}
Ok(true)
} else {
debug!("fullnode_exit ignored");
Expand Down Expand Up @@ -660,7 +662,7 @@ impl RpcSol for RpcSolImpl {
}

#[cfg(test)]
mod tests {
pub mod tests {
use super::*;
use crate::contact_info::ContactInfo;
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
Expand All @@ -671,6 +673,7 @@ mod tests {
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction;
use solana_sdk::transaction::TransactionError;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

const TEST_MINT_LAMPORTS: u64 = 10_000;
Expand All @@ -682,6 +685,7 @@ mod tests {
let bank = bank_forks.read().unwrap().working_bank();
let leader_pubkey = *bank.collector_id();
let exit = Arc::new(AtomicBool::new(false));
let validator_exit = create_validator_exit(&exit);

let blockhash = bank.confirmed_last_blockhash().0;
let tx = system_transaction::transfer(&alice, pubkey, 20, blockhash);
Expand All @@ -694,7 +698,7 @@ mod tests {
StorageState::default(),
JsonRpcConfig::default(),
bank_forks,
&exit,
&validator_exit,
)));
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
ContactInfo::default(),
Expand Down Expand Up @@ -722,13 +726,14 @@ mod tests {
fn test_rpc_request_processor_new() {
let bob_pubkey = Pubkey::new_rand();
let exit = Arc::new(AtomicBool::new(false));
let validator_exit = create_validator_exit(&exit);
let (bank_forks, alice) = new_bank_forks();
let bank = bank_forks.read().unwrap().working_bank();
let request_processor = JsonRpcRequestProcessor::new(
StorageState::default(),
JsonRpcConfig::default(),
bank_forks,
&exit,
&validator_exit,
);
thread::spawn(move || {
let blockhash = bank.confirmed_last_blockhash().0;
Expand Down Expand Up @@ -1037,6 +1042,7 @@ mod tests {
#[test]
fn test_rpc_send_bad_tx() {
let exit = Arc::new(AtomicBool::new(false));
let validator_exit = create_validator_exit(&exit);

let mut io = MetaIoHandler::default();
let rpc = RpcSolImpl;
Expand All @@ -1047,7 +1053,7 @@ mod tests {
StorageState::default(),
JsonRpcConfig::default(),
new_bank_forks().0,
&exit,
&validator_exit,
);
Arc::new(RwLock::new(request_processor))
},
Expand Down Expand Up @@ -1117,14 +1123,22 @@ mod tests {
)
}

pub fn create_validator_exit(exit: &Arc<AtomicBool>) -> Arc<RwLock<Option<ValidatorExit>>> {
let mut validator_exit = ValidatorExit::default();
let exit_ = exit.clone();
validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed)));
Arc::new(RwLock::new(Some(validator_exit)))
}

#[test]
fn test_rpc_request_processor_config_default_trait_fullnode_exit_fails() {
let exit = Arc::new(AtomicBool::new(false));
let validator_exit = create_validator_exit(&exit);
let request_processor = JsonRpcRequestProcessor::new(
StorageState::default(),
JsonRpcConfig::default(),
new_bank_forks().0,
&exit,
&validator_exit,
);
assert_eq!(request_processor.fullnode_exit(), Ok(false));
assert_eq!(exit.load(Ordering::Relaxed), false);
Expand All @@ -1133,13 +1147,14 @@ mod tests {
#[test]
fn test_rpc_request_processor_allow_fullnode_exit_config() {
let exit = Arc::new(AtomicBool::new(false));
let validator_exit = create_validator_exit(&exit);
let mut config = JsonRpcConfig::default();
config.enable_fullnode_exit = true;
let request_processor = JsonRpcRequestProcessor::new(
StorageState::default(),
config,
new_bank_forks().0,
&exit,
&validator_exit,
);
assert_eq!(request_processor.fullnode_exit(), Ok(true));
assert_eq!(exit.load(Ordering::Relaxed), true);
Expand Down
46 changes: 33 additions & 13 deletions core/src/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,27 @@ use crate::cluster_info::ClusterInfo;
use crate::rpc::*;
use crate::service::Service;
use crate::storage_stage::StorageState;
use crate::validator::ValidatorExit;
use jsonrpc_core::MetaIoHandler;
use jsonrpc_http_server::CloseHandle;
use jsonrpc_http_server::{
hyper, AccessControlAllowOrigin, DomainsValidation, RequestMiddleware, RequestMiddlewareAction,
ServerBuilder,
};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Duration;
use std::thread::{self, Builder, JoinHandle};
use tokio::prelude::Future;

pub struct JsonRpcService {
thread_hdl: JoinHandle<()>,

#[cfg(test)]
pub request_processor: Arc<RwLock<JsonRpcRequestProcessor>>, // Used only by test_rpc_new()...

close_handle: Option<CloseHandle>,
}

#[derive(Default)]
Expand Down Expand Up @@ -88,22 +91,22 @@ impl JsonRpcService {
config: JsonRpcConfig,
bank_forks: Arc<RwLock<BankForks>>,
ledger_path: &Path,
exit: &Arc<AtomicBool>,
validator_exit: &Arc<RwLock<Option<ValidatorExit>>>,
) -> Self {
info!("rpc bound to {:?}", rpc_addr);
info!("rpc configuration: {:?}", config);
let request_processor = Arc::new(RwLock::new(JsonRpcRequestProcessor::new(
storage_state,
config,
bank_forks,
exit,
validator_exit,
)));
let request_processor_ = request_processor.clone();

let cluster_info = cluster_info.clone();
let exit_ = exit.clone();
let ledger_path = ledger_path.to_path_buf();

let (close_handle_sender, close_handle_receiver) = channel();
let thread_hdl = Builder::new()
.name("solana-jsonrpc".to_string())
.spawn(move || {
Expand All @@ -126,16 +129,30 @@ impl JsonRpcService {
return;
}

while !exit_.load(Ordering::Relaxed) {
sleep(Duration::from_millis(100));
}
server.unwrap().close();
let server = server.unwrap();
close_handle_sender.send(server.close_handle()).unwrap();
server.wait();
})
.unwrap();

let close_handle = close_handle_receiver.recv().unwrap();
let close_handle_ = close_handle.clone();
let mut validator_exit_write = validator_exit.write().unwrap();
validator_exit_write
.as_mut()
.unwrap()
.register_exit(Box::new(move || close_handle_.close()));
Self {
thread_hdl,
#[cfg(test)]
request_processor,
close_handle: Some(close_handle),
}
}

pub fn exit(&mut self) {
if let Some(c) = self.close_handle.take() {
c.close()
}
}
}
Expand All @@ -153,9 +170,11 @@ mod tests {
use super::*;
use crate::contact_info::ContactInfo;
use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo};
use crate::rpc::tests::create_validator_exit;
use solana_runtime::bank::Bank;
use solana_sdk::signature::KeypairUtil;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::AtomicBool;

#[test]
fn test_rpc_new() {
Expand All @@ -165,6 +184,7 @@ mod tests {
..
} = create_genesis_block(10_000);
let exit = Arc::new(AtomicBool::new(false));
let validator_exit = create_validator_exit(&exit);
let bank = Bank::new(&genesis_block);
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
ContactInfo::default(),
Expand All @@ -174,14 +194,14 @@ mod tests {
solana_netutil::find_available_port_in_range((10000, 65535)).unwrap(),
);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank.slot(), bank)));
let rpc_service = JsonRpcService::new(
let mut rpc_service = JsonRpcService::new(
&cluster_info,
rpc_addr,
StorageState::default(),
JsonRpcConfig::default(),
bank_forks,
&PathBuf::from("farf"),
&exit,
&validator_exit,
);
let thread = rpc_service.thread_hdl.thread();
assert_eq!(thread.name().unwrap(), "solana-jsonrpc");
Expand All @@ -194,7 +214,7 @@ mod tests {
.unwrap()
.get_balance(&mint_keypair.pubkey())
);
exit.store(true, Ordering::Relaxed);
rpc_service.exit();
rpc_service.join().unwrap();
}
}
40 changes: 32 additions & 8 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,26 @@ impl Default for ValidatorConfig {
}
}

#[derive(Default)]
pub struct ValidatorExit {
exits: Vec<Box<FnOnce() + Send + Sync>>,
}

impl ValidatorExit {
pub fn register_exit(&mut self, exit: Box<FnOnce() -> () + Send + Sync>) {
self.exits.push(exit);
}

pub fn exit(self) {
for exit in self.exits {
exit();
}
}
}

pub struct Validator {
pub id: Pubkey,
exit: Arc<AtomicBool>,
validator_exit: Arc<RwLock<Option<ValidatorExit>>>,
rpc_service: Option<JsonRpcService>,
rpc_pubsub_service: Option<PubSubService>,
gossip_service: GossipService,
Expand Down Expand Up @@ -140,6 +157,11 @@ impl Validator {
let bank = bank_forks[bank_info.bank_slot].clone();
let bank_forks = Arc::new(RwLock::new(bank_forks));

let mut validator_exit = ValidatorExit::default();
let exit_ = exit.clone();
validator_exit.register_exit(Box::new(move || exit_.store(true, Ordering::Relaxed)));
let validator_exit = Arc::new(RwLock::new(Some(validator_exit)));

node.info.wallclock = timestamp();
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new(
node.info.clone(),
Expand All @@ -162,7 +184,7 @@ impl Validator {
config.rpc_config.clone(),
bank_forks.clone(),
ledger_path,
&exit,
&validator_exit,
))
};

Expand Down Expand Up @@ -312,19 +334,21 @@ impl Validator {
rpc_pubsub_service,
tpu,
tvu,
exit,
poh_service,
poh_recorder,
ip_echo_server,
validator_exit,
}
}

// Used for notifying many nodes in parallel to exit
pub fn exit(&self) {
self.exit.store(true, Ordering::Relaxed);
pub fn exit(&mut self) {
if let Some(x) = self.validator_exit.write().unwrap().take() {
x.exit()
}
}

pub fn close(self) -> Result<()> {
pub fn close(mut self) -> Result<()> {
self.exit();
self.join()
}
Expand Down Expand Up @@ -543,7 +567,7 @@ mod tests {
let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());

let mut ledger_paths = vec![];
let validators: Vec<Validator> = (0..2)
let mut validators: Vec<Validator> = (0..2)
.map(|_| {
let validator_keypair = Keypair::new();
let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
Expand All @@ -569,7 +593,7 @@ mod tests {
.collect();

// Each validator can exit in parallel to speed many sequential calls to `join`
validators.iter().for_each(|v| v.exit());
validators.iter_mut().for_each(|v| v.exit());
// While join is called sequentially, the above exit call notified all the
// validators to exit from all their threads
validators.into_iter().for_each(|validator| {
Expand Down
Loading