diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
index d4dd61d1057e..90534d714630 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
@@ -22,10 +22,29 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+
+/**
+ * Converter between Flink types and Iceberg types.
+ *
+ * The conversion is not a 1:1 mapping, so converting back and forth may lose information.
+ *
+ * The inconsistent mappings are:
+ *
+ * - Iceberg UUID type is mapped to Flink BinaryType(16)
+ * - Flink VarCharType(_) and CharType(_) are mapped to Iceberg String type
+ * - Flink VarBinaryType(_) is mapped to Iceberg Binary type
+ * - Flink TimeType(_) is mapped to Iceberg Time type (microseconds)
+ * - Flink TimestampType(_) is mapped to Iceberg Timestamp without zone type (microseconds)
+ * - Flink LocalZonedTimestampType(_) is mapped to Iceberg Timestamp with zone type (microseconds)
+ * - Flink MultisetType(element) is mapped to Iceberg Map type (element, int)
+ */
public class FlinkSchemaUtil {
private FlinkSchemaUtil() {
@@ -43,4 +62,40 @@ public static Schema convert(TableSchema schema) {
return new Schema(converted.asStructType().fields());
}
+
+ /**
+ * Convert a {@link Schema} to a {@link RowType Flink type}.
+ *
+ * @param schema a Schema
+ * @return the equivalent Flink type
+ * @throws IllegalArgumentException if the type cannot be converted to Flink
+ */
+ public static RowType convert(Schema schema) {
+ return (RowType) TypeUtil.visit(schema, new TypeToFlinkType());
+ }
+
+ /**
+ * Convert a {@link Type} to a {@link LogicalType Flink type}.
+ *
+ * @param type a Type
+ * @return the equivalent Flink type
+ * @throws IllegalArgumentException if the type cannot be converted to Flink
+ */
+ public static LogicalType convert(Type type) {
+ return TypeUtil.visit(type, new TypeToFlinkType());
+ }
+
+ /**
+ * Convert a {@link RowType} to a {@link TableSchema}.
+ *
+ * @param rowType a RowType
+ * @return Flink TableSchema
+ */
+ public static TableSchema toSchema(RowType rowType) {
+ TableSchema.Builder builder = TableSchema.builder();
+ for (RowType.RowField field : rowType.getFields()) {
+ builder.field(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
+ }
+ return builder.build();
+ }
}
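
For context, a minimal usage sketch of the new public API (not part of the patch; the schema and field names below are made up for illustration):

```java
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Types;

public class FlinkSchemaUtilExample {
  public static void main(String[] args) {
    // An illustrative Iceberg schema.
    Schema icebergSchema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get()));

    // Iceberg Schema -> Flink RowType -> Flink TableSchema.
    RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
    TableSchema tableSchema = FlinkSchemaUtil.toSchema(rowType);

    // Flink TableSchema -> Iceberg Schema. Because the mapping is not 1:1
    // (see the class Javadoc), lossy types such as UUID do not round-trip,
    // and field IDs are reassigned rather than preserved.
    Schema roundTripped = FlinkSchemaUtil.convert(tableSchema);
    System.out.println(roundTripped.asStruct());
  }
}
```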
diff --git a/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java b/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java
new file mode 100644
index 000000000000..dfd8ffb9668c
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.List;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+
+class TypeToFlinkType extends TypeUtil.SchemaVisitor<LogicalType> {
+ TypeToFlinkType() {
+ }
+
+ @Override
+ public LogicalType schema(Schema schema, LogicalType structType) {
+ return structType;
+ }
+
+ @Override
+  public LogicalType struct(Types.StructType struct, List<LogicalType> fieldResults) {
+    List<Types.NestedField> fields = struct.fields();
+
+    List<RowType.RowField> flinkFields = Lists.newArrayListWithExpectedSize(fieldResults.size());
+ for (int i = 0; i < fields.size(); i += 1) {
+ Types.NestedField field = fields.get(i);
+ LogicalType type = fieldResults.get(i);
+ RowType.RowField flinkField = new RowType.RowField(
+ field.name(), type.copy(field.isOptional()), field.doc());
+ flinkFields.add(flinkField);
+ }
+
+ return new RowType(flinkFields);
+ }
+
+ @Override
+ public LogicalType field(Types.NestedField field, LogicalType fieldResult) {
+ return fieldResult;
+ }
+
+ @Override
+ public LogicalType list(Types.ListType list, LogicalType elementResult) {
+ return new ArrayType(elementResult.copy(list.isElementOptional()));
+ }
+
+ @Override
+ public LogicalType map(Types.MapType map, LogicalType keyResult, LogicalType valueResult) {
+ // keys in map are not allowed to be null.
+ return new MapType(keyResult.copy(false), valueResult.copy(map.isValueOptional()));
+ }
+
+ @Override
+ public LogicalType primitive(Type.PrimitiveType primitive) {
+ switch (primitive.typeId()) {
+ case BOOLEAN:
+ return new BooleanType();
+ case INTEGER:
+ return new IntType();
+ case LONG:
+ return new BigIntType();
+ case FLOAT:
+ return new FloatType();
+ case DOUBLE:
+ return new DoubleType();
+ case DATE:
+ return new DateType();
+ case TIME:
+ // MICROS
+ return new TimeType(6);
+ case TIMESTAMP:
+ Types.TimestampType timestamp = (Types.TimestampType) primitive;
+ if (timestamp.shouldAdjustToUTC()) {
+ // MICROS
+ return new LocalZonedTimestampType(6);
+ } else {
+ // MICROS
+ return new TimestampType(6);
+ }
+ case STRING:
+ return new VarCharType(VarCharType.MAX_LENGTH);
+ case UUID:
+ // UUID length is 16
+ return new BinaryType(16);
+ case FIXED:
+ Types.FixedType fixedType = (Types.FixedType) primitive;
+ return new BinaryType(fixedType.length());
+ case BINARY:
+ return new VarBinaryType(VarBinaryType.MAX_LENGTH);
+ case DECIMAL:
+ Types.DecimalType decimal = (Types.DecimalType) primitive;
+ return new DecimalType(decimal.precision(), decimal.scale());
+ default:
+ throw new UnsupportedOperationException(
+ "Cannot convert unknown type to Flink: " + primitive);
+ }
+ }
+}
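
TypeToFlinkType is package-private and is only reached through FlinkSchemaUtil.convert, which drives it with TypeUtil.visit. A short sketch of what the primitive mapping produces (the printed output in the comments is approximate):

```java
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Types;

public class TypeToFlinkTypeExample {
  public static void main(String[] args) {
    // Iceberg timestamp with zone maps to Flink's local-zoned timestamp at microsecond precision.
    LogicalType tsTz = FlinkSchemaUtil.convert(Types.TimestampType.withZone());
    System.out.println(tsTz);  // e.g. TIMESTAMP(6) WITH LOCAL TIME ZONE

    // Iceberg UUID maps to BinaryType(16); converting back yields fixed(16), not UUID.
    LogicalType uuid = FlinkSchemaUtil.convert(Types.UUIDType.get());
    System.out.println(uuid);  // e.g. BINARY(16)
  }
}
```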
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
index c7c00c12e33c..320d3bb30861 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
@@ -21,7 +21,17 @@
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;
@@ -57,8 +67,7 @@ public void testConvertFlinkSchemaToIcebergSchema() {
.field("multiset", DataTypes.MULTISET(DataTypes.STRING().notNull()))
.build();
- Schema actualSchema = FlinkSchemaUtil.convert(flinkSchema);
- Schema expectedSchema = new Schema(
+ Schema icebergSchema = new Schema(
Types.NestedField.required(0, "id", Types.IntegerType.get(), null),
Types.NestedField.optional(1, "name", Types.StringType.get(), null),
Types.NestedField.required(2, "salary", Types.DoubleType.get(), null),
@@ -90,7 +99,7 @@ public void testConvertFlinkSchemaToIcebergSchema() {
Types.IntegerType.get()))
);
- Assert.assertEquals(expectedSchema.asStruct(), actualSchema.asStruct());
+ checkSchema(flinkSchema, icebergSchema);
}
@Test
@@ -112,8 +121,7 @@ public void testMapField() {
)
.build();
- Schema actualSchema = FlinkSchemaUtil.convert(flinkSchema);
- Schema expectedSchema = new Schema(
+ Schema icebergSchema = new Schema(
Types.NestedField.required(0, "map_int_long",
Types.MapType.ofOptional(4, 5, Types.IntegerType.get(), Types.LongType.get()), null),
Types.NestedField.optional(1, "map_int_array_string",
@@ -132,7 +140,7 @@ public void testMapField() {
)
);
- Assert.assertEquals(expectedSchema.asStruct(), actualSchema.asStruct());
+ checkSchema(flinkSchema, icebergSchema);
}
@Test
@@ -152,8 +160,7 @@ public void testStructField() {
).nullable()) /* Optional */
.build();
- Schema actualSchema = FlinkSchemaUtil.convert(flinkSchema);
- Schema expectedSchema = new Schema(
+ Schema icebergSchema = new Schema(
Types.NestedField.required(0, "struct_int_string_decimal",
Types.StructType.of(
Types.NestedField.optional(5, "field_int", Types.IntegerType.get()),
@@ -173,7 +180,8 @@ public void testStructField() {
)
)
);
- Assert.assertEquals(actualSchema.asStruct(), expectedSchema.asStruct());
+
+ checkSchema(flinkSchema, icebergSchema);
}
@Test
@@ -201,8 +209,7 @@ public void testListField() {
).notNull()) /* Required */
.build();
- Schema actualSchema = FlinkSchemaUtil.convert(flinkSchema);
- Schema expectedSchema = new Schema(
+ Schema icebergSchema = new Schema(
Types.NestedField.required(0, "list_struct_fields",
Types.ListType.ofOptional(4, Types.StructType.of(
Types.NestedField.optional(3, "field_int", Types.IntegerType.get())
@@ -222,6 +229,45 @@ public void testListField() {
))
);
- Assert.assertEquals(expectedSchema.asStruct(), actualSchema.asStruct());
+ checkSchema(flinkSchema, icebergSchema);
+ }
+
+ private void checkSchema(TableSchema flinkSchema, Schema icebergSchema) {
+ Assert.assertEquals(icebergSchema.asStruct(), FlinkSchemaUtil.convert(flinkSchema).asStruct());
+    // The conversion is not a 1:1 mapping, so we only check that the Iceberg types survive a round trip.
+ Assert.assertEquals(
+ icebergSchema.asStruct(),
+ FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema))).asStruct());
+ }
+
+ @Test
+ public void testInconsistentTypes() {
+ checkInconsistentType(
+ Types.UUIDType.get(), new BinaryType(16),
+ new BinaryType(16), Types.FixedType.ofLength(16));
+ checkInconsistentType(
+ Types.StringType.get(), new VarCharType(VarCharType.MAX_LENGTH),
+ new CharType(100), Types.StringType.get());
+ checkInconsistentType(
+ Types.BinaryType.get(), new VarBinaryType(VarBinaryType.MAX_LENGTH),
+ new VarBinaryType(100), Types.BinaryType.get());
+ checkInconsistentType(
+ Types.TimeType.get(), new TimeType(6),
+ new TimeType(3), Types.TimeType.get());
+ checkInconsistentType(
+ Types.TimestampType.withoutZone(), new TimestampType(6),
+ new TimestampType(3), Types.TimestampType.withoutZone());
+ checkInconsistentType(
+ Types.TimestampType.withZone(), new LocalZonedTimestampType(6),
+ new LocalZonedTimestampType(3), Types.TimestampType.withZone());
+ }
+
+ private void checkInconsistentType(
+ Type icebergType, LogicalType flinkExpectedType,
+ LogicalType flinkType, Type icebergExpectedType) {
+ Assert.assertEquals(flinkExpectedType, FlinkSchemaUtil.convert(icebergType));
+ Assert.assertEquals(
+ Types.StructType.of(Types.NestedField.optional(0, "f0", icebergExpectedType)),
+ FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(RowType.of(flinkType))).asStruct());
}
}
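
The inconsistent-type tests assert the mapping in one direction at a time. As a sketch, the full lossy round trip for one of those cases (the field name f0 comes from RowType.of's default naming, as in the test above):

```java
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;

public class LossyRoundTripExample {
  public static void main(String[] args) {
    // Flink CHAR(100) converts to an Iceberg string column ...
    Schema iceberg = FlinkSchemaUtil.convert(
        FlinkSchemaUtil.toSchema(RowType.of(new CharType(100))));
    System.out.println(iceberg.asStruct());  // e.g. struct<0: f0: optional string>

    // ... and converting back yields VARCHAR(2147483647), not CHAR(100).
    LogicalType back = FlinkSchemaUtil.convert(iceberg.findType("f0"));
    System.out.println(back);  // e.g. VARCHAR(2147483647)
  }
}
```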