diff --git a/src/config.rs b/src/config.rs index 42aeec9..c60560e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -25,22 +25,29 @@ where /// Controls how long the benchmark should run. /// We can specify either a time-based duration or a number of calls to perform. +/// It is also used for controlling sampling. #[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub enum Duration { +pub enum Interval { Count(u64), Time(tokio::time::Duration), + Unbounded, } -impl Duration { +impl Interval { pub fn is_not_zero(&self) -> bool { match self { - Duration::Count(cnt) => *cnt > 0, - Duration::Time(d) => !d.is_zero(), + Interval::Count(cnt) => *cnt > 0, + Interval::Time(d) => !d.is_zero(), + Interval::Unbounded => false, } } + pub fn is_bounded(&self) -> bool { + !matches!(self, Interval::Unbounded) + } + pub fn count(&self) -> Option { - if let Duration::Count(c) = self { + if let Interval::Count(c) = self { Some(*c) } else { None @@ -48,7 +55,7 @@ impl Duration { } pub fn seconds(&self) -> Option { - if let Duration::Time(d) = self { + if let Interval::Time(d) = self { Some(d.as_secs_f32()) } else { None @@ -59,14 +66,14 @@ impl Duration { /// If the string is a valid integer, it is assumed to be the number of iterations. /// If the string additionally contains a time unit, e.g. "s" or "secs", it is parsed /// as time duration. -impl FromStr for Duration { +impl FromStr for Interval { type Err = String; fn from_str(s: &str) -> Result { if let Ok(i) = s.parse() { - Ok(Duration::Count(i)) + Ok(Interval::Count(i)) } else if let Ok(d) = parse_duration::parse(s) { - Ok(Duration::Time(d)) + Ok(Interval::Time(d)) } else { Err("Required integer number of iterations or time duration".to_string()) } @@ -92,7 +99,7 @@ pub struct RunCommand { default_value = "1", value_name = "TIME | COUNT" )] - pub warmup_duration: Duration, + pub warmup_duration: Interval, /// Number of iterations or time duration of the main benchmark phase #[clap( @@ -101,7 +108,7 @@ pub struct RunCommand { default_value = "60s", value_name = "TIME | COUNT" )] - pub run_duration: Duration, + pub run_duration: Interval, /// Number of worker threads used by the driver #[clap(short('t'), long, default_value = "1", value_name = "COUNT")] @@ -126,7 +133,7 @@ pub struct RunCommand { default_value = "1s", value_name = "TIME | COUNT" )] - pub sampling_period: Duration, + pub sampling_interval: Interval, /// Label that will be added to the report to help identifying the test #[clap(long("tag"), number_of_values = 1, multiple_occurrences = true)] diff --git a/src/exec.rs b/src/exec.rs new file mode 100644 index 0000000..25a0c70 --- /dev/null +++ b/src/exec.rs @@ -0,0 +1,224 @@ +//! Implementation of the main benchmarking loop + +use futures::channel::mpsc::{channel, Receiver, Sender}; +use futures::{SinkExt, Stream, StreamExt}; +use itertools::Itertools; +use status_line::StatusLine; +use std::cmp::max; +use std::future::ready; +use std::num::NonZeroUsize; +use std::sync::Arc; +use std::time::Instant; +use tokio_stream::wrappers::IntervalStream; + +use crate::error::Result; +use crate::{ + BenchmarkStats, BoundedIterationCounter, InterruptHandler, Interval, Progress, Recorder, + Sampler, Workload, WorkloadStats, +}; + +/// Returns a stream emitting `rate` events per second. +fn interval_stream(rate: f64) -> IntervalStream { + let interval = tokio::time::Duration::from_nanos(max(1, (1000000000.0 / rate) as u64)); + IntervalStream::new(tokio::time::interval(interval)) +} + +/// Runs a stream of workload iterations till completion in the context of the current task. +/// Periodically sends workload statistics to the `out` channel. +/// +/// # Parameters +/// - stream: a stream of iteration numbers; None means the end of the stream +/// - workload: defines the function to call +/// - iter_counter: shared iteration numbers provider +/// - concurrency: the maximum number of pending workload calls +/// - sampling: controls when to output workload statistics +/// - progress: progress bar notified about each successful iteration +/// - interrupt: allows for terminating the stream early +/// - out: the channel to receive workload statistics +/// +#[allow(clippy::too_many_arguments)] // todo: refactor +async fn run_stream( + stream: impl Stream + std::marker::Unpin, + workload: Workload, + iter_counter: BoundedIterationCounter, + concurrency: NonZeroUsize, + sampling: Interval, + interrupt: Arc, + progress: Arc>, + mut out: Sender>, +) { + workload.reset(Instant::now()); + + let mut iter_counter = iter_counter; + let mut sampler = Sampler::new(iter_counter.duration, sampling, &workload, &mut out); + + let mut result_stream = stream + .map(|_| iter_counter.next()) + .take_while(|i| ready(i.is_some())) + // unconstrained to workaround quadratic complexity of buffer_unordered () + .map(|i| tokio::task::unconstrained(workload.run(i.unwrap()))) + .buffer_unordered(concurrency.get()) + .inspect(|_| progress.tick()); + + while let Some(res) = result_stream.next().await { + match res { + Ok((iter, end_time)) => sampler.iteration_completed(iter, end_time).await, + Err(e) => { + out.send(Err(e)).await.unwrap(); + return; + } + } + if interrupt.is_interrupted() { + break; + } + } + // Send the statistics of remaining requests + sampler.finish().await; +} + +/// Launches a new worker task that runs a series of invocations of the workload function. +/// +/// The task will run as long as `deadline` produces new iteration numbers. +/// The task updates the `progress` bar after each successful iteration. +/// +/// Returns a stream where workload statistics are published. +fn spawn_stream( + concurrency: NonZeroUsize, + rate: Option, + sampling: Interval, + workload: Workload, + iter_counter: BoundedIterationCounter, + interrupt: Arc, + progress: Arc>, +) -> Receiver> { + let (tx, rx) = channel(1); + + tokio::spawn(async move { + match rate { + Some(rate) => { + let stream = interval_stream(rate); + run_stream( + stream, + workload, + iter_counter, + concurrency, + sampling, + interrupt, + progress, + tx, + ) + .await + } + None => { + let stream = futures::stream::repeat_with(|| ()); + run_stream( + stream, + workload, + iter_counter, + concurrency, + sampling, + interrupt, + progress, + tx, + ) + .await + } + } + }); + rx +} + +/// Receives one item from each of the streams. +/// Streams that are closed are ignored. +async fn receive_one_of_each(streams: &mut [S]) -> Vec +where + S: Stream + Unpin, +{ + let mut items = Vec::with_capacity(streams.len()); + for s in streams { + if let Some(item) = s.next().await { + items.push(item); + } + } + items +} + +/// Controls the intensity of requests sent to the server +pub struct ExecutionOptions { + /// How long to execute + pub duration: Interval, + /// Maximum rate of requests in requests per second, `None` means no limit + pub rate: Option, + /// Number of parallel threads of execution + pub threads: NonZeroUsize, + /// Number of outstanding async requests per each thread + pub concurrency: NonZeroUsize, +} + +/// Executes the given function many times in parallel. +/// Draws a progress bar. +/// Returns the statistics such as throughput or duration histogram. +/// +/// # Parameters +/// - `name`: text displayed next to the progress bar +/// - `count`: number of iterations +/// - `exec_options`: controls execution options such as parallelism level and rate +/// - `workload`: encapsulates a set of queries to execute +pub async fn par_execute( + name: &str, + exec_options: &ExecutionOptions, + sampling: Interval, + workload: Workload, + signals: Arc, + show_progress: bool, +) -> Result { + let thread_count = exec_options.threads.get(); + let concurrency = exec_options.concurrency; + let rate = exec_options.rate; + let progress = match exec_options.duration { + Interval::Count(count) => Progress::with_count(name.to_string(), count), + Interval::Time(duration) => Progress::with_duration(name.to_string(), duration), + Interval::Unbounded => unreachable!(), + }; + let progress_opts = status_line::Options { + initially_visible: show_progress, + ..Default::default() + }; + let progress = Arc::new(StatusLine::with_options(progress, progress_opts)); + let deadline = BoundedIterationCounter::new(exec_options.duration); + let mut streams = Vec::with_capacity(thread_count); + let mut stats = Recorder::start(rate, concurrency); + + for _ in 0..thread_count { + let s = spawn_stream( + concurrency, + rate.map(|r| r / (thread_count as f64)), + sampling, + workload.clone(), + deadline.share(), + signals.clone(), + progress.clone(), + ); + streams.push(s); + } + + loop { + let partial_stats: Vec<_> = receive_one_of_each(&mut streams) + .await + .into_iter() + .try_collect()?; + + if partial_stats.is_empty() { + break; + } + + let aggregate = stats.record(&partial_stats); + if sampling.is_bounded() { + progress.set_visible(false); + println!("{}", aggregate); + progress.set_visible(show_progress); + } + } + + Ok(stats.finish()) +} diff --git a/src/iteration.rs b/src/iteration.rs index e7ecae7..887e58d 100644 --- a/src/iteration.rs +++ b/src/iteration.rs @@ -1,5 +1,5 @@ use crate::config; -use crate::config::Duration; +use crate::config::Interval; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Instant; @@ -55,7 +55,7 @@ impl IterationCounter { /// Provides distinct benchmark iteration numbers to multiple threads of execution. /// Decides when to stop the benchmark execution. pub struct BoundedIterationCounter { - pub duration: config::Duration, + pub duration: config::Interval, start_time: Instant, iteration_counter: IterationCounter, } @@ -63,7 +63,7 @@ pub struct BoundedIterationCounter { impl BoundedIterationCounter { /// Creates a new iteration counter based on configured benchmark duration. /// For time-based deadline, the clock starts ticking when this object is created. - pub fn new(duration: config::Duration) -> Self { + pub fn new(duration: config::Interval) -> Self { BoundedIterationCounter { duration, start_time: Instant::now(), @@ -74,7 +74,7 @@ impl BoundedIterationCounter { /// Returns the next iteration number or `None` if deadline or iteration count was exceeded. pub fn next(&mut self) -> Option { match self.duration { - Duration::Count(count) => { + Interval::Count(count) => { let result = self.iteration_counter.next(); if result < count { Some(result) @@ -82,13 +82,14 @@ impl BoundedIterationCounter { None } } - Duration::Time(duration) => { + Interval::Time(duration) => { if Instant::now() < self.start_time + duration { Some(self.iteration_counter.next()) } else { None } } + Interval::Unbounded => Some(self.iteration_counter.next()), } } diff --git a/src/main.rs b/src/main.rs index a1db667..ca7dd84 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,35 +1,26 @@ -use std::cmp::max; -use std::default::Default; use std::fs::File; use std::io::{stdout, Write}; -use std::num::NonZeroUsize; use std::path::{Path, PathBuf}; use std::process::exit; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use clap::Parser; -use futures::channel::mpsc; -use futures::channel::mpsc::{Receiver, Sender}; -use futures::future::ready; -use futures::{SinkExt, Stream, StreamExt}; use hdrhistogram::serialization::interval_log::Tag; use hdrhistogram::serialization::{interval_log, V2DeflateSerializer}; -use itertools::Itertools; - use rune::Source; -use status_line::StatusLine; use tokio::runtime::Builder; -use tokio_stream::wrappers::IntervalStream; use config::RunCommand; -use crate::config::{AppConfig, Command, HdrCommand, ShowCommand}; +use crate::config::{AppConfig, Command, HdrCommand, Interval, ShowCommand}; use crate::error::{LatteError, Result}; +use crate::exec::{par_execute, ExecutionOptions}; use crate::interrupt::InterruptHandler; use crate::iteration::BoundedIterationCounter; use crate::progress::Progress; use crate::report::{Report, RunConfigCmp}; +use crate::sampler::Sampler; use crate::session::*; use crate::session::{CassError, CassErrorKind, Session, SessionStats}; use crate::stats::{BenchmarkCmp, BenchmarkStats, Recorder}; @@ -37,11 +28,13 @@ use crate::workload::{FnRef, Workload, WorkloadStats, LOAD_FN, RUN_FN}; mod config; mod error; +mod exec; mod histogram; mod interrupt; mod iteration; mod progress; mod report; +mod sampler; mod session; mod stats; mod workload; @@ -51,281 +44,6 @@ const VERSION: &str = env!("CARGO_PKG_VERSION"); #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; -fn interval_stream(rate: f64) -> IntervalStream { - let interval = Duration::from_nanos(max(1, (1000000000.0 / rate) as u64)); - IntervalStream::new(tokio::time::interval(interval)) -} - -/// Fetches session statistics and sends them to the channel. -async fn send_stats(workload: &Workload, tx: &mut Sender>) { - let stats = workload.take_stats(Instant::now()); - tx.send(Ok(stats)).await.unwrap(); -} - -/// Responsible for periodically getting a snapshot of statistics from the `workload` -/// and sending them to the `output` channel. The sampling period is controlled by `sampling`. -/// Snapshot is not taken near the end of the run to avoid small final sample. -struct Snapshotter<'a> { - run_duration: config::Duration, - sampling: config::Duration, - workload: &'a Workload, - output: &'a mut Sender>, - start_time: Instant, - last_snapshot_time: Instant, - last_snapshot_iter: u64, -} - -impl<'a> Snapshotter<'a> { - pub fn new( - run_duration: config::Duration, - sampling: config::Duration, - workload: &'a Workload, - output: &'a mut Sender>, - ) -> Snapshotter<'a> { - let start_time = Instant::now(); - Snapshotter { - run_duration, - sampling, - workload, - output, - start_time, - last_snapshot_time: start_time, - last_snapshot_iter: 0, - } - } - - /// Should be called when a workload iteration finished. - /// If there comes the time, it will send the stats to the output. - pub async fn iteration_completed(&mut self, iteration: u64, now: Instant) { - let current_interval_duration = now - self.last_snapshot_time; - let current_interval_iter_count = iteration - self.last_snapshot_iter; - - // Don't snapshot if we're too close to the end of the run, - // to avoid excessively small samples: - let far_from_the_end = match self.run_duration { - config::Duration::Time(d) => now < self.start_time + d - current_interval_duration / 2, - config::Duration::Count(count) => iteration < count - current_interval_iter_count / 2, - }; - - match self.sampling { - config::Duration::Time(d) => { - if now > self.last_snapshot_time + d && far_from_the_end { - send_stats(self.workload, self.output).await; - self.last_snapshot_time += d; - } - } - config::Duration::Count(cnt) => { - if iteration > self.last_snapshot_iter + cnt && far_from_the_end { - send_stats(self.workload, self.output).await; - self.last_snapshot_iter += cnt; - } - } - } - } -} - -/// Runs a stream of workload iterations till completion in the context of the current task. -/// Periodically sends workload statistics to the `out` channel. -/// -/// # Parameters -/// - stream: a stream of iteration numbers; None means the end of the stream -/// - workload: defines the function to call -/// - concurrency: the maximum number of pending workload calls -/// - sampling: controls when to output workload statistics -/// - progress: progress bar notified about each successful iteration -/// - interrupt: allows for terminating the stream early -/// - out: the channel to receive workload statistics -async fn run_stream( - stream: impl Stream + std::marker::Unpin, - workload: Workload, - iter_counter: BoundedIterationCounter, - concurrency: NonZeroUsize, - sampling: Option, - interrupt: Arc, - progress: Arc>, - mut out: Sender>, -) { - workload.reset(Instant::now()); - - let mut iter_counter = iter_counter; - let mut snapshotter = - sampling.map(|s| Snapshotter::new(iter_counter.duration, s, &workload, &mut out)); - - let mut result_stream = stream - .map(|_| iter_counter.next()) - .take_while(|i| ready(i.is_some())) - // unconstrained to workaround quadratic complexity of buffer_unordered () - .map(|i| tokio::task::unconstrained(workload.run(i.unwrap()))) - .buffer_unordered(concurrency.get()) - .inspect(|_| progress.tick()); - - while let Some(res) = result_stream.next().await { - match res { - Ok((iter, end_time)) => { - if let Some(snapshotter) = &mut snapshotter { - snapshotter.iteration_completed(iter, end_time).await - } - } - Err(e) => { - out.send(Err(e)).await.unwrap(); - return; - } - } - - if interrupt.is_interrupted() { - break; - } - } - // Send the statistics of remaining requests - send_stats(&workload, &mut out).await; -} - -/// Launches a new worker task that runs a series of invocations of the workload function. -/// -/// The task will run as long as `deadline` produces new iteration numbers. -/// The task updates the `progress` bar after each successful iteration. -/// -/// Returns a stream where workload statistics are published. -fn spawn_stream( - concurrency: NonZeroUsize, - rate: Option, - sampling: Option, - workload: Workload, - iter_counter: BoundedIterationCounter, - interrupt: Arc, - progress: Arc>, -) -> Receiver> { - let (tx, rx) = mpsc::channel(1); - - tokio::spawn(async move { - match rate { - Some(rate) => { - let stream = interval_stream(rate); - run_stream( - stream, - workload, - iter_counter, - concurrency, - sampling, - interrupt, - progress, - tx, - ) - .await - } - None => { - let stream = futures::stream::repeat_with(|| ()); - run_stream( - stream, - workload, - iter_counter, - concurrency, - sampling, - interrupt, - progress, - tx, - ) - .await - } - } - }); - rx -} - -/// Receives one item from each of the streams. -/// Streams that are closed are ignored. -async fn receive_one_of_each(streams: &mut [S]) -> Vec -where - S: Stream + Unpin, -{ - let mut items = Vec::with_capacity(streams.len()); - for s in streams { - if let Some(item) = s.next().await { - items.push(item); - } - } - items -} - -/// Controls the intensity of requests sent to the server -struct ExecutionOptions { - /// How long to execute - duration: config::Duration, - /// Maximum rate of requests in requests per second, `None` means no limit - rate: Option, - /// Number of parallel threads of execution - threads: NonZeroUsize, - /// Number of outstanding async requests per each thread - concurrency: NonZeroUsize, -} - -/// Executes the given function many times in parallel. -/// Draws a progress bar. -/// Returns the statistics such as throughput or duration histogram. -/// -/// # Parameters -/// - `name`: text displayed next to the progress bar -/// - `count`: number of iterations -/// - `exec_options`: controls execution options such as parallelism level and rate -/// - `workload`: encapsulates a set of queries to execute -async fn par_execute( - name: &str, - exec_options: &ExecutionOptions, - sampling_period: Option, - workload: Workload, - signals: Arc, - show_progress: bool, -) -> Result { - let thread_count = exec_options.threads.get(); - let concurrency = exec_options.concurrency; - let rate = exec_options.rate; - let progress = match exec_options.duration { - config::Duration::Count(count) => Progress::with_count(name.to_string(), count), - config::Duration::Time(duration) => Progress::with_duration(name.to_string(), duration), - }; - let progress_opts = status_line::Options { - initially_visible: show_progress, - ..Default::default() - }; - let progress = Arc::new(StatusLine::with_options(progress, progress_opts)); - let deadline = BoundedIterationCounter::new(exec_options.duration); - let mut streams = Vec::with_capacity(thread_count); - let mut stats = Recorder::start(rate, concurrency); - - for _ in 0..thread_count { - let s = spawn_stream( - concurrency, - rate.map(|r| r / (thread_count as f64)), - sampling_period, - workload.clone(), - deadline.share(), - signals.clone(), - progress.clone(), - ); - streams.push(s); - } - - loop { - let partial_stats: Vec<_> = receive_one_of_each(&mut streams) - .await - .into_iter() - .try_collect()?; - - if partial_stats.is_empty() { - break; - } - - let aggregate = stats.record(&partial_stats); - if sampling_period.is_some() { - progress.set_visible(false); - println!("{}", aggregate); - progress.set_visible(show_progress); - } - } - - Ok(stats.finish()) -} - fn load_report_or_abort(path: &Path) -> Report { match Report::load(path) { Ok(r) => r, @@ -432,7 +150,7 @@ async fn run(conf: RunCommand) -> Result<()> { if load_count > 0 && program.has_load() { eprintln!("info: Loading data..."); let load_options = ExecutionOptions { - duration: config::Duration::Count(load_count), + duration: config::Interval::Count(load_count), rate: None, threads: conf.threads, concurrency: conf.load_concurrency, @@ -440,7 +158,7 @@ async fn run(conf: RunCommand) -> Result<()> { par_execute( "Loading...", &load_options, - None, + config::Interval::Unbounded, loader, interrupt.clone(), !conf.quiet, @@ -464,7 +182,7 @@ async fn run(conf: RunCommand) -> Result<()> { par_execute( "Warming up...", &warmup_options, - None, + Interval::Unbounded, runner.clone(), interrupt.clone(), !conf.quiet, @@ -497,7 +215,7 @@ async fn run(conf: RunCommand) -> Result<()> { let stats = par_execute( "Running...", &exec_options, - Some(conf.sampling_period), + conf.sampling_interval, runner, interrupt.clone(), !conf.quiet, diff --git a/src/report.rs b/src/report.rs index cabfe81..923c116 100644 --- a/src/report.rs +++ b/src/report.rs @@ -509,10 +509,10 @@ impl<'a> Display for RunConfigCmp<'a> { Quantity::from(conf.run_duration.count()) }), self.line("Sampling", "s", |conf| { - Quantity::from(conf.sampling_period.seconds()).with_precision(1) + Quantity::from(conf.sampling_interval.seconds()).with_precision(1) }), self.line("└─", "op", |conf| { - Quantity::from(conf.sampling_period.count()) + Quantity::from(conf.sampling_interval.count()) }), ]; diff --git a/src/sampler.rs b/src/sampler.rs new file mode 100644 index 0000000..3b921b1 --- /dev/null +++ b/src/sampler.rs @@ -0,0 +1,80 @@ +use crate::error::Result; +use crate::{config, Interval, Workload, WorkloadStats}; +use futures::channel::mpsc::Sender; +use futures::SinkExt; +use std::time::Instant; + +/// Responsible for periodically getting a snapshot of statistics from the `workload` +/// and sending them to the `output` channel. The sampling period is controlled by `sampling`. +/// Snapshot is not taken near the end of the run to avoid small final sample. +pub struct Sampler<'a> { + run_duration: config::Interval, + sampling: config::Interval, + workload: &'a Workload, + output: &'a mut Sender>, + start_time: Instant, + last_snapshot_time: Instant, + last_snapshot_iter: u64, +} + +impl<'a> Sampler<'a> { + pub fn new( + run_duration: config::Interval, + sampling: config::Interval, + workload: &'a Workload, + output: &'a mut Sender>, + ) -> Sampler<'a> { + let start_time = Instant::now(); + Sampler { + run_duration, + sampling, + workload, + output, + start_time, + last_snapshot_time: start_time, + last_snapshot_iter: 0, + } + } + + /// Should be called when a workload iteration finished. + /// If there comes the time, it will send the stats to the output. + pub async fn iteration_completed(&mut self, iteration: u64, now: Instant) { + let current_interval_duration = now - self.last_snapshot_time; + let current_interval_iter_count = iteration - self.last_snapshot_iter; + + // Don't snapshot if we're too close to the end of the run, + // to avoid excessively small samples: + let far_from_the_end = match self.run_duration { + config::Interval::Time(d) => now < self.start_time + d - current_interval_duration / 2, + config::Interval::Count(count) => iteration < count - current_interval_iter_count / 2, + config::Interval::Unbounded => true, + }; + + match self.sampling { + Interval::Time(d) => { + if now > self.last_snapshot_time + d && far_from_the_end { + self.send_stats().await; + self.last_snapshot_time += d; + } + } + Interval::Count(cnt) => { + if iteration > self.last_snapshot_iter + cnt && far_from_the_end { + self.send_stats().await; + self.last_snapshot_iter += cnt; + } + } + Interval::Unbounded => {} + } + } + + /// Finishes the run by emiting the last sample + pub async fn finish(mut self) { + self.send_stats().await; + } + + /// Fetches session statistics and sends them to the channel. + async fn send_stats(&mut self) { + let stats = self.workload.take_stats(Instant::now()); + self.output.send(Ok(stats)).await.unwrap(); + } +}