diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
index d06029915c..42442ff2dd 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
@@ -130,7 +130,7 @@ public static boolean isOptionSchema(Schema schema) {
     return false;
   }
 
-  static Schema toOption(Schema schema) {
+  public static Schema toOption(Schema schema) {
     if (schema.getType() == UNION) {
       Preconditions.checkArgument(isOptionSchema(schema),
           "Union schemas are not supported: %s", schema);
@@ -140,7 +140,7 @@ static Schema toOption(Schema schema) {
     }
   }
 
-  static Schema fromOption(Schema schema) {
+  public static Schema fromOption(Schema schema) {
     Preconditions.checkArgument(schema.getType() == UNION,
         "Expected union schema but was passed: %s", schema);
     Preconditions.checkArgument(schema.getTypes().size() == 2,
@@ -317,7 +317,7 @@ private static int toInt(Object value) {
     throw new UnsupportedOperationException("Cannot coerce value to int: " + value);
   }
 
-  static Schema copyRecord(Schema record, List<Schema.Field> newFields, String newName) {
+  public static Schema copyRecord(Schema record, List<Schema.Field> newFields, String newName) {
     Schema copy;
     if (newName != null) {
       copy = Schema.createRecord(newName, record.getDoc(), null, record.isError(), newFields);
@@ -337,7 +337,7 @@ static Schema copyRecord(Schema record, List<Schema.Field> newFields, String new
     return copy;
   }
 
-  static Schema.Field copyField(Schema.Field field, Schema newSchema, String newName) {
+  public static Schema.Field copyField(Schema.Field field, Schema newSchema, String newName) {
     Schema.Field copy = new Schema.Field(newName, newSchema, field.doc(), field.defaultVal(), field.order());
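Note (reviewer sketch, not part of the patch): these four helpers are widened to `public` so the new `hive-metastore` legacy classes below can reuse them. A minimal sketch of what the option helpers do; the class name and printed comments here are illustrative only:

```java
import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;

public class OptionHelpersSketch {
  public static void main(String[] args) {
    // toOption wraps a schema into a 2-type union with null
    Schema optionalInt = AvroSchemaUtil.toOption(Schema.create(Schema.Type.INT));
    System.out.println(optionalInt);  // ["null","int"]

    // fromOption returns the non-null branch of such a union
    Schema plainInt = AvroSchemaUtil.fromOption(optionalInt);
    System.out.println(plainInt);     // "int"
  }
}
```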
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveSchemaWithPartnerVisitor.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveSchemaWithPartnerVisitor.java
new file mode 100644
index 0000000000..dd2adcf271
--- /dev/null
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/HiveSchemaWithPartnerVisitor.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive.legacy;
+
+import java.util.List;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * A Hive {@link TypeInfo} visitor with an accompanying partner schema
+ *
+ * This visitor traverses the Hive {@link TypeInfo} tree while simultaneously navigating the partner schema tree
+ * using {@link PartnerAccessor}. When visiting each type in the Hive tree, the implementation is also presented
+ * with the corresponding type from the partner schema, or {@code null} if no match was found. Matching
+ * behavior can be controlled by implementing the methods in {@link PartnerAccessor}
+ *
+ * @param <P> type of partner schema
+ * @param <FP> type of the field representation in the partner schema
+ * @param <R> type of the resultant schema generated by the visitor
+ * @param <FR> type of the field representation in the resultant schema
+ */
+@SuppressWarnings("ClassTypeParameterName")
+public abstract class HiveSchemaWithPartnerVisitor<P, FP, R, FR> {
+
+  /**
+   * Methods to access types in the partner schema corresponding to types in the Hive schema being traversed
+   *
+   * @param <P> type of partner schema
+   * @param <FP> type of the field representation in the partner schema
+   */
+  public interface PartnerAccessor<P, FP> {
+
+    FP fieldPartner(P partnerStruct, String fieldName);
+
+    P fieldType(FP partnerField);
+
+    P mapKeyPartner(P partnerMap);
+
+    P mapValuePartner(P partnerMap);
+
+    P listElementPartner(P partnerList);
+  }
+
+  @SuppressWarnings("MethodTypeParameterName")
+  public static <P, FP, R, FR> R visit(TypeInfo typeInfo, P partner, HiveSchemaWithPartnerVisitor<P, FP, R, FR> visitor,
+                                       PartnerAccessor<P, FP> accessor) {
+    switch (typeInfo.getCategory()) {
+      case STRUCT:
+        StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+        List<String> names = structTypeInfo.getAllStructFieldNames();
+        List<FR> results = Lists.newArrayListWithExpectedSize(names.size());
+        for (String name : names) {
+          TypeInfo fieldTypeInfo = structTypeInfo.getStructFieldTypeInfo(name);
+          FP fieldPartner = partner != null ? accessor.fieldPartner(partner, name) : null;
+          P fieldPartnerType = fieldPartner != null ? accessor.fieldType(fieldPartner) : null;
+          R result = visit(fieldTypeInfo, fieldPartnerType, visitor, accessor);
+          results.add(visitor.field(name, fieldTypeInfo, fieldPartner, result));
+        }
+        return visitor.struct(structTypeInfo, partner, results);
+
+      case LIST:
+        ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+        TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo();
+        P elementPartner = partner != null ? accessor.listElementPartner(partner) : null;
+        R elementResult = visit(elementTypeInfo, elementPartner, visitor, accessor);
+        return visitor.list(listTypeInfo, partner, elementResult);
+
+      case MAP:
+        MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+        P keyPartner = partner != null ? accessor.mapKeyPartner(partner) : null;
+        R keyResult = visit(mapTypeInfo.getMapKeyTypeInfo(), keyPartner, visitor, accessor);
+        P valuePartner = partner != null ?
+            accessor.mapValuePartner(partner) : null;
+        R valueResult = visit(mapTypeInfo.getMapValueTypeInfo(), valuePartner, visitor, accessor);
+        return visitor.map(mapTypeInfo, partner, keyResult, valueResult);
+
+      case PRIMITIVE:
+        return visitor.primitive((PrimitiveTypeInfo) typeInfo, partner);
+
+      case UNION:
+        throw new UnsupportedOperationException("Union data type not supported: " + typeInfo);
+
+      default:
+        throw new UnsupportedOperationException(typeInfo + " not supported");
+    }
+  }
+
+  public R struct(StructTypeInfo struct, P partner, List<FR> fieldResults) {
+    return null;
+  }
+
+  public FR field(String name, TypeInfo field, FP partner, R fieldResult) {
+    return null;
+  }
+
+  public R list(ListTypeInfo list, P partner, R elementResult) {
+    return null;
+  }
+
+  public R map(MapTypeInfo map, P partner, R keyResult, R valueResult) {
+    return null;
+  }
+
+  public R primitive(PrimitiveTypeInfo primitive, P partner) {
+    return null;
+  }
+}
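To illustrate the contract, here is a hypothetical visitor (not part of this patch) that renders a Hive `TypeInfo` tree as a type string and flags fields for which the accessor found no partner. It would be driven via `HiveSchemaWithPartnerVisitor.visit(typeInfo, partner, new TypeNameVisitor(), accessor)` with a suitable `PartnerAccessor<String, String>`:

```java
import java.util.List;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.iceberg.hive.legacy.HiveSchemaWithPartnerVisitor;

// Partner, field-partner, result, and field-result types are all String in this toy example
class TypeNameVisitor extends HiveSchemaWithPartnerVisitor<String, String, String, String> {

  @Override
  public String struct(StructTypeInfo struct, String partner, List<String> fieldResults) {
    return "struct<" + String.join(",", fieldResults) + ">";
  }

  @Override
  public String field(String name, TypeInfo field, String partner, String fieldResult) {
    // partner is null when the accessor found no matching field in the partner schema
    return name + (partner == null ? "?" : "") + ":" + fieldResult;
  }

  @Override
  public String list(ListTypeInfo list, String partner, String elementResult) {
    return "array<" + elementResult + ">";
  }

  @Override
  public String map(MapTypeInfo map, String partner, String keyResult, String valueResult) {
    return "map<" + keyResult + "," + valueResult + ">";
  }

  @Override
  public String primitive(PrimitiveTypeInfo primitive, String partner) {
    return primitive.getTypeName();
  }
}
```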
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java
index 3cd66455d3..21b55442f1 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/LegacyHiveTableUtils.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -35,6 +36,7 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.avro.AvroSchemaVisitor;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Conversions;
@@ -54,12 +56,24 @@ private LegacyHiveTableUtils() {
 
   static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) {
     Map<String, String> props = getTableProperties(table);
     String schemaStr = props.get("avro.schema.literal");
+    org.apache.avro.Schema avroSchema = schemaStr != null ?
+        new org.apache.avro.Schema.Parser().parse(schemaStr) : null;
     Schema schema;
-    if (schemaStr != null) {
-      schema = AvroSchemaUtil.toIceberg(new org.apache.avro.Schema.Parser().parse(schemaStr));
+    if (avroSchema != null) {
+      FileFormat format = serdeToFileFormat(table.getSd().getSerdeInfo().getSerializationLib());
+      org.apache.avro.Schema finalAvroSchema;
+      if (format.equals(FileFormat.AVRO) || HasDuplicateLowercaseColumnNames.visit(avroSchema)) {
+        // Case 1: If the serde is AVRO, escape early; Hive column info is not reliable and can be empty for these
+        // tables. Hive itself uses avro.schema.literal as the source of truth for them, so this should be fine
+        // Case 2: If avro.schema.literal has duplicate column names when lowercased, we cannot do reliable matching
+        // with the Hive schema, as multiple Avro fields can map to the same Hive field
+        finalAvroSchema = avroSchema;
+      } else {
+        finalAvroSchema = MergeHiveSchemaWithAvro.visit(structTypeInfoFromCols(table.getSd().getCols()), avroSchema);
+      }
+      schema = AvroSchemaUtil.toIceberg(finalAvroSchema);
     } else {
       //TODO: Do we need to support column and column.types properties for ORC tables?
-      LOG.warn("Table {}.{} does not have an avro.schema.literal set; using Hive schema instead. " +
+      LOG.info("Table {}.{} does not have an avro.schema.literal set; using Hive schema instead. " +
           "The schema will not have case sensitivity and nullability information", table.getDbName(), table.getTableName());
       Type icebergType = HiveTypeUtil.convert(structTypeInfoFromCols(table.getSd().getCols()));
@@ -74,7 +88,7 @@ static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) {
     return new Schema(fields);
   }
 
-  static TypeInfo structTypeInfoFromCols(List<FieldSchema> cols) {
+  static StructTypeInfo structTypeInfoFromCols(List<FieldSchema> cols) {
     Preconditions.checkArgument(cols != null && cols.size() > 0, "No Hive schema present");
     List<String> fieldNames = cols
         .stream()
@@ -84,7 +98,7 @@ static TypeInfo structTypeInfoFromCols(List<FieldSchema> cols) {
         .stream()
         .map(f -> TypeInfoUtils.getTypeInfoFromTypeString(f.getType()))
         .collect(Collectors.toList());
-    return TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypeInfos);
+    return (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypeInfos);
   }
 
   private static Schema partitionSchema(List<FieldSchema> partitionKeys, Schema dataSchema) {
@@ -169,4 +183,39 @@ private static FileFormat serdeToFileFormat(String serde) {
       throw new IllegalArgumentException("Unrecognized serde: " + serde);
     }
   }
+
+  private static class HasDuplicateLowercaseColumnNames extends AvroSchemaVisitor<Boolean> {
+    private static final HasDuplicateLowercaseColumnNames INSTANCE = new HasDuplicateLowercaseColumnNames();
+
+    private static boolean visit(org.apache.avro.Schema schema) {
+      return AvroSchemaVisitor.visit(schema, INSTANCE);
+    }
+
+    @Override
+    public Boolean record(org.apache.avro.Schema record, List<String> names, List<Boolean> fieldResults) {
+      return fieldResults.stream().anyMatch(x -> x) ||
+          names.stream().collect(Collectors.groupingBy(String::toLowerCase))
+              .values().stream().anyMatch(x -> x.size() > 1);
+    }
+
+    @Override
+    public Boolean union(org.apache.avro.Schema union, List<Boolean> optionResults) {
+      return optionResults.stream().anyMatch(x -> x);
+    }
+
+    @Override
+    public Boolean array(org.apache.avro.Schema array, Boolean elementResult) {
+      return elementResult;
+    }
+
+    @Override
+    public Boolean map(org.apache.avro.Schema map, Boolean valueResult) {
+      return valueResult;
+    }
+
+    @Override
+    public Boolean primitive(org.apache.avro.Schema primitive) {
+      return false;
+    }
+  }
 }
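The duplicate-name guard matters because Hive stores column names lowercased, so two Avro fields that differ only in case cannot be matched unambiguously. A self-contained sketch of the check that `record(...)` performs above, applied to a flat record (the real visitor also recurses through nested and union types); class and method names here are illustrative:

```java
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class DuplicateLowercaseNamesSketch {
  // group field names by their lowercase form; any group larger than one is a collision
  static boolean hasDuplicateLowercaseNames(Schema record) {
    return record.getFields().stream()
        .map(Schema.Field::name)
        .collect(Collectors.groupingBy(String::toLowerCase))
        .values().stream().anyMatch(group -> group.size() > 1);
  }

  public static void main(String[] args) {
    Schema ambiguous = SchemaBuilder.record("r").fields()
        .requiredInt("id")
        .requiredInt("ID")  // collides with "id" once lowercased
        .endRecord();
    System.out.println(hasDuplicateLowercaseNames(ambiguous));  // true
  }
}
```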
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/MergeHiveSchemaWithAvro.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/MergeHiveSchemaWithAvro.java
new file mode 100644
index 0000000000..a0e56a8afc
--- /dev/null
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/legacy/MergeHiveSchemaWithAvro.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive.legacy;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.codehaus.jackson.node.JsonNodeFactory;
+
+
+/**
+ * A {@link HiveSchemaWithPartnerVisitor} which augments a Hive schema with extra metadata from a partner Avro schema
+ * and generates a resultant "merged" Avro schema
+ *
+ * 1. Fields are matched between Hive and Avro schemas using a case insensitive search by field name
+ * 2. Copies field names, nullability, default values, and field props from the Avro schema
+ * 3. Copies field types from the Hive schema.
+ *    TODO: We should also handle some cases of type promotion where the types in Avro are potentially more correct,
+ *    e.g. BINARY in Hive -> FIXED in Avro, STRING in Hive -> ENUM in Avro, etc.
+ * 4. Retains fields found only in the Hive schema; ignores fields found only in the Avro schema
+ * 5. Fields found only in the Hive schema are represented as optional fields in the resultant Avro schema
+ * 6. For fields found only in the Hive schema, field names are sanitized to make them compatible with the Avro
+ *    identifier spec
+ */
+class MergeHiveSchemaWithAvro extends HiveSchemaWithPartnerVisitor<Schema, Schema.Field, Schema, Schema.Field> {
+
+  static Schema visit(StructTypeInfo typeInfo, Schema schema) {
+    return HiveSchemaWithPartnerVisitor.visit(typeInfo, schema, new MergeHiveSchemaWithAvro(),
+        AvroPartnerAccessor.INSTANCE);
+  }
+
+  private final AtomicInteger recordCounter = new AtomicInteger(0);
+
+  @Override
+  public Schema struct(StructTypeInfo struct, Schema partner, List<Schema.Field> fieldResults) {
+    boolean shouldResultBeOptional = partner == null || AvroSchemaUtil.isOptionSchema(partner);
+    Schema result;
+    if (partner == null || extractIfOption(partner).getType() != Schema.Type.RECORD) {
+      // if there was no matching Avro struct, return a struct with a new record name and namespace
+      int recordNum = recordCounter.incrementAndGet();
+      result = Schema.createRecord("record" + recordNum, null, "namespace" + recordNum, false, fieldResults);
+    } else {
+      result = AvroSchemaUtil.copyRecord(extractIfOption(partner), fieldResults, null);
+    }
+    return shouldResultBeOptional ? AvroSchemaUtil.toOption(result) : result;
+  }
+
+  @Override
+  public Schema.Field field(String name, TypeInfo field, Schema.Field partner, Schema fieldResult) {
+    // No need to infer `shouldResultBeOptional`. We expect other visitor methods to return optional schemas
+    // in their field results if required
+    if (partner == null) {
+      // if there was no matching Avro field, use the name from the Hive schema and set a null default
+      return new Schema.Field(
+          AvroSchemaUtil.makeCompatibleName(name), fieldResult, null, Schema.Field.NULL_DEFAULT_VALUE);
+    } else {
+      // TODO: How do we ensure that the field default value is compatible with the new field type from Hive?
+      // Copy the field type from the visitor result, copy everything else from the partner.
+      // Avro requires the default value to match the first type in the option; reorder the option if required
+      Schema reordered = reorderOptionIfRequired(fieldResult, partner.defaultVal());
+      return AvroSchemaUtil.copyField(partner, reordered, partner.name());
+    }
+  }
+
+  /**
+   * Reorders an option schema so that the type of the provided default value is the first type in the option schema
+   *
+   * e.g. If the schema is (NULL, INT) and the default value is 1, the returned schema is (INT, NULL).
+   * If the schema is not an option schema or if there is no default value, the schema is returned as-is
+   */
+  private Schema reorderOptionIfRequired(Schema schema, Object defaultValue) {
+    if (AvroSchemaUtil.isOptionSchema(schema) && defaultValue != null) {
+      boolean isNullFirstOption = schema.getTypes().get(0).getType() == Schema.Type.NULL;
+      if (isNullFirstOption && defaultValue.equals(JsonProperties.NULL_VALUE)) {
+        return schema;
+      } else {
+        return Schema.createUnion(schema.getTypes().get(1), schema.getTypes().get(0));
+      }
+    } else {
+      return schema;
+    }
+  }
+
+  @Override
+  public Schema list(ListTypeInfo list, Schema partner, Schema elementResult) {
+    // if there was no matching Avro list, or if the matching Avro list was an option, return an optional list
+    boolean shouldResultBeOptional = partner == null || AvroSchemaUtil.isOptionSchema(partner);
+    Schema result = Schema.createArray(elementResult);
+    return shouldResultBeOptional ? AvroSchemaUtil.toOption(result) : result;
+  }
+
+  @Override
+  public Schema map(MapTypeInfo map, Schema partner, Schema keyResult, Schema valueResult) {
+    Preconditions.checkArgument(extractIfOption(keyResult).getType() == Schema.Type.STRING,
+        "Map keys should always be non-nullable strings. Found: %s", keyResult);
+    // if there was no matching Avro map, or if the matching Avro map was an option, return an optional map
+    boolean shouldResultBeOptional = partner == null || AvroSchemaUtil.isOptionSchema(partner);
+    Schema result = Schema.createMap(valueResult);
+    return shouldResultBeOptional ? AvroSchemaUtil.toOption(result) : result;
+  }
+
+  @Override
+  public Schema primitive(PrimitiveTypeInfo primitive, Schema partner) {
+    boolean shouldResultBeOptional = partner == null || AvroSchemaUtil.isOptionSchema(partner);
+    Schema hivePrimitive = hivePrimitiveToAvro(primitive);
+    // if there was no matching Avro primitive, use the Hive primitive
+    Schema result = partner == null ? hivePrimitive : checkCompatibilityAndPromote(hivePrimitive, partner);
+    return shouldResultBeOptional ? AvroSchemaUtil.toOption(result) : result;
+  }
+
+  private Schema checkCompatibilityAndPromote(Schema schema, Schema partner) {
+    // TODO: Check if schema is compatible with partner.
+    // Also do type promotion if required, e.g. schema = string & partner = enum, schema = bytes & partner = fixed
+    return schema;
+  }
+
+  /**
+   * A {@link PartnerAccessor} which matches the requested field from a partner Avro struct by a case insensitive
+   * field name search
+   */
+  private static class AvroPartnerAccessor implements PartnerAccessor<Schema, Schema.Field> {
+    private static final AvroPartnerAccessor INSTANCE = new AvroPartnerAccessor();
+
+    private static final Schema MAP_KEY = Schema.create(Schema.Type.STRING);
+
+    @Override
+    public Schema.Field fieldPartner(Schema partner, String fieldName) {
+      Schema schema = extractIfOption(partner);
+      return (schema.getType() == Schema.Type.RECORD) ? findCaseInsensitive(schema, fieldName) : null;
+    }
+
+    @Override
+    public Schema fieldType(Schema.Field partnerField) {
+      return partnerField.schema();
+    }
+
+    @Override
+    public Schema mapKeyPartner(Schema partner) {
+      Schema schema = extractIfOption(partner);
+      return (schema.getType() == Schema.Type.MAP) ? MAP_KEY : null;
+    }
+
+    @Override
+    public Schema mapValuePartner(Schema partner) {
+      Schema schema = extractIfOption(partner);
+      return (schema.getType() == Schema.Type.MAP) ? schema.getValueType() : null;
+    }
+
+    @Override
+    public Schema listElementPartner(Schema partner) {
+      Schema schema = extractIfOption(partner);
+      return (schema.getType() == Schema.Type.ARRAY) ? schema.getElementType() : null;
+    }
+
+    private Schema.Field findCaseInsensitive(Schema struct, String fieldName) {
+      Preconditions.checkArgument(struct.getType() == Schema.Type.RECORD);
+      // TODO: Optimize? This is called once per Hive field, and each call scans all fields of the Avro struct
+      for (Schema.Field field : struct.getFields()) {
+        if (field.name().equalsIgnoreCase(fieldName)) {
+          return field;
+        }
+      }
+      return null;
+    }
+  }
+
+  private static Schema extractIfOption(Schema schema) {
+    if (AvroSchemaUtil.isOptionSchema(schema)) {
+      return AvroSchemaUtil.fromOption(schema);
+    } else {
+      return schema;
+    }
+  }
+
+  // Additional numeric types, similar to other logical type names in AvroSerde
+  private static final String SHORT_TYPE_NAME = "short";
+  private static final String BYTE_TYPE_NAME = "byte";
+
+  // TODO: This should be refactored into a visitor if we ever require conversion of complex types
+  public Schema hivePrimitiveToAvro(PrimitiveTypeInfo primitive) {
+    Schema schema;
+    switch (primitive.getPrimitiveCategory()) {
+      case INT:
+        return Schema.create(Schema.Type.INT);
+
+      case BYTE:
+        schema = Schema.create(Schema.Type.INT);
+        schema.addProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE, BYTE_TYPE_NAME);
+        return schema;
+
+      case SHORT:
+        schema = Schema.create(Schema.Type.INT);
+        schema.addProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE, SHORT_TYPE_NAME);
+        return schema;
+
+      case LONG:
+        return Schema.create(Schema.Type.LONG);
+
+      case FLOAT:
+        return Schema.create(Schema.Type.FLOAT);
+
+      case DOUBLE:
+        return Schema.create(Schema.Type.DOUBLE);
+
+      case BOOLEAN:
+        return Schema.create(Schema.Type.BOOLEAN);
+
+      case CHAR:
+      case STRING:
+      case VARCHAR:
+        return Schema.create(Schema.Type.STRING);
+
+      case BINARY:
+        return Schema.create(Schema.Type.BYTES);
+
+      case VOID:
+        return Schema.create(Schema.Type.NULL);
+
+      case DATE:
+        schema = Schema.create(Schema.Type.INT);
+        schema.addProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE, AvroSerDe.DATE_TYPE_NAME);
+        return schema;
+
+      case TIMESTAMP:
+        schema = Schema.create(Schema.Type.LONG);
+        schema.addProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE, AvroSerDe.TIMESTAMP_TYPE_NAME);
+        return schema;
+
+      case DECIMAL:
+        DecimalTypeInfo dti = (DecimalTypeInfo) primitive;
+        JsonNodeFactory factory = JsonNodeFactory.instance;
+        schema = Schema.create(Schema.Type.BYTES);
+        schema.addProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE, AvroSerDe.DECIMAL_TYPE_NAME);
+        schema.addProp(AvroSerDe.AVRO_PROP_PRECISION, factory.numberNode(dti.getPrecision()));
+        schema.addProp(AvroSerDe.AVRO_PROP_SCALE, factory.numberNode(dti.getScale()));
+        return schema;
+
+      default:
+        throw new UnsupportedOperationException(primitive + " is not supported.");
+    }
+  }
+}
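A sketch of the merge entry point in action (hypothetical snippet, not part of the patch; it must live in package `org.apache.iceberg.hive.legacy` because both the class and its `visit` method are package-private):

```java
package org.apache.iceberg.hive.legacy;  // required: MergeHiveSchemaWithAvro is package-private

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class MergeSketch {
  public static void main(String[] args) {
    StructTypeInfo hive = (StructTypeInfo)
        TypeInfoUtils.getTypeInfoFromTypeString("struct<fa:int,fb:string>");
    Schema avro = SchemaBuilder.record("r1").fields()
        .optionalInt("fA")  // union(null, int); name and nullability come from Avro
        .endRecord();

    // Expect: "fA" kept as an optional int (name from Avro), plus an optional
    // string "fb", because that field exists only in the Hive schema
    Schema merged = MergeHiveSchemaWithAvro.visit(hive, avro);
    System.out.println(merged.toString(true));
  }
}
```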
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestMergeHiveSchemaWithAvro.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestMergeHiveSchemaWithAvro.java
new file mode 100644
index 0000000000..585f3b63ca
--- /dev/null
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/legacy/TestMergeHiveSchemaWithAvro.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive.legacy;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.util.internal.JacksonUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+
+public class TestMergeHiveSchemaWithAvro {
+
+  @Test
+  public void shouldUseFieldNamesFromAvro() {
+    String hive = "struct<fa:int,fb:struct<ga:int>>";
+    Schema avro = struct("r1",
+        optional("fA", Schema.Type.INT),
+        optional("fB", struct("r2",
+            optional("gA", Schema.Type.INT)
+        ))
+    );
+
+    assertSchema(avro, merge(hive, avro));
+  }
+
+  @Test
+  public void shouldUseNullabilityFromAvro() {
+    String hive = "struct<fa:int,fb:struct<ga:int>>";
+    Schema avro = struct("r1",
+        required("fA", Schema.Type.INT),
+        required("fB", struct("r2",
+            required("gA", Schema.Type.INT)
+        ))
+    );
+
+    assertSchema(avro, merge(hive, avro));
+  }
+
+  @Test
+  public void shouldUseTypesFromHive() {
+    String hive = "struct<fa:struct<ga:int>,fb:array<int>,fc:map<string,int>,fd:string>";
+    Schema avro = struct("r1",
+        required("fA", Schema.Type.INT),
+        required("fB", Schema.Type.INT),
+        required("fC", Schema.Type.INT),
+        required("fD", Schema.Type.INT)
+    );
+
+    Schema expected = struct("r1",
+        required("fA", struct("record1", null, "namespace1",
+            optional("ga", Schema.Type.INT)
+        )),
+        required("fB", array(nullable(Schema.Type.INT))),
+        required("fC", map(nullable(Schema.Type.INT))),
+        required("fD", Schema.Type.STRING)
+    );
+
+    assertSchema(expected, merge(hive, avro));
+  }
+
+  @Test
+  public void shouldIgnoreExtraFieldsFromAvro() {
+    String hive = "struct<fa:int,fb:struct<ga:int>>";
+    Schema avro = struct("r1",
+        required("fA", Schema.Type.INT),
+        required("fB", struct("r2",
+            required("gA", Schema.Type.INT),
+            required("gB", Schema.Type.INT)
+        )),
+        required("fC", Schema.Type.INT)
+    );
+
+    Schema expected = struct("r1",
+        required("fA", Schema.Type.INT),
+        required("fB", struct("r2",
+            required("gA", Schema.Type.INT)
+        ))
+    );
+
+    assertSchema(expected, merge(hive, avro));
+  }
+
+  @Test
+  public void shouldRetainExtraFieldsFromHive() {
+    String hive = "struct<fa:int,fb:struct<ga:int,gb:int>,fc:int,fd:struct<ha:int>>";
+    Schema avro = struct("r1",
+        required("fA", Schema.Type.INT),
+        required("fB", struct("r2",
+            required("gA", Schema.Type.INT)
+        ))
+    );
+
+    Schema expected = struct("r1",
+        required("fA", Schema.Type.INT),
+        required("fB", struct("r2",
+            required("gA", Schema.Type.INT),
+            // Nested field missing in Avro
+            optional("gb", Schema.Type.INT)
+        )),
+        // Top level field missing in Avro
+        optional("fc", Schema.Type.INT),
+        // Top level struct missing in Avro
struct("record1", null, "namespace1", + optional("ha", Schema.Type.INT) + )) + ); + + assertSchema(expected, merge(hive, avro)); + } + + @Test + public void shouldRetainDocStringsFromAvro() { + String hive = "struct>"; + Schema avro = struct("r1", "doc-r1", "n1", + required("fA", Schema.Type.INT, "doc-fA", null, null), + required("fB", struct("r2", "doc-r2", "n2", + required("gA", Schema.Type.INT, "doc-gA", null, null) + ), "doc-fB", null, null) + ); + + assertSchema(avro, merge(hive, avro)); + } + + @Test + public void shouldRetainDefaultValuesFromAvro() { + String hive = "struct>"; + Schema avro = struct("r1", + required("fA", Schema.Type.INT, null, 1, null), + required("fB", struct("r2", + required("gA", Schema.Type.INT, null, 2, null) + ), null, fromJson("{\"gA\": 3}"), null) + ); + + assertSchema(avro, merge(hive, avro)); + } + + @Test + public void shouldRetainFieldPropsFromAvro() { + String hive = "struct>"; + Schema avro = struct("r1", + required("fA", Schema.Type.INT, null, null, ImmutableMap.of("pfA", "vfA")), + required("fB", struct("r2", + required("gA", Schema.Type.INT, null, null, ImmutableMap.of("pfB", "vfB")) + ), null, null, ImmutableMap.of("pgA", "vgA")) + ); + + assertSchema(avro, merge(hive, avro)); + } + + @Test + public void shouldHandleLists() { + String hive = "struct,fb:array,fc:array>,fd:array>"; + Schema avro = struct("r1", + required("fA", array(Schema.Type.INT)), + optional("fB", array(Schema.Type.INT)), + required("fC", array(struct("r2", + required("gA", Schema.Type.INT) + ))) + ); + + Schema expected = struct("r1", + required("fA", array(Schema.Type.INT)), + optional("fB", array(Schema.Type.INT)), + required("fC", array(struct("r2", + required("gA", Schema.Type.INT) + ))), + // Array element type is also nullable because it is generated from Hive + optional("fd", array(nullable(Schema.Type.INT))) + ); + + assertSchema(expected, merge(hive, avro)); + } + + @Test + public void shouldHandleMaps() { + String hive = "struct,fb:map,fc:map>,fd:map>"; + Schema avro = struct("r1", + required("fA", map(Schema.Type.INT)), + optional("fB", map(Schema.Type.INT)), + required("fC", map(struct("r2", + required("gA", Schema.Type.INT) + ))) + ); + + Schema expected = struct("r1", + required("fA", map(Schema.Type.INT)), + optional("fB", map(Schema.Type.INT)), + required("fC", map(struct("r2", + required("gA", Schema.Type.INT) + ))), + // Map value type is also nullable because it is generated from Hive + optional("fd", map(nullable(Schema.Type.INT))) + ); + + assertSchema(expected, merge(hive, avro)); + } + + @Test + public void shouldSanitizeIncompatibleFieldNames() { + StructTypeInfo typeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo( + Lists.newArrayList("a.b.c", "$#@%!"), + Lists.newArrayList(TypeInfoFactory.intTypeInfo, TypeInfoFactory.intTypeInfo) + ); + Schema avro = struct("r1"); + + Schema expected = struct("r1", + optional("a_x2Eb_x2Ec", Schema.Type.INT), + optional("_x24_x23_x40_x25_x21", Schema.Type.INT) + ); + assertSchema(expected, merge(typeInfo, avro)); + } + + @Test + public void shouldReorderOptionalSchemaToMatchDefaultValue() { + String hive = "struct>"; + Schema avro = struct("r1", + field("fA", Schema.createUnion(Schema.create(Schema.Type.INT), Schema.create(Schema.Type.NULL)), + null, 1, null), + field("fB", Schema.createUnion( + struct("r2", required("gA", Schema.Type.INT, null, 2, null)), + Schema.create(Schema.Type.NULL) + ), null, fromJson("{\"gA\": 3}"), null) + ); + + assertSchema(avro, merge(hive, avro)); + } + + @Rule + public 
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void shouldFailForMapsWithNonStringKey() {
+    String hive = "struct<fa:map<int,int>>";
+    Schema avro = struct("r1");
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Map keys should always be non-nullable strings");
+    assertSchema(avro, merge(hive, avro));
+  }
+
+  // TODO: tests to retain schema props
+  // TODO: tests for explicit type compatibility checks between Hive and Avro primitives, once we implement them
+  // TODO: tests for the error case where the default value in Avro does not match the type from Hive
+
+  /** Test Helpers */
+
+  private void assertSchema(Schema expected, Schema actual) {
+    Assert.assertEquals(expected, actual);
+    Assert.assertEquals(expected.toString(true), actual.toString(true));
+  }
+
+  private Schema merge(StructTypeInfo typeInfo, Schema avro) {
+    return MergeHiveSchemaWithAvro.visit(typeInfo, avro);
+  }
+
+  private Schema merge(String hive, Schema avro) {
+    StructTypeInfo typeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(hive);
+    return merge(typeInfo, avro);
+  }
+
+  private Schema struct(String name, String doc, String namespace, Schema.Field... fields) {
+    return Schema.createRecord(name, doc, namespace, false, Arrays.asList(fields));
+  }
+
+  private Schema struct(String name, Schema.Field... fields) {
+    return struct(name, null, "n" + name, fields);
+  }
+
+  private Schema array(Schema element) {
+    return Schema.createArray(element);
+  }
+
+  private Schema array(Schema.Type elementType) {
+    return array(Schema.create(elementType));
+  }
+
+  private Schema map(Schema value) {
+    return Schema.createMap(value);
+  }
+
+  private Schema map(Schema.Type valueType) {
+    return map(Schema.create(valueType));
+  }
+
+  private Schema.Field nullable(Schema.Field field) {
+    Preconditions.checkArgument(!AvroSchemaUtil.isOptionSchema(field.schema()));
+    return field(field.name(), nullable(field.schema()), field.doc(),
+        Schema.Field.NULL_DEFAULT_VALUE, field.getObjectProps());
+  }
+
+  private Schema nullable(Schema schema) {
+    return AvroSchemaUtil.toOption(schema);
+  }
+
+  private Schema nullable(Schema.Type type) {
+    return nullable(Schema.create(type));
+  }
+
+  private Schema.Field field(String name, Schema schema, String doc, Object defaultValue,
+      Map<String, Object> props) {
+    Schema.Field field = new Schema.Field(name, schema, doc, defaultValue);
+    if (props != null) {
+      props.forEach(field::addProp);
+    }
+    return field;
+  }
+
+  private Schema.Field required(String name, Schema schema, String doc, Object defaultValue,
+      Map<String, Object> props) {
+    return field(name, schema, doc, defaultValue, props);
+  }
+
+  private Schema.Field required(String name, Schema schema) {
+    return required(name, schema, null, null, null);
+  }
+
+  private Schema.Field required(String name, Schema.Type type, String doc, Object defaultValue,
+      Map<String, Object> props) {
+    return required(name, Schema.create(type), doc, defaultValue, props);
+  }
+
+  private Schema.Field required(String name, Schema.Type type) {
+    return required(name, type, null, null, null);
+  }
+
+  private Schema.Field optional(String name, Schema schema, String doc) {
+    return nullable(field(name, schema, doc, null, null));
+  }
+
+  private Schema.Field optional(String name, Schema schema) {
+    return optional(name, schema, null);
+  }
+
+  private Schema.Field optional(String name, Schema.Type type, String doc) {
+    return optional(name, Schema.create(type), doc);
+  }
+
+  private Schema.Field optional(String name, Schema.Type type) {
+    return optional(name, type, null);
+  }
+
+  private Object fromJson(String json) {
+    ObjectMapper mapper = new ObjectMapper();
+    try {
+      return JacksonUtils.toObject(mapper.readTree(json));
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
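For reference, the `merge(String, Schema)` helper above relies on Hive's `TypeInfoUtils` to parse type strings like `struct<fa:int,fb:struct<ga:int>>`; a standalone sketch of that parsing step (class name and printed comments are illustrative):

```java
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class TypeStringSketch {
  public static void main(String[] args) {
    // Same style of type string as the tests above
    StructTypeInfo typeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(
        "struct<fa:int,fb:struct<ga:int>>");
    System.out.println(typeInfo.getAllStructFieldNames());      // [fa, fb]
    System.out.println(typeInfo.getStructFieldTypeInfo("fb"));  // struct<ga:int>
  }
}
```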