Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Change client-demo to run continuosly for some amount of time
Browse files Browse the repository at this point in the history
Also retry for get_last_id/transaction_count if dropped.
  • Loading branch information
sakridge committed Jun 15, 2018
1 parent b3a05d6 commit 88cb731
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 88 deletions.
202 changes: 129 additions & 73 deletions src/bin/client-demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use solana::ncp::Ncp;
use solana::signature::{GenKeys, KeyPair, KeyPairUtil};
use solana::streamer::default_window;
use solana::thin_client::ThinClient;
use solana::timing::{duration_as_ms, duration_as_s};
use solana::transaction::Transaction;
use std::env;
use std::fs::File;
Expand All @@ -24,6 +25,7 @@ use std::process::exit;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::thread::Builder;
use std::thread::JoinHandle;
use std::time::Duration;
use std::time::Instant;
Expand All @@ -37,16 +39,64 @@ fn print_usage(program: &str, opts: Options) {
print!("{}", opts.usage(&brief));
}

fn sample_tx_count(
thread_addr: Arc<RwLock<SocketAddr>>,
exit: Arc<AtomicBool>,
maxes: Arc<RwLock<Vec<(f64, u64)>>>,
first_count: u64,
v: ReplicatedData,
sample_period: u64,
) {
let mut client = mk_client(&thread_addr, &v);
let mut now = Instant::now();
let mut initial_tx_count = client.transaction_count();
let mut max_tps = 0.0;
let mut total;
loop {
let tx_count = client.transaction_count();
let duration = now.elapsed();
now = Instant::now();
let sample = tx_count - initial_tx_count;
initial_tx_count = tx_count;
println!("{}: Transactions processed {}", v.transactions_addr, sample);
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (sample * 1_000_000_000) as f64 / ns as f64;
if tps > max_tps {
max_tps = tps;
}
println!("{}: {} tps", v.transactions_addr, tps);
total = tx_count - first_count;
println!(
"{}: Total Transactions processed {}",
v.transactions_addr, total
);
sleep(Duration::new(sample_period, 0));

if exit.load(Ordering::Relaxed) {
println!("exiting validator thread");
maxes.write().unwrap().push((max_tps, total));
break;
}
}
}

fn main() {
env_logger::init();
let mut threads = 4usize;
let mut num_nodes = 1usize;
let mut time_sec = 60;

let mut opts = Options::new();
opts.optopt("l", "", "leader", "leader.json");
opts.optopt("c", "", "client port", "port");
opts.optopt("t", "", "number of threads", &format!("{}", threads));
opts.optflag("d", "dyn", "detect network address dynamically");
opts.optopt(
"s",
"",
"send transactions for this many seconds",
&format!("{}", time_sec),
);
opts.optopt(
"n",
"",
Expand Down Expand Up @@ -83,6 +133,9 @@ fn main() {
if matches.opt_present("n") {
num_nodes = matches.opt_str("n").unwrap().parse().expect("integer");
}
if matches.opt_present("s") {
time_sec = matches.opt_str("s").unwrap().parse().expect("integer");
}

let leader = if matches.opt_present("l") {
read_leader(matches.opt_str("l").unwrap())
Expand Down Expand Up @@ -122,7 +175,7 @@ fn main() {
let mut client = mk_client(&client_addr, &leader);

println!("Get last ID...");
let last_id = client.get_last_id();
let mut last_id = client.get_last_id();
println!("Got last ID {:?}", last_id);

let mut seed = [0u8; 32];
Expand All @@ -134,85 +187,84 @@ fn main() {
let keypairs = rnd.gen_n_keypairs(demo.num_accounts);
let keypair_pairs: Vec<_> = keypairs.chunks(2).collect();

println!("Signing transactions...");
let now = Instant::now();
let transactions: Vec<_> = keypair_pairs
.into_par_iter()
.map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id))
.collect();
let duration = now.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let bsps = txs as f64 / ns as f64;
let nsps = ns as f64 / txs as f64;
println!(
"Done. {} thousand signatures per second, {}us per signature",
bsps * 1_000_000_f64,
nsps / 1_000_f64
);

let first_count = client.transaction_count();
println!("initial count {}", first_count);

println!("Transfering {} transactions in {} batches", txs, threads);
let sz = transactions.len() / threads;
let chunks: Vec<_> = transactions.chunks(sz).collect();
chunks.into_par_iter().for_each(|txs| {
println!(
"Transferring 1 unit {} times... to {:?}",
txs.len(),
leader.transactions_addr
);
let client = mk_client(&client_addr, &leader);
for tx in txs {
client.transfer_signed(tx.clone()).unwrap();
}
});
println!("Sampling tps every second...",);

let maxes = Arc::new(RwLock::new(Vec::new()));
let sample_period = 1; // in seconds
println!("Sampling tps every second...",);
let maxes: Vec<_> = validators
.into_par_iter()
.map(|val| {
let mut client = mk_client(&client_addr, &val);
let mut now = Instant::now();
let mut initial_tx_count = client.transaction_count();
let mut max_tps = 0.0;
let mut total = 0;
for i in 0..100 {
let tx_count = client.transaction_count();
let duration = now.elapsed();
now = Instant::now();
let sample = tx_count - initial_tx_count;
initial_tx_count = tx_count;
println!(
"{}: Transactions processed {}",
val.transactions_addr, sample
);
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (sample * 1_000_000_000) as f64 / ns as f64;
if tps > max_tps {
max_tps = tps;
}
println!("{}: {} tps", val.transactions_addr, tps);
total = tx_count - first_count;
println!(
"{}: Total Transactions processed {}",
val.transactions_addr, total
);
if total == transactions.len() as u64 {
break;
}
if i > 20 && sample == 0 {
break;
}
sleep(Duration::new(sample_period, 0));
}
(max_tps, total)
let v_threads: Vec<_> = validators
.into_iter()
.map(|v| {
let exit = signal.clone();
let thread_addr = client_addr.clone();
let maxes = maxes.clone();
Builder::new()
.name("solana-client-sample".to_string())
.spawn(move || {
sample_tx_count(thread_addr, exit, maxes, first_count, v, sample_period);
})
.unwrap()
})
.collect();

let time = Duration::new(time_sec, 0);
let now = Instant::now();
while now.elapsed() < time {
println!(
"Signing transactions... {} {}",
keypair_pairs.len(),
keypair_pairs[0].len()
);
let signing_start = Instant::now();
let transactions: Vec<_> = keypair_pairs
.par_iter()
.map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id))
.collect();

let duration = signing_start.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let bsps = txs as f64 / ns as f64;
let nsps = ns as f64 / txs as f64;
println!(
"Done. {} thousand signatures per second, {}us per signature",
bsps * 1_000_000_f64,
nsps / 1_000_f64
);

println!("Transfering {} transactions in {} batches", txs, threads);
let transfer_start = Instant::now();
let sz = transactions.len() / threads;
let chunks: Vec<_> = transactions.chunks(sz).collect();
chunks.into_par_iter().for_each(|txs| {
println!(
"Transferring 1 unit {} times... to {:?}",
txs.len(),
leader.transactions_addr
);
let client = mk_client(&client_addr, &leader);
for tx in txs {
client.transfer_signed(tx.clone()).unwrap();
}
});
println!(
"Transfer done. {:?} ms {} tps",
duration_as_ms(&transfer_start.elapsed()),
txs as f32 / (duration_as_s(&transfer_start.elapsed()))
);

last_id = client.get_last_id();
}

signal.store(true, Ordering::Relaxed);
for t in v_threads {
t.join().unwrap();
}

let mut max_of_maxes = 0.0;
let mut total_txs = 0;
for (max, txs) in &maxes {
for (max, txs) in maxes.read().unwrap().iter() {
if *max > max_of_maxes {
max_of_maxes = *max;
}
Expand All @@ -223,9 +275,9 @@ fn main() {
max_of_maxes,
sample_period,
total_txs,
maxes.len()
maxes.read().unwrap().len()
);
signal.store(true, Ordering::Relaxed);

for t in c_threads {
t.join().unwrap();
}
Expand All @@ -237,6 +289,10 @@ fn mk_client(locked_addr: &Arc<RwLock<SocketAddr>>, r: &ReplicatedData) -> ThinC
let transactions_socket = UdpSocket::bind(addr.clone()).unwrap();
addr.set_port(port + 1);
let requests_socket = UdpSocket::bind(addr.clone()).unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();

addr.set_port(port + 2);
ThinClient::new(
r.requests_addr,
Expand Down
34 changes: 19 additions & 15 deletions src/thin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,19 @@ impl ThinClient {
let req = Request::GetTransactionCount;
let data =
serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count");
self.requests_socket
.send_to(&data, &self.requests_addr)
.expect("buffer error in pub fn transaction_count");
let mut done = false;
while !done {
let resp = self.recv_response().expect("transaction count dropped");
info!("recv_response {:?}", resp);
if let &Response::TransactionCount { .. } = &resp {
done = true;
self.requests_socket
.send_to(&data, &self.requests_addr)
.expect("buffer error in pub fn transaction_count");

if let Ok(resp) = self.recv_response() {
info!("recv_response {:?}", resp);
if let &Response::TransactionCount { .. } = &resp {
done = true;
}
self.process_response(resp);
}
self.process_response(resp);
}
self.transaction_count
}
Expand All @@ -142,16 +144,18 @@ impl ThinClient {
info!("get_last_id");
let req = Request::GetLastId;
let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id");
self.requests_socket
.send_to(&data, &self.requests_addr)
.expect("buffer error in pub fn get_last_id");
let mut done = false;
while !done {
let resp = self.recv_response().expect("get_last_id response");
if let &Response::LastId { .. } = &resp {
done = true;
self.requests_socket
.send_to(&data, &self.requests_addr)
.expect("buffer error in pub fn get_last_id");

if let Ok(resp) = self.recv_response() {
if let &Response::LastId { .. } = &resp {
done = true;
}
self.process_response(resp);
}
self.process_response(resp);
}
self.last_id.expect("some last_id")
}
Expand Down

0 comments on commit 88cb731

Please sign in to comment.