
Support non-optional union types and column projection in complex union for Avro#5704

Closed
yiqiangin wants to merge 6 commits into apache:main from yiqiangin:master

Conversation


@yiqiangin yiqiangin commented Sep 4, 2022

This PR consists of two parts

In Iceberg, there are two kinds of schema: table schema and file schema. The table schema is the schema defined in the Iceberg table format. The file schema is the schema of the data stored in the underlying data file; if the data file is in Avro format, the file schema is also referred to as the Avro file schema.
A complex union is a union consisting of multiple types. While the union type is natively supported in the Avro file schema, there is no union type in the Iceberg table format. A complex union is therefore represented in the Iceberg table schema as a struct with multiple fields, where each field in the struct is associated with one type in the union.
In the normal case, the number of fields in the struct equals the number of types in the union plus one (for the tag field).
When a query projects columns of a union type, the fields of the struct in the Iceberg table schema are pruned according to the types projected in the query.
In contrast, the union in the Avro file schema is not pruned under column projection, because all the types in the union are needed to read the data from the Avro data file successfully.
The value readers that read the data of all the union's types from the Avro data file are created from both the types in the union of the Avro file schema and the fields of the struct in the Iceberg table schema.
The main problem to solve here is correlating each type in the Avro file schema with the corresponding field of the struct in the Iceberg table schema, especially when only some of the fields remain in the struct of the Iceberg table schema after column projection.
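As a concrete (hypothetical) illustration of this representation, consider a union with two branches; the type names, field names, and ids below are invented and not taken from the PR's code:

```
union { int, MyRecord }      -- Avro file schema: a complex union, two branches

struct<                      -- Iceberg table schema: 2 + 1 fields
  tag: int,                  -- tag field: which branch the value came from
  field0: int,               -- branch 0 (int)
  field1: struct<...>        -- branch 1 (MyRecord)
>
```

Under column projection, the struct may retain only, say, the tag and field1, while the Avro union keeps both branches.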

The main idea of the solution is as follows:

  • Build a mapping from each type name in the union of the Avro file schema to the id of the corresponding field of the struct in the Iceberg table schema.
  • When value readers are created, find the corresponding field in the Iceberg table schema for each type in the union of the Avro file schema by looking up the mapping, using the type name from the Avro file schema as the key.
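The two steps above can be sketched as follows; this is a minimal illustration, and the type names and field ids in it are invented rather than taken from the PR's code:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the branch-name-to-field-id mapping described above.
class UnionBranchMapping {

  // mapping from a union branch's type name in the Avro file schema to the
  // field id of the corresponding struct field in the Iceberg table schema
  static Map<String, Integer> buildMapping() {
    Map<String, Integer> fieldNameToId = new HashMap<>();
    fieldNameToId.put("int", 1);       // primitive branch: keyed by type name
    fieldNameToId.put("string", 2);
    fieldNameToId.put("MyRecord", 3);  // record branch: keyed by record name
    return fieldNameToId;
  }

  // at reader-creation time: resolve a branch to an Iceberg field id;
  // null means the field was pruned by column projection
  static Integer fieldIdFor(Map<String, Integer> mapping, String branchName) {
    return mapping.get(branchName);
  }
}
```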

The details of the implementation are as follows:

  • The mapping from the field name in the Avro file schema to the field id in the Iceberg schema is derived during the conversion from the Avro file schema to the Iceberg table schema, in the function AvroSchemaUtil.convertToDeriveNameMapping and the class SchemaToType.
  • The mapping for the direct child fields of an Avro file schema field is stored as a property named AvroFieldNameToIcebergId on that Avro file schema field, so it can be accessed easily when the Avro schema is traversed to generate the corresponding readers for the Avro data file.
  • In the case of a union, the key of the mapping is the name of the branch in the union.
  • In the case of a complex union, AvroSchemaWithTypeVisitor.visitUnion() first gets the mapping from the property of the Avro file schema, then gets the field id in the Iceberg table schema using the type name from the Avro file schema, and finally uses the field id to get the field type in the Iceberg table schema:
    • if the corresponding field exists in the Iceberg table schema, the field is used together with the Avro file schema node to create the reader;
    • if no field exists for the given field id in the Iceberg table schema (which means the field is not projected in the Iceberg schema), a pseudo branch type is created from the corresponding Avro file schema node to facilitate creating the reader.
  • In the class UnionReader, the rows read from the Avro data file are filtered according to the fields that exist in the Iceberg table schema.
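The filtering in UnionReader can be pictured with a small sketch. This is an invented illustration, not the PR's code: each union branch gets an entry in a projection index array, where -1 marks a branch whose field was pruned from the Iceberg table schema.

```java
// Hypothetical sketch: route each decoded union branch either into the
// projected row or onto a skip path. The layout below is invented: a
// three-branch union in which branch 1 was pruned by column projection.
class UnionProjection {

  // projectionIndexes[branch] = column in the returned row, or -1 to skip
  static final int[] PROJECTION_INDEXES = {0, -1, 1};

  // returns the destination column for a branch read from the data file, or
  // -1 when the value must still be read (to advance the decoder) but is
  // then dropped because its field is not in the projected schema
  static int destinationFor(int branchIndex) {
    return PROJECTION_INDEXES[branchIndex];
  }
}
```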

@wmoustafa
Contributor

It seems references to "Avro schema" in the description are ambiguous. Could you disambiguate them? For example, when saying "In contrast, the union schema of Avro schema is not pruned in case of column projection", it is not clear which Avro schema you are referring to. This might apply to other schema type references. "File schema" is an example of an unambiguous reference.

@yiqiangin
Author

> It seems references to "Avro schema" in the description are ambiguous. Could you disambiguate them? For example, when saying "In contrast, the union schema of Avro schema is not pruned in case of column projection", it is not clear which Avro schema you are referring to. This might apply to other schema type references. "File schema" is an example of an unambiguous reference.

Good point. The description is revised to remove the ambiguity.

return AvroSchemaVisitor.visit(schema, new SchemaToType(schema));
}

public static Type convertToDeriveNameMapping(Schema schema) {
Contributor

For consistency with the other APIs, rename this to visit and provide a flag to indicate whether to derive name mapping?

Contributor

We can revisit this comment after addressing some of the more fundamental ones below.

Comment on lines +108 to +111
String name =
branch.getType().equals(Schema.Type.RECORD)
? branch.getName()
: branch.getType().getName();
Contributor

You might add a comment explaining the logic here.


}

if (deriveNameMapping && record.getObjectProp(AVRO_FIELD_NAME_TO_ICEBERG_ID) == null) {
record.addProp(AVRO_FIELD_NAME_TO_ICEBERG_ID, fieldNameToId);
Contributor

Should we add a map to the entire union type field or a prop to each branch, similar to how records/structs work for example?

Comment on lines +115 to +120
if (branchType != null) {
options.add(visit(branchType.type(), branch, visitor));
} else {
Type pseudoBranchType = AvroSchemaUtil.convert(branch);
options.add(visit(pseudoBranchType, branch, visitor));
}
Contributor

Good to add comments to the if/else branches.

}
}

SchemaToType(Schema root, boolean deriveNameMapping) {
Contributor

I think it might be cleaner to extract the name mapping injection to another class that either extends AvroSchemaVisitor or SchemaToType.

}

if (deriveNameMapping && union.getObjectProp(AVRO_FIELD_NAME_TO_ICEBERG_ID) == null) {
union.addProp(AVRO_FIELD_NAME_TO_ICEBERG_ID, fieldNameToId);
Contributor

This class is not expected to change the input schema.

@Override
public void setSchema(Schema newFileSchema) {
this.fileSchema = newFileSchema;
AvroSchemaUtil.convertToDeriveNameMapping(this.fileSchema);
Contributor

I think this line should change/go away after addressing some of the other comments.

this.fileSchema = newFileSchema;
AvroSchemaUtil.convertToDeriveNameMapping(this.fileSchema);
if (nameMapping == null && !AvroSchemaUtil.hasIds(fileSchema)) {
nameMapping = MappingUtil.create(expectedSchema);
Contributor

@wmoustafa wmoustafa Sep 24, 2022

I was under the impression that we would add name/type name information to the nameMapping maps. For example ("int" -> 2, "com.my.namespace.MyRecord" -> 3). @rdblue, what do you think?

Contributor

Yes, exactly. See the code I posted above. I think we need to add branch-id to the union branches during pruning.

}
}
}
return visitor.union(type, union, options);
Contributor

I think here you are returning all the readers and in the UnionReader you are trying to figure out which ones to use and which to ignore. Can we just pass the required readers here in a way that aligns with the expected schema ahead of time?

Author

I am afraid not. The readers for all branch types need to be passed into UnionReader, as UnionReader needs to read all the types of data in the union from the Avro file in order to read all the records successfully. Filtering the data based on the types projected in the expected Iceberg schema can only happen after the data are read from the Avro file.

}

@Override
public InternalRow read(Decoder decoder, Object reuse) throws IOException {
Contributor

@wmoustafa wmoustafa Sep 25, 2022

Should not the logic here be:

  • Iterate on the Avro schema. For each branch, get the field ID from the Avro schema annotation.
  • The assumption is Avro schema union preserves all the union branches even if some are not projected. So we still need to figure out if a field is projected or not. This can be achieved by looking up the field ID from the step above in the expected Iceberg schema. If the field ID is projected, populate the InternalRow index using the next suitable reader (hopefully reader order is preset properly in AvroSchemaWithTypeVisitor to match the expected projection).
  • If the field ID is not projected, skip.

The above logic can be split/refactored between the constructor and the read method for efficiency.
@rdblue Let me know if this matches your understanding.

Contributor

I agree with @wmoustafa, although I think that this is correct to make the mapping array. That way the implementation is straightforward:

  InternalRow row = reuseOrCreate(reuse); // this is where setNullAt happens
  int index = decoder.readIndex();
  int destIndex = projectionIndexes[index];
  if (destIndex >= 0) {
    Object value = readers[index].read(decoder, get(reuse, destIndex));
    row.update(destIndex, value);
  } else {
    readers[index].read(decoder, null);
  }

  return row;

Author

Per Ryan's suggestion, the Avro schema is not passed into ComplexUnionReader. A mapping array is used instead to track the relationship between a branch type and the position of its value in the returned row.

// the original
// schema, while for nested types, we want to use the visitResult because they have content from
// the previous
// recursive calls.
Contributor

Can you fix line wrapping? Looks like this was auto-formatted.

// schema, while for nested types, we want to use the visitResult because they have content from
// the previous
// recursive calls.
private static Schema copyUnion(Schema record, List<Schema> visitResults) {
Contributor

Is there a better name for this? Maybe pruneComplexUnion?

List<Schema> branches = Lists.newArrayListWithExpectedSize(visitResults.size());
for (int i = 0; i < visitResults.size(); i++) {
if (visitResults.get(i) == null) {
branches.add(record.getTypes().get(i));
Contributor

It looks like record is actually a union and not a record.


SchemaToType(Schema root, boolean deriveNameMapping) {
this(root);
this.deriveNameMapping = deriveNameMapping;
Contributor

I don't think this PR should build a name mapping. That can be added in a later PR, and it should not use a custom Avro property.

Where possible, we avoid mixing jobs in the Iceberg project. This class converts a schema from Avro to Iceberg and should do only that. If you want to derive a mapping, I'd recommend building a visitor to do that.

}
}
Schema schema = Schema.createUnion(branches);
if (record.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID) != null) {
Contributor

@yiqiangin, @wmoustafa, I would expect this to apply the name mapping, but instead it passes on a custom schema property. I don't think that this approach is correct.

The Avro implementation for name mapping is a little odd. For Parquet and ORC, there's a class that rewrites the schema and adds IDs. It looks like instead of taking that approach, the Avro implementer added name mapping to this class. That's okay, but that means that the name mapping should be applied here for the union work.

We want to create guarantees that we can rely on to simplify other code. In this case, once PruneColumns is done, we're guaranteed to have an Avro schema with the correct field IDs annotated throughout.

To do that, I think the field ID should be added to each Schema that is a branch of the union:

  List<Schema> branches = Lists.newArrayListWithExpectedSize(visitResults.size());
  List<Schema> unionTypes = union.getTypes();
  for (int ind = 0; ind < unionTypes.size(); ind += 1) {
    Schema branchSchema = visitResults.get(ind);
    if (branchSchema == null) {
      branchSchema = unionTypes.get(ind);
    }

    Integer branchId = AvroSchemaUtil.getBranchId(branchSchema, nameMapping, fieldNames());
    if (branchId != null) {
      branchSchema.addProp(AvroSchemaUtil.BRANCH_ID_PROP, String.valueOf(branchId));
    }

    branches.add(branchSchema);
  }

  return Schema.createUnion(branches);

// AvroSchemaUtil additions:

  public static final String BRANCH_ID_PROP = "branch-id";

  static Integer getBranchId(
      Schema branch, NameMapping mapping, Iterable<String> parentFieldNames) {
    Object id = branch.getObjectProp(BRANCH_ID_PROP);
    if (id != null) {
      return toInt(id);
    } else if (mapping != null) {
      MappedField mappedField = findInMapping(mapping, parentFieldNames, branch.getName(), branch.getFullName());
      if (mappedField != null) {
        return mappedField.id();
      }
    }

    return null;
  }

  private static MappedField findInMapping(NameMapping mapping, Iterable<String> parentFieldNames, String... nameOpts) {
    List<String> names = Lists.newArrayList(parentFieldNames);
    for (String name : nameOpts) {
      names.add(name);
      MappedField field = mapping.find(name);
      if (field != null) {
        return field;
      }
    }

    return null;
  }

type,
union);
Map<String, Integer> fieldNameToId =
(Map) union.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID);
Contributor

This matching should be done using branch IDs, not a map like this.

}
}

private static class ComplexUnionReader implements ValueReader<InternalRow> {
Contributor

What is specific to Spark about this? Can we use an approach like the struct reader and have a generic one that is extended by Spark, Flink, etc. to make the type concrete?

import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.avro.Schema;
Contributor

I'd prefer not passing in the Avro schema. I think that the behavior should be that the AvroSchemaWithTypeVisitor visits each union branch and produces a ValueReader. Then the visitor implementation should create the index map and pass it into the reader. Not passing the schema in should keep the reader simple.

}

// checking if NULL type exists in Avro union schema
this.nullTypeIndex = -1;
Contributor

If there is an index for null, then it should be handled just like any other value reader, right? It won't be projected, but if the union has the null index, the reader can be called and will do nothing.

I guess the odd thing is that there isn't a NullValueReader that can be used as a placeholder? I think maybe adding one would be cleaner than adding special handling for null options.
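A placeholder reader along these lines could look roughly like the sketch below; the Decoder and ValueReader interfaces are simplified stand-ins for illustration, not the actual Iceberg or Avro APIs:

```java
import java.io.IOException;

// Simplified stand-ins for illustration; the real Iceberg ValueReader and
// Avro Decoder interfaces differ from these.
interface Decoder {
  void readNull() throws IOException;
}

interface ValueReader<T> {
  T read(Decoder decoder, Object reuse) throws IOException;
}

// A no-op placeholder reader for a union's null branch: it consumes the
// (empty) null encoding and always returns null, so the union reader can
// treat the null branch like any other branch.
class NullValueReader implements ValueReader<Object> {
  @Override
  public Object read(Decoder decoder, Object reuse) throws IOException {
    decoder.readNull();
    return null;
  }
}
```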

this.isTagFieldProjected = false;
for (Types.NestedField expectedStructField : expected.asStructType().fields()) {
String fieldName = expectedStructField.name();
if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
Contributor

Is there a better way to identify the tag field?

if (fieldName.equals(UNION_TAG_FIELD_NAME)) {
this.isTagFieldProjected = true;
this.numOfFieldsInReturnedRow++;
continue;
Contributor

Minor: prefer else to continue when the logic is simple like this.

this.numOfFieldsInReturnedRow++;
continue;
}
int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
Contributor

This should not parse field names. It should instead use field IDs from the Iceberg schema and branch IDs from the Avro schema.

if (index == nullTypeIndex) {
// if it is a null data, directly return null as the whole union result
// we know for sure it is a null so the casting will always work.
return (InternalRow) readers[nullTypeIndex].read(decoder, reuse);
Contributor

@rdblue rdblue Oct 2, 2022

I assume that this always returns null, but it is really weird to return the result of a reader directly.

What if the tag was projected? Why does this not produce InternalRow(nullIndex, null, null, ... null)?

}

// otherwise, we need to return an InternalRow as a struct data
InternalRow struct = new GenericInternalRow(numOfFieldsInReturnedRow);
Contributor

Readers need to support an option to reuse the row. You can see how in the struct reader.

@github-actions

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Aug 18, 2024
@github-actions

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Aug 26, 2024
3 participants