@@ -135,7 +135,7 @@ public static boolean isOptionSchema(Schema schema) {
return false;
}

- static Schema toOption(Schema schema) {
+ public static Schema toOption(Schema schema) {
if (schema.getType() == UNION) {
Preconditions.checkArgument(isOptionSchema(schema),
"Union schemas are not supported: %s", schema);
@@ -145,7 +145,7 @@ static Schema toOption(Schema schema) {
}
}

- static Schema fromOption(Schema schema) {
+ public static Schema fromOption(Schema schema) {
Preconditions.checkArgument(schema.getType() == UNION,
"Expected union schema but was passed: %s", schema);
Preconditions.checkArgument(schema.getTypes().size() == 2,
@@ -322,7 +322,7 @@ private static int toInt(Object value) {
throw new UnsupportedOperationException("Cannot coerce value to int: " + value);
}

- static Schema copyRecord(Schema record, List<Schema.Field> newFields, String newName) {
+ public static Schema copyRecord(Schema record, List<Schema.Field> newFields, String newName) {
Schema copy;
if (newName != null) {
copy = Schema.createRecord(newName, record.getDoc(), null, record.isError(), newFields);
@@ -342,7 +342,7 @@ static Schema copyRecord(Schema record, List<Schema.Field> newFields, String new
return copy;
}

- static Schema.Field copyField(Schema.Field field, Schema newSchema, String newName) {
+ public static Schema.Field copyField(Schema.Field field, Schema newSchema, String newName) {
Schema.Field copy = new Schema.Field(newName,
newSchema, field.doc(), field.defaultVal(), field.order());

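Aside: the helpers made public above operate on Avro "option" schemas, i.e. two-branch unions with null. Below is a small, self-contained illustration using plain Avro; the class and variable names are hypothetical and the snippet is independent of this change.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class OptionSchemaExample {
  public static void main(String[] args) {
    // An Avro "option" schema: a union of null and exactly one other type
    Schema optionalString = SchemaBuilder.unionOf().nullType().and().stringType().endUnion();
    System.out.println(optionalString);                    // ["null","string"]
    System.out.println(optionalString.getTypes().size());  // 2, matching the precondition checked in fromOption above
  }
}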
@@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
+ import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -39,6 +40,8 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.thrift.TException;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;


/**
@@ -53,6 +56,8 @@
* updated.
*/
public class HiveMetadataPreservingTableOperations extends HiveTableOperations {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveMetadataPreservingTableOperations.class);
+
private final HiveClientPool metaClients;
private final String database;
private final String tableName;
@@ -71,6 +76,20 @@ protected HiveMetadataPreservingTableOperations(Configuration conf, HiveClientPo
this.tableName = table;
}

+ private static void logTable(Table table) {
+ String columns = "";
+ try {
+ columns = table.getSd().getCols().stream().map(column -> column.getName() + " " + column.getType())
+ .collect(Collectors.joining("\n"));
+ } catch (Throwable throwable) {
+ LOG.debug("Encountered {} while fetching columns for {}.{}", throwable.getMessage(),
+ table.getDbName(), table.getTableName(), throwable);
+ return;
+ }
+ LOG.debug("Table: {}.{}", table.getDbName(), table.getTableName());
+ LOG.debug("Columns: \n{}", columns);
+ }
+
@Override
protected void doRefresh() {
String metadataLocation = null;
@@ -132,6 +151,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
boolean tableExists = metaClients.run(client -> client.tableExists(database, tableName));
if (tableExists) {
tbl = metaClients.run(client -> client.getTable(database, tableName));
LOG.debug("Following table has been fetched from metastore:");
logTable(tbl);
} else {
final long currentTimeMillis = System.currentTimeMillis();
tbl = new Table(tableName,
Expand Down Expand Up @@ -166,6 +187,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
EnvironmentContext envContext = new EnvironmentContext(
ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE)
);
LOG.debug("Updating the metadata location of the following table:");
logTable(tbl);
LOG.debug("Metadata Location: {}", tbl.getParameters().get(METADATA_LOCATION_PROP));
ALTER_TABLE.invoke(client, database, tableName, tbl, envContext);
return null;
});
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hive.legacy;

import java.util.List;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/**
* A Hive {@link TypeInfo} visitor with an accompanying partner schema
*
* This visitor traverses the Hive {@link TypeInfo} tree in lock-step with the partner schema tree, accessing the
* partner types through a {@link PartnerAccessor}. When visiting each type in the Hive tree, the implementation is
* also given the corresponding type from the partner schema, or {@code null} if no match was found. Matching
* behavior can be controlled by implementing the methods in {@link PartnerAccessor}.
*
* @param <P> type of partner schema
* @param <FP> type of the field representation in the partner schema
* @param <R> type of the resultant schema generated by the visitor
* @param <FR> type of the field representation in the resultant schema
*/
@SuppressWarnings("ClassTypeParameterName")
public abstract class HiveSchemaWithPartnerVisitor<P, FP, R, FR> {

/**
* Methods to access types in the partner schema corresponding to types in the Hive schema being traversed
*
* @param <P> type of partner schema
* @param <FP> type of the field representation in the partner schema
*/
public interface PartnerAccessor<P, FP> {

FP fieldPartner(P partnerStruct, String fieldName);

P fieldType(FP partnerField);

P mapKeyPartner(P partnerMap);

P mapValuePartner(P partnerMap);

P listElementPartner(P partnerList);
}

@SuppressWarnings("MethodTypeParameterName")
public static <P, FP, R, FR> R visit(TypeInfo typeInfo, P partner, HiveSchemaWithPartnerVisitor<P, FP, R, FR> visitor,
PartnerAccessor<P, FP> accessor) {
switch (typeInfo.getCategory()) {
case STRUCT:
StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
List<String> names = structTypeInfo.getAllStructFieldNames();
List<FR> results = Lists.newArrayListWithExpectedSize(names.size());
for (String name : names) {
TypeInfo fieldTypeInfo = structTypeInfo.getStructFieldTypeInfo(name);
FP fieldPartner = partner != null ? accessor.fieldPartner(partner, name) : null;
P fieldPartnerType = fieldPartner != null ? accessor.fieldType(fieldPartner) : null;
R result = visit(fieldTypeInfo, fieldPartnerType, visitor, accessor);
results.add(visitor.field(name, fieldTypeInfo, fieldPartner, result));
}
return visitor.struct(structTypeInfo, partner, results);

case LIST:
ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo();
P elementPartner = partner != null ? accessor.listElementPartner(partner) : null;
R elementResult = visit(elementTypeInfo, elementPartner, visitor, accessor);
return visitor.list(listTypeInfo, partner, elementResult);

case MAP:
MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
P keyPartner = partner != null ? accessor.mapKeyPartner(partner) : null;
R keyResult = visit(mapTypeInfo.getMapKeyTypeInfo(), keyPartner, visitor, accessor);
P valuePartner = partner != null ? accessor.mapValuePartner(partner) : null;
R valueResult = visit(mapTypeInfo.getMapValueTypeInfo(), valuePartner, visitor, accessor);
return visitor.map(mapTypeInfo, partner, keyResult, valueResult);

case PRIMITIVE:
return visitor.primitive((PrimitiveTypeInfo) typeInfo, partner);

case UNION:
throw new UnsupportedOperationException("Union data type not supported: " + typeInfo);

default:
throw new UnsupportedOperationException(typeInfo + " not supported");
}
}

public R struct(StructTypeInfo struct, P partner, List<FR> fieldResults) {
return null;
}

public FR field(String name, TypeInfo field, FP partner, R fieldResult) {
return null;
}

public R list(ListTypeInfo list, P partner, R elementResult) {
return null;
}

public R map(MapTypeInfo map, P partner, R keyResult, R valueResult) {
return null;
}

public R primitive(PrimitiveTypeInfo primitive, P partner) {
return null;
}
}
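To make the PartnerAccessor contract above concrete, here is a minimal, hypothetical implementation for an Avro partner schema. It is only a sketch: the class name, the case-insensitive field matching, and the null handling are assumptions for illustration, not the accessor this PR actually uses.

import java.util.Locale;
import org.apache.avro.Schema;
import org.apache.iceberg.hive.legacy.HiveSchemaWithPartnerVisitor.PartnerAccessor;

// Pairs Hive struct fields with Avro record fields by case-insensitive name and
// returns null whenever no partner exists, which the visitor passes through as-is.
class AvroPartnerAccessor implements PartnerAccessor<Schema, Schema.Field> {

  @Override
  public Schema.Field fieldPartner(Schema partnerStruct, String fieldName) {
    if (partnerStruct.getType() != Schema.Type.RECORD) {
      return null;
    }
    for (Schema.Field field : partnerStruct.getFields()) {
      if (field.name().toLowerCase(Locale.ROOT).equals(fieldName.toLowerCase(Locale.ROOT))) {
        return field;
      }
    }
    return null;
  }

  @Override
  public Schema fieldType(Schema.Field partnerField) {
    return partnerField.schema();
  }

  @Override
  public Schema mapKeyPartner(Schema partnerMap) {
    // Avro map keys are always strings
    return partnerMap.getType() == Schema.Type.MAP ? Schema.create(Schema.Type.STRING) : null;
  }

  @Override
  public Schema mapValuePartner(Schema partnerMap) {
    return partnerMap.getType() == Schema.Type.MAP ? partnerMap.getValueType() : null;
  }

  @Override
  public Schema listElementPartner(Schema partnerList) {
    return partnerList.getType() == Schema.Type.ARRAY ? partnerList.getElementType() : null;
  }
}

With such an accessor, HiveSchemaWithPartnerVisitor.visit(structTypeInfo, avroSchema, visitor, new AvroPartnerAccessor()) walks both trees together and hands each callback the matching Avro type, or null when a Hive field has no Avro counterpart.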
@@ -27,6 +27,7 @@
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -35,6 +36,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.AvroSchemaUtil;
+ import org.apache.iceberg.avro.AvroSchemaVisitor;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Conversions;
@@ -54,12 +56,25 @@ private LegacyHiveTableUtils() {
static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) {
Map<String, String> props = getTableProperties(table);
String schemaStr = props.get("avro.schema.literal");
+ org.apache.avro.Schema avroSchema = schemaStr != null ? new org.apache.avro.Schema.Parser().parse(schemaStr) : null;
Schema schema;
- if (schemaStr != null) {
- schema = AvroSchemaUtil.toIceberg(new org.apache.avro.Schema.Parser().parse(schemaStr));
+ if (avroSchema != null) {
+ String serde = table.getSd().getSerdeInfo().getSerializationLib();
+ org.apache.avro.Schema finalAvroSchema;
+ if (serde.equals("org.apache.hadoop.hive.serde2.avro.AvroSerDe") ||
+ HasDuplicateLowercaseColumnNames.visit(avroSchema)) {
+ // Case 1: If serde == AVRO, early escape; Hive column info is not reliable and can be empty for these tables
+ // Hive itself uses avro.schema.literal as source of truth for these tables, so this should be fine
+ // Case 2: If avro.schema.literal has duplicate column names when lowercased, that means we cannot do reliable
+ // matching with Hive schema as multiple Avro fields can map to the same Hive field
+ finalAvroSchema = avroSchema;
+ } else {
+ finalAvroSchema = MergeHiveSchemaWithAvro.visit(structTypeInfoFromCols(table.getSd().getCols()), avroSchema);
+ }
+ schema = AvroSchemaUtil.toIceberg(finalAvroSchema);
} else {
// TODO: Do we need to support column and column.types properties for ORC tables?
LOG.warn("Table {}.{} does not have an avro.schema.literal set; using Hive schema instead. " +
LOG.info("Table {}.{} does not have an avro.schema.literal set; using Hive schema instead. " +
"The schema will not have case sensitivity and nullability information",
table.getDbName(), table.getTableName());
Type icebergType = HiveTypeUtil.convert(structTypeInfoFromCols(table.getSd().getCols()));
@@ -74,7 +89,7 @@ static Schema getSchema(org.apache.hadoop.hive.metastore.api.Table table) {
return new Schema(fields);
}

- static TypeInfo structTypeInfoFromCols(List<FieldSchema> cols) {
+ static StructTypeInfo structTypeInfoFromCols(List<FieldSchema> cols) {
Preconditions.checkArgument(cols != null && cols.size() > 0, "No Hive schema present");
List<String> fieldNames = cols
.stream()
@@ -84,7 +99,7 @@ static TypeInfo structTypeInfoFromCols(List<FieldSchema> cols) {
.stream()
.map(f -> TypeInfoUtils.getTypeInfoFromTypeString(f.getType()))
.collect(Collectors.toList());
- return TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypeInfos);
+ return (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypeInfos);
}

private static Schema partitionSchema(List<FieldSchema> partitionKeys, Schema dataSchema) {
@@ -169,4 +184,39 @@ private static FileFormat serdeToFileFormat(String serde) {
throw new IllegalArgumentException("Unrecognized serde: " + serde);
}
}

+ private static class HasDuplicateLowercaseColumnNames extends AvroSchemaVisitor<Boolean> {
+ private static final HasDuplicateLowercaseColumnNames INSTANCE = new HasDuplicateLowercaseColumnNames();
+
+ private static boolean visit(org.apache.avro.Schema schema) {
+ return AvroSchemaVisitor.visit(schema, INSTANCE);
+ }
+
+ @Override
+ public Boolean record(org.apache.avro.Schema record, List<String> names, List<Boolean> fieldResults) {
+ return fieldResults.stream().anyMatch(x -> x) ||
+ names.stream().collect(Collectors.groupingBy(String::toLowerCase))
+ .values().stream().anyMatch(x -> x.size() > 1);
+ }
+
+ @Override
+ public Boolean union(org.apache.avro.Schema union, List<Boolean> optionResults) {
+ return optionResults.stream().anyMatch(x -> x);
+ }
+
+ @Override
+ public Boolean array(org.apache.avro.Schema array, Boolean elementResult) {
+ return elementResult;
+ }
+
+ @Override
+ public Boolean map(org.apache.avro.Schema map, Boolean valueResult) {
+ return valueResult;
+ }
+
+ @Override
+ public Boolean primitive(org.apache.avro.Schema primitive) {
+ return false;
+ }
+ }
}
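As a small illustration of the duplicate-lowercase-name guard above, the following self-contained sketch (hypothetical field names, not part of the PR) shows why such schemas cannot be reliably matched against Hive's case-insensitive columns:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LowercaseClashExample {
  public static void main(String[] args) {
    // "eventId" and "eventid" both lower-case to "eventid", so name-based matching
    // between the Avro schema and the Hive columns would be ambiguous
    List<String> avroFieldNames = Arrays.asList("eventId", "eventid", "payload");
    Map<String, List<String>> byLowercase = avroFieldNames.stream()
        .collect(Collectors.groupingBy(String::toLowerCase));
    boolean hasClash = byLowercase.values().stream().anyMatch(group -> group.size() > 1);
    System.out.println(hasClash);  // true -> getSchema falls back to avro.schema.literal as-is
  }
}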