diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSource.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSource.java index 7db2b87fbbd4..d7780f4ada70 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSource.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HivePageSource.java @@ -354,12 +354,10 @@ private static Optional> createCoercer(TypeManager typeMa return Optional.of(new MapCoercer(typeManager, fromHiveType, toHiveType)); } if (isRowType(fromType) && isRowType(toType)) { - if (fromHiveType.getCategory() == ObjectInspector.Category.UNION || toHiveType.getCategory() == ObjectInspector.Category.UNION) { - HiveType fromHiveTypeStruct = HiveType.toHiveType(fromType); - HiveType toHiveTypeStruct = HiveType.toHiveType(toType); - return Optional.of(new StructCoercer(typeManager, fromHiveTypeStruct, toHiveTypeStruct)); - } - return Optional.of(new StructCoercer(typeManager, fromHiveType, toHiveType)); + HiveType fromHiveTypeStruct = (fromHiveType.getCategory() == ObjectInspector.Category.UNION) ? HiveType.toHiveType(fromType) : fromHiveType; + HiveType toHiveTypeStruct = (toHiveType.getCategory() == ObjectInspector.Category.UNION) ? 
HiveType.toHiveType(toType) : toHiveType; + + return Optional.of(new StructCoercer(typeManager, fromHiveTypeStruct, toHiveTypeStruct)); } throw new TrinoException(NOT_SUPPORTED, format("Unsupported coercion from %s to %s", fromHiveType, toHiveType)); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java index c537fd841907..20015103e676 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java @@ -71,6 +71,7 @@ import static io.trino.plugin.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH; import static io.trino.plugin.hive.HivePartition.UNPARTITIONED_ID; import static io.trino.plugin.hive.HiveSessionProperties.getDynamicFilteringWaitTimeout; +import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision; import static io.trino.plugin.hive.HiveSessionProperties.isIgnoreAbsentPartitions; import static io.trino.plugin.hive.HiveSessionProperties.isOptimizeSymlinkListing; import static io.trino.plugin.hive.HiveSessionProperties.isPropagateTableScanSortingProperties; @@ -403,15 +404,16 @@ private Iterator getPartitionMetadata( private TableToPartitionMapping getTableToPartitionMapping(ConnectorSession session, Optional storageFormat, SchemaTableName tableName, String partName, List tableColumns, List partitionColumns) { + HiveTimestampPrecision hiveTimestampPrecision = getTimestampPrecision(session); if (storageFormat.isPresent() && isPartitionUsesColumnNames(session, storageFormat.get())) { - return getTableToPartitionMappingByColumnNames(tableName, partName, tableColumns, partitionColumns); + return getTableToPartitionMappingByColumnNames(tableName, partName, tableColumns, partitionColumns, hiveTimestampPrecision); } ImmutableMap.Builder columnCoercions = ImmutableMap.builder(); for (int i = 0; i < min(partitionColumns.size(), 
tableColumns.size()); i++) { HiveType tableType = tableColumns.get(i).getType(); HiveType partitionType = partitionColumns.get(i).getType(); if (!tableType.equals(partitionType)) { - if (!canCoerce(typeManager, partitionType, tableType)) { + if (!canCoerce(typeManager, partitionType, tableType, hiveTimestampPrecision)) { throw tablePartitionColumnMismatchException(tableName, partName, tableColumns.get(i).getName(), tableType, partitionColumns.get(i).getName(), partitionType); } columnCoercions.put(i, partitionType.getHiveTypeName()); @@ -436,7 +438,7 @@ private static boolean isPartitionUsesColumnNames(ConnectorSession session, Hive } } - private TableToPartitionMapping getTableToPartitionMappingByColumnNames(SchemaTableName tableName, String partName, List tableColumns, List partitionColumns) + private TableToPartitionMapping getTableToPartitionMappingByColumnNames(SchemaTableName tableName, String partName, List tableColumns, List partitionColumns, HiveTimestampPrecision hiveTimestampPrecision) { ImmutableMap.Builder partitionColumnIndexesBuilder = ImmutableMap.builder(); for (int i = 0; i < partitionColumns.size(); i++) { @@ -457,7 +459,7 @@ private TableToPartitionMapping getTableToPartitionMappingByColumnNames(SchemaTa Column partitionColumn = partitionColumns.get(partitionColumnIndex); HiveType partitionType = partitionColumn.getType(); if (!tableType.equals(partitionType)) { - if (!canCoerce(typeManager, partitionType, tableType)) { + if (!canCoerce(typeManager, partitionType, tableType, hiveTimestampPrecision)) { throw tablePartitionColumnMismatchException(tableName, partName, tableColumn.getName(), tableType, partitionColumn.getName(), partitionType); } columnCoercions.put(partitionColumnIndex, partitionType.getHiveTypeName()); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveCoercionPolicy.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveCoercionPolicy.java index d9de134844e9..f3e3d8ecd431 100644 --- 
a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveCoercionPolicy.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveCoercionPolicy.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.hive.util; +import io.trino.plugin.hive.HiveTimestampPrecision; import io.trino.plugin.hive.HiveType; import io.trino.spi.type.DecimalType; import io.trino.spi.type.Type; @@ -25,6 +26,7 @@ import java.util.List; +import static com.google.common.base.Preconditions.checkArgument; import static io.trino.plugin.hive.HiveType.HIVE_BYTE; import static io.trino.plugin.hive.HiveType.HIVE_DOUBLE; import static io.trino.plugin.hive.HiveType.HIVE_FLOAT; @@ -33,6 +35,7 @@ import static io.trino.plugin.hive.HiveType.HIVE_SHORT; import static io.trino.plugin.hive.util.HiveUtil.extractStructFieldTypes; import static java.lang.Math.min; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; public final class HiveCoercionPolicy @@ -44,15 +47,15 @@ private HiveCoercionPolicy(TypeManager typeManager) this.typeManager = requireNonNull(typeManager, "typeManager is null"); } - public static boolean canCoerce(TypeManager typeManager, HiveType fromHiveType, HiveType toHiveType) + public static boolean canCoerce(TypeManager typeManager, HiveType fromHiveType, HiveType toHiveType, HiveTimestampPrecision hiveTimestampPrecision) { - return new HiveCoercionPolicy(typeManager).canCoerce(fromHiveType, toHiveType); + return new HiveCoercionPolicy(typeManager).canCoerce(fromHiveType, toHiveType, hiveTimestampPrecision); } - private boolean canCoerce(HiveType fromHiveType, HiveType toHiveType) + private boolean canCoerce(HiveType fromHiveType, HiveType toHiveType, HiveTimestampPrecision hiveTimestampPrecision) { - Type fromType = typeManager.getType(fromHiveType.getTypeSignature()); - Type toType = typeManager.getType(toHiveType.getTypeSignature()); + Type fromType = typeManager.getType(fromHiveType.getTypeSignature(hiveTimestampPrecision)); + Type 
toType = typeManager.getType(toHiveType.getTypeSignature(hiveTimestampPrecision)); if (fromType instanceof VarcharType) { return toType instanceof VarcharType || toHiveType.equals(HIVE_BYTE) || @@ -82,25 +85,12 @@ private boolean canCoerce(HiveType fromHiveType, HiveType toHiveType) return toType instanceof DecimalType || toHiveType.equals(HIVE_FLOAT) || toHiveType.equals(HIVE_DOUBLE); } - return canCoerceForList(fromHiveType, toHiveType) - || canCoerceForMap(fromHiveType, toHiveType) - || canCoerceForStruct(fromHiveType, toHiveType) - || canCoerceForUnionType(fromHiveType, toHiveType); + return canCoerceForList(fromHiveType, toHiveType, hiveTimestampPrecision) + || canCoerceForMap(fromHiveType, toHiveType, hiveTimestampPrecision) + || canCoerceForStructOrUnion(fromHiveType, toHiveType, hiveTimestampPrecision); } - private boolean canCoerceForUnionType(HiveType fromHiveType, HiveType toHiveType) - { - if (fromHiveType.getCategory() != Category.UNION || toHiveType.getCategory() != Category.UNION) { - return false; - } - - // Delegate to the struct coercion logic, since Trino sees union types as structs. 
- HiveType fromHiveTypeStruct = HiveType.toHiveType(fromHiveType.getType(typeManager)); - HiveType toHiveTypeStruct = HiveType.toHiveType(toHiveType.getType(typeManager)); - return canCoerceForStruct(fromHiveTypeStruct, toHiveTypeStruct); - } - - private boolean canCoerceForMap(HiveType fromHiveType, HiveType toHiveType) + private boolean canCoerceForMap(HiveType fromHiveType, HiveType toHiveType, HiveTimestampPrecision hiveTimestampPrecision) { if (fromHiveType.getCategory() != Category.MAP || toHiveType.getCategory() != Category.MAP) { return false; @@ -109,29 +99,32 @@ private boolean canCoerceForMap(HiveType fromHiveType, HiveType toHiveType) HiveType fromValueType = HiveType.valueOf(((MapTypeInfo) fromHiveType.getTypeInfo()).getMapValueTypeInfo().getTypeName()); HiveType toKeyType = HiveType.valueOf(((MapTypeInfo) toHiveType.getTypeInfo()).getMapKeyTypeInfo().getTypeName()); HiveType toValueType = HiveType.valueOf(((MapTypeInfo) toHiveType.getTypeInfo()).getMapValueTypeInfo().getTypeName()); - return (fromKeyType.equals(toKeyType) || canCoerce(fromKeyType, toKeyType)) && - (fromValueType.equals(toValueType) || canCoerce(fromValueType, toValueType)); + return (fromKeyType.equals(toKeyType) || canCoerce(fromKeyType, toKeyType, hiveTimestampPrecision)) && + (fromValueType.equals(toValueType) || canCoerce(fromValueType, toValueType, hiveTimestampPrecision)); } - private boolean canCoerceForList(HiveType fromHiveType, HiveType toHiveType) + private boolean canCoerceForList(HiveType fromHiveType, HiveType toHiveType, HiveTimestampPrecision hiveTimestampPrecision) { if (fromHiveType.getCategory() != Category.LIST || toHiveType.getCategory() != Category.LIST) { return false; } HiveType fromElementType = HiveType.valueOf(((ListTypeInfo) fromHiveType.getTypeInfo()).getListElementTypeInfo().getTypeName()); HiveType toElementType = HiveType.valueOf(((ListTypeInfo) toHiveType.getTypeInfo()).getListElementTypeInfo().getTypeName()); - return 
fromElementType.equals(toElementType) || canCoerce(fromElementType, toElementType); + return fromElementType.equals(toElementType) || canCoerce(fromElementType, toElementType, hiveTimestampPrecision); } - private boolean canCoerceForStruct(HiveType fromHiveType, HiveType toHiveType) + private boolean canCoerceForStructOrUnion(HiveType fromHiveType, HiveType toHiveType, HiveTimestampPrecision hiveTimestampPrecision) { - if (fromHiveType.getCategory() != Category.STRUCT || toHiveType.getCategory() != Category.STRUCT) { + if (!isStructOrUnion(fromHiveType) || !isStructOrUnion(toHiveType)) { return false; } - List fromFieldNames = ((StructTypeInfo) fromHiveType.getTypeInfo()).getAllStructFieldNames(); - List toFieldNames = ((StructTypeInfo) toHiveType.getTypeInfo()).getAllStructFieldNames(); - List fromFieldTypes = extractStructFieldTypes(fromHiveType); - List toFieldTypes = extractStructFieldTypes(toHiveType); + HiveType fromHiveTypeStruct = (fromHiveType.getCategory() == Category.UNION) ? convertUnionToStruct(fromHiveType, typeManager, hiveTimestampPrecision) : fromHiveType; + HiveType toHiveTypeStruct = (toHiveType.getCategory() == Category.UNION) ? convertUnionToStruct(toHiveType, typeManager, hiveTimestampPrecision) : toHiveType; + + List fromFieldNames = ((StructTypeInfo) fromHiveTypeStruct.getTypeInfo()).getAllStructFieldNames(); + List toFieldNames = ((StructTypeInfo) toHiveTypeStruct.getTypeInfo()).getAllStructFieldNames(); + List fromFieldTypes = extractStructFieldTypes(fromHiveTypeStruct); + List toFieldTypes = extractStructFieldTypes(toHiveTypeStruct); // Rule: // * Fields may be added or dropped from the end. 
// * For all other field indices, the corresponding fields must have @@ -140,10 +133,21 @@ private boolean canCoerceForStruct(HiveType fromHiveType, HiveType toHiveType) if (!fromFieldNames.get(i).equalsIgnoreCase(toFieldNames.get(i))) { return false; } - if (!fromFieldTypes.get(i).equals(toFieldTypes.get(i)) && !canCoerce(fromFieldTypes.get(i), toFieldTypes.get(i))) { + if (!fromFieldTypes.get(i).equals(toFieldTypes.get(i)) && !canCoerce(fromFieldTypes.get(i), toFieldTypes.get(i), hiveTimestampPrecision)) { return false; } } return true; } + + private static boolean isStructOrUnion(HiveType hiveType) + { + return (hiveType.getCategory() == Category.STRUCT) || (hiveType.getCategory() == Category.UNION); + } + + private static HiveType convertUnionToStruct(HiveType unionType, TypeManager typeManager, HiveTimestampPrecision hiveTimestampPrecision) + { + checkArgument(unionType.getCategory() == Category.UNION, format("Can only convert union type to struct type, given type: %s", unionType.getTypeSignature(hiveTimestampPrecision))); + return HiveType.toHiveType(unionType.getType(typeManager, hiveTimestampPrecision)); + } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestReadUniontype.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestReadUniontype.java index 062ae91d9c3b..17a77ae8e36c 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestReadUniontype.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/hive/TestReadUniontype.java @@ -268,6 +268,93 @@ public void testUnionTypeSchemaEvolution(String storageFormat) } } + /** + * When reading Avro files, Trino needs the schema information from Hive metastore to deserialize Avro files.
+ * Therefore, when an ALTER TABLE was issued in which the Hive metastore changed the schema into an incompatible format, + * from Union to Struct or from Struct to Union in this case, Trino could not read those Avro files using the modified Hive metastore schema. + * However, when reading ORC files, Trino does not need schema information from Hive metastore to deserialize ORC files. + * Therefore, it can read ORC files even after changing the schema. + */ + @Test(groups = SMOKE) + public void testORCUnionToStructSchemaEvolution() + { + // According to testing results, the Hive INSERT queries here only work in Hive 1.2 + if (getHiveVersionMajor() != 1 || getHiveVersionMinor() != 2) { + throw new SkipException("This test can only be run with Hive 1.2 (default config)"); + } + String tableReadUnionAsStruct = "test_read_union_as_struct_" + randomNameSuffix(); + + onHive().executeQuery("SET hive.exec.dynamic.partition.mode = nonstrict"); + onHive().executeQuery("SET hive.exec.dynamic.partition=true"); + + onHive().executeQuery(format( + "CREATE TABLE %s(" + + "c1 UNIONTYPE, STRUCT>) " + + "PARTITIONED BY (p INT) STORED AS %s", + tableReadUnionAsStruct, + "ORC")); + + onHive().executeQuery(format("INSERT INTO TABLE %s PARTITION(p) " + + "SELECT CREATE_UNION(1, NAMED_STRUCT('a', 'a1', 'b', 'b1'), NAMED_STRUCT('c', 'ignores', 'd', 'ignore')), 999 FROM (SELECT 1) t", + tableReadUnionAsStruct)); + + onHive().executeQuery(format("ALTER TABLE %s CHANGE COLUMN c1 c1 " + + " STRUCT, field1:STRUCT>", + tableReadUnionAsStruct)); + + onHive().executeQuery(format("INSERT INTO TABLE %s PARTITION(p) " + + "SELECT NAMED_STRUCT('tag', 0, 'field0', NAMED_STRUCT('a', 'a11', 'b', 'b1b'), 'field1', NAMED_STRUCT('c', 'ignores', 'd', 'ignores')), 100 FROM (SELECT 1) t", + tableReadUnionAsStruct)); + // using dereference + QueryResult selectAllResult = onTrino().executeQuery(format("SELECT c1.field0 FROM hive.default.%s", tableReadUnionAsStruct)); + // the first insert didn't add a value to
field0, since the tag was 1 when inserting + assertThat(selectAllResult.column(1)).containsExactlyInAnyOrder(null, Row.builder().addField("a", "a11").addField("b", "b1b").build()); + } + + /** + * When reading Avro files, Trino needs the schema information from Hive metastore to deserialize Avro files. + * Therefore, when an ALTER TABLE was issued in which the Hive metastore changed the schema into an incompatible format, + * from Union to Struct or from Struct to Union in this case, Trino could not read those Avro files using the modified Hive metastore schema. + * However, when reading ORC files, Trino does not need schema information from Hive metastore to deserialize ORC files. + * Therefore, it can read ORC files even after changing the schema. + */ + @Test(groups = SMOKE) + public void testORCStructToUnionSchemaEvolution() + { + // According to testing results, the Hive INSERT queries here only work in Hive 1.2 + if (getHiveVersionMajor() != 1 || getHiveVersionMinor() != 2) { + throw new SkipException("This test can only be run with Hive 1.2 (default config)"); + } + String tableReadStructAsUnion = "test_read_struct_as_union_" + randomNameSuffix(); + + onHive().executeQuery("SET hive.exec.dynamic.partition.mode = nonstrict"); + onHive().executeQuery("SET hive.exec.dynamic.partition=true"); + + onHive().executeQuery(format( + "CREATE TABLE %s(" + + "c1 STRUCT, field1:STRUCT>) " + + "PARTITIONED BY (p INT) STORED AS %s", + tableReadStructAsUnion, + "ORC")); + + onHive().executeQuery(format("INSERT INTO TABLE %s PARTITION(p) " + + "SELECT NAMED_STRUCT('tag', 0Y, 'field0', NAMED_STRUCT('a', 'a11', 'b', 'b1b'), 'field1', NAMED_STRUCT('c', 'ignores', 'd', 'ignores')), 100 FROM (SELECT 1) t", + tableReadStructAsUnion)); + + onHive().executeQuery(format("ALTER TABLE %s CHANGE COLUMN c1 c1 " + + " UNIONTYPE, STRUCT>", + tableReadStructAsUnion)); + + onHive().executeQuery(format("INSERT INTO TABLE %s PARTITION(p) " + + "SELECT CREATE_UNION(1, NAMED_STRUCT('a', 'a1',
'b', 'b1'), NAMED_STRUCT('c', 'ignores', 'd', 'ignore')), 999 from (SELECT 1) t", + tableReadStructAsUnion)); + + // using dereference + QueryResult selectAllResult = onTrino().executeQuery(format("SELECT c1.field0 FROM hive.default.%s", tableReadStructAsUnion)); + // the second insert didn't add a value to field0, since the tag was 1 when inserting + assertThat(selectAllResult.column(1)).containsExactlyInAnyOrder(null, Row.builder().addField("a", "a11").addField("b", "b1b").build()); + } + @Test(groups = SMOKE) public void testReadOrcUniontypeWithCheckpoint() {