diff --git a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java index 7d7f741222c..8c5f61169d4 100644 --- a/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java +++ b/java/adapter/jdbc/src/main/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumer.java @@ -26,8 +26,6 @@ import org.apache.arrow.vector.BitVectorHelper; import org.apache.arrow.vector.VarBinaryVector; -import io.netty.util.internal.PlatformDependent; - /** * Consumer which consume binary type values from {@link ResultSet}. * Write the data to {@link org.apache.arrow.vector.VarBinaryVector}. @@ -45,7 +43,7 @@ public static BinaryConsumer createConsumer(VarBinaryVector vector, int index, b } } - private static final int BUFFER_SIZE = 1024; + private final byte[] reuseBytes = new byte[1024]; /** * Instantiate a BinaryConsumer. @@ -62,23 +60,21 @@ public BinaryConsumer(VarBinaryVector vector, int index) { */ public void consume(InputStream is) throws IOException { if (is != null) { - + while (currentIndex >= vector.getValueCapacity()) { + vector.reallocValidityAndOffsetBuffers(); + } + final int startOffset = vector.getStartOffset(currentIndex); + final ArrowBuf offsetBuffer = vector.getOffsetBuffer(); + int dataLength = 0; int read; - byte[] bytes = new byte[BUFFER_SIZE]; - int totalBytes = 0; - - ArrowBuf dataBuffer = vector.getDataBuffer(); - ArrowBuf offsetBuffer = vector.getOffsetBuffer(); - int startIndex = offsetBuffer.getInt(currentIndex * 4); - while ((read = is.read(bytes)) != -1) { - while ((dataBuffer.writerIndex() + read) > dataBuffer.capacity()) { + while ((read = is.read(reuseBytes)) != -1) { + while (vector.getDataBuffer().capacity() < (startOffset + dataLength + read)) { vector.reallocDataBuffer(); } - PlatformDependent.copyMemory(bytes, 0, - dataBuffer.memoryAddress() + startIndex + totalBytes, read); - totalBytes += read; + vector.getDataBuffer().setBytes(startOffset + dataLength, reuseBytes, 0, read); + dataLength += read; } - offsetBuffer.setInt((currentIndex + 1) * 4, startIndex + totalBytes); + offsetBuffer.setInt((currentIndex + 1) * VarBinaryVector.OFFSET_WIDTH, startOffset + dataLength); BitVectorHelper.setBit(vector.getValidityBuffer(), currentIndex); vector.setLastSet(currentIndex); } @@ -113,7 +109,7 @@ public void consume(ResultSet resultSet) throws SQLException, IOException { if (!resultSet.wasNull()) { consume(is); } - currentIndex++; + moveWriterPosition(); } } @@ -133,7 +129,7 @@ public NonNullableBinaryConsumer(VarBinaryVector vector, int index) { public void consume(ResultSet resultSet) throws SQLException, IOException { InputStream is = resultSet.getBinaryStream(columnIndexInResultSet); consume(is); - currentIndex++; + moveWriterPosition(); } } } diff --git a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/AbstractConsumerTest.java b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/AbstractConsumerTest.java new file mode 100644 index 00000000000..96bac42214c --- /dev/null +++ b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/AbstractConsumerTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.jdbc.consumer; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.junit.After; +import org.junit.Before; + +public abstract class AbstractConsumerTest { + + protected BufferAllocator allocator; + + @Before + public void setUp() { + allocator = new RootAllocator(Long.MAX_VALUE); + } + + @After + public void tearDown() { + allocator.close(); + } + +} diff --git a/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java new file mode 100644 index 00000000000..a368023d490 --- /dev/null +++ b/java/adapter/jdbc/src/test/java/org/apache/arrow/adapter/jdbc/consumer/BinaryConsumerTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.adapter.jdbc.consumer; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +import org.apache.arrow.vector.BaseValueVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.junit.Test; + +public class BinaryConsumerTest extends AbstractConsumerTest { + + private static final int INITIAL_VALUE_ALLOCATION = BaseValueVector.INITIAL_VALUE_ALLOCATION; + private static final int DEFAULT_RECORD_BYTE_COUNT = 8; + + interface InputStreamConsumer { + void consume(BinaryConsumer consumer) throws IOException; + } + + protected void assertConsume(boolean nullable, InputStreamConsumer dataConsumer, byte[][] expect) throws IOException { + try (final VarBinaryVector vector = new VarBinaryVector("binary", allocator)) { + BinaryConsumer consumer = BinaryConsumer.createConsumer(vector, 0, nullable); + dataConsumer.consume(consumer); + assertEquals(expect.length - 1, vector.getLastSet()); + for (int i = 0; i < expect.length; i++) { + byte[] value = expect[i]; + if (value == null) { + assertTrue(vector.isNull(i)); + } else { + assertArrayEquals(expect[i], vector.get(i)); + } + } + } + } + + private byte[] createBytes(int length) { + byte[] bytes = new byte[length]; + for (int i = 0; i < length; i++) { + bytes[i] = (byte) (i % 1024); + } + return bytes; + } + + + public void testConsumeInputStream(byte[][] values, boolean nullable) throws IOException { + assertConsume(nullable, binaryConsumer -> { + for (byte[] value : values) { + binaryConsumer.consume(new ByteArrayInputStream(value)); + binaryConsumer.moveWriterPosition(); + } + }, values); + } + + @Test + public void testConsumeInputStream() throws IOException { + testConsumeInputStream(new byte[][]{ + createBytes(DEFAULT_RECORD_BYTE_COUNT) + }, false); + + testConsumeInputStream(new byte[][]{ + createBytes(DEFAULT_RECORD_BYTE_COUNT), + createBytes(DEFAULT_RECORD_BYTE_COUNT) + }, false); + + testConsumeInputStream(new byte[][]{ + createBytes(DEFAULT_RECORD_BYTE_COUNT * 2), + createBytes(DEFAULT_RECORD_BYTE_COUNT), + createBytes(DEFAULT_RECORD_BYTE_COUNT) + }, false); + + testConsumeInputStream(new byte[][]{ + createBytes(INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT) + }, false); + + testConsumeInputStream(new byte[][]{ + createBytes(INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT * 10), + }, false); + + testConsumeInputStream(new byte[][]{ + createBytes(INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT), + createBytes(INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT) + }, false); + + testConsumeInputStream(new byte[][]{ + createBytes(INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT), + createBytes(DEFAULT_RECORD_BYTE_COUNT), + createBytes(INITIAL_VALUE_ALLOCATION * DEFAULT_RECORD_BYTE_COUNT) + }, false); + + byte[][] testRecords = new byte[INITIAL_VALUE_ALLOCATION * 2][]; + for (int i = 0; i < testRecords.length; i++) { + testRecords[i] = createBytes(DEFAULT_RECORD_BYTE_COUNT); + } + testConsumeInputStream(testRecords, false); + } + +}