Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continuous client #362

Merged
merged 3 commits into from
Jun 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 158 additions & 74 deletions src/bin/client-demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ use atty::{is, Stream};
use getopts::Options;
use rayon::prelude::*;
use solana::crdt::{get_ip_addr, Crdt, ReplicatedData};
use solana::hash::Hash;
use solana::mint::MintDemo;
use solana::ncp::Ncp;
use solana::signature::{GenKeys, KeyPair, KeyPairUtil};
use solana::streamer::default_window;
use solana::thin_client::ThinClient;
use solana::timing::{duration_as_ms, duration_as_s};
use solana::transaction::Transaction;
use std::env;
use std::fs::File;
Expand All @@ -24,6 +26,7 @@ use std::process::exit;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::thread::Builder;
use std::thread::JoinHandle;
use std::time::Duration;
use std::time::Instant;
Expand All @@ -37,16 +40,119 @@ fn print_usage(program: &str, opts: Options) {
print!("{}", opts.usage(&brief));
}

fn sample_tx_count(
thread_addr: Arc<RwLock<SocketAddr>>,
exit: Arc<AtomicBool>,
maxes: Arc<RwLock<Vec<(f64, u64)>>>,
first_count: u64,
v: ReplicatedData,
sample_period: u64,
) {
let mut client = mk_client(&thread_addr, &v);
let mut now = Instant::now();
let mut initial_tx_count = client.transaction_count();
let mut max_tps = 0.0;
let mut total;
loop {
let tx_count = client.transaction_count();
let duration = now.elapsed();
now = Instant::now();
let sample = tx_count - initial_tx_count;
initial_tx_count = tx_count;
println!("{}: Transactions processed {}", v.transactions_addr, sample);
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (sample * 1_000_000_000) as f64 / ns as f64;
if tps > max_tps {
max_tps = tps;
}
println!("{}: {:.2} tps", v.transactions_addr, tps);
total = tx_count - first_count;
println!(
"{}: Total Transactions processed {}",
v.transactions_addr, total
);
sleep(Duration::new(sample_period, 0));

if exit.load(Ordering::Relaxed) {
println!("exiting validator thread");
maxes.write().unwrap().push((max_tps, total));
break;
}
}
}

fn generate_and_send_txs(
client: &mut ThinClient,
keypair_pairs: &Vec<&[KeyPair]>,
leader: &ReplicatedData,
txs: i64,
last_id: &mut Hash,
threads: usize,
client_addr: Arc<RwLock<SocketAddr>>,
) {
println!(
"Signing transactions... {} {}",
keypair_pairs.len(),
keypair_pairs[0].len()
);
let signing_start = Instant::now();
let transactions: Vec<_> = keypair_pairs
.par_iter()
.map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, *last_id))
.collect();

let duration = signing_start.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let bsps = txs as f64 / ns as f64;
let nsps = ns as f64 / txs as f64;
println!(
"Done. {:.2} thousand signatures per second, {:.2} us per signature, {} ms total time",
bsps * 1_000_000_f64,
nsps / 1_000_f64,
duration_as_ms(&duration),
);

println!("Transfering {} transactions in {} batches", txs, threads);
let transfer_start = Instant::now();
let sz = transactions.len() / threads;
let chunks: Vec<_> = transactions.chunks(sz).collect();
chunks.into_par_iter().for_each(|txs| {
println!(
"Transferring 1 unit {} times... to {:?}",
txs.len(),
leader.transactions_addr
);
let client = mk_client(&client_addr, &leader);
for tx in txs {
client.transfer_signed(tx.clone()).unwrap();
}
});
println!(
"Transfer done. {:?} ms {} tps",
duration_as_ms(&transfer_start.elapsed()),
txs as f32 / (duration_as_s(&transfer_start.elapsed()))
);

*last_id = client.get_last_id();
}

fn main() {
env_logger::init();
let mut threads = 4usize;
let mut num_nodes = 1usize;
let mut time_sec = 60;

let mut opts = Options::new();
opts.optopt("l", "", "leader", "leader.json");
opts.optopt("c", "", "client port", "port");
opts.optopt("t", "", "number of threads", &format!("{}", threads));
opts.optflag("d", "dyn", "detect network address dynamically");
opts.optopt(
"s",
"",
"send transactions for this many seconds",
&format!("{}", time_sec),
);
opts.optopt(
"n",
"",
Expand Down Expand Up @@ -83,6 +189,9 @@ fn main() {
if matches.opt_present("n") {
num_nodes = matches.opt_str("n").unwrap().parse().expect("integer");
}
if matches.opt_present("s") {
time_sec = matches.opt_str("s").unwrap().parse().expect("integer");
}

let leader = if matches.opt_present("l") {
read_leader(matches.opt_str("l").unwrap())
Expand Down Expand Up @@ -122,7 +231,7 @@ fn main() {
let mut client = mk_client(&client_addr, &leader);

println!("Get last ID...");
let last_id = client.get_last_id();
let mut last_id = client.get_last_id();
println!("Got last ID {:?}", last_id);

let mut seed = [0u8; 32];
Expand All @@ -134,98 +243,69 @@ fn main() {
let keypairs = rnd.gen_n_keypairs(demo.num_accounts);
let keypair_pairs: Vec<_> = keypairs.chunks(2).collect();

println!("Signing transactions...");
let now = Instant::now();
let transactions: Vec<_> = keypair_pairs
.into_par_iter()
.map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id))
.collect();
let duration = now.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let bsps = txs as f64 / ns as f64;
let nsps = ns as f64 / txs as f64;
println!(
"Done. {} thousand signatures per second, {}us per signature",
bsps * 1_000_000_f64,
nsps / 1_000_f64
);

let first_count = client.transaction_count();
println!("initial count {}", first_count);

println!("Transfering {} transactions in {} batches", txs, threads);
let sz = transactions.len() / threads;
let chunks: Vec<_> = transactions.chunks(sz).collect();
chunks.into_par_iter().for_each(|txs| {
println!(
"Transferring 1 unit {} times... to {:?}",
txs.len(),
leader.transactions_addr
);
let client = mk_client(&client_addr, &leader);
for tx in txs {
client.transfer_signed(tx.clone()).unwrap();
}
});
println!("Sampling tps every second...",);

// Setup a thread per validator to sample every period
// collect the max transaction rate and total tx count seen
let maxes = Arc::new(RwLock::new(Vec::new()));
let sample_period = 1; // in seconds
println!("Sampling tps every second...",);
let maxes: Vec<_> = validators
.into_par_iter()
.map(|val| {
let mut client = mk_client(&client_addr, &val);
let mut now = Instant::now();
let mut initial_tx_count = client.transaction_count();
let mut max_tps = 0.0;
let mut total = 0;
for i in 0..100 {
let tx_count = client.transaction_count();
let duration = now.elapsed();
now = Instant::now();
let sample = tx_count - initial_tx_count;
initial_tx_count = tx_count;
println!(
"{}: Transactions processed {}",
val.transactions_addr, sample
);
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (sample * 1_000_000_000) as f64 / ns as f64;
if tps > max_tps {
max_tps = tps;
}
println!("{}: {} tps", val.transactions_addr, tps);
total = tx_count - first_count;
println!(
"{}: Total Transactions processed {}",
val.transactions_addr, total
);
if total == transactions.len() as u64 {
break;
}
if i > 20 && sample == 0 {
break;
}
sleep(Duration::new(sample_period, 0));
}
(max_tps, total)
let v_threads: Vec<_> = validators
.into_iter()
.map(|v| {
let exit = signal.clone();
let thread_addr = client_addr.clone();
let maxes = maxes.clone();
Builder::new()
.name("solana-client-sample".to_string())
.spawn(move || {
sample_tx_count(thread_addr, exit, maxes, first_count, v, sample_period);
})
.unwrap()
})
.collect();

// generate and send transactions for the specified duration
let time = Duration::new(time_sec, 0);
let now = Instant::now();
while now.elapsed() < time {
generate_and_send_txs(
&mut client,
&keypair_pairs,
&leader,
txs,
&mut last_id,
threads,
client_addr.clone(),
);
}

// Stop the sampling threads so it will collect the stats
signal.store(true, Ordering::Relaxed);
for t in v_threads {
t.join().unwrap();
}

// Compute/report stats
let mut max_of_maxes = 0.0;
let mut total_txs = 0;
for (max, txs) in &maxes {
for (max, txs) in maxes.read().unwrap().iter() {
if *max > max_of_maxes {
max_of_maxes = *max;
}
total_txs += *txs;
}
println!(
"\nHighest TPS: {} sampling period {}s total transactions: {} clients: {}",
"\nHighest TPS: {:.2} sampling period {}s total transactions: {} clients: {}",
max_of_maxes,
sample_period,
total_txs,
maxes.len()
maxes.read().unwrap().len()
);
signal.store(true, Ordering::Relaxed);

// join the crdt client threads
for t in c_threads {
t.join().unwrap();
}
Expand All @@ -237,6 +317,10 @@ fn mk_client(locked_addr: &Arc<RwLock<SocketAddr>>, r: &ReplicatedData) -> ThinC
let transactions_socket = UdpSocket::bind(addr.clone()).unwrap();
addr.set_port(port + 1);
let requests_socket = UdpSocket::bind(addr.clone()).unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(1, 0)))
.unwrap();

addr.set_port(port + 2);
ThinClient::new(
r.requests_addr,
Expand Down
2 changes: 1 addition & 1 deletion src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub type BlobRecycler = Recycler<Blob>;

pub const NUM_PACKETS: usize = 1024 * 8;
pub const BLOB_SIZE: usize = 64 * 1024;
pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_ID_END;
pub const BLOB_DATA_SIZE: usize = BLOB_SIZE - BLOB_HEADER_SIZE;
pub const PACKET_DATA_SIZE: usize = 256;
pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_DATA_SIZE) / BLOB_SIZE;

Expand Down
34 changes: 19 additions & 15 deletions src/thin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,17 +121,19 @@ impl ThinClient {
let req = Request::GetTransactionCount;
let data =
serialize(&req).expect("serialize GetTransactionCount in pub fn transaction_count");
self.requests_socket
.send_to(&data, &self.requests_addr)
.expect("buffer error in pub fn transaction_count");
let mut done = false;
while !done {
let resp = self.recv_response().expect("transaction count dropped");
info!("recv_response {:?}", resp);
if let &Response::TransactionCount { .. } = &resp {
done = true;
self.requests_socket
.send_to(&data, &self.requests_addr)
.expect("buffer error in pub fn transaction_count");

if let Ok(resp) = self.recv_response() {
info!("recv_response {:?}", resp);
if let &Response::TransactionCount { .. } = &resp {
done = true;
}
self.process_response(resp);
}
self.process_response(resp);
}
self.transaction_count
}
Expand All @@ -142,16 +144,18 @@ impl ThinClient {
info!("get_last_id");
let req = Request::GetLastId;
let data = serialize(&req).expect("serialize GetLastId in pub fn get_last_id");
self.requests_socket
.send_to(&data, &self.requests_addr)
.expect("buffer error in pub fn get_last_id");
let mut done = false;
while !done {
let resp = self.recv_response().expect("get_last_id response");
if let &Response::LastId { .. } = &resp {
done = true;
self.requests_socket
.send_to(&data, &self.requests_addr)
.expect("buffer error in pub fn get_last_id");

if let Ok(resp) = self.recv_response() {
if let &Response::LastId { .. } = &resp {
done = true;
}
self.process_response(resp);
}
self.process_response(resp);
}
self.last_id.expect("some last_id")
}
Expand Down