Skip to content

Commit

Permalink
Don't emit tiny interval at the end of the run
Browse files Browse the repository at this point in the history
The snapshotter no longer emits a stats snapshot
shortly before the end of the run.
This way we avoid an odd, tiny final interval with
a large error.
  • Loading branch information
pkolaczk committed Dec 15, 2021
1 parent 208409a commit 758d616
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 29 deletions.
4 changes: 2 additions & 2 deletions src/iteration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ impl IterationCounter {
/// Provides distinct benchmark iteration numbers to multiple threads of execution.
/// Decides when to stop the benchmark execution.
pub struct BoundedIterationCounter {
pub duration: config::Duration,
start_time: Instant,
duration: config::Duration,
iteration_counter: IterationCounter,
}

Expand All @@ -65,8 +65,8 @@ impl BoundedIterationCounter {
/// For time-based deadline, the clock starts ticking when this object is created.
pub fn new(duration: config::Duration) -> Self {
BoundedIterationCounter {
start_time: Instant::now(),
duration,
start_time: Instant::now(),
iteration_counter: IterationCounter::new(0),
}
}
Expand Down
58 changes: 37 additions & 21 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,44 +64,58 @@ async fn send_stats(workload: &Workload, tx: &mut Sender<Result<WorkloadStats>>)

/// Responsible for periodically getting a snapshot of statistics from the `workload`
/// and sending them to the `output` channel. The sampling period is controlled by `sampling`.
/// A snapshot is not taken near the end of the run, to avoid a small final sample.
struct Snapshotter<'a> {
run_duration: config::Duration,
sampling: config::Duration,
workload: &'a Workload,
output: &'a mut Sender<Result<WorkloadStats>>,
start_time: Instant,
last_snapshot_time: Instant,
last_snapshot_iter: u64,
current_iter: u64,
}

impl<'a> Snapshotter<'a> {
pub fn new(
run_duration: config::Duration,
sampling: config::Duration,
workload: &'a Workload,
output: &'a mut Sender<Result<WorkloadStats>>,
) -> Snapshotter<'a> {
let start_time = Instant::now();
Snapshotter {
run_duration,
sampling,
workload,
output,
last_snapshot_time: Instant::now(),
start_time,
last_snapshot_time: start_time,
last_snapshot_iter: 0,
current_iter: 0,
}
}

/// Should be called when a workload iteration finished.
/// If the sampling period has elapsed, it sends the stats to the output.
pub async fn iteration_completed(&mut self, end_time: Instant) {
self.current_iter += 1;
pub async fn iteration_completed(&mut self, iteration: u64, now: Instant) {
let current_interval_duration = now - self.last_snapshot_time;
let current_interval_iter_count = iteration - self.last_snapshot_iter;

// Don't snapshot if we're too close to the end of the run,
// to avoid excessively small samples:
let far_from_the_end = match self.run_duration {
config::Duration::Time(d) => now < self.start_time + d - current_interval_duration / 2,
config::Duration::Count(count) => iteration < count - current_interval_iter_count / 2,
};

match self.sampling {
config::Duration::Time(d) => {
if end_time - self.last_snapshot_time > d {
if now > self.last_snapshot_time + d && far_from_the_end {
send_stats(self.workload, self.output).await;
self.last_snapshot_time += d;
}
}
config::Duration::Count(cnt) => {
if self.current_iter - self.last_snapshot_iter > cnt {
if iteration > self.last_snapshot_iter + cnt && far_from_the_end {
send_stats(self.workload, self.output).await;
self.last_snapshot_iter += cnt;
}
Expand All @@ -121,9 +135,10 @@ impl<'a> Snapshotter<'a> {
/// - progress: progress bar notified about each successful iteration
/// - interrupt: allows for terminating the stream early
/// - out: the channel to receive workload statistics
async fn run_stream(
stream: impl Stream<Item = Option<u64>> + std::marker::Unpin,
async fn run_stream<T>(
stream: impl Stream<Item = T> + std::marker::Unpin,
workload: Workload,
iter_counter: BoundedIterationCounter,
concurrency: NonZeroUsize,
sampling: Option<config::Duration>,
interrupt: Arc<InterruptHandler>,
Expand All @@ -132,19 +147,23 @@ async fn run_stream(
) {
workload.reset(Instant::now());

let mut iter_counter = iter_counter;
let mut snapshotter =
sampling.map(|s| Snapshotter::new(iter_counter.duration, s, &workload, &mut out));

let mut result_stream = stream
.map(|_| iter_counter.next())
.take_while(|i| ready(i.is_some()))
// unconstrained to work around the quadratic complexity of buffer_unordered()
.map(|i| tokio::task::unconstrained(workload.run(i.unwrap() as i64)))
.map(|i| tokio::task::unconstrained(workload.run(i.unwrap())))
.buffer_unordered(concurrency.get())
.inspect(|_| progress.tick());

let mut snapshotter = sampling.map(|s| Snapshotter::new(s, &workload, &mut out));
while let Some(res) = result_stream.next().await {
match res {
Ok(end_time) => {
Ok((iter, end_time)) => {
if let Some(snapshotter) = &mut snapshotter {
snapshotter.iteration_completed(end_time).await
snapshotter.iteration_completed(iter, end_time).await
}
}
Err(e) => {
Expand Down Expand Up @@ -172,20 +191,20 @@ fn spawn_stream(
rate: Option<f64>,
sampling: Option<config::Duration>,
workload: Workload,
deadline: BoundedIterationCounter,
iter_counter: BoundedIterationCounter,
interrupt: Arc<InterruptHandler>,
progress: Arc<StatusLine<Progress>>,
) -> Receiver<Result<WorkloadStats>> {
let (tx, rx) = mpsc::channel(1);
let mut deadline = deadline;

tokio::spawn(async move {
match rate {
Some(rate) => {
let stream = interval_stream(rate).map(|_| deadline.next());
let stream = interval_stream(rate);
run_stream(
stream,
workload,
iter_counter,
concurrency,
sampling,
interrupt,
Expand All @@ -195,10 +214,11 @@ fn spawn_stream(
.await
}
None => {
let stream = futures::stream::repeat_with(|| deadline.next());
let stream = futures::stream::repeat_with(|| ());
run_stream(
stream,
workload,
iter_counter,
concurrency,
sampling,
interrupt,
Expand Down Expand Up @@ -269,10 +289,6 @@ async fn par_execute(
};
let progress = Arc::new(StatusLine::with_options(progress, progress_opts));
let deadline = BoundedIterationCounter::new(exec_options.duration);
let sampling_period = sampling_period.map(|s| match s {
config::Duration::Count(cnt) => config::Duration::Count(cnt / thread_count as u64),
config::Duration::Time(d) => config::Duration::Time(d),
});
let mut streams = Vec::with_capacity(thread_count);
let mut stats = Recorder::start(rate, concurrency);

Expand Down
2 changes: 1 addition & 1 deletion src/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ impl Display for Sample {
write!(
f,
"{:8.3} {:11.0} {:11.0} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3}",
self.time_s,
self.time_s + self.duration_s,
self.call_throughput,
self.req_throughput,
self.resp_time_percentiles[Percentile::Min as usize],
Expand Down
10 changes: 5 additions & 5 deletions src/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,24 +384,24 @@ impl Workload {
/// Executes a single iteration of a workload.
/// This should be idempotent –
/// the generated action should be a function of the iteration number.
/// Returns the end time of the query.
pub async fn run(&self, iteration: i64) -> Result<Instant, LatteError> {
/// Returns the iteration number and the end time of the query.
pub async fn run(&self, iteration: u64) -> Result<(u64, Instant), LatteError> {
let start_time = Instant::now();
let session = SessionRef::new(&self.session);
let result = self
.program
.async_call(self.function, (session, iteration))
.async_call(self.function, (session, iteration as i64))
.await
.map(|_| ()); // erase Value, because Value is !Send
let end_time = Instant::now();
let mut state = self.state.try_lock().unwrap();
state.fn_stats.operation_completed(end_time - start_time);
match result {
Ok(_) => Ok(end_time),
Ok(_) => Ok((iteration, end_time)),
Err(LatteError::Cassandra(CassError(CassErrorKind::Overloaded(_)))) => {
// don't stop on overload errors;
// they are counted by the session stats anyway
Ok(end_time)
Ok((iteration, end_time))
}
Err(e) => Err(e),
}
Expand Down

0 comments on commit 758d616

Please sign in to comment.