diff --git a/src/iteration.rs b/src/iteration.rs index e15ec48..e7ecae7 100644 --- a/src/iteration.rs +++ b/src/iteration.rs @@ -55,8 +55,8 @@ impl IterationCounter { /// Provides distinct benchmark iteration numbers to multiple threads of execution. /// Decides when to stop the benchmark execution. pub struct BoundedIterationCounter { + pub duration: config::Duration, start_time: Instant, - duration: config::Duration, iteration_counter: IterationCounter, } @@ -65,8 +65,8 @@ impl BoundedIterationCounter { /// For time-based deadline, the clock starts ticking when this object is created. pub fn new(duration: config::Duration) -> Self { BoundedIterationCounter { - start_time: Instant::now(), duration, + start_time: Instant::now(), iteration_counter: IterationCounter::new(0), } } diff --git a/src/main.rs b/src/main.rs index 5235d09..a1db667 100644 --- a/src/main.rs +++ b/src/main.rs @@ -64,44 +64,58 @@ async fn send_stats(workload: &Workload, tx: &mut Sender>) /// Responsible for periodically getting a snapshot of statistics from the `workload` /// and sending them to the `output` channel. The sampling period is controlled by `sampling`. +/// Snapshot is not taken near the end of the run to avoid small final sample. struct Snapshotter<'a> { + run_duration: config::Duration, sampling: config::Duration, workload: &'a Workload, output: &'a mut Sender>, + start_time: Instant, last_snapshot_time: Instant, last_snapshot_iter: u64, - current_iter: u64, } impl<'a> Snapshotter<'a> { pub fn new( + run_duration: config::Duration, sampling: config::Duration, workload: &'a Workload, output: &'a mut Sender>, ) -> Snapshotter<'a> { + let start_time = Instant::now(); Snapshotter { + run_duration, sampling, workload, output, - last_snapshot_time: Instant::now(), + start_time, + last_snapshot_time: start_time, last_snapshot_iter: 0, - current_iter: 0, } } /// Should be called when a workload iteration finished. /// If there comes the time, it will send the stats to the output. - pub async fn iteration_completed(&mut self, end_time: Instant) { - self.current_iter += 1; + pub async fn iteration_completed(&mut self, iteration: u64, now: Instant) { + let current_interval_duration = now - self.last_snapshot_time; + let current_interval_iter_count = iteration - self.last_snapshot_iter; + + // Don't snapshot if we're too close to the end of the run, + // to avoid excessively small samples: + let far_from_the_end = match self.run_duration { + config::Duration::Time(d) => now < self.start_time + d - current_interval_duration / 2, + config::Duration::Count(count) => iteration < count - current_interval_iter_count / 2, + }; + match self.sampling { config::Duration::Time(d) => { - if end_time - self.last_snapshot_time > d { + if now > self.last_snapshot_time + d && far_from_the_end { send_stats(self.workload, self.output).await; self.last_snapshot_time += d; } } config::Duration::Count(cnt) => { - if self.current_iter - self.last_snapshot_iter > cnt { + if iteration > self.last_snapshot_iter + cnt && far_from_the_end { send_stats(self.workload, self.output).await; self.last_snapshot_iter += cnt; } @@ -121,9 +135,10 @@ impl<'a> Snapshotter<'a> { /// - progress: progress bar notified about each successful iteration /// - interrupt: allows for terminating the stream early /// - out: the channel to receive workload statistics -async fn run_stream( - stream: impl Stream> + std::marker::Unpin, +async fn run_stream( + stream: impl Stream + std::marker::Unpin, workload: Workload, + iter_counter: BoundedIterationCounter, concurrency: NonZeroUsize, sampling: Option, interrupt: Arc, @@ -132,19 +147,23 @@ async fn run_stream( ) { workload.reset(Instant::now()); + let mut iter_counter = iter_counter; + let mut snapshotter = + sampling.map(|s| Snapshotter::new(iter_counter.duration, s, &workload, &mut out)); + let mut result_stream = stream + .map(|_| iter_counter.next()) .take_while(|i| ready(i.is_some())) // unconstrained to workaround quadratic complexity of buffer_unordered () - .map(|i| tokio::task::unconstrained(workload.run(i.unwrap() as i64))) + .map(|i| tokio::task::unconstrained(workload.run(i.unwrap()))) .buffer_unordered(concurrency.get()) .inspect(|_| progress.tick()); - let mut snapshotter = sampling.map(|s| Snapshotter::new(s, &workload, &mut out)); while let Some(res) = result_stream.next().await { match res { - Ok(end_time) => { + Ok((iter, end_time)) => { if let Some(snapshotter) = &mut snapshotter { - snapshotter.iteration_completed(end_time).await + snapshotter.iteration_completed(iter, end_time).await } } Err(e) => { @@ -172,20 +191,20 @@ fn spawn_stream( rate: Option, sampling: Option, workload: Workload, - deadline: BoundedIterationCounter, + iter_counter: BoundedIterationCounter, interrupt: Arc, progress: Arc>, ) -> Receiver> { let (tx, rx) = mpsc::channel(1); - let mut deadline = deadline; tokio::spawn(async move { match rate { Some(rate) => { - let stream = interval_stream(rate).map(|_| deadline.next()); + let stream = interval_stream(rate); run_stream( stream, workload, + iter_counter, concurrency, sampling, interrupt, @@ -195,10 +214,11 @@ fn spawn_stream( .await } None => { - let stream = futures::stream::repeat_with(|| deadline.next()); + let stream = futures::stream::repeat_with(|| ()); run_stream( stream, workload, + iter_counter, concurrency, sampling, interrupt, @@ -269,10 +289,6 @@ async fn par_execute( }; let progress = Arc::new(StatusLine::with_options(progress, progress_opts)); let deadline = BoundedIterationCounter::new(exec_options.duration); - let sampling_period = sampling_period.map(|s| match s { - config::Duration::Count(cnt) => config::Duration::Count(cnt / thread_count as u64), - config::Duration::Time(d) => config::Duration::Time(d), - }); let mut streams = Vec::with_capacity(thread_count); let mut stats = Recorder::start(rate, concurrency); diff --git a/src/report.rs b/src/report.rs index 0fd2ce6..cabfe81 100644 --- a/src/report.rs +++ b/src/report.rs @@ -534,7 +534,7 @@ impl Display for Sample { write!( f, "{:8.3} {:11.0} {:11.0} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3} {:9.3}", - self.time_s, + self.time_s + self.duration_s, self.call_throughput, self.req_throughput, self.resp_time_percentiles[Percentile::Min as usize], diff --git a/src/workload.rs b/src/workload.rs index 39e644c..b31df88 100644 --- a/src/workload.rs +++ b/src/workload.rs @@ -384,24 +384,24 @@ impl Workload { /// Executes a single iteration of a workload. /// This should be idempotent – /// the generated action should be a function of the iteration number. - /// Returns the end time of the query. - pub async fn run(&self, iteration: i64) -> Result { + /// Returns the iteration number and the end time of the query. + pub async fn run(&self, iteration: u64) -> Result<(u64, Instant), LatteError> { let start_time = Instant::now(); let session = SessionRef::new(&self.session); let result = self .program - .async_call(self.function, (session, iteration)) + .async_call(self.function, (session, iteration as i64)) .await .map(|_| ()); // erase Value, because Value is !Send let end_time = Instant::now(); let mut state = self.state.try_lock().unwrap(); state.fn_stats.operation_completed(end_time - start_time); match result { - Ok(_) => Ok(end_time), + Ok(_) => Ok((iteration, end_time)), Err(LatteError::Cassandra(CassError(CassErrorKind::Overloaded(_)))) => { // don't stop on overload errors; // they are being counted by the session stats anyways - Ok(end_time) + Ok((iteration, end_time)) } Err(e) => Err(e), }