diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java index ae90117819..0b6e3405c6 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/io/writer/GenericIcebergPartitionedFanoutWriter.java @@ -23,7 +23,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; -import org.apache.iceberg.data.InternalRecordWrapper; +import org.apache.iceberg.data.IcebergRecordWrapper; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; @@ -37,7 +37,7 @@ public class GenericIcebergPartitionedFanoutWriter extends PartitionedFanoutWriter { private final PartitionKey partitionKey; - private final InternalRecordWrapper wrapper; + private final IcebergRecordWrapper wrapper; public GenericIcebergPartitionedFanoutWriter( Schema schema, @@ -49,7 +49,7 @@ public GenericIcebergPartitionedFanoutWriter( long targetFileSize) { super(spec, format, appenderFactory, fileFactory, io, targetFileSize); this.partitionKey = new PartitionKey(spec, schema); - this.wrapper = new InternalRecordWrapper(schema.asStruct()); + this.wrapper = new IcebergRecordWrapper(schema.asStruct()); } @Override diff --git a/amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java b/amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java new file mode 100644 index 0000000000..28108bc1b9 --- /dev/null +++ b/amoro-format-iceberg/src/main/java/org/apache/iceberg/data/IcebergRecordWrapper.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.data; + +import org.apache.iceberg.StructLike; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.UUIDUtil; + +import java.lang.reflect.Array; +import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.util.function.Function; + +/** + * This class is a copy of {@link InternalRecordWrapper} that adds proper handling for UUID types in + * the Iceberg format. + * + *

It serves as a temporary solution for an issue discussed in this pull request. + * + *

Once the related pull request is + * merged, the Iceberg dependency version should be updated accordingly, and this class will be + * removed. + */ +public class IcebergRecordWrapper implements StructLike { + private final Function[] transforms; + private StructLike wrapped = null; + + @SuppressWarnings("unchecked") + public IcebergRecordWrapper(Types.StructType struct) { + this( + struct.fields().stream() + .map(field -> converter(field.type())) + .toArray( + length -> (Function[]) Array.newInstance(Function.class, length))); + } + + private IcebergRecordWrapper(Function[] transforms) { + this.transforms = transforms; + } + + private static Function converter(Type type) { + switch (type.typeId()) { + case DATE: + return date -> DateTimeUtil.daysFromDate((LocalDate) date); + case TIME: + return time -> DateTimeUtil.microsFromTime((LocalTime) time); + case TIMESTAMP: + if (((Types.TimestampType) type).shouldAdjustToUTC()) { + return timestamp -> DateTimeUtil.microsFromTimestamptz((OffsetDateTime) timestamp); + } else { + return timestamp -> DateTimeUtil.microsFromTimestamp((LocalDateTime) timestamp); + } + case FIXED: + return bytes -> ByteBuffer.wrap((byte[]) bytes); + case UUID: + return uuid -> { + if (uuid instanceof byte[]) { + return UUIDUtil.convert((byte[]) uuid); + } else { + return uuid; + } + }; + case STRUCT: + IcebergRecordWrapper wrapper = new IcebergRecordWrapper(type.asStructType()); + return struct -> wrapper.wrap((StructLike) struct); + default: + } + return null; + } + + public StructLike get() { + return wrapped; + } + + public IcebergRecordWrapper copyFor(StructLike record) { + return new IcebergRecordWrapper(transforms).wrap(record); + } + + public IcebergRecordWrapper wrap(StructLike record) { + this.wrapped = record; + return this; + } + + @Override + public int size() { + return wrapped.size(); + } + + @Override + public T get(int pos, Class javaClass) { + if (transforms[pos] != null) { + Object value = wrapped.get(pos, Object.class); + if (value == null) { + // transforms function don't allow to handle null values, so just return null here. + return null; + } else { + return javaClass.cast(transforms[pos].apply(value)); + } + } + return wrapped.get(pos, javaClass); + } + + @Override + public void set(int pos, T value) { + throw new UnsupportedOperationException("Cannot update IcebergRecordWrapper"); + } +}