Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rust/arrow/src/array/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2821,7 +2821,7 @@ mod tests {
#[should_panic(expected = "memory is not aligned")]
fn test_primitive_array_alignment() {
let ptr = memory::allocate_aligned(8);
let buf = unsafe { Buffer::from_raw_parts(ptr, 8) };
let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
let buf2 = buf.slice(1);
let array_data = ArrayData::builder(DataType::Int32).add_buffer(buf2).build();
Int32Array::from(array_data);
Expand All @@ -2831,7 +2831,7 @@ mod tests {
#[should_panic(expected = "memory is not aligned")]
fn test_list_array_alignment() {
let ptr = memory::allocate_aligned(8);
let buf = unsafe { Buffer::from_raw_parts(ptr, 8) };
let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
let buf2 = buf.slice(1);

let values: [i32; 8] = [0; 8];
Expand All @@ -2851,7 +2851,7 @@ mod tests {
#[should_panic(expected = "memory is not aligned")]
fn test_binary_array_alignment() {
let ptr = memory::allocate_aligned(8);
let buf = unsafe { Buffer::from_raw_parts(ptr, 8) };
let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
let buf2 = buf.slice(1);

let values: [u8; 12] = [0; 12];
Expand Down
15 changes: 14 additions & 1 deletion rust/arrow/src/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::util::bit_util;

use std::ops::{BitAnd, BitOr};

#[derive(PartialEq, Debug, Clone)]
#[derive(Debug, Clone)]
pub struct Bitmap {
pub(crate) bits: Buffer,
}
Expand Down Expand Up @@ -87,6 +87,19 @@ impl From<Buffer> for Bitmap {
}
}

impl PartialEq for Bitmap {
    /// Two bitmaps are equal when their underlying buffers have the same length
    /// and hold identical bytes over that length.
    fn eq(&self, other: &Self) -> bool {
        // `Buffer` equality also takes capacity into account; for a bitmap only
        // the actual data contents matter, so the byte slices are compared directly.
        let len = self.bits.len();

        // The length check short-circuits, so `other` is never sliced with a
        // length it does not have.
        len == other.bits.len() && self.bits.data()[..len] == other.bits.data()[..len]
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
92 changes: 73 additions & 19 deletions rust/arrow/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,28 @@ struct BufferData {
/// The raw pointer into the buffer bytes
ptr: *const u8,

/// The length (num of bytes) of the buffer
/// The length (num of bytes) of the buffer. The region `[0, len)` of the buffer
/// is occupied with meaningful data, while the rest `[len, capacity)` is the
/// unoccupied region.
len: usize,

/// Whether this piece of memory is owned by this object
owned: bool,

/// The capacity (num of bytes) of the buffer
/// Invariant: len <= capacity
capacity: usize,
}

impl PartialEq for BufferData {
fn eq(&self, other: &BufferData) -> bool {
if self.len != other.len {
return false;
}
if self.capacity != other.capacity {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not against the current implementation, but I wonder if we should only compare the "meaningful" data?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. It seems wrong that, on the one hand, the memcmp only compares up to `len`, while on the other hand we bypass that comparison entirely if there is a mismatch in capacity.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks guys! I'm slightly confused - here we are comparing meaningful data, no? And if the capacities mismatch, then the buffers are considered not equal and we skip comparing the data content.

The equality is defined as: 1) both lengths should be equal, 2) both capacities should be equal, and 3) the data content up to length should be equal. Let me know if this definition sounds good to you.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me if two arrays only differ by the amount of padding that they have then I would consider them equal. When I perform operations using these two arrays I will get the same answer (because the padding, or rather amount of padding, does not impact the result). However:

  • I am focused on computation; maybe there are other implications in IPC, etc.
  • In practice, this probably won't come up as we round up padding to a multiple of 64 bytes, but it could happen.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Array equality is defined separately in array/equal.rs and yes, it does take into account what you said (it compares buffer content with data()). In the current context we are discussing equality of buffers though, which IMO, when looked at in isolation, should consider both len and capacity.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, yes I agree.

return false;
}

unsafe { memory::memcmp(self.ptr, other.ptr, self.len) == 0 }
}
}
Expand All @@ -73,7 +83,7 @@ impl PartialEq for BufferData {
impl Drop for BufferData {
fn drop(&mut self) {
if !self.ptr.is_null() && self.owned {
memory::free_aligned(self.ptr as *mut u8, self.len);
memory::free_aligned(self.ptr as *mut u8, self.capacity);
}
}
}
Expand All @@ -82,8 +92,8 @@ impl Debug for BufferData {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"BufferData {{ ptr: {:?}, len: {}, data: ",
self.ptr, self.len
"BufferData {{ ptr: {:?}, len: {}, capacity: {}, data: ",
self.ptr, self.len, self.capacity
)?;

unsafe {
Expand All @@ -104,13 +114,14 @@ impl Buffer {
///
/// * `ptr` - Pointer to raw parts
/// * `len` - Length of raw parts in **bytes**
/// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes**
///
/// # Safety
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes.
pub unsafe fn from_raw_parts(ptr: *const u8, len: usize) -> Self {
Buffer::build_with_arguments(ptr, len, true)
pub unsafe fn from_raw_parts(ptr: *const u8, len: usize, capacity: usize) -> Self {
Buffer::build_with_arguments(ptr, len, capacity, true)
}

/// Creates a buffer from an existing memory region (must already be byte-aligned), this
Expand All @@ -120,13 +131,14 @@ impl Buffer {
///
/// * `ptr` - Pointer to raw parts
/// * `len` - Length of raw parts in **bytes**
/// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes**
///
/// # Safety
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes.
pub unsafe fn from_unowned(ptr: *const u8, len: usize) -> Self {
Buffer::build_with_arguments(ptr, len, false)
pub unsafe fn from_unowned(ptr: *const u8, len: usize, capacity: usize) -> Self {
Buffer::build_with_arguments(ptr, len, capacity, false)
}

/// Creates a buffer from an existing memory region (must already be byte-aligned).
Expand All @@ -135,19 +147,30 @@ impl Buffer {
///
/// * `ptr` - Pointer to raw parts
/// * `len` - Length of raw parts in bytes
/// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes**
/// * `owned` - Whether the raw parts is owned by this `Buffer`. If true, this `Buffer` will
/// free this memory when dropped, otherwise it will skip freeing the raw parts.
///
/// # Safety
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes.
unsafe fn build_with_arguments(ptr: *const u8, len: usize, owned: bool) -> Self {
unsafe fn build_with_arguments(
ptr: *const u8,
len: usize,
capacity: usize,
owned: bool,
) -> Self {
assert!(
memory::is_aligned(ptr, memory::ALIGNMENT),
"memory not aligned"
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also check that we are padded to 64 bytes. We assume this in the SIMD implementations and it's recommended here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably test for this also.

let buf_data = BufferData { ptr, len, owned };
let buf_data = BufferData {
ptr,
len,
capacity,
owned,
};
Buffer {
data: Arc::new(buf_data),
offset: 0,
Expand All @@ -159,6 +182,11 @@ impl Buffer {
self.data.len - self.offset
}

/// Returns the capacity of this buffer, in bytes.
///
/// This is the `capacity` recorded on the underlying buffer data, i.e. the
/// total size of the memory region rather than only its occupied `[0, len)`
/// part. Unlike `len()`, the value is not reduced by this buffer's offset.
pub fn capacity(&self) -> usize {
self.data.capacity
}

/// Returns whether the buffer is empty.
pub fn is_empty(&self) -> bool {
self.data.len - self.offset == 0
Expand Down Expand Up @@ -210,7 +238,7 @@ impl Buffer {

/// Returns an empty buffer.
pub fn empty() -> Self {
unsafe { Self::from_raw_parts(::std::ptr::null(), 0) }
unsafe { Self::from_raw_parts(::std::ptr::null(), 0, 0) }
}
}

Expand All @@ -234,7 +262,7 @@ impl<T: AsRef<[u8]>> From<T> for Buffer {
let buffer = memory::allocate_aligned(capacity);
unsafe {
memory::memcpy(buffer, slice.as_ptr(), len);
Buffer::from_raw_parts(buffer, len)
Buffer::from_raw_parts(buffer, len, capacity)
}
}
}
Expand Down Expand Up @@ -504,6 +532,7 @@ impl MutableBuffer {
let buffer_data = BufferData {
ptr: self.data,
len: self.len,
capacity: self.capacity,
owned: true,
};
std::mem::forget(self);
Expand All @@ -527,6 +556,9 @@ impl PartialEq for MutableBuffer {
if self.len != other.len {
return false;
}
if self.capacity != other.capacity {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue: memcmp only compares up to len, so it shouldn't matter whether the capacities match or not.

return false;
}
unsafe { memory::memcmp(self.data, other.data, self.len) == 0 }
}
}
Expand Down Expand Up @@ -584,45 +616,47 @@ mod tests {

#[test]
fn test_from_raw_parts() {
let buf = unsafe { Buffer::from_raw_parts(null_mut(), 0) };
let buf = unsafe { Buffer::from_raw_parts(null_mut(), 0, 0) };
assert_eq!(0, buf.len());
assert_eq!(0, buf.data().len());
assert_eq!(0, buf.capacity());
assert!(buf.raw_data().is_null());

let buf = Buffer::from(&[0, 1, 2, 3, 4]);
assert_eq!(5, buf.len());
assert!(!buf.raw_data().is_null());
assert_eq!(&[0, 1, 2, 3, 4], buf.data());
assert_eq!([0, 1, 2, 3, 4], buf.data());
}

#[test]
fn test_from_vec() {
let buf = Buffer::from(&[0, 1, 2, 3, 4]);
assert_eq!(5, buf.len());
assert!(!buf.raw_data().is_null());
assert_eq!(&[0, 1, 2, 3, 4], buf.data());
assert_eq!([0, 1, 2, 3, 4], buf.data());
}

#[test]
fn test_copy() {
let buf = Buffer::from(&[0, 1, 2, 3, 4]);
let buf2 = buf.clone();
assert_eq!(5, buf2.len());
assert_eq!(64, buf2.capacity());
assert!(!buf2.raw_data().is_null());
assert_eq!(&[0, 1, 2, 3, 4], buf2.data());
assert_eq!([0, 1, 2, 3, 4], buf2.data());
}

#[test]
fn test_slice() {
let buf = Buffer::from(&[2, 4, 6, 8, 10]);
let buf2 = buf.slice(2);

assert_eq!(&[6, 8, 10], buf2.data());
assert_eq!([6, 8, 10], buf2.data());
assert_eq!(3, buf2.len());
assert_eq!(unsafe { buf.raw_data().offset(2) }, buf2.raw_data());

let buf3 = buf2.slice(1);
assert_eq!(&[8, 10], buf3.data());
assert_eq!([8, 10], buf3.data());
assert_eq!(2, buf3.len());
assert_eq!(unsafe { buf.raw_data().offset(3) }, buf3.raw_data());

Expand Down Expand Up @@ -778,18 +812,38 @@ mod tests {
buf.write("aaaa bbbb cccc dddd".as_bytes())
.expect("write should be OK");
assert_eq!(19, buf.len());
assert_eq!(64, buf.capacity());
assert_eq!("aaaa bbbb cccc dddd".as_bytes(), buf.data());

let immutable_buf = buf.freeze();
assert_eq!(19, immutable_buf.len());
assert_eq!(64, immutable_buf.capacity());
assert_eq!("aaaa bbbb cccc dddd".as_bytes(), immutable_buf.data());
}

// Checks `MutableBuffer` equality when the buffers differ in length, then
// match, then differ again after one of them has extra space reserved.
#[test]
fn test_mutable_equal() -> Result<()> {
let mut buf = MutableBuffer::new(1);
let mut buf2 = MutableBuffer::new(1);

// One byte written versus two: the buffers are expected to be unequal.
buf.write(&[0xaa])?;
buf2.write(&[0xaa, 0xbb])?;
assert!(buf != buf2);

// After the second byte is written to `buf`, both are expected to compare equal.
buf.write(&[0xbb])?;
assert_eq!(buf, buf2);

// No bytes are written here. Presumably `reserve(65)` grows `buf2`'s capacity
// past `buf`'s (TODO confirm), and the test expects that alone to break equality.
buf2.reserve(65)?;
assert!(buf != buf2);

Ok(())
}

#[test]
fn test_access_concurrently() {
let buffer = Buffer::from(vec![1, 2, 3, 4, 5]);
let buffer2 = buffer.clone();
assert_eq!(&[1, 2, 3, 4, 5], buffer.data());
assert_eq!([1, 2, 3, 4, 5], buffer.data());

let buffer_copy = thread::spawn(move || {
// access buffer in another thread.
Expand Down
2 changes: 1 addition & 1 deletion rust/arrow/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub const ALIGNMENT: usize = 64;
pub fn allocate_aligned(size: usize) -> *mut u8 {
unsafe {
let layout = Layout::from_size_align_unchecked(size, ALIGNMENT);
std::alloc::alloc(layout)
std::alloc::alloc_zeroed(layout)
}
}

Expand Down
9 changes: 6 additions & 3 deletions rust/arrow/src/tensor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ impl<'a, T: ArrowPrimitiveType> Tensor<'a, T> {

/// The total number of elements in the `Tensor`
pub fn size(&self) -> usize {
(self.buffer.len() / mem::size_of::<T::Native>())
match self.shape {
None => 0,
Some(ref s) => s.iter().fold(1, |a, b| a * b),
}
}

/// Indicates if the data is laid out contiguously in memory
Expand Down Expand Up @@ -255,7 +258,7 @@ mod tests {
fn test_zero_dim() {
let buf = Buffer::from(&[1]);
let tensor = UInt8Tensor::new(buf, None, None, None);
assert_eq!(1, tensor.size());
assert_eq!(0, tensor.size());
assert_eq!(None, tensor.shape());
assert_eq!(None, tensor.names());
assert_eq!(0, tensor.ndim());
Expand All @@ -265,7 +268,7 @@ mod tests {

let buf = Buffer::from(&[1, 2, 2, 2]);
let tensor = Int32Tensor::new(buf, None, None, None);
assert_eq!(1, tensor.size());
assert_eq!(0, tensor.size());
assert_eq!(None, tensor.shape());
assert_eq!(None, tensor.names());
assert_eq!(0, tensor.ndim());
Expand Down