Column projection of union type #108

Closed
yiqiangin wants to merge 4 commits into linkedin:li-0.11.x from yiqiangin:yiqdingin/union-type-projection

@yiqiangin yiqiangin commented May 16, 2022

Problem
Column projection currently does not work for union types. The root cause is as follows:

  • When the Avro union reader is created, the current code assumes that the types inside a union in the Avro schema match the fields of the corresponding union struct in the Iceberg schema.
  • When a union column is projected, the current code prunes only the union struct in the Iceberg schema down to the projected fields, while the union in the Avro schema still contains all of its types. The two schemas therefore no longer match for that union.
  • Avro decoding must read the content of every type in a union correctly, whether or not that type is projected, so AvroUnionReader needs a type reader for every type in the Avro union even under column projection. The union in the Avro schema therefore cannot be pruned the way the union struct in the Iceberg schema is.
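
To illustrate the last point, here is a minimal, self-contained sketch of why an unprojected union branch still needs a reader. It is a toy model, not Avro or Iceberg code: the class `UnionDecodeSketch` and its fixed-width layout (one byte for the branch index, four bytes for ints and string lengths) are assumptions made for illustration, whereas real Avro uses variable-length zigzag encoding. The idea it shows is the same: the payload of an unprojected branch has to be decoded far enough to skip it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy model of a union<int, string> stream. Hypothetical layout, not Avro's real encoding.
final class UnionDecodeSketch {
  // Branch 0: one index byte followed by a 4-byte int.
  static void writeInt(ByteBuffer buf, int value) {
    buf.put((byte) 0).putInt(value);
  }

  // Branch 1: one index byte, a 4-byte length, then the UTF-8 bytes.
  static void writeString(ByteBuffer buf, String value) {
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    buf.put((byte) 1).putInt(bytes.length).put(bytes);
  }

  // Reads every union value but keeps only branch 0 (the "projected" int branch).
  // Branch 1 must still be understood well enough to skip over its bytes.
  static List<Integer> readProjectedInts(ByteBuffer buf) {
    List<Integer> result = new ArrayList<>();
    while (buf.hasRemaining()) {
      int branch = buf.get();
      if (branch == 0) {
        result.add(buf.getInt());
      } else {
        int length = buf.getInt();               // requires knowing the string layout
        buf.position(buf.position() + length);   // skip the unprojected payload
        result.add(null);                        // unprojected branch yields no value
      }
    }
    return result;
  }
}
```

Without the layout knowledge for branch 1, the decoder could not find where the next record starts, which is why the Avro side keeps all union types.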

Solution
Assume a union has N types. The corresponding struct in the Iceberg schema then has N+1 fields, including the "tag" field. A query can project any K of these fields (1 <= K <= N+1, where the tag field may be one of them). Reading without column projection is equivalent to projecting all fields, i.e. K = N+1, so the solution does not distinguish between reads with and without column projection.
In addition, the position of each type in the union can be recovered from the Iceberg field name ("field0", "field1", and so on): the numeric suffix is the index that matches the position of the type in the union of the Avro schema.
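
A minimal sketch of recovering the index from the field name follows. It is illustrative only: the class `UnionFieldIndexSketch`, the helper names, and the use of plain strings in place of Iceberg types are all assumptions, not code from this PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: field types are plain strings, not Iceberg Type objects.
final class UnionFieldIndexSketch {
  private static final Pattern BRANCH_FIELD = Pattern.compile("field(\\d{1,9})");

  // Returns the index encoded in a name such as "field2",
  // or -1 for any other name (for example "tag").
  static int branchIndex(String fieldName) {
    Matcher m = BRANCH_FIELD.matcher(fieldName);
    return m.matches() ? Integer.parseInt(m.group(1)) : -1;
  }

  // Builds a map from union index to type for the projected struct fields.
  static Map<Integer, String> projectedBranches(List<String> names, List<String> types) {
    Map<Integer, String> byIndex = new LinkedHashMap<>();
    for (int i = 0; i < names.size(); i++) {
      int index = branchIndex(names.get(i));
      if (index >= 0) {
        byIndex.put(index, types.get(i));
      }
    }
    return byIndex;
  }
}
```

For a projection of `tag` and `field1`, this map would contain a single entry for index 1, and the tag field is handled separately.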

In the code that creates the readers for all types in the Avro union (AvroSchemaWithTypeVisitor.visitUnion), the fields of the corresponding struct in the Iceberg schema are inspected to build a map from order index to Iceberg field type. While iterating over the types in the Avro schema, the order index is looked up in the map. If it is present, the field is projected and the option is created with the Iceberg type; otherwise the option is created with a null type.
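
The iteration can be sketched roughly as below. This is not the actual visitUnion code: `VisitUnionSketch` and `optionTypes` are hypothetical names, types are modeled as strings, and the sketch assumes (as the `actualTypeIndex` adjustment in the diff suggests) that a "null" type in the Avro union does not consume an order index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative only: models Avro union types and Iceberg types as strings.
final class VisitUnionSketch {
  // For each Avro union type, returns the Iceberg type to build a reader with,
  // or null when the type is the Avro null type or is not projected.
  static List<String> optionTypes(List<String> avroTypes, Map<Integer, String> projected) {
    List<String> options = new ArrayList<>();
    int orderIndex = 0; // index among the non-null types of the union
    for (String avroType : avroTypes) {
      if ("null".equals(avroType)) {
        options.add(null); // the null type has no field in the Iceberg struct
      } else {
        options.add(projected.get(orderIndex)); // null when the field is not projected
        orderIndex++;
      }
    }
    return options;
  }
}
```

The result always has one entry per Avro union type, so every type still gets a reader even when only some fields are projected.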

AvroUnionReader needs to receive the Iceberg schema. The fields of the returned row are constructed from the fields in the Iceberg schema, not from the types in the Avro schema. If the tag field is projected, one more field is added at the beginning of the row and set to the index of the field in the Avro file.
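
A rough sketch of the row layout described above, under stated assumptions: `UnionRowSketch` and `buildRow` are hypothetical names, the row is a plain `Object[]`, and `actualIndex` is taken to be the index of the value's type among the non-null types of the union. The real AvroUnionReader may differ in all of these details.

```java
import java.util.List;

// Illustrative only: builds the row for one decoded union value.
final class UnionRowSketch {
  // projectedIndexes lists the union indexes of the projected fields, in struct order.
  static Object[] buildRow(
      boolean tagProjected, List<Integer> projectedIndexes, int actualIndex, Object value) {
    int offset = tagProjected ? 1 : 0;
    Object[] row = new Object[offset + projectedIndexes.size()];
    if (tagProjected) {
      row[0] = actualIndex; // the tag records which union type the value had
    }
    int position = projectedIndexes.indexOf(Integer.valueOf(actualIndex));
    if (position >= 0) {
      row[offset + position] = value; // only a projected field receives the value
    }
    return row; // all other fields stay null
  }
}
```

With the tag projected, a value of the second union type, and both fields projected, the row would hold the tag, a null for `field0`, and the value for `field1`.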

Test
All the test cases in TestSparkAvroUnions.java, plus a new test case, writeAndValidateRequiredComplexUnionWithProjection.

Manual tests with Spark 3, running each of the following queries on a table with a union column:

val df = spark.sql("select c1.field0 from u_yiqding.avro_union_table_test")

val df = spark.sql("select c1.field0,c1.field1 from u_yiqding.avro_union_table_test")

val df = spark.sql("select c1.tag,c1.field0 from u_yiqding.avro_union_table_test")

val df = spark.sql("select c1.tag,c1.field0,c1.field1 from u_yiqding.avro_union_table_test")

val df = spark.sql("select c1.field1,c1.field0,c1.tag from u_yiqding.avro_union_table_test")

val df = spark.sql("select c1.field1,c1.field0 from u_yiqding.avro_union_table_test")

@wmoustafa

Thank you so much for the PR and comprehensive description!


/*
A complex union with multiple types of Avro schema is converted into a struct with multiple fields of Iceberg schema.
A field is related to a type in the order defined in Avro schema. Also, an extra tag field is added into the struct of

Should we frame "A field is related to a type in the order defined in Avro schema" as an expectation that the Iceberg schema matches the Avro schema?

/*
A complex union with multiple types of Avro schema is converted into a struct with multiple fields of Iceberg schema.
A field is related to a type in the order defined in Avro schema. Also, an extra tag field is added into the struct of
Iceberg schema. The user can query the column of union type with the field projected (e.g. colUnion.field0) in which

I think Iceberg documentation tries to avoid describing scenarios referencing the user. Can we make this a factual statement about the function spec that does not involve referencing users?

I am not sure if we should make this function description revolve around projection pruning use case. Let us describe it in a more general sense.

A field is related to a type in the order defined in Avro schema. Also, an extra tag field is added into the struct of
Iceberg schema. The user can query the column of union type with the field projected (e.g. colUnion.field0) in which
the maximum number of the fields to be projected equals to the number of fields of the complete struct converted
from the union. The case of without field projection equals to the case of full fields projection.

"The case of without field projection equals to the case of full fields projection." is not clear.

// can be used to track the corresponding field in the struct of Iceberg schema.
int actualTypeIndex = nullTypeFound ? typeIndex - 1 : typeIndex;
boolean relatedFieldInStructFound = false;
while (fieldIndexInStruct < type.asStructType().fields().size()) {

Do you need this loop or can you just keep two pointers and leverage the outer loop?

nullTypeFound = true;
options.add(visit((Type) null, schema, visitor));
typeIndex++;
continue;

Is it possible to do if/else instead of continue? I think this might get rid of typeIndex++ twice.

return this;
}

public ReadBuilder setFileSchema(Schema fileSchema) {

This is only called in the tests. I am not sure how it is called in the non-test path.

Author

Yes, this code and the change in ProjectionDatumReader.setSchema were added only to test the case where the fileSchema used to read an Avro file differs from the schema used to write it. The test case shows that data is not returned by mistake when the read schema differs from the write schema. In the normal code flow, the fileSchema is always read from the metadata of the actual Avro file, so the read schema is always the same as the write schema. This change is for testing only and does not affect the normal code flow. It can be removed.

Comment on lines 73 to +76
  public void setSchema(Schema newFileSchema) {
-   this.fileSchema = newFileSchema;
+   if (this.fileSchema == null) {
+     this.fileSchema = newFileSchema;
+   }

It seems we are ignoring newFileSchema in the union type case, which might be an incorrect use of the API. Note that this method overrides its parent in Avro's DatumReader.

Author

Please refer to the comment above.

@yiqiangin yiqiangin closed this Jul 19, 2023
@yiqiangin yiqiangin deleted the yiqdingin/union-type-projection branch October 6, 2023 19:27