From 151bb2a506246ee2a576abf8f27aab3afdb69d5d Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Thu, 12 Nov 2020 00:24:03 +0100 Subject: [PATCH 1/2] Simplified write. --- rust/arrow/src/array/array_primitive.rs | 11 +-- rust/arrow/src/array/builder.rs | 34 ++------- rust/arrow/src/buffer.rs | 80 +++++--------------- rust/arrow/src/compute/kernels/comparison.rs | 10 +-- rust/arrow/src/compute/kernels/filter.rs | 3 +- 5 files changed, 38 insertions(+), 100 deletions(-) diff --git a/rust/arrow/src/array/array_primitive.rs b/rust/arrow/src/array/array_primitive.rs index ade18dbe329..d1014dbb940 100644 --- a/rust/arrow/src/array/array_primitive.rs +++ b/rust/arrow/src/array/array_primitive.rs @@ -19,7 +19,6 @@ use std::any::Any; use std::borrow::Borrow; use std::convert::From; use std::fmt; -use std::io::Write; use std::iter::{FromIterator, IntoIterator}; use std::mem; use std::sync::Arc; @@ -309,9 +308,9 @@ impl::Native iter.enumerate().for_each(|(i, item)| { if let Some(a) = item.borrow() { bit_util::set_bit(null_slice, i); - val_buf.write_all(a.to_byte_slice()).unwrap(); + val_buf.extend_from_slice(a.to_byte_slice()); } else { - val_buf.write_all(&null).unwrap(); + val_buf.extend_from_slice(&null); } }); @@ -406,11 +405,9 @@ impl PrimitiveArray { for (i, v) in data.iter().enumerate() { if let Some(n) = v { bit_util::set_bit(null_slice, i); - // unwrap() in the following should be safe here since we've - // made sure enough space is allocated for the values. - val_buf.write_all(&n.to_byte_slice()).unwrap(); + val_buf.extend_from_slice(&n.to_byte_slice()); } else { - val_buf.write_all(&null).unwrap(); + val_buf.extend_from_slice(&null); } } } diff --git a/rust/arrow/src/array/builder.rs b/rust/arrow/src/array/builder.rs index c0300eeb510..2a6c237859a 100644 --- a/rust/arrow/src/array/builder.rs +++ b/rust/arrow/src/array/builder.rs @@ -23,7 +23,6 @@ use std::any::Any; use std::collections::HashMap; use std::fmt; -use std::io::Write; use std::marker::PhantomData; use std::mem; use std::{convert::TryInto, sync::Arc}; @@ -325,7 +324,7 @@ impl BufferBuilderTrait for BufferBuilder { } self.len += 1; } else { - self.write_bytes(v.to_byte_slice(), 1)?; + self.write_bytes(v.to_byte_slice(), 1); } Ok(()) } @@ -346,7 +345,7 @@ impl BufferBuilderTrait for BufferBuilder { self.len += n; } else { for _ in 0..n { - self.write_bytes(v.to_byte_slice(), 1)?; + self.write_bytes(v.to_byte_slice(), 1); } } Ok(()) @@ -371,7 +370,7 @@ impl BufferBuilderTrait for BufferBuilder { } Ok(()) } else { - self.write_bytes(slice.to_byte_slice(), array_slots) + Ok(self.write_bytes(slice.to_byte_slice(), array_slots)) } } @@ -397,18 +396,9 @@ impl BufferBuilder { /// Writes a byte slice to the underlying buffer and updates the `len`, i.e. the /// number array elements in the builder. Also, converts the `io::Result` /// required by the `Write` trait to the Arrow `Result` type. - fn write_bytes(&mut self, bytes: &[u8], len_added: usize) -> Result<()> { - let write_result = self.buffer.write(bytes); - // `io::Result` has many options one of which we use, so pattern matching is - // overkill here - if write_result.is_err() { - Err(ArrowError::MemoryError( - "Could not write to Buffer, not big enough".to_string(), - )) - } else { - self.len += len_added; - Ok(()) - } + fn write_bytes(&mut self, bytes: &[u8], len_added: usize) { + self.buffer.extend_from_slice(bytes); + self.len += len_added; } } @@ -525,7 +515,7 @@ impl ArrayBuilder for PrimitiveBuilder { let sliced = array.buffers()[0].data(); // slice into data by factoring (offset and length) * byte width self.values_builder - .write_bytes(&sliced[(offset * mul)..((len + offset) * mul)], len)?; + .write_bytes(&sliced[(offset * mul)..((len + offset) * mul)], len); } for i in 0..len { @@ -2600,21 +2590,13 @@ mod tests { fn test_write_bytes_i32() { let mut b = Int32BufferBuilder::new(4); let bytes = [8, 16, 32, 64].to_byte_slice(); - b.write_bytes(bytes, 4).unwrap(); + b.write_bytes(bytes, 4); assert_eq!(4, b.len()); assert_eq!(16, b.capacity()); let buffer = b.finish(); assert_eq!(16, buffer.len()); } - #[test] - #[should_panic(expected = "Could not write to Buffer, not big enough")] - fn test_write_too_many_bytes() { - let mut b = Int32BufferBuilder::new(0); - let bytes = [8, 16, 32, 64].to_byte_slice(); - b.write_bytes(bytes, 4).unwrap(); - } - #[test] fn test_boolean_array_builder_append_slice() { let arr1 = diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index 68d07fc74d4..d5b824e2992 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -24,7 +24,6 @@ use packed_simd::u8x64; use std::cmp; use std::convert::AsRef; use std::fmt::{Debug, Formatter}; -use std::io::{Error as IoError, ErrorKind, Result as IoResult, Write}; use std::mem; use std::ops::{BitAnd, BitOr, Not}; use std::ptr::NonNull; @@ -416,9 +415,7 @@ where let rem = op(left_chunks.remainder_bits(), right_chunks.remainder_bits()); // we are counting its starting from the least significant bit, to to_le_bytes should be correct let rem = &rem.to_le_bytes()[0..remainder_bytes]; - result - .write_all(rem) - .expect("not enough capacity in buffer"); + result.extend_from_slice(rem); result.freeze() } @@ -451,9 +448,7 @@ where let rem = op(left_chunks.remainder_bits()); // we are counting its starting from the least significant bit, to to_le_bytes should be correct let rem = &rem.to_le_bytes()[0..remainder_bytes]; - result - .write_all(rem) - .expect("not enough capacity in buffer"); + result.extend_from_slice(rem); result.freeze() } @@ -773,21 +768,16 @@ impl MutableBuffer { } } - /// Writes a byte slice to the underlying buffer and updates the `len`, i.e. the - /// number array elements in the buffer. Also, converts the `io::Result` - /// required by the `Write` trait to the Arrow `Result` type. - pub fn write_bytes(&mut self, bytes: &[u8], len_added: usize) -> Result<()> { - let write_result = self.write(bytes); - // `io::Result` has many options one of which we use, so pattern matching is - // overkill here - if write_result.is_err() { - Err(ArrowError::IoError( - "Could not write to Buffer, not big enough".to_string(), - )) - } else { - self.len += len_added; - Ok(()) + /// Extends the buffer from a byte slice, incrementing its capacity if needed. + pub fn extend_from_slice(&mut self, bytes: &[u8]) { + let remaining_capacity = self.capacity - self.len; + if bytes.len() > remaining_capacity { + self.reserve(self.len + bytes.len()); } + unsafe { + memory::memcpy(self.data.add(self.len), bytes.as_ptr(), bytes.len()); + } + self.len += bytes.len(); } } @@ -811,24 +801,6 @@ impl PartialEq for MutableBuffer { } } -impl Write for MutableBuffer { - fn write(&mut self, buf: &[u8]) -> IoResult { - let remaining_capacity = self.capacity - self.len; - if buf.len() > remaining_capacity { - return Err(IoError::new(ErrorKind::Other, "Buffer not big enough")); - } - unsafe { - memory::memcpy(self.data.add(self.len), buf.as_ptr(), buf.len()); - self.len += buf.len(); - Ok(buf.len()) - } - } - - fn flush(&mut self) -> IoResult<()> { - Ok(()) - } -} - unsafe impl Sync for MutableBuffer {} unsafe impl Send for MutableBuffer {} @@ -855,8 +827,7 @@ mod tests { // Different capacities should still preserve equality let mut buf2 = MutableBuffer::new(65); - buf2.write_all(&[0, 1, 2, 3, 4]) - .expect("write should be OK"); + buf2.extend_from_slice(&[0, 1, 2, 3, 4]); let buf2 = buf2.freeze(); assert_eq!(buf1, buf2); @@ -994,33 +965,23 @@ mod tests { } #[test] - fn test_mutable_write() { + fn test_mutable_extend_from_slice() { let mut buf = MutableBuffer::new(100); - buf.write_all(b"hello").expect("Ok"); + buf.extend_from_slice(b"hello"); assert_eq!(5, buf.len()); assert_eq!(b"hello", buf.data()); - buf.write_all(b" world").expect("Ok"); + buf.extend_from_slice(b" world"); assert_eq!(11, buf.len()); assert_eq!(b"hello world", buf.data()); buf.clear(); assert_eq!(0, buf.len()); - buf.write_all(b"hello arrow").expect("Ok"); + buf.extend_from_slice(b"hello arrow"); assert_eq!(11, buf.len()); assert_eq!(b"hello arrow", buf.data()); } - #[test] - #[should_panic(expected = "Buffer not big enough")] - fn test_mutable_write_overflow() { - let mut buf = MutableBuffer::new(1); - assert_eq!(64, buf.capacity()); - for _ in 0..10 { - buf.write_all(&[0, 0, 0, 0, 0, 0, 0, 0]).unwrap(); - } - } - #[test] fn test_mutable_reserve() { let mut buf = MutableBuffer::new(1); @@ -1066,8 +1027,7 @@ mod tests { #[test] fn test_mutable_freeze() { let mut buf = MutableBuffer::new(1); - buf.write_all(b"aaaa bbbb cccc dddd") - .expect("write should be OK"); + buf.extend_from_slice(b"aaaa bbbb cccc dddd"); assert_eq!(19, buf.len()); assert_eq!(64, buf.capacity()); assert_eq!(b"aaaa bbbb cccc dddd", buf.data()); @@ -1083,11 +1043,11 @@ mod tests { let mut buf = MutableBuffer::new(1); let mut buf2 = MutableBuffer::new(1); - buf.write_all(&[0xaa])?; - buf2.write_all(&[0xaa, 0xbb])?; + buf.extend_from_slice(&[0xaa]); + buf2.extend_from_slice(&[0xaa, 0xbb]); assert!(buf != buf2); - buf.write_all(&[0xbb])?; + buf.extend_from_slice(&[0xbb]); assert_eq!(buf, buf2); buf2.reserve(65); diff --git a/rust/arrow/src/compute/kernels/comparison.rs b/rust/arrow/src/compute/kernels/comparison.rs index 88bb49987af..d73356e44fe 100644 --- a/rust/arrow/src/compute/kernels/comparison.rs +++ b/rust/arrow/src/compute/kernels/comparison.rs @@ -260,7 +260,6 @@ where T: ArrowNumericType, F: Fn(T::Simd, T::Simd) -> T::SimdMask, { - use std::io::Write; use std::mem; let len = left.len(); @@ -283,7 +282,7 @@ where let simd_right = T::load(right.value_slice(i, lanes)); let simd_result = op(simd_left, simd_right); T::bitmask(&simd_result, |b| { - result.write(b).unwrap(); + result.extend_from_slice(b); }); } @@ -293,7 +292,7 @@ where let simd_result = op(simd_left, simd_right); let rem_buffer_size = (rem as f32 / 8f32).ceil() as usize; T::bitmask(&simd_result, |b| { - result.write(&b[0..rem_buffer_size]).unwrap(); + result.extend_from_slice(&b[0..rem_buffer_size]); }); } @@ -321,7 +320,6 @@ where T: ArrowNumericType, F: Fn(T::Simd, T::Simd) -> T::SimdMask, { - use std::io::Write; use std::mem; let len = left.len(); @@ -336,7 +334,7 @@ where let simd_left = T::load(left.value_slice(i, lanes)); let simd_result = op(simd_left, simd_right); T::bitmask(&simd_result, |b| { - result.write(b).unwrap(); + result.extend_from_slice(b); }); } @@ -345,7 +343,7 @@ where let simd_result = op(simd_left, simd_right); let rem_buffer_size = (rem as f32 / 8f32).ceil() as usize; T::bitmask(&simd_result, |b| { - result.write(&b[0..rem_buffer_size]).unwrap(); + result.extend_from_slice(&b[0..rem_buffer_size]); }); } diff --git a/rust/arrow/src/compute/kernels/filter.rs b/rust/arrow/src/compute/kernels/filter.rs index 3870f9ab665..55024ff2ae2 100644 --- a/rust/arrow/src/compute/kernels/filter.rs +++ b/rust/arrow/src/compute/kernels/filter.rs @@ -327,7 +327,8 @@ impl FilterContext { let mut u64_buffer = MutableBuffer::new(filter_bytes.len()); // add to the resulting len so is is a multiple of the size of u64 let pad_addional_len = (8 - filter_bytes.len() % 8) % 8; - u64_buffer.write_bytes(filter_bytes, pad_addional_len)?; + u64_buffer.extend_from_slice(filter_bytes); + u64_buffer.extend_from_slice(&vec![0; pad_addional_len]); let mut filter_u64 = u64_buffer.typed_data_mut::().to_owned(); // mask of any bits outside of the given len From fd75933bead400a3943af1d336c006c6656d6f80 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Fri, 13 Nov 2020 05:22:11 +0100 Subject: [PATCH 2/2] Set bit error. --- rust/arrow/src/array/builder.rs | 10 ++--- rust/arrow/src/array/equal_json.rs | 2 + rust/arrow/src/util/bit_util.rs | 61 ------------------------------ 3 files changed, 7 insertions(+), 66 deletions(-) diff --git a/rust/arrow/src/array/builder.rs b/rust/arrow/src/array/builder.rs index 2a6c237859a..7a39b939827 100644 --- a/rust/arrow/src/array/builder.rs +++ b/rust/arrow/src/array/builder.rs @@ -334,13 +334,13 @@ impl BufferBuilderTrait for BufferBuilder { self.reserve(n); if T::DATA_TYPE == DataType::Boolean { if n != 0 && v != T::default_value() { - unsafe { - bit_util::set_bits_raw( + let data = unsafe { + std::slice::from_raw_parts_mut( self.buffer.raw_data_mut(), - self.len, - self.len + n, + self.buffer.capacity(), ) - } + }; + (self.len..self.len + n).for_each(|i| bit_util::set_bit(data, i)) } self.len += n; } else { diff --git a/rust/arrow/src/array/equal_json.rs b/rust/arrow/src/array/equal_json.rs index 42c0964c68b..cdc9fc4122e 100644 --- a/rust/arrow/src/array/equal_json.rs +++ b/rust/arrow/src/array/equal_json.rs @@ -533,6 +533,8 @@ mod tests { "#, ) .unwrap(); + println!("{:?}", arrow_array); + println!("{:?}", json_array); assert!(arrow_array.eq(&json_array)); assert!(json_array.eq(&arrow_array)); diff --git a/rust/arrow/src/util/bit_util.rs b/rust/arrow/src/util/bit_util.rs index 834c264d0e8..269eceb8be2 100644 --- a/rust/arrow/src/util/bit_util.rs +++ b/rust/arrow/src/util/bit_util.rs @@ -99,36 +99,6 @@ pub unsafe fn unset_bit_raw(data: *mut u8, i: usize) { *data.add(i >> 3) ^= BIT_MASK[i & 7]; } -/// Sets bits in the non-inclusive range `start..end` for `data` -/// -/// # Safety -/// -/// Note this doesn't do any bound checking, for performance reason. The caller is -/// responsible to guarantee that both `start` and `end` are within bounds. -#[inline] -pub unsafe fn set_bits_raw(data: *mut u8, start: usize, end: usize) { - let start_byte = (start >> 3) as isize; - let end_byte = (end >> 3) as isize; - - let start_offset = (start & 7) as u8; - let end_offset = (end & 7) as u8; - - // All set apart from lowest `start_offset` bits - let start_mask = !((1 << start_offset) - 1); - // All clear apart from lowest `end_offset` bits - let end_mask = (1 << end_offset) - 1; - - if start_byte == end_byte { - *data.offset(start_byte) |= start_mask & end_mask; - } else { - *data.offset(start_byte) |= start_mask; - for i in (start_byte + 1)..end_byte { - *data.offset(i) = 0xFF; - } - *data.offset(end_byte) |= end_mask; - } -} - /// Returns the number of 1-bits in `data` #[inline] pub fn count_set_bits(data: &[u8]) -> usize { @@ -334,37 +304,6 @@ mod tests { } } - #[test] - fn test_set_bits_raw() { - const NUM_BYTE: usize = 64; - const NUM_BLOCKS: usize = 12; - const MAX_BLOCK_SIZE: usize = 32; - let mut buf = vec![0; NUM_BYTE]; - - let mut expected = Vec::with_capacity(NUM_BYTE * 8); - expected.resize(NUM_BYTE * 8, false); - - let mut rng = seedable_rng(); - - for _ in 0..NUM_BLOCKS { - let start = rng.gen_range(0, NUM_BYTE * 8 - MAX_BLOCK_SIZE); - let end = start + rng.gen_range(1, MAX_BLOCK_SIZE); - unsafe { - set_bits_raw(buf.as_mut_ptr(), start, end); - } - for i in start..end { - expected[i] = true; - } - } - - let raw_ptr = buf.as_ptr(); - for (i, b) in expected.iter().enumerate() { - unsafe { - assert_eq!(*b, get_bit_raw(raw_ptr, i)); - } - } - } - #[test] fn test_get_set_bit_roundtrip() { const NUM_BYTES: usize = 10;