Skip to content

Commit

Permalink
fix: Ignore quotes in csv comments (#20306)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Dec 16, 2024
1 parent 91ad299 commit 64cb34d
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 20 deletions.
71 changes: 58 additions & 13 deletions crates/polars-io/src/csv/read/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub fn count_rows_from_slice(

let iter = file_chunks.into_par_iter().map(|(start, stop)| {
let local_bytes = &bytes[start..stop];
let row_iterator = SplitLines::new(local_bytes, quote_char, eol_char);
let row_iterator = SplitLines::new(local_bytes, quote_char, eol_char, comment_prefix);
if comment_prefix.is_some() {
Ok(row_iterator
.filter(|line| !line.is_empty() && !is_comment_line(line, comment_prefix))
Expand Down Expand Up @@ -124,9 +124,10 @@ pub(super) fn skip_bom(input: &[u8]) -> &[u8] {
/// Checks if a line in a CSV file is a comment based on the given comment prefix configuration.
///
/// This function is used during CSV parsing to determine whether a line should be ignored based on its starting characters.
#[inline]
pub(super) fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
match comment_prefix {
Some(CommentPrefix::Single(c)) => line.starts_with(&[*c]),
Some(CommentPrefix::Single(c)) => line.first() == Some(c),
Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
None => false,
}
Expand Down Expand Up @@ -216,7 +217,7 @@ pub(super) fn next_line_position(
}
debug_assert!(pos <= input.len());
let new_input = unsafe { input.get_unchecked(pos..) };
let mut lines = SplitLines::new(new_input, quote_char, eol_char);
let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
let line = lines.next();

match (line, expected_fields) {
Expand Down Expand Up @@ -355,6 +356,7 @@ pub(super) struct SplitLines<'a> {
previous_valid_eols: u64,
total_index: usize,
quoting: bool,
comment_prefix: Option<&'a CommentPrefix>,
}

#[cfg(feature = "simd")]
Expand All @@ -369,7 +371,12 @@ use polars_utils::clmul::prefix_xorsum_inclusive;
type SimdVec = u8x64;

impl<'a> SplitLines<'a> {
pub(super) fn new(slice: &'a [u8], quote_char: Option<u8>, eol_char: u8) -> Self {
pub(super) fn new(
slice: &'a [u8],
quote_char: Option<u8>,
eol_char: u8,
comment_prefix: Option<&'a CommentPrefix>,
) -> Self {
let quoting = quote_char.is_some();
let quote_char = quote_char.unwrap_or(b'\"');
#[cfg(feature = "simd")]
Expand All @@ -388,19 +395,20 @@ impl<'a> SplitLines<'a> {
previous_valid_eols: 0,
total_index: 0,
quoting,
comment_prefix,
}
}
}

impl<'a> Iterator for SplitLines<'a> {
type Item = &'a [u8];

#[inline]
#[cfg(not(feature = "simd"))]
fn next(&mut self) -> Option<&'a [u8]> {
impl<'a> SplitLines<'a> {
// scalar as in non-simd
fn next_scalar(&mut self) -> Option<&'a [u8]> {
if self.v.is_empty() {
return None;
}
if is_comment_line(self.v, self.comment_prefix) {
return self.next_comment_line();
}
{
let mut pos = 0u32;
let mut iter = self.v.iter();
Expand Down Expand Up @@ -443,6 +451,31 @@ impl<'a> Iterator for SplitLines<'a> {
}
}
}
fn next_comment_line(&mut self) -> Option<&'a [u8]> {
if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
unsafe {
// return line up to this position
let ret = Some(self.v.get_unchecked(..(pos - 1)));
// skip the '\n' token and update slice.
self.v = self.v.get_unchecked(pos..);
ret
}
} else {
let remainder = self.v;
self.v = &[];
Some(remainder)
}
}
}

impl<'a> Iterator for SplitLines<'a> {
type Item = &'a [u8];

#[inline]
#[cfg(not(feature = "simd"))]
fn next(&mut self) -> Option<&'a [u8]> {
self.next_scalar()
}

#[inline]
#[cfg(feature = "simd")]
Expand All @@ -465,6 +498,9 @@ impl<'a> Iterator for SplitLines<'a> {
if self.v.is_empty() {
return None;
}
if self.comment_prefix.is_some() {
return self.next_scalar();
}

self.total_index = 0;
let mut not_in_field_previous_iter = true;
Expand Down Expand Up @@ -729,6 +765,15 @@ pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &
}
}

#[inline]
pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
if let Some(pos) = next_line_position_naive(input, eol_char) {
unsafe { input.get_unchecked(pos..) }
} else {
&[]
}
}

/// Parse CSV.
///
/// # Arguments
Expand Down Expand Up @@ -780,7 +825,7 @@ pub(super) fn parse_lines(
return Ok(original_bytes_len);
} else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
// deal with comments
let bytes_rem = skip_this_line(bytes, parse_options.quote_char, parse_options.eol_char);
let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
bytes = bytes_rem;
continue;
}
Expand Down Expand Up @@ -923,13 +968,13 @@ mod test {
#[test]
fn test_splitlines() {
let input = "1,\"foo\n\"\n2,\"foo\n\"\n";
let mut lines = SplitLines::new(input.as_bytes(), Some(b'"'), b'\n');
let mut lines = SplitLines::new(input.as_bytes(), Some(b'"'), b'\n', None);
assert_eq!(lines.next(), Some("1,\"foo\n\"".as_bytes()));
assert_eq!(lines.next(), Some("2,\"foo\n\"".as_bytes()));
assert_eq!(lines.next(), None);

let input2 = "1,'foo\n'\n2,'foo\n'\n";
let mut lines2 = SplitLines::new(input2.as_bytes(), Some(b'\''), b'\n');
let mut lines2 = SplitLines::new(input2.as_bytes(), Some(b'\''), b'\n', None);
assert_eq!(lines2.next(), Some("1,'foo\n'".as_bytes()));
assert_eq!(lines2.next(), Some("2,'foo\n'".as_bytes()));
assert_eq!(lines2.next(), None);
Expand Down
7 changes: 4 additions & 3 deletions crates/polars-io/src/csv/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use super::schema_inference::{check_decimal_comma, infer_file_schema};
#[cfg(any(feature = "decompress", feature = "decompress-fast"))]
use super::utils::decompress;
use super::CsvParseOptions;
use crate::csv::read::parser::skip_this_line_naive;
use crate::mmap::ReaderBytes;
use crate::predicates::PhysicalIoExpr;
#[cfg(not(any(feature = "decompress", feature = "decompress-fast")))]
Expand Down Expand Up @@ -606,7 +607,7 @@ pub fn find_starting_point(

// skip 'n' leading rows
if skip_rows_before_header > 0 {
let mut split_lines = SplitLines::new(bytes, quote_char, eol_char);
let mut split_lines = SplitLines::new(bytes, quote_char, eol_char, comment_prefix);
let mut current_line = &bytes[..0];

for _ in 0..skip_rows_before_header {
Expand All @@ -623,7 +624,7 @@ pub fn find_starting_point(

// skip lines that are comments
while is_comment_line(bytes, comment_prefix) {
bytes = skip_this_line(bytes, quote_char, eol_char);
bytes = skip_this_line_naive(bytes, eol_char);
}

// skip header row
Expand All @@ -632,7 +633,7 @@ pub fn find_starting_point(
}
// skip 'n' rows following the header
if skip_rows_after_header > 0 {
let mut split_lines = SplitLines::new(bytes, quote_char, eol_char);
let mut split_lines = SplitLines::new(bytes, quote_char, eol_char, comment_prefix);
let mut current_line = &bytes[..0];

for _ in 0..skip_rows_after_header {
Expand Down
18 changes: 14 additions & 4 deletions crates/polars-io/src/csv/read/schema_inference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,13 @@ fn infer_file_schema_inner(
if raise_if_empty {
polars_ensure!(!bytes.is_empty(), NoData: "empty CSV");
};
let mut lines =
SplitLines::new(bytes, parse_options.quote_char, parse_options.eol_char).skip(skip_rows);
let mut lines = SplitLines::new(
bytes,
parse_options.quote_char,
parse_options.eol_char,
parse_options.comment_prefix.as_ref(),
)
.skip(skip_rows);

// get or create header names
// when has_header is false, creates default column names with column_ prefix
Expand Down Expand Up @@ -304,8 +309,13 @@ fn infer_file_schema_inner(
};
if !has_header {
// re-init lines so that the header is included in type inference.
lines = SplitLines::new(bytes, parse_options.quote_char, parse_options.eol_char)
.skip(skip_rows);
lines = SplitLines::new(
bytes,
parse_options.quote_char,
parse_options.eol_char,
parse_options.comment_prefix.as_ref(),
)
.skip(skip_rows);
}

let header_length = headers.len();
Expand Down
7 changes: 7 additions & 0 deletions py-polars/tests/unit/io/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -2421,3 +2421,10 @@ def test_csv_skip_lines() -> None:

fh.seek(0)
assert_frame_equal(pl.scan_csv(fh, has_header=True, skip_lines=3).collect(), df)


def test_csv_invalid_quoted_comment_line() -> None:
# Comment quotes should be ignored.
assert pl.read_csv(
b'#"Comment\nColA\tColB\n1\t2', separator="\t", comment_prefix="#"
).to_dict(as_series=False) == {"ColA": [1], "ColB": [2]}

0 comments on commit 64cb34d

Please sign in to comment.