Skip to content

Commit

Permalink
Don't throw away the last chunk of data on SIGTERM (Ctrl-C)
Browse files Browse the repository at this point in the history
By moving the SIGTERM logic earlier in the stream
pipeline, now when SIGTERM is received, we emit
and process the last chunk.
  • Loading branch information
pkolaczk committed Aug 5, 2024
1 parent 946f7b0 commit fd20881
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 25 deletions.
4 changes: 0 additions & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::context::CassError;
use crate::stats::BenchmarkStats;
use err_derive::*;
use hdrhistogram::serialization::interval_log::IntervalLogWriterError;
use hdrhistogram::serialization::V2DeflateSerializeError;
Expand Down Expand Up @@ -40,9 +39,6 @@ pub enum LatteError {
#[error(display = "Error writing HDR log: {}", _0)]
HdrLogWrite(#[source] IntervalLogWriterError<V2DeflateSerializeError>),

#[error(display = "Interrupted")]
Interrupted(Box<BenchmarkStats>),

#[error(display = "Failed to launch external editor {}: {}", _0, _1)]
ExternalEditorLaunch(String, std::io::Error),

Expand Down
36 changes: 16 additions & 20 deletions src/exec.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Implementation of the main benchmarking loop

use futures::channel::mpsc::{channel, Receiver, Sender};
use futures::{SinkExt, Stream, StreamExt};
use futures::{pin_mut, SinkExt, Stream, StreamExt};
use itertools::Itertools;
use pin_project::pin_project;
use status_line::StatusLine;
Expand All @@ -12,6 +12,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use tokio::signal::ctrl_c;
use tokio::time::MissedTickBehavior;
use tokio_stream::wrappers::IntervalStream;

Expand Down Expand Up @@ -56,13 +57,14 @@ async fn run_stream<T>(
let sample_size = sampling.count().unwrap_or(u64::MAX);
let sample_duration = sampling.period().unwrap_or(tokio::time::Duration::MAX);

let mut stats_stream = stream
let stats_stream = stream
.map(|_| iter_counter.next())
.take_while(|i| ready(i.is_some()))
// unconstrained to workaround quadratic complexity of buffer_unordered ()
.map(|i| tokio::task::unconstrained(workload.run(i.unwrap())))
.buffer_unordered(concurrency.get())
.inspect(|_| progress.tick())
.take_until(ctrl_c())
.terminate_after_error()
.chunks_aggregated(sample_size, sample_duration, Vec::new, |errors, result| {
if let Err(e) = result {
Expand All @@ -71,6 +73,8 @@ async fn run_stream<T>(
})
.map(|errors| (workload.take_stats(Instant::now()), errors));

pin_mut!(stats_stream);

workload.reset(Instant::now());
while let Some((stats, errors)) = stats_stream.next().await {
if out.send(Ok(stats)).await.is_err() {
Expand Down Expand Up @@ -215,25 +219,17 @@ pub async fn par_execute(
}

loop {
tokio::select! {
partial_stats = receive_one_of_each(&mut streams) => {
let partial_stats: Vec<_> = partial_stats.into_iter().try_collect()?;
if partial_stats.is_empty() {
break Ok(stats.finish());
}

let aggregate = stats.record(&partial_stats);
if sampling.is_bounded() {
progress.set_visible(false);
println!("{aggregate}");
progress.set_visible(show_progress);
}
}
let partial_stats = receive_one_of_each(&mut streams).await;
let partial_stats: Vec<_> = partial_stats.into_iter().try_collect()?;
if partial_stats.is_empty() {
break Ok(stats.finish());
}

_ = tokio::signal::ctrl_c() => {
progress.set_visible(false);
break Err(LatteError::Interrupted(Box::new(stats.finish())));
}
let aggregate = stats.record(&partial_stats);
if sampling.is_bounded() {
progress.set_visible(false);
println!("{aggregate}");
progress.set_visible(show_progress);
}
}
}
Expand Down
1 change: 0 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ async fn run(conf: RunCommand) -> Result<()> {
.await
{
Ok(stats) => stats,
Err(LatteError::Interrupted(stats)) => *stats,
Err(e) => {
return Err(e);
}
Expand Down

0 comments on commit fd20881

Please sign in to comment.