diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs index f0eb58a233910..fc7d1a2617cf6 100644 --- a/datafusion-cli/src/command.rs +++ b/datafusion-cli/src/command.rs @@ -62,7 +62,9 @@ impl Command { Self::Help => { let now = Instant::now(); let command_batch = all_commands_info(); - print_options.print_batches(command_batch.schema(), &[command_batch], now) + let schema = command_batch.schema(); + let num_rows = command_batch.num_rows(); + print_options.print_batches(schema, &[command_batch], now, num_rows) } Self::ListTables => { exec_and_print(ctx, print_options, "SHOW TABLES".into()).await diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index a4f154b2de92d..84664794b7d92 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -17,11 +17,6 @@ //! Execution functions -use std::collections::HashMap; -use std::fs::File; -use std::io::prelude::*; -use std::io::BufReader; - use crate::cli_context::CliSessionContext; use crate::helper::split_from_semicolon; use crate::print_format::PrintFormat; @@ -31,6 +26,11 @@ use crate::{ object_storage::get_object_store, print_options::{MaxRows, PrintOptions}, }; +use futures::StreamExt; +use std::collections::HashMap; +use std::fs::File; +use std::io::prelude::*; +use std::io::BufReader; use datafusion::common::instant::Instant; use datafusion::common::{plan_datafusion_err, plan_err}; @@ -39,10 +39,12 @@ use datafusion::datasource::listing::ListingTableUrl; use datafusion::error::{DataFusionError, Result}; use datafusion::logical_expr::{DdlStatement, LogicalPlan}; use datafusion::physical_plan::execution_plan::EmissionType; -use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties}; +use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties}; use datafusion::sql::parser::{DFParser, Statement}; use datafusion::sql::sqlparser::dialect::dialect_from_str; +use datafusion::execution::memory_pool::MemoryConsumer; +use datafusion::physical_plan::spill::get_record_batch_memory_size; use datafusion::sql::sqlparser; use rustyline::error::ReadlineError; use rustyline::Editor; @@ -235,6 +237,10 @@ pub(super) async fn exec_and_print( let df = ctx.execute_logical_plan(plan).await?; let physical_plan = df.create_physical_plan().await?; + // Track memory usage for the query result if it's bounded + let mut reservation = + MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool()); + if physical_plan.boundedness().is_unbounded() { if physical_plan.pipeline_behavior() == EmissionType::Final { return plan_err!( @@ -247,10 +253,29 @@ pub(super) async fn exec_and_print( let stream = execute_stream(physical_plan, task_ctx.clone())?; print_options.print_stream(stream, now).await?; } else { - // Bounded stream; collected results are printed after all input consumed. + // Bounded stream; collected results size is limited by the maxrows option let schema = physical_plan.schema(); - let results = collect(physical_plan, task_ctx.clone()).await?; - adjusted.into_inner().print_batches(schema, &results, now)?; + let mut stream = execute_stream(physical_plan, task_ctx.clone())?; + let mut results = vec![]; + let mut row_count = 0_usize; + while let Some(batch) = stream.next().await { + let batch = batch?; + let curr_num_rows = batch.num_rows(); + if let MaxRows::Limited(max_rows) = print_options.maxrows { + // Stop collecting results if the number of rows exceeds the limit + // results batch should include the last batch that exceeds the limit + if row_count < max_rows + curr_num_rows { + // Try to grow the reservation to accommodate the batch in memory + reservation.try_grow(get_record_batch_memory_size(&batch))?; + results.push(batch); + } + } + row_count += curr_num_rows; + } + adjusted + .into_inner() + .print_batches(schema, &results, now, row_count)?; + reservation.free(); } } diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs index e80cc55663ae2..9557e783e8a7c 100644 --- a/datafusion-cli/src/print_options.rs +++ b/datafusion-cli/src/print_options.rs @@ -102,6 +102,7 @@ impl PrintOptions { schema: SchemaRef, batches: &[RecordBatch], query_start_time: Instant, + row_count: usize, ) -> Result<()> { let stdout = std::io::stdout(); let mut writer = stdout.lock(); @@ -109,7 +110,6 @@ impl PrintOptions { self.format .print_batches(&mut writer, schema, batches, self.maxrows, true)?; - let row_count: usize = batches.iter().map(|b| b.num_rows()).sum(); let formatted_exec_details = get_execution_details_formatted( row_count, if self.format == PrintFormat::Table {