From 5e09cca60e5569afc54223a198785df84ccb370e Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 29 Nov 2017 14:30:13 -0800 Subject: [PATCH 1/4] Upgrade Netty to 4.1.x --- .../main/java/io/netty/buffer/ArrowBuf.java | 183 ++++++++++++------ .../netty/buffer/MutableWrappedByteBuf.java | 116 ++++++++++- .../arrow/memory/ArrowByteBufAllocator.java | 36 ++++ java/pom.xml | 2 +- .../arrow/vector/util/MapWithOrdinal.java | 4 +- 5 files changed, 276 insertions(+), 65 deletions(-) diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java index e2bbe35480b..4aa52fd97bf 100644 --- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java @@ -23,6 +23,7 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.channels.FileChannel; import java.nio.channels.GatheringByteChannel; import java.nio.channels.ScatteringByteChannel; import java.nio.charset.Charset; @@ -494,30 +495,27 @@ public ArrowBuf retain() { } @Override - public long getLong(int index) { - chk(index, 8); - final long v = PlatformDependent.getLong(addr(index)); - return v; - } - - @Override - public float getFloat(int index) { - return Float.intBitsToFloat(getInt(index)); + public ByteBuf touch() { + return this; } @Override - public double getDouble(int index) { - return Double.longBitsToDouble(getLong(index)); + public ByteBuf touch(Object hint) { + return this; } @Override - public char getChar(int index) { - return (char) getShort(index); + public long getLong(int index) { + chk(index, 8); + final long v = PlatformDependent.getLong(addr(index)); + return v; } @Override - public long getUnsignedInt(int index) { - return getInt(index) & 0xFFFFFFFFL; + public long getLongLE(int index) { + chk(index, 8); + final long v = PlatformDependent.getLong(addr(index)); + return Long.reverseBytes(v); } @Override @@ -528,17 +526,41 @@ public int getInt(int index) { } @Override - public int getUnsignedShort(int index) { - return getShort(index) & 0xFFFF; + public int getIntLE(int index) { + chk(index, 4); + final int v = PlatformDependent.getInt(addr(index)); + return Integer.reverseBytes(v); } @Override public short getShort(int index) { chk(index, 2); - short v = PlatformDependent.getShort(addr(index)); + final short v = PlatformDependent.getShort(addr(index)); return v; } + @Override + public short getShortLE(int index) { + final short v = PlatformDependent.getShort(addr(index)); + return Short.reverseBytes(v); + } + + @Override + public int getUnsignedMedium(int index) { + chk(index, 3); + final long addr = addr(index); + return (PlatformDependent.getByte(addr) & 0xff) << 16 | + (PlatformDependent.getShort(addr + 1) & 0xffff); + } + + @Override + public int getUnsignedMediumLE(int index) { + chk(index, 3); + final long addr = addr(index); + return (PlatformDependent.getByte(addr) & 0xff) | + (Short.reverseBytes(PlatformDependent.getShort(addr + 1)) & 0xffff) << 8; + } + @Override public ArrowBuf setShort(int index, int value) { chk(index, 2); @@ -547,66 +569,60 @@ public ArrowBuf setShort(int index, int value) { } @Override - public ArrowBuf setInt(int index, int value) { - chk(index, 4); - PlatformDependent.putInt(addr(index), value); + public ByteBuf setShortLE(int index, int value) { + chk(index, 2); + PlatformDependent.putShort(addr(index), Short.reverseBytes((short) value)); return this; } @Override - public ArrowBuf setLong(int index, long value) { - chk(index, 8); - PlatformDependent.putLong(addr(index), value); + public ByteBuf setMedium(int index, int value) { + chk(index, 3); + final long addr = addr(index); + PlatformDependent.putByte(addr, (byte) (value >>> 16)); + PlatformDependent.putShort(addr + 1, (short) value); return this; } @Override - public ArrowBuf setChar(int index, int value) { - chk(index, 2); - PlatformDependent.putShort(addr(index), (short) value); + public ByteBuf setMediumLE(int index, int value) { + chk(index, 3); + final long addr = addr(index); + PlatformDependent.putByte(addr, (byte) value); + PlatformDependent.putShort(addr + 1, Short.reverseBytes((short) (value >>> 8))); return this; } @Override - public ArrowBuf setFloat(int index, float value) { + public ArrowBuf setInt(int index, int value) { chk(index, 4); - PlatformDependent.putInt(addr(index), Float.floatToRawIntBits(value)); - return this; - } - - @Override - public ArrowBuf setDouble(int index, double value) { - chk(index, 8); - PlatformDependent.putLong(addr(index), Double.doubleToRawLongBits(value)); + PlatformDependent.putInt(addr(index), value); return this; } @Override - public ArrowBuf writeShort(int value) { - ensure(2); - PlatformDependent.putShort(addr(writerIndex), (short) value); - writerIndex += 2; + public ByteBuf setIntLE(int index, int value) { + chk(index, 4); + PlatformDependent.putInt(addr(index), Integer.reverseBytes(value)); return this; } @Override - public ArrowBuf writeInt(int value) { - ensure(4); - PlatformDependent.putInt(addr(writerIndex), value); - writerIndex += 4; + public ArrowBuf setLong(int index, long value) { + chk(index, 8); + PlatformDependent.putLong(addr(index), value); return this; } @Override - public ArrowBuf writeLong(long value) { - ensure(8); - PlatformDependent.putLong(addr(writerIndex), value); - writerIndex += 8; + public ByteBuf setLongLE(int index, long value) { + chk(index, 8); + PlatformDependent.putLong(addr(index), Long.reverseBytes(value)); return this; } @Override - public ArrowBuf writeChar(int value) { + public ArrowBuf writeShort(int value) { ensure(2); PlatformDependent.putShort(addr(writerIndex), (short) value); writerIndex += 2; @@ -614,17 +630,17 @@ public ArrowBuf writeChar(int value) { } @Override - public ArrowBuf writeFloat(float value) { + public ArrowBuf writeInt(int value) { ensure(4); - PlatformDependent.putInt(addr(writerIndex), Float.floatToRawIntBits(value)); + PlatformDependent.putInt(addr(writerIndex), value); writerIndex += 4; return this; } @Override - public ArrowBuf writeDouble(double value) { + public ArrowBuf writeLong(long value) { ensure(8); - PlatformDependent.putLong(addr(writerIndex), Double.doubleToRawLongBits(value)); + PlatformDependent.putLong(addr(writerIndex), value); writerIndex += 8; return this; } @@ -668,16 +684,41 @@ protected short _getShort(int index) { return getShort(index); } + @Override + protected short _getShortLE(int index) { + return getShortLE(index); + } + @Override protected int _getInt(int index) { return getInt(index); } + @Override + protected int _getIntLE(int index) { + return getIntLE(index); + } + + @Override + protected int _getUnsignedMedium(int index) { + return getUnsignedMedium(index); + } + + @Override + protected int _getUnsignedMediumLE(int index) { + return getUnsignedMediumLE(index); + } + @Override protected long _getLong(int index) { return getLong(index); } + @Override + protected long _getLongLE(int index) { + return getLongLE(index); + } + @Override protected void _setByte(int index, int value) { setByte(index, value); @@ -688,21 +729,41 @@ protected void _setShort(int index, int value) { setShort(index, value); } + @Override + protected void _setShortLE(int index, int value) { + setShortLE(index, value); + } + @Override protected void _setMedium(int index, int value) { setMedium(index, value); } + @Override + protected void _setMediumLE(int index, int value) { + setMediumLE(index, value); + } + @Override protected void _setInt(int index, int value) { setInt(index, value); } + @Override + protected void _setIntLE(int index, int value) { + setIntLE(index, value); + } + @Override protected void _setLong(int index, long value) { setLong(index, value); } + @Override + public void _setLongLE(int index, long value) { + setLongLE(index, value); + } + @Override public ArrowBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { udle.getBytes(index + offset, dst, dstIndex, length); @@ -716,16 +777,13 @@ public ArrowBuf getBytes(int index, OutputStream out, int length) throws IOExcep } @Override - protected int _getUnsignedMedium(int index) { - final long addr = addr(index); - return (PlatformDependent.getByte(addr) & 0xff) << 16 | - (PlatformDependent.getByte(addr + 1) & 0xff) << 8 | - PlatformDependent.getByte(addr + 2) & 0xff; + public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + return udle.getBytes(index + offset, out, length); } @Override - public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { - return udle.getBytes(index + offset, out, length); + public int getBytes(int index, FileChannel out, long position, int length) throws IOException { + return udle.getBytes(index + offset, out, position, length); } @Override @@ -776,6 +834,11 @@ public int setBytes(int index, ScatteringByteChannel in, int length) throws IOEx return udle.setBytes(index + offset, in, length); } + @Override + public int setBytes(int index, FileChannel in, long position, int length) throws IOException { + return udle.setBytes(index + offset, in, position, length); + } + @Override public byte getByte(int index) { chk(index, 1); diff --git a/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java index a5683adccbc..f0bc84cdc2d 100644 --- a/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java @@ -23,9 +23,12 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.channels.FileChannel; import java.nio.channels.GatheringByteChannel; import java.nio.channels.ScatteringByteChannel; +import io.netty.util.ByteProcessor; + /** * This is basically a complete copy of DuplicatedByteBuf. We copy because we want to override * some behaviors and make @@ -128,6 +131,16 @@ protected short _getShort(int index) { return buffer.getShort(index); } + @Override + public short getShortLE(int index) { + return buffer.getShortLE(index); + } + + @Override + protected short _getShortLE(int index) { + return buffer.getShortLE(index); + } + @Override public int getUnsignedMedium(int index) { return _getUnsignedMedium(index); @@ -138,6 +151,16 @@ protected int _getUnsignedMedium(int index) { return buffer.getUnsignedMedium(index); } + @Override + public int getUnsignedMediumLE(int index) { + return buffer.getUnsignedMediumLE(index); + } + + @Override + protected int _getUnsignedMediumLE(int index) { + return buffer.getUnsignedMediumLE(index); + } + @Override public int getInt(int index) { return _getInt(index); @@ -148,6 +171,16 @@ protected int _getInt(int index) { return buffer.getInt(index); } + @Override + public int getIntLE(int index) { + return buffer.getIntLE(index); + } + + @Override + protected int _getIntLE(int index) { + return buffer.getIntLE(index); + } + @Override public long getLong(int index) { return _getLong(index); @@ -158,6 +191,16 @@ protected long _getLong(int index) { return buffer.getLong(index); } + @Override + public long getLongLE(int index) { + return buffer.getLongLE(index); + } + + @Override + protected long _getLongLE(int index) { + return buffer.getLongLE(index); + } + @Override public abstract ByteBuf copy(int index, int length); @@ -206,6 +249,17 @@ protected void _setShort(int index, int value) { buffer.setShort(index, value); } + @Override + public ByteBuf setShortLE(int index, int value) { + buffer.setShortLE(index, value); + return this; + } + + @Override + protected void _setShortLE(int index, int value) { + buffer.setShortLE(index, value); + } + @Override public ByteBuf setMedium(int index, int value) { _setMedium(index, value); @@ -217,6 +271,17 @@ protected void _setMedium(int index, int value) { buffer.setMedium(index, value); } + @Override + public ByteBuf setMediumLE(int index, int value) { + buffer.setMediumLE(index, value); + return this; + } + + @Override + protected void _setMediumLE(int index, int value) { + buffer.setMediumLE(index, value); + } + @Override public ByteBuf setInt(int index, int value) { _setInt(index, value); @@ -228,6 +293,17 @@ protected void _setInt(int index, int value) { buffer.setInt(index, value); } + @Override + public ByteBuf setIntLE(int index, int value) { + buffer.setIntLE(index, value); + return this; + } + + @Override + protected void _setIntLE(int index, int value) { + buffer.setIntLE(index, value); + } + @Override public ByteBuf setLong(int index, long value) { _setLong(index, value); @@ -239,6 +315,17 @@ protected void _setLong(int index, long value) { buffer.setLong(index, value); } + @Override + public ByteBuf setLongLE(int index, long value) { + buffer.setLongLE(index, value); + return this; + } + + @Override + protected void _setLongLE(int index, long value) { + buffer.setLongLE(index, value); + } + @Override public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { buffer.setBytes(index, src, srcIndex, length); @@ -257,6 +344,12 @@ public ByteBuf setBytes(int index, ByteBuffer src) { return this; } + @Override + public int setBytes(int index, FileChannel in, long position, int length) + throws IOException { + return buffer.setBytes(index, in, position, length); + } + @Override public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { @@ -282,6 +375,13 @@ public int setBytes(int index, ScatteringByteChannel in, int length) return buffer.setBytes(index, in, length); } + + @Override + public int getBytes(int index, FileChannel out, long position, int length) + throws IOException { + return buffer.getBytes(index, out, position, length); + } + @Override public int nioBufferCount() { return buffer.nioBufferCount(); @@ -298,12 +398,12 @@ public ByteBuffer internalNioBuffer(int index, int length) { } @Override - public int forEachByte(int index, int length, ByteBufProcessor processor) { + public int forEachByte(int index, int length, ByteProcessor processor) { return buffer.forEachByte(index, length, processor); } @Override - public int forEachByteDesc(int index, int length, ByteBufProcessor processor) { + public int forEachByteDesc(int index, int length, ByteProcessor processor) { return buffer.forEachByteDesc(index, length, processor); } @@ -312,6 +412,18 @@ public final int refCnt() { return unwrap().refCnt(); } + @Override + public final ByteBuf touch() { + unwrap().touch(); + return this; + } + + @Override + public final ByteBuf touch(Object hint) { + unwrap().touch(hint); + return this; + } + @Override public final ByteBuf retain() { unwrap().retain(); diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java index b8b5283423c..58bd6e54c2d 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java @@ -36,6 +36,7 @@ public class ArrowByteBufAllocator implements ByteBufAllocator { private static final int DEFAULT_BUFFER_SIZE = 4096; private static final int DEFAULT_MAX_COMPOSITE_COMPONENTS = 16; + private static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page private final BufferAllocator allocator; @@ -146,4 +147,39 @@ private RuntimeException fail() { throw new UnsupportedOperationException("Allocator doesn't support heap-based memory."); } + @Override + public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { + if (minNewCapacity < 0) { + throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)"); + } + if (minNewCapacity > maxCapacity) { + throw new IllegalArgumentException(String.format( + "minNewCapacity: %d (expected: not greater than maxCapacity(%d)", + minNewCapacity, maxCapacity)); + } + final int threshold = CALCULATE_THRESHOLD; // 4 MiB page + + if (minNewCapacity == threshold) { + return threshold; + } + + // If over threshold, do not double but just increase by threshold. + if (minNewCapacity > threshold) { + int newCapacity = minNewCapacity / threshold * threshold; + if (newCapacity > maxCapacity - threshold) { + newCapacity = maxCapacity; + } else { + newCapacity += threshold; + } + return newCapacity; + } + + // Not over threshold. Double up to 4 MiB, starting from 64. + int newCapacity = 64; + while (newCapacity < minNewCapacity) { + newCapacity <<= 1; + } + + return Math.min(newCapacity, maxCapacity); + } } diff --git a/java/pom.xml b/java/pom.xml index c479d651f6b..162c53460cc 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -32,7 +32,7 @@ 4.11 1.7.25 18.0 - 4.0.49.Final + 4.1.17.Final 2.7.9 2.7.1 1.2.0-3f79e055 diff --git a/java/vector/src/main/java/org/apache/arrow/vector/util/MapWithOrdinal.java b/java/vector/src/main/java/org/apache/arrow/vector/util/MapWithOrdinal.java index 6d3b390379a..b863fa8af86 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/util/MapWithOrdinal.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/util/MapWithOrdinal.java @@ -134,9 +134,9 @@ public Set keySet() { @Override public Collection values() { - return Lists.newArrayList(Iterables.transform(secondary.entries(), new Function, V>() { + return Lists.newArrayList(Iterables.transform(secondary.entries(), new Function, V>() { @Override - public V apply(IntObjectMap.Entry entry) { + public V apply(IntObjectMap.PrimitiveEntry entry) { return Preconditions.checkNotNull(entry).value(); } })); From 555f88aee53ec810ef5e558fc28b5fa9cf1fb82c Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 29 Nov 2017 16:07:48 -0800 Subject: [PATCH 2/4] Add methods back --- .../main/java/io/netty/buffer/ArrowBuf.java | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java index 4aa52fd97bf..679f373b7c7 100644 --- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java @@ -511,6 +511,11 @@ public long getLong(int index) { return v; } + @Override + public float getFloat(int index) { + return Float.intBitsToFloat(getInt(index)); + } + @Override public long getLongLE(int index) { chk(index, 8); @@ -518,6 +523,21 @@ public long getLongLE(int index) { return Long.reverseBytes(v); } + @Override + public double getDouble(int index) { + return Double.longBitsToDouble(getLong(index)); + } + + @Override + public char getChar(int index) { + return (char) getShort(index); + } + + @Override + public long getUnsignedInt(int index) { + return getInt(index) & 0xFFFFFFFFL; + } + @Override public int getInt(int index) { chk(index, 4); @@ -532,6 +552,11 @@ public int getIntLE(int index) { return Integer.reverseBytes(v); } + @Override + public int getUnsignedShort(int index) { + return getShort(index) & 0xFFFF; + } + @Override public short getShort(int index) { chk(index, 2); @@ -621,6 +646,27 @@ public ByteBuf setLongLE(int index, long value) { return this; } + @Override + public ArrowBuf setChar(int index, int value) { + chk(index, 2); + PlatformDependent.putShort(addr(index), (short) value); + return this; + } + + @Override + public ArrowBuf setFloat(int index, float value) { + chk(index, 4); + PlatformDependent.putInt(addr(index), Float.floatToRawIntBits(value)); + return this; + } + + @Override + public ArrowBuf setDouble(int index, double value) { + chk(index, 8); + PlatformDependent.putLong(addr(index), Double.doubleToRawLongBits(value)); + return this; + } + @Override public ArrowBuf writeShort(int value) { ensure(2); @@ -645,6 +691,30 @@ public ArrowBuf writeLong(long value) { return this; } + @Override + public ArrowBuf writeChar(int value) { + ensure(2); + PlatformDependent.putShort(addr(writerIndex), (short) value); + writerIndex += 2; + return this; + } + + @Override + public ArrowBuf writeFloat(float value) { + ensure(4); + PlatformDependent.putInt(addr(writerIndex), Float.floatToRawIntBits(value)); + writerIndex += 4; + return this; + } + + @Override + public ArrowBuf writeDouble(double value) { + ensure(8); + PlatformDependent.putLong(addr(writerIndex), Double.doubleToRawLongBits(value)); + writerIndex += 8; + return this; + } + @Override public ArrowBuf getBytes(int index, byte[] dst, int dstIndex, int length) { udle.getBytes(index + offset, dst, dstIndex, length); From bb973335f5384c7300b000dd893b7433b6d3b2bb Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Sun, 3 Dec 2017 23:51:49 -0800 Subject: [PATCH 3/4] Add comment for calculateNewCapacity --- .../java/org/apache/arrow/memory/ArrowByteBufAllocator.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java index 58bd6e54c2d..b345fa7e655 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java @@ -147,6 +147,12 @@ private RuntimeException fail() { throw new UnsupportedOperationException("Allocator doesn't support heap-based memory."); } + /** + * This method was copied from AbstractByteBufAllocator. Netty 4.1.x moved this method from + * AbstractByteBuf to AbstractByteBufAllocator. However, as ArrowByteBufAllocator doesn't extend + * AbstractByteBufAllocator, it doesn't get the implementation automatically and we have to copy + * the codes. + */ @Override public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { if (minNewCapacity < 0) { From 96a93e1805d612c98cae3ee2297ee83ca5c6292f Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 4 Dec 2017 16:07:01 -0800 Subject: [PATCH 4/4] extend AbstractByteBufAllocator; add javadoc for new methods --- .../main/java/io/netty/buffer/ArrowBuf.java | 50 ++++++++++++++++ .../arrow/memory/ArrowByteBufAllocator.java | 57 ++++--------------- 2 files changed, 62 insertions(+), 45 deletions(-) diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java index 679f373b7c7..23f5d65fbb5 100644 --- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java @@ -516,6 +516,10 @@ public float getFloat(int index) { return Float.intBitsToFloat(getInt(index)); } + /** + * Gets a 64-bit long integer at the specified absolute {@code index} in + * this buffer in Big Endian Byte Order. + */ @Override public long getLongLE(int index) { chk(index, 8); @@ -545,6 +549,10 @@ public int getInt(int index) { return v; } + /** + * Gets a 32-bit integer at the specified absolute {@code index} in + * this buffer in Big Endian Byte Order. + */ @Override public int getIntLE(int index) { chk(index, 4); @@ -564,12 +572,20 @@ public short getShort(int index) { return v; } + /** + * Gets a 16-bit short integer at the specified absolute {@code index} in + * this buffer in Big Endian Byte Order. + */ @Override public short getShortLE(int index) { final short v = PlatformDependent.getShort(addr(index)); return Short.reverseBytes(v); } + /** + * Gets an unsigned 24-bit medium integer at the specified absolute + * {@code index} in this buffer. + */ @Override public int getUnsignedMedium(int index) { chk(index, 3); @@ -578,6 +594,10 @@ public int getUnsignedMedium(int index) { (PlatformDependent.getShort(addr + 1) & 0xffff); } + /** + * Gets an unsigned 24-bit medium integer at the specified absolute {@code index} in + * this buffer in Big Endian Byte Order. + */ @Override public int getUnsignedMediumLE(int index) { chk(index, 3); @@ -593,6 +613,10 @@ public ArrowBuf setShort(int index, int value) { return this; } + /** + * Sets the specified 16-bit short integer at the specified absolute {@code index} + * in this buffer with Big Endian byte order. + */ @Override public ByteBuf setShortLE(int index, int value) { chk(index, 2); @@ -600,6 +624,10 @@ public ByteBuf setShortLE(int index, int value) { return this; } + /** + * Sets the specified 24-bit medium integer at the specified absolute + * {@code index} in this buffer. + */ @Override public ByteBuf setMedium(int index, int value) { chk(index, 3); @@ -609,6 +637,11 @@ public ByteBuf setMedium(int index, int value) { return this; } + + /** + * Sets the specified 24-bit medium integer at the specified absolute {@code index} + * in this buffer with Big Endian byte order. + */ @Override public ByteBuf setMediumLE(int index, int value) { chk(index, 3); @@ -625,6 +658,10 @@ public ArrowBuf setInt(int index, int value) { return this; } + /** + * Sets the specified 32-bit integer at the specified absolute {@code index} + * in this buffer with Big Endian byte order. + */ @Override public ByteBuf setIntLE(int index, int value) { chk(index, 4); @@ -639,6 +676,10 @@ public ArrowBuf setLong(int index, long value) { return this; } + /** + * Sets the specified 64-bit long integer at the specified absolute {@code index} + * in this buffer with Big Endian byte order. + */ @Override public ByteBuf setLongLE(int index, long value) { chk(index, 8); @@ -754,6 +795,7 @@ protected short _getShort(int index) { return getShort(index); } + /** @see {@link #getShortLE(int)} */ @Override protected short _getShortLE(int index) { return getShortLE(index); @@ -764,16 +806,19 @@ protected int _getInt(int index) { return getInt(index); } + /** @see {@link #getIntLE(int)} */ @Override protected int _getIntLE(int index) { return getIntLE(index); } + /** @see {@link #getUnsignedMedium(int)} */ @Override protected int _getUnsignedMedium(int index) { return getUnsignedMedium(index); } + /** @see {@link #getUnsignedMediumLE(int)} */ @Override protected int _getUnsignedMediumLE(int index) { return getUnsignedMediumLE(index); @@ -784,6 +829,7 @@ protected long _getLong(int index) { return getLong(index); } + /** @see {@link #getLongLE(int)} */ @Override protected long _getLongLE(int index) { return getLongLE(index); @@ -799,6 +845,7 @@ protected void _setShort(int index, int value) { setShort(index, value); } + /** @see {@link #setShortLE(int, int)} */ @Override protected void _setShortLE(int index, int value) { setShortLE(index, value); @@ -809,6 +856,7 @@ protected void _setMedium(int index, int value) { setMedium(index, value); } + /** @see {@link #setMediumLE(int, int)} */ @Override protected void _setMediumLE(int index, int value) { setMediumLE(index, value); @@ -819,6 +867,7 @@ protected void _setInt(int index, int value) { setInt(index, value); } + /** @see {@link #setIntLE(int, int)} */ @Override protected void _setIntLE(int index, int value) { setIntLE(index, value); @@ -829,6 +878,7 @@ protected void _setLong(int index, long value) { setLong(index, value); } + /** @see {@link #setLongLE(int, long)} */ @Override public void _setLongLE(int index, long value) { setLongLE(index, value); diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java index b345fa7e655..94102992139 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java @@ -18,8 +18,8 @@ package org.apache.arrow.memory; +import io.netty.buffer.AbstractByteBufAllocator; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.ExpandableByteBuf; @@ -32,11 +32,10 @@ * otherwise non-expandable * ArrowBufs to be expandable. */ -public class ArrowByteBufAllocator implements ByteBufAllocator { +public class ArrowByteBufAllocator extends AbstractByteBufAllocator { private static final int DEFAULT_BUFFER_SIZE = 4096; private static final int DEFAULT_MAX_COMPOSITE_COMPONENTS = 16; - private static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page private final BufferAllocator allocator; @@ -143,49 +142,17 @@ public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { throw fail(); } - private RuntimeException fail() { - throw new UnsupportedOperationException("Allocator doesn't support heap-based memory."); + @Override + protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { + throw fail(); + } + + @Override + protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { + return buffer(initialCapacity, maxCapacity); } - /** - * This method was copied from AbstractByteBufAllocator. Netty 4.1.x moved this method from - * AbstractByteBuf to AbstractByteBufAllocator. However, as ArrowByteBufAllocator doesn't extend - * AbstractByteBufAllocator, it doesn't get the implementation automatically and we have to copy - * the codes. - */ - @Override - public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { - if (minNewCapacity < 0) { - throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)"); - } - if (minNewCapacity > maxCapacity) { - throw new IllegalArgumentException(String.format( - "minNewCapacity: %d (expected: not greater than maxCapacity(%d)", - minNewCapacity, maxCapacity)); - } - final int threshold = CALCULATE_THRESHOLD; // 4 MiB page - - if (minNewCapacity == threshold) { - return threshold; - } - - // If over threshold, do not double but just increase by threshold. - if (minNewCapacity > threshold) { - int newCapacity = minNewCapacity / threshold * threshold; - if (newCapacity > maxCapacity - threshold) { - newCapacity = maxCapacity; - } else { - newCapacity += threshold; - } - return newCapacity; - } - - // Not over threshold. Double up to 4 MiB, starting from 64. - int newCapacity = 64; - while (newCapacity < minNewCapacity) { - newCapacity <<= 1; - } - - return Math.min(newCapacity, maxCapacity); + private RuntimeException fail() { + throw new UnsupportedOperationException("Allocator doesn't support heap-based memory."); } }