Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rust/arrow/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1905,7 +1905,7 @@ mod tests {
#[test]
#[should_panic(expected = "memory is not aligned")]
fn test_primitive_array_alignment() {
let ptr = memory::allocate_aligned(8).unwrap();
let ptr = memory::allocate_aligned(8);
let buf = Buffer::from_raw_parts(ptr, 8);
let buf2 = buf.slice(1);
let array_data = ArrayData::builder(DataType::Int32).add_buffer(buf2).build();
Expand All @@ -1915,7 +1915,7 @@ mod tests {
#[test]
#[should_panic(expected = "memory is not aligned")]
fn test_list_array_alignment() {
let ptr = memory::allocate_aligned(8).unwrap();
let ptr = memory::allocate_aligned(8);
let buf = Buffer::from_raw_parts(ptr, 8);
let buf2 = buf.slice(1);

Expand All @@ -1935,7 +1935,7 @@ mod tests {
#[test]
#[should_panic(expected = "memory is not aligned")]
fn test_binary_array_alignment() {
let ptr = memory::allocate_aligned(8).unwrap();
let ptr = memory::allocate_aligned(8);
let buf = Buffer::from_raw_parts(ptr, 8);
let buf2 = buf.slice(1);

Expand Down
23 changes: 14 additions & 9 deletions rust/arrow/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! The main type in the module is `Buffer`, a contiguous immutable memory region of
//! fixed size aligned at a 64-byte boundary. `MutableBuffer` is like `Buffer`, but it can
//! be mutated and grown.
//!
use packed_simd::u8x64;

use std::cmp;
Expand Down Expand Up @@ -65,14 +64,19 @@ impl PartialEq for BufferData {
/// Release the underlying memory when the current buffer goes out of scope
impl Drop for BufferData {
fn drop(&mut self) {
memory::free_aligned(self.ptr);
if !self.ptr.is_null() {
memory::free_aligned(self.ptr as *mut u8, self.len);
}
}
}

impl Buffer {
/// Creates a buffer from an existing memory region (must already be byte-aligned)
pub fn from_raw_parts(ptr: *const u8, len: usize) -> Self {
assert!(memory::is_aligned(ptr, 64), "memory not aligned");
assert!(
memory::is_aligned(ptr, memory::ALIGNMENT),
"memory not aligned"
);
let buf_data = BufferData { ptr, len };
Buffer {
data: Arc::new(buf_data),
Expand Down Expand Up @@ -138,7 +142,7 @@ impl<T: AsRef<[u8]>> From<T> for Buffer {
let slice = p.as_ref();
let len = slice.len() * mem::size_of::<u8>();
let capacity = bit_util::round_upto_multiple_of_64(len);
let buffer = memory::allocate_aligned(capacity).unwrap();
let buffer = memory::allocate_aligned(capacity);
unsafe {
memory::memcpy(buffer, slice.as_ptr(), len);
}
Expand Down Expand Up @@ -295,7 +299,7 @@ impl MutableBuffer {
/// Allocate a new mutable buffer with initial capacity to be `capacity`.
pub fn new(capacity: usize) -> Self {
let new_capacity = bit_util::round_upto_multiple_of_64(capacity);
let ptr = memory::allocate_aligned(new_capacity).unwrap();
let ptr = memory::allocate_aligned(new_capacity);
Self {
data: ptr,
len: 0,
Expand Down Expand Up @@ -339,7 +343,7 @@ impl MutableBuffer {
if capacity > self.capacity {
let new_capacity = bit_util::round_upto_multiple_of_64(capacity);
let new_capacity = cmp::max(new_capacity, self.capacity * 2);
let new_data = memory::reallocate(self.capacity, new_capacity, self.data)?;
let new_data = memory::reallocate(self.data, self.capacity, new_capacity);
self.data = new_data as *mut u8;
self.capacity = new_capacity;
}
Expand All @@ -359,8 +363,7 @@ impl MutableBuffer {
} else {
let new_capacity = bit_util::round_upto_multiple_of_64(new_len);
if new_capacity < self.capacity {
let new_data =
memory::reallocate(self.capacity, new_capacity, self.data)?;
let new_data = memory::reallocate(self.data, self.capacity, new_capacity);
self.data = new_data as *mut u8;
self.capacity = new_capacity;
}
Expand Down Expand Up @@ -425,7 +428,9 @@ impl MutableBuffer {

impl Drop for MutableBuffer {
fn drop(&mut self) {
memory::free_aligned(self.data);
if !self.data.is_null() {
memory::free_aligned(self.data, self.capacity);
}
}
}

Expand Down
81 changes: 18 additions & 63 deletions rust/arrow/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,83 +15,38 @@
// specific language governing permissions and limitations
// under the License.

//! Defines memory-related functions, currently mostly to make this library play nicely
//! with C.
//! Defines memory-related functions, such as allocate/deallocate/reallocate memory
//! regions.

use libc;
use std::cmp;
use std::mem;
use std::alloc::Layout;

use crate::error::{ArrowError, Result};
pub const ALIGNMENT: usize = 64;

const ALIGNMENT: usize = 64;

#[cfg(windows)]
#[link(name = "msvcrt")]
extern "C" {
fn _aligned_malloc(size: libc::size_t, alignment: libc::size_t) -> libc::size_t;
fn _aligned_free(prt: *const u8);
}

#[cfg(windows)]
pub fn allocate_aligned(size: usize) -> Result<*mut u8> {
let page =
unsafe { _aligned_malloc(size as libc::size_t, ALIGNMENT as libc::size_t) };
match page {
0 => Err(ArrowError::MemoryError(
"Failed to allocate memory".to_string(),
)),
_ => Ok(unsafe { mem::transmute::<libc::size_t, *mut u8>(page) }),
}
}

#[cfg(not(windows))]
pub fn allocate_aligned(size: usize) -> Result<*mut u8> {
unsafe {
let mut page: *mut libc::c_void = mem::uninitialized();
let result = libc::posix_memalign(&mut page, ALIGNMENT, size);
match result {
0 => Ok(mem::transmute::<*mut libc::c_void, *mut u8>(page)),
_ => Err(ArrowError::MemoryError(
"Failed to allocate memory".to_string(),
)),
}
}
}

#[cfg(windows)]
pub fn free_aligned(p: *const u8) {
pub fn allocate_aligned(size: usize) -> *mut u8 {
unsafe {
_aligned_free(p);
let layout = Layout::from_size_align_unchecked(size, ALIGNMENT);
::std::alloc::alloc(layout)
}
}

#[cfg(not(windows))]
pub fn free_aligned(p: *const u8) {
pub fn free_aligned(p: *mut u8, size: usize) {
unsafe {
libc::free(mem::transmute::<*const u8, *mut libc::c_void>(p));
::std::alloc::dealloc(p, Layout::from_size_align_unchecked(size, ALIGNMENT));
}
}

pub fn reallocate(
old_size: usize,
new_size: usize,
pointer: *const u8,
) -> Result<*const u8> {
pub fn reallocate(ptr: *mut u8, old_size: usize, new_size: usize) -> *mut u8 {
unsafe {
let old_src = mem::transmute::<*const u8, *mut libc::c_void>(pointer);
let result = allocate_aligned(new_size)?;
let dst = mem::transmute::<*const u8, *mut libc::c_void>(result);
libc::memcpy(dst, old_src, cmp::min(old_size, new_size));
free_aligned(pointer);
Ok(result)
::std::alloc::realloc(
ptr,
Layout::from_size_align_unchecked(old_size, ALIGNMENT),
new_size,
)
}
}

pub unsafe fn memcpy(dst: *mut u8, src: *const u8, len: usize) {
let src = mem::transmute::<*const u8, *const libc::c_void>(src);
let dst = mem::transmute::<*mut u8, *mut libc::c_void>(dst);
libc::memcpy(dst, src, len);
::std::ptr::copy_nonoverlapping(src, dst, len)
}

extern "C" {
Expand All @@ -113,7 +68,7 @@ mod tests {
#[test]
fn test_allocate() {
for _ in 0..10 {
let p = allocate_aligned(1024).unwrap();
let p = allocate_aligned(1024);
// make sure this is 64-byte aligned
assert_eq!(0, (p as usize) % 64);
}
Expand All @@ -122,7 +77,7 @@ mod tests {
#[test]
fn test_is_aligned() {
// allocate memory aligned to 64-byte
let mut ptr = allocate_aligned(10).unwrap();
let mut ptr = allocate_aligned(10);
assert_eq!(true, is_aligned::<u8>(ptr, 1));
assert_eq!(true, is_aligned::<u8>(ptr, 2));
assert_eq!(true, is_aligned::<u8>(ptr, 4));
Expand Down