diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs
index e38926d37fe..d1e7ff93698 100644
--- a/rust/arrow/src/csv/reader.rs
+++ b/rust/arrow/src/csv/reader.rs
@@ -40,25 +40,23 @@
 //! let batch = csv.next().unwrap().unwrap();
 //! ```
 
+use core::cmp::min;
 use lazy_static::lazy_static;
 use regex::{Regex, RegexBuilder};
+use std::collections::HashSet;
+use std::fmt;
 use std::fs::File;
-use std::io::{BufReader, Read, Seek, SeekFrom};
+use std::io::{Read, Seek, SeekFrom};
 use std::sync::Arc;
-use std::{collections::HashSet, iter::Skip};
-use std::{fmt, iter::Take};
 
 use csv as csv_crate;
 
+use crate::array::{ArrayRef, PrimitiveArray, StringBuilder};
 use crate::datatypes::*;
 use crate::error::{ArrowError, Result};
 use crate::record_batch::RecordBatch;
-use crate::{
-    array::{ArrayRef, PrimitiveArray, StringBuilder},
-    util::buffered_iterator::Buffered,
-};
 
-use self::csv_crate::{Error, StringRecord, StringRecordsIntoIter};
+use self::csv_crate::{ByteRecord, StringRecord};
 
 lazy_static! {
     static ref DECIMAL_RE: Regex = Regex::new(r"^-?(\d+\.\d+)$").unwrap();
@@ -95,7 +93,7 @@ fn infer_field_schema(string: &str) -> DataType {
 ///
 /// Return inferred schema and number of records used for inference.
 fn infer_file_schema<R: Read + Seek>(
-    reader: &mut BufReader<R>,
+    reader: &mut R,
     delimiter: u8,
     max_read_records: Option<usize>,
     has_header: bool,
@@ -131,11 +129,12 @@ fn infer_file_schema<R: Read + Seek>(
     let mut records_count = 0;
     let mut fields = vec![];
 
-    for result in csv_reader
-        .records()
-        .take(max_read_records.unwrap_or(std::usize::MAX))
-    {
-        let record = result?;
+    let mut record = StringRecord::new();
+    let max_records = max_read_records.unwrap_or(usize::MAX);
+    while records_count < max_records {
+        if !csv_reader.read_record(&mut record)? {
+            break;
+        }
         records_count += 1;
 
         for i in 0..header_length {
@@ -201,7 +200,7 @@ pub fn infer_schema_from_files(
 
     for fname in files.iter() {
         let (schema, records_read) = infer_file_schema(
-            &mut BufReader::new(File::open(fname)?),
+            &mut File::open(fname)?,
             delimiter,
             Some(records_to_read),
             has_header,
@@ -229,10 +228,15 @@ pub struct Reader<R: Read> {
     /// Optional projection for which columns to load (zero-based column indices)
     projection: Option<Vec<usize>>,
     /// File reader
-    record_iter:
-        Buffered<Skip<Take<StringRecordsIntoIter<BufReader<R>>>>, StringRecord, Error>,
+    reader: csv_crate::Reader<R>,
     /// Current line number
    line_number: usize,
+    /// Maximum number of rows to read
+    end: usize,
+    /// Number of records per batch
+    batch_size: usize,
+    /// Vector that can hold the `StringRecord`s of the batches
+    batch_records: Vec<StringRecord>,
 }
 
 impl<R> fmt::Debug for Reader<R>
@@ -263,14 +267,8 @@ impl<R: Read> Reader<R> {
         bounds: Bounds,
         projection: Option<Vec<usize>>,
     ) -> Self {
-        Self::from_buf_reader(
-            BufReader::new(reader),
-            schema,
-            has_header,
-            delimiter,
-            batch_size,
-            bounds,
-            projection,
+        Self::from_reader(
+            reader, schema, has_header, delimiter, batch_size, bounds, projection,
         )
     }
 
@@ -289,12 +287,12 @@ impl<R: Read> Reader<R> {
         }
     }
 
-    /// Create a new CsvReader from a `BufReader<R: Read>`
+    /// Create a new CsvReader from a `Reader`
    ///
    /// This constructor allows you more flexibility in what records are processed by the
    /// csv reader.
-    pub fn from_buf_reader(
-        buf_reader: BufReader<R>,
+    pub fn from_reader(
+        reader: R,
         schema: SchemaRef,
         has_header: bool,
         delimiter: Option<u8>,
         batch_size: usize,
@@ -309,27 +307,40 @@ impl<R: Read> Reader<R> {
             reader_builder.delimiter(c);
         }
 
-        let csv_reader = reader_builder.from_reader(buf_reader);
-        let record_iter = csv_reader.into_records();
+        let mut csv_reader = reader_builder.from_reader(reader);
 
         let (start, end) = match bounds {
             None => (0, usize::MAX),
             Some((start, end)) => (start, end),
         };
-        // Create an iterator that:
-        // * skips the first `start` items
-        // * runs up to `end` items
-        // * buffers `batch_size` items
+
+        // First we will skip `start` rows
         // note that this skips by iteration. This is because in general it is not possible
         // to seek in CSV. However, skipping still saves the burden of creating arrow arrays,
         // which is a slow operation that scales with the number of columns
-        let record_iter = Buffered::new(record_iter.take(end).skip(start), batch_size);
+
+        let mut record = ByteRecord::new();
+        // Skip the first `start` items
+        for _ in 0..start {
+            let res = csv_reader.read_byte_record(&mut record);
+            if !res.unwrap_or(false) {
+                break;
+            }
+        }
+
+        // Initialize batch_records with StringRecords so they
+        // can be reused across batches
+        let mut batch_records = Vec::with_capacity(batch_size);
+        batch_records.resize_with(batch_size, Default::default);
 
         Self {
             schema,
             projection,
-            record_iter,
+            reader: csv_reader,
             line_number: if has_header { start + 1 } else { start },
+            batch_size,
+            end,
+            batch_records,
         }
     }
 }
@@ -338,32 +349,39 @@ impl<R: Read> Iterator for Reader<R> {
     type Item = Result<RecordBatch>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        let rows = match self.record_iter.next() {
-            Some(Ok(r)) => r,
-            Some(Err(e)) => {
-                return Some(Err(ArrowError::ParseError(format!(
-                    "Error parsing line {}: {:?}",
-                    self.line_number + self.record_iter.n(),
-                    e
-                ))));
+        let remaining = self.end - self.line_number;
+
+        let mut read_records = 0;
+        for i in 0..min(self.batch_size, remaining) {
+            match self.reader.read_record(&mut self.batch_records[i]) {
+                Ok(true) => {
+                    read_records += 1;
+                }
+                Ok(false) => break,
+                Err(e) => {
+                    return Some(Err(ArrowError::ParseError(format!(
+                        "Error parsing line {}: {:?}",
+                        self.line_number + i,
+                        e
+                    ))))
+                }
             }
-            None => return None,
-        };
+        }
 
         // return early if no data was loaded
-        if rows.is_empty() {
+        if read_records == 0 {
             return None;
         }
 
         // parse the batches into a RecordBatch
         let result = parse(
-            &rows,
+            &self.batch_records[..read_records],
             &self.schema.fields(),
             &self.projection,
             self.line_number,
         );
 
-        self.line_number += rows.len();
+        self.line_number += read_records;
 
         Some(result)
     }
@@ -640,15 +658,14 @@ impl ReaderBuilder {
     }
 
     /// Create a new `Reader` from the `ReaderBuilder`
-    pub fn build<R: Read + Seek>(self, reader: R) -> Result<Reader<R>> {
+    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<Reader<R>> {
         // check if schema should be inferred
-        let mut buf_reader = BufReader::new(reader);
         let delimiter = self.delimiter.unwrap_or(b',');
         let schema = match self.schema {
             Some(schema) => schema,
             None => {
                 let (inferred_schema, _) = infer_file_schema(
-                    &mut buf_reader,
+                    &mut reader,
                     delimiter,
                     self.max_records,
                     self.has_header,
@@ -657,8 +674,8 @@ impl ReaderBuilder {
                 Arc::new(inferred_schema)
             }
         };
-        Ok(Reader::from_buf_reader(
-            buf_reader,
+        Ok(Reader::from_reader(
+            reader,
             schema,
             self.has_header,
             self.delimiter,
@@ -736,8 +753,8 @@ mod tests {
         let both_files = file_with_headers
             .chain(Cursor::new("\n".to_string()))
             .chain(file_without_headers);
-        let mut csv = Reader::from_buf_reader(
-            BufReader::new(both_files),
+        let mut csv = Reader::from_reader(
+            both_files,
             Arc::new(schema),
             true,
             None,
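For context, a minimal sketch of how the new `from_reader` constructor can be called after this change (the file path and schema below are illustrative; `None` for bounds and projection mirrors the defaults). Any `R: Read` is now accepted directly, since the csv crate's reader maintains its own internal buffer and the extra `BufReader` layer was redundant:

    use std::fs::File;
    use std::sync::Arc;

    use arrow::csv::Reader;
    use arrow::datatypes::{DataType, Field, Schema};

    fn main() {
        // Illustrative file and schema (uk_cities.csv is one of the crate's test files).
        let file = File::open("test/data/uk_cities.csv").unwrap();
        let schema = Schema::new(vec![
            Field::new("city", DataType::Utf8, false),
            Field::new("lat", DataType::Float64, false),
            Field::new("lng", DataType::Float64, false),
        ]);
        // No BufReader wrapper is needed any more.
        let mut csv = Reader::from_reader(
            file,
            Arc::new(schema),
            false, // has_header
            None,  // delimiter (defaults to b',')
            1024,  // batch_size
            None,  // bounds
            None,  // projection
        );
        let batch = csv.next().unwrap().unwrap();
        println!("read {} rows", batch.num_rows());
    }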
diff --git a/rust/arrow/src/util/buffered_iterator.rs b/rust/arrow/src/util/buffered_iterator.rs
deleted file mode 100644
index 5d42ee43e66..00000000000
--- a/rust/arrow/src/util/buffered_iterator.rs
+++ /dev/null
@@ -1,138 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! [Buffered] is an iterator useful to build an [arrow::array::Array] and other
-//! containers that benefit from batching or chunking.
-
-use std::marker::PhantomData;
-
-/// An iterator that buffers results in a vector so that the iterator returns a vector of `size` items.
-/// The items must be a [std::result::Result] and if an error is returned, the error is returned
-/// and the iterator continues.
-/// An invariant of this iterator is that every returned vector's size is at most the specified size.
-#[derive(Debug)]
-pub struct Buffered<I, T, R>
-where
-    T: Clone,
-    I: Iterator<Item = Result<T, R>>,
-{
-    iter: I,
-    size: usize,
-    buffer: Vec<T>,
-    phantom: PhantomData<R>,
-}
-
-impl<I, T, R> Buffered<I, T, R>
-where
-    T: Clone,
-    I: Iterator<Item = Result<T, R>>,
-{
-    pub fn new(iter: I, size: usize) -> Self {
-        Buffered {
-            iter,
-            size,
-            buffer: Vec::with_capacity(size),
-            phantom: PhantomData,
-        }
-    }
-
-    /// returns the number of items buffered so far.
-    /// Useful to extract the exact item where an error occurred
-    #[inline]
-    pub fn n(&self) -> usize {
-        self.buffer.len()
-    }
-}
-
-impl<I, T, R> Iterator for Buffered<I, T, R>
-where
-    T: Clone,
-    I: Iterator<Item = Result<T, R>>,
-{
-    type Item = Result<Vec<T>, R>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        for _ in 0..(self.size - self.n()) {
-            match self.iter.next() {
-                Some(Ok(item)) => self.buffer.push(item),
-                Some(Err(error)) => return Some(Err(error)),
-                None => break,
-            }
-        }
-        if self.buffer.is_empty() {
-            None
-        } else {
-            let result = self.buffer.clone();
-            self.buffer.clear();
-            Some(Ok(result))
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[derive(Debug, PartialEq)]
-    struct AError {}
-
-    impl std::fmt::Display for AError {
-        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-            write!(f, "Bla")
-        }
-    }
-    impl std::error::Error for AError {}
-
-    #[test]
-    fn test_basic() {
-        let a: Vec<Result<u32, AError>> = vec![Ok(1), Ok(2), Ok(3)];
-        let iter = a.into_iter();
-        let mut iter = Buffered::new(iter, 2);
-
-        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
-        assert_eq!(iter.next(), Some(Ok(vec![3])));
-        assert_eq!(iter.next(), None);
-    }
-
-    #[test]
-    fn test_error_first() {
-        let a: Vec<Result<u32, AError>> =
-            vec![Ok(1), Ok(2), Err(AError {}), Ok(4), Ok(5)];
-        let iter = a.into_iter();
-        let mut iter = Buffered::new(iter, 2);
-
-        assert_eq!(iter.next(), Some(Ok(vec![1, 2])));
-        assert_eq!(iter.next(), Some(Err(AError {})));
-        // 4 is here: it was not skipped on the previous
-        assert_eq!(iter.n(), 0);
-        assert_eq!(iter.next(), Some(Ok(vec![4, 5])));
-        assert_eq!(iter.next(), None);
-    }
-
-    #[test]
-    fn test_error_last() {
-        let a: Vec<Result<u32, AError>> = vec![Ok(1), Err(AError {}), Ok(3), Ok(4)];
-        let iter = a.into_iter();
-        let mut iter = Buffered::new(iter, 2);
-
-        assert_eq!(iter.next(), Some(Err(AError {})));
-        assert_eq!(iter.n(), 1);
-        assert_eq!(iter.next(), Some(Ok(vec![1, 3])));
-        assert_eq!(iter.next(), Some(Ok(vec![4])));
-        assert_eq!(iter.next(), None);
-    }
-}
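The deleted `Buffered` iterator had to clone its internal `Vec` to hand out each batch; the new reader instead preallocates `batch_size` `StringRecord`s and lets the csv crate overwrite them in place. A minimal standalone sketch of that reuse pattern, using illustrative inline data:

    use std::io::Cursor;

    use csv::{ReaderBuilder, StringRecord};

    fn main() {
        let data = Cursor::new("a,1\nb,2\nc,3\n");
        let mut reader = ReaderBuilder::new().has_headers(false).from_reader(data);

        let batch_size = 2;
        // Allocate the records once; read_record reuses their backing storage,
        // so no per-batch clone is needed (unlike the removed Buffered).
        let mut batch: Vec<StringRecord> = Vec::with_capacity(batch_size);
        batch.resize_with(batch_size, Default::default);

        loop {
            let mut read = 0;
            for i in 0..batch_size {
                // read_record returns Ok(false) once the input is exhausted.
                match reader.read_record(&mut batch[i]) {
                    Ok(true) => read += 1,
                    Ok(false) => break,
                    Err(e) => panic!("csv error: {}", e),
                }
            }
            if read == 0 {
                break;
            }
            println!("batch of {} rows: {:?}", read, &batch[..read]);
        }
    }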
diff --git a/rust/arrow/src/util/mod.rs b/rust/arrow/src/util/mod.rs
index 053d1329631..0f95043ea9d 100644
--- a/rust/arrow/src/util/mod.rs
+++ b/rust/arrow/src/util/mod.rs
@@ -17,7 +17,6 @@
 
 pub mod bit_chunk_iterator;
 pub mod bit_util;
-pub mod buffered_iterator;
 pub mod display;
 pub mod integration_util;
 #[cfg(feature = "prettyprint")]
diff --git a/rust/benchmarks/Cargo.toml b/rust/benchmarks/Cargo.toml
index 7d7b6c97342..6017d88e1e0 100644
--- a/rust/benchmarks/Cargo.toml
+++ b/rust/benchmarks/Cargo.toml
@@ -31,4 +31,4 @@ parquet = { path = "../parquet" }
 datafusion = { path = "../datafusion" }
 structopt = { version = "0.3", default-features = false }
 tokio = { version = "0.2", features = ["macros", "rt-core", "rt-threaded"] }
-futures = "0.3"
+futures = "0.3"
\ No newline at end of file
diff --git a/rust/benchmarks/src/bin/nyctaxi.rs b/rust/benchmarks/src/bin/nyctaxi.rs
index 02a790bdcaf..1ffa6842061 100644
--- a/rust/benchmarks/src/bin/nyctaxi.rs
+++ b/rust/benchmarks/src/bin/nyctaxi.rs
@@ -92,7 +92,7 @@ async fn datafusion_sql_benchmarks(
     debug: bool,
 ) -> Result<()> {
     let mut queries = HashMap::new();
-    queries.insert("fare_amt_by_passenger", "SELECT passenger_count, MIN(fare_amount), MIN(fare_amount), SUM(fare_amount) FROM tripdata GROUP BY passenger_count");
+    queries.insert("fare_amt_by_passenger", "SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), SUM(fare_amount) FROM tripdata GROUP BY passenger_count");
     for (name, sql) in &queries {
         println!("Executing '{}'", name);
         for i in 0..iterations {
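Since `infer_file_schema` now takes any `R: Read + Seek` directly and rewinds it after sampling, `ReaderBuilder::build` also works on a bare `File`. A minimal sketch (the path is illustrative; the builder methods used are the ones already on `ReaderBuilder`):

    use std::fs::File;

    use arrow::csv::ReaderBuilder;

    fn main() {
        // Illustrative file; the schema is inferred from the first 100 records,
        // after which the reader is rewound to the start of the input.
        let file = File::open("test/data/uk_cities_with_headers.csv").unwrap();
        let mut csv = ReaderBuilder::new()
            .has_header(true)
            .infer_schema(Some(100))
            .build(file)
            .unwrap();
        let batch = csv.next().unwrap().unwrap();
        println!("inferred schema: {:?}", batch.schema());
    }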