diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java index 448b18edae82..69b5934c44e8 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java @@ -74,7 +74,9 @@ public String ofByteBuffer(ByteBuffer byteBuffer) { return new String(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), byteBuffer.remaining(), StandardCharsets.UTF_8); } - return StandardCharsets.UTF_8.decode(byteBuffer).toString(); + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return new String(bytes, StandardCharsets.UTF_8); } } } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java new file mode 100644 index 000000000000..cb4e4536906e --- /dev/null +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.arrow.vectorized.parquet; + +import java.util.Arrays; + +public class DecimalVectorUtil { + + private DecimalVectorUtil() { + } + + /** + * Parquet stores decimal values in big-endian byte order, and Arrow stores them in native byte order. + * When setting the value in Arrow, we call setBigEndian(), and the byte order is reversed if needed. + * Also, the byte array is padded to fill 16 bytes in length by calling Unsafe.setMemory(). The padding + * operation can be slow, so by using this utility method, we can pad before calling setBigEndian() and + * avoid the call to Unsafe.setMemory(). + * + * @param bigEndianBytes The big endian bytes + * @param newLength The length of the byte array to return + * @return The new byte array + */ + public static byte[] padBigEndianBytes(byte[] bigEndianBytes, int newLength) { + if (bigEndianBytes.length == newLength) { + return bigEndianBytes; + } else if (bigEndianBytes.length < newLength) { + byte[] result = new byte[newLength]; + if (bigEndianBytes.length == 0) { + return result; + } + + int start = newLength - bigEndianBytes.length; + if (bigEndianBytes[0] < 0) { + Arrays.fill(result, 0, start, (byte) 0xFF); + } + System.arraycopy(bigEndianBytes, 0, result, start, bigEndianBytes.length); + + return result; + } + throw new IllegalArgumentException(String.format("Buffer size of %d is larger than requested size of %d", + bigEndianBytes.length, newLength)); + } + +} diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java index 1773d74873c1..e8e3cabd9da2 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java @@ -128,12 +128,11 @@ protected void nextVal(FieldVector vector, Dictionary dict, int idx, int current class FixedLengthDecimalDictEncodedReader extends BaseDictEncodedReader { @Override protected void nextVal(FieldVector vector, Dictionary dict, int idx, int currentVal, int typeWidth) { - byte[] decimalBytes = dict.decodeToBinary(currentVal).getBytesUnsafe(); - byte[] vectorBytes = new byte[typeWidth]; - System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth); + byte[] vectorBytes = + DecimalVectorUtil.padBigEndianBytes( + dict.decodeToBinary(currentVal).getBytesUnsafe(), + DecimalVector.TYPE_WIDTH); ((DecimalVector) vector).setBigEndian(idx, vectorBytes); - ByteBuffer buffer = dict.decodeToBinary(currentVal).toByteBuffer(); - vector.getDataBuffer().setBytes(idx, buffer); } } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java index dbfb4054e7a2..a8990d0cd638 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java @@ -358,7 +358,8 @@ class FixedLengthDecimalReader extends BaseReader { protected void nextVal( FieldVector vector, int idx, ValuesAsBytesReader valuesReader, int typeWidth, byte[] byteArray) { valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth); - ((DecimalVector) vector).setBigEndian(idx, byteArray); + byte[] vectorBytes = DecimalVectorUtil.padBigEndianBytes(byteArray, DecimalVector.TYPE_WIDTH); + ((DecimalVector) vector).setBigEndian(idx, vectorBytes); } @Override @@ -369,9 +370,10 @@ protected void nextDictEncodedVal( reader.fixedLengthDecimalDictEncodedReader() .nextBatch(vector, idx, numValuesToRead, dict, nullabilityHolder, typeWidth); } else if (Mode.PACKED.equals(mode)) { - ByteBuffer decimalBytes = dict.decodeToBinary(reader.readInteger()).toByteBuffer(); - byte[] vectorBytes = new byte[typeWidth]; - System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth); + byte[] vectorBytes = + DecimalVectorUtil.padBigEndianBytes( + dict.decodeToBinary(reader.readInteger()).getBytesUnsafe(), + DecimalVector.TYPE_WIDTH); ((DecimalVector) vector).setBigEndian(idx, vectorBytes); } } diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtilTest.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtilTest.java new file mode 100644 index 000000000000..10fe1afcdfbf --- /dev/null +++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtilTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.arrow.vectorized.parquet; + +import java.math.BigInteger; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class DecimalVectorUtilTest { + + @Test + public void testPadBigEndianBytes() { + BigInteger bigInt = new BigInteger("12345"); + byte[] bytes = bigInt.toByteArray(); + byte[] paddedBytes = DecimalVectorUtil.padBigEndianBytes(bytes, 16); + + assertEquals(16, paddedBytes.length); + BigInteger result = new BigInteger(paddedBytes); + assertEquals(bigInt, result); + } + + @Test + public void testPadBigEndianBytesNegative() { + BigInteger bigInt = new BigInteger("-12345"); + byte[] bytes = bigInt.toByteArray(); + byte[] paddedBytes = DecimalVectorUtil.padBigEndianBytes(bytes, 16); + + assertEquals(16, paddedBytes.length); + BigInteger result = new BigInteger(paddedBytes); + assertEquals(bigInt, result); + } + + @Test + public void testPadBigEndianBytesZero() { + byte[] bytes = BigInteger.ZERO.toByteArray(); + byte[] paddedBytes = DecimalVectorUtil.padBigEndianBytes(bytes, 16); + + assertEquals(16, paddedBytes.length); + BigInteger result = new BigInteger(paddedBytes); + assertEquals(BigInteger.ZERO, result); + + bytes = new byte[0]; + paddedBytes = DecimalVectorUtil.padBigEndianBytes(bytes, 16); + + assertEquals(16, paddedBytes.length); + result = new BigInteger(paddedBytes); + assertEquals(BigInteger.ZERO, result); + } + + @Test(expected = IllegalArgumentException.class) + public void testPadBigEndianBytesOverflow() { + byte[] bytes = new byte[17]; + DecimalVectorUtil.padBigEndianBytes(bytes, 16); + } +}