diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java index 406a2cba4526..bbe038c6000d 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java @@ -37,9 +37,11 @@ import java.util.Base64; import java.util.Date; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -59,7 +61,9 @@ import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.types.Types.StructType; import org.apache.iceberg.types.Types.TimestampType; +import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.UUIDUtil; import org.apache.kafka.connect.data.Struct; class RecordConverter { @@ -128,8 +132,9 @@ private Object convertValue( case UUID: return convertUUID(value); case BINARY: - case FIXED: return convertBase64Binary(value); + case FIXED: + return ByteBuffers.toByteArray(convertBase64Binary(value)); case DATE: return convertDateValue(value); case TIME: @@ -388,13 +393,24 @@ protected String convertString(Object value) { throw new IllegalArgumentException("Cannot convert to string: " + value.getClass().getName()); } - protected UUID convertUUID(Object value) { + protected Object convertUUID(Object value) { + UUID uuid; if (value instanceof String) { - return UUID.fromString((String) value); + uuid = UUID.fromString((String) value); } else if (value instanceof UUID) { - return (UUID) value; + uuid = (UUID) value; + } else { + throw new IllegalArgumentException("Cannot convert to UUID: " + value.getClass().getName()); + } + + if (FileFormat.PARQUET + .name() + .toLowerCase(Locale.ROOT) + .equals(config.writeProps().get(TableProperties.DEFAULT_FILE_FORMAT))) { + return UUIDUtil.convert(uuid); + } else { + return uuid; } - throw new IllegalArgumentException("Cannot convert to UUID: " + value.getClass().getName()); } protected ByteBuffer convertBase64Binary(Object value) { diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java index b494a9da85d3..47ee76eade15 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java @@ -37,9 +37,11 @@ import java.util.Collection; import java.util.Date; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.UUID; import java.util.function.Function; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.connect.IcebergSinkConfig; @@ -72,6 +74,7 @@ import org.apache.iceberg.types.Types.TimeType; import org.apache.iceberg.types.Types.TimestampType; import org.apache.iceberg.types.Types.UUIDType; +import org.apache.iceberg.util.UUIDUtil; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -221,6 +224,25 @@ public void testMapConvert() { assertRecordValues(record); } + @Test + public void testUUIDConversionWithParquet() { + Table table = mock(Table.class); + when(table.schema()) + .thenReturn(new org.apache.iceberg.Schema(NestedField.required(1, "uuid", UUIDType.get()))); + when(config.writeProps()) + .thenReturn( + ImmutableMap.of( + TableProperties.DEFAULT_FILE_FORMAT, + FileFormat.PARQUET.name().toLowerCase(Locale.ROOT))); + + RecordConverter converter = new RecordConverter(table, config); + Map data = + ImmutableMap.builder().put("uuid", UUID_VAL.toString()).build(); + + Record record = converter.convert(data); + assertThat(record.getField("uuid")).isEqualTo(UUIDUtil.convert(UUID_VAL)); + } + @Test public void testNestedMapConvert() { Table table = mock(Table.class); @@ -859,7 +881,7 @@ public void testEvolveTypeDetectionStructNested() { assertThat(updateMap.get("st.ff").type()).isInstanceOf(DoubleType.class); } - private Map createMapData() { + public static Map createMapData() { return ImmutableMap.builder() .put("i", 1) .put("l", 2L) @@ -898,8 +920,8 @@ private Struct createStructData() { .put("s", STR_VAL) .put("b", true) .put("u", UUID_VAL.toString()) - .put("f", BYTES_VAL.array()) - .put("bi", BYTES_VAL.array()) + .put("f", BYTES_VAL) + .put("bi", BYTES_VAL) .put("li", LIST_VAL) .put("ma", MAP_VAL); } @@ -921,11 +943,11 @@ private void assertRecordValues(Record record) { assertThat(rec.getField("dec")).isEqualTo(DEC_VAL); assertThat(rec.getField("s")).isEqualTo(STR_VAL); assertThat(rec.getField("b")).isEqualTo(true); - assertThat(rec.getField("u")).isEqualTo(UUID_VAL); - assertThat(rec.getField("f")).isEqualTo(BYTES_VAL); + assertThat(rec.getField("f")).isEqualTo(BYTES_VAL.array()); assertThat(rec.getField("bi")).isEqualTo(BYTES_VAL); assertThat(rec.getField("li")).isEqualTo(LIST_VAL); assertThat(rec.getField("ma")).isEqualTo(MAP_VAL); + assertThat(rec.getField("u")).isEqualTo(UUID_VAL); } private void assertNestedRecordValues(Record record) {