diff --git a/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java b/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java index 871000279848..312c7afe9275 100644 --- a/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java +++ b/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java @@ -29,6 +29,7 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.UUIDUtil; public class InternalRecordWrapper implements StructLike { private final Function[] transforms; @@ -61,6 +62,14 @@ private static Function converter(Type type) { } case FIXED: return bytes -> ByteBuffer.wrap((byte[]) bytes); + case UUID: + return uuid -> { + if (uuid instanceof byte[]) { + return UUIDUtil.convert((byte[]) uuid); + } else { + return uuid; + } + }; case STRUCT: InternalRecordWrapper wrapper = new InternalRecordWrapper(type.asStructType()); return struct -> wrapper.wrap((StructLike) struct); diff --git a/data/src/test/java/org/apache/iceberg/data/TestInternalRecordWrapper.java b/data/src/test/java/org/apache/iceberg/data/TestInternalRecordWrapper.java new file mode 100644 index 000000000000..da4bdef7641d --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/TestInternalRecordWrapper.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data; + +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Random; +import java.util.UUID; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.UUIDUtil; +import org.junit.jupiter.api.Test; + +public class TestInternalRecordWrapper { + + @Test + public void testDateConversion() { + Schema schema = new Schema(required(1, "date_field", Types.DateType.get())); + + LocalDate testDate = LocalDate.of(2025, 10, 1); + Record record = GenericRecord.create(schema); + record.set(0, testDate); + + InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct()); + StructLike wrappedRecord = wrapper.wrap(record); + + Integer convertedValue = wrappedRecord.get(0, Integer.class); + Integer expectedValue = DateTimeUtil.daysFromDate(testDate); + + assertThat(convertedValue).isEqualTo(expectedValue); + } + + @Test + public void testTimeConversion() { + Schema schema = new Schema(required(1, "time_field", Types.TimeType.get())); + + LocalTime testTime = LocalTime.of(14, 30, 45, 123456000); + Record record = GenericRecord.create(schema); + record.set(0, testTime); + + InternalRecordWrapper wrapper = new 
InternalRecordWrapper(schema.asStruct()); + StructLike wrappedRecord = wrapper.wrap(record); + + Long convertedValue = wrappedRecord.get(0, Long.class); + Long expectedValue = DateTimeUtil.microsFromTime(testTime); + + assertThat(convertedValue).isEqualTo(expectedValue); + } + + @Test + public void testTimestampWithZoneConversion() { + Schema schema = new Schema(required(1, "timestamp_tz_field", Types.TimestampType.withZone())); + + OffsetDateTime testTimestamp = + OffsetDateTime.of(2025, 10, 1, 14, 30, 45, 123456000, ZoneOffset.UTC); + Record record = GenericRecord.create(schema); + record.set(0, testTimestamp); + + InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct()); + StructLike wrappedRecord = wrapper.wrap(record); + + Long convertedValue = wrappedRecord.get(0, Long.class); + Long expectedValue = DateTimeUtil.microsFromTimestamptz(testTimestamp); + + assertThat(convertedValue).isEqualTo(expectedValue); + } + + @Test + public void testTimestampWithoutZoneConversion() { + Schema schema = new Schema(required(1, "timestamp_field", Types.TimestampType.withoutZone())); + + LocalDateTime testTimestamp = LocalDateTime.of(2025, 10, 1, 14, 30, 45, 123456000); + Record record = GenericRecord.create(schema); + record.set(0, testTimestamp); + + InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct()); + StructLike wrappedRecord = wrapper.wrap(record); + + Long convertedValue = wrappedRecord.get(0, Long.class); + Long expectedValue = DateTimeUtil.microsFromTimestamp(testTimestamp); + + assertThat(convertedValue).isEqualTo(expectedValue); + } + + @Test + public void testFixedConversion() { + Schema schema = new Schema(required(1, "fixed_field", Types.FixedType.ofLength(16))); + + byte[] testBytes = "test fixed bytes".getBytes(); + Record record = GenericRecord.create(schema); + record.set(0, testBytes); + + InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct()); + StructLike wrappedRecord = 
wrapper.wrap(record); + + ByteBuffer convertedValue = wrappedRecord.get(0, ByteBuffer.class); + ByteBuffer expectedValue = ByteBuffer.wrap(testBytes); + + assertThat(convertedValue).isEqualTo(expectedValue); + } + + @Test + public void testUuidConversion() { + Schema schema = new Schema(required(1, "uuid_field", Types.UUIDType.get())); + + byte[] uuidBytes = new byte[16]; + Random random = new Random(); + random.nextBytes(uuidBytes); + Record record = GenericRecord.create(schema); + record.set(0, uuidBytes); + + InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct()); + StructLike wrappedRecord = wrapper.wrap(record); + + UUID convertedValue = wrappedRecord.get(0, UUID.class); + UUID expectedValue = UUIDUtil.convert(uuidBytes); + + assertThat(convertedValue).isEqualTo(expectedValue); + } +} diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestPartitionedAppendWriter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestPartitionedAppendWriter.java index b6a899fee16b..3fd7937974d5 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestPartitionedAppendWriter.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestPartitionedAppendWriter.java @@ -18,12 +18,17 @@ */ package org.apache.iceberg.connect.data; +import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.List; +import java.util.UUID; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; import org.apache.iceberg.connect.IcebergSinkConfig; import org.apache.iceberg.connect.TableSinkConfig; import org.apache.iceberg.data.GenericRecord; @@ -31,6 +36,8 @@ import 
org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -63,4 +70,34 @@ public void testPartitionedAppendWriter(String format) { assertThat(result.dataFiles()).allMatch(file -> file.format() == FileFormat.fromString(format)); assertThat(result.deleteFiles()).hasSize(0); } + + @ParameterizedTest + @ValueSource(strings = {"parquet", "orc"}) + public void testWriteUuidWithFanoutWriter(String format) { + Schema schema = + new Schema( + required(1, "id", Types.IntegerType.get()), + required(2, "uuid_field", Types.UUIDType.get())); + PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("uuid_field", 2).build(); + when(table.schema()).thenReturn(schema); + when(table.spec()).thenReturn(spec); + + List<Record> rows = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + UUID uuid = UUID.randomUUID(); + Record record = GenericRecord.create(schema); + record.set(0, i); + record.set(1, uuid); + rows.add(record); + } + + IcebergSinkConfig config = mock(IcebergSinkConfig.class); + when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.writeProps()).thenReturn(ImmutableMap.of("write.format.default", format)); + WriteResult result = writeTest(rows, config, PartitionedAppendWriter.class); + + assertThat(result.dataFiles()).hasSizeGreaterThan(1); + assertThat(result.dataFiles()).allMatch(file -> file.format() == FileFormat.fromString(format)); + assertThat(result.deleteFiles()).hasSize(0); + } }