Merged
12 changes: 11 additions & 1 deletion api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
@@ -62,7 +62,17 @@ public static byte[] convert(UUID value) {
}

public static ByteBuffer convertToByteBuffer(UUID value) {
ByteBuffer buffer = ByteBuffer.allocate(16);
return convertToByteBuffer(value, null);
}

public static ByteBuffer convertToByteBuffer(UUID value, ByteBuffer reuse) {
ByteBuffer buffer;
if (reuse != null) {
buffer = reuse;
} else {
buffer = ByteBuffer.allocate(16);
}

buffer.order(ByteOrder.BIG_ENDIAN);
buffer.putLong(0, value.getMostSignificantBits());
buffer.putLong(8, value.getLeastSignificantBits());
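The new overload lets callers fill a buffer they own instead of allocating one per call. A minimal usage sketch (the surrounding loop and names are hypothetical, not part of the PR):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.List;
    import java.util.UUID;
    import org.apache.iceberg.util.UUIDUtil;

    class UuidEncodeLoop {
      static void encodeAll(List<UUID> ids) {
        // One caller-owned buffer, reused across all conversions.
        ByteBuffer reused = ByteBuffer.allocate(16).order(ByteOrder.BIG_ENDIAN);
        for (UUID id : ids) {
          // Fills `reused` in place and returns it; passing null instead
          // falls back to allocating a fresh 16-byte buffer.
          ByteBuffer encoded = UUIDUtil.convertToByteBuffer(id, reused);
          // Consume the 16 bytes before the next iteration overwrites them.
          System.out.println(UUIDUtil.convert(encoded));
        }
      }
    }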
4 changes: 4 additions & 0 deletions api/src/test/java/org/apache/iceberg/util/RandomUtil.java
@@ -182,6 +182,10 @@ public static Object generateDictionaryEncodablePrimitive(
BigInteger unscaled = new BigInteger(String.valueOf(value + 1));
BigDecimal bd = new BigDecimal(unscaled, type.scale());
return negate(value) ? bd.negate() : bd;
case UUID:
byte[] uuidBytes = new byte[16];
random.nextBytes(uuidBytes);
return uuidBytes;
default:
throw new IllegalArgumentException(
"Cannot generate random value for unknown type: " + primitive);
@@ -218,7 +218,8 @@ private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getPlai
return new FixedSizeBinaryBackedDecimalAccessor<>(
(FixedSizeBinaryVector) vector, decimalFactorySupplier.get());
}
return new FixedSizeBinaryAccessor<>((FixedSizeBinaryVector) vector);
return new FixedSizeBinaryAccessor<>(
(FixedSizeBinaryVector) vector, stringFactorySupplier.get());
}
throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass());
}
@@ -558,16 +559,32 @@ private static class FixedSizeBinaryAccessor<
extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {

private final FixedSizeBinaryVector vector;
private final StringFactory<Utf8StringT> stringFactory;

FixedSizeBinaryAccessor(FixedSizeBinaryVector vector) {
super(vector);
this.vector = vector;
this.stringFactory = null;
}

FixedSizeBinaryAccessor(
FixedSizeBinaryVector vector, StringFactory<Utf8StringT> stringFactory) {
super(vector);
this.vector = vector;
this.stringFactory = stringFactory;
}

@Override
public byte[] getBinary(int rowId) {
return vector.get(rowId);
}

@Override
public Utf8StringT getUTF8String(int rowId) {
return null == stringFactory
? super.getUTF8String(rowId)
: stringFactory.ofRow(vector, rowId);
}
}

private static class ArrayAccessor<
@@ -794,6 +811,14 @@ protected interface StringFactory<Utf8StringT> {
/** Create a UTF8 String from the row value in the arrow vector. */
Utf8StringT ofRow(VarCharVector vector, int rowId);

/** Create a UTF8 String from the row value in the FixedSizeBinaryVector vector. */
default Utf8StringT ofRow(FixedSizeBinaryVector vector, int rowId) {
throw new UnsupportedOperationException(
String.format(
"Creating %s from a FixedSizeBinaryVector is not supported",
getGenericClass().getSimpleName()));
}

/** Create a UTF8 String from the byte array. */
Utf8StringT ofBytes(byte[] bytes);

@@ -126,7 +126,7 @@ public ValueWriter<?> primitive(DataType type, Schema primitive) {
return SparkValueWriters.decimal(decimal.getPrecision(), decimal.getScale());

case "uuid":
return ValueWriters.uuids();
return SparkValueWriters.uuids();

default:
throw new IllegalArgumentException("Unsupported logical type: " + logicalType);
@@ -123,6 +123,9 @@ public OrcValueReader<?> primitive(Type.PrimitiveType iPrimitive, TypeDescriptio
case STRING:
return SparkOrcValueReaders.utf8String();
case BINARY:
if (Type.TypeID.UUID == iPrimitive.typeId()) {
return SparkOrcValueReaders.uuids();
}
return OrcValueReaders.bytes();
default:
throw new IllegalArgumentException("Unhandled type " + primitive);
@@ -19,13 +19,15 @@
package org.apache.iceberg.spark.data;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.orc.OrcValueReader;
import org.apache.iceberg.orc.OrcValueReaders;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
@@ -49,6 +51,10 @@ public static OrcValueReader<UTF8String> utf8String() {
return StringReader.INSTANCE;
}

public static OrcValueReader<UTF8String> uuids() {
return UUIDReader.INSTANCE;
}

public static OrcValueReader<Long> timestampTzs() {
return TimestampTzReader.INSTANCE;
}
@@ -170,6 +176,20 @@ public UTF8String nonNullRead(ColumnVector vector, int row) {
}
}

private static class UUIDReader implements OrcValueReader<UTF8String> {
private static final UUIDReader INSTANCE = new UUIDReader();

private UUIDReader() {}

@Override
public UTF8String nonNullRead(ColumnVector vector, int row) {
BytesColumnVector bytesVector = (BytesColumnVector) vector;
ByteBuffer buffer =
ByteBuffer.wrap(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]);
return UTF8String.fromString(UUIDUtil.convert(buffer).toString());
}
}

private static class TimestampTzReader implements OrcValueReader<Long> {
private static final TimestampTzReader INSTANCE = new TimestampTzReader();

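A sketch of the new reader's read path for a single value, assuming a populated BytesColumnVector (the scaffolding below is hypothetical test code, not from the PR):

    import java.nio.ByteBuffer;
    import java.util.UUID;
    import org.apache.iceberg.util.UUIDUtil;
    import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;

    class UuidOrcRoundTrip {
      static void demo() {
        BytesColumnVector vector = new BytesColumnVector(1);
        vector.initBuffer();
        UUID id = UUID.randomUUID();
        ByteBuffer bytes = UUIDUtil.convertToByteBuffer(id);
        vector.setVal(0, bytes.array(), 0, 16); // setVal copies, unlike setRef

        // UUIDReader.nonNullRead does exactly this wrap-and-convert:
        UUID read =
            UUIDUtil.convert(
                ByteBuffer.wrap(vector.vector[0], vector.start[0], vector.length[0]));
        assert id.equals(read);
      }
    }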
@@ -18,10 +18,13 @@
*/
package org.apache.iceberg.spark.data;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.orc.OrcValueWriter;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.common.type.HiveDecimal;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
@@ -42,6 +45,10 @@ static OrcValueWriter<?> strings() {
return StringWriter.INSTANCE;
}

static OrcValueWriter<?> uuids() {
return UUIDWriter.INSTANCE;
}

static OrcValueWriter<?> timestampTz() {
return TimestampTzWriter.INSTANCE;
}
@@ -73,6 +80,16 @@ public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) {
}
}

private static class UUIDWriter implements OrcValueWriter<UTF8String> {
private static final UUIDWriter INSTANCE = new UUIDWriter();

@Override
public void nonNullWrite(int rowId, UTF8String data, ColumnVector output) {
ByteBuffer buffer = UUIDUtil.convertToByteBuffer(UUID.fromString(data.toString()));
Contributor:
This allocates a buffer. We may want to have a buffer here as a thread-local or a field to avoid allocation in a tight loop.

Contributor (Author):
I agree with that observation. I initially used a thread-local to reduce byte[] allocation, but couldn't get it to work because ((BytesColumnVector) output).setRef(..) just stores a reference to the passed byte[], so subsequent writes would overwrite previous values. Worth mentioning that GenericOrcWriters does the same thing when writing UUIDs.

Contributor:
Probably worth mentioning in a comment?

Contributor (Author):
Makes sense; I've added a comment for this as part of #7496.

((BytesColumnVector) output).setRef(rowId, buffer.array(), 0, buffer.array().length);
}
}

private static class TimestampTzWriter implements OrcValueWriter<Long> {
private static final TimestampTzWriter INSTANCE = new TimestampTzWriter();

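To make the reuse pitfall from the thread above concrete, a small sketch (hypothetical, not from the PR) of why a shared byte[] cannot back setRef:

    import java.util.Arrays;
    import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;

    class SetRefPitfall {
      static void demo() {
        BytesColumnVector output = new BytesColumnVector(2);
        byte[] shared = new byte[16];

        Arrays.fill(shared, (byte) 1);
        output.setRef(0, shared, 0, 16); // row 0 stores a reference to `shared`

        Arrays.fill(shared, (byte) 2);
        output.setRef(1, shared, 0, 16); // row 0 now also reads back as all 2s

        // setVal copies the bytes instead, but that copy is per-row anyway,
        // so allocating a fresh buffer per write (as UUIDWriter does) is simplest.
      }
    }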
@@ -111,6 +111,9 @@ public OrcValueWriter<?> primitive(Type.PrimitiveType iPrimitive, TypeDescriptio
case DOUBLE:
return GenericOrcWriters.doubles(ORCSchemaUtil.fieldId(primitive));
case BINARY:
if (Type.TypeID.UUID == iPrimitive.typeId()) {
return SparkOrcValueWriters.uuids();
}
return GenericOrcWriters.byteArrays();
case STRING:
case CHAR:
@@ -173,7 +176,14 @@ static FieldGetter<?> createFieldGetter(TypeDescription fieldType) {
fieldGetter = SpecializedGetters::getDouble;
break;
case BINARY:
fieldGetter = SpecializedGetters::getBinary;
if (ORCSchemaUtil.BinaryType.UUID
.toString()
.equalsIgnoreCase(
fieldType.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE))) {
fieldGetter = SpecializedGetters::getUTF8String;
} else {
fieldGetter = SpecializedGetters::getBinary;
}
// getBinary always makes a copy, so we don't need to worry about it
// being changed behind our back.
break;
@@ -46,6 +46,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type.TypeID;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.GroupType;
@@ -232,6 +233,7 @@ public ParquetValueReader<?> map(
}

@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public ParquetValueReader<?> primitive(
org.apache.iceberg.types.Type.PrimitiveType expected, PrimitiveType primitive) {
ColumnDescriptor desc = type.getColumnDescription(currentPath());
@@ -282,6 +284,9 @@ public ParquetValueReader<?> primitive(
switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
if (expected != null && expected.typeId() == TypeID.UUID) {
return new UUIDReader(desc);
}
return new ParquetValueReaders.ByteArrayReader(desc);
case INT32:
if (expected != null && expected.typeId() == TypeID.LONG) {
@@ -413,6 +418,18 @@ public UTF8String read(UTF8String ignored) {
}
}

private static class UUIDReader extends PrimitiveReader<UTF8String> {
UUIDReader(ColumnDescriptor desc) {
super(desc);
}

@Override
@SuppressWarnings("ByteBufferBackingArray")
public UTF8String read(UTF8String ignored) {
return UTF8String.fromString(UUIDUtil.convert(column.nextBinary().toByteBuffer()).toString());
}
}

private static class ArrayReader<E> extends RepeatedReader<ArrayData, ReusableArrayData, E> {
private int readPos = 0;
private int writePos = 0;
@@ -18,10 +18,13 @@
*/
package org.apache.iceberg.spark.data;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import org.apache.iceberg.parquet.ParquetValueReaders.ReusableEntry;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -32,9 +35,11 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.DecimalUtil;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
@@ -176,6 +181,9 @@ public ParquetValueWriter<?> primitive(DataType sType, PrimitiveType primitive)
switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
if (LogicalTypeAnnotation.uuidType().equals(primitive.getLogicalTypeAnnotation())) {
return uuids(desc);
}
return byteArrays(desc);
case BOOLEAN:
return ParquetValueWriters.booleans(desc);
@@ -206,6 +214,10 @@ private static PrimitiveWriter<UTF8String> utf8Strings(ColumnDescriptor desc) {
return new UTF8StringWriter(desc);
}

private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
return new UUIDWriter(desc);
}

private static PrimitiveWriter<Decimal> decimalAsInteger(
ColumnDescriptor desc, int precision, int scale) {
return new IntegerDecimalWriter(desc, precision, scale);
@@ -316,6 +328,27 @@ public void write(int repetitionLevel, Decimal decimal) {
}
}

private static class UUIDWriter extends PrimitiveWriter<UTF8String> {
private static final ThreadLocal<ByteBuffer> BUFFER =
ThreadLocal.withInitial(
() -> {
ByteBuffer buffer = ByteBuffer.allocate(16);
buffer.order(ByteOrder.BIG_ENDIAN);
return buffer;
});

private UUIDWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, UTF8String string) {
UUID uuid = UUID.fromString(string.toString());
ByteBuffer buffer = UUIDUtil.convertToByteBuffer(uuid, BUFFER.get());
column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
}
}

private static class ByteArrayWriter extends PrimitiveWriter<byte[]> {
private ByteArrayWriter(ColumnDescriptor desc) {
super(desc);
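Unlike the ORC writer above, this Parquet writer can reuse a thread-local buffer. My reading of the Parquet API (an assumption, not stated in the PR) is that Binary.fromReusedByteBuffer marks the backing bytes as caller-mutable, so any consumer that retains the value, such as a dictionary encoder, copies it first. A sketch of that contract:

    import java.nio.ByteBuffer;
    import java.util.UUID;
    import org.apache.iceberg.util.UUIDUtil;
    import org.apache.parquet.io.api.Binary;

    class ReusedBinarySketch {
      static Binary encode(UUID uuid, ByteBuffer reuse) {
        ByteBuffer buffer = UUIDUtil.convertToByteBuffer(uuid, reuse);
        // Safe only because the reused flag tells Parquet to copy on retention;
        // Binary.fromConstantByteBuffer would wrongly promise immutability here.
        return Binary.fromReusedByteBuffer(buffer);
      }
    }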
@@ -21,10 +21,12 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.iceberg.arrow.vectorized.GenericArrowVectorAccessorFactory;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
@@ -74,6 +76,11 @@ public UTF8String ofRow(VarCharVector vector, int rowId) {
null, vector.getDataBuffer().memoryAddress() + start, end - start);
}

@Override
public UTF8String ofRow(FixedSizeBinaryVector vector, int rowId) {
return UTF8String.fromString(UUIDUtil.convert(vector.get(rowId)).toString());
Contributor:
Is there a way to get the underlying array and offset?

Contributor (Author):
vector.get(rowId) returns the byte[] of length 16 for the given rowId. I think we could get the underlying array and offset from the underlying ArrowBuf, but we would still need to read it into a new byte[], which is what vector.get(rowId) does underneath.
}

@Override
public UTF8String ofBytes(byte[] bytes) {
return UTF8String.fromBytes(bytes);
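For reference, the direct-ArrowBuf alternative discussed above would look roughly like this sketch (my own, not from the PR); it still ends in a copy into a fresh byte[], which is what vector.get(rowId) already does:

    import org.apache.arrow.memory.ArrowBuf;
    import org.apache.arrow.vector.FixedSizeBinaryVector;

    class FixedSizeBinaryRead {
      static byte[] readUuidBytes(FixedSizeBinaryVector vector, int rowId) {
        ArrowBuf data = vector.getDataBuffer();
        byte[] out = new byte[16]; // byteWidth is 16 for UUID columns
        data.getBytes((long) rowId * vector.getByteWidth(), out, 0, out.length);
        return out;
      }
    }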
@@ -155,7 +155,10 @@ public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primit
primitiveValueReader = SparkOrcValueReaders.utf8String();
break;
case BINARY:
primitiveValueReader = OrcValueReaders.bytes();
primitiveValueReader =
Type.TypeID.UUID == iPrimitive.typeId()
? SparkOrcValueReaders.uuids()
: OrcValueReaders.bytes();
break;
default:
throw new IllegalArgumentException("Unhandled type " + primitive);
@@ -56,7 +56,7 @@ public abstract class AvroDataTest {
optional(107, "date", Types.DateType.get()),
required(108, "ts", Types.TimestampType.withZone()),
required(110, "s", Types.StringType.get()),
// required(111, "uuid", Types.UUIDType.get()),
required(111, "uuid", Types.UUIDType.get()),
required(112, "fixed", Types.FixedType.ofLength(7)),
optional(113, "bytes", Types.BinaryType.get()),
required(114, "dec_9_0", Types.DecimalType.of(9, 0)), // int encoded
@@ -329,6 +329,8 @@ public Object primitive(Type.PrimitiveType primitive) {
return UTF8String.fromString((String) obj);
case DECIMAL:
return Decimal.apply((BigDecimal) obj);
case UUID:
return UTF8String.fromString(UUID.nameUUIDFromBytes((byte[]) obj).toString());
Contributor:
Why does generatePrimitive provide byte[]? Shouldn't it create a String for Spark already?

Contributor (Author):
My guess would be that RandomUtil.generatePrimitive(..) is used in other places where UUIDs are expected to be byte[].

default:
return obj;
}
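A note on the conversion in the last hunk: UUID.nameUUIDFromBytes is a deterministic type-3 (MD5) digest of its input, so the generator and the expected-value side can both apply it to the same random 16 bytes and agree. A minimal illustration (mine, not from the PR):

    import java.util.Random;
    import java.util.UUID;

    class NameUuidSketch {
      static void demo() {
        byte[] uuidBytes = new byte[16];
        new Random(42).nextBytes(uuidBytes);
        // The bytes are hashed, not interpreted as the UUID's bit pattern:
        UUID first = UUID.nameUUIDFromBytes(uuidBytes);
        UUID second = UUID.nameUUIDFromBytes(uuidBytes);
        assert first.equals(second); // stable across calls and JVMs
      }
    }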