@@ -22,15 +22,25 @@
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.AlreadyExistsException;
@@ -40,6 +50,7 @@
import org.apache.iceberg.hive.HiveClientPool;
import org.apache.iceberg.hive.HiveTableOperations;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.thrift.TException;
@@ -141,6 +152,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
boolean tableExists = metaClients.run(client -> client.tableExists(database, tableName));
if (tableExists) {
tbl = metaClients.run(client -> client.getTable(database, tableName));
fixMismatchedSchema(tbl);
} else {
final long currentTimeMillis = System.currentTimeMillis();
tbl = new Table(tableName,
@@ -209,6 +221,67 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
}
}

/**
* [LINKEDIN] Due to an issue where the table read from the metastore is sometimes corrupted and has incorrect
* columns, compare the table columns against the avro.schema.literal property (if it exists) and fix the table
* columns if there is a mismatch
*/
static void fixMismatchedSchema(Table table) {
String avroSchemaLiteral = getAvroSchemaLiteral(table);
if (Strings.isNullOrEmpty(avroSchemaLiteral)) {
return;
}
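// Derive the Hive column list implied by the Avro schema literal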
Schema schema = new Schema.Parser().parse(avroSchemaLiteral);
List<FieldSchema> hiveCols;
try {
hiveCols = getColsFromAvroSchema(schema);
} catch (SerDeException e) {
LOG.error("Failed to get columns from avro schema when checking schema", e);
return;
}

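// A size difference or any column name/type mismatch marks the schema as corrupted; the map-based
// comparison is order-insensitive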
boolean schemaMismatched;
if (table.getSd().getCols().size() != hiveCols.size()) {
schemaMismatched = true;
} else {
Map<String, String> hiveFieldMap = hiveCols.stream().collect(
Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
Map<String, String> tableFieldMap = table.getSd().getCols().stream().collect(
Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
schemaMismatched = !hiveFieldMap.equals(tableFieldMap);
}

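// Overwrite the metastore columns with the ones derived from avro.schema.literal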
if (schemaMismatched) {
LOG.warn("Schema columns don't match avro.schema.literal, setting columns to avro.schema.literal. Schema " +
"columns: {}, avro.schema.literal columns: {}",
table.getSd().getCols().stream().map(Object::toString).collect(Collectors.joining(", ")),
hiveCols.stream().map(Object::toString).collect(Collectors.joining(", ")));
table.getSd().setCols(hiveCols);
}
}

private static List<FieldSchema> getColsFromAvroSchema(Schema schema)
throws SerDeException {
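// Reuse Hive's Avro SerDe machinery to convert the Avro schema into Hive column names and types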
AvroObjectInspectorGenerator avroOI = new AvroObjectInspectorGenerator(schema);
List<String> columnNames = avroOI.getColumnNames();
List<TypeInfo> columnTypes = avroOI.getColumnTypes();
if (columnNames.size() != columnTypes.size()) {
throw new IllegalStateException("Mismatched Avro column names and types: " + columnNames.size() +
" names, " + columnTypes.size() + " types");
}

return IntStream.range(0, columnNames.size())
.mapToObj(i -> new FieldSchema(columnNames.get(i), columnTypes.get(i).getTypeName(), ""))
.collect(Collectors.toList());
}

private static String getAvroSchemaLiteral(Table table) {
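// The literal may live in the table parameters or in the serde parameters; prefer the table-level property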
String schemaStr = table.getParameters().get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
if (Strings.isNullOrEmpty(schemaStr)) {
schemaStr = table.getSd().getSerdeInfo().getParameters()
.get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
}
return schemaStr;
}

/**
* [LINKEDIN] a log-enhanced persistTable as a refactoring inspired by
* org.apache.iceberg.hive.HiveTableOperations#persistTable
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.hivelink.core;

import java.util.Collections;
import java.util.Map;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Test;


public class TestHiveMetadataPreservingTableOperations {

@Test
public void testFixMismatchedSchema() {
// Schema literal with 3 fields (name, id, nested)
String testSchemaLiteral = "{\"name\":\"testSchema\",\"type\":\"record\",\"namespace\":\"com.linkedin.test\"," +
"\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"nested\"," +
"\"type\":{\"name\":\"nested\",\"type\":\"record\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
"{\"name\":\"field2\",\"type\":\"string\"}]}}]}";

long currentTimeMillis = System.currentTimeMillis();
StorageDescriptor storageDescriptor = new StorageDescriptor();
FieldSchema field1 = new FieldSchema("name", "string", "");
FieldSchema field2 = new FieldSchema("id", "int", "");
FieldSchema field3 = new FieldSchema("nested", "struct<field1:string,field2:string>", "");
// Set cols with incorrect nested type
storageDescriptor.setCols(ImmutableList.of(field1, field2, new FieldSchema("nested", "struct<field1:int," +
"field2:string>", "")));
Map<String, String> parameters = ImmutableMap.of("avro.schema.literal", testSchemaLiteral);
Table tbl = new Table("tableName",
"dbName",
System.getProperty("user.name"),
(int) (currentTimeMillis / 1000),
(int) (currentTimeMillis / 1000),
Integer.MAX_VALUE,
storageDescriptor,
Collections.emptyList(),
parameters,
null,
null,
TableType.EXTERNAL_TABLE.toString());

HiveMetadataPreservingTableOperations.fixMismatchedSchema(tbl);
Assert.assertEquals(3, tbl.getSd().getColsSize());
Assert.assertEquals(field1, tbl.getSd().getCols().get(0));
Assert.assertEquals(field2, tbl.getSd().getCols().get(1));
Assert.assertEquals(field3, tbl.getSd().getCols().get(2));
}
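
// A hedged sketch (editor's addition, not part of this change): a hypothetical companion test for the
// no-op path. When the storage descriptor columns already match avro.schema.literal, fixMismatchedSchema
// should leave them untouched. The method name and schema literal below are illustrative assumptions.
@Test
public void testFixMismatchedSchemaNoopWhenMatching() {
// Schema literal with 2 fields (name, id) that agree with the table columns below
String matchingSchemaLiteral = "{\"name\":\"testSchema\",\"type\":\"record\",\"namespace\":\"com.linkedin.test\"," +
"\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"int\"}]}";

long currentTimeMillis = System.currentTimeMillis();
StorageDescriptor storageDescriptor = new StorageDescriptor();
FieldSchema field1 = new FieldSchema("name", "string", "");
FieldSchema field2 = new FieldSchema("id", "int", "");
storageDescriptor.setCols(ImmutableList.of(field1, field2));
Map<String, String> parameters = ImmutableMap.of("avro.schema.literal", matchingSchemaLiteral);
Table tbl = new Table("tableName",
"dbName",
System.getProperty("user.name"),
(int) (currentTimeMillis / 1000),
(int) (currentTimeMillis / 1000),
Integer.MAX_VALUE,
storageDescriptor,
Collections.emptyList(),
parameters,
null,
null,
TableType.EXTERNAL_TABLE.toString());

HiveMetadataPreservingTableOperations.fixMismatchedSchema(tbl);
// Columns are unchanged because the name -> type mapping already agrees with the literal
Assert.assertEquals(2, tbl.getSd().getColsSize());
Assert.assertEquals(field1, tbl.getSd().getCols().get(0));
Assert.assertEquals(field2, tbl.getSd().getCols().get(1));
}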
}