diff --git a/Cargo.lock b/Cargo.lock index d00b27a..7348958 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -483,16 +483,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "ctrlc" -version = "3.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b467862cc8610ca6fc9a1532d7777cee0804e678ab45410897b9396495994a0b" -dependencies = [ - "nix", - "windows-sys 0.52.0", -] - [[package]] name = "darling" version = "0.20.8" @@ -1030,7 +1020,7 @@ dependencies = [ [[package]] name = "latte-cli" -version = "0.26.0" +version = "0.27.0" dependencies = [ "anyhow", "base64 0.22.1", @@ -1038,7 +1028,6 @@ dependencies = [ "clap", "console", "cpu-time", - "ctrlc", "err-derive", "futures", "hdrhistogram", @@ -1224,17 +1213,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "nix" -version = "0.27.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" -dependencies = [ - "bitflags 2.4.2", - "cfg-if", - "libc", -] - [[package]] name = "nom" version = "7.1.3" @@ -1935,8 +1913,6 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "scylla" version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9439d92eea9f86c07175c819c3a129ca28b02477b47df26db354a1f4ea7ee276" dependencies = [ "arc-swap", "async-trait", @@ -1968,8 +1944,6 @@ dependencies = [ [[package]] name = "scylla-cql" version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64037fb9d9c59ae15137fff9a56c4d528908dfd38d09e75b5f8e56e3894966dd" dependencies = [ "async-trait", "byteorder", @@ -1985,8 +1959,6 @@ dependencies = [ [[package]] name = "scylla-macros" version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e5fe1d389adebe6a1a27bce18b81a65ff18c25d58a795de490e18b0e7a27b9f" dependencies = [ "darling", "proc-macro2", @@ -2066,6 +2038,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + [[package]] name = "simba" version = "0.8.1" @@ -2286,6 +2267,7 @@ dependencies = [ "num_cpus", "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.48.0", diff --git a/Cargo.toml b/Cargo.toml index f969d62..8484c3b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "latte-cli" description = "A database benchmarking tool for Apache Cassandra" -version = "0.26.0" +version = "0.27.0" authors = ["Piotr Kołaczkowski "] edition = "2021" readme = "README.md" @@ -21,7 +21,6 @@ chrono = { version = "0.4.18", features = ["serde"] } clap = { version = "4", features = ["derive", "cargo", "env"] } console = "0.15.0" cpu-time = "1.0.0" -ctrlc = "3.2.1" err-derive = "0.3" futures = "0.3" hdrhistogram = "7.1.0" @@ -39,7 +38,7 @@ rand = "0.8" regex = "1.5" rune = "0.12" rust-embed = "8" -scylla = { version = "0.13", features = ["ssl"] } +scylla = { path = "../scylla-rust-driver/scylla", features = ["ssl"] } search_path = "0.1" serde = { version = "1.0.116", features = ["derive"] } serde_json = "1.0.57" @@ -49,7 +48,7 @@ strum = { version = "0.26", features = ["derive"] } strum_macros = "0.26" time = "0.3" thiserror = "1.0.26" -tokio = { version = "1", features = ["rt", "rt-multi-thread", "time", "parking_lot"] } +tokio = { version = "1", features = ["rt", "rt-multi-thread", "time", "parking_lot", "signal"] } tokio-stream = "0.1" tracing = "0.1" tracing-subscriber = "0.3" diff --git a/src/error.rs b/src/error.rs index 88e8e8e..5e7bbb4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,5 @@ use crate::context::CassError; +use crate::stats::BenchmarkStats; use err_derive::*; use hdrhistogram::serialization::interval_log::IntervalLogWriterError; use hdrhistogram::serialization::V2DeflateSerializeError; @@ -37,7 +38,7 @@ pub enum LatteError { HdrLogWrite(#[source] IntervalLogWriterError), #[error(display = "Interrupted")] - Interrupted, + Interrupted(Box), } pub type Result = std::result::Result; diff --git a/src/exec.rs b/src/exec.rs index 7097b9e..ef2a2ec 100644 --- a/src/exec.rs +++ b/src/exec.rs @@ -11,10 +11,10 @@ use std::sync::Arc; use std::time::Instant; use tokio_stream::wrappers::IntervalStream; -use crate::error::Result; +use crate::error::{LatteError, Result}; use crate::{ - BenchmarkStats, BoundedCycleCounter, InterruptHandler, Interval, Progress, Recorder, Sampler, - Workload, WorkloadStats, + BenchmarkStats, BoundedCycleCounter, Interval, Progress, Recorder, Sampler, Workload, + WorkloadStats, }; /// Returns a stream emitting `rate` events per second. @@ -43,7 +43,6 @@ async fn run_stream( cycle_counter: BoundedCycleCounter, concurrency: NonZeroUsize, sampling: Interval, - interrupt: Arc, progress: Arc>, mut out: Sender>, ) { @@ -68,9 +67,6 @@ async fn run_stream( return; } } - if interrupt.is_interrupted() { - break; - } } // Send the statistics of remaining requests sampler.finish().await; @@ -88,7 +84,6 @@ fn spawn_stream( sampling: Interval, workload: Workload, iter_counter: BoundedCycleCounter, - interrupt: Arc, progress: Arc>, ) -> Receiver> { let (tx, rx) = channel(1); @@ -103,7 +98,6 @@ fn spawn_stream( iter_counter, concurrency, sampling, - interrupt, progress, tx, ) @@ -117,7 +111,6 @@ fn spawn_stream( iter_counter, concurrency, sampling, - interrupt, progress, tx, ) @@ -169,7 +162,6 @@ pub async fn par_execute( exec_options: &ExecutionOptions, sampling: Interval, workload: Workload, - signals: Arc, show_progress: bool, ) -> Result { let thread_count = exec_options.threads.get(); @@ -196,29 +188,31 @@ pub async fn par_execute( sampling, workload.clone()?, deadline.share(), - signals.clone(), progress.clone(), ); streams.push(s); } loop { - let partial_stats: Vec<_> = receive_one_of_each(&mut streams) - .await - .into_iter() - .try_collect()?; - - if partial_stats.is_empty() { - break; - } + tokio::select! { + partial_stats = receive_one_of_each(&mut streams) => { + let partial_stats: Vec<_> = partial_stats.into_iter().try_collect()?; + if partial_stats.is_empty() { + break Ok(stats.finish()); + } + + let aggregate = stats.record(&partial_stats); + if sampling.is_bounded() { + progress.set_visible(false); + println!("{aggregate}"); + progress.set_visible(show_progress); + } + } - let aggregate = stats.record(&partial_stats); - if sampling.is_bounded() { - progress.set_visible(false); - println!("{aggregate}"); - progress.set_visible(show_progress); + _ = tokio::signal::ctrl_c() => { + progress.set_visible(false); + break Err(LatteError::Interrupted(Box::new(stats.finish()))); + } } } - - Ok(stats.finish()) } diff --git a/src/histogram.rs b/src/histogram.rs index ba90630..6a320a9 100644 --- a/src/histogram.rs +++ b/src/histogram.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Deserializer, Serialize}; /// A wrapper for HDR histogram that allows us to serialize/deserialize it to/from /// a base64 encoded string we can store in JSON report. +#[derive(Debug)] pub struct SerializableHistogram(pub Histogram); impl Serialize for SerializableHistogram { diff --git a/src/interrupt.rs b/src/interrupt.rs deleted file mode 100644 index 9ce27dd..0000000 --- a/src/interrupt.rs +++ /dev/null @@ -1,21 +0,0 @@ -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; - -/// Notifies about received Ctrl-C signal -pub struct InterruptHandler { - interrupted: Arc, -} - -impl InterruptHandler { - pub fn install() -> InterruptHandler { - let cell = Arc::new(AtomicBool::new(false)); - let cell_ref = cell.clone(); - let _ = ctrlc::set_handler(move || cell_ref.store(true, Ordering::Relaxed)); - InterruptHandler { interrupted: cell } - } - - /// Returns true if Ctrl-C was pressed - pub fn is_interrupted(&self) -> bool { - self.interrupted.load(Ordering::Relaxed) - } -} diff --git a/src/main.rs b/src/main.rs index dfa2e3f..1f0691f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,6 @@ use std::fs::File; use std::io::{stdout, Write}; use std::path::{Path, PathBuf}; use std::process::exit; -use std::sync::Arc; use std::time::Duration; use clap::Parser; @@ -24,7 +23,6 @@ use crate::context::{CassError, CassErrorKind, Context, SessionStats}; use crate::cycle::BoundedCycleCounter; use crate::error::{LatteError, Result}; use crate::exec::{par_execute, ExecutionOptions}; -use crate::interrupt::InterruptHandler; use crate::plot::plot_graph; use crate::progress::Progress; use crate::report::{Report, RunConfigCmp}; @@ -38,7 +36,6 @@ mod cycle; mod error; mod exec; mod histogram; -mod interrupt; mod plot; mod progress; mod report; @@ -170,7 +167,6 @@ async fn load(conf: LoadCommand) -> Result<()> { } } - let interrupt = Arc::new(InterruptHandler::install()); eprintln!("info: Loading data..."); let loader = Workload::new(session.clone()?, program.clone(), FnRef::new(LOAD_FN)); let load_options = ExecutionOptions { @@ -184,7 +180,6 @@ async fn load(conf: LoadCommand) -> Result<()> { &load_options, config::Interval::Unbounded, loader, - interrupt.clone(), !conf.quiet, ) .await?; @@ -228,7 +223,6 @@ async fn run(conf: RunCommand) -> Result<()> { } let runner = Workload::new(session.clone()?, program.clone(), function); - let interrupt = Arc::new(InterruptHandler::install()); if conf.warmup_duration.is_not_zero() { eprintln!("info: Warming up..."); let warmup_options = ExecutionOptions { @@ -242,16 +236,11 @@ async fn run(conf: RunCommand) -> Result<()> { &warmup_options, Interval::Unbounded, runner.clone()?, - interrupt.clone(), !conf.quiet, ) .await?; } - if interrupt.is_interrupted() { - return Err(LatteError::Interrupted); - } - eprintln!("info: Running benchmark..."); println!( @@ -270,15 +259,21 @@ async fn run(conf: RunCommand) -> Result<()> { }; report::print_log_header(); - let stats = par_execute( + let stats = match par_execute( "Running...", &exec_options, conf.sampling_interval, runner, - interrupt.clone(), !conf.quiet, ) - .await?; + .await + { + Ok(stats) => stats, + Err(LatteError::Interrupted(stats)) => *stats, + Err(e) => { + return Err(e); + } + }; let stats_cmp = BenchmarkCmp { v1: &stats, diff --git a/src/stats.rs b/src/stats.rs index 4326b21..91d7b42 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -259,7 +259,7 @@ impl Percentile { } /// Records basic statistics for a sample (a group) of requests -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] pub struct Sample { pub time_s: f32, pub duration_s: f32, @@ -442,7 +442,7 @@ impl Log { } } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] pub struct Bucket { pub percentile: f64, pub duration_ms: f64, @@ -450,7 +450,7 @@ pub struct Bucket { pub cumulative_count: u64, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] pub struct TimeDistribution { pub mean: Mean, pub percentiles: Vec, @@ -458,7 +458,7 @@ pub struct TimeDistribution { } /// Stores the final statistics of the test run. -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] pub struct BenchmarkStats { pub start_time: DateTime, pub end_time: DateTime,