diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java index d36cb37fc2e..de181df247c 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationListener.java @@ -21,8 +21,8 @@ /** * An allocation listener being notified for allocation/deallocation *

- * It is expected to be called from multiple threads and as such, - * provider should take care of making the implementation thread-safe + * It might be called from multiple threads if the allocator hierarchy shares a listener, in which + * case, the provider should take care of making the implementation thread-safe. */ public interface AllocationListener { @@ -30,6 +30,11 @@ public interface AllocationListener { @Override public void onAllocation(long size) { } + + @Override + public boolean onFailedAllocation(long size, Accountant.AllocationOutcome outcome) { + return false; + } }; /** @@ -39,4 +44,15 @@ public void onAllocation(long size) { */ void onAllocation(long size); + /** + * Called whenever an allocation failed, giving the caller a chance to create some space in the allocator + * (either by freeing some resource, or by changing the limit), and, if successful, allowing the allocator + * to retry the allocation. + * + * @param size the buffer size that was being allocated + * @param outcome the outcome of the failed allocation. Carries information of what failed + * @return true, if the allocation can be retried; false if the allocation should fail + */ + boolean onFailedAllocation(long size, Accountant.AllocationOutcome outcome); + } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java index 4804b6b02a9..6acbc1e12f9 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -56,28 +56,21 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato private final HistoricalLog historicalLog; private volatile boolean isClosed = false; // the allocator has been closed + /** + * Initialize an allocator + * @param parentAllocator parent allocator. null if defining a root allocator + * @param listener listener callback. Must be non-null -- use {@link AllocationListener#NOOP} if no listener + * desired + * @param name name of this allocator + * @param initReservation initial reservation. Cannot be modified after construction + * @param maxAllocation limit. Allocations past the limit fail. Can be modified after construction + */ protected BaseAllocator( - final AllocationListener listener, - final String name, - final long initReservation, - final long maxAllocation) throws OutOfMemoryException { - this(listener, null, name, initReservation, maxAllocation); - } - - protected BaseAllocator( - final BaseAllocator parentAllocator, - final String name, - final long initReservation, - final long maxAllocation) throws OutOfMemoryException { - this(parentAllocator.listener, parentAllocator, name, initReservation, maxAllocation); - } - - private BaseAllocator( - final AllocationListener listener, - final BaseAllocator parentAllocator, - final String name, - final long initReservation, - final long maxAllocation) throws OutOfMemoryException { + final BaseAllocator parentAllocator, + final AllocationListener listener, + final String name, + final long initReservation, + final long maxAllocation) throws OutOfMemoryException { super(parentAllocator, initReservation, maxAllocation); this.listener = listener; @@ -276,7 +269,13 @@ public ArrowBuf buffer(final int initialRequestSize, BufferManager manager) { : initialRequestSize; AllocationOutcome outcome = this.allocateBytes(actualRequestSize); if (!outcome.isOk()) { - throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, initialRequestSize)); + if (listener.onFailedAllocation(actualRequestSize, outcome)) { + // Second try, in case the listener can do something about it + outcome = this.allocateBytes(actualRequestSize); + } + if (!outcome.isOk()) { + throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, initialRequestSize)); + } } boolean success = false; @@ -333,9 +332,18 @@ public BufferAllocator newChildAllocator( final String name, final long initReservation, final long maxAllocation) { + return newChildAllocator(name, this.listener, initReservation, maxAllocation); + } + + @Override + public BufferAllocator newChildAllocator( + final String name, + final AllocationListener listener, + final long initReservation, + final long maxAllocation) { assertOpen(); - final ChildAllocator childAllocator = new ChildAllocator(this, name, initReservation, + final ChildAllocator childAllocator = new ChildAllocator(listener, this, name, initReservation, maxAllocation); if (DEBUG) { diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java index b23a6e4bd85..94ea62e6aad 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java @@ -68,6 +68,17 @@ public interface BufferAllocator extends AutoCloseable { */ public BufferAllocator newChildAllocator(String name, long initReservation, long maxAllocation); + /** + * Create a new child allocator. + * + * @param name the name of the allocator. + * @param listener allocation listener for the newly created child + * @param initReservation the initial space reservation (obtained from this allocator) + * @param maxAllocation maximum amount of space the new allocator can allocate + * @return the new allocator, or null if it can't be created + */ + public BufferAllocator newChildAllocator(String name, AllocationListener listener, long initReservation, long maxAllocation); + /** * Close and release all buffers generated from this buffer pool. * diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java index f9a6dc72ece..03ec268d35c 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/ChildAllocator.java @@ -32,6 +32,7 @@ class ChildAllocator extends BaseAllocator { /** * Constructor. * + * @param listener Allocation listener to be used in this child * @param parentAllocator parent allocator -- the one creating this child * @param name the name of this child allocator * @param initReservation initial amount of space to reserve (obtained from the parent) @@ -41,11 +42,12 @@ class ChildAllocator extends BaseAllocator { * allocation policy in force, even less memory may be available */ ChildAllocator( + AllocationListener listener, BaseAllocator parentAllocator, String name, long initReservation, long maxAllocation) { - super(parentAllocator, name, initReservation, maxAllocation); + super(parentAllocator, listener, name, initReservation, maxAllocation); } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java index 1dc6bf0c92f..161b81a58b5 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java @@ -31,7 +31,7 @@ public RootAllocator(final long limit) { } public RootAllocator(final AllocationListener listener, final long limit) { - super(listener, "ROOT", 0, limit); + super(null, listener, "ROOT", 0, limit); } /** diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index 76f2c501cf4..62b0046b2ae 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -222,7 +222,122 @@ public void testRootAllocator_createChildDontClose() throws Exception { } } - private static void allocateAndFree(final BufferAllocator allocator) { + // Allocation listener + // It counts the number of times it has been invoked, and how much memory allocation it has seen + // When set to 'expand on fail', it attempts to expand the associated allocator's limit + private static final class TestAllocationListener implements AllocationListener { + private int numCalls; + private long totalMem; + private boolean expandOnFail; + BufferAllocator expandAlloc; + long expandLimit; + + TestAllocationListener() { + this.numCalls = 0; + this.totalMem = 0; + this.expandOnFail = false; + this.expandAlloc = null; + this.expandLimit = 0; + } + + @Override + public void onAllocation(long size) { + numCalls++; + totalMem += size; + } + + @Override + public boolean onFailedAllocation(long size, Accountant.AllocationOutcome outcome) { + if (expandOnFail) { + expandAlloc.setLimit(expandLimit); + return true; + } + return false; + } + + void setExpandOnFail(BufferAllocator expandAlloc, long expandLimit) { + this.expandOnFail = true; + this.expandAlloc = expandAlloc; + this.expandLimit = expandLimit; + } + + int getNumCalls() { + return numCalls; + } + + long getTotalMem() { + return totalMem; + } + } + + @Test + public void testRootAllocator_listeners() throws Exception { + TestAllocationListener l1 = new TestAllocationListener(); + assertEquals(0, l1.getNumCalls()); + assertEquals(0, l1.getTotalMem()); + TestAllocationListener l2 = new TestAllocationListener(); + assertEquals(0, l2.getNumCalls()); + assertEquals(0, l2.getTotalMem()); + // root and first-level child share the first listener + // second-level and third-level child share the second listener + try (final RootAllocator rootAllocator = new RootAllocator(l1, MAX_ALLOCATION)) { + try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", 0, MAX_ALLOCATION)) { + final ArrowBuf buf1 = c1.buffer(16); + assertNotNull("allocation failed", buf1); + assertEquals(1, l1.getNumCalls()); + assertEquals(16, l1.getTotalMem()); + buf1.release(); + try (final BufferAllocator c2 = c1.newChildAllocator("c2", l2, 0, MAX_ALLOCATION)) { + final ArrowBuf buf2 = c2.buffer(32); + assertNotNull("allocation failed", buf2); + assertEquals(1, l1.getNumCalls()); + assertEquals(16, l1.getTotalMem()); + assertEquals(1, l2.getNumCalls()); + assertEquals(32, l2.getTotalMem()); + buf2.release(); + try (final BufferAllocator c3 = c2.newChildAllocator("c3", 0, MAX_ALLOCATION)) { + final ArrowBuf buf3 = c3.buffer(64); + assertNotNull("allocation failed", buf3); + assertEquals(1, l1.getNumCalls()); + assertEquals(16, l1.getTotalMem()); + assertEquals(2, l2.getNumCalls()); + assertEquals(32 + 64, l2.getTotalMem()); + buf3.release(); + } + } + } + } + } + + @Test + public void testRootAllocator_listenerAllocationFail() throws Exception { + TestAllocationListener l1 = new TestAllocationListener(); + assertEquals(0, l1.getNumCalls()); + assertEquals(0, l1.getTotalMem()); + // Test attempts to allocate too much from a child whose limit is set to half of the max allocation + // The listener's callback triggers, expanding the child allocator's limit, so then the allocation succeeds + try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) { + try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", l1,0, MAX_ALLOCATION / 2)) { + try { + c1.buffer(MAX_ALLOCATION); + fail("allocated memory beyond max allowed"); + } catch (OutOfMemoryException e) { + // expected + } + assertEquals(0, l1.getNumCalls()); + assertEquals(0, l1.getTotalMem()); + + l1.setExpandOnFail(c1, MAX_ALLOCATION); + ArrowBuf arrowBuf = c1.buffer(MAX_ALLOCATION); + assertNotNull("allocation failed", arrowBuf); + assertEquals(1, l1.getNumCalls()); + assertEquals(MAX_ALLOCATION, l1.getTotalMem()); + arrowBuf.release(); + } + } + } + + private static void allocateAndFree(final BufferAllocator allocator) { final ArrowBuf arrowBuf = allocator.buffer(512); assertNotNull("allocation failed", arrowBuf); arrowBuf.release();