diff --git a/rust/arrow/benches/cast_kernels.rs b/rust/arrow/benches/cast_kernels.rs index 0d1759957f3..dbad552393f 100644 --- a/rust/arrow/benches/cast_kernels.rs +++ b/rust/arrow/benches/cast_kernels.rs @@ -29,100 +29,121 @@ use arrow::array::*; use arrow::compute::cast; use arrow::datatypes::*; -// cast array from specified primitive array type to desired data type -fn cast_array(size: usize, to_type: DataType) +fn build_array(size: usize) -> ArrayRef where FROM: ArrowNumericType, Standard: Distribution, - PrimitiveArray: std::convert::From>, + PrimitiveArray: std::convert::From>>, { - let array = Arc::new(PrimitiveArray::::from(vec![ - random::(); - size - ])) as ArrayRef; - criterion::black_box(cast(&array, &to_type).unwrap()); + let values = (0..size) + .map(|_| { + // 10% nulls, i.e. dense. + if random::() < 0.1 { + None + } else { + Some(random::()) + } + }) + .collect(); + + Arc::new(PrimitiveArray::::from(values)) } -// cast timestamp array from specified primitive array type to desired data type -fn cast_timestamp_array(size: usize, to_type: DataType) +fn build_timestamp_array(size: usize) -> ArrayRef where FROM: ArrowTimestampType, - Standard: Distribution, + Standard: Distribution, { - let array = Arc::new(PrimitiveArray::::from_vec( - vec![random::(); size], - None, - )) as ArrayRef; - criterion::black_box(cast(&array, &to_type).unwrap()); + let values = (0..size) + .map(|_| { + if random::() < 0.5 { + None + } else { + Some(random::()) + } + }) + .collect::>>(); + + Arc::new(PrimitiveArray::::from_opt_vec(values, None)) +} + +// cast array from specified primitive array type to desired data type +fn cast_array(array: &ArrayRef, to_type: DataType) { + criterion::black_box(cast(array, &to_type).unwrap()); } fn add_benchmark(c: &mut Criterion) { + let i32_array = build_array::(512); + let i64_array = build_array::(512); + let f32_array = build_array::(512); + let f64_array = build_array::(512); + let date64_array = build_array::(512); + let date32_array = build_array::(512); + let time32s_array = build_array::(512); + let time64ns_array = build_array::(512); + let time_ns_array = build_timestamp_array::(512); + let time_ms_array = build_timestamp_array::(512); + c.bench_function("cast int32 to int32 512", |b| { - b.iter(|| cast_array::(512, DataType::Int32)) + b.iter(|| cast_array(&i32_array, DataType::Int32)) }); c.bench_function("cast int32 to uint32 512", |b| { - b.iter(|| cast_array::(512, DataType::UInt32)) + b.iter(|| cast_array(&i32_array, DataType::UInt32)) }); c.bench_function("cast int32 to float32 512", |b| { - b.iter(|| cast_array::(512, DataType::Float32)) + b.iter(|| cast_array(&i32_array, DataType::Float32)) }); c.bench_function("cast int32 to float64 512", |b| { - b.iter(|| cast_array::(512, DataType::Float64)) + b.iter(|| cast_array(&i32_array, DataType::Float64)) }); c.bench_function("cast int32 to int64 512", |b| { - b.iter(|| cast_array::(512, DataType::Int64)) + b.iter(|| cast_array(&i32_array, DataType::Int64)) }); c.bench_function("cast float32 to int32 512", |b| { - b.iter(|| cast_array::(512, DataType::Int32)) + b.iter(|| cast_array(&f32_array, DataType::Int32)) }); c.bench_function("cast float64 to float32 512", |b| { - b.iter(|| cast_array::(512, DataType::Float32)) + b.iter(|| cast_array(&f64_array, DataType::Float32)) }); c.bench_function("cast float64 to uint64 512", |b| { - b.iter(|| cast_array::(512, DataType::UInt64)) + b.iter(|| cast_array(&f64_array, DataType::UInt64)) }); c.bench_function("cast int64 to int32 512", |b| { - b.iter(|| cast_array::(512, DataType::Int32)) + b.iter(|| cast_array(&i64_array, DataType::Int32)) }); c.bench_function("cast date64 to date32 512", |b| { - b.iter(|| cast_array::(512, DataType::Date32(DateUnit::Day))) + b.iter(|| cast_array(&date64_array, DataType::Date32(DateUnit::Day))) }); c.bench_function("cast date32 to date64 512", |b| { - b.iter(|| cast_array::(512, DataType::Date64(DateUnit::Millisecond))) + b.iter(|| cast_array(&date32_array, DataType::Date64(DateUnit::Millisecond))) }); c.bench_function("cast time32s to time32ms 512", |b| { - b.iter(|| { - cast_array::(512, DataType::Time32(TimeUnit::Millisecond)) - }) + b.iter(|| cast_array(&time32s_array, DataType::Time32(TimeUnit::Millisecond))) }); c.bench_function("cast time32s to time64us 512", |b| { - b.iter(|| { - cast_array::(512, DataType::Time64(TimeUnit::Microsecond)) - }) + b.iter(|| cast_array(&time32s_array, DataType::Time64(TimeUnit::Microsecond))) }); c.bench_function("cast time64ns to time32s 512", |b| { - b.iter(|| { - cast_array::(512, DataType::Time32(TimeUnit::Second)) - }) + b.iter(|| cast_array(&time64ns_array, DataType::Time32(TimeUnit::Second))) }); c.bench_function("cast timestamp_ns to timestamp_s 512", |b| { b.iter(|| { - cast_timestamp_array::( - 512, + cast_array( + &time_ns_array, DataType::Timestamp(TimeUnit::Nanosecond, None), ) }) }); c.bench_function("cast timestamp_ms to timestamp_ns 512", |b| { b.iter(|| { - cast_timestamp_array::( - 512, + cast_array( + &time_ms_array, DataType::Timestamp(TimeUnit::Nanosecond, None), ) }) }); c.bench_function("cast timestamp_ms to i64 512", |b| { - b.iter(|| cast_timestamp_array::(512, DataType::Int64)) + b.iter(|| cast_array(&time_ms_array, DataType::Int64)) }); } diff --git a/rust/arrow/src/array/array.rs b/rust/arrow/src/array/array.rs index e14b34fb590..e26e9294afa 100644 --- a/rust/arrow/src/array/array.rs +++ b/rust/arrow/src/array/array.rs @@ -16,6 +16,7 @@ // under the License. use std::any::Any; +use std::borrow::Borrow; use std::convert::{From, TryFrom}; use std::fmt; use std::io::Write; @@ -693,6 +694,61 @@ impl fmt::Debug for PrimitiveArray { } } +impl<'a, T: ArrowPrimitiveType> IntoIterator for &'a PrimitiveArray { + type Item = Option<::Native>; + type IntoIter = PrimitiveIter<'a, T>; + + fn into_iter(self) -> Self::IntoIter { + PrimitiveIter::<'a, T>::new(self) + } +} + +impl<'a, T: ArrowPrimitiveType> PrimitiveArray { + /// constructs a new iterator + pub fn iter(&'a self) -> PrimitiveIter<'a, T> { + PrimitiveIter::<'a, T>::new(&self) + } +} + +impl::Native>>> + FromIterator for PrimitiveArray +{ + fn from_iter>(iter: I) -> Self { + let iter = iter.into_iter(); + let (_, data_len) = iter.size_hint(); + let data_len = data_len.expect("Iterator must be sized"); // panic if no upper bound. + + let num_bytes = bit_util::ceil(data_len, 8); + let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + let mut val_buf = MutableBuffer::new( + data_len * mem::size_of::<::Native>(), + ); + + let null = vec![0; mem::size_of::<::Native>()]; + + let null_slice = null_buf.data_mut(); + iter.enumerate().for_each(|(i, item)| { + if let Some(a) = item.borrow() { + bit_util::set_bit(null_slice, i); + val_buf.write_all(a.to_byte_slice()).unwrap(); + } else { + val_buf.write_all(&null).unwrap(); + } + }); + + let data = ArrayData::new( + T::get_data_type(), + data_len, + None, + Some(null_buf.freeze()), + 0, + vec![val_buf.freeze()], + vec![], + ); + PrimitiveArray::from(Arc::new(data)) + } +} + // TODO: the macro is needed here because we'd get "conflicting implementations" error // otherwise with both `From>` and `From>>`. // We should revisit this in future. @@ -713,34 +769,7 @@ macro_rules! def_numeric_from_vec { for PrimitiveArray<$ty> { fn from(data: Vec::Native>>) -> Self { - let data_len = data.len(); - let mut null_buf = make_null_buffer(data_len); - let mut val_buf = MutableBuffer::new( - data_len * mem::size_of::<<$ty as ArrowPrimitiveType>::Native>(), - ); - - { - let null = - vec![0; mem::size_of::<<$ty as ArrowPrimitiveType>::Native>()]; - let null_slice = null_buf.data_mut(); - for (i, v) in data.iter().enumerate() { - if let Some(n) = v { - bit_util::set_bit(null_slice, i); - // unwrap() in the following should be safe here since we've - // made sure enough space is allocated for the values. - val_buf.write_all(&n.to_byte_slice()).unwrap(); - } else { - val_buf.write_all(&null).unwrap(); - } - } - } - - let array_data = ArrayData::builder($ty::get_data_type()) - .len(data_len) - .add_buffer(val_buf.freeze()) - .null_bit_buffer(null_buf.freeze()) - .build(); - PrimitiveArray::from(array_data) + PrimitiveArray::from_iter(data.iter()) } } }; diff --git a/rust/arrow/src/array/iterator.rs b/rust/arrow/src/array/iterator.rs new file mode 100644 index 00000000000..050bba6f4ae --- /dev/null +++ b/rust/arrow/src/array/iterator.rs @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::datatypes::ArrowPrimitiveType; + +use super::{Array, PrimitiveArray, PrimitiveArrayOps}; + +/// an iterator that returns Some(T) or None, that can be used on any non-boolean PrimitiveArray +#[derive(Debug)] +pub struct PrimitiveIter<'a, T: ArrowPrimitiveType> { + array: &'a PrimitiveArray, + i: usize, + len: usize, +} + +impl<'a, T: ArrowPrimitiveType> PrimitiveIter<'a, T> { + /// create a new iterator + pub fn new(array: &'a PrimitiveArray) -> Self { + PrimitiveIter:: { + array, + i: 0, + len: array.len(), + } + } +} + +impl<'a, T: ArrowPrimitiveType> std::iter::Iterator for PrimitiveIter<'a, T> { + type Item = Option; + + fn next(&mut self) -> Option { + let i = self.i; + if i >= self.len { + None + } else if self.array.is_null(i) { + self.i += 1; + Some(None) + } else { + self.i += 1; + Some(Some(self.array.value(i))) + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.len, Some(self.len)) + } +} + +/// all arrays have known size. +impl<'a, T: ArrowPrimitiveType> std::iter::ExactSizeIterator for PrimitiveIter<'a, T> {} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::array::{ArrayRef, Int32Array}; + + #[test] + fn test_primitive_array_iter_round_trip() { + let array = Int32Array::from(vec![Some(0), None, Some(2), None, Some(4)]); + let array = Arc::new(array) as ArrayRef; + + let array = array.as_any().downcast_ref::().unwrap(); + + // to and from iter, with a +1 + let result: Int32Array = + array.iter().map(|e| e.and_then(|e| Some(e + 1))).collect(); + + let expected = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]); + assert_eq!(result, expected); + } +} diff --git a/rust/arrow/src/array/mod.rs b/rust/arrow/src/array/mod.rs index 9debbb6b0ad..f5455b82f1e 100644 --- a/rust/arrow/src/array/mod.rs +++ b/rust/arrow/src/array/mod.rs @@ -87,6 +87,7 @@ mod builder; mod cast; mod data; mod equal; +mod iterator; mod null; mod ord; mod union; @@ -239,6 +240,10 @@ pub type DurationMillisecondBuilder = PrimitiveBuilder; pub type DurationMicrosecondBuilder = PrimitiveBuilder; pub type DurationNanosecondBuilder = PrimitiveBuilder; +// --------------------- Array Iterator --------------------- + +pub use self::iterator::*; + // --------------------- Array Equality --------------------- pub use self::equal::ArrayEqual; diff --git a/rust/arrow/src/compute/kernels/cast.rs b/rust/arrow/src/compute/kernels/cast.rs index d8cb480a80c..7a0dcd36433 100644 --- a/rust/arrow/src/compute/kernels/cast.rs +++ b/rust/arrow/src/compute/kernels/cast.rs @@ -378,7 +378,7 @@ pub fn cast(array: &ArrayRef, to_type: &DataType) -> Result { (Time32(from_unit), Time64(to_unit)) => { let time_array = Int32Array::from(array.data()); // note: (numeric_cast + SIMD multiply) is faster than (cast & multiply) - let c: Int64Array = numeric_cast(&time_array)?; + let c: Int64Array = numeric_cast(&time_array); let from_size = time_unit_multiple(&from_unit); let to_size = time_unit_multiple(&to_unit); // from is only smaller than to if 64milli/64second don't exist @@ -590,37 +590,27 @@ where FROM::Native: num::NumCast, TO::Native: num::NumCast, { - numeric_cast::( + Ok(Arc::new(numeric_cast::( from.as_any() .downcast_ref::>() .unwrap(), - ) - .map(|to| Arc::new(to) as ArrayRef) + ))) } /// Natural cast between numeric types -fn numeric_cast(from: &PrimitiveArray) -> Result> +fn numeric_cast(from: &PrimitiveArray) -> PrimitiveArray where T: ArrowNumericType, R: ArrowNumericType, T::Native: num::NumCast, R::Native: num::NumCast, { - let mut b = PrimitiveBuilder::::new(from.len()); - - for i in 0..from.len() { - if from.is_null(i) { - b.append_null()?; - } else { - // some casts return None, such as a negative value to u{8|16|32|64} - match num::cast::cast(from.value(i)) { - Some(v) => b.append_value(v)?, - None => b.append_null()?, - }; - } - } - - Ok(b.finish()) + from.iter() + .map(|v| match v { + Some(v) => num::cast::cast::(v), + None => None, + }) + .collect() } /// Cast numeric types to Utf8 @@ -661,28 +651,27 @@ fn cast_string_to_numeric(from: &ArrayRef) -> Result where TO: ArrowNumericType, { - string_to_numeric_cast::(from.as_any().downcast_ref::().unwrap()) - .map(|to| Arc::new(to) as ArrayRef) + Ok(Arc::new(string_to_numeric_cast::( + from.as_any().downcast_ref::().unwrap(), + ))) } -fn string_to_numeric_cast(from: &StringArray) -> Result> +fn string_to_numeric_cast(from: &StringArray) -> PrimitiveArray where T: ArrowNumericType, { - let mut b = PrimitiveBuilder::::new(from.len()); - - for i in 0..from.len() { - if from.is_null(i) { - b.append_null()?; - } else { - match from.value(i).parse::() { - Ok(v) => b.append_value(v)?, - _ => b.append_null()?, - }; - } - } - - Ok(b.finish()) + (0..from.len()) + .map(|i| { + if from.is_null(i) { + None + } else { + match from.value(i).parse::() { + Ok(v) => Some(v), + Err(_) => None, + } + } + }) + .collect() } /// Cast numeric types to Boolean @@ -727,32 +716,28 @@ where TO: ArrowNumericType, TO::Native: num::cast::NumCast, { - bool_to_numeric_cast::(from.as_any().downcast_ref::().unwrap()) - .map(|to| Arc::new(to) as ArrayRef) + Ok(Arc::new(bool_to_numeric_cast::( + from.as_any().downcast_ref::().unwrap(), + ))) } -fn bool_to_numeric_cast(from: &BooleanArray) -> Result> +fn bool_to_numeric_cast(from: &BooleanArray) -> PrimitiveArray where T: ArrowNumericType, T::Native: num::NumCast, { - let mut b = PrimitiveBuilder::::new(from.len()); - - for i in 0..from.len() { - if from.is_null(i) { - b.append_null()?; - } else if from.value(i) { - // a workaround to cast a primitive to T::Native, infallible - match num::cast::cast(1) { - Some(v) => b.append_value(v)?, - None => b.append_null()?, - }; - } else { - b.append_value(T::default_value())?; - } - } - - Ok(b.finish()) + (0..from.len()) + .map(|i| { + if from.is_null(i) { + None + } else if from.value(i) { + // a workaround to cast a primitive to T::Native, infallible + num::cast::cast(1) + } else { + Some(T::default_value()) + } + }) + .collect() } #[cfg(test)] diff --git a/rust/arrow/src/csv/reader.rs b/rust/arrow/src/csv/reader.rs index 0f36648c506..029f66c9870 100644 --- a/rust/arrow/src/csv/reader.rs +++ b/rust/arrow/src/csv/reader.rs @@ -50,7 +50,7 @@ use std::sync::Arc; use csv as csv_crate; -use crate::array::{ArrayRef, PrimitiveBuilder, StringBuilder}; +use crate::array::{ArrayRef, PrimitiveArray, StringBuilder}; use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; @@ -417,34 +417,38 @@ impl Reader { rows: &[StringRecord], col_idx: usize, ) -> Result { - let mut builder = PrimitiveBuilder::::new(rows.len()); let is_boolean_type = *self.schema.field(col_idx).data_type() == DataType::Boolean; - for (row_index, row) in rows.iter().enumerate() { - match row.get(col_idx) { - Some(s) if !s.is_empty() => { - let t = if is_boolean_type { - s.to_lowercase().parse::() - } else { - s.parse::() - }; - match t { - Ok(v) => builder.append_value(v)?, - Err(_) => { - // TODO: we should surface the underlying error here. - return Err(ArrowError::ParseError(format!( + + rows.iter() + .enumerate() + .map(|(row_index, row)| { + match row.get(col_idx) { + Some(s) => { + if s.is_empty() { + return Ok(None); + } + let parsed = if is_boolean_type { + s.to_lowercase().parse::() + } else { + s.parse::() + }; + match parsed { + Ok(e) => Ok(Some(e)), + Err(_) => Err(ArrowError::ParseError(format!( + // TODO: we should surface the underlying error here. "Error while parsing value {} for column {} at line {}", s, col_idx, self.line_number + row_index - ))); + ))), } } + None => Ok(None), } - _ => builder.append_null()?, - } - } - Ok(Arc::new(builder.finish())) + }) + .collect::>>() + .map(|e| Arc::new(e) as ArrayRef) } } diff --git a/rust/arrow/src/json/reader.rs b/rust/arrow/src/json/reader.rs index a270052e8e9..aaec8459102 100644 --- a/rust/arrow/src/json/reader.rs +++ b/rust/arrow/src/json/reader.rs @@ -671,22 +671,15 @@ impl Reader { T: ArrowNumericType, T::Native: num::NumCast, { - let mut builder = PrimitiveBuilder::::new(rows.len()); - for row in rows { - if let Some(value) = row.get(&col_name) { - // check that value is of expected datatype - match value.as_f64() { - Some(v) => match num::cast::cast(v) { - Some(v) => builder.append_value(v)?, - None => builder.append_null()?, - }, - None => builder.append_null()?, - } - } else { - builder.append_null()?; - } - } - Ok(Arc::new(builder.finish())) + Ok(Arc::new( + rows.iter() + .map(|row| { + row.get(&col_name) + .and_then(|value| value.as_f64()) + .and_then(num::cast::cast) + }) + .collect::>(), + )) } fn build_list_array(