diff --git a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/HiveMetadataPreservingTableOperations.java b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/HiveMetadataPreservingTableOperations.java index aa557b847e..5790f88e8a 100644 --- a/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/HiveMetadataPreservingTableOperations.java +++ b/hivelink-core/src/main/java/org/apache/iceberg/hivelink/core/HiveMetadataPreservingTableOperations.java @@ -22,15 +22,25 @@ import java.net.UnknownHostException; import java.util.Collections; import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.common.DynMethods; import org.apache.iceberg.exceptions.AlreadyExistsException; @@ -40,6 +50,7 @@ import org.apache.iceberg.hive.HiveClientPool; import org.apache.iceberg.hive.HiveTableOperations; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Strings; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.thrift.TException; @@ -141,6 +152,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { boolean tableExists = metaClients.run(client -> client.tableExists(database, tableName)); if (tableExists) { tbl = metaClients.run(client -> client.getTable(database, tableName)); + fixMismatchedSchema(tbl); } else { final long currentTimeMillis = System.currentTimeMillis(); tbl = new Table(tableName, @@ -209,6 +221,67 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { } } + /** + * [LINKEDIN] Due to an issue that the table read in is sometimes corrupted and has incorrect columns, compare the + * table columns to the avro.schema.literal property (if it exists) and fix the table columns if there is a mismatch + */ + static void fixMismatchedSchema(Table table) { + String avroSchemaLiteral = getAvroSchemaLiteral(table); + if (Strings.isNullOrEmpty(avroSchemaLiteral)) { + return; + } + Schema schema = new Schema.Parser().parse(avroSchemaLiteral); + List hiveCols; + try { + hiveCols = getColsFromAvroSchema(schema); + } catch (SerDeException e) { + LOG.error("Failed to get get columns from avro schema when checking schema", e); + return; + } + + boolean schemaMismatched; + if (table.getSd().getCols().size() != hiveCols.size()) { + schemaMismatched = true; + } else { + Map hiveFieldMap = hiveCols.stream().collect( + Collectors.toMap(FieldSchema::getName, FieldSchema::getType)); + Map tableFieldMap = table.getSd().getCols().stream().collect( + Collectors.toMap(FieldSchema::getName, FieldSchema::getType)); + schemaMismatched = !hiveFieldMap.equals(tableFieldMap); + } + + if (schemaMismatched) { + LOG.warn("Schema columns don't match avro.schema.literal, setting columns to avro.schema.literal. 
Schema " + + "columns: {}, avro.schema.literal columns: {}", + table.getSd().getCols().stream().map(Object::toString).collect(Collectors.joining(", ")), + hiveCols.stream().map(Object::toString).collect(Collectors.joining(", "))); + table.getSd().setCols(hiveCols); + } + } + + private static List getColsFromAvroSchema(Schema schema) + throws SerDeException { + AvroObjectInspectorGenerator avroOI = new AvroObjectInspectorGenerator(schema); + List columnNames = avroOI.getColumnNames(); + List columnTypes = avroOI.getColumnTypes(); + if (columnNames.size() != columnTypes.size()) { + throw new IllegalStateException(); + } + + return IntStream.range(0, columnNames.size()) + .mapToObj(i -> new FieldSchema(columnNames.get(i), columnTypes.get(i).getTypeName(), "")) + .collect(Collectors.toList()); + } + + private static String getAvroSchemaLiteral(Table table) { + String schemaStr = table.getParameters().get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()); + if (Strings.isNullOrEmpty(schemaStr)) { + schemaStr = table.getSd().getSerdeInfo().getParameters() + .get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()); + } + return schemaStr; + } + /** * [LINKEDIN] a log-enhanced persistTable as a refactoring inspired by * org.apache.iceberg.hive.HiveTableOperations#persistTable diff --git a/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestHiveMetadataPreservingTableOperations.java b/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestHiveMetadataPreservingTableOperations.java new file mode 100644 index 0000000000..e26166eb3e --- /dev/null +++ b/hivelink-core/src/test/java/org/apache/iceberg/hivelink/core/TestHiveMetadataPreservingTableOperations.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
package org.apache.iceberg.hivelink.core;

import java.util.Collections;
import java.util.Map;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Test;


/**
 * Tests for {@link HiveMetadataPreservingTableOperations}, covering the repair of table columns
 * that have drifted from the avro.schema.literal table property.
 */
public class TestHiveMetadataPreservingTableOperations {

  @Test
  public void testFixMismatchedSchema() {
    // Avro schema literal with 3 fields (name, id, nested)
    String testSchemaLiteral = "{\"name\":\"testSchema\",\"type\":\"record\",\"namespace\":\"com.linkedin.test\","
        + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"nested\","
        + "\"type\":{\"name\":\"nested\",\"type\":\"record\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},"
        + "{\"name\":\"field2\",\"type\":\"string\"}]}}]}";

    long currentTimeMillis = System.currentTimeMillis();
    StorageDescriptor storageDescriptor = new StorageDescriptor();
    FieldSchema field1 = new FieldSchema("name", "string", "");
    FieldSchema field2 = new FieldSchema("id", "int", "");
    // Expected repaired type: the Hive struct type derived from the Avro "nested" record.
    FieldSchema field3 = new FieldSchema("nested", "struct<field1:string,field2:string>", "");
    // Set cols with an incorrect nested type (missing field2) so the fix-up has work to do
    storageDescriptor.setCols(
        ImmutableList.of(field1, field2, new FieldSchema("nested", "struct<field1:string>", "")));
    Map<String, String> parameters = ImmutableMap.of("avro.schema.literal", testSchemaLiteral);
    // Parenthesize before casting: (int) currentTimeMillis / 1000 would truncate the long to an
    // int first and only then divide, yielding a bogus create/access time.
    int nowSeconds = (int) (currentTimeMillis / 1000);
    Table tbl = new Table("tableName",
        "dbName",
        System.getProperty("user.name"),
        nowSeconds,
        nowSeconds,
        Integer.MAX_VALUE,
        storageDescriptor,
        Collections.emptyList(),
        parameters,
        null,
        null,
        TableType.EXTERNAL_TABLE.toString());

    HiveMetadataPreservingTableOperations.fixMismatchedSchema(tbl);
    Assert.assertEquals(3, tbl.getSd().getColsSize());
    Assert.assertEquals(field1, tbl.getSd().getCols().get(0));
    Assert.assertEquals(field2, tbl.getSd().getCols().get(1));
    Assert.assertEquals(field3, tbl.getSd().getCols().get(2));
  }
}