4 changes: 3 additions & 1 deletion rust/arrow/src/ipc/convert.rs
@@ -334,7 +334,9 @@ pub(crate) fn build_field<'a: 'b, 'b>(

let mut field_builder = ipc::FieldBuilder::new(fbb);
field_builder.add_name(fb_field_name);
fb_dictionary.map(|dictionary| field_builder.add_dictionary(dictionary));
if let Some(dictionary) = fb_dictionary {
field_builder.add_dictionary(dictionary)
}
field_builder.add_type_type(field_type.type_type);
field_builder.add_nullable(field.is_nullable());
match field_type.children {
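For context, the change above replaces `Option::map` used purely for its side effect with `if let`, the form clippy's `option_map_unit_fn` lint recommends. A minimal standalone sketch of the pattern (hypothetical names, not the flatbuffer builder types above):

```rust
fn main() {
    let maybe_dictionary: Option<i32> = Some(42);

    // `maybe_dictionary.map(|d| println!("{}", d))` would build and discard
    // an `Option<()>` purely for the side effect, which clippy flags as
    // `option_map_unit_fn`. `if let` states the intent directly:
    if let Some(d) = maybe_dictionary {
        println!("dictionary = {}", d);
    }
}
```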
106 changes: 80 additions & 26 deletions rust/parquet/src/arrow/array_reader.rs
@@ -29,16 +29,20 @@ use arrow::array::{
Int16BufferBuilder, StructArray,
};
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::{DataType as ArrowType, DateUnit, Field, IntervalUnit, TimeUnit};
use arrow::datatypes::{
DataType as ArrowType, DateUnit, Field, IntervalUnit, Schema, TimeUnit,
};

use crate::arrow::converter::{
BinaryArrayConverter, BinaryConverter, BoolConverter, BooleanArrayConverter,
Converter, Date32Converter, FixedLenBinaryConverter, FixedSizeArrayConverter,
Float32Converter, Float64Converter, Int16Converter, Int32Converter, Int64Converter,
Int8Converter, Int96ArrayConverter, Int96Converter, Time32MillisecondConverter,
Time32SecondConverter, Time64MicrosecondConverter, Time64NanosecondConverter,
TimestampMicrosecondConverter, TimestampMillisecondConverter, UInt16Converter,
UInt32Converter, UInt64Converter, UInt8Converter, Utf8ArrayConverter, Utf8Converter,
Int8Converter, Int96ArrayConverter, Int96Converter, LargeBinaryArrayConverter,
LargeBinaryConverter, LargeUtf8ArrayConverter, LargeUtf8Converter,
Time32MillisecondConverter, Time32SecondConverter, Time64MicrosecondConverter,
Time64NanosecondConverter, TimestampMicrosecondConverter,
TimestampMillisecondConverter, UInt16Converter, UInt32Converter, UInt64Converter,
UInt8Converter, Utf8ArrayConverter, Utf8Converter,
};
use crate::arrow::record_reader::RecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
@@ -612,6 +616,7 @@ impl ArrayReader for StructArrayReader {
/// Create array reader from parquet schema, column indices, and parquet file reader.
pub fn build_array_reader<T>(
parquet_schema: SchemaDescPtr,
arrow_schema: Schema,
column_indices: T,
file_reader: Rc<dyn FileReader>,
) -> Result<Box<dyn ArrayReader>>
@@ -650,13 +655,19 @@ where
fields: filtered_root_fields,
};

ArrayReaderBuilder::new(Rc::new(proj), Rc::new(leaves), file_reader)
.build_array_reader()
ArrayReaderBuilder::new(
Rc::new(proj),
Rc::new(arrow_schema),
Rc::new(leaves),
file_reader,
)
.build_array_reader()
}

/// Used to build array reader.
struct ArrayReaderBuilder {
root_schema: TypePtr,
arrow_schema: Rc<Schema>,
// Key: columns that need to be included in final array builder
// Value: column index in schema
columns_included: Rc<HashMap<*const Type, usize>>,
@@ -790,11 +801,13 @@ impl<'a> ArrayReaderBuilder {
/// Construct array reader builder.
fn new(
root_schema: TypePtr,
arrow_schema: Rc<Schema>,
columns_included: Rc<HashMap<*const Type, usize>>,
file_reader: Rc<dyn FileReader>,
) -> Self {
Self {
root_schema,
arrow_schema,
columns_included,
file_reader,
}
@@ -835,6 +848,12 @@ impl<'a> ArrayReaderBuilder {
self.file_reader.clone(),
)?);

let arrow_type = self
.arrow_schema
.field_with_name(cur_type.name())
.ok()
.map(|f| f.data_type());

match cur_type.get_physical_type() {
PhysicalType::BOOLEAN => Ok(Box::new(PrimitiveArrayReader::<BoolType>::new(
page_iterator,
@@ -866,21 +885,43 @@
)),
PhysicalType::BYTE_ARRAY => {
if cur_type.get_basic_info().logical_type() == LogicalType::UTF8 {
let converter = Utf8Converter::new(Utf8ArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
Utf8Converter,
>::new(
page_iterator, column_desc, converter
)?))
if let Some(ArrowType::LargeUtf8) = arrow_type {
let converter =
LargeUtf8Converter::new(LargeUtf8ArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
LargeUtf8Converter,
>::new(
page_iterator, column_desc, converter
)?))
} else {
let converter = Utf8Converter::new(Utf8ArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
Utf8Converter,
>::new(
page_iterator, column_desc, converter
)?))
}
} else {
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator, column_desc, converter
)?))
if let Some(ArrowType::LargeBinary) = arrow_type {
let converter =
LargeBinaryConverter::new(LargeBinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
LargeBinaryConverter,
>::new(
page_iterator, column_desc, converter
)?))
} else {
let converter = BinaryConverter::new(BinaryArrayConverter {});
Ok(Box::new(ComplexObjectArrayReader::<
ByteArrayType,
BinaryConverter,
>::new(
page_iterator, column_desc, converter
)?))
}
}
}
PhysicalType::FIXED_LEN_BYTE_ARRAY => {
@@ -918,11 +959,15 @@ impl<'a> ArrayReaderBuilder {

for child in cur_type.get_fields() {
if let Some(child_reader) = self.dispatch(child.clone(), context)? {
fields.push(Field::new(
child.name(),
child_reader.get_data_type().clone(),
child.is_optional(),
));
let field = match self.arrow_schema.field_with_name(child.name()) {
Ok(f) => f.to_owned(),
_ => Field::new(
child.name(),
child_reader.get_data_type().clone(),
child.is_optional(),
),
};
fields.push(field);
children_reader.push(child_reader);
}
}
@@ -945,6 +990,7 @@
mod tests {
use super::*;
use crate::arrow::converter::Utf8Converter;
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::basic::{Encoding, Type as PhysicalType};
use crate::column::page::{Page, PageReader};
use crate::data_type::{ByteArray, DataType, Int32Type, Int64Type};
@@ -1591,8 +1637,16 @@ mod tests {
let file = get_test_file("nulls.snappy.parquet");
let file_reader = Rc::new(SerializedFileReader::new(file).unwrap());

let file_metadata = file_reader.metadata().file_metadata();
let arrow_schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
file_metadata.key_value_metadata(),
)
.unwrap();

let array_reader = build_array_reader(
file_reader.metadata().file_metadata().schema_descr_ptr(),
arrow_schema,
vec![0usize].into_iter(),
file_reader,
)
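The core mechanism in `array_reader.rs` is the type hint: `build_array_reader` now receives the converted Arrow schema, and the builder looks each Parquet column up by name, keeping the default small-offset reader when the field is absent or not a large type. A minimal standalone sketch of that lookup, assuming only the `arrow` crate of this vintage:

```rust
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    let arrow_schema = Schema::new(vec![
        Field::new("name", DataType::LargeUtf8, true),
        Field::new("payload", DataType::Binary, true),
    ]);

    // Mirrors the lookup in the dispatch above: `field_with_name` returns a
    // `Result`, so missing fields become `None` and the default converter wins.
    for column in &["name", "payload", "missing"] {
        let hint = arrow_schema
            .field_with_name(column)
            .ok()
            .map(|f| f.data_type());

        let converter = match hint {
            Some(DataType::LargeUtf8) => "LargeUtf8Converter (i64 offsets)",
            Some(DataType::LargeBinary) => "LargeBinaryConverter (i64 offsets)",
            _ => "default converter (i32 offsets)",
        };
        println!("{} -> {}", column, converter);
    }
}
```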
36 changes: 28 additions & 8 deletions rust/parquet/src/arrow/arrow_reader.rs
@@ -19,7 +19,9 @@

use crate::arrow::array_reader::{build_array_reader, ArrayReader, StructArrayReader};
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
use crate::arrow::schema::{
parquet_to_arrow_schema_by_columns, parquet_to_arrow_schema_by_root_columns,
};
use crate::errors::{ParquetError, Result};
use crate::file::reader::FileReader;
use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef};
@@ -40,7 +42,12 @@ pub trait ArrowReader {

/// Read parquet schema and convert it into arrow schema.
/// This schema only includes columns identified by `column_indices`.
fn get_schema_by_columns<T>(&mut self, column_indices: T) -> Result<Schema>
/// To select leaf columns (i.e. `a.b.c` instead of `a`), set `leaf_columns = true`
fn get_schema_by_columns<T>(
&mut self,
column_indices: T,
leaf_columns: bool,
) -> Result<Schema>
where
T: IntoIterator<Item = usize>;

@@ -84,16 +91,28 @@ impl ArrowReader for ParquetFileArrowReader {
)
}

fn get_schema_by_columns<T>(&mut self, column_indices: T) -> Result<Schema>
fn get_schema_by_columns<T>(
&mut self,
column_indices: T,
leaf_columns: bool,
) -> Result<Schema>
where
T: IntoIterator<Item = usize>,
{
let file_metadata = self.file_reader.metadata().file_metadata();
parquet_to_arrow_schema_by_columns(
file_metadata.schema_descr(),
column_indices,
file_metadata.key_value_metadata(),
)
if leaf_columns {
parquet_to_arrow_schema_by_columns(
file_metadata.schema_descr(),
column_indices,
file_metadata.key_value_metadata(),
)
} else {
parquet_to_arrow_schema_by_root_columns(
file_metadata.schema_descr(),
column_indices,
file_metadata.key_value_metadata(),
)
}
}

fn get_record_reader(
@@ -123,6 +142,7 @@
.metadata()
.file_metadata()
.schema_descr_ptr(),
self.get_schema()?,
column_indices,
self.file_reader.clone(),
)?;
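A sketch of the new `leaf_columns` flag in use, assuming a local `data.parquet` (hypothetical) and that the `ArrowReader` trait is exported alongside `ParquetFileArrowReader`:

```rust
use std::fs::File;
use std::rc::Rc;

use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use parquet::file::reader::SerializedFileReader;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?;
    let file_reader = Rc::new(SerializedFileReader::new(file)?);
    let mut arrow_reader = ParquetFileArrowReader::new(file_reader);

    // `true`: indices address flattened leaf columns (e.g. `a.b.c`).
    let by_leaves = arrow_reader.get_schema_by_columns(vec![0, 1], true)?;

    // `false`: indices address root columns (all of `a`, children included).
    let by_roots = arrow_reader.get_schema_by_columns(vec![0], false)?;

    println!("leaves: {}\nroots: {}", by_leaves, by_roots);
    Ok(())
}
```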
4 changes: 2 additions & 2 deletions rust/parquet/src/arrow/arrow_writer.rs
@@ -1012,7 +1012,7 @@ mod tests {
}

#[test]
#[ignore] // Large Binary support isn't correct yet
#[ignore] // Large binary support isn't correct yet - buffers don't match
fn large_binary_single_column() {
let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
@@ -1035,7 +1035,7 @@
}

#[test]
#[ignore] // Large String support isn't correct yet - null_bitmap and buffers don't match
#[ignore] // Large string support isn't correct yet - null_bitmap doesn't match
fn large_string_single_column() {
let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
let raw_strs = raw_values.iter().map(|s| s.as_str());
52 changes: 48 additions & 4 deletions rust/parquet/src/arrow/converter.rs
@@ -21,8 +21,8 @@ use crate::data_type::{ByteArray, DataType, Int96};
use arrow::{
array::{
Array, ArrayRef, BinaryBuilder, BooleanArray, BooleanBufferBuilder,
BufferBuilderTrait, FixedSizeBinaryBuilder, StringBuilder,
TimestampNanosecondBuilder,
BufferBuilderTrait, FixedSizeBinaryBuilder, LargeBinaryBuilder,
LargeStringBuilder, StringBuilder, TimestampNanosecondBuilder,
},
datatypes::Time32MillisecondType,
};
@@ -38,8 +38,8 @@ use arrow::datatypes::{ArrowPrimitiveType, DataType as ArrowDataType};

use arrow::array::ArrayDataBuilder;
use arrow::array::{
BinaryArray, FixedSizeBinaryArray, PrimitiveArray, StringArray,
TimestampNanosecondArray,
BinaryArray, FixedSizeBinaryArray, LargeBinaryArray, LargeStringArray,
PrimitiveArray, StringArray, TimestampNanosecondArray,
};
use std::marker::PhantomData;

@@ -200,6 +200,27 @@ impl Converter<Vec<Option<ByteArray>>, StringArray> for Utf8ArrayConverter {
}
}

pub struct LargeUtf8ArrayConverter {}

impl Converter<Vec<Option<ByteArray>>, LargeStringArray> for LargeUtf8ArrayConverter {
fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<LargeStringArray> {
let data_size = source
.iter()
.map(|x| x.as_ref().map(|b| b.len()).unwrap_or(0))
.sum();

let mut builder = LargeStringBuilder::with_capacity(source.len(), data_size);
for v in source {
match v {
Some(array) => builder.append_value(array.as_utf8()?),
None => builder.append_null(),
}?
}

Ok(builder.finish())
}
}

pub struct BinaryArrayConverter {}

impl Converter<Vec<Option<ByteArray>>, BinaryArray> for BinaryArrayConverter {
@@ -216,6 +237,22 @@ impl Converter<Vec<Option<ByteArray>>, BinaryArray> for BinaryArrayConverter {
}
}

pub struct LargeBinaryArrayConverter {}

impl Converter<Vec<Option<ByteArray>>, LargeBinaryArray> for LargeBinaryArrayConverter {
fn convert(&self, source: Vec<Option<ByteArray>>) -> Result<LargeBinaryArray> {
let mut builder = LargeBinaryBuilder::new(source.len());
for v in source {
match v {
Some(array) => builder.append_value(array.data()),
None => builder.append_null(),
}?
}

Ok(builder.finish())
}
}

pub type BoolConverter<'a> = ArrayRefConverter<
&'a mut RecordReader<BoolType>,
BooleanArray,
@@ -246,8 +283,15 @@ pub type Float32Converter = CastConverter<ParquetFloatType, Float32Type, Float32Type>;
pub type Float64Converter = CastConverter<ParquetDoubleType, Float64Type, Float64Type>;
pub type Utf8Converter =
ArrayRefConverter<Vec<Option<ByteArray>>, StringArray, Utf8ArrayConverter>;
pub type LargeUtf8Converter =
ArrayRefConverter<Vec<Option<ByteArray>>, LargeStringArray, LargeUtf8ArrayConverter>;
pub type BinaryConverter =
ArrayRefConverter<Vec<Option<ByteArray>>, BinaryArray, BinaryArrayConverter>;
pub type LargeBinaryConverter = ArrayRefConverter<
Vec<Option<ByteArray>>,
LargeBinaryArray,
LargeBinaryArrayConverter,
>;
pub type Int96Converter =
ArrayRefConverter<Vec<Option<Int96>>, TimestampNanosecondArray, Int96ArrayConverter>;
pub type FixedLenBinaryConverter = ArrayRefConverter<
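The new converters can be exercised in isolation. A crate-internal sketch (the `converter` module is not necessarily re-exported publicly), relying on `ByteArray: From<&str>` as the existing tests do:

```rust
use arrow::array::Array;

use crate::arrow::converter::{Converter, LargeUtf8ArrayConverter};
use crate::data_type::ByteArray;
use crate::errors::Result;

#[test]
fn large_utf8_converter_roundtrip() -> Result<()> {
    let source = vec![
        Some(ByteArray::from("large")),
        None,
        Some(ByteArray::from("strings")),
    ];

    // i64 offsets: the value buffer of a `LargeStringArray` may grow past
    // the 2 GiB limit that i32 offsets impose on `StringArray`.
    let array = LargeUtf8ArrayConverter {}.convert(source)?;
    assert_eq!(array.len(), 3);
    assert!(array.is_null(1));
    assert_eq!(array.value(0), "large");
    Ok(())
}
```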
3 changes: 2 additions & 1 deletion rust/parquet/src/arrow/mod.rs
@@ -35,7 +35,7 @@
//!
//! println!("Converted arrow schema is: {}", arrow_reader.get_schema().unwrap());
//! println!("Arrow schema after projection is: {}",
//! arrow_reader.get_schema_by_columns(vec![2, 4, 6]).unwrap());
//! arrow_reader.get_schema_by_columns(vec![2, 4, 6], true).unwrap());
//!
//! let mut record_batch_reader = arrow_reader.get_record_reader(2048).unwrap();
//!
@@ -61,6 +61,7 @@ pub use self::arrow_reader::ParquetFileArrowReader;
pub use self::arrow_writer::ArrowWriter;
pub use self::schema::{
arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
parquet_to_arrow_schema_by_root_columns,
};

/// Schema metadata key used to store serialized Arrow IPC schema
1 change: 1 addition & 0 deletions rust/parquet/src/arrow/record_reader.rs
@@ -86,6 +86,7 @@ impl<'a, T> FatPtr<'a, T> {
self.ptr
}

#[allow(clippy::wrong_self_convention)]
fn to_slice_mut(&mut self) -> &mut [T] {
self.ptr
}
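For context, clippy's `wrong_self_convention` expects a `to_*` method not to take `&mut self`; `to_slice_mut` has to borrow mutably to hand out a mutable slice, so the lint is silenced rather than renaming the method. A minimal sketch of the same situation, with hypothetical names:

```rust
struct FatPtrLike<'a> {
    buf: &'a mut [u8],
}

impl<'a> FatPtrLike<'a> {
    // Without the allow, clippy flags a `to_*` method taking `&mut self`.
    #[allow(clippy::wrong_self_convention)]
    fn to_slice_mut(&mut self) -> &mut [u8] {
        self.buf
    }
}

fn main() {
    let mut data = [0u8; 4];
    let mut p = FatPtrLike { buf: &mut data };
    p.to_slice_mut()[0] = 7;
    assert_eq!(data[0], 7);
}
```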