Skip to content

Commit

Permalink
Improve performance of FixedLengthBinary decoding (#6220)
Browse files Browse the repository at this point in the history
* add set_from_bytes to ParquetValueType

* change naming of FLBA types so critcmp will work
  • Loading branch information
etseidl committed Aug 13, 2024
1 parent c1b3d98 commit 9c4a7e3
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 32 deletions.
7 changes: 5 additions & 2 deletions parquet/benches/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use criterion::*;
use half::f16;
use parquet::basic::Encoding;
use parquet::basic::{Encoding, Type as ParquetType};
use parquet::data_type::{
DataType, DoubleType, FixedLenByteArray, FixedLenByteArrayType, FloatType,
};
Expand All @@ -35,7 +35,10 @@ fn bench_typed<T: DataType>(
) {
let name = format!(
"dtype={}, encoding={:?}",
std::any::type_name::<T::T>(),
match T::get_physical_type() {
ParquetType::FIXED_LEN_BYTE_ARRAY => format!("FixedLenByteArray({type_length})"),
_ => std::any::type_name::<T::T>().to_string(),
},
encoding
);
let column_desc_ptr = ColumnDescPtr::new(ColumnDescriptor::new(
Expand Down
21 changes: 18 additions & 3 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,13 @@ pub(crate) mod private {

/// Return the value as an mutable Any to allow for downcasts without transmutation
fn as_mut_any(&mut self) -> &mut dyn std::any::Any;

/// Sets the value of this object from the provided [`Bytes`]
///
/// Only implemented for `ByteArray` and `FixedLenByteArray`. Will panic for other types.
fn set_from_bytes(&mut self, _data: Bytes) {
unimplemented!();
}
}

impl ParquetValueType for bool {
Expand Down Expand Up @@ -953,9 +960,7 @@ pub(crate) mod private {
return Err(eof_err!("Not enough bytes to decode"));
}

let val: &mut Self = val_array.as_mut_any().downcast_mut().unwrap();

val.set_data(data.slice(decoder.start..decoder.start + len));
val_array.set_data(data.slice(decoder.start..decoder.start + len));
decoder.start += len;
}
decoder.num_values -= num_values;
Expand Down Expand Up @@ -998,6 +1003,11 @@ pub(crate) mod private {
fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
self
}

#[inline]
fn set_from_bytes(&mut self, data: Bytes) {
self.set_data(data);
}
}

impl HeapSize for super::ByteArray {
Expand Down Expand Up @@ -1093,6 +1103,11 @@ pub(crate) mod private {
fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
self
}

#[inline]
fn set_from_bytes(&mut self, data: Bytes) {
self.set_data(data);
}
}

impl HeapSize for super::FixedLenByteArray {
Expand Down
23 changes: 3 additions & 20 deletions parquet/src/encodings/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -901,11 +901,7 @@ impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {

for item in buffer.iter_mut().take(num_values) {
let len = self.lengths[self.current_idx] as usize;

item.as_mut_any()
.downcast_mut::<ByteArray>()
.unwrap()
.set_data(data.slice(self.offset..self.offset + len));
item.set_from_bytes(data.slice(self.offset..self.offset + len));

self.offset += len;
self.current_idx += 1;
Expand Down Expand Up @@ -1029,7 +1025,7 @@ impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {

fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
match T::get_physical_type() {
ty @ Type::BYTE_ARRAY | ty @ Type::FIXED_LEN_BYTE_ARRAY => {
Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
let num_values = cmp::min(buffer.len(), self.num_values);
let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
for item in buffer.iter_mut().take(num_values) {
Expand All @@ -1051,20 +1047,7 @@ impl<T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
result.extend_from_slice(suffix);

let data = Bytes::from(result.clone());

match ty {
Type::BYTE_ARRAY => item
.as_mut_any()
.downcast_mut::<ByteArray>()
.unwrap()
.set_data(data),
Type::FIXED_LEN_BYTE_ARRAY => item
.as_mut_any()
.downcast_mut::<FixedLenByteArray>()
.unwrap()
.set_data(data),
_ => unreachable!(),
};
item.set_from_bytes(data);

self.previous_value = result;
self.current_idx += 1;
Expand Down
9 changes: 2 additions & 7 deletions parquet/src/encodings/decoding/byte_stream_split_decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use bytes::Bytes;

use crate::basic::{Encoding, Type};
use crate::data_type::private::ParquetValueType;
use crate::data_type::{DataType, FixedLenByteArray, SliceAsBytes};
use crate::data_type::{DataType, SliceAsBytes};
use crate::errors::{ParquetError, Result};

use super::Decoder;
Expand Down Expand Up @@ -234,12 +234,7 @@ impl<T: DataType> Decoder<T> for VariableWidthByteStreamSplitDecoder<T> {
for (i, bi) in buffer.iter_mut().enumerate().take(num_values) {
// Get a view into the data, without also copying the bytes
let data = bytes_with_data.slice(i * type_size..(i + 1) * type_size);
// TODO: perhaps add a `set_from_bytes` method to `DataType` to avoid downcasting
let bi = bi
.as_mut_any()
.downcast_mut::<FixedLenByteArray>()
.expect("Decoding fixed length byte array");
bi.set_data(data);
bi.set_from_bytes(data);
}

Ok(num_values)
Expand Down

0 comments on commit 9c4a7e3

Please sign in to comment.