Skip to content

Commit

Permalink
Make interrupting by Ctrl-C more reliable
Browse files Browse the repository at this point in the history
The retrying logic made Ctrl-C handling a bit broken,
because the streams could be terminated only at the
complete cycles. If a query retries for a long time
it prevented terminating the program.

This commit implements interruption logic in
a different way, by using tokio::signal
and tokio::select! so all streams are terminated
immediately, regardless of their progress.
  • Loading branch information
pkolaczk committed Jun 21, 2024
1 parent 206d6ee commit 0c7e80e
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 100 deletions.
40 changes: 11 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "latte-cli"
description = "A database benchmarking tool for Apache Cassandra"
version = "0.26.0"
version = "0.27.0"
authors = ["Piotr Kołaczkowski <[email protected]>"]
edition = "2021"
readme = "README.md"
Expand All @@ -21,7 +21,6 @@ chrono = { version = "0.4.18", features = ["serde"] }
clap = { version = "4", features = ["derive", "cargo", "env"] }
console = "0.15.0"
cpu-time = "1.0.0"
ctrlc = "3.2.1"
err-derive = "0.3"
futures = "0.3"
hdrhistogram = "7.1.0"
Expand All @@ -39,7 +38,7 @@ rand = "0.8"
regex = "1.5"
rune = "0.12"
rust-embed = "8"
scylla = { version = "0.13", features = ["ssl"] }
scylla = { path = "../scylla-rust-driver/scylla", features = ["ssl"] }
search_path = "0.1"
serde = { version = "1.0.116", features = ["derive"] }
serde_json = "1.0.57"
Expand All @@ -49,7 +48,7 @@ strum = { version = "0.26", features = ["derive"] }
strum_macros = "0.26"
time = "0.3"
thiserror = "1.0.26"
tokio = { version = "1", features = ["rt", "rt-multi-thread", "time", "parking_lot"] }
tokio = { version = "1", features = ["rt", "rt-multi-thread", "time", "parking_lot", "signal"] }
tokio-stream = "0.1"
tracing = "0.1"
tracing-subscriber = "0.3"
Expand Down
3 changes: 2 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::context::CassError;
use crate::stats::BenchmarkStats;
use err_derive::*;
use hdrhistogram::serialization::interval_log::IntervalLogWriterError;
use hdrhistogram::serialization::V2DeflateSerializeError;
Expand Down Expand Up @@ -37,7 +38,7 @@ pub enum LatteError {
HdrLogWrite(#[source] IntervalLogWriterError<V2DeflateSerializeError>),

#[error(display = "Interrupted")]
Interrupted,
Interrupted(Box<BenchmarkStats>),
}

pub type Result<T> = std::result::Result<T, LatteError>;
48 changes: 21 additions & 27 deletions src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ use std::sync::Arc;
use std::time::Instant;
use tokio_stream::wrappers::IntervalStream;

use crate::error::Result;
use crate::error::{LatteError, Result};
use crate::{
BenchmarkStats, BoundedCycleCounter, InterruptHandler, Interval, Progress, Recorder, Sampler,
Workload, WorkloadStats,
BenchmarkStats, BoundedCycleCounter, Interval, Progress, Recorder, Sampler, Workload,
WorkloadStats,
};

/// Returns a stream emitting `rate` events per second.
Expand Down Expand Up @@ -43,7 +43,6 @@ async fn run_stream<T>(
cycle_counter: BoundedCycleCounter,
concurrency: NonZeroUsize,
sampling: Interval,
interrupt: Arc<InterruptHandler>,
progress: Arc<StatusLine<Progress>>,
mut out: Sender<Result<WorkloadStats>>,
) {
Expand All @@ -68,9 +67,6 @@ async fn run_stream<T>(
return;
}
}
if interrupt.is_interrupted() {
break;
}
}
// Send the statistics of remaining requests
sampler.finish().await;
Expand All @@ -88,7 +84,6 @@ fn spawn_stream(
sampling: Interval,
workload: Workload,
iter_counter: BoundedCycleCounter,
interrupt: Arc<InterruptHandler>,
progress: Arc<StatusLine<Progress>>,
) -> Receiver<Result<WorkloadStats>> {
let (tx, rx) = channel(1);
Expand All @@ -103,7 +98,6 @@ fn spawn_stream(
iter_counter,
concurrency,
sampling,
interrupt,
progress,
tx,
)
Expand All @@ -117,7 +111,6 @@ fn spawn_stream(
iter_counter,
concurrency,
sampling,
interrupt,
progress,
tx,
)
Expand Down Expand Up @@ -169,7 +162,6 @@ pub async fn par_execute(
exec_options: &ExecutionOptions,
sampling: Interval,
workload: Workload,
signals: Arc<InterruptHandler>,
show_progress: bool,
) -> Result<BenchmarkStats> {
let thread_count = exec_options.threads.get();
Expand All @@ -196,29 +188,31 @@ pub async fn par_execute(
sampling,
workload.clone()?,
deadline.share(),
signals.clone(),
progress.clone(),
);
streams.push(s);
}

loop {
let partial_stats: Vec<_> = receive_one_of_each(&mut streams)
.await
.into_iter()
.try_collect()?;

if partial_stats.is_empty() {
break;
}
tokio::select! {
partial_stats = receive_one_of_each(&mut streams) => {
let partial_stats: Vec<_> = partial_stats.into_iter().try_collect()?;
if partial_stats.is_empty() {
break Ok(stats.finish());
}

let aggregate = stats.record(&partial_stats);
if sampling.is_bounded() {
progress.set_visible(false);
println!("{aggregate}");
progress.set_visible(show_progress);
}
}

let aggregate = stats.record(&partial_stats);
if sampling.is_bounded() {
progress.set_visible(false);
println!("{aggregate}");
progress.set_visible(show_progress);
_ = tokio::signal::ctrl_c() => {
progress.set_visible(false);
break Err(LatteError::Interrupted(Box::new(stats.finish())));
}
}
}

Ok(stats.finish())
}
1 change: 1 addition & 0 deletions src/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use serde::{Deserialize, Deserializer, Serialize};

/// A wrapper for HDR histogram that allows us to serialize/deserialize it to/from
/// a base64 encoded string we can store in JSON report.
#[derive(Debug)]
pub struct SerializableHistogram(pub Histogram<u64>);

impl Serialize for SerializableHistogram {
Expand Down
21 changes: 0 additions & 21 deletions src/interrupt.rs

This file was deleted.

23 changes: 9 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::fs::File;
use std::io::{stdout, Write};
use std::path::{Path, PathBuf};
use std::process::exit;
use std::sync::Arc;
use std::time::Duration;

use clap::Parser;
Expand All @@ -24,7 +23,6 @@ use crate::context::{CassError, CassErrorKind, Context, SessionStats};
use crate::cycle::BoundedCycleCounter;
use crate::error::{LatteError, Result};
use crate::exec::{par_execute, ExecutionOptions};
use crate::interrupt::InterruptHandler;
use crate::plot::plot_graph;
use crate::progress::Progress;
use crate::report::{Report, RunConfigCmp};
Expand All @@ -38,7 +36,6 @@ mod cycle;
mod error;
mod exec;
mod histogram;
mod interrupt;
mod plot;
mod progress;
mod report;
Expand Down Expand Up @@ -170,7 +167,6 @@ async fn load(conf: LoadCommand) -> Result<()> {
}
}

let interrupt = Arc::new(InterruptHandler::install());
eprintln!("info: Loading data...");
let loader = Workload::new(session.clone()?, program.clone(), FnRef::new(LOAD_FN));
let load_options = ExecutionOptions {
Expand All @@ -184,7 +180,6 @@ async fn load(conf: LoadCommand) -> Result<()> {
&load_options,
config::Interval::Unbounded,
loader,
interrupt.clone(),
!conf.quiet,
)
.await?;
Expand Down Expand Up @@ -228,7 +223,6 @@ async fn run(conf: RunCommand) -> Result<()> {
}

let runner = Workload::new(session.clone()?, program.clone(), function);
let interrupt = Arc::new(InterruptHandler::install());
if conf.warmup_duration.is_not_zero() {
eprintln!("info: Warming up...");
let warmup_options = ExecutionOptions {
Expand All @@ -242,16 +236,11 @@ async fn run(conf: RunCommand) -> Result<()> {
&warmup_options,
Interval::Unbounded,
runner.clone()?,
interrupt.clone(),
!conf.quiet,
)
.await?;
}

if interrupt.is_interrupted() {
return Err(LatteError::Interrupted);
}

eprintln!("info: Running benchmark...");

println!(
Expand All @@ -270,15 +259,21 @@ async fn run(conf: RunCommand) -> Result<()> {
};

report::print_log_header();
let stats = par_execute(
let stats = match par_execute(
"Running...",
&exec_options,
conf.sampling_interval,
runner,
interrupt.clone(),
!conf.quiet,
)
.await?;
.await
{
Ok(stats) => stats,
Err(LatteError::Interrupted(stats)) => *stats,
Err(e) => {
return Err(e);
}
};

let stats_cmp = BenchmarkCmp {
v1: &stats,
Expand Down
Loading

0 comments on commit 0c7e80e

Please sign in to comment.