24 changes: 24 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
@@ -19,6 +19,7 @@

package org.apache.iceberg.avro;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -156,6 +157,29 @@ public static boolean isOptionalComplexUnion(Schema schema) {
return false;
}

/** Returns a copy of the given union schema with any NULL alternative removed. */
public static Schema discardNullFromUnionIfExist(Schema schema) {
Preconditions.checkArgument(schema.getType() == UNION,
"Expected union schema but was passed: %s", schema);
List<Schema> result = new ArrayList<>();
for (Schema nested : schema.getTypes()) {
if (nested.getType() != Schema.Type.NULL) {
result.add(nested);
}
}
return Schema.createUnion(result);
}

/** Returns true if the given union schema contains a NULL alternative. */
public static boolean nullExistInUnion(Schema schema) {
Preconditions.checkArgument(schema.getType() == UNION,
"Expected union schema but was passed: %s", schema);
for (Schema nested : schema.getTypes()) {
if (nested.getType() == Schema.Type.NULL) {
return true;
}
}
return false;
}

public static Schema toOption(Schema schema) {
return toOption(schema, false);
}
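For context, a minimal sketch (not part of the diff) of how the two new helpers behave on a nullable union; only the standard Avro Schema API is assumed:

import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;

public class UnionHelpersDemo {
  public static void main(String[] args) {
    // A union that allows null: ["null", "string", "int"]
    Schema union = Schema.createUnion(Arrays.asList(
        Schema.create(Schema.Type.NULL),
        Schema.create(Schema.Type.STRING),
        Schema.create(Schema.Type.INT)));

    System.out.println(AvroSchemaUtil.nullExistInUnion(union));            // true
    System.out.println(AvroSchemaUtil.discardNullFromUnionIfExist(union)); // ["string","int"]
  }
}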
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/**
@@ -60,6 +61,8 @@ public interface PartnerAccessor<P, FP> {
P mapValuePartner(P partnerMap);

P listElementPartner(P partnerList);

P unionObjectPartner(P partnerUnion, int ordinal);
}

@SuppressWarnings("MethodTypeParameterName")
@@ -98,7 +101,15 @@ public static <P, FP, R, FR> R visit(TypeInfo typeInfo, P partner, HiveSchemaWit
return visitor.primitive((PrimitiveTypeInfo) typeInfo, partner);

case UNION:
throw new UnsupportedOperationException("Union data type not supported: " + typeInfo);
UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
List<TypeInfo> allAlternatives = unionTypeInfo.getAllUnionObjectTypeInfos();
List<R> unionResults = Lists.newArrayListWithExpectedSize(allAlternatives.size());
for (int i = 0; i < allAlternatives.size(); i++) {
P unionObjectPartner = partner != null ? accessor.unionObjectPartner(partner, i) : null;
R result = visit(allAlternatives.get(i), unionObjectPartner, visitor, accessor);
unionResults.add(result);
}
return visitor.union(unionTypeInfo, partner, unionResults);

default:
throw new UnsupportedOperationException(typeInfo + " not supported");
@@ -121,6 +132,10 @@ public R map(MapTypeInfo map, P partner, R keyResult, R valueResult) {
return null;
}

public R union(UnionTypeInfo union, P partner, List<R> results) {
return null;
}

public R primitive(PrimitiveTypeInfo primitive, P partner) {
return null;
}
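Every PartnerAccessor implementation now has to supply the new unionObjectPartner callback. Returning null means "no partner for this alternative", which the union branch above already tolerates; a minimal sketch for an Avro-schema accessor with no usable union structure:

@Override
public Schema unionObjectPartner(Schema partnerUnion, int ordinal) {
  // No per-alternative partner information; the alternative is visited partner-less.
  return null;
}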
@@ -25,12 +25,14 @@
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;


public class HiveTypeToIcebergType extends HiveTypeUtil.HiveSchemaVisitor<Type> {
private static final String UNION_TO_STRUCT_CONVERSION_PREFIX = "tag_";
private int nextId = 1;

@Override
@@ -52,6 +54,16 @@ public Type list(ListTypeInfo list, Type elementResult) {
return Types.ListType.ofOptional(allocateId(), elementResult);
}

// Mimic the struct visit behavior to build a struct type from the union's alternatives
@Override
public Type union(UnionTypeInfo union, List<Type> unionResults) {
List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(unionResults.size());
for (int i = 0; i < unionResults.size(); i++) {
fields.add(Types.NestedField.optional(allocateId(), UNION_TO_STRUCT_CONVERSION_PREFIX + i, unionResults.get(i)));
}
return Types.StructType.of(fields);
}

@Override
public Type primitive(PrimitiveTypeInfo primitive) {
switch (primitive.getPrimitiveCategory()) {
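Concretely, each union alternative becomes an optional tag_<ordinal> field, so uniontype<int,string> converts to struct<1: tag_0: optional int, 2: tag_1: optional string> (field IDs depend on allocation order). A minimal sketch, assuming Hive's TypeInfoUtils for parsing the type string:

// Hypothetical usage; see also the testConversions case below.
TypeInfo unionInfo = TypeInfoUtils.getTypeInfoFromTypeString("uniontype<int,string>");
Type converted = HiveTypeUtil.visit(unionInfo, new HiveTypeToIcebergType());
// converted: struct<1: tag_0: optional int, 2: tag_1: optional string>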
@@ -25,6 +25,7 @@
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;

@@ -49,7 +50,12 @@ public static <T> T visit(TypeInfo typeInfo, HiveSchemaVisitor<T> visitor) {
return visitor.struct(structTypeInfo, names, results);

case UNION:
throw new UnsupportedOperationException("Union data type not supported : " + typeInfo);
final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
List<T> unionResults = Lists.newArrayListWithExpectedSize(unionTypeInfo.getAllUnionObjectTypeInfos().size());
for (TypeInfo unionObjectTypeInfo : unionTypeInfo.getAllUnionObjectTypeInfos()) {
unionResults.add(visit(unionObjectTypeInfo, visitor));
}
return visitor.union(unionTypeInfo, unionResults);

case LIST:
ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
@@ -80,6 +86,10 @@ public T map(MapTypeInfo map, T keyResult, T valueResult) {
return null;
}

public T union(UnionTypeInfo union, List<T> unionResults) {
return null;
}

public T primitive(PrimitiveTypeInfo primitive) {
return null;
}
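As a quick illustration of the new union hook on the plain visitor, a toy HiveSchemaVisitor that renders a union as text (a sketch; the other visitor methods keep their null-returning defaults):

TypeInfo info = TypeInfoUtils.getTypeInfoFromTypeString("uniontype<int,string>");
String rendered = HiveTypeUtil.visit(info, new HiveTypeUtil.HiveSchemaVisitor<String>() {
  @Override
  public String union(UnionTypeInfo union, List<String> unionResults) {
    return "union(" + String.join(", ", unionResults) + ")";
  }

  @Override
  public String primitive(PrimitiveTypeInfo primitive) {
    return primitive.getTypeName();
  }
});
// rendered: "union(int, string)"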
@@ -19,6 +19,7 @@

package org.apache.iceberg.hive.legacy;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.JsonProperties;
@@ -30,6 +31,7 @@
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

@@ -124,6 +126,17 @@ public Schema map(MapTypeInfo map, Schema partner, Schema keyResult, Schema valu
return shouldResultBeOptional ? AvroSchemaUtil.toOption(result) : result;
}

@Override
public Schema union(UnionTypeInfo union, Schema partner, List<Schema> results) {
// If the Avro partner allows null, keep NULL as the first alternative of the merged union
if (AvroSchemaUtil.nullExistInUnion(partner)) {
List<Schema> toAddNull = new ArrayList<>();
toAddNull.add(Schema.create(Schema.Type.NULL));
toAddNull.addAll(results);
return Schema.createUnion(toAddNull);
}
return Schema.createUnion(results);
}

@Override
public Schema primitive(PrimitiveTypeInfo primitive, Schema partner) {
boolean shouldResultBeOptional = partner == null || AvroSchemaUtil.isOptionSchema(partner);
@@ -177,6 +190,15 @@ public Schema listElementPartner(Schema partner) {
return (schema.getType() == Schema.Type.ARRAY) ? schema.getElementType() : null;
}

@Override
public Schema unionObjectPartner(Schema partner, int ordinal) {
if (partner.getType() != Schema.Type.UNION) {
return null;
}
// Hive unions carry no null alternative, so drop null from the Avro side before indexing
Schema schema = AvroSchemaUtil.discardNullFromUnionIfExist(partner);
return schema.getTypes().get(ordinal);
}

private Schema.Field findCaseInsensitive(Schema struct, String fieldName) {
Preconditions.checkArgument(struct.getType() == Schema.Type.RECORD);
// TODO: Optimize? This is called for every struct field, so this loop runs once per field
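A small sketch of the ordinal mapping above: the null branch is discarded first, so Hive union ordinals line up with the partner's non-null alternatives no matter where Avro placed null (schema values are illustrative):

// Partner union ["string", "int", "null"]; Hive alternative 1 is int.
Schema partner = Schema.createUnion(Arrays.asList(
    Schema.create(Schema.Type.STRING),
    Schema.create(Schema.Type.INT),
    Schema.create(Schema.Type.NULL)));

// AvroSchemaUtil.discardNullFromUnionIfExist(partner) -> ["string", "int"]
// ...getTypes().get(1)                                -> the "int" schema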
@@ -80,6 +80,7 @@ public void testConversions() {
"struct<" +
"length:int,count:int,list:array<struct<lastword:string,lastwordlength:int>>," +
"wordcounts:map<string,int>>");
check("struct<1: tag_0: optional int, 2: tag_1: optional string>", "uniontype<int,string>");
}

private static void check(String icebergTypeStr, String hiveTypeStr) {
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.util.internal.JacksonUtils;
@@ -226,6 +227,25 @@ public void shouldHandleMaps() {
assertSchema(expected, merge(hive, avro));
}

@Test
public void shouldHandleUnions() {
String hive = "struct<fa:uniontype<string,int>,fb:uniontype<string,int>,fc:uniontype<string,int>>";
Schema avro = struct("r1",
required("fA", union(Schema.Type.NULL, Schema.Type.STRING, Schema.Type.INT)),
required("fB", union(Schema.Type.STRING, Schema.Type.INT)),
required("fC", union(Schema.Type.STRING, Schema.Type.INT, Schema.Type.NULL))
);

Schema expected = struct("r1",
required("fA", union(Schema.Type.NULL, Schema.Type.STRING, Schema.Type.INT)),
required("fB", union(Schema.Type.STRING, Schema.Type.INT)),
// our merge logic always puts the NULL alternative at the front
required("fC", union(Schema.Type.NULL, Schema.Type.STRING, Schema.Type.INT))
);

assertSchema(expected, merge(hive, avro));
}

@Test
public void shouldSanitizeIncompatibleFieldNames() {
StructTypeInfo typeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(
@@ -336,6 +356,10 @@ private Schema map(Schema.Type valueType) {
return map(Schema.create(valueType));
}

private Schema union(Schema.Type... types) {
return Schema.createUnion(Arrays.stream(types).map(Schema::create).collect(Collectors.toList()));
}

private Schema.Field nullable(Schema.Field field) {
Preconditions.checkArgument(!AvroSchemaUtil.isOptionSchema(field.schema()));
return field(field.name(), nullable(field.schema()), field.doc(),