From e071d8ec128d8e3a385c2a86ad80d2bb0dac11af Mon Sep 17 00:00:00 2001
From: openinx
Date: Thu, 13 Aug 2020 11:06:43 +0800
Subject: [PATCH 1/2] Refactor to use a common UUIDUtil to convert between
 bytes and UUID

---
 .../org/apache/iceberg/types/Conversions.java | 13 ++--
 .../org/apache/iceberg/util/UUIDUtil.java     | 67 +++++++++++++++++++
 .../apache/iceberg/avro/UUIDConversion.java   | 15 +----
 .../org/apache/iceberg/avro/ValueReaders.java |  5 +-
 .../iceberg/data/orc/GenericOrcReaders.java   |  9 ++-
 .../apache/iceberg/flink/RowDataWrapper.java  | 11 +--
 .../parquet/ParquetAvroValueReaders.java      | 10 +--
 .../iceberg/spark/data/SparkValueReaders.java |  6 +-
 8 files changed, 88 insertions(+), 48 deletions(-)
 create mode 100644 api/src/main/java/org/apache/iceberg/util/UUIDUtil.java

diff --git a/api/src/main/java/org/apache/iceberg/types/Conversions.java b/api/src/main/java/org/apache/iceberg/types/Conversions.java
index d0a2967c5951..9ba17808c7c6 100644
--- a/api/src/main/java/org/apache/iceberg/types/Conversions.java
+++ b/api/src/main/java/org/apache/iceberg/types/Conversions.java
@@ -32,10 +32,12 @@
 import java.util.UUID;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.util.UUIDUtil;
 
 public class Conversions {
 
-  private Conversions() {}
+  private Conversions() {
+  }
 
   private static final String HIVE_NULL = "__HIVE_DEFAULT_PARTITION__";
 
@@ -111,10 +113,7 @@ public static ByteBuffer toByteBuffer(Type.TypeID typeId, Object value) {
           throw new RuntimeIOException(e, "Failed to encode value as UTF-8: " + value);
         }
       case UUID:
-        UUID uuid = (UUID) value;
-        return ByteBuffer.allocate(16).order(ByteOrder.BIG_ENDIAN)
-            .putLong(0, uuid.getMostSignificantBits())
-            .putLong(8, uuid.getLeastSignificantBits());
+        return UUIDUtil.convertToByteBuffer((UUID) value);
       case FIXED:
       case BINARY:
         return (ByteBuffer) value;
@@ -170,9 +169,7 @@ private static Object internalFromByteBuffer(Type type, ByteBuffer buffer) {
           throw new RuntimeIOException(e, "Failed to decode value as UTF-8: " + buffer);
         }
       case UUID:
-        long mostSigBits = tmp.getLong();
-        long leastSigBits = tmp.getLong();
-        return new UUID(mostSigBits, leastSigBits);
+        return UUIDUtil.convert(tmp);
       case FIXED:
       case BINARY:
         return tmp;
diff --git a/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java b/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
new file mode 100644
index 000000000000..7b9482bb5842
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.UUID;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class UUIDUtil {
+  private UUIDUtil() {
+  }
+
+  public static UUID convert(byte[] buf) {
+    Preconditions.checkArgument(buf.length == 16, "UUID require 16 bytes");
+    ByteBuffer bb = ByteBuffer.wrap(buf);
+    bb.order(ByteOrder.BIG_ENDIAN);
+    return convert(bb);
+  }
+
+  public static UUID convert(byte[] buf, int offset) {
+    Preconditions.checkArgument(offset >= 0 && offset < buf.length,
+        "Offset overflow, offset=%d, length=%d", offset, buf.length);
+    Preconditions.checkArgument(offset + 16 <= buf.length,
+        "UUID require 16 bytes, offset=%d, length=%d", offset, buf.length);
+
+    ByteBuffer bb = ByteBuffer.wrap(buf, offset, 16);
+    bb.order(ByteOrder.BIG_ENDIAN);
+    return convert(bb);
+  }
+
+  public static UUID convert(ByteBuffer buf) {
+    long mostSigBits = buf.getLong();
+    long leastSigBits = buf.getLong();
+
+    return new UUID(mostSigBits, leastSigBits);
+  }
+
+  public static byte[] convert(UUID value) {
+    return convertToByteBuffer(value).array();
+  }
+
+  public static ByteBuffer convertToByteBuffer(UUID value) {
+    ByteBuffer buffer = ByteBuffer.allocate(16);
+    buffer.order(ByteOrder.BIG_ENDIAN);
+    buffer.putLong(value.getMostSignificantBits());
+    buffer.putLong(value.getLeastSignificantBits());
+    return buffer;
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/avro/UUIDConversion.java b/core/src/main/java/org/apache/iceberg/avro/UUIDConversion.java
index b772effd1508..a88cc213494b 100644
--- a/core/src/main/java/org/apache/iceberg/avro/UUIDConversion.java
+++ b/core/src/main/java/org/apache/iceberg/avro/UUIDConversion.java
@@ -19,8 +19,6 @@
 
 package org.apache.iceberg.avro;
 
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.util.UUID;
 import org.apache.avro.Conversion;
 import org.apache.avro.LogicalType;
@@ -28,6 +26,7 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
+import org.apache.iceberg.util.UUIDUtil;
 
 public class UUIDConversion extends Conversion<UUID> {
   @Override
@@ -42,19 +41,11 @@ public String getLogicalTypeName() {
 
   @Override
   public UUID fromFixed(GenericFixed value, Schema schema, LogicalType type) {
-    ByteBuffer buffer = ByteBuffer.wrap(value.bytes());
-    buffer.order(ByteOrder.BIG_ENDIAN);
-    long mostSigBits = buffer.getLong();
-    long leastSigBits = buffer.getLong();
-    return new UUID(mostSigBits, leastSigBits);
+    return UUIDUtil.convert(value.bytes());
   }
 
   @Override
   public GenericFixed toFixed(UUID value, Schema schema, LogicalType type) {
-    ByteBuffer buffer = ByteBuffer.allocate(16);
-    buffer.order(ByteOrder.BIG_ENDIAN);
-    buffer.putLong(value.getMostSignificantBits());
-    buffer.putLong(value.getLeastSignificantBits());
-    return new GenericData.Fixed(schema, buffer.array());
+    return new GenericData.Fixed(schema, UUIDUtil.convert(value));
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index 0eb3fe2791e0..476e21ecd6a1 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -43,6 +43,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
 
 import static java.util.Collections.emptyIterator;
 
@@ -276,10 +277,8 @@ public UUID read(Decoder decoder, Object ignored) throws IOException {
       buffer.rewind();
 
       decoder.readFixed(buffer.array(), 0, 16);
-      long mostSigBits = buffer.getLong();
-      long leastSigBits = buffer.getLong();
 
-      return new UUID(mostSigBits, leastSigBits);
+      return UUIDUtil.convert(buffer);
     }
   }
 
diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java
index 018a5052fcdb..44ea15e3ba35 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java
@@ -39,6 +39,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
 import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
@@ -147,7 +148,7 @@ private TimestampReader() {
     public LocalDateTime nonNullRead(ColumnVector vector, int row) {
       TimestampColumnVector tcv = (TimestampColumnVector) vector;
       return Instant.ofEpochSecond(Math.floorDiv(tcv.time[row], 1_000), tcv.nanos[row]).atOffset(ZoneOffset.UTC)
-          .toLocalDateTime();
+        .toLocalDateTime();
     }
   }
 
@@ -174,7 +175,7 @@ private StringReader() {
     public String nonNullRead(ColumnVector vector, int row) {
       BytesColumnVector bytesVector = (BytesColumnVector) vector;
       return new String(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row],
-          StandardCharsets.UTF_8);
+        StandardCharsets.UTF_8);
     }
   }
 
@@ -188,9 +189,7 @@ private UUIDReader() {
     public UUID nonNullRead(ColumnVector vector, int row) {
       BytesColumnVector bytesVector = (BytesColumnVector) vector;
       ByteBuffer buf = ByteBuffer.wrap(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]);
-      long mostSigBits = buf.getLong();
-      long leastSigBits = buf.getLong();
-      return new UUID(mostSigBits, leastSigBits);
+      return UUIDUtil.convert(buf);
     }
   }
 
diff --git a/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java b/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
index 28de63408e8f..cf0c09c4dcd4 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
@@ -22,7 +22,6 @@
 import java.lang.reflect.Array;
 import java.nio.ByteBuffer;
 import java.time.LocalDateTime;
-import java.util.UUID;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -34,6 +33,7 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.UUIDUtil;
 
 class RowDataWrapper implements StructLike {
 
@@ -96,13 +96,8 @@ private static PositionalGetter<?> buildGetter(LogicalType logicalType, Type typ
 
       case BINARY:
       case VARBINARY:
-        if (Type.TypeID.UUID.equals(type.typeId())) {
-          return (row, pos) -> {
-            ByteBuffer bb = ByteBuffer.wrap(row.getBinary(pos));
-            long mostSigBits = bb.getLong();
-            long leastSigBits = bb.getLong();
-            return new UUID(mostSigBits, leastSigBits);
-          };
+        if (Type.TypeID.UUID == type.typeId()) {
+          return (row, pos) -> UUIDUtil.convert(row.getBinary(pos));
         } else {
          return (row, pos) -> ByteBuffer.wrap(row.getBinary(pos));
        }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java
index cb25f2caa6fb..fe2c8467bd40 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvroValueReaders.java
@@ -22,7 +22,6 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -45,6 +44,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type.TypeID;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.GroupType;
@@ -291,13 +291,7 @@ static class UUIDReader extends ParquetValueReaders.PrimitiveReader<UUID> {
 
     @Override
     public UUID read(UUID ignored) {
-      ByteBuffer buffer = column.nextBinary().toByteBuffer();
-      buffer.order(ByteOrder.BIG_ENDIAN);
-
-      long mostSigBits = buffer.getLong();
-      long leastSigBits = buffer.getLong();
-
-      return new UUID(mostSigBits, leastSigBits);
+      return UUIDUtil.convert(column.nextBinary().toByteBuffer());
     }
   }
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
index eaeb85f3c344..278937ecc269 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java
@@ -27,13 +27,13 @@
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.avro.ValueReader;
 import org.apache.iceberg.avro.ValueReaders;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
@@ -138,10 +138,8 @@ public UTF8String read(Decoder decoder, Object reuse) throws IOException {
       buffer.rewind();
 
       decoder.readFixed(buffer.array(), 0, 16);
-      long mostSigBits = buffer.getLong();
-      long leastSigBits = buffer.getLong();
 
-      return UTF8String.fromString(new UUID(mostSigBits, leastSigBits).toString());
+      return UTF8String.fromString(UUIDUtil.convert(buffer).toString());
     }
   }
 

From 3fb27dc73cdf724aca6ac8bf4ade5409fc7d7bb1 Mon Sep 17 00:00:00 2001
From: openinx
Date: Thu, 13 Aug 2020 11:38:30 +0800
Subject: [PATCH 2/2] Fix the broken unit tests.

---
 api/src/main/java/org/apache/iceberg/types/Conversions.java | 3 +--
 api/src/main/java/org/apache/iceberg/util/UUIDUtil.java     | 4 ++--
 .../java/org/apache/iceberg/data/orc/GenericOrcReaders.java | 4 ++--
 3 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/types/Conversions.java b/api/src/main/java/org/apache/iceberg/types/Conversions.java
index 9ba17808c7c6..8ddd5c801ce4 100644
--- a/api/src/main/java/org/apache/iceberg/types/Conversions.java
+++ b/api/src/main/java/org/apache/iceberg/types/Conversions.java
@@ -36,8 +36,7 @@
 
 public class Conversions {
 
-  private Conversions() {
-  }
+  private Conversions() {}
 
   private static final String HIVE_NULL = "__HIVE_DEFAULT_PARTITION__";
 
diff --git a/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java b/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
index 7b9482bb5842..c4ae7a5a300f 100644
--- a/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
+++ b/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
@@ -60,8 +60,8 @@ public static byte[] convert(UUID value) {
   public static ByteBuffer convertToByteBuffer(UUID value) {
     ByteBuffer buffer = ByteBuffer.allocate(16);
     buffer.order(ByteOrder.BIG_ENDIAN);
-    buffer.putLong(value.getMostSignificantBits());
-    buffer.putLong(value.getLeastSignificantBits());
+    buffer.putLong(0, value.getMostSignificantBits());
+    buffer.putLong(8, value.getLeastSignificantBits());
     return buffer;
   }
 }
diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java
index 44ea15e3ba35..1ad87f5a25ff 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReaders.java
@@ -148,7 +148,7 @@ private TimestampReader() {
     public LocalDateTime nonNullRead(ColumnVector vector, int row) {
       TimestampColumnVector tcv = (TimestampColumnVector) vector;
       return Instant.ofEpochSecond(Math.floorDiv(tcv.time[row], 1_000), tcv.nanos[row]).atOffset(ZoneOffset.UTC)
-        .toLocalDateTime();
+          .toLocalDateTime();
     }
   }
 
@@ -175,7 +175,7 @@ private StringReader() {
     public String nonNullRead(ColumnVector vector, int row) {
       BytesColumnVector bytesVector = (BytesColumnVector) vector;
       return new String(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row],
-        StandardCharsets.UTF_8);
+          StandardCharsets.UTF_8);
     }
   }
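
Not part of the patch series above: a minimal sketch of how the new UUIDUtil API could be exercised end to end, assuming a JUnit 4 harness. The test class name and its location are illustrative only and are not introduced by these patches.

    package org.apache.iceberg.util;

    import java.nio.ByteBuffer;
    import java.util.UUID;
    import org.junit.Assert;
    import org.junit.Test;

    // Illustrative sketch only; this class is not part of the patches above.
    public class TestUUIDUtilRoundTrip {

      @Test
      public void roundTripThroughBytes() {
        UUID original = UUID.randomUUID();

        // UUID -> big-endian 16-byte array -> UUID
        byte[] bytes = UUIDUtil.convert(original);
        Assert.assertEquals(16, bytes.length);
        Assert.assertEquals(original, UUIDUtil.convert(bytes));
      }

      @Test
      public void roundTripThroughByteBuffer() {
        UUID original = UUID.randomUUID();

        // After PATCH 2/2, convertToByteBuffer writes with the absolute
        // putLong(index, value) overloads, so the returned buffer's position
        // stays at 0 and the relative getLong() calls in convert(ByteBuffer)
        // read from the start of the buffer.
        ByteBuffer buffer = UUIDUtil.convertToByteBuffer(original);
        Assert.assertEquals(original, UUIDUtil.convert(buffer));
      }
    }

The second case also shows the practical difference PATCH 2/2 makes: with the relative putLong calls from PATCH 1/2, the buffer returned by convertToByteBuffer has its position at the limit, so a relative read such as convert(ByteBuffer) would fail unless the caller rewinds the buffer first.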