Skip to content

Commit

Permalink
chore(cli): optimize output stats & progress
Browse files Browse the repository at this point in the history
  • Loading branch information
everpcpc committed Apr 28, 2023
1 parent 01834e6 commit fb26430
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 119 deletions.
44 changes: 39 additions & 5 deletions cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,38 @@ pub struct Config {
#[serde(default)]
pub connection: ConnectionConfig,
#[serde(default)]
pub settings: Settings,
pub settings: SettingsConfig,
}

#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize, Default)]
#[serde(default)]
pub struct SettingsConfig {
pub display_pretty_sql: Option<bool>,
pub prompt: Option<String>,
pub progress_color: Option<String>,
pub show_progress: Option<bool>,
pub show_stats: Option<bool>,
}

#[derive(Clone, Debug)]
pub struct Settings {
pub display_pretty_sql: bool,
pub prompt: String,
pub progress_color: String,

/// Show progress [bar] when executing queries.
/// Only works in non-interactive mode.
/// Only works with output format `table` and `null`.
pub show_progress: bool,

/// Show stats after executing queries.
/// Only works with non-interactive mode.
pub show_stats: bool,

/// Output format is set by the flag.
#[serde(skip)]
pub output_format: OutputFormat,
#[serde(skip)]

/// Show time elapsed when executing queries.
/// only works with output format `null`.
pub time: bool,
}

Expand All @@ -56,12 +70,31 @@ pub enum OutputFormat {
}

impl Settings {
pub fn merge_config(&mut self, cfg: SettingsConfig) {
if let Some(display_pretty_sql) = cfg.display_pretty_sql {
self.display_pretty_sql = display_pretty_sql;
}
if let Some(prompt) = cfg.prompt {
self.prompt = prompt;
}
if let Some(progress_color) = cfg.progress_color {
self.progress_color = progress_color;
}
if let Some(show_progress) = cfg.show_progress {
self.show_progress = show_progress;
}
if let Some(show_stats) = cfg.show_stats {
self.show_stats = show_stats;
}
}

pub fn inject_ctrl_cmd(&mut self, cmd_name: &str, cmd_value: &str) -> Result<()> {
match cmd_name {
"display_pretty_sql" => self.display_pretty_sql = cmd_value.parse()?,
"prompt" => self.prompt = cmd_value.to_string(),
"progress_color" => self.progress_color = cmd_value.to_string(),
"show_progress" => self.show_progress = cmd_value.parse()?,
"show_stats" => self.show_stats = cmd_value.parse()?,
"output_format" => {
self.output_format = match cmd_value.to_ascii_lowercase().as_str() {
"table" => OutputFormat::Table,
Expand Down Expand Up @@ -118,6 +151,7 @@ impl Default for Settings {
prompt: "{user}@{host}> ".to_string(),
output_format: OutputFormat::Table,
show_progress: false,
show_stats: false,
time: false,
}
}
Expand Down
199 changes: 108 additions & 91 deletions cli/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub trait ChunkDisplay {
fn total_rows(&self) -> usize;
}

pub struct ReplDisplay<'a> {
pub struct FormatDisplay<'a> {
settings: &'a Settings,
query: &'a str,
schema: SchemaRef,
Expand All @@ -46,9 +46,10 @@ pub struct ReplDisplay<'a> {
rows: usize,
progress: Option<ProgressBar>,
start: Instant,
stats: Option<QueryProgress>,
}

impl<'a> ReplDisplay<'a> {
impl<'a> FormatDisplay<'a> {
pub fn new(
settings: &'a Settings,
query: &'a str,
Expand All @@ -64,147 +65,163 @@ impl<'a> ReplDisplay<'a> {
rows: 0,
progress: None,
start,
stats: None,
}
}
}

#[async_trait::async_trait]
impl<'a> ChunkDisplay for ReplDisplay<'a> {
async fn display(&mut self) -> Result<()> {
let mut rows: Vec<Row> = Vec::new();
let mut progress = QueryProgress::default();
impl<'a> FormatDisplay<'a> {
async fn display_progress(&mut self, pg: &QueryProgress) {
if self.settings.show_progress {
let pgo = self.progress.take();
self.progress = Some(display_read_progress(pgo, pg));
}
}

async fn display_table(&mut self) -> Result<()> {
if self.settings.display_pretty_sql {
let format_sql = format_query(self.query);
let format_sql = CliHelper::new().highlight(&format_sql, format_sql.len());
println!("\n{}\n", format_sql);
}

let mut rows = Vec::new();
while let Some(line) = self.data.next().await {
match line {
Ok(RowWithProgress::Progress(pg)) => {
let pgo = self.progress.take();
self.progress = Some(display_read_progress(pgo, &pg));
progress = pg;
}
Ok(RowWithProgress::Row(row)) => {
rows.push(row);
self.rows += 1;
rows.push(row);
}
Ok(RowWithProgress::Progress(pg)) => {
self.display_progress(&pg).await;
self.stats = Some(pg);
}
Err(e) => {
eprintln!("error: {}", e);
Err(err) => {
eprintln!("error: {}", err);
break;
}
}
}

if let Some(pb) = self.progress.take() {
pb.finish_and_clear();
}
if !rows.is_empty() {
println!("{}", create_table(self.schema.clone(), &rows)?);
println!();
}

progress.normalize();
let rows_str = if self.rows > 1 { "rows" } else { "row" };
println!(
"{} {} in {:.3} sec. Processed {} rows, {} ({} rows/s, {}/s)",
self.rows,
rows_str,
self.start.elapsed().as_secs_f64(),
humanize_count(progress.total_rows as f64),
HumanBytes(progress.total_rows as u64),
humanize_count(progress.total_rows as f64 / self.start.elapsed().as_secs_f64()),
HumanBytes((progress.total_bytes as f64 / self.start.elapsed().as_secs_f64()) as u64),
);
println!();

Ok(())
}

fn total_rows(&self) -> usize {
self.rows
async fn display_csv(&mut self) -> Result<()> {
let mut wtr = csv::WriterBuilder::new()
.quote_style(csv::QuoteStyle::Necessary)
.from_writer(std::io::stdout());
while let Some(line) = self.data.next().await {
match line {
Ok(RowWithProgress::Row(row)) => {
self.rows += 1;
let record = row.into_iter().map(|v| v.to_string()).collect::<Vec<_>>();
wtr.write_record(record)?;
}
Ok(RowWithProgress::Progress(pg)) => {
self.stats = Some(pg);
}
Err(err) => {
eprintln!("error: {}", err);
break;
}
}
}
Ok(())
}
}

pub struct FormatDisplay<'a> {
settings: &'a Settings,
schema: SchemaRef,
data: RowProgressIterator,

rows: usize,
_progress: Option<ProgressBar>,
_start: Instant,
}

impl<'a> FormatDisplay<'a> {
pub fn new(
settings: &'a Settings,
start: Instant,
schema: SchemaRef,
data: RowProgressIterator,
) -> Self {
Self {
settings,
schema,
data,
rows: 0,
_progress: None,
_start: start,
async fn display_tsv(&mut self) -> Result<()> {
let mut wtr = csv::WriterBuilder::new()
.delimiter(b'\t')
.quote_style(csv::QuoteStyle::Necessary)
.from_writer(std::io::stdout());
while let Some(line) = self.data.next().await {
match line {
Ok(RowWithProgress::Row(row)) => {
self.rows += 1;
let record = row.into_iter().map(|v| v.to_string()).collect::<Vec<_>>();
wtr.write_record(record)?;
}
Ok(RowWithProgress::Progress(pg)) => {
self.stats = Some(pg);
}
Err(err) => {
eprintln!("error: {}", err);
break;
}
}
}
Ok(())
}
}

#[async_trait::async_trait]
impl<'a> ChunkDisplay for FormatDisplay<'a> {
async fn display(&mut self) -> Result<()> {
let mut rows = Vec::new();
async fn display_null(&mut self) -> Result<()> {
while let Some(line) = self.data.next().await {
match line {
Ok(RowWithProgress::Row(row)) => {
rows.push(row);
Ok(RowWithProgress::Row(_)) => {
self.rows += 1;
}
Ok(_) => {}
Ok(RowWithProgress::Progress(pg)) => {
self.display_progress(&pg).await;
self.stats = Some(pg);
}
Err(err) => {
eprintln!("error: {}", err);
break;
}
}
}
if rows.is_empty() {
return Ok(());
if let Some(pb) = self.progress.take() {
pb.finish_and_clear();
}
if self.settings.time {
println!("{:.3}", self.start.elapsed().as_secs_f64());
}
Ok(())
}

async fn display_stats(&mut self) {
if !self.settings.show_stats {
return;
}
if let Some(ref mut stats) = self.stats {
stats.normalize();
let rows_str = if self.rows > 1 { "rows" } else { "row" };
eprintln!(
"{} {} in {:.3} sec. Processed {} rows, {} ({} rows/s, {}/s)",
self.rows,
rows_str,
self.start.elapsed().as_secs_f64(),
humanize_count(stats.total_rows as f64),
HumanBytes(stats.total_rows as u64),
humanize_count(stats.total_rows as f64 / self.start.elapsed().as_secs_f64()),
HumanBytes((stats.total_bytes as f64 / self.start.elapsed().as_secs_f64()) as u64),
);
eprintln!();
}
}
}

#[async_trait::async_trait]
impl<'a> ChunkDisplay for FormatDisplay<'a> {
async fn display(&mut self) -> Result<()> {
match self.settings.output_format {
OutputFormat::Table => {
println!("{}", create_table(self.schema.clone(), &rows)?);
self.display_table().await?;
}
OutputFormat::CSV => {
let mut wtr = csv::WriterBuilder::new()
.quote_style(csv::QuoteStyle::Necessary)
.from_writer(std::io::stdout());
for row in rows {
let record = row.into_iter().map(|v| v.to_string()).collect::<Vec<_>>();
wtr.write_record(record)?;
}
self.display_csv().await?;
}
OutputFormat::TSV => {
let mut wtr = csv::WriterBuilder::new()
.delimiter(b'\t')
.quote_style(csv::QuoteStyle::Necessary)
.from_writer(std::io::stdout());
for row in rows {
let record = row.into_iter().map(|v| v.to_string()).collect::<Vec<_>>();
wtr.write_record(record)?;
}
self.display_tsv().await?;
}
OutputFormat::Null => {
self.display_null().await?;
}
OutputFormat::Null => {}
}

if self.settings.time {
eprintln!("{:.3}", self._start.elapsed().as_secs_f64());
}
self.display_stats().await;
Ok(())
}

Expand Down
Loading

0 comments on commit fb26430

Please sign in to comment.