Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make interrupting by Ctrl-C more reliable #72

Merged
merged 1 commit into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 11 additions & 29 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ chrono = { version = "0.4.18", features = ["serde"] }
clap = { version = "4", features = ["derive", "cargo", "env"] }
console = "0.15.0"
cpu-time = "1.0.0"
ctrlc = "3.2.1"
err-derive = "0.3"
futures = "0.3"
hdrhistogram = "7.1.0"
Expand Down Expand Up @@ -49,7 +48,7 @@ strum = { version = "0.26", features = ["derive"] }
strum_macros = "0.26"
time = "0.3"
thiserror = "1.0.26"
tokio = { version = "1", features = ["rt", "rt-multi-thread", "time", "parking_lot"] }
tokio = { version = "1", features = ["rt", "rt-multi-thread", "time", "parking_lot", "signal"] }
tokio-stream = "0.1"
tracing = "0.1"
tracing-subscriber = "0.3"
Expand Down
3 changes: 2 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::context::CassError;
use crate::stats::BenchmarkStats;
use err_derive::*;
use hdrhistogram::serialization::interval_log::IntervalLogWriterError;
use hdrhistogram::serialization::V2DeflateSerializeError;
Expand Down Expand Up @@ -37,7 +38,7 @@ pub enum LatteError {
HdrLogWrite(#[source] IntervalLogWriterError<V2DeflateSerializeError>),

#[error(display = "Interrupted")]
Interrupted,
Interrupted(Box<BenchmarkStats>),
}

pub type Result<T> = std::result::Result<T, LatteError>;
48 changes: 21 additions & 27 deletions src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ use std::sync::Arc;
use std::time::Instant;
use tokio_stream::wrappers::IntervalStream;

use crate::error::Result;
use crate::error::{LatteError, Result};
use crate::{
BenchmarkStats, BoundedCycleCounter, InterruptHandler, Interval, Progress, Recorder, Sampler,
Workload, WorkloadStats,
BenchmarkStats, BoundedCycleCounter, Interval, Progress, Recorder, Sampler, Workload,
WorkloadStats,
};

/// Returns a stream emitting `rate` events per second.
Expand Down Expand Up @@ -43,7 +43,6 @@ async fn run_stream<T>(
cycle_counter: BoundedCycleCounter,
concurrency: NonZeroUsize,
sampling: Interval,
interrupt: Arc<InterruptHandler>,
progress: Arc<StatusLine<Progress>>,
mut out: Sender<Result<WorkloadStats>>,
) {
Expand All @@ -68,9 +67,6 @@ async fn run_stream<T>(
return;
}
}
if interrupt.is_interrupted() {
break;
}
}
// Send the statistics of remaining requests
sampler.finish().await;
Expand All @@ -88,7 +84,6 @@ fn spawn_stream(
sampling: Interval,
workload: Workload,
iter_counter: BoundedCycleCounter,
interrupt: Arc<InterruptHandler>,
progress: Arc<StatusLine<Progress>>,
) -> Receiver<Result<WorkloadStats>> {
let (tx, rx) = channel(1);
Expand All @@ -103,7 +98,6 @@ fn spawn_stream(
iter_counter,
concurrency,
sampling,
interrupt,
progress,
tx,
)
Expand All @@ -117,7 +111,6 @@ fn spawn_stream(
iter_counter,
concurrency,
sampling,
interrupt,
progress,
tx,
)
Expand Down Expand Up @@ -169,7 +162,6 @@ pub async fn par_execute(
exec_options: &ExecutionOptions,
sampling: Interval,
workload: Workload,
signals: Arc<InterruptHandler>,
show_progress: bool,
) -> Result<BenchmarkStats> {
let thread_count = exec_options.threads.get();
Expand All @@ -196,29 +188,31 @@ pub async fn par_execute(
sampling,
workload.clone()?,
deadline.share(),
signals.clone(),
progress.clone(),
);
streams.push(s);
}

loop {
let partial_stats: Vec<_> = receive_one_of_each(&mut streams)
.await
.into_iter()
.try_collect()?;

if partial_stats.is_empty() {
break;
}
tokio::select! {
partial_stats = receive_one_of_each(&mut streams) => {
let partial_stats: Vec<_> = partial_stats.into_iter().try_collect()?;
if partial_stats.is_empty() {
break Ok(stats.finish());
}

let aggregate = stats.record(&partial_stats);
if sampling.is_bounded() {
progress.set_visible(false);
println!("{aggregate}");
progress.set_visible(show_progress);
}
}

let aggregate = stats.record(&partial_stats);
if sampling.is_bounded() {
progress.set_visible(false);
println!("{aggregate}");
progress.set_visible(show_progress);
_ = tokio::signal::ctrl_c() => {
progress.set_visible(false);
break Err(LatteError::Interrupted(Box::new(stats.finish())));
}
}
}

Ok(stats.finish())
}
1 change: 1 addition & 0 deletions src/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use serde::{Deserialize, Deserializer, Serialize};

/// A wrapper for HDR histogram that allows us to serialize/deserialize it to/from
/// a base64 encoded string we can store in JSON report.
#[derive(Debug)]
pub struct SerializableHistogram(pub Histogram<u64>);

impl Serialize for SerializableHistogram {
Expand Down
21 changes: 0 additions & 21 deletions src/interrupt.rs

This file was deleted.

23 changes: 9 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::fs::File;
use std::io::{stdout, Write};
use std::path::{Path, PathBuf};
use std::process::exit;
use std::sync::Arc;
use std::time::Duration;

use clap::Parser;
Expand All @@ -24,7 +23,6 @@ use crate::context::{CassError, CassErrorKind, Context, SessionStats};
use crate::cycle::BoundedCycleCounter;
use crate::error::{LatteError, Result};
use crate::exec::{par_execute, ExecutionOptions};
use crate::interrupt::InterruptHandler;
use crate::plot::plot_graph;
use crate::progress::Progress;
use crate::report::{Report, RunConfigCmp};
Expand All @@ -38,7 +36,6 @@ mod cycle;
mod error;
mod exec;
mod histogram;
mod interrupt;
mod plot;
mod progress;
mod report;
Expand Down Expand Up @@ -170,7 +167,6 @@ async fn load(conf: LoadCommand) -> Result<()> {
}
}

let interrupt = Arc::new(InterruptHandler::install());
eprintln!("info: Loading data...");
let loader = Workload::new(session.clone()?, program.clone(), FnRef::new(LOAD_FN));
let load_options = ExecutionOptions {
Expand All @@ -184,7 +180,6 @@ async fn load(conf: LoadCommand) -> Result<()> {
&load_options,
config::Interval::Unbounded,
loader,
interrupt.clone(),
!conf.quiet,
)
.await?;
Expand Down Expand Up @@ -228,7 +223,6 @@ async fn run(conf: RunCommand) -> Result<()> {
}

let runner = Workload::new(session.clone()?, program.clone(), function);
let interrupt = Arc::new(InterruptHandler::install());
if conf.warmup_duration.is_not_zero() {
eprintln!("info: Warming up...");
let warmup_options = ExecutionOptions {
Expand All @@ -242,16 +236,11 @@ async fn run(conf: RunCommand) -> Result<()> {
&warmup_options,
Interval::Unbounded,
runner.clone()?,
interrupt.clone(),
!conf.quiet,
)
.await?;
}

if interrupt.is_interrupted() {
return Err(LatteError::Interrupted);
}

eprintln!("info: Running benchmark...");

println!(
Expand All @@ -270,15 +259,21 @@ async fn run(conf: RunCommand) -> Result<()> {
};

report::print_log_header();
let stats = par_execute(
let stats = match par_execute(
"Running...",
&exec_options,
conf.sampling_interval,
runner,
interrupt.clone(),
!conf.quiet,
)
.await?;
.await
{
Ok(stats) => stats,
Err(LatteError::Interrupted(stats)) => *stats,
Err(e) => {
return Err(e);
}
};

let stats_cmp = BenchmarkCmp {
v1: &stats,
Expand Down
Loading
Loading