Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions arrow-buffer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ bench = false
[package.metadata.docs.rs]
all-features = true

[features]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to also document this feature flag on the crate page?

pool = []

[dependencies]
bytes = { version = "1.4" }
num = { version = "0.4", default-features = false, features = ["std"] }
Expand Down
14 changes: 14 additions & 0 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
use crate::BufferBuilder;
use crate::{bit_util, bytes::Bytes, native::ArrowNativeType};

#[cfg(feature = "pool")]
use crate::pool::MemoryPool;

use super::ops::bitwise_unary_op_helper;
use super::{MutableBuffer, ScalarBuffer};

Expand Down Expand Up @@ -414,6 +417,17 @@ impl Buffer {
pub fn ptr_eq(&self, other: &Self) -> bool {
    // Identity check only: compares the start pointer and the length of the
    // two buffers, never the bytes they point at.
    let same_start = self.ptr == other.ptr;
    let same_length = self.length == other.length;
    same_start && same_length
}

/// Claims the memory of this [`Buffer`] in the given [`MemoryPool`].
///
/// The request is forwarded to the data backing this buffer, so the pool
/// can account for the memory it uses. A reservation made by an earlier
/// call is released, which keeps the accounting sensible when the same
/// buffer is shared among multiple arrays and claimed more than once.
#[cfg(feature = "pool")]
pub fn claim(&self, pool: &dyn MemoryPool) {
    // Pure delegation: the reservation itself lives with the backing data.
    self.data.claim(pool);
}
}

/// Note that here we deliberately do not implement
Expand Down
175 changes: 172 additions & 3 deletions arrow-buffer/src/buffer/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ use crate::{
util::bit_util,
};

#[cfg(feature = "pool")]
use crate::pool::{MemoryPool, MemoryReservation};
#[cfg(feature = "pool")]
use std::sync::Mutex;

use super::Buffer;

/// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items.
Expand Down Expand Up @@ -57,6 +62,10 @@ pub struct MutableBuffer {
// invariant: len <= capacity
len: usize,
layout: Layout,

/// Memory reservation for tracking memory usage
#[cfg(feature = "pool")]
reservation: Mutex<Option<Box<dyn MemoryReservation>>>,
}

impl MutableBuffer {
Expand Down Expand Up @@ -91,6 +100,8 @@ impl MutableBuffer {
data,
len: 0,
layout,
#[cfg(feature = "pool")]
reservation: std::sync::Mutex::new(None),
}
}

Expand All @@ -115,7 +126,13 @@ impl MutableBuffer {
NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout))
}
};
Self { data, len, layout }
Self {
data,
len,
layout,
#[cfg(feature = "pool")]
reservation: std::sync::Mutex::new(None),
}
}

/// Allocates a new [MutableBuffer] from given `Bytes`.
Expand All @@ -127,9 +144,17 @@ impl MutableBuffer {

let len = bytes.len();
let data = bytes.ptr();
#[cfg(feature = "pool")]
let reservation = bytes.reservation.lock().unwrap().take();
mem::forget(bytes);

Ok(Self { data, len, layout })
Ok(Self {
data,
len,
layout,
#[cfg(feature = "pool")]
reservation: Mutex::new(reservation),
})
}

/// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits.
Expand Down Expand Up @@ -217,6 +242,12 @@ impl MutableBuffer {
};
self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout));
self.layout = new_layout;
#[cfg(feature = "pool")]
{
if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
reservation.resize(self.layout.size());
}
}
}

/// Truncates this buffer to `len` bytes
Expand All @@ -228,6 +259,12 @@ impl MutableBuffer {
return;
}
self.len = len;
#[cfg(feature = "pool")]
{
if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
reservation.resize(self.len);
}
}
}

/// Resizes the buffer, either truncating its contents (with no change in capacity), or
Expand All @@ -251,6 +288,12 @@ impl MutableBuffer {
}
// this truncates the buffer when new_len < self.len
self.len = new_len;
#[cfg(feature = "pool")]
{
if let Some(reservation) = self.reservation.lock().unwrap().as_mut() {
reservation.resize(self.len);
}
}
}

/// Shrinks the capacity of the buffer as much as possible.
Expand Down Expand Up @@ -328,6 +371,11 @@ impl MutableBuffer {
/// Converts this [`MutableBuffer`] into an immutable [`Buffer`] that takes
/// over the same pointer, length and layout.
///
/// With the `pool` feature enabled, any reservation held by `self` is moved
/// onto the resulting bytes so it stays attached to the same memory.
#[inline]
pub(super) fn into_buffer(self) -> Buffer {
    let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) };
    #[cfg(feature = "pool")]
    {
        // Move the reservation (if any) out of `self` and into `bytes`.
        let handed_over = self.reservation.lock().unwrap().take();
        let mut slot = bytes.reservation.lock().unwrap();
        *slot = handed_over;
    }
    // `bytes` now refers to the allocation, so `self` is forgotten rather
    // than dropped (presumably to avoid freeing it twice — confirm against
    // the `Drop` impl of `MutableBuffer`).
    std::mem::forget(self);
    bytes.into()
}
Expand Down Expand Up @@ -466,6 +514,17 @@ impl MutableBuffer {
buffer.truncate(bit_util::ceil(len, 8));
buffer
}

/// Claims the memory of this [`MutableBuffer`] in the given [`MemoryPool`].
///
/// A reservation sized to the buffer's current capacity is requested from
/// `pool` and stored on the buffer, replacing (and thereby releasing) any
/// reservation made by an earlier call. This allows for accurate accounting
/// of memory usage, including when the buffer is claimed more than once.
#[cfg(feature = "pool")]
pub fn claim(&self, pool: &dyn MemoryPool) {
    // Request the new reservation before taking the lock; the previous
    // reservation, if any, is dropped when it is overwritten below.
    let fresh = pool.reserve(self.capacity());
    let mut slot = self.reservation.lock().unwrap();
    *slot = Some(fresh);
}
}

/// Creates a non-null pointer with alignment of [`ALIGNMENT`]
Expand Down Expand Up @@ -506,7 +565,13 @@ impl<T: ArrowNativeType> From<Vec<T>> for MutableBuffer {
// This is based on `RawVec::current_memory`
let layout = unsafe { Layout::array::<T>(value.capacity()).unwrap_unchecked() };
mem::forget(value);
Self { data, len, layout }
Self {
data,
len,
layout,
#[cfg(feature = "pool")]
reservation: std::sync::Mutex::new(None),
}
}
}

Expand Down Expand Up @@ -1013,4 +1078,108 @@ mod tests {
let max_capacity = isize::MAX as usize - (isize::MAX as usize % ALIGNMENT);
let _ = MutableBuffer::with_capacity(max_capacity + 1);
}

#[cfg(feature = "pool")]
mod pool_tests {
    use super::*;
    use crate::pool::{MemoryPool, TrackingMemoryPool};

    /// Reallocating a claimed buffer keeps the pool usage equal to the new capacity.
    #[test]
    fn test_reallocate_with_pool() {
        let tracker = TrackingMemoryPool::default();
        let mut buf = MutableBuffer::with_capacity(100);
        buf.claim(&tracker);

        // A requested capacity of 100 is rounded up to 128 (multiple of 64)
        assert_eq!(buf.capacity(), 128);
        assert_eq!(tracker.used(), 128);

        // Grow: `reallocate` uses the requested size as-is, without rounding
        buf.reallocate(200);
        assert_eq!(buf.capacity(), 200);
        assert_eq!(tracker.used(), 200);

        // Shrink: again the exact requested size is used
        buf.reallocate(50);
        assert_eq!(buf.capacity(), 50);
        assert_eq!(tracker.used(), 50);
    }

    /// Truncating a claimed buffer resizes the reservation to the new length.
    #[test]
    fn test_truncate_with_pool() {
        let tracker = TrackingMemoryPool::default();
        let mut buf = MutableBuffer::with_capacity(100);

        // Put 80 bytes into the buffer before claiming it
        buf.resize(80, 1);
        assert_eq!(buf.len(), 80);

        // Claiming reserves the capacity (128), not the length (80)
        buf.claim(&tracker);
        assert_eq!(tracker.used(), 128);

        // Cut the buffer in half
        buf.truncate(40);
        assert_eq!(buf.len(), 40);
        assert_eq!(tracker.used(), 40);

        // Cut the buffer down to nothing
        buf.truncate(0);
        assert_eq!(buf.len(), 0);
        assert_eq!(tracker.used(), 0);
    }

    /// Resizing a claimed buffer resizes the reservation to the new length.
    #[test]
    fn test_resize_with_pool() {
        let tracker = TrackingMemoryPool::default();
        let mut buf = MutableBuffer::with_capacity(100);
        buf.claim(&tracker);

        // Freshly claimed: empty, but the full capacity is reserved
        assert_eq!(buf.len(), 0);
        assert_eq!(tracker.used(), 128);

        // Grow within the existing capacity
        buf.resize(50, 1);
        assert_eq!(buf.len(), 50);
        assert_eq!(tracker.used(), 50);

        // Grow past the existing capacity, forcing a larger allocation
        buf.resize(150, 1);
        assert_eq!(buf.len(), 150);
        assert_eq!(buf.capacity(), 256);
        assert_eq!(tracker.used(), 150);

        // Shrink the length again
        buf.resize(30, 1);
        assert_eq!(buf.len(), 30);
        assert_eq!(tracker.used(), 30);
    }

    /// A reservation survives `into_buffer` and is released when the buffer is dropped.
    #[test]
    fn test_buffer_lifecycle_with_pool() {
        let tracker = TrackingMemoryPool::default();

        // Build a mutable buffer and claim it in the pool
        let mut builder = MutableBuffer::with_capacity(100);
        builder.resize(80, 1);
        builder.claim(&tracker);

        // `claim()` reserves based on capacity
        assert_eq!(tracker.used(), 128);

        // Freeze into an immutable Buffer
        let frozen = builder.into_buffer();

        // The reservation must have been carried over unchanged
        assert_eq!(tracker.used(), 128);

        // Dropping the buffer gives the memory back to the pool
        drop(frozen);
        assert_eq!(tracker.used(), 0);
    }
}
}
Loading
Loading