Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ members = [
"bench-exchange",
"bench-streamer",
"bench-tps",
"banking_bench",
"chacha-sys",
"client",
"core",
Expand Down
2 changes: 2 additions & 0 deletions banking_bench/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target/
/farf/
23 changes: 23 additions & 0 deletions banking_bench/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
authors = ["Solana Maintainers <maintainers@solana.com>"]
edition = "2018"
name = "solana-banking-bench"
version = "0.19.0-pre0"
repository = "https://github.com/solana-labs/solana"
license = "Apache-2.0"
homepage = "https://solana.com/"

[dependencies]
log = "0.4.6"
rayon = "1.1.0"
solana-core = { path = "../core", version = "0.19.0-pre0" }
solana-logger = { path = "../logger", version = "0.19.0-pre0" }
solana-runtime = { path = "../runtime", version = "0.19.0-pre0" }
solana-measure = { path = "../measure", version = "0.19.0-pre0" }
solana-sdk = { path = "../sdk", version = "0.19.0-pre0" }
rand = "0.6.5"
crossbeam-channel = "0.3"

[features]
cuda = []

317 changes: 317 additions & 0 deletions banking_bench/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,317 @@
#[macro_use]
extern crate solana_core;
extern crate crossbeam_channel;

use crossbeam_channel::unbounded;
use log::*;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use solana_core::bank_forks::BankForks;
use solana_core::banking_stage::{create_test_recorder, BankingStage};
use solana_core::blocktree::{get_tmp_ledger_path, Blocktree};
use solana_core::cluster_info::ClusterInfo;
use solana_core::cluster_info::Node;
use solana_core::genesis_utils::{create_genesis_block, GenesisBlockInfo};
use solana_core::packet::to_packets_chunked;
use solana_core::poh_recorder::PohRecorder;
use solana_core::poh_recorder::WorkingBankEntries;
use solana_core::service::Service;
use solana_measure::measure::Measure;
use solana_runtime::bank::Bank;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use solana_sdk::signature::Signature;
use solana_sdk::system_transaction;
use solana_sdk::timing::{duration_as_us, timestamp};
use solana_sdk::transaction::Transaction;
use std::iter;
use std::sync::atomic::Ordering;
use std::sync::mpsc::Receiver;
use std::sync::{Arc, Mutex, RwLock};
use std::thread::sleep;
use std::time::{Duration, Instant};

fn check_txs(
receiver: &Arc<Receiver<WorkingBankEntries>>,
ref_tx_count: usize,
poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> bool {
let mut total = 0;
let now = Instant::now();
let mut no_bank = false;
loop {
let entries = receiver.recv_timeout(Duration::from_millis(10));
if let Ok((_, entries)) = entries {
for (entry, _) in &entries {
total += entry.transactions.len();
}
}
if total >= ref_tx_count {
break;
}
if now.elapsed().as_secs() > 60 {
break;
}
if poh_recorder.lock().unwrap().bank().is_none() {
trace!("no bank");
no_bank = true;
break;
}
}
if !no_bank {
assert!(total >= ref_tx_count);
}
no_bank
}

fn make_accounts_txs(txes: usize, mint_keypair: &Keypair, hash: Hash) -> Vec<Transaction> {
let to_pubkey = Pubkey::new_rand();
let dummy = system_transaction::transfer(mint_keypair, &to_pubkey, 1, hash);
(0..txes)
.into_par_iter()
.map(|_| {
let mut new = dummy.clone();
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
new.message.account_keys[0] = Pubkey::new_rand();
new.message.account_keys[1] = Pubkey::new_rand();
new.signatures = vec![Signature::new(&sig[0..64])];
new
})
.collect()
}

struct Config {
packets_per_batch: usize,
chunk_len: usize,
num_threads: usize,
}

impl Config {
fn get_transactions_index(&self, chunk_index: usize) -> usize {
chunk_index * (self.chunk_len / self.num_threads) * self.packets_per_batch
}
}

fn bytes_as_usize(bytes: &[u8]) -> usize {
bytes[0] as usize | (bytes[1] as usize) << 8
}

fn main() {
solana_logger::setup();
let num_threads = BankingStage::num_threads() as usize;
// a multiple of packet chunk duplicates to avoid races
const CHUNKS: usize = 8 * 2;
const PACKETS_PER_BATCH: usize = 192;
let txes = PACKETS_PER_BATCH * num_threads * CHUNKS;
let mint_total = 1_000_000_000_000;
let GenesisBlockInfo {
genesis_block,
mint_keypair,
..
} = create_genesis_block(mint_total);

let (verified_sender, verified_receiver) = unbounded();
let (vote_sender, vote_receiver) = unbounded();
let bank0 = Bank::new(&genesis_block);
let mut bank_forks = BankForks::new(0, bank0);
let mut bank = bank_forks.working_bank();

info!("threads: {} txs: {}", num_threads, txes);

let mut transactions = make_accounts_txs(txes, &mint_keypair, genesis_block.hash());

// fund all the accounts
transactions.iter().for_each(|tx| {
let fund = system_transaction::transfer(
&mint_keypair,
&tx.message.account_keys[0],
mint_total / txes as u64,
genesis_block.hash(),
);
let x = bank.process_transaction(&fund);
x.unwrap();
});
//sanity check, make sure all the transactions can execute sequentially
transactions.iter().for_each(|tx| {
let res = bank.process_transaction(&tx);
assert!(res.is_ok(), "sanity test transactions");
});
bank.clear_signatures();
//sanity check, make sure all the transactions can execute in parallel
let res = bank.process_transactions(&transactions);
for r in res {
assert!(r.is_ok(), "sanity parallel execution");
}
bank.clear_signatures();
let mut verified: Vec<_> = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH)
.into_iter()
.map(|x| {
let len = x.packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
let ledger_path = get_tmp_ledger_path!();
{
let blocktree = Arc::new(
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
let (exit, poh_recorder, poh_service, signal_receiver) =
create_test_recorder(&bank, &blocktree);
let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let _banking_stage = BankingStage::new(
&cluster_info,
&poh_recorder,
verified_receiver,
vote_receiver,
);
poh_recorder.lock().unwrap().set_bank(&bank);

let chunk_len = verified.len() / CHUNKS;
let mut start = 0;

// This is so that the signal_receiver does not go out of scope after the closure.
// If it is dropped before poh_service, then poh_service will error when
// calling send() on the channel.
let signal_receiver = Arc::new(signal_receiver);
let signal_receiver2 = signal_receiver.clone();
let mut total = 0;
let mut tx_total = 0;
let mut txs_processed = 0;
let mut root = 1;
let collector = Pubkey::new_rand();
const ITERS: usize = 1_000;
let config = Config {
packets_per_batch: PACKETS_PER_BATCH,
chunk_len,
num_threads,
};
for _ in 0..ITERS {
let now = Instant::now();
let mut sent = 0;

for (i, v) in verified[start..start + chunk_len]
.chunks(chunk_len / num_threads)
.enumerate()
{
let mut byte = 0;
let index = config.get_transactions_index(start + i);
if index < transactions.len() {
byte = bytes_as_usize(transactions[index].signatures[0].as_ref());
}
trace!(
"sending... {}..{} {} v.len: {} sig: {} transactions.len: {} index: {}",
start + i,
start + chunk_len,
timestamp(),
v.len(),
byte,
transactions.len(),
index,
);
for xv in v {
sent += xv.0.packets.len();
}
verified_sender.send(v.to_vec()).unwrap();
}
let start_tx_index = config.get_transactions_index(start);
let end_tx_index = config.get_transactions_index(start + chunk_len);
for tx in &transactions[start_tx_index..end_tx_index] {
loop {
if bank.get_signature_status(&tx.signatures[0]).is_some() {
break;
}
if poh_recorder.lock().unwrap().bank().is_none() {
break;
}
sleep(Duration::from_millis(5));
}
}
if check_txs(&signal_receiver2, txes / CHUNKS, &poh_recorder) {
debug!(
"resetting bank {} tx count: {} txs_proc: {}",
bank.slot(),
bank.transaction_count(),
txs_processed
);
assert!(txs_processed < bank.transaction_count());
txs_processed = bank.transaction_count();
tx_total += duration_as_us(&now.elapsed());
let mut new_bank_time = Measure::start("new_bank");
new_bank_time.stop();
let mut insert_time = Measure::start("insert_time");
insert_time.stop();
let mut poh_time = Measure::start("poh_time");
poh_recorder.lock().unwrap().reset(
bank.last_blockhash(),
bank.slot(),
Some((bank.slot(), bank.slot() + 1)),
);
let new_bank = Bank::new_from_parent(&bank, &collector, bank.slot() + 1);
bank_forks.insert(new_bank);
bank = bank_forks.working_bank();
poh_recorder.lock().unwrap().set_bank(&bank);
assert!(poh_recorder.lock().unwrap().bank().is_some());
if bank.slot() > 32 {
bank_forks.set_root(root, &None);
root += 1;
}
poh_time.stop();
debug!(
"new_bank_time: {}us insert_time: {}us poh_time: {}us",
new_bank_time.as_us(),
insert_time.as_us(),
poh_time.as_us(),
);
} else {
tx_total += duration_as_us(&now.elapsed());
}

// This signature clear may not actually clear the signatures
// in this chunk, but since we rotate between CHUNKS then
// we should clear them by the time we come around again to re-use that chunk.
bank.clear_signatures();
total += duration_as_us(&now.elapsed());
debug!(
"time: {} us checked: {} sent: {}",
duration_as_us(&now.elapsed()),
txes / CHUNKS,
sent,
);

if bank.slot() > 0 && bank.slot() % 16 == 0 {
for tx in transactions.iter_mut() {
tx.message.recent_blockhash = bank.last_blockhash();
let sig: Vec<u8> = (0..64).map(|_| thread_rng().gen()).collect();
tx.signatures[0] = Signature::new(&sig[0..64]);
}
verified = to_packets_chunked(&transactions.clone(), PACKETS_PER_BATCH)
.into_iter()
.map(|x| {
let len = x.packets.len();
(x, iter::repeat(1).take(len).collect())
})
.collect();
}

start += chunk_len;
start %= verified.len();
}
eprintln!(
"{{'name': 'banking_bench_total', 'median': '{}'}}",
total / ITERS as u64,
);
eprintln!(
"{{'name': 'banking_bench_tx_total', 'median': '{}'}}",
tx_total / ITERS as u64,
);

drop(vote_sender);
exit.store(true, Ordering::Relaxed);
poh_service.join().unwrap();
sleep(Duration::from_secs(1));
debug!("waited for poh_service");
}
let _unused = Blocktree::destroy(&ledger_path);
}
3 changes: 3 additions & 0 deletions ci/test-bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ _ cargo +$rust_nightly bench --manifest-path core/Cargo.toml ${V:+--verbose} \
_ cargo +$rust_nightly bench --manifest-path programs/bpf/Cargo.toml ${V:+--verbose} --features=bpf_c \
-- -Z unstable-options --format=json --nocapture | tee -a "$BENCH_FILE"

# Run banking bench. Doesn't require nightly, but use since it is already built.
_ cargo +$rust_nightly run --release --manifest-path banking_bench/Cargo.toml ${V:+--verbose} | tee -a "$BENCH_FILE"

# TODO: debug why solana-upload-perf takes over 30 minutes to complete.
exit 0

Expand Down