55 changes: 55 additions & 0 deletions flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
@@ -22,10 +22,29 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;

/**
* Converter between Flink types and Iceberg types.
* The conversion is not a 1:1 mapping, so converting back and forth is not guaranteed to restore the original
* types; some information may be lost along the way.
* <p>
* The inconsistent type mappings are:
* <ul>
* <li>Iceberg UUID type maps to Flink BinaryType(16)</li>
* <li>Flink VarCharType(_) and CharType(_) map to Iceberg String type</li>
* <li>Flink VarBinaryType(_) maps to Iceberg Binary type</li>
* <li>Flink TimeType(_) maps to Iceberg Time type (microseconds)</li>
* <li>Flink TimestampType(_) maps to Iceberg Timestamp without zone type (microseconds)</li>
* <li>Flink LocalZonedTimestampType(_) maps to Iceberg Timestamp with zone type (microseconds)</li>
* <li>Flink MultiSetType maps to Iceberg Map type (element, int)</li>
* </ul>
* <p>
*/
public class FlinkSchemaUtil {

private FlinkSchemaUtil() {
@@ -43,4 +62,40 @@ public static Schema convert(TableSchema schema) {

return new Schema(converted.asStructType().fields());
}

/**
* Convert a {@link Schema} to a {@link RowType Flink type}.
*
* @param schema a Schema
* @return the equivalent Flink type
* @throws IllegalArgumentException if the type cannot be converted to Flink
*/
public static RowType convert(Schema schema) {
return (RowType) TypeUtil.visit(schema, new TypeToFlinkType());
}

/**
* Convert a {@link Type} to a {@link LogicalType Flink type}.
*
* @param type a Type
* @return the equivalent Flink type
* @throws IllegalArgumentException if the type cannot be converted to Flink
*/
public static LogicalType convert(Type type) {
return TypeUtil.visit(type, new TypeToFlinkType());
}

/**
* Convert a {@link RowType} to a {@link TableSchema}.
*
* @param rowType a RowType
* @return Flink TableSchema
*/
public static TableSchema toSchema(RowType rowType) {
TableSchema.Builder builder = TableSchema.builder();
for (RowType.RowField field : rowType.getFields()) {
builder.field(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
}
return builder.build();
}
}
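Not part of the patch: a minimal sketch of the lossy round trip the Javadoc above describes. The class name RoundTripSketch and the single UUID column are illustrative assumptions; the FlinkSchemaUtil methods are the ones added in this file.

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Types;

public class RoundTripSketch {
  public static void main(String[] args) {
    // Iceberg UUID has no Flink counterpart, so convert() maps it to BINARY(16).
    Schema original = new Schema(
        Types.NestedField.required(1, "id", Types.UUIDType.get()));

    RowType rowType = FlinkSchemaUtil.convert(original);         // id: BINARY(16)
    TableSchema tableSchema = FlinkSchemaUtil.toSchema(rowType);
    Schema roundTripped = FlinkSchemaUtil.convert(tableSchema);

    // The UUID column comes back as fixed[16], so the two structs are not equal.
    System.out.println(original.asStruct());
    System.out.println(roundTripped.asStruct());
  }
}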
132 changes: 132 additions & 0 deletions flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.flink;

import java.util.List;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.DateType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.FloatType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

class TypeToFlinkType extends TypeUtil.SchemaVisitor<LogicalType> {
TypeToFlinkType() {
}

@Override
public LogicalType schema(Schema schema, LogicalType structType) {
return structType;
}

@Override
public LogicalType struct(Types.StructType struct, List<LogicalType> fieldResults) {
List<Types.NestedField> fields = struct.fields();

List<RowType.RowField> flinkFields = Lists.newArrayListWithExpectedSize(fieldResults.size());
for (int i = 0; i < fields.size(); i += 1) {
Types.NestedField field = fields.get(i);
LogicalType type = fieldResults.get(i);
RowType.RowField flinkField = new RowType.RowField(
field.name(), type.copy(field.isOptional()), field.doc());
flinkFields.add(flinkField);
}

return new RowType(flinkFields);
}

@Override
public LogicalType field(Types.NestedField field, LogicalType fieldResult) {
return fieldResult;
}

@Override
public LogicalType list(Types.ListType list, LogicalType elementResult) {
return new ArrayType(elementResult.copy(list.isElementOptional()));
}

@Override
public LogicalType map(Types.MapType map, LogicalType keyResult, LogicalType valueResult) {
// keys in map are not allowed to be null.
return new MapType(keyResult.copy(false), valueResult.copy(map.isValueOptional()));
}

@Override
public LogicalType primitive(Type.PrimitiveType primitive) {
switch (primitive.typeId()) {
case BOOLEAN:
return new BooleanType();
case INTEGER:
return new IntType();
case LONG:
return new BigIntType();
case FLOAT:
return new FloatType();
case DOUBLE:
return new DoubleType();
case DATE:
return new DateType();
case TIME:
// MICROS
return new TimeType(6);
case TIMESTAMP:
Types.TimestampType timestamp = (Types.TimestampType) primitive;
if (timestamp.shouldAdjustToUTC()) {
// MICROS
return new LocalZonedTimestampType(6);
} else {
// MICROS
return new TimestampType(6);
}
case STRING:
return new VarCharType(VarCharType.MAX_LENGTH);
case UUID:
// UUID length is 16
return new BinaryType(16);
case FIXED:
Types.FixedType fixedType = (Types.FixedType) primitive;
return new BinaryType(fixedType.length());
case BINARY:
return new VarBinaryType(VarBinaryType.MAX_LENGTH);
case DECIMAL:
Types.DecimalType decimal = (Types.DecimalType) primitive;
return new DecimalType(decimal.precision(), decimal.scale());
default:
throw new UnsupportedOperationException(
"Cannot convert unknown type to Flink: " + primitive);
}
}
}
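Not part of the patch: a sketch of how this visitor is driven, mirroring what FlinkSchemaUtil.convert(Type) does. Assumption: the snippet lives in org.apache.iceberg.flink, because TypeToFlinkType is package-private; the map example type and the class name TypeToFlinkTypeSketch are illustrative.

package org.apache.iceberg.flink;

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

class TypeToFlinkTypeSketch {
  static LogicalType convertMapExample() {
    // An Iceberg map<string, long> with optional values.
    Type icebergType = Types.MapType.ofOptional(1, 2,
        Types.StringType.get(), Types.LongType.get());
    // Same call FlinkSchemaUtil.convert(Type) makes: visit the type tree bottom-up.
    return TypeUtil.visit(icebergType, new TypeToFlinkType());
    // Result: a MapType with a non-null VARCHAR key and a nullable BIGINT value.
  }
}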
@@ -21,7 +21,17 @@

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimeType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;
@@ -57,8 +67,7 @@ public void testConvertFlinkSchemaToIcebergSchema() {
.field("multiset", DataTypes.MULTISET(DataTypes.STRING().notNull()))
.build();

Schema actualSchema = FlinkSchemaUtil.convert(flinkSchema);
Schema expectedSchema = new Schema(
Schema icebergSchema = new Schema(
Types.NestedField.required(0, "id", Types.IntegerType.get(), null),
Types.NestedField.optional(1, "name", Types.StringType.get(), null),
Types.NestedField.required(2, "salary", Types.DoubleType.get(), null),
@@ -90,7 +99,7 @@ public void testConvertFlinkSchemaToIcebergSchema() {
Types.IntegerType.get()))
);

Assert.assertEquals(expectedSchema.asStruct(), actualSchema.asStruct());
checkSchema(flinkSchema, icebergSchema);
}

@Test
@@ -112,8 +121,7 @@ public void testMapField() {
)
.build();

Schema actualSchema = FlinkSchemaUtil.convert(flinkSchema);
Schema expectedSchema = new Schema(
Schema icebergSchema = new Schema(
Types.NestedField.required(0, "map_int_long",
Types.MapType.ofOptional(4, 5, Types.IntegerType.get(), Types.LongType.get()), null),
Types.NestedField.optional(1, "map_int_array_string",
Expand All @@ -132,7 +140,7 @@ public void testMapField() {
)
);

Assert.assertEquals(expectedSchema.asStruct(), actualSchema.asStruct());
checkSchema(flinkSchema, icebergSchema);
}

@Test
@@ -152,8 +160,7 @@ public void testStructField() {
).nullable()) /* Optional */
.build();

Schema actualSchema = FlinkSchemaUtil.convert(flinkSchema);
Schema expectedSchema = new Schema(
Schema icebergSchema = new Schema(
Types.NestedField.required(0, "struct_int_string_decimal",
Types.StructType.of(
Types.NestedField.optional(5, "field_int", Types.IntegerType.get()),
Expand All @@ -173,7 +180,8 @@ public void testStructField() {
)
)
);
Assert.assertEquals(actualSchema.asStruct(), expectedSchema.asStruct());

checkSchema(flinkSchema, icebergSchema);
}

@Test
@@ -201,8 +209,7 @@ public void testListField() {
).notNull()) /* Required */
.build();

Schema actualSchema = FlinkSchemaUtil.convert(flinkSchema);
Schema expectedSchema = new Schema(
Schema icebergSchema = new Schema(
Types.NestedField.required(0, "list_struct_fields",
Types.ListType.ofOptional(4, Types.StructType.of(
Types.NestedField.optional(3, "field_int", Types.IntegerType.get())
@@ -222,6 +229,45 @@
))
);

Assert.assertEquals(expectedSchema.asStruct(), actualSchema.asStruct());
checkSchema(flinkSchema, icebergSchema);
}

private void checkSchema(TableSchema flinkSchema, Schema icebergSchema) {
Assert.assertEquals(icebergSchema.asStruct(), FlinkSchemaUtil.convert(flinkSchema).asStruct());
// The conversion is not a 1:1 mapping, so we only verify that the Iceberg types survive the round trip.
Assert.assertEquals(
icebergSchema.asStruct(),
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema))).asStruct());
Member:
I'd prefer to provide some separate unit tests to address the conversion differences (so that we won't regress).

Contributor Author:
I'll add tests for the conversion differences.

}

@Test
public void testInconsistentTypes() {
checkInconsistentType(
Types.UUIDType.get(), new BinaryType(16),
new BinaryType(16), Types.FixedType.ofLength(16));
checkInconsistentType(
Types.StringType.get(), new VarCharType(VarCharType.MAX_LENGTH),
new CharType(100), Types.StringType.get());
checkInconsistentType(
Types.BinaryType.get(), new VarBinaryType(VarBinaryType.MAX_LENGTH),
new VarBinaryType(100), Types.BinaryType.get());
checkInconsistentType(
Types.TimeType.get(), new TimeType(6),
new TimeType(3), Types.TimeType.get());
checkInconsistentType(
Types.TimestampType.withoutZone(), new TimestampType(6),
new TimestampType(3), Types.TimestampType.withoutZone());
checkInconsistentType(
Types.TimestampType.withZone(), new LocalZonedTimestampType(6),
new LocalZonedTimestampType(3), Types.TimestampType.withZone());
}

private void checkInconsistentType(
Type icebergType, LogicalType flinkExpectedType,
LogicalType flinkType, Type icebergExpectedType) {
Assert.assertEquals(flinkExpectedType, FlinkSchemaUtil.convert(icebergType));
Assert.assertEquals(
Types.StructType.of(Types.NestedField.optional(0, "f0", icebergExpectedType)),
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(RowType.of(flinkType))).asStruct());
}
}