Skip to content

Commit

Permalink
Support newlines_in_values CSV option (apache#11533)
Browse files Browse the repository at this point in the history
* feat!: support `newlines_in_values` CSV option

This significantly simplifies the UX when dealing with large CSV files
that must support newlines in (quoted) values. By default, large CSV
files will be repartitioned into multiple parallel range scans. This is
great for performance in the common case but when large CSVs contain
newlines in values the parallel scan will fail due to splitting on
newlines within quotes rather than actual line terminators.

With the current implementation, this behaviour can be controlled by the
session-level `datafusion.optimizer.repartition_file_scans` and
`datafusion.optimizer.repartition_file_min_size` settings.

This commit introduces a `newlines_in_values` option to `CsvOptions` and
plumbs it through to `CsvExec`, which includes it in the test for whether
parallel execution is supported. This provides a convenient and
searchable way to disable file scan repartitioning on a per-CSV basis.

BREAKING CHANGE: This adds new public fields to types with all public
fields, which is a breaking change.

* docs: normalise `newlines_in_values` documentation

* test: add/fix sqllogictests for `newlines_in_values`

* docs: document `datafusion.catalog.newlines_in_values`

* fix: typo in config.md

* chore: suppress lint on too many arguments for `CsvExec::new`

* fix: always checkout `*.slt` with LF line endings

This is a bit of a stab in the dark, but it might fix multiline tests on
Windows.

* fix: always checkout `newlines_in_values.csv` with `LF` line endings

The default git behaviour of converting line endings for checked out files causes the `csv_files.slt` test to fail when testing `newlines_in_values`. This appears to be due to the quoted newlines being converted to CRLF, which are not then normalised when the CSV is read. Assuming that the sqllogictests do normalise line endings in the expected output, this could then lead to a "spurious" diff from the actual output.

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
2 people authored and Lordworms committed Jul 23, 2024
1 parent 9104246 commit cd1f9d8
Show file tree
Hide file tree
Showing 23 changed files with 250 additions and 3 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.github/ export-ignore
datafusion/core/tests/data/newlines_in_values.csv text eol=lf
datafusion/proto/src/generated/prost.rs linguist-generated
datafusion/proto/src/generated/pbjson.rs linguist-generated
30 changes: 30 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ config_namespace! {
/// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
/// if not specified explicitly in the statement.
pub has_header: bool, default = false

/// Specifies whether newlines in (quoted) CSV values are supported.
///
/// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
/// if not specified explicitly in the statement.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
pub newlines_in_values: bool, default = false
}
}

Expand Down Expand Up @@ -1593,6 +1603,14 @@ config_namespace! {
pub quote: u8, default = b'"'
pub escape: Option<u8>, default = None
pub double_quote: Option<bool>, default = None
/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
///
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
pub newlines_in_values: Option<bool>, default = None
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
pub schema_infer_max_rec: usize, default = 100
pub date_format: Option<String>, default = None
Expand Down Expand Up @@ -1665,6 +1683,18 @@ impl CsvOptions {
self
}

/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
///
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
self.newlines_in_values = Some(newlines_in_values);
self
}

/// Set a `CompressionTypeVariant` of CSV
/// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
pub fn with_file_compression_type(
Expand Down
50 changes: 50 additions & 0 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,18 @@ impl CsvFormat {
self
}

/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
///
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
self.options.newlines_in_values = Some(newlines_in_values);
self
}

/// Set a `FileCompressionType` of CSV
/// - defaults to `FileCompressionType::UNCOMPRESSED`
pub fn with_file_compression_type(
Expand Down Expand Up @@ -343,6 +355,9 @@ impl FileFormat for CsvFormat {
self.options.quote,
self.options.escape,
self.options.comment,
self.options
.newlines_in_values
.unwrap_or(state.config_options().catalog.newlines_in_values),
self.options.compression.into(),
);
Ok(Arc::new(exec))
Expand Down Expand Up @@ -1065,6 +1080,41 @@ mod tests {
Ok(())
}

#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
async fn test_csv_parallel_newlines_in_values(n_partitions: usize) -> Result<()> {
let config = SessionConfig::new()
.with_repartition_file_scans(true)
.with_repartition_file_min_size(0)
.with_target_partitions(n_partitions);
let csv_options = CsvReadOptions::default()
.has_header(true)
.newlines_in_values(true);
let ctx = SessionContext::new_with_config(config);
let testdata = arrow_test_data();
ctx.register_csv(
"aggr",
&format!("{testdata}/csv/aggregate_test_100.csv"),
csv_options,
)
.await?;

let query = "select sum(c3) from aggr;";
let query_result = ctx.sql(query).await?.collect().await?;
let actual_partitions = count_query_csv_partitions(&ctx, query).await?;

#[rustfmt::skip]
let expected = ["+--------------+",
"| sum(aggr.c3) |",
"+--------------+",
"| 781 |",
"+--------------+"];
assert_batches_eq!(expected, &query_result);
assert_eq!(1, actual_partitions); // csv won't be scanned in parallel when newlines_in_values is set

Ok(())
}

/// Read a single empty csv file in parallel
///
/// empty_0_byte.csv:
Expand Down
22 changes: 22 additions & 0 deletions datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ pub struct CsvReadOptions<'a> {
pub escape: Option<u8>,
/// If enabled, lines beginning with this byte are ignored.
pub comment: Option<u8>,
/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
///
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
pub newlines_in_values: bool,
/// An optional schema representing the CSV files. If None, CSV reader will try to infer it
/// based on data in file.
pub schema: Option<&'a Schema>,
Expand Down Expand Up @@ -95,6 +103,7 @@ impl<'a> CsvReadOptions<'a> {
delimiter: b',',
quote: b'"',
escape: None,
newlines_in_values: false,
file_extension: DEFAULT_CSV_EXTENSION,
table_partition_cols: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
Expand Down Expand Up @@ -133,6 +142,18 @@ impl<'a> CsvReadOptions<'a> {
self
}

/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
///
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
pub fn newlines_in_values(mut self, newlines_in_values: bool) -> Self {
self.newlines_in_values = newlines_in_values;
self
}

/// Specify the file extension for CSV file selection
pub fn file_extension(mut self, file_extension: &'a str) -> Self {
self.file_extension = file_extension;
Expand Down Expand Up @@ -490,6 +511,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_delimiter(self.delimiter)
.with_quote(self.quote)
.with_escape(self.escape)
.with_newlines_in_values(self.newlines_in_values)
.with_schema_infer_max_rec(self.schema_infer_max_records)
.with_file_compression_type(self.file_compression_type.to_owned());

Expand Down
27 changes: 24 additions & 3 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub struct CsvExec {
quote: u8,
escape: Option<u8>,
comment: Option<u8>,
newlines_in_values: bool,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Compression type of the file associated with CsvExec
Expand All @@ -68,13 +69,15 @@ pub struct CsvExec {

impl CsvExec {
/// Create a new CSV reader execution plan provided base and specific configurations
#[allow(clippy::too_many_arguments)]
pub fn new(
base_config: FileScanConfig,
has_header: bool,
delimiter: u8,
quote: u8,
escape: Option<u8>,
comment: Option<u8>,
newlines_in_values: bool,
file_compression_type: FileCompressionType,
) -> Self {
let (projected_schema, projected_statistics, projected_output_ordering) =
Expand All @@ -91,6 +94,7 @@ impl CsvExec {
delimiter,
quote,
escape,
newlines_in_values,
metrics: ExecutionPlanMetricsSet::new(),
file_compression_type,
cache,
Expand Down Expand Up @@ -126,6 +130,17 @@ impl CsvExec {
self.escape
}

/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
///
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
pub fn newlines_in_values(&self) -> bool {
self.newlines_in_values
}

fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
}
Expand Down Expand Up @@ -196,15 +211,15 @@ impl ExecutionPlan for CsvExec {
/// Redistribute files across partitions according to their size
/// See comments on [`FileGroupPartitioner`] for more detail.
///
/// Return `None` if can't get repartitioned(empty/compressed file).
/// Return `None` if can't get repartitioned (empty, compressed file, or `newlines_in_values` set).
fn repartitioned(
&self,
target_partitions: usize,
config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let repartition_file_min_size = config.optimizer.repartition_file_min_size;
// Parallel execution on compressed CSV file is not supported yet.
if self.file_compression_type.is_compressed() {
// Parallel execution on compressed CSV files or files that must support newlines in values is not supported yet.
if self.file_compression_type.is_compressed() || self.newlines_in_values {
return Ok(None);
}

Expand Down Expand Up @@ -589,6 +604,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -658,6 +674,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -727,6 +744,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -793,6 +811,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
assert_eq!(14, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -858,6 +877,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -953,6 +973,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,7 @@ pub(crate) mod tests {
b'"',
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand All @@ -1496,6 +1497,7 @@ pub(crate) mod tests {
b'"',
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down Expand Up @@ -3770,6 +3772,7 @@ pub(crate) mod tests {
b'"',
None,
None,
false,
compression_type,
)),
vec![("a".to_string(), "a".to_string())],
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ fn try_swapping_with_csv(
csv.quote(),
csv.escape(),
csv.comment(),
csv.newlines_in_values(),
csv.file_compression_type,
)) as _
})
Expand Down Expand Up @@ -1700,6 +1701,7 @@ mod tests {
0,
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand All @@ -1723,6 +1725,7 @@ mod tests {
0,
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1503,6 +1503,7 @@ mod tests {
b'"',
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result<Arc<Cs
b'"',
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
)))
}
Expand Down Expand Up @@ -283,6 +284,7 @@ pub fn csv_exec_sorted(
0,
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down Expand Up @@ -339,6 +341,7 @@ pub fn csv_exec_ordered(
b'"',
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down
13 changes: 13 additions & 0 deletions datafusion/core/tests/data/newlines_in_values.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
id,message
1,"hello
world"
2,"something
else"
3,"
many
lines
make
good test
"
4,unquoted
value,end
1 change: 1 addition & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ message CsvOptions {
string null_value = 12; // Optional representation of null value
bytes comment = 13; // Optional comment character as a byte
bytes double_quote = 14; // Indicates if quotes are doubled
bytes newlines_in_values = 15; // Indicates if newlines are supported in values
}

// Options controlling CSV format
Expand Down
Loading

0 comments on commit cd1f9d8

Please sign in to comment.