From 06a80b4ed041060dc902df92d2236c9cab5a3f48 Mon Sep 17 00:00:00 2001 From: "Artur.Mukhutdinov" Date: Wed, 24 Sep 2025 16:26:53 +0300 Subject: [PATCH 1/3] fix(iceberg): UUID implement automatic replacement of UUID type columns with the FixedType[16] --- ...GenericIcebergPartitionedFanoutWriter.java | 74 ++++++++++++++++++- 1 file changed, 71 insertions(+), 3 deletions(-) diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java index ae90117819..d902b8865c 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java @@ -23,12 +23,19 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.InternalRecordWrapper; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.PartitionedFanoutWriter; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; /** * A {@link PartitionedFanoutWriter} for generic records. This class can write multiple partition @@ -38,6 +45,7 @@ public class GenericIcebergPartitionedFanoutWriter extends PartitionedFanoutWrit private final PartitionKey partitionKey; private final InternalRecordWrapper wrapper; + private final List positionToUpdateKey; public GenericIcebergPartitionedFanoutWriter( Schema schema, @@ -48,14 +56,74 @@ public GenericIcebergPartitionedFanoutWriter( FileIO io, long targetFileSize) { super(spec, format, appenderFactory, fileFactory, io, targetFileSize); - this.partitionKey = new PartitionKey(spec, schema); - this.wrapper = new InternalRecordWrapper(schema.asStruct()); + Schema schemaWithoutUUID = getSchemaWithoutUUID(schema); + this.partitionKey = new PartitionKey(spec, schemaWithoutUUID); + this.wrapper = new InternalRecordWrapper(schemaWithoutUUID.asStruct()); + this.positionToUpdateKey = getPositionToUpdateKey(schema); } @Override protected PartitionKey partition(Record row) { - StructLike structLike = wrapper.wrap(row); + StructLike structLike = wrapper.wrap(getGenericRecordWithoutUUID(row)); partitionKey.partition(structLike); return partitionKey; } + + /** + * According to the GitHub Issue + * there is a fix for the UUID types to cast to another internal type + * + * @param row original row with data + * @return GenericRecord as a single class that inherits Row interface + */ + private GenericRecord getGenericRecordWithoutUUID(Record row) { + GenericRecord record = (GenericRecord) row; + positionToUpdateKey.forEach(i -> record.set(i, (byte[]) record.get(i))); + return record; + } + + /** + * Get rid of the UUID type if any + * + * @param schema provided schema + * @return the same schema that is provided but with all the UUID type transformed into the + * FixedType[16] type + */ + private Schema getSchemaWithoutUUID(Schema schema) { + return new Schema( + schema.columns().stream() + .map( + column -> { + if (column.type().equals(Types.UUIDType.get())) { + return Types.NestedField.of( + column.fieldId(), + column.isOptional(), + column.name(), + Types.FixedType.ofLength(16)); + } + + return column; + }) + .collect(Collectors.toList())); + } + + /** + * Receive list of positions where UUID values placed in the schema + * + * @param schema that is provided for the certain table + * @return list of integers which are indexes of UUID columns in the table + */ + private List getPositionToUpdateKey(Schema schema) { + List columns = schema.columns(); + int size = columns.size(); + List positionToUpdateKey = new ArrayList<>(); + + for (int i = 0; i < size; i++) { + Type currentType = columns.get(i).type(); + if (currentType.equals(Types.UUIDType.get())) { + positionToUpdateKey.add(i); + } + } + return positionToUpdateKey; + } } From 7a2874fe15059f03a6e96280a772fb64cefd1290 Mon Sep 17 00:00:00 2001 From: "Artur.Mukhutdinov" Date: Sun, 28 Sep 2025 22:39:31 +0300 Subject: [PATCH 2/3] fix(iceberg): UUID add more elegant way to handle the issue with the Iceberg tables and UUID type --- ...GenericIcebergPartitionedFanoutWriter.java | 78 +---------- .../iceberg/data/IcebergRecordWrapper.java | 129 ++++++++++++++++++ 2 files changed, 134 insertions(+), 73 deletions(-) create mode 100644 amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java index d902b8865c..0b6e3405c6 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java @@ -23,19 +23,12 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.InternalRecordWrapper; +import org.apache.iceberg.data.IcebergRecordWrapper; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.PartitionedFanoutWriter; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; /** * A {@link PartitionedFanoutWriter} for generic records. This class can write multiple partition @@ -44,8 +37,7 @@ public class GenericIcebergPartitionedFanoutWriter extends PartitionedFanoutWriter { private final PartitionKey partitionKey; - private final InternalRecordWrapper wrapper; - private final List positionToUpdateKey; + private final IcebergRecordWrapper wrapper; public GenericIcebergPartitionedFanoutWriter( Schema schema, @@ -56,74 +48,14 @@ public GenericIcebergPartitionedFanoutWriter( FileIO io, long targetFileSize) { super(spec, format, appenderFactory, fileFactory, io, targetFileSize); - Schema schemaWithoutUUID = getSchemaWithoutUUID(schema); - this.partitionKey = new PartitionKey(spec, schemaWithoutUUID); - this.wrapper = new InternalRecordWrapper(schemaWithoutUUID.asStruct()); - this.positionToUpdateKey = getPositionToUpdateKey(schema); + this.partitionKey = new PartitionKey(spec, schema); + this.wrapper = new IcebergRecordWrapper(schema.asStruct()); } @Override protected PartitionKey partition(Record row) { - StructLike structLike = wrapper.wrap(getGenericRecordWithoutUUID(row)); + StructLike structLike = wrapper.wrap(row); partitionKey.partition(structLike); return partitionKey; } - - /** - * According to the GitHub Issue - * there is a fix for the UUID types to cast to another internal type - * - * @param row original row with data - * @return GenericRecord as a single class that inherits Row interface - */ - private GenericRecord getGenericRecordWithoutUUID(Record row) { - GenericRecord record = (GenericRecord) row; - positionToUpdateKey.forEach(i -> record.set(i, (byte[]) record.get(i))); - return record; - } - - /** - * Get rid of the UUID type if any - * - * @param schema provided schema - * @return the same schema that is provided but with all the UUID type transformed into the - * FixedType[16] type - */ - private Schema getSchemaWithoutUUID(Schema schema) { - return new Schema( - schema.columns().stream() - .map( - column -> { - if (column.type().equals(Types.UUIDType.get())) { - return Types.NestedField.of( - column.fieldId(), - column.isOptional(), - column.name(), - Types.FixedType.ofLength(16)); - } - - return column; - }) - .collect(Collectors.toList())); - } - - /** - * Receive list of positions where UUID values placed in the schema - * - * @param schema that is provided for the certain table - * @return list of integers which are indexes of UUID columns in the table - */ - private List getPositionToUpdateKey(Schema schema) { - List columns = schema.columns(); - int size = columns.size(); - List positionToUpdateKey = new ArrayList<>(); - - for (int i = 0; i < size; i++) { - Type currentType = columns.get(i).type(); - if (currentType.equals(Types.UUIDType.get())) { - positionToUpdateKey.add(i); - } - } - return positionToUpdateKey; - } } diff --git a/amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java b/amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java new file mode 100644 index 0000000000..b651a681e5 --- /dev/null +++ b/amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.data; + +import org.apache.iceberg.StructLike; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.UUIDUtil; + +import java.lang.reflect.Array; +import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.util.function.Function; + +/** + * This class is a copy of {@link InternalRecordWrapper} that adds proper handling for UUID types in + * the Iceberg format. + * + *

It serves as a temporary solution for an issue discussed in this pull request. + * + *

Once the related pull request is + * merged, the Iceberg dependency version should be updated accordingly, and this class will be + * removed. + */ +public class IcebergRecordWrapper implements StructLike { + private final Function[] transforms; + private StructLike wrapped = null; + + @SuppressWarnings("unchecked") + public IcebergRecordWrapper(Types.StructType struct) { + this( + struct.fields().stream() + .map(field -> converter(field.type())) + .toArray( + length -> (Function[]) Array.newInstance(Function.class, length))); + } + + private IcebergRecordWrapper(Function[] transforms) { + this.transforms = transforms; + } + + private static Function converter(Type type) { + switch (type.typeId()) { + case DATE: + return date -> DateTimeUtil.daysFromDate((LocalDate) date); + case TIME: + return time -> DateTimeUtil.microsFromTime((LocalTime) time); + case TIMESTAMP: + if (((Types.TimestampType) type).shouldAdjustToUTC()) { + return timestamp -> DateTimeUtil.microsFromTimestamptz((OffsetDateTime) timestamp); + } else { + return timestamp -> DateTimeUtil.microsFromTimestamp((LocalDateTime) timestamp); + } + case FIXED: + return bytes -> ByteBuffer.wrap((byte[]) bytes); + case UUID: + return uuid -> { + if (uuid instanceof byte[]) { + return UUIDUtil.convert((byte[]) uuid); + } else { + return uuid; + } + }; + case STRUCT: + InternalRecordWrapper wrapper = new InternalRecordWrapper(type.asStructType()); + return struct -> wrapper.wrap((StructLike) struct); + default: + } + return null; + } + + public StructLike get() { + return wrapped; + } + + public IcebergRecordWrapper copyFor(StructLike record) { + return new IcebergRecordWrapper(transforms).wrap(record); + } + + public IcebergRecordWrapper wrap(StructLike record) { + this.wrapped = record; + return this; + } + + @Override + public int size() { + return wrapped.size(); + } + + @Override + public T get(int pos, Class javaClass) { + if (transforms[pos] != null) { + Object value = wrapped.get(pos, Object.class); + if (value == null) { + // transforms function don't allow to handle null values, so just return null here. + return null; + } else { + return javaClass.cast(transforms[pos].apply(value)); + } + } + return wrapped.get(pos, javaClass); + } + + @Override + public void set(int pos, T value) { + throw new UnsupportedOperationException("Cannot update IcebergRecordWrapper"); + } +} From 39052f25f62fd4433d6b38e2f07d5213dd978eb1 Mon Sep 17 00:00:00 2001 From: "Artur.Mukhutdinov" Date: Sun, 28 Sep 2025 22:49:35 +0300 Subject: [PATCH 3/3] fix(iceberg): UUID add a little change to `converter` method to always return an instance of `IcebergRecordWrapper` class --- .../main/java/org/apache/iceberg/data/IcebergRecordWrapper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java b/amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java index b651a681e5..28108bc1b9 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java +++ b/amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java @@ -83,7 +83,7 @@ private static Function converter(Type type) { } }; case STRUCT: - InternalRecordWrapper wrapper = new InternalRecordWrapper(type.asStructType()); + IcebergRecordWrapper wrapper = new IcebergRecordWrapper(type.asStructType()); return struct -> wrapper.wrap((StructLike) struct); default: }