diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index 7cf6d15a94..e94d8c230d 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -19,6 +19,7 @@ package org.apache.iceberg.avro; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -156,6 +157,29 @@ public static boolean isOptionalComplexUnion(Schema schema) { return false; } + public static Schema discardNullFromUnionIfExist(Schema schema) { + Preconditions.checkArgument(schema.getType() == UNION, + "Expected union schema but was passed: %s", schema); + List result = new ArrayList<>(); + for (Schema nested : schema.getTypes()) { + if (!(nested.getType() == Schema.Type.NULL)) { + result.add(nested); + } + } + return Schema.createUnion(result); + } + + public static boolean nullExistInUnion(Schema schema) { + Preconditions.checkArgument(schema.getType() == UNION, + "Expected union schema but was passed: %s", schema); + for (Schema nested : schema.getTypes()) { + if (nested.getType() == Schema.Type.NULL) { + return true; + } + } + return false; + } + public static Schema toOption(Schema schema) { return toOption(schema, false); } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveSchemaWithPartnerVisitor.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveSchemaWithPartnerVisitor.java index dd2adcf271..f3d702b4f9 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveSchemaWithPartnerVisitor.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveSchemaWithPartnerVisitor.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import 
org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.iceberg.relocated.com.google.common.collect.Lists; /** @@ -60,6 +61,8 @@ public interface PartnerAccessor { P mapValuePartner(P partnerMap); P listElementPartner(P partnerList); + + P unionObjectPartner(P partnerUnion, int ordinal); } @SuppressWarnings("MethodTypeParameterName") @@ -98,7 +101,15 @@ public static R visit(TypeInfo typeInfo, P partner, HiveSchemaWit return visitor.primitive((PrimitiveTypeInfo) typeInfo, partner); case UNION: - throw new UnsupportedOperationException("Union data type not supported: " + typeInfo); + UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo; + List allAlternatives = unionTypeInfo.getAllUnionObjectTypeInfos(); + List unionResults = Lists.newArrayListWithExpectedSize(allAlternatives.size()); + for (int i = 0; i < allAlternatives.size(); i++) { + P unionObjectPartner = partner != null ? accessor.unionObjectPartner(partner, i) : null; + R result = visit(allAlternatives.get(i), unionObjectPartner, visitor, accessor); + unionResults.add(result); + } + return visitor.union(unionTypeInfo, partner, unionResults); default: throw new UnsupportedOperationException(typeInfo + " not supported"); @@ -121,6 +132,10 @@ public R map(MapTypeInfo map, P partner, R keyResult, R valueResult) { return null; } + public R union(UnionTypeInfo union, P partner, List results) { + return null; + } + public R primitive(PrimitiveTypeInfo primitive, P partner) { return null; } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeToIcebergType.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeToIcebergType.java index a538429b8f..7d022cfc3a 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeToIcebergType.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeToIcebergType.java @@ -25,12 +25,14 @@ import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; import 
org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; public class HiveTypeToIcebergType extends HiveTypeUtil.HiveSchemaVisitor { + private static final String UNION_TO_STRUCT_CONVERSION_PREFIX = "tag_"; private int nextId = 1; @Override @@ -52,6 +54,16 @@ public Type list(ListTypeInfo list, Type elementResult) { return Types.ListType.ofOptional(allocateId(), elementResult); } + // Represent the union as a struct with one optional field per alternative, named tag_0, tag_1, ... + @Override + public Type union(UnionTypeInfo union, List unionResults) { + List fields = Lists.newArrayListWithExpectedSize(unionResults.size()); + for (int i = 0; i < unionResults.size(); i++) { + fields.add(Types.NestedField.optional(allocateId(), UNION_TO_STRUCT_CONVERSION_PREFIX + i, unionResults.get(i))); + } + return Types.StructType.of(fields); + } + @Override public Type primitive(PrimitiveTypeInfo primitive) { switch (primitive.getPrimitiveCategory()) { diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeUtil.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeUtil.java index 67c348cff1..9d58ff4b6f 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeUtil.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveTypeUtil.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; @@ -49,7 +50,12 @@ public static T visit(TypeInfo 
typeInfo, HiveSchemaVisitor visitor) { return visitor.struct(structTypeInfo, names, results); case UNION: - throw new UnsupportedOperationException("Union data type not supported : " + typeInfo); + final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo; + List unionResults = Lists.newArrayListWithExpectedSize(unionTypeInfo.getAllUnionObjectTypeInfos().size()); + for (TypeInfo unionObjectTypeInfo : unionTypeInfo.getAllUnionObjectTypeInfos()) { + unionResults.add(visit(unionObjectTypeInfo, visitor)); + } + return visitor.union(unionTypeInfo, unionResults); case LIST: ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo; @@ -80,6 +86,10 @@ public T map(MapTypeInfo map, T keyResult, T valueResult) { return null; } + public T union(UnionTypeInfo union, List unionResults) { + return null; + } + public T primitive(PrimitiveTypeInfo primitive) { return null; } diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/MergeHiveSchemaWithAvro.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/MergeHiveSchemaWithAvro.java index 55018c7d77..0dec45757f 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/MergeHiveSchemaWithAvro.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/MergeHiveSchemaWithAvro.java @@ -19,6 +19,7 @@ package org.apache.iceberg.hive.legacy; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.avro.JsonProperties; @@ -30,6 +31,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -124,6 +126,17 @@ public Schema map(MapTypeInfo map, Schema partner, Schema keyResult, Schema valu return 
shouldResultBeOptional ? AvroSchemaUtil.toOption(result) : result; } + @Override + public Schema union(UnionTypeInfo union, Schema partner, List<Schema> results) { + List<Schema> branches = new ArrayList<>(); + if (partner == null || (partner.getType() == Schema.Type.UNION && AvroSchemaUtil.nullExistInUnion(partner))) { + branches.add(Schema.create(Schema.Type.NULL)); // no partner means nullable, as in primitive() + } + // Alternatives visited without a partner come back optional; unwrap them because Avro rejects nested unions + results.forEach(r -> branches.add(AvroSchemaUtil.isOptionSchema(r) ? AvroSchemaUtil.fromOption(r) : r)); + return Schema.createUnion(branches); + } + @Override public Schema primitive(PrimitiveTypeInfo primitive, Schema partner) { boolean shouldResultBeOptional = partner == null || AvroSchemaUtil.isOptionSchema(partner); @@ -177,6 +190,17 @@ public Schema listElementPartner(Schema partner) { return (schema.getType() == Schema.Type.ARRAY) ? schema.getElementType() : null; } + @Override + public Schema unionObjectPartner(Schema partner, int ordinal) { + if (partner.getType() != Schema.Type.UNION) { + return null; + } + // Hive alternatives are matched by position against the non-null Avro branches + List<Schema> branches = AvroSchemaUtil.discardNullFromUnionIfExist(partner).getTypes(); + // A Hive alternative with no Avro branch at its position has no partner + return ordinal < branches.size() ? branches.get(ordinal) : null; + } + private Schema.Field findCaseInsensitive(Schema struct, String fieldName) { Preconditions.checkArgument(struct.getType() == Schema.Type.RECORD); // TODO: Optimize? 
This will be called for every struct field, we will run the for loop for every struct field diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestHiveSchemaConversions.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestHiveSchemaConversions.java index ccbaffd284..7b12233f25 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestHiveSchemaConversions.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestHiveSchemaConversions.java @@ -80,6 +80,7 @@ public void testConversions() { "struct<" + "length:int,count:int,list:array>," + "wordcounts:map>"); + check("struct<1: tag_0: optional int, 2: tag_1: optional string>", "uniontype"); } private static void check(String icebergTypeStr, String hiveTypeStr) { diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestMergeHiveSchemaWithAvro.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestMergeHiveSchemaWithAvro.java index 7ad76dae7a..4310a1e9df 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestMergeHiveSchemaWithAvro.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestMergeHiveSchemaWithAvro.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Arrays; import java.util.Map; +import java.util.stream.Collectors; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.util.internal.JacksonUtils; @@ -226,6 +227,25 @@ public void shouldHandleMaps() { assertSchema(expected, merge(hive, avro)); } + @Test + public void shouldHandleUnions() { + String hive = "struct,fb:uniontype,fc:uniontype>"; + Schema avro = struct("r1", + required("fA", union(Schema.Type.NULL, Schema.Type.STRING, Schema.Type.INT)), + required("fB", union(Schema.Type.STRING, Schema.Type.INT)), + required("fC", union(Schema.Type.STRING, Schema.Type.INT, Schema.Type.NULL)) + ); + + Schema expected = struct("r1", + 
required("fA", union(Schema.Type.NULL, Schema.Type.STRING, Schema.Type.INT)), + required("fB", union(Schema.Type.STRING, Schema.Type.INT)), + // the merge always puts the NULL alternative first, wherever it sits in the Avro union + required("fC", union(Schema.Type.NULL, Schema.Type.STRING, Schema.Type.INT)) + ); + + assertSchema(expected, merge(hive, avro)); + } + @Test public void shouldSanitizeIncompatibleFieldNames() { StructTypeInfo typeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo( @@ -336,6 +356,10 @@ private Schema map(Schema.Type valueType) { return map(Schema.create(valueType)); } + private Schema union(Schema.Type... types) { + return Schema.createUnion(Arrays.stream(types).map(Schema::create).collect(Collectors.toList())); + } + private Schema.Field nullable(Schema.Field field) { Preconditions.checkArgument(!AvroSchemaUtil.isOptionSchema(field.schema())); return field(field.name(), nullable(field.schema()), field.doc(),