Skip to content
Merged
1 change: 1 addition & 0 deletions datafusion-cli/examples/cli-session-context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub async fn main() {
format: datafusion_cli::print_format::PrintFormat::Automatic,
quiet: false,
maxrows: datafusion_cli::print_options::MaxRows::Unlimited,
stop_after_max_rows: false,
color: true,
};

Expand Down
33 changes: 26 additions & 7 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@

//! Execution functions

use std::collections::HashMap;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;

use crate::cli_context::CliSessionContext;
use crate::helper::split_from_semicolon;
use crate::print_format::PrintFormat;
Expand All @@ -31,6 +26,11 @@ use crate::{
object_storage::get_object_store,
print_options::{MaxRows, PrintOptions},
};
use futures::StreamExt;
use std::collections::HashMap;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;

use datafusion::common::instant::Instant;
use datafusion::common::{plan_datafusion_err, plan_err};
Expand All @@ -39,10 +39,12 @@ use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;

use datafusion::execution::memory_pool::MemoryConsumer;
use datafusion::physical_plan::spill::get_record_batch_memory_size;
use datafusion::sql::sqlparser;
use rustyline::error::ReadlineError;
use rustyline::Editor;
Expand Down Expand Up @@ -235,6 +237,10 @@ pub(super) async fn exec_and_print(
let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

// Track memory usage for the query result if it's bounded
let mut reservation =
MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());

if physical_plan.boundedness().is_unbounded() {
if physical_plan.pipeline_behavior() == EmissionType::Final {
return plan_err!(
Expand All @@ -249,8 +255,21 @@ pub(super) async fn exec_and_print(
} else {
// Bounded stream; collected results are printed after all input consumed.
let schema = physical_plan.schema();
let results = collect(physical_plan, task_ctx.clone()).await?;
let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
let mut results = vec![];
while let Some(batch) = stream.next().await {
let batch = batch?;
reservation.try_grow(get_record_batch_memory_size(&batch))?;
results.push(batch);
if let MaxRows::Limited(max_rows) = print_options.maxrows {
// Stop collecting results if the number of rows exceeds the limit
if results.len() >= max_rows && print_options.stop_after_max_rows {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you break here it will stop the plan early -- then you need to add stop_after_max_rows to permit the plan to run to completion, however, if that option is set then the data is buffered again (which from a memory perspective is the same as increasing the max_rows)

Rather than breaking here, I recommend just ignoring the batches after max_rows has been reached (aka don't push them into results) -- I think that would be the most intuitive behavior:

  1. Queries would still always run to completion
  2. Max rows would control how much memory was used by datafusion-cli buffering

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @alamb for review and the great idea! It makes sense to me, addressed in latest PR.

break;
}
}
}
adjusted.into_inner().print_batches(schema, &results, now)?;
reservation.free();
}
}

Expand Down
8 changes: 8 additions & 0 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ struct Args {
)]
maxrows: MaxRows,

#[clap(
short,
long,
help = "Whether to stop early when max rows is reached, this will help reduce the memory usage when the result is large"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this option is basically the same as adding a LIMIT max_rows to the query and thus is somewhat redundant

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good point, remove it now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The option appears to still be present

)]
stop_after_max_rows: bool,

#[clap(long, help = "Enables console syntax highlighting")]
color: bool,
}
Expand Down Expand Up @@ -186,6 +193,7 @@ async fn main_inner() -> Result<()> {
quiet: args.quiet,
maxrows: args.maxrows,
color: args.color,
stop_after_max_rows: args.stop_after_max_rows,
};

let commands = args.command;
Expand Down
1 change: 1 addition & 0 deletions datafusion-cli/src/print_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub struct PrintOptions {
pub quiet: bool,
pub maxrows: MaxRows,
pub color: bool,
pub stop_after_max_rows: bool,
}

// Returns the query execution details formatted
Expand Down
3 changes: 3 additions & 0 deletions docs/source/user-guide/cli/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ OPTIONS:
The max number of rows to display for 'Table' format
[possible values: numbers(0/10/...), inf(no limit)] [default: 40]

--stop-after-max-rows
Enable to stop early when maxrows is reached, default to false.

--mem-pool-type <MEM_POOL_TYPE>
Specify the memory pool type 'greedy' or 'fair', default to 'greedy'

Expand Down
Loading