diff --git a/arrow-buffer/Cargo.toml b/arrow-buffer/Cargo.toml index 5414ad43ae04..984e00cf9de9 100644 --- a/arrow-buffer/Cargo.toml +++ b/arrow-buffer/Cargo.toml @@ -35,6 +35,9 @@ bench = false [package.metadata.docs.rs] all-features = true +[features] +pool = [] + [dependencies] bytes = { version = "1.4" } num = { version = "0.4", default-features = false, features = ["std"] } diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs index fd145ce2306e..7b79bc275823 100644 --- a/arrow-buffer/src/buffer/immutable.rs +++ b/arrow-buffer/src/buffer/immutable.rs @@ -25,6 +25,9 @@ use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk}; use crate::BufferBuilder; use crate::{bit_util, bytes::Bytes, native::ArrowNativeType}; +#[cfg(feature = "pool")] +use crate::pool::MemoryPool; + use super::ops::bitwise_unary_op_helper; use super::{MutableBuffer, ScalarBuffer}; @@ -414,6 +417,17 @@ impl Buffer { pub fn ptr_eq(&self, other: &Self) -> bool { self.ptr == other.ptr && self.length == other.length } + + /// Register this [`Buffer`] with the provided [`MemoryPool`] + /// + /// This claims the memory used by this buffer in the pool, allowing for + /// accurate accounting of memory usage. Any prior reservation will be + /// released so this works well when the buffer is being shared among + /// multiple arrays. 
+ #[cfg(feature = "pool")] + pub fn claim(&self, pool: &dyn MemoryPool) { + self.data.claim(pool) + } } /// Note that here we deliberately do not implement diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index 19ca0fef1519..63fdbf598bdb 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -26,6 +26,11 @@ use crate::{ util::bit_util, }; +#[cfg(feature = "pool")] +use crate::pool::{MemoryPool, MemoryReservation}; +#[cfg(feature = "pool")] +use std::sync::Mutex; + use super::Buffer; /// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items. @@ -57,6 +62,10 @@ pub struct MutableBuffer { // invariant: len <= capacity len: usize, layout: Layout, + + /// Memory reservation for tracking memory usage + #[cfg(feature = "pool")] + reservation: Mutex>>, } impl MutableBuffer { @@ -91,6 +100,8 @@ impl MutableBuffer { data, len: 0, layout, + #[cfg(feature = "pool")] + reservation: std::sync::Mutex::new(None), } } @@ -115,7 +126,13 @@ impl MutableBuffer { NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout)) } }; - Self { data, len, layout } + Self { + data, + len, + layout, + #[cfg(feature = "pool")] + reservation: std::sync::Mutex::new(None), + } } /// Allocates a new [MutableBuffer] from given `Bytes`. @@ -127,9 +144,17 @@ impl MutableBuffer { let len = bytes.len(); let data = bytes.ptr(); + #[cfg(feature = "pool")] + let reservation = bytes.reservation.lock().unwrap().take(); mem::forget(bytes); - Ok(Self { data, len, layout }) + Ok(Self { + data, + len, + layout, + #[cfg(feature = "pool")] + reservation: Mutex::new(reservation), + }) } /// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits. 
@@ -217,6 +242,12 @@ impl MutableBuffer { }; self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout)); self.layout = new_layout; + #[cfg(feature = "pool")] + { + if let Some(reservation) = self.reservation.lock().unwrap().as_mut() { + reservation.resize(self.layout.size()); + } + } } /// Truncates this buffer to `len` bytes @@ -228,6 +259,12 @@ impl MutableBuffer { return; } self.len = len; + #[cfg(feature = "pool")] + { + if let Some(reservation) = self.reservation.lock().unwrap().as_mut() { + reservation.resize(self.len); + } + } } /// Resizes the buffer, either truncating its contents (with no change in capacity), or @@ -251,6 +288,12 @@ impl MutableBuffer { } // this truncates the buffer when new_len < self.len self.len = new_len; + #[cfg(feature = "pool")] + { + if let Some(reservation) = self.reservation.lock().unwrap().as_mut() { + reservation.resize(self.len); + } + } } /// Shrinks the capacity of the buffer as much as possible. @@ -328,6 +371,11 @@ impl MutableBuffer { #[inline] pub(super) fn into_buffer(self) -> Buffer { let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) }; + #[cfg(feature = "pool")] + { + let reservation = self.reservation.lock().unwrap().take(); + *bytes.reservation.lock().unwrap() = reservation; + } std::mem::forget(self); Buffer::from(bytes) } @@ -466,6 +514,17 @@ impl MutableBuffer { buffer.truncate(bit_util::ceil(len, 8)); buffer } + + /// Register this [`MutableBuffer`] with the provided [`MemoryPool`] + /// + /// This claims the memory used by this buffer in the pool, allowing for + /// accurate accounting of memory usage. Any prior reservation will be + /// released so this works well when the buffer is being shared among + /// multiple arrays. 
+ #[cfg(feature = "pool")] + pub fn claim(&self, pool: &dyn MemoryPool) { + *self.reservation.lock().unwrap() = Some(pool.reserve(self.capacity())); + } } /// Creates a non-null pointer with alignment of [`ALIGNMENT`] @@ -506,7 +565,13 @@ impl From> for MutableBuffer { // This is based on `RawVec::current_memory` let layout = unsafe { Layout::array::(value.capacity()).unwrap_unchecked() }; mem::forget(value); - Self { data, len, layout } + Self { + data, + len, + layout, + #[cfg(feature = "pool")] + reservation: std::sync::Mutex::new(None), + } } } @@ -1013,4 +1078,108 @@ mod tests { let max_capacity = isize::MAX as usize - (isize::MAX as usize % ALIGNMENT); let _ = MutableBuffer::with_capacity(max_capacity + 1); } + + #[cfg(feature = "pool")] + mod pool_tests { + use super::*; + use crate::pool::{MemoryPool, TrackingMemoryPool}; + + #[test] + fn test_reallocate_with_pool() { + let pool = TrackingMemoryPool::default(); + let mut buffer = MutableBuffer::with_capacity(100); + buffer.claim(&pool); + + // Initial capacity should be 128 (multiple of 64) + assert_eq!(buffer.capacity(), 128); + assert_eq!(pool.used(), 128); + + // Reallocate to a larger size + buffer.reallocate(200); + + // The capacity is exactly the requested size, not rounded up + assert_eq!(buffer.capacity(), 200); + assert_eq!(pool.used(), 200); + + // Reallocate to a smaller size + buffer.reallocate(50); + + // The capacity is exactly the requested size, not rounded up + assert_eq!(buffer.capacity(), 50); + assert_eq!(pool.used(), 50); + } + + #[test] + fn test_truncate_with_pool() { + let pool = TrackingMemoryPool::default(); + let mut buffer = MutableBuffer::with_capacity(100); + + // Fill buffer with some data + buffer.resize(80, 1); + assert_eq!(buffer.len(), 80); + + buffer.claim(&pool); + assert_eq!(pool.used(), 128); + + // Truncate buffer + buffer.truncate(40); + assert_eq!(buffer.len(), 40); + assert_eq!(pool.used(), 40); + + // Truncate to zero + buffer.truncate(0); + 
assert_eq!(buffer.len(), 0); + assert_eq!(pool.used(), 0); + } + + #[test] + fn test_resize_with_pool() { + let pool = TrackingMemoryPool::default(); + let mut buffer = MutableBuffer::with_capacity(100); + buffer.claim(&pool); + + // Initial state + assert_eq!(buffer.len(), 0); + assert_eq!(pool.used(), 128); + + // Resize to increase length + buffer.resize(50, 1); + assert_eq!(buffer.len(), 50); + assert_eq!(pool.used(), 50); + + // Resize to increase length beyond capacity + buffer.resize(150, 1); + assert_eq!(buffer.len(), 150); + assert_eq!(buffer.capacity(), 256); + assert_eq!(pool.used(), 150); + + // Resize to decrease length + buffer.resize(30, 1); + assert_eq!(buffer.len(), 30); + assert_eq!(pool.used(), 30); + } + + #[test] + fn test_buffer_lifecycle_with_pool() { + let pool = TrackingMemoryPool::default(); + + // Create a buffer with memory reservation + let mut mutable = MutableBuffer::with_capacity(100); + mutable.resize(80, 1); + mutable.claim(&pool); + + // Memory reservation is based on capacity when using claim() + assert_eq!(pool.used(), 128); + + // Convert to immutable Buffer + let buffer = mutable.into_buffer(); + + // Memory reservation should be preserved + assert_eq!(pool.used(), 128); + + // Drop the buffer and the reservation should be released + drop(buffer); + assert_eq!(pool.used(), 0); + } + } } diff --git a/arrow-buffer/src/bytes.rs b/arrow-buffer/src/bytes.rs index b811bd2c6b40..8f912b807da5 100644 --- a/arrow-buffer/src/bytes.rs +++ b/arrow-buffer/src/bytes.rs @@ -26,6 +26,11 @@ use std::{fmt::Debug, fmt::Formatter}; use crate::alloc::Deallocation; use crate::buffer::dangling_ptr; +#[cfg(feature = "pool")] +use crate::pool::{MemoryPool, MemoryReservation}; +#[cfg(feature = "pool")] +use std::sync::Mutex; + /// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself. 
/// /// Note that this structure is an internal implementation detail of the @@ -49,6 +54,10 @@ pub struct Bytes { /// how to deallocate this region deallocation: Deallocation, + + /// Memory reservation for tracking memory usage + #[cfg(feature = "pool")] + pub(super) reservation: Mutex>>, } impl Bytes { @@ -70,6 +79,8 @@ impl Bytes { ptr, len, deallocation, + #[cfg(feature = "pool")] + reservation: Mutex::new(None), } } @@ -101,6 +112,27 @@ impl Bytes { } } + /// Register this [`Bytes`] with the provided [`MemoryPool`], replacing any prior reservation. + #[cfg(feature = "pool")] + pub fn claim(&self, pool: &dyn MemoryPool) { + *self.reservation.lock().unwrap() = Some(pool.reserve(self.capacity())); + } + + /// Resize the memory reservation of this buffer + /// + /// This is a no-op if this buffer doesn't have a reservation. + #[cfg(feature = "pool")] + fn resize_reservation(&self, new_size: usize) { + let mut guard = self.reservation.lock().unwrap(); + if let Some(mut reservation) = guard.take() { + // Resize the reservation + reservation.resize(new_size); + + // Put it back + *guard = Some(reservation); + } + } + /// Try to reallocate the underlying memory region to a new size (smaller or larger). /// /// Only works for bytes allocated with the standard allocator. 
@@ -135,6 +167,13 @@ impl Bytes { self.ptr = ptr; self.len = new_len; self.deallocation = Deallocation::Standard(new_layout); + + #[cfg(feature = "pool")] + { + // Resize reservation + self.resize_reservation(new_len); + } + return Ok(()); } } @@ -199,6 +238,8 @@ impl From for Bytes { len, ptr: NonNull::new(value.as_ptr() as _).unwrap(), deallocation: Deallocation::Custom(std::sync::Arc::new(value), len), + #[cfg(feature = "pool")] + reservation: Mutex::new(None), } } } @@ -209,14 +250,83 @@ mod tests { #[test] fn test_from_bytes() { - let bytes = bytes::Bytes::from(vec![1, 2, 3, 4]); - let arrow_bytes: Bytes = bytes.clone().into(); + let message = b"hello arrow"; - assert_eq!(bytes.as_ptr(), arrow_bytes.as_ptr()); + // we can create a Bytes from bytes::Bytes (created from slices) + let c_bytes: bytes::Bytes = message.as_ref().into(); + let a_bytes: Bytes = c_bytes.into(); + assert_eq!(a_bytes.as_slice(), message); - drop(bytes); - drop(arrow_bytes); + // we can create a Bytes from bytes::Bytes (created from Vec) + let c_bytes: bytes::Bytes = bytes::Bytes::from(message.to_vec()); + let a_bytes: Bytes = c_bytes.into(); + assert_eq!(a_bytes.as_slice(), message); + } + + #[cfg(feature = "pool")] + mod pool_tests { + use super::*; + + use crate::pool::TrackingMemoryPool; + + #[test] + fn test_bytes_with_pool() { + // Create a standard allocation + let buffer = unsafe { + let layout = + std::alloc::Layout::from_size_align(1024, crate::alloc::ALIGNMENT).unwrap(); + let ptr = std::alloc::alloc(layout); + assert!(!ptr.is_null()); + + Bytes::new( + NonNull::new(ptr).unwrap(), + 1024, + Deallocation::Standard(layout), + ) + }; + + // Create a memory pool + let pool = TrackingMemoryPool::default(); + assert_eq!(pool.used(), 0); + + // Reserve memory and assign to buffer. Claim twice. 
+ buffer.claim(&pool); + assert_eq!(pool.used(), 1024); + buffer.claim(&pool); + assert_eq!(pool.used(), 1024); + + // Memory should be released when buffer is dropped + drop(buffer); + assert_eq!(pool.used(), 0); + } + + #[test] + fn test_bytes_drop_releases_pool() { + let pool = TrackingMemoryPool::default(); + + { + // Create a buffer with pool + let _buffer = unsafe { + let layout = + std::alloc::Layout::from_size_align(1024, crate::alloc::ALIGNMENT).unwrap(); + let ptr = std::alloc::alloc(layout); + assert!(!ptr.is_null()); + + let bytes = Bytes::new( + NonNull::new(ptr).unwrap(), + 1024, + Deallocation::Standard(layout), + ); + + bytes.claim(&pool); + bytes + }; - let _ = Bytes::from(bytes::Bytes::new()); + assert_eq!(pool.used(), 1024); + } + + // Buffer has been dropped, memory should be released + assert_eq!(pool.used(), 0); + } } } diff --git a/arrow-buffer/src/lib.rs b/arrow-buffer/src/lib.rs index 174cdc4d9c18..1090146f3636 100644 --- a/arrow-buffer/src/lib.rs +++ b/arrow-buffer/src/lib.rs @@ -48,3 +48,8 @@ mod interval; pub use interval::*; mod arith; + +#[cfg(feature = "pool")] +mod pool; +#[cfg(feature = "pool")] +pub use pool::*; diff --git a/arrow-buffer/src/pool.rs b/arrow-buffer/src/pool.rs new file mode 100644 index 000000000000..bf22d433d615 --- /dev/null +++ b/arrow-buffer/src/pool.rs @@ -0,0 +1,189 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
// You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module contains traits for memory pools and an implementation
//! for tracking memory usage.
//!
//! The basic traits are [`MemoryPool`] and [`MemoryReservation`]. The default
//! implementation of [`MemoryPool`] is [`TrackingMemoryPool`]. Their relationship
//! is as follows:
//!
//! ```text
//!     (pool tracker)                        (resizable)
//!  ┌──────────────────┐ fn reserve() ┌─────────────────────────┐
//!  │ trait MemoryPool │─────────────►│ trait MemoryReservation │
//!  └──────────────────┘              └─────────────────────────┘
//! ```

use std::fmt::Debug;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// A memory reservation within a [`MemoryPool`] that is freed on drop
pub trait MemoryReservation: Debug + Send + Sync {
    /// Returns the size of this reservation in bytes.
    fn size(&self) -> usize;

    /// Resize this reservation to a new size in bytes.
    fn resize(&mut self, new_size: usize);
}

/// A pool of memory that can be reserved and released.
///
/// This is used to accurately track memory usage when buffers are shared
/// between multiple arrays or other data structures.
///
/// For example, assume we have two arrays that share underlying buffer.
/// It's hard to tell how much memory is used by them because we can't
/// tell if the buffer is shared or not.
///
/// ```text
///        Array A           Array B
///     ┌────────────┐    ┌────────────┐
///     │ slices...  │    │ slices...  │
///     │────────────│    │────────────│
///     │ Arc<Bytes> │    │ Arc<Bytes> │ (shared buffer)
///     └─────▲──────┘    └───────▲────┘
///           │                   │
///           │       Bytes       │
///           │  ┌─────────────┐  │
///           │  │   data...   │  │
///           │  │─────────────│  │
///           └──│   Memory    │──┘ (tracked with a memory pool)
///              │ Reservation │
///              └─────────────┘
/// ```
///
/// With a memory pool, we can count the memory usage by the shared buffer
/// directly.
pub trait MemoryPool: Debug + Send + Sync {
    /// Reserves memory from the pool. Infallible.
    ///
    /// Returns a reservation of the requested size.
    fn reserve(&self, size: usize) -> Box<dyn MemoryReservation>;

    /// Returns the current available memory in the pool.
    ///
    /// The pool may be overfilled, so this method might return a negative value.
    fn available(&self) -> isize;

    /// Returns the current used memory from the pool.
    fn used(&self) -> usize;

    /// Returns the maximum memory that can be reserved from the pool.
    fn capacity(&self) -> usize;
}

/// A simple [`MemoryPool`] that reports the total memory usage
///
/// The pool is a shared atomic byte counter: every reservation adds its size
/// to the counter and subtracts it again when resized or dropped.
#[derive(Debug, Default)]
pub struct TrackingMemoryPool(Arc<AtomicUsize>);

impl TrackingMemoryPool {
    /// Returns the total allocated size
    pub fn allocated(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}

impl MemoryPool for TrackingMemoryPool {
    /// Adds `size` bytes to the shared counter and returns a reservation that
    /// gives them back when it is dropped.
    fn reserve(&self, size: usize) -> Box<dyn MemoryReservation> {
        self.0.fetch_add(size, Ordering::Relaxed);
        Box::new(Tracker {
            size,
            shared: Arc::clone(&self.0),
        })
    }

    /// Returns `isize::MAX - used`, which is negative once more than
    /// `isize::MAX` bytes are reserved.
    fn available(&self) -> isize {
        // The mathematical result of `isize::MAX - used` always lies within
        // `isize::MIN..=isize::MAX` for any `usize` value of `used`, so
        // two's-complement wrapping arithmetic yields the exact answer. A plain
        // `isize::MAX - used as isize` would overflow (and panic in debug
        // builds) as soon as `used` exceeds `isize::MAX`.
        isize::MAX.wrapping_sub(self.used() as isize)
    }

    fn used(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }

    fn capacity(&self) -> usize {
        usize::MAX
    }
}

/// Reservation handed out by [`TrackingMemoryPool`].
///
/// Holds its own size plus a handle to the pool's shared counter so the
/// counter can be adjusted on resize and on drop.
#[derive(Debug)]
struct Tracker {
    size: usize,
    shared: Arc<AtomicUsize>,
}

impl Drop for Tracker {
    fn drop(&mut self) {
        // Give the reserved bytes back to the pool.
        self.shared.fetch_sub(self.size, Ordering::Relaxed);
    }
}

impl MemoryReservation for Tracker {
    fn size(&self) -> usize {
        self.size
    }

    fn resize(&mut self, new_size: usize) {
        // Apply only the difference to the shared counter; the two branches
        // keep the unsigned subtraction from underflowing.
        if new_size > self.size {
            self.shared
                .fetch_add(new_size - self.size, Ordering::Relaxed);
        } else {
            self.shared
                .fetch_sub(self.size - new_size, Ordering::Relaxed);
        }
        self.size = new_size;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_tracking_memory_pool() {
        let pool = TrackingMemoryPool::default();

        // Reserve 512 bytes
        let reservation = pool.reserve(512);
        assert_eq!(reservation.size(), 512);
        assert_eq!(pool.used(), 512);
        assert_eq!(pool.available(), isize::MAX - 512);

        // Reserve another 256 bytes
        let reservation2 = pool.reserve(256);
        assert_eq!(reservation2.size(), 256);
        assert_eq!(pool.used(), 768);
        assert_eq!(pool.available(), isize::MAX - 768);

        // Test resize to increase
        let mut reservation_mut = reservation;
        reservation_mut.resize(600);
        assert_eq!(reservation_mut.size(), 600);
        assert_eq!(pool.used(), 856); // 600 + 256

        // Test resize to decrease
        reservation_mut.resize(400);
        assert_eq!(reservation_mut.size(), 400);
        assert_eq!(pool.used(), 656); // 400 + 256

        // Drop the first reservation
        drop(reservation_mut);
        assert_eq!(pool.used(), 256);

        // Drop the second reservation
        drop(reservation2);
        assert_eq!(pool.used(), 0);
    }

    #[test]
    fn test_available_when_overfilled() {
        let pool = TrackingMemoryPool::default();

        // More than isize::MAX bytes reserved: available() must go negative
        // instead of overflowing.
        let reservation = pool.reserve(usize::MAX);
        assert_eq!(pool.available(), isize::MIN);

        drop(reservation);
        assert_eq!(pool.available(), isize::MAX);
    }
}