@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
@@ -53,6 +54,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,6 +84,8 @@ public class HiveMetadataPreservingTableOperations extends HiveTableOperations {
       .impl(HiveMetaStoreClient.class, "alter_table",
           String.class, String.class, Table.class, EnvironmentContext.class)
       .build();
+  public static final String ORC_COLUMNS = "columns";
+  public static final String ORC_COLUMNS_TYPES = "columns.types";

   protected HiveMetadataPreservingTableOperations(Configuration conf, HiveClientPool metaClients, FileIO fileIO,
       String catalogName, String database, String table) {
@@ -224,29 +228,30 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
   /**
    * [LINKEDIN] Due to an issue where the table read in is sometimes corrupted and has incorrect columns, compare the
    * table columns to the avro.schema.literal property (if it exists) and fix the table columns if there is a mismatch
+   * @return true if the schema was mismatched and fixed
    */
-  static void fixMismatchedSchema(Table table) {
+  static boolean fixMismatchedSchema(Table table) {
     String avroSchemaLiteral = getAvroSchemaLiteral(table);
     if (Strings.isNullOrEmpty(avroSchemaLiteral)) {
-      return;
+      return false;
     }
     Schema schema = new Schema.Parser().parse(avroSchemaLiteral);
     List<FieldSchema> hiveCols;
     try {
       hiveCols = getColsFromAvroSchema(schema);
     } catch (SerDeException e) {
       LOG.error("Failed to get columns from avro schema when checking schema", e);
-      return;
+      return false;
     }

     boolean schemaMismatched;
     if (table.getSd().getCols().size() != hiveCols.size()) {
       schemaMismatched = true;
     } else {
       Map<String, String> hiveFieldMap = hiveCols.stream().collect(
-          Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
+          Collectors.toMap(field -> field.getName().toLowerCase(), field -> field.getType().toLowerCase()));
       Map<String, String> tableFieldMap = table.getSd().getCols().stream().collect(
-          Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
+          Collectors.toMap(field -> field.getName().toLowerCase(), field -> field.getType().toLowerCase()));
       schemaMismatched = !hiveFieldMap.equals(tableFieldMap);
     }

@@ -256,7 +261,13 @@ static void fixMismatchedSchema(Table table) {
           table.getSd().getCols().stream().map(Object::toString).collect(Collectors.joining(", ")),
           hiveCols.stream().map(Object::toString).collect(Collectors.joining(", ")));
       table.getSd().setCols(hiveCols);
+      if (!Strings.isNullOrEmpty(table.getSd().getInputFormat()) && table.getSd().getInputFormat()
+          .contains("OrcInputFormat")) {
+        updateORCStorageDesc(hiveCols, table);
+      }
     }
+
+    return schemaMismatched;
   }

   private static List<FieldSchema> getColsFromAvroSchema(Schema schema)
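The lowercasing added above matters because the Hive metastore stores column names lowercased, while an `avro.schema.literal` may preserve the original casing; without it, a case difference alone would be flagged as a mismatch. A minimal standalone sketch of the comparison (hypothetical column values, not code from this PR):

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SchemaCompareSketch {
  public static void main(String[] args) {
    // Column names/types as Hive's metastore returns them: already lowercased.
    Map<String, String> tableCols = Stream.of(new String[][] {{"name", "string"}, {"id", "int"}})
        .collect(Collectors.toMap(f -> f[0], f -> f[1]));
    // Columns derived from an avro.schema.literal that kept uppercase names,
    // lowercased on both name and type as in fixMismatchedSchema.
    Map<String, String> avroCols = Stream.of(new String[][] {{"Name", "string"}, {"ID", "int"}})
        .collect(Collectors.toMap(f -> f[0].toLowerCase(), f -> f[1].toLowerCase()));
    // Casing differences no longer register as a schema mismatch.
    System.out.println("mismatch = " + !tableCols.equals(avroCols)); // mismatch = false
  }
}
```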
@@ -282,6 +293,22 @@ private static String getAvroSchemaLiteral(Table table) {
     return schemaStr;
   }

+  private static void updateORCStorageDesc(List<FieldSchema> hiveCols, Table table) {
+    String columnsString = hiveCols.stream().map(FieldSchema::getName).collect(Collectors.joining(","));
+    String typesString = hiveCols.stream().map(FieldSchema::getType).collect(Collectors.joining(","));
+
+    if (!table.getSd().isSetSerdeInfo()) {
+      table.getSd().setSerdeInfo(new SerDeInfo());
+    }
+    if (!table.getSd().getSerdeInfo().isSetParameters()) {
+      table.getSd().getSerdeInfo().setParameters(Maps.newHashMap());
+    }
+
+    Map<String, String> sdParams = table.getSd().getSerdeInfo().getParameters();
+    sdParams.put(ORC_COLUMNS, columnsString);
+    sdParams.put(ORC_COLUMNS_TYPES, typesString);
+  }
+
   /**
    * [LINKEDIN] a log-enhanced persistTable as a refactoring inspired by
    * org.apache.iceberg.hive.HiveTableOperations#persistTable
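For context, `columns` and `columns.types` are the serde properties Hive's ORC serde consults for the table schema, so they must track the fixed column list. A hedged sketch of the storage-descriptor state that `updateORCStorageDesc` produces (hypothetical column list; only standard Hive metastore Thrift classes assumed):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;

public class OrcSerdeParamsSketch {
  public static void main(String[] args) {
    StorageDescriptor sd = new StorageDescriptor();
    sd.setSerdeInfo(new SerDeInfo());
    sd.getSerdeInfo().setParameters(new HashMap<>());

    // Comma-joined names and types, matching what updateORCStorageDesc writes
    // under the ORC_COLUMNS / ORC_COLUMNS_TYPES keys.
    Map<String, String> params = sd.getSerdeInfo().getParameters();
    params.put("columns", "name,id,nested");
    params.put("columns.types", "string,int,struct<field1:string,field2:string>");
    System.out.println(params);
  }
}
```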
@@ -40,6 +40,10 @@ public void testFixMismatchedSchema() {
         "\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"nested\"," +
         "\"type\":{\"name\":\"nested\",\"type\":\"record\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
         "{\"name\":\"field2\",\"type\":\"string\"}]}}]}";
+    String testSchemaLiteralWithUppercase = "{\"name\":\"testSchema\",\"type\":\"record\",\"namespace\":\"com" +
+        ".linkedin.test\", \"fields\":[{\"name\":\"Name\",\"type\":\"string\"},{\"name\":\"ID\",\"type\":\"int\"}" +
+        ",{\"name\":\"Nested\", \"type\":{\"name\":\"Nested\",\"type\":\"record\",\"fields\":[{\"name\":\"Field1\"," +
+        "\"type\":\"string\"}, {\"name\":\"Field2\",\"type\":\"string\"}]}}]}";

     long currentTimeMillis = System.currentTimeMillis();
     StorageDescriptor storageDescriptor = new StorageDescriptor();
@@ -49,6 +53,7 @@ public void testFixMismatchedSchema() {
     // Set cols with incorrect nested type
     storageDescriptor.setCols(ImmutableList.of(field1, field2, new FieldSchema("nested", "struct<field1:int," +
         "field2:string>", "")));
+    storageDescriptor.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
     Map<String, String> parameters = ImmutableMap.of("avro.schema.literal", testSchemaLiteral);
     Table tbl = new Table("tableName",
         "dbName",
@@ -63,10 +68,18 @@ public void testFixMismatchedSchema() {
         null,
         TableType.EXTERNAL_TABLE.toString());

-    HiveMetadataPreservingTableOperations.fixMismatchedSchema(tbl);
+    Assert.assertTrue(HiveMetadataPreservingTableOperations.fixMismatchedSchema(tbl));
     Assert.assertEquals(3, tbl.getSd().getColsSize());
     Assert.assertEquals(field1, tbl.getSd().getCols().get(0));
     Assert.assertEquals(field2, tbl.getSd().getCols().get(1));
     Assert.assertEquals(field3, tbl.getSd().getCols().get(2));
+    Assert.assertTrue(storageDescriptor.getSerdeInfo().getParameters()
+        .containsKey(HiveMetadataPreservingTableOperations.ORC_COLUMNS));
+    Assert.assertTrue(storageDescriptor.getSerdeInfo().getParameters()
+        .containsKey(HiveMetadataPreservingTableOperations.ORC_COLUMNS_TYPES));
+
+    // Use the same schema literal but with uppercase names and check that no mismatch is detected
+    tbl.setParameters(ImmutableMap.of("avro.schema.literal", testSchemaLiteralWithUppercase));
+    Assert.assertFalse(HiveMetadataPreservingTableOperations.fixMismatchedSchema(tbl));
   }
 }
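As a closing note, the new boolean return presumably lets the refresh/commit path decide whether the repaired table still needs to be written back through persistTable. A hypothetical caller (the stubs below are illustrative; neither signature is taken from this PR):

```java
import org.apache.hadoop.hive.metastore.api.Table;

public class FixAndPersistSketch {
  // Stub standing in for HiveMetadataPreservingTableOperations.fixMismatchedSchema.
  static boolean fixMismatchedSchema(Table table) {
    return true;
  }

  // Stub standing in for the log-enhanced persistTable mentioned above.
  static void persistTable(Table table, boolean updateHiveTable) {
  }

  static void refresh(Table hmsTable) {
    // Only push the repaired descriptor back to the metastore when
    // fixMismatchedSchema reports that it actually changed something.
    if (fixMismatchedSchema(hmsTable)) {
      persistTable(hmsTable, true);
    }
  }
}
```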