diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
index 15465568c231..04dfd46a1891 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java
@@ -126,7 +126,7 @@ public ValueWriter<?> primitive(DataType type, Schema primitive) {
           return SparkValueWriters.decimal(decimal.getPrecision(), decimal.getScale());
 
         case "uuid":
-          return ValueWriters.uuids();
+          return SparkValueWriters.uuids();
 
         default:
           throw new IllegalArgumentException("Unsupported logical type: " + logicalType);
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
index 78db137054bc..c20be44f6735 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
@@ -123,6 +123,9 @@ public OrcValueReader<?> primitive(Type.PrimitiveType iPrimitive, TypeDescriptio
         case STRING:
           return SparkOrcValueReaders.utf8String();
         case BINARY:
+          if (Type.TypeID.UUID == iPrimitive.typeId()) {
+            return SparkOrcValueReaders.uuids();
+          }
           return OrcValueReaders.bytes();
         default:
           throw new IllegalArgumentException("Unhandled type " + primitive);
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
index 9e9b3e53bbcc..670537fbf872 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.data;
 
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.orc.OrcValueReader;
@@ -26,6 +27,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
 import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
@@ -49,6 +51,10 @@ public static OrcValueReader<UTF8String> utf8String() {
     return StringReader.INSTANCE;
   }
 
+  public static OrcValueReader<UTF8String> uuids() {
+    return UUIDReader.INSTANCE;
+  }
+
   public static OrcValueReader<Long> timestampTzs() {
     return TimestampTzReader.INSTANCE;
   }
@@ -170,6 +176,20 @@ public UTF8String nonNullRead(ColumnVector vector, int row) {
     }
   }
 
+  private static class UUIDReader implements OrcValueReader<UTF8String> {
+    private static final UUIDReader INSTANCE = new UUIDReader();
+
+    private UUIDReader() {}
+
+    @Override
+    public UTF8String nonNullRead(ColumnVector vector, int row) {
+      BytesColumnVector bytesVector = (BytesColumnVector) vector;
+      ByteBuffer buffer =
+          ByteBuffer.wrap(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]);
+      return UTF8String.fromString(UUIDUtil.convert(buffer).toString());
+    }
+  }
+
   private static class TimestampTzReader implements OrcValueReader<Long> {
     private static final TimestampTzReader INSTANCE = new TimestampTzReader();
 
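Note on the ORC read path above: Iceberg stores UUIDs as 16 raw bytes, and Spark has no native UUID type, so `UUIDReader` surfaces them as `UTF8String`. A minimal sketch of the byte layout this assumes (big-endian msb/lsb pair, which is what `UUIDUtil.convert` is expected to decode; the class and method names below are illustrative only, not Iceberg APIs):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.UUID;

public class UuidBytesRoundTrip {
  // Serialize a UUID as 16 big-endian bytes: most-significant long first.
  static byte[] toBytes(UUID uuid) {
    ByteBuffer buffer = ByteBuffer.allocate(16).order(ByteOrder.BIG_ENDIAN);
    buffer.putLong(uuid.getMostSignificantBits());
    buffer.putLong(uuid.getLeastSignificantBits());
    return buffer.array();
  }

  // Rebuild the UUID from those bytes, mirroring what the reader does.
  static UUID fromBytes(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN);
    return new UUID(buffer.getLong(), buffer.getLong());
  }

  public static void main(String[] args) {
    UUID original = UUID.randomUUID();
    UUID roundTripped = fromBytes(toBytes(original));
    System.out.println(original.equals(roundTripped)); // true
  }
}
```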
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
index 780090f99109..7f9810e4c60c 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
@@ -18,10 +18,13 @@
  */
 package org.apache.iceberg.spark.data;
 
+import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Stream;
 import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.common.type.HiveDecimal;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
@@ -42,6 +45,10 @@ static OrcValueWriter<UTF8String> strings() {
     return StringWriter.INSTANCE;
   }
 
+  static OrcValueWriter<UTF8String> uuids() {
+    return UUIDWriter.INSTANCE;
+  }
+
   static OrcValueWriter<Long> timestampTz() {
     return TimestampTzWriter.INSTANCE;
   }
@@ -73,6 +80,19 @@ public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) {
     }
   }
 
+  private static class UUIDWriter implements OrcValueWriter<UTF8String> {
+    private static final UUIDWriter INSTANCE = new UUIDWriter();
+
+    @Override
+    public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) {
+      // ((BytesColumnVector) output).setRef(..) just stores a reference to the passed byte[], so
+      // can't use a ThreadLocal ByteBuffer here like in other places because subsequent writes
+      // would then overwrite previous values
+      ByteBuffer buffer = UUIDUtil.convertToByteBuffer(UUID.fromString(data.toString()));
+      ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length);
+    }
+  }
+
   private static class TimestampTzWriter implements OrcValueWriter<Long> {
     private static final TimestampTzWriter INSTANCE = new TimestampTzWriter();
 
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
index 60868b8700a3..6b799e677bf4 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -111,6 +111,9 @@ public OrcValueWriter<?> primitive(Type.PrimitiveType iPrimitive, TypeDescriptio
         case DOUBLE:
           return GenericOrcWriters.doubles(ORCSchemaUtil.fieldId(primitive));
         case BINARY:
+          if (Type.TypeID.UUID == iPrimitive.typeId()) {
+            return SparkOrcValueWriters.uuids();
+          }
           return GenericOrcWriters.byteArrays();
         case STRING:
         case CHAR:
@@ -173,7 +176,14 @@ static FieldGetter<?> createFieldGetter(TypeDescription fieldType) {
         fieldGetter = SpecializedGetters::getDouble;
         break;
       case BINARY:
-        fieldGetter = SpecializedGetters::getBinary;
+        if (ORCSchemaUtil.BinaryType.UUID
+            .toString()
+            .equalsIgnoreCase(
+                fieldType.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE))) {
+          fieldGetter = SpecializedGetters::getUTF8String;
+        } else {
+          fieldGetter = SpecializedGetters::getBinary;
+        }
         // getBinary always makes a copy, so we don't need to worry about it
         // being changed behind our back.
         break;
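The inline comment in `UUIDWriter` is the key constraint on this path: `BytesColumnVector.setRef(..)` stores a reference to the caller's array rather than copying it, so each row needs its own allocation. A self-contained sketch of the aliasing bug a shared scratch buffer would cause, using a plain `byte[][]` to stand in for the column vector (pure JDK, hypothetical names):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class SetRefAliasing {
  public static void main(String[] args) {
    // Simulate a column vector that, like BytesColumnVector.setRef, keeps
    // references to caller-owned byte arrays instead of copying them.
    byte[][] vector = new byte[2][];

    // WRONG: reuse one buffer for both rows; the second write clobbers the first.
    ByteBuffer shared = ByteBuffer.allocate(16);
    shared.putLong(0, 1L);
    vector[0] = shared.array();
    shared.putLong(0, 2L);
    vector[1] = shared.array();
    System.out.println(vector[0] == vector[1]); // true: both rows alias one array

    // RIGHT (what UUIDWriter does): allocate a fresh buffer per value.
    vector[0] = ByteBuffer.allocate(16).putLong(0, 1L).array();
    vector[1] = ByteBuffer.allocate(16).putLong(0, 2L).array();
    System.out.println(Arrays.equals(vector[0], vector[1])); // false
  }
}
```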
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
index 59f81de6ae4a..af16d9bbc290 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java
@@ -46,6 +46,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type.TypeID;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
@@ -232,6 +233,7 @@ public ParquetValueReader<?> map(
     }
 
     @Override
+    @SuppressWarnings("checkstyle:CyclomaticComplexity")
     public ParquetValueReader<?> primitive(
         org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) {
       ColumnDescriptor desc = type.getColumnDescription(currentPath());
@@ -282,6 +284,9 @@ public ParquetValueReader<?> primitive(
       switch (primitive.getPrimitiveTypeName()) {
         case FIXED_LEN_BYTE_ARRAY:
         case BINARY:
+          if (expected != null && expected.typeId() == TypeID.UUID) {
+            return new UUIDReader(desc);
+          }
           return new ParquetValueReaders.ByteArrayReader(desc);
         case INT32:
           if (expected != null && expected.typeId() == TypeID.LONG) {
@@ -413,6 +418,18 @@ public UTF8String read(UTF8String ignored) {
     }
   }
 
+  private static class UUIDReader extends PrimitiveReader<UTF8String> {
+    UUIDReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    @SuppressWarnings("ByteBufferBackingArray")
+    public UTF8String read(UTF8String ignored) {
+      return UTF8String.fromString(UUIDUtil.convert(column.nextBinary().toByteBuffer()).toString());
+    }
+  }
+
   private static class ArrayReader<E> extends RepeatedReader<ArrayData, ReusableArrayData, E> {
     private int readPos = 0;
     private int writePos = 0;
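For the Parquet read path, `UUIDReader` decodes the 16-byte `FIXED_LEN_BYTE_ARRAY`/`BINARY` value back to its canonical string form. A sketch of that decode, inlining what `UUIDUtil.convert` is assumed to do; the sample UUID literal is arbitrary:

```java
import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.parquet.io.api.Binary;

public class UuidBinaryDecode {
  public static void main(String[] args) {
    UUID expected = UUID.fromString("f79c3e09-677c-4bbd-a479-3f349cb785e7");

    // 16-byte big-endian encoding, as written for a Parquet UUID column.
    ByteBuffer encoded = ByteBuffer.allocate(16)
        .putLong(expected.getMostSignificantBits())
        .putLong(expected.getLeastSignificantBits());
    encoded.rewind();
    Binary binary = Binary.fromConstantByteBuffer(encoded);

    // What UUIDReader.read does with column.nextBinary(), minus UUIDUtil.
    ByteBuffer buffer = binary.toByteBuffer();
    UUID decoded = new UUID(buffer.getLong(), buffer.getLong());
    System.out.println(decoded.equals(expected)); // true
  }
}
```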
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
index 3637fa4a2604..af6f65a089b6 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
@@ -18,10 +18,13 @@
  */
 package org.apache.iceberg.spark.data;
 
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.UUID;
 import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -32,9 +35,11 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.DecimalUtil;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -176,6 +181,9 @@ public ParquetValueWriter<?> primitive(DataType sType, PrimitiveType primitive)
       switch (primitive.getPrimitiveTypeName()) {
         case FIXED_LEN_BYTE_ARRAY:
         case BINARY:
+          if (LogicalTypeAnnotation.uuidType().equals(primitive.getLogicalTypeAnnotation())) {
+            return uuids(desc);
+          }
           return byteArrays(desc);
         case BOOLEAN:
           return ParquetValueWriters.booleans(desc);
@@ -206,6 +214,10 @@ private static PrimitiveWriter<UTF8String> utf8Strings(ColumnDescriptor desc) {
     return new UTF8StringWriter(desc);
   }
 
+  private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
+    return new UUIDWriter(desc);
+  }
+
   private static PrimitiveWriter<Decimal> decimalAsInteger(
       ColumnDescriptor desc, int precision, int scale) {
     return new IntegerDecimalWriter(desc, precision, scale);
@@ -316,6 +328,27 @@ public void write(int repetitionLevel, Decimal decimal) {
     }
   }
 
+  private static class UUIDWriter extends PrimitiveWriter<UTF8String> {
+    private static final ThreadLocal<ByteBuffer> BUFFER =
+        ThreadLocal.withInitial(
+            () -> {
+              ByteBuffer buffer = ByteBuffer.allocate(16);
+              buffer.order(ByteOrder.BIG_ENDIAN);
+              return buffer;
+            });
+
+    private UUIDWriter(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public void write(int repetitionLevel, UTF8String string) {
+      UUID uuid = UUID.fromString(string.toString());
+      ByteBuffer buffer = UUIDUtil.convertToByteBuffer(uuid, BUFFER.get());
+      column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
+    }
+  }
+
   private static class ByteArrayWriter extends PrimitiveWriter<byte[]> {
     private ByteArrayWriter(ColumnDescriptor desc) {
       super(desc);
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
index e32ebcb02bbc..29e938bb092e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
@@ -21,10 +21,12 @@
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.iceberg.arrow.vectorized.GenericArrowVectorAccessorFactory;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.vectorized.ArrowColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarArray;
@@ -74,6 +76,11 @@ public UTF8String ofRow(VarCharVector vector, int rowId) {
           null, vector.getDataBuffer().memoryAddress() + start, end - start);
     }
 
+    @Override
+    public UTF8String ofRow(FixedSizeBinaryVector vector, int rowId) {
+      return UTF8String.fromString(UUIDUtil.convert(vector.get(rowId)).toString());
+    }
+
     @Override
     public UTF8String ofBytes(byte[] bytes) {
       return UTF8String.fromBytes(bytes);
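Unlike the ORC writer above, this Parquet `UUIDWriter` can reuse one per-thread scratch buffer: `Binary.fromReusedByteBuffer` marks the backing buffer as reused, so Parquet copies the bytes out before the writer's next call overwrites them. A sketch of the per-thread scratch-buffer pattern (illustrative names, not Iceberg's):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.UUID;

public class ScratchBufferPattern {
  // One 16-byte scratch buffer per thread; safe only when the consumer
  // copies the bytes before the next write reuses the buffer.
  private static final ThreadLocal<ByteBuffer> BUFFER =
      ThreadLocal.withInitial(() -> ByteBuffer.allocate(16).order(ByteOrder.BIG_ENDIAN));

  static ByteBuffer encode(UUID uuid) {
    ByteBuffer buffer = BUFFER.get();
    buffer.rewind();
    buffer.putLong(uuid.getMostSignificantBits());
    buffer.putLong(uuid.getLeastSignificantBits());
    buffer.rewind();
    return buffer;
  }

  public static void main(String[] args) {
    ByteBuffer first = encode(UUID.randomUUID());
    byte[] copied = new byte[16];
    first.duplicate().get(copied); // consumer copies, as Parquet does for reused buffers

    ByteBuffer second = encode(UUID.randomUUID());
    System.out.println(first == second); // true: same scratch buffer, new contents
  }
}
```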
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
index b2d8bd14beca..c030311232a2 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java
@@ -155,7 +155,10 @@ public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primit
           primitiveValueReader = SparkOrcValueReaders.utf8String();
           break;
         case BINARY:
-          primitiveValueReader = OrcValueReaders.bytes();
+          primitiveValueReader =
+              Type.TypeID.UUID == iPrimitive.typeId()
+                  ? SparkOrcValueReaders.uuids()
+                  : OrcValueReaders.bytes();
           break;
         default:
           throw new IllegalArgumentException("Unhandled type " + primitive);
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
index 5fd137c5361d..db0d7336f161 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
@@ -56,7 +56,7 @@ public abstract class AvroDataTest {
           optional(107, "date", Types.DateType.get()),
           required(108, "ts", Types.TimestampType.withZone()),
           required(110, "s", Types.StringType.get()),
-          // required(111, "uuid", Types.UUIDType.get()),
+          required(111, "uuid", Types.UUIDType.get()),
           required(112, "fixed", Types.FixedType.ofLength(7)),
           optional(113, "bytes", Types.BinaryType.get()),
           required(114, "dec_9_0", Types.DecimalType.of(9, 0)), // int encoded
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
index 1c95df8ced12..478afcf09ae3 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java
@@ -329,6 +329,8 @@ public Object primitive(Type.PrimitiveType primitive) {
           return UTF8String.fromString((String) obj);
         case DECIMAL:
           return Decimal.apply((BigDecimal) obj);
+        case UUID:
+          return UTF8String.fromString(UUID.nameUUIDFromBytes((byte[]) obj).toString());
         default:
           return obj;
       }
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
index 261fb8838aa4..467d8a27a27c 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetWriter.java
@@ -79,7 +79,7 @@ public class TestSparkParquetWriter {
                       Types.StringType.get(),
                       Types.StructType.of(
                           optional(22, "jumpy", Types.DoubleType.get()),
-                          required(23, "koala", Types.IntegerType.get()),
+                          required(23, "koala", Types.UUIDType.get()),
                           required(24, "couch rope", Types.IntegerType.get())))),
           optional(2, "slide", Types.StringType.get()));
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
index 9a4f1b5b4888..7f9810e4c60c 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
@@ -85,6 +85,9 @@ private static class UUIDWriter implements OrcValueWriter<UTF8String> {
 
     @Override
     public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) {
+      // ((BytesColumnVector) output).setRef(..) just stores a reference to the passed byte[], so
+      // can't use a ThreadLocal ByteBuffer here like in other places because subsequent writes
+      // would then overwrite previous values
       ByteBuffer buffer = UUIDUtil.convertToByteBuffer(UUID.fromString(data.toString()));
       ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length);
     }
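On the test side, RandomData funnels the generated UUID bytes through `UUID.nameUUIDFromBytes`, which is deterministic, so a fixed random seed keeps producing identical rows across runs. A small demonstration of that property (the seed bytes below are a hypothetical example):

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class DeterministicUuids {
  public static void main(String[] args) {
    byte[] seed = "iceberg-row-42".getBytes(StandardCharsets.UTF_8);

    // nameUUIDFromBytes builds an MD5-based (version 3) UUID: the same input
    // bytes always map to the same UUID, keeping generated test data reproducible.
    UUID first = UUID.nameUUIDFromBytes(seed);
    UUID second = UUID.nameUUIDFromBytes(seed);

    System.out.println(first);                // stable value for this seed
    System.out.println(first.equals(second)); // true
    System.out.println(first.version());      // 3
  }
}
```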