diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index 94b06f551a744..b97e500898f43 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -170,7 +170,7 @@ object AvroConversionUtils { val loader: java.util.function.Function[Schema, StructType] = key => { try { HoodieSparkAvroSchemaConverters.toSqlType(key) match { - case (dataType, _) => dataType.asInstanceOf[StructType] + case (dataType, _, _) => dataType.asInstanceOf[StructType] } } catch { case e: Exception => throw new HoodieSchemaException("Failed to convert avro schema to struct type: " + avroSchema, e) @@ -185,7 +185,7 @@ object AvroConversionUtils { def convertAvroSchemaToDataType(avroSchema: Schema): DataType = { try { HoodieSparkAvroSchemaConverters.toSqlType(avroSchema) match { - case (dataType, _) => dataType + case (dataType, _, _) => dataType } } catch { case e: Exception => throw new HoodieSchemaException("Failed to convert avro schema to DataType: " + avroSchema, e) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSchemaConverters.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSchemaConverters.scala index 9b068afac83d2..14dc5ed698ef6 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSchemaConverters.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSchemaConverters.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.types.DataType */ trait HoodieAvroSchemaConverters { - def toSqlType(avroSchema: Schema): (DataType, Boolean) + def toSqlType(avroSchema: Schema): (DataType, Boolean, Option[String]) def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String = ""): Schema diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala index a853c1fdecde4..f1d2207027756 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala @@ -29,9 +29,9 @@ import org.apache.spark.sql.types.DataType */ object HoodieSparkAvroSchemaConverters extends HoodieAvroSchemaConverters { - override def toSqlType(avroSchema: Schema): (DataType, Boolean) = + override def toSqlType(avroSchema: Schema): (DataType, Boolean, Option[String]) = SchemaConverters.toSqlType(avroSchema) match { - case SchemaType(dataType, nullable) => (dataType, nullable) + case SchemaType(dataType, nullable, doc) => (dataType, nullable, doc) } override def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): Schema = diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index 12ffef11095ca..57ff270339b14 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -48,7 +48,7 @@ private[sql] object SchemaConverters { * * @since 2.4.0 */ - case class SchemaType(dataType: DataType, nullable: Boolean) + case class SchemaType(dataType: DataType, nullable: Boolean, doc: Option[String]) /** * Converts an Avro schema to a corresponding Spark SQL schema. @@ -62,33 +62,33 @@ private[sql] object SchemaConverters { private val unionFieldMemberPrefix = "member" private def toSqlTypeHelper(avroSchema: Schema, existingRecordNames: Set[String]): SchemaType = { - avroSchema.getType match { - case INT => avroSchema.getLogicalType match { - case _: Date => SchemaType(DateType, nullable = false) - case _ => SchemaType(IntegerType, nullable = false) + (avroSchema.getType, Option(avroSchema.getDoc)) match { + case (INT, doc) => avroSchema.getLogicalType match { + case _: Date => SchemaType(DateType, nullable = false, doc) + case _ => SchemaType(IntegerType, nullable = false, doc) } - case STRING => SchemaType(StringType, nullable = false) - case BOOLEAN => SchemaType(BooleanType, nullable = false) - case BYTES | FIXED => avroSchema.getLogicalType match { + case (STRING, doc) => SchemaType(StringType, nullable = false, doc) + case (BOOLEAN, doc) => SchemaType(BooleanType, nullable = false, doc) + case (BYTES | FIXED, doc) => avroSchema.getLogicalType match { // For FIXED type, if the precision requires more bytes than fixed size, the logical // type will be null, which is handled by Avro library. - case d: Decimal => SchemaType(DecimalType(d.getPrecision, d.getScale), nullable = false) - case _ => SchemaType(BinaryType, nullable = false) + case d: Decimal => SchemaType(DecimalType(d.getPrecision, d.getScale), nullable = false, doc) + case _ => SchemaType(BinaryType, nullable = false, doc) } - case DOUBLE => SchemaType(DoubleType, nullable = false) - case FLOAT => SchemaType(FloatType, nullable = false) - case LONG => avroSchema.getLogicalType match { - case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false) - case _: LocalTimestampMillis | _: LocalTimestampMicros => SchemaType(TimestampNTZType, nullable = false) - case _ => SchemaType(LongType, nullable = false) + case (DOUBLE, doc) => SchemaType(DoubleType, nullable = false, doc) + case (FLOAT, doc) => SchemaType(FloatType, nullable = false, doc) + case (LONG, doc) => avroSchema.getLogicalType match { + case _: TimestampMillis | _: TimestampMicros => SchemaType(TimestampType, nullable = false, doc) + case _: LocalTimestampMillis | _: LocalTimestampMicros => SchemaType(TimestampNTZType, nullable = false, doc) + case _ => SchemaType(LongType, nullable = false, doc) } - case ENUM => SchemaType(StringType, nullable = false) + case (ENUM, doc) => SchemaType(StringType, nullable = false, doc) - case NULL => SchemaType(NullType, nullable = true) + case (NULL, doc) => SchemaType(NullType, nullable = true, doc) - case RECORD => + case (RECORD, doc) => if (existingRecordNames.contains(avroSchema.getFullName)) { throw new IncompatibleSchemaException( s""" @@ -107,21 +107,21 @@ private[sql] object SchemaConverters { StructField(f.name, schemaType.dataType, schemaType.nullable, metadata) } - SchemaType(StructType(fields.toSeq), nullable = false) + SchemaType(StructType(fields.toSeq), nullable = false, doc) - case ARRAY => + case (ARRAY, doc) => val schemaType = toSqlTypeHelper(avroSchema.getElementType, existingRecordNames) SchemaType( ArrayType(schemaType.dataType, containsNull = 
schemaType.nullable), - nullable = false) + nullable = false, doc) - case MAP => + case (MAP, doc) => val schemaType = toSqlTypeHelper(avroSchema.getValueType, existingRecordNames) SchemaType( MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable), - nullable = false) + nullable = false, doc) - case UNION => + case (UNION, doc) => if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) { // In case of a union with null, eliminate it and make a recursive call val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL) @@ -135,9 +135,9 @@ private[sql] object SchemaConverters { case Seq(t1) => toSqlTypeHelper(avroSchema.getTypes.get(0), existingRecordNames) case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) => - SchemaType(LongType, nullable = false) + SchemaType(LongType, nullable = false, doc) case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) => - SchemaType(DoubleType, nullable = false) + SchemaType(DoubleType, nullable = false, doc) case _ => // Convert complex unions to struct types where field names are member0, member1, etc. // This is consistent with the behavior when converting between Avro and Parquet. @@ -145,10 +145,11 @@ private[sql] object SchemaConverters { case (s, i) => val schemaType = toSqlTypeHelper(s, existingRecordNames) // All fields are nullable because only one of them is set at a time - StructField(s"$unionFieldMemberPrefix$i", schemaType.dataType, nullable = true) + val metadata = if(schemaType.doc.isDefined) new MetadataBuilder().putString("comment", schemaType.doc.get).build() else Metadata.empty + StructField(s"$unionFieldMemberPrefix$i", schemaType.dataType, nullable = true, metadata) } - SchemaType(StructType(fields.toSeq), nullable = false) + SchemaType(StructType(fields.toSeq), nullable = false, doc) } case other => throw new IncompatibleSchemaException(s"Unsupported type $other") diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java index 795730e9b0986..8ad37a3407936 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java @@ -192,14 +192,17 @@ public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Except + "{\"name\":\"uuid\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}," + "{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}," + "{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}," - + "{\"name\":\"infos\",\"type\":{\"type\":\"array\", \"elementType\":\"string\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"infos\",\"type\":{\"type\":\"array\",\"elementType\":\"string\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}}," + "{\"name\":\"ts_3\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}," + "{\"name\":\"ts_6\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}," + "{\"name\":\"par1\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"; - assertEquals(expectedAvroSchemaStr, avroSchemaStr); + // Use robust JSON comparison instead of brittle string comparison + ObjectMapper mapper = new ObjectMapper(); + JsonNode expectedNode = mapper.readTree(expectedAvroSchemaStr); + JsonNode actualNode = mapper.readTree(avroSchemaStr); + 
assertEquals(expectedNode, actualNode, "Schema JSON structure doesn't match"); // validate array field nullable - ObjectMapper mapper = new ObjectMapper(); JsonNode arrayFieldTypeNode = mapper.readTree(avroSchemaStr).get("fields").get(8).get("type"); assertThat(arrayFieldTypeNode.get("type").asText(), is("array")); assertThat(arrayFieldTypeNode.get("containsNull").asBoolean(), is(true)); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnComments.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnComments.scala new file mode 100644 index 0000000000000..9c9b66a08350a --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnComments.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.common.model.{HoodieTableType} +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest + +import org.apache.spark.SparkContext +import org.apache.spark.sql._ +import org.apache.spark.sql.hudi.HoodieSparkSessionExtension +import org.apache.spark.sql.types.StructType +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.EnumSource + + +class TestColumnComments { + var spark : SparkSession = _ + var sqlContext: SQLContext = _ + var sc : SparkContext = _ + + def initSparkContext(): Unit = { + val sparkConf = getSparkConfForTest(getClass.getSimpleName) + spark = SparkSession.builder() + .withExtensions(new HoodieSparkSessionExtension) + .config(sparkConf) + .getOrCreate() + sc = spark.sparkContext + sc.setLogLevel("ERROR") + sqlContext = spark.sqlContext + } + + @BeforeEach + def setUp() { + initSparkContext() + } + + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType], names = Array("COPY_ON_WRITE", "MERGE_ON_READ")) + def testColumnCommentWithSparkDatasource(tableType: HoodieTableType): Unit = { + val basePath = java.nio.file.Files.createTempDirectory("hoodie_comments_path").toAbsolutePath.toString + val opts = Map( + HoodieWriteConfig.TBL_NAME.key -> "hoodie_comments", + DataSourceWriteOptions.TABLE_TYPE.key -> tableType.toString, + DataSourceWriteOptions.OPERATION.key -> "bulk_insert", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition" + ) + val inputDF = spark.sql("select '0' as _row_key, '1' as content, '2' as partition, '3' as ts") + val struct = new StructType() + .add("_row_key", "string", true, "dummy comment") 
+ .add("content", "string", true) + .add("partition", "string", true) + .add("ts", "string", true) + spark.createDataFrame(inputDF.rdd, struct) + .write.format("hudi") + .options(opts) + .mode(SaveMode.Overwrite) + .save(basePath) + spark.read.format("hudi").load(basePath).registerTempTable("test_tbl") + + // now confirm the comment is present at read time + assertEquals(1, spark.sql("desc extended test_tbl") + .filter("col_name = '_row_key' and comment = 'dummy comment'").count) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala index dbcbeb6e12e86..5228ab1f693a8 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala @@ -50,7 +50,7 @@ class TestAvroSerDe extends SparkAdapterSupport { } val schema = HoodieSchema.fromAvroSchema(HoodieMetadataColumnStats.SCHEMA$) - val SchemaType(catalystSchema, _) = SchemaConverters.toSqlType(schema.getAvroSchema) + val SchemaType(catalystSchema, _, _) = SchemaConverters.toSqlType(schema.getAvroSchema) val deserializer = sparkAdapter.createAvroDeserializer(schema, catalystSchema) val serializer = sparkAdapter.createAvroSerializer(catalystSchema, schema, nullable = false) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala index f82d005f3a1a6..533a931a83887 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestSchemaConverters.scala @@ -22,22 +22,40 @@ import org.apache.hudi.avro.model.HoodieMetadataColumnStats import org.apache.avro.JsonProperties import org.apache.spark.sql.avro.SchemaConverters.SchemaType +import org.apache.spark.sql.types._ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test class TestSchemaConverters { + /** + * Helper method to strip metadata from DataType structures for comparison. + * This allows us to compare the core schema structure while ignoring documentation/comments. 
+ */ + private def stripMetadata(dataType: DataType): DataType = dataType match { + case StructType(fields) => StructType(fields.map(f => + StructField(f.name, stripMetadata(f.dataType), f.nullable, Metadata.empty))) + case ArrayType(elementType, containsNull) => ArrayType(stripMetadata(elementType), containsNull) + case MapType(keyType, valueType, valueContainsNull) => + MapType(stripMetadata(keyType), stripMetadata(valueType), valueContainsNull) + case other => other + } + @Test def testAvroUnionConversion(): Unit = { val originalAvroSchema = HoodieMetadataColumnStats.SCHEMA$ - val SchemaType(convertedStructType, _) = SchemaConverters.toSqlType(originalAvroSchema) + val SchemaType(convertedStructType, _, _) = SchemaConverters.toSqlType(originalAvroSchema) val convertedAvroSchema = SchemaConverters.toAvroType(convertedStructType) // NOTE: Here we're validating that converting Avro -> Catalyst and Catalyst -> Avro are inverse - // transformations, but since it's not an easy endeavor to match Avro schemas, we match - // derived Catalyst schemas instead - assertEquals(convertedStructType, SchemaConverters.toSqlType(convertedAvroSchema).dataType) + // transformations for the core data structure. We strip metadata (comments/docs) since + // the toAvroType method doesn't preserve documentation from StructField metadata, making + // perfect round-trip conversion with docs challenging for complex union schemas. + val firstConversion = stripMetadata(convertedStructType) + val secondConversion = stripMetadata(SchemaConverters.toSqlType(convertedAvroSchema).dataType) + + assertEquals(firstConversion, secondConversion) // validate that the doc string and default null value are set originalAvroSchema.getFields.forEach { field => val convertedField = convertedAvroSchema.getField(field.name()) diff --git a/hudi-sync/hudi-adb-sync/pom.xml b/hudi-sync/hudi-adb-sync/pom.xml index a7b1167a4144d..0794bded0876c 100644 --- a/hudi-sync/hudi-adb-sync/pom.xml +++ b/hudi-sync/hudi-adb-sync/pom.xml @@ -47,6 +47,12 @@ hudi-sync-common ${project.version} + + org.apache.hudi + hudi-io + ${project.version} + shaded + org.apache.hudi hudi-hive-sync diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java index 5357af384f0e7..d0c95871a7ee7 100644 --- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java +++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.HadoopConfigUtils; import org.apache.hudi.common.util.Option; @@ -33,6 +34,7 @@ import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils; import com.beust.jcommander.JCommander; +import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; @@ -219,13 +221,20 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi Map tableProperties = ConfigUtils.toMap(config.getString(ADB_SYNC_TABLE_PROPERTIES)); Map serdeProperties = ConfigUtils.toMap(config.getString(ADB_SYNC_SERDE_PROPERTIES)); if 
(config.getBoolean(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE)) { - Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(config.getSplitStrings(META_SYNC_PARTITION_FIELDS), - config.getString(META_SYNC_SPARK_VERSION), config.getInt(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), schema); - Map<String, String> sparkSerdeProperties = SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, config.getString(META_SYNC_BASE_PATH)); - tableProperties.putAll(sparkTableProperties); - serdeProperties.putAll(sparkSerdeProperties); - LOG.info("Sync as spark datasource table, tableName:{}, tableExists:{}, tableProperties:{}, sederProperties:{}", - tableName, tableExists, tableProperties, serdeProperties); + try { + // Always include metadata fields for ADB sync + Schema avroSchema = new TableSchemaResolver(syncClient.getMetaClient()).getTableAvroSchema(true); + HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(avroSchema); + Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(config.getSplitStrings(META_SYNC_PARTITION_FIELDS), + config.getString(META_SYNC_SPARK_VERSION), config.getInt(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), hoodieSchema); + Map<String, String> sparkSerdeProperties = SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, config.getString(META_SYNC_BASE_PATH)); + tableProperties.putAll(sparkTableProperties); + serdeProperties.putAll(sparkSerdeProperties); + LOG.info("Sync as spark datasource table, tableName:{}, tableExists:{}, tableProperties:{}, serdeProperties:{}", + tableName, tableExists, tableProperties, serdeProperties); + } catch (Exception e) { + throw new HoodieAdbSyncException("Failed to get Avro schema for ADB sync", e); + } } // Check and sync schema diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java index 78f8ec43984ef..b29a221935e14 100644 --- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java +++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java @@ -25,7 +25,9 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.sync.datahub.config.DataHubSyncConfig; +import org.apache.avro.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,13 +87,20 @@ private static void addUserDefinedProperties(Map properties, Dat } private static void addSparkRelatedProperties(Map<String, String> properties, DataHubSyncConfig config, HoodieTableMetadata tableMetadata) { - Map<String, String> sparkProperties = SparkDataSourceTableUtils.getSparkTableProperties( - config.getSplitStrings(META_SYNC_PARTITION_FIELDS), - config.getStringOrDefault(META_SYNC_SPARK_VERSION), - config.getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), - tableMetadata.getSchema() - ); - properties.putAll(sparkProperties); + try { + // Always include metadata fields for DataHub sync (following hive-sync pattern) + Schema avroSchema = new TableSchemaResolver(tableMetadata.getMetaClient()).getTableAvroSchema(true); + HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(avroSchema); + Map<String, String> sparkProperties = SparkDataSourceTableUtils.getSparkTableProperties( + config.getSplitStrings(META_SYNC_PARTITION_FIELDS), + 
config.getStringOrDefault(META_SYNC_SPARK_VERSION), + config.getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), + hoodieSchema + ); + properties.putAll(sparkProperties); + } catch (Exception e) { + throw new RuntimeException("Failed to get Avro schema for DataHub sync", e); + } properties.putAll(getSerdeProperties(config, false)); } @@ -131,5 +140,9 @@ public String getTableVersion() { public HoodieSchema getSchema() { return schema; } + + public HoodieTableMetaClient getMetaClient() { + return metaClient; + } } -} \ No newline at end of file +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java index 1eb95cf5529e3..1fbef046de327 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfigHolder.java @@ -99,7 +99,9 @@ public class HiveSyncConfigHolder { .key("hoodie.datasource.hive_sync.sync_as_datasource") .defaultValue("true") .markAdvanced() - .withDocumentation(""); + .withDocumentation("Add information to set up the Spark datasource, including table properties and the Spark schema." + + " This allows Spark to use its optimized reader." + + " Column comments are also added, but only for top-level columns."); public static final ConfigProperty<Integer> HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD = ConfigProperty .key("hoodie.datasource.hive_sync.schema_string_length_thresh") .defaultValue(4000) diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 9cab763f8d57d..04bba64f59f41 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieSyncTableStrategy; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; @@ -36,6 +37,7 @@ import org.apache.hudi.sync.common.model.PartitionEvent.PartitionEventType; import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils; +import org.apache.avro.Schema; import com.beust.jcommander.JCommander; import com.codahale.metrics.Timer; import org.apache.hadoop.conf.Configuration; @@ -398,9 +400,16 @@ private void createOrReplaceTable(String tableName, boolean useRealtimeInputForm private Map<String, String> getTableProperties(HoodieSchema schema) { Map<String, String> tableProperties = ConfigUtils.toMap(config.getString(HIVE_TABLE_PROPERTIES)); if (config.getBoolean(HIVE_SYNC_AS_DATA_SOURCE_TABLE)) { - Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(config.getSplitStrings(META_SYNC_PARTITION_FIELDS), - config.getStringOrDefault(META_SYNC_SPARK_VERSION), config.getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), schema); - tableProperties.putAll(sparkTableProperties); + try { + // Always include metadata fields for Hive sync + Schema avroSchema = new TableSchemaResolver(syncClient.getMetaClient()).getTableAvroSchema(true); + HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(avroSchema); + Map<String, String> sparkTableProperties = 
SparkDataSourceTableUtils.getSparkTableProperties(config.getSplitStrings(META_SYNC_PARTITION_FIELDS), + config.getStringOrDefault(META_SYNC_SPARK_VERSION), config.getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD), hoodieSchema); + tableProperties.putAll(sparkTableProperties); + } catch (Exception e) { + throw new HoodieException("Failed to get Avro schema for Hive sync", e); + } } return tableProperties; } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java index 7cba6f9b7673c..8662bc6cfc24c 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java @@ -90,7 +90,7 @@ private List updateHiveSQLs(List sqls) { for (String sql : sqls) { if (hiveDriver != null) { HoodieTimer timer = HoodieTimer.start(); - responses.add(hiveDriver.run(sql)); + responses.add(hiveDriver.run(escapeAntiSlash(sql))); LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, timer.endTimer())); } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java index 5c640062384a2..ae92cfad5cb28 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java @@ -65,7 +65,7 @@ public void runSQL(String s) { try { stmt = connection.createStatement(); LOG.info("Executing SQL " + s); - stmt.execute(s); + stmt.execute(escapeAntiSlash(s)); } catch (SQLException e) { throw new HoodieHiveSyncException("Failed in executing SQL " + s, e); } finally { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java index 6023b3135dc5c..5461175492fdb 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java @@ -38,6 +38,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.regex.Matcher; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE; @@ -222,5 +223,14 @@ private List constructChangePartitions(String tableName, List pa } return changePartitions; } + + /** + * Escapes backslashes in the given SQL statement so that column comments containing them are not mangled by Hive. + * @param comment the SQL string to escape + * @return the escaped SQL string + */ + protected String escapeAntiSlash(String comment) { + return comment.replaceAll("\\\\", Matcher.quoteReplacement("\\\\")); + } } diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/AvroToSparkJson.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/AvroToSparkJson.java new file mode 100644 index 0000000000000..41c1350951529 --- /dev/null +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/AvroToSparkJson.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sync.common.util; + +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Utility class to convert Avro schemas directly to Spark SQL schema JSON format. + * This bypasses Parquet conversion and eliminates struct-wrapping issues with complex types. + * + * Based on Spark's SchemaConverters logic but implemented without Spark dependencies. + */ +public class AvroToSparkJson { + + private AvroToSparkJson() { + // Utility class + } + + /** + * Convert an Avro schema to Spark SQL schema JSON format. + * + * @param avroSchema The Avro schema to convert + * @return JSON string representing the Spark schema + */ + public static String convertToSparkSchemaJson(Schema avroSchema) { + if (avroSchema.getType() != Schema.Type.RECORD) { + throw new IllegalArgumentException("Top-level schema must be a RECORD type, got: " + avroSchema.getType()); + } + + SparkDataType sparkType = convertAvroType(avroSchema); + return sparkType.toJson(); + } + + /** + * Convert an Avro schema to Spark SQL schema JSON format with field reordering. + * Reorders fields to match Spark DataSource table convention: data columns first, partition columns last. 
+ * + * @param avroSchema The Avro schema to convert + * @param partitionFieldNames List of partition field names to be moved to the end + * @return JSON string representing the Spark schema with reordered fields + */ + public static String convertToSparkSchemaJson(Schema avroSchema, List partitionFieldNames) { + if (avroSchema.getType() != Schema.Type.RECORD) { + throw new IllegalArgumentException("Top-level schema must be a RECORD type, got: " + avroSchema.getType()); + } + + if (partitionFieldNames == null || partitionFieldNames.isEmpty()) { + // No reordering needed, use the standard method + return convertToSparkSchemaJson(avroSchema); + } + + // Create reordered schema + Schema reorderedSchema = reorderSchemaFields(avroSchema, partitionFieldNames); + SparkDataType sparkType = convertAvroType(reorderedSchema); + return sparkType.toJson(); + } + + private static SparkDataType convertAvroType(Schema avroSchema) { + switch (avroSchema.getType()) { + case NULL: + return new PrimitiveSparkType("null"); + case BOOLEAN: + return new PrimitiveSparkType("boolean"); + case INT: + if (avroSchema.getLogicalType() instanceof LogicalTypes.Date) { + return new PrimitiveSparkType("date"); + } + return new PrimitiveSparkType("integer"); + case LONG: + if (avroSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis + || avroSchema.getLogicalType() instanceof LogicalTypes.TimestampMicros) { + return new PrimitiveSparkType("timestamp"); + } + return new PrimitiveSparkType("long"); + case FLOAT: + return new PrimitiveSparkType("float"); + case DOUBLE: + return new PrimitiveSparkType("double"); + case BYTES: + case FIXED: + if (avroSchema.getLogicalType() instanceof LogicalTypes.Decimal) { + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) avroSchema.getLogicalType(); + return new DecimalSparkType(decimal.getPrecision(), decimal.getScale()); + } + return new PrimitiveSparkType("binary"); + case STRING: + case ENUM: + return new PrimitiveSparkType("string"); + case ARRAY: + SparkDataType elementType = convertAvroType(avroSchema.getElementType()); + boolean containsNull = isNullable(avroSchema.getElementType()); + return new ArraySparkType(elementType, containsNull); + case MAP: + SparkDataType valueType = convertAvroType(avroSchema.getValueType()); + boolean valueContainsNull = isNullable(avroSchema.getValueType()); + return new MapSparkType(new PrimitiveSparkType("string"), valueType, valueContainsNull); + case RECORD: + List fields = avroSchema.getFields().stream() + .map(field -> { + SparkDataType fieldType = convertAvroType(field.schema()); + boolean nullable = isNullable(field.schema()); + String comment = field.doc(); + return new StructFieldType(field.name(), fieldType, nullable, comment); + }) + .collect(Collectors.toList()); + return new StructSparkType(fields); + case UNION: + return handleUnion(avroSchema); + default: + throw new IllegalArgumentException("Unsupported Avro type: " + avroSchema.getType()); + } + } + + private static SparkDataType handleUnion(Schema unionSchema) { + List types = unionSchema.getTypes(); + + // Handle nullable unions (common pattern: [null, actual_type]) + if (types.size() == 2 && types.stream().anyMatch(s -> s.getType() == Schema.Type.NULL)) { + Schema nonNullType = types.stream() + .filter(s -> s.getType() != Schema.Type.NULL) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Invalid union with only null type")); + return convertAvroType(nonNullType); + } + + // For complex unions, we could implement more sophisticated logic + // For 
now, treat as string (similar to how some systems handle this) + return new PrimitiveSparkType("string"); + } + + private static boolean isNullable(Schema schema) { + if (schema.getType() == Schema.Type.NULL) { + return true; + } + if (schema.getType() == Schema.Type.UNION) { + return schema.getTypes().stream().anyMatch(s -> s.getType() == Schema.Type.NULL); + } + return false; + } + + /** + * Abstract base class for Spark data types + */ + private abstract static class SparkDataType { + public abstract String toJson(); + } + + /** + * Primitive Spark types (string, int, boolean, etc.) + */ + private static class PrimitiveSparkType extends SparkDataType { + private final String typeName; + + public PrimitiveSparkType(String typeName) { + this.typeName = typeName; + } + + @Override + public String toJson() { + return "\"" + typeName + "\""; + } + } + + /** + * Decimal Spark type with precision and scale + */ + private static class DecimalSparkType extends SparkDataType { + private final int precision; + private final int scale; + + public DecimalSparkType(int precision, int scale) { + this.precision = precision; + this.scale = scale; + } + + @Override + public String toJson() { + return "\"decimal(" + precision + "," + scale + ")\""; + } + } + + /** + * Array Spark type + */ + private static class ArraySparkType extends SparkDataType { + private final SparkDataType elementType; + private final boolean containsNull; + + public ArraySparkType(SparkDataType elementType, boolean containsNull) { + this.elementType = elementType; + this.containsNull = containsNull; + } + + @Override + public String toJson() { + return "{\"type\":\"array\",\"elementType\":" + elementType.toJson() + + ",\"containsNull\":" + containsNull + "}"; + } + } + + /** + * Map Spark type + */ + private static class MapSparkType extends SparkDataType { + private final SparkDataType keyType; + private final SparkDataType valueType; + private final boolean valueContainsNull; + + public MapSparkType(SparkDataType keyType, SparkDataType valueType, boolean valueContainsNull) { + this.keyType = keyType; + this.valueType = valueType; + this.valueContainsNull = valueContainsNull; + } + + @Override + public String toJson() { + return "{\"type\":\"map\",\"keyType\":" + keyType.toJson() + + ",\"valueType\":" + valueType.toJson() + + ",\"valueContainsNull\":" + valueContainsNull + "}"; + } + } + + /** + * Struct Spark type + */ + private static class StructSparkType extends SparkDataType { + private final List fields; + + public StructSparkType(List fields) { + this.fields = fields; + } + + @Override + public String toJson() { + String fieldsJson = fields.stream() + .map(StructFieldType::toJson) + .collect(Collectors.joining(",")); + return "{\"type\":\"struct\",\"fields\":[" + fieldsJson + "]}"; + } + } + + /** + * Struct field type with metadata support + */ + private static class StructFieldType { + private final String name; + private final SparkDataType dataType; + private final boolean nullable; + private final String comment; + + public StructFieldType(String name, SparkDataType dataType, boolean nullable, String comment) { + this.name = name; + this.dataType = dataType; + this.nullable = nullable; + this.comment = comment; + } + + public String toJson() { + StringBuilder metadata = new StringBuilder("{"); + if (comment != null && !comment.trim().isEmpty()) { + // Escape quotes in comments + String escapedComment = comment.replace("\"", "\\\""); + metadata.append("\"comment\":\"").append(escapedComment).append("\""); + } + 
metadata.append("}"); + + return "{\"name\":\"" + name + "\"" + + ",\"type\":" + dataType.toJson() + + ",\"nullable\":" + nullable + + ",\"metadata\":" + metadata.toString() + "}"; + } + } + + /** + * Reorder Avro schema fields to match Spark DataSource table convention. + * Data columns first, partition columns last. + * + * @param originalSchema The original Avro schema + * @param partitionFieldNames List of partition field names + * @return New Avro schema with reordered fields + */ + private static Schema reorderSchemaFields(Schema originalSchema, List partitionFieldNames) { + if (originalSchema.getType() != Schema.Type.RECORD) { + return originalSchema; + } + + List originalFields = originalSchema.getFields(); + List dataFields = new ArrayList<>(); + List partitionFields = new ArrayList<>(); + + // Separate data fields and partition fields + for (Schema.Field field : originalFields) { + if (partitionFieldNames.contains(field.name())) { + partitionFields.add(field); + } else { + dataFields.add(field); + } + } + + // Create reordered field list: data fields first, partition fields last + List reorderedFields = new ArrayList<>(); + + // Add data fields first (with cloned field objects to avoid issues) + for (Schema.Field field : dataFields) { + Schema.Field clonedField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()); + // Copy field properties if they exist + if (field.getObjectProps() != null) { + for (Map.Entry prop : field.getObjectProps().entrySet()) { + clonedField.addProp(prop.getKey(), prop.getValue()); + } + } + reorderedFields.add(clonedField); + } + + // Add partition fields last (with cloned field objects) + for (Schema.Field field : partitionFields) { + Schema.Field clonedField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()); + // Copy field properties if they exist + if (field.getObjectProps() != null) { + for (Map.Entry prop : field.getObjectProps().entrySet()) { + clonedField.addProp(prop.getKey(), prop.getValue()); + } + } + reorderedFields.add(clonedField); + } + + // Create new schema with reordered fields + Schema reorderedSchema = Schema.createRecord( + originalSchema.getName(), + originalSchema.getDoc(), + originalSchema.getNamespace(), + originalSchema.isError(), + reorderedFields + ); + + // Copy schema-level properties if they exist + if (originalSchema.getObjectProps() != null) { + for (Map.Entry prop : originalSchema.getObjectProps().entrySet()) { + reorderedSchema.addProp(prop.getKey(), prop.getValue()); + } + } + + return reorderedSchema; + } +} \ No newline at end of file diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java index d747a169aa411..0edc62f42bff9 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java @@ -32,9 +32,12 @@ public class SparkDataSourceTableUtils { /** - * Get Spark Sql related table properties. This is used for spark datasource table. - * @param schema The schema to write to the table. - * @return A new parameters added the spark's table properties. + * Get Spark Sql related table properties with Hoodie schema for comments. 
+ * @param partitionNames List of partition field names + * @param sparkVersion Spark version + * @param schemaLengthThreshold Schema length threshold + * @param schema Hoodie schema with field docs + * @return Map of Spark table properties */ public static Map getSparkTableProperties(List partitionNames, String sparkVersion, int schemaLengthThreshold, HoodieSchema schema) { @@ -66,7 +69,6 @@ public static Map getSparkTableProperties(List partition dataCols.forEach(field -> reOrderedFields.add(HoodieSchemaUtils.createNewSchemaField(field))); partitionCols.forEach(field -> reOrderedFields.add(HoodieSchemaUtils.createNewSchemaField(field))); HoodieSchema reOrderedSchema = HoodieSchema.createRecord(schema.getName(), null, null, reOrderedFields); - Map sparkProperties = new HashMap<>(); sparkProperties.put("spark.sql.sources.provider", "hudi"); if (!StringUtils.isNullOrEmpty(sparkVersion)) { diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkSchemaUtils.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkSchemaUtils.java index ff14b3a46f870..96809a79aef03 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkSchemaUtils.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkSchemaUtils.java @@ -19,6 +19,7 @@ package org.apache.hudi.sync.common.util; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; /** * Convert the Hoodie schema to spark schema' json string. @@ -26,15 +27,59 @@ * in spark project. */ public class SparkSchemaUtils { + private static final String METADATA_DOC_PREFIX = "Hudi metadata field: "; public static String convertToSparkSchemaJson(HoodieSchema schema) { String fieldsJsonString = schema.getFields().stream().map(field -> "{\"name\":\"" + field.name() + "\",\"type\":" + convertFieldType(field.getNonNullSchema()) - + ",\"nullable\":" + field.isNullable() + ",\"metadata\":{}}") + + ",\"nullable\":" + field.isNullable() + ",\"metadata\":" + toMetadataJson(field) + "}") .reduce((a, b) -> a + "," + b).orElse(""); return "{\"type\":\"struct\",\"fields\":[" + fieldsJsonString + "]}"; } + private static String toMetadataJson(HoodieSchemaField field) { + if (field.doc().isPresent() && !field.doc().get().isEmpty()) { + String doc = field.doc().get(); + if (!doc.startsWith(METADATA_DOC_PREFIX)) { + return "{\"comment\":\"" + escapeJson(doc) + "\"}"; + } + } + return "{}"; + } + + private static String escapeJson(String value) { + StringBuilder escaped = new StringBuilder(value.length()); + for (int i = 0; i < value.length(); i++) { + char c = value.charAt(i); + switch (c) { + case '\\': + escaped.append("\\\\"); + break; + case '"': + escaped.append("\\\""); + break; + case '\b': + escaped.append("\\b"); + break; + case '\f': + escaped.append("\\f"); + break; + case '\n': + escaped.append("\\n"); + break; + case '\r': + escaped.append("\\r"); + break; + case '\t': + escaped.append("\\t"); + break; + default: + escaped.append(c); + } + } + return escaped.toString(); + } + private static String convertFieldType(HoodieSchema originalFieldSchema) { HoodieSchema fieldSchema = originalFieldSchema.getNonNullType(); switch (fieldSchema.getType()) { diff --git a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/AvroToSparkJsonTest.java b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/AvroToSparkJsonTest.java new file mode 100644 index 
0000000000000..decc6d6f52701 --- /dev/null +++ b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/AvroToSparkJsonTest.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sync.common.util; + +import org.apache.avro.Schema; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class AvroToSparkJsonTest { + + @Test + public void testSimpleRecordConversion() { + // Create a simple Avro schema with basic types + String avroSchemaJson = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": \"int\"},\n" + + " {\"name\": \"name\", \"type\": \"string\"},\n" + + " {\"name\": \"active\", \"type\": \"boolean\"},\n" + + " {\"name\": \"score\", \"type\": [\"null\", \"double\"], \"default\": null},\n" + + " {\"name\": \"tags\", \"type\": {\"type\": \"array\", \"items\": \"string\"}},\n" + + " {\"name\": \"metadata\", \"type\": {\"type\": \"map\", \"values\": \"string\"}}\n" + + " ]\n" + + "}"; + + Schema avroSchema = new Schema.Parser().parse(avroSchemaJson); + String sparkSchemaJson = AvroToSparkJson.convertToSparkSchemaJson(avroSchema); + + // Verify the overall structure + assertTrue(sparkSchemaJson.contains("\"type\":\"struct\"")); + assertTrue(sparkSchemaJson.contains("\"fields\":[")); + + // Verify basic field types + assertTrue(sparkSchemaJson.contains("\"name\":\"id\"")); + assertTrue(sparkSchemaJson.contains("\"name\":\"name\"")); + assertTrue(sparkSchemaJson.contains("\"name\":\"active\"")); + assertTrue(sparkSchemaJson.contains("\"name\":\"score\"")); + assertTrue(sparkSchemaJson.contains("\"name\":\"tags\"")); + assertTrue(sparkSchemaJson.contains("\"name\":\"metadata\"")); + + // Verify array type is correctly converted (not wrapped in struct) + assertTrue(sparkSchemaJson.contains("\"type\":\"array\"")); + assertTrue(sparkSchemaJson.contains("\"elementType\":\"string\"")); + + // Verify map type is correctly converted (not wrapped in struct) + assertTrue(sparkSchemaJson.contains("\"type\":\"map\"")); + assertTrue(sparkSchemaJson.contains("\"keyType\":\"string\"")); + assertTrue(sparkSchemaJson.contains("\"valueType\":\"string\"")); + + // Verify no struct wrapping around arrays or maps + assertTrue(!sparkSchemaJson.contains("struct?/~`\"},\n" + + " {\"name\": \"unicode_field\", \"type\": [\"null\", \"string\"], \"default\": null, \"doc\": \"Unicode: 中文 العربية русский\"}\n" + + " ]\n" + + "}"; + + Schema avroSchema = new Schema.Parser().parse(avroSchemaJson); + String sparkSchemaJson = AvroToSparkJson.convertToSparkSchemaJson(avroSchema); + + // Verify proper escaping and character 
preservation + assertTrue(sparkSchemaJson.contains("\"comment\":\"ID with") && sparkSchemaJson.contains("quotes") && sparkSchemaJson.contains("backslashes")); + assertTrue(sparkSchemaJson.contains("\"comment\":\"Special chars: @#$%^&*()+=[]{}|;':,.<>?/~`\"")); + assertTrue(sparkSchemaJson.contains("\"comment\":\"Unicode: 中文 العربية русский\"")); + } + + @Test + public void testRecordWithoutComments() { + String avroSchemaJson = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": \"int\"},\n" + + " {\"name\": \"name\", \"type\": [\"null\", \"string\"], \"default\": null},\n" + + " {\"name\": \"nested\", \"type\": {\n" + + " \"type\": \"record\", \"name\": \"NestedRecord\", \"fields\": [\n" + + " {\"name\": \"value\", \"type\": \"string\"}\n" + + " ]\n" + + " }}\n" + + " ]\n" + + "}"; + + Schema avroSchema = new Schema.Parser().parse(avroSchemaJson); + String sparkSchemaJson = AvroToSparkJson.convertToSparkSchemaJson(avroSchema); + + // Verify that fields without comments have empty metadata + assertTrue(sparkSchemaJson.contains("\"metadata\":{}")); + // Verify no comment fields are present + assertTrue(!sparkSchemaJson.contains("\"comment\":")); + } + + @Test + public void testArrayOfMaps() { + String avroSchemaJson = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"config_sets\", \"type\": {\"type\": \"array\", \"items\": {\n" + + " \"type\": \"map\", \"values\": \"string\"\n" + + " }}, \"doc\": \"Array of configuration maps\"}\n" + + " ]\n" + + "}"; + + Schema avroSchema = new Schema.Parser().parse(avroSchemaJson); + String sparkSchemaJson = AvroToSparkJson.convertToSparkSchemaJson(avroSchema); + + assertTrue(sparkSchemaJson.contains("\"type\":\"array\"")); + assertTrue(sparkSchemaJson.contains("\"type\":\"map\"")); + assertTrue(sparkSchemaJson.contains("\"comment\":\"Array of configuration maps\"")); + } + + @Test + public void testMapWithArrayValues() { + String avroSchemaJson = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"tag_groups\", \"type\": {\"type\": \"map\", \"values\": {\n" + + " \"type\": \"array\", \"items\": \"string\"\n" + + " }}, \"doc\": \"Map of tag groups\"}\n" + + " ]\n" + + "}"; + + Schema avroSchema = new Schema.Parser().parse(avroSchemaJson); + String sparkSchemaJson = AvroToSparkJson.convertToSparkSchemaJson(avroSchema); + + assertTrue(sparkSchemaJson.contains("\"type\":\"map\"")); + assertTrue(sparkSchemaJson.contains("\"type\":\"array\"")); + assertTrue(sparkSchemaJson.contains("\"comment\":\"Map of tag groups\"")); + } + + @Test + public void testFieldReordering() { + // Create Avro schema with mixed data and partition fields + String avroSchemaJson = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"TestRecord\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": \"int\", \"doc\": \"Unique identifier\"},\n" + + " {\"name\": \"partition_col\", \"type\": \"string\", \"doc\": \"Partition column\"},\n" + + " {\"name\": \"name\", \"type\": [\"null\", \"string\"], \"default\": null, \"doc\": \"Person's name\"},\n" + + " {\"name\": \"age\", \"type\": \"int\"},\n" + + " {\"name\": \"year\", \"type\": \"int\", \"doc\": \"Year partition\"}\n" + + " ]\n" + + "}"; + + Schema avroSchema = new Schema.Parser().parse(avroSchemaJson); + List partitionFields = Arrays.asList("partition_col", "year"); + + // Test without reordering (original order) + String 
originalSchemaJson = AvroToSparkJson.convertToSparkSchemaJson(avroSchema); + + // Test with reordering (data fields first, partition fields last) + String reorderedSchemaJson = AvroToSparkJson.convertToSparkSchemaJson(avroSchema, partitionFields); + + // Parse JSON to verify field order + assertTrue(originalSchemaJson.contains("\"name\":\"id\"")); + assertTrue(originalSchemaJson.contains("\"name\":\"partition_col\"")); + assertTrue(reorderedSchemaJson.contains("\"name\":\"id\"")); + assertTrue(reorderedSchemaJson.contains("\"name\":\"partition_col\"")); + + // Verify that reordered schema has data fields (id, name, age) before partition fields (partition_col, year) + int idIndex = reorderedSchemaJson.indexOf("\"name\":\"id\""); + int nameIndex = reorderedSchemaJson.indexOf("\"name\":\"name\""); + int ageIndex = reorderedSchemaJson.indexOf("\"name\":\"age\""); + int partitionColIndex = reorderedSchemaJson.indexOf("\"name\":\"partition_col\""); + int yearIndex = reorderedSchemaJson.indexOf("\"name\":\"year\""); + + // Data fields should come before partition fields + assertTrue(idIndex < partitionColIndex, "id field should come before partition_col"); + assertTrue(nameIndex < partitionColIndex, "name field should come before partition_col"); + assertTrue(ageIndex < partitionColIndex, "age field should come before partition_col"); + assertTrue(ageIndex < yearIndex, "age field should come before year partition field"); + + // Verify comments are preserved in reordered schema + assertTrue(reorderedSchemaJson.contains("\"comment\":\"Unique identifier\"")); + assertTrue(reorderedSchemaJson.contains("\"comment\":\"Partition column\"")); + assertTrue(reorderedSchemaJson.contains("\"comment\":\"Person's name\"")); + assertTrue(reorderedSchemaJson.contains("\"comment\":\"Year partition\"")); + } +} \ No newline at end of file diff --git a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSparkDataSourceTableUtils.java b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSparkDataSourceTableUtils.java new file mode 100644 index 0000000000000..a79ae2b7292e2 --- /dev/null +++ b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSparkDataSourceTableUtils.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sync.common.util; + +import org.apache.avro.Schema; +import org.apache.hudi.common.schema.HoodieSchema; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestSparkDataSourceTableUtils { + + @Test + public void testGetSparkTablePropertiesWithComments() { + // Create Avro schema with comments + String avroSchemaJson = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"test_schema\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": \"int\", \"doc\": \"Unique identifier\"},\n" + + " {\"name\": \"name\", \"type\": [\"null\", \"string\"], \"doc\": \"Person's full name\"},\n" + + " {\"name\": \"partition_col\", \"type\": \"string\", \"doc\": \"Partition column for data organization\"}\n" + + " ]\n" + + "}"; + Schema avroSchema = new Schema.Parser().parse(avroSchemaJson); + HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(avroSchema); + + List partitionNames = Arrays.asList("partition_col"); + String sparkVersion = "3.3.0"; + int schemaLengthThreshold = 4000; + + // Get Spark table properties with comments + Map properties = SparkDataSourceTableUtils.getSparkTableProperties( + partitionNames, sparkVersion, schemaLengthThreshold, hoodieSchema); + + // Verify that properties contain the expected Spark DataSource settings + assertTrue(properties.containsKey("spark.sql.sources.provider"), + "Should contain Spark provider property"); + assertTrue(properties.containsKey("spark.sql.sources.schema.numParts"), + "Should contain schema parts count"); + assertTrue(properties.containsKey("spark.sql.sources.schema.part.0"), + "Should contain schema part"); + + // Verify that comments are included in the schema JSON + String schemaPart = properties.get("spark.sql.sources.schema.part.0"); + assertTrue(schemaPart.contains("\"comment\":\"Unique identifier\""), + "Should contain comment for id field"); + assertTrue(schemaPart.contains("\"comment\":\"Person's full name\""), + "Should contain comment for name field"); + assertTrue(schemaPart.contains("\"comment\":\"Partition column for data organization\""), + "Should contain comment for partition column"); + } + + @Test + public void testGetSparkTablePropertiesWithoutComments() { + // Create equivalent Avro schema without comments (matching the original MessageType) + String avroSchemaJson = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"test_schema\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": \"int\"},\n" + + " {\"name\": \"name\", \"type\": [\"null\", \"string\"]}\n" + + " ]\n" + + "}"; + Schema avroSchema = new Schema.Parser().parse(avroSchemaJson); + HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(avroSchema); + + List partitionNames = Arrays.asList(); + String sparkVersion = "3.3.0"; + int schemaLengthThreshold = 4000; + + // Get Spark table properties with Avro schema but test the behavior without comments + Map properties = SparkDataSourceTableUtils.getSparkTableProperties( + partitionNames, sparkVersion, schemaLengthThreshold, hoodieSchema); + + // Verify that properties are generated correctly + assertTrue(properties.containsKey("spark.sql.sources.provider"), + "Should contain Spark provider property"); + assertTrue(properties.containsKey("spark.sql.sources.schema.part.0"), + "Should contain schema part"); + + // Verify that no comments are present + String schemaPart = properties.get("spark.sql.sources.schema.part.0"); + 
assertTrue(!schemaPart.contains("\"comment\":"), + "Should not contain any comment fields when no comments provided"); + assertTrue(schemaPart.contains("\"metadata\":{}"), + "Should contain empty metadata when no comments provided"); + } +}
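
For reviewers, a minimal usage sketch (not part of the patch; the wrapper class and field names below are illustrative) of the new org.apache.hudi.sync.common.util.AvroToSparkJson utility introduced above. It shows how an Avro field doc surfaces as a "comment" entry in the Spark schema JSON of the kind the sync tools store under spark.sql.sources.schema.part.N, while fields without a doc keep empty metadata:

import org.apache.avro.Schema;
import org.apache.hudi.sync.common.util.AvroToSparkJson;

public class ColumnCommentJsonExample {
  public static void main(String[] args) {
    // An Avro record with one documented field and one undocumented nullable field.
    String avroJson = "{"
        + "\"type\":\"record\",\"name\":\"trip\",\"fields\":["
        + "{\"name\":\"uuid\",\"type\":\"string\",\"doc\":\"record key\"},"
        + "{\"name\":\"fare\",\"type\":[\"null\",\"double\"],\"default\":null}]}";
    Schema avroSchema = new Schema.Parser().parse(avroJson);

    // The field doc becomes {"comment": ...} in the field metadata; the nullable
    // union collapses to its non-null branch with nullable=true and empty metadata.
    String sparkJson = AvroToSparkJson.convertToSparkSchemaJson(avroSchema);
    System.out.println(sparkJson);
    // {"type":"struct","fields":[
    //   {"name":"uuid","type":"string","nullable":false,"metadata":{"comment":"record key"}},
    //   {"name":"fare","type":"double","nullable":true,"metadata":{}}]}
  }
}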