diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 62bcbf684b836..69005cd75332c 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
 import scala.collection.JavaConversions._
@@ -144,7 +144,7 @@ object AvroConversionUtils {
   def convertStructTypeToAvroSchema(structType: DataType,
                                     structName: String,
                                     recordNamespace: String): Schema = {
-    getAvroSchemaWithDefaults(SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace))
+    getAvroSchemaWithDefaults(SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace), structType)
   }
 
   /**
@@ -154,13 +154,20 @@ object AvroConversionUtils {
    * @param schema input avro schema
    * @return Avro schema with null default set to nullable fields
    */
-  def getAvroSchemaWithDefaults(schema: Schema): Schema = {
+  def getAvroSchemaWithDefaults(schema: Schema, dataType: DataType): Schema = {
 
     schema.getType match {
       case Schema.Type.RECORD => {
-
+        val structType = dataType.asInstanceOf[StructType]
+        val structFields = structType.fields
         val modifiedFields = schema.getFields.map(field => {
-          val newSchema = getAvroSchemaWithDefaults(field.schema())
+          val i: Int = structType.fieldIndex(field.name())
+          val comment: String = if (structFields(i).metadata.contains("comment")) {
+            structFields(i).metadata.getString("comment")
+          } else {
+            field.doc()
+          }
+          val newSchema = getAvroSchemaWithDefaults(field.schema(), structFields(i).dataType)
           field.schema().getType match {
             case Schema.Type.UNION => {
               val innerFields = newSchema.getTypes
@@ -168,27 +175,27 @@ object AvroConversionUtils {
               if(containsNullSchema) {
                 // Need to re shuffle the fields in list because to set null as default, null schema must be head in union schema
                 val restructuredNewSchema = Schema.createUnion(List(Schema.create(Schema.Type.NULL)) ++ innerFields.filter(innerSchema => !(innerSchema.getType == Schema.Type.NULL)))
-                new Schema.Field(field.name(), restructuredNewSchema, field.doc(), JsonProperties.NULL_VALUE)
+                new Schema.Field(field.name(), restructuredNewSchema, comment, JsonProperties.NULL_VALUE)
               } else {
-                new Schema.Field(field.name(), newSchema, field.doc(), field.defaultVal())
+                new Schema.Field(field.name(), newSchema, comment, field.defaultVal())
               }
             }
-            case _ => new Schema.Field(field.name(), newSchema, field.doc(), field.defaultVal())
+            case _ => new Schema.Field(field.name(), newSchema, comment, field.defaultVal())
           }
         }).toList
         Schema.createRecord(schema.getName, schema.getDoc, schema.getNamespace, schema.isError, modifiedFields)
       }
       case Schema.Type.UNION => {
-        Schema.createUnion(schema.getTypes.map(innerSchema => getAvroSchemaWithDefaults(innerSchema)))
+        Schema.createUnion(schema.getTypes.map(innerSchema => getAvroSchemaWithDefaults(innerSchema, dataType)))
       }
       case Schema.Type.MAP => {
-        Schema.createMap(getAvroSchemaWithDefaults(schema.getValueType))
+        Schema.createMap(getAvroSchemaWithDefaults(schema.getValueType, dataType.asInstanceOf[MapType].valueType))
      }
       case Schema.Type.ARRAY => {
-        Schema.createArray(getAvroSchemaWithDefaults(schema.getElementType))
+        Schema.createArray(getAvroSchemaWithDefaults(schema.getElementType, dataType.asInstanceOf[ArrayType].elementType))
       }
       case _ => schema
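To illustrate the effect of this change: a comment attached to a `StructField` under the `comment` metadata key (the key Spark SQL itself uses for column comments) now lands in the `doc` of the corresponding Avro field. A minimal sketch, assuming the patched `AvroConversionUtils` is on the classpath; the field names, comment text, and record/namespace names are made up:

```scala
import org.apache.hudi.AvroConversionUtils
import org.apache.spark.sql.types._

// Attach a column comment the same way Spark SQL does: under the "comment" metadata key.
val schema = new StructType()
  .add("id", StringType, nullable = false)
  .add("price", DoubleType, nullable = true,
    new MetadataBuilder().putString("comment", "unit price in USD").build())

val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(schema, "Item", "example.ns")
// The nullable field keeps its null-first union and null default,
// and its Avro doc now carries the Spark column comment.
assert(avroSchema.getField("price").doc() == "unit price in USD")
```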
diff --git a/hudi-common/src/test/resources/simple-test-doced.avsc b/hudi-common/src/test/resources/simple-test-doced.avsc
new file mode 100644
index 0000000000000..f6b53aff8ee8a
--- /dev/null
+++ b/hudi-common/src/test/resources/simple-test-doced.avsc
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "namespace": "example.avro",
+  "type": "record",
+  "name": "User",
+  "fields": [
+    {"name": "name", "type": "string", "doc": "name_comment"},
+    {"name": "favorite_number", "type": "int", "doc": "favorite_number_comment"},
+    {"name": "favorite_color", "type": "string"}
+  ]
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index ead806bca5c15..634389b1e68b1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -316,6 +316,8 @@ public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String b
     if (props.containsKey(HiveExternalCatalog.CREATED_SPARK_VERSION())) {
       hiveSyncConfig.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION());
     }
+    hiveSyncConfig.syncComment = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SYNC_COMMENT().key(),
+        DataSourceWriteOptions.HIVE_SYNC_COMMENT().defaultValue()));
     return hiveSyncConfig;
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 8a98657f242e2..530e435783696 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -555,6 +555,11 @@ object DataSourceWriteOptions {
     .withDocumentation("Whether sync hive metastore bucket specification when using bucket index."
       + "The specification is 'CLUSTERED BY (trace_id) SORTED BY (trace_id ASC) INTO 65536 BUCKETS'")
 
+  val HIVE_SYNC_COMMENT: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.hive_sync.sync_comment")
+    .defaultValue("false")
+    .withDocumentation("Whether to sync the table column comments while syncing the table.")
+
   // Async Compaction - Enabled by default for MOR
   val ASYNC_COMPACT_ENABLE: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.compaction.async.enable")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 022eef5fff6a5..5919d4c0b270c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -600,6 +600,7 @@ object HoodieSparkSqlWriter {
     hiveSyncConfig.serdeProperties = hoodieConfig.getString(HIVE_TABLE_SERDE_PROPERTIES)
     hiveSyncConfig.tableProperties = hoodieConfig.getString(HIVE_TABLE_PROPERTIES)
     hiveSyncConfig.sparkVersion = SPARK_VERSION
+    hiveSyncConfig.syncComment = hoodieConfig.getStringOrDefault(HIVE_SYNC_COMMENT).toBoolean
     hiveSyncConfig
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
index 50137c9a580a3..bacd44753df35 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
@@ -161,4 +161,220 @@ class TestAvroConversionUtils extends FunSuite with Matchers {
     assert(avroSchema.equals(expectedAvroSchema))
   }
+
+  test("test convertStructTypeToAvroSchema with Nested StructField comment") {
+    val mapType = DataTypes.createMapType(StringType, new StructType().add("mapKey", "string", false, "mapKeyComment").add("mapVal", "integer", true))
+    val arrayType = ArrayType(new StructType().add("arrayKey", "string", false).add("arrayVal", "integer", true, "arrayValComment"))
+    val innerStruct = new StructType().add("innerKey", "string", false, "innerKeyComment").add("value", "long", true, "valueComment")
+
+    val struct = new StructType().add("key", "string", false).add("version", "string", true, "versionComment")
+      .add("data1", innerStruct, false).add("data2", innerStruct, true)
+      .add("nullableMap", mapType, true).add("map", mapType, false)
+      .add("nullableArray", arrayType, true).add("array", arrayType, false)
+
+    val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, "SchemaName", "SchemaNS")
+
+    val expectedSchemaStr = s"""
+    {
+      "type": "record",
+      "name": "SchemaName",
+      "namespace": "SchemaNS",
+      "fields": [
+        {
+          "name": "key",
+          "type": "string"
+        },
+        {
+          "name": "version",
+          "type": [
+            "null",
+            "string"
+          ],
+          "doc": "versionComment",
+          "default": null
+        },
+        {
+          "name": "data1",
+          "type": {
+            "type": "record",
+            "name": "data1",
+            "namespace": "SchemaNS.SchemaName",
+            "fields": [
+              {
+                "name": "innerKey",
+                "type": "string",
+                "doc": "innerKeyComment"
+              },
+              {
+                "name": "value",
+                "type": [
+                  "null",
+                  "long"
+                ],
+                "doc": "valueComment",
+                "default": null
+              }
+            ]
+          }
+        },
+        {
+          "name": "data2",
+          "type": [
+            "null",
+            {
+              "type": "record",
+              "name": "data2",
+              "namespace": "SchemaNS.SchemaName",
+              "fields": [
+                {
+                  "name": "innerKey",
+                  "type": "string",
+                  "doc": "innerKeyComment"
+                },
+                {
+                  "name": "value",
+                  "type": [
+                    "null",
+                    "long"
+                  ],
+                  "doc": "valueComment",
+                  "default": null
+                }
+              ]
+            }
+          ],
+          "default": null
+        },
+        {
+          "name": "nullableMap",
+          "type": [
+            "null",
+            {
+              "type": "map",
+              "values": [
+                {
+                  "type": "record",
+                  "name": "nullableMap",
+                  "namespace": "SchemaNS.SchemaName",
+                  "fields": [
+                    {
+                      "name": "mapKey",
+                      "type": "string",
+                      "doc": "mapKeyComment"
+                    },
+                    {
+                      "name": "mapVal",
+                      "type": [
+                        "null",
+                        "int"
+                      ],
+                      "default": null
+                    }
+                  ]
+                },
+                "null"
+              ]
+            }
+          ],
+          "default": null
+        },
+        {
+          "name": "map",
+          "type": {
+            "type": "map",
+            "values": [
+              {
+                "type": "record",
+                "name": "map",
+                "namespace": "SchemaNS.SchemaName",
+                "fields": [
+                  {
+                    "name": "mapKey",
+                    "type": "string",
+                    "doc": "mapKeyComment"
+                  },
+                  {
+                    "name": "mapVal",
+                    "type": [
+                      "null",
+                      "int"
+                    ],
+                    "default": null
+                  }
+                ]
+              },
+              "null"
+            ]
+          }
+        },
+        {
+          "name": "nullableArray",
+          "type": [
+            "null",
+            {
+              "type": "array",
+              "items": [
+                {
+                  "type": "record",
+                  "name": "nullableArray",
+                  "namespace": "SchemaNS.SchemaName",
+                  "fields": [
+                    {
+                      "name": "arrayKey",
+                      "type": "string"
+                    },
+                    {
+                      "name": "arrayVal",
+                      "type": [
+                        "null",
+                        "int"
+                      ],
+                      "doc": "arrayValComment",
+                      "default": null
+                    }
+                  ]
+                },
+                "null"
+              ]
+            }
+          ],
+          "default": null
+        },
+        {
+          "name": "array",
+          "type": {
+            "type": "array",
+            "items": [
+              {
+                "type": "record",
+                "name": "array",
+                "namespace": "SchemaNS.SchemaName",
+                "fields": [
+                  {
+                    "name": "arrayKey",
+                    "type": "string"
+                  },
+                  {
+                    "name": "arrayVal",
+                    "type": [
+                      "null",
+                      "int"
+                    ],
+                    "doc": "arrayValComment",
+                    "default": null
+                  }
+                ]
+              },
+              "null"
+            ]
+          }
+        }
+      ]
+    }}
+    """
+
+    val expectedAvroSchema = new Schema.Parser().parse(expectedSchemaStr)
+
+    assert(avroSchema.equals(expectedAvroSchema))
+  }
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 50991852b2c3b..8ee9daa836648 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -132,6 +132,9 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--spark-version"}, description = "The spark version", required = false)
   public String sparkVersion;
 
+  @Parameter(names = {"--sync-comment"}, description = "synchronize table comments to hive")
+  public boolean syncComment = false;
+
   // enhance the similar function in child class
   public static HiveSyncConfig copy(HiveSyncConfig cfg) {
     HiveSyncConfig newConfig = new HiveSyncConfig();
@@ -159,6 +162,7 @@ public static HiveSyncConfig copy(HiveSyncConfig cfg) {
     newConfig.withOperationField = cfg.withOperationField;
     newConfig.isConditionalSync = cfg.isConditionalSync;
     newConfig.sparkVersion = cfg.sparkVersion;
+    newConfig.syncComment = cfg.syncComment;
     return newConfig;
   }
 
@@ -193,6 +197,7 @@ public String toString() {
         + ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
         + ", withOperationField=" + withOperationField
         + ", isConditionalSync=" + isConditionalSync
+        + ", syncComment=" + syncComment
         + '}';
   }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 35200216ee9c0..952742b913330 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -19,9 +19,11 @@
 package org.apache.hudi.hive;
 
 import com.beust.jcommander.JCommander;
+import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -37,6 +39,7 @@
 import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
 import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
 import org.apache.hudi.sync.common.AbstractSyncTool;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.GroupType;
@@ -261,6 +264,19 @@ private boolean syncSchema(String tableName, boolean tableExists, boolean useRea
         LOG.info("No Schema difference for " + tableName);
       }
     }
+
+    if (cfg.syncComment) {
+      Schema avroSchemaWithoutMetadataFields = hoodieHiveClient.getAvroSchemaWithoutMetadataFields();
+      Map<String, String> newComments = avroSchemaWithoutMetadataFields.getFields()
+          .stream().collect(Collectors.toMap(Schema.Field::name, field -> StringUtils.isNullOrEmpty(field.doc()) ? "" : field.doc()));
+      boolean allEmpty = newComments.values().stream().allMatch(StringUtils::isNullOrEmpty);
+      if (!allEmpty) {
+        List<FieldSchema> hiveSchema = hoodieHiveClient.getTableCommentUsingMetastoreClient(tableName);
+        hoodieHiveClient.updateTableComments(tableName, hiveSchema, avroSchemaWithoutMetadataFields.getFields());
+      } else {
+        LOG.info(String.format("No comment %s need to add", tableName));
+      }
+    }
     return schemaChanged;
   }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 1f8bcdf1c95b3..70a88a7aabbd1 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -19,21 +19,27 @@
 package org.apache.hudi.hive;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.hive.util.HiveSchemaUtil;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.hive.ddl.DDLExecutor;
 import org.apache.hudi.hive.ddl.HMSDDLExecutor;
 import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor;
 import org.apache.hudi.hive.ddl.HiveSyncMode;
 import org.apache.hudi.hive.ddl.JDBCExecutor;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.HoodieSyncException;
+
+import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -46,7 +52,9 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
 
@@ -343,4 +351,43 @@ public void updateLastCommitTimeSynced(String tableName) {
       }
     }
   }
+
+  public Schema getAvroSchemaWithoutMetadataFields() {
+    try {
+      return new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields();
+    } catch (Exception e) {
+      throw new HoodieSyncException("Failed to read avro schema", e);
+    }
+  }
+
+  public List<FieldSchema> getTableCommentUsingMetastoreClient(String tableName) {
+    try {
+      return client.getSchema(syncConfig.databaseName, tableName);
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to get table comments for : " + tableName, e);
+    }
+  }
+
+  public void updateTableComments(String tableName, List<FieldSchema> oldSchema, List<Schema.Field> newSchema) {
+    Map<String, String> newComments = newSchema.stream().collect(Collectors.toMap(field -> field.name().toLowerCase(Locale.ROOT), field -> StringUtils.isNullOrEmpty(field.doc()) ? "" : field.doc()));
+    updateTableComments(tableName, oldSchema, newComments);
+  }
+
+  public void updateTableComments(String tableName, List<FieldSchema> oldSchema, Map<String, String> newComments) {
+    Map<String, String> oldComments = oldSchema.stream().collect(Collectors.toMap(fieldSchema -> fieldSchema.getName().toLowerCase(Locale.ROOT),
+        fieldSchema -> StringUtils.isNullOrEmpty(fieldSchema.getComment()) ? "" : fieldSchema.getComment()));
+    Map<String, String> types = oldSchema.stream().collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
+    Map<String, ImmutablePair<String, String>> alterComments = new HashMap<>();
+    oldComments.forEach((name, comment) -> {
+      String newComment = newComments.getOrDefault(name, "");
+      if (!newComment.equals(comment)) {
+        alterComments.put(name, new ImmutablePair<>(types.get(name), newComment));
+      }
+    });
+    if (alterComments.size() > 0) {
+      ddlExecutor.updateTableComments(tableName, alterComments);
+    } else {
+      LOG.info(String.format("No comment difference of %s ", tableName));
+    }
+  }
 }
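The diffing step above only forwards columns whose comment actually changed to the DDL executor. A standalone Scala sketch of that logic (simplified; the map contents are made-up examples, not patch code):

```scala
// Old comments come from the Hive metastore, new ones from the Avro field docs;
// both are keyed by lower-cased column name.
val oldComments = Map("name" -> "", "price" -> "old doc")
val newComments = Map("name" -> "name_comment", "price" -> "old doc")
val types = Map("name" -> "string", "price" -> "double")

// Keep only the columns whose comment differs; pair each with its Hive type
// so the executor can emit CHANGE COLUMN without altering the column type.
val alterComments = newComments.collect {
  case (col, doc) if oldComments.getOrElse(col, "") != doc => col -> (types(col), doc)
}
assert(alterComments == Map("name" -> ("string", "name_comment")))
```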
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
index a7228584f611a..8cab505f1465b 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
@@ -18,6 +18,8 @@
 package org.apache.hudi.hive.ddl;
 
+import org.apache.hudi.common.util.collection.ImmutablePair;
+
 import org.apache.parquet.schema.MessageType;
 
 import java.util.List;
@@ -89,5 +91,13 @@ public void createTable(String tableName, MessageType storageSchema, String inpu
    */
   public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop);
 
+  /**
+   * Update table comments.
+   *
+   * @param tableName name of the table to update
+   * @param newSchema map of column name to a pair of (column type, new comment)
+   */
+  public void updateTableComments(String tableName, Map<String, ImmutablePair<String, String>> newSchema);
+
   public void close();
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
index c3c5226cd0a45..f2e9905350192 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
@@ -18,9 +18,9 @@
 package org.apache.hudi.hive.ddl;
 
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.StorageSchemes;
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncException;
 import org.apache.hudi.hive.PartitionValueExtractor;
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
@@ -247,6 +248,27 @@ public void dropPartitionsToTable(String tableName, List<String> partitionsToDro
     }
   }
 
+  @Override
+  public void updateTableComments(String tableName, Map<String, ImmutablePair<String, String>> alterSchema) {
+    try {
+      Table table = client.getTable(syncConfig.databaseName, tableName);
+      StorageDescriptor sd = new StorageDescriptor(table.getSd());
+      for (FieldSchema fieldSchema : sd.getCols()) {
+        if (alterSchema.containsKey(fieldSchema.getName())) {
+          String comment = alterSchema.get(fieldSchema.getName()).getRight();
+          fieldSchema.setComment(comment);
+        }
+      }
+      table.setSd(sd);
+      EnvironmentContext environmentContext = new EnvironmentContext();
+      client.alter_table_with_environmentContext(syncConfig.databaseName, tableName, table, environmentContext);
+      sd.clear();
+    } catch (Exception e) {
+      LOG.error("Failed to update table comments for " + tableName, e);
+      throw new HoodieHiveSyncException("Failed to update table comments for " + tableName, e);
+    }
+  }
+
   @Override
   public void close() {
     if (client != null) {
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
index a1cc7721053c5..d9b663ccb00ca 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.common.fs.StorageSchemes;
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncException;
 import org.apache.hudi.hive.PartitionValueExtractor;
@@ -128,6 +129,24 @@ public void updatePartitionsToTable(String tableName, List<String> changedPartit
     }
   }
 
+  @Override
+  public void updateTableComments(String tableName, Map<String, ImmutablePair<String, String>> newSchema) {
+    for (Map.Entry<String, ImmutablePair<String, String>> field : newSchema.entrySet()) {
+      String name = field.getKey();
+      StringBuilder sql = new StringBuilder();
+      String type = field.getValue().getLeft();
+      String comment = field.getValue().getRight();
+      comment = comment.replace("'", "");
+      sql.append("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER)
+          .append(config.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
+          .append(HIVE_ESCAPE_CHARACTER).append(tableName)
+          .append(HIVE_ESCAPE_CHARACTER)
+          .append(" CHANGE COLUMN `").append(name).append("` `").append(name)
+          .append("` ").append(type).append(" comment '").append(comment).append("' ");
+      runSQL(sql.toString());
+    }
+  }
+
   private List<String> constructAddPartitions(String tableName, List<String> partitions) {
     if (config.batchSyncNum <= 0) {
       throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
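To make the query-based path concrete: assuming `HIVE_ESCAPE_CHARACTER` is the backtick and taking a hypothetical alter-map entry `"price" -> ("double", "unit price")` against a hypothetical `default.trips` table, the builder above produces a statement of this shape (single quotes are stripped from the comment first, since it is spliced into a quoted literal):

```scala
// Expected shape of the generated DDL for one alter-map entry (illustration only).
val sql = "ALTER TABLE `default`.`trips` " +
  "CHANGE COLUMN `price` `price` double comment 'unit price' "
```

Repeating the column name (`CHANGE COLUMN old new type`) keeps the name and type unchanged while Hive rewrites the column metadata, so only the comment is affected.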
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 9a1012b649f47..9fc87fcb456b0 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -25,6 +25,8 @@
 import org.apache.hudi.common.testutils.NetworkTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.hive.testutils.HiveTestUtil;
 import org.apache.hudi.hive.util.ConfigUtils;
 import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
@@ -33,6 +35,7 @@
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.Driver;
@@ -52,7 +55,9 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.hive.testutils.HiveTestUtil.ddlExecutor;
 import static org.apache.hudi.hive.testutils.HiveTestUtil.fileSystem;
@@ -524,6 +529,77 @@ public void testSyncIncrementalWithSchemaEvolution(String syncMode) throws Excep
         "The last commit that was synced should be 101");
   }
 
+  @ParameterizedTest
+  @MethodSource("syncMode")
+  public void testUpdateTableComments(String syncMode) throws Exception {
+    hiveSyncConfig.syncMode = syncMode;
+    String commitTime = "100";
+    HiveTestUtil.createCOWTableWithSchema(commitTime, "/simple-test.avsc");
+    HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    tool.syncHoodieTable();
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+
+    Map<String, ImmutablePair<String, String>> alterCommentSchema = new HashMap<>();
+    //generate commented schema field
+    Schema schema = SchemaTestUtil.getSchemaFromResource(HiveTestUtil.class, "/simple-test.avsc");
+    Schema commentedSchema = SchemaTestUtil.getSchemaFromResource(HiveTestUtil.class, "/simple-test-doced.avsc");
+    Map<String, String> fieldsNameAndDoc = commentedSchema.getFields().stream().collect(Collectors.toMap(field -> field.name().toLowerCase(Locale.ROOT),
+        field -> StringUtils.isNullOrEmpty(field.doc()) ? "" : field.doc()));
+    for (Field field : schema.getFields()) {
+      String name = field.name().toLowerCase(Locale.ROOT);
+      String comment = fieldsNameAndDoc.get(name);
+      if (fieldsNameAndDoc.containsKey(name) && !comment.equals(field.doc())) {
+        alterCommentSchema.put(name, new ImmutablePair<>(field.schema().getType().name(), comment));
+      }
+    }
+
+    ddlExecutor.updateTableComments(hiveSyncConfig.tableName, alterCommentSchema);
+
+    List<FieldSchema> fieldSchemas = hiveClient.getTableCommentUsingMetastoreClient(hiveSyncConfig.tableName);
+    int commentCnt = 0;
+    for (FieldSchema fieldSchema : fieldSchemas) {
+      if (!StringUtils.isNullOrEmpty(fieldSchema.getComment())) {
+        commentCnt++;
+      }
+    }
+    assertEquals(2, commentCnt, "hive schema field comment numbers should match the avro schema field doc numbers");
+  }
+
+  @ParameterizedTest
+  @MethodSource("syncMode")
+  public void testSyncWithCommentedSchema(String syncMode) throws Exception {
+    hiveSyncConfig.syncMode = syncMode;
+    hiveSyncConfig.syncComment = false;
+    String commitTime = "100";
+    HiveTestUtil.createCOWTableWithSchema(commitTime, "/simple-test-doced.avsc");
+
+    HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    tool.syncHoodieTable();
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    List<FieldSchema> fieldSchemas = hiveClient.getTableCommentUsingMetastoreClient(hiveSyncConfig.tableName);
+    int commentCnt = 0;
+    for (FieldSchema fieldSchema : fieldSchemas) {
+      if (!StringUtils.isNullOrEmpty(fieldSchema.getComment())) {
+        commentCnt++;
+      }
+    }
+    assertEquals(0, commentCnt, "hive schema field comment numbers should match the avro schema field doc numbers");
+
+    hiveSyncConfig.syncComment = true;
+    tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
+    tool.syncHoodieTable();
+    fieldSchemas = hiveClient.getTableCommentUsingMetastoreClient(hiveSyncConfig.tableName);
+    commentCnt = 0;
+    for (FieldSchema fieldSchema : fieldSchemas) {
+      if (!StringUtils.isNullOrEmpty(fieldSchema.getComment())) {
+        commentCnt++;
+      }
+    }
+    assertEquals(2, commentCnt, "hive schema field comment numbers should match the avro schema field doc numbers");
+  }
+
   @ParameterizedTest
   @MethodSource("syncModeAndSchemaFromCommitMetadata")
   public void testSyncMergeOnRead(boolean useSchemaFromCommitMetadata, String syncMode) throws Exception {