Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ public String ofByteBuffer(ByteBuffer byteBuffer) {
return new String(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(),
byteBuffer.remaining(), StandardCharsets.UTF_8);
}
return StandardCharsets.UTF_8.decode(byteBuffer).toString();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return new String(bytes, StandardCharsets.UTF_8);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.arrow.vectorized.parquet;

import java.util.Arrays;

public class DecimalVectorUtil {

private DecimalVectorUtil() {
}

/**
* Parquet stores decimal values in big-endian byte order, and Arrow stores them in native byte order.
* When setting the value in Arrow, we call setBigEndian(), and the byte order is reversed if needed.
* Also, the byte array is padded to fill 16 bytes in length by calling Unsafe.setMemory(). The padding
* operation can be slow, so by using this utility method, we can pad before calling setBigEndian() and
* avoid the call to Unsafe.setMemory().
*
* @param bigEndianBytes The big endian bytes
* @param newLength The length of the byte array to return
* @return The new byte array
*/
public static byte[] padBigEndianBytes(byte[] bigEndianBytes, int newLength) {
if (bigEndianBytes.length == newLength) {
return bigEndianBytes;
} else if (bigEndianBytes.length < newLength) {
byte[] result = new byte[newLength];
if (bigEndianBytes.length == 0) {
return result;
Copy link
Contributor

@rdblue rdblue Jul 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bryanck, is this hit? It looks like an invalid case because the decimal precision would need to be 0, but we're choosing to return 0 for it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not, I was mimicking the behavior in DecimalVector.setBigEndian() to be on the safe side.

}

int start = newLength - bigEndianBytes.length;
if (bigEndianBytes[0] < 0) {
Arrays.fill(result, 0, start, (byte) 0xFF);
}
System.arraycopy(bigEndianBytes, 0, result, start, bigEndianBytes.length);

return result;
}
throw new IllegalArgumentException(String.format("Buffer size of %d is larger than requested size of %d",
bigEndianBytes.length, newLength));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,11 @@ protected void nextVal(FieldVector vector, Dictionary dict, int idx, int current
class FixedLengthDecimalDictEncodedReader extends BaseDictEncodedReader {
@Override
protected void nextVal(FieldVector vector, Dictionary dict, int idx, int currentVal, int typeWidth) {
byte[] decimalBytes = dict.decodeToBinary(currentVal).getBytesUnsafe();
byte[] vectorBytes = new byte[typeWidth];
System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
byte[] vectorBytes =
DecimalVectorUtil.padBigEndianBytes(
dict.decodeToBinary(currentVal).getBytesUnsafe(),
DecimalVector.TYPE_WIDTH);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is typeWidth going to be the same as DecimalVector.TYPE_WIDTH?

Copy link
Contributor Author

@bryanck bryanck Jul 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typeWidth is the Parquet width, I believe, which is variable depending on the precision of the decimal, but the Arrow width is always 16.

((DecimalVector) vector).setBigEndian(idx, vectorBytes);
ByteBuffer buffer = dict.decodeToBinary(currentVal).toByteBuffer();
vector.getDataBuffer().setBytes(idx, buffer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bryanck, was this really setting the value twice? It looks like it was calling setBigEndian on the vector and then setBytes on the backing buffer. That could explain a lot of the slowness as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like that's what it was doing.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ class FixedLengthDecimalReader extends BaseReader {
protected void nextVal(
FieldVector vector, int idx, ValuesAsBytesReader valuesReader, int typeWidth, byte[] byteArray) {
valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
((DecimalVector) vector).setBigEndian(idx, byteArray);
byte[] vectorBytes = DecimalVectorUtil.padBigEndianBytes(byteArray, DecimalVector.TYPE_WIDTH);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a place where we could reuse a buffer rather than allocating in padBigEndianBytes every time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some testing, and reusing the buffer was a little bit slower, partly because we need to always to fill the buffer to zero out the last value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing that was a little bit faster was to bypass DecimalVector.setBigEndian(), convert to little endian (if needed) and copy the bytes directly to the value buffer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also one thing to note is that the benchmark isn't quite right. Decimal(20,5) will end up taking 9 bytes and will thus use a fixed length byte array instead of long or int encoding. And fixed length byte arrays aren't dictionary encoded in Parquet v1. That explains why the decimal benchmark is much slower than the other data types (which are dictionary encoded).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(It looks like dictionary encoding for fixed length byte arrays wouldn't work correctly anyway, I may follow up with a fix for that)

Copy link
Contributor Author

@bryanck bryanck Jul 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought about reusing the buffer, we could create a buffer per value reader so the width of the value is the same, then skip the array fill (if you have 2 buffers, one for negative and one for positive values)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the PR that has a fix for the dictionary encoding

((DecimalVector) vector).setBigEndian(idx, vectorBytes);
}

@Override
Expand All @@ -369,9 +370,10 @@ protected void nextDictEncodedVal(
reader.fixedLengthDecimalDictEncodedReader()
.nextBatch(vector, idx, numValuesToRead, dict, nullabilityHolder, typeWidth);
} else if (Mode.PACKED.equals(mode)) {
ByteBuffer decimalBytes = dict.decodeToBinary(reader.readInteger()).toByteBuffer();
byte[] vectorBytes = new byte[typeWidth];
System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this correct before? It looks like it was trying to use System.arraycopy with a ByteBuffer!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this would have thrown an ArrayStoreException

byte[] vectorBytes =
DecimalVectorUtil.padBigEndianBytes(
dict.decodeToBinary(reader.readInteger()).getBytesUnsafe(),
DecimalVector.TYPE_WIDTH);
((DecimalVector) vector).setBigEndian(idx, vectorBytes);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.arrow.vectorized.parquet;

import java.math.BigInteger;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class DecimalVectorUtilTest {

@Test
public void testPadBigEndianBytes() {
BigInteger bigInt = new BigInteger("12345");
byte[] bytes = bigInt.toByteArray();
byte[] paddedBytes = DecimalVectorUtil.padBigEndianBytes(bytes, 16);

assertEquals(16, paddedBytes.length);
BigInteger result = new BigInteger(paddedBytes);
assertEquals(bigInt, result);
}

@Test
public void testPadBigEndianBytesNegative() {
BigInteger bigInt = new BigInteger("-12345");
byte[] bytes = bigInt.toByteArray();
byte[] paddedBytes = DecimalVectorUtil.padBigEndianBytes(bytes, 16);

assertEquals(16, paddedBytes.length);
BigInteger result = new BigInteger(paddedBytes);
assertEquals(bigInt, result);
}

@Test
public void testPadBigEndianBytesZero() {
byte[] bytes = BigInteger.ZERO.toByteArray();
byte[] paddedBytes = DecimalVectorUtil.padBigEndianBytes(bytes, 16);

assertEquals(16, paddedBytes.length);
BigInteger result = new BigInteger(paddedBytes);
assertEquals(BigInteger.ZERO, result);

bytes = new byte[0];
paddedBytes = DecimalVectorUtil.padBigEndianBytes(bytes, 16);

assertEquals(16, paddedBytes.length);
result = new BigInteger(paddedBytes);
assertEquals(BigInteger.ZERO, result);
}

@Test(expected = IllegalArgumentException.class)
public void testPadBigEndianBytesOverflow() {
byte[] bytes = new byte[17];
DecimalVectorUtil.padBigEndianBytes(bytes, 16);
}
}