From 07ddc3c777b7dccda24098a9918e3746a27bc500 Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Fri, 21 Apr 2023 09:18:32 +0200
Subject: [PATCH 1/3] Spark: Add read/write support for UUIDs

---
 .../iceberg/spark/data/SparkAvroWriter.java   |  2 +-
 .../iceberg/spark/data/SparkOrcReader.java    |  3 ++
 .../spark/data/SparkOrcValueReaders.java      | 32 +++++++++++++++++
 .../spark/data/SparkOrcValueWriters.java      | 17 +++++++++
 .../iceberg/spark/data/SparkOrcWriter.java    | 11 +++++-
 .../spark/data/SparkParquetReaders.java       | 17 +++++++++
 .../spark/data/SparkParquetWriters.java       | 36 +++++++++++++++++++
 .../vectorized/VectorizedSparkOrcReaders.java |  5 ++-
 .../iceberg/spark/data/AvroDataTest.java      |  2 +-
 .../apache/iceberg/spark/data/RandomData.java |  2 ++
 .../spark/data/TestSparkParquetWriter.java    |  2 +-
 11 files changed, 124 insertions(+), 5 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
index 15465568c231..04dfd46a1891 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
@@ -126,7 +126,7 @@ public ValueWriter<?> primitive(DataType type, Schema primitive) {
         return SparkValueWriters.decimal(decimal.getPrecision(), decimal.getScale());
 
       case "uuid":
-        return ValueWriters.uuids();
+        return SparkValueWriters.uuids();
 
       default:
         throw new IllegalArgumentException("Unsupported logical type: " + logicalType);
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
index 78db137054bc..c20be44f6735 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
@@ -123,6 +123,9 @@ public OrcValueReader<?> primitive(Type.PrimitiveType iPrimitive, TypeDescriptio
       case STRING:
         return SparkOrcValueReaders.utf8String();
       case BINARY:
+        if (Type.TypeID.UUID == iPrimitive.typeId()) {
+          return SparkOrcValueReaders.uuids();
+        }
         return OrcValueReaders.bytes();
       default:
         throw new IllegalArgumentException("Unhandled type " + primitive);
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
index 9e9b3e53bbcc..2bc5ef96a3a9 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.spark.data;
 
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.orc.OrcValueReader;
@@ -26,6 +28,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
 import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
@@ -49,6 +52,10 @@ public static OrcValueReader<UTF8String> utf8String() {
     return StringReader.INSTANCE;
   }
 
+  public static OrcValueReader<UTF8String> uuids() {
+    return UUIDReader.INSTANCE;
+  }
+
   public static OrcValueReader<Long> timestampTzs() {
     return TimestampTzReader.INSTANCE;
   }
@@ -170,6 +177,31 @@ public UTF8String nonNullRead(ColumnVector vector, int row) {
     }
   }
 
+  private static class UUIDReader implements OrcValueReader<UTF8String> {
+    private static final ThreadLocal<ByteBuffer> BUFFER =
+        ThreadLocal.withInitial(
+            () -> {
+              ByteBuffer buffer = ByteBuffer.allocate(16);
+              buffer.order(ByteOrder.BIG_ENDIAN);
+              return buffer;
+            });
+
+    private static final UUIDReader INSTANCE = new UUIDReader();
+
+    private UUIDReader() {}
+
+    @Override
+    public UTF8String nonNullRead(ColumnVector vector, int row) {
+      BytesColumnVector bytesVector = (BytesColumnVector) vector;
+      ByteBuffer buffer = BUFFER.get();
+      buffer.rewind();
+      buffer.put(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]);
+      buffer.rewind();
+
+      return UTF8String.fromString(UUIDUtil.convert(buffer).toString());
+    }
+  }
+
   private static class TimestampTzReader implements OrcValueReader<Long> {
     private static final TimestampTzReader INSTANCE = new TimestampTzReader();
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
index 780090f99109..9a4f1b5b4888 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
@@ -18,10 +18,13 @@
  */
 package org.apache.iceberg.spark.data;
 
+import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Stream;
 import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.common.type.HiveDecimal;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
@@ -42,6 +45,10 @@ static OrcValueWriter<UTF8String> strings() {
     return StringWriter.INSTANCE;
   }
 
+  static OrcValueWriter<UTF8String> uuids() {
+    return UUIDWriter.INSTANCE;
+  }
+
   static OrcValueWriter<Long> timestampTz() {
     return TimestampTzWriter.INSTANCE;
   }
@@ -73,6 +80,16 @@ public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) {
     }
   }
 
+  private static class UUIDWriter implements OrcValueWriter<UTF8String> {
+    private static final UUIDWriter INSTANCE = new UUIDWriter();
+
+    @Override
+    public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) {
+      ByteBuffer buffer = UUIDUtil.convertToByteBuffer(UUID.fromString(data.toString()));
+      ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length);
+    }
+  }
+
   private static class TimestampTzWriter implements OrcValueWriter<Long> {
     private static final TimestampTzWriter INSTANCE = new TimestampTzWriter();
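Note (not part of the patch): the ORC UUIDReader and UUIDWriter above only interoperate because both sides treat an Iceberg UUID as 16 big-endian bytes; the writer emits that encoding and the reader turns it back into the canonical string form that Spark sees. A minimal sketch of the convention using only JDK types (Iceberg's UUIDUtil wraps the same logic):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.UUID;

public class UuidBytesRoundTrip {
  // Writer side: UUID -> 16 big-endian bytes (what UUIDWriter hands to the BytesColumnVector).
  static byte[] toBytes(UUID uuid) {
    ByteBuffer buffer = ByteBuffer.allocate(16).order(ByteOrder.BIG_ENDIAN);
    buffer.putLong(uuid.getMostSignificantBits());
    buffer.putLong(uuid.getLeastSignificantBits());
    return buffer.array();
  }

  // Reader side: 16 bytes -> UUID (what UUIDReader does via UUIDUtil.convert
  // before rendering the canonical string for Spark).
  static UUID fromBytes(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN);
    return new UUID(buffer.getLong(), buffer.getLong());
  }

  public static void main(String[] args) {
    UUID original = UUID.randomUUID();
    System.out.println(original.equals(fromBytes(toBytes(original)))); // prints true
  }
}

Keeping the byte order explicit matters here: a little-endian buffer would still round-trip within one engine, but it would not match the big-endian layout other Iceberg readers expect.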
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
index 60868b8700a3..c5477fac0827 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -111,6 +111,9 @@ public OrcValueWriter<?> primitive(Type.PrimitiveType iPrimitive, TypeDescriptio
       case DOUBLE:
         return GenericOrcWriters.doubles(ORCSchemaUtil.fieldId(primitive));
       case BINARY:
+        if (Type.TypeID.UUID == iPrimitive.typeId()) {
+          return SparkOrcValueWriters.uuids();
+        }
         return GenericOrcWriters.byteArrays();
       case STRING:
       case CHAR:
@@ -173,7 +176,13 @@ static FieldGetter<?> createFieldGetter(TypeDescription fieldType) {
         fieldGetter = SpecializedGetters::getDouble;
         break;
       case BINARY:
-        fieldGetter = SpecializedGetters::getBinary;
+        if (ORCSchemaUtil.BinaryType.UUID
+            .toString()
+            .equals(fieldType.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE))) {
+          fieldGetter = SpecializedGetters::getUTF8String;
+        } else {
+          fieldGetter = SpecializedGetters::getBinary;
+        }
         // getBinary always makes a copy, so we don't need to worry about it
         // being changed behind our back.
         break;
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 59f81de6ae4a..af16d9bbc290 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -46,6 +46,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type.TypeID;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
@@ -232,6 +233,7 @@ public ParquetValueReader<?> map(
   }
 
   @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public ParquetValueReader<?> primitive(
       org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) {
     ColumnDescriptor desc = type.getColumnDescription(currentPath());
@@ -282,6 +284,9 @@ public ParquetValueReader<?> primitive(
     switch (primitive.getPrimitiveTypeName()) {
       case FIXED_LEN_BYTE_ARRAY:
       case BINARY:
+        if (expected != null && expected.typeId() == TypeID.UUID) {
+          return new UUIDReader(desc);
+        }
         return new ParquetValueReaders.ByteArrayReader(desc);
       case INT32:
         if (expected != null && expected.typeId() == TypeID.LONG) {
@@ -413,6 +418,18 @@ public UTF8String read(UTF8String ignored) {
     }
   }
 
+  private static class UUIDReader extends PrimitiveReader<UTF8String> {
+    UUIDReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    @SuppressWarnings("ByteBufferBackingArray")
+    public UTF8String read(UTF8String ignored) {
+      return UTF8String.fromString(UUIDUtil.convert(column.nextBinary().toByteBuffer()).toString());
+    }
+  }
+
   private static class ArrayReader<E> extends RepeatedReader<ArrayData, ReusableArrayData, E> {
     private int readPos = 0;
     private int writePos = 0;
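Note (not part of the patch): there is an asymmetry in how UUID columns are detected. The ORC and Parquet readers above key off the expected Iceberg type (TypeID.UUID), because the physical file type is just a binary; the SparkParquetWriters change below instead keys off Parquet's UUID logical type annotation. A small self-contained check of that annotation using the parquet-column schema builder (the column name here is made up):

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class UuidAnnotationCheck {
  public static void main(String[] args) {
    // An Iceberg UUID column is written as FIXED_LEN_BYTE_ARRAY(16) annotated as UUID;
    // this is the shape the writer's uuidType().equals(...) test below matches.
    PrimitiveType uuidColumn =
        Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
            .length(16)
            .as(LogicalTypeAnnotation.uuidType())
            .named("uuid_col");

    System.out.println(
        LogicalTypeAnnotation.uuidType().equals(uuidColumn.getLogicalTypeAnnotation())); // true
  }
}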
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
index 3637fa4a2604..c1abec96cdf0 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
@@ -18,10 +18,13 @@
  */
 package org.apache.iceberg.spark.data;
 
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.UUID;
 import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -35,6 +38,7 @@
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -176,6 +180,9 @@ public ParquetValueWriter<?> primitive(DataType sType, PrimitiveType primitive)
     switch (primitive.getPrimitiveTypeName()) {
       case FIXED_LEN_BYTE_ARRAY:
       case BINARY:
+        if (LogicalTypeAnnotation.uuidType().equals(primitive.getLogicalTypeAnnotation())) {
+          return uuids(desc);
+        }
         return byteArrays(desc);
       case BOOLEAN:
         return ParquetValueWriters.booleans(desc);
@@ -316,6 +323,35 @@ public void write(int repetitionLevel, Decimal decimal) {
     }
   }
 
+  private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
+    return new UUIDWriter(desc);
+  }
+
+  private static class UUIDWriter extends PrimitiveWriter<UTF8String> {
+    private static final ThreadLocal<ByteBuffer> BUFFER =
+        ThreadLocal.withInitial(
+            () -> {
+              ByteBuffer buffer = ByteBuffer.allocate(16);
+              buffer.order(ByteOrder.BIG_ENDIAN);
+              return buffer;
+            });
+
+    private UUIDWriter(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public void write(int repetitionLevel, UTF8String string) {
+      UUID uuid = UUID.fromString(string.toString());
+      ByteBuffer buffer = BUFFER.get();
+      buffer.rewind();
+      buffer.putLong(uuid.getMostSignificantBits());
+      buffer.putLong(uuid.getLeastSignificantBits());
+      buffer.rewind();
+      column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
+    }
+  }
+
   private static class ByteArrayWriter extends PrimitiveWriter<byte[]> {
     private ByteArrayWriter(ColumnDescriptor desc) {
       super(desc);
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
index b2d8bd14beca..c030311232a2 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
@@ -155,7 +155,10 @@ public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primit
           primitiveValueReader = SparkOrcValueReaders.utf8String();
           break;
         case BINARY:
-          primitiveValueReader = OrcValueReaders.bytes();
+          primitiveValueReader =
+              Type.TypeID.UUID == iPrimitive.typeId()
+                  ? SparkOrcValueReaders.uuids()
+                  : OrcValueReaders.bytes();
           break;
         default:
           throw new IllegalArgumentException("Unhandled type " + primitive);
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
index 5fd137c5361d..db0d7336f161 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
@@ -56,7 +56,7 @@ public abstract class AvroDataTest {
           optional(107, "date", Types.DateType.get()),
           required(108, "ts", Types.TimestampType.withZone()),
           required(110, "s", Types.StringType.get()),
-          // required(111, "uuid", Types.UUIDType.get()),
+          required(111, "uuid", Types.UUIDType.get()),
           required(112, "fixed", Types.FixedType.ofLength(7)),
           optional(113, "bytes", Types.BinaryType.get()),
           required(114, "dec_9_0", Types.DecimalType.of(9, 0)), // int encoded
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
index 1c95df8ced12..478afcf09ae3 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
@@ -329,6 +329,8 @@ public Object primitive(Type.PrimitiveType primitive) {
         return UTF8String.fromString((String) obj);
       case DECIMAL:
         return Decimal.apply((BigDecimal) obj);
+      case UUID:
+        return UTF8String.fromString(UUID.nameUUIDFromBytes((byte[]) obj).toString());
       default:
         return obj;
     }
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
index 261fb8838aa4..467d8a27a27c 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
@@ -79,7 +79,7 @@ public class TestSparkParquetWriter {
                     Types.StringType.get(),
                     Types.StructType.of(
                         optional(22, "jumpy", Types.DoubleType.get()),
-                        required(23, "koala", Types.IntegerType.get()),
+                        required(23, "koala", Types.UUIDType.get()),
                         required(24, "couch rope", Types.IntegerType.get())))),
         optional(2, "slide", Types.StringType.get()));
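Note (not part of the patch): with the previously commented-out test field re-enabled above, a UUID column can be declared like any other Iceberg type; Spark has no native UUID type, so these readers and writers surface the values as their canonical string form (UTF8String). A hypothetical schema for illustration, using the same Types API the tests exercise (field ids and names are made up):

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class UuidSchemaExample {
  public static void main(String[] args) {
    // A table schema with a UUID column; on the Spark side this column
    // reads and writes as a string like "f79c3e09-677c-4bbd-a479-3f349cb785e7".
    Schema schema =
        new Schema(
            required(1, "id", Types.UUIDType.get()),
            optional(2, "payload", Types.StringType.get()));
    System.out.println(schema.asStruct());
  }
}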
From ced5a0737fa608645f5ae386b387944912cb697d Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Mon, 24 Apr 2023 17:17:08 +0200
Subject: [PATCH 2/3] Add vectorization support for UTF8String

---
 .../org/apache/iceberg/util/RandomUtil.java   |  4 +++
 .../GenericArrowVectorAccessorFactory.java    | 27 ++++++++++++++++++-
 .../ArrowVectorAccessorFactory.java           |  7 +++++
 3 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
index a84dc4d8f8ce..9131e6166133 100644
--- a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
+++ b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
@@ -182,6 +182,10 @@ public static Object generateDictionaryEncodablePrimitive(
         BigInteger unscaled = new BigInteger(String.valueOf(value + 1));
         BigDecimal bd = new BigDecimal(unscaled, type.scale());
         return negate(value) ? bd.negate() : bd;
+      case UUID:
+        byte[] uuidBytes = new byte[16];
+        random.nextBytes(uuidBytes);
+        return uuidBytes;
       default:
         throw new IllegalArgumentException(
             "Cannot generate random value for unknown type: " + primitive);
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
index e9305e399c85..a988516bc6df 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
@@ -218,7 +218,8 @@ private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getPlai
         return new FixedSizeBinaryBackedDecimalAccessor<>(
             (FixedSizeBinaryVector) vector, decimalFactorySupplier.get());
       }
-      return new FixedSizeBinaryAccessor<>((FixedSizeBinaryVector) vector);
+      return new FixedSizeBinaryAccessor<>(
+          (FixedSizeBinaryVector) vector, stringFactorySupplier.get());
     }
     throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass());
   }
@@ -558,16 +559,32 @@ private static class FixedSizeBinaryAccessor<
           DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
       extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
 
     private final FixedSizeBinaryVector vector;
+    private final StringFactory<Utf8StringT> stringFactory;
 
     FixedSizeBinaryAccessor(FixedSizeBinaryVector vector) {
       super(vector);
       this.vector = vector;
+      this.stringFactory = null;
+    }
+
+    FixedSizeBinaryAccessor(
+        FixedSizeBinaryVector vector, StringFactory<Utf8StringT> stringFactory) {
+      super(vector);
+      this.vector = vector;
+      this.stringFactory = stringFactory;
     }
 
     @Override
     public byte[] getBinary(int rowId) {
       return vector.get(rowId);
     }
+
+    @Override
+    public Utf8StringT getUTF8String(int rowId) {
+      return null == stringFactory
+          ? super.getUTF8String(rowId)
+          : stringFactory.ofRow(vector, rowId);
+    }
   }
 
   private static class ArrayAccessor<
@@ -794,6 +811,14 @@ protected interface StringFactory<Utf8StringT> {
     /** Create a UTF8 String from the row value in the arrow vector. */
     Utf8StringT ofRow(VarCharVector vector, int rowId);
 
+    /** Create a UTF8 String from the row value in the FixedSizeBinaryVector vector. */
+    default Utf8StringT ofRow(FixedSizeBinaryVector vector, int rowId) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Creating %s from a FixedSizeBinaryVector is not supported",
+              getGenericClass().getSimpleName()));
+    }
+
     /** Create a UTF8 String from the byte array. */
     Utf8StringT ofBytes(byte[] bytes);
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
index e32ebcb02bbc..29e938bb092e 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
@@ -21,10 +21,12 @@
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.iceberg.arrow.vectorized.GenericArrowVectorAccessorFactory;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.vectorized.ArrowColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarArray;
@@ -74,6 +76,11 @@ public UTF8String ofRow(VarCharVector vector, int rowId) {
           null, vector.getDataBuffer().memoryAddress() + start, end - start);
     }
 
+    @Override
+    public UTF8String ofRow(FixedSizeBinaryVector vector, int rowId) {
+      return UTF8String.fromString(UUIDUtil.convert(vector.get(rowId)).toString());
+    }
+
     @Override
     public UTF8String ofBytes(byte[] bytes) {
       return UTF8String.fromBytes(bytes);
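Note (not part of the patch): the FixedSizeBinaryAccessor change follows an optional-collaborator pattern. The accessor only gains string behavior when a StringFactory is supplied; otherwise it falls through to the base class, which throws. A stripped-down sketch of that dispatch (all names here are stand-ins, not the Arrow or Iceberg classes):

public class OptionalFactorySketch {
  interface StringFactory<T> {
    T ofRow(byte[] row); // stand-in for ofRow(FixedSizeBinaryVector, int)
  }

  static class Accessor<T> {
    private final StringFactory<T> stringFactory; // may be null, as in FixedSizeBinaryAccessor

    Accessor(StringFactory<T> stringFactory) {
      this.stringFactory = stringFactory;
    }

    T getUTF8String(byte[] row) {
      if (stringFactory == null) {
        // mirrors the base accessor behavior: no string view of this vector
        throw new UnsupportedOperationException("No string representation for this vector");
      }
      return stringFactory.ofRow(row);
    }
  }

  public static void main(String[] args) {
    Accessor<String> withFactory = new Accessor<>(row -> new String(row));
    System.out.println(withFactory.getUTF8String("abc".getBytes())); // abc

    Accessor<String> withoutFactory = new Accessor<>(null);
    try {
      withoutFactory.getUTF8String(new byte[0]);
    } catch (UnsupportedOperationException e) {
      System.out.println("unsupported, as in the pre-patch accessor");
    }
  }
}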
From 7fcc85b044d70e382443275ad0ee090c79d9193b Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Wed, 26 Apr 2023 09:12:09 +0200
Subject: [PATCH 3/3] address review feedback

---
 .../java/org/apache/iceberg/util/UUIDUtil.java     | 12 +++++++++++-
 .../iceberg/spark/data/SparkOrcValueReaders.java   | 16 ++--------------
 .../iceberg/spark/data/SparkOrcWriter.java         |  3 ++-
 .../iceberg/spark/data/SparkParquetWriters.java    | 15 ++++++---------
 4 files changed, 21 insertions(+), 25 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java b/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
index 4cedb5bd2288..b72feec00b2c 100644
--- a/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
+++ b/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
@@ -62,7 +62,17 @@ public static byte[] convert(UUID value) {
   }
 
   public static ByteBuffer convertToByteBuffer(UUID value) {
-    ByteBuffer buffer = ByteBuffer.allocate(16);
+    return convertToByteBuffer(value, null);
+  }
+
+  public static ByteBuffer convertToByteBuffer(UUID value, ByteBuffer reuse) {
+    ByteBuffer buffer;
+    if (reuse != null) {
+      buffer = reuse;
+    } else {
+      buffer = ByteBuffer.allocate(16);
+    }
+
     buffer.order(ByteOrder.BIG_ENDIAN);
     buffer.putLong(0, value.getMostSignificantBits());
     buffer.putLong(8, value.getLeastSignificantBits());
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
index 2bc5ef96a3a9..670537fbf872 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
@@ -20,7 +20,6 @@
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.orc.OrcValueReader;
@@ -178,14 +177,6 @@ public UTF8String nonNullRead(ColumnVector vector, int row) {
   }
 
   private static class UUIDReader implements OrcValueReader<UTF8String> {
-    private static final ThreadLocal<ByteBuffer> BUFFER =
-        ThreadLocal.withInitial(
-            () -> {
-              ByteBuffer buffer = ByteBuffer.allocate(16);
-              buffer.order(ByteOrder.BIG_ENDIAN);
-              return buffer;
-            });
-
     private static final UUIDReader INSTANCE = new UUIDReader();
 
     private UUIDReader() {}
@@ -193,11 +184,8 @@ private UUIDReader() {}
     @Override
     public UTF8String nonNullRead(ColumnVector vector, int row) {
       BytesColumnVector bytesVector = (BytesColumnVector) vector;
-      ByteBuffer buffer = BUFFER.get();
-      buffer.rewind();
-      buffer.put(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]);
-      buffer.rewind();
-
+      ByteBuffer buffer =
+          ByteBuffer.wrap(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]);
       return UTF8String.fromString(UUIDUtil.convert(buffer).toString());
     }
   }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
index c5477fac0827..6b799e677bf4 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -178,7 +178,8 @@ static FieldGetter<?> createFieldGetter(TypeDescription fieldType) {
       case BINARY:
         if (ORCSchemaUtil.BinaryType.UUID
             .toString()
-            .equals(fieldType.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE))) {
+            .equalsIgnoreCase(
+                fieldType.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE))) {
           fieldGetter = SpecializedGetters::getUTF8String;
         } else {
           fieldGetter = SpecializedGetters::getBinary;
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
index c1abec96cdf0..af6f65a089b6 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
@@ -35,6 +35,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.DecimalUtil;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
@@ -213,6 +214,10 @@ private static PrimitiveWriter<UTF8String> utf8Strings(ColumnDescriptor desc) {
     return new UTF8StringWriter(desc);
   }
 
+  private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
+    return new UUIDWriter(desc);
+  }
+
   private static PrimitiveWriter<Decimal> decimalAsInteger(
       ColumnDescriptor desc, int precision, int scale) {
     return new IntegerDecimalWriter(desc, precision, scale);
@@ -323,10 +328,6 @@ public void write(int repetitionLevel, Decimal decimal) {
     }
   }
 
-  private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
-    return new UUIDWriter(desc);
-  }
-
   private static class UUIDWriter extends PrimitiveWriter<UTF8String> {
     private static final ThreadLocal<ByteBuffer> BUFFER =
         ThreadLocal.withInitial(
             () -> {
@@ -343,11 +344,7 @@ private UUIDWriter(ColumnDescriptor desc) {
     @Override
     public void write(int repetitionLevel, UTF8String string) {
       UUID uuid = UUID.fromString(string.toString());
-      ByteBuffer buffer = BUFFER.get();
-      buffer.rewind();
-      buffer.putLong(uuid.getMostSignificantBits());
-      buffer.putLong(uuid.getLeastSignificantBits());
-      buffer.rewind();
+      ByteBuffer buffer = UUIDUtil.convertToByteBuffer(uuid, BUFFER.get());
       column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
     }
   }
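Note (not part of the patch): the review round trades copies for reuse in two places. The ORC reader now wraps the column's backing array instead of copying into a scratch buffer, and the Parquet writer delegates to the new UUIDUtil.convertToByteBuffer(value, reuse) overload so its ThreadLocal buffer is filled in place. A sketch of why the reuse overload stays safe across rows, paraphrasing the patched UUIDUtil rather than quoting it:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.UUID;

public class ReuseSketch {
  // Absolute putLong(index, value) never moves the buffer's position or limit,
  // so the same buffer can be handed to Binary.fromReusedByteBuffer row after row.
  static ByteBuffer convertToByteBuffer(UUID value, ByteBuffer reuse) {
    ByteBuffer buffer = reuse != null ? reuse : ByteBuffer.allocate(16);
    buffer.order(ByteOrder.BIG_ENDIAN);
    buffer.putLong(0, value.getMostSignificantBits());
    buffer.putLong(8, value.getLeastSignificantBits());
    return buffer;
  }

  private static final ThreadLocal<ByteBuffer> BUFFER =
      ThreadLocal.withInitial(() -> ByteBuffer.allocate(16).order(ByteOrder.BIG_ENDIAN));

  public static void main(String[] args) {
    // One 16-byte scratch buffer per thread, reused across rows as in UUIDWriter.
    for (int i = 0; i < 3; i++) {
      UUID uuid = UUID.randomUUID();
      ByteBuffer buffer = convertToByteBuffer(uuid, BUFFER.get());
      System.out.println(new UUID(buffer.getLong(0), buffer.getLong(8)).equals(uuid)); // true
    }
  }
}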