Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion datafusion-cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,15 @@ impl Command {
let command_batch = all_commands_info();
let schema = command_batch.schema();
let num_rows = command_batch.num_rows();
print_options.print_batches(schema, &[command_batch], now, num_rows)
let task_ctx = ctx.task_ctx();
let config = &task_ctx.session_config().options().format;
print_options.print_batches(
schema,
&[command_batch],
now,
num_rows,
config,
)
}
Self::ListTables => {
exec_and_print(ctx, print_options, "SHOW TABLES".into()).await
Expand Down
17 changes: 12 additions & 5 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ pub(super) async fn exec_and_print(
) -> Result<()> {
let now = Instant::now();
let task_ctx = ctx.task_ctx();
let dialect = &task_ctx.session_config().options().sql_parser.dialect;
let options = task_ctx.session_config().options();
let dialect = &options.sql_parser.dialect;
let dialect = dialect_from_str(dialect).ok_or_else(|| {
plan_datafusion_err!(
"Unsupported SQL dialect: {dialect}. Available dialects: \
Expand Down Expand Up @@ -250,7 +251,9 @@ pub(super) async fn exec_and_print(
// As the input stream comes, we can generate results.
// However, memory safety is not guaranteed.
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
print_options
.print_stream(stream, now, &options.format)
.await?;
} else {
// Bounded stream; collected results size is limited by the maxrows option
let schema = physical_plan.schema();
Expand All @@ -273,9 +276,13 @@ pub(super) async fn exec_and_print(
}
row_count += curr_num_rows;
}
adjusted
.into_inner()
.print_batches(schema, &results, now, row_count)?;
adjusted.into_inner().print_batches(
schema,
&results,
now,
row_count,
&options.format,
)?;
reservation.free();
}
}
Expand Down
5 changes: 5 additions & 0 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ fn get_session_config(args: &Args) -> Result<SessionConfig> {
config_options.explain.format = String::from("tree");
}

// in the CLI, we want to show NULL values rather the empty strings
if env::var_os("DATAFUSION_FORMAT_NULL").is_none() {
config_options.format.null = String::from("NULL");
}

let session_config =
SessionConfig::from(config_options).with_information_schema(true);
Ok(session_config)
Expand Down
30 changes: 17 additions & 13 deletions datafusion-cli/src/print_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef;
use arrow::json::{ArrayWriter, LineDelimitedWriter};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches_with_options;
use datafusion::common::format::DEFAULT_CLI_FORMAT_OPTIONS;
use datafusion::config::FormatOptions;
use datafusion::error::Result;

/// Allow records to be printed in different formats
Expand Down Expand Up @@ -110,7 +110,10 @@ fn format_batches_with_maxrows<W: std::io::Write>(
writer: &mut W,
batches: &[RecordBatch],
maxrows: MaxRows,
format_options: &FormatOptions,
) -> Result<()> {
let options: arrow::util::display::FormatOptions = format_options.try_into()?;

match maxrows {
MaxRows::Limited(maxrows) => {
// Filter batches to meet the maxrows condition
Expand All @@ -131,10 +134,8 @@ fn format_batches_with_maxrows<W: std::io::Write>(
}
}

let formatted = pretty_format_batches_with_options(
&filtered_batches,
&DEFAULT_CLI_FORMAT_OPTIONS,
)?;
let formatted =
pretty_format_batches_with_options(&filtered_batches, &options)?;
if over_limit {
let mut formatted_str = format!("{}", formatted);
formatted_str = keep_only_maxrows(&formatted_str, maxrows);
Expand All @@ -144,8 +145,7 @@ fn format_batches_with_maxrows<W: std::io::Write>(
}
}
MaxRows::Unlimited => {
let formatted =
pretty_format_batches_with_options(batches, &DEFAULT_CLI_FORMAT_OPTIONS)?;
let formatted = pretty_format_batches_with_options(batches, &options)?;
writeln!(writer, "{}", formatted)?;
}
}
Expand All @@ -162,6 +162,7 @@ impl PrintFormat {
batches: &[RecordBatch],
maxrows: MaxRows,
with_header: bool,
format_options: &FormatOptions,
) -> Result<()> {
// filter out any empty batches
let batches: Vec<_> = batches
Expand All @@ -170,7 +171,7 @@ impl PrintFormat {
.cloned()
.collect();
if batches.is_empty() {
return self.print_empty(writer, schema);
return self.print_empty(writer, schema, format_options);
}

match self {
Expand All @@ -182,7 +183,7 @@ impl PrintFormat {
if maxrows == MaxRows::Limited(0) {
return Ok(());
}
format_batches_with_maxrows(writer, &batches, maxrows)
format_batches_with_maxrows(writer, &batches, maxrows, format_options)
}
Self::Json => batches_to_json!(ArrayWriter, writer, &batches),
Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, &batches),
Expand All @@ -194,15 +195,17 @@ impl PrintFormat {
&self,
writer: &mut W,
schema: SchemaRef,
format_options: &FormatOptions,
) -> Result<()> {
match self {
// Print column headers for Table format
Self::Table if !schema.fields().is_empty() => {
let format_options: arrow::util::display::FormatOptions =
format_options.try_into()?;

let empty_batch = RecordBatch::new_empty(schema);
let formatted = pretty_format_batches_with_options(
&[empty_batch],
&DEFAULT_CLI_FORMAT_OPTIONS,
)?;
let formatted =
pretty_format_batches_with_options(&[empty_batch], &format_options)?;
writeln!(writer, "{}", formatted)?;
}
_ => {}
Expand Down Expand Up @@ -644,6 +647,7 @@ mod tests {
&self.batches,
self.maxrows,
with_header,
&FormatOptions::default(),
)
.unwrap();
String::from_utf8(buffer).unwrap()
Expand Down
14 changes: 12 additions & 2 deletions datafusion-cli/src/print_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use datafusion::common::DataFusionError;
use datafusion::error::Result;
use datafusion::physical_plan::RecordBatchStream;

use datafusion::config::FormatOptions;
use futures::StreamExt;

#[derive(Debug, Clone, PartialEq, Copy)]
Expand Down Expand Up @@ -103,12 +104,19 @@ impl PrintOptions {
batches: &[RecordBatch],
query_start_time: Instant,
row_count: usize,
format_options: &FormatOptions,
) -> Result<()> {
let stdout = std::io::stdout();
let mut writer = stdout.lock();

self.format
.print_batches(&mut writer, schema, batches, self.maxrows, true)?;
self.format.print_batches(
&mut writer,
schema,
batches,
self.maxrows,
true,
format_options,
)?;

let formatted_exec_details = get_execution_details_formatted(
row_count,
Expand All @@ -132,6 +140,7 @@ impl PrintOptions {
&self,
mut stream: Pin<Box<dyn RecordBatchStream>>,
query_start_time: Instant,
format_options: &FormatOptions,
) -> Result<()> {
if self.format == PrintFormat::Table {
return Err(DataFusionError::External(
Expand All @@ -154,6 +163,7 @@ impl PrintOptions {
&[batch],
MaxRows::Unlimited,
with_header,
format_options,
)?;
with_header = false;
}
Expand Down
4 changes: 4 additions & 0 deletions datafusion-cli/tests/cli_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ fn init() {
// can choose the old explain format too
["--command", "EXPLAIN FORMAT indent SELECT 123"],
)]
#[case::change_format_version(
"change_format_version",
["--file", "tests/sql/types_format.sql", "-q"],
)]
#[test]
fn cli_quick_test<'a>(
#[case] snapshot_name: &'a str,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
source: datafusion-cli/tests/cli_integration.rs
info:
program: datafusion-cli
args:
- "--file"
- tests/sql/types_format.sql
- "-q"
---
success: true
exit_code: 0
----- stdout -----
+-----------+
| Int64(54) |
| Int64 |
+-----------+
| 54 |
+-----------+

----- stderr -----
3 changes: 3 additions & 0 deletions datafusion-cli/tests/sql/types_format.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
set datafusion.format.types_info to true;

select 54
82 changes: 69 additions & 13 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@

//! Runtime configuration, via [`ConfigOptions`]

use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::str::FromStr;

use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};

/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree.
Expand Down Expand Up @@ -759,6 +758,59 @@ impl ExecutionOptions {
}
}

config_namespace! {
/// Options controlling the format of output when printing record batches
/// Copies [`arrow::util::display::FormatOptions`]
pub struct FormatOptions {
/// If set to `true` any formatting errors will be written to the output
/// instead of being converted into a [`std::fmt::Error`]
pub safe: bool, default = true
/// Format string for nulls
pub null: String, default = "".into()
/// Date format for date arrays
pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
/// Format for DateTime arrays
pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
/// Timestamp format for timestamp arrays
pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
/// Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used.
pub timestamp_tz_format: Option<String>, default = None
/// Time format for time arrays
pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
/// Duration format. Can be either `"pretty"` or `"ISO8601"`
pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
/// Show types in visual representation batches
pub types_info: bool, default = false
}
}

impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions {
type Error = DataFusionError;
fn try_into(self) -> Result<arrow::util::display::FormatOptions<'a>> {
let duration_format = match self.duration_format.as_str() {
"pretty" => arrow::util::display::DurationFormat::Pretty,
"iso8601" => arrow::util::display::DurationFormat::ISO8601,
_ => {
return _config_err!(
"Invalid duration format: {}. Valid values are pretty or iso8601",
self.duration_format
)
}
};

Ok(arrow::util::display::FormatOptions::new()
.with_display_error(self.safe)
.with_null(&self.null)
.with_date_format(self.date_format.as_deref())
.with_datetime_format(self.datetime_format.as_deref())
.with_timestamp_format(self.timestamp_format.as_deref())
.with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
.with_time_format(self.time_format.as_deref())
.with_duration_format(duration_format)
.with_types_info(self.types_info))
}
}

/// A key value pair, with a corresponding description
#[derive(Debug)]
pub struct ConfigEntry {
Expand Down Expand Up @@ -788,6 +840,8 @@ pub struct ConfigOptions {
pub explain: ExplainOptions,
/// Optional extensions registered using [`Extensions::insert`]
pub extensions: Extensions,
/// Formatting options when printing batches
pub format: FormatOptions,
}

impl ConfigField for ConfigOptions {
Expand All @@ -800,6 +854,7 @@ impl ConfigField for ConfigOptions {
"optimizer" => self.optimizer.set(rem, value),
"explain" => self.explain.set(rem, value),
"sql_parser" => self.sql_parser.set(rem, value),
"format" => self.format.set(rem, value),
_ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
}
}
Expand All @@ -810,6 +865,7 @@ impl ConfigField for ConfigOptions {
self.optimizer.visit(v, "datafusion.optimizer", "");
self.explain.visit(v, "datafusion.explain", "");
self.sql_parser.visit(v, "datafusion.sql_parser", "");
self.format.visit(v, "datafusion.format", "");
}
}

Expand Down Expand Up @@ -2004,11 +2060,11 @@ config_namespace! {
}
}

pub trait FormatOptionsExt: Display {}
pub trait OutputFormatExt: Display {}

#[derive(Debug, Clone, PartialEq)]
#[allow(clippy::large_enum_variant)]
pub enum FormatOptions {
pub enum OutputFormat {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed to avoid confusion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @blaginin, this PR is really nice. I've made a quick look, and think that perhaps we should rename this config in even all places as "output format" ? Directly saying "datafusion.format" etc. could make people think something related with objects?

Copy link
Collaborator Author

@blaginin blaginin Apr 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, thanks for checking this out! 🙏 I feel like "format" can be better than "output format" because:

  • Below we've discussed that it would be good to add support for these params in UDF functions - so in a way that will be related to the objects.
  • Output format IMO implies that it is something related to the way the file is written - but in reality, it has nothing to do with the CSV /Parquet saving

CSV(CsvOptions),
JSON(JsonOptions),
#[cfg(feature = "parquet")]
Expand All @@ -2017,15 +2073,15 @@ pub enum FormatOptions {
ARROW,
}

impl Display for FormatOptions {
impl Display for OutputFormat {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let out = match self {
FormatOptions::CSV(_) => "csv",
FormatOptions::JSON(_) => "json",
OutputFormat::CSV(_) => "csv",
OutputFormat::JSON(_) => "json",
#[cfg(feature = "parquet")]
FormatOptions::PARQUET(_) => "parquet",
FormatOptions::AVRO => "avro",
FormatOptions::ARROW => "arrow",
OutputFormat::PARQUET(_) => "parquet",
OutputFormat::AVRO => "avro",
OutputFormat::ARROW => "arrow",
};
write!(f, "{}", out)
}
Expand Down
Loading