diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala index d86cd75315bf3..e0f71217dcb4b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala @@ -32,7 +32,7 @@ import org.apache.hudi.sql.IExpressionEvaluator import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.hudi.SerDeUtils -import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, getMergedSchema} +import org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator, setWriteSchema, getMergedSchema} import org.apache.spark.sql.types.{StructField, StructType} import java.util.concurrent.Callable @@ -215,9 +215,7 @@ class ExpressionPayload(record: GenericRecord, */ private def initWriteSchemaIfNeed(properties: Properties): Unit = { if (writeSchema == null) { - ValidationUtils.checkArgument(properties.containsKey(HoodieWriteConfig.WRITE_SCHEMA.key), - s"Missing ${HoodieWriteConfig.WRITE_SCHEMA.key}") - writeSchema = new Schema.Parser().parse(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA.key)) + writeSchema = setWriteSchema(properties) } } @@ -276,6 +274,18 @@ object ExpressionPayload { .maximumSize(1024) .build[String, Map[IExpressionEvaluator, IExpressionEvaluator]]() + private val writeSchemaCache = CacheBuilder.newBuilder().build[String, Schema]() + + def setWriteSchema(properties: Properties): Schema = { + ValidationUtils.checkArgument(properties.containsKey(HoodieWriteConfig.WRITE_SCHEMA.key), + s"Missing ${HoodieWriteConfig.WRITE_SCHEMA.key}") + writeSchemaCache.get(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA.key), + new Callable[Schema] { + override def call(): Schema = + new Schema.Parser().parse(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA.key)) + }) + } + /** * Do the CodeGen for each condition and assignment expressions.We will cache it to reduce * the compile time for each method call.