@@ -126,7 +126,7 @@ public ValueWriter<?> primitive(DataType type, Schema primitive) {
         return SparkValueWriters.decimal(decimal.getPrecision(), decimal.getScale());

       case "uuid":
-        return ValueWriters.uuids();
+        return SparkValueWriters.uuids();

       default:
         throw new IllegalArgumentException("Unsupported logical type: " + logicalType);
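Note on the change above: Spark passes UUID column values around as strings (UTF8String), so the generic Avro ValueWriters.uuids(), which expects a java.util.UUID, is swapped for a Spark-specific writer. A minimal sketch of what that writer plausibly looks like, mirroring the ORC and Parquet writers later in this diff (the class body is an assumption, not code from this PR):

```java
import java.io.IOException;
import java.util.UUID;
import org.apache.avro.io.Encoder;
import org.apache.iceberg.avro.ValueWriter;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.spark.unsafe.types.UTF8String;

// Sketch only: assumes the Spark writer follows the same shape as the
// ORC/Parquet UUID writers below, not the actual SparkValueWriters code.
class SparkUuidWriter implements ValueWriter<UTF8String> {
  @Override
  public void write(UTF8String string, Encoder encoder) throws IOException {
    // Spark hands the UUID over as a string; re-encode it as the 16-byte
    // big-endian fixed value that Iceberg's Avro layout expects.
    UUID uuid = UUID.fromString(string.toString());
    encoder.writeFixed(UUIDUtil.convertToByteBuffer(uuid).array());
  }
}
```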
@@ -123,6 +123,9 @@ public OrcValueReader<?> primitive(Type.PrimitiveType iPrimitive, TypeDescription
       case STRING:
         return SparkOrcValueReaders.utf8String();
       case BINARY:
+        if (Type.TypeID.UUID == iPrimitive.typeId()) {
+          return SparkOrcValueReaders.uuids();
+        }
         return OrcValueReaders.bytes();
       default:
         throw new IllegalArgumentException("Unhandled type " + primitive);
@@ -19,13 +19,15 @@
 package org.apache.iceberg.spark.data;

 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.orc.OrcValueReader;
 import org.apache.iceberg.orc.OrcValueReaders;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
 import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
@@ -49,6 +51,10 @@ public static OrcValueReader<UTF8String> utf8String() {
     return StringReader.INSTANCE;
   }

+  public static OrcValueReader<UTF8String> uuids() {
+    return UUIDReader.INSTANCE;
+  }
+
   public static OrcValueReader<Long> timestampTzs() {
     return TimestampTzReader.INSTANCE;
   }
@@ -170,6 +176,20 @@ public UTF8String nonNullRead(ColumnVector vector, int row) {
     }
   }

+  private static class UUIDReader implements OrcValueReader<UTF8String> {
+    private static final UUIDReader INSTANCE = new UUIDReader();
+
+    private UUIDReader() {}
+
+    @Override
+    public UTF8String nonNullRead(ColumnVector vector, int row) {
+      BytesColumnVector bytesVector = (BytesColumnVector) vector;
+      ByteBuffer buffer =
+          ByteBuffer.wrap(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]);
+      return UTF8String.fromString(UUIDUtil.convert(buffer).toString());
+    }
+  }
+
   private static class TimestampTzReader implements OrcValueReader<Long> {
     private static final TimestampTzReader INSTANCE = new TimestampTzReader();

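The new UUIDReader reconstructs a java.util.UUID from the 16 raw bytes in the BytesColumnVector and renders it in canonical string form. A standalone round-trip check of the UUIDUtil conversion it relies on (the wrapper class here is hypothetical; only the UUIDUtil calls appear in this diff):

```java
import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.iceberg.util.UUIDUtil;

public class UuidRoundTrip {
  public static void main(String[] args) {
    UUID expected = UUID.fromString("f79c3e09-677c-4bbd-a479-3f349cb785e7");
    // convertToByteBuffer lays the uuid out as 16 big-endian bytes (msb, then lsb)
    ByteBuffer bytes = UUIDUtil.convertToByteBuffer(expected);
    // convert is the inverse the reader applies to the column's bytes
    UUID actual = UUIDUtil.convert(bytes);
    System.out.println(expected.equals(actual)); // true
  }
}
```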
@@ -18,10 +18,13 @@
  */
 package org.apache.iceberg.spark.data;

+import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Stream;
 import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.orc.OrcValueWriter;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.common.type.HiveDecimal;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
@@ -42,6 +45,10 @@ static OrcValueWriter<?> strings() {
     return StringWriter.INSTANCE;
   }

+  static OrcValueWriter<?> uuids() {
+    return UUIDWriter.INSTANCE;
+  }
+
   static OrcValueWriter<?> timestampTz() {
     return TimestampTzWriter.INSTANCE;
   }
@@ -73,6 +80,19 @@ public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) {
     }
   }

+  private static class UUIDWriter implements OrcValueWriter<UTF8String> {
+    private static final UUIDWriter INSTANCE = new UUIDWriter();
+
+    @Override
+    public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) {
+      // ((BytesColumnVector) output).setRef(..) just stores a reference to the passed byte[], so
+      // we can't use a ThreadLocal ByteBuffer here like in other places, because subsequent
+      // writes would then overwrite previous values
+      ByteBuffer buffer = UUIDUtil.convertToByteBuffer(UUID.fromString(data.toString()));
+      ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length);
+    }
+  }
+
   private static class TimestampTzWriter implements OrcValueWriter<Long> {
     private static final TimestampTzWriter INSTANCE = new TimestampTzWriter();

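The comment inside UUIDWriter is the load-bearing design note: BytesColumnVector.setRef(..) stores a reference to the caller's array rather than copying it, so one shared scratch buffer would silently rewrite earlier rows. A small sketch of the failure mode the per-value allocation avoids (class name and values are illustrative):

```java
import java.util.UUID;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;

public class SetRefAliasing {
  public static void main(String[] args) {
    BytesColumnVector vector = new BytesColumnVector();
    byte[] shared = new byte[16];

    copyInto(UUID.randomUUID(), shared);
    vector.setRef(0, shared, 0, 16); // row 0 stores a *reference* to shared

    copyInto(UUID.randomUUID(), shared);
    vector.setRef(1, shared, 0, 16); // row 1 references the same array

    // Both rows now alias the second UUID's bytes, so row 0 is corrupted:
    System.out.println(vector.vector[0] == vector.vector[1]); // true
  }

  private static void copyInto(UUID uuid, byte[] dest) {
    System.arraycopy(UUIDUtil.convertToByteBuffer(uuid).array(), 0, dest, 0, 16);
  }
}
```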
@@ -111,6 +111,9 @@ public OrcValueWriter<?> primitive(Type.PrimitiveType iPrimitive, TypeDescription
       case DOUBLE:
         return GenericOrcWriters.doubles(ORCSchemaUtil.fieldId(primitive));
       case BINARY:
+        if (Type.TypeID.UUID == iPrimitive.typeId()) {
+          return SparkOrcValueWriters.uuids();
+        }
         return GenericOrcWriters.byteArrays();
       case STRING:
       case CHAR:
@@ -173,7 +176,14 @@ static FieldGetter<?> createFieldGetter(TypeDescription fieldType) {
         fieldGetter = SpecializedGetters::getDouble;
         break;
       case BINARY:
-        fieldGetter = SpecializedGetters::getBinary;
+        if (ORCSchemaUtil.BinaryType.UUID
+            .toString()
+            .equalsIgnoreCase(
+                fieldType.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE))) {
+          fieldGetter = SpecializedGetters::getUTF8String;
+        } else {
+          fieldGetter = SpecializedGetters::getBinary;
+        }
         // getBinary always makes a copy, so we don't need to worry about it
         // being changed behind our back.
         break;
@@ -46,6 +46,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type.TypeID;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
@@ -232,6 +233,7 @@ public ParquetValueReader<?> map(
   }

   @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public ParquetValueReader<?> primitive(
       org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) {
     ColumnDescriptor desc = type.getColumnDescription(currentPath());
@@ -282,6 +284,9 @@ public ParquetValueReader<?> primitive(
     switch (primitive.getPrimitiveTypeName()) {
       case FIXED_LEN_BYTE_ARRAY:
       case BINARY:
+        if (expected != null && expected.typeId() == TypeID.UUID) {
+          return new UUIDReader(desc);
+        }
         return new ParquetValueReaders.ByteArrayReader(desc);
       case INT32:
         if (expected != null && expected.typeId() == TypeID.LONG) {
@@ -413,6 +418,18 @@ public UTF8String read(UTF8String ignored) {
     }
   }

+  private static class UUIDReader extends PrimitiveReader<UTF8String> {
+    UUIDReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    @SuppressWarnings("ByteBufferBackingArray")
+    public UTF8String read(UTF8String ignored) {
+      return UTF8String.fromString(UUIDUtil.convert(column.nextBinary().toByteBuffer()).toString());
+    }
+  }
+
   private static class ArrayReader<E> extends RepeatedReader<ArrayData, ReusableArrayData, E> {
     private int readPos = 0;
     private int writePos = 0;
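For context on the reader's FIXED_LEN_BYTE_ARRAY/BINARY branch: per the writer hunk further down, Iceberg UUIDs land in Parquet as 16-byte values carrying the UUID logical type annotation, which is why matching on the expected Iceberg type is sufficient here. A sketch of a Parquet field with that shape, built with parquet-mr's schema builder (illustrative, not code from this PR):

```java
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

class UuidParquetSchema {
  // A 16-byte fixed carrying the UUID annotation: the shape this reader expects.
  static final Type UUID_FIELD =
      Types.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Type.Repetition.REQUIRED)
          .length(16)
          .as(LogicalTypeAnnotation.uuidType())
          .named("uuid");
}
```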
@@ -18,10 +18,13 @@
  */
 package org.apache.iceberg.spark.data;

+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.UUID;
 import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -32,9 +35,11 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.DecimalUtil;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -176,6 +181,9 @@ public ParquetValueWriter<?> primitive(DataType sType, PrimitiveType primitive)
     switch (primitive.getPrimitiveTypeName()) {
       case FIXED_LEN_BYTE_ARRAY:
       case BINARY:
+        if (LogicalTypeAnnotation.uuidType().equals(primitive.getLogicalTypeAnnotation())) {
+          return uuids(desc);
+        }
         return byteArrays(desc);
       case BOOLEAN:
         return ParquetValueWriters.booleans(desc);
@@ -206,6 +214,10 @@ private static PrimitiveWriter<UTF8String> utf8Strings(ColumnDescriptor desc) {
     return new UTF8StringWriter(desc);
   }

+  private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
+    return new UUIDWriter(desc);
+  }
+
   private static PrimitiveWriter<Decimal> decimalAsInteger(
       ColumnDescriptor desc, int precision, int scale) {
     return new IntegerDecimalWriter(desc, precision, scale);
@@ -316,6 +328,27 @@ public void write(int repetitionLevel, Decimal decimal) {
     }
   }

+  private static class UUIDWriter extends PrimitiveWriter<UTF8String> {
+    private static final ThreadLocal<ByteBuffer> BUFFER =
+        ThreadLocal.withInitial(
+            () -> {
+              ByteBuffer buffer = ByteBuffer.allocate(16);
+              buffer.order(ByteOrder.BIG_ENDIAN);
+              return buffer;
+            });
+
+    private UUIDWriter(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public void write(int repetitionLevel, UTF8String string) {
+      UUID uuid = UUID.fromString(string.toString());
+      ByteBuffer buffer = UUIDUtil.convertToByteBuffer(uuid, BUFFER.get());
+      column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
+    }
+  }
+
   private static class ByteArrayWriter extends PrimitiveWriter<byte[]> {
     private ByteArrayWriter(ColumnDescriptor desc) {
       super(desc);
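Unlike the ORC writer earlier, this Parquet UUIDWriter can share one per-thread scratch buffer: Binary.fromReusedByteBuffer(..) declares the buffer ephemeral, and as I understand parquet-mr's contract the bytes are copied before the caller overwrites them, which is what makes the reuse safe while avoiding a 16-byte allocation per row. The pattern in miniature (wrapper class is illustrative):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.parquet.io.api.Binary;

class ReusedUuidBinary {
  // Compact equivalent of the writer's per-thread scratch buffer.
  private static final ThreadLocal<ByteBuffer> BUFFER =
      ThreadLocal.withInitial(() -> ByteBuffer.allocate(16).order(ByteOrder.BIG_ENDIAN));

  static Binary toBinary(byte[] sixteenBytes) {
    ByteBuffer buffer = BUFFER.get();
    buffer.clear();
    buffer.put(sixteenBytes);
    buffer.flip();
    // "Reused" marks the bytes as about-to-be-overwritten, so Parquet copies
    // them; fromConstantByteBuffer could keep the reference, which would
    // alias rows exactly like the ORC setRef case above.
    return Binary.fromReusedByteBuffer(buffer);
  }
}
```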
@@ -21,10 +21,12 @@
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.iceberg.arrow.vectorized.GenericArrowVectorAccessorFactory;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.vectorized.ArrowColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarArray;
@@ -74,6 +76,11 @@ public UTF8String ofRow(VarCharVector vector, int rowId) {
           null, vector.getDataBuffer().memoryAddress() + start, end - start);
     }

+    @Override
+    public UTF8String ofRow(FixedSizeBinaryVector vector, int rowId) {
+      return UTF8String.fromString(UUIDUtil.convert(vector.get(rowId)).toString());
+    }
+
     @Override
     public UTF8String ofBytes(byte[] bytes) {
       return UTF8String.fromBytes(bytes);
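In the vectorized Arrow path, a UUID column arrives as a FixedSizeBinaryVector with byte width 16, and the new accessor renders each row as a UTF8String. A self-contained sketch of that flow (the allocator and vector setup are illustrative, not from this PR):

```java
import java.util.UUID;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.spark.unsafe.types.UTF8String;

public class ArrowUuidRead {
  public static void main(String[] args) {
    try (BufferAllocator allocator = new RootAllocator();
        FixedSizeBinaryVector vector = new FixedSizeBinaryVector("uuid", allocator, 16)) {
      UUID uuid = UUID.randomUUID();
      vector.allocateNew(1);
      vector.set(0, UUIDUtil.convertToByteBuffer(uuid).array());
      vector.setValueCount(1);
      // What the accessor does per row: 16 raw bytes -> UUID -> canonical string
      UTF8String value = UTF8String.fromString(UUIDUtil.convert(vector.get(0)).toString());
      System.out.println(value); // prints the original uuid
    }
  }
}
```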
@@ -155,7 +155,10 @@ public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primitive
         primitiveValueReader = SparkOrcValueReaders.utf8String();
         break;
       case BINARY:
-        primitiveValueReader = OrcValueReaders.bytes();
+        primitiveValueReader =
+            Type.TypeID.UUID == iPrimitive.typeId()
+                ? SparkOrcValueReaders.uuids()
+                : OrcValueReaders.bytes();
         break;
       default:
         throw new IllegalArgumentException("Unhandled type " + primitive);
@@ -56,7 +56,7 @@ public abstract class AvroDataTest {
         optional(107, "date", Types.DateType.get()),
         required(108, "ts", Types.TimestampType.withZone()),
         required(110, "s", Types.StringType.get()),
-        // required(111, "uuid", Types.UUIDType.get()),
+        required(111, "uuid", Types.UUIDType.get()),
         required(112, "fixed", Types.FixedType.ofLength(7)),
         optional(113, "bytes", Types.BinaryType.get()),
         required(114, "dec_9_0", Types.DecimalType.of(9, 0)), // int encoded
@@ -329,6 +329,8 @@ public Object primitive(Type.PrimitiveType primitive) {
         return UTF8String.fromString((String) obj);
       case DECIMAL:
         return Decimal.apply((BigDecimal) obj);
+      case UUID:
+        return UTF8String.fromString(UUID.nameUUIDFromBytes((byte[]) obj).toString());
       default:
         return obj;
     }
@@ -79,7 +79,7 @@ public class TestSparkParquetWriter {
                 Types.StringType.get(),
                 Types.StructType.of(
                     optional(22, "jumpy", Types.DoubleType.get()),
-                    required(23, "koala", Types.IntegerType.get()),
+                    required(23, "koala", Types.UUIDType.get()),
                     required(24, "couch rope", Types.IntegerType.get())))),
         optional(2, "slide", Types.StringType.get()));

@@ -85,6 +85,9 @@ private static class UUIDWriter implements OrcValueWriter<UTF8String> {

   @Override
   public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) {
+    // ((BytesColumnVector) output).setRef(..) just stores a reference to the passed byte[], so
+    // we can't use a ThreadLocal ByteBuffer here like in other places, because subsequent
+    // writes would then overwrite previous values
     ByteBuffer buffer = UUIDUtil.convertToByteBuffer(UUID.fromString(data.toString()));
     ((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length);
   }

[Review comment from a Contributor on the lines above]: Thanks!