diff --git a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/HiveMetadataPreservingTableOperations.java b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/HiveMetadataPreservingTableOperations.java
index 5790f88e8a..40c8a9c9ec 100644
--- a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/HiveMetadataPreservingTableOperations.java
+++ b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/HiveMetadataPreservingTableOperations.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
@@ -53,6 +54,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,6 +84,8 @@ public class HiveMetadataPreservingTableOperations extends HiveTableOperations {
       .impl(HiveMetaStoreClient.class, "alter_table",
           String.class, String.class, Table.class, EnvironmentContext.class)
       .build();
+  public static final String ORC_COLUMNS = "columns";
+  public static final String ORC_COLUMNS_TYPES = "columns.types";
 
   protected HiveMetadataPreservingTableOperations(Configuration conf, HiveClientPool metaClients, FileIO fileIO,
                                                   String catalogName, String database, String table) {
@@ -224,11 +228,12 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
   /**
    * [LINKEDIN] Due to an issue that the table read in is sometimes corrupted and has incorrect columns, compare the
    * table columns to the avro.schema.literal property (if it exists) and fix the table columns if there is a mismatch
+   * @return true if the schema was mismatched and fixed
    */
-  static void fixMismatchedSchema(Table table) {
+  static boolean fixMismatchedSchema(Table table) {
     String avroSchemaLiteral = getAvroSchemaLiteral(table);
     if (Strings.isNullOrEmpty(avroSchemaLiteral)) {
-      return;
+      return false;
     }
     Schema schema = new Schema.Parser().parse(avroSchemaLiteral);
     List<FieldSchema> hiveCols;
@@ -236,7 +241,7 @@ static void fixMismatchedSchema(Table table) {
       hiveCols = getColsFromAvroSchema(schema);
     } catch (SerDeException e) {
       LOG.error("Failed to get get columns from avro schema when checking schema", e);
-      return;
+      return false;
     }
 
     boolean schemaMismatched;
@@ -244,9 +249,9 @@ static void fixMismatchedSchema(Table table) {
       schemaMismatched = true;
     } else {
       Map<String, String> hiveFieldMap = hiveCols.stream().collect(
-          Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
+          Collectors.toMap(field -> field.getName().toLowerCase(), field -> field.getType().toLowerCase()));
       Map<String, String> tableFieldMap = table.getSd().getCols().stream().collect(
-          Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
+          Collectors.toMap(field -> field.getName().toLowerCase(), field -> field.getType().toLowerCase()));
       schemaMismatched = !hiveFieldMap.equals(tableFieldMap);
     }
 
@@ -256,7 +261,13 @@ static void fixMismatchedSchema(Table table) {
           table.getSd().getCols().stream().map(Object::toString).collect(Collectors.joining(", ")),
           hiveCols.stream().map(Object::toString).collect(Collectors.joining(", ")));
       table.getSd().setCols(hiveCols);
+      if (!Strings.isNullOrEmpty(table.getSd().getInputFormat()) && table.getSd().getInputFormat()
+          .contains("OrcInputFormat")) {
+        updateORCStorageDesc(hiveCols, table);
+      }
     }
+
+    return schemaMismatched;
   }
 
   private static List<FieldSchema> getColsFromAvroSchema(Schema schema)
@@ -282,6 +293,22 @@ private static String getAvroSchemaLiteral(Table table) {
     return schemaStr;
   }
 
+  private static void updateORCStorageDesc(List<FieldSchema> hiveCols, Table table) {
+    String columnsString = hiveCols.stream().map(FieldSchema::getName).collect(Collectors.joining(","));
+    String typesString = hiveCols.stream().map(FieldSchema::getType).collect(Collectors.joining(","));
+
+    if (!table.getSd().isSetSerdeInfo()) {
+      table.getSd().setSerdeInfo(new SerDeInfo());
+    }
+    if (!table.getSd().getSerdeInfo().isSetParameters()) {
+      table.getSd().getSerdeInfo().setParameters(Maps.newHashMap());
+    }
+
+    Map<String, String> sdParams = table.getSd().getSerdeInfo().getParameters();
+    sdParams.put(ORC_COLUMNS, columnsString);
+    sdParams.put(ORC_COLUMNS_TYPES, typesString);
+  }
+
   /**
    * [LINKEDIN] a log-enhanced persistTable as a refactoring inspired by
    * org.apache.iceberg.hive.HiveTableOperations#persistTable
diff --git a/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestHiveMetadataPreservingTableOperations.java b/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestHiveMetadataPreservingTableOperations.java
index e26166eb3e..cbe3d4acc5 100644
--- a/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestHiveMetadataPreservingTableOperations.java
+++ b/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestHiveMetadataPreservingTableOperations.java
@@ -40,6 +40,10 @@ public void testFixMismatchedSchema() {
         "\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"nested\"," +
         "\"type\":{\"name\":\"nested\",\"type\":\"record\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
         "{\"name\":\"field2\",\"type\":\"string\"}]}}]}";
+    String testSchemaLiteralWithUppercase = "{\"name\":\"testSchema\",\"type\":\"record\",\"namespace\":\"com" +
+        ".linkedin.test\", \"fields\":[{\"name\":\"Name\",\"type\":\"string\"},{\"name\":\"ID\",\"type\":\"int\"}" +
+        ",{\"name\":\"Nested\", \"type\":{\"name\":\"Nested\",\"type\":\"record\",\"fields\":[{\"name\":\"Field1\"," +
+        "\"type\":\"string\"}, {\"name\":\"Field2\",\"type\":\"string\"}]}}]}";
     long currentTimeMillis = System.currentTimeMillis();
 
     StorageDescriptor storageDescriptor = new StorageDescriptor();
@@ -49,6 +53,7 @@ public void testFixMismatchedSchema() {
 
     // Set cols with incorrect nested type
     storageDescriptor.setCols(ImmutableList.of(field1, field2,
        new FieldSchema("nested", "struct", "")));
+    storageDescriptor.setInputFormat("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
     Map<String, String> parameters = ImmutableMap.of("avro.schema.literal", testSchemaLiteral);
     Table tbl = new Table("tableName", "dbName",
@@ -63,10 +68,18 @@ public void testFixMismatchedSchema() {
         null,
         TableType.EXTERNAL_TABLE.toString());
 
-    HiveMetadataPreservingTableOperations.fixMismatchedSchema(tbl);
+    Assert.assertTrue(HiveMetadataPreservingTableOperations.fixMismatchedSchema(tbl));
     Assert.assertEquals(3, tbl.getSd().getColsSize());
     Assert.assertEquals(field1, tbl.getSd().getCols().get(0));
     Assert.assertEquals(field2, tbl.getSd().getCols().get(1));
     Assert.assertEquals(field3, tbl.getSd().getCols().get(2));
+    Assert.assertTrue(storageDescriptor.getSerdeInfo().getParameters()
+        .containsKey(HiveMetadataPreservingTableOperations.ORC_COLUMNS));
+    Assert.assertTrue(storageDescriptor.getSerdeInfo().getParameters()
+        .containsKey(HiveMetadataPreservingTableOperations.ORC_COLUMNS_TYPES));
+
+    // Use same schema literal but containing uppercase and check no mismatch detected
+    tbl.setParameters(ImmutableMap.of("avro.schema.literal", testSchemaLiteralWithUppercase));
+    Assert.assertFalse(HiveMetadataPreservingTableOperations.fixMismatchedSchema(tbl));
   }
 }
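
Reviewer note: the standalone sketch below (hypothetical class and method names, not part of the patch) restates the two behaviors this change introduces. It assumes the mismatch check short-circuits on differing column counts before the map comparison, which the hunk context here does not fully show.

// Sketch only: case-insensitive column comparison plus the ORC serde rewrite.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.metastore.api.FieldSchema;

class FixMismatchedSchemaSketch {

  // With the lowercasing added in this patch, "Name"/"STRING" from
  // avro.schema.literal no longer counts as a mismatch against
  // "name"/"string" stored by the (case-insensitive) metastore.
  static boolean columnsMismatch(List<FieldSchema> avroCols, List<FieldSchema> tableCols) {
    if (avroCols.size() != tableCols.size()) {
      return true;
    }
    // Caveat: Collectors.toMap throws IllegalStateException on duplicate keys,
    // so two columns whose names differ only by case would fail here.
    Map<String, String> avroMap = avroCols.stream().collect(
        Collectors.toMap(f -> f.getName().toLowerCase(), f -> f.getType().toLowerCase()));
    Map<String, String> tableMap = tableCols.stream().collect(
        Collectors.toMap(f -> f.getName().toLowerCase(), f -> f.getType().toLowerCase()));
    return !avroMap.equals(tableMap);
  }

  // For ORC tables, the repaired column list is also mirrored into the serde
  // properties ("columns" / "columns.types") that ORC readers consult, so the
  // storage descriptor and serde stay consistent after the fix.
  static Map<String, String> orcSerdeParams(List<FieldSchema> fixedCols) {
    Map<String, String> params = new HashMap<>();
    params.put("columns",
        fixedCols.stream().map(FieldSchema::getName).collect(Collectors.joining(",")));
    params.put("columns.types",
        fixedCols.stream().map(FieldSchema::getType).collect(Collectors.joining(",")));
    return params;
  }
}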