129 changes: 73 additions & 56 deletions rust/arrow/src/csv/reader.rs
@@ -40,25 +40,23 @@
//! let batch = csv.next().unwrap().unwrap();
//! ```
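The module example above is truncated by the diff; for orientation, a fuller sketch of the same API is below. The file name and schema are illustrative, and the argument order follows `Reader::new` as shown later in this diff:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::csv::Reader;
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Hypothetical two-column file; adjust schema and path to taste.
    let schema = Schema::new(vec![
        Field::new("city", DataType::Utf8, false),
        Field::new("lat", DataType::Float64, false),
    ]);
    let file = File::open("test.csv").unwrap();
    // has_header, delimiter, batch_size, bounds, projection.
    let mut csv = Reader::new(file, Arc::new(schema), true, None, 1024, None, None);
    let batch = csv.next().unwrap().unwrap();
    println!("first batch: {} rows", batch.num_rows());
}
```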

use core::cmp::min;
use lazy_static::lazy_static;
use regex::{Regex, RegexBuilder};
use std::collections::HashSet;
use std::fmt;
use std::fs::File;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use std::{collections::HashSet, iter::Skip};
use std::{fmt, iter::Take};

use csv as csv_crate;

use crate::array::{ArrayRef, PrimitiveArray, StringBuilder};
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;
use crate::{
array::{ArrayRef, PrimitiveArray, StringBuilder},
util::buffered_iterator::Buffered,
};

use self::csv_crate::{Error, StringRecord, StringRecordsIntoIter};
use self::csv_crate::{ByteRecord, StringRecord};

lazy_static! {
static ref DECIMAL_RE: Regex = Regex::new(r"^-?(\d+\.\d+)$").unwrap();
@@ -95,7 +93,7 @@ fn infer_field_schema(string: &str) -> DataType {
///
/// Returns the inferred schema and the number of records used for inference.
fn infer_file_schema<R: Read + Seek>(
reader: &mut BufReader<R>,
reader: &mut R,
delimiter: u8,
max_read_records: Option<usize>,
has_header: bool,
@@ -131,11 +129,12 @@ fn infer_file_schema<R: Read + Seek>(
let mut records_count = 0;
let mut fields = vec![];

for result in csv_reader
.records()
.take(max_read_records.unwrap_or(std::usize::MAX))
{
let record = result?;
let mut record = StringRecord::new();
let max_records = max_read_records.unwrap_or(usize::MAX);
while records_count < max_records {
if !csv_reader.read_record(&mut record)? {
break;
}
records_count += 1;

for i in 0..header_length {
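This hunk replaces the allocating `records()` iterator with `read_record` into a single reusable `StringRecord`. A standalone sketch of that pattern against the csv crate (input data made up):

```rust
use csv::{ReaderBuilder, StringRecord};

fn main() {
    let data = "a,1\nb,2\nc,3\n";
    let mut rdr = ReaderBuilder::new()
        .has_headers(false)
        .from_reader(data.as_bytes());
    // One buffer, refilled in place on every row instead of a fresh
    // allocation per record.
    let mut record = StringRecord::new();
    let mut rows = 0;
    while rdr.read_record(&mut record).unwrap() {
        rows += 1;
    }
    assert_eq!(rows, 3);
}
```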
@@ -201,7 +200,7 @@ pub fn infer_schema_from_files(

for fname in files.iter() {
let (schema, records_read) = infer_file_schema(
&mut BufReader::new(File::open(fname)?),
&mut File::open(fname)?,
delimiter,
Some(records_to_read),
has_header,
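A hedged usage sketch for this helper, with placeholder paths; the call assumes the signature visible in this file (`&[String]`, delimiter byte, optional record cap, header flag) and that the function is reachable via the public `arrow::csv::reader` module:

```rust
use arrow::csv::reader::infer_schema_from_files;

fn main() -> arrow::error::Result<()> {
    // Placeholder paths; up to 100 records are sampled per file and
    // the per-file schemas are merged.
    let files = vec!["part-0.csv".to_string(), "part-1.csv".to_string()];
    let schema = infer_schema_from_files(&files, b',', Some(100), true)?;
    println!("{:?}", schema);
    Ok(())
}
```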
@@ -229,10 +228,15 @@ pub struct Reader<R: Read> {
/// Optional projection for which columns to load (zero-based column indices)
projection: Option<Vec<usize>>,
/// File reader
record_iter:
Buffered<Skip<Take<StringRecordsIntoIter<BufReader<R>>>>, StringRecord, Error>,
reader: csv_crate::Reader<R>,
/// Current line number
line_number: usize,
/// Maximum number of rows to read
end: usize,
/// Number of records per batch
batch_size: usize,
/// Vector that can hold the `StringRecord`s of the batches
batch_records: Vec<StringRecord>,
}
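The new `batch_records` field extends the same reuse idea to whole batches. A sketch of the preallocation pattern (sizes arbitrary); `resize_with` constructs each element in place, so no `Clone` bound is needed:

```rust
use csv::StringRecord;

fn main() {
    let batch_size = 4;
    // Allocate the row buffers once; they are refilled on every batch.
    let mut batch_records: Vec<StringRecord> = Vec::with_capacity(batch_size);
    batch_records.resize_with(batch_size, Default::default);
    assert_eq!(batch_records.len(), batch_size);
}
```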

impl<R> fmt::Debug for Reader<R>
@@ -263,14 +267,8 @@ impl<R: Read> Reader<R> {
bounds: Bounds,
projection: Option<Vec<usize>>,
) -> Self {
Self::from_buf_reader(
BufReader::new(reader),
schema,
has_header,
delimiter,
batch_size,
bounds,
projection,
Self::from_reader(
reader, schema, has_header, delimiter, batch_size, bounds, projection,
)
}

@@ -289,12 +287,12 @@
}
}

/// Create a new CsvReader from a `BufReader<R: Read>`
/// Create a new CsvReader from a Reader
///
/// This constructor gives you more flexibility in what records are processed by the
/// csv reader.
pub fn from_buf_reader(
buf_reader: BufReader<R>,
pub fn from_reader(
reader: R,
schema: SchemaRef,
has_header: bool,
delimiter: Option<u8>,
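A sketch of the flexibility this constructor exposes: row bounds plus a column projection. Path, schema, and the exact bound semantics are assumptions read off the implementation in the next hunk below:

```rust
use std::fs::File;
use std::sync::Arc;

use arrow::csv::Reader;
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("name", DataType::Utf8, false),
        Field::new("score", DataType::Float64, false),
    ]));
    let file = File::open("scores.csv").unwrap();
    // Rows 100..200 only, batches of 50, keeping just column 0.
    let csv = Reader::from_reader(
        file,
        schema,
        true,
        None,
        50,
        Some((100, 200)),
        Some(vec![0]),
    );
    for batch in csv {
        println!("{} rows", batch.unwrap().num_rows());
    }
}
```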
@@ -309,27 +307,40 @@
reader_builder.delimiter(c);
}

let csv_reader = reader_builder.from_reader(buf_reader);
let record_iter = csv_reader.into_records();
let mut csv_reader = reader_builder.from_reader(reader);

let (start, end) = match bounds {
None => (0, usize::MAX),
Some((start, end)) => (start, end),
};
// Create an iterator that:
// * skips the first `start` items
// * runs up to `end` items
// * buffers `batch_size` items

// First we will skip `start` rows.
// Note that this skips by iteration, because in general it is not possible
// to seek in CSV. However, skipping still saves the burden of creating arrow
// arrays, which is a slow operation that scales with the number of columns.
let record_iter = Buffered::new(record_iter.take(end).skip(start), batch_size);

let mut record = ByteRecord::new();
// Skip the first `start` records
for _ in 0..start {
let res = csv_reader.read_byte_record(&mut record);
if !res.unwrap_or(false) {
break;
}
}

// Initialize batch_records with StringRecords so they
// can be reused across batches
let mut batch_records = Vec::with_capacity(batch_size);
batch_records.resize_with(batch_size, Default::default);

Self {
schema,
projection,
record_iter,
reader: csv_reader,
line_number: if has_header { start + 1 } else { start },
batch_size,
end,
batch_records,
}
}
}
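Note that the skip loop reads into a `ByteRecord`, not a `StringRecord`, so skipped rows pay neither UTF-8 validation nor string allocation. A standalone illustration (data made up):

```rust
use csv::{ByteRecord, ReaderBuilder, StringRecord};

fn main() {
    let data = "x,1\ny,2\nz,3\n";
    let mut rdr = ReaderBuilder::new()
        .has_headers(false)
        .from_reader(data.as_bytes());
    // Skip two rows as raw bytes, then parse the rest as strings.
    let mut skip = ByteRecord::new();
    for _ in 0..2 {
        if !rdr.read_byte_record(&mut skip).unwrap_or(false) {
            break;
        }
    }
    let mut record = StringRecord::new();
    while rdr.read_record(&mut record).unwrap() {
        println!("{}", &record[0]); // prints "z"
    }
}
```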
@@ -338,32 +349,39 @@ impl<R: Read> Iterator for Reader<R> {
type Item = Result<RecordBatch>;

fn next(&mut self) -> Option<Self::Item> {
let rows = match self.record_iter.next() {
Some(Ok(r)) => r,
Some(Err(e)) => {
return Some(Err(ArrowError::ParseError(format!(
"Error parsing line {}: {:?}",
self.line_number + self.record_iter.n(),
e
))));
let remaining = self.end - self.line_number;

let mut read_records = 0;
for i in 0..min(self.batch_size, remaining) {
match self.reader.read_record(&mut self.batch_records[i]) {
Ok(true) => {
read_records += 1;
}
Ok(false) => break,
Err(e) => {
return Some(Err(ArrowError::ParseError(format!(
"Error parsing line {}: {:?}",
self.line_number + i,
e
))))
}
}
None => return None,
};
}

// return early if no data was loaded
if rows.is_empty() {
if read_records == 0 {
return None;
}

// parse the batches into a RecordBatch
let result = parse(
&rows,
&self.batch_records[..read_records],
&self.schema.fields(),
&self.projection,
self.line_number,
);

self.line_number += rows.len();
self.line_number += read_records;

Some(result)
}
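Driving the reader is then an ordinary iterator loop; a minimal sketch, assuming a reader built with any of the constructors above:

```rust
use std::fs::File;

use arrow::csv::Reader;
use arrow::error::Result;

// Drain a reader and count rows; the first parse error propagates.
fn count_rows(reader: Reader<File>) -> Result<usize> {
    let mut total = 0;
    for batch in reader {
        total += batch?.num_rows();
    }
    Ok(total)
}
```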
@@ -640,15 +658,14 @@ impl ReaderBuilder {
}

/// Create a new `Reader` from the `ReaderBuilder`
pub fn build<R: Read + Seek>(self, reader: R) -> Result<Reader<R>> {
pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<Reader<R>> {
// check if schema should be inferred
let mut buf_reader = BufReader::new(reader);
let delimiter = self.delimiter.unwrap_or(b',');
let schema = match self.schema {
Some(schema) => schema,
None => {
let (inferred_schema, _) = infer_file_schema(
&mut buf_reader,
&mut reader,
delimiter,
self.max_records,
self.has_header,
Expand All @@ -657,8 +674,8 @@ impl ReaderBuilder {
Arc::new(inferred_schema)
}
};
Ok(Reader::from_buf_reader(
buf_reader,
Ok(Reader::from_reader(
reader,
schema,
self.has_header,
self.delimiter,
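A sketch of the inference path through the builder (placeholder file name; builder method names are assumptions from the surrounding API). `build` keeps the `Seek` bound precisely so `infer_file_schema` can rewind the reader before it is handed to `from_reader`:

```rust
use std::fs::File;

use arrow::csv::ReaderBuilder;

fn main() {
    let file = File::open("example.csv").unwrap();
    let mut csv = ReaderBuilder::new()
        .has_header(true)
        .infer_schema(Some(100)) // sample up to 100 records
        .with_batch_size(512)
        .build(file)
        .unwrap();
    let batch = csv.next().unwrap().unwrap();
    println!("{:?}", batch.schema());
}
```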
@@ -736,8 +753,8 @@ mod tests {
let both_files = file_with_headers
.chain(Cursor::new("\n".to_string()))
.chain(file_without_headers);
let mut csv = Reader::from_buf_reader(
BufReader::new(both_files),
let mut csv = Reader::from_reader(
both_files,
Arc::new(schema),
true,
None,
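The chained input in this test works because `std::io::Read::chain` concatenates streams end to end, letting the headered and header-less fragments arrive as one logical CSV. A minimal illustration:

```rust
use std::io::{Cursor, Read};

fn main() {
    // Two fragments read back as a single stream.
    let mut joined = Cursor::new("a,1\n").chain(Cursor::new("b,2\n"));
    let mut all = String::new();
    joined.read_to_string(&mut all).unwrap();
    assert_eq!(all, "a,1\nb,2\n");
}
```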
138 changes: 0 additions & 138 deletions rust/arrow/src/util/buffered_iterator.rs

This file was deleted.
