Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.trino.spi.type.VarcharType;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
Expand Down Expand Up @@ -349,6 +350,11 @@ private static Optional<Function<Block, Block>> createCoercer(TypeManager typeMa
return Optional.of(new MapCoercer(typeManager, fromHiveType, toHiveType));
}
if (isRowType(fromType) && isRowType(toType)) {
if (fromHiveType.getCategory() == ObjectInspector.Category.UNION || toHiveType.getCategory() == ObjectInspector.Category.UNION) {
HiveType fromHiveTypeStruct = HiveType.toHiveType(fromType);
HiveType toHiveTypeStruct = HiveType.toHiveType(toType);
return Optional.of(new StructCoercer(typeManager, fromHiveTypeStruct, toHiveTypeStruct));
}
return Optional.of(new StructCoercer(typeManager, fromHiveType, toHiveType));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,22 @@ private boolean canCoerce(HiveType fromHiveType, HiveType toHiveType)
return toType instanceof DecimalType || toHiveType.equals(HIVE_FLOAT) || toHiveType.equals(HIVE_DOUBLE);
}

return canCoerceForList(fromHiveType, toHiveType) || canCoerceForMap(fromHiveType, toHiveType) || canCoerceForStruct(fromHiveType, toHiveType);
return canCoerceForList(fromHiveType, toHiveType)
|| canCoerceForMap(fromHiveType, toHiveType)
|| canCoerceForStruct(fromHiveType, toHiveType)
|| canCoerceForUnionType(fromHiveType, toHiveType);
}

/**
 * Decides whether a Hive union type can be coerced into another Hive union type.
 *
 * <p>Coercion is only considered when both sides are of category {@code UNION}. In that
 * case each side is converted to its Trino type and back to a {@link HiveType}, and the
 * decision is delegated to {@code canCoerceForStruct}, since Trino sees union types as
 * structs.
 *
 * @param fromHiveType the type the data was written with
 * @param toHiveType the type the data is being read as
 * @return {@code true} if both types are unions and their struct representations are coercible
 */
private boolean canCoerceForUnionType(HiveType fromHiveType, HiveType toHiveType)
{
    boolean bothAreUnions = fromHiveType.getCategory() == Category.UNION
            && toHiveType.getCategory() == Category.UNION;
    if (bothAreUnions) {
        // Re-derive each side as the struct-shaped HiveType that corresponds to its Trino type,
        // then reuse the existing struct coercion rules.
        HiveType sourceAsStruct = HiveType.toHiveType(fromHiveType.getType(typeManager));
        HiveType targetAsStruct = HiveType.toHiveType(toHiveType.getType(typeManager));
        return canCoerceForStruct(sourceAsStruct, targetAsStruct);
    }
    return false;
}

private boolean canCoerceForMap(HiveType fromHiveType, HiveType toHiveType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ public class TestReadUniontype
extends HiveProductTest
{
private static final String TABLE_NAME = "test_read_uniontype";
private static final String TABLE_NAME_SCHEMA_EVOLUTION = "test_read_uniontype_with_schema_evolution";

/**
 * Drops every Hive table this test class may have created.
 *
 * <p>Annotated as both a before-test and an after-test hook, and uses
 * {@code DROP TABLE IF EXISTS} so it is harmless when a table is absent.
 */
@BeforeTestWithContext
@AfterTestWithContext
public void cleanup()
{
    String[] tablesToDrop = {TABLE_NAME, TABLE_NAME_SCHEMA_EVOLUTION};
    for (String tableToDrop : tablesToDrop) {
        onHive().executeQuery(format("DROP TABLE IF EXISTS %s", tableToDrop));
    }
}

@DataProvider(name = "storage_formats")
Expand All @@ -49,9 +51,14 @@ public static Object[][] storageFormats()
return new String[][] {{"ORC"}, {"AVRO"}};
}

private void createTestTable(String storageFormat)
@Test(dataProvider = "storage_formats", groups = SMOKE)
public void testReadUniontype(String storageFormat)
{
cleanup();
// According to testing results, the Hive INSERT queries here only work in Hive 1.2
if (getHiveVersionMajor() != 1 || getHiveVersionMinor() != 2) {
throw new SkipException("This test can only be run with Hive 1.2 (default config)");
}

onHive().executeQuery(format(
"CREATE TABLE %s (id INT,foo UNIONTYPE<" +
"INT," +
Expand All @@ -60,16 +67,7 @@ private void createTestTable(String storageFormat)
"STORED AS %s",
TABLE_NAME,
storageFormat));
}

@Test(dataProvider = "storage_formats", groups = SMOKE)
public void testReadUniontype(String storageFormat)
{
// According to testing results, the Hive INSERT queries here only work in Hive 1.2
if (getHiveVersionMajor() != 1 || getHiveVersionMinor() != 2) {
throw new SkipException("This test can only be run with Hive 1.2 (default config)");
}
createTestTable(storageFormat);
// Generate a file with rows:
// 0, {0: 36}
// 1, {1: 7.2}
Expand Down Expand Up @@ -139,6 +137,139 @@ public void testReadUniontype(String storageFormat)
}
}

/**
 * Verifies that Trino can read a partitioned table whose union-typed column was altered
 * after data had been written.
 *
 * <p>Creates the partitioned test table in the given storage format and then runs the
 * format-specific scenario ({@code testAvroSchemaEvolution} or {@code testORCSchemaEvolution}).
 *
 * @param storageFormat Hive storage format supplied by the {@code storage_formats} data provider
 * @throws SkipException when not running against Hive 1.2
 * @throws UnsupportedOperationException when the storage format has no scenario defined
 */
@Test(dataProvider = "storage_formats", groups = SMOKE)
public void testUnionTypeSchemaEvolution(String storageFormat)
{
    // According to testing results, the Hive INSERT queries here only work in Hive 1.2
    boolean runningOnHive12 = getHiveVersionMajor() == 1 && getHiveVersionMinor() == 2;
    if (!runningOnHive12) {
        throw new SkipException("This test can only be run with Hive 1.2 (default config)");
    }

    // c1 is a union of two structs; c2 is the partition column.
    String createTableTemplate = "CREATE TABLE %s ("
            + "c0 INT,"
            + "c1 UNIONTYPE<"
            + " STRUCT<a:STRING, b:STRING>, "
            + " STRUCT<c:STRING>>) "
            + "PARTITIONED BY (c2 INT) "
            + "STORED AS %s";
    onHive().executeQuery(format(createTableTemplate, TABLE_NAME_SCHEMA_EVOLUTION, storageFormat));

    // Each format needs its own data set and ALTER statement.
    if (storageFormat.equals("AVRO")) {
        testAvroSchemaEvolution();
    }
    else if (storageFormat.equals("ORC")) {
        testORCSchemaEvolution();
    }
    else {
        throw new UnsupportedOperationException("Unsupported table format.");
    }
}

/**
 * ORC scenario for the union type schema evolution test.
 *
 * <p>Writes one row per union branch into partition {@code c2 = 5}, then adds field
 * {@code d} to the second struct of the union at table level, and finally checks that
 * Trino returns the old data with {@code d} read as {@code null}.
 *
 * <p>Expects the table named by {@code TABLE_NAME_SCHEMA_EVOLUTION} to exist already.
 */
private void testORCSchemaEvolution()
{
    // Generate a file with rows:
    //   0, {0: <a="a1",b="b1">}
    //   1, {1: <c="c1">}
    onHive().executeQuery(format("INSERT INTO TABLE %s PARTITION (c2 = 5) "
                    + "SELECT 0, create_union(0, named_struct('a', 'a1', 'b', 'b1'), named_struct('c', 'ignore')) "
                    + "UNION ALL "
                    + "SELECT 1, create_union(1, named_struct('a', 'ignore', 'b', 'ignore'), named_struct('c', 'c1'))",
            TABLE_NAME_SCHEMA_EVOLUTION));

    // Add a coercible change inside union type column.
    // Use "%s" here: the upper-case "%S" conversion would upper-case the table name using the
    // default locale, which is inconsistent with the other statements and locale-sensitive.
    onHive().executeQuery(format("ALTER TABLE %s CHANGE COLUMN c1 c1 UNIONTYPE<STRUCT<a:STRING, b:STRING>, STRUCT<c:STRING, d:STRING>>",
            TABLE_NAME_SCHEMA_EVOLUTION));

    QueryResult selectAllResult = onTrino().executeQuery(format("SELECT c0, c1 FROM %s", TABLE_NAME_SCHEMA_EVOLUTION));
    assertEquals(selectAllResult.rows().size(), 2);
    for (List<?> row : selectAllResult.rows()) {
        int id = (Integer) row.get(0);
        // Expected union layout: {tag, first branch value, second branch value}.
        switch (id) {
            case 0:
                Row rowValueFirst = rowBuilder().addField("a", "a1").addField("b", "b1").build();
                assertStructEquals(row.get(1), new Object[] {(byte) 0, rowValueFirst, null});
                break;
            case 1:
                // Field "d" was added after the data was written, so it is read back as null.
                Row rowValueSecond = rowBuilder().addField("c", "c1").addField("d", null).build();
                assertStructEquals(row.get(1), new Object[] {(byte) 1, null, rowValueSecond});
                break;
            default:
                // Only ids 0 and 1 are inserted; anything else must not pass silently.
                throw new AssertionError("Unexpected c0 value in result: " + id);
        }
    }
}

/**
 * Avro scenario for the union type schema evolution test.
 *
 * <p>Writes two rows that both use the first union branch into partition {@code c2 = 5},
 * then adds field {@code d} to the first struct of the union at table level, and finally
 * checks that Trino returns the old data with {@code d} read as {@code null}.
 *
 * <p>Expects the table named by {@code TABLE_NAME_SCHEMA_EVOLUTION} to exist already.
 */
private void testAvroSchemaEvolution()
{
    /*
     * The following insertion fails on avro.
     *
     * hive (default)> INSERT INTO TABLE u_username.test_ut_avro partition (c2 = 5)
     *               > SELECT 1, create_union(1, named_struct('a', 'ignore', 'b', 'ignore'), named_struct('c', 'c1'));
     *
     * Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing writable (null)
     *     at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:179)
     *     at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
     *     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:459)
     *     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
     *     at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:177)
     *     at java.security.AccessController.doPrivileged(Native Method)
     *     at javax.security.auth.Subject.doAs(Subject.java:422)
     *     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
     *     at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
     * Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing writable (null)
     *     at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:505)
     *     at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:170)
     *     ... 8 more
     * Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
     *     at org.apache.avro.generic.GenericData$Record.get(GenericData.java:135)
     *     at org.apache.avro.generic.GenericData.getField(GenericData.java:580)
     *     at org.apache.avro.generic.GenericData.validate(GenericData.java:373)
     *     at org.apache.avro.generic.GenericData.validate(GenericData.java:395)
     *     at org.apache.avro.generic.GenericData.validate(GenericData.java:373)
     *     at org.apache.hadoop.hive.serde2.avro.AvroSerializer.serialize(AvroSerializer.java:96)
     *
     * So we exercise the coercion logic only on the first struct inside the union (i.e. the <a,b> struct).
     */
    // Generate a file with rows:
    //   0, {0: <a="a1",b="b1">}
    //   1, {0: <a="a2",b="b2">}
    onHive().executeQuery(format(
            "INSERT INTO TABLE %s PARTITION (c2 = 5) "
                    + "SELECT 0, create_union(0, named_struct('a', 'a1', 'b', 'b1'), named_struct('c', 'ignore')) "
                    + "UNION ALL "
                    + "SELECT 1, create_union(0, named_struct('a', 'a2', 'b', 'b2'), named_struct('c', 'ignore'))",
            TABLE_NAME_SCHEMA_EVOLUTION));

    // Add a coercible change inside union type column.
    // Use "%s" here: the upper-case "%S" conversion would upper-case the table name using the
    // default locale, which is inconsistent with the other statements and locale-sensitive.
    onHive().executeQuery(format("ALTER TABLE %s CHANGE COLUMN c1 c1 UNIONTYPE<STRUCT<a:STRING, b:STRING, d:STRING>, STRUCT<c:STRING>>", TABLE_NAME_SCHEMA_EVOLUTION));

    QueryResult selectAllResult = onTrino().executeQuery(format("SELECT c0, c1 FROM %s", TABLE_NAME_SCHEMA_EVOLUTION));
    assertEquals(selectAllResult.rows().size(), 2);
    for (List<?> row : selectAllResult.rows()) {
        int id = (Integer) row.get(0);
        // Expected union layout: {tag, first branch value, second branch value}.
        // Field "d" was added after the data was written, so it is read back as null.
        switch (id) {
            case 0:
                Row rowValueFirst = rowBuilder()
                        .addField("a", "a1")
                        .addField("b", "b1")
                        .addField("d", null)
                        .build();
                assertStructEquals(row.get(1), new Object[] {(byte) 0, rowValueFirst, null});
                break;
            case 1:
                Row rowValueSecond = rowBuilder()
                        .addField("a", "a2")
                        .addField("b", "b2")
                        .addField("d", null)
                        .build();
                assertStructEquals(row.get(1), new Object[] {(byte) 0, rowValueSecond, null});
                break;
            default:
                // Only ids 0 and 1 are inserted; anything else must not pass silently.
                throw new AssertionError("Unexpected c0 value in result: " + id);
        }
    }
}

// TODO use Row as expected too, and use tempto QueryAssert
private static void assertStructEquals(Object actual, Object[] expected)
{
Expand All @@ -149,4 +280,9 @@ private static void assertStructEquals(Object actual, Object[] expected)
assertEquals(actualRow.getFields().get(i).getValue(), expected[i]);
}
}

/**
 * Shorthand for obtaining a fresh {@code io.trino.jdbc.Row} builder, used to assemble
 * the expected struct values in the assertions of this class.
 *
 * @return a new builder obtained from {@code io.trino.jdbc.Row.builder()}
 */
private static io.trino.jdbc.Row.Builder rowBuilder()
{
    io.trino.jdbc.Row.Builder freshBuilder = io.trino.jdbc.Row.builder();
    return freshBuilder;
}
}