diff --git a/src/config.rs b/src/config.rs
index a8247eb..aabbf92 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -10,9 +10,6 @@ use clap::builder::PossibleValue;
 use clap::{Parser, ValueEnum};
 use serde::{Deserialize, Serialize};
 
-/// Limit of retry errors to be kept and then printed in scope of a sampling interval
-pub const PRINT_RETRY_ERROR_LIMIT: u64 = 5;
-
 /// Parse a single key-value pair
 fn parse_key_val<T, U>(s: &str) -> Result<(T, U), anyhow::Error>
 where
@@ -58,7 +55,15 @@ impl Interval {
         }
     }
 
-    pub fn seconds(&self) -> Option<f32> {
+    pub fn period(&self) -> Option<Duration> {
+        if let Interval::Time(d) = self {
+            Some(*d)
+        } else {
+            None
+        }
+    }
+
+    pub fn period_secs(&self) -> Option<f32> {
         if let Interval::Time(d) = self {
             Some(d.as_secs_f32())
         } else {
diff --git a/src/context.rs b/src/context.rs
index 23a560e..c6d68c4 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -1,6 +1,7 @@
 use std::collections::{HashMap, HashSet};
 use std::fmt::{Display, Formatter};
 use std::fs::File;
+use std::future::Future;
 use std::hash::{Hash, Hasher};
 use std::io;
 use std::io::{BufRead, BufReader, ErrorKind, Read};
@@ -38,7 +39,7 @@ use tokio::time::{Duration, Instant};
 use try_lock::TryLock;
 use uuid::{Variant, Version};
 
-use crate::config::{ConnectionConf, RetryInterval, PRINT_RETRY_ERROR_LIMIT};
+use crate::config::{ConnectionConf, RetryInterval};
 use crate::LatteError;
 
 fn ssl_context(conf: &&ConnectionConf) -> Result<Option<SslContext>, CassError> {
@@ -197,13 +198,6 @@ impl CassError {
         };
         CassError(kind)
     }
-
-    fn query_retries_exceeded(retry_number: u64) -> CassError {
-        CassError(CassErrorKind::QueryRetriesExceeded(format!(
-            "Max retry attempts ({}) reached",
-            retry_number
-        )))
-    }
 }
 
 #[derive(Debug)]
@@ -301,10 +295,9 @@ impl std::error::Error for CassError {}
 #[derive(Clone, Debug)]
 pub struct SessionStats {
     pub req_count: u64,
-    pub retry_errors: HashSet<String>,
-    pub retry_error_count: u64,
     pub req_errors: HashSet<String>,
     pub req_error_count: u64,
+    pub req_retry_count: u64,
     pub row_count: u64,
     pub queue_length: u64,
     pub mean_queue_length: f32,
@@ -325,11 +318,17 @@ impl SessionStats {
         Instant::now()
     }
 
-    pub fn complete_request(&mut self, duration: Duration, rs: &Result<QueryResult, QueryError>) {
+    pub fn complete_request(
+        &mut self,
+        duration: Duration,
+        rs: &Result<QueryResult, QueryError>,
+        retries: u64,
+    ) {
         self.queue_length -= 1;
         let duration_ns = duration.as_nanos().clamp(1, u64::MAX as u128) as u64;
         self.resp_times_ns.record(duration_ns).unwrap();
         self.req_count += 1;
+        self.req_retry_count += retries;
         match rs {
             Ok(rs) => self.row_count += rs.rows.as_ref().map(|r| r.len()).unwrap_or(0) as u64,
             Err(e) => {
@@ -339,21 +338,13 @@ impl SessionStats {
         }
     }
 
-    pub fn store_retry_error(&mut self, error_str: String) {
-        self.retry_error_count += 1;
-        if self.retry_error_count <= PRINT_RETRY_ERROR_LIMIT {
-            self.retry_errors.insert(error_str);
-        }
-    }
-
     /// Resets all accumulators
     pub fn reset(&mut self) {
         self.req_error_count = 0;
         self.row_count = 0;
         self.req_count = 0;
+        self.req_retry_count = 0;
         self.mean_queue_length = 0.0;
-        self.retry_error_count = 0;
-        self.retry_errors.clear();
         self.req_errors.clear();
         self.resp_times_ns.clear();
 
@@ -366,10 +357,9 @@ impl Default for SessionStats {
     fn default() -> Self {
         SessionStats {
             req_count: 0,
-            retry_errors: HashSet::new(),
-            retry_error_count: 0,
             req_errors: HashSet::new(),
             req_error_count: 0,
+            req_retry_count: 0,
             row_count: 0,
             queue_length: 0,
             mean_queue_length: 0.0,
@@ -394,38 +384,6 @@ pub fn get_exponential_retry_interval(
     std::cmp::min(current_interval as u64, max_interval)
 }
 
-pub async fn handle_retry_error(
-    ctxt: &Context,
-    current_attempt_num: u64,
-    current_error: CassError,
-) {
-    let current_retry_interval = get_exponential_retry_interval(
-        ctxt.retry_interval.min_ms,
-        ctxt.retry_interval.max_ms,
-        current_attempt_num,
-    );
-
-    let mut next_attempt_str = String::new();
-    let is_last_attempt = current_attempt_num == ctxt.retry_number;
-    if !is_last_attempt {
-        next_attempt_str += &format!("[Retry in {} ms]", current_retry_interval);
-    }
-    let err_msg = format!(
-        "{}: [ERROR][Attempt {}/{}]{} {}",
-        Utc::now().format("%Y-%m-%d %H:%M:%S%.3f"),
-        current_attempt_num,
-        ctxt.retry_number,
-        next_attempt_str,
-        current_error,
-    );
-    if !is_last_attempt {
-        ctxt.stats.try_lock().unwrap().store_retry_error(err_msg);
-        tokio::time::sleep(Duration::from_millis(current_retry_interval)).await;
-    } else {
-        eprintln!("{}", err_msg);
-    }
-}
-
 /// This is the main object that a workload script uses to interface with the outside world.
 /// It also tracks query execution metrics such as number of requests, rows, response times etc.
 #[derive(Any)]
@@ -519,26 +477,9 @@ impl Context {
 
     /// Executes an ad-hoc CQL statement with no parameters. Does not prepare.
     pub async fn execute(&self, cql: &str) -> Result<(), CassError> {
-        for current_attempt_num in 0..self.retry_number + 1 {
-            let start_time = self.stats.try_lock().unwrap().start_request();
-            let rs = self.session.query(cql, ()).await;
-            let duration = Instant::now() - start_time;
-            match rs {
-                Ok(_) => {}
-                Err(e) => {
-                    let current_error = CassError::query_execution_error(cql, &[], e.clone());
-                    handle_retry_error(self, current_attempt_num, current_error).await;
-                    continue;
-                }
-            }
-            self.stats
-                .try_lock()
-                .unwrap()
-                .complete_request(duration, &rs);
-            rs.map_err(|e| CassError::query_execution_error(cql, &[], e.clone()))?;
-            return Ok(());
-        }
-        Err(CassError::query_retries_exceeded(self.retry_number))
+        let rs = self.execute_inner(|| self.session.query(cql, ())).await;
+        rs.map_err(|e| CassError::query_execution_error(cql, &[], e.clone()))?;
+        Ok(())
     }
 
     /// Executes a statement prepared and registered earlier by a call to `prepare`.
@@ -549,32 +490,41 @@ impl Context {
             .ok_or_else(|| CassError(CassErrorKind::PreparedStatementNotFound(key.to_string())))?;
         let params = bind::to_scylla_query_params(&params, statement.get_variable_col_specs())?;
 
-        for current_attempt_num in 0..self.retry_number + 1 {
-            let start_time = self.stats.try_lock().unwrap().start_request();
-            let rs = self.session.execute(statement, params.clone()).await;
-            let duration = Instant::now() - start_time;
-            match rs {
-                Ok(_) => {}
-                Err(e) => {
-                    let current_error = CassError::query_execution_error(
-                        statement.get_statement(),
-                        &params,
-                        e.clone(),
-                    );
-                    handle_retry_error(self, current_attempt_num, current_error).await;
-                    continue;
-                }
+        let rs = self
+            .execute_inner(|| self.session.execute(statement, params.clone()))
+            .await;
+
+        rs.map_err(|e| CassError::query_execution_error(statement.get_statement(), &params, e))?;
+        Ok(())
+    }
+
+    async fn execute_inner<R>(&self, f: impl Fn() -> R) -> Result<QueryResult, QueryError>
+    where
+        R: Future<Output = Result<QueryResult, QueryError>>,
+    {
+        let start_time = self.stats.try_lock().unwrap().start_request();
+
+        let mut rs = Err(QueryError::TimeoutError);
+        let mut attempts = 0;
+        while attempts <= self.retry_number + 1 && rs.is_err() {
+            if attempts > 0 {
+                let current_retry_interval = get_exponential_retry_interval(
+                    self.retry_interval.min_ms,
+                    self.retry_interval.max_ms,
+                    attempts,
+                );
+                tokio::time::sleep(Duration::from_millis(current_retry_interval)).await;
             }
-            self.stats
-                .try_lock()
-                .unwrap()
-                .complete_request(duration, &rs);
-            rs.map_err(|e| {
-                CassError::query_execution_error(statement.get_statement(), &params, e)
-            })?;
-            return Ok(());
+            rs = f().await;
+            attempts += 1;
         }
-        Err(CassError::query_retries_exceeded(self.retry_number))
+
+        let duration = Instant::now() - start_time;
+        self.stats
+            .try_lock()
+            .unwrap()
+            .complete_request(duration, &rs, attempts - 1);
+        rs
     }
 
     /// Returns the current accumulated request stats snapshot and resets the stats.
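Both `execute` and the prepared-statement path now funnel through `execute_inner`, which sleeps for a capped exponential backoff before every re-attempt and reports the number of retries to `SessionStats::complete_request`. The standalone sketch below only illustrates that control flow; it is not the patch's code: the backoff formula (doubling of `min_ms`, capped at `max_ms`) is an assumption, since the body of `get_exponential_retry_interval` is not shown in this diff, and a generic `Result` stands in for the driver's `QueryResult`/`QueryError` types. The helper names (`backoff_ms`, `run_with_retries`) are hypothetical.

    use std::future::Future;
    use std::time::Duration;

    /// Capped exponential backoff (assumed doubling; the real formula lives in
    /// `get_exponential_retry_interval`, which this patch does not change).
    fn backoff_ms(min_ms: u64, max_ms: u64, attempt: u64) -> u64 {
        let factor = 2u64.saturating_pow(attempt.saturating_sub(1) as u32);
        min_ms.saturating_mul(factor).min(max_ms)
    }

    /// Simplified shape of `Context::execute_inner`: run `f` once, then retry up to
    /// `retries` times while it keeps failing, sleeping before each re-attempt.
    /// Returns the final result plus the number of retries actually spent, which is
    /// what the new `complete_request(duration, &rs, retries)` call records.
    async fn run_with_retries<T, E, R>(
        f: impl Fn() -> R,
        retries: u64,
        min_ms: u64,
        max_ms: u64,
    ) -> (Result<T, E>, u64)
    where
        R: Future<Output = Result<T, E>>,
    {
        let mut attempts = 1u64;
        let mut rs = f().await;
        while rs.is_err() && attempts <= retries {
            tokio::time::sleep(Duration::from_millis(backoff_ms(min_ms, max_ms, attempts))).await;
            rs = f().await;
            attempts += 1;
        }
        (rs, attempts - 1)
    }

    #[tokio::main]
    async fn main() {
        // Hypothetical flaky operation: fails twice, then succeeds.
        let calls = std::cell::Cell::new(0u32);
        let op = || {
            let n = calls.get() + 1;
            calls.set(n);
            async move { if n < 3 { Err("transient error") } else { Ok(n) } }
        };
        let (result, retries) = run_with_retries(op, 5, 100, 1000).await;
        println!("result = {result:?}, retries spent = {retries}");
    }

One visible consequence of folding the retries into a single timed call: the whole retry loop is measured as one request, so recorded response times include the back-off sleeps, and retries surface through the new `req_retry_count` counter instead of per-attempt log lines.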
diff --git a/src/exec.rs b/src/exec.rs
index a2c144d..5bc554f 100644
--- a/src/exec.rs
+++ b/src/exec.rs
@@ -52,17 +52,11 @@ async fn run_stream(
     progress: Arc<StatusLine<Progress>>,
     mut out: Sender<Result<WorkloadStats, LatteError>>,
 ) {
-    workload.reset(Instant::now());
-
     let mut iter_counter = cycle_counter;
+    let sample_size = sampling.count().unwrap_or(u64::MAX);
+    let sample_duration = sampling.period().unwrap_or(tokio::time::Duration::MAX);
 
-    let (sample_size, sample_duration) = match sampling {
-        Interval::Count(cnt) => (cnt, tokio::time::Duration::MAX),
-        Interval::Time(duration) => (u64::MAX, duration),
-        Interval::Unbounded => (u64::MAX, tokio::time::Duration::MAX),
-    };
-
-    let mut result_stream = stream
+    let mut stats_stream = stream
         .map(|_| iter_counter.next())
         .take_while(|i| ready(i.is_some()))
         // unconstrained to workaround quadratic complexity of buffer_unordered ()
@@ -77,13 +71,14 @@ async fn run_stream(
         })
         .map(|errors| (workload.take_stats(Instant::now()), errors));
-    while let Some((stats, errors)) = result_stream.next().await {
+    workload.reset(Instant::now());
+    while let Some((stats, errors)) = stats_stream.next().await {
         if out.send(Ok(stats)).await.is_err() {
-            break;
+            return;
         }
         for err in errors {
             if out.send(Err(err)).await.is_err() {
-                break;
+                return;
             }
         }
     }
 }
diff --git a/src/report.rs b/src/report.rs
index 99f340a..cec3d6e 100644
--- a/src/report.rs
+++ b/src/report.rs
@@ -1,4 +1,4 @@
-use crate::config::{RunCommand, PRINT_RETRY_ERROR_LIMIT};
+use crate::config::RunCommand;
 use crate::stats::{
     BenchmarkCmp, BenchmarkStats, Bucket, Mean, Percentile, Sample, Significance, TimeDistribution,
 };
@@ -547,19 +547,19 @@ impl<'a> Display for RunConfigCmp<'a> {
             }),
             self.line("Max rate", "op/s", |conf| Quantity::from(conf.rate)),
             self.line("Warmup", "s", |conf| {
-                Quantity::from(conf.warmup_duration.seconds())
+                Quantity::from(conf.warmup_duration.period_secs())
             }),
             self.line("└─", "op", |conf| {
                 Quantity::from(conf.warmup_duration.count())
             }),
             self.line("Run time", "s", |conf| {
-                Quantity::from(conf.run_duration.seconds()).with_precision(1)
+                Quantity::from(conf.run_duration.period_secs()).with_precision(1)
             }),
             self.line("└─", "op", |conf| {
                 Quantity::from(conf.run_duration.count())
             }),
             self.line("Sampling", "s", |conf| {
-                Quantity::from(conf.sampling_interval.seconds()).with_precision(1)
+                Quantity::from(conf.sampling_interval.period_secs()).with_precision(1)
             }),
             self.line("└─", "op", |conf| {
                 Quantity::from(conf.sampling_interval.count())
@@ -567,14 +567,13 @@
             self.line("Request timeout", "", |conf| {
                 Quantity::from(conf.connection.request_timeout)
             }),
-            self.line("Retries", "", |_| Quantity::from("")),
-            self.line("┌──────┴number", "", |conf| {
+            self.line("Retries", "", |conf| {
                 Quantity::from(conf.connection.retry_number)
             }),
-            self.line("├─min interval", "ms", |conf| {
+            self.line("├─ min interval", "ms", |conf| {
                 Quantity::from(conf.connection.retry_interval.min_ms)
             }),
-            self.line("└─max interval", "ms", |conf| {
+            self.line("└─ max interval", "ms", |conf| {
                 Quantity::from(conf.connection.retry_interval.max_ms)
             }),
         ];
@@ -588,47 +587,26 @@
 
 pub fn print_log_header() {
     println!("{}", fmt_section_header("LOG"));
-    println!("{}", style(" Time ───── Throughput ───── ────────────────────────────────── Response times [ms] ───────────────────────────────────").yellow().bold().for_stdout());
-    println!("{}", style(" [s] [op/s] [req/s] Min 25 50 75 90 95 99 99.9 Max").yellow().for_stdout());
+    println!("{}", style(" Time Cycles Errors Throughput ───────────────────────────── Latency [ms/op] ─────────────────────────").yellow().bold().for_stdout());
+    println!("{}", style(" [s] [op] [op] [op/s] Min 25 50 75 90 99 Max").yellow().for_stdout());
 }
 
 impl Display for Sample {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
-        if self.retry_error_count > 0 {
-            let mut num_of_printed_errors = 0;
-            let mut error_msg_bunch = String::new();
-            for retry_error in &self.retry_errors {
-                if num_of_printed_errors < PRINT_RETRY_ERROR_LIMIT {
-                    error_msg_bunch += &format!("{}\n", retry_error);
-                    num_of_printed_errors += 1;
-                } else {
-                    break;
-                }
-            }
-            let num_of_dropped_errors = self.retry_error_count - num_of_printed_errors;
-            if num_of_dropped_errors > 0 {
-                error_msg_bunch += &format!(
-                    "...number of dropped error messages per sampling period: {}",
-                    num_of_dropped_errors,
-                );
-            }
-            writeln!(f, "{}", error_msg_bunch)?;
-        }
         write!(
             f,
-            "{:8.3} {:11.0} {:11.0} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3}",
+            "{:8.3} {:11.0} {:11.0} {:11.0} {:9.1} {:9.1} {:9.1} {:9.1} {:9.1} {:9.1} {:9.1}",
             self.time_s + self.duration_s,
+            self.cycle_count,
+            self.cycle_error_count,
             self.cycle_throughput,
-            self.req_throughput,
-            self.resp_time_percentiles[Percentile::Min as usize],
-            self.resp_time_percentiles[Percentile::P25 as usize],
-            self.resp_time_percentiles[Percentile::P50 as usize],
-            self.resp_time_percentiles[Percentile::P75 as usize],
-            self.resp_time_percentiles[Percentile::P90 as usize],
-            self.resp_time_percentiles[Percentile::P95 as usize],
-            self.resp_time_percentiles[Percentile::P99 as usize],
-            self.resp_time_percentiles[Percentile::P99_9 as usize],
-            self.resp_time_percentiles[Percentile::Max as usize]
+            self.cycle_time_percentiles[Percentile::Min as usize],
+            self.cycle_time_percentiles[Percentile::P25 as usize],
+            self.cycle_time_percentiles[Percentile::P50 as usize],
+            self.cycle_time_percentiles[Percentile::P75 as usize],
+            self.cycle_time_percentiles[Percentile::P90 as usize],
+            self.cycle_time_percentiles[Percentile::P99 as usize],
+            self.cycle_time_percentiles[Percentile::Max as usize]
         )
     }
 }
@@ -678,6 +656,10 @@ impl<'a> Display for BenchmarkCmp<'a> {
             self.line("└─", "req/op", |s| {
                 Quantity::from(s.requests_per_cycle).with_precision(1)
             }),
+            self.line("Retries", "ret", |s| Quantity::from(s.request_retry_count)),
+            self.line("└─", "ret/req", |s| {
+                Quantity::from(s.request_retry_per_request).with_precision(1)
+            }),
             self.line("Rows", "row", |s| Quantity::from(s.row_count)),
             self.line("└─", "row/req", |s| {
                 Quantity::from(s.row_count_per_req).with_precision(1)
diff --git a/src/stats.rs b/src/stats.rs
index 91d7b42..a8b47e3 100644
--- a/src/stats.rs
+++ b/src/stats.rs
@@ -264,11 +264,11 @@ pub struct Sample {
     pub time_s: f32,
     pub duration_s: f32,
     pub cycle_count: u64,
+    pub cycle_error_count: u64,
     pub request_count: u64,
-    pub retry_errors: HashSet<String>,
-    pub retry_error_count: u64,
-    pub errors: HashSet<String>,
-    pub error_count: u64,
+    pub req_retry_count: u64,
+    pub req_errors: HashSet<String>,
+    pub req_error_count: u64,
     pub row_count: u64,
     pub mean_queue_len: f32,
     pub cycle_throughput: f32,
@@ -286,14 +286,14 @@ impl Sample {
     pub fn new(base_start_time: Instant, stats: &[WorkloadStats]) -> Sample {
         assert!(!stats.is_empty());
         let mut cycle_count = 0;
+        let mut cycle_error_count = 0;
         let mut cycle_times_ns = Histogram::new(3).unwrap();
 
         let mut request_count = 0;
-        let mut retry_errors = HashSet::new();
-        let mut retry_error_count = 0;
+        let mut req_retry_count = 0;
         let mut row_count = 0;
         let mut errors = HashSet::new();
-        let mut error_count = 0;
+        let mut req_error_count = 0;
         let mut mean_queue_len = 0.0;
         let mut duration_s = 0.0;
         let mut resp_times_ns = Histogram::new(3).unwrap();
@@ -309,15 +309,15 @@ impl Sample {
             if errors.len() < MAX_KEPT_ERRORS {
                 errors.extend(ss.req_errors.iter().cloned());
             }
-            error_count += ss.req_error_count;
-            retry_errors.extend(ss.retry_errors.iter().cloned());
-            retry_error_count += ss.retry_error_count;
+            req_error_count += ss.req_error_count;
+            req_retry_count += ss.req_retry_count;
             mean_queue_len += ss.mean_queue_length / stats.len() as f32;
             duration_s += (s.end_time - s.start_time).as_secs_f32() / stats.len() as f32;
             resp_times_ns.add(&ss.resp_times_ns).unwrap();
             resp_time_histogram_ns.add(&ss.resp_times_ns).unwrap();
 
             cycle_count += fs.call_count;
+            cycle_error_count = fs.error_count;
             cycle_times_ns.add(&fs.call_times_ns).unwrap();
             cycle_time_histogram_ns.add(&fs.call_times_ns).unwrap();
         }
@@ -328,11 +328,11 @@ impl Sample {
             time_s: (stats[0].start_time - base_start_time).as_secs_f32(),
             duration_s,
             cycle_count,
+            cycle_error_count,
             request_count,
-            retry_errors,
-            retry_error_count,
-            errors,
-            error_count,
+            req_retry_count,
+            req_errors: errors,
+            req_error_count,
             row_count,
             mean_queue_len: not_nan_f32(mean_queue_len).unwrap_or(0.0),
             cycle_throughput: cycle_count as f32 / duration_s,
@@ -468,6 +468,8 @@ pub struct BenchmarkStats {
     pub cycle_count: u64,
     pub request_count: u64,
     pub requests_per_cycle: f64,
+    pub request_retry_count: u64,
+    pub request_retry_per_request: Option<f64>,
     pub errors: Vec<String>,
     pub error_count: u64,
     pub errors_ratio: Option<f64>,
@@ -555,8 +557,10 @@ pub struct Recorder {
     pub end_cpu_time: ProcessTime,
     pub cycle_count: u64,
     pub request_count: u64,
+    pub request_retry_count: u64,
+    pub request_error_count: u64,
     pub errors: HashSet<String>,
-    pub error_count: u64,
+    pub cycle_error_count: u64,
     pub row_count: u64,
     pub cycle_times_ns: Histogram<u64>,
     pub resp_times_ns: Histogram<u64>,
@@ -584,9 +588,11 @@ impl Recorder {
             concurrency_limit,
             cycle_count: 0,
             request_count: 0,
+            request_retry_count: 0,
+            request_error_count: 0,
             row_count: 0,
             errors: HashSet::new(),
-            error_count: 0,
+            cycle_error_count: 0,
             cycle_times_ns: Histogram::new(3).unwrap(),
             resp_times_ns: Histogram::new(3).unwrap(),
         }
@@ -595,6 +601,7 @@ impl Recorder {
     /// Adds the statistics of the completed request to the already collected statistics.
     /// Called on completion of each sample.
     pub fn record(&mut self, samples: &[WorkloadStats]) -> &Sample {
+        assert!(!samples.is_empty());
        for s in samples.iter() {
             self.resp_times_ns
                 .add(&s.session_stats.resp_times_ns)
@@ -605,12 +612,14 @@ impl Recorder {
         }
         let stats = Sample::new(self.start_instant, samples);
         self.cycle_count += stats.cycle_count;
+        self.cycle_error_count += stats.cycle_error_count;
         self.request_count += stats.request_count;
+        self.request_retry_count += stats.req_retry_count;
+        self.request_error_count += stats.req_error_count;
         self.row_count += stats.row_count;
         if self.errors.len() < MAX_KEPT_ERRORS {
-            self.errors.extend(stats.errors.iter().cloned());
+            self.errors.extend(stats.req_errors.iter().cloned());
         }
-        self.error_count += stats.error_count;
 
         self.log.append(stats)
     }
@@ -631,8 +640,6 @@ impl Recorder {
             .duration_since(self.start_cpu_time)
             .as_secs_f64();
         let cpu_util = 100.0 * cpu_time_s / elapsed_time_s / num_cpus::get() as f64;
-        let count = self.request_count + self.error_count;
-
         let cycle_throughput = self.log.call_throughput();
         let cycle_throughput_ratio = self.rate_limit.map(|r| 100.0 * cycle_throughput.value / r);
         let req_throughput = self.log.req_throughput();
@@ -655,9 +662,13 @@ impl Recorder {
             cpu_util,
             cycle_count: self.cycle_count,
             errors: self.errors.into_iter().collect(),
-            error_count: self.error_count,
-            errors_ratio: not_nan(100.0 * self.error_count as f64 / count as f64),
+            error_count: self.cycle_error_count,
+            errors_ratio: not_nan(100.0 * self.cycle_error_count as f64 / self.cycle_count as f64),
             request_count: self.request_count,
+            request_retry_count: self.request_retry_count,
+            request_retry_per_request: not_nan(
+                self.request_retry_count as f64 / self.request_count as f64,
+            ),
             requests_per_cycle: self.request_count as f64 / self.cycle_count as f64,
             row_count: self.row_count,
             row_count_per_req: not_nan(self.row_count as f64 / self.request_count as f64),
diff --git a/src/workload.rs b/src/workload.rs
index a9cfc6f..a5633a3 100644
--- a/src/workload.rs
+++ b/src/workload.rs
@@ -369,6 +369,7 @@ impl Program {
 #[derive(Clone, Debug)]
 pub struct FnStats {
     pub call_count: u64,
+    pub error_count: u64,
     pub call_times_ns: Histogram<u64>,
 }
 
@@ -379,12 +380,21 @@ impl FnStats {
             .record(duration.as_nanos().clamp(1, u64::MAX as u128) as u64)
             .unwrap();
     }
+
+    pub fn operation_failed(&mut self, duration: Duration) {
+        self.call_count += 1;
+        self.error_count += 1;
+        self.call_times_ns
+            .record(duration.as_nanos().clamp(1, u64::MAX as u128) as u64)
+            .unwrap();
+    }
 }
 
 impl Default for FnStats {
     fn default() -> Self {
         FnStats {
             call_count: 0,
+            error_count: 0,
             call_times_ns: Histogram::new(3).unwrap(),
         }
     }
@@ -450,19 +460,25 @@ impl Workload {
         let result = self
             .program
             .async_call(&self.function, (context, cycle as i64))
-            .await
-            .map(|_| ()); // erase Value, because Value is !Send
+            .await;
         let end_time = Instant::now();
         let mut state = self.state.try_lock().unwrap();
-        state.fn_stats.operation_completed(end_time - start_time);
+        let duration = end_time - start_time;
         match result {
-            Ok(_) => Ok((cycle, end_time)),
+            Ok(_) => {
+                state.fn_stats.operation_completed(duration);
+                Ok((cycle, end_time))
+            }
             Err(LatteError::Cassandra(CassError(CassErrorKind::Overloaded(_, _)))) => {
                 // don't stop on overload errors;
                 // they are being counted by the context stats anyways
+                state.fn_stats.operation_failed(duration);
                 Ok((cycle, end_time))
             }
-            Err(e) => Err(e),
+            Err(e) => {
+                state.fn_stats.operation_failed(duration);
+                Err(e)
+            }
         }
     }
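Taken together, the workload and stats changes make error accounting explicit: a failed cycle still counts as a call (and its latency is still recorded), but it now also increments `error_count`, and the report derives its ratios from the accumulated counters (`errors_ratio` from `cycle_error_count / cycle_count`, `request_retry_per_request` from `request_retry_count / request_count`). Below is a minimal sketch of that bookkeeping using plain counters; the HDR histograms and the `not_nan` wrapper from `stats.rs` are replaced by simple stand-ins, and the type and field names are illustrative only.

    use std::time::Duration;

    /// Illustrative mirror of the new `FnStats` bookkeeping: failures are still calls.
    #[derive(Default)]
    struct CycleStats {
        call_count: u64,
        error_count: u64,
        total_call_time: Duration, // stands in for the `call_times_ns` histogram
    }

    impl CycleStats {
        fn operation_completed(&mut self, duration: Duration) {
            self.call_count += 1;
            self.total_call_time += duration;
        }

        fn operation_failed(&mut self, duration: Duration) {
            // Same as a success, plus the error counter, mirroring `FnStats::operation_failed`.
            self.call_count += 1;
            self.error_count += 1;
            self.total_call_time += duration;
        }
    }

    /// Ratios the final report derives from the counters (guards replace `not_nan`).
    fn errors_ratio(cycle_error_count: u64, cycle_count: u64) -> Option<f64> {
        (cycle_count > 0).then(|| 100.0 * cycle_error_count as f64 / cycle_count as f64)
    }

    fn retries_per_request(request_retry_count: u64, request_count: u64) -> Option<f64> {
        (request_count > 0).then(|| request_retry_count as f64 / request_count as f64)
    }

    fn main() {
        let mut stats = CycleStats::default();
        stats.operation_completed(Duration::from_millis(12));
        stats.operation_failed(Duration::from_millis(45));

        println!(
            "cycles: {}, error ratio: {:?}, retries/request: {:?}",
            stats.call_count,
            errors_ratio(stats.error_count, stats.call_count),
            retries_per_request(3, 100),
        );
    }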