diff --git a/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java b/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java
index d3ffcf940995..136c5b9bdf0c 100644
--- a/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java
+++ b/data/src/main/java/org/apache/iceberg/data/InternalRecordWrapper.java
@@ -31,12 +31,12 @@
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
 
-class InternalRecordWrapper implements StructLike {
+public class InternalRecordWrapper implements StructLike {
   private final Function<Object, Object>[] transforms;
   private StructLike wrapped = null;
 
   @SuppressWarnings("unchecked")
-  InternalRecordWrapper(Types.StructType struct) {
+  public InternalRecordWrapper(Types.StructType struct) {
     this.transforms = struct.fields().stream()
         .map(field -> converter(field.type()))
         .toArray(length -> (Function<Object, Object>[]) Array.newInstance(Function.class, length));
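
The visibility change above is what the new Flink tests rely on: InternalRecordWrapper converts a generic Record's values (dates, times, timestamps) into Iceberg's internal representation so PartitionKey can consume them. A minimal sketch of that usage, with `schema`, `spec`, and `record` assumed to be in scope:

    // schema, spec, and record are placeholders for an Iceberg Schema, PartitionSpec, and generic Record.
    InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct());
    PartitionKey expectedKey = new PartitionKey(spec, schema);
    expectedKey.partition(wrapper.wrap(record));  // reads values in Iceberg's internal representation
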
diff --git a/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java b/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
new file mode 100644
index 000000000000..d2316da09368
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.util.UUID;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+
+class RowDataWrapper implements StructLike {
+
+  private final LogicalType[] types;
+  private final PositionalGetter<?>[] getters;
+  private RowData rowData = null;
+
+  RowDataWrapper(RowType rowType, Types.StructType struct) {
+    int size = rowType.getFieldCount();
+
+    types = (LogicalType[]) Array.newInstance(LogicalType.class, size);
+    getters = (PositionalGetter[]) Array.newInstance(PositionalGetter.class, size);
+
+    for (int i = 0; i < size; i++) {
+      types[i] = rowType.getTypeAt(i);
+      getters[i] = buildGetter(types[i], struct.fields().get(i).type());
+    }
+  }
+
+  RowDataWrapper wrap(RowData data) {
+    this.rowData = data;
+    return this;
+  }
+
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    if (rowData.isNullAt(pos)) {
+      return null;
+    } else if (getters[pos] != null) {
+      return javaClass.cast(getters[pos].get(rowData, pos));
+    }
+
+    Object value = RowData.createFieldGetter(types[pos], pos).getFieldOrNull(rowData);
+    return javaClass.cast(value);
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    throw new UnsupportedOperationException("Could not set a field in the RowDataWrapper because rowData is read-only");
+  }
+
+  private interface PositionalGetter<T> {
+    T get(RowData data, int pos);
+  }
+
+  private static PositionalGetter<?> buildGetter(LogicalType logicalType, Type type) {
+    switch (type.typeId()) {
+      case STRING:
+        return (row, pos) -> row.getString(pos).toString();
+
+      case FIXED:
+      case BINARY:
+        return (row, pos) -> ByteBuffer.wrap(row.getBinary(pos));
+
+      case UUID:
+        return (row, pos) -> {
+          ByteBuffer bb = ByteBuffer.wrap(row.getBinary(pos));
+          long mostSigBits = bb.getLong();
+          long leastSigBits = bb.getLong();
+          return new UUID(mostSigBits, leastSigBits);
+        };
+
+      case DECIMAL:
+        DecimalType decimalType = (DecimalType) logicalType;
+        return (row, pos) -> row.getDecimal(pos, decimalType.getPrecision(), decimalType.getScale()).toBigDecimal();
+
+      case TIME:
+        // Time in RowData is in milliseconds (Integer), while iceberg's time is microseconds (Long).
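+        // Worked example (illustrative values): 10:14:38.123 arrives from Flink as the int
+        // 36_878_123 (millis of day) and is widened to 36_878_123_000L micros for Iceberg.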
+        return (row, pos) -> ((long) row.getInt(pos)) * 1_000;
+
+      case TIMESTAMP:
+        switch (logicalType.getTypeRoot()) {
+          case TIMESTAMP_WITHOUT_TIME_ZONE:
+            TimestampType timestampType = (TimestampType) logicalType;
+            return (row, pos) -> {
+              LocalDateTime localDateTime = row.getTimestamp(pos, timestampType.getPrecision()).toLocalDateTime();
+              return DateTimeUtil.microsFromTimestamp(localDateTime);
+            };
+
+          case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            LocalZonedTimestampType lzTs = (LocalZonedTimestampType) logicalType;
+            return (row, pos) -> {
+              TimestampData timestampData = row.getTimestamp(pos, lzTs.getPrecision());
+              return timestampData.getMillisecond() * 1000 + timestampData.getNanoOfMillisecond() / 1000;
+            };
+
+          default:
+            throw new IllegalArgumentException("Unhandled iceberg type: " + type + " corresponding flink type: " +
+                logicalType);
+        }
+
+      case STRUCT:
+        RowType rowType = (RowType) logicalType;
+        Types.StructType structType = (Types.StructType) type;
+
+        RowDataWrapper nestedWrapper = new RowDataWrapper(rowType, structType);
+        return (row, pos) -> nestedWrapper.wrap(row.getRow(pos, rowType.getFieldCount()));
+
+      default:
+        return null;
+    }
+  }
+}
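
RowDataWrapper adapts a Flink RowData to Iceberg's StructLike, so the existing PartitionKey machinery can read partition source fields from streaming rows without copying them. A minimal sketch of the intended call pattern, assuming a `schema`, a `spec`, and an incoming `rowData` are in scope:

    RowType rowType = FlinkSchemaUtil.convert(schema);  // Flink row type for the Iceberg schema
    RowDataWrapper wrapper = new RowDataWrapper(rowType, schema.asStruct());

    PartitionKey key = new PartitionKey(spec, schema);
    key.partition(wrapper.wrap(rowData));  // wrap() reuses the same wrapper instance per row
    String partitionPath = key.toPath();   // e.g. "data=a" for an identity partition on "data"
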
diff --git a/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java b/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java
new file mode 100644
index 000000000000..b5a495479a8f
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/RowDataConverter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+class RowDataConverter {
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+  private RowDataConverter() {
+  }
+
+  static RowData convert(Schema iSchema, Record record) {
+    return convert(iSchema.asStruct(), record);
+  }
+
+  private static RowData convert(Types.StructType struct, Record record) {
+    GenericRowData rowData = new GenericRowData(struct.fields().size());
+    List<Types.NestedField> fields = struct.fields();
+    for (int i = 0; i < fields.size(); i += 1) {
+      Types.NestedField field = fields.get(i);
+
+      Type fieldType = field.type();
+
+      switch (fieldType.typeId()) {
+        case STRUCT:
+          rowData.setField(i, convert(fieldType.asStructType(), record.get(i)));
+          break;
+        case LIST:
+          rowData.setField(i, convert(fieldType.asListType(), record.get(i)));
+          break;
+        case MAP:
+          rowData.setField(i, convert(fieldType.asMapType(), record.get(i)));
+          break;
+        default:
+          rowData.setField(i, convert(fieldType, record.get(i)));
+      }
+    }
+    return rowData;
+  }
+
+  private static Object convert(Type type, Object object) {
+    if (object == null) {
+      return null;
+    }
+
+    switch (type.typeId()) {
+      case BOOLEAN:
+      case INTEGER:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case FIXED:
+        return object;
+      case DATE:
+        return (int) ChronoUnit.DAYS.between(EPOCH_DAY, (LocalDate) object);
+      case TIME:
+        // Iceberg's time is in microseconds, while flink's time is in milliseconds.
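+        // Worked example (illustrative values): LocalTime.of(10, 14, 38, 123_000_000).toNanoOfDay()
+        // is 36_878_123_000_000, which TimeUnit.NANOSECONDS.toMillis() reduces to 36_878_123 ms.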
+        LocalTime localTime = (LocalTime) object;
+        return (int) TimeUnit.NANOSECONDS.toMillis(localTime.toNanoOfDay());
+      case TIMESTAMP:
+        if (((Types.TimestampType) type).shouldAdjustToUTC()) {
+          return TimestampData.fromInstant(((OffsetDateTime) object).toInstant());
+        } else {
+          return TimestampData.fromLocalDateTime((LocalDateTime) object);
+        }
+      case STRING:
+        return StringData.fromString((String) object);
+      case UUID:
+        UUID uuid = (UUID) object;
+        ByteBuffer bb = ByteBuffer.allocate(16);
+        bb.putLong(uuid.getMostSignificantBits());
+        bb.putLong(uuid.getLeastSignificantBits());
+        return bb.array();
+      case BINARY:
+        ByteBuffer buffer = (ByteBuffer) object;
+        return Arrays.copyOfRange(buffer.array(), buffer.arrayOffset() + buffer.position(),
+            buffer.arrayOffset() + buffer.remaining());
+      case DECIMAL:
+        Types.DecimalType decimalType = (Types.DecimalType) type;
+        return DecimalData.fromBigDecimal((BigDecimal) object, decimalType.precision(), decimalType.scale());
+      case STRUCT:
+        return convert(type.asStructType(), (Record) object);
+      case LIST:
+        List<Object> convertedList = Lists.newArrayList();
+        List<?> list = (List<?>) object;
+        for (Object element : list) {
+          convertedList.add(convert(type.asListType().elementType(), element));
+        }
+        return convertedList;
+      case MAP:
+        Map<Object, Object> convertedMap = Maps.newLinkedHashMap();
+        Map<?, ?> map = (Map<?, ?>) object;
+        for (Map.Entry<?, ?> entry : map.entrySet()) {
+          convertedMap.put(
+              convert(type.asMapType().keyType(), entry.getKey()),
+              convert(type.asMapType().valueType(), entry.getValue())
+          );
+        }
+        return convertedMap;
+      default:
+        throw new UnsupportedOperationException("Not a supported type: " + type);
+    }
+  }
+}
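
RowDataConverter gives the tests below a bridge from generic Records to Flink RowData, so the same randomly generated rows can be partitioned through both wrappers and compared. A minimal round-trip sketch (the seed is arbitrary):

    List<Record> records = RandomGenericData.generate(SCHEMA, 1, 42L);   // generic Iceberg records
    RowData rowData = RowDataConverter.convert(SCHEMA, records.get(0));  // the same data as RowData

Note that the TIME branch above truncates microseconds to milliseconds, which is why the time assertions in TestRowDataPartitionKey compare at millisecond granularity.
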
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestRowDataPartitionKey.java b/flink/src/test/java/org/apache/iceberg/flink/TestRowDataPartitionKey.java
new file mode 100644
index 000000000000..b27a4b4adb1c
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestRowDataPartitionKey.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.InternalRecordWrapper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRowDataPartitionKey {
+  private static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(0, "boolType", Types.BooleanType.get()),
+      Types.NestedField.required(1, "id", Types.IntegerType.get()),
+      Types.NestedField.required(2, "longType", Types.LongType.get()),
+      Types.NestedField.required(3, "dateType", Types.DateType.get()),
+      Types.NestedField.required(4, "timeType", Types.TimeType.get()),
+      Types.NestedField.required(5, "stringType", Types.StringType.get()),
+      Types.NestedField.required(6, "timestampWithoutZone", Types.TimestampType.withoutZone()),
+      Types.NestedField.required(7, "timestampWithZone", Types.TimestampType.withZone()),
+      Types.NestedField.required(8, "fixedType", Types.FixedType.ofLength(5)),
+      Types.NestedField.required(9, "uuidType", Types.UUIDType.get()),
+      Types.NestedField.required(10, "binaryType", Types.BinaryType.get()),
+      Types.NestedField.required(11, "decimalType1", Types.DecimalType.of(18, 3)),
+      Types.NestedField.required(12, "decimalType2", Types.DecimalType.of(10, 5)),
+      Types.NestedField.required(13, "decimalType3", Types.DecimalType.of(38, 19)),
+      Types.NestedField.required(14, "floatType", Types.FloatType.get()),
+      Types.NestedField.required(15, "doubleType", Types.DoubleType.get())
+  );
+
+  private static final List<String> SUPPORTED_PRIMITIVES = SCHEMA.asStruct().fields().stream()
+      .map(Types.NestedField::name).collect(Collectors.toList());
+
+  private static final Schema NESTED_SCHEMA = new Schema(
+      Types.NestedField.required(1, "structType", Types.StructType.of(
+          Types.NestedField.optional(2, "innerStringType", Types.StringType.get()),
+          Types.NestedField.optional(3, "innerIntegerType", Types.IntegerType.get())
+      ))
+  );
+
+  @Test
+  public void testNullPartitionValue() {
+    Schema schema = new Schema(
+        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get())
+    );
+
+    PartitionSpec spec = PartitionSpec.builderFor(schema)
+        .identity("data")
+        .build();
+
+    List<RowData> rows = Lists.newArrayList(
+        GenericRowData.of(1, StringData.fromString("a")),
+        GenericRowData.of(2, StringData.fromString("b")),
+        GenericRowData.of(3, null)
+    );
+
+    RowDataWrapper rowWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), schema.asStruct());
+
+    for (RowData row : rows) {
+      PartitionKey partitionKey = new PartitionKey(spec, schema);
+      partitionKey.partition(rowWrapper.wrap(row));
+      Assert.assertEquals(1, partitionKey.size());
+
+      String expectedStr = row.isNullAt(1) ? null : row.getString(1).toString();
+      Assert.assertEquals(expectedStr, partitionKey.get(0, String.class));
+    }
+  }
+
+  @Test
+  public void testPartitionWithOneNestedField() {
+    RowDataWrapper rowWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(NESTED_SCHEMA), NESTED_SCHEMA.asStruct());
+    List<Record> records = RandomGenericData.generate(NESTED_SCHEMA, 10, 1991);
+    List<RowData> rows = records.stream().map(record -> RowDataConverter.convert(NESTED_SCHEMA, record))
+        .collect(Collectors.toList());
+
+    PartitionSpec spec1 = PartitionSpec.builderFor(NESTED_SCHEMA)
+        .identity("structType.innerStringType")
+        .build();
+    PartitionSpec spec2 = PartitionSpec.builderFor(NESTED_SCHEMA)
+        .identity("structType.innerIntegerType")
+        .build();
+
+    for (int i = 0; i < rows.size(); i++) {
+      RowData row = rows.get(i);
+      Record record = (Record) records.get(i).get(0);
+
+      PartitionKey partitionKey1 = new PartitionKey(spec1, NESTED_SCHEMA);
+      partitionKey1.partition(rowWrapper.wrap(row));
+      Assert.assertEquals(1, partitionKey1.size());
+
+      Assert.assertEquals(record.get(0), partitionKey1.get(0, String.class));
+
+      PartitionKey partitionKey2 = new PartitionKey(spec2, NESTED_SCHEMA);
+      partitionKey2.partition(rowWrapper.wrap(row));
+      Assert.assertEquals(1, partitionKey2.size());
+
+      Assert.assertEquals(record.get(1), partitionKey2.get(0, Integer.class));
+    }
+  }
+
+  @Test
+  public void testPartitionMultipleNestedField() {
+    RowDataWrapper rowWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(NESTED_SCHEMA), NESTED_SCHEMA.asStruct());
+    List<Record> records = RandomGenericData.generate(NESTED_SCHEMA, 10, 1992);
+    List<RowData> rows = records.stream().map(record -> RowDataConverter.convert(NESTED_SCHEMA, record))
+        .collect(Collectors.toList());
+
+    PartitionSpec spec1 = PartitionSpec.builderFor(NESTED_SCHEMA)
+        .identity("structType.innerIntegerType")
+        .identity("structType.innerStringType")
+        .build();
+    PartitionSpec spec2 = PartitionSpec.builderFor(NESTED_SCHEMA)
+        .identity("structType.innerStringType")
+        .identity("structType.innerIntegerType")
+        .build();
+
+    PartitionKey pk1 = new PartitionKey(spec1, NESTED_SCHEMA);
+    PartitionKey pk2 = new PartitionKey(spec2, NESTED_SCHEMA);
+
+    for (int i = 0; i < rows.size(); i++) {
+      RowData row = rows.get(i);
+      Record record = (Record) records.get(i).get(0);
+
+      pk1.partition(rowWrapper.wrap(row));
+      Assert.assertEquals(2, pk1.size());
+
+      Assert.assertEquals(record.get(1), pk1.get(0, Integer.class));
+      Assert.assertEquals(record.get(0), pk1.get(1, String.class));
+
+      pk2.partition(rowWrapper.wrap(row));
+      Assert.assertEquals(2, pk2.size());
+
+      Assert.assertEquals(record.get(0), pk2.get(0, String.class));
+      Assert.assertEquals(record.get(1), pk2.get(1, Integer.class));
+    }
+  }
+
+  @Test
+  public void testPartitionValueTypes() {
+    RowType rowType = FlinkSchemaUtil.convert(SCHEMA);
+    RowDataWrapper rowWrapper = new RowDataWrapper(rowType, SCHEMA.asStruct());
+    InternalRecordWrapper recordWrapper = new InternalRecordWrapper(SCHEMA.asStruct());
+
+    List<Record> records = RandomGenericData.generate(SCHEMA, 10, 1993);
+    List<RowData> rows = records.stream().map(record -> RowDataConverter.convert(SCHEMA, record))
+        .collect(Collectors.toList());
+
+    for (String column : SUPPORTED_PRIMITIVES) {
+      PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity(column).build();
+      Class<?>[] javaClasses = spec.javaClasses();
+
+      PartitionKey pk = new PartitionKey(spec, SCHEMA);
+      PartitionKey expectedPK = new PartitionKey(spec, SCHEMA);
+
+      for (int j = 0; j < rows.size(); j++) {
+        RowData row = rows.get(j);
+        Record record = records.get(j);
+
+        pk.partition(rowWrapper.wrap(row));
+        expectedPK.partition(recordWrapper.wrap(record));
+
+        Assert.assertEquals("Partition with column " + column + " should have one field.", 1, pk.size());
+
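+        // Time is compared at millisecond granularity on purpose: RowData stores time-of-day as
+        // an int in milliseconds, so the microsecond part of the generic Record is truncated by
+        // RowDataConverter. Dividing both keys by 1000 compares them at their common precision.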
+        if (column.equals("timeType")) {
+          Assert.assertEquals("Partition with column " + column + " should have the expected values",
+              expectedPK.get(0, Long.class) / 1000, pk.get(0, Long.class) / 1000);
+        } else {
+          Assert.assertEquals("Partition with column " + column + " should have the expected values",
+              expectedPK.get(0, javaClasses[0]), pk.get(0, javaClasses[0]));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testNestedPartitionValues() {
+    Schema nestedSchema = new Schema(Types.NestedField.optional(1001, "nested", SCHEMA.asStruct()));
+    RowType rowType = FlinkSchemaUtil.convert(nestedSchema);
+
+    RowDataWrapper rowWrapper = new RowDataWrapper(rowType, nestedSchema.asStruct());
+    InternalRecordWrapper recordWrapper = new InternalRecordWrapper(nestedSchema.asStruct());
+
+    List<Record> records = RandomGenericData.generate(nestedSchema, 10, 1994);
+    List<RowData> rows = records.stream().map(record -> RowDataConverter.convert(nestedSchema, record))
+        .collect(Collectors.toList());
+
+    for (int i = 0; i < SUPPORTED_PRIMITIVES.size(); i++) {
+      String column = String.format("nested.%s", SUPPORTED_PRIMITIVES.get(i));
+
+      PartitionSpec spec = PartitionSpec.builderFor(nestedSchema).identity(column).build();
+      Class<?>[] javaClasses = spec.javaClasses();
+
+      PartitionKey pk = new PartitionKey(spec, nestedSchema);
+      PartitionKey expectedPK = new PartitionKey(spec, nestedSchema);
+
+      for (int j = 0; j < rows.size(); j++) {
+        pk.partition(rowWrapper.wrap(rows.get(j)));
+        expectedPK.partition(recordWrapper.wrap(records.get(j)));
+
+        Assert.assertEquals("Partition with nested column " + column + " should have one field.",
+            1, pk.size());
+
+        if (column.equals("nested.timeType")) {
+          Assert.assertEquals("Partition with nested column " + column + " should have the expected values.",
+              expectedPK.get(0, Long.class) / 1000, pk.get(0, Long.class) / 1000);
+        } else {
+          Assert.assertEquals("Partition with nested column " + column + " should have the expected values.",
+              expectedPK.get(0, javaClasses[0]), pk.get(0, javaClasses[0]));
+        }
+      }
+    }
+  }
+}