diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java index 967cc2ef656..f7dac0c08b1 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java @@ -24,9 +24,12 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.stream.Stream; +import org.apache.avro.Schema; import org.apache.avro.SchemaParseException; import org.apache.commons.lang.reflect.MethodUtils; +import org.apache.gobblin.hive.avro.HiveAvroSerDeManager; import org.apache.gobblin.hive.spec.HiveSpec; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -47,6 +50,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe; import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.orc.TypeDescription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -211,13 +215,16 @@ public static Map getParameters(State props) { return parameters; } + public static boolean isNonAvroFormat(HiveRegistrationUnit unit) { /* true when an input format is set and it is not the Avro container input format */ + return unit.getInputFormat().isPresent() && !unit.getInputFormat().get().equals(AvroContainerInputFormat.class.getName()); + } + public static StorageDescriptor getStorageDescriptor(HiveRegistrationUnit unit) { State props = unit.getStorageProps(); StorageDescriptor sd = new StorageDescriptor(); sd.setParameters(getParameters(props)); //Treat AVRO and other formats differently. 
Details can be found in GOBBLIN-877 - if (unit.isRegisterSchema() || - (unit.getInputFormat().isPresent() && !unit.getInputFormat().get().equals(AvroContainerInputFormat.class.getName()))) { + if (unit.isRegisterSchema() || isNonAvroFormat(unit)) { sd.setCols(getFieldSchemas(unit)); } if (unit.getLocation().isPresent()) { @@ -256,6 +263,83 @@ public static SerDeInfo getSerDeInfo(HiveRegistrationUnit unit) { return si; } + public static boolean containsNonOptionalUnionTypeColumn(Table t) { + return containsNonOptionalUnionTypeColumn(getHiveTable(t)); + } + + /** + * Detects whether a Hive table has a column whose type contains a non-optional (aka complex) union. A non-optional + * union is defined as a uniontype with n >= 2 non-null subtypes. Throws a RuntimeException for tables that neither carry avro.schema.literal nor use a non-Avro input format. + * + * @param hiveTable Hive table that either has avro.schema.literal set or uses a non-Avro input format such as ORC + * @return true if the Hive table contains a non-optional uniontype column + */ + public static boolean containsNonOptionalUnionTypeColumn(HiveTable hiveTable) { + if (hiveTable.getProps().contains(HiveAvroSerDeManager.SCHEMA_LITERAL)) { + Schema.Parser parser = new Schema.Parser(); + Schema schema = parser.parse(hiveTable.getProps().getProp(HiveAvroSerDeManager.SCHEMA_LITERAL)); + return isNonOptionalUnion(schema); + } + + if (isNonAvroFormat(hiveTable)) { + return hiveTable.getColumns().stream() + .map(HiveRegistrationUnit.Column::getType) + .filter(type -> type.contains("uniontype")) + .map(type -> TypeDescription.fromString(type)) + .anyMatch(type -> isNonOptionalUnion(type)); + } + + throw new RuntimeException("Avro based Hive tables without \"" + HiveAvroSerDeManager.SCHEMA_LITERAL +"\" are not supported"); + } + + /** + * Detects if an Avro schema contains a non-optional union. 
A non-optional (aka complex) + * union is defined as a union with n >= 2 non-null subtypes; union branches, map values, array items and record fields are searched recursively. + * @param schema Avro Schema + * @return true if the schema contains a non-optional union. NOTE(review): no cycle guard is visible here, so a self-referencing record schema would presumably recurse without bound; confirm such schemas cannot reach this method. + */ + public static boolean isNonOptionalUnion(Schema schema) { + switch (schema.getType()) { + case UNION: + Stream nonNullSubTypes = schema.getTypes().stream() + .map(Schema::getType).filter(t -> !t.equals(Schema.Type.NULL)); + if (nonNullSubTypes.count() >= 2) { + return true; + } + return schema.getTypes().stream().anyMatch(s -> isNonOptionalUnion(s)); + case MAP: // key is a string and doesn't need to be checked + return isNonOptionalUnion(schema.getValueType()); + case ARRAY: + return isNonOptionalUnion(schema.getElementType()); + case RECORD: + return schema.getFields().stream().map(Schema.Field::schema).anyMatch(s -> isNonOptionalUnion(s)); + default: + return false; + } + } + + /** + * Detects if an ORC column data type contains a non-optional union. 
A non-optional (aka complex) + * union is defined here as a UNION with n >= 2 children; children of UNION, MAP, LIST and STRUCT types are searched recursively. + * @param description ORC type description + * @return true if the ORC data type contains a non-optional union type + */ + public static boolean isNonOptionalUnion(TypeDescription description) { + switch (description.getCategory()) { + case UNION: + if (description.getChildren().size() >= 2) { + return true; + } /* intentional fall-through: a UNION with fewer than two children is checked through its children below */ + case MAP: + case LIST: + case STRUCT: + return description.getChildren() + .stream().anyMatch(st -> isNonOptionalUnion(st)); + default: + return false; + } + } + public static State getTableProps(Table table) { State tableProps = new State(); for (Map.Entry entry : table.getParameters().entrySet()) { diff --git a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java index 2f3d5eb71c3..24af2aa16e6 100644 --- 
a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java +++ b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java @@ -20,12 +20,18 @@ import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.function.Consumer; +import java.util.function.Function; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.gobblin.hive.HiveRegistrationUnit; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat; import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; @@ -212,4 +218,120 @@ public void testGetHiveTable() throws Exception { Assert.assertEquals(fieldA.getType(), "int"); } + + @Test + public void testContainsUnionType_AvroSucceeds() { + final State serdeProps = new State(); + final String avroSchema = "{\"type\": \"record\", \"name\": \"TestEvent\",\"namespace\": \"test.namespace\", \"fields\": [{\"name\":\"fieldName\", \"type\": %s}]}"; + Consumer assertContainsNonOptionalUnionType = fieldType -> { + serdeProps.setProp("avro.schema.literal", String.format(avroSchema, fieldType)); + HiveTable hiveTable = createTestHiveTable_Avro(serdeProps); + Assert.assertEquals(hiveTable.getColumns().size(), 1); + Assert.assertTrue(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(hiveTable)); + }; + + assertContainsNonOptionalUnionType.accept("[\"string\", \"int\"]"); + assertContainsNonOptionalUnionType.accept("[\"string\", \"int\", \"null\"]"); + 
assertContainsNonOptionalUnionType.accept("[{\"type\":\"map\",\"values\":[\"boolean\",\"null\", {\"type\": \"array\", \"items\":\"string\"}]},\"null\"]"); + } + + @Test + public void testContainsUnionType_AvroFails() { + final State serdeProps = new State(); + serdeProps.setProp("avro.schema.literal", "{\"type\": \"record\", \"name\": \"TestEvent\",\"namespace\": \"test.namespace\", " + + "\"fields\": [" + + "{\"name\":\"someString\", \"type\": \"string\"}, " + + "{\"name\":\"aNullableInt\", \"type\": [\"null\", \"int\"]}," + + "{\"name\":\"nonNullableInt\", \"type\": [\"int\"]}," + + "{\"name\":\"nonArray\", \"type\": [{\"type\": \"array\", \"items\":{\"type\":\"map\",\"values\":\"string\"}}]}" + + "]}"); + + HiveTable hiveTable = createTestHiveTable_Avro(serdeProps); + Assert.assertEquals(hiveTable.getColumns().size(), 4); + + Assert.assertFalse(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(hiveTable)); + } + + @Test + public void testContainsUnionType_AvroNoSchemaLiteral() { + HiveTable table = new HiveTable.Builder().withDbName("db").withTableName("tb").build(); + Assert.assertThrows(RuntimeException.class, () -> HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(table)); + } + + @Test + public void testContainsUnionType_OrcUnionType() { + final State serdeProps = new State(); + serdeProps.setProp("columns", "someInt,someString,someMap,someUT"); + // NOTE: unlike in Avro, all values in ORC are nullable, so it's not necessary to test null permutations + serdeProps.setProp("columns.types", "bigint,string,map,uniontype"); + + HiveTable hiveTable = createTestHiveTable_ORC(serdeProps); + Assert.assertEquals(hiveTable.getColumns().size(), 4); + + Assert.assertTrue(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(hiveTable)); + } + + @Test + public void testContainsUnionType_OrcNestedValue() { + final State serdeProps = new State(); + serdeProps.setProp("columns", "nestedNonOptionalUT"); + serdeProps.setProp("columns.types", "map,struct>>>>"); + + 
HiveTable hiveTable = createTestHiveTable_ORC(serdeProps); + Assert.assertEquals(hiveTable.getColumns().size(), 1); + + Assert.assertTrue(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(hiveTable)); + } + + @Test + public void testContainsUnionType_OrcNestedUnionPrimitive() { + final State serdeProps = new State(); + serdeProps.setProp("columns", "nesteduniontypeint"); + serdeProps.setProp("columns.types", "uniontype>>>>"); + + HiveTable hiveTable = createTestHiveTable_ORC(serdeProps); + Assert.assertEquals(hiveTable.getColumns().size(), 1); + + Assert.assertFalse(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(hiveTable)); + } + + @Test + public void testContainsUnionType_OrcPrimitive() { + final State serdeProps = new State(); + serdeProps.setProp("columns", "timestamp,uniontypeint"); + serdeProps.setProp("columns.types", "bigint,uniontype"); + + HiveTable hiveTable = createTestHiveTable_ORC(serdeProps); + Assert.assertEquals(hiveTable.getColumns().size(), 2); + + Assert.assertFalse(HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(hiveTable)); + } + + private HiveTable createTestHiveTable_ORC(State props) { + return createTestHiveTable("testDb", "testTable", props, (hiveTable) -> { + hiveTable.setInputFormat(OrcInputFormat.class.getName()); + hiveTable.setOutputFormat(OrcOutputFormat.class.getName()); + hiveTable.setSerDeType(OrcSerde.class.getName()); + return null; + }); + } + + private HiveTable createTestHiveTable_Avro(State props) { + return createTestHiveTable("testDB", "testTable", props, (hiveTable) -> { + hiveTable.setInputFormat(AvroContainerInputFormat.class.getName()); + hiveTable.setOutputFormat(AvroContainerOutputFormat.class.getName()); + hiveTable.setSerDeType(AvroSerDe.class.getName()); + return null; + }); + } + + private HiveTable createTestHiveTable(String dbName, String tableName, State props, Function additionalSetup) { + HiveTable.Builder builder = new HiveTable.Builder(); + HiveTable hiveTable = 
builder.withDbName(dbName).withTableName(tableName).withProps(props).build(); + additionalSetup.apply(hiveTable); + + // Serialize then deserialize as a way to quickly set up tables for other tests in util class + Table table = HiveMetaStoreUtils.getTable(hiveTable); + return HiveMetaStoreUtils.getHiveTable(table); + } }