Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support newlines_in_values CSV option #11533

Merged
merged 10 commits into from
Jul 21, 2024
13 changes: 13 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ config_namespace! {
/// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
/// if not specified explicitly in the statement.
pub has_header: bool, default = false

/// Default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
// if not specified explicitly in the statement.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need more descriptive comment help for users without context to understand this config option, which should include 1. It's CSV specific 2. Can be overridden by the same config field in CsvOptions 3. Its behavior as stated in CsvOptions's comment.

And update in https://github.com/apache/datafusion/blob/main/docs/source/user-guide/configs.md

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've expanded the documentation. I forgot about the user guide so I'll do that now as well.

pub newlines_in_values: bool, default = false
}
}

Expand Down Expand Up @@ -1593,6 +1597,7 @@ config_namespace! {
pub quote: u8, default = b'"'
pub escape: Option<u8>, default = None
pub double_quote: Option<bool>, default = None
pub newlines_in_values: Option<bool>, default = None
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
pub schema_infer_max_rec: usize, default = 100
pub date_format: Option<String>, default = None
Expand Down Expand Up @@ -1665,6 +1670,14 @@ impl CsvOptions {
self
}

/// Set true to ensure that newlines in (quoted) values are supported.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Set true to ensure that newlines in (quoted) values are supported.
/// Set true to support newlines in (quoted) values.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The phrasing was a bit awkward initially as I was thinking of the flag as "ensuring support", rather than directly "supporting", since newlines in values are already supported if the file is below the datafusion.optimizer.repartition_file_min_size.

I'm not sure if it's worth conveying that detail through these docs, or else to document this as providing support and treat the fact that newlines in values will "just work" for smaller files as an implementation detail that might change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've expanded and normalised the documentation. I've gone for documenting the flag as enabling support for newlines in values, with the fact that newlines in values might work without it left as an implementation-defined detail.

/// Note that setting this may reduce performance as large file scans will not be repartitioned.
/// - default is None
pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
self.newlines_in_values = Some(newlines_in_values);
self
}

/// Set a `CompressionTypeVariant` of CSV
/// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
pub fn with_file_compression_type(
Expand Down
46 changes: 46 additions & 0 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,14 @@ impl CsvFormat {
self
}

/// Set true to ensure that newlines in (quoted) values are supported.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Set true to ensure that newlines in (quoted) values are supported.
/// Set true to support newlines in (quoted) values.

/// Note that setting this may reduce performance as large file scans will not be repartitioned.
/// - default is None
pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
self.options.newlines_in_values = Some(newlines_in_values);
self
}

/// Set a `FileCompressionType` of CSV
/// - defaults to `FileCompressionType::UNCOMPRESSED`
pub fn with_file_compression_type(
Expand Down Expand Up @@ -330,6 +338,9 @@ impl FileFormat for CsvFormat {
self.options.quote,
self.options.escape,
self.options.comment,
self.options
.newlines_in_values
.unwrap_or(state.config_options().catalog.newlines_in_values),
self.options.compression.into(),
);
Ok(Arc::new(exec))
Expand Down Expand Up @@ -1052,6 +1063,41 @@ mod tests {
Ok(())
}

#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
async fn test_csv_parallel_newlines_in_values(n_partitions: usize) -> Result<()> {
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
let csv_options = CsvReadOptions::default()
.has_header(true)
.newlines_in_values(true);
let ctx = SessionContext::new_with_config(config);
let testdata = arrow_test_data();
ctx.register_csv(
"aggr",
&format!("{testdata}/csv/aggregate_test_100.csv"),
csv_options,
)
.await?;

let query = "select sum(c3) from aggr;";
let query_result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_query_csv_partitions(&ctx, query).await?;

#[rustfmt::skip]
let expected = ["+--------------+",
"| sum(aggr.c3) |",
"+--------------+",
"| 781 |",
"+--------------+"];
assert_batches_eq!(expected, &query_result);
assert_eq!(1, actual_partitions); // csv won't be scanned in parallel when newlines_in_values is set

Ok(())
}

/// Read a single empty csv file in parallel
///
/// empty_0_byte.csv:
Expand Down
12 changes: 12 additions & 0 deletions datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ pub struct CsvReadOptions<'a> {
pub escape: Option<u8>,
/// If enabled, lines beginning with this byte are ignored.
pub comment: Option<u8>,
/// Does the CSV file contain newlines in values?
///
/// If enabled, parallel scanning will be disabled.
pub newlines_in_values: bool,
/// An optional schema representing the CSV files. If None, CSV reader will try to infer it
/// based on data in file.
pub schema: Option<&'a Schema>,
Expand Down Expand Up @@ -95,6 +99,7 @@ impl<'a> CsvReadOptions<'a> {
delimiter: b',',
quote: b'"',
escape: None,
newlines_in_values: false,
file_extension: DEFAULT_CSV_EXTENSION,
table_partition_cols: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
Expand Down Expand Up @@ -133,6 +138,12 @@ impl<'a> CsvReadOptions<'a> {
self
}

/// Specify whether to support newlines in values.
pub fn newlines_in_values(mut self, newlines_in_values: bool) -> Self {
self.newlines_in_values = newlines_in_values;
self
}

/// Specify the file extension for CSV file selection
pub fn file_extension(mut self, file_extension: &'a str) -> Self {
self.file_extension = file_extension;
Expand Down Expand Up @@ -490,6 +501,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_delimiter(self.delimiter)
.with_quote(self.quote)
.with_escape(self.escape)
.with_newlines_in_values(self.newlines_in_values)
.with_schema_infer_max_rec(self.schema_infer_max_records)
.with_file_compression_type(self.file_compression_type.to_owned());

Expand Down
22 changes: 19 additions & 3 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub struct CsvExec {
quote: u8,
escape: Option<u8>,
comment: Option<u8>,
newlines_in_values: bool,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Compression type of the file associated with CsvExec
Expand All @@ -75,6 +76,7 @@ impl CsvExec {
quote: u8,
escape: Option<u8>,
comment: Option<u8>,
newlines_in_values: bool,
file_compression_type: FileCompressionType,
) -> Self {
let (projected_schema, projected_statistics, projected_output_ordering) =
Expand All @@ -91,6 +93,7 @@ impl CsvExec {
delimiter,
quote,
escape,
newlines_in_values,
metrics: ExecutionPlanMetricsSet::new(),
file_compression_type,
cache,
Expand Down Expand Up @@ -126,6 +129,13 @@ impl CsvExec {
self.escape
}

/// Whether newlines are always supported in values.
///
/// When set, this will disable file repartitioning.
pub fn newlines_in_values(&self) -> bool {
self.newlines_in_values
}

fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
}
Expand Down Expand Up @@ -196,15 +206,15 @@ impl ExecutionPlan for CsvExec {
/// Redistribute files across partitions according to their size
/// See comments on [`FileGroupPartitioner`] for more detail.
///
/// Return `None` if can't get repartitioned(empty/compressed file).
/// Return `None` if can't get repartitioned (empty, compressed file, or `newlines_in_values` set).
fn repartitioned(
&self,
target_partitions: usize,
config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let repartition_file_min_size = config.optimizer.repartition_file_min_size;
// Parallel execution on compressed CSV file is not supported yet.
if self.file_compression_type.is_compressed() {
// Parallel execution on compressed CSV files or files that must support newlines in values is not supported yet.
if self.file_compression_type.is_compressed() || self.newlines_in_values {
return Ok(None);
}

Expand Down Expand Up @@ -589,6 +599,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -658,6 +669,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -727,6 +739,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -793,6 +806,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
assert_eq!(14, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -858,6 +872,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -953,6 +968,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,7 @@ pub(crate) mod tests {
b'"',
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand All @@ -1496,6 +1497,7 @@ pub(crate) mod tests {
b'"',
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down Expand Up @@ -3770,6 +3772,7 @@ pub(crate) mod tests {
b'"',
None,
None,
false,
compression_type,
)),
vec![("a".to_string(), "a".to_string())],
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ fn try_swapping_with_csv(
csv.quote(),
csv.escape(),
csv.comment(),
csv.newlines_in_values(),
csv.file_compression_type,
)) as _
})
Expand Down Expand Up @@ -1700,6 +1701,7 @@ mod tests {
0,
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand All @@ -1723,6 +1725,7 @@ mod tests {
0,
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1503,6 +1503,7 @@ mod tests {
b'"',
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result<Arc<Cs
b'"',
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
)))
}
Expand Down Expand Up @@ -283,6 +284,7 @@ pub fn csv_exec_sorted(
0,
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down Expand Up @@ -339,6 +341,7 @@ pub fn csv_exec_ordered(
b'"',
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ message CsvOptions {
string null_value = 12; // Optional representation of null value
bytes comment = 13; // Optional comment character as a byte
bytes double_quote = 14; // Indicates if quotes are doubled
bytes newlines_in_values = 15; // Indicates if newlines are supported in values
}

// Options controlling CSV format
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
quote: proto_opts.quote[0],
escape: proto_opts.escape.first().copied(),
double_quote: proto_opts.has_header.first().map(|h| *h != 0),
newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
compression: proto_opts.compression().into(),
schema_infer_max_rec: proto_opts.schema_infer_max_rec as usize,
date_format: (!proto_opts.date_format.is_empty())
Expand Down
Loading
Loading