Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 80 additions & 8 deletions rust/arrow/src/csv/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ lazy_static! {
.case_insensitive(true)
.build()
.unwrap();
static ref DATE_RE: Regex = Regex::new(r"^\d\d\d\d-\d\d-\d\d$").unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't there a \d{4} or something like that? May make it a bit easier to read and more expressive, IMO

static ref DATETIME_RE: Regex =
Regex::new(r"^\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\d$").unwrap();
}

/// Infer the data type of a record
Expand All @@ -79,6 +82,10 @@ fn infer_field_schema(string: &str) -> DataType {
// match regex in a particular order
if BOOLEAN_RE.is_match(string) {
DataType::Boolean
} else if DATETIME_RE.is_match(string) {
DataType::Date64(DateUnit::Millisecond)
} else if DATE_RE.is_match(string) {
DataType::Date32(DateUnit::Day)
} else if DECIMAL_RE.is_match(string) {
DataType::Float64
} else if INTEGER_RE.is_match(string) {
Expand Down Expand Up @@ -219,6 +226,35 @@ pub fn infer_schema_from_files(
Schema::try_merge(&schemas)
}

/// Parses a string into the specified `ArrowPrimitiveType`.
fn parse_field<T: ArrowPrimitiveType>(s: &str) -> Result<T::Native> {
let from_ymd = chrono::NaiveDate::from_ymd;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note there is also code to convert strings to nanosecond timestamps (string_to_timestamp_nanos) here: https://github.com/apache/arrow/blob/master/rust/datafusion/src/physical_plan/datetime_expressions.rs#L30

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the long run, this is motivation to centralise this into temporal kernels; so we can share this with the JSON reader.
One of the things I'll submit a PR for in the coming days/weeks, is a crate that parses strings to numbers faster than what libcore does.

let since = chrono::NaiveDate::signed_duration_since;

match T::DATA_TYPE {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will benefit from changes in this PR to include a trait.
#8714

DataType::Boolean => s
.to_lowercase()
.parse::<T::Native>()
.map_err(|_| ArrowError::ParseError("Error parsing boolean".to_string())),
DataType::Date32(DateUnit::Day) => {
let days = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d")
.map(|t| since(t, from_ymd(1970, 1, 1)).num_days() as i32);
days.map(|t| unsafe { std::mem::transmute_copy::<i32, T::Native>(&t) })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what @vertexclique said: transmute is one of the most unsafe operations in rust, and this can easily lead to undefined behavior if it overflows.

Copy link
Contributor

@alamb alamb Nov 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another alternative would be to extend the ArrowNativeType with from_i32 and from_i64, following the model of from_usize and then implement those functions for i32 and i64 respectively (as those are the underlying native types)

I tried this approach out on a branch in case you are interested / want to take the change:
Commit with change: alamb@cc61e7a

The branch (with your change) is here: https://github.com/alamb/arrow/tree/alamb/less-unsafe)

.map_err(|e| ArrowError::ParseError(e.to_string()))
}
DataType::Date64(DateUnit::Millisecond) => {
let millis = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")
.map(|t| t.timestamp_millis());
millis
.map(|t| unsafe { std::mem::transmute_copy::<i64, T::Native>(&t) })
.map_err(|e| ArrowError::ParseError(e.to_string()))
}
_ => s
.parse::<T::Native>()
.map_err(|_| ArrowError::ParseError("Error parsing field".to_string())),
}
}

// optional bounds of the reader, of the form (min line, max line).
type Bounds = Option<(usize, usize)>;

Expand Down Expand Up @@ -370,6 +406,7 @@ impl<R: Read> Iterator for Reader<R> {
}

/// parses a slice of [csv_crate::StringRecord] into a [array::record_batch::RecordBatch].

fn parse(
rows: &[StringRecord],
fields: &Vec<Field>,
Expand Down Expand Up @@ -430,6 +467,12 @@ fn parse(
}
Ok(Arc::new(builder.finish()) as ArrayRef)
}
&DataType::Date32(_) => {
build_primitive_array::<Date32Type>(line_number, rows, i)
}
&DataType::Date64(_) => {
build_primitive_array::<Date64Type>(line_number, rows, i)
}
other => Err(ArrowError::ParseError(format!(
"Unsupported data type {:?}",
other
Expand All @@ -446,7 +489,6 @@ fn parse(
arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr))
}

// parses a specific column (col_idx) into an Arrow Array.
fn build_primitive_array<T: ArrowPrimitiveType>(
line_number: usize,
rows: &[StringRecord],
Expand All @@ -460,11 +502,8 @@ fn build_primitive_array<T: ArrowPrimitiveType>(
if s.is_empty() {
return Ok(None);
}
let parsed = if T::DATA_TYPE == DataType::Boolean {
s.to_lowercase().parse::<T::Native>()
} else {
s.parse::<T::Native>()
};

let parsed = parse_field::<T>(s);
match parsed {
Ok(e) => Ok(Some(e)),
Err(_) => Err(ArrowError::ParseError(format!(
Expand Down Expand Up @@ -835,25 +874,30 @@ mod tests {
.has_header(true)
.with_delimiter(b'|')
.with_batch_size(512)
.with_projection(vec![0, 1, 2, 3]);
.with_projection(vec![0, 1, 2, 3, 4]);

let mut csv = builder.build(file).unwrap();
let batch = csv.next().unwrap().unwrap();

assert_eq!(5, batch.num_rows());
assert_eq!(4, batch.num_columns());
assert_eq!(5, batch.num_columns());

let schema = batch.schema();

assert_eq!(&DataType::Int64, schema.field(0).data_type());
assert_eq!(&DataType::Float64, schema.field(1).data_type());
assert_eq!(&DataType::Float64, schema.field(2).data_type());
assert_eq!(&DataType::Boolean, schema.field(3).data_type());
assert_eq!(
&DataType::Date32(DateUnit::Day),
schema.field(4).data_type()
);

assert_eq!(false, schema.field(0).is_nullable());
assert_eq!(true, schema.field(1).is_nullable());
assert_eq!(true, schema.field(2).is_nullable());
assert_eq!(false, schema.field(3).is_nullable());
assert_eq!(true, schema.field(4).is_nullable());

assert_eq!(false, batch.column(1).is_null(0));
assert_eq!(false, batch.column(1).is_null(1));
Expand Down Expand Up @@ -901,6 +945,34 @@ mod tests {
assert_eq!(infer_field_schema("10.2"), DataType::Float64);
assert_eq!(infer_field_schema("true"), DataType::Boolean);
assert_eq!(infer_field_schema("false"), DataType::Boolean);
assert_eq!(
infer_field_schema("2020-11-08"),
DataType::Date32(DateUnit::Day)
);
assert_eq!(
infer_field_schema("2020-11-08T14:20:01"),
DataType::Date64(DateUnit::Millisecond)
);
}

#[test]
fn parse_date32() {
assert_eq!(parse_field::<Date32Type>("1970-01-01").unwrap(), 0);
assert_eq!(parse_field::<Date32Type>("2020-03-15").unwrap(), 18336);
assert_eq!(parse_field::<Date32Type>("1945-05-08").unwrap(), -9004);
}

#[test]
fn parse_date64() {
assert_eq!(parse_field::<Date64Type>("1970-01-01T00:00:00").unwrap(), 0);
assert_eq!(
parse_field::<Date64Type>("2018-11-13T17:11:10").unwrap(),
1542129070000
);
assert_eq!(
parse_field::<Date64Type>("1900-02-28T12:34:56").unwrap(),
-2203932304000
);
}

#[test]
Expand Down
12 changes: 6 additions & 6 deletions rust/arrow/test/data/various_types.csv
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
c_int|c_float|c_string|c_bool
1|1.1|"1.11"|true
2|2.2|"2.22"|true
3||"3.33"|true
4|4.4||false
5|6.6|""|false
c_int|c_float|c_string|c_bool|c_date
1|1.1|"1.11"|true|1970-01-01
2|2.2|"2.22"|true|2020-11-08
3||"3.33"|true|1969-12-31
4|4.4||false|
5|6.6|""|false|1990-01-01