rust/arrow-flight/src/utils.rs (19 additions, 11 deletions)
@@ -129,21 +129,29 @@ impl TryFrom<&SchemaResult> for Schema {
 pub fn flight_data_to_arrow_batch(
     data: &FlightData,
     schema: SchemaRef,
-) -> Result<Option<RecordBatch>> {
+) -> Option<Result<RecordBatch>> {
     // check that the data_header is a record batch message
     let message = arrow::ipc::get_root_as_message(&data.data_header[..]);
     let dictionaries_by_field = Vec::new();
-    let batch_header = message.header_as_record_batch().ok_or_else(|| {
-        ArrowError::ParseError(
-            "Unable to convert flight data header to a record batch".to_string(),
+
+    message
+        .header_as_record_batch()
+        .ok_or_else(|| {
+            ArrowError::ParseError(
+                "Unable to convert flight data header to a record batch".to_string(),
+            )
+        })
+        .map_or_else(
+            |err| Some(Err(err)),
+            |batch| {
+                Some(reader::read_record_batch(
+                    &data.data_body,
+                    batch,
+                    schema,
+                    &dictionaries_by_field,
+                ))
+            },
         )
-    })?;
-    reader::read_record_batch(
-        &data.data_body,
-        batch_header,
-        schema,
-        &dictionaries_by_field,
-    )
 }
 
 // TODO: add more explicit conversion that expoess flight descriptor and metadata options
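The return type flips from Result<Option<RecordBatch>> to Option<Result<RecordBatch>>: a FlightData message whose header is not a record batch now maps to None, while conversion failures surface as Some(Err(..)). A minimal caller sketch under that contract, assuming the function remains exposed as arrow_flight::utils::flight_data_to_arrow_batch and that FlightData sits at the arrow_flight crate root; the helper below is illustrative, not part of this change:

use arrow::datatypes::SchemaRef;
use arrow::error::Result;
use arrow::record_batch::RecordBatch;
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::FlightData;

// Hypothetical helper: skip non-record-batch messages (None) and let collect()
// stop at the first conversion error.
fn batches_from_flight(messages: &[FlightData], schema: SchemaRef) -> Result<Vec<RecordBatch>> {
    messages
        .iter()
        .filter_map(|data| flight_data_to_arrow_batch(data, schema.clone())) // cheap Arc clone
        .collect()
}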
rust/arrow/src/csv/reader.rs (46 additions, 35 deletions)
@@ -313,32 +313,7 @@ impl<R: Read> Reader<R> {
         }
     }
 
-    /// Read the next batch of rows
-    #[allow(clippy::should_implement_trait)]
-    pub fn next(&mut self) -> Result<Option<RecordBatch>> {
-        // read a batch of rows into memory
-        let mut rows: Vec<StringRecord> = Vec::with_capacity(self.batch_size);
-        for i in 0..self.batch_size {
-            match self.record_iter.next() {
-                Some(Ok(r)) => {
-                    rows.push(r);
-                }
-                Some(Err(e)) => {
-                    return Err(ArrowError::ParseError(format!(
-                        "Error parsing line {}: {:?}",
-                        self.line_number + i,
-                        e
-                    )));
-                }
-                None => break,
-            }
-        }
-
-        // return early if no data was loaded
-        if rows.is_empty() {
-            return Ok(None);
-        }
-
+    fn parse(&self, rows: &[StringRecord]) -> Result<RecordBatch> {
         let projection: Vec<usize> = match self.projection {
             Some(ref v) => v.clone(),
             None => self
@@ -350,7 +325,6 @@ impl<R: Read> Reader<R> {
                 .collect(),
         };
 
-        let rows = &rows[..];
         let arrays: Result<Vec<ArrayRef>> = projection
             .iter()
             .map(|i| {
@@ -398,8 +372,6 @@ impl<R: Read> Reader<R> {
             })
             .collect();
 
-        self.line_number += rows.len();
-
         let schema_fields = self.schema.fields();
 
         let projected_fields: Vec<Field> = projection
@@ -409,7 +381,7 @@ impl<R: Read> Reader<R> {
 
         let projected_schema = Arc::new(Schema::new(projected_fields));
 
-        arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr).map(Some))
+        arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr))
     }
 
     fn build_primitive_array<T: ArrowPrimitiveType>(
@@ -448,6 +420,42 @@ impl<R: Read> Reader<R> {
     }
 }
 
+impl<R: Read> Iterator for Reader<R> {
+    type Item = Result<RecordBatch>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // read a batch of rows into memory
+        let mut rows: Vec<StringRecord> = Vec::with_capacity(self.batch_size);
+        for i in 0..self.batch_size {
+            match self.record_iter.next() {
+                Some(Ok(r)) => {
+                    rows.push(r);
+                }
+                Some(Err(e)) => {
+                    return Some(Err(ArrowError::ParseError(format!(
+                        "Error parsing line {}: {:?}",
+                        self.line_number + i,
+                        e
+                    ))));
+                }
+                None => break,
+            }
+        }
+
+        // return early if no data was loaded
+        if rows.is_empty() {
+            return None;
+        }
+
+        // parse the batches into a RecordBatch
+        let result = self.parse(&rows);
+
+        self.line_number += rows.len();
+
+        Some(result)
+    }
+}
+
 /// CSV file reader builder
 #[derive(Debug)]
 pub struct ReaderBuilder {
@@ -832,11 +840,14 @@ mod tests {
 
         let mut csv = builder.build(file).unwrap();
         match csv.next() {
-            Err(e) => assert_eq!(
-                "ParseError(\"Error while parsing value 4.x4 for column 1 at line 4\")",
-                format!("{:?}", e)
-            ),
-            Ok(_) => panic!("should have failed"),
+            Some(e) => match e {
+                Err(e) => assert_eq!(
+                    "ParseError(\"Error while parsing value 4.x4 for column 1 at line 4\")",
+                    format!("{:?}", e)
+                ),
+                Ok(_) => panic!("should have failed"),
+            }
+            None => panic!("should have failed"),
         }
     }
 
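Moving next() into an Iterator implementation changes the reader's contract: an exhausted input yields None and a parse failure yields Some(Err(..)), which is what the updated test above asserts. A minimal consumer sketch under those semantics, assuming Reader is still re-exported as arrow::csv::Reader; the helper is illustrative, not part of this change:

use std::io::Read;

use arrow::csv::Reader;
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

// Hypothetical helper: drain the reader with a plain for-loop, propagating the
// first parse error with `?` and summing row counts across batches.
fn count_rows<R: Read>(reader: Reader<R>) -> Result<usize> {
    let mut total = 0;
    for batch in reader {
        let batch: RecordBatch = batch?;
        total += batch.num_rows();
    }
    Ok(total)
}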