1
1
#![ allow( clippy:: arithmetic_side_effects) ]
2
2
use {
3
+ assert_matches:: assert_matches,
3
4
clap:: { crate_description, crate_name, Arg , ArgEnum , Command } ,
4
5
crossbeam_channel:: { unbounded, Receiver } ,
5
6
log:: * ,
6
7
rand:: { thread_rng, Rng } ,
7
8
rayon:: prelude:: * ,
8
9
solana_client:: connection_cache:: ConnectionCache ,
9
10
solana_core:: {
10
- banking_stage:: BankingStage ,
11
- banking_trace:: { BankingPacketBatch , BankingTracer , BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT } ,
11
+ banking_stage:: { update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage } ,
12
+ banking_trace:: {
13
+ BankingPacketBatch , BankingTracer , Channels , BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT ,
14
+ } ,
12
15
validator:: BlockProductionMethod ,
13
16
} ,
14
17
solana_gossip:: cluster_info:: { ClusterInfo , Node } ,
@@ -29,13 +32,15 @@ use {
29
32
hash:: Hash ,
30
33
message:: Message ,
31
34
pubkey:: { self , Pubkey } ,
35
+ scheduling:: SchedulingMode ,
32
36
signature:: { Keypair , Signature , Signer } ,
33
37
system_instruction, system_transaction,
34
38
timing:: timestamp,
35
39
transaction:: Transaction ,
36
40
} ,
37
41
solana_streamer:: socket:: SocketAddrSpace ,
38
42
solana_tpu_client:: tpu_client:: DEFAULT_TPU_CONNECTION_POOL_SIZE ,
43
+ solana_unified_scheduler_pool:: { DefaultSchedulerPool , SupportedSchedulingMode } ,
39
44
std:: {
40
45
sync:: { atomic:: Ordering , Arc , RwLock } ,
41
46
thread:: sleep,
@@ -347,7 +352,7 @@ fn main() {
347
352
let ( replay_vote_sender, _replay_vote_receiver) = unbounded ( ) ;
348
353
let bank0 = Bank :: new_for_benches ( & genesis_config) ;
349
354
let bank_forks = BankForks :: new_rw_arc ( bank0) ;
350
- let mut bank = bank_forks. read ( ) . unwrap ( ) . working_bank ( ) ;
355
+ let mut bank = bank_forks. read ( ) . unwrap ( ) . working_bank_with_scheduler ( ) ;
351
356
352
357
// set cost tracker limits to MAX so it will not filter out TXs
353
358
bank. write_cost_tracker ( )
@@ -440,9 +445,36 @@ fn main() {
440
445
BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT ,
441
446
) ) )
442
447
. unwrap ( ) ;
443
- let ( non_vote_sender, non_vote_receiver) = banking_tracer. create_channel_non_vote ( ) ;
444
- let ( tpu_vote_sender, tpu_vote_receiver) = banking_tracer. create_channel_tpu_vote ( ) ;
445
- let ( gossip_vote_sender, gossip_vote_receiver) = banking_tracer. create_channel_gossip_vote ( ) ;
448
+ let prioritization_fee_cache = Arc :: new ( PrioritizationFeeCache :: new ( 0u64 ) ) ;
449
+ let scheduler_pool = if matches ! (
450
+ block_production_method,
451
+ BlockProductionMethod :: UnifiedScheduler
452
+ ) {
453
+ let pool = DefaultSchedulerPool :: new (
454
+ SupportedSchedulingMode :: Either ( SchedulingMode :: BlockProduction ) ,
455
+ None ,
456
+ None ,
457
+ None ,
458
+ Some ( replay_vote_sender. clone ( ) ) ,
459
+ prioritization_fee_cache. clone ( ) ,
460
+ poh_recorder. read ( ) . unwrap ( ) . new_recorder ( ) ,
461
+ ) ;
462
+ bank_forks
463
+ . write ( )
464
+ . unwrap ( )
465
+ . install_scheduler_pool ( pool. clone ( ) ) ;
466
+ Some ( pool)
467
+ } else {
468
+ None
469
+ } ;
470
+ let Channels {
471
+ non_vote_sender,
472
+ non_vote_receiver,
473
+ tpu_vote_sender,
474
+ tpu_vote_receiver,
475
+ gossip_vote_sender,
476
+ gossip_vote_receiver,
477
+ } = banking_tracer. create_channels ( scheduler_pool. as_ref ( ) ) ;
446
478
let cluster_info = {
447
479
let keypair = Arc :: new ( Keypair :: new ( ) ) ;
448
480
let node = Node :: new_localhost_with_pubkey ( & keypair. pubkey ( ) ) ;
@@ -462,7 +494,7 @@ fn main() {
462
494
)
463
495
} ;
464
496
let banking_stage = BankingStage :: new_num_threads (
465
- block_production_method,
497
+ block_production_method. clone ( ) ,
466
498
& cluster_info,
467
499
& poh_recorder,
468
500
non_vote_receiver,
@@ -474,10 +506,23 @@ fn main() {
474
506
None ,
475
507
Arc :: new ( connection_cache) ,
476
508
bank_forks. clone ( ) ,
477
- & Arc :: new ( PrioritizationFeeCache :: new ( 0u64 ) ) ,
509
+ & prioritization_fee_cache ,
478
510
false ,
511
+ scheduler_pool,
479
512
) ;
480
513
514
+ // This bench processes transactions, starting from the very first bank, so special-casing is
515
+ // needed for unified scheduler.
516
+ if matches ! (
517
+ block_production_method,
518
+ BlockProductionMethod :: UnifiedScheduler
519
+ ) {
520
+ bank = bank_forks
521
+ . write ( )
522
+ . unwrap ( )
523
+ . reinstall_block_production_scheduler_into_working_genesis_bank ( ) ;
524
+ }
525
+
481
526
// This is so that the signal_receiver does not go out of scope after the closure.
482
527
// If it is dropped before poh_service, then poh_service will error when
483
528
// calling send() on the channel.
@@ -538,33 +583,31 @@ fn main() {
538
583
tx_total_us += now. elapsed ( ) . as_micros ( ) as u64 ;
539
584
540
585
let mut poh_time = Measure :: start ( "poh_time" ) ;
541
- poh_recorder
586
+ let cleared_bank = poh_recorder
542
587
. write ( )
543
588
. unwrap ( )
544
589
. reset ( bank. clone ( ) , Some ( ( bank. slot ( ) , bank. slot ( ) + 1 ) ) ) ;
590
+ assert_matches ! ( cleared_bank, None ) ;
545
591
poh_time. stop ( ) ;
546
592
547
593
let mut new_bank_time = Measure :: start ( "new_bank" ) ;
594
+ if let Some ( ( result, _timings) ) = bank. wait_for_completed_scheduler ( ) {
595
+ assert_matches ! ( result, Ok ( _) ) ;
596
+ }
548
597
let new_slot = bank. slot ( ) + 1 ;
549
- let new_bank = Bank :: new_from_parent ( bank, & collector, new_slot) ;
598
+ let new_bank = Bank :: new_from_parent ( bank. clone ( ) , & collector, new_slot) ;
550
599
new_bank_time. stop ( ) ;
551
600
552
601
let mut insert_time = Measure :: start ( "insert_time" ) ;
553
- bank_forks. write ( ) . unwrap ( ) . insert ( new_bank) ;
554
- bank = bank_forks. read ( ) . unwrap ( ) . working_bank ( ) ;
602
+ update_bank_forks_and_poh_recorder_for_new_tpu_bank (
603
+ & bank_forks,
604
+ & poh_recorder,
605
+ new_bank,
606
+ false ,
607
+ ) ;
608
+ bank = bank_forks. read ( ) . unwrap ( ) . working_bank_with_scheduler ( ) ;
555
609
insert_time. stop ( ) ;
556
610
557
- // set cost tracker limits to MAX so it will not filter out TXs
558
- bank. write_cost_tracker ( )
559
- . unwrap ( )
560
- . set_limits ( u64:: MAX , u64:: MAX , u64:: MAX ) ;
561
-
562
- assert ! ( poh_recorder. read( ) . unwrap( ) . bank( ) . is_none( ) ) ;
563
- poh_recorder
564
- . write ( )
565
- . unwrap ( )
566
- . set_bank_for_test ( bank. clone ( ) ) ;
567
- assert ! ( poh_recorder. read( ) . unwrap( ) . bank( ) . is_some( ) ) ;
568
611
debug ! (
569
612
"new_bank_time: {}us insert_time: {}us poh_time: {}us" ,
570
613
new_bank_time. as_us( ) ,
0 commit comments