From 2ecca39a1757fc1bac45811ba966d28e1513d8fa Mon Sep 17 00:00:00 2001
From: xiarixiaoyao
Date: Mon, 15 Mar 2021 11:01:30 +0800
Subject: [PATCH] =?UTF-8?q?[HUDI-1688]hudi=20write=20should=20uncache=20rd?=
 =?UTF-8?q?d=EF=BC=8C=20when=20the=20write=20operation=20is=20finished?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala    | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 1f1dc4d4233bf..d59ed8a84faec 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -221,6 +221,23 @@ private[hudi] object HoodieSparkSqlWriter {
     val (writeSuccessful, compactionInstant) =
       commitAndPerformPostOperations(writeResult, parameters, writeClient, tableConfig,
         jsc, TableInstantInfo(basePath, instantTime, commitActionType, operation))
+
+    // Recursively unpersist this RDD and every ancestor RDD in its lineage
+    // that is still registered as persistent with the SparkContext.
+    def unpersistRdd(rdd: RDD[_]): Unit = {
+      if (sparkContext.getPersistentRDDs.contains(rdd.id)) {
+        try {
+          rdd.unpersist()
+        } catch {
+          case t: Exception => log.warn("Got exception trying to unpersist rdd", t)
+        }
+      }
+      val parentRdds = rdd.dependencies.map(_.rdd)
+      parentRdds.foreach { parentRdd =>
+        unpersistRdd(parentRdd)
+      }
+    }
+    // it's safe to unpersist cached rdds here
+    unpersistRdd(writeResult.getWriteStatuses.rdd)
+
     (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig)
   }
 }