@@ -26,11 +26,17 @@
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.iceberg.types.Type;


public class HiveTypeUtil {
private HiveTypeUtil() {
}

public static Type convert(TypeInfo typeInfo) {
return HiveTypeUtil.visit(typeInfo, new HiveTypeToIcebergType());
}

public static <T> T visit(TypeInfo typeInfo, HiveSchemaVisitor<T> visitor) {
switch (typeInfo.getCategory()) {
case STRUCT:
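For readers of this diff, the new convert(TypeInfo) entry point simply runs the existing HiveTypeToIcebergType visitor over a Hive TypeInfo. A minimal usage sketch (hypothetical caller, assuming the imports already present in this file plus org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils):

// Illustrative only: convert a Hive type string to an Iceberg type via the new entry point.
TypeInfo hiveType = TypeInfoUtils.getTypeInfoFromTypeString("struct<a:int,b:string>");
Type icebergType = HiveTypeUtil.convert(hiveType);
// Per the new TestHiveSchemaConversions below, icebergType.toString() is
// "struct<1: a: optional int, 2: b: optional string>"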
@@ -19,6 +19,7 @@

package org.apache.iceberg.hive.legacy;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
@@ -28,7 +29,9 @@
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -51,14 +54,17 @@ private LegacyHiveTableUtils() {
static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) {
Map<String, String> props = getTableProperties(table);
String schemaStr = props.get("avro.schema.literal");
if (schemaStr == null) {
LOG.warn("Table {}.{} does not have an avro.schema.literal set; using Hive schema instead. The schema will not" +
" have case sensitivity and nullability information", table.getDbName(), table.getTableName());
// TODO: Add support for tables without avro.schema.literal
throw new UnsupportedOperationException("Reading tables without avro.schema.literal not implemented yet");
Schema schema;
if (schemaStr != null) {
schema = AvroSchemaUtil.toIceberg(new org.apache.avro.Schema.Parser().parse(schemaStr));
} else {
//TODO: Do we need to support column and column.types properties for ORC tables?
LOG.warn("Table {}.{} does not have an avro.schema.literal set; using Hive schema instead. " +
"The schema will not have case sensitivity and nullability information",
table.getDbName(), table.getTableName());
Type icebergType = HiveTypeUtil.convert(parseTypeInfo(table.getSd().getCols()));
schema = new Schema(icebergType.asNestedType().asStructType().fields());
}

Schema schema = AvroSchemaUtil.toIceberg(new org.apache.avro.Schema.Parser().parse(schemaStr));
Types.StructType dataStructType = schema.asStruct();
List<Types.NestedField> fields = Lists.newArrayList(dataStructType.fields());

@@ -68,6 +74,19 @@ static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) {
return new Schema(fields);
}

private static TypeInfo parseTypeInfo(List<FieldSchema> cols) {
Preconditions.checkArgument(cols != null && cols.size() > 0, "No Hive schema present");
List<String> fieldNames = cols
.stream()
.map(FieldSchema::getName)
.collect(Collectors.toList());
List<TypeInfo> fieldTypeInfos = cols
.stream()
.map(f -> TypeInfoUtils.getTypeInfoFromTypeString(f.getType()))
.collect(Collectors.toList());
return TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypeInfos);
}

private static Schema partitionSchema(List<FieldSchema> partitionKeys, Schema dataSchema) {
AtomicInteger fieldId = new AtomicInteger(10000);
List<Types.NestedField> partitionFields = Lists.newArrayList();
@@ -78,14 +97,14 @@ private static Schema partitionSchema(List<FieldSchema> partitionKeys, Schema dataSchema) {
}
partitionFields.add(
Types.NestedField.optional(
fieldId.incrementAndGet(), f.getName(), icebergType(f.getType()), f.getComment()));
fieldId.incrementAndGet(), f.getName(), primitiveIcebergType(f.getType()), f.getComment()));
});
return new Schema(partitionFields);
}

private static Type icebergType(String hiveTypeString) {
private static Type primitiveIcebergType(String hiveTypeString) {
PrimitiveTypeInfo primitiveTypeInfo = TypeInfoFactory.getPrimitiveTypeInfo(hiveTypeString);
return HiveTypeUtil.visit(primitiveTypeInfo, new HiveTypeToIcebergType());
return HiveTypeUtil.convert(primitiveTypeInfo);
}

static Map<String, String> getTableProperties(org.apache.hadoop.hive.metastore.api.Table table) {
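To make the new fallback concrete, here is an illustrative sketch (not part of the PR) of how getSchema now derives an Iceberg Schema from the storage-descriptor columns when avro.schema.literal is absent; the column names and types are hypothetical, and the calls mirror parseTypeInfo and getSchema above.

// Illustrative only. Assumed imports: com.google.common.collect.Lists, java.util.List,
// java.util.stream.Collectors, org.apache.hadoop.hive.metastore.api.FieldSchema,
// org.apache.hadoop.hive.serde2.typeinfo.*, org.apache.iceberg.Schema, org.apache.iceberg.types.Type.
List<FieldSchema> cols = Lists.newArrayList(
new FieldSchema("id", "bigint", null), // hypothetical columns for illustration
new FieldSchema("tags", "map<string,string>", null));
List<String> names = cols.stream().map(FieldSchema::getName).collect(Collectors.toList());
List<TypeInfo> types = cols.stream()
.map(f -> TypeInfoUtils.getTypeInfoFromTypeString(f.getType()))
.collect(Collectors.toList());
TypeInfo structInfo = TypeInfoFactory.getStructTypeInfo(names, types);
Type converted = HiveTypeUtil.convert(structInfo);
Schema schema = new Schema(converted.asNestedType().asStructType().fields());
// The derived schema carries generated field IDs and, as the LOG.warn above notes,
// loses the case sensitivity and nullability information an Avro schema literal would provide.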
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hive.legacy;

import com.google.common.collect.Lists;
import java.util.List;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;


public class TestHiveSchemaConversions {
@Test
public void testPrimitiveTypes() {
List<Type> primitives = Lists.newArrayList(
Types.BooleanType.get(),
Types.IntegerType.get(),
Types.LongType.get(),
Types.FloatType.get(),
Types.DoubleType.get(),
Types.DateType.get(),
Types.TimestampType.withoutZone(),
Types.StringType.get(),
Types.BinaryType.get(),
Types.DecimalType.of(9, 4)
);

List<PrimitiveTypeInfo> hivePrimitives = Lists.newArrayList(
TypeInfoFactory.booleanTypeInfo,
TypeInfoFactory.intTypeInfo,
TypeInfoFactory.longTypeInfo,
TypeInfoFactory.floatTypeInfo,
TypeInfoFactory.doubleTypeInfo,
TypeInfoFactory.dateTypeInfo,
TypeInfoFactory.timestampTypeInfo,
TypeInfoFactory.stringTypeInfo,
TypeInfoFactory.binaryTypeInfo,
TypeInfoFactory.getDecimalTypeInfo(9, 4)
);

for (int i = 0; i < primitives.size(); i += 1) {
Type icebergType = primitives.get(i);
PrimitiveTypeInfo hiveType = hivePrimitives.get(i);
Assert.assertEquals("Hive schema to primitive: " + hiveType, icebergType, HiveTypeUtil.convert(hiveType));
}
}

@Test
public void testConversions() {
check("struct<1: a: optional int, 2: b: optional string>", "struct<a:int,b:string>");
check("struct<5: a: optional map<string, map<string, double>>>", "struct<a:map<string,map<string,double>>>");
check("struct<2: l1: optional list<boolean>>", "struct<l1:array<boolean>>");
check("struct<3: l1: optional list<struct<1: l2: optional string>>>", "struct<l1:array<struct<l2:string>>>");
check("list<map<string, list<map<string, list<double>>>>>", "array<map<string,array<map<string,array<double>>>>>");
check("struct<" +
"6: length: optional int, 7: count: optional int, " +
"8: list: optional list<struct<1: lastword: optional string, 2: lastwordlength: optional int>>, " +
"9: wordcounts: optional map<string, int>>",
"struct<" +
"length:int,count:int,list:array<struct<lastword:string,lastwordlength:int>>," +
"wordcounts:map<string,int>>");
}

private static void check(String icebergTypeStr, String hiveTypeStr) {
Type icebergType = HiveTypeUtil.convert(TypeInfoUtils.getTypeInfoFromTypeString(hiveTypeStr));
Assert.assertEquals(icebergTypeStr, icebergType.toString());
}
}
@@ -109,6 +109,15 @@ public void testHiveScanMultiPartition() throws Exception {
filesMatch(ImmutableMap.of("pcol=ds/pIntCol=2/B", AVRO, "pcol=ds/pIntCol=1/A", AVRO), hiveScan(table));
}

@Test
public void testHiveScanNoAvroSchema() throws Exception {
String tableName = "hive_scan_no_avro_schema";
Table table = createTable(tableName, DATA_COLUMNS, PARTITION_COLUMNS, ORC);
addPartition(table, ImmutableList.of("ds", 1), ORC, "A");
addPartition(table, ImmutableList.of("ds", 2), ORC, "B");
filesMatch(ImmutableMap.of("pcol=ds/pIntCol=2/B", ORC, "pcol=ds/pIntCol=1/A", ORC), hiveScan(table));
}

@Test
public void testHiveScanMultiPartitionWithFilter() throws Exception {
String tableName = "multi_partition_with_filter";
@@ -152,6 +161,12 @@ public void testHiveScanHybridTable() throws Exception {

private static Table createTable(String tableName, List<FieldSchema> columns, List<FieldSchema> partitionColumns)
throws Exception {
return createTable(tableName, columns, partitionColumns, AVRO);
}

private static Table createTable(
String tableName, List<FieldSchema> columns, List<FieldSchema> partitionColumns, FileFormat format)
throws Exception {
long currentTimeMillis = System.currentTimeMillis();
Path tableLocation = dbPath.resolve(tableName);
Files.createDirectories(tableLocation);
@@ -161,7 +176,7 @@ private static Table createTable(String tableName, List<FieldSchema> columns, List<FieldSchema> partitionColumns)
(int) currentTimeMillis / 1000,
(int) currentTimeMillis / 1000,
Integer.MAX_VALUE,
storageDescriptor(columns, tableLocation.toString(), AVRO),
storageDescriptor(columns, tableLocation.toString(), format),
partitionColumns,
new HashMap<>(),
null,
@@ -182,6 +197,8 @@ private static StorageDescriptor storageDescriptor(List<FieldSchema> columns, St
storageDescriptor.setOutputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat");
storageDescriptor.setInputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat");
serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.avro.AvroSerDe");
storageDescriptor.setParameters(ImmutableMap.of(
AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), schemaLiteral(columns)));
break;
case ORC:
storageDescriptor.setOutputFormat("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat");
@@ -192,8 +209,6 @@ private static StorageDescriptor storageDescriptor(List<FieldSchema> columns, St
throw new UnsupportedOperationException("Unsupported file format: " + format);
}
storageDescriptor.setSerdeInfo(serDeInfo);
storageDescriptor.setParameters(ImmutableMap.of(
AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), schemaLiteral(columns)));
return storageDescriptor;
}
