Conversation

@chenjunjiedada (Collaborator)

This is a refactor of the Parquet schema visitor so that it accepts a partner type, such as an Iceberg Type, a Spark DataType, or a Flink LogicalType.
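
For orientation, a hedged sketch of the generic shape this introduces (the abstract accessors are taken from the diff hunks quoted in the review below; the exact class layout in the PR may differ):

    // P is the partner type (an Iceberg Type, Spark DataType, or Flink LogicalType);
    // T is the result the visitor produces, e.g. a reader or writer.
    public abstract class ParquetTypeWithPartnerVisitor<P, T> {
      // Resolve the partner counterpart of each Parquet sub-type; a null
      // result means the partner schema has no matching part.
      protected abstract P arrayElementType(P arrayType);
      protected abstract P mapKeyType(P mapType);
      protected abstract P mapValueType(P mapType);
      protected abstract P fieldType(P structType, int pos, Integer parquetFieldId);
    }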

@chenjunjiedada (Collaborator, Author)

@JingsongLi, this is similar to the visitor you wrote before; would you mind taking a look?

@JingsongLi (Contributor)

Thanks @chenjunjiedada for the contribution. I'll take a look in the next couple of days.

@JingsongLi (Contributor) left a comment:

Thanks @chenjunjiedada, and sorry for the late review. Please rebase onto the latest master.

    public ParquetValueReader<RowData> message(org.apache.iceberg.types.Type expected, MessageType message,
                                               List<ParquetValueReader<?>> fieldReaders) {
      return struct(expected, message.asGroupType(), fieldReaders);
Contributor:

NIT:

    return struct(expected == null ? null : expected.asStructType(), message.asGroupType(), fieldReaders);

Collaborator (Author):

Done.

        List<ParquetValueWriter<?>> fieldWriters) {
      List<Type> fields = struct.getFields();
    - List<RowField> flinkFields = sStruct.getFields();
    + List<RowField> flinkFields = ((RowType) fStruct).getFields();
Contributor:

Can we change this to use LogicalType.getChildren?

Collaborator (Author):

OK, updated.
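
For reference, a minimal sketch of the LogicalType.getChildren() form (variable names follow the snippet above; the surrounding writer code is assumed):

    // getChildren() returns the child types of any composite LogicalType,
    // so the partner struct no longer needs to be cast to RowType here.
    List<Type> fields = struct.getFields();                 // Parquet fields
    List<LogicalType> flinkFields = fStruct.getChildren();  // partner Flink types
    for (int i = 0; i < fields.size(); i += 1) {
      Type field = fields.get(i);
      LogicalType flinkType = flinkFields.get(i);
      // ... build the writer for this Parquet/Flink pair
    }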

    private final Deque<String> fieldNames = Lists.newLinkedList();

    public static <P, T> T visit(P partnerType, Type type, ParquetTypeWithPartnerVisitor<P, T> visitor) {
      if (type instanceof MessageType) {
Contributor:

    Preconditions.checkNotNull(partnerType, "Invalid partnerType: null");

@chenjunjiedada (Collaborator, Author) on Oct 30, 2020:

It looks like we have to allow the partner type to be null in the current logic, since we call visitFields where the partner could be null. Also, when visiting a message/struct, we allow the expected type to be null. Let me investigate whether we can avoid this.

Collaborator (Author):

@JingsongLi, visit is also called by visitList and visitMap, where we cannot guarantee that the inner type is not null. So I think we should allow a null partner here. Does that make sense to you?

Member:

If the type can be null, won't the if {} / else if {} / else {} chain throw an NPE? It doesn't do any null check on the type.

Member:

OK, I misunderstood. Please ignore my comment above.

    }

    public void beforeField(Type type) {
Contributor:

I don't quite understand the purpose of the following methods. Will they be overridden?

Collaborator (Author):

These are used to customize the stack while visiting the type; for example, the ApplyNameMapping visitor overrides some of them to generate the correct mapping.
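
For illustration, a sketch of the kind of override this enables, modeled on how Iceberg's existing Parquet visitors maintain the field-name stack (not the exact ApplyNameMapping code):

    @Override
    public void beforeField(Type type) {
      fieldNames.push(type.getName());  // descending: record this path segment
    }

    @Override
    public void afterField(Type type) {
      fieldNames.pop();                 // ascending: restore the stack
    }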

    protected abstract P arrayElementType(P arrayType);
    protected abstract P mapKeyType(P mapType);
    protected abstract P mapValueType(P mapType);
    protected abstract Pair<String, P> fieldNameAndType(P structType, int pos, Integer fieldId);
Contributor:

fieldId -> parquetFieldId

Collaborator (Author):

Done.

    }
    Type valueType = repeatedKeyValue.getType(1);
    visitor.beforeValueField(valueType);
    try {
Contributor:

Maybe we can have a method runWithStack(Runnable, Type)?

Collaborator (Author):

ParquetTypeVisitor has the same visiting logic. How about refactoring both in a separate PR?
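
For context, one possible shape of the suggested helper (hypothetical, and deferred to the follow-up refactor discussed here):

    private static <P, T> void runWithStack(Runnable task, Type type,
                                            ParquetTypeWithPartnerVisitor<P, T> visitor) {
      visitor.beforeField(type);
      try {
        task.run();                // run the nested visit with the stack entry in place
      } finally {
        visitor.afterField(type);  // always unwind, even on failure
      }
    }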

    return list.toArray(new String[0]);
    Type type = struct.field(fieldId).type();
    String name = struct.field(fieldId).name();
    return Pair.of(name, type);
Contributor:

NIT:

    Types.NestedField field = struct.field(fieldId);
    return field == null ? null : Pair.of(field.name(), field.type());

Collaborator (Author):

Done.

    protected abstract P arrayElementType(P arrayType);
    protected abstract P mapKeyType(P mapType);
    protected abstract P mapValueType(P mapType);
    protected abstract Pair<String, P> fieldNameAndType(P structType, int pos, Integer fieldId);
Contributor:

Why do we need fieldNameAndType? It looks like the type alone is enough?

Collaborator (Author):

Yes, you are right.

        newOption(repeatedKeyValue.getType(0), keyWriter),
        newOption(repeatedKeyValue.getType(1), valueWriter),
    -   sMap.keyType(), sMap.valueType());
    +   ((MapType) sMap).keyType(), ((MapType) sMap).valueType());
Contributor:

Can we use arrayElementType, mapKeyType, mapValueType, etc., to avoid the casting?

Collaborator (Author):

Done.
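
A minimal sketch of what this looks like once the casting is centralized in the accessors, shown for an Iceberg-typed partner (the Flink variant would cast to Flink's MapType and call getKeyType()/getValueType() instead):

    @Override
    protected Type mapKeyType(Type mapType) {
      return mapType.asMapType().keyType();    // the cast happens once, here
    }

    @Override
    protected Type mapValueType(Type mapType) {
      return mapType.asMapType().valueType();
    }

Call sites can then pass mapKeyType(sMap) and mapValueType(sMap) without any cast.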

@chenjunjiedada (Collaborator, Author)

Thanks a lot @JingsongLi! I will update this tomorrow.

@chenjunjiedada force-pushed the refactor-for-flink-reader-and-writer branch from 38a1ee2 to d5db93d on October 30, 2020 at 07:31.
@chenjunjiedada (Collaborator, Author)

@JingsongLi , I think this is ready for another review. Could you please take a look?

Comment on lines 157 to 160
    visitor.beforeField(field);
    Integer fieldId = field.getId() == null ? null : field.getId().intValue();
    results.add(visit(visitor.fieldType(struct, i, fieldId), field, visitor));
    visitor.afterField(field);
Member:

Q: should we use a try-finally block here for beforeField and afterField?

Collaborator (Author):

Yes, I think so. Nice catch!
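
A sketch of the try-finally form agreed on here, so that afterField still runs if the nested visit throws:

    visitor.beforeField(field);
    try {
      Integer fieldId = field.getId() == null ? null : field.getId().intValue();
      results.add(visit(visitor.fieldType(struct, i, fieldId), field, visitor));
    } finally {
      visitor.afterField(field);
    }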

    List<String> list = Lists.newArrayList(fieldNames.descendingIterator());
    list.add(name);
    return list.toArray(new String[0]);
    Types.NestedField field = structType.asStructType().field(fieldId);
Member:

I think there is a bug here: in the method definition we have a parquetFieldId, but here we use that id to access the Iceberg nested field. That sounds unreasonable.

  protected abstract P fieldType(P structType, int pos, Integer parquetFieldId);

Member:

By the way, we might need a unit test to cover this case?

    List<String> list = Lists.newArrayList(fieldNames.descendingIterator());
    list.add(name);
    return list.toArray(new String[0]);
    return ((RowType) structType).getTypeAt(pos);
Member:

I'm also curious whether the inner fields keep the same order as the Parquet fields. If not, we would get the wrong data type from the pos provided by Parquet.

    if (field.getId() != null) {
      id = field.getId().intValue();
    }
    Types.NestedField iField = (struct != null && id >= 0) ? struct.field(id) : null;
Member:

@rdblue, is this a bug on the master branch? We use the Parquet field id to access the Iceberg nested field, which sounds unreasonable...

Contributor:

There are two ways to traverse the schemas together, by name and by ID. When we are building a reader, we use the ID method because names don't necessarily match between when the file was written and the current table schema. The columns are identified by ID, so this is correct in that case.

Traversing two schemas by name is only done when we know that the names between the two match. For example, when Spark runs a CTAS operation, we convert the Spark schema to an Iceberg schema and we know that the two have the same structure and field names, but the Spark schema has no IDs. So when we build a writer, we have to match by name but we know that this is safe. (The two schemas are needed to build writers that convert, like short -> int.)

Member:

> The columns are identified by ID, so this is correct in that case.

OK, I see: the ParquetWriter persists the Parquet schema that was converted from the Iceberg schema by TypeToMessageType (which uses the same field id for the converted Parquet field). So the TypeWithSchemaVisitor's Parquet schema should have the same field ids as the Iceberg table's schema.

I just want to make sure it was designed intentionally. Thanks for the context and confirmation.

@rdblue (Contributor) commented on Nov 3, 2020:

I'm not sure we want to move forward with these changes. This is a lot of code churn and I don't see much of a benefit. Abstracting this requires using more generic types and calling asStruct or similar methods. Without a motivating use case like a new visitor, I wonder if this has enough value to warrant making the changes.

@chenjunjiedada (Collaborator, Author)

@rdblue, this was motivated when ParquetWithFlinkSchemaVisitor was being added; see the discussion at #1272 (comment).
