diff --git a/arrow-buffer/src/util/bit_iterator.rs b/arrow-buffer/src/util/bit_iterator.rs
index c3e72044bf87..6a783138884b 100644
--- a/arrow-buffer/src/util/bit_iterator.rs
+++ b/arrow-buffer/src/util/bit_iterator.rs
@@ -216,6 +216,7 @@ impl<'a> BitIndexIterator<'a> {
 impl Iterator for BitIndexIterator<'_> {
     type Item = usize;
 
+    #[inline]
     fn next(&mut self) -> Option<Self::Item> {
         loop {
             if self.current_chunk != 0 {
diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs
index e4662b8f316c..2b8169316136 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -43,6 +43,7 @@ use crate::errors::{ParquetError, Result};
 use arrow_array::cast::AsArray;
 use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
+use arrow_buffer::bit_iterator::BitIndexIterator;
 use arrow_buffer::{NullBuffer, OffsetBuffer};
 use arrow_schema::{DataType, Field};
 use std::ops::Range;
@@ -497,18 +498,19 @@ impl LevelInfoBuilder {
         def_levels.reserve(len);
         info.non_null_indices.reserve(len);
 
-        match info.array.logical_nulls() {
+        match &info.logical_nulls {
             Some(nulls) => {
-                // TODO: Faster bitmask iteration (#1757)
-                for i in range {
-                    match nulls.is_valid(i) {
-                        true => {
-                            def_levels.push(info.max_def_level);
-                            info.non_null_indices.push(i)
-                        }
-                        false => def_levels.push(info.max_def_level - 1),
-                    }
-                }
+                assert!(range.end <= nulls.len());
+                let nulls = nulls.inner();
+                def_levels.extend(range.clone().map(|i| {
+                    // Safety: range.end was asserted to be in bounds earlier
+                    let valid = unsafe { nulls.value_unchecked(i) };
+                    info.max_def_level - (!valid as i16)
+                }));
+                info.non_null_indices.extend(
+                    BitIndexIterator::new(nulls.inner(), nulls.offset() + range.start, len)
+                        .map(|i| i + range.start),
+                );
             }
             None => {
                 let iter = std::iter::repeat(info.max_def_level).take(len);
@@ -566,6 +568,9 @@ pub(crate) struct ArrayLevels {
 
     /// The arrow array
     array: ArrayRef,
+
+    /// cached logical nulls of the array.
+    logical_nulls: Option<NullBuffer>,
 }
 
 impl PartialEq for ArrayLevels {
@@ -576,6 +581,7 @@ impl PartialEq for ArrayLevels {
             && self.max_def_level == other.max_def_level
             && self.max_rep_level == other.max_rep_level
             && self.array.as_ref() == other.array.as_ref()
+            && self.logical_nulls.as_ref() == other.logical_nulls.as_ref()
     }
 }
 impl Eq for ArrayLevels {}
@@ -588,6 +594,8 @@ impl ArrayLevels {
             false => ctx.def_level,
         };
 
+        let logical_nulls = array.logical_nulls();
+
         Self {
             def_levels: (max_def_level != 0).then(Vec::new),
             rep_levels: (max_rep_level != 0).then(Vec::new),
@@ -595,6 +603,7 @@
             max_def_level,
             max_rep_level,
             array,
+            logical_nulls,
         }
     }
 
@@ -668,6 +677,7 @@ mod tests {
             max_def_level: 2,
             max_rep_level: 2,
             array: Arc::new(primitives),
+            logical_nulls: None,
         };
         assert_eq!(&levels[0], &expected);
     }
@@ -688,6 +698,7 @@ mod tests {
             max_def_level: 0,
             max_rep_level: 0,
             array,
+            logical_nulls: None,
         };
         assert_eq!(&levels[0], &expected_levels);
     }
@@ -707,6 +718,7 @@ mod tests {
         let levels = calculate_array_levels(&array, &field).unwrap();
         assert_eq!(levels.len(), 1);
 
+        let logical_nulls = array.logical_nulls();
         let expected_levels = ArrayLevels {
             def_levels: Some(vec![1, 0, 1, 1, 0]),
             rep_levels: None,
@@ -714,6 +726,7 @@
             max_def_level: 1,
             max_rep_level: 0,
             array,
+            logical_nulls,
         };
         assert_eq!(&levels[0], &expected_levels);
     }
@@ -748,6 +761,7 @@ mod tests {
             max_def_level: 1,
             max_rep_level: 1,
             array: Arc::new(leaf_array),
+            logical_nulls: None,
         };
         assert_eq!(&levels[0], &expected_levels);
 
@@ -781,6 +795,7 @@
             max_def_level: 2,
             max_rep_level: 1,
             array: Arc::new(leaf_array),
+            logical_nulls: None,
         };
         assert_eq!(&levels[0], &expected_levels);
     }
@@ -830,6 +845,7 @@ mod tests {
             max_def_level: 3,
             max_rep_level: 1,
             array: Arc::new(leaf),
+            logical_nulls: None,
         };
         assert_eq!(&levels[0], &expected_levels);
 
@@ -880,6 +896,7 @@ mod tests {
             max_def_level: 5,
             max_rep_level: 2,
             array: Arc::new(leaf),
+            logical_nulls: None,
         };
         assert_eq!(&levels[0], &expected_levels);
 
@@ -917,6 +934,7 @@ mod tests {
             max_def_level: 1,
             max_rep_level: 1,
             array: Arc::new(leaf),
+            logical_nulls: None,
         };
         assert_eq!(&levels[0], &expected_levels);
 
@@ -949,6 +967,7 @@ mod tests {
             max_def_level: 3,
             max_rep_level: 1,
             array: Arc::new(leaf),
+            logical_nulls: None,
         };
         assert_eq!(&levels[0], &expected_levels);
 
@@ -997,6 +1016,7 @@ mod tests {
             max_def_level: 5,
             max_rep_level: 2,
             array: Arc::new(leaf),
+            logical_nulls: None,
         };
         assert_eq!(&levels[0], &expected_levels);
     }
@@ -1029,6 +1049,7 @@ mod tests {
         let levels = calculate_array_levels(&a_array, &a_field).unwrap();
         assert_eq!(levels.len(), 1);
 
+        let logical_nulls = leaf.logical_nulls();
         let expected_levels = ArrayLevels {
             def_levels: Some(vec![3, 2, 3, 1, 0, 3]),
             rep_levels: None,
@@ -1036,6 +1057,7 @@
             max_def_level: 3,
             max_rep_level: 0,
             array: leaf,
+            logical_nulls,
         };
         assert_eq!(&levels[0], &expected_levels);
     }
@@ -1075,6 +1097,7 @@ mod tests {
             max_def_level: 3,
             max_rep_level: 1,
             array: Arc::new(a_values),
+            logical_nulls: None,
         };
         assert_eq!(list_level, &expected_level);
     }
@@ -1167,12 +1190,14 @@ mod tests {
             max_def_level: 0,
             max_rep_level: 0,
             array: Arc::new(a),
+            logical_nulls: None,
         };
         assert_eq!(list_level, &expected_level);
 
         // test "b" levels
         let list_level = levels.get(1).unwrap();
 
+        let b_logical_nulls = b.logical_nulls();
         let expected_level = ArrayLevels {
             def_levels: Some(vec![1, 0, 0, 1, 1]),
             rep_levels: None,
@@ -1180,12 +1205,14 @@
             max_def_level: 1,
             max_rep_level: 0,
             array: Arc::new(b),
+            logical_nulls: b_logical_nulls,
         };
         assert_eq!(list_level, &expected_level);
 
         // test "d" levels
         let list_level = levels.get(2).unwrap();
 
+        let d_logical_nulls = d.logical_nulls();
         let expected_level = ArrayLevels {
             def_levels: Some(vec![1, 1, 1, 2, 1]),
             rep_levels: None,
@@ -1193,12 +1220,14 @@
             max_def_level: 2,
             max_rep_level: 0,
             array: Arc::new(d),
+            logical_nulls: d_logical_nulls,
         };
         assert_eq!(list_level, &expected_level);
 
         // test "f" levels
         let list_level = levels.get(3).unwrap();
 
+        let f_logical_nulls = f.logical_nulls();
         let expected_level = ArrayLevels {
             def_levels: Some(vec![3, 2, 3, 2, 3]),
             rep_levels: None,
@@ -1206,6 +1235,7 @@
             max_def_level: 3,
             max_rep_level: 0,
             array: Arc::new(f),
+            logical_nulls: f_logical_nulls,
         };
         assert_eq!(list_level, &expected_level);
     }
@@ -1301,6 +1331,7 @@ mod tests {
         assert_eq!(levels.len(), 2);
 
         let map = batch.column(0).as_map();
+        let map_keys_logical_nulls = map.keys().logical_nulls();
 
         // test key levels
         let list_level = &levels[0];
@@ -1312,11 +1343,13 @@
             max_def_level: 1,
             max_rep_level: 1,
             array: map.keys().clone(),
+            logical_nulls: map_keys_logical_nulls,
         };
         assert_eq!(list_level, &expected_level);
 
         // test values levels
         let list_level = levels.get(1).unwrap();
+        let map_values_logical_nulls = map.values().logical_nulls();
 
         let expected_level = ArrayLevels {
             def_levels: Some(vec![2, 2, 2, 1, 2, 1, 2]),
@@ -1325,6 +1358,7 @@
             max_def_level: 2,
             max_rep_level: 1,
             array: map.values().clone(),
+            logical_nulls: map_values_logical_nulls,
         };
         assert_eq!(list_level, &expected_level);
     }
@@ -1403,6 +1437,7 @@ mod tests {
         let levels = calculate_array_levels(rb.column(0), rb.schema().field(0)).unwrap();
         let list_level = &levels[0];
 
+        let logical_nulls = values.logical_nulls();
         let expected_level = ArrayLevels {
             def_levels: Some(vec![4, 1, 0, 2, 2, 3, 4]),
             rep_levels: Some(vec![0, 0, 0, 0, 1, 0, 0]),
@@ -1410,6 +1445,7 @@
             max_def_level: 4,
             max_rep_level: 1,
             array: values,
+            logical_nulls,
         };
         assert_eq!(list_level, &expected_level);
 
@@ -1443,6 +1479,7 @@ mod tests {
 
         assert_eq!(levels.len(), 1);
 
+        let logical_nulls = values.logical_nulls();
         let expected_level = ArrayLevels {
             def_levels: Some(vec![4, 4, 3, 2, 0, 4, 4, 0, 1]),
             rep_levels: Some(vec![0, 1, 0, 0, 0, 0, 1, 0, 0]),
@@ -1450,6 +1487,7 @@
             max_def_level: 4,
             max_rep_level: 1,
             array: values,
+            logical_nulls,
         };
         assert_eq!(&levels[0], &expected_level);
 
@@ -1528,6 +1566,7 @@ mod tests {
 
         assert_eq!(levels.len(), 2);
 
+        let a1_logical_nulls = a1_values.logical_nulls();
         let expected_level = ArrayLevels {
             def_levels: Some(vec![0, 0, 1, 6, 5, 2, 3, 1]),
             rep_levels: Some(vec![0, 0, 0, 0, 2, 0, 1, 0]),
@@ -1535,10 +1574,12 @@
             max_def_level: 6,
             max_rep_level: 2,
             array: a1_values,
+            logical_nulls: a1_logical_nulls,
         };
 
         assert_eq!(&levels[0], &expected_level);
 
+        let a2_logical_nulls = a2_values.logical_nulls();
         let expected_level = ArrayLevels {
             def_levels: Some(vec![0, 0, 1, 3, 2, 4, 1]),
             rep_levels: Some(vec![0, 0, 0, 0, 0, 1, 0]),
@@ -1546,6 +1587,7 @@
             max_def_level: 4,
             max_rep_level: 1,
             array: a2_values,
+            logical_nulls: a2_logical_nulls,
         };
         assert_eq!(&levels[1], &expected_level);
 
@@ -1577,6 +1619,7 @@ mod tests {
 
         let list_level = &levels[0];
 
+        let logical_nulls = values.logical_nulls();
         let expected_level = ArrayLevels {
             def_levels: Some(vec![0, 0, 3, 3]),
             rep_levels: Some(vec![0, 0, 0, 1]),
@@ -1584,6 +1627,7 @@
             max_def_level: 3,
             max_rep_level: 1,
             array: values,
+            logical_nulls,
         };
         assert_eq!(list_level, &expected_level);
     }
@@ -1727,6 +1771,7 @@ mod tests {
         let b_levels = &levels[1];
 
         // [[{a: 1}, null], null, [null, null], [{a: null}, {a: 2}]]
+        let values_a_logical_nulls = values_a.logical_nulls();
         let expected_a = ArrayLevels {
             def_levels: Some(vec![4, 2, 0, 2, 2, 3, 4]),
             rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
@@ -1734,8 +1779,10 @@
             max_def_level: 4,
             max_rep_level: 1,
             array: values_a,
+            logical_nulls: values_a_logical_nulls,
         };
 
         // [[{b: 2}, null], null, [null, null], [{b: 3}, {b: 4}]]
+        let values_b_logical_nulls = values_b.logical_nulls();
         let expected_b = ArrayLevels {
             def_levels: Some(vec![3, 2, 0, 2, 2, 3, 3]),
             rep_levels: Some(vec![0, 1, 0, 0, 1, 0, 1]),
@@ -1743,6 +1790,7 @@
             max_def_level: 3,
             max_rep_level: 1,
             array: values_b,
+            logical_nulls: values_b_logical_nulls,
         };
 
         assert_eq!(a_levels, &expected_a);
@@ -1767,6 +1815,7 @@ mod tests {
 
         let list_level = &levels[0];
 
+        let logical_nulls = values.logical_nulls();
         let expected_level = ArrayLevels {
             def_levels: Some(vec![1, 0, 1]),
             rep_levels: Some(vec![0, 0, 0]),
@@ -1774,6 +1823,7 @@
             max_def_level: 3,
             max_rep_level: 1,
             array: values,
+            logical_nulls,
         };
         assert_eq!(list_level, &expected_level);
     }
@@ -1802,6 +1852,7 @@ mod tests {
         builder.write(0..4);
         let levels = builder.finish();
 
+        let logical_nulls = values.logical_nulls();
         let expected_level = ArrayLevels {
             def_levels: Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
             rep_levels: Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
@@ -1809,6 +1860,7 @@
             max_def_level: 5,
             max_rep_level: 2,
             array: values,
+            logical_nulls,
         };
         assert_eq!(levels[0], expected_level);
 
@@ -1832,6 +1884,8 @@ mod tests {
         let mut builder = levels(&item_field, dict.clone());
         builder.write(0..4);
         let levels = builder.finish();
+
+        let logical_nulls = dict.logical_nulls();
         let expected_level = ArrayLevels {
             def_levels: Some(vec![0, 0, 1, 1]),
             rep_levels: None,
@@ -1839,6 +1893,7 @@
             max_def_level: 1,
             max_rep_level: 0,
             array: Arc::new(dict),
+            logical_nulls,
         };
         assert_eq!(levels[0], expected_level);
     }
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 69ef4538baa1..384a4a10486e 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -101,6 +101,7 @@ const SALT: [u32; 8] = [
 /// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits.
 /// Each word is thought of as an array of bits; each bit is either "set" or "not set".
 #[derive(Debug, Copy, Clone)]
+#[repr(transparent)]
 struct Block([u32; 8]);
 impl Block {
     const ZERO: Block = Block([0; 8]);
@@ -118,24 +119,12 @@ impl Block {
         Self(result)
     }
 
-    #[inline]
-    #[cfg(target_endian = "little")]
-    fn to_le_bytes(self) -> [u8; 32] {
-        self.to_ne_bytes()
-    }
-
     #[inline]
     #[cfg(not(target_endian = "little"))]
     fn to_le_bytes(self) -> [u8; 32] {
         self.swap_bytes().to_ne_bytes()
     }
 
-    #[inline]
-    fn to_ne_bytes(self) -> [u8; 32] {
-        // SAFETY: [u32; 8] and [u8; 32] have the same size and neither has invalid bit patterns.
-        unsafe { std::mem::transmute(self.0) }
-    }
-
     #[inline]
     #[cfg(not(target_endian = "little"))]
     fn swap_bytes(mut self) -> Self {
@@ -248,8 +237,10 @@ impl Sbbf {
     /// to the next power of two bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
     pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self {
         let num_bytes = optimal_num_of_bytes(num_bytes);
-        let bitset = vec![0_u8; num_bytes];
-        Self::new(&bitset)
+        assert_eq!(num_bytes % size_of::<Block>(), 0);
+        let num_blocks = num_bytes / size_of::<Block>();
+        let bitset = vec![Block::ZERO; num_blocks];
+        Self(bitset)
     }
 
     pub(crate) fn new(bitset: &[u8]) -> Self {
@@ -281,6 +272,7 @@ impl Sbbf {
     }
 
     /// Write the bitset in serialized form to the writer.
+    #[cfg(not(target_endian = "little"))]
     fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
         for block in &self.0 {
             writer
@@ -292,6 +284,22 @@
         Ok(())
     }
 
+    /// Write the bitset in serialized form to the writer.
+    #[cfg(target_endian = "little")]
+    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
+        // Safety: Block is repr(transparent) and [u32; 8] can be reinterpreted as [u8; 32].
+        let slice = unsafe {
+            std::slice::from_raw_parts(
+                self.0.as_ptr() as *const u8,
+                self.0.len() * size_of::<Block>(),
+            )
+        };
+        writer.write_all(slice).map_err(|e| {
+            ParquetError::General(format!("Could not write bloom filter bit set: {e}"))
+        })?;
+        Ok(())
+    }
+
     /// Create and populate [`BloomFilterHeader`] from this bitset for writing to serialized form
     fn header(&self) -> BloomFilterHeader {
         BloomFilterHeader {
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 8a2bab5a642e..6a0f780e56af 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -653,15 +653,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 )
             })?;
 
-            let mut values_to_write = 0;
-            for &level in levels {
-                if level == self.descr.max_def_level() {
-                    values_to_write += 1;
-                } else {
-                    // We must always compute this as it is used to populate v2 pages
-                    self.page_metrics.num_page_nulls += 1
-                }
-            }
+            let values_to_write = levels
+                .iter()
+                .map(|level| (*level == self.descr.max_def_level()) as usize)
+                .sum();
+            self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;
 
             // Update histogram
             self.page_metrics.update_definition_level_histogram(levels);
diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index d6e32600d321..89a1f00a5850 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -84,6 +84,7 @@ impl RleEncoder {
 
     /// Initialize the encoder from existing `buffer`
     pub fn new_from_buf(bit_width: u8, buffer: Vec<u8>) -> Self {
+        assert!(bit_width <= 64);
         let bit_writer = BitWriter::new_from_buf(buffer);
         RleEncoder {
             bit_width,
@@ -135,7 +136,7 @@ impl RleEncoder {
         } else {
             if self.repeat_count >= 8 {
                 // The current RLE run has ended and we've gathered enough. Flush first.
-                assert_eq!(self.bit_packed_count, 0);
+                debug_assert_eq!(self.bit_packed_count, 0);
                 self.flush_rle_run();
             }
             self.repeat_count = 1;
@@ -146,7 +147,7 @@ impl RleEncoder {
         self.num_buffered_values += 1;
         if self.num_buffered_values == 8 {
             // Buffered values are full. Flush them.
-            assert_eq!(self.bit_packed_count % 8, 0);
+            debug_assert_eq!(self.bit_packed_count % 8, 0);
             self.flush_buffered_values();
         }
     }
@@ -220,7 +221,7 @@ impl RleEncoder {
     }
 
     fn flush_rle_run(&mut self) {
-        assert!(self.repeat_count > 0);
+        debug_assert!(self.repeat_count > 0);
         let indicator_value = self.repeat_count << 1;
         self.bit_writer.put_vlq_int(indicator_value as u64);
         self.bit_writer.put_aligned(
@@ -237,9 +238,8 @@ impl RleEncoder {
         }
 
         // Write all buffered values as bit-packed literals
-        for i in 0..self.num_buffered_values {
-            self.bit_writer
-                .put_value(self.buffered_values[i], self.bit_width as usize);
+        for v in &self.buffered_values[..self.num_buffered_values] {
+            self.bit_writer.put_value(*v, self.bit_width as usize);
         }
         self.num_buffered_values = 0;
         if update_indicator_byte {
@@ -253,14 +253,13 @@ impl RleEncoder {
         }
     }
 
-    #[inline(never)]
     fn flush_buffered_values(&mut self) {
         if self.repeat_count >= 8 {
            self.num_buffered_values = 0;
            if self.bit_packed_count > 0 {
                // In this case we choose RLE encoding. Flush the current buffered values
                // as bit-packed encoding.
-                assert_eq!(self.bit_packed_count % 8, 0);
+                debug_assert_eq!(self.bit_packed_count % 8, 0);
                self.flush_bit_packed_run(true)
            }
            return;
@@ -271,7 +270,7 @@ impl RleEncoder {
         if num_groups + 1 >= MAX_GROUPS_PER_BIT_PACKED_RUN {
             // We've reached the maximum value that can be hold in a single bit-packed
             // run.
-            assert!(self.indicator_byte_pos >= 0);
+            debug_assert!(self.indicator_byte_pos >= 0);
             self.flush_bit_packed_run(true);
         } else {
             self.flush_bit_packed_run(false);
diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs
index 8f6c2d8f8184..b3015c2ba755 100644
--- a/parquet/src/util/bit_util.rs
+++ b/parquet/src/util/bit_util.rs
@@ -283,9 +283,9 @@ impl BitWriter {
     /// The `num_bits` must not be greater than 64. This is bit packed.
     #[inline]
     pub fn put_value(&mut self, v: u64, num_bits: usize) {
-        assert!(num_bits <= 64);
+        debug_assert!(num_bits <= 64);
         let num_bits = num_bits as u8;
-        assert_eq!(v.checked_shr(num_bits as u32).unwrap_or(0), 0); // covers case v >> 64
+        debug_assert_eq!(v.checked_shr(num_bits as u32).unwrap_or(0), 0); // covers case v >> 64
 
         // Add value to buffered_values
         self.buffered_values |= v << self.bit_offset;
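A minimal standalone sketch of why the little-endian `write_bitset` cast in the bloom filter change is sound: `#[repr(transparent)]` makes `Block` layout-identical to `[u32; 8]`, so a `&[Block]` can be viewed as `&[u8]` without copying or byte swapping. The `Block` stand-in and the `blocks_as_bytes` helper below are illustrative names for this sketch, not part of the patch.

```rust
use std::mem::size_of;

#[derive(Debug, Copy, Clone)]
#[repr(transparent)]
struct Block([u32; 8]);

/// Reinterpret a slice of `Block` as raw bytes without copying.
///
/// Sound because `#[repr(transparent)]` gives `Block` the exact layout of
/// `[u32; 8]`, `u8` has alignment 1, and the byte length is scaled by
/// `size_of::<Block>()` (32 bytes per block).
fn blocks_as_bytes(blocks: &[Block]) -> &[u8] {
    unsafe {
        std::slice::from_raw_parts(
            blocks.as_ptr() as *const u8,
            blocks.len() * size_of::<Block>(),
        )
    }
}

fn main() {
    let blocks = vec![Block([0x0102_0304; 8]); 2];
    let bytes = blocks_as_bytes(&blocks);
    assert_eq!(bytes.len(), 64);
    // On a little-endian target the first word is laid out as 04 03 02 01,
    // which matches the byte order the serialized bitset needs, so the
    // little-endian path can write the slice directly.
    #[cfg(target_endian = "little")]
    assert_eq!(&bytes[..4], &[0x04, 0x03, 0x02, 0x01][..]);
}
```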