diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala index cb9b42607dfb9..d06d7a9f2c788 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/payload/SqlTypedRecord.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hudi.command.payload -import com.google.common.cache.CacheBuilder +import com.google.common.cache.{Cache, CacheBuilder} import org.apache.avro.Schema import org.apache.avro.generic.IndexedRecord import org.apache.hudi.HoodieSparkUtils.sparkAdapter @@ -51,7 +51,10 @@ object SqlTypedRecord { private val sqlTypeCache = CacheBuilder.newBuilder().build[Schema, StructType]() - private val avroDeserializerCache = CacheBuilder.newBuilder().build[Schema, HoodieAvroDeserializer]() + private val avroDeserializerCacheLocal = new ThreadLocal[Cache[Schema, HoodieAvroDeserializer]] { + override def initialValue(): Cache[Schema, HoodieAvroDeserializer] = + CacheBuilder.newBuilder().build[Schema, HoodieAvroDeserializer]() + } def getSqlType(schema: Schema): StructType = { sqlTypeCache.get(schema, new Callable[StructType] { @@ -64,10 +67,10 @@ object SqlTypedRecord { } def getAvroDeserializer(schema: Schema): HoodieAvroDeserializer= { - avroDeserializerCache.get(schema, new Callable[HoodieAvroDeserializer] { + avroDeserializerCacheLocal.get().get(schema, new Callable[HoodieAvroDeserializer] { override def call(): HoodieAvroDeserializer = { val deserializer = sparkAdapter.createAvroDeserializer(schema, getSqlType(schema)) - avroDeserializerCache.put(schema, deserializer) + avroDeserializerCacheLocal.get().put(schema, deserializer) deserializer } })