diff --git a/pom.xml b/pom.xml index 23ecd60..dc86999 100644 --- a/pom.xml +++ b/pom.xml @@ -790,7 +790,11 @@ org.apache.hive:hive-serde + org/apache/hadoop/hive/serde2/avro/AvroDeserializer*.class + org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils*.class org/apache/hadoop/hive/serde2/avro/InstanceCache*.class + org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo*.class + org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema*.class diff --git a/src/main/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java b/src/main/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java new file mode 100644 index 0000000..038e2cf --- /dev/null +++ b/src/main/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java @@ -0,0 +1,386 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.serde2.avro; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.rmi.server.UID; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Fixed; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.UnresolvedUnionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; +import org.apache.hadoop.io.Writable; + +class AvroDeserializer { + 
private static final Logger LOG = LoggerFactory.getLogger(AvroDeserializer.class); + /** + * Set of already seen and valid record readers IDs which doesn't need re-encoding + */ + private final HashSet noEncodingNeeded = new HashSet(); + /** + * Map of record reader ID and the associated re-encoder. It contains only the record readers + * that record needs to be re-encoded. + */ + private final HashMap reEncoderCache = new HashMap(); + /** + * Flag to print the re-encoding warning message only once. Avoid excessive logging for each + * record encoding. + */ + private boolean warnedOnce = false; + /** + * When encountering a record with an older schema than the one we're trying + * to read, it is necessary to re-encode with a reader against the newer schema. + * Because Hive doesn't provide a way to pass extra information to the + * inputformat, we're unable to provide the newer schema when we have it and it + * would be most useful - when the inputformat is reading the file. + * + * This is a slow process, so we try to cache as many of the objects as possible. 
+ */ + static class SchemaReEncoder { + private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + private final GenericDatumWriter gdw = new GenericDatumWriter(); + private BinaryDecoder binaryDecoder = null; + + GenericDatumReader gdr = null; + + public SchemaReEncoder(Schema writer, Schema reader) { + gdr = new GenericDatumReader(writer, reader); + } + + public GenericRecord reencode(GenericRecord r) + throws AvroSerdeException { + baos.reset(); + + BinaryEncoder be = EncoderFactory.get().directBinaryEncoder(baos, null); + gdw.setSchema(r.getSchema()); + try { + gdw.write(r, be); + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + + binaryDecoder = DecoderFactory.defaultFactory().createBinaryDecoder(bais, binaryDecoder); + + return gdr.read(r, binaryDecoder); + + } catch (IOException e) { + throw new AvroSerdeException("Exception trying to re-encode record to new schema", e); + } + } + } + + private List row; + + /** + * Deserialize an Avro record, recursing into its component fields and + * deserializing them as well. Fields of the record are matched by name + * against fields in the Hive row. + * + * Because Avro has some data types that Hive does not, these are converted + * during deserialization to types Hive will work with. + * + * @param columnNames List of columns Hive is expecting from record. 
+ * @param columnTypes List of column types matched by index to names + * @param writable Instance of GenericAvroWritable to deserialize + * @param readerSchema Schema of the writable to deserialize + * @return A list of objects suitable for Hive to work with further + * @throws AvroSerdeException For any exception during deseriliazation + */ + public Object deserialize(List columnNames, List columnTypes, + Writable writable, Schema readerSchema) throws AvroSerdeException { + if(!(writable instanceof AvroGenericRecordWritable)) { + throw new AvroSerdeException("Expecting a AvroGenericRecordWritable"); + } + + if(row == null || row.size() != columnNames.size()) { + row = new ArrayList(columnNames.size()); + } else { + row.clear(); + } + + AvroGenericRecordWritable recordWritable = (AvroGenericRecordWritable) writable; + GenericRecord r = recordWritable.getRecord(); + Schema fileSchema = recordWritable.getFileSchema(); + + UID recordReaderId = recordWritable.getRecordReaderID(); + //If the record reader (from which the record is originated) is already seen and valid, + //no need to re-encode the record. + if(!noEncodingNeeded.contains(recordReaderId)) { + SchemaReEncoder reEncoder = null; + //Check if the record record is already encoded once. If it does + //reuse the encoder. + if(reEncoderCache.containsKey(recordReaderId)) { + reEncoder = reEncoderCache.get(recordReaderId); //Reuse the re-encoder + } else if (!r.getSchema().equals(readerSchema)) { //Evolved schema? + //Create and store new encoder in the map for re-use + reEncoder = new SchemaReEncoder(r.getSchema(), readerSchema); + reEncoderCache.put(recordReaderId, reEncoder); + } else{ + LOG.debug("Adding new valid RRID :" + recordReaderId); + noEncodingNeeded.add(recordReaderId); + } + if(reEncoder != null) { + if (!warnedOnce) { + LOG.warn("Received different schemas. 
Have to re-encode: " + + r.getSchema().toString(false) + "\nSIZE" + reEncoderCache + " ID " + recordReaderId); + warnedOnce = true; + } + r = reEncoder.reencode(r); + } + } + + workerBase(row, fileSchema, columnNames, columnTypes, r); + return row; + } + + /** + * Deserializes each named column of {@code record} and appends the results to {@code objectRow}. + * The actual deserialization may involve nested records, which require recursion. + * + * @param objectRow list the deserialized column values are appended to, in column order + * @param fileSchema file schema of the enclosing record; may be null, in which case every column is deserialized without a file schema + * @param columnNames names of the columns to read from {@code record} + * @param columnTypes Hive types of the columns, matched by index to {@code columnNames} + * @param record Avro record the column values are read from + * @return {@code objectRow} + * @throws AvroSerdeException if a column value cannot be deserialized + */ + private List workerBase(List objectRow, Schema fileSchema, List columnNames, + List columnTypes, GenericRecord record) + throws AvroSerdeException { + for(int i = 0; i < columnNames.size(); i++) { + TypeInfo columnType = columnTypes.get(i); + String columnName = columnNames.get(i); + Object datum = record.get(columnName); + Schema datumSchema = record.getSchema().getField(columnName).schema(); + // A nested struct is handed a null fileSchema when its field is missing from the file schema, + // so guard the lookup instead of dereferencing it. + Schema.Field field = null; + if (fileSchema != null) { + Schema fileRecordSchema = AvroSerdeUtils.isNullableType(fileSchema) + ? AvroSerdeUtils.getOtherTypeFromNullableType(fileSchema) : fileSchema; + field = fileRecordSchema.getField(columnName); + } + objectRow.add(worker(datum, field == null ? null : field.schema(), datumSchema, columnType)); + } + + return objectRow; + } + + private Object worker(Object datum, Schema fileSchema, Schema recordSchema, TypeInfo columnType) + throws AvroSerdeException { + if (datum == null) { + return null; + } + + // Avro requires nullable types to be defined as unions of some type T + // and NULL. This is annoying and we're going to hide it from the user. 
+ + if (AvroSerdeUtils.isNullableType(recordSchema)) { + recordSchema = AvroSerdeUtils.getOtherTypeFromNullableType(recordSchema); + } + if (fileSchema != null && AvroSerdeUtils.isNullableType(fileSchema)) { + fileSchema = AvroSerdeUtils.getOtherTypeFromNullableType(fileSchema); + } + + switch(columnType.getCategory()) { + case STRUCT: + return deserializeStruct((GenericData.Record) datum, fileSchema, (StructTypeInfo) columnType); + case UNION: + return deserializeUnion(datum, fileSchema, recordSchema, (UnionTypeInfo) columnType); + case LIST: + return deserializeList(datum, fileSchema, recordSchema, (ListTypeInfo) columnType); + case MAP: + return deserializeMap(datum, fileSchema, recordSchema, (MapTypeInfo) columnType); + case PRIMITIVE: + return deserializePrimitive(datum, fileSchema, recordSchema, (PrimitiveTypeInfo) columnType); + default: + throw new AvroSerdeException("Unknown TypeInfo: " + columnType.getCategory()); + } + } + + private Object deserializePrimitive(Object datum, Schema fileSchema, Schema recordSchema, + PrimitiveTypeInfo columnType) throws AvroSerdeException { + switch (columnType.getPrimitiveCategory()){ + case STRING: + return datum.toString(); // To workaround AvroUTF8 + // This also gets us around the Enum issue since we just take the value + // and convert it to a string. Yay! + case BINARY: + if (recordSchema.getType() == Type.FIXED){ + Fixed fixed = (Fixed) datum; + return fixed.bytes(); + } else if (recordSchema.getType() == Type.BYTES){ + return AvroSerdeUtils.getBytesFromByteBuffer((ByteBuffer) datum); + } else { + throw new AvroSerdeException("Unexpected Avro schema for Binary TypeInfo: " + recordSchema.getType()); + } + case DECIMAL: + if (fileSchema == null) { + throw new AvroSerdeException("File schema is missing for decimal field. 
Reader schema is " + columnType); + } + + int scale = 0; + try { + scale = AvroSerdeUtils.getIntFromSchema(fileSchema, AvroSerDe.AVRO_PROP_SCALE); + } catch(Exception ex) { + throw new AvroSerdeException("Failed to obtain scale value from file schema: " + fileSchema, ex); + } + + HiveDecimal dec = AvroSerdeUtils.getHiveDecimalFromByteBuffer((ByteBuffer) datum, scale); + JavaHiveDecimalObjectInspector oi = (JavaHiveDecimalObjectInspector) + PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector((DecimalTypeInfo)columnType); + return oi.set(null, dec); + case CHAR: + if (fileSchema == null) { + throw new AvroSerdeException("File schema is missing for char field. Reader schema is " + columnType); + } + + int maxLength = 0; + try { + maxLength = AvroSerdeUtils.getIntFromSchema(fileSchema, AvroSerDe.AVRO_PROP_MAX_LENGTH); + } catch (Exception ex) { + throw new AvroSerdeException("Failed to obtain maxLength value for char field from file schema: " + fileSchema, ex); + } + + String str = datum.toString(); + HiveChar hc = new HiveChar(str, maxLength); + return hc; + case VARCHAR: + if (fileSchema == null) { + throw new AvroSerdeException("File schema is missing for varchar field. 
Reader schema is " + columnType); + } + + maxLength = 0; + try { + maxLength = AvroSerdeUtils.getIntFromSchema(fileSchema, AvroSerDe.AVRO_PROP_MAX_LENGTH); + } catch (Exception ex) { + throw new AvroSerdeException("Failed to obtain maxLength value for varchar field from file schema: " + fileSchema, ex); + } + + str = datum.toString(); + HiveVarchar hvc = new HiveVarchar(str, maxLength); + return hvc; + case DATE: + if (recordSchema.getType() != Type.INT) { + throw new AvroSerdeException("Unexpected Avro schema for Date TypeInfo: " + recordSchema.getType()); + } + + return new Date(DateWritable.daysToMillis((Integer)datum)); + case TIMESTAMP: + if (recordSchema.getType() != Type.LONG) { + throw new AvroSerdeException( + "Unexpected Avro schema for Timestamp TypeInfo: " + recordSchema.getType()); + } + // java.sql.Timestamp(long) takes milliseconds since the epoch + return new Timestamp((Long)datum); + default: + return datum; + } + } + + + private Object deserializeStruct(GenericData.Record datum, Schema fileSchema, StructTypeInfo columnType) + throws AvroSerdeException { + // No equivalent Java type for the backing structure, need to recurse and build a list + ArrayList innerFieldTypes = columnType.getAllStructFieldTypeInfos(); + ArrayList innerFieldNames = columnType.getAllStructFieldNames(); + List innerObjectRow = new ArrayList(innerFieldTypes.size()); + + return workerBase(innerObjectRow, fileSchema, innerFieldNames, innerFieldTypes, datum); + } + + private Object deserializeUnion(Object datum, Schema fileSchema, Schema recordSchema, + UnionTypeInfo columnType) throws AvroSerdeException { + // Calculate tags individually since the schema can evolve and can have different tags. 
In worst case, both schemas are same + // and we would end up doing calculations twice to get the same tag + // fileSchema may be null (field absent from the file schema); only resolve its tag when it is present. + Schema fsBranch = null; + if (fileSchema != null) { + int fsTag = GenericData.get().resolveUnion(fileSchema, datum); // Determine index of value from fileSchema + fsBranch = fileSchema.getTypes().get(fsTag); + } + int rsTag = GenericData.get().resolveUnion(recordSchema, datum); // Determine index of value from recordSchema + Object desered = worker(datum, fsBranch, + recordSchema.getTypes().get(rsTag), columnType.getAllUnionObjectTypeInfos().get(rsTag)); + return new StandardUnionObjectInspector.StandardUnion((byte)rsTag, desered); + } + + /** + * Deserializes a Hive list column. Avro FIXED and BYTES values are exposed as a list of bytes; + * an Avro array has each element deserialized recursively. + */ + private Object deserializeList(Object datum, Schema fileSchema, Schema recordSchema, + ListTypeInfo columnType) throws AvroSerdeException { + // Need to check the original schema to see if this is actually a Fixed. + if(recordSchema.getType().equals(Schema.Type.FIXED)) { + // We're faking out Hive to work through a type system impedance mismatch. + // Pull out the backing array and convert to a list. + GenericData.Fixed fixed = (GenericData.Fixed) datum; + byte[] fixedBytes = fixed.bytes(); + List asList = new ArrayList(fixedBytes.length); + for(int j = 0; j < fixedBytes.length; j++) { + asList.add(fixedBytes[j]); + } + return asList; + } else if(recordSchema.getType().equals(Schema.Type.BYTES)) { + // Copy only the readable bytes (position..limit). The backing array can be larger than the + // value or inaccessible, and duplicate() leaves the caller's buffer position untouched. + ByteBuffer bb = ((ByteBuffer)datum).duplicate(); + byte[] array = new byte[bb.remaining()]; + bb.get(array); + List asList = new ArrayList(array.length); + for(int j = 0; j < array.length; j++) { + asList.add(array[j]); + } + return asList; + } else { // An actual list, deser its values + List listData = (List) datum; + Schema listSchema = recordSchema.getElementType(); + List listContents = new ArrayList(listData.size()); + for(Object obj : listData) { + listContents.add(worker(obj, fileSchema == null ? 
null : fileSchema.getElementType(), listSchema, + columnType.getListElementTypeInfo())); + } + return listContents; + } + } + + private Object deserializeMap(Object datum, Schema fileSchema, Schema mapSchema, MapTypeInfo columnType) + throws AvroSerdeException { + // Avro only allows maps with Strings for keys, so we only have to worry + // about deserializing the values + Map map = new HashMap(); + Map mapDatum = (Map)datum; + Schema valueSchema = mapSchema.getValueType(); + TypeInfo valueTypeInfo = columnType.getMapValueTypeInfo(); + for (CharSequence key : mapDatum.keySet()) { + Object value = mapDatum.get(key); + map.put(key.toString(), worker(value, fileSchema == null ? null : fileSchema.getValueType(), + valueSchema, valueTypeInfo)); + } + + return map; + } + + public HashSet getNoEncodingNeeded() { + return noEncodingNeeded; + } + + public HashMap getReEncoderCache() { + return reEncoderCache; + } + +} diff --git a/src/main/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java b/src/main/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java new file mode 100644 index 0000000..4c46d1a --- /dev/null +++ b/src/main/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java @@ -0,0 +1,353 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.serde2.avro; + + +import org.apache.avro.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.mapred.JobConf; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.Properties; + +/** + * Utilities useful only to the AvroSerde itself. Not mean to be used by + * end-users but public for interop to the ql package. + */ +public class AvroSerdeUtils { + private static final Logger LOG = LoggerFactory.getLogger(AvroSerdeUtils.class); + + /** + * Enum container for all avro table properties. + * If introducing a new avro-specific table property, + * add it here. Putting them in an enum rather than separate strings + * allows them to be programmatically grouped and referenced together. 
+ */ + public static enum AvroTableProperties { + SCHEMA_LITERAL("avro.schema.literal"), + SCHEMA_URL("avro.schema.url"), + SCHEMA_NAMESPACE("avro.schema.namespace"), + SCHEMA_NAME("avro.schema.name"), + SCHEMA_DOC("avro.schema.doc"), + AVRO_SERDE_SCHEMA("avro.serde.schema"), + SCHEMA_RETRIEVER("avro.schema.retriever"); + + private final String propName; + + AvroTableProperties(String propName) { + this.propName = propName; + } + + public String getPropName(){ + return this.propName; + } + } + + // Following parameters slated for removal, prefer usage of enum above, that allows programmatic access. + @Deprecated public static final String SCHEMA_LITERAL = "avro.schema.literal"; + @Deprecated public static final String SCHEMA_URL = "avro.schema.url"; + @Deprecated public static final String SCHEMA_NAMESPACE = "avro.schema.namespace"; + @Deprecated public static final String SCHEMA_NAME = "avro.schema.name"; + @Deprecated public static final String SCHEMA_DOC = "avro.schema.doc"; + @Deprecated public static final String AVRO_SERDE_SCHEMA = AvroTableProperties.AVRO_SERDE_SCHEMA.getPropName(); + @Deprecated public static final String SCHEMA_RETRIEVER = AvroTableProperties.SCHEMA_RETRIEVER.getPropName(); + + public static final String SCHEMA_NONE = "none"; + public static final String EXCEPTION_MESSAGE = "Neither " + + AvroTableProperties.SCHEMA_LITERAL.getPropName() + " nor " + + AvroTableProperties.SCHEMA_URL.getPropName() + " specified, can't determine table schema"; + + + + /** + * Determine the schema to that's been provided for Avro serde work. 
+ * @param properties containing a key pointing to the schema, one way or another + * @return schema to use while serdeing the avro file + * @throws IOException if error while trying to read the schema from another location + * @throws AvroSerdeException if unable to find a schema or pointer to it in the properties + */ + public static Schema determineSchemaOrThrowException(Configuration conf, Properties properties) + throws IOException, AvroSerdeException { + String schemaString = properties.getProperty(AvroTableProperties.SCHEMA_LITERAL.getPropName()); + if(schemaString != null && !schemaString.equals(SCHEMA_NONE)) + return AvroSerdeUtils.getSchemaFor(schemaString); + + // Try pulling directly from URL + schemaString = properties.getProperty(AvroTableProperties.SCHEMA_URL.getPropName()); + if (schemaString == null) { + final String columnNameProperty = properties.getProperty(serdeConstants.LIST_COLUMNS); + final String columnTypeProperty = properties.getProperty(serdeConstants.LIST_COLUMN_TYPES); + final String columnCommentProperty = properties.getProperty(AvroSerDe.LIST_COLUMN_COMMENTS); + if (columnNameProperty == null || columnNameProperty.isEmpty() + || columnTypeProperty == null || columnTypeProperty.isEmpty() ) { + throw new AvroSerdeException(EXCEPTION_MESSAGE); + } + final String columnNameDelimiter = properties.containsKey(serdeConstants.COLUMN_NAME_DELIMITER) ? 
properties + .getProperty(serdeConstants.COLUMN_NAME_DELIMITER) : String.valueOf(SerDeUtils.COMMA); + // Get column names and types + List columnNames = Arrays.asList(columnNameProperty.split(columnNameDelimiter)); + List columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); + + Schema schema = AvroSerDe.getSchemaFromCols(properties, columnNames, columnTypes, columnCommentProperty); + properties.setProperty(AvroTableProperties.SCHEMA_LITERAL.getPropName(), schema.toString()); + if (conf != null) + conf.set(AvroTableProperties.AVRO_SERDE_SCHEMA.getPropName(), schema.toString(false)); + return schema; + } else if(schemaString.equals(SCHEMA_NONE)) { + throw new AvroSerdeException(EXCEPTION_MESSAGE); + } + + try { + Schema s = getSchemaFromFS(schemaString, conf); + if (s == null) { + //in case schema is not a file system + return AvroSerdeUtils.getSchemaFor(new URL(schemaString)); + } + return s; + } catch (IOException ioe) { + throw new AvroSerdeException("Unable to read schema from given path: " + schemaString, ioe); + } catch (URISyntaxException urie) { + throw new AvroSerdeException("Unable to read schema from given path: " + schemaString, urie); + } + } + + // Protected for testing and so we can pass in a conf for testing. 
+ protected static Schema getSchemaFromFS(String schemaFSUrl, + Configuration conf) throws IOException, URISyntaxException { + FSDataInputStream in = null; + FileSystem fs = null; + try { + fs = FileSystem.get(new URI(schemaFSUrl), conf); + } catch (IOException ioe) { + //return null only if the file system in schema is not recognized + if (LOG.isDebugEnabled()) { + String msg = "Failed to open file system for uri " + schemaFSUrl + " assuming it is not a FileSystem url"; + LOG.debug(msg, ioe); + } + + return null; + } + try { + in = fs.open(new Path(schemaFSUrl)); + Schema s = AvroSerdeUtils.getSchemaFor(in); + return s; + } finally { + if(in != null) in.close(); + } + } + + /** + * Determine if an Avro schema is of type Union[T, NULL]. Avro supports nullable + * types via a union of type T and null. This is a very common use case. + * As such, we want to silently convert it to just T and allow the value to be null. + * + * When a Hive union type is used with AVRO, the schema type becomes + * Union[NULL, T1, T2, ...]. The NULL in the union should be silently removed + * + * @return true if type represents Union[T, Null], false otherwise + */ + public static boolean isNullableType(Schema schema) { + if (!schema.getType().equals(Schema.Type.UNION)) { + return false; + } + + List itemSchemas = schema.getTypes(); + if (itemSchemas.size() < 2) { + return false; + } + + for (Schema itemSchema : itemSchemas) { + if (Schema.Type.NULL.equals(itemSchema.getType())) { + return true; + } + } + + // [null, null] not allowed, so this check is ok. + return false; + } + + /** + * If the union schema is a nullable union, get the schema for the non-nullable type. + * This method does no checking that the provided Schema is nullable. 
If the provided + * union schema is non-nullable, it simply returns the union schema + */ + public static Schema getOtherTypeFromNullableType(Schema unionSchema) { + final List types = unionSchema.getTypes(); + if (types.size() == 2) { // most common scenario + if (types.get(0).getType() == Schema.Type.NULL) { + return types.get(1); + } + if (types.get(1).getType() == Schema.Type.NULL) { + return types.get(0); + } + // not a nullable union + return unionSchema; + } + + final List itemSchemas = new ArrayList<>(); + for (Schema itemSchema : types) { + if (!Schema.Type.NULL.equals(itemSchema.getType())) { + itemSchemas.add(itemSchema); + } + } + + if (itemSchemas.size() > 1) { + return Schema.createUnion(itemSchemas); + } else { + return itemSchemas.get(0); + } + } + + /** + * Determine if we're being executed from within an MR job or as part + * of a select * statement. The signals for this varies between Hive versions. + * @param job that contains things that are or are not set in a job + * @return Are we in a job or not? + */ + public static boolean insideMRJob(JobConf job) { + return job != null + && (HiveConf.getVar(job, HiveConf.ConfVars.PLAN) != null) + && (!HiveConf.getVar(job, HiveConf.ConfVars.PLAN).isEmpty()); + } + + public static Buffer getBufferFromBytes(byte[] input) { + ByteBuffer bb = ByteBuffer.wrap(input); + return bb.rewind(); + } + + public static Buffer getBufferFromDecimal(HiveDecimal dec, int scale) { + if (dec == null) { + return null; + } + + // NOTE: Previously, we did OldHiveDecimal.setScale(scale), called OldHiveDecimal + // unscaledValue().toByteArray(). 
+ return AvroSerdeUtils.getBufferFromBytes(dec.bigIntegerBytesScaled(scale)); + } + + public static byte[] getBytesFromByteBuffer(ByteBuffer byteBuffer) { + byteBuffer.rewind(); + byte[] result = new byte[byteBuffer.limit()]; + byteBuffer.get(result); + return result; + } + + public static HiveDecimal getHiveDecimalFromByteBuffer(ByteBuffer byteBuffer, int scale) { + byte[] result = getBytesFromByteBuffer(byteBuffer); + HiveDecimal dec = HiveDecimal.create(new BigInteger(result), scale); + return dec; + } + + public static Schema getSchemaFor(String str) { + Schema.Parser parser = new Schema.Parser(); + Schema schema = parser.parse(str); + return schema; + } + + public static Schema getSchemaFor(File file) { + Schema.Parser parser = new Schema.Parser(); + Schema schema; + try { + schema = parser.parse(file); + } catch (IOException e) { + throw new RuntimeException("Failed to parse Avro schema from " + file.getName(), e); + } + return schema; + } + + public static Schema getSchemaFor(InputStream stream) { + Schema.Parser parser = new Schema.Parser(); + Schema schema; + try { + schema = parser.parse(stream); + } catch (IOException e) { + throw new RuntimeException("Failed to parse Avro schema", e); + } + return schema; + } + + public static Schema getSchemaFor(URL url) { + InputStream in = null; + try { + in = url.openStream(); + return getSchemaFor(in); + } catch (Exception e) { + throw new RuntimeException("Failed to parse Avro schema", e); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException e) { + // Ignore + } + } + } + } + + /** + * Reads the schema property {@code name} as an int, accepting either an Integer value or a String that parses as an int. + * + * @param schema schema whose property is read + * @param name name of the property + * @return the property value as an int + * @throws IllegalArgumentException if the property is not set, is of another type, or is a String that is not a valid int + */ + public static int getIntFromSchema(Schema schema, String name) { + Object obj = schema.getObjectProp(name); + if (obj instanceof String) { + return Integer.parseInt((String) obj); + } else if (obj instanceof Integer) { + return (int) obj; + } else if (obj == null) { + // An absent property must not surface as a NullPointerException from obj.getClass() + throw new IllegalArgumentException("Expect integer or string value from property " + name + + " but the property is not set"); + } else { + throw new IllegalArgumentException("Expect integer or string value from property " + name + + " but found type " + obj.getClass().getName()); + } + } + + /** + * 
Called on specific alter table events. Rejects the alteration when the table uses the Avro SerDe and the given table properties contain a schema literal or a schema url; nothing is removed from the properties. + * + * @param conf not read by this method + * @param serializationLib class name of the table's serialization library; only the AvroSerDe class name triggers the check + * @param parameters table properties inspected for the schema literal and schema url keys + * @throws RuntimeException if the serialization library is AvroSerDe and either property is present + */ + public static void handleAlterTableForAvro(HiveConf conf, String serializationLib, Map parameters) { + if (AvroSerDe.class.getName().equals(serializationLib)) { + String literalPropName = AvroTableProperties.SCHEMA_LITERAL.getPropName(); + String urlPropName = AvroTableProperties.SCHEMA_URL.getPropName(); + + if (parameters.containsKey(literalPropName) || parameters.containsKey(urlPropName)) { + throw new RuntimeException("Not allowed to alter schema of Avro stored table having external schema." + + " Consider removing "+AvroTableProperties.SCHEMA_LITERAL.getPropName() + " or " + + AvroTableProperties.SCHEMA_URL.getPropName() + " from table properties."); + } + } + } +} diff --git a/src/main/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java b/src/main/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java new file mode 100644 index 0000000..2789dac --- /dev/null +++ b/src/main/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java @@ -0,0 +1,285 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+import static org.apache.avro.Schema.Type.BOOLEAN;
+import static org.apache.avro.Schema.Type.BYTES;
+import static org.apache.avro.Schema.Type.DOUBLE;
+import static org.apache.avro.Schema.Type.FIXED;
+import static org.apache.avro.Schema.Type.FLOAT;
+import static org.apache.avro.Schema.Type.INT;
+import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.avro.Schema.Type.NULL;
+import static org.apache.avro.Schema.Type.STRING;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+/**
+ * Convert an Avro Schema to a Hive TypeInfo
+ */
+class SchemaToTypeInfo {
+  // Conversion of Avro primitive types to Hive primitive types
+  // Avro             Hive
+  // null             void       check
+  // boolean          boolean    check
+  // int              int        check
+  // long             bigint     check
+  // float            float      check
+  // double           double     check
+  // bytes            binary     check
+  // fixed            binary     check
+  // string           string     check
+  //                  tinyint    (no Avro counterpart)
+  //                  smallint   (no Avro counterpart)
+
+  // Map of Avro's primitive types to Hive's (for those that are supported by both)
+  private static final Map<Schema.Type, TypeInfo> primitiveTypeToTypeInfo = initTypeMap();
+
+  /**
+   * Builds the read-only lookup table from Avro primitive types to Hive
+   * primitive TypeInfos.
+   *
+   * @return unmodifiable map keyed by Avro schema type
+   */
+  private static Map<Schema.Type, TypeInfo> initTypeMap() {
+    Map<Schema.Type, TypeInfo> theMap = new HashMap<Schema.Type, TypeInfo>();
+    theMap.put(NULL, TypeInfoFactory.getPrimitiveTypeInfo("void"));
+    theMap.put(BOOLEAN, TypeInfoFactory.getPrimitiveTypeInfo("boolean"));
+    theMap.put(INT, TypeInfoFactory.getPrimitiveTypeInfo("int"));
+    theMap.put(LONG, TypeInfoFactory.getPrimitiveTypeInfo("bigint"));
+    theMap.put(FLOAT, TypeInfoFactory.getPrimitiveTypeInfo("float"));
+    theMap.put(DOUBLE, TypeInfoFactory.getPrimitiveTypeInfo("double"));
+    theMap.put(BYTES, TypeInfoFactory.getPrimitiveTypeInfo("binary"));
+    theMap.put(FIXED, TypeInfoFactory.getPrimitiveTypeInfo("binary"));
+    theMap.put(STRING, TypeInfoFactory.getPrimitiveTypeInfo("string"));
+    return Collections.unmodifiableMap(theMap);
+  }
+
+  /**
+   * Generate a list of TypeInfos from an Avro schema. This method is
+   * currently public due to some weirdness in deserializing unions, but
+   * will be made private once that is resolved.
+   * @param schema Schema to generate field types for
+   * @return List of TypeInfos, each element of which is a TypeInfo derived
+   *         from the schema.
+   * @throws AvroSerdeException for problems during conversion.
+   */
+  public static List<TypeInfo> generateColumnTypes(Schema schema) throws AvroSerdeException {
+    return generateColumnTypes(schema, null);
+  }
+
+  /**
+   * Generate a list of TypeInfos from an Avro schema. This method is
+   * currently public due to some weirdness in deserializing unions, but
+   * will be made private once that is resolved.
+   * @param schema Schema to generate field types for
+   * @param seenSchemas stores schemas processed in the parsing done so far,
+   *         helping to resolve circular references in the schema
+   * @return List of TypeInfos, each element of which is a TypeInfo derived
+   *         from the schema.
+   * @throws AvroSerdeException for problems during conversion.
+   */
+  public static List<TypeInfo> generateColumnTypes(Schema schema,
+      Set<Schema> seenSchemas) throws AvroSerdeException {
+    List<Schema.Field> fields = schema.getFields();
+
+    List<TypeInfo> types = new ArrayList<TypeInfo>(fields.size());
+
+    for (Schema.Field field : fields) {
+      types.add(generateTypeInfo(field.schema(), seenSchemas));
+    }
+
+    return types;
+  }
+
+  // Instance factory used by generateTypeInfo for every schema that is not
+  // handled by one of the logical-type shortcuts; new instances are built by
+  // generateTypeInfoWorker.
+  // NOTE(review): presumably InstanceCache.retrieve memoizes per schema -- confirm
+  // against the InstanceCache implementation.
+  static InstanceCache<Schema, TypeInfo> typeInfoCache = new InstanceCache<Schema, TypeInfo>() {
+    @Override
+    protected TypeInfo makeInstance(Schema s,
+        Set<Schema> seenSchemas)
+        throws AvroSerdeException {
+      return generateTypeInfoWorker(s, seenSchemas);
+    }
+  };
+
+  /**
+   * Convert an Avro Schema into an equivalent Hive TypeInfo.
+   *
+   * Logical types are resolved here first (decimal on bytes, char/varchar on
+   * string, date on int, timestamp on long); every other schema is looked up
+   * through {@link #typeInfoCache}.
+   *
+   * @param schema Avro schema to convert; any Avro schema type is accepted
+   * @param seenSchemas stores schemas processed in the parsing done so far,
+   *         helping to resolve circular references in the schema
+   * @return TypeInfo matching the Avro schema
+   * @throws AvroSerdeException for any problems during conversion.
+   */
+  public static TypeInfo generateTypeInfo(Schema schema,
+      Set<Schema> seenSchemas) throws AvroSerdeException {
+    // For bytes type, it can be mapped to decimal.
+    Schema.Type type = schema.getType();
+    if (type == BYTES && AvroSerDe.DECIMAL_TYPE_NAME
+        .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+      // Non-integer or missing properties leave the value at 0; the
+      // validation below decides whether that is acceptable.
+      int precision = 0;
+      int scale = 0;
+      try {
+        Object o = schema.getObjectProp(AvroSerDe.AVRO_PROP_PRECISION);
+        if (o instanceof Integer) {
+          precision = (int) o;
+        }
+        o = schema.getObjectProp(AvroSerDe.AVRO_PROP_SCALE);
+        if (o instanceof Integer) {
+          scale = (int) o;
+        }
+      } catch (Exception ex) {
+        throw new AvroSerdeException("Failed to obtain scale value from file schema: " + schema, ex);
+      }
+
+      try {
+        HiveDecimalUtils.validateParameter(precision, scale);
+      } catch (Exception ex) {
+        throw new AvroSerdeException("Invalid precision or scale for decimal type", ex);
+      }
+
+      return TypeInfoFactory.getDecimalTypeInfo(precision, scale);
+    }
+
+    if (type == STRING &&
+        AvroSerDe.CHAR_TYPE_NAME.equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+      int maxLength = 0;
+      try {
+        maxLength = AvroSerdeUtils.getIntFromSchema(schema, AvroSerDe.AVRO_PROP_MAX_LENGTH);
+      } catch (Exception ex) {
+        throw new AvroSerdeException("Failed to obtain maxLength value from file schema: " + schema, ex);
+      }
+      return TypeInfoFactory.getCharTypeInfo(maxLength);
+    }
+
+    if (type == STRING && AvroSerDe.VARCHAR_TYPE_NAME
+        .equalsIgnoreCase(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+      int maxLength = 0;
+      try {
+        maxLength = AvroSerdeUtils.getIntFromSchema(schema, AvroSerDe.AVRO_PROP_MAX_LENGTH);
+      } catch (Exception ex) {
+        throw new AvroSerdeException("Failed to obtain maxLength value from file schema: " + schema, ex);
+      }
+      return TypeInfoFactory.getVarcharTypeInfo(maxLength);
+    }
+
+    // Unlike the checks above, date and timestamp are matched case-sensitively.
+    if (type == INT &&
+        AvroSerDe.DATE_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+      return TypeInfoFactory.dateTypeInfo;
+    }
+
+    if (type == LONG &&
+        AvroSerDe.TIMESTAMP_TYPE_NAME.equals(schema.getProp(AvroSerDe.AVRO_PROP_LOGICAL_TYPE))) {
+      return TypeInfoFactory.timestampTypeInfo;
+    }
+
+    return typeInfoCache.retrieve(schema, seenSchemas);
+  }
+
+  /**
+   * Performs the actual conversion for schemas without a logical-type
+   * shortcut: unwraps nullable unions, maps primitives through
+   * {@link #primitiveTypeToTypeInfo} and dispatches complex types to their
+   * dedicated generators.
+   *
+   * @param schema Avro schema to convert
+   * @param seenSchemas record schemas visited so far, may be null
+   * @return TypeInfo matching the Avro schema
+   * @throws AvroSerdeException if the schema type is not supported or a
+   *         nested conversion fails
+   */
+  private static TypeInfo generateTypeInfoWorker(Schema schema,
+      Set<Schema> seenSchemas) throws AvroSerdeException {
+    // Avro requires NULLable types to be defined as unions of some type T
+    // and NULL.  This is annoying and we're going to hide it from the user.
+    if (AvroSerdeUtils.isNullableType(schema)) {
+      return generateTypeInfo(
+          AvroSerdeUtils.getOtherTypeFromNullableType(schema), seenSchemas);
+    }
+
+    Schema.Type type = schema.getType();
+    if (primitiveTypeToTypeInfo.containsKey(type)) {
+      return primitiveTypeToTypeInfo.get(type);
+    }
+
+    switch (type) {
+      case RECORD: return generateRecordTypeInfo(schema, seenSchemas);
+      case MAP:    return generateMapTypeInfo(schema, seenSchemas);
+      case ARRAY:  return generateArrayTypeInfo(schema, seenSchemas);
+      case UNION:  return generateUnionTypeInfo(schema, seenSchemas);
+      case ENUM:   return generateEnumTypeInfo(schema);
+      default:     throw new AvroSerdeException("Do not yet support: " + schema);
+    }
+  }
+
+  /**
+   * Generate a struct TypeInfo for an Avro record, rejecting records that
+   * were already visited with the same {@code seenSchemas} set.
+   *
+   * @param schema Avro schema of type RECORD
+   * @param seenSchemas record schemas visited so far; a fresh identity-based
+   *         set is created when null
+   * @return struct TypeInfo with one member per record field
+   * @throws AvroSerdeException if the record is already in
+   *         {@code seenSchemas} or a field cannot be converted
+   */
+  private static TypeInfo generateRecordTypeInfo(Schema schema,
+      Set<Schema> seenSchemas) throws AvroSerdeException {
+    assert schema.getType().equals(Schema.Type.RECORD);
+
+    if (seenSchemas == null) {
+      // Identity semantics: membership is decided by reference, not equals().
+      seenSchemas = Collections.newSetFromMap(new IdentityHashMap<Schema, Boolean>());
+    } else if (seenSchemas.contains(schema)) {
+      throw new AvroSerdeException(
+          "Recursive schemas are not supported. Recursive schema was " + schema
+              .getFullName());
+    }
+    // The schema is never removed from the set once its fields are processed.
+    seenSchemas.add(schema);
+
+    List<Schema.Field> fields = schema.getFields();
+    List<String> fieldNames = new ArrayList<String>(fields.size());
+    List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(fields.size());
+
+    for (Schema.Field field : fields) {
+      fieldNames.add(field.name());
+      typeInfos.add(generateTypeInfo(field.schema(), seenSchemas));
+    }
+
+    return TypeInfoFactory.getStructTypeInfo(fieldNames, typeInfos);
+  }
+
+  /**
+   * Generate a TypeInfo for an Avro Map.  This is made slightly simpler in that
+   * Avro only allows maps with strings for keys.
+   *
+   * @param schema Avro schema of type MAP
+   * @param seenSchemas record schemas visited so far, may be null
+   * @return map TypeInfo with string keys and the converted value type
+   * @throws AvroSerdeException if the value type cannot be converted
+   */
+  private static TypeInfo generateMapTypeInfo(Schema schema,
+      Set<Schema> seenSchemas) throws AvroSerdeException {
+    assert schema.getType().equals(Schema.Type.MAP);
+    Schema valueType = schema.getValueType();
+    TypeInfo ti = generateTypeInfo(valueType, seenSchemas);
+
+    return TypeInfoFactory.getMapTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo("string"), ti);
+  }
+
+  /**
+   * Generate a list TypeInfo for an Avro array.
+   *
+   * @param schema Avro schema of type ARRAY
+   * @param seenSchemas record schemas visited so far, may be null
+   * @return list TypeInfo whose element type is the converted item type
+   * @throws AvroSerdeException if the element type cannot be converted
+   */
+  private static TypeInfo generateArrayTypeInfo(Schema schema,
+      Set<Schema> seenSchemas) throws AvroSerdeException {
+    assert schema.getType().equals(Schema.Type.ARRAY);
+    Schema itemsType = schema.getElementType();
+    TypeInfo itemsTypeInfo = generateTypeInfo(itemsType, seenSchemas);
+
+    return TypeInfoFactory.getListTypeInfo(itemsTypeInfo);
+  }
+
+  /**
+   * Generate a union TypeInfo for an Avro union, converting the branches in
+   * declaration order.
+   *
+   * @param schema Avro schema of type UNION
+   * @param seenSchemas record schemas visited so far, may be null
+   * @return union TypeInfo with one member per union branch
+   * @throws AvroSerdeException if a branch cannot be converted
+   */
+  private static TypeInfo generateUnionTypeInfo(Schema schema,
+      Set<Schema> seenSchemas) throws AvroSerdeException {
+    assert schema.getType().equals(Schema.Type.UNION);
+    List<Schema> types = schema.getTypes();
+
+    List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(types.size());
+
+    for (Schema type : types) {
+      typeInfos.add(generateTypeInfo(type, seenSchemas));
+    }
+
+    return TypeInfoFactory.getUnionTypeInfo(typeInfos);
+  }
+
+  // Hive doesn't have an Enum type, so we're going to treat them as Strings.
+  // During the deserialize/serialize stage we'll check for enumness and
+  // convert as such.
+  private static TypeInfo generateEnumTypeInfo(Schema schema) {
+    assert schema.getType().equals(Schema.Type.ENUM);
+
+    return TypeInfoFactory.getPrimitiveTypeInfo("string");
+  }
+}
diff --git a/src/main/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java b/src/main/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java
new file mode 100644
index 0000000..4416209
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+import org.apache.avro.JsonProperties;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Convert Hive TypeInfo to an Avro Schema
+ */
+public class TypeInfoToSchema {
+
+  // Suffix for generated record names ("record_0", "record_1", ...); incremented
+  // once per struct converted by this instance.
+  private long recordCounter = 0;
+
+  /**
+   * Converts Hive schema to avro schema
+   *
+   * @param columnNames Names of the hive columns
+   * @param columnTypes Hive Column types, matched to {@code columnNames} by index
+   * @param columnComments Hive column comments, matched by index; may be null
+   *        or shorter than {@code columnNames}, in which case the missing
+   *        comments are treated as null
+   * @param namespace   Namespace of Avro schema
+   * @param name        Avro schema name; "baseRecord" is used when null or empty
+   * @param doc         Avro schema doc
+   * @return Avro Schema
+   */
+  public Schema convert(List<String> columnNames, List<TypeInfo> columnTypes,
+      List<String> columnComments, String namespace, String name, String doc) {
+
+    List<Schema.Field> fields = new ArrayList<Schema.Field>();
+    for (int i = 0; i < columnNames.size(); ++i) {
+      final String comment =
+          columnComments != null && columnComments.size() > i ? columnComments.get(i) : null;
+      final Schema.Field avroField = createAvroField(columnNames.get(i), columnTypes.get(i),
+          comment);
+      fields.addAll(getFields(avroField));
+    }
+
+    if (name == null || name.isEmpty()) {
+      name = "baseRecord";
+    }
+
+    Schema avroSchema = Schema.createRecord(name, doc, namespace, false);
+    avroSchema.setFields(fields);
+    return avroSchema;
+  }
+
+  /**
+   * Creates an Avro field without a default value for the given Hive type.
+   *
+   * @param name field name
+   * @param typeInfo Hive type of the field
+   * @param comment used as the field's doc, may be null
+   * @return the new Avro field
+   */
+  private Schema.Field createAvroField(String name, TypeInfo typeInfo, String comment) {
+    return new Schema.Field(name, createAvroSchema(typeInfo), comment, null);
+  }
+
+  /**
+   * Converts a Hive type into an Avro schema and passes the result through
+   * {@link #wrapInUnionWithNull(Schema)}.
+   *
+   * @param typeInfo Hive type to convert
+   * @return the Avro schema for the type
+   * @throws UnsupportedOperationException if the type (or a nested type) has
+   *         no Avro mapping
+   */
+  private Schema createAvroSchema(TypeInfo typeInfo) {
+    Schema schema = null;
+    switch (typeInfo.getCategory()) {
+      case PRIMITIVE:
+        schema = createAvroPrimitive(typeInfo);
+        break;
+      case LIST:
+        schema = createAvroArray(typeInfo);
+        break;
+      case MAP:
+        schema = createAvroMap(typeInfo);
+        break;
+      case STRUCT:
+        schema = createAvroRecord(typeInfo);
+        break;
+      case UNION:
+        schema = createAvroUnion(typeInfo);
+        break;
+      default:
+        // Fail with a clear message instead of passing a null schema on.
+        throw new UnsupportedOperationException(typeInfo + " is not supported.");
+    }
+
+    return wrapInUnionWithNull(schema);
+  }
+
+  /**
+   * Maps a Hive primitive type to its Avro schema. char, varchar, decimal,
+   * date and timestamp are expressed as Avro base types annotated with a
+   * {@code logicalType} property.
+   *
+   * @param typeInfo Hive type, must be a {@link PrimitiveTypeInfo}
+   * @return the Avro schema (not yet wrapped in a union with null)
+   * @throws UnsupportedOperationException for primitive categories without a mapping
+   */
+  private Schema createAvroPrimitive(TypeInfo typeInfo) {
+    PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+    Schema schema;
+    switch (primitiveTypeInfo.getPrimitiveCategory()) {
+      case STRING:
+        schema = Schema.create(Schema.Type.STRING);
+        break;
+      case CHAR:
+        schema = AvroSerdeUtils.getSchemaFor("{" +
+            "\"type\":\"" + AvroSerDe.AVRO_STRING_TYPE_NAME + "\"," +
+            "\"logicalType\":\"" + AvroSerDe.CHAR_TYPE_NAME + "\"," +
+            "\"maxLength\":" + ((CharTypeInfo) typeInfo).getLength() + "}");
+        break;
+      case VARCHAR:
+        schema = AvroSerdeUtils.getSchemaFor("{" +
+            "\"type\":\"" + AvroSerDe.AVRO_STRING_TYPE_NAME + "\"," +
+            "\"logicalType\":\"" + AvroSerDe.VARCHAR_TYPE_NAME + "\"," +
+            "\"maxLength\":" + ((VarcharTypeInfo) typeInfo).getLength() + "}");
+        break;
+      case BINARY:
+        schema = Schema.create(Schema.Type.BYTES);
+        break;
+      case BYTE:
+        // tinyint and smallint are both stored as Avro int.
+        schema = Schema.create(Schema.Type.INT);
+        break;
+      case SHORT:
+        schema = Schema.create(Schema.Type.INT);
+        break;
+      case INT:
+        schema = Schema.create(Schema.Type.INT);
+        break;
+      case LONG:
+        schema = Schema.create(Schema.Type.LONG);
+        break;
+      case FLOAT:
+        schema = Schema.create(Schema.Type.FLOAT);
+        break;
+      case DOUBLE:
+        schema = Schema.create(Schema.Type.DOUBLE);
+        break;
+      case BOOLEAN:
+        schema = Schema.create(Schema.Type.BOOLEAN);
+        break;
+      case DECIMAL:
+        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+        String precision = String.valueOf(decimalTypeInfo.precision());
+        String scale = String.valueOf(decimalTypeInfo.scale());
+        schema = AvroSerdeUtils.getSchemaFor("{" +
+            "\"type\":\"bytes\"," +
+            "\"logicalType\":\"decimal\"," +
+            "\"precision\":" + precision + "," +
+            "\"scale\":" + scale + "}");
+        break;
+      case DATE:
+        schema = AvroSerdeUtils.getSchemaFor("{" +
+            "\"type\":\"" + AvroSerDe.AVRO_INT_TYPE_NAME + "\"," +
+            "\"logicalType\":\"" + AvroSerDe.DATE_TYPE_NAME + "\"}");
+        break;
+      case TIMESTAMP:
+        schema = AvroSerdeUtils.getSchemaFor("{" +
+            "\"type\":\"" + AvroSerDe.AVRO_LONG_TYPE_NAME + "\"," +
+            "\"logicalType\":\"" + AvroSerDe.TIMESTAMP_TYPE_NAME + "\"}");
+        break;
+      case VOID:
+        schema = Schema.create(Schema.Type.NULL);
+        break;
+      default:
+        throw new UnsupportedOperationException(typeInfo + " is not supported.");
+    }
+    return schema;
+  }
+
+  /**
+   * Converts a Hive union into an Avro union. Branches that are themselves
+   * unions are flattened into the result, and all null branches are collapsed
+   * into at most one leading null.
+   *
+   * @param typeInfo Hive type, must be a {@link UnionTypeInfo}
+   * @return the Avro union schema
+   */
+  private Schema createAvroUnion(TypeInfo typeInfo) {
+    List<Schema> childSchemas = new ArrayList<Schema>();
+    for (TypeInfo childTypeInfo : ((UnionTypeInfo) typeInfo).getAllUnionObjectTypeInfos()) {
+      final Schema childSchema = createAvroSchema(childTypeInfo);
+      if (childSchema.getType() == Schema.Type.UNION) {
+        childSchemas.addAll(childSchema.getTypes());
+      } else {
+        childSchemas.add(childSchema);
+      }
+    }
+
+    return Schema.createUnion(removeDuplicateNullSchemas(childSchemas));
+  }
+
+  /**
+   * Converts a Hive struct into an Avro record named {@code record_N}, where
+   * N is the current value of {@link #recordCounter}. Each member's Hive type
+   * string is used as the field doc, and the struct's type string as the
+   * record doc.
+   *
+   * @param typeInfo Hive type, must be a {@link StructTypeInfo}
+   * @return the Avro record schema
+   * @throws IllegalArgumentException if the struct reports a different number
+   *         of field names and field types
+   */
+  private Schema createAvroRecord(TypeInfo typeInfo) {
+    List<Schema.Field> childFields = new ArrayList<Schema.Field>();
+
+    final List<String> allStructFieldNames =
+        ((StructTypeInfo) typeInfo).getAllStructFieldNames();
+    final List<TypeInfo> allStructFieldTypeInfos =
+        ((StructTypeInfo) typeInfo).getAllStructFieldTypeInfos();
+    if (allStructFieldNames.size() != allStructFieldTypeInfos.size()) {
+      throw new IllegalArgumentException("Failed to generate avro schema from hive schema. " +
+          "name and column type differs. names = " + allStructFieldNames + ", types = " +
+          allStructFieldTypeInfos);
+    }
+
+    for (int i = 0; i < allStructFieldNames.size(); ++i) {
+      final TypeInfo childTypeInfo = allStructFieldTypeInfos.get(i);
+      final Schema.Field grandChildSchemaField = createAvroField(allStructFieldNames.get(i),
+          childTypeInfo, childTypeInfo.toString());
+      final List<Schema.Field> grandChildFields = getFields(grandChildSchemaField);
+      childFields.addAll(grandChildFields);
+    }
+
+    // Nested structs are converted (and numbered) before their parent, because
+    // the counter is only read after the children have been processed.
+    Schema recordSchema = Schema.createRecord("record_" + recordCounter, typeInfo.toString(),
+        null, false);
+    ++recordCounter;
+    recordSchema.setFields(childFields);
+    return recordSchema;
+  }
+
+  /**
+   * Converts a Hive map into an Avro map. Only string keys are accepted.
+   *
+   * @param typeInfo Hive type, must be a {@link MapTypeInfo}
+   * @return the Avro map schema
+   * @throws UnsupportedOperationException if the key type is not the Hive
+   *         string primitive (including non-primitive key types)
+   */
+  private Schema createAvroMap(TypeInfo typeInfo) {
+    TypeInfo keyTypeInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo();
+    // Check the kind before casting so that complex key types are reported
+    // with the same exception as non-string primitives.
+    if (!(keyTypeInfo instanceof PrimitiveTypeInfo)
+        || ((PrimitiveTypeInfo) keyTypeInfo).getPrimitiveCategory()
+            != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
+      throw new UnsupportedOperationException("Key of Map can only be a String");
+    }
+
+    TypeInfo valueTypeInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo();
+    Schema valueSchema = createAvroSchema(valueTypeInfo);
+
+    return Schema.createMap(valueSchema);
+  }
+
+  /**
+   * Converts a Hive list into an Avro array.
+   *
+   * @param typeInfo Hive type, must be a {@link ListTypeInfo}
+   * @return the Avro array schema
+   */
+  private Schema createAvroArray(TypeInfo typeInfo) {
+    ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+    Schema listSchema = createAvroSchema(listTypeInfo.getListElementTypeInfo());
+    return Schema.createArray(listSchema);
+  }
+
+  /**
+   * Re-creates the given field with a null default value. A field whose
+   * schema is directly of type RECORD is replaced by that record's own
+   * fields, each re-created with a null default.
+   *
+   * @param schemaField field to copy
+   * @return the resulting fields
+   */
+  private List<Schema.Field> getFields(Schema.Field schemaField) {
+    List<Schema.Field> fields = new ArrayList<Schema.Field>();
+
+    Object nullDefault = JsonProperties.NULL_VALUE;
+    // NOTE(review): every caller in this class passes a field built by
+    // createAvroField, whose schema comes out of wrapInUnionWithNull and is
+    // therefore NULL or UNION, so the RECORD branch looks unreachable from here.
+    if (schemaField.schema().getType() == Schema.Type.RECORD) {
+      for (Schema.Field field : schemaField.schema().getFields()) {
+        fields.add(new Schema.Field(field.name(), field.schema(), field.doc(), nullDefault));
+      }
+    } else {
+      fields.add(new Schema.Field(schemaField.name(), schemaField.schema(), schemaField.doc(),
+          nullDefault));
+    }
+
+    return fields;
+  }
+
+  /**
+   * Makes a schema nullable. A NULL schema is returned unchanged, any other
+   * non-union schema becomes {@code [null, schema]}. For a UNION the existing
+   * null branches are collapsed into one leading null; a union that has no
+   * null branch is rebuilt without one.
+   *
+   * @param schema schema to wrap
+   * @return the wrapped schema
+   */
+  private Schema wrapInUnionWithNull(Schema schema) {
+    Schema wrappedSchema = schema;
+    switch (schema.getType()) {
+      case NULL:
+        break;
+      case UNION:
+        List<Schema> existingSchemas = removeDuplicateNullSchemas(schema.getTypes());
+        wrappedSchema = Schema.createUnion(existingSchemas);
+        break;
+      default:
+        wrappedSchema = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), schema));
+    }
+
+    return wrappedSchema;
+  }
+
+  /**
+   * Returns a copy of the given schemas in which all NULL schemas are
+   * removed and, if at least one was present, a single NULL schema is placed
+   * first. The relative order of the other schemas is kept.
+   *
+   * @param childSchemas schemas to prune
+   * @return new list with at most one, leading, NULL schema
+   */
+  private List<Schema> removeDuplicateNullSchemas(List<Schema> childSchemas) {
+    List<Schema> prunedSchemas = new ArrayList<Schema>();
+    boolean isNullPresent = false;
+    for (Schema schema : childSchemas) {
+      if (schema.getType() == Schema.Type.NULL) {
+        isNullPresent = true;
+      } else {
+        prunedSchemas.add(schema);
+      }
+    }
+    if (isNullPresent) {
+      prunedSchemas.add(0, Schema.create(Schema.Type.NULL));
+    }
+
+    return prunedSchemas;
+  }
+}