Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ public static Type convert(Schema schema) {
return AvroSchemaVisitor.visit(schema, new SchemaToType(schema));
}

public static Type convertToDeriveNameMapping(Schema schema) {
return AvroSchemaVisitor.visit(schema, new SchemaToType(schema, true));
}

public static org.apache.iceberg.Schema toIceberg(Schema schema) {
final List<Types.NestedField> fields = convert(schema).asNestedType().asStructType().fields();
return new org.apache.iceberg.Schema(fields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Deque;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -92,16 +93,24 @@ private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisit
} else { // complex union case
Preconditions.checkArgument(type instanceof Types.StructType,
"Cannot visit invalid Iceberg type: %s for Avro complex union type: %s", type, union);

List<Types.NestedField> fields = type.asStructType().fields();
// start index from 1 because 0 is the tag field which doesn't exist in the original Avro schema
int index = 1;
Map<String, Integer> fieldNameToId = (Map) union.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID);
for (Schema branch : types) {
if (branch.getType() == Schema.Type.NULL) {
options.add(visit((Type) null, branch, visitor));
} else {
options.add(visit(fields.get(index).type(), branch, visitor));
index += 1;
String name = branch.getType().equals(Schema.Type.RECORD) ? branch.getName() : branch.getType().getName();
if (fieldNameToId.containsKey(name)) {
int fieldId = fieldNameToId.get(name);
Types.NestedField branchType = type.asStructType().field(fieldId);
if (branchType != null) {
options.add(visit(branchType.type(), branch, visitor));
} else {
Type pseudoBranchType = AvroSchemaUtil.convert(branch);
options.add(visit(pseudoBranchType, branch, visitor));
}
} else {
options.add(visit((Type) null, branch, visitor));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.iceberg.avro;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -162,41 +161,8 @@ public Schema union(Schema union, Iterable<Schema> options) {
if (!Objects.equals(nonNullOriginal, nonNullResult)) {
return AvroSchemaUtil.toOption(nonNullResult);
}

return union;
} else { // Complex union
Preconditions.checkArgument(current instanceof Types.StructType,
"Incompatible projected type: %s for Avro complex union type: %s", current, union);

Types.StructType asStructType = current.asStructType();

long nonNullBranchesCount = union.getTypes().stream()
.filter(branch -> branch.getType() != Schema.Type.NULL).count();
Preconditions.checkState(asStructType.fields().size() > nonNullBranchesCount,
"Column projection on struct converted from Avro complex union type: %s is not supported", union);

Iterator<Schema> resultBranchIterator = options.iterator();

// we start index from 1 because 0 is the tag field which doesn't exist in the original Avro
int index = 1;
List<Schema> resultBranches = Lists.newArrayListWithExpectedSize(union.getTypes().size());

try {
for (Schema originalBranch : union.getTypes()) {
if (originalBranch.getType() == Schema.Type.NULL) {
resultBranches.add(resultBranchIterator.next());
} else {
this.current = asStructType.fields().get(index).type();
resultBranches.add(resultBranchIterator.next());
index += 1;
}
}

return Schema.createUnion(resultBranches);
} finally {
this.current = asStructType;
}
}
return union;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
@Override
public void setSchema(Schema newFileSchema) {
this.fileSchema = newFileSchema;
AvroSchemaUtil.convertToDeriveNameMapping(this.fileSchema);
if (nameMapping == null && !AvroSchemaUtil.hasIds(fileSchema)) {
nameMapping = MappingUtil.create(expectedSchema);
}
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/java/org/apache/iceberg/avro/PruneColumns.java
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,11 @@ private static Schema copyUnion(Schema record, List<Schema> visitResults) {
branches.add(visitResults.get(i));
}
}
return Schema.createUnion(branches);
Schema schema = Schema.createUnion(branches);
if (record.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID) != null) {
schema.addProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID,
record.getObjectProp(SchemaToType.AVRO_FIELD_NAME_TO_ICEBERG_ID));
}
return schema;
}
}
43 changes: 42 additions & 1 deletion core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@

package org.apache.iceberg.avro;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

class SchemaToType extends AvroSchemaVisitor<Type> {
public static final String AVRO_FIELD_NAME_TO_ICEBERG_ID = "AvroFieldNameToIcebergId";
private final Schema root;
private boolean deriveNameMapping;

SchemaToType(Schema root) {
this.root = root;
Expand All @@ -39,6 +44,11 @@ class SchemaToType extends AvroSchemaVisitor<Type> {
}
}

SchemaToType(Schema root, boolean deriveNameMapping) {
this(root);
this.deriveNameMapping = deriveNameMapping;
}

private int nextId = 1;

private int getElementId(Schema schema) {
Expand Down Expand Up @@ -88,6 +98,7 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) {
this.nextId = 0;
}

Map<String, Integer> fieldNameToId = Maps.newHashMap();
for (int i = 0; i < fields.size(); i += 1) {
Schema.Field field = fields.get(i);
Type fieldType = fieldTypes.get(i);
Expand All @@ -98,6 +109,11 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) {
} else {
newFields.add(Types.NestedField.required(fieldId, field.name(), fieldType, field.doc()));
}
fieldNameToId.put(field.name(), fieldId);
}

if (deriveNameMapping && record.getObjectProp(AVRO_FIELD_NAME_TO_ICEBERG_ID) == null) {
record.addProp(AVRO_FIELD_NAME_TO_ICEBERG_ID, fieldNameToId);
}

return Types.StructType.of(newFields);
Expand All @@ -115,16 +131,25 @@ public Type union(Schema union, List<Type> options) {
}
} else {
// Complex union
Map<String, Integer> fieldNameToId = Maps.newHashMap();
List<Types.NestedField> newFields = Lists.newArrayList();
newFields.add(Types.NestedField.required(allocateId(), "tag", Types.IntegerType.get()));

int tagIndex = 0;
for (Type type : options) {
if (type != null) {
newFields.add(Types.NestedField.optional(allocateId(), "field" + tagIndex++, type));
int fieldId = allocateId();
Schema schema = union.getTypes().get(tagIndex);
newFields.add(Types.NestedField.optional(fieldId, "field" + tagIndex++, type));
String name = schema.getType().equals(Schema.Type.RECORD) ? schema.getName() : schema.getType().getName();
fieldNameToId.put(name, fieldId);
}
}

if (deriveNameMapping && union.getObjectProp(AVRO_FIELD_NAME_TO_ICEBERG_ID) == null) {
union.addProp(AVRO_FIELD_NAME_TO_ICEBERG_ID, fieldNameToId);
}

return Types.StructType.of(newFields);
}
}
Expand All @@ -141,6 +166,13 @@ public Type array(Schema array, Type elementType) {
Types.NestedField keyField = keyValueType.field("key");
Types.NestedField valueField = keyValueType.field("value");

if (deriveNameMapping && array.getObjectProp(AVRO_FIELD_NAME_TO_ICEBERG_ID) == null) {
Map<String, Integer> fieldNameToId = Maps.newHashMap();
fieldNameToId.put("key", keyField.fieldId());
fieldNameToId.put("value", valueField.fieldId());
array.addProp(AVRO_FIELD_NAME_TO_ICEBERG_ID, fieldNameToId);
}

if (keyValueType.field("value").isOptional()) {
return Types.MapType.ofOptional(
keyField.fieldId(), valueField.fieldId(), keyField.type(), valueField.type());
Expand All @@ -153,6 +185,9 @@ public Type array(Schema array, Type elementType) {
// normal array
Schema elementSchema = array.getElementType();
int id = getElementId(array);
if (deriveNameMapping && array.getObjectProp(AVRO_FIELD_NAME_TO_ICEBERG_ID) == null) {
array.addProp(AVRO_FIELD_NAME_TO_ICEBERG_ID, Collections.singletonMap("element", id));
}
if (AvroSchemaUtil.isOptionSchema(elementSchema)) {
return Types.ListType.ofOptional(id, elementType);
} else {
Expand All @@ -166,6 +201,12 @@ public Type map(Schema map, Type valueType) {
Schema valueSchema = map.getValueType();
int keyId = getKeyId(map);
int valueId = getValueId(map);
if (deriveNameMapping && map.getObjectProp(AVRO_FIELD_NAME_TO_ICEBERG_ID) == null) {
Map<String, Integer> fieldNameToId = Maps.newHashMap();
fieldNameToId.put("key", keyId);
fieldNameToId.put("value", valueId);
map.addProp(AVRO_FIELD_NAME_TO_ICEBERG_ID, fieldNameToId);
}

if (AvroSchemaUtil.isOptionSchema(valueSchema)) {
return Types.MapType.ofOptional(keyId, valueId, Types.StringType.get(), valueType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ private static class RandomDataGenerator extends TypeUtil.CustomOrderSchemaVisit

private RandomDataGenerator(Schema schema, long seed) {
this.typeToSchema = AvroSchemaUtil.convertTypes(schema.asStruct(), "test");
for (org.apache.avro.Schema s : typeToSchema.values()) {
AvroSchemaUtil.convertToDeriveNameMapping(s);
}
this.random = new Random(seed);
}

Expand Down
Loading