Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support newlines_in_values CSV option #11533

Merged
merged 10 commits into from
Jul 21, 2024
30 changes: 30 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ config_namespace! {
/// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
/// if not specified explicitly in the statement.
pub has_header: bool, default = false

/// Specifies whether newlines in (quoted) CSV values are supported.
///
/// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
/// if not specified explicitly in the statement.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
pub newlines_in_values: bool, default = false
}
}

Expand Down Expand Up @@ -1593,6 +1603,14 @@ config_namespace! {
pub quote: u8, default = b'"'
pub escape: Option<u8>, default = None
pub double_quote: Option<bool>, default = None
/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
///
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
pub newlines_in_values: Option<bool>, default = None
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
pub schema_infer_max_rec: usize, default = 100
pub date_format: Option<String>, default = None
Expand Down Expand Up @@ -1665,6 +1683,18 @@ impl CsvOptions {
self
}

/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
///
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
self.newlines_in_values = Some(newlines_in_values);
self
}

/// Set a `CompressionTypeVariant` of CSV
/// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
pub fn with_file_compression_type(
Expand Down
50 changes: 50 additions & 0 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,18 @@ impl CsvFormat {
self
}

/// Explicitly enables or disables support for newlines inside (quoted) CSV values.
///
/// Execution behaviour such as parallel file scanning can interfere with parsing
/// newlines in quoted values. Passing `true` ensures such values are parsed
/// successfully, which may reduce performance.
///
/// The choice is recorded on the wrapped CSV options as `Some(newlines_in_values)`.
/// While it is left unset, the default behaviour depends on the
/// `datafusion.catalog.newlines_in_values` setting.
pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
    // Reuse the builder on the underlying options instead of touching the field directly.
    self.options = self.options.with_newlines_in_values(newlines_in_values);
    self
}

/// Set a `FileCompressionType` of CSV
/// - defaults to `FileCompressionType::UNCOMPRESSED`
pub fn with_file_compression_type(
Expand Down Expand Up @@ -330,6 +342,9 @@ impl FileFormat for CsvFormat {
self.options.quote,
self.options.escape,
self.options.comment,
self.options
.newlines_in_values
.unwrap_or(state.config_options().catalog.newlines_in_values),
self.options.compression.into(),
);
Ok(Arc::new(exec))
Expand Down Expand Up @@ -1052,6 +1067,41 @@ mod tests {
Ok(())
}

/// Checks that a CSV table registered with `newlines_in_values(true)` is scanned as a
/// single partition for every requested target partition count, and that the query
/// still produces the expected result.
#[rstest(n_partitions, case(1), case(2), case(3), case(4))]
#[tokio::test]
async fn test_csv_parallel_newlines_in_values(n_partitions: usize) -> Result<()> {
    // File-scan repartitioning is switched on with a minimum file size of zero.
    let session_config = SessionConfig::new()
        .with_repartition_file_scans(true)
        .with_repartition_file_min_size(0)
        .with_target_partitions(n_partitions);
    let read_options = CsvReadOptions::default()
        .has_header(true)
        .newlines_in_values(true);

    let ctx = SessionContext::new_with_config(session_config);
    let testdata = arrow_test_data();
    let csv_path = format!("{testdata}/csv/aggregate_test_100.csv");
    ctx.register_csv("aggr", &csv_path, read_options).await?;

    let query = "select sum(c3) from aggr;";
    let batches = ctx.sql(query).await?.collect().await?;
    let scanned_partitions = count_query_csv_partitions(&ctx, query).await?;

    #[rustfmt::skip]
    let expected = ["+--------------+",
        "| sum(aggr.c3) |",
        "+--------------+",
        "| 781 |",
        "+--------------+"];
    assert_batches_eq!(expected, &batches);

    // The csv must not be scanned in parallel when newlines_in_values is set.
    assert_eq!(1, scanned_partitions);

    Ok(())
}

/// Read a single empty csv file in parallel
///
/// empty_0_byte.csv:
Expand Down
22 changes: 22 additions & 0 deletions datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ pub struct CsvReadOptions<'a> {
pub escape: Option<u8>,
/// If enabled, lines beginning with this byte are ignored.
pub comment: Option<u8>,
/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
///
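/// Defaults to `false`. This is a plain `bool` that is forwarded to the file format
/// explicitly, so the value chosen here is used as-is rather than falling back to the
/// `datafusion.catalog.newlines_in_values` setting.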
pub newlines_in_values: bool,
/// An optional schema representing the CSV files. If None, CSV reader will try to infer it
/// based on data in file.
pub schema: Option<&'a Schema>,
Expand Down Expand Up @@ -95,6 +103,7 @@ impl<'a> CsvReadOptions<'a> {
delimiter: b',',
quote: b'"',
escape: None,
newlines_in_values: false,
file_extension: DEFAULT_CSV_EXTENSION,
table_partition_cols: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
Expand Down Expand Up @@ -133,6 +142,18 @@ impl<'a> CsvReadOptions<'a> {
self
}

/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
///
/// Defaults to `false`. Unlike the `Option<bool>` used by the CSV format options, this
/// is a plain `bool` that the `ReadOptions` implementation forwards to the file format
/// explicitly, so the value chosen here is used as-is rather than falling back to the
/// `datafusion.catalog.newlines_in_values` setting.
pub fn newlines_in_values(mut self, newlines_in_values: bool) -> Self {
    self.newlines_in_values = newlines_in_values;
    self
}

/// Specify the file extension for CSV file selection
pub fn file_extension(mut self, file_extension: &'a str) -> Self {
self.file_extension = file_extension;
Expand Down Expand Up @@ -490,6 +511,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_delimiter(self.delimiter)
.with_quote(self.quote)
.with_escape(self.escape)
.with_newlines_in_values(self.newlines_in_values)
.with_schema_infer_max_rec(self.schema_infer_max_records)
.with_file_compression_type(self.file_compression_type.to_owned());

Expand Down
26 changes: 23 additions & 3 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub struct CsvExec {
quote: u8,
escape: Option<u8>,
comment: Option<u8>,
newlines_in_values: bool,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Compression type of the file associated with CsvExec
Expand All @@ -75,6 +76,7 @@ impl CsvExec {
quote: u8,
escape: Option<u8>,
comment: Option<u8>,
newlines_in_values: bool,
file_compression_type: FileCompressionType,
) -> Self {
let (projected_schema, projected_statistics, projected_output_ordering) =
Expand All @@ -91,6 +93,7 @@ impl CsvExec {
delimiter,
quote,
escape,
newlines_in_values,
metrics: ExecutionPlanMetricsSet::new(),
file_compression_type,
cache,
Expand Down Expand Up @@ -126,6 +129,17 @@ impl CsvExec {
self.escape
}

/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
///
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
pub fn newlines_in_values(&self) -> bool {
self.newlines_in_values
}

/// Builds the output partitioning for a scan: an unknown partitioning whose partition
/// count equals the number of file groups in `file_scan_config`.
fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
    let partition_count = file_scan_config.file_groups.len();
    Partitioning::UnknownPartitioning(partition_count)
}
Expand Down Expand Up @@ -196,15 +210,15 @@ impl ExecutionPlan for CsvExec {
/// Redistribute files across partitions according to their size
/// See comments on [`FileGroupPartitioner`] for more detail.
///
/// Return `None` if can't get repartitioned(empty/compressed file).
/// Return `None` if can't get repartitioned (empty, compressed file, or `newlines_in_values` set).
fn repartitioned(
&self,
target_partitions: usize,
config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let repartition_file_min_size = config.optimizer.repartition_file_min_size;
// Parallel execution on compressed CSV file is not supported yet.
if self.file_compression_type.is_compressed() {
// Parallel execution on compressed CSV files or files that must support newlines in values is not supported yet.
if self.file_compression_type.is_compressed() || self.newlines_in_values {
return Ok(None);
}

Expand Down Expand Up @@ -589,6 +603,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -658,6 +673,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -727,6 +743,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -793,6 +810,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
assert_eq!(14, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -858,6 +876,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
Expand Down Expand Up @@ -953,6 +972,7 @@ mod tests {
b'"',
None,
None,
false,
file_compression_type.to_owned(),
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,7 @@ pub(crate) mod tests {
b'"',
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand All @@ -1496,6 +1497,7 @@ pub(crate) mod tests {
b'"',
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down Expand Up @@ -3770,6 +3772,7 @@ pub(crate) mod tests {
b'"',
None,
None,
false,
compression_type,
)),
vec![("a".to_string(), "a".to_string())],
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ fn try_swapping_with_csv(
csv.quote(),
csv.escape(),
csv.comment(),
csv.newlines_in_values(),
csv.file_compression_type,
)) as _
})
Expand Down Expand Up @@ -1700,6 +1701,7 @@ mod tests {
0,
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand All @@ -1723,6 +1725,7 @@ mod tests {
0,
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1503,6 +1503,7 @@ mod tests {
b'"',
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result<Arc<Cs
b'"',
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
)))
}
Expand Down Expand Up @@ -283,6 +284,7 @@ pub fn csv_exec_sorted(
0,
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down Expand Up @@ -339,6 +341,7 @@ pub fn csv_exec_ordered(
b'"',
None,
None,
false,
FileCompressionType::UNCOMPRESSED,
))
}
Expand Down
13 changes: 13 additions & 0 deletions datafusion/core/tests/data/newlines_in_values.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
id,message
1,"hello
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

world"
2,"something
else"
3,"
many
lines
make
good test
"
4,unquoted
value,end
1 change: 1 addition & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ message CsvOptions {
string null_value = 12; // Optional representation of null value
bytes comment = 13; // Optional comment character as a byte
bytes double_quote = 14; // Indicates if quotes are doubled
bytes newlines_in_values = 15; // Indicates if newlines are supported in values
}

// Options controlling CSV format
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
quote: proto_opts.quote[0],
escape: proto_opts.escape.first().copied(),
double_quote: proto_opts.has_header.first().map(|h| *h != 0),
newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
compression: proto_opts.compression().into(),
schema_infer_max_rec: proto_opts.schema_infer_max_rec as usize,
date_format: (!proto_opts.date_format.is_empty())
Expand Down
Loading
Loading