Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion core/src/main/java/org/apache/iceberg/avro/Avro.java
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,10 @@ public static class ReadBuilder {
private Function<Schema, DatumReader<?>> createReaderFunc = null;
private BiFunction<org.apache.iceberg.Schema, Schema, DatumReader<?>> createReaderBiFunc = null;

// This field is temporarily added to pass the mapping between Avro schema name and Iceberg
// field id (mocked in tests) through to ProjectionDatumReader and PruneColumns
private Map<String, Integer> avroSchemaNameToIcebergFieldId = null;

@SuppressWarnings("UnnecessaryLambda")
private final Function<Schema, DatumReader<?>> defaultCreateReaderFunc =
readSchema -> {
Expand Down Expand Up @@ -683,6 +687,12 @@ public ReadBuilder classLoader(ClassLoader classLoader) {
return this;
}

/**
 * Sets the mapping from Avro schema name to Iceberg field id.
 *
 * <p>The map is forwarded to {@code ProjectionDatumReader} (and from there to column pruning)
 * to resolve field ids for schemas that do not carry id properties.
 *
 * @param schemaNameToIcebergFieldId map from Avro schema name to Iceberg field id
 * @return this builder for chaining
 */
public ReadBuilder withAvroSchemaNameToIcebergFieldId(
    Map<String, Integer> schemaNameToIcebergFieldId) {
  this.avroSchemaNameToIcebergFieldId = schemaNameToIcebergFieldId;
  return this;
}

public <D> AvroIterable<D> build() {
Preconditions.checkNotNull(schema, "Schema is required");
Function<Schema, DatumReader<?>> readerFunc;
Expand All @@ -696,7 +706,8 @@ public <D> AvroIterable<D> build() {

return new AvroIterable<>(
file,
new ProjectionDatumReader<>(readerFunc, schema, renames, nameMapping),
new ProjectionDatumReader<>(
readerFunc, schema, renames, nameMapping, avroSchemaNameToIcebergFieldId),
start,
length,
reuseContainers);
Expand Down
60 changes: 60 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.avro.Schema;
import org.apache.iceberg.mapping.MappedField;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand All @@ -47,12 +48,16 @@ private AvroSchemaUtil() {}
public static final String ELEMENT_ID_PROP = "element-id";
public static final String ADJUST_TO_UTC_PROP = "adjust-to-utc";

public static final String BRANCH_ID_PROP = "branch-id";

private static final Schema NULL = Schema.create(Schema.Type.NULL);
private static final Schema.Type MAP = Schema.Type.MAP;
private static final Schema.Type ARRAY = Schema.Type.ARRAY;
private static final Schema.Type UNION = Schema.Type.UNION;
private static final Schema.Type RECORD = Schema.Type.RECORD;

private static final Joiner DOT = Joiner.on('.');

public static Schema convert(org.apache.iceberg.Schema schema, String tableName) {
return convert(schema, ImmutableMap.of(schema.asStruct(), tableName));
}
Expand Down Expand Up @@ -121,6 +126,15 @@ public static Schema pruneColumns(
return new PruneColumns(selectedIds, nameMapping).rootSchema(schema);
}

/**
 * Prunes an Avro schema down to the fields whose Iceberg ids are selected.
 *
 * @param schema the Avro schema to prune
 * @param selectedIds Iceberg field ids to retain
 * @param nameMapping mapping from names to field ids for schemas without id properties
 * @param avroSchemaFieldNameToIcebergFieldId map from dotted Avro field name to Iceberg field
 *     id, used to resolve ids for complex-union branches
 * @return the pruned schema
 */
public static Schema pruneColumns(
    Schema schema,
    Set<Integer> selectedIds,
    NameMapping nameMapping,
    Map<String, Integer> avroSchemaFieldNameToIcebergFieldId) {
  PruneColumns pruner =
      new PruneColumns(selectedIds, nameMapping, avroSchemaFieldNameToIcebergFieldId);
  return pruner.rootSchema(schema);
}

public static Schema buildAvroProjection(
Schema schema, org.apache.iceberg.Schema expected, Map<String, String> renames) {
return AvroCustomOrderSchemaVisitor.visit(schema, new BuildAvroProjection(expected, renames));
Expand Down Expand Up @@ -158,6 +172,27 @@ public static boolean isOptionSchema(Schema schema) {
return false;
}

/**
 * Returns whether the given schema is an optional complex union.
 *
 * <p>A union is considered complex when it has more than two branches; it is optional when one
 * of those branches is the null type.
 *
 * @param schema the Avro schema to inspect
 * @return true if the schema is a union with more than two branches, one of which is null
 */
public static boolean isOptionalComplexUnion(Schema schema) {
  if (schema.getType() != UNION) {
    return false;
  }

  List<Schema> branches = schema.getTypes();
  return branches.size() > 2
      && branches.stream().anyMatch(branch -> branch.getType() == Schema.Type.NULL);
}

static Schema toOption(Schema schema) {
if (schema.getType() == UNION) {
Preconditions.checkArgument(
Expand All @@ -168,6 +203,18 @@ static Schema toOption(Schema schema) {
}
}

/**
 * Wraps a schema in an optional (nullable) union, controlling null's position.
 *
 * <p>If the schema is already a union it must be a simple option schema and is returned as-is.
 *
 * @param schema the schema to make optional
 * @param nullIsSecondElement when true, the null branch is placed after the value branch;
 *     otherwise null comes first
 * @return a union of the schema with null
 */
public static Schema toOption(Schema schema, boolean nullIsSecondElement) {
  if (schema.getType() != UNION) {
    return nullIsSecondElement
        ? Schema.createUnion(schema, NULL)
        : Schema.createUnion(NULL, schema);
  }

  Preconditions.checkArgument(
      isOptionSchema(schema), "Union schemas are not supported: %s", schema);
  return schema;
}

static Schema fromOption(Schema schema) {
Preconditions.checkArgument(
schema.getType() == UNION, "Expected union schema but was passed: %s", schema);
Expand Down Expand Up @@ -477,4 +524,17 @@ private static String sanitize(char character) {
}
return "_x" + Integer.toHexString(character).toUpperCase();
}

/**
 * Resolves the Iceberg field id for a union branch schema.
 *
 * <p>Resolution order: first the "branch-id" property stored on the branch schema itself;
 * otherwise a lookup of the branch's dotted name path (parent field names followed by the
 * branch's own name) in the supplied name-to-id map.
 *
 * @param branch the Avro schema of one union branch
 * @param nameToIdMap map from dotted Avro schema name to Iceberg field id; may be null
 * @param parentFieldNames field names of the enclosing records, outermost first
 * @return the Iceberg field id, or null if neither source provides one
 */
public static Integer getBranchId(
    Schema branch, Map<String, Integer> nameToIdMap, Iterable<String> parentFieldNames) {
  Object id = branch.getObjectProp(BRANCH_ID_PROP);
  if (id != null) {
    return toInt(id);
  } else if (nameToIdMap != null && !nameToIdMap.isEmpty()) {
    // Fix: the original condition checked isEmpty() without negation, so the name lookup only
    // ran when the map had no entries and could never return an id.
    List<String> names = Lists.newArrayList(parentFieldNames);
    names.add(branch.getName());
    return nameToIdMap.get(DOT.join(names));
  }
  return null;
}
}
18 changes: 15 additions & 3 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,23 @@ public static <T> T visit(Schema schema, AvroSchemaVisitor<T> visitor) {
case UNION:
List<Schema> types = schema.getTypes();
List<T> options = Lists.newArrayListWithExpectedSize(types.size());
for (Schema type : types) {
options.add(visit(type, visitor));
if (AvroSchemaUtil.isOptionSchema(schema)) {
for (Schema type : types) {
options.add(visit(type, visitor));
}
} else {
// complex union case
int nonNullIdx = 0;
for (Schema type : types) {
if (type.getType() != Schema.Type.NULL) {
options.add(visitWithName("field" + nonNullIdx, type, visitor));
nonNullIdx += 1;
} else {
options.add(visit(type, visitor));
}
}
}
return visitor.union(schema, options);

case ARRAY:
if (schema.getLogicalType() instanceof LogicalMap) {
return visitor.array(schema, visit(schema.getElementType(), visitor));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,38 @@ private static <T> T visitRecord(
private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
List<Schema> types = union.getTypes();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Terminology is a bit confusing (types, branches, options, etc). I would try to qualify them by the source, e.g., renaming types to avroUnionBranches.

List<T> options = Lists.newArrayListWithExpectedSize(types.size());
for (Schema branch : types) {
if (branch.getType() == Schema.Type.NULL) {
options.add(visit((Type) null, branch, visitor));
} else {
options.add(visit(type, branch, visitor));

// simple union case
if (AvroSchemaUtil.isOptionSchema(union)) {
for (Schema branch : types) {
if (branch.getType() == Schema.Type.NULL) {
options.add(visit((Type) null, branch, visitor));
} else {
options.add(visit(type, branch, visitor));
}
}
} else { // complex union case
Preconditions.checkArgument(
type instanceof Types.StructType,
"Cannot visit invalid Iceberg type: %s for Avro complex union type: %s",
type,
union);
for (Schema branch : types) {
if (branch.getType() == Schema.Type.NULL) {
options.add(visit((Type) null, branch, visitor));
} else {
Types.NestedField expectedSchemaField = null;
String branchId = branch.getProp(AvroSchemaUtil.BRANCH_ID_PROP);
if (branchId != null) {
expectedSchemaField = type.asStructType().field(Integer.parseInt(branchId));
}
if (expectedSchemaField != null) {
options.add(visit(expectedSchemaField.type(), branch, visitor));
} else {
Type pseudoExpectedSchemaField = AvroSchemaUtil.convert(branch);
options.add(visit(pseudoExpectedSchemaField, branch, visitor));
}
}
}
}
return visitor.union(type, union, options);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here you are returning all the readers and in the UnionReader you are trying to figure out which ones to use and which to ignore. Can we just pass the required readers here in a way that aligns with the expected schema ahead of time?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am afraid not. The readers for all branch types need to be passed into UnionReader, as UnionReader needs to read all types of data from the union in the Avro file to read all the records successfully. Filtering the data based on the types projected in the expected Iceberg schema can only happen after the data are read from the Avro file

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,17 +159,15 @@ public Schema.Field field(Schema.Field field, Supplier<Schema> fieldResult) {

@Override
public Schema union(Schema union, Iterable<Schema> options) {
Preconditions.checkState(
AvroSchemaUtil.isOptionSchema(union),
"Invalid schema: non-option unions are not supported: %s",
union);
Schema nonNullOriginal = AvroSchemaUtil.fromOption(union);
Schema nonNullResult = AvroSchemaUtil.fromOptions(Lists.newArrayList(options));

if (!Objects.equals(nonNullOriginal, nonNullResult)) {
return AvroSchemaUtil.toOption(nonNullResult);
}
if (AvroSchemaUtil.isOptionSchema(union)) {
Schema nonNullOriginal = AvroSchemaUtil.fromOption(union);
Schema nonNullResult = AvroSchemaUtil.fromOptions(Lists.newArrayList(options));

if (!Objects.equals(nonNullOriginal, nonNullResult)) {
boolean nullIsSecondOption = union.getTypes().get(1).getType() == Schema.Type.NULL;
return AvroSchemaUtil.toOption(nonNullResult, nullIsSecondOption);
}
}
return union;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class ProjectionDatumReader<D> implements DatumReader<D>, SupportsRowPosi
private Schema fileSchema = null;
private DatumReader<D> wrapped = null;

private Map<String, Integer> avroSchemaNameToIcebergFieldId = null;

public ProjectionDatumReader(
Function<Schema, DatumReader<?>> getReader,
org.apache.iceberg.Schema expectedSchema,
Expand All @@ -50,6 +52,16 @@ public ProjectionDatumReader(
this.nameMapping = nameMapping;
}

/**
 * Creates a projection reader that additionally carries a map from Avro schema name to Iceberg
 * field id, which is passed to column pruning when the file schema is set.
 *
 * @param getReader factory for the underlying datum reader given a read schema
 * @param expectedSchema the Iceberg schema to project to
 * @param renames map of schema renames to apply during projection
 * @param nameMapping mapping from names to field ids for files without id properties
 * @param avroSchemaNameToIcebergFieldId map from Avro schema name to Iceberg field id
 */
public ProjectionDatumReader(
    Function<Schema, DatumReader<?>> getReader,
    org.apache.iceberg.Schema expectedSchema,
    Map<String, String> renames,
    NameMapping nameMapping,
    Map<String, Integer> avroSchemaNameToIcebergFieldId) {
  this(getReader, expectedSchema, renames, nameMapping);
  this.avroSchemaNameToIcebergFieldId = avroSchemaNameToIcebergFieldId;
}

@Override
public void setRowPositionSupplier(Supplier<Long> posSupplier) {
if (wrapped instanceof SupportsRowPosition) {
Expand All @@ -64,7 +76,9 @@ public void setSchema(Schema newFileSchema) {
nameMapping = MappingUtil.create(expectedSchema);
Copy link
Copy Markdown
Contributor

@wmoustafa wmoustafa Sep 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was under the impression that we would add name/type name information to the nameMapping maps. For example ("int" -> 2, "com.my.namespace.MyRecord" -> 3). @rdblue, what do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly. See the code I posted above. I think we need to add branch-id to the union branches during pruning.

}
Set<Integer> projectedIds = TypeUtil.getProjectedIds(expectedSchema);
Schema prunedSchema = AvroSchemaUtil.pruneColumns(newFileSchema, projectedIds, nameMapping);
Schema prunedSchema =
AvroSchemaUtil.pruneColumns(
newFileSchema, projectedIds, nameMapping, avroSchemaNameToIcebergFieldId);
this.readSchema = AvroSchemaUtil.buildAvroProjection(prunedSchema, expectedSchema, renames);
this.wrapped = newDatumReader();
}
Expand Down
74 changes: 56 additions & 18 deletions core/src/main/java/org/apache/iceberg/avro/PruneColumns.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,22 @@ class PruneColumns extends AvroSchemaVisitor<Schema> {
private final Set<Integer> selectedIds;
private final NameMapping nameMapping;

private Map<String, Integer> avroSchemaFieldNameToIcebergFieldId;

/**
 * Creates a pruner that keeps only the fields whose ids are in {@code selectedIds}.
 *
 * @param selectedIds ids of the Iceberg fields to retain; must not be null
 * @param nameMapping mapping from names to field ids for schemas without id properties; may be
 *     null
 */
PruneColumns(Set<Integer> selectedIds, NameMapping nameMapping) {
  Preconditions.checkNotNull(selectedIds, "Selected field ids cannot be null");
  this.selectedIds = selectedIds;
  this.nameMapping = nameMapping;
}

/**
 * Creates a pruner that can additionally resolve union branch ids from a map of Avro schema
 * field name to Iceberg field id.
 *
 * @param selectedIds ids of the Iceberg fields to retain; must not be null
 * @param nameMapping mapping from names to field ids for schemas without id properties; may be
 *     null
 * @param avroSchemaFieldNameToIcebergFieldId map from dotted Avro field name to Iceberg field
 *     id, used when pruning complex unions
 */
PruneColumns(
    Set<Integer> selectedIds,
    NameMapping nameMapping,
    Map<String, Integer> avroSchemaFieldNameToIcebergFieldId) {
  this(selectedIds, nameMapping);
  this.avroSchemaFieldNameToIcebergFieldId = avroSchemaFieldNameToIcebergFieldId;
}

Schema rootSchema(Schema record) {
Schema result = visit(record, this);
if (result != null) {
Expand Down Expand Up @@ -118,27 +128,27 @@ public Schema record(Schema record, List<String> names, List<Schema> fields) {

@Override
public Schema union(Schema union, List<Schema> options) {
Preconditions.checkState(
AvroSchemaUtil.isOptionSchema(union),
"Invalid schema: non-option unions are not supported: %s",
union);

// only unions with null are allowed, and a null schema results in null
Schema pruned = null;
if (options.get(0) != null) {
pruned = options.get(0);
} else if (options.get(1) != null) {
pruned = options.get(1);
}
if (AvroSchemaUtil.isOptionSchema(union)) {
// case option union
Schema pruned = null;
if (options.get(0) != null) {
pruned = options.get(0);
} else if (options.get(1) != null) {
pruned = options.get(1);
}

if (pruned != null) {
if (!Objects.equals(pruned, AvroSchemaUtil.fromOption(union))) {
return AvroSchemaUtil.toOption(pruned);
if (pruned != null) {
if (!Objects.equals(pruned, AvroSchemaUtil.fromOption(union))) {
return AvroSchemaUtil.toOption(pruned);
}
return union;
}
return union;
}

return null;
return null;
} else {
// Complex union case
return pruneComplexUnion(union, options);
}
}

@Override
Expand Down Expand Up @@ -345,4 +355,32 @@ private static boolean isOptionSchemaWithNonNullFirstOption(Schema schema) {
return AvroSchemaUtil.isOptionSchema(schema)
&& schema.getTypes().get(0).getType() != Schema.Type.NULL;
}

/**
 * Rebuilds a complex (non-option) union from the per-branch visit results.
 *
 * <p>For primitive branches the visit result is null, so the original branch schema from the
 * union is reused; for nested types the visit result is used because it carries the pruned
 * content from the recursive calls. In addition, the Iceberg field id corresponding to each
 * branch is attached to the branch schema as the "branch-id" property when it can be resolved.
 *
 * @param union the original complex union schema
 * @param visitResults pruned schemas per branch, index-aligned with the union's types; null for
 *     primitive branches
 * @return a new union schema over the resulting branches
 */
private Schema pruneComplexUnion(Schema union, List<Schema> visitResults) {
  List<Schema> branches = Lists.newArrayListWithExpectedSize(visitResults.size());

  List<Schema> unionTypes = union.getTypes();
  for (int i = 0; i < visitResults.size(); ++i) {
    Schema branchSchema = visitResults.get(i);
    if (branchSchema == null) {
      // primitive branch: the visitor produced no result, keep the original branch schema
      branchSchema = unionTypes.get(i);
    }
    // resolve the Iceberg field id via the branch's own "branch-id" prop or the name-to-id map
    Integer branchId =
        AvroSchemaUtil.getBranchId(
            branchSchema, avroSchemaFieldNameToIcebergFieldId, fieldNames());
    if (branchId != null) {
      // NOTE(review): addProp mutates the branch schema in place; for primitive branches this
      // is the original union's schema object — confirm this mutation is intended, and that
      // Avro's addProp will not be called again with a different value for the same schema.
      branchSchema.addProp(AvroSchemaUtil.BRANCH_ID_PROP, String.valueOf(branchId));
    }

    branches.add(branchSchema);
  }
  return Schema.createUnion(branches);
}
}
Loading