@@ -21,7 +21,6 @@

 import java.lang.reflect.Array;
 import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.function.IntFunction;
 import java.util.function.Supplier;
@@ -556,9 +555,9 @@ public final DecimalT getDecimal(int rowId, int precision, int scale) {
       DictionaryDecimalAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
       extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
     private final DecimalT[] cache;
-    private final DecimalFactory<DecimalT> decimalFactory;
-    private final Dictionary parquetDictionary;
     private final IntVector offsetVector;
+    protected final DecimalFactory<DecimalT> decimalFactory;
+    protected final Dictionary parquetDictionary;

     private DictionaryDecimalAccessor(
         IntVector vector,
@@ -571,28 +570,16 @@ private DictionaryDecimalAccessor(
       this.cache = genericArray(decimalFactory.getGenericClass(), dictionary.getMaxId() + 1);
     }

-    protected long decodeToBinary(int dictId) {
-      return new BigInteger(parquetDictionary.decodeToBinary(dictId).getBytesUnsafe()).longValue();
-    }
-
-    protected long decodeToLong(int dictId) {
-      return parquetDictionary.decodeToLong(dictId);
-    }
-
-    protected int decodeToInt(int dictId) {
-      return parquetDictionary.decodeToInt(dictId);
-    }
-
     @Override
     public final DecimalT getDecimal(int rowId, int precision, int scale) {
       int offset = offsetVector.get(rowId);
       if (cache[offset] == null) {
-        cache[offset] = decimalFactory.ofLong(decode(offset), precision, scale);
+        cache[offset] = decode(offset, precision, scale);
       }
       return cache[offset];
     }

-    protected abstract long decode(int dictId);
+    protected abstract DecimalT decode(int dictId, int precision, int scale);
   }

   private static class
@@ -604,8 +591,10 @@ public final DecimalT getDecimal(int rowId, int precision, int scale) {
     }

     @Override
-    protected long decode(int dictId) {
-      return decodeToBinary(dictId);
+    protected DecimalT decode(int dictId, int precision, int scale) {
+      ByteBuffer byteBuffer = parquetDictionary.decodeToBinary(dictId).toByteBuffer();
+      BigDecimal value = DecimalUtility.getBigDecimalFromByteBuffer(byteBuffer, scale, byteBuffer.remaining());
+      return decimalFactory.ofBigDecimal(value, precision, scale);
     }
   }
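For reference, the binary accessor in this hunk rebuilds a BigDecimal from what Parquet stores for a fixed-length decimal: the unscaled value as big-endian two's-complement bytes. A minimal JDK-only sketch of that conversion (illustrative only; the PR itself goes through Arrow's DecimalUtility as shown above):

// Parquet stores a DECIMAL's unscaled value as big-endian two's complement,
// which is exactly the layout BigInteger(byte[]) expects; applying the scale finishes the job.
static java.math.BigDecimal decimalFromParquetBytes(byte[] bigEndianUnscaled, int scale) {
  return new java.math.BigDecimal(new java.math.BigInteger(bigEndianUnscaled), scale);
}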

@@ -617,8 +606,8 @@ private static class DictionaryDecimalLongAccessor<DecimalT, Utf8StringT, ArrayT
     }

     @Override
-    protected long decode(int dictId) {
-      return decodeToLong(dictId);
+    protected DecimalT decode(int dictId, int precision, int scale) {
+      return decimalFactory.ofLong(parquetDictionary.decodeToLong(dictId), precision, scale);
     }
   }

@@ -630,8 +619,8 @@ private static class DictionaryDecimalIntAccessor<DecimalT, Utf8StringT, ArrayT,
     }

     @Override
-    protected long decode(int dictId) {
-      return decodeToInt(dictId);
+    protected DecimalT decode(int dictId, int precision, int scale) {
+      return decimalFactory.ofLong(parquetDictionary.decodeToInt(dictId), precision, scale);
     }
   }

@@ -20,12 +20,19 @@
 package org.apache.iceberg.arrow.vectorized.parquet;

 import java.util.Arrays;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;

 public class DecimalVectorUtil {

   private DecimalVectorUtil() {
   }

+  public static void setBigEndian(DecimalVector vector, int idx, byte[] value) {
+    byte[] paddedBytes = DecimalVectorUtil.padBigEndianBytes(value, DecimalVector.TYPE_WIDTH);
+    vector.setBigEndian(idx, paddedBytes);
+  }
+
   /**
    * Parquet stores decimal values in big-endian byte order, and Arrow stores them in native byte order.
    * When setting the value in Arrow, we call setBigEndian(), and the byte order is reversed if needed.
@@ -37,7 +44,8 @@ private DecimalVectorUtil() {
    * @param newLength The length of the byte array to return
    * @return The new byte array
    */
-  public static byte[] padBigEndianBytes(byte[] bigEndianBytes, int newLength) {
+  @VisibleForTesting
+  static byte[] padBigEndianBytes(byte[] bigEndianBytes, int newLength) {
     if (bigEndianBytes.length == newLength) {
       return bigEndianBytes;
     } else if (bigEndianBytes.length < newLength) {
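The javadoc above calls for padding the big-endian bytes out to Arrow's 16-byte DecimalVector.TYPE_WIDTH before setBigEndian() is invoked. For a two's-complement value that pad has to sign-extend rather than just zero-fill; a minimal sketch of that behaviour (the helper name and edge-case handling are illustrative assumptions, not the PR's padBigEndianBytes):

// Pads a big-endian two's-complement value to newLength bytes, assuming bytes.length <= newLength.
static byte[] padTwosComplementBigEndian(byte[] bytes, int newLength) {
  byte[] result = new byte[newLength];
  // a negative value has the sign bit set in its most significant (first) byte; extend it with 0xFF
  if (bytes.length > 0 && bytes[0] < 0) {
    java.util.Arrays.fill(result, 0, newLength - bytes.length, (byte) 0xFF);
  }
  System.arraycopy(bytes, 0, result, newLength - bytes.length, bytes.length);
  return result;
}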
@@ -128,11 +128,8 @@ protected void nextVal(FieldVector vector, Dictionary dict, int idx, int current
   class FixedLengthDecimalDictEncodedReader extends BaseDictEncodedReader {
     @Override
     protected void nextVal(FieldVector vector, Dictionary dict, int idx, int currentVal, int typeWidth) {
-      byte[] vectorBytes =
-          DecimalVectorUtil.padBigEndianBytes(
-              dict.decodeToBinary(currentVal).getBytesUnsafe(),
-              DecimalVector.TYPE_WIDTH);
-      ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+      byte[] bytes = dict.decodeToBinary(currentVal).getBytesUnsafe();
+      DecimalVectorUtil.setBigEndian((DecimalVector) vector, idx, bytes);
     }
   }

@@ -358,8 +358,7 @@ class FixedLengthDecimalReader extends BaseReader {
     protected void nextVal(
         FieldVector vector, int idx, ValuesAsBytesReader valuesReader, int typeWidth, byte[] byteArray) {
       valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
-      byte[] vectorBytes = DecimalVectorUtil.padBigEndianBytes(byteArray, DecimalVector.TYPE_WIDTH);
-      ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+      DecimalVectorUtil.setBigEndian((DecimalVector) vector, idx, byteArray);
     }

     @Override
@@ -370,11 +369,8 @@ protected void nextDictEncodedVal(
         reader.fixedLengthDecimalDictEncodedReader()
             .nextBatch(vector, idx, numValuesToRead, dict, nullabilityHolder, typeWidth);
       } else if (Mode.PACKED.equals(mode)) {
-        byte[] vectorBytes =
-            DecimalVectorUtil.padBigEndianBytes(
-                dict.decodeToBinary(reader.readInteger()).getBytesUnsafe(),
-                DecimalVector.TYPE_WIDTH);
-        ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
+        byte[] bytes = dict.decodeToBinary(reader.readInteger()).getBytesUnsafe();
+        DecimalVectorUtil.setBigEndian((DecimalVector) vector, idx, bytes);
       }
     }
   }
@@ -19,12 +19,9 @@

 package org.apache.iceberg.spark.source.parquet.vectorized;

-import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.util.Map;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -45,8 +42,8 @@
  * <p>
  * To run this benchmark for spark-3.3:
  * <code>
- *   ./gradlew -DsparkVersions=3.3 :iceberg-spark:iceberg-spark-3.3_2.12:jmh
- *       -PjmhIncludeRegex=VectorizedReadDictionaryEncodedFlatParquetDataBenchmark
+ *   ./gradlew -DsparkVersions=3.3 :iceberg-spark:iceberg-spark-3.3_2.12:jmh \
+ *       -PjmhIncludeRegex=VectorizedReadDictionaryEncodedFlatParquetDataBenchmark \
  *       -PjmhOutputPath=benchmark/results.txt
  * </code>
  */
@@ -78,6 +75,7 @@ void appendData() {
     df = withIntColumnDictEncoded(df);
     df = withFloatColumnDictEncoded(df);
     df = withDoubleColumnDictEncoded(df);
+    df = withBigDecimalColumnNotDictEncoded(df); // no dictionary for fixed len binary in Parquet v1
     df = withDecimalColumnDictEncoded(df);
     df = withDateColumnDictEncoded(df);
     df = withTimestampColumnDictEncoded(df);
@@ -113,9 +111,12 @@ private static Dataset<Row> withDoubleColumnDictEncoded(Dataset<Row> df) {
     return df.withColumn("doubleCol", modColumn().cast(DataTypes.DoubleType));
   }

+  private static Dataset<Row> withBigDecimalColumnNotDictEncoded(Dataset<Row> df) {
+    return df.withColumn("bigDecimalCol", modColumn().cast("decimal(20,5)"));
+  }
+
   private static Dataset<Row> withDecimalColumnDictEncoded(Dataset<Row> df) {
-    Types.DecimalType type = Types.DecimalType.of(20, 5);
-    return df.withColumn("decimalCol", lit(bigDecimal(type, 0)).plus(modColumn()));
+    return df.withColumn("decimalCol", modColumn().cast("decimal(18,5)"));
   }

   private static Dataset<Row> withDateColumnDictEncoded(Dataset<Row> df) {
@@ -131,9 +132,4 @@ private static Dataset<Row> withTimestampColumnDictEncoded(Dataset<Row> df) {
   private static Dataset<Row> withStringColumnDictEncoded(Dataset<Row> df) {
     return df.withColumn("stringCol", modColumn().cast(DataTypes.StringType));
   }
-
-  private static BigDecimal bigDecimal(Types.DecimalType type, int value) {
-    BigInteger unscaled = new BigInteger(String.valueOf(value + 1));
-    return new BigDecimal(unscaled, type.scale());
-  }
 }
@@ -53,8 +53,8 @@
  * <p>
  * To run this benchmark for spark-3.3:
  * <code>
- *   ./gradlew -DsparkVersions=3.3 :iceberg-spark:iceberg-spark-3.3_2.12:jmh
- *       -PjmhIncludeRegex=VectorizedReadFlatParquetDataBenchmark
+ *   ./gradlew -DsparkVersions=3.3 :iceberg-spark:iceberg-spark-3.3_2.12:jmh \
+ *       -PjmhIncludeRegex=VectorizedReadFlatParquetDataBenchmark \
 *       -PjmhOutputPath=benchmark/results.txt
 * </code>
 */
@@ -87,15 +87,18 @@ protected Configuration initHadoopConf() {

   @Override
   protected Table initTable() {
+    // bigDecimalCol is big enough to be encoded as fix len binary (9 bytes),
+    // decimalCol is small enough to be encoded as a 64-bit int
     Schema schema = new Schema(
         optional(1, "longCol", Types.LongType.get()),
         optional(2, "intCol", Types.IntegerType.get()),
         optional(3, "floatCol", Types.FloatType.get()),
         optional(4, "doubleCol", Types.DoubleType.get()),
-        optional(5, "decimalCol", Types.DecimalType.of(20, 5)),
-        optional(6, "dateCol", Types.DateType.get()),
-        optional(7, "timestampCol", Types.TimestampType.withZone()),
-        optional(8, "stringCol", Types.StringType.get()));
+        optional(5, "bigDecimalCol", Types.DecimalType.of(20, 5)),
+        optional(6, "decimalCol", Types.DecimalType.of(18, 5)),
+        optional(7, "dateCol", Types.DateType.get()),
+        optional(8, "timestampCol", Types.TimestampType.withZone()),
+        optional(9, "stringCol", Types.StringType.get()));
     PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
     HadoopTables tables = new HadoopTables(hadoopConf());
     Map<String, String> properties = parquetWriteProps();
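The comment in this hunk follows from how Parquet sizes decimals by precision: an unscaled value of precision 18 still fits a signed 64-bit long (roughly 9.2 * 10^18), so it can be written as an INT64 and dictionary-encoded, while precision 20 needs 9 bytes of two's complement and ends up as a fixed-length binary. A rough sketch of that byte-width arithmetic (an illustrative helper, not part of the PR):

// Smallest number of bytes whose signed two's-complement range covers 10^precision - 1,
// the largest unscaled value a decimal of the given precision can hold.
static int minBytesForPrecision(int precision) {
  java.math.BigInteger maxUnscaled = java.math.BigInteger.TEN.pow(precision).subtract(java.math.BigInteger.ONE);
  return (maxUnscaled.bitLength() + 1 + 7) / 8; // +1 for the sign bit, rounded up to whole bytes
}
// minBytesForPrecision(18) == 8, minBytesForPrecision(20) == 9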
@@ -120,7 +123,8 @@ void appendData() {
           .withColumn("intCol", expr("CAST(longCol AS INT)"))
           .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
           .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
-          .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
+          .withColumn("bigDecimalCol", expr("CAST(longCol AS DECIMAL(20, 5))"))
+          .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(18, 5))"))
           .withColumn("dateCol", date_add(current_date(), fileNum))
           .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
           .withColumn("stringCol", expr("CAST(longCol AS STRING)"));
@@ -228,6 +232,26 @@ public void readDecimalsSparkVectorized5k() {
     });
   }

+  @Benchmark
+  @Threads(1)
+  public void readBigDecimalsIcebergVectorized5k() {
+    withTableProperties(tablePropsWithVectorizationEnabled(5000), () -> {
+      String tableLocation = table().location();
+      Dataset<Row> df = spark().read().format("iceberg")
+          .load(tableLocation).select("bigDecimalCol");
+      materialize(df);
+    });
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void readBigDecimalsSparkVectorized5k() {
+    withSQLConf(sparkConfWithVectorizationEnabled(5000), () -> {
+      Dataset<Row> df = spark().read().parquet(dataLocation()).select("bigDecimalCol");
+      materialize(df);
+    });
+  }
+
   @Benchmark
   @Threads(1)
   public void readDatesIcebergVectorized5k() {
@@ -59,9 +59,10 @@ public abstract class AvroDataTest {
       // required(111, "uuid", Types.UUIDType.get()),
       required(112, "fixed", Types.FixedType.ofLength(7)),
       optional(113, "bytes", Types.BinaryType.get()),
-      required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
-      required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
-      required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // spark's maximum precision
+      required(114, "dec_9_0", Types.DecimalType.of(9, 0)), // int encoded
+      required(115, "dec_11_2", Types.DecimalType.of(11, 2)), // long encoded
+      required(116, "dec_20_5", Types.DecimalType.of(20, 5)), // requires padding
+      required(117, "dec_38_10", Types.DecimalType.of(38, 10)) // Spark's maximum precision
   );

   @Rule
@@ -279,10 +279,14 @@ public void testReadsForTypePromotedColumns() throws Exception {

   @Test
   public void testSupportedReadsForParquetV2() throws Exception {
-    // Only float and double column types are written using plain encoding with Parquet V2
+    // Float and double column types are written using plain encoding with Parquet V2,
+    // also Parquet V2 will dictionary encode decimals that use fixed length binary
+    // (i.e. decimals > 8 bytes)
     Schema schema = new Schema(
         optional(102, "float_data", Types.FloatType.get()),
-        optional(103, "double_data", Types.DoubleType.get()));
+        optional(103, "double_data", Types.DoubleType.get()),
+        optional(104, "decimal_data", Types.DecimalType.of(25, 5))
+    );

     File dataFile = temp.newFile();
     Assert.assertTrue("Delete should succeed", dataFile.delete());