diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java index 7c8066c0324..032107d85bf 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java @@ -57,6 +57,7 @@ import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; /** * The in-memory representation of FlightData used to manage a stream of Arrow messages. @@ -333,14 +334,14 @@ private InputStream asInputStream(BufferAllocator allocator) { if (appMetadata != null && appMetadata.capacity() > 0) { // Must call slice() as CodedOutputStream#writeByteBuffer writes -capacity- bytes, not -limit- bytes - cos.writeByteBuffer(FlightData.APP_METADATA_FIELD_NUMBER, appMetadata.asNettyBuffer().nioBuffer().slice()); + cos.writeByteBuffer(FlightData.APP_METADATA_FIELD_NUMBER, appMetadata.nioBuffer().slice()); } cos.writeTag(FlightData.DATA_BODY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED); int size = 0; List allBufs = new ArrayList<>(); for (ArrowBuf b : bufs) { - allBufs.add(b.asNettyBuffer()); + allBufs.add(Unpooled.wrappedBuffer(b.nioBuffer()).retain()); size += b.readableBytes(); // [ARROW-4213] These buffers must be aligned to an 8-byte boundary in order to be readable from C++. if (b.readableBytes() % 8 != 0) { @@ -349,19 +350,19 @@ private InputStream asInputStream(BufferAllocator allocator) { size += paddingBytes; allBufs.add(PADDING_BUFFERS.get(paddingBytes).retain()); } - // gRPC/Netty will decrement the reference count (via the ByteBufInputStream below) when written, so increment - // the reference count - b.getReferenceManager().retain(); } // rawvarint is used for length definition. cos.writeUInt32NoTag(size); cos.flush(); - ArrowBuf initialBuf = allocator.buffer(baos.size()); + ByteBuf initialBuf = Unpooled.buffer(baos.size()); initialBuf.writeBytes(baos.toByteArray()); - final CompositeByteBuf bb = new CompositeByteBuf(allocator.getAsByteBufAllocator(), true, + final CompositeByteBuf bb = new CompositeByteBuf(UnpooledByteBufAllocator.DEFAULT, true, Math.max(2, bufs.size() + 1), - ImmutableList.builder().add(initialBuf.asNettyBuffer()).addAll(allBufs).build()); + ImmutableList.builder() + .add(initialBuf) + .addAll(allBufs) + .build()); final ByteBufInputStream is = new DrainableByteBufInputStream(bb); return is; } catch (Exception ex) { diff --git a/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java b/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java index c2f3f68b0c0..09b7300444c 100644 --- a/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ExpandableByteBuf.java @@ -41,7 +41,7 @@ public ByteBuf copy(int index, int length) { @Override public ByteBuf capacity(int newCapacity) { if (newCapacity > capacity()) { - ByteBuf newBuf = allocator.buffer(newCapacity).asNettyBuffer(); + ByteBuf newBuf = NettyArrowBuf.unwrapBuffer(allocator.buffer(newCapacity)); newBuf.writeBytes(buffer, 0, buffer.capacity()); newBuf.readerIndex(buffer.readerIndex()); newBuf.writerIndex(buffer.writerIndex()); diff --git a/java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java index 39450f1346e..8681b005fcf 100644 --- a/java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/NettyArrowBuf.java @@ -31,6 +31,7 @@ import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.memory.ArrowByteBufAllocator; import org.apache.arrow.memory.BoundsChecking; +import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.Preconditions; import io.netty.util.internal.PlatformDependent; @@ -48,17 +49,17 @@ public class NettyArrowBuf extends AbstractByteBuf implements AutoCloseable { /** * Constructs a new instance. * - * @param arrowBuf The buffer to wrap. - * @param arrowByteBufAllocator The allocator for the buffer (assumed to be {@link ArrowByteBufAllocator}). - * @param length The length of this buffer. + * @param arrowBuf The buffer to wrap. + * @param bufferAllocator The allocator for the buffer. + * @param length The length of this buffer. */ public NettyArrowBuf( final ArrowBuf arrowBuf, - final ByteBufAllocator arrowByteBufAllocator, + final BufferAllocator bufferAllocator, final int length) { super(length); this.arrowBuf = arrowBuf; - this.arrowByteBufAllocator = (ArrowByteBufAllocator) arrowByteBufAllocator; + this.arrowByteBufAllocator = new ArrowByteBufAllocator(bufferAllocator); this.length = length; this.address = arrowBuf.memoryAddress(); } @@ -164,12 +165,12 @@ public int capacity() { @Override public NettyArrowBuf slice() { - return arrowBuf.slice(readerIndex, writerIndex - readerIndex).asNettyBuffer(); + return unwrapBuffer(arrowBuf.slice(readerIndex, writerIndex - readerIndex)); } @Override public NettyArrowBuf slice(int index, int length) { - return arrowBuf.slice(index, length).asNettyBuffer(); + return unwrapBuffer(arrowBuf.slice(index, length)); } @Override @@ -252,6 +253,7 @@ public ByteBuffer nioBuffer(long index, int length) { /** * Get this ArrowBuf as a direct {@link ByteBuffer}. + * * @return ByteBuffer */ private ByteBuffer getDirectBuffer(long index) { @@ -284,8 +286,9 @@ public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { /** * Determine if the requested {@code index} and {@code length} will fit within {@code capacity}. - * @param index The starting index. - * @param length The length which will be utilized (starting from {@code index}). + * + * @param index The starting index. + * @param length The length which will be utilized (starting from {@code index}). * @param capacity The capacity that {@code index + length} is allowed to be within. * @return {@code true} if the requested {@code index} and {@code length} will fit within {@code capacity}. * {@code false} if this would result in an index out of bounds exception. @@ -368,7 +371,7 @@ public int getBytes(int index, FileChannel out, long position, int length) throw if (length == 0) { return 0; } else { - final ByteBuffer tmpBuf = getDirectBuffer(index ); + final ByteBuffer tmpBuf = getDirectBuffer(index); tmpBuf.clear().limit(length); return out.write(tmpBuf, position); } @@ -404,7 +407,7 @@ protected int _getUnsignedMediumLE(int index) { this.chk(index, 3); long addr = this.addr(index); return PlatformDependent.getByte(addr) & 255 | - (Short.reverseBytes(PlatformDependent.getShort(addr + 1L)) & '\uffff') << 8; + (Short.reverseBytes(PlatformDependent.getShort(addr + 1L)) & '\uffff') << 8; } @@ -516,7 +519,8 @@ private long addr(long index) { /** * Helper function to do bounds checking at a particular * index for particular length of data. - * @param index index (0 based relative to this ArrowBuf) + * + * @param index index (0 based relative to this ArrowBuf) * @param fieldLength provided length of data for get/set */ private void chk(long index, long fieldLength) { @@ -529,7 +533,7 @@ private void chk(long index, long fieldLength) { } if (index < 0 || index > capacity() - fieldLength) { throw new IndexOutOfBoundsException(String.format( - "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity())); + "index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity())); } } } @@ -601,4 +605,18 @@ public NettyArrowBuf setLong(int index, long value) { arrowBuf.setLong(index, value); return this; } + + /** + * unwrap arrow buffer into a netty buffer. + */ + public static NettyArrowBuf unwrapBuffer(ArrowBuf buf) { + final NettyArrowBuf nettyArrowBuf = new NettyArrowBuf( + buf, + buf.getReferenceManager().getAllocator(), + checkedCastToInt(buf.capacity())); + nettyArrowBuf.readerIndex(checkedCastToInt(buf.readerIndex())); + nettyArrowBuf.writerIndex(checkedCastToInt(buf.writerIndex())); + return nettyArrowBuf; + } + } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java index 179c2b7409e..c61d041097e 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -210,5 +210,7 @@ public interface Factory { * @return The created AllocationManager used by this allocator */ AllocationManager create(BaseAllocator accountingAllocator, long size); + + ArrowBuf empty(); } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java b/java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java index 3a78c6de672..92a14c25702 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java @@ -33,10 +33,6 @@ import org.apache.arrow.memory.util.MemoryUtil; import org.apache.arrow.util.Preconditions; -import io.netty.buffer.NettyArrowBuf; -import io.netty.buffer.PooledByteBufAllocatorL; -import io.netty.util.internal.PlatformDependent; - /** * ArrowBuf serves as a facade over underlying memory by providing * several access APIs to read/write data into a chunk of direct @@ -130,22 +126,6 @@ private void ensureAccessible() { } } - /** - * Get a wrapper buffer to comply with Netty interfaces and - * can be used in RPC/RPC allocator code. - * @return netty compliant {@link NettyArrowBuf} - */ - public NettyArrowBuf asNettyBuffer() { - - final NettyArrowBuf nettyArrowBuf = new NettyArrowBuf( - this, - referenceManager.getAllocator().getAsByteBufAllocator(), - checkedCastToInt(length)); - nettyArrowBuf.readerIndex(checkedCastToInt(readerIndex)); - nettyArrowBuf.writerIndex(checkedCastToInt(writerIndex)); - return nettyArrowBuf; - } - /** * Get reference manager for this ArrowBuf. * @return user provided implementation of {@link ReferenceManager} @@ -227,13 +207,25 @@ public ArrowBuf slice(long index, long length) { return newBuf; } + /** + * Make a nio byte buffer from this arrowbuf. + */ public ByteBuffer nioBuffer() { - return asNettyBuffer().nioBuffer(); + return nioBuffer(readerIndex, checkedCastToInt(readableBytes())); } + + /** + * Make a nio byte buffer from this ArrowBuf. + */ public ByteBuffer nioBuffer(long index, int length) { - return length == 0 ? ByteBuffer.allocateDirect(0) : - PlatformDependent.directBuffer(memoryAddress() + index, length); + chk(index, length); + return getDirectBuffer(index, length); + } + + private ByteBuffer getDirectBuffer(long index, int length) { + long address = addr(index); + return MemoryUtil.directBuffer(address, length); } public long memoryAddress() { @@ -1244,10 +1236,4 @@ public ArrowBuf setIndex(int readerIndex, int writerIndex) { } } - /** - * Create an empty ArrowBuf with length. - */ - public static ArrowBuf empty(long length) { - return new ArrowBuf(ReferenceManager.NO_OP, null, length, new PooledByteBufAllocatorL().empty.memoryAddress()); - } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java index 96fe7eb1c51..ff40b49ff6f 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.ExpandableByteBuf; +import io.netty.buffer.NettyArrowBuf; /** * An implementation of ByteBufAllocator that wraps a Arrow BufferAllocator. This allows the RPC @@ -56,7 +57,7 @@ public ByteBuf buffer() { @Override public ByteBuf buffer(int initialCapacity) { - return new ExpandableByteBuf(allocator.buffer(initialCapacity).asNettyBuffer(), allocator); + return new ExpandableByteBuf(NettyArrowBuf.unwrapBuffer(allocator.buffer(initialCapacity)), allocator); } @Override @@ -86,7 +87,7 @@ public ByteBuf directBuffer() { @Override public ByteBuf directBuffer(int initialCapacity) { - return allocator.buffer(initialCapacity).asNettyBuffer(); + return NettyArrowBuf.unwrapBuffer(allocator.buffer(initialCapacity)); } @Override diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java index 62fd98c0f7e..d7c225ce5c2 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BaseAllocator.java @@ -63,7 +63,6 @@ abstract class BaseAllocator extends Accountant implements BufferAllocator { private final Object DEBUG_LOCK = DEBUG ? new Object() : null; final AllocationListener listener; private final BaseAllocator parentAllocator; - private final ArrowByteBufAllocator thisAsByteBufAllocator; private final Map childAllocators; private final ArrowBuf empty; // members used purely for debugging @@ -107,7 +106,6 @@ protected BaseAllocator( this.parentAllocator = parentAllocator; this.name = name; - this.thisAsByteBufAllocator = new ArrowByteBufAllocator(this); this.childAllocators = Collections.synchronizedMap(new IdentityHashMap<>()); if (DEBUG) { @@ -239,7 +237,7 @@ public ArrowBuf buffer(final long initialRequestSize) { } private ArrowBuf createEmpty() { - return new ArrowBuf(ReferenceManager.NO_OP, null, 0, NettyAllocationManager.EMPTY.memoryAddress()); + return allocationManagerFactory.empty(); } @Override @@ -249,7 +247,7 @@ public ArrowBuf buffer(final long initialRequestSize, BufferManager manager) { Preconditions.checkArgument(initialRequestSize >= 0, "the requested size must be non-negative"); if (initialRequestSize == 0) { - return empty; + return getEmpty(); } // round the request size according to the rounding policy @@ -313,11 +311,6 @@ private AllocationManager newAllocationManager(BaseAllocator accountingAllocator return allocationManagerFactory.create(accountingAllocator, size); } - @Override - public ArrowByteBufAllocator getAsByteBufAllocator() { - return thisAsByteBufAllocator; - } - @Override public BufferAllocator newChildAllocator( final String name, @@ -756,7 +749,7 @@ long getMaxAllocation() { */ @Value.Default RoundingPolicy getRoundingPolicy() { - return DefaultRoundingPolicy.INSTANCE; + return DefaultRoundingPolicy.DEFAULT_ROUNDING_POLICY; } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java index d7beb51242a..aa1f856c591 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferAllocator.java @@ -19,8 +19,6 @@ import java.util.Collection; -import io.netty.buffer.ByteBufAllocator; - /** * Wrapper class to deal with byte buffer allocation. Ensures users only use designated methods. */ @@ -51,16 +49,6 @@ public interface BufferAllocator extends AutoCloseable { */ ArrowBuf buffer(long size, BufferManager manager); - /** - * Returns the allocator this allocator falls back to when it needs more memory. - * - * @return the underlying allocator used by this allocator - * - * @deprecated This method may be removed in a future release. - */ - @Deprecated - ByteBufAllocator getAsByteBufAllocator(); - /** * Create a new child allocator. * diff --git a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java index c651c9daaa7..9fa4de71d8d 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/BufferLedger.java @@ -25,8 +25,6 @@ import org.apache.arrow.memory.util.HistoricalLog; import org.apache.arrow.util.Preconditions; -import io.netty.buffer.UnsafeDirectLittleEndian; - /** * The reference manager that binds an {@link AllocationManager} to * {@link BufferAllocator} and a set of {@link ArrowBuf}. The set of @@ -515,16 +513,6 @@ void print(StringBuilder sb, int indent, BaseAllocator.Verbosity verbosity) { } } - /** - * Returns the underlying {@link UnsafeDirectLittleEndian} instance used by this BufferLedger. - * - * @deprecated This method may be removed in a future release. - */ - @Deprecated - public UnsafeDirectLittleEndian getUnderlying() { - return unwrap(UnsafeDirectLittleEndian.class); - } - /** * Get the {@link AllocationManager} used by this BufferLedger. * @@ -534,18 +522,4 @@ public AllocationManager getAllocationManager() { return allocationManager; } - /** - * Return the {@link AllocationManager} used or underlying {@link UnsafeDirectLittleEndian} instance - * (in the case of we use a {@link NettyAllocationManager}), and cast to desired class. - * - * @param clazz The desired class to cast into - * @return The AllocationManager used by this BufferLedger, or the underlying UnsafeDirectLittleEndian object. - */ - public T unwrap(Class clazz) { - if (clazz.isInstance(allocationManager)) { - return clazz.cast(allocationManager); - } - - throw new IllegalArgumentException("Unexpected unwrapping class: " + clazz); - } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java b/java/memory/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java index a7fe7958428..052fa24bed6 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/DefaultAllocationManagerFactory.java @@ -32,4 +32,9 @@ public class DefaultAllocationManagerFactory implements AllocationManager.Factor public AllocationManager create(BaseAllocator accountingAllocator, long size) { return new NettyAllocationManager(accountingAllocator, size); } + + @Override + public ArrowBuf empty() { + return NettyAllocationManager.EMPTY_BUFFER; + } } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java index 0f713a8a2e4..45bd5d91347 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/NettyAllocationManager.java @@ -27,7 +27,18 @@ */ public class NettyAllocationManager extends AllocationManager { - public static final AllocationManager.Factory FACTORY = NettyAllocationManager::new; + public static final AllocationManager.Factory FACTORY = new AllocationManager.Factory() { + + @Override + public AllocationManager create(BaseAllocator accountingAllocator, long size) { + return new NettyAllocationManager(accountingAllocator, size); + } + + @Override + public ArrowBuf empty() { + return EMPTY_BUFFER; + } + }; /** * The default cut-off value for switching allocation strategies. @@ -39,6 +50,10 @@ public class NettyAllocationManager extends AllocationManager { private static final PooledByteBufAllocatorL INNER_ALLOCATOR = new PooledByteBufAllocatorL(); static final UnsafeDirectLittleEndian EMPTY = INNER_ALLOCATOR.empty; + static final ArrowBuf EMPTY_BUFFER = new ArrowBuf(ReferenceManager.NO_OP, + null, + 0, + NettyAllocationManager.EMPTY.memoryAddress()); static final long CHUNK_SIZE = INNER_ALLOCATOR.getChunkSize(); private final long allocatedSize; diff --git a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java index 5eb56d6b05d..89889118c05 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/RootAllocator.java @@ -37,7 +37,8 @@ public RootAllocator(final long limit) { } public RootAllocator(final AllocationListener listener, final long limit) { - this(listener, limit, DefaultRoundingPolicy.INSTANCE); + //todo fix DefaultRoundingPolicy when using Netty + this(listener, limit, DefaultRoundingPolicy.DEFAULT_ROUNDING_POLICY); } /** diff --git a/java/memory/src/main/java/org/apache/arrow/memory/UnsafeAllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/UnsafeAllocationManager.java index 976f8907e65..f9756539c55 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/UnsafeAllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/UnsafeAllocationManager.java @@ -24,7 +24,23 @@ */ public final class UnsafeAllocationManager extends AllocationManager { - public static final AllocationManager.Factory FACTORY = UnsafeAllocationManager::new; + private static final ArrowBuf EMPTY = new ArrowBuf(ReferenceManager.NO_OP, + null, + 0, + MemoryUtil.UNSAFE.allocateMemory(0) + ); + + public static final AllocationManager.Factory FACTORY = new Factory() { + @Override + public AllocationManager create(BaseAllocator accountingAllocator, long size) { + return new UnsafeAllocationManager(accountingAllocator, size); + } + + @Override + public ArrowBuf empty() { + return EMPTY; + } + }; private final long allocatedSize; diff --git a/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java b/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java index e9ff3bc6b6c..1b4b2d87be2 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/rounding/DefaultRoundingPolicy.java @@ -17,10 +17,11 @@ package org.apache.arrow.memory.rounding; -import java.lang.reflect.Field; - -import org.apache.arrow.memory.NettyAllocationManager; import org.apache.arrow.memory.util.CommonUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.util.internal.SystemPropertyUtil; /** * The default rounding policy. That is, if the requested size is within the chunk size, @@ -28,22 +29,83 @@ * will be identical to the requested size. */ public class DefaultRoundingPolicy implements RoundingPolicy { - + private static final Logger logger = LoggerFactory.getLogger(DefaultRoundingPolicy.class); public final long chunkSize; /** - * The singleton instance. + * The variables here and the static block calculates the DEFAULT_CHUNK_SIZE. + * + *

+ * It was copied from {@link io.netty.buffer.PooledByteBufAllocator}. + *

*/ - public static final DefaultRoundingPolicy INSTANCE = new DefaultRoundingPolicy(); + private static final int MIN_PAGE_SIZE = 4096; + private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2); + private static final long DEFAULT_CHUNK_SIZE; - private DefaultRoundingPolicy() { + + static { + int defaultPageSize = SystemPropertyUtil.getInt("org.apache.memory.allocator.pageSize", 8192); + Throwable pageSizeFallbackCause = null; try { - Field field = NettyAllocationManager.class.getDeclaredField("CHUNK_SIZE"); - field.setAccessible(true); - chunkSize = (Long) field.get(null); - } catch (Exception e) { - throw new RuntimeException("Failed to get chunk size from allocation manager"); + validateAndCalculatePageShifts(defaultPageSize); + } catch (Throwable t) { + pageSizeFallbackCause = t; + defaultPageSize = 8192; + } + + int defaultMaxOrder = SystemPropertyUtil.getInt("org.apache.memory.allocator.maxOrder", 11); + Throwable maxOrderFallbackCause = null; + try { + validateAndCalculateChunkSize(defaultPageSize, defaultMaxOrder); + } catch (Throwable t) { + maxOrderFallbackCause = t; + defaultMaxOrder = 11; + } + DEFAULT_CHUNK_SIZE = validateAndCalculateChunkSize(defaultPageSize, defaultMaxOrder); + if (logger.isDebugEnabled()) { + logger.debug("-Dorg.apache.memory.allocator.pageSize: {}", defaultPageSize); + logger.debug("-Dorg.apache.memory.allocator.maxOrder: {}", defaultMaxOrder); + } + } + + private static int validateAndCalculatePageShifts(int pageSize) { + if (pageSize < MIN_PAGE_SIZE) { + throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: " + MIN_PAGE_SIZE + ")"); + } + + if ((pageSize & pageSize - 1) != 0) { + throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: power of 2)"); + } + + // Logarithm base 2. At this point we know that pageSize is a power of two. + return Integer.SIZE - 1 - Integer.numberOfLeadingZeros(pageSize); + } + + private static int validateAndCalculateChunkSize(int pageSize, int maxOrder) { + if (maxOrder > 14) { + throw new IllegalArgumentException("maxOrder: " + maxOrder + " (expected: 0-14)"); + } + + // Ensure the resulting chunkSize does not overflow. + int chunkSize = pageSize; + for (int i = maxOrder; i > 0; i --) { + if (chunkSize > MAX_CHUNK_SIZE / 2) { + throw new IllegalArgumentException(String.format( + "pageSize (%d) << maxOrder (%d) must not exceed %d", pageSize, maxOrder, MAX_CHUNK_SIZE)); + } + chunkSize <<= 1; } + return chunkSize; + } + + /** + * The singleton instance. + */ + public static final DefaultRoundingPolicy DEFAULT_ROUNDING_POLICY = new DefaultRoundingPolicy(DEFAULT_CHUNK_SIZE); + + private DefaultRoundingPolicy(long chunkSize) { + this.chunkSize = chunkSize; } @Override diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java b/java/memory/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java index 4a83fd7426c..63d2c091eb5 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java @@ -17,20 +17,22 @@ package org.apache.arrow.memory.util; +import java.lang.reflect.Constructor; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import java.security.AccessController; import java.security.PrivilegedAction; -import io.netty.buffer.ByteBuf; -import io.netty.util.internal.ReflectionUtil; import sun.misc.Unsafe; /** * Utilities for memory related operations. */ public class MemoryUtil { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryUtil.class); + private static final Constructor DIRECT_BUFFER_CONSTRUCTOR; /** * The unsafe object from which to access the off-heap memory. */ @@ -54,10 +56,7 @@ public class MemoryUtil { public Object run() { try { final Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe"); - Throwable cause = ReflectionUtil.trySetAccessible(unsafeField, false); - if (cause != null) { - return cause; - } + unsafeField.setAccessible(true); return unsafeField.get(null); } catch (Throwable e) { return e; @@ -78,6 +77,55 @@ public Object run() { Field addressField = java.nio.Buffer.class.getDeclaredField("address"); addressField.setAccessible(true); BYTE_BUFFER_ADDRESS_OFFSET = UNSAFE.objectFieldOffset(addressField); + + Constructor directBufferConstructor; + long address = -1; + final ByteBuffer direct = ByteBuffer.allocateDirect(1); + try { + + final Object maybeDirectBufferConstructor = + AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Object run() { + try { + final Constructor constructor = + direct.getClass().getDeclaredConstructor(long.class, int.class); + constructor.setAccessible(true); + logger.debug("Constructor for direct buffer found and made accessible"); + return constructor; + } catch (NoSuchMethodException e) { + logger.debug("Cannot get constructor for direct buffer allocation", e); + return e; + } catch (SecurityException e) { + logger.debug("Cannot get constructor for direct buffer allocation", e); + return e; + } + } + }); + + if (maybeDirectBufferConstructor instanceof Constructor) { + address = UNSAFE.allocateMemory(1); + // try to use the constructor now + try { + ((Constructor) maybeDirectBufferConstructor).newInstance(address, 1); + directBufferConstructor = (Constructor) maybeDirectBufferConstructor; + logger.debug("direct buffer constructor: available"); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + logger.warn("unable to instantiate a direct buffer via constructor", e); + directBufferConstructor = null; + } + } else { + logger.debug( + "direct buffer constructor: unavailable", + (Throwable) maybeDirectBufferConstructor); + directBufferConstructor = null; + } + } finally { + if (address != -1) { + UNSAFE.freeMemory(address); + } + } + DIRECT_BUFFER_CONSTRUCTOR = directBufferConstructor; } catch (Throwable e) { throw new RuntimeException("Failed to initialize MemoryUtil.", e); } @@ -95,4 +143,22 @@ public static long getByteBufferAddress(ByteBuffer buf) { private MemoryUtil() { } + + /** + * Create nio byte buffer. + */ + public static ByteBuffer directBuffer(long address, int capacity) { + if (DIRECT_BUFFER_CONSTRUCTOR != null) { + if (capacity < 0) { + throw new IllegalArgumentException("Capacity is negative, has to be positive or 0"); + } + try { + return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity); + } catch (Throwable cause) { + throw new Error(cause); + } + } + throw new UnsupportedOperationException( + "sun.misc.Unsafe or java.nio.DirectByteBuffer.(long, int) not available"); + } } diff --git a/java/memory/src/test/java/io/netty/buffer/TestNettyArrowBuf.java b/java/memory/src/test/java/io/netty/buffer/TestNettyArrowBuf.java index a1777177d5a..916cf82e76b 100644 --- a/java/memory/src/test/java/io/netty/buffer/TestNettyArrowBuf.java +++ b/java/memory/src/test/java/io/netty/buffer/TestNettyArrowBuf.java @@ -33,7 +33,7 @@ public void testSliceWithoutArgs() { try (BufferAllocator allocator = new RootAllocator(128); ArrowBuf buf = allocator.buffer(20); ) { - NettyArrowBuf nettyBuf = buf.asNettyBuffer(); + NettyArrowBuf nettyBuf = NettyArrowBuf.unwrapBuffer(buf); nettyBuf.writerIndex(20); nettyBuf.readerIndex(10); NettyArrowBuf slicedBuffer = nettyBuf.slice(); @@ -47,7 +47,7 @@ public void testNioBuffer() { try (BufferAllocator allocator = new RootAllocator(128); ArrowBuf buf = allocator.buffer(20); ) { - NettyArrowBuf nettyBuf = buf.asNettyBuffer(); + NettyArrowBuf nettyBuf = NettyArrowBuf.unwrapBuffer(buf); ByteBuffer byteBuffer = nettyBuf.nioBuffer(4, 6); // Nio Buffers should always be 0 indexed Assert.assertEquals(0, byteBuffer.position()); @@ -63,7 +63,7 @@ public void testInternalNioBuffer() { try (BufferAllocator allocator = new RootAllocator(128); ArrowBuf buf = allocator.buffer(20); ) { - NettyArrowBuf nettyBuf = buf.asNettyBuffer(); + NettyArrowBuf nettyBuf = NettyArrowBuf.unwrapBuffer(buf); ByteBuffer byteBuffer = nettyBuf.internalNioBuffer(4, 6); Assert.assertEquals(0, byteBuffer.position()); Assert.assertEquals(6, byteBuffer.limit()); @@ -78,7 +78,7 @@ public void testSetLEValues() { try (BufferAllocator allocator = new RootAllocator(128); ArrowBuf buf = allocator.buffer(20); ) { - NettyArrowBuf nettyBuf = buf.asNettyBuffer(); + NettyArrowBuf nettyBuf = NettyArrowBuf.unwrapBuffer(buf); int [] intVals = new int[] {Integer.MIN_VALUE, Short.MIN_VALUE - 1, Short.MIN_VALUE, 0 , Short.MAX_VALUE , Short.MAX_VALUE + 1, Integer.MAX_VALUE}; for (int intValue :intVals ) { @@ -104,7 +104,7 @@ public void testSetLEValues() { public void testSetCompositeBuffer() { try (BufferAllocator allocator = new RootAllocator(128); ArrowBuf buf = allocator.buffer(20); - NettyArrowBuf buf2 = allocator.buffer(20).asNettyBuffer(); + NettyArrowBuf buf2 = NettyArrowBuf.unwrapBuffer(allocator.buffer(20)); ) { CompositeByteBuf byteBufs = new CompositeByteBuf(new ArrowByteBufAllocator(allocator), true, 1); @@ -112,7 +112,7 @@ public void testSetCompositeBuffer() { buf2.setInt(0, expected); buf2.writerIndex(4); byteBufs.addComponent(true, buf2); - buf.asNettyBuffer().setBytes(0, byteBufs, 4); + NettyArrowBuf.unwrapBuffer(buf).setBytes(0, byteBufs, 4); int actual = buf.getInt(0); Assert.assertEquals(expected, actual); } @@ -127,12 +127,12 @@ public void testGetCompositeBuffer() { true, 1); int expected = 4; buf.setInt(0, expected); - NettyArrowBuf buf2 = allocator.buffer(20).asNettyBuffer(); + NettyArrowBuf buf2 = NettyArrowBuf.unwrapBuffer(allocator.buffer(20)); // composite buffers are a bit weird, need to jump hoops // to set capacity. byteBufs.addComponent(true, buf2); byteBufs.capacity(20); - buf.asNettyBuffer().getBytes(0, byteBufs, 4); + NettyArrowBuf.unwrapBuffer(buf).getBytes(0, byteBufs, 4); int actual = byteBufs.getInt(0); Assert.assertEquals(expected, actual); byteBufs.component(0).release(); diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestArrowBuf.java b/java/memory/src/test/java/org/apache/arrow/memory/TestArrowBuf.java index 2f503d61d3a..9b36a3bb352 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestArrowBuf.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestArrowBuf.java @@ -27,8 +27,6 @@ import org.junit.BeforeClass; import org.junit.Test; -import io.netty.buffer.PooledByteBufAllocatorL; - public class TestArrowBuf { private static final int MAX_ALLOCATION = 8 * 1024; @@ -126,40 +124,4 @@ public void testSetBytesUnsliced() { } } - @Test - public void testEmptyArrowBuf() { - ArrowBuf buf = new ArrowBuf(ReferenceManager.NO_OP, null, - 1024, new PooledByteBufAllocatorL().empty.memoryAddress()); - - buf.getReferenceManager().retain(); - buf.getReferenceManager().retain(8); - assertEquals(1024, buf.capacity()); - assertEquals(1, buf.getReferenceManager().getRefCount()); - assertEquals(0, buf.getActualMemoryConsumed()); - - for (int i = 0; i < 10; i++) { - buf.setByte(i, i); - } - assertEquals(0, buf.getActualMemoryConsumed()); - assertEquals(0, buf.getReferenceManager().getSize()); - assertEquals(0, buf.getReferenceManager().getAccountedSize()); - assertEquals(false, buf.getReferenceManager().release()); - assertEquals(false, buf.getReferenceManager().release(2)); - assertEquals(0, buf.getReferenceManager().getAllocator().getLimit()); - assertEquals(buf, buf.getReferenceManager().transferOwnership(buf, allocator).getTransferredBuffer()); - assertEquals(0, buf.readerIndex()); - assertEquals(0, buf.writerIndex()); - assertEquals(1, buf.refCnt()); - - ArrowBuf derive = buf.getReferenceManager().deriveBuffer(buf, 0, 100); - assertEquals(derive, buf); - assertEquals(1, buf.refCnt()); - assertEquals(1, derive.refCnt()); - - buf.close(); - - ArrowBuf buf2 = ArrowBuf.empty(10); - assertEquals(10, buf2.capacity()); - } - } diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java index 62cbe8bf80a..a42e272a42e 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestBaseAllocator.java @@ -391,39 +391,49 @@ public void testCustomizedAllocationManager() { private BaseAllocator createAllocatorWithCustomizedAllocationManager() { return new RootAllocator(BaseAllocator.configBuilder() .maxAllocation(MAX_ALLOCATION) - .allocationManagerFactory((accountingAllocator, requestedSize) -> new AllocationManager(accountingAllocator) { - private final Unsafe unsafe = getUnsafe(); - private final long address = unsafe.allocateMemory(requestedSize); - + .allocationManagerFactory(new AllocationManager.Factory() { @Override - protected long memoryAddress() { - return address; - } + public AllocationManager create(BaseAllocator accountingAllocator, long requestedSize) { + return new AllocationManager(accountingAllocator) { + private final Unsafe unsafe = getUnsafe(); + private final long address = unsafe.allocateMemory(requestedSize); + + @Override + protected long memoryAddress() { + return address; + } - @Override - protected void release0() { - unsafe.setMemory(address, requestedSize, (byte) 0); - unsafe.freeMemory(address); - } + @Override + protected void release0() { + unsafe.setMemory(address, requestedSize, (byte) 0); + unsafe.freeMemory(address); + } - @Override - public long getSize() { - return requestedSize; - } + @Override + public long getSize() { + return requestedSize; + } - private Unsafe getUnsafe() { - Field f = null; - try { - f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return (Unsafe) f.get(null); - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException(e); - } finally { - if (f != null) { - f.setAccessible(false); + private Unsafe getUnsafe() { + Field f = null; + try { + f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + return (Unsafe) f.get(null); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } finally { + if (f != null) { + f.setAccessible(false); + } + } } - } + }; + } + + @Override + public ArrowBuf empty() { + return null; } }).build()); } diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestEmptyArrowBuf.java b/java/memory/src/test/java/org/apache/arrow/memory/TestEmptyArrowBuf.java new file mode 100644 index 00000000000..21e2eaa490d --- /dev/null +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestEmptyArrowBuf.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import static org.junit.Assert.assertEquals; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.buffer.PooledByteBufAllocatorL; + +public class TestEmptyArrowBuf { + + private static final int MAX_ALLOCATION = 8 * 1024; + private static RootAllocator allocator; + + @BeforeClass + public static void beforeClass() { + allocator = new RootAllocator(MAX_ALLOCATION); + } + + /** Ensure the allocator is closed. */ + @AfterClass + public static void afterClass() { + if (allocator != null) { + allocator.close(); + } + } + + @Test + public void testEmptyArrowBuf() { + ArrowBuf buf = new ArrowBuf(ReferenceManager.NO_OP, null, + 1024, new PooledByteBufAllocatorL().empty.memoryAddress()); + + buf.getReferenceManager().retain(); + buf.getReferenceManager().retain(8); + assertEquals(1024, buf.capacity()); + assertEquals(1, buf.getReferenceManager().getRefCount()); + assertEquals(0, buf.getActualMemoryConsumed()); + + for (int i = 0; i < 10; i++) { + buf.setByte(i, i); + } + assertEquals(0, buf.getActualMemoryConsumed()); + assertEquals(0, buf.getReferenceManager().getSize()); + assertEquals(0, buf.getReferenceManager().getAccountedSize()); + assertEquals(false, buf.getReferenceManager().release()); + assertEquals(false, buf.getReferenceManager().release(2)); + assertEquals(0, buf.getReferenceManager().getAllocator().getLimit()); + assertEquals(buf, buf.getReferenceManager().transferOwnership(buf, allocator).getTransferredBuffer()); + assertEquals(0, buf.readerIndex()); + assertEquals(0, buf.writerIndex()); + assertEquals(1, buf.refCnt()); + + ArrowBuf derive = buf.getReferenceManager().deriveBuffer(buf, 0, 100); + assertEquals(derive, buf); + assertEquals(1, buf.refCnt()); + assertEquals(1, derive.refCnt()); + + buf.close(); + + } + +} diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestEndianness.java b/java/memory/src/test/java/org/apache/arrow/memory/TestEndianness.java index 0d42bfabd2b..6f0da8728ce 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestEndianness.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestEndianness.java @@ -22,13 +22,14 @@ import org.junit.Test; import io.netty.buffer.ByteBuf; +import io.netty.buffer.NettyArrowBuf; public class TestEndianness { @Test public void testLittleEndian() { final BufferAllocator a = new RootAllocator(10000); - final ByteBuf b = a.buffer(4).asNettyBuffer(); + final ByteBuf b = NettyArrowBuf.unwrapBuffer(a.buffer(4)); b.setInt(0, 35); assertEquals(b.getByte(0), 35); assertEquals(b.getByte(1), 0); diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestNettyAllocationManager.java b/java/memory/src/test/java/org/apache/arrow/memory/TestNettyAllocationManager.java index 5c5baad04aa..f386ea66b2a 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestNettyAllocationManager.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestNettyAllocationManager.java @@ -33,9 +33,17 @@ public class TestNettyAllocationManager { private BaseAllocator createCustomizedAllocator() { return new RootAllocator(BaseAllocator.configBuilder() - .allocationManagerFactory((accountingAllocator, requestedSize) -> - new NettyAllocationManager( - accountingAllocator, requestedSize, CUSTOMIZED_ALLOCATION_CUTOFF_VALUE)).build()); + .allocationManagerFactory(new AllocationManager.Factory() { + @Override + public AllocationManager create(BaseAllocator accountingAllocator, long size) { + return new NettyAllocationManager(accountingAllocator, size, CUSTOMIZED_ALLOCATION_CUTOFF_VALUE); + } + + @Override + public ArrowBuf empty() { + return null; + } + }).build()); } private void readWriteArrowBuf(ArrowBuf buffer) { diff --git a/java/memory/src/test/java/org/apache/arrow/memory/TestUnsafeAllocationManager.java b/java/memory/src/test/java/org/apache/arrow/memory/TestUnsafeAllocationManager.java index af4f9aa3cbb..c15882a37a6 100644 --- a/java/memory/src/test/java/org/apache/arrow/memory/TestUnsafeAllocationManager.java +++ b/java/memory/src/test/java/org/apache/arrow/memory/TestUnsafeAllocationManager.java @@ -28,10 +28,8 @@ public class TestUnsafeAllocationManager { private BaseAllocator createUnsafeAllocator() { - return new RootAllocator(BaseAllocator.configBuilder() - .allocationManagerFactory((accountingAllocator, requestedSize) -> - new UnsafeAllocationManager( - accountingAllocator, requestedSize)).build()); + return new RootAllocator(BaseAllocator.configBuilder().allocationManagerFactory(UnsafeAllocationManager.FACTORY) + .build()); } private void readWriteArrowBuf(ArrowBuf buffer) {