diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml
index e94772d82e7..81aedfeb9c9 100644
--- a/rust/arrow/Cargo.toml
+++ b/rust/arrow/Cargo.toml
@@ -57,6 +57,7 @@ default = ["simd"]
 [dev-dependencies]
 criterion = "0.2"
 lazy_static = "1"
+flate2 = "1"
 
 [[bench]]
 name = "array_from_vec"
diff --git a/rust/arrow/src/array/array.rs b/rust/arrow/src/array/array.rs
index 8dcc81e084a..1756b35fb57 100644
--- a/rust/arrow/src/array/array.rs
+++ b/rust/arrow/src/array/array.rs
@@ -1544,6 +1544,35 @@ impl fmt::Debug for StructArray {
     }
 }
 
+impl From<(Vec<(Field, ArrayRef)>, Buffer, usize)> for StructArray {
+    fn from(triple: (Vec<(Field, ArrayRef)>, Buffer, usize)) -> Self {
+        let (field_types, field_values): (Vec<_>, Vec<_>) = triple.0.into_iter().unzip();
+
+        // Check the length of the child arrays
+        let length = field_values[0].len();
+        for i in 1..field_values.len() {
+            assert_eq!(
+                length,
+                field_values[i].len(),
+                "all child arrays of a StructArray must have the same length"
+            );
+            assert_eq!(
+                field_types[i].data_type(),
+                field_values[i].data().data_type(),
+                "the field data types must match the array data in a StructArray"
+            )
+        }
+
+        let data = ArrayData::builder(DataType::Struct(field_types))
+            .null_bit_buffer(triple.1)
+            .child_data(field_values.into_iter().map(|a| a.data()).collect())
+            .len(length)
+            .null_count(triple.2)
+            .build();
+        Self::from(data)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
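For orientation, a minimal, hypothetical sketch of how the new `From` impl might be exercised (the bitmap value and null count are illustrative; `Buffer::from` over a byte slice is the same construction the reader uses later in this diff):

    use std::sync::Arc;
    use arrow::array::{Array, ArrayRef, Int32Array, StructArray};
    use arrow::buffer::Buffer;
    use arrow::datatypes::{DataType, Field};

    let ints: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    // validity bitmaps are LSB-ordered: 0b00000101 marks rows 0 and 2 as valid
    let nulls = Buffer::from([0b00000101u8].as_ref());
    let strukt = StructArray::from((
        vec![(Field::new("a", DataType::Int32, true), ints)],
        nulls,
        1, // null count: row 1 is null
    ));
    assert_eq!(3, strukt.len());
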
diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs
index e4502b770e3..2f047965653 100644
--- a/rust/arrow/src/datatypes.rs
+++ b/rust/arrow/src/datatypes.rs
@@ -184,7 +184,8 @@ impl ArrowNativeType for u64 {
 
 impl ArrowNativeType for f32 {
     fn into_json_value(self) -> Option<Value> {
-        Number::from_f64(self as f64).map(|num| VNumber(num))
+        Number::from_f64(f64::round(self as f64 * 1000.0) / 1000.0)
+            .map(|num| VNumber(num))
     }
 }
 
diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs
index abcabd70ef0..97fcde01bbc 100644
--- a/rust/arrow/src/ipc/convert.rs
+++ b/rust/arrow/src/ipc/convert.rs
@@ -17,11 +17,12 @@
 
 //! Utilities for converting between IPC types and native Arrow types
 
-use crate::datatypes::DataType::*;
-use crate::datatypes::Schema;
+use crate::datatypes::{DataType, DateUnit, Field, Schema, TimeUnit};
 use crate::ipc;
 
-use flatbuffers::FlatBufferBuilder;
+use flatbuffers::{
+    FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset,
+};
 
 /// Serialize a schema in IPC format
 fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
@@ -30,21 +31,17 @@ fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
     let mut fields = vec![];
     for field in schema.fields() {
         let fb_field_name = fbb.create_string(field.name().as_str());
+        let (ipc_type_type, ipc_type, ipc_children) =
+            get_fb_field_type(field.data_type(), &mut fbb);
         let mut field_builder = ipc::FieldBuilder::new(&mut fbb);
         field_builder.add_name(fb_field_name);
-        let ipc_type = match field.data_type() {
-            Boolean => ipc::Type::Bool,
-            UInt8 | UInt16 | UInt32 | UInt64 => ipc::Type::Int,
-            Int8 | Int16 | Int32 | Int64 => ipc::Type::Int,
-            Float32 | Float64 => ipc::Type::FloatingPoint,
-            Utf8 => ipc::Type::Utf8,
-            Date32(_) | Date64(_) => ipc::Type::Date,
-            Time32(_) | Time64(_) => ipc::Type::Time,
-            Timestamp(_) => ipc::Type::Timestamp,
-            _ => ipc::Type::NONE,
-        };
-        field_builder.add_type_type(ipc_type);
+        field_builder.add_type_type(ipc_type_type);
         field_builder.add_nullable(field.is_nullable());
+        match ipc_children {
+            None => {}
+            Some(children) => field_builder.add_children(children),
+        };
+        field_builder.add_type_(ipc_type);
         fields.push(field_builder.finish());
     }
 
@@ -61,16 +58,367 @@ fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
     fbb
 }
 
+/// Convert an IPC Field to an Arrow Field
+impl<'a> From<ipc::Field<'a>> for Field {
+    fn from(field: ipc::Field) -> Field {
+        Field::new(
+            field.name().unwrap(),
+            get_data_type(field),
+            field.nullable(),
+        )
+    }
+}
+
+/// Deserialize a Schema table from IPC format to a Schema data type
+pub fn fb_to_schema(fb: ipc::Schema) -> Schema {
+    let mut fields: Vec<Field> = vec![];
+    let c_fields = fb.fields().unwrap();
+    let len = c_fields.len();
+    for i in 0..len {
+        let c_field: ipc::Field = c_fields.get(i);
+        fields.push(c_field.into());
+    }
+    Schema::new(fields)
+}
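A minimal sketch of the round trip these two functions now support, mirroring the rewritten test at the end of this file:

    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let fbb = schema_to_fb(&schema);
    let ipc_schema = ipc::get_root_as_schema(fbb.finished_data());
    assert_eq!(schema, fb_to_schema(ipc_schema));
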
+/// Get the Arrow data type from the flatbuffer Field table
+fn get_data_type(field: ipc::Field) -> DataType {
+    match field.type_type() {
+        ipc::Type::Bool => DataType::Boolean,
+        ipc::Type::Int => {
+            let int = field.type__as_int().unwrap();
+            match (int.bitWidth(), int.is_signed()) {
+                (8, true) => DataType::Int8,
+                (8, false) => DataType::UInt8,
+                (16, true) => DataType::Int16,
+                (16, false) => DataType::UInt16,
+                (32, true) => DataType::Int32,
+                (32, false) => DataType::UInt32,
+                (64, true) => DataType::Int64,
+                (64, false) => DataType::UInt64,
+                _ => panic!("Unexpected bit width and signedness"),
+            }
+        }
+        ipc::Type::Binary => DataType::Binary,
+        ipc::Type::Utf8 => DataType::Utf8,
+        ipc::Type::FixedSizeBinary => {
+            let fsb = field.type__as_fixed_size_binary().unwrap();
+            DataType::FixedSizeBinary(fsb.byteWidth())
+        }
+        ipc::Type::FloatingPoint => {
+            let float = field.type__as_floating_point().unwrap();
+            match float.precision() {
+                ipc::Precision::HALF => DataType::Float16,
+                ipc::Precision::SINGLE => DataType::Float32,
+                ipc::Precision::DOUBLE => DataType::Float64,
+            }
+        }
+        ipc::Type::Date => {
+            let date = field.type__as_date().unwrap();
+            match date.unit() {
+                ipc::DateUnit::DAY => DataType::Date32(DateUnit::Day),
+                ipc::DateUnit::MILLISECOND => DataType::Date64(DateUnit::Millisecond),
+            }
+        }
+        ipc::Type::Time => {
+            let time = field.type__as_time().unwrap();
+            match (time.bitWidth(), time.unit()) {
+                (32, ipc::TimeUnit::SECOND) => DataType::Time32(TimeUnit::Second),
+                (32, ipc::TimeUnit::MILLISECOND) => {
+                    DataType::Time32(TimeUnit::Millisecond)
+                }
+                (64, ipc::TimeUnit::MICROSECOND) => {
+                    DataType::Time64(TimeUnit::Microsecond)
+                }
+                (64, ipc::TimeUnit::NANOSECOND) => DataType::Time64(TimeUnit::Nanosecond),
+                z @ _ => panic!(
+                    "Time type with bit width of {} and unit of {:?} not supported",
+                    z.0, z.1
+                ),
+            }
+        }
+        ipc::Type::Timestamp => {
+            let timestamp = field.type__as_timestamp().unwrap();
+            match timestamp.unit() {
+                ipc::TimeUnit::SECOND => DataType::Timestamp(TimeUnit::Second),
+                ipc::TimeUnit::MILLISECOND => DataType::Timestamp(TimeUnit::Millisecond),
+                ipc::TimeUnit::MICROSECOND => DataType::Timestamp(TimeUnit::Microsecond),
+                ipc::TimeUnit::NANOSECOND => DataType::Timestamp(TimeUnit::Nanosecond),
+            }
+        }
+        ipc::Type::List => {
+            let children = field.children().unwrap();
+            if children.len() != 1 {
+                panic!("expect a list to have one child")
+            }
+            let child_field = children.get(0);
+            // recurse into the child to get the list's value data type
+            DataType::List(Box::new(get_data_type(child_field)))
+        }
+        ipc::Type::FixedSizeList => {
+            let children = field.children().unwrap();
+            if children.len() != 1 {
+                panic!("expect a list to have one child")
+            }
+            let child_field = children.get(0);
+            let fsl = field.type__as_fixed_size_list().unwrap();
+            DataType::FixedSizeList((
+                Box::new(get_data_type(child_field)),
+                fsl.listSize(),
+            ))
+        }
+        ipc::Type::Struct_ => {
+            let mut fields = vec![];
+            if let Some(children) = field.children() {
+                for i in 0..children.len() {
+                    fields.push(children.get(i).into());
+                }
+            };
+
+            DataType::Struct(fields)
+        }
+        // TODO add interval support
+        t @ _ => unimplemented!("Type {:?} not supported", t),
+    }
+}
+/// Get the IPC type of a data type
+fn get_fb_field_type<'a: 'b, 'b>(
+    data_type: &DataType,
+    mut fbb: &mut FlatBufferBuilder<'a>,
+) -> (
+    ipc::Type,
+    WIPOffset<UnionWIPOffset>,
+    Option<WIPOffset<Vector<'b, ForwardsUOffset<ipc::Field<'b>>>>>,
+) {
+    use DataType::*;
+    match data_type {
+        Boolean => (
+            ipc::Type::Bool,
+            ipc::BoolBuilder::new(&mut fbb).finish().as_union_value(),
+            None,
+        ),
+        UInt8 | UInt16 | UInt32 | UInt64 => {
+            let mut builder = ipc::IntBuilder::new(&mut fbb);
+            builder.add_is_signed(false);
+            match data_type {
+                UInt8 => builder.add_bitWidth(8),
+                UInt16 => builder.add_bitWidth(16),
+                UInt32 => builder.add_bitWidth(32),
+                UInt64 => builder.add_bitWidth(64),
+                _ => {}
+            };
+            (ipc::Type::Int, builder.finish().as_union_value(), None)
+        }
+        Int8 | Int16 | Int32 | Int64 => {
+            let mut builder = ipc::IntBuilder::new(&mut fbb);
+            builder.add_is_signed(true);
+            match data_type {
+                Int8 => builder.add_bitWidth(8),
+                Int16 => builder.add_bitWidth(16),
+                Int32 => builder.add_bitWidth(32),
+                Int64 => builder.add_bitWidth(64),
+                _ => {}
+            };
+            (ipc::Type::Int, builder.finish().as_union_value(), None)
+        }
+        Float16 | Float32 | Float64 => {
+            let mut builder = ipc::FloatingPointBuilder::new(&mut fbb);
+            match data_type {
+                Float16 => builder.add_precision(ipc::Precision::HALF),
+                Float32 => builder.add_precision(ipc::Precision::SINGLE),
+                Float64 => builder.add_precision(ipc::Precision::DOUBLE),
+                _ => {}
+            };
+            (
+                ipc::Type::FloatingPoint,
+                builder.finish().as_union_value(),
+                None,
+            )
+        }
+        Utf8 => (
+            ipc::Type::Utf8,
+            ipc::Utf8Builder::new(&mut fbb).finish().as_union_value(),
+            None,
+        ),
+        Date32(_) => {
+            let mut builder = ipc::DateBuilder::new(&mut fbb);
+            builder.add_unit(ipc::DateUnit::DAY);
+            (ipc::Type::Date, builder.finish().as_union_value(), None)
+        }
+        Date64(_) => {
+            let mut builder = ipc::DateBuilder::new(&mut fbb);
+            builder.add_unit(ipc::DateUnit::MILLISECOND);
+            (ipc::Type::Date, builder.finish().as_union_value(), None)
+        }
+        Time32(unit) | Time64(unit) => {
+            let mut builder = ipc::TimeBuilder::new(&mut fbb);
+            match unit {
+                TimeUnit::Second => {
+                    builder.add_bitWidth(32);
+                    builder.add_unit(ipc::TimeUnit::SECOND);
+                }
+                TimeUnit::Millisecond => {
+                    builder.add_bitWidth(32);
+                    builder.add_unit(ipc::TimeUnit::MILLISECOND);
+                }
+                TimeUnit::Microsecond => {
+                    builder.add_bitWidth(64);
+                    builder.add_unit(ipc::TimeUnit::MICROSECOND);
+                }
+                TimeUnit::Nanosecond => {
+                    builder.add_bitWidth(64);
+                    builder.add_unit(ipc::TimeUnit::NANOSECOND);
+                }
+            }
+            (ipc::Type::Time, builder.finish().as_union_value(), None)
+        }
+        Timestamp(unit) => {
+            let mut builder = ipc::TimestampBuilder::new(&mut fbb);
+            let time_unit = match unit {
+                TimeUnit::Second => ipc::TimeUnit::SECOND,
+                TimeUnit::Millisecond => ipc::TimeUnit::MILLISECOND,
+                TimeUnit::Microsecond => ipc::TimeUnit::MICROSECOND,
+                TimeUnit::Nanosecond => ipc::TimeUnit::NANOSECOND,
+            };
+            builder.add_unit(time_unit);
+            (
+                ipc::Type::Timestamp,
+                builder.finish().as_union_value(),
+                None,
+            )
+        }
+        List(ref list_type) => {
+            let inner_types = get_fb_field_type(list_type, &mut fbb);
+            let child = ipc::Field::create(
+                &mut fbb,
+                &ipc::FieldArgs {
+                    name: None,
+                    nullable: false,
+                    type_type: inner_types.0,
+                    type_: Some(inner_types.1),
+                    dictionary: None,
+                    children: inner_types.2,
+                    custom_metadata: None,
+                },
+            );
+            let children = fbb.create_vector(&[child]);
+            (
+                ipc::Type::List,
+                ipc::ListBuilder::new(&mut fbb).finish().as_union_value(),
+                Some(children),
+            )
+        }
+        Struct(fields) => {
+            // struct's fields are children
+            let mut children = vec![];
+            for field in fields {
+                let inner_types = get_fb_field_type(field.data_type(), &mut fbb);
+                let field_name = fbb.create_string(field.name());
+                children.push(ipc::Field::create(
+                    &mut fbb,
+                    &ipc::FieldArgs {
+                        name: Some(field_name),
+                        nullable: field.is_nullable(),
+                        type_type: inner_types.0,
+                        type_: Some(inner_types.1),
+                        dictionary: None,
+                        children: inner_types.2,
+                        custom_metadata: None,
+                    },
+                ));
+            }
+            let children = fbb.create_vector(&children[..]);
+            (
+                ipc::Type::Struct_,
+                ipc::Struct_Builder::new(&mut fbb).finish().as_union_value(),
+                Some(children),
+            )
+        }
+        t @ _ => panic!("Unsupported Arrow Data Type {:?}", t),
+    }
+}
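As a sketch of the contract: primitive types return `None` for children, while nested types carry their value types as child `Field`s (this assumes the generated `ipc::Type` derives `PartialEq`, as flatbuffers enums do):

    let mut fbb = FlatBufferBuilder::new();
    let (type_type, _type_offset, children) =
        get_fb_field_type(&DataType::List(Box::new(DataType::Utf8)), &mut fbb);
    assert_eq!(ipc::Type::List, type_type);
    assert!(children.is_some()); // the list's value type travels as a child Field
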
Field::new("time32[ms]", DataType::Time32(TimeUnit::Millisecond), false), + Field::new("time64[us]", DataType::Time64(TimeUnit::Microsecond), false), + Field::new("time64[ns]", DataType::Time64(TimeUnit::Nanosecond), true), + Field::new("timestamp[s]", DataType::Timestamp(TimeUnit::Second), false), + Field::new( + "timestamp[ms]", + DataType::Timestamp(TimeUnit::Millisecond), + true, + ), + Field::new( + "timestamp[us]", + DataType::Timestamp(TimeUnit::Microsecond), + false, + ), + Field::new( + "timestamp[ns]", + DataType::Timestamp(TimeUnit::Nanosecond), + true, + ), + Field::new("utf8", DataType::Utf8, false), + Field::new("list[u8]", DataType::List(Box::new(DataType::UInt8)), true), + Field::new( + "list[struct]", + DataType::List(Box::new(DataType::Struct(vec![ + Field::new("float32", DataType::UInt8, false), + Field::new("int32", DataType::Int32, true), + Field::new("bool", DataType::Boolean, true), + ]))), + false, + ), + Field::new( + "struct]>]>", + DataType::Struct(vec![ + Field::new("int64", DataType::Int64, true), + Field::new( + "list[struct]>]", + DataType::List(Box::new(DataType::Struct(vec![ + Field::new("date32", DataType::Date32(DateUnit::Day), true), + Field::new( + "list[struct<>]", + DataType::List(Box::new(DataType::Struct(vec![]))), + false, + ), + ]))), + false, + ), + ]), + false, + ), + Field::new("struct<>", DataType::Struct(vec![]), true), + ]); + + let fb = schema_to_fb(&schema); - let ipc = schema_to_fb(&schema); - assert_eq!(60, ipc.finished_data().len()); + // read back fields + let ipc = ipc::get_root_as_schema(fb.finished_data()); + let schema2 = fb_to_schema(ipc); + assert_eq!(schema, schema2); } } diff --git a/rust/arrow/src/ipc/file/mod.rs b/rust/arrow/src/ipc/file/mod.rs new file mode 100644 index 00000000000..49535903ab5 --- /dev/null +++ b/rust/arrow/src/ipc/file/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod reader; diff --git a/rust/arrow/src/ipc/file/reader.rs b/rust/arrow/src/ipc/file/reader.rs new file mode 100644 index 00000000000..e01d08b2ae1 --- /dev/null +++ b/rust/arrow/src/ipc/file/reader.rs @@ -0,0 +1,569 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
diff --git a/rust/arrow/src/ipc/file/reader.rs b/rust/arrow/src/ipc/file/reader.rs
new file mode 100644
index 00000000000..e01d08b2ae1
--- /dev/null
+++ b/rust/arrow/src/ipc/file/reader.rs
@@ -0,0 +1,569 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Arrow File Reader
+
+use std::io::{BufReader, Read, Seek, SeekFrom};
+use std::sync::Arc;
+
+use crate::array::*;
+use crate::buffer::Buffer;
+use crate::compute::cast;
+use crate::datatypes::{DataType, Schema, SchemaRef};
+use crate::error::{ArrowError, Result};
+use crate::ipc;
+use crate::record_batch::{RecordBatch, RecordBatchReader};
+use DataType::*;
+
+static ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
+
+/// Read a buffer based on offset and length
+fn read_buffer(buf: &ipc::Buffer, a_data: &Vec<u8>) -> Buffer {
+    let start_offset = buf.offset() as usize;
+    let end_offset = start_offset + buf.length() as usize;
+    let buf_data = &a_data[start_offset..end_offset];
+    Buffer::from(&buf_data)
+}
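The validity (null) bitmaps sliced out by read_buffer are LSB-ordered, one bit per row; a self-contained illustration of how a bit maps to a row:

    // bit i of the validity bitmap marks row i as non-null
    fn is_valid(bitmap: &[u8], i: usize) -> bool {
        bitmap[i / 8] & (1 << (i % 8)) != 0
    }

    assert!(is_valid(&[0b0000_0101], 0));
    assert!(!is_valid(&[0b0000_0101], 1));
    assert!(is_valid(&[0b0000_0101], 2));
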
+/// Coordinates reading arrays based on data types.
+///
+/// Notes:
+/// * In the IPC format, null buffers are always set, but may be empty.
+///   We discard them if an array has 0 nulls
+/// * Numeric values inside list arrays are often stored as 64-bit values
+///   regardless of their data type size. We thus:
+///     - check if the bit width of non-64-bit numbers is 64, and
+///     - read the buffer as 64-bit (signed integer or float), and
+///     - cast the 64-bit array to the appropriate data type
+fn create_array(
+    nodes: &[ipc::FieldNode],
+    data_type: &DataType,
+    data: &Vec<u8>,
+    buffers: &[ipc::Buffer],
+    mut node_index: usize,
+    mut buffer_index: usize,
+) -> (ArrayRef, usize, usize) {
+    use DataType::*;
+    let array = match data_type {
+        Utf8 | Binary => {
+            let array = create_primitive_array(
+                &nodes[node_index],
+                data_type,
+                buffers[buffer_index..buffer_index + 3]
+                    .iter()
+                    .map(|buf| read_buffer(buf, data))
+                    .collect(),
+            );
+            node_index = node_index + 1;
+            buffer_index = buffer_index + 3;
+            array
+        }
+        FixedSizeBinary(_) => {
+            let array = create_primitive_array(
+                &nodes[node_index],
+                data_type,
+                buffers[buffer_index..buffer_index + 2]
+                    .iter()
+                    .map(|buf| read_buffer(buf, data))
+                    .collect(),
+            );
+            node_index = node_index + 1;
+            buffer_index = buffer_index + 2;
+            array
+        }
+        List(ref list_data_type) => {
+            let list_node = &nodes[node_index];
+            let list_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
+                .iter()
+                .map(|buf| read_buffer(buf, data))
+                .collect();
+            node_index = node_index + 1;
+            buffer_index = buffer_index + 2;
+            let triple = create_array(
+                nodes,
+                list_data_type,
+                data,
+                buffers,
+                node_index,
+                buffer_index,
+            );
+            node_index = triple.1;
+            buffer_index = triple.2;
+
+            create_list_array(list_node, data_type, &list_buffers[..], triple.0)
+        }
+        FixedSizeList((ref list_data_type, _)) => {
+            let list_node = &nodes[node_index];
+            let list_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 1]
+                .iter()
+                .map(|buf| read_buffer(buf, data))
+                .collect();
+            node_index = node_index + 1;
+            buffer_index = buffer_index + 1;
+            let triple = create_array(
+                nodes,
+                list_data_type,
+                data,
+                buffers,
+                node_index,
+                buffer_index,
+            );
+            node_index = triple.1;
+            buffer_index = triple.2;
+
+            create_list_array(list_node, data_type, &list_buffers[..], triple.0)
+        }
+        Struct(struct_fields) => {
+            let struct_node = &nodes[node_index];
+            let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data);
+            node_index = node_index + 1;
+            buffer_index = buffer_index + 1;
+
+            // read the arrays for each field
+            let mut struct_arrays = vec![];
+            // TODO investigate whether just knowing the number of buffers could
+            // still work
+            for struct_field in struct_fields {
+                let triple = create_array(
+                    nodes,
+                    struct_field.data_type(),
+                    data,
+                    buffers,
+                    node_index,
+                    buffer_index,
+                );
+                node_index = triple.1;
+                buffer_index = triple.2;
+                struct_arrays.push((struct_field.clone(), triple.0));
+            }
+            let null_count = struct_node.null_count() as usize;
+            let struct_array = if null_count > 0 {
+                // create struct array from fields, arrays and null data
+                StructArray::from((
+                    struct_arrays,
+                    null_buffer,
+                    struct_node.null_count() as usize,
+                ))
+            } else {
+                StructArray::from(struct_arrays)
+            };
+            Arc::new(struct_array)
+        }
+        _ => {
+            let array = create_primitive_array(
+                &nodes[node_index],
+                data_type,
+                buffers[buffer_index..buffer_index + 2]
+                    .iter()
+                    .map(|buf| read_buffer(buf, data))
+                    .collect(),
+            );
+            node_index = node_index + 1;
+            buffer_index = buffer_index + 2;
+            array
+        }
+    };
+    (array, node_index, buffer_index)
+}
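The 64-bit workaround described in the notes above leans on the compute::cast kernel; a minimal sketch of that last step in isolation (names are the ones imported at the top of this file):

    // data that arrived as 64-bit values, to be narrowed to the declared type
    let wide: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let narrow = cast(&wide, &DataType::Int32).unwrap();
    assert_eq!(&DataType::Int32, narrow.data().data_type());
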
+/// Reads the correct number of buffers based on data type and null_count, and creates a
+/// primitive array ref
+fn create_primitive_array(
+    field_node: &ipc::FieldNode,
+    data_type: &DataType,
+    buffers: Vec<Buffer>,
+) -> ArrayRef {
+    let length = field_node.length() as usize;
+    let null_count = field_node.null_count() as usize;
+    let array_data = match data_type {
+        Utf8 | Binary => {
+            // read 3 buffers
+            let mut builder = ArrayData::builder(data_type.clone())
+                .len(length)
+                .buffers(buffers[1..3].to_vec())
+                .offset(0);
+            if null_count > 0 {
+                builder = builder
+                    .null_count(null_count)
+                    .null_bit_buffer(buffers[0].clone())
+            }
+            builder.build()
+        }
+        FixedSizeBinary(_) => {
+            // read 2 buffers: validity and data
+            let mut builder = ArrayData::builder(data_type.clone())
+                .len(length)
+                .buffers(buffers[1..2].to_vec())
+                .offset(0);
+            if null_count > 0 {
+                builder = builder
+                    .null_count(null_count)
+                    .null_bit_buffer(buffers[0].clone())
+            }
+            builder.build()
+        }
+        Int8 | Int16 | Int32 | UInt8 | UInt16 | UInt32 | Time32(_) | Date32(_) => {
+            if buffers[1].len() / 8 == length {
+                // interpret as a signed i64, and cast appropriately
+                let mut builder = ArrayData::builder(DataType::Int64)
+                    .len(length)
+                    .buffers(buffers[1..].to_vec())
+                    .offset(0);
+                if null_count > 0 {
+                    builder = builder
+                        .null_count(null_count)
+                        .null_bit_buffer(buffers[0].clone())
+                }
+                let values = Arc::new(Int64Array::from(builder.build())) as ArrayRef;
+                let casted = cast(&values, data_type).unwrap();
+                casted.data()
+            } else {
+                let mut builder = ArrayData::builder(data_type.clone())
+                    .len(length)
+                    .buffers(buffers[1..].to_vec())
+                    .offset(0);
+                if null_count > 0 {
+                    builder = builder
+                        .null_count(null_count)
+                        .null_bit_buffer(buffers[0].clone())
+                }
+                builder.build()
+            }
+        }
+        Float32 => {
+            if buffers[1].len() / 8 == length {
+                // interpret as a f64, and cast appropriately
+                let mut builder = ArrayData::builder(DataType::Float64)
+                    .len(length)
+                    .buffers(buffers[1..].to_vec())
+                    .offset(0);
+                if null_count > 0 {
+                    builder = builder
+                        .null_count(null_count)
+                        .null_bit_buffer(buffers[0].clone())
+                }
+                let values = Arc::new(Float64Array::from(builder.build())) as ArrayRef;
+                let casted = cast(&values, data_type).unwrap();
+                casted.data()
+            } else {
+                let mut builder = ArrayData::builder(data_type.clone())
+                    .len(length)
+                    .buffers(buffers[1..].to_vec())
+                    .offset(0);
+                if null_count > 0 {
+                    builder = builder
+                        .null_count(null_count)
+                        .null_bit_buffer(buffers[0].clone())
+                }
+                builder.build()
+            }
+        }
+        Boolean | Int64 | UInt64 | Float64 | Time64(_) | Timestamp(_) | Date64(_) => {
+            let mut builder = ArrayData::builder(data_type.clone())
+                .len(length)
+                .buffers(buffers[1..].to_vec())
+                .offset(0);
+            if null_count > 0 {
+                builder = builder
+                    .null_count(null_count)
+                    .null_bit_buffer(buffers[0].clone())
+            }
+            builder.build()
+        }
+        t @ _ => panic!("Data type {:?} either unsupported or not primitive", t),
+    };
+
+    make_array(array_data)
+}
+
+/// Reads the correct number of buffers based on list type and null_count, and creates a
+/// list array ref
+fn create_list_array(
+    field_node: &ipc::FieldNode,
+    data_type: &DataType,
+    buffers: &[Buffer],
+    child_array: ArrayRef,
+) -> ArrayRef {
+    if let &DataType::List(_) = data_type {
+        let null_count = field_node.null_count() as usize;
+        let mut builder = ArrayData::builder(data_type.clone())
+            .len(field_node.length() as usize)
+            .buffers(buffers[1..2].to_vec())
+            .offset(0)
+            .child_data(vec![child_array.data()]);
+        if null_count > 0 {
+            builder = builder
+                .null_count(null_count)
+                .null_bit_buffer(buffers[0].clone())
+        }
+        make_array(builder.build())
+    } else if let &DataType::FixedSizeList(_) = data_type {
+        let null_count = field_node.null_count() as usize;
+        let mut builder = ArrayData::builder(data_type.clone())
+            .len(field_node.length() as usize)
+            // fixed-size lists have no offsets buffer, so no value buffers remain
+            .buffers(buffers[1..1].to_vec())
+            .offset(0)
+            .child_data(vec![child_array.data()]);
+        if null_count > 0 {
+            builder = builder
+                .null_count(null_count)
+                .null_bit_buffer(buffers[0].clone())
+        }
+        make_array(builder.build())
+    } else {
+        panic!("Cannot create list array from {:?}", data_type)
+    }
+}
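For intuition on the two buffers a variable-sized list consumes (validity plus i32 value offsets), a plain-Rust illustration of the offset arithmetic:

    // offsets hold length + 1 entries; element i spans child[offsets[i]..offsets[i + 1]]
    let offsets = [0i32, 2, 3]; // encodes the list [[1, 2], [3]]
    let child = [1i32, 2, 3];
    let row0 = &child[offsets[0] as usize..offsets[1] as usize];
    assert_eq!(row0, [1, 2]);
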
+/// Creates a record batch from binary data using the `ipc::RecordBatch` indexes and the `Schema`
+fn read_record_batch(
+    buf: &Vec<u8>,
+    batch: ipc::RecordBatch,
+    schema: Arc<Schema>,
+) -> Result<Option<RecordBatch>> {
+    let buffers = batch.buffers().unwrap();
+    let field_nodes = batch.nodes().unwrap();
+    // keep track of buffer and node index, the functions that create arrays mutate these
+    let mut buffer_index = 0;
+    let mut node_index = 0;
+    let mut arrays = vec![];
+
+    // keep track of index as lists require more than one node
+    for field in schema.fields() {
+        let triple = create_array(
+            field_nodes,
+            field.data_type(),
+            &buf,
+            buffers,
+            node_index,
+            buffer_index,
+        );
+        node_index = triple.1;
+        buffer_index = triple.2;
+        arrays.push(triple.0);
+    }
+
+    RecordBatch::try_new(schema.clone(), arrays).map(|batch| Some(batch))
+}
+
+/// Arrow File reader
+pub struct Reader<R: Read + Seek> {
+    /// Buffered reader that supports reading and seeking
+    reader: BufReader<R>,
+    /// The schema that is read from the file header
+    schema: Arc<Schema>,
+    /// The blocks in the file
+    ///
+    /// A block indicates the regions in the file to read to get data
+    blocks: Vec<ipc::Block>,
+    /// A counter to keep track of the current block that should be read
+    current_block: usize,
+    /// The total number of blocks, which may contain record batches and other types
+    total_blocks: usize,
+}
+
+impl<R: Read + Seek> Reader<R> {
+    /// Try to create a new reader
+    ///
+    /// Returns errors if the file does not meet the Arrow Format header and footer
+    /// requirements
+    pub fn try_new(reader: R) -> Result<Self> {
+        let mut reader = BufReader::new(reader);
+        // check if header and footer contain correct magic bytes
+        let mut magic_buffer: [u8; 6] = [0; 6];
+        reader.read_exact(&mut magic_buffer)?;
+        if magic_buffer != ARROW_MAGIC {
+            return Err(ArrowError::IoError(
+                "Arrow file does not contain correct header".to_string(),
+            ));
+        }
+        reader.seek(SeekFrom::End(-6))?;
+        reader.read_exact(&mut magic_buffer)?;
+        if magic_buffer != ARROW_MAGIC {
+            return Err(ArrowError::IoError(
+                "Arrow file does not contain correct footer".to_string(),
+            ));
+        }
+        reader.seek(SeekFrom::Start(8))?;
+        // determine metadata length
+        let mut meta_size: [u8; 4] = [0; 4];
+        reader.read_exact(&mut meta_size)?;
+        let meta_len = u32::from_le_bytes(meta_size);
+
+        let mut meta_buffer = vec![0; meta_len as usize];
+        reader.seek(SeekFrom::Start(12))?;
+        reader.read_exact(&mut meta_buffer)?;
+
+        let vecs = &meta_buffer.to_vec();
+        let message = ipc::get_root_as_message(vecs);
+        // message header is a Schema, so read it
+        let ipc_schema: ipc::Schema = message.header_as_schema().unwrap();
+        let schema = ipc::convert::fb_to_schema(ipc_schema);
+
+        // read the footer length from the end of the file
+        let mut footer_size: [u8; 4] = [0; 4];
+        reader.seek(SeekFrom::End(-10))?;
+        reader.read_exact(&mut footer_size)?;
+        let footer_len = u32::from_le_bytes(footer_size);
+
+        // read footer
+        let mut footer_data = vec![0; footer_len as usize];
+        reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
+        reader.read_exact(&mut footer_data)?;
+        let footer = ipc::get_root_as_footer(&footer_data[..]);
+
+        let blocks = footer.recordBatches().unwrap();
+
+        let total_blocks = blocks.len();
+
+        Ok(Self {
+            reader,
+            schema: Arc::new(schema),
+            blocks: blocks.to_vec(),
+            current_block: 0,
+            total_blocks,
+        })
+    }
+
+    /// Return the number of batches in the file
+    pub fn num_batches(&self) -> usize {
+        self.total_blocks
+    }
+
+    /// Return the schema of the file
+    pub fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// Read the next record batch
+    pub fn next(&mut self) -> Result<Option<RecordBatch>> {
+        // get current block
+        if self.current_block < self.total_blocks {
+            let block = self.blocks[self.current_block];
+            self.current_block = self.current_block + 1;
+
+            // the block's metadata length includes its 4-byte length prefix; skip past it
+            let meta_len = block.metaDataLength() - 4;
+
+            let mut block_data = vec![0; meta_len as usize];
+            self.reader
+                .seek(SeekFrom::Start(block.offset() as u64 + 4))?;
+            self.reader.read_exact(&mut block_data)?;
+
+            let message = ipc::get_root_as_message(&block_data[..]);
+
+            match message.header_type() {
+                ipc::MessageHeader::Schema => {
+                    panic!("Not expecting a schema when messages are read")
+                }
+                ipc::MessageHeader::RecordBatch => {
+                    let batch = message.header_as_record_batch().unwrap();
+                    // read the block that makes up the record batch into a buffer
+                    let mut buf = vec![0; block.bodyLength() as usize];
+                    self.reader.seek(SeekFrom::Start(
+                        block.offset() as u64 + block.metaDataLength() as u64,
+                    ))?;
+                    self.reader.read_exact(&mut buf)?;
+
+                    read_record_batch(&buf, batch, self.schema())
+                }
+                _ => unimplemented!(
+                    "reading types other than record batches not yet supported"
+                ),
+            }
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// Read a specific record batch
+    ///
+    /// Sets the current block to the index, allowing random reads
+    pub fn set_index(&mut self, index: usize) -> Result<()> {
+        if index >= self.total_blocks {
+            Err(ArrowError::IoError(format!(
+                "Cannot set batch to index {} from {} total batches",
+                index, self.total_blocks
+            )))
+        } else {
+            self.current_block = index;
+            Ok(())
+        }
+    }
+}
+
+impl<R: Read + Seek> RecordBatchReader for Reader<R> {
+    fn schema(&mut self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
+        self.next()
+    }
+}
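Putting the new public API together, reading a file end to end might look like this (the path is hypothetical; any Read + Seek source works):

    use std::fs::File;

    let file = File::open("example.arrow_file").unwrap();
    let mut reader = Reader::try_new(file).unwrap();
    println!("file contains {} batches", reader.num_batches());
    while let Some(batch) = reader.next().unwrap() {
        println!("read a batch with {} rows", batch.num_rows());
    }
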
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use flate2::read::GzDecoder;
+
+    use crate::util::integration_util::*;
+    use std::env;
+    use std::fs::File;
+
+    #[test]
+    fn read_generated_files() {
+        let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+        // the test is repetitive, thus we can read all supported files at once
+        let paths = vec![
+            // "generated_datetime",
+            "generated_nested",
+            "generated_primitive_no_batches",
+            "generated_primitive_zerolength",
+            "generated_primitive",
+        ];
+        paths.iter().for_each(|path| {
+            let file = File::open(format!(
+                "{}/arrow-ipc/integration/0.14.1/{}.arrow_file",
+                testdata, path
+            ))
+            .unwrap();
+
+            let mut reader = Reader::try_new(file).unwrap();
+
+            // read expected JSON output
+            let arrow_json = read_gzip_json(path);
+            assert!(arrow_json.equals_reader(&mut reader));
+        });
+    }
+
+    /// Read gzipped JSON file
+    fn read_gzip_json(path: &str) -> ArrowJson {
+        let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
+        let file = File::open(format!(
+            "{}/arrow-ipc/integration/0.14.1/{}.json.gz",
+            testdata, path
+        ))
+        .unwrap();
+        let mut gz = GzDecoder::new(&file);
+        let mut s = String::new();
+        gz.read_to_string(&mut s).unwrap();
+        // convert to Arrow JSON
+        let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
+        arrow_json
+    }
+}
diff --git a/rust/arrow/src/ipc/mod.rs b/rust/arrow/src/ipc/mod.rs
index c0220d7687e..41f8150c760 100644
--- a/rust/arrow/src/ipc/mod.rs
+++ b/rust/arrow/src/ipc/mod.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 pub mod convert;
+pub mod file;
 pub mod gen;
 
diff --git a/rust/arrow/src/memory.rs b/rust/arrow/src/memory.rs
index d73ddd11676..c941d34bfec 100644
--- a/rust/arrow/src/memory.rs
+++ b/rust/arrow/src/memory.rs
@@ -51,7 +51,6 @@ pub unsafe fn memcpy(dst: *mut u8, src: *const u8, len: usize) {
 }
 
 extern "C" {
-    #[inline]
     pub fn memcmp(p1: *const u8, p2: *const u8, len: usize) -> i32;
 }
 
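The integration_util.rs changes below teach the JSON comparison about fixed-size lists, whose rows are recovered by plain arithmetic rather than an offsets buffer; the slicing rule they implement, in isolation:

    // a fixed-size list has no offsets: row i spans child[i * size..(i + 1) * size]
    let size = 2;
    let child = [1i32, 2, 3, 4];
    let row1 = &child[1 * size..2 * size];
    assert_eq!(row1, [3, 4]);
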
diff --git a/rust/arrow/src/util/integration_util.rs b/rust/arrow/src/util/integration_util.rs
index 5a8a9f2f97d..bbdb5160235 100644
--- a/rust/arrow/src/util/integration_util.rs
+++ b/rust/arrow/src/util/integration_util.rs
@@ -24,11 +24,11 @@ use serde_json::Value;
 
 use crate::array::*;
 use crate::datatypes::*;
-use crate::record_batch::RecordBatch;
+use crate::record_batch::{RecordBatch, RecordBatchReader};
 
 /// A struct that represents an Arrow file with a schema and record batches
 #[derive(Deserialize)]
-struct ArrowJson {
+pub(crate) struct ArrowJson {
     schema: ArrowJsonSchema,
     batches: Vec<ArrowJsonBatch>,
 }
@@ -62,6 +62,22 @@ struct ArrowJsonColumn {
     children: Option<Vec<ArrowJsonColumn>>,
 }
 
+impl ArrowJson {
+    /// Compare the Arrow JSON with a record batch reader
+    pub fn equals_reader(&self, reader: &mut RecordBatchReader) -> bool {
+        if !self.schema.equals_schema(&reader.schema()) {
+            return false;
+        }
+        self.batches.iter().all(|col| {
+            let batch = reader.next_batch();
+            match batch {
+                Ok(Some(batch)) => col.equals_batch(&batch),
+                _ => false,
+            }
+        })
+    }
+}
+
 impl ArrowJsonSchema {
     /// Compare the Arrow JSON schema with the Arrow `Schema`
     fn equals_schema(&self, schema: &Schema) -> bool {
@@ -79,7 +95,7 @@ impl ArrowJsonSchema {
 }
 
 impl ArrowJsonBatch {
-    /// Comapre the Arrow JSON record batch with a `RecordBatch`
+    /// Compare the Arrow JSON record batch with a `RecordBatch`
     fn equals_batch(&self, batch: &RecordBatch) -> bool {
         if self.count != batch.num_rows() {
             return false;
         }
@@ -164,6 +180,11 @@ impl ArrowJsonBatch {
                 let arr = arr.as_any().downcast_ref::<ListArray>().unwrap();
                 arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
             }
+            DataType::FixedSizeList(_) => {
+                let arr =
+                    arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap();
+                arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
+            }
             DataType::Struct(_) => {
                 let arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
                 arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..])
@@ -178,6 +199,9 @@ impl ArrowJsonBatch {
 fn json_from_col(col: &ArrowJsonColumn, data_type: &DataType) -> Vec<Value> {
     match data_type {
         DataType::List(dt) => json_from_list_col(col, &**dt),
+        DataType::FixedSizeList((dt, list_size)) => {
+            json_from_fixed_size_list_col(col, &**dt, *list_size as usize)
+        }
         DataType::Struct(fields) => json_from_struct_col(col, fields),
         _ => merge_json_array(&col.validity, &col.data.clone().unwrap()),
     }
@@ -257,6 +281,36 @@ fn json_from_list_col(col: &ArrowJsonColumn, data_type: &DataType) -> Vec<Value>
     values
 }
 
+/// Convert an Arrow JSON column/array of a `DataType::FixedSizeList` into a vector of `Value`
+fn json_from_fixed_size_list_col(
+    col: &ArrowJsonColumn,
+    data_type: &DataType,
+    list_size: usize,
+) -> Vec<Value> {
+    let mut values = Vec::with_capacity(col.count);
+
+    // get the inner array
+    let child = &col.children.clone().expect("list type must have children")[0];
+    let inner = match data_type {
+        DataType::List(ref dt) => json_from_col(child, &**dt),
+        DataType::FixedSizeList((ref dt, _)) => json_from_col(child, &**dt),
+        DataType::Struct(fields) => json_from_struct_col(col, fields),
+        _ => merge_json_array(&child.validity, &child.data.clone().unwrap()),
+    };
+
+    for i in 0..col.count {
+        match col.validity[i] {
+            0 => values.push(Value::Null),
+            1 => values.push(Value::Array(
+                inner[(list_size * i)..(list_size * (i + 1))].to_vec(),
+            )),
+            _ => panic!("Validity data should be 0 or 1"),
+        }
+    }
+
+    values
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/rust/parquet/src/encodings/decoding.rs b/rust/parquet/src/encodings/decoding.rs
index 7dc97b65cf9..453d91cae83 100644
--- a/rust/parquet/src/encodings/decoding.rs
+++ b/rust/parquet/src/encodings/decoding.rs
@@ -657,10 +657,8 @@ impl<T: DataType> Decoder<T> for DeltaBitPackDecoder<T> {
 /// Helper trait to define specific conversions when decoding values
 trait DeltaBitPackDecoderConversion<T: DataType> {
     /// Sets decoded value based on type `T`.
-    #[inline]
     fn get_delta(&self, index: usize) -> i64;
 
-    #[inline]
     fn set_decoded_value(&self, buffer: &mut [T::T], index: usize, value: i64);
 }
 
diff --git a/rust/parquet/src/encodings/encoding.rs b/rust/parquet/src/encodings/encoding.rs
index 5aae81d242f..c9cad12e42d 100644
--- a/rust/parquet/src/encodings/encoding.rs
+++ b/rust/parquet/src/encodings/encoding.rs
@@ -417,7 +417,6 @@ impl<T: DataType> Encoder<T> for DictEncoder<T> {
 /// Provides encoded size for a data type.
 /// This is a workaround to calculate dictionary size in bytes.
 trait DictEncodedSize<T: DataType> {
-    #[inline]
     fn get_encoded_size(&self, value: &T::T) -> usize;
 }
 
@@ -752,16 +751,12 @@ impl<T: DataType> Encoder<T> for DeltaBitPackEncoder<T> {
 /// Helper trait to define specific conversions and subtractions when computing deltas
 trait DeltaBitPackEncoderConversion<T: DataType> {
     // Method should panic if type is not supported, otherwise no-op
-    #[inline]
     fn assert_supported_type();
 
-    #[inline]
    fn as_i64(&self, values: &[T::T], index: usize) -> i64;
 
-    #[inline]
     fn subtract(&self, left: i64, right: i64) -> i64;
 
-    #[inline]
     fn subtract_u64(&self, left: i64, right: i64) -> u64;
 }
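The #[inline] removals in this diff (here and in memory.rs above) silence rustc's "#[inline] is ignored on function prototypes" warning: the attribute has no effect on trait method declarations or extern prototypes, only on items with bodies. A hypothetical sketch of where the hint actually belongs (Lookup/Identity are illustrative names):

    trait Lookup {
        fn get(&self, i: usize) -> i64; // no #[inline] on the declaration
    }

    struct Identity;

    impl Lookup for Identity {
        #[inline] // the hint goes on the implementation instead
        fn get(&self, i: usize) -> i64 {
            i as i64
        }
    }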