Conversation

@JingsongLi (Contributor) commented Jul 7, 2020

This PR improves #1096

Use LogicalTypeVisitor

Flink has LogicalTypeVisitor and DataTypeVisitor, which are very useful for visiting types. We should not implement a custom visitor based on instanceof checks; that is error prone and not very elegant.
As for FieldsDataType, it did not have a good design in Flink 1.9 and 1.10, so in Flink 1.11 it was refactored and getFieldDataTypes was removed. So I think a LogicalTypeVisitor alone may be enough, since we never touch the physical information in the DataTypes.
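
For illustration, a minimal sketch of the pattern (a hypothetical visitor, not this PR's FlinkTypeVisitor; it extends Flink's LogicalTypeDefaultVisitor, which routes any type without an explicit override to defaultMethod):

  import org.apache.flink.table.types.logical.IntType;
  import org.apache.flink.table.types.logical.LogicalType;
  import org.apache.flink.table.types.logical.MultisetType;
  import org.apache.flink.table.types.logical.VarCharType;
  import org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor;

  // Each visit(...) override handles exactly one logical type, so the
  // dispatch is done by the type system instead of an instanceof chain.
  public class TypeNameVisitor extends LogicalTypeDefaultVisitor<String> {
    @Override
    public String visit(IntType intType) {
      return "int";
    }

    @Override
    public String visit(VarCharType varCharType) {
      return "string";
    }

    @Override
    public String visit(MultisetType multisetType) {
      // Recurse by dispatching through accept(this) rather than manual casts.
      return "map<" + multisetType.getElementType().accept(this) + ", int>";
    }

    @Override
    protected String defaultMethod(LogicalType logicalType) {
      throw new UnsupportedOperationException("Unsupported type: " + logicalType);
    }
  }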

Support MultisetType

A CollectionDataType may also be a MultisetType. We can map it to Map<T, Integer>.

@JingsongLi (Contributor Author):
CC: @openinx @rdblue

import org.apache.flink.table.types.logical.ZonedTimestampType;

public class FlinkTypeVisitor<T> {
public abstract class FlinkTypeVisitor<T> implements LogicalTypeVisitor<T> {
@openinx (Member) commented Jul 7, 2020
@JingsongLi I'm curious what the difference is between the Flink-style LogicalTypeVisitor and the Iceberg-style visitor... Currently, all of the visitors are Iceberg style; I'm not quite sure what the benefit of converting to the Flink-style visitor is...

Update: OK, I read the background in the issue here (#1173 (comment)); sounds reasonable.

@openinx (Member):
BTW, it seems this FlinkTypeVisitor can be package-private (I forgot to check the access modifier before).

public Type map(KeyValueDataType map, Type keyType, Type valueType) {
public Type visit(MultisetType multisetType) {
Type elementType = multisetType.getElementType().accept(this);
return Types.MapType.ofRequired(getNextId(), getNextId(), elementType, Types.IntegerType.get());
@openinx (Member):
Sounds good that we've extended support for the Flink multiset data type.

@rdblue rdblue added this to the Flink Sink milestone Jul 7, 2020
@openinx (Member) left a comment
The patch looks good to me overall; I left a few comments. @rdblue, you may want to take a final look. Thanks.

List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(rowType.getFieldCount());
boolean isRoot = root == rowType;

List<Type> types = rowType.getFields().stream()
@openinx (Member):
It seems we don't need to loop twice here (the first loop to get List<Type> and the second loop to get List<Types.NestedField>). It could be simplified like the following:

  @Override
  public Type visit(RowType rowType) {
    List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(rowType.getFieldCount());
    boolean isRoot = root == rowType;

    for (int i = 0; i < rowType.getFieldCount(); i++) {
      int id = isRoot ? i : getNextId();

      RowType.RowField field = rowType.getFields().get(i);
      String name = field.getName();
      String comment = field.getDescription().orElse(null);
      Type type = field.getType().accept(this);

      if (field.getType().isNullable()) {
        newFields.add(Types.NestedField.optional(id, name, type, comment));
      } else {
        newFields.add(Types.NestedField.required(id, name, type, comment));
      }
    }

    return Types.StructType.of(newFields);
  }

@openinx (Member):
One thing: this would change where field IDs are generated for nested types, so we may need to adjust the unit tests.

@JingsongLi (Contributor Author):
I'd prefer to keep the two loops. If we need to change the ID generation for nested types, I think it is better to change Spark too.

@openinx (Member):
I'm OK with the current two loops now; let's keep the ID generation consistent with Spark.

@JingsongLi JingsongLi mentioned this pull request Jul 14, 2020
String comment = field.getDescription().orElse(null);

if (field.getType().isNullable()) {
newFields.add(Types.NestedField.optional(id, name, types.get(i), comment));
@rdblue (Contributor):
There is also a factory method that accepts a nullability boolean, NestedField.of.
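
For illustration, the optional/required branch in the diff above could then collapse into a single call (a sketch; NestedField.of takes the id, an isOptional flag, the name, the type, and an optional doc string):

  // field.getType().isNullable() maps directly onto the isOptional flag.
  newFields.add(Types.NestedField.of(id, field.getType().isNullable(), name, types.get(i), comment));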

.field("decimal", DataTypes.DECIMAL(2, 2))
.field("decimal2", DataTypes.DECIMAL(38, 2))
.field("decimal3", DataTypes.DECIMAL(10, 1))
.field("multiset", DataTypes.MULTISET(DataTypes.STRING().notNull()))
@rdblue (Contributor):
What happens for a multiset of nullable items?

@JingsongLi (Contributor Author):
It is just like a nullable key in a Map: because Flink's default behavior is a nullable key, we support the conversion:

  • in the conversion from a Flink type to an Iceberg type, the key's nullability is simply ignored;
  • in the conversion from an Iceberg type to a Flink type, the key's nullability becomes false.
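
For illustration, a sketch of that rule as a visit method for maps, shaped like the other visitors in this diff (getNextId() is the ID generator used elsewhere in the PR):

  @Override
  public Type visit(MapType mapType) {
    // The Flink key type may be nullable, but the Iceberg key is always
    // required, so the key's nullability is simply dropped here.
    Type keyType = mapType.getKeyType().accept(this);
    Type valueType = mapType.getValueType().accept(this);
    if (mapType.getValueType().isNullable()) {
      return Types.MapType.ofOptional(getNextId(), getNextId(), keyType, valueType);
    } else {
      return Types.MapType.ofRequired(getNextId(), getNextId(), keyType, valueType);
    }
  }
  // In the reverse direction (Iceberg to Flink), the key type would be
  // rebuilt with notNull(), since Iceberg map keys are always required.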

@rdblue (Contributor):
Okay. For rows passed to Iceberg that have null map keys or null values in a multiset, what should happen?

@JingsongLi (Contributor Author):
Null values are OK; the problem is null keys.
For null key support, the file formats look OK; the only format constraint is that Avro only supports string keys for map types.
But the question is whether we have any special optimizations for not-null fields. The answer is yes; see ParquetValueWriters.option. If a null key reaches the Parquet writer, there would be a NullPointerException, which is not very elegant.

Another option is what I said in https://github.com/apache/iceberg/pull/1096/files/8891cd5438306f0b4b226706058beff7c3cd4080#diff-12a375418217cdc6be26c73e02d56065R102
We can throw an UnsupportedOperationException there to tell users, even though Flink's map keys are nullable by default.
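
For illustration, that alternative would be a guard like the following in the map/multiset conversion (a sketch of the option described above, not the code this PR merged):

  if (mapType.getKeyType().isNullable()) {
    // Fail fast with a clear message instead of a NullPointerException
    // deep inside the Parquet writer.
    throw new UnsupportedOperationException("Iceberg map keys cannot be nullable: " + mapType);
  }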

@rdblue (Contributor) commented Jul 14, 2020

Overall, looks good to me. I'll merge this. Thanks @JingsongLi, I think the logical type visitor looks clean. And thanks to @openinx for reviewing!

@rdblue rdblue merged commit dfc8ec3 into apache:master Jul 14, 2020
cmathiesen pushed a commit to ExpediaGroup/iceberg that referenced this pull request Aug 19, 2020
@JingsongLi JingsongLi deleted the fix_flink_type branch November 5, 2020 09:41