Skip to content

Commit

Permalink
Move some code from main to their own modules
Browse files Browse the repository at this point in the history
  • Loading branch information
pkolaczk committed Dec 15, 2021
1 parent 758d616 commit 34221f5
Show file tree
Hide file tree
Showing 6 changed files with 341 additions and 311 deletions.
31 changes: 19 additions & 12 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,37 @@ where

/// Controls how long the benchmark should run.
/// We can specify either a time-based duration or a number of calls to perform.
/// It is also used for controlling sampling.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum Duration {
pub enum Interval {
Count(u64),
Time(tokio::time::Duration),
Unbounded,
}

impl Duration {
impl Interval {
pub fn is_not_zero(&self) -> bool {
match self {
Duration::Count(cnt) => *cnt > 0,
Duration::Time(d) => !d.is_zero(),
Interval::Count(cnt) => *cnt > 0,
Interval::Time(d) => !d.is_zero(),
Interval::Unbounded => false,
}
}

pub fn is_bounded(&self) -> bool {
!matches!(self, Interval::Unbounded)
}

pub fn count(&self) -> Option<u64> {
if let Duration::Count(c) = self {
if let Interval::Count(c) = self {
Some(*c)
} else {
None
}
}

pub fn seconds(&self) -> Option<f32> {
if let Duration::Time(d) = self {
if let Interval::Time(d) = self {
Some(d.as_secs_f32())
} else {
None
Expand All @@ -59,14 +66,14 @@ impl Duration {
/// If the string is a valid integer, it is assumed to be the number of iterations.
/// If the string additionally contains a time unit, e.g. "s" or "secs", it is parsed
/// as time duration.
impl FromStr for Duration {
impl FromStr for Interval {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Ok(i) = s.parse() {
Ok(Duration::Count(i))
Ok(Interval::Count(i))
} else if let Ok(d) = parse_duration::parse(s) {
Ok(Duration::Time(d))
Ok(Interval::Time(d))
} else {
Err("Required integer number of iterations or time duration".to_string())
}
Expand All @@ -92,7 +99,7 @@ pub struct RunCommand {
default_value = "1",
value_name = "TIME | COUNT"
)]
pub warmup_duration: Duration,
pub warmup_duration: Interval,

/// Number of iterations or time duration of the main benchmark phase
#[clap(
Expand All @@ -101,7 +108,7 @@ pub struct RunCommand {
default_value = "60s",
value_name = "TIME | COUNT"
)]
pub run_duration: Duration,
pub run_duration: Interval,

/// Number of worker threads used by the driver
#[clap(short('t'), long, default_value = "1", value_name = "COUNT")]
Expand All @@ -126,7 +133,7 @@ pub struct RunCommand {
default_value = "1s",
value_name = "TIME | COUNT"
)]
pub sampling_period: Duration,
pub sampling_interval: Interval,

/// Label that will be added to the report to help identifying the test
#[clap(long("tag"), number_of_values = 1, multiple_occurrences = true)]
Expand Down
224 changes: 224 additions & 0 deletions src/exec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
//! Implementation of the main benchmarking loop

use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::{SinkExt, Stream, StreamExt};
use itertools::Itertools;
use status_line::StatusLine;
use std::cmp::max;
use std::future::ready;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Instant;
use tokio_stream::wrappers::IntervalStream;

use crate::error::Result;
use crate::{
BenchmarkStats, BoundedIterationCounter, InterruptHandler, Interval, Progress, Recorder,
Sampler, Workload, WorkloadStats,
};

/// Returns a stream emitting `rate` events per second.
fn interval_stream(rate: f64) -> IntervalStream {
let interval = tokio::time::Duration::from_nanos(max(1, (1000000000.0 / rate) as u64));
IntervalStream::new(tokio::time::interval(interval))
}

/// Runs a stream of workload iterations till completion in the context of the current task.
/// Periodically sends workload statistics to the `out` channel.
///
/// # Parameters
/// - stream: a stream of iteration numbers; None means the end of the stream
/// - workload: defines the function to call
/// - iter_counter: shared iteration numbers provider
/// - concurrency: the maximum number of pending workload calls
/// - sampling: controls when to output workload statistics
/// - progress: progress bar notified about each successful iteration
/// - interrupt: allows for terminating the stream early
/// - out: the channel to receive workload statistics
///
#[allow(clippy::too_many_arguments)] // todo: refactor
async fn run_stream<T>(
stream: impl Stream<Item = T> + std::marker::Unpin,
workload: Workload,
iter_counter: BoundedIterationCounter,
concurrency: NonZeroUsize,
sampling: Interval,
interrupt: Arc<InterruptHandler>,
progress: Arc<StatusLine<Progress>>,
mut out: Sender<Result<WorkloadStats>>,
) {
workload.reset(Instant::now());

let mut iter_counter = iter_counter;
let mut sampler = Sampler::new(iter_counter.duration, sampling, &workload, &mut out);

let mut result_stream = stream
.map(|_| iter_counter.next())
.take_while(|i| ready(i.is_some()))
// unconstrained to workaround quadratic complexity of buffer_unordered ()
.map(|i| tokio::task::unconstrained(workload.run(i.unwrap())))
.buffer_unordered(concurrency.get())
.inspect(|_| progress.tick());

while let Some(res) = result_stream.next().await {
match res {
Ok((iter, end_time)) => sampler.iteration_completed(iter, end_time).await,
Err(e) => {
out.send(Err(e)).await.unwrap();
return;
}
}
if interrupt.is_interrupted() {
break;
}
}
// Send the statistics of remaining requests
sampler.finish().await;
}

/// Launches a new worker task that runs a series of invocations of the workload function.
///
/// The task will run as long as `deadline` produces new iteration numbers.
/// The task updates the `progress` bar after each successful iteration.
///
/// Returns a stream where workload statistics are published.
fn spawn_stream(
concurrency: NonZeroUsize,
rate: Option<f64>,
sampling: Interval,
workload: Workload,
iter_counter: BoundedIterationCounter,
interrupt: Arc<InterruptHandler>,
progress: Arc<StatusLine<Progress>>,
) -> Receiver<Result<WorkloadStats>> {
let (tx, rx) = channel(1);

tokio::spawn(async move {
match rate {
Some(rate) => {
let stream = interval_stream(rate);
run_stream(
stream,
workload,
iter_counter,
concurrency,
sampling,
interrupt,
progress,
tx,
)
.await
}
None => {
let stream = futures::stream::repeat_with(|| ());
run_stream(
stream,
workload,
iter_counter,
concurrency,
sampling,
interrupt,
progress,
tx,
)
.await
}
}
});
rx
}

/// Receives one item from each of the streams.
/// Streams that are closed are ignored.
async fn receive_one_of_each<T, S>(streams: &mut [S]) -> Vec<T>
where
S: Stream<Item = T> + Unpin,
{
let mut items = Vec::with_capacity(streams.len());
for s in streams {
if let Some(item) = s.next().await {
items.push(item);
}
}
items
}

/// Controls the intensity of requests sent to the server
pub struct ExecutionOptions {
/// How long to execute
pub duration: Interval,
/// Maximum rate of requests in requests per second, `None` means no limit
pub rate: Option<f64>,
/// Number of parallel threads of execution
pub threads: NonZeroUsize,
/// Number of outstanding async requests per each thread
pub concurrency: NonZeroUsize,
}

/// Executes the given function many times in parallel.
/// Draws a progress bar.
/// Returns the statistics such as throughput or duration histogram.
///
/// # Parameters
/// - `name`: text displayed next to the progress bar
/// - `count`: number of iterations
/// - `exec_options`: controls execution options such as parallelism level and rate
/// - `workload`: encapsulates a set of queries to execute
pub async fn par_execute(
name: &str,
exec_options: &ExecutionOptions,
sampling: Interval,
workload: Workload,
signals: Arc<InterruptHandler>,
show_progress: bool,
) -> Result<BenchmarkStats> {
let thread_count = exec_options.threads.get();
let concurrency = exec_options.concurrency;
let rate = exec_options.rate;
let progress = match exec_options.duration {
Interval::Count(count) => Progress::with_count(name.to_string(), count),
Interval::Time(duration) => Progress::with_duration(name.to_string(), duration),
Interval::Unbounded => unreachable!(),
};
let progress_opts = status_line::Options {
initially_visible: show_progress,
..Default::default()
};
let progress = Arc::new(StatusLine::with_options(progress, progress_opts));
let deadline = BoundedIterationCounter::new(exec_options.duration);
let mut streams = Vec::with_capacity(thread_count);
let mut stats = Recorder::start(rate, concurrency);

for _ in 0..thread_count {
let s = spawn_stream(
concurrency,
rate.map(|r| r / (thread_count as f64)),
sampling,
workload.clone(),
deadline.share(),
signals.clone(),
progress.clone(),
);
streams.push(s);
}

loop {
let partial_stats: Vec<_> = receive_one_of_each(&mut streams)
.await
.into_iter()
.try_collect()?;

if partial_stats.is_empty() {
break;
}

let aggregate = stats.record(&partial_stats);
if sampling.is_bounded() {
progress.set_visible(false);
println!("{}", aggregate);
progress.set_visible(show_progress);
}
}

Ok(stats.finish())
}
11 changes: 6 additions & 5 deletions src/iteration.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::config;
use crate::config::Duration;
use crate::config::Interval;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
Expand Down Expand Up @@ -55,15 +55,15 @@ impl IterationCounter {
/// Provides distinct benchmark iteration numbers to multiple threads of execution.
/// Decides when to stop the benchmark execution.
pub struct BoundedIterationCounter {
pub duration: config::Duration,
pub duration: config::Interval,
start_time: Instant,
iteration_counter: IterationCounter,
}

impl BoundedIterationCounter {
/// Creates a new iteration counter based on configured benchmark duration.
/// For time-based deadline, the clock starts ticking when this object is created.
pub fn new(duration: config::Duration) -> Self {
pub fn new(duration: config::Interval) -> Self {
BoundedIterationCounter {
duration,
start_time: Instant::now(),
Expand All @@ -74,21 +74,22 @@ impl BoundedIterationCounter {
/// Returns the next iteration number or `None` if deadline or iteration count was exceeded.
pub fn next(&mut self) -> Option<u64> {
match self.duration {
Duration::Count(count) => {
Interval::Count(count) => {
let result = self.iteration_counter.next();
if result < count {
Some(result)
} else {
None
}
}
Duration::Time(duration) => {
Interval::Time(duration) => {
if Instant::now() < self.start_time + duration {
Some(self.iteration_counter.next())
} else {
None
}
}
Interval::Unbounded => Some(self.iteration_counter.next()),
}
}

Expand Down
Loading

0 comments on commit 34221f5

Please sign in to comment.