Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -354,12 +354,10 @@ private static Optional<Function<Block, Block>> createCoercer(TypeManager typeMa
return Optional.of(new MapCoercer(typeManager, fromHiveType, toHiveType));
}
if (isRowType(fromType) && isRowType(toType)) {
if (fromHiveType.getCategory() == ObjectInspector.Category.UNION || toHiveType.getCategory() == ObjectInspector.Category.UNION) {
HiveType fromHiveTypeStruct = HiveType.toHiveType(fromType);
HiveType toHiveTypeStruct = HiveType.toHiveType(toType);
return Optional.of(new StructCoercer(typeManager, fromHiveTypeStruct, toHiveTypeStruct));
}
return Optional.of(new StructCoercer(typeManager, fromHiveType, toHiveType));
HiveType fromHiveTypeStruct = (fromHiveType.getCategory() == ObjectInspector.Category.UNION) ? HiveType.toHiveType(fromType) : fromHiveType;
Comment thread
groupcache4321 marked this conversation as resolved.
Outdated
Comment thread
groupcache4321 marked this conversation as resolved.
Outdated
HiveType toHiveTypeStruct = (toHiveType.getCategory() == ObjectInspector.Category.UNION) ? HiveType.toHiveType(toType) : toHiveType;

return Optional.of(new StructCoercer(typeManager, fromHiveTypeStruct, toHiveTypeStruct));
}

throw new TrinoException(NOT_SUPPORTED, format("Unsupported coercion from %s to %s", fromHiveType, toHiveType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
import static io.trino.plugin.hive.HivePartition.UNPARTITIONED_ID;
import static io.trino.plugin.hive.HiveSessionProperties.getDynamicFilteringWaitTimeout;
import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision;
import static io.trino.plugin.hive.HiveSessionProperties.isIgnoreAbsentPartitions;
import static io.trino.plugin.hive.HiveSessionProperties.isOptimizeSymlinkListing;
import static io.trino.plugin.hive.HiveSessionProperties.isPropagateTableScanSortingProperties;
Expand Down Expand Up @@ -403,15 +404,16 @@ private Iterator<HivePartitionMetadata> getPartitionMetadata(

private TableToPartitionMapping getTableToPartitionMapping(ConnectorSession session, Optional<HiveStorageFormat> storageFormat, SchemaTableName tableName, String partName, List<Column> tableColumns, List<Column> partitionColumns)
{
HiveTimestampPrecision hiveTimestampPrecision = getTimestampPrecision(session);
if (storageFormat.isPresent() && isPartitionUsesColumnNames(session, storageFormat.get())) {
return getTableToPartitionMappingByColumnNames(tableName, partName, tableColumns, partitionColumns);
return getTableToPartitionMappingByColumnNames(tableName, partName, tableColumns, partitionColumns, hiveTimestampPrecision);
}
ImmutableMap.Builder<Integer, HiveTypeName> columnCoercions = ImmutableMap.builder();
for (int i = 0; i < min(partitionColumns.size(), tableColumns.size()); i++) {
HiveType tableType = tableColumns.get(i).getType();
HiveType partitionType = partitionColumns.get(i).getType();
if (!tableType.equals(partitionType)) {
if (!canCoerce(typeManager, partitionType, tableType)) {
if (!canCoerce(typeManager, partitionType, tableType, hiveTimestampPrecision)) {
throw tablePartitionColumnMismatchException(tableName, partName, tableColumns.get(i).getName(), tableType, partitionColumns.get(i).getName(), partitionType);
}
columnCoercions.put(i, partitionType.getHiveTypeName());
Expand All @@ -436,7 +438,7 @@ private static boolean isPartitionUsesColumnNames(ConnectorSession session, Hive
}
}

private TableToPartitionMapping getTableToPartitionMappingByColumnNames(SchemaTableName tableName, String partName, List<Column> tableColumns, List<Column> partitionColumns)
private TableToPartitionMapping getTableToPartitionMappingByColumnNames(SchemaTableName tableName, String partName, List<Column> tableColumns, List<Column> partitionColumns, HiveTimestampPrecision hiveTimestampPrecision)
{
ImmutableMap.Builder<String, Integer> partitionColumnIndexesBuilder = ImmutableMap.builder();
for (int i = 0; i < partitionColumns.size(); i++) {
Expand All @@ -457,7 +459,7 @@ private TableToPartitionMapping getTableToPartitionMappingByColumnNames(SchemaTa
Column partitionColumn = partitionColumns.get(partitionColumnIndex);
HiveType partitionType = partitionColumn.getType();
if (!tableType.equals(partitionType)) {
if (!canCoerce(typeManager, partitionType, tableType)) {
if (!canCoerce(typeManager, partitionType, tableType, hiveTimestampPrecision)) {
throw tablePartitionColumnMismatchException(tableName, partName, tableColumn.getName(), tableType, partitionColumn.getName(), partitionType);
}
columnCoercions.put(partitionColumnIndex, partitionType.getHiveTypeName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.hive.util;

import io.trino.plugin.hive.HiveTimestampPrecision;
import io.trino.plugin.hive.HiveType;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Type;
Expand All @@ -25,6 +26,7 @@

import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.plugin.hive.HiveType.HIVE_BYTE;
import static io.trino.plugin.hive.HiveType.HIVE_DOUBLE;
import static io.trino.plugin.hive.HiveType.HIVE_FLOAT;
Expand All @@ -33,6 +35,7 @@
import static io.trino.plugin.hive.HiveType.HIVE_SHORT;
import static io.trino.plugin.hive.util.HiveUtil.extractStructFieldTypes;
import static java.lang.Math.min;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public final class HiveCoercionPolicy
Expand All @@ -44,15 +47,15 @@ private HiveCoercionPolicy(TypeManager typeManager)
this.typeManager = requireNonNull(typeManager, "typeManager is null");
}

public static boolean canCoerce(TypeManager typeManager, HiveType fromHiveType, HiveType toHiveType)
public static boolean canCoerce(TypeManager typeManager, HiveType fromHiveType, HiveType toHiveType, HiveTimestampPrecision hiveTimestampPrecision)
{
return new HiveCoercionPolicy(typeManager).canCoerce(fromHiveType, toHiveType);
return new HiveCoercionPolicy(typeManager).canCoerce(fromHiveType, toHiveType, hiveTimestampPrecision);
}

private boolean canCoerce(HiveType fromHiveType, HiveType toHiveType)
private boolean canCoerce(HiveType fromHiveType, HiveType toHiveType, HiveTimestampPrecision hiveTimestampPrecision)
{
Type fromType = typeManager.getType(fromHiveType.getTypeSignature());
Type toType = typeManager.getType(toHiveType.getTypeSignature());
Type fromType = typeManager.getType(fromHiveType.getTypeSignature(hiveTimestampPrecision));
Type toType = typeManager.getType(toHiveType.getTypeSignature(hiveTimestampPrecision));
if (fromType instanceof VarcharType) {
return toType instanceof VarcharType ||
toHiveType.equals(HIVE_BYTE) ||
Expand Down Expand Up @@ -82,25 +85,12 @@ private boolean canCoerce(HiveType fromHiveType, HiveType toHiveType)
return toType instanceof DecimalType || toHiveType.equals(HIVE_FLOAT) || toHiveType.equals(HIVE_DOUBLE);
}

return canCoerceForList(fromHiveType, toHiveType)
|| canCoerceForMap(fromHiveType, toHiveType)
|| canCoerceForStruct(fromHiveType, toHiveType)
|| canCoerceForUnionType(fromHiveType, toHiveType);
return canCoerceForList(fromHiveType, toHiveType, hiveTimestampPrecision)
|| canCoerceForMap(fromHiveType, toHiveType, hiveTimestampPrecision)
|| canCoerceForStructOrUnion(fromHiveType, toHiveType, hiveTimestampPrecision);
}

private boolean canCoerceForUnionType(HiveType fromHiveType, HiveType toHiveType)
Comment thread
groupcache4321 marked this conversation as resolved.
Outdated
{
if (fromHiveType.getCategory() != Category.UNION || toHiveType.getCategory() != Category.UNION) {
return false;
}

// Delegate to the struct coercion logic, since Trino sees union types as structs.
HiveType fromHiveTypeStruct = HiveType.toHiveType(fromHiveType.getType(typeManager));
HiveType toHiveTypeStruct = HiveType.toHiveType(toHiveType.getType(typeManager));
return canCoerceForStruct(fromHiveTypeStruct, toHiveTypeStruct);
}

private boolean canCoerceForMap(HiveType fromHiveType, HiveType toHiveType)
private boolean canCoerceForMap(HiveType fromHiveType, HiveType toHiveType, HiveTimestampPrecision hiveTimestampPrecision)
{
if (fromHiveType.getCategory() != Category.MAP || toHiveType.getCategory() != Category.MAP) {
return false;
Expand All @@ -109,29 +99,32 @@ private boolean canCoerceForMap(HiveType fromHiveType, HiveType toHiveType)
HiveType fromValueType = HiveType.valueOf(((MapTypeInfo) fromHiveType.getTypeInfo()).getMapValueTypeInfo().getTypeName());
HiveType toKeyType = HiveType.valueOf(((MapTypeInfo) toHiveType.getTypeInfo()).getMapKeyTypeInfo().getTypeName());
HiveType toValueType = HiveType.valueOf(((MapTypeInfo) toHiveType.getTypeInfo()).getMapValueTypeInfo().getTypeName());
return (fromKeyType.equals(toKeyType) || canCoerce(fromKeyType, toKeyType)) &&
(fromValueType.equals(toValueType) || canCoerce(fromValueType, toValueType));
return (fromKeyType.equals(toKeyType) || canCoerce(fromKeyType, toKeyType, hiveTimestampPrecision)) &&
(fromValueType.equals(toValueType) || canCoerce(fromValueType, toValueType, hiveTimestampPrecision));
}

private boolean canCoerceForList(HiveType fromHiveType, HiveType toHiveType)
private boolean canCoerceForList(HiveType fromHiveType, HiveType toHiveType, HiveTimestampPrecision hiveTimestampPrecision)
{
if (fromHiveType.getCategory() != Category.LIST || toHiveType.getCategory() != Category.LIST) {
return false;
}
HiveType fromElementType = HiveType.valueOf(((ListTypeInfo) fromHiveType.getTypeInfo()).getListElementTypeInfo().getTypeName());
HiveType toElementType = HiveType.valueOf(((ListTypeInfo) toHiveType.getTypeInfo()).getListElementTypeInfo().getTypeName());
return fromElementType.equals(toElementType) || canCoerce(fromElementType, toElementType);
return fromElementType.equals(toElementType) || canCoerce(fromElementType, toElementType, hiveTimestampPrecision);
}

private boolean canCoerceForStruct(HiveType fromHiveType, HiveType toHiveType)
private boolean canCoerceForStructOrUnion(HiveType fromHiveType, HiveType toHiveType, HiveTimestampPrecision hiveTimestampPrecision)
{
if (fromHiveType.getCategory() != Category.STRUCT || toHiveType.getCategory() != Category.STRUCT) {
if (!isStructOrUnion(fromHiveType) || !isStructOrUnion(toHiveType)) {
return false;
}
List<String> fromFieldNames = ((StructTypeInfo) fromHiveType.getTypeInfo()).getAllStructFieldNames();
List<String> toFieldNames = ((StructTypeInfo) toHiveType.getTypeInfo()).getAllStructFieldNames();
List<HiveType> fromFieldTypes = extractStructFieldTypes(fromHiveType);
List<HiveType> toFieldTypes = extractStructFieldTypes(toHiveType);
HiveType fromHiveTypeStruct = (fromHiveType.getCategory() == Category.UNION) ? convertUnionToStruct(fromHiveType, typeManager, hiveTimestampPrecision) : fromHiveType;
HiveType toHiveTypeStruct = (toHiveType.getCategory() == Category.UNION) ? convertUnionToStruct(toHiveType, typeManager, hiveTimestampPrecision) : toHiveType;

List<String> fromFieldNames = ((StructTypeInfo) fromHiveTypeStruct.getTypeInfo()).getAllStructFieldNames();
List<String> toFieldNames = ((StructTypeInfo) toHiveTypeStruct.getTypeInfo()).getAllStructFieldNames();
List<HiveType> fromFieldTypes = extractStructFieldTypes(fromHiveTypeStruct);
List<HiveType> toFieldTypes = extractStructFieldTypes(toHiveTypeStruct);
// Rule:
// * Fields may be added or dropped from the end.
// * For all other field indices, the corresponding fields must have
Expand All @@ -140,10 +133,21 @@ private boolean canCoerceForStruct(HiveType fromHiveType, HiveType toHiveType)
if (!fromFieldNames.get(i).equalsIgnoreCase(toFieldNames.get(i))) {
return false;
}
if (!fromFieldTypes.get(i).equals(toFieldTypes.get(i)) && !canCoerce(fromFieldTypes.get(i), toFieldTypes.get(i))) {
if (!fromFieldTypes.get(i).equals(toFieldTypes.get(i)) && !canCoerce(fromFieldTypes.get(i), toFieldTypes.get(i), hiveTimestampPrecision)) {
return false;
}
}
return true;
}

// Returns true when the given Hive type is either a STRUCT or a UNION.
private static boolean isStructOrUnion(HiveType hiveType)
{
    Category category = hiveType.getCategory();
    return category == Category.STRUCT || category == Category.UNION;
}

/**
 * Converts a Hive UNION type to the equivalent STRUCT representation that Trino
 * uses to model unions (a {@code tag} field followed by one field per alternative
 * — see the schema-evolution tests for the expected shape).
 *
 * @throws IllegalArgumentException if {@code unionType} is not a UNION type
 */
private static HiveType convertUnionToStruct(HiveType unionType, TypeManager typeManager, HiveTimestampPrecision hiveTimestampPrecision)
{
    // Use checkArgument's message-template overload: the message is only formatted
    // when the check fails, instead of eagerly calling format() on every invocation.
    checkArgument(unionType.getCategory() == Category.UNION, "Can only convert union type to struct type, given type: %s", unionType.getTypeSignature(hiveTimestampPrecision));
    return HiveType.toHiveType(unionType.getType(typeManager, hiveTimestampPrecision));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,93 @@ public void testUnionTypeSchemaEvolution(String storageFormat)
}
}

/**
 * Trino relies on schema information from the Hive metastore to deserialize Avro files.
 * Consequently, after an ALTER TABLE changes the metastore schema to an incompatible format
 * (from union to struct, or from struct to union, as in this case), Trino can no longer read
 * the existing Avro files using the modified metastore schema. ORC files, by contrast, carry
 * their own schema, so Trino does not need the metastore schema to deserialize them and can
 * still read them after such a schema change.
 */
@Test(groups = SMOKE)
public void testORCUnionToStructSchemaEvolution()
Comment thread
phd3 marked this conversation as resolved.
Outdated
{
// According to testing results, the Hive INSERT queries here only work in Hive 1.2
if (getHiveVersionMajor() != 1 || getHiveVersionMinor() != 2) {
throw new SkipException("This test can only be run with Hive 1.2 (default config)");
}
String tableReadUnionAsStruct = "test_read_union_as_struct_" + randomNameSuffix();

onHive().executeQuery("SET hive.exec.dynamic.partition.mode = nonstrict");
onHive().executeQuery("SET hive.exec.dynamic.partition=true");

onHive().executeQuery(format(
"CREATE TABLE %s(" +
"c1 UNIONTYPE<STRUCT<a:STRING,b:STRING>, STRUCT<c:STRING,d:STRING>>) " +
"PARTITIONED BY (p INT) STORED AS %s",
tableReadUnionAsStruct,
"ORC"));

onHive().executeQuery(format("INSERT INTO TABLE %s PARTITION(p) " +
"SELECT CREATE_UNION(1, NAMED_STRUCT('a', 'a1', 'b', 'b1'), NAMED_STRUCT('c', 'ignores', 'd', 'ignore')), 999 FROM (SELECT 1) t",
tableReadUnionAsStruct));

onHive().executeQuery(format("ALTER TABLE %s CHANGE COLUMN c1 c1 " +
" STRUCT<tag:INT, field0:STRUCT<a:STRING, b:STRING>, field1:STRUCT<c:STRING, d:STRING>>",
tableReadUnionAsStruct));

onHive().executeQuery(format("INSERT INTO TABLE %s PARTITION(p) " +
"SELECT NAMED_STRUCT('tag', 0, 'field0', NAMED_STRUCT('a', 'a11', 'b', 'b1b'), 'field1', NAMED_STRUCT('c', 'ignores', 'd', 'ignores')), 100 FROM (SELECT 1) t",
tableReadUnionAsStruct));
// using dereference
QueryResult selectAllResult = onTrino().executeQuery(format("SELECT c1.field0 FROM hive.default.%s", tableReadUnionAsStruct));
// the first insert didn't add value to field0, since the tag is 1 during inserting
assertThat(selectAllResult.column(1)).containsExactlyInAnyOrder(null, Row.builder().addField("a", "a11").addField("b", "b1b").build());
}

/**
 * Trino relies on schema information from the Hive metastore to deserialize Avro files.
 * Consequently, after an ALTER TABLE changes the metastore schema to an incompatible format
 * (from struct to union, as in this case), Trino can no longer read the existing Avro files
 * using the modified metastore schema. ORC files, by contrast, carry their own schema, so
 * Trino does not need the metastore schema to deserialize them and can still read them after
 * such a schema change.
 */
@Test(groups = SMOKE)
public void testORCStructToUnionSchemaEvolution()
{
// According to testing results, the Hive INSERT queries here only work in Hive 1.2
if (getHiveVersionMajor() != 1 || getHiveVersionMinor() != 2) {
throw new SkipException("This test can only be run with Hive 1.2 (default config)");
}
String tableReadStructAsUnion = "test_read_struct_as_union_" + randomNameSuffix();

// Dynamic-partition inserts below require nonstrict mode in Hive.
onHive().executeQuery("SET hive.exec.dynamic.partition.mode = nonstrict");
onHive().executeQuery("SET hive.exec.dynamic.partition=true");

// Start with the struct layout Trino uses to model unions: tag plus one field per alternative.
onHive().executeQuery(format(
"CREATE TABLE %s(" +
"c1 STRUCT<tag:TINYINT, field0:STRUCT<a:STRING, b:STRING>, field1:STRUCT<c:STRING, d:STRING>>) " +
"PARTITIONED BY (p INT) STORED AS %s",
tableReadStructAsUnion,
"ORC"));

// Write a row while the column is still a struct (tag 0 -> field0 populated).
onHive().executeQuery(format("INSERT INTO TABLE %s PARTITION(p) " +
"SELECT NAMED_STRUCT('tag', 0Y, 'field0', NAMED_STRUCT('a', 'a11', 'b', 'b1b'), 'field1', NAMED_STRUCT('c', 'ignores', 'd', 'ignores')), 100 FROM (SELECT 1) t",
tableReadStructAsUnion));

// Evolve the metastore schema from struct to the corresponding union type.
onHive().executeQuery(format("ALTER TABLE %s CHANGE COLUMN c1 c1 " +
" UNIONTYPE<STRUCT<a:STRING,b:STRING>, STRUCT<c:STRING,d:STRING>>",
tableReadStructAsUnion));

// Write a second row after the change; CREATE_UNION with tag 1 selects the second alternative.
onHive().executeQuery(format("INSERT INTO TABLE %s PARTITION(p) " +
"SELECT CREATE_UNION(1, NAMED_STRUCT('a', 'a1', 'b', 'b1'), NAMED_STRUCT('c', 'ignores', 'd', 'ignore')), 999 from (SELECT 1) t",
tableReadStructAsUnion));

// using dereference
QueryResult selectAllResult = onTrino().executeQuery(format("SELECT c1.field0 FROM hive.default.%s", tableReadStructAsUnion));
// the second insert didn't add value to field0, since the tag is 1 during inserting
assertThat(selectAllResult.column(1)).containsExactlyInAnyOrder(null, Row.builder().addField("a", "a11").addField("b", "b1b").build());
}

@Test(groups = SMOKE)
public void testReadOrcUniontypeWithCheckpoint()
{
Expand Down