Fix error logging #90

Merged: 2 commits, Jul 22, 2024
src/config.rs (13 changes: 9 additions & 4 deletions)

@@ -10,9 +10,6 @@ use clap::builder::PossibleValue;
 use clap::{Parser, ValueEnum};
 use serde::{Deserialize, Serialize};
 
-/// Limit of retry errors to be kept and then printed in scope of a sampling interval
-pub const PRINT_RETRY_ERROR_LIMIT: u64 = 5;
-
 /// Parse a single key-value pair
 fn parse_key_val<T, U>(s: &str) -> Result<(T, U), anyhow::Error>
 where
@@ -58,7 +55,15 @@ impl Interval {
         }
     }
 
-    pub fn seconds(&self) -> Option<f32> {
+    pub fn period(&self) -> Option<tokio::time::Duration> {
+        if let Interval::Time(d) = self {
+            Some(*d)
+        } else {
+            None
+        }
+    }
+
+    pub fn period_secs(&self) -> Option<f32> {
         if let Interval::Time(d) = self {
             Some(d.as_secs_f32())
         } else {
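
The new period() accessor returns the raw Duration of a time-based interval, while the renamed period_secs() keeps the old f32 behavior. A minimal sketch of the accessor pattern, using std::time::Duration (the same type tokio::time re-exports); the count() accessor is assumed here from its use in src/exec.rs below, since this hunk does not show it:

use std::time::Duration;

pub enum Interval {
    Count(u64),
    Time(Duration),
    Unbounded,
}

impl Interval {
    /// Mirrors the accessor added in this PR: Some(duration) for time intervals.
    pub fn period(&self) -> Option<Duration> {
        if let Interval::Time(d) = self {
            Some(*d)
        } else {
            None
        }
    }

    /// Assumed counterpart used by src/exec.rs: Some(n) for count intervals.
    pub fn count(&self) -> Option<u64> {
        if let Interval::Count(n) = self {
            Some(*n)
        } else {
            None
        }
    }
}

fn main() {
    let sampling = Interval::Time(Duration::from_secs(5));
    // Non-time intervals fall back to an effectively unbounded period.
    let period = sampling.period().unwrap_or(Duration::MAX);
    println!("sampling period: {:?}", period);
}
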
src/context.rs (146 changes: 48 additions & 98 deletions)

@@ -1,6 +1,7 @@
 use std::collections::{HashMap, HashSet};
 use std::fmt::{Display, Formatter};
 use std::fs::File;
+use std::future::Future;
 use std::hash::{Hash, Hasher};
 use std::io;
 use std::io::{BufRead, BufReader, ErrorKind, Read};

@@ -38,7 +39,7 @@ use tokio::time::{Duration, Instant};
 use try_lock::TryLock;
 use uuid::{Variant, Version};
 
-use crate::config::{ConnectionConf, RetryInterval, PRINT_RETRY_ERROR_LIMIT};
+use crate::config::{ConnectionConf, RetryInterval};
 use crate::LatteError;
 
 fn ssl_context(conf: &&ConnectionConf) -> Result<Option<SslContext>, CassError> {

@@ -197,13 +198,6 @@ impl CassError {
         };
         CassError(kind)
     }
-
-    fn query_retries_exceeded(retry_number: u64) -> CassError {
-        CassError(CassErrorKind::QueryRetriesExceeded(format!(
-            "Max retry attempts ({}) reached",
-            retry_number
-        )))
-    }
 }
 
 #[derive(Debug)]

@@ -301,10 +295,9 @@ impl std::error::Error for CassError {}
 #[derive(Clone, Debug)]
 pub struct SessionStats {
     pub req_count: u64,
-    pub retry_errors: HashSet<String>,
-    pub retry_error_count: u64,
     pub req_errors: HashSet<String>,
     pub req_error_count: u64,
+    pub req_retry_count: u64,
     pub row_count: u64,
     pub queue_length: u64,
     pub mean_queue_length: f32,

@@ -325,11 +318,17 @@ impl SessionStats {
         Instant::now()
     }
 
-    pub fn complete_request(&mut self, duration: Duration, rs: &Result<QueryResult, QueryError>) {
+    pub fn complete_request(
+        &mut self,
+        duration: Duration,
+        rs: &Result<QueryResult, QueryError>,
+        retries: u64,
+    ) {
         self.queue_length -= 1;
         let duration_ns = duration.as_nanos().clamp(1, u64::MAX as u128) as u64;
         self.resp_times_ns.record(duration_ns).unwrap();
         self.req_count += 1;
+        self.req_retry_count += retries;
         match rs {
             Ok(rs) => self.row_count += rs.rows.as_ref().map(|r| r.len()).unwrap_or(0) as u64,
             Err(e) => {

@@ -339,21 +338,13 @@
         }
     }
 
-    pub fn store_retry_error(&mut self, error_str: String) {
-        self.retry_error_count += 1;
-        if self.retry_error_count <= PRINT_RETRY_ERROR_LIMIT {
-            self.retry_errors.insert(error_str);
-        }
-    }
-
     /// Resets all accumulators
     pub fn reset(&mut self) {
         self.req_error_count = 0;
         self.row_count = 0;
         self.req_count = 0;
+        self.req_retry_count = 0;
         self.mean_queue_length = 0.0;
-        self.retry_error_count = 0;
-        self.retry_errors.clear();
         self.req_errors.clear();
         self.resp_times_ns.clear();
 

@@ -366,10 +357,9 @@ impl Default for SessionStats {
     fn default() -> Self {
         SessionStats {
             req_count: 0,
-            retry_errors: HashSet::new(),
-            retry_error_count: 0,
             req_errors: HashSet::new(),
             req_error_count: 0,
+            req_retry_count: 0,
             row_count: 0,
             queue_length: 0,
             mean_queue_length: 0.0,

@@ -394,38 +384,6 @@ pub fn get_exponential_retry_interval(
     std::cmp::min(current_interval as u64, max_interval)
 }
 
-pub async fn handle_retry_error(
-    ctxt: &Context,
-    current_attempt_num: u64,
-    current_error: CassError,
-) {
-    let current_retry_interval = get_exponential_retry_interval(
-        ctxt.retry_interval.min_ms,
-        ctxt.retry_interval.max_ms,
-        current_attempt_num,
-    );
-
-    let mut next_attempt_str = String::new();
-    let is_last_attempt = current_attempt_num == ctxt.retry_number;
-    if !is_last_attempt {
-        next_attempt_str += &format!("[Retry in {} ms]", current_retry_interval);
-    }
-    let err_msg = format!(
-        "{}: [ERROR][Attempt {}/{}]{} {}",
-        Utc::now().format("%Y-%m-%d %H:%M:%S%.3f"),
-        current_attempt_num,
-        ctxt.retry_number,
-        next_attempt_str,
-        current_error,
-    );
-    if !is_last_attempt {
-        ctxt.stats.try_lock().unwrap().store_retry_error(err_msg);
-        tokio::time::sleep(Duration::from_millis(current_retry_interval)).await;
-    } else {
-        eprintln!("{}", err_msg);
-    }
-}
-
 /// This is the main object that a workload script uses to interface with the outside world.
 /// It also tracks query execution metrics such as number of requests, rows, response times etc.
 #[derive(Any)]
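
Only the tail of get_exponential_retry_interval is visible in this hunk: the std::cmp::min cap. A sketch of a capped exponential backoff consistent with that tail; the doubling growth factor and the exact arithmetic are assumptions for illustration, not taken from the diff:

fn get_exponential_retry_interval(
    min_interval: u64,
    max_interval: u64,
    current_attempt_num: u64,
) -> u64 {
    // Assumed growth: double the minimum interval on every attempt.
    let current_interval = min_interval as f64 * 2f64.powi(current_attempt_num as i32);
    // The visible tail of the real function: cap the interval at max_interval.
    std::cmp::min(current_interval as u64, max_interval)
}

fn main() {
    // With min_ms = 100 and max_ms = 5000: 200, 400, 800, 1600, 3200, 5000, 5000.
    for attempt in 1..=7 {
        println!("retry {}: {} ms", attempt, get_exponential_retry_interval(100, 5000, attempt));
    }
}
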

@@ -519,26 +477,9 @@ impl Context {

     /// Executes an ad-hoc CQL statement with no parameters. Does not prepare.
     pub async fn execute(&self, cql: &str) -> Result<(), CassError> {
-        for current_attempt_num in 0..self.retry_number + 1 {
-            let start_time = self.stats.try_lock().unwrap().start_request();
-            let rs = self.session.query(cql, ()).await;
-            let duration = Instant::now() - start_time;
-            match rs {
-                Ok(_) => {}
-                Err(e) => {
-                    let current_error = CassError::query_execution_error(cql, &[], e.clone());
-                    handle_retry_error(self, current_attempt_num, current_error).await;
-                    continue;
-                }
-            }
-            self.stats
-                .try_lock()
-                .unwrap()
-                .complete_request(duration, &rs);
-            rs.map_err(|e| CassError::query_execution_error(cql, &[], e.clone()))?;
-            return Ok(());
-        }
-        Err(CassError::query_retries_exceeded(self.retry_number))
+        let rs = self.execute_inner(|| self.session.query(cql, ())).await;
+        rs.map_err(|e| CassError::query_execution_error(cql, &[], e.clone()))?;
+        Ok(())
     }
 
     /// Executes a statement prepared and registered earlier by a call to `prepare`.

@@ -549,32 +490,41 @@
             .ok_or_else(|| CassError(CassErrorKind::PreparedStatementNotFound(key.to_string())))?;
 
         let params = bind::to_scylla_query_params(&params, statement.get_variable_col_specs())?;
-        for current_attempt_num in 0..self.retry_number + 1 {
-            let start_time = self.stats.try_lock().unwrap().start_request();
-            let rs = self.session.execute(statement, params.clone()).await;
-            let duration = Instant::now() - start_time;
-            match rs {
-                Ok(_) => {}
-                Err(e) => {
-                    let current_error = CassError::query_execution_error(
-                        statement.get_statement(),
-                        &params,
-                        e.clone(),
-                    );
-                    handle_retry_error(self, current_attempt_num, current_error).await;
-                    continue;
-                }
-            }
-            self.stats
-                .try_lock()
-                .unwrap()
-                .complete_request(duration, &rs);
-            rs.map_err(|e| {
-                CassError::query_execution_error(statement.get_statement(), &params, e)
-            })?;
-            return Ok(());
-        }
-        Err(CassError::query_retries_exceeded(self.retry_number))
+        let rs = self
+            .execute_inner(|| self.session.execute(statement, params.clone()))
+            .await;
+
+        rs.map_err(|e| CassError::query_execution_error(statement.get_statement(), &params, e))?;
+        Ok(())
+    }
+
+    async fn execute_inner<R>(&self, f: impl Fn() -> R) -> Result<QueryResult, QueryError>
+    where
+        R: Future<Output = Result<QueryResult, QueryError>>,
+    {
+        let start_time = self.stats.try_lock().unwrap().start_request();
+
+        let mut rs = Err(QueryError::TimeoutError);
+        let mut attempts = 0;
+        while attempts <= self.retry_number + 1 && rs.is_err() {
+            if attempts > 0 {
+                let current_retry_interval = get_exponential_retry_interval(
+                    self.retry_interval.min_ms,
+                    self.retry_interval.max_ms,
+                    attempts,
+                );
+                tokio::time::sleep(Duration::from_millis(current_retry_interval)).await;
+            }
+            rs = f().await;
+            attempts += 1;
+        }
+
+        let duration = Instant::now() - start_time;
+        self.stats
+            .try_lock()
+            .unwrap()
+            .complete_request(duration, &rs, attempts - 1);
+        rs
     }
 
     /// Returns the current accumulated request stats snapshot and resets the stats.
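
The heart of this PR is execute_inner: execute and execute_prepared now funnel through a single generic retry loop, so response time and retry count are recorded once per logical request rather than once per attempt, and the removed retry_errors bookkeeping collapses into the single req_retry_count counter. A standalone sketch of the same pattern with simplified stand-in types (RetryError, run_with_retries, and the closure in main are illustrative, not part of this codebase):

use std::future::Future;
use std::time::Duration;

#[derive(Debug)]
struct RetryError;

// Generic retry wrapper in the style of execute_inner: the closure rebuilds the
// future for every attempt, and each retry sleeps before re-executing.
async fn run_with_retries<R>(
    retry_number: u64,
    f: impl Fn() -> R,
) -> (Result<u64, RetryError>, u64)
where
    R: Future<Output = Result<u64, RetryError>>,
{
    let mut rs = Err(RetryError);
    let mut attempts = 0;
    while attempts <= retry_number && rs.is_err() {
        if attempts > 0 {
            // Constant backoff here; the PR computes an exponential interval.
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        rs = f().await;
        attempts += 1;
    }
    // attempts - 1 is the retry count handed to complete_request in the PR.
    (rs, attempts - 1)
}

#[tokio::main]
async fn main() {
    let (result, retries) = run_with_retries(3, || async { Ok::<u64, RetryError>(42) }).await;
    println!("result: {:?}, retries: {}", result, retries);
}
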
src/exec.rs (19 changes: 7 additions & 12 deletions)

@@ -52,17 +52,11 @@ async fn run_stream<T>(
     progress: Arc<StatusLine<Progress>>,
     mut out: Sender<Result<WorkloadStats>>,
 ) {
-    workload.reset(Instant::now());
-
     let mut iter_counter = cycle_counter;
+    let sample_size = sampling.count().unwrap_or(u64::MAX);
+    let sample_duration = sampling.period().unwrap_or(tokio::time::Duration::MAX);
 
-    let (sample_size, sample_duration) = match sampling {
-        Interval::Count(cnt) => (cnt, tokio::time::Duration::MAX),
-        Interval::Time(duration) => (u64::MAX, duration),
-        Interval::Unbounded => (u64::MAX, tokio::time::Duration::MAX),
-    };
-
-    let mut result_stream = stream
+    let mut stats_stream = stream
         .map(|_| iter_counter.next())
         .take_while(|i| ready(i.is_some()))
         // unconstrained to workaround quadratic complexity of buffer_unordered ()

@@ -77,13 +71,14 @@
         })
         .map(|errors| (workload.take_stats(Instant::now()), errors));
 
-    while let Some((stats, errors)) = result_stream.next().await {
+    workload.reset(Instant::now());
+    while let Some((stats, errors)) = stats_stream.next().await {
         if out.send(Ok(stats)).await.is_err() {
-            break;
+            return;
         }
         for err in errors {
             if out.send(Err(err)).await.is_err() {
-                break;
+                return;
             }
         }
     }
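
A note on the break to return change above: in the old code, break inside the inner for err in errors loop only exited that loop, so run_stream kept producing samples for a receiver that was already gone; return now exits the whole function. A tiny illustration of the scoping difference (the loop label is for demonstration only):

fn main() {
    'outer: for sample in 0..3 {
        for err in 0..2 {
            if sample == 1 && err == 0 {
                // A plain `break` here would only leave the inner loop and the
                // outer loop would continue; `break 'outer` (or `return` inside
                // a function, as in run_stream) stops everything.
                break 'outer;
            }
        }
        println!("sample {} fully processed", sample);
    }
}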