Skip to content

Commit 6b6a03b

Browse files
authored
Apply cleanups to solana-core for unified scheduler (anza-xyz#4123)
* Setup PohRecorder earlier for unified scheduler * Use BankWithScheduler in banking-bench for US * Move BankingPacket{Batch,Receiver} to new crate * Extract duplicate tpu bank handling code for US * Call wait_for_completed_scheduler in banking simulator
1 parent 45a27ae commit 6b6a03b

21 files changed

+160
-76
lines changed

Cargo.lock

+11
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ members = [
1919
"accounts-db/store-histogram",
2020
"accounts-db/store-tool",
2121
"banking-bench",
22+
"banking-stage-ingress-types",
2223
"banks-client",
2324
"banks-interface",
2425
"banks-server",
@@ -255,6 +256,7 @@ check-cfg = [
255256

256257
[workspace.dependencies]
257258
Inflector = "0.11.4"
259+
agave-banking-stage-ingress-types = { path = "banking-stage-ingress-types", version = "=2.2.0" }
258260
agave-transaction-view = { path = "transaction-view", version = "=2.2.0" }
259261
aquamarine = "0.3.3"
260262
aes-gcm-siv = "0.11.1"

banking-bench/Cargo.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@ license = { workspace = true }
99
edition = { workspace = true }
1010

1111
[dependencies]
12+
agave-banking-stage-ingress-types = { workspace = true }
13+
assert_matches = { workspace = true }
1214
clap = { version = "3.1.8", features = ["derive", "cargo"] }
1315
crossbeam-channel = { workspace = true }
1416
log = { workspace = true }
1517
rand = { workspace = true }
1618
rayon = { workspace = true }
1719
solana-client = { workspace = true }
18-
solana-core = { workspace = true }
20+
solana-core = { workspace = true, features = ["dev-context-only-utils"] }
1921
solana-gossip = { workspace = true }
2022
solana-ledger = { workspace = true }
2123
solana-logger = { workspace = true }

banking-bench/src/main.rs

+18-15
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
#![allow(clippy::arithmetic_side_effects)]
22
use {
3+
agave_banking_stage_ingress_types::BankingPacketBatch,
4+
assert_matches::assert_matches,
35
clap::{crate_description, crate_name, Arg, ArgEnum, Command},
46
crossbeam_channel::{unbounded, Receiver},
57
log::*,
68
rand::{thread_rng, Rng},
79
rayon::prelude::*,
810
solana_client::connection_cache::ConnectionCache,
911
solana_core::{
10-
banking_stage::BankingStage,
11-
banking_trace::{
12-
BankingPacketBatch, BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
13-
},
12+
banking_stage::{update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage},
13+
banking_trace::{BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT},
1414
validator::BlockProductionMethod,
1515
},
1616
solana_gossip::cluster_info::{ClusterInfo, Node},
@@ -349,7 +349,7 @@ fn main() {
349349
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
350350
let bank0 = Bank::new_for_benches(&genesis_config);
351351
let bank_forks = BankForks::new_rw_arc(bank0);
352-
let mut bank = bank_forks.read().unwrap().working_bank();
352+
let mut bank = bank_forks.read().unwrap().working_bank_with_scheduler();
353353

354354
// set cost tracker limits to MAX so it will not filter out TXs
355355
bank.write_cost_tracker()
@@ -552,21 +552,24 @@ fn main() {
552552
poh_time.stop();
553553

554554
let mut new_bank_time = Measure::start("new_bank");
555+
if let Some((result, _timings)) = bank.wait_for_completed_scheduler() {
556+
assert_matches!(result, Ok(_));
557+
}
555558
let new_slot = bank.slot() + 1;
556-
let new_bank = Bank::new_from_parent(bank, &collector, new_slot);
559+
let new_bank = Bank::new_from_parent(bank.clone(), &collector, new_slot);
557560
new_bank_time.stop();
558561

559562
let mut insert_time = Measure::start("insert_time");
560-
bank_forks.write().unwrap().insert(new_bank);
561-
bank = bank_forks.read().unwrap().working_bank();
563+
assert_matches!(poh_recorder.read().unwrap().bank(), None);
564+
update_bank_forks_and_poh_recorder_for_new_tpu_bank(
565+
&bank_forks,
566+
&poh_recorder,
567+
new_bank,
568+
false,
569+
);
570+
bank = bank_forks.read().unwrap().working_bank_with_scheduler();
571+
assert_matches!(poh_recorder.read().unwrap().bank(), Some(_));
562572
insert_time.stop();
563-
564-
assert!(poh_recorder.read().unwrap().bank().is_none());
565-
poh_recorder
566-
.write()
567-
.unwrap()
568-
.set_bank_for_test(bank.clone());
569-
assert!(poh_recorder.read().unwrap().bank().is_some());
570573
debug!(
571574
"new_bank_time: {}us insert_time: {}us poh_time: {}us",
572575
new_bank_time.as_us(),
+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
[package]
2+
name = "agave-banking-stage-ingress-types"
3+
description = "Agave banking stage ingress types"
4+
documentation = "https://docs.rs/agave-banking-stage-ingress-types"
5+
version = { workspace = true }
6+
authors = { workspace = true }
7+
repository = { workspace = true }
8+
homepage = { workspace = true }
9+
license = { workspace = true }
10+
edition = { workspace = true }
11+
12+
[dependencies]
13+
crossbeam-channel = { workspace = true }
14+
solana-perf = { workspace = true }
+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
use {crossbeam_channel::Receiver, solana_perf::packet::PacketBatch, std::sync::Arc};
2+
3+
pub type BankingPacketBatch = Arc<Vec<PacketBatch>>;
4+
pub type BankingPacketReceiver = Receiver<BankingPacketBatch>;

core/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ edition = { workspace = true }
1414
codecov = { repository = "solana-labs/solana", branch = "master", service = "github" }
1515

1616
[dependencies]
17+
agave-banking-stage-ingress-types = { workspace = true }
1718
ahash = { workspace = true }
1819
anyhow = { workspace = true }
1920
arrayvec = { workspace = true }
21+
assert_matches = { workspace = true }
2022
base64 = { workspace = true }
2123
bincode = { workspace = true }
2224
bs58 = { workspace = true }

core/benches/banking_stage.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#![feature(test)]
33

44
use {
5+
agave_banking_stage_ingress_types::BankingPacketBatch,
56
solana_core::{banking_trace::Channels, validator::BlockProductionMethod},
67
solana_vote_program::{vote_state::TowerSync, vote_transaction::new_tower_sync_transaction},
78
};
@@ -24,7 +25,7 @@ use {
2425
unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage},
2526
BankingStage, BankingStageStats,
2627
},
27-
banking_trace::{BankingPacketBatch, BankingTracer},
28+
banking_trace::BankingTracer,
2829
},
2930
solana_entry::entry::{next_hash, Entry},
3031
solana_gossip::cluster_info::{ClusterInfo, Node},

core/benches/banking_trace.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
extern crate test;
44

55
use {
6+
agave_banking_stage_ingress_types::BankingPacketBatch,
67
solana_core::banking_trace::{
78
for_test::{
89
drop_and_clean_temp_dir_unless_suppressed, sample_packet_batch, terminate_tracer,
910
},
10-
receiving_loop_with_minimized_sender_overhead, BankingPacketBatch, BankingTracer, Channels,
11-
TraceError, TracerThreadResult, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
11+
receiving_loop_with_minimized_sender_overhead, BankingTracer, Channels, TraceError,
12+
TracerThreadResult, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
1213
},
1314
std::{
1415
path::PathBuf,

core/src/banking_simulation.rs

+16-9
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
#![cfg(feature = "dev-context-only-utils")]
22
use {
33
crate::{
4-
banking_stage::{BankingStage, LikeClusterInfo},
4+
banking_stage::{
5+
update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage, LikeClusterInfo,
6+
},
57
banking_trace::{
6-
BankingPacketBatch, BankingTracer, ChannelLabel, Channels, TimedTracedEvent,
7-
TracedEvent, TracedSender, TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
8-
BASENAME,
8+
BankingTracer, ChannelLabel, Channels, TimedTracedEvent, TracedEvent, TracedSender,
9+
TracerThread, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, BASENAME,
910
},
1011
validator::BlockProductionMethod,
1112
},
13+
agave_banking_stage_ingress_types::BankingPacketBatch,
14+
assert_matches::assert_matches,
1215
bincode::deserialize_from,
1316
crossbeam_channel::{unbounded, Sender},
1417
itertools::Itertools,
@@ -450,6 +453,9 @@ impl SimulatorLoop {
450453
info!("Bank::new_from_parent()!");
451454

452455
logger.log_jitter(&bank);
456+
if let Some((result, _execute_timings)) = bank.wait_for_completed_scheduler() {
457+
assert_matches!(result, Ok(()));
458+
}
453459
bank.freeze();
454460
let new_slot = if bank.slot() == self.parent_slot {
455461
info!("initial leader block!");
@@ -484,16 +490,17 @@ impl SimulatorLoop {
484490
logger.log_frozen_bank_cost(&bank);
485491
}
486492
self.retransmit_slots_sender.send(bank.slot()).unwrap();
487-
self.bank_forks.write().unwrap().insert(new_bank);
493+
update_bank_forks_and_poh_recorder_for_new_tpu_bank(
494+
&self.bank_forks,
495+
&self.poh_recorder,
496+
new_bank,
497+
false,
498+
);
488499
bank = self
489500
.bank_forks
490501
.read()
491502
.unwrap()
492503
.working_bank_with_scheduler();
493-
self.poh_recorder
494-
.write()
495-
.unwrap()
496-
.set_bank(bank.clone_with_scheduler(), false);
497504
} else {
498505
logger.log_ongoing_bank_cost(&bank);
499506
}

core/src/banking_stage.rs

+20-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
//! to construct a software pipeline. The stage uses all available CPU cores and
33
//! can do its processing in parallel with signature verification on the GPU.
44
5+
#[cfg(feature = "dev-context-only-utils")]
6+
use qualifier_attr::qualifiers;
57
use {
68
self::{
79
committer::Committer,
@@ -23,9 +25,9 @@ use {
2325
scheduler_controller::SchedulerController, scheduler_error::SchedulerError,
2426
},
2527
},
26-
banking_trace::BankingPacketReceiver,
2728
validator::BlockProductionMethod,
2829
},
30+
agave_banking_stage_ingress_types::BankingPacketReceiver,
2931
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
3032
histogram::Histogram,
3133
solana_client::connection_cache::ConnectionCache,
@@ -35,7 +37,7 @@ use {
3537
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
3638
solana_poh::poh_recorder::{PohRecorder, TransactionRecorder},
3739
solana_runtime::{
38-
bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
40+
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
3941
vote_sender_types::ReplayVoteSender,
4042
},
4143
solana_sdk::{pubkey::Pubkey, timing::AtomicInterval},
@@ -716,11 +718,26 @@ impl BankingStage {
716718
}
717719
}
718720

721+
#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
722+
pub(crate) fn update_bank_forks_and_poh_recorder_for_new_tpu_bank(
723+
bank_forks: &RwLock<BankForks>,
724+
poh_recorder: &RwLock<PohRecorder>,
725+
tpu_bank: Bank,
726+
track_transaction_indexes: bool,
727+
) {
728+
let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank);
729+
poh_recorder
730+
.write()
731+
.unwrap()
732+
.set_bank(tpu_bank, track_transaction_indexes);
733+
}
734+
719735
#[cfg(test)]
720736
mod tests {
721737
use {
722738
super::*,
723-
crate::banking_trace::{BankingPacketBatch, BankingTracer, Channels},
739+
crate::banking_trace::{BankingTracer, Channels},
740+
agave_banking_stage_ingress_types::BankingPacketBatch,
724741
crossbeam_channel::{unbounded, Receiver},
725742
itertools::Itertools,
726743
solana_entry::entry::{self, Entry, EntrySlice},

core/src/banking_stage/packet_deserializer.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use {
55
immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket},
66
packet_filter::PacketFilterFailure,
77
},
8-
crate::banking_trace::{BankingPacketBatch, BankingPacketReceiver},
8+
agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver},
99
crossbeam_channel::RecvTimeoutError,
1010
solana_perf::packet::PacketBatch,
1111
solana_sdk::saturating_add_assign,

core/src/banking_stage/packet_receiver.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use {
66
unprocessed_transaction_storage::UnprocessedTransactionStorage,
77
BankingStageStats,
88
},
9-
crate::banking_trace::BankingPacketReceiver,
9+
agave_banking_stage_ingress_types::BankingPacketReceiver,
1010
crossbeam_channel::RecvTimeoutError,
1111
solana_measure::{measure::Measure, measure_us},
1212
solana_sdk::{saturating_add_assign, timing::timestamp},

core/src/banking_stage/transaction_scheduler/scheduler_controller.rs

+9-11
Original file line numberDiff line numberDiff line change
@@ -435,19 +435,17 @@ impl<C: LikeClusterInfo, R: ReceiveAndBuffer> SchedulerController<C, R> {
435435
mod tests {
436436
use {
437437
super::*,
438-
crate::{
439-
banking_stage::{
440-
consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
441-
packet_deserializer::PacketDeserializer,
442-
scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId},
443-
tests::create_slow_genesis_config,
444-
transaction_scheduler::{
445-
prio_graph_scheduler::PrioGraphSchedulerConfig,
446-
receive_and_buffer::SanitizedTransactionReceiveAndBuffer,
447-
},
438+
crate::banking_stage::{
439+
consumer::TARGET_NUM_TRANSACTIONS_PER_BATCH,
440+
packet_deserializer::PacketDeserializer,
441+
scheduler_messages::{ConsumeWork, FinishedConsumeWork, TransactionBatchId},
442+
tests::create_slow_genesis_config,
443+
transaction_scheduler::{
444+
prio_graph_scheduler::PrioGraphSchedulerConfig,
445+
receive_and_buffer::SanitizedTransactionReceiveAndBuffer,
448446
},
449-
banking_trace::BankingPacketBatch,
450447
},
448+
agave_banking_stage_ingress_types::BankingPacketBatch,
451449
crossbeam_channel::{unbounded, Receiver, Sender},
452450
itertools::Itertools,
453451
solana_gossip::cluster_info::ClusterInfo,

core/src/banking_trace.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use {
2+
agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver},
23
bincode::serialize_into,
34
chrono::{DateTime, Local},
45
crossbeam_channel::{unbounded, Receiver, SendError, Sender, TryRecvError},
56
rolling_file::{RollingCondition, RollingConditionBasic, RollingFileAppender},
6-
solana_perf::packet::PacketBatch,
77
solana_sdk::{hash::Hash, slot_history::Slot},
88
std::{
99
fs::{create_dir_all, remove_dir_all},
@@ -19,9 +19,7 @@ use {
1919
thiserror::Error,
2020
};
2121

22-
pub type BankingPacketBatch = Arc<Vec<PacketBatch>>;
2322
pub type BankingPacketSender = TracedSender;
24-
pub type BankingPacketReceiver = Receiver<BankingPacketBatch>;
2523
pub type TracerThreadResult = Result<(), TraceError>;
2624
pub type TracerThread = Option<JoinHandle<TracerThreadResult>>;
2725
pub type DirByteLimit = u64;

core/src/cluster_info_vote_listener.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
use {
22
crate::{
3-
banking_trace::{BankingPacketBatch, BankingPacketSender},
3+
banking_trace::BankingPacketSender,
44
consensus::vote_stake_tracker::VoteStakeTracker,
55
optimistic_confirmation_verifier::OptimisticConfirmationVerifier,
66
replay_stage::DUPLICATE_THRESHOLD,
77
result::{Error, Result},
88
sigverify,
99
},
10+
agave_banking_stage_ingress_types::BankingPacketBatch,
1011
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Select, Sender},
1112
log::*,
1213
solana_gossip::{

0 commit comments

Comments
 (0)