Rename iterations to cycles #19

Merged
merged 1 commit into from Dec 15, 2021
6 changes: 3 additions & 3 deletions README.md
@@ -25,7 +25,7 @@ Latte has the following unique performance characteristics:
This means you can test large clusters with a small number of clients.
* About 50x-100x lower memory footprint than Java-based tools.
* Very low impact on operating system resources – low number of syscalls, context switches and page faults.
-* No client code warmup needed. The client code works with maximum performance from the first iteration.
+* No client code warmup needed. The client code works with maximum performance from the first benchmark cycle.
Even runs as short as 30 seconds give accurate results.
* No GC pauses nor HotSpot recompilation happening in the middle of the test. You want to measure hiccups of the server,
not the benchmarking tool.
@@ -110,7 +110,7 @@ A workload script defines a set of public functions that Latte calls automatically
must define at least a single public async function `run` with two arguments:

- `ctx` – session context that provides the access to Cassandra
-- `i` – current unique iteration number of a 64-bit integer type, starting at 0
+- `i` – current unique cycle number of a 64-bit integer type, starting at 0

The following script would benchmark querying the `system.local` table:

@@ -181,7 +181,7 @@ If needed, you can skip the loading phase by passing `--no-load` command line flag
### Generating data

Latte comes with a library of data generating functions. They are accessible in the `latte` crate. Typically, those
-functions accept an integer `i` iteration number, so you can generate consistent numbers. The data generating functions
+functions accept an integer `i` cycle number, so you can generate consistent numbers. The data generating functions
are pure, i.e. invoking them multiple times with the same parameters yields always the same results.

- `latte::uuid(i)` – generates a random (type 4) UUID
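The purity guarantee described above is what lets the load phase and the run phase agree on the data without storing it anywhere: the same cycle number always maps to the same value. A hypothetical std-only sketch of the idea (not Latte's actual generator implementation; a real tool would use an explicitly seeded hash, since `DefaultHasher` output is not guaranteed stable across Rust versions):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Maps a cycle number (and a per-column seed) to a stable value:
// the same `i` always yields the same result, so data generated during
// the load phase can be regenerated identically while querying.
fn stable_u64(i: u64, seed: u64) -> u64 {
    let mut h = DefaultHasher::new(); // DefaultHasher::new() uses fixed keys
    (i, seed).hash(&mut h);
    h.finish()
}

fn main() {
    // Pure: repeated calls with the same cycle number agree.
    assert_eq!(stable_u64(42, 1), stable_u64(42, 1));
    // Distinct cycle numbers (almost surely) produce distinct values.
    assert_ne!(stable_u64(42, 1), stable_u64(43, 1));
}
```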
13 changes: 6 additions & 7 deletions src/config.rs
@@ -63,7 +63,7 @@ impl Interval {
}
}

-/// If the string is a valid integer, it is assumed to be the number of iterations.
+/// If the string is a valid integer, it is assumed to be the number of cycles.
/// If the string additionally contains a time unit, e.g. "s" or "secs", it is parsed
/// as time duration.
impl FromStr for Interval {
@@ -75,7 +75,7 @@ impl FromStr for Interval {
} else if let Ok(d) = parse_duration::parse(s) {
Ok(Interval::Time(d))
} else {
-Err("Required integer number of iterations or time duration".to_string())
+Err("Required integer number of cycles or time duration".to_string())
}
}
}
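The `FromStr` logic above is easy to exercise with a simplified stand-in. In this sketch the `parse_duration` crate is replaced by a bare `s`-suffix parser (an assumption for illustration only; the real parser accepts richer syntax such as `secs`):

```rust
use std::time::Duration;

// Simplified stand-in for the Interval type in src/config.rs.
#[derive(Debug, PartialEq)]
enum Interval {
    Count(u64),
    Time(Duration),
}

// A plain integer means a cycle count; a trailing "s" means seconds.
fn parse_interval(s: &str) -> Result<Interval, String> {
    if let Ok(n) = s.parse::<u64>() {
        Ok(Interval::Count(n))
    } else if let Some(secs) = s.strip_suffix('s').and_then(|p| p.parse::<u64>().ok()) {
        Ok(Interval::Time(Duration::from_secs(secs)))
    } else {
        Err("Required integer number of cycles or time duration".to_string())
    }
}

fn main() {
    assert_eq!(parse_interval("1000"), Ok(Interval::Count(1000)));
    assert_eq!(parse_interval("30s"), Ok(Interval::Time(Duration::from_secs(30))));
    assert!(parse_interval("oops").is_err());
}
```

Trying the integer interpretation first is what makes `--duration 1000` mean one thousand cycles rather than an error, while `--duration 30s` falls through to the duration branch.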
@@ -86,13 +86,12 @@ impl FromStr for Interval {
setting(AppSettings::DeriveDisplayOrder)
)]
pub struct RunCommand {
-/// Number of requests per second to send.
-/// If not given or zero the requests will be sent as fast as possible within
-/// the parallelism limit
+/// Number of cycles per second to execute.
+/// If not given, the benchmark cycles will be executed as fast as possible.
#[clap(short('r'), long, value_name = "COUNT")]
pub rate: Option<f64>,

-/// Number of iterations or time duration of the warmup phase
+/// Number of cycles or duration of the warmup phase
#[clap(
short('w'),
long("warmup"),
@@ -101,7 +100,7 @@ pub struct RunCommand {
)]
pub warmup_duration: Interval,

-/// Number of iterations or time duration of the main benchmark phase
+/// Number of cycles or duration of the main benchmark phase
#[clap(
short('d'),
long("duration"),
59 changes: 30 additions & 29 deletions src/iteration.rs → src/cycle.rs
@@ -6,26 +6,27 @@ use std::time::Instant;

const BATCH_SIZE: u64 = 64;

-/// Provides distinct benchmark iteration numbers to multiple threads of execution.
-pub struct IterationCounter {
+/// Provides distinct benchmark cycle numbers to multiple threads of execution.
+/// Cycle numbers increase and never repeat.
+pub struct CycleCounter {
shared: Arc<AtomicU64>,
local: u64,
local_max: u64,
}

-impl IterationCounter {
-/// Creates a new iteration counter, starting at `start`.
+impl CycleCounter {
+/// Creates a new cycle counter, starting at `start`.
/// The counter is logically positioned at one item before `start`, so the first call
/// to `next` will return `start`.
pub fn new(start: u64) -> Self {
-IterationCounter {
+CycleCounter {
shared: Arc::new(AtomicU64::new(start)),
local: 0, // the value does not matter as long as it is not lower than local_max
local_max: 0, // force getting the shared count in the first call to `next`
}
}

-/// Gets the next iteration number and advances the counter by one
+/// Gets the next cycle number and advances the counter by one.
pub fn next(&mut self) -> u64 {
if self.local >= self.local_max {
self.next_batch();
@@ -35,47 +36,47 @@ impl IterationCounter {
result
}

-/// Reserves the next batch of iterations
+/// Reserves the next batch of cycles.
fn next_batch(&mut self) {
self.local = self.shared.fetch_add(BATCH_SIZE, Ordering::Relaxed);
self.local_max = self.local + BATCH_SIZE;
}

-/// Creates a new counter sharing the list of iterations with this one.
-/// The new counter will never return the same iteration number as this one.
-pub fn share(&self) -> IterationCounter {
-IterationCounter {
+/// Creates a new counter sharing the list of cycles with this one.
+/// The new counter will never return the same cycle number as this one.
+pub fn share(&self) -> CycleCounter {
+CycleCounter {
shared: self.shared.clone(),
local: 0,
local_max: 0,
}
}
}
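The `next_batch`/`share` pair above implements a batched atomic counter: each clone reserves `BATCH_SIZE` numbers with a single `fetch_add` and hands them out locally, so the shared atomic is touched once per 64 cycles instead of once per cycle. A self-contained sketch of the same scheme (simplified names, std only):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

const BATCH_SIZE: u64 = 64;

// Each counter clone draws numbers from its private reserved batch and
// only hits the shared atomic when the batch runs out.
struct Counter {
    shared: Arc<AtomicU64>,
    local: u64,
    local_max: u64,
}

impl Counter {
    fn new(start: u64) -> Self {
        // local == local_max forces a batch reservation on the first `next`.
        Counter { shared: Arc::new(AtomicU64::new(start)), local: 0, local_max: 0 }
    }

    fn next(&mut self) -> u64 {
        if self.local >= self.local_max {
            // Reserve the next BATCH_SIZE numbers with one atomic op.
            self.local = self.shared.fetch_add(BATCH_SIZE, Ordering::Relaxed);
            self.local_max = self.local + BATCH_SIZE;
        }
        let result = self.local;
        self.local += 1;
        result
    }

    fn share(&self) -> Counter {
        Counter { shared: self.shared.clone(), local: 0, local_max: 0 }
    }
}

fn main() {
    let mut a = Counter::new(0);
    let mut b = a.share();
    let xs: Vec<u64> = (0..3).map(|_| a.next()).collect();
    let ys: Vec<u64> = (0..3).map(|_| b.next()).collect();
    // a and b draw from disjoint batches: a gets 0..64, b gets 64..128.
    println!("{:?} {:?}", xs, ys); // [0, 1, 2] [64, 65, 66]
}
```

Because batches are reserved wholesale, numbers never repeat across clones, but a bounded run may leave part of the final batch unused — which is why the bounded wrapper below still checks `result < count` before yielding a number.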

-/// Provides distinct benchmark iteration numbers to multiple threads of execution.
+/// Provides distinct benchmark cycle numbers to multiple threads of execution.
/// Decides when to stop the benchmark execution.
-pub struct BoundedIterationCounter {
+pub struct BoundedCycleCounter {
pub duration: config::Interval,
start_time: Instant,
-iteration_counter: IterationCounter,
+cycle_counter: CycleCounter,
}

-impl BoundedIterationCounter {
-/// Creates a new iteration counter based on configured benchmark duration.
+impl BoundedCycleCounter {
+/// Creates a new counter based on configured benchmark duration.
/// For time-based deadline, the clock starts ticking when this object is created.
pub fn new(duration: config::Interval) -> Self {
-BoundedIterationCounter {
+BoundedCycleCounter {
duration,
start_time: Instant::now(),
-iteration_counter: IterationCounter::new(0),
+cycle_counter: CycleCounter::new(0),
}
}

-/// Returns the next iteration number or `None` if deadline or iteration count was exceeded.
+/// Returns the next cycle number or `None` if deadline or cycle count was exceeded.
pub fn next(&mut self) -> Option<u64> {
match self.duration {
Interval::Count(count) => {
-let result = self.iteration_counter.next();
+let result = self.cycle_counter.next();
if result < count {
Some(result)
} else {
@@ -84,43 +85,43 @@ impl BoundedIterationCounter {
}
Interval::Time(duration) => {
if Instant::now() < self.start_time + duration {
-Some(self.iteration_counter.next())
+Some(self.cycle_counter.next())
} else {
None
}
}
-Interval::Unbounded => Some(self.iteration_counter.next()),
+Interval::Unbounded => Some(self.cycle_counter.next()),
}
}

/// Shares this counter e.g. with another thread.
pub fn share(&self) -> Self {
-BoundedIterationCounter {
+BoundedCycleCounter {
start_time: self.start_time,
duration: self.duration,
-iteration_counter: self.iteration_counter.share(),
+cycle_counter: self.cycle_counter.share(),
}
}
}

#[cfg(test)]
mod test {
-use crate::iteration::{IterationCounter, BATCH_SIZE};
+use crate::cycle::{CycleCounter, BATCH_SIZE};
use itertools::Itertools;
use std::collections::BTreeSet;

#[test]
-pub fn iteration_counter_must_return_all_numbers() {
-let mut counter = IterationCounter::new(10);
+pub fn cycle_counter_must_return_all_numbers() {
+let mut counter = CycleCounter::new(10);
for i in 10..(10 + 2 * BATCH_SIZE) {
let iter = counter.next();
assert_eq!(i, iter)
}
}

#[test]
-pub fn shared_iteration_counter_must_return_distinct_numbers() {
-let mut counter1 = IterationCounter::new(10);
+pub fn shared_cycle_counter_must_return_distinct_numbers() {
+let mut counter1 = CycleCounter::new(10);
let mut counter2 = counter1.share();
let mut set1 = BTreeSet::new();
let mut set2 = BTreeSet::new();
28 changes: 14 additions & 14 deletions src/exec.rs
@@ -13,8 +13,8 @@ use tokio_stream::wrappers::IntervalStream;

use crate::error::Result;
use crate::{
-BenchmarkStats, BoundedIterationCounter, InterruptHandler, Interval, Progress, Recorder,
-Sampler, Workload, WorkloadStats,
+BenchmarkStats, BoundedCycleCounter, InterruptHandler, Interval, Progress, Recorder, Sampler,
+Workload, WorkloadStats,
};

/// Returns a stream emitting `rate` events per second.
@@ -23,24 +23,24 @@ fn interval_stream(rate: f64) -> IntervalStream {
IntervalStream::new(tokio::time::interval(interval))
}

-/// Runs a stream of workload iterations till completion in the context of the current task.
+/// Runs a stream of workload cycles till completion in the context of the current task.
/// Periodically sends workload statistics to the `out` channel.
///
/// # Parameters
-/// - stream: a stream of iteration numbers; None means the end of the stream
+/// - stream: a stream of cycle numbers; None means the end of the stream
/// - workload: defines the function to call
-/// - iter_counter: shared iteration numbers provider
+/// - cycle_counter: shared cycle numbers provider
/// - concurrency: the maximum number of pending workload calls
/// - sampling: controls when to output workload statistics
-/// - progress: progress bar notified about each successful iteration
+/// - progress: progress bar notified about each successful cycle
/// - interrupt: allows for terminating the stream early
/// - out: the channel to receive workload statistics
///
#[allow(clippy::too_many_arguments)] // todo: refactor
async fn run_stream<T>(
stream: impl Stream<Item = T> + std::marker::Unpin,
workload: Workload,
-iter_counter: BoundedIterationCounter,
+cycle_counter: BoundedCycleCounter,
concurrency: NonZeroUsize,
sampling: Interval,
interrupt: Arc<InterruptHandler>,
@@ -49,7 +49,7 @@ async fn run_stream<T>(
) {
workload.reset(Instant::now());

-let mut iter_counter = iter_counter;
+let mut iter_counter = cycle_counter;
let mut sampler = Sampler::new(iter_counter.duration, sampling, &workload, &mut out);

let mut result_stream = stream
@@ -62,7 +62,7 @@

while let Some(res) = result_stream.next().await {
match res {
-Ok((iter, end_time)) => sampler.iteration_completed(iter, end_time).await,
+Ok((iter, end_time)) => sampler.cycle_completed(iter, end_time).await,
Err(e) => {
out.send(Err(e)).await.unwrap();
return;
@@ -78,16 +78,16 @@ async fn run_stream<T>(

/// Launches a new worker task that runs a series of invocations of the workload function.
///
-/// The task will run as long as `deadline` produces new iteration numbers.
-/// The task updates the `progress` bar after each successful iteration.
+/// The task will run as long as `deadline` produces new cycle numbers.
+/// The task updates the `progress` bar after each successful cycle.
///
/// Returns a stream where workload statistics are published.
fn spawn_stream(
concurrency: NonZeroUsize,
rate: Option<f64>,
sampling: Interval,
workload: Workload,
-iter_counter: BoundedIterationCounter,
+iter_counter: BoundedCycleCounter,
interrupt: Arc<InterruptHandler>,
progress: Arc<StatusLine<Progress>>,
) -> Receiver<Result<WorkloadStats>> {
@@ -161,7 +161,7 @@ pub struct ExecutionOptions {
///
/// # Parameters
/// - `name`: text displayed next to the progress bar
-/// - `count`: number of iterations
+/// - `count`: number of cycles
/// - `exec_options`: controls execution options such as parallelism level and rate
/// - `workload`: encapsulates a set of queries to execute
pub async fn par_execute(
@@ -185,7 +185,7 @@
..Default::default()
};
let progress = Arc::new(StatusLine::with_options(progress, progress_opts));
-let deadline = BoundedIterationCounter::new(exec_options.duration);
+let deadline = BoundedCycleCounter::new(exec_options.duration);
let mut streams = Vec::with_capacity(thread_count);
let mut stats = Recorder::start(rate, concurrency);

10 changes: 5 additions & 5 deletions src/main.rs
@@ -14,10 +14,10 @@ use tokio::runtime::Builder;
use config::RunCommand;

use crate::config::{AppConfig, Command, HdrCommand, Interval, ShowCommand};
+use crate::cycle::BoundedCycleCounter;
use crate::error::{LatteError, Result};
use crate::exec::{par_execute, ExecutionOptions};
use crate::interrupt::InterruptHandler;
-use crate::iteration::BoundedIterationCounter;
use crate::progress::Progress;
use crate::report::{Report, RunConfigCmp};
use crate::sampler::Sampler;
@@ -27,11 +27,11 @@ use crate::stats::{BenchmarkCmp, BenchmarkStats, Recorder};
use crate::workload::{FnRef, Workload, WorkloadStats, LOAD_FN, RUN_FN};

mod config;
+mod cycle;
mod error;
mod exec;
mod histogram;
mod interrupt;
-mod iteration;
mod progress;
mod report;
mod sampler;
@@ -293,16 +293,16 @@ async fn export_hdr_log(conf: HdrCommand) -> Result<()> {
let interval_start_time = Duration::from_millis((sample.time_s * 1000.0) as u64);
let interval_duration = Duration::from_millis((sample.duration_s * 1000.0) as u64);
log_writer.write_histogram(
-&sample.call_time_histogram_ns.0,
+&sample.cycle_time_histogram_ns.0,
interval_start_time,
interval_duration,
-Tag::new("call_time"),
+Tag::new("cycles"),
)?;
log_writer.write_histogram(
&sample.resp_time_histogram_ns.0,
interval_start_time,
interval_duration,
-Tag::new("resp_time"),
+Tag::new("requests"),
)?;
}
Ok(())