Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/arrow/src/csv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
pub mod reader;
pub mod writer;

// Re-export the CSV reader entry points, including the standalone
// `build_array` / `build_batch` helpers so callers can parse
// `StringRecord` slices without going through `Reader`.
pub use self::reader::{build_array, build_batch, infer_schema_from_files};
pub use self::reader::Reader;
pub use self::reader::ReaderBuilder;
pub use self::writer::Writer;
pub use self::writer::WriterBuilder;
157 changes: 92 additions & 65 deletions rust/arrow/src/csv/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,7 @@ impl<R: Read> Iterator for Reader<R> {
return None;
}

// parse the batches into a RecordBatch
let result = parse(
let result = build_batch(
&self.batch_records[..read_records],
&self.schema.fields(),
&self.projection,
Expand All @@ -394,88 +393,116 @@ impl<R: Read> Iterator for Reader<R> {
}
}

/// parses a slice of [csv_crate::StringRecord] into a [array::record_batch::RecordBatch].
fn parse(
/// Tries to create an [array::Array] from a slice of [csv_crate::StringRecord] by interpreting its
/// values at column `column_index` to be of `data_type`.
/// `line_number` is where the set of rows starts at, and is only used to report the line number in case of errors.
/// # Error
/// This function errors iff:
/// * _any_ entry from `rows` at `column_index` cannot be parsed into the DataType.
/// * The [array::datatypes::DataType] is not supported.
pub fn build_array(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The downside is that this creates a dependency on StringRecord in the public API, making it harder to remove it when we want?

rows: &[StringRecord],
data_type: &DataType,
line_number: usize,
column_index: usize,
) -> Result<ArrayRef> {
match data_type {
DataType::Boolean => build_boolean_array(line_number, rows, column_index),
DataType::Int8 => {
build_primitive_array::<Int8Type>(line_number, rows, column_index)
}
DataType::Int16 => {
build_primitive_array::<Int16Type>(line_number, rows, column_index)
}
DataType::Int32 => {
build_primitive_array::<Int32Type>(line_number, rows, column_index)
}
DataType::Int64 => {
build_primitive_array::<Int64Type>(line_number, rows, column_index)
}
DataType::UInt8 => {
build_primitive_array::<UInt8Type>(line_number, rows, column_index)
}
DataType::UInt16 => {
build_primitive_array::<UInt16Type>(line_number, rows, column_index)
}
DataType::UInt32 => {
build_primitive_array::<UInt32Type>(line_number, rows, column_index)
}
DataType::UInt64 => {
build_primitive_array::<UInt64Type>(line_number, rows, column_index)
}
DataType::Float32 => {
build_primitive_array::<Float32Type>(line_number, rows, column_index)
}
DataType::Float64 => {
build_primitive_array::<Float64Type>(line_number, rows, column_index)
}
DataType::Date32(_) => {
build_primitive_array::<Date32Type>(line_number, rows, column_index)
}
DataType::Date64(_) => {
build_primitive_array::<Date64Type>(line_number, rows, column_index)
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
build_primitive_array::<TimestampMicrosecondType>(
line_number,
rows,
column_index,
)
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => build_primitive_array::<
TimestampNanosecondType,
>(
line_number, rows, column_index
),
DataType::Utf8 => Ok(Arc::new(
rows.iter()
.map(|row| row.get(column_index))
.collect::<StringArray>(),
) as ArrayRef),
other => Err(ArrowError::ParseError(format!(
"Unsupported data type {:?}",
other
))),
}
}

/// Tries to create an [array::record_batch::RecordBatch] from a slice of [csv_crate::StringRecord] by interpreting
/// each of its columns according to `fields`. When `projection` is not None, it is used to select a subset of `fields` to
/// parse.
/// `line_number` is where the set of rows starts at, and is only used to report the line number in case of errors.
/// # Error
/// This function errors iff:
/// * _any_ entry from `rows` cannot be parsed into its corresponding field's `DataType`.
/// * Any of the fields' [array::datatypes::DataType] is not supported.
/// # Panic
/// This function panics if any index in `projection` is larger than `fields.len()`.
pub fn build_batch(
rows: &[StringRecord],
fields: &[Field],
projection: &Option<Vec<usize>>,
line_number: usize,
) -> Result<RecordBatch> {
let projection: Vec<usize> = match projection {
Some(ref v) => v.clone(),
None => fields.iter().enumerate().map(|(i, _)| i).collect(),
None => (0..fields.len()).collect(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I think the v.clone() could maybe even be removed?

};

let arrays: Result<Vec<ArrayRef>> = projection
let columns = projection
.iter()
.map(|i| {
let i = *i;
let field = &fields[i];
match field.data_type() {
&DataType::Boolean => build_boolean_array(line_number, rows, i),
&DataType::Int8 => {
build_primitive_array::<Int8Type>(line_number, rows, i)
}
&DataType::Int16 => {
build_primitive_array::<Int16Type>(line_number, rows, i)
}
&DataType::Int32 => {
build_primitive_array::<Int32Type>(line_number, rows, i)
}
&DataType::Int64 => {
build_primitive_array::<Int64Type>(line_number, rows, i)
}
&DataType::UInt8 => {
build_primitive_array::<UInt8Type>(line_number, rows, i)
}
&DataType::UInt16 => {
build_primitive_array::<UInt16Type>(line_number, rows, i)
}
&DataType::UInt32 => {
build_primitive_array::<UInt32Type>(line_number, rows, i)
}
&DataType::UInt64 => {
build_primitive_array::<UInt64Type>(line_number, rows, i)
}
&DataType::Float32 => {
build_primitive_array::<Float32Type>(line_number, rows, i)
}
&DataType::Float64 => {
build_primitive_array::<Float64Type>(line_number, rows, i)
}
&DataType::Date32(_) => {
build_primitive_array::<Date32Type>(line_number, rows, i)
}
&DataType::Date64(_) => {
build_primitive_array::<Date64Type>(line_number, rows, i)
}
&DataType::Timestamp(TimeUnit::Microsecond, _) => {
build_primitive_array::<TimestampMicrosecondType>(
line_number,
rows,
i,
)
}
&DataType::Timestamp(TimeUnit::Nanosecond, _) => {
build_primitive_array::<TimestampNanosecondType>(line_number, rows, i)
}
&DataType::Utf8 => Ok(Arc::new(
rows.iter().map(|row| row.get(i)).collect::<StringArray>(),
) as ArrayRef),
other => Err(ArrowError::ParseError(format!(
"Unsupported data type {:?}",
other
))),
}
build_array(rows, fields[i].data_type(), line_number, i)
})
.collect();
.collect::<Result<_>>()?;

let projected_fields: Vec<Field> =
projection.iter().map(|i| fields[*i].clone()).collect();

let projected_schema = Arc::new(Schema::new(projected_fields));

arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr))
RecordBatch::try_new(projected_schema, columns)
}

/// Specialized parsing implementations
Expand Down