Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 4 additions & 41 deletions src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::counter::Counter;
use crate::genesis_block::GenesisBlock;
use crate::last_id_queue::{LastIdQueue, MAX_ENTRY_IDS};
use crate::poh_service::NUM_TICKS_PER_SECOND;
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::status_cache::StatusCache;
use bincode::{deserialize, serialize};
use log::Level;
Expand Down Expand Up @@ -89,8 +88,6 @@ pub struct Bank {
/// FIFO queue of `last_id` items
last_id_queue: RwLock<LastIdQueue>,

subscriptions: RwLock<Option<Arc<RpcSubscriptions>>>,

parent: Option<Arc<Bank>>,

parent_hash: Hash,
Expand All @@ -102,7 +99,6 @@ impl Default for Bank {
accounts: Accounts::default(),
last_id_queue: RwLock::new(LastIdQueue::default()),
status_cache: RwLock::new(BankStatusCache::default()),
subscriptions: RwLock::new(None),
parent: None,
parent_hash: Hash::default(),
}
Expand Down Expand Up @@ -131,11 +127,6 @@ impl Bank {
self.parent.clone()
}

pub fn set_subscriptions(&self, subscriptions: Arc<RpcSubscriptions>) {
let mut sub = self.subscriptions.write().unwrap();
*sub = Some(subscriptions)
}

fn process_genesis_block(&self, genesis_block: &GenesisBlock) {
assert!(genesis_block.mint_id != Pubkey::default());
assert!(genesis_block.bootstrap_leader_id != Pubkey::default());
Expand Down Expand Up @@ -258,13 +249,6 @@ impl Bank {
self.status_cache.write().unwrap().clear();
}

fn update_subscriptions(&self, txs: &[Transaction], res: &[Result<()>]) {
for (i, tx) in txs.iter().enumerate() {
if let Some(ref subs) = *self.subscriptions.read().unwrap() {
subs.check_signature(&tx.signatures[0], &res[i]);
}
}
}
fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) {
let mut status_cache = self.status_cache.write().unwrap();
for (i, tx) in txs.iter().enumerate() {
Expand Down Expand Up @@ -491,9 +475,6 @@ impl Bank {
self.accounts
.store_accounts(true, txs, executed, loaded_accounts);

// Check account subscriptions and send notifications
self.send_account_notifications(txs, executed, loaded_accounts);

// once committed there is no way to unroll
let write_elapsed = now.elapsed();
debug!(
Expand All @@ -502,7 +483,6 @@ impl Bank {
txs.len(),
);
self.update_transaction_statuses(txs, &executed);
self.update_subscriptions(txs, &executed);
}

/// Process a batch of transactions.
Expand Down Expand Up @@ -577,6 +557,10 @@ impl Bank {
Accounts::load_slow(&accounts, pubkey)
}

pub fn get_account_modified_since_parent(&self, pubkey: &Pubkey) -> Option<Account> {
Accounts::load_slow(&[&self.accounts], pubkey)
}

pub fn transaction_count(&self) -> u64 {
self.accounts.transaction_count()
}
Expand Down Expand Up @@ -609,27 +593,6 @@ impl Bank {
extend_and_hash(&self.parent_hash, &serialize(&accounts_delta_hash).unwrap())
}

fn send_account_notifications(
&self,
txs: &[Transaction],
res: &[Result<()>],
loaded: &[Result<(InstructionAccounts, InstructionLoaders)>],
) {
for (i, raccs) in loaded.iter().enumerate() {
if res[i].is_err() || raccs.is_err() {
continue;
}

let tx = &txs[i];
let accs = raccs.as_ref().unwrap();
for (key, account) in tx.account_keys.iter().zip(accs.0.iter()) {
if let Some(ref subs) = *self.subscriptions.read().unwrap() {
subs.check_account(&key, account)
}
}
}
}

pub fn vote_states<F>(&self, cond: F) -> Vec<VoteState>
where
F: Fn(&VoteState) -> bool,
Expand Down
5 changes: 4 additions & 1 deletion src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::leader_scheduler::{LeaderScheduler, LeaderSchedulerConfig};
use crate::poh_service::PohServiceConfig;
use crate::rpc_pubsub_service::PubSubService;
use crate::rpc_service::JsonRpcService;
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use crate::storage_stage::StorageState;
use crate::tpu::{Tpu, TpuRotationReceiver};
Expand Down Expand Up @@ -168,8 +169,9 @@ impl Fullnode {
storage_state.clone(),
);

let subscriptions = Arc::new(RpcSubscriptions::default());
let rpc_pubsub_service = PubSubService::new(
&bank,
&subscriptions,
SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
node.info.rpc_pubsub.port(),
Expand Down Expand Up @@ -264,6 +266,7 @@ impl Fullnode {
config.entry_stream.as_ref(),
ledger_signal_receiver,
leader_scheduler.clone(),
&subscriptions,
);
let tpu = Tpu::new(id, &cluster_info);

Expand Down
11 changes: 11 additions & 0 deletions src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice};
use crate::leader_scheduler::LeaderScheduler;
use crate::packet::BlobError;
use crate::result::{Error, Result};
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use crate::tvu::TvuRotationSender;
use crate::voting_keypair::VotingKeypair;
Expand Down Expand Up @@ -63,6 +64,7 @@ impl ReplayStage {
current_blob_index: &mut u64,
last_entry_id: &Arc<RwLock<Hash>>,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
subscriptions: &Arc<RpcSubscriptions>,
) -> Result<()> {
// Coalesce all the available entries into a single vote
submit(
Expand Down Expand Up @@ -120,6 +122,7 @@ impl ReplayStage {
}

if 0 == num_ticks_to_next_vote {
subscriptions.notify_subscribers(&bank);
if let Some(voting_keypair) = voting_keypair {
let keypair = voting_keypair.as_ref();
let vote = VoteTransaction::new_vote(
Expand Down Expand Up @@ -178,11 +181,13 @@ impl ReplayStage {
to_leader_sender: &TvuRotationSender,
ledger_signal_receiver: Receiver<bool>,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
subscriptions: &Arc<RpcSubscriptions>,
) -> (Self, EntryReceiver) {
let (ledger_entry_sender, ledger_entry_receiver) = channel();
let exit_ = exit.clone();
let leader_scheduler_ = leader_scheduler.clone();
let to_leader_sender = to_leader_sender.clone();
let subscriptions_ = subscriptions.clone();
let t_replay = Builder::new()
.name("solana-replay-stage".to_string())
.spawn(move || {
Expand Down Expand Up @@ -262,6 +267,7 @@ impl ReplayStage {
&mut current_blob_index,
&last_entry_id,
&leader_scheduler_,
&subscriptions_,
) {
error!("process_entries failed: {:?}", e);
}
Expand Down Expand Up @@ -445,6 +451,7 @@ mod test {
&rotation_sender,
l_receiver,
&leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
);

let total_entries_to_send = 2 * ticks_per_slot as usize - 2;
Expand Down Expand Up @@ -548,6 +555,7 @@ mod test {
&to_leader_sender,
l_receiver,
&leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
);

let keypair = voting_keypair.as_ref();
Expand Down Expand Up @@ -673,6 +681,7 @@ mod test {
&rotation_tx,
l_receiver,
&leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
);

let keypair = voting_keypair.as_ref();
Expand Down Expand Up @@ -757,6 +766,7 @@ mod test {
&mut current_blob_index,
&Arc::new(RwLock::new(last_entry_id)),
&leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
);

match res {
Expand All @@ -782,6 +792,7 @@ mod test {
&mut current_blob_index,
&Arc::new(RwLock::new(last_entry_id)),
&leader_scheduler,
&Arc::new(RpcSubscriptions::default()),
);

match res {
Expand Down
Loading