diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs index 74b3719718e..7d5b5b864bb 100644 --- a/rust/parquet/src/arrow/arrow_reader.rs +++ b/rust/parquet/src/arrow/arrow_reader.rs @@ -233,7 +233,8 @@ mod tests { }; use crate::column::writer::get_typed_column_writer_mut; use crate::data_type::{ - BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArrayType, Int32Type, + BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, + FixedLenByteArrayType, Int32Type, }; use crate::errors::Result; use crate::file::properties::WriterProperties; @@ -331,10 +332,10 @@ mod tests { struct RandFixedLenGen {} impl RandGen for RandFixedLenGen { - fn gen(len: i32) -> ByteArray { + fn gen(len: i32) -> FixedLenByteArray { let mut v = vec![0u8; len as usize]; rand::thread_rng().fill_bytes(&mut v); - v.into() + ByteArray::from(v).into() } } diff --git a/rust/parquet/src/arrow/converter.rs b/rust/parquet/src/arrow/converter.rs index 33b29c897e6..94d6987a033 100644 --- a/rust/parquet/src/arrow/converter.rs +++ b/rust/parquet/src/arrow/converter.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use crate::data_type::{ByteArray, DataType, Int96}; +use crate::data_type::{ByteArray, DataType, FixedLenByteArray, Int96}; // TODO: clean up imports (best done when there are few moving parts) use arrow::array::{ Array, ArrayRef, BinaryBuilder, FixedSizeBinaryBuilder, LargeBinaryBuilder, @@ -57,8 +57,13 @@ impl FixedSizeArrayConverter { } } -impl Converter>, FixedSizeBinaryArray> for FixedSizeArrayConverter { - fn convert(&self, source: Vec>) -> Result { +impl Converter>, FixedSizeBinaryArray> + for FixedSizeArrayConverter +{ + fn convert( + &self, + source: Vec>, + ) -> Result { let mut builder = FixedSizeBinaryBuilder::new(source.len(), self.byte_width); for v in source { match v { @@ -277,7 +282,7 @@ pub type PrimitiveDictionaryConverter = ArrayRefConverter< pub type Int96Converter = ArrayRefConverter>, TimestampNanosecondArray, Int96ArrayConverter>; pub type FixedLenBinaryConverter = ArrayRefConverter< - Vec>, + Vec>, FixedSizeBinaryArray, FixedSizeArrayConverter, >; diff --git a/rust/parquet/src/column/writer.rs b/rust/parquet/src/column/writer.rs index 9e1188ff8fb..533a8e69a51 100644 --- a/rust/parquet/src/column/writer.rs +++ b/rust/parquet/src/column/writer.rs @@ -16,7 +16,7 @@ // under the License. //! Contains column writer API. -use std::{cmp, collections::VecDeque, convert::TryFrom, sync::Arc}; +use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData, sync::Arc}; use crate::basic::{Compression, Encoding, PageType, Type}; use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter}; @@ -196,6 +196,7 @@ pub struct ColumnWriterImpl { def_levels_sink: Vec, rep_levels_sink: Vec, data_pages: VecDeque, + _phantom: PhantomData, } impl ColumnWriterImpl { @@ -209,7 +210,7 @@ impl ColumnWriterImpl { // Optionally set dictionary encoder. 
let dict_encoder = if props.dictionary_enabled(descr.path()) - && Self::has_dictionary_support(&props) + && has_dictionary_support(T::get_physical_type(), &props) { Some(DictEncoder::new(descr.clone(), Arc::new(MemTracker::new()))) } else { @@ -224,7 +225,7 @@ impl ColumnWriterImpl { descr.clone(), props .encoding(descr.path()) - .unwrap_or_else(|| Self::fallback_encoding(&props)), + .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), &props)), Arc::new(MemTracker::new()), ) .unwrap(); @@ -259,6 +260,7 @@ impl ColumnWriterImpl { max_column_value: None, num_column_nulls: 0, column_distinct_count: None, + _phantom: PhantomData, } } @@ -955,81 +957,33 @@ impl ColumnWriterImpl { /// Trait to define default encoding for types, including whether or not the type /// supports dictionary encoding. trait EncodingWriteSupport { - /// Returns encoding for a column when no other encoding is provided in writer - /// properties. - fn fallback_encoding(props: &WriterProperties) -> Encoding; - /// Returns true if dictionary is supported for column writer, false otherwise. fn has_dictionary_support(props: &WriterProperties) -> bool; } -// Basic implementation, always falls back to PLAIN and supports dictionary. -impl EncodingWriteSupport for ColumnWriterImpl { - default fn fallback_encoding(_props: &WriterProperties) -> Encoding { - Encoding::PLAIN - } - - default fn has_dictionary_support(_props: &WriterProperties) -> bool { - true - } -} - -impl EncodingWriteSupport for ColumnWriterImpl { - fn fallback_encoding(props: &WriterProperties) -> Encoding { - match props.writer_version() { - WriterVersion::PARQUET_1_0 => Encoding::PLAIN, - WriterVersion::PARQUET_2_0 => Encoding::RLE, - } - } - - // Boolean column does not support dictionary encoding and should fall back to - // whatever fallback encoding is defined. 
- fn has_dictionary_support(_props: &WriterProperties) -> bool { - false - } -} - -impl EncodingWriteSupport for ColumnWriterImpl { - fn fallback_encoding(props: &WriterProperties) -> Encoding { - match props.writer_version() { - WriterVersion::PARQUET_1_0 => Encoding::PLAIN, - WriterVersion::PARQUET_2_0 => Encoding::DELTA_BINARY_PACKED, - } - } -} - -impl EncodingWriteSupport for ColumnWriterImpl { - fn fallback_encoding(props: &WriterProperties) -> Encoding { - match props.writer_version() { - WriterVersion::PARQUET_1_0 => Encoding::PLAIN, - WriterVersion::PARQUET_2_0 => Encoding::DELTA_BINARY_PACKED, +/// Returns encoding for a column when no other encoding is provided in writer properties. +fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding { + match (kind, props.writer_version()) { + (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE, + (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED, + (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED, + (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY, + (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => { + Encoding::DELTA_BYTE_ARRAY } + _ => Encoding::PLAIN, } } -impl EncodingWriteSupport for ColumnWriterImpl { - fn fallback_encoding(props: &WriterProperties) -> Encoding { - match props.writer_version() { - WriterVersion::PARQUET_1_0 => Encoding::PLAIN, - WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY, - } - } -} - -impl EncodingWriteSupport for ColumnWriterImpl { - fn fallback_encoding(props: &WriterProperties) -> Encoding { - match props.writer_version() { - WriterVersion::PARQUET_1_0 => Encoding::PLAIN, - WriterVersion::PARQUET_2_0 => Encoding::DELTA_BYTE_ARRAY, - } - } - - fn has_dictionary_support(props: &WriterProperties) -> bool { - match props.writer_version() { - // Dictionary encoding was not enabled in PARQUET 1.0 - WriterVersion::PARQUET_1_0 => false, - WriterVersion::PARQUET_2_0 => 
true, - } +/// Returns true if dictionary is supported for column writer, false otherwise. +fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool { + match (kind, props.writer_version()) { + // Booleans do not support dict encoding and should use a fallback encoding. + (Type::BOOLEAN, _) => false, + // Dictionary encoding was not enabled in PARQUET 1.0 + (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false, + (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true, + _ => true, } } @@ -1398,28 +1352,28 @@ mod tests { check_encoding_write_support::( WriterVersion::PARQUET_1_0, true, - &[ByteArray::from(vec![1u8])], + &[ByteArray::from(vec![1u8]).into()], None, &[Encoding::PLAIN, Encoding::RLE], ); check_encoding_write_support::( WriterVersion::PARQUET_1_0, false, - &[ByteArray::from(vec![1u8])], + &[ByteArray::from(vec![1u8]).into()], None, &[Encoding::PLAIN, Encoding::RLE], ); check_encoding_write_support::( WriterVersion::PARQUET_2_0, true, - &[ByteArray::from(vec![1u8])], + &[ByteArray::from(vec![1u8]).into()], Some(0), &[Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::RLE], ); check_encoding_write_support::( WriterVersion::PARQUET_2_0, false, - &[ByteArray::from(vec![1u8])], + &[ByteArray::from(vec![1u8]).into()], None, &[Encoding::DELTA_BYTE_ARRAY, Encoding::RLE], ); diff --git a/rust/parquet/src/data_type.rs b/rust/parquet/src/data_type.rs index 37d4189e2b1..92afc8a2e98 100644 --- a/rust/parquet/src/data_type.rs +++ b/rust/parquet/src/data_type.rs @@ -18,7 +18,9 @@ //! Data types that connect Parquet physical types with their Rust-specific //! representations. use std::cmp::Ordering; +use std::fmt; use std::mem; +use std::ops::{Deref, DerefMut}; use std::str::from_utf8; use byteorder::{BigEndian, ByteOrder}; @@ -46,12 +48,15 @@ impl Int96 { } /// Returns underlying data as slice of [`u32`]. 
+ #[inline] pub fn data(&self) -> &[u32] { - assert!(self.value.is_some()); - self.value.as_ref().unwrap() + self.value + .as_ref() + .expect("set_data should have been called") } /// Sets data for this INT96 type. + #[inline] pub fn set_data(&mut self, elem0: u32, elem1: u32, elem2: u32) { self.value = Some([elem0, elem1, elem2]); } @@ -95,6 +100,13 @@ impl From> for Int96 { } } +impl fmt::Display for Int96 { + #[cold] + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{:?}", self.data()) + } +} + /// Rust representation for BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY Parquet physical types. /// Value is backed by a byte buffer. #[derive(Clone, Debug)] @@ -127,36 +139,48 @@ impl PartialOrd for ByteArray { impl ByteArray { /// Creates new byte array with no data set. + #[inline] pub fn new() -> Self { ByteArray { data: None } } /// Gets length of the underlying byte buffer. + #[inline] pub fn len(&self) -> usize { assert!(self.data.is_some()); self.data.as_ref().unwrap().len() } /// Checks if the underlying buffer is empty. + #[inline] pub fn is_empty(&self) -> bool { self.len() == 0 } /// Returns slice of data. + #[inline] pub fn data(&self) -> &[u8] { - assert!(self.data.is_some()); - self.data.as_ref().unwrap().as_ref() + self.data + .as_ref() + .expect("set_data should have been called") + .as_ref() } /// Set data from another byte buffer. + #[inline] pub fn set_data(&mut self, data: ByteBufferPtr) { self.data = Some(data); } /// Returns `ByteArray` instance with slice of values for a data. 
+ #[inline] pub fn slice(&self, start: usize, len: usize) -> Self { - assert!(self.data.is_some()); - Self::from(self.data.as_ref().unwrap().range(start, len)) + Self::from( + self.data + .as_ref() + .expect("set_data should have been called") + .range(start, len), + ) } pub fn as_utf8(&self) -> Result<&str> { @@ -216,6 +240,98 @@ impl PartialEq for ByteArray { } } +impl fmt::Display for ByteArray { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{:?}", self.data()) + } +} + +/// Wrapper type for performance reasons, this represents `FIXED_LEN_BYTE_ARRAY` but in all other +/// considerations behaves the same as `ByteArray` +/// +/// # Performance notes: +/// This type is a little unfortunate, without it the compiler generates code that takes quite a +/// big hit on the CPU pipeline. Essentially the previous version stalls awaiting the result of +/// `T::get_physical_type() == Type::FIXED_LEN_BYTE_ARRAY`. +/// +/// It's debatable if this is wanted; it is out of spec for what parquet documents as its base +/// types, although there are code paths in the Rust (and potentially the C++) versions that +/// warrant this. +/// +/// With this wrapper type the compiler generates more targeted code paths matching the higher +/// level logical types, removing the data-hazard from all decoding and encoding paths. 
+#[repr(transparent)] +#[derive(Clone, Debug, Default)] +pub struct FixedLenByteArray(ByteArray); + +impl PartialEq for FixedLenByteArray { + fn eq(&self, other: &FixedLenByteArray) -> bool { + self.0.eq(&other.0) + } +} + +impl PartialEq for FixedLenByteArray { + fn eq(&self, other: &ByteArray) -> bool { + self.0.eq(other) + } +} + +impl PartialEq for ByteArray { + fn eq(&self, other: &FixedLenByteArray) -> bool { + self.eq(&other.0) + } +} + +impl fmt::Display for FixedLenByteArray { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.0.fmt(f) + } +} + +impl PartialOrd for FixedLenByteArray { + fn partial_cmp(&self, other: &FixedLenByteArray) -> Option { + self.0.partial_cmp(&other.0) + } +} + +impl PartialOrd for ByteArray { + fn partial_cmp(&self, other: &FixedLenByteArray) -> Option { + self.partial_cmp(&other.0) + } +} + +impl PartialOrd for FixedLenByteArray { + fn partial_cmp(&self, other: &ByteArray) -> Option { + self.0.partial_cmp(other) + } +} + +impl Deref for FixedLenByteArray { + type Target = ByteArray; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for FixedLenByteArray { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl From for FixedLenByteArray { + fn from(other: ByteArray) -> Self { + Self(other) + } +} + +impl From for ByteArray { + fn from(other: FixedLenByteArray) -> Self { + other.0 + } +} + /// Rust representation for Decimal values. /// /// This is not a representation of Parquet physical type, but rather a wrapper for @@ -327,7 +443,12 @@ pub trait AsBytes { pub trait SliceAsBytes: Sized { /// Returns slice of bytes for a slice of this data type. 
fn slice_as_bytes(self_: &[Self]) -> &[u8]; - fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8]; + /// Return the internal representation as a mutable slice + /// + /// # Safety + /// If modified you are _required_ to ensure the internal representation + /// is valid and correct for the actual raw data + unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8]; } impl AsBytes for [u8] { @@ -348,7 +469,9 @@ macro_rules! gen_as_bytes { } } } + impl SliceAsBytes for $source_ty { + #[inline] fn slice_as_bytes(self_: &[Self]) -> &[u8] { unsafe { std::slice::from_raw_parts( @@ -357,13 +480,13 @@ macro_rules! gen_as_bytes { ) } } - fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8] { - unsafe { - std::slice::from_raw_parts_mut( - self_.as_mut_ptr() as *mut u8, - std::mem::size_of::<$source_ty>() * self_.len(), - ) - } + + #[inline] + unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8] { + std::slice::from_raw_parts_mut( + self_.as_mut_ptr() as *mut u8, + std::mem::size_of::<$source_ty>() * self_.len(), + ) } } }; @@ -380,6 +503,26 @@ gen_as_bytes!(u64); gen_as_bytes!(f32); gen_as_bytes!(f64); +macro_rules! unimplemented_slice_as_bytes { + ($ty: ty) => { + impl SliceAsBytes for $ty { + fn slice_as_bytes(_self: &[Self]) -> &[u8] { + unimplemented!() + } + + unsafe fn slice_as_bytes_mut(_self: &mut [Self]) -> &mut [u8] { + unimplemented!() + } + } + }; +} + +// TODO - Can Int96 and bool be implemented in these terms? 
+unimplemented_slice_as_bytes!(Int96); +unimplemented_slice_as_bytes!(bool); +unimplemented_slice_as_bytes!(ByteArray); +unimplemented_slice_as_bytes!(FixedLenByteArray); + impl AsBytes for bool { fn as_bytes(&self) -> &[u8] { unsafe { std::slice::from_raw_parts(self as *const bool as *const u8, 1) } @@ -400,6 +543,12 @@ impl AsBytes for ByteArray { } } +impl AsBytes for FixedLenByteArray { + fn as_bytes(&self) -> &[u8] { + self.data() + } +} + impl AsBytes for Decimal { fn as_bytes(&self) -> &[u8] { self.data() @@ -424,16 +573,461 @@ impl AsBytes for str { } } -/// Contains the Parquet physical type information as well as the Rust primitive type -/// presentation. -pub trait DataType: 'static { - type T: std::cmp::PartialEq +pub(crate) mod private { + use crate::encodings::decoding::PlainDecoderDetails; + use crate::util::bit_util::{BitReader, BitWriter}; + use crate::util::memory::ByteBufferPtr; + + use byteorder::ByteOrder; + use std::convert::TryInto; + + use super::{ParquetError, Result, SliceAsBytes}; + + pub type BitIndex = u64; + + /// Sealed trait to start to remove specialisation from implementations + /// + /// This is done to force the associated value type to be unimplementable outside of this + /// crate, and thus hint to the type system (and end user) traits are public for the contract + /// and not for extension. 
pub trait ParquetValueType: + std::cmp::PartialEq + std::fmt::Debug + + std::fmt::Display + std::default::Default + std::clone::Clone - + AsBytes - + FromBytes - + PartialOrd; + + super::AsBytes + + super::FromBytes + + super::SliceAsBytes + + PartialOrd + { + /// Encode the value directly from a higher level encoder + fn encode( + values: &[Self], + writer: &mut W, + bit_writer: &mut BitWriter, + ) -> Result<()>; + + /// Establish the data that will be decoded in a buffer + fn set_data( + decoder: &mut PlainDecoderDetails, + data: ByteBufferPtr, + num_values: usize, + ); + + /// Decode the value from a given buffer for a higher level decoder + fn decode( + buffer: &mut [Self], + decoder: &mut PlainDecoderDetails, + ) -> Result; + + /// Return the encoded size for a type + fn dict_encoding_size(&self) -> (usize, usize) { + (std::mem::size_of::(), 1) + } + + /// Return the value as i64 if possible + /// + /// This is essentially the same as `std::convert::TryInto` but can + /// be implemented for `f32` and `f64`, types that would fail orphan rules + fn as_i64(&self) -> Result { + Err(general_err!("Type cannot be converted to i64")) + } + + /// Return the value as u64 if possible + /// + /// This is essentially the same as `std::convert::TryInto` but can + /// be implemented for `f32` and `f64`, types that would fail orphan rules + fn as_u64(&self) -> Result { + self.as_i64() + .map_err(|_| general_err!("Type cannot be converted to u64")) + .map(|x| x as u64) + } + + /// Return the value as an Any to allow for downcasts without transmutation + fn as_any(&self) -> &dyn std::any::Any; + + /// Return the value as a mutable Any to allow for downcasts without transmutation + fn as_mut_any(&mut self) -> &mut dyn std::any::Any; + } + + impl ParquetValueType for bool { + #[inline] + fn encode( + values: &[Self], + _: &mut W, + bit_writer: &mut BitWriter, + ) -> Result<()> { + for value in values { + bit_writer.put_value(*value as u64, 1); + } + Ok(()) + } + + #[inline] + fn 
set_data( + decoder: &mut PlainDecoderDetails, + data: ByteBufferPtr, + num_values: usize, + ) { + decoder.bit_reader.replace(BitReader::new(data)); + decoder.num_values = num_values; + } + + #[inline] + fn decode( + buffer: &mut [Self], + decoder: &mut PlainDecoderDetails, + ) -> Result { + let bit_reader = decoder.bit_reader.as_mut().unwrap(); + let num_values = std::cmp::min(buffer.len(), decoder.num_values); + let values_read = bit_reader.get_batch(&mut buffer[..num_values], 1); + decoder.num_values -= values_read; + Ok(values_read) + } + + #[inline] + fn as_i64(&self) -> Result { + Ok(*self as i64) + } + + #[inline] + fn as_any(&self) -> &dyn std::any::Any { + self + } + + #[inline] + fn as_mut_any(&mut self) -> &mut dyn std::any::Any { + self + } + } + + /// Hopelessly unsafe function that emulates `num::as_ne_bytes` + /// + /// It is not recommended to use this outside of this private module as, while it + /// _should_ work for primitive values, it is little better than a transmutation + /// and can act as a backdoor into mis-interpreting types as arbitrary byte slices + #[inline] + fn as_raw<'a, T>(value: *const T) -> &'a [u8] { + unsafe { + let value = value as *const u8; + std::slice::from_raw_parts(value, std::mem::size_of::()) + } + } + + macro_rules! 
impl_from_raw { + ($ty: ty, $self: ident => $as_i64: block) => { + impl ParquetValueType for $ty { + #[inline] + fn encode(values: &[Self], writer: &mut W, _: &mut BitWriter) -> Result<()> { + let raw = unsafe { + std::slice::from_raw_parts( + values.as_ptr() as *const u8, + std::mem::size_of::<$ty>() * values.len(), + ) + }; + writer.write_all(raw)?; + + Ok(()) + } + + #[inline] + fn set_data(decoder: &mut PlainDecoderDetails, data: ByteBufferPtr, num_values: usize) { + decoder.data.replace(data); + decoder.start = 0; + decoder.num_values = num_values; + } + + #[inline] + fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result { + let data = decoder.data.as_ref().expect("set_data should have been called"); + let num_values = std::cmp::min(buffer.len(), decoder.num_values); + let bytes_left = data.len() - decoder.start; + let bytes_to_decode = std::mem::size_of::() * num_values; + + if bytes_left < bytes_to_decode { + return Err(eof_err!("Not enough bytes to decode")); + } + + // SAFETY: Raw types should be as per the standard rust bit-vectors + unsafe { + let raw_buffer = &mut Self::slice_as_bytes_mut(buffer)[..bytes_to_decode]; + raw_buffer.copy_from_slice(data.range(decoder.start, bytes_to_decode).as_ref()); + }; + decoder.start += bytes_to_decode; + decoder.num_values -= num_values; + + Ok(num_values) + } + + #[inline] + fn as_i64(&$self) -> Result { + $as_i64 + } + + #[inline] + fn as_any(&self) -> &dyn std::any::Any { + self + } + + #[inline] + fn as_mut_any(&mut self) -> &mut dyn std::any::Any { + self + } + } + } + } + + impl_from_raw!(i32, self => { Ok(*self as i64) }); + impl_from_raw!(i64, self => { Ok(*self) }); + impl_from_raw!(f32, self => { Err(general_err!("Type cannot be converted to i64")) }); + impl_from_raw!(f64, self => { Err(general_err!("Type cannot be converted to i64")) }); + + impl ParquetValueType for super::Int96 { + #[inline] + fn encode( + values: &[Self], + writer: &mut W, + _: &mut BitWriter, + ) -> Result<()> { + 
for value in values { + let raw = unsafe { + std::slice::from_raw_parts( + value.data() as *const [u32] as *const u8, + 12, + ) + }; + writer.write_all(raw)?; + } + Ok(()) + } + + #[inline] + fn set_data( + decoder: &mut PlainDecoderDetails, + data: ByteBufferPtr, + num_values: usize, + ) { + decoder.data.replace(data); + decoder.start = 0; + decoder.num_values = num_values; + } + + #[inline] + fn decode( + buffer: &mut [Self], + decoder: &mut PlainDecoderDetails, + ) -> Result { + // TODO - Remove the duplication between this and the general slice method + let data = decoder + .data + .as_ref() + .expect("set_data should have been called"); + let num_values = std::cmp::min(buffer.len(), decoder.num_values); + let bytes_left = data.len() - decoder.start; + let bytes_to_decode = 12 * num_values; + + if bytes_left < bytes_to_decode { + return Err(eof_err!("Not enough bytes to decode")); + } + + let data_range = data.range(decoder.start, bytes_to_decode); + let bytes: &[u8] = data_range.data(); + decoder.start += bytes_to_decode; + + let mut pos = 0; // position in byte array + for i in 0..num_values { + let elem0 = byteorder::LittleEndian::read_u32(&bytes[pos..pos + 4]); + let elem1 = byteorder::LittleEndian::read_u32(&bytes[pos + 4..pos + 8]); + let elem2 = byteorder::LittleEndian::read_u32(&bytes[pos + 8..pos + 12]); + + buffer[i] + .as_mut_any() + .downcast_mut::() + .unwrap() + .set_data(elem0, elem1, elem2); + + pos += 12; + } + decoder.num_values -= num_values; + + Ok(num_values) + } + + #[inline] + fn as_any(&self) -> &dyn std::any::Any { + self + } + + #[inline] + fn as_mut_any(&mut self) -> &mut dyn std::any::Any { + self + } + } + + // TODO - Why does macro importing fail? + /// Reads `$size` of bytes from `$src`, and reinterprets them as type `$ty`, in + /// little-endian order. `$ty` must implement the `Default` trait. Otherwise this won't + /// compile. + /// This is copied and modified from byteorder crate. + macro_rules! 
read_num_bytes { + ($ty:ty, $size:expr, $src:expr) => {{ + assert!($size <= $src.len()); + let mut buffer = + <$ty as $crate::util::bit_util::FromBytes>::Buffer::default(); + buffer.as_mut()[..$size].copy_from_slice(&$src[..$size]); + <$ty>::from_ne_bytes(buffer) + }}; + } + + impl ParquetValueType for super::ByteArray { + #[inline] + fn encode( + values: &[Self], + writer: &mut W, + _: &mut BitWriter, + ) -> Result<()> { + for value in values { + let len: u32 = value.len().try_into().unwrap(); + writer.write_all(&len.to_ne_bytes())?; + let raw = value.data(); + writer.write_all(raw)?; + } + Ok(()) + } + + #[inline] + fn set_data( + decoder: &mut PlainDecoderDetails, + data: ByteBufferPtr, + num_values: usize, + ) { + decoder.data.replace(data); + decoder.start = 0; + decoder.num_values = num_values; + } + + #[inline] + fn decode( + buffer: &mut [Self], + decoder: &mut PlainDecoderDetails, + ) -> Result { + let data = decoder + .data + .as_mut() + .expect("set_data should have been called"); + let num_values = std::cmp::min(buffer.len(), decoder.num_values); + for i in 0..num_values { + let len: usize = + read_num_bytes!(u32, 4, data.start_from(decoder.start).as_ref()) + as usize; + decoder.start += std::mem::size_of::(); + + if data.len() < decoder.start + len { + return Err(eof_err!("Not enough bytes to decode")); + } + + let val: &mut Self = buffer[i].as_mut_any().downcast_mut().unwrap(); + + val.set_data(data.range(decoder.start, len)); + decoder.start += len; + } + decoder.num_values -= num_values; + + Ok(num_values) + } + + #[inline] + fn dict_encoding_size(&self) -> (usize, usize) { + (std::mem::size_of::(), self.len()) + } + + #[inline] + fn as_any(&self) -> &dyn std::any::Any { + self + } + + #[inline] + fn as_mut_any(&mut self) -> &mut dyn std::any::Any { + self + } + } + + impl ParquetValueType for super::FixedLenByteArray { + #[inline] + fn encode( + values: &[Self], + writer: &mut W, + _: &mut BitWriter, + ) -> Result<()> { + for value in values { + 
let raw = value.data(); + writer.write_all(raw)?; + } + Ok(()) + } + + #[inline] + fn set_data( + decoder: &mut PlainDecoderDetails, + data: ByteBufferPtr, + num_values: usize, + ) { + decoder.data.replace(data); + decoder.start = 0; + decoder.num_values = num_values; + } + + #[inline] + fn decode( + buffer: &mut [Self], + decoder: &mut PlainDecoderDetails, + ) -> Result { + assert!(decoder.type_length > 0); + + let data = decoder + .data + .as_mut() + .expect("set_data should have been called"); + let num_values = std::cmp::min(buffer.len(), decoder.num_values); + for i in 0..num_values { + let len = decoder.type_length as usize; + + if data.len() < decoder.start + len { + return Err(eof_err!("Not enough bytes to decode")); + } + + let val: &mut Self = buffer[i].as_mut_any().downcast_mut().unwrap(); + + val.set_data(data.range(decoder.start, len)); + decoder.start += len; + } + decoder.num_values -= num_values; + + Ok(num_values) + } + + #[inline] + fn dict_encoding_size(&self) -> (usize, usize) { + (std::mem::size_of::(), self.len()) + } + + #[inline] + fn as_any(&self) -> &dyn std::any::Any { + self + } + + #[inline] + fn as_mut_any(&mut self) -> &mut dyn std::any::Any { + self + } + } +} + +/// Contains the Parquet physical type information as well as the Rust primitive type +/// presentation. +pub trait DataType: 'static { + type T: private::ParquetValueType; /// Returns Parquet physical type. 
fn get_physical_type() -> Type; @@ -594,8 +1188,8 @@ make_type!( Type::FIXED_LEN_BYTE_ARRAY, FixedLenByteArrayColumnReader, FixedLenByteArrayColumnWriter, - ByteArray, - mem::size_of::() + FixedLenByteArray, + mem::size_of::() ); impl FromBytes for Int96 { @@ -632,6 +1226,30 @@ impl FromBytes for ByteArray { } } +impl FromBytes for FixedLenByteArray { + type Buffer = [u8; 8]; + + fn from_le_bytes(_bs: Self::Buffer) -> Self { + unreachable!() + } + fn from_be_bytes(_bs: Self::Buffer) -> Self { + unreachable!() + } + fn from_ne_bytes(bs: Self::Buffer) -> Self { + Self(ByteArray::from(bs.to_vec())) + } +} + +/// Macro to reduce repetition in making type assertions on the physical type against `T` +macro_rules! ensure_phys_ty { + ($($ty: pat)|+ , $err: literal) => { + match T::get_physical_type() { + $($ty => (),)* + _ => panic!($err), + }; + } +} + #[cfg(test)] #[allow(clippy::float_cmp, clippy::approx_constant)] mod tests { diff --git a/rust/parquet/src/encodings/decoding.rs b/rust/parquet/src/encodings/decoding.rs index 096394eeeb0..f1e98cc606d 100644 --- a/rust/parquet/src/encodings/decoding.rs +++ b/rust/parquet/src/encodings/decoding.rs @@ -21,9 +21,8 @@ use std::{cmp, marker::PhantomData, mem}; use super::rle::RleDecoder; -use byteorder::{ByteOrder, LittleEndian}; - use crate::basic::*; +use crate::data_type::private::*; use crate::data_type::*; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; @@ -129,25 +128,31 @@ pub fn get_decoder( // ---------------------------------------------------------------------- // PLAIN Decoding -/// Plain decoding that supports all types. -/// Values are encoded back to back. For native types, data is encoded as little endian. -/// Floating point types are encoded in IEEE. -/// See [`PlainEncoder`](crate::encoding::PlainEncoder) for more information. 
-pub struct PlainDecoder { +#[derive(Default)] +pub struct PlainDecoderDetails { // The remaining number of values in the byte array - num_values: usize, + pub(crate) num_values: usize, - // The current starting index in the byte array. - start: usize, + // The current starting index in the byte array. Not used when `T` is bool. + pub(crate) start: usize, // The length for the type `T`. Only used when `T` is `FixedLenByteArrayType` - type_length: i32, + pub(crate) type_length: i32, // The byte array to decode from. Not set if `T` is bool. - data: Option, + pub(crate) data: Option, // Read `data` bit by bit. Only set if `T` is bool. - bit_reader: Option, + pub(crate) bit_reader: Option, +} + +/// Plain decoding that supports all types. +/// Values are encoded back to back. For native types, data is encoded as little endian. +/// Floating point types are encoded in IEEE. +/// See [`PlainEncoder`](crate::encoding::PlainEncoder) for more information. +pub struct PlainDecoder { + // The binary details needed for decoding + inner: PlainDecoderDetails, // To allow `T` in the generic parameter for this struct. This doesn't take any // space. @@ -158,11 +163,13 @@ impl PlainDecoder { /// Creates new plain decoder. 
pub fn new(type_length: i32) -> Self { PlainDecoder { - data: None, - bit_reader: None, - type_length, - num_values: 0, - start: 0, + inner: PlainDecoderDetails { + type_length, + num_values: 0, + start: 0, + data: None, + bit_reader: None, + }, _phantom: PhantomData, } } @@ -170,16 +177,14 @@ impl PlainDecoder { impl Decoder for PlainDecoder { #[inline] - default fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> { - self.num_values = num_values; - self.start = 0; - self.data = Some(data); + fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> { + T::T::set_data(&mut self.inner, data, num_values); Ok(()) } #[inline] fn values_left(&self) -> usize { - self.num_values + self.inner.num_values } #[inline] @@ -187,125 +192,9 @@ impl Decoder for PlainDecoder { Encoding::PLAIN } - #[inline] - default fn get(&mut self, _buffer: &mut [T::T]) -> Result { - unreachable!() - } -} - -impl Decoder for PlainDecoder -where - T::T: SliceAsBytes, -{ #[inline] fn get(&mut self, buffer: &mut [T::T]) -> Result { - assert!(self.data.is_some()); - - let data = self.data.as_mut().unwrap(); - let num_values = cmp::min(buffer.len(), self.num_values); - let bytes_left = data.len() - self.start; - let bytes_to_decode = mem::size_of::() * num_values; - if bytes_left < bytes_to_decode { - return Err(eof_err!("Not enough bytes to decode")); - } - let raw_buffer = &mut T::T::slice_as_bytes_mut(buffer)[..bytes_to_decode]; - raw_buffer.copy_from_slice(data.range(self.start, bytes_to_decode).as_ref()); - self.start += bytes_to_decode; - self.num_values -= num_values; - - Ok(num_values) - } -} - -impl Decoder for PlainDecoder { - fn get(&mut self, buffer: &mut [Int96]) -> Result { - assert!(self.data.is_some()); - - let data = self.data.as_ref().unwrap(); - let num_values = cmp::min(buffer.len(), self.num_values); - let bytes_left = data.len() - self.start; - let bytes_to_decode = 12 * num_values; - if bytes_left < bytes_to_decode { - return 
Err(eof_err!("Not enough bytes to decode")); - } - - let data_range = data.range(self.start, bytes_to_decode); - let bytes: &[u8] = data_range.data(); - self.start += bytes_to_decode; - - let mut pos = 0; // position in byte array - for i in 0..num_values { - let elem0 = LittleEndian::read_u32(&bytes[pos..pos + 4]); - let elem1 = LittleEndian::read_u32(&bytes[pos + 4..pos + 8]); - let elem2 = LittleEndian::read_u32(&bytes[pos + 8..pos + 12]); - buffer[i].set_data(elem0, elem1, elem2); - pos += 12; - } - self.num_values -= num_values; - - Ok(num_values) - } -} - -impl Decoder for PlainDecoder { - fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> { - self.num_values = num_values; - self.bit_reader = Some(BitReader::new(data)); - Ok(()) - } - - default fn get(&mut self, buffer: &mut [bool]) -> Result { - assert!(self.bit_reader.is_some()); - - let bit_reader = self.bit_reader.as_mut().unwrap(); - let num_values = cmp::min(buffer.len(), self.num_values); - let values_read = bit_reader.get_batch::(&mut buffer[..num_values], 1); - self.num_values -= values_read; - - Ok(values_read) - } -} - -impl Decoder for PlainDecoder { - fn get(&mut self, buffer: &mut [ByteArray]) -> Result { - assert!(self.data.is_some()); - - let data = self.data.as_mut().unwrap(); - let num_values = cmp::min(buffer.len(), self.num_values); - for i in 0..num_values { - let len: usize = - read_num_bytes!(u32, 4, data.start_from(self.start).as_ref()) as usize; - self.start += mem::size_of::(); - if data.len() < self.start + len { - return Err(eof_err!("Not enough bytes to decode")); - } - buffer[i].set_data(data.range(self.start, len)); - self.start += len; - } - self.num_values -= num_values; - - Ok(num_values) - } -} - -impl Decoder for PlainDecoder { - fn get(&mut self, buffer: &mut [ByteArray]) -> Result { - assert!(self.data.is_some()); - assert!(self.type_length > 0); - - let data = self.data.as_mut().unwrap(); - let type_length = self.type_length as usize; - let 
num_values = cmp::min(buffer.len(), self.num_values); - for i in 0..num_values { - if data.len() < self.start + type_length { - return Err(eof_err!("Not enough bytes to decode")); - } - buffer[i].set_data(data.range(self.start, type_length)); - self.start += type_length; - } - self.num_values -= num_values; - - Ok(num_values) + T::T::decode(buffer, &mut self.inner) } } @@ -389,7 +278,7 @@ impl Decoder for DictDecoder { /// See [`RleValueEncoder`](crate::encoding::RleValueEncoder) for more information. pub struct RleValueDecoder { values_left: usize, - decoder: Option, + decoder: RleDecoder, _phantom: PhantomData, } @@ -397,39 +286,25 @@ impl RleValueDecoder { pub fn new() -> Self { Self { values_left: 0, - decoder: None, + decoder: RleDecoder::new(1), _phantom: PhantomData, } } +} +impl Decoder for RleValueDecoder { #[inline] - fn set_data_internal( - &mut self, - data: ByteBufferPtr, - num_values: usize, - ) -> Result<()> { + fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> { + // Only support RLE value reader for boolean values with bit width of 1. + ensure_phys_ty!(Type::BOOLEAN, "RleValueDecoder only supports BoolType"); + // We still need to remove prefix of i32 from the stream. 
- let i32_size = mem::size_of::(); - let data_size = read_num_bytes!(i32, i32_size, data.as_ref()) as usize; - let rle_decoder = self - .decoder - .as_mut() - .expect("RLE decoder is not initialized"); - rle_decoder.set_data(data.range(i32_size, data_size)); + const I32_SIZE: usize = mem::size_of::(); + let data_size = read_num_bytes!(i32, I32_SIZE, data.as_ref()) as usize; + self.decoder.set_data(data.range(I32_SIZE, data_size)); self.values_left = num_values; Ok(()) } -} - -impl Decoder for RleValueDecoder { - #[inline] - default fn set_data( - &mut self, - _data: ByteBufferPtr, - _num_values: usize, - ) -> Result<()> { - panic!("RleValueDecoder only supports BoolType"); - } #[inline] fn values_left(&self) -> usize { @@ -443,26 +318,13 @@ impl Decoder for RleValueDecoder { #[inline] fn get(&mut self, buffer: &mut [T::T]) -> Result { - let rle_decoder = self - .decoder - .as_mut() - .expect("RLE decoder is not initialized"); let num_values = cmp::min(buffer.len(), self.values_left); - let values_read = rle_decoder.get_batch(&mut buffer[..num_values])?; + let values_read = self.decoder.get_batch(&mut buffer[..num_values])?; self.values_left -= values_read; Ok(values_read) } } -impl Decoder for RleValueDecoder { - #[inline] - fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> { - // Only support RLE value reader for boolean values with bit width of 1. 
- self.decoder = Some(RleDecoder::new(1)); - self.set_data_internal(data, num_values) - } -} - // ---------------------------------------------------------------------- // DELTA_BINARY_PACKED Decoding @@ -578,13 +440,10 @@ impl DeltaBitPackDecoder { } } -impl Decoder for DeltaBitPackDecoder -where - T::T: FromBytes, -{ +impl Decoder for DeltaBitPackDecoder { // # of total values is derived from encoding #[inline] - default fn set_data(&mut self, data: ByteBufferPtr, _: usize) -> Result<()> { + fn set_data(&mut self, data: ByteBufferPtr, _index: usize) -> Result<()> { self.bit_reader = BitReader::new(data); self.initialized = true; @@ -618,7 +477,7 @@ where Ok(()) } - default fn get(&mut self, buffer: &mut [T::T]) -> Result { + fn get(&mut self, buffer: &mut [T::T]) -> Result { assert!(self.initialized, "Bit reader is not initialized"); let num_values = cmp::min(buffer.len(), self.num_values); @@ -678,38 +537,30 @@ trait DeltaBitPackDecoderConversion { } impl DeltaBitPackDecoderConversion for DeltaBitPackDecoder { - #[inline] - default fn get_delta(&self, _: usize) -> i64 { - panic!("DeltaBitPackDecoder only supports Int32Type and Int64Type") - } - - #[inline] - default fn set_decoded_value(&self, _: &mut [T::T], _: usize, _: i64) { - panic!("DeltaBitPackDecoder only supports Int32Type and Int64Type") - } -} - -impl DeltaBitPackDecoderConversion for DeltaBitPackDecoder { #[inline] fn get_delta(&self, index: usize) -> i64 { - self.deltas_in_mini_block[index] as i64 + ensure_phys_ty!( + Type::INT32 | Type::INT64, + "DeltaBitPackDecoder only supports Int32Type and Int64Type" + ); + self.deltas_in_mini_block[index].as_i64().unwrap() } #[inline] - fn set_decoded_value(&self, buffer: &mut [i32], index: usize, value: i64) { - buffer[index] = value as i32; - } -} + fn set_decoded_value(&self, buffer: &mut [T::T], index: usize, value: i64) { + match T::get_physical_type() { + Type::INT32 => { + let val = buffer[index].as_mut_any().downcast_mut::().unwrap(); -impl 
DeltaBitPackDecoderConversion for DeltaBitPackDecoder { - #[inline] - fn get_delta(&self, index: usize) -> i64 { - self.deltas_in_mini_block[index] - } + *val = value as i32; + } + Type::INT64 => { + let val = buffer[index].as_mut_any().downcast_mut::().unwrap(); - #[inline] - fn set_decoded_value(&self, buffer: &mut [i64], index: usize, value: i64) { - buffer[index] = value; + *val = value; + } + _ => panic!("DeltaBitPackDecoder only supports Int32Type and Int64Type"), + }; } } @@ -757,16 +608,54 @@ impl DeltaLengthByteArrayDecoder { } impl Decoder for DeltaLengthByteArrayDecoder { - default fn set_data(&mut self, _: ByteBufferPtr, _: usize) -> Result<()> { - Err(general_err!( - "DeltaLengthByteArrayDecoder only support ByteArrayType" - )) + fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> { + match T::get_physical_type() { + Type::BYTE_ARRAY => { + let mut len_decoder = DeltaBitPackDecoder::::new(); + len_decoder.set_data(data.all(), num_values)?; + let num_lengths = len_decoder.values_left(); + self.lengths.resize(num_lengths, 0); + len_decoder.get(&mut self.lengths[..])?; + + self.data = Some(data.start_from(len_decoder.get_offset())); + self.offset = 0; + self.current_idx = 0; + self.num_values = num_lengths; + Ok(()) + } + _ => Err(general_err!( + "DeltaLengthByteArrayDecoder only support ByteArrayType" + )), + } } - default fn get(&mut self, _: &mut [T::T]) -> Result { - Err(general_err!( - "DeltaLengthByteArrayDecoder only support ByteArrayType" - )) + fn get(&mut self, buffer: &mut [T::T]) -> Result { + match T::get_physical_type() { + Type::BYTE_ARRAY => { + assert!(self.data.is_some()); + + let data = self.data.as_ref().unwrap(); + let num_values = cmp::min(buffer.len(), self.num_values); + for i in 0..num_values { + let len = self.lengths[self.current_idx] as usize; + + buffer[i] + .as_mut_any() + .downcast_mut::() + .unwrap() + .set_data(data.range(self.offset, len)); + + self.offset += len; + self.current_idx += 1; + } + + 
self.num_values -= num_values; + Ok(num_values) + } + _ => Err(general_err!( + "DeltaLengthByteArrayDecoder only support ByteArrayType" + )), + } } fn values_left(&self) -> usize { @@ -778,38 +667,6 @@ impl Decoder for DeltaLengthByteArrayDecoder { } } -impl Decoder for DeltaLengthByteArrayDecoder { - fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> { - let mut len_decoder = DeltaBitPackDecoder::::new(); - len_decoder.set_data(data.all(), num_values)?; - let num_lengths = len_decoder.values_left(); - self.lengths.resize(num_lengths, 0); - len_decoder.get(&mut self.lengths[..])?; - - self.data = Some(data.start_from(len_decoder.get_offset())); - self.offset = 0; - self.current_idx = 0; - self.num_values = num_lengths; - Ok(()) - } - - fn get(&mut self, buffer: &mut [ByteArray]) -> Result { - assert!(self.data.is_some()); - - let data = self.data.as_ref().unwrap(); - let num_values = cmp::min(buffer.len(), self.num_values); - for i in 0..num_values { - let len = self.lengths[self.current_idx] as usize; - buffer[i].set_data(data.range(self.offset, len)); - self.offset += len; - self.current_idx += 1; - } - - self.num_values -= num_values; - Ok(num_values) - } -} - // ---------------------------------------------------------------------- // DELTA_BYTE_ARRAY Decoding @@ -855,16 +712,81 @@ impl DeltaByteArrayDecoder { } impl<'m, T: DataType> Decoder for DeltaByteArrayDecoder { - default fn set_data(&mut self, _: ByteBufferPtr, _: usize) -> Result<()> { - Err(general_err!( - "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType" - )) + fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> { + match T::get_physical_type() { + Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => { + let mut prefix_len_decoder = DeltaBitPackDecoder::::new(); + prefix_len_decoder.set_data(data.all(), num_values)?; + let num_prefixes = prefix_len_decoder.values_left(); + self.prefix_lengths.resize(num_prefixes, 0); + 
prefix_len_decoder.get(&mut self.prefix_lengths[..])?; + + let mut suffix_decoder = DeltaLengthByteArrayDecoder::new(); + suffix_decoder + .set_data(data.start_from(prefix_len_decoder.get_offset()), num_values)?; + self.suffix_decoder = Some(suffix_decoder); + self.num_values = num_prefixes; + self.current_idx = 0; + self.previous_value.clear(); + Ok(()) + } + _ => { + Err(general_err!( + "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType" + )) + } + } } - default fn get(&mut self, _: &mut [T::T]) -> Result { - Err(general_err!( - "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType" - )) + fn get(&mut self, buffer: &mut [T::T]) -> Result { + match T::get_physical_type() { + ty @ Type::BYTE_ARRAY | ty @ Type::FIXED_LEN_BYTE_ARRAY => { + let num_values = cmp::min(buffer.len(), self.num_values); + let mut v: [ByteArray; 1] = [ByteArray::new(); 1]; + for i in 0..num_values { + // Process suffix + // TODO: this is awkward - maybe we should add a non-vectorized API? 
+ let suffix_decoder = self.suffix_decoder.as_mut().expect("decoder not initialized"); + suffix_decoder.get(&mut v[..])?; + let suffix = v[0].data(); + + // Extract current prefix length, can be 0 + let prefix_len = self.prefix_lengths[self.current_idx] as usize; + + // Concatenate prefix with suffix + let mut result = Vec::new(); + result.extend_from_slice(&self.previous_value[0..prefix_len]); + result.extend_from_slice(suffix); + + let data = ByteBufferPtr::new(result.clone()); + + match ty { + Type::BYTE_ARRAY => buffer[i] + .as_mut_any() + .downcast_mut::() + .unwrap() + .set_data(data), + Type::FIXED_LEN_BYTE_ARRAY => buffer[i] + .as_mut_any() + .downcast_mut::() + .unwrap() + .set_data(data), + _ => unreachable!(), + }; + + self.previous_value = result; + self.current_idx += 1; + } + + self.num_values -= num_values; + Ok(num_values) + } + _ => { + Err(general_err!( + "DeltaByteArrayDecoder only supports ByteArrayType and FixedLenByteArrayType" + )) + } + } } fn values_left(&self) -> usize { @@ -876,69 +798,6 @@ impl<'m, T: DataType> Decoder for DeltaByteArrayDecoder { } } -impl Decoder for DeltaByteArrayDecoder { - fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> { - let mut prefix_len_decoder = DeltaBitPackDecoder::::new(); - prefix_len_decoder.set_data(data.all(), num_values)?; - let num_prefixes = prefix_len_decoder.values_left(); - self.prefix_lengths.resize(num_prefixes, 0); - prefix_len_decoder.get(&mut self.prefix_lengths[..])?; - - let mut suffix_decoder = DeltaLengthByteArrayDecoder::new(); - suffix_decoder - .set_data(data.start_from(prefix_len_decoder.get_offset()), num_values)?; - self.suffix_decoder = Some(suffix_decoder); - self.num_values = num_prefixes; - self.current_idx = 0; - self.previous_value.clear(); - Ok(()) - } - - fn get(&mut self, buffer: &mut [ByteArray]) -> Result { - assert!(self.suffix_decoder.is_some()); - - let num_values = cmp::min(buffer.len(), self.num_values); - let mut v: [ByteArray; 1] = 
[ByteArray::new(); 1]; - for i in 0..num_values { - // Process suffix - // TODO: this is awkward - maybe we should add a non-vectorized API? - let suffix_decoder = self.suffix_decoder.as_mut().unwrap(); - suffix_decoder.get(&mut v[..])?; - let suffix = v[0].data(); - - // Extract current prefix length, can be 0 - let prefix_len = self.prefix_lengths[self.current_idx] as usize; - - // Concatenate prefix with suffix - let mut result = Vec::new(); - result.extend_from_slice(&self.previous_value[0..prefix_len]); - result.extend_from_slice(suffix); - - let data = ByteBufferPtr::new(result.clone()); - buffer[i].set_data(data); - self.previous_value = result; - self.current_idx += 1; - } - - self.num_values -= num_values; - Ok(num_values) - } -} - -impl Decoder for DeltaByteArrayDecoder { - fn set_data(&mut self, data: ByteBufferPtr, num_values: usize) -> Result<()> { - let s: &mut DeltaByteArrayDecoder = - unsafe { mem::transmute(self) }; - s.set_data(data, num_values) - } - - fn get(&mut self, buffer: &mut [ByteArray]) -> Result { - let s: &mut DeltaByteArrayDecoder = - unsafe { mem::transmute(self) }; - s.get(buffer) - } -} - #[cfg(test)] #[allow(clippy::approx_constant)] mod tests { @@ -1110,12 +969,12 @@ mod tests { #[test] fn test_plain_decode_fixed_len_byte_array() { - let mut data = vec![ByteArray::default(); 3]; + let mut data = vec![FixedLenByteArray::default(); 3]; data[0].set_data(ByteBufferPtr::new(String::from("bird").into_bytes())); data[1].set_data(ByteBufferPtr::new(String::from("come").into_bytes())); data[2].set_data(ByteBufferPtr::new(String::from("flow").into_bytes())); let data_bytes = FixedLenByteArrayType::to_byte_array(&data[..]); - let mut buffer = vec![ByteArray::default(); 3]; + let mut buffer = vec![FixedLenByteArray::default(); 3]; test_plain_decode::( ByteBufferPtr::new(data_bytes), 3, @@ -1460,16 +1319,21 @@ mod tests { fn to_byte_array(data: &[T::T]) -> Vec; } - impl ToByteArray for T - where - T: SliceAsBytesDataType, - ::T: SliceAsBytes, 
- { - default fn to_byte_array(data: &[T::T]) -> Vec { - ::T::slice_as_bytes(data).to_vec() - } + macro_rules! to_byte_array_impl { + ($ty: ty) => { + impl ToByteArray<$ty> for $ty { + fn to_byte_array(data: &[<$ty as DataType>::T]) -> Vec { + <$ty as DataType>::T::slice_as_bytes(data).to_vec() + } + } + }; } + to_byte_array_impl!(Int32Type); + to_byte_array_impl!(Int64Type); + to_byte_array_impl!(FloatType); + to_byte_array_impl!(DoubleType); + impl ToByteArray for BoolType { fn to_byte_array(data: &[bool]) -> Vec { let mut v = vec![]; @@ -1509,7 +1373,7 @@ mod tests { } impl ToByteArray for FixedLenByteArrayType { - fn to_byte_array(data: &[ByteArray]) -> Vec { + fn to_byte_array(data: &[FixedLenByteArray]) -> Vec { let mut v = vec![]; for d in data { let buf = d.data(); diff --git a/rust/parquet/src/encodings/encoding.rs b/rust/parquet/src/encodings/encoding.rs index 0cbb6fe8526..d429e278433 100644 --- a/rust/parquet/src/encodings/encoding.rs +++ b/rust/parquet/src/encodings/encoding.rs @@ -17,9 +17,10 @@ //! Contains all supported encoders for Parquet. -use std::{cmp, io::Write, marker::PhantomData, mem}; +use std::{cmp, io::Write, marker::PhantomData}; use crate::basic::*; +use crate::data_type::private::ParquetValueType; use crate::data_type::*; use crate::encodings::rle::RleEncoder; use crate::errors::{ParquetError, Result}; @@ -129,6 +130,10 @@ impl PlainEncoder { } impl Encoder for PlainEncoder { + // Performance Note: + // As far as can be seen these functions are rarely called and as such we can hint to the + // compiler that they dont need to be folded into hot locations in the final output. 
+ #[cold] fn encoding(&self) -> Encoding { Encoding::PLAIN } @@ -138,7 +143,7 @@ impl Encoder for PlainEncoder { } #[inline] - default fn flush_buffer(&mut self) -> Result { + fn flush_buffer(&mut self) -> Result { self.buffer.write_all(self.bit_writer.flush_buffer())?; self.buffer.flush()?; self.bit_writer.clear(); @@ -146,59 +151,9 @@ impl Encoder for PlainEncoder { Ok(self.buffer.consume()) } - default fn put(&mut self, _values: &[T::T]) -> Result<()> { - unreachable!() - } -} - -impl Encoder for PlainEncoder -where - T::T: SliceAsBytes, -{ - default fn put(&mut self, values: &[T::T]) -> Result<()> { - let bytes = T::T::slice_as_bytes(values); - self.buffer.write_all(bytes)?; - Ok(()) - } -} - -impl Encoder for PlainEncoder { - fn put(&mut self, values: &[bool]) -> Result<()> { - for v in values { - self.bit_writer.put_value(*v as u64, 1); - } - Ok(()) - } -} - -impl Encoder for PlainEncoder { - fn put(&mut self, values: &[Int96]) -> Result<()> { - for v in values { - self.buffer.write_all(v.as_bytes())?; - } - self.buffer.flush()?; - Ok(()) - } -} - -impl Encoder for PlainEncoder { - fn put(&mut self, values: &[ByteArray]) -> Result<()> { - for v in values { - self.buffer - .write_all(&(v.len().to_le() as u32).as_bytes())?; - self.buffer.write_all(v.data())?; - } - self.buffer.flush()?; - Ok(()) - } -} - -impl Encoder for PlainEncoder { - fn put(&mut self, values: &[ByteArray]) -> Result<()> { - for v in values { - self.buffer.write_all(v.data())?; - } - self.buffer.flush()?; + #[inline] + fn put(&mut self, values: &[T::T]) -> Result<()> { + T::T::encode(values, &mut self.buffer, &mut self.bit_writer)?; Ok(()) } } @@ -295,7 +250,6 @@ impl DictEncoder { /// Writes out the dictionary values with RLE encoding in a byte buffer, and return /// the result. 
- #[inline] pub fn write_indices(&mut self) -> Result { // TODO: the caller should allocate the buffer let buffer_len = self.estimated_data_encoded_size(); @@ -329,25 +283,34 @@ impl DictEncoder { } if index == HASH_SLOT_EMPTY { - index = self.uniques.size() as i32; - self.hash_slots[j] = index; - self.add_dict_key(value.clone()); - - if self.uniques.size() - > (self.hash_table_size as f32 * MAX_HASH_LOAD) as usize - { - self.double_table_size(); - } + index = self.insert_fresh_slot(j, value.clone()); } self.buffered_indices.push(index); Ok(()) } - #[inline] - fn add_dict_key(&mut self, value: T::T) { - self.uniques_size_in_bytes += self.get_encoded_size(&value); + #[inline(never)] + fn insert_fresh_slot(&mut self, slot: usize, value: T::T) -> i32 { + let index = self.uniques.size() as i32; + self.hash_slots[slot] = index; + + let (base_size, num_elements) = value.dict_encoding_size(); + + let unique_size = match T::get_physical_type() { + Type::BYTE_ARRAY => base_size + num_elements, + Type::FIXED_LEN_BYTE_ARRAY => self.desc.type_length() as usize, + _ => base_size, + }; + + self.uniques_size_in_bytes += unique_size; self.uniques.push(value); + + if self.uniques.size() > (self.hash_table_size as f32 * MAX_HASH_LOAD) as usize { + self.double_table_size(); + } + + index } #[inline] @@ -362,7 +325,6 @@ impl DictEncoder { } } - #[inline] fn double_table_size(&mut self) { let new_size = self.hash_table_size * 2; let mut new_hash_slots = Buffer::new().with_mem_tracker(self.mem_tracker.clone()); @@ -401,7 +363,10 @@ impl Encoder for DictEncoder { Ok(()) } - #[inline] + // Performance Note: + // As far as can be seen these functions are rarely called and as such we can hint to the + // compiler that they dont need to be folded into hot locations in the final output. + #[cold] fn encoding(&self) -> Encoding { Encoding::PLAIN_DICTIONARY } @@ -419,33 +384,6 @@ impl Encoder for DictEncoder { } } -/// Provides encoded size for a data type. 
-/// This is a workaround to calculate dictionary size in bytes. -trait DictEncodedSize { - fn get_encoded_size(&self, value: &T::T) -> usize; -} - -impl DictEncodedSize for DictEncoder { - #[inline] - default fn get_encoded_size(&self, _: &T::T) -> usize { - mem::size_of::() - } -} - -impl DictEncodedSize for DictEncoder { - #[inline] - fn get_encoded_size(&self, value: &ByteArray) -> usize { - mem::size_of::() + value.len() - } -} - -impl DictEncodedSize for DictEncoder { - #[inline] - fn get_encoded_size(&self, _value: &ByteArray) -> usize { - self.desc.type_length() as usize - } -} - // ---------------------------------------------------------------------- // RLE encoding @@ -472,50 +410,45 @@ impl RleValueEncoder { impl Encoder for RleValueEncoder { #[inline] - default fn put(&mut self, _values: &[T::T]) -> Result<()> { - panic!("RleValueEncoder only supports BoolType"); + fn put(&mut self, values: &[T::T]) -> Result<()> { + ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType"); + + if self.encoder.is_none() { + self.encoder = Some(RleEncoder::new(1, DEFAULT_RLE_BUFFER_LEN)); + } + let rle_encoder = self.encoder.as_mut().unwrap(); + for value in values { + let value = value.as_u64()?; + if !rle_encoder.put(value)? { + return Err(general_err!("RLE buffer is full")); + } + } + Ok(()) } + // Performance Note: + // As far as can be seen these functions are rarely called and as such we can hint to the + // compiler that they dont need to be folded into hot locations in the final output. 
+ #[cold] fn encoding(&self) -> Encoding { Encoding::RLE } #[inline] - default fn estimated_data_encoded_size(&self) -> usize { + fn estimated_data_encoded_size(&self) -> usize { match self.encoder { Some(ref enc) => enc.len(), None => 0, } } - #[inline] - default fn flush_buffer(&mut self) -> Result { - panic!("RleValueEncoder only supports BoolType"); - } -} - -impl Encoder for RleValueEncoder { - #[inline] - default fn put(&mut self, values: &[bool]) -> Result<()> { - if self.encoder.is_none() { - self.encoder = Some(RleEncoder::new(1, DEFAULT_RLE_BUFFER_LEN)); - } - let rle_encoder = self.encoder.as_mut().unwrap(); - for value in values { - if !rle_encoder.put(*value as u64)? { - return Err(general_err!("RLE buffer is full")); - } - } - Ok(()) - } - #[inline] fn flush_buffer(&mut self) -> Result { - assert!( - self.encoder.is_some(), - "RLE value encoder is not initialized" - ); - let rle_encoder = self.encoder.as_mut().unwrap(); + ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType"); + let rle_encoder = self + .encoder + .as_mut() + .expect("RLE value encoder is not initialized"); // Flush all encoder buffers and raw values let encoded_data = { @@ -524,7 +457,7 @@ impl Encoder for RleValueEncoder { // Note that buf does not have any offset, all data is encoded bytes let len = (buf.len() as i32).to_le(); let len_bytes = len.as_bytes(); - let mut encoded_data = Vec::new(); + let mut encoded_data = vec![]; encoded_data.extend_from_slice(len_bytes); encoded_data.extend_from_slice(buf); encoded_data @@ -625,6 +558,7 @@ impl DeltaBitPackEncoder { } // Write current delta buffer (<= 'block size' values) into bit writer + #[inline(never)] fn flush_block_values(&mut self) -> Result<()> { if self.values_in_block == 0 { return Ok(()); @@ -717,6 +651,10 @@ impl Encoder for DeltaBitPackEncoder { Ok(()) } + // Performance Note: + // As far as can be seen these functions are rarely called and as such we can hint to the + // compiler that they dont need to 
be folded into hot locations in the final output. + #[cold] fn encoding(&self) -> Encoding { Encoding::DELTA_BINARY_PACKED } @@ -761,72 +699,39 @@ trait DeltaBitPackEncoderConversion { } impl DeltaBitPackEncoderConversion for DeltaBitPackEncoder { - #[inline] - default fn assert_supported_type() { - panic!("DeltaBitPackDecoder only supports Int32Type and Int64Type"); - } - - #[inline] - default fn as_i64(&self, _values: &[T::T], _index: usize) -> i64 { - 0 - } - - #[inline] - default fn subtract(&self, _left: i64, _right: i64) -> i64 { - 0 - } - - #[inline] - default fn subtract_u64(&self, _left: i64, _right: i64) -> u64 { - 0 - } -} - -impl DeltaBitPackEncoderConversion for DeltaBitPackEncoder { #[inline] fn assert_supported_type() { - // no-op: supported type - } - - #[inline] - fn as_i64(&self, values: &[i32], index: usize) -> i64 { - values[index] as i64 - } - - #[inline] - fn subtract(&self, left: i64, right: i64) -> i64 { - // It is okay for values to overflow, wrapping_sub wrapping around at the boundary - (left as i32).wrapping_sub(right as i32) as i64 - } - - #[inline] - fn subtract_u64(&self, left: i64, right: i64) -> u64 { - // Conversion of i32 -> u32 -> u64 is to avoid non-zero left most bytes in int - // representation - (left as i32).wrapping_sub(right as i32) as u32 as u64 - } -} - -impl DeltaBitPackEncoderConversion for DeltaBitPackEncoder { - #[inline] - fn assert_supported_type() { - // no-op: supported type + ensure_phys_ty!( + Type::INT32 | Type::INT64, + "DeltaBitPackDecoder only supports Int32Type and Int64Type" + ); } #[inline] - fn as_i64(&self, values: &[i64], index: usize) -> i64 { + fn as_i64(&self, values: &[T::T], index: usize) -> i64 { values[index] + .as_i64() + .expect("DeltaBitPackDecoder only supports Int32Type and Int64Type") } #[inline] fn subtract(&self, left: i64, right: i64) -> i64 { // It is okay for values to overflow, wrapping_sub wrapping around at the boundary - left.wrapping_sub(right) + match T::get_physical_type() { + 
Type::INT32 => (left as i32).wrapping_sub(right as i32) as i64, + Type::INT64 => left.wrapping_sub(right), + _ => panic!("DeltaBitPackDecoder only supports Int32Type and Int64Type"), + } } #[inline] fn subtract_u64(&self, left: i64, right: i64) -> u64 { - left.wrapping_sub(right) as u64 + match T::get_physical_type() { + // Conversion of i32 -> u32 -> u64 is to avoid non-zero left most bytes in int repr + Type::INT32 => (left as i32).wrapping_sub(right as i32) as u32 as u64, + Type::INT64 => left.wrapping_sub(right) as u64, + _ => panic!("DeltaBitPackDecoder only supports Int32Type and Int64Type"), + } } } @@ -859,10 +764,33 @@ impl DeltaLengthByteArrayEncoder { } impl Encoder for DeltaLengthByteArrayEncoder { - default fn put(&mut self, _values: &[T::T]) -> Result<()> { - panic!("DeltaLengthByteArrayEncoder only supports ByteArrayType"); + fn put(&mut self, values: &[T::T]) -> Result<()> { + ensure_phys_ty!( + Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY, + "DeltaLengthByteArrayEncoder only supports ByteArrayType" + ); + + let val_it = || { + values + .iter() + .map(|x| x.as_any().downcast_ref::().unwrap()) + }; + + let lengths: Vec = + val_it().map(|byte_array| byte_array.len() as i32).collect(); + self.len_encoder.put(&lengths)?; + for byte_array in val_it() { + self.encoded_size += byte_array.len(); + self.data.push(byte_array.clone()); + } + + Ok(()) } + // Performance Note: + // As far as can be seen these functions are rarely called and as such we can hint to the + // compiler that they dont need to be folded into hot locations in the final output. 
+ #[cold] fn encoding(&self) -> Encoding { Encoding::DELTA_LENGTH_BYTE_ARRAY } @@ -871,26 +799,12 @@ impl Encoder for DeltaLengthByteArrayEncoder { self.len_encoder.estimated_data_encoded_size() + self.encoded_size } - default fn flush_buffer(&mut self) -> Result { - panic!("DeltaLengthByteArrayEncoder only supports ByteArrayType"); - } -} - -impl Encoder for DeltaLengthByteArrayEncoder { - fn put(&mut self, values: &[ByteArray]) -> Result<()> { - let lengths: Vec = values - .iter() - .map(|byte_array| byte_array.len() as i32) - .collect(); - self.len_encoder.put(&lengths)?; - for byte_array in values { - self.encoded_size += byte_array.len(); - self.data.push(byte_array.clone()); - } - Ok(()) - } - fn flush_buffer(&mut self) -> Result { + ensure_phys_ty!( + Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY, + "DeltaLengthByteArrayEncoder only supports ByteArrayType" + ); + let mut total_bytes = vec![]; let lengths = self.len_encoder.flush_buffer()?; total_bytes.extend_from_slice(lengths.data()); @@ -899,6 +813,7 @@ impl Encoder for DeltaLengthByteArrayEncoder { }); self.data.clear(); self.encoded_size = 0; + Ok(ByteBufferPtr::new(total_bytes)) } } @@ -910,7 +825,7 @@ impl Encoder for DeltaLengthByteArrayEncoder { /// encoding, followed by suffixes with DELTA_LENGTH_BYTE_ARRAY encoding. pub struct DeltaByteArrayEncoder { prefix_len_encoder: DeltaBitPackEncoder, - suffix_writer: DeltaLengthByteArrayEncoder, + suffix_writer: DeltaLengthByteArrayEncoder, previous: Vec, _phantom: PhantomData, } @@ -919,8 +834,8 @@ impl DeltaByteArrayEncoder { /// Creates new delta byte array encoder. 
pub fn new() -> Self { Self { - prefix_len_encoder: DeltaBitPackEncoder::::new(), - suffix_writer: DeltaLengthByteArrayEncoder::::new(), + prefix_len_encoder: DeltaBitPackEncoder::new(), + suffix_writer: DeltaLengthByteArrayEncoder::new(), previous: vec![], _phantom: PhantomData, } @@ -928,33 +843,20 @@ impl DeltaByteArrayEncoder { } impl Encoder for DeltaByteArrayEncoder { - default fn put(&mut self, _values: &[T::T]) -> Result<()> { - panic!( - "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType" - ); - } - - fn encoding(&self) -> Encoding { - Encoding::DELTA_BYTE_ARRAY - } - - fn estimated_data_encoded_size(&self) -> usize { - self.prefix_len_encoder.estimated_data_encoded_size() - + self.suffix_writer.estimated_data_encoded_size() - } - - default fn flush_buffer(&mut self) -> Result { - panic!( - "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType" - ); - } -} - -impl Encoder for DeltaByteArrayEncoder { - fn put(&mut self, values: &[ByteArray]) -> Result<()> { + fn put(&mut self, values: &[T::T]) -> Result<()> { let mut prefix_lengths: Vec = vec![]; let mut suffixes: Vec = vec![]; + let values = values.iter() + .map(|x| x.as_any()) + .map(|x| match T::get_physical_type() { + Type::BYTE_ARRAY => x.downcast_ref::().unwrap(), + Type::FIXED_LEN_BYTE_ARRAY => x.downcast_ref::().unwrap(), + _ => panic!( + "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType" + ) + }); + for byte_array in values { let current = byte_array.data(); // Maximum prefix length that is shared between previous value and current @@ -973,36 +875,43 @@ impl Encoder for DeltaByteArrayEncoder { } self.prefix_len_encoder.put(&prefix_lengths)?; self.suffix_writer.put(&suffixes)?; + Ok(()) } - fn flush_buffer(&mut self) -> Result { - // TODO: investigate if we can merge lengths and suffixes - // without copying data into new vector. - let mut total_bytes = vec![]; - // Insert lengths ... 
- let lengths = self.prefix_len_encoder.flush_buffer()?; - total_bytes.extend_from_slice(lengths.data()); - // ... followed by suffixes - let suffixes = self.suffix_writer.flush_buffer()?; - total_bytes.extend_from_slice(suffixes.data()); - - self.previous.clear(); - Ok(ByteBufferPtr::new(total_bytes)) + // Performance Note: + // As far as can be seen these functions are rarely called and as such we can hint to the + // compiler that they dont need to be folded into hot locations in the final output. + #[cold] + fn encoding(&self) -> Encoding { + Encoding::DELTA_BYTE_ARRAY } -} -impl Encoder for DeltaByteArrayEncoder { - fn put(&mut self, values: &[ByteArray]) -> Result<()> { - let s: &mut DeltaByteArrayEncoder = - unsafe { mem::transmute(self) }; - s.put(values) + fn estimated_data_encoded_size(&self) -> usize { + self.prefix_len_encoder.estimated_data_encoded_size() + + self.suffix_writer.estimated_data_encoded_size() } fn flush_buffer(&mut self) -> Result { - let s: &mut DeltaByteArrayEncoder = - unsafe { mem::transmute(self) }; - s.flush_buffer() + match T::get_physical_type() { + Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => { + // TODO: investigate if we can merge lengths and suffixes + // without copying data into new vector. + let mut total_bytes = vec![]; + // Insert lengths ... + let lengths = self.prefix_len_encoder.flush_buffer()?; + total_bytes.extend_from_slice(lengths.data()); + // ... 
followed by suffixes + let suffixes = self.suffix_writer.flush_buffer()?; + total_bytes.extend_from_slice(suffixes.data()); + + self.previous.clear(); + Ok(ByteBufferPtr::new(total_bytes)) + } + _ => panic!( + "DeltaByteArrayEncoder only supports ByteArrayType and FixedLenByteArrayType" + ) + } } } @@ -1142,7 +1051,7 @@ mod tests { ); run_test::( 2, - &[ByteArray::from("ab"), ByteArray::from("bc")], + &[ByteArray::from("ab").into(), ByteArray::from("bc").into()], 4, ); } @@ -1262,7 +1171,7 @@ mod tests { fn test_dict_internal(total: usize, type_length: i32) -> Result<()>; } - impl EncodingTester for T { + impl> EncodingTester for T { fn test_internal(enc: Encoding, total: usize, type_length: i32) -> Result<()> { let mut encoder = create_test_encoder::(type_length, enc); let mut decoder = create_test_decoder::(type_length, enc); diff --git a/rust/parquet/src/encodings/rle.rs b/rust/parquet/src/encodings/rle.rs index 0f0e8d8bc13..a429ddb4100 100644 --- a/rust/parquet/src/encodings/rle.rs +++ b/rust/parquet/src/encodings/rle.rs @@ -237,7 +237,6 @@ impl RleEncoder { Ok(()) } - #[inline] fn flush_rle_run(&mut self) -> Result<()> { assert!(self.repeat_count > 0); let indicator_value = self.repeat_count << 1; @@ -254,7 +253,6 @@ impl RleEncoder { Ok(()) } - #[inline] fn flush_bit_packed_run(&mut self, update_indicator_byte: bool) -> Result<()> { if self.indicator_byte_pos < 0 { self.indicator_byte_pos = self.bit_writer.skip(1)? as i64; @@ -284,7 +282,7 @@ impl RleEncoder { Ok(()) } - #[inline] + #[inline(never)] fn flush_buffered_values(&mut self) -> Result<()> { if self.repeat_count >= 8 { self.num_buffered_values = 0; @@ -321,7 +319,7 @@ pub struct RleDecoder { bit_reader: Option, // Buffer used when `bit_reader` is not `None`, for batch reading. 
- index_buf: Option<[i32; 1024]>, + index_buf: [i32; 1024], // The remaining number of values in RLE for this run rle_left: u32, @@ -340,23 +338,25 @@ impl RleDecoder { rle_left: 0, bit_packed_left: 0, bit_reader: None, - index_buf: None, + index_buf: [0; 1024], current_value: None, } } + #[inline] pub fn set_data(&mut self, data: ByteBufferPtr) { if let Some(ref mut bit_reader) = self.bit_reader { bit_reader.reset(data); } else { self.bit_reader = Some(BitReader::new(data)); - self.index_buf = Some([0; 1024]); } let _ = self.reload(); } - #[inline] + // These functions inline badly, they tend to inline and then create very large loop unrolls + // that damage L1d-cache occupancy. This results in a ~18% performance drop + #[inline(never)] pub fn get(&mut self) -> Result> { assert!(size_of::() <= 8); @@ -389,15 +389,13 @@ impl RleDecoder { Ok(Some(value)) } - #[inline] + #[inline(never)] pub fn get_batch(&mut self, buffer: &mut [T]) -> Result { - assert!(self.bit_reader.is_some()); assert!(size_of::() <= 8); let mut values_read = 0; while values_read < buffer.len() { if self.rle_left > 0 { - assert!(self.current_value.is_some()); let num_values = cmp::min(buffer.len() - values_read, self.rle_left as usize); for i in 0..num_values { @@ -409,17 +407,17 @@ impl RleDecoder { self.rle_left -= num_values as u32; values_read += num_values; } else if self.bit_packed_left > 0 { - assert!(self.bit_reader.is_some()); let mut num_values = cmp::min(buffer.len() - values_read, self.bit_packed_left as usize); - if let Some(ref mut bit_reader) = self.bit_reader { - num_values = bit_reader.get_batch::( - &mut buffer[values_read..values_read + num_values], - self.bit_width as usize, - ); - self.bit_packed_left -= num_values as u32; - values_read += num_values; - } + let bit_reader = + self.bit_reader.as_mut().expect("bit_reader should be set"); + + num_values = bit_reader.get_batch::( + &mut buffer[values_read..values_read + num_values], + self.bit_width as usize, + ); + 
self.bit_packed_left -= num_values as u32; + values_read += num_values; } else if !self.reload() { break; } @@ -428,7 +426,7 @@ impl RleDecoder { Ok(values_read) } - #[inline] + #[inline(never)] pub fn get_batch_with_dict( &mut self, dict: &[T], @@ -443,7 +441,6 @@ impl RleDecoder { let mut values_read = 0; while values_read < max_values { if self.rle_left > 0 { - assert!(self.current_value.is_some()); let num_values = cmp::min(max_values - values_read, self.rle_left as usize); let dict_idx = self.current_value.unwrap() as usize; @@ -453,25 +450,26 @@ impl RleDecoder { self.rle_left -= num_values as u32; values_read += num_values; } else if self.bit_packed_left > 0 { - assert!(self.bit_reader.is_some()); + let bit_reader = + self.bit_reader.as_mut().expect("bit_reader should be set"); + let mut num_values = cmp::min(max_values - values_read, self.bit_packed_left as usize); - if let Some(ref mut bit_reader) = self.bit_reader { - let mut index_buf = self.index_buf.unwrap(); - num_values = cmp::min(num_values, index_buf.len()); - loop { - num_values = bit_reader.get_batch::( - &mut index_buf[..num_values], - self.bit_width as usize, - ); - for i in 0..num_values { - buffer[values_read + i] = dict[index_buf[i] as usize].clone(); - } - self.bit_packed_left -= num_values as u32; - values_read += num_values; - if num_values < index_buf.len() { - break; - } + + num_values = cmp::min(num_values, self.index_buf.len()); + loop { + num_values = bit_reader.get_batch::( + &mut self.index_buf[..num_values], + self.bit_width as usize, + ); + for i in 0..num_values { + buffer[values_read + i] = + dict[self.index_buf[i] as usize].clone(); + } + self.bit_packed_left -= num_values as u32; + values_read += num_values; + if num_values < self.index_buf.len() { + break; } } } else if !self.reload() { @@ -484,24 +482,21 @@ impl RleDecoder { #[inline] fn reload(&mut self) -> bool { - assert!(self.bit_reader.is_some()); - if let Some(ref mut bit_reader) = self.bit_reader { - if let 
Some(indicator_value) = bit_reader.get_vlq_int() { - if indicator_value & 1 == 1 { - self.bit_packed_left = ((indicator_value >> 1) * 8) as u32; - } else { - self.rle_left = (indicator_value >> 1) as u32; - let value_width = bit_util::ceil(self.bit_width as i64, 8); - self.current_value = - bit_reader.get_aligned::(value_width as usize); - assert!(self.current_value.is_some()); - } - return true; + let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set"); + + if let Some(indicator_value) = bit_reader.get_vlq_int() { + if indicator_value & 1 == 1 { + self.bit_packed_left = ((indicator_value >> 1) * 8) as u32; } else { - return false; + self.rle_left = (indicator_value >> 1) as u32; + let value_width = bit_util::ceil(self.bit_width as i64, 8); + self.current_value = bit_reader.get_aligned::(value_width as usize); + assert!(self.current_value.is_some()); } + true + } else { + false } - false } } diff --git a/rust/parquet/src/file/statistics.rs b/rust/parquet/src/file/statistics.rs index 20de31c9956..238312cdfb4 100644 --- a/rust/parquet/src/file/statistics.rs +++ b/rust/parquet/src/file/statistics.rs @@ -179,8 +179,8 @@ pub fn from_thrift( old_format, ), Type::FIXED_LEN_BYTE_ARRAY => Statistics::fixed_len_byte_array( - min.map(ByteArray::from), - max.map(ByteArray::from), + min.map(ByteArray::from).map(FixedLenByteArray::from), + max.map(ByteArray::from).map(FixedLenByteArray::from), distinct_count, null_count, old_format, @@ -259,7 +259,11 @@ impl Statistics { statistics_new_func![byte_array, Option, ByteArray]; - statistics_new_func![fixed_len_byte_array, Option, FixedLenByteArray]; + statistics_new_func![ + fixed_len_byte_array, + Option, + FixedLenByteArray + ]; /// Returns `true` if statistics have old `min` and `max` fields set. 
/// This means that the column order is likely to be undefined, which, for old files @@ -423,12 +427,12 @@ impl fmt::Display for TypedStatistics { write!(f, "{{")?; write!(f, "min: ")?; match self.min { - Some(ref value) => self.value_fmt(f, value)?, + Some(ref value) => write!(f, "{}", value)?, None => write!(f, "N/A")?, } write!(f, ", max: ")?; match self.max { - Some(ref value) => self.value_fmt(f, value)?, + Some(ref value) => write!(f, "{}", value)?, None => write!(f, "N/A")?, } write!(f, ", distinct_count: ")?; @@ -467,37 +471,6 @@ impl cmp::PartialEq for TypedStatistics { } } -/// Trait to provide a specific write format for values. -/// For example, we should display vector slices for byte array types, and original -/// values for other types. -trait ValueDisplay { - fn value_fmt(&self, f: &mut fmt::Formatter, value: &T::T) -> fmt::Result; -} - -impl ValueDisplay for TypedStatistics { - default fn value_fmt(&self, f: &mut fmt::Formatter, value: &T::T) -> fmt::Result { - write!(f, "{:?}", value) - } -} - -impl ValueDisplay for TypedStatistics { - fn value_fmt(&self, f: &mut fmt::Formatter, value: &Int96) -> fmt::Result { - write!(f, "{:?}", value.data()) - } -} - -impl ValueDisplay for TypedStatistics { - fn value_fmt(&self, f: &mut fmt::Formatter, value: &ByteArray) -> fmt::Result { - write!(f, "{:?}", value.data()) - } -} - -impl ValueDisplay for TypedStatistics { - fn value_fmt(&self, f: &mut fmt::Formatter, value: &ByteArray) -> fmt::Result { - write!(f, "{:?}", value.data()) - } -} - #[cfg(test)] mod tests { use super::*; @@ -628,8 +601,8 @@ mod tests { 0, true ) != Statistics::fixed_len_byte_array( - Some(ByteArray::from(vec![1, 2, 3])), - Some(ByteArray::from(vec![1, 2, 3])), + Some(ByteArray::from(vec![1, 2, 3]).into()), + Some(ByteArray::from(vec![1, 2, 3]).into()), None, 0, true @@ -679,8 +652,8 @@ mod tests { check_stats(Statistics::byte_array(None, None, None, 7, true)); check_stats(Statistics::fixed_len_byte_array( - Some(ByteArray::from(vec![1, 
2, 3])), - Some(ByteArray::from(vec![3, 4, 5])), + Some(ByteArray::from(vec![1, 2, 3]).into()), + Some(ByteArray::from(vec![3, 4, 5]).into()), None, 7, true, diff --git a/rust/parquet/src/lib.rs b/rust/parquet/src/lib.rs index d292312a59f..5ce63bbdedc 100644 --- a/rust/parquet/src/lib.rs +++ b/rust/parquet/src/lib.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -#![feature(specialization)] #![allow(incomplete_features)] #![allow(dead_code)] #![allow(non_camel_case_types)] @@ -39,6 +38,7 @@ #[macro_use] pub mod errors; pub mod basic; +#[macro_use] pub mod data_type; // Exported for external use, such as benchmarks diff --git a/rust/parquet/src/record/triplet.rs b/rust/parquet/src/record/triplet.rs index 6a7efa5d7ac..bb4f942fd18 100644 --- a/rust/parquet/src/record/triplet.rs +++ b/rust/parquet/src/record/triplet.rs @@ -163,7 +163,7 @@ impl TripletIter { TripletIter::FixedLenByteArrayTripletIter(ref typed) => { Field::convert_byte_array( typed.column_descr(), - typed.current_value().clone(), + typed.current_value().clone().into(), ) } } diff --git a/rust/parquet/src/util/bit_util.rs b/rust/parquet/src/util/bit_util.rs index 5ccd1636b7b..677b669287b 100644 --- a/rust/parquet/src/util/bit_util.rs +++ b/rust/parquet/src/util/bit_util.rs @@ -21,6 +21,7 @@ use crate::data_type::AsBytes; use crate::errors::{ParquetError, Result}; use crate::util::{bit_packing::unpack32, memory::ByteBufferPtr}; +#[inline] pub fn from_ne_slice(bs: &[u8]) -> T { let mut b = T::Buffer::default(); { @@ -124,11 +125,7 @@ where /// Returns the ceil of value/divisor #[inline] pub fn ceil(value: i64, divisor: i64) -> i64 { - let mut result = value / divisor; - if value % divisor != 0 { - result += 1 - }; - result + value / divisor + ((value % divisor != 0) as i64) } /// Returns ceil(log2(x)) @@ -473,7 +470,6 @@ impl BitReader { } } - #[inline] pub fn reset(&mut self, buffer: ByteBufferPtr) { self.buffer = buffer; self.total_bytes = 
self.buffer.len(); @@ -492,7 +488,6 @@ impl BitReader { /// Reads a value of type `T` and of size `num_bits`. /// /// Returns `None` if there's not enough data available. `Some` otherwise. - #[inline] pub fn get_value(&mut self, num_bits: usize) -> Option { assert!(num_bits <= 64); assert!(num_bits <= size_of::() * 8); @@ -518,7 +513,6 @@ impl BitReader { Some(from_ne_slice(v.as_bytes())) } - #[inline] pub fn get_batch(&mut self, batch: &mut [T], num_bits: usize) -> usize { assert!(num_bits <= 32); assert!(num_bits <= size_of::() * 8); @@ -601,7 +595,6 @@ impl BitReader { /// Returns `Some` if there's enough bytes left to form a value of `T`. /// Otherwise `None`. - #[inline] pub fn get_aligned(&mut self, num_bytes: usize) -> Option { let bytes_read = ceil(self.bit_offset as i64, 8) as usize; if self.byte_offset + bytes_read + num_bytes > self.total_bytes { @@ -627,7 +620,6 @@ impl BitReader { /// The encoded int must start at the beginning of a byte. /// /// Returns `None` if there's not enough bytes in the stream. `Some` otherwise. 
- #[inline] pub fn get_vlq_int(&mut self) -> Option { let mut shift = 0; let mut v: i64 = 0; @@ -663,7 +655,6 @@ impl BitReader { }) } - #[inline] fn reload_buffer_values(&mut self) { let bytes_to_read = cmp::min(self.total_bytes - self.byte_offset, 8); self.buffered_values = read_num_bytes!( diff --git a/rust/parquet/src/util/memory.rs b/rust/parquet/src/util/memory.rs index 901135adb26..7444f979841 100644 --- a/rust/parquet/src/util/memory.rs +++ b/rust/parquet/src/util/memory.rs @@ -383,12 +383,10 @@ impl Display for BufferPtr { impl Drop for BufferPtr { fn drop(&mut self) { - if self.is_mem_tracked() - && Arc::strong_count(&self.data) == 1 - && Arc::weak_count(&self.data) == 0 - { - let mc = self.mem_tracker.as_ref().unwrap(); - mc.alloc(-(self.data.capacity() as i64)); + if let Some(ref mc) = self.mem_tracker { + if Arc::strong_count(&self.data) == 1 && Arc::weak_count(&self.data) == 0 { + mc.alloc(-(self.data.capacity() as i64)); + } } } } diff --git a/rust/parquet/src/util/test_common/rand_gen.rs b/rust/parquet/src/util/test_common/rand_gen.rs index dae8af59e9d..3395bbec502 100644 --- a/rust/parquet/src/util/test_common/rand_gen.rs +++ b/rust/parquet/src/util/test_common/rand_gen.rs @@ -36,12 +36,6 @@ pub trait RandGen { } } -impl RandGen for T { - default fn gen(_: i32) -> T::T { - panic!("Unsupported data type"); - } -} - impl RandGen for BoolType { fn gen(_: i32) -> bool { thread_rng().gen::() @@ -96,7 +90,7 @@ impl RandGen for ByteArrayType { } impl RandGen for FixedLenByteArrayType { - fn gen(len: i32) -> ByteArray { + fn gen(len: i32) -> FixedLenByteArray { let mut rng = thread_rng(); let value_len = if len < 0 { rng.gen_range(0, 128) @@ -104,7 +98,7 @@ impl RandGen for FixedLenByteArrayType { len as usize }; let value = random_bytes(value_len); - ByteArray::from(value) + ByteArray::from(value).into() } } diff --git a/rust/rust-toolchain b/rust/rust-toolchain deleted file mode 100644 index ed4dad9a381..00000000000 --- a/rust/rust-toolchain +++ 
/dev/null @@ -1 +0,0 @@ -nightly-2020-11-24