diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 1f1dc4d4233bf..d59ed8a84faec 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -221,6 +221,26 @@ private[hudi] object HoodieSparkSqlWriter {
     val (writeSuccessful, compactionInstant) =
       commitAndPerformPostOperations(writeResult, parameters, writeClient, tableConfig,
         jsc, TableInstantInfo(basePath, instantTime, commitActionType, operation))
+
+    // Recursively unpersist this RDD and every ancestor in its lineage that is
+    // still registered as persistent with the SparkContext. Unpersisting is
+    // best-effort: a failure to release one cached RDD is only logged, never
+    // propagated. NOTE(review): RDD lineage is a DAG, so recursion terminates.
+    def unpersistRdd(rdd: RDD[_]): Unit = {
+      if (sparkContext.getPersistentRDDs.contains(rdd.id)) {
+        try {
+          rdd.unpersist()
+        } catch {
+          case t: Exception => log.warn("Got exception trying to unpersist rdd", t)
+        }
+      }
+      val parentRdds = rdd.dependencies.map(_.rdd)
+      parentRdds.foreach { parentRdd =>
+        unpersistRdd(parentRdd)
+      }
+    }
+    // it's safe to unpersist cached rdds here
+    unpersistRdd(writeResult.getWriteStatuses.rdd)
+
     (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig)
   }
 }