From 9cca5f3cbc039e71feabad9f48050ec7ebce17a7 Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Mon, 30 Jan 2023 16:34:22 -0800
Subject: [PATCH] Closing write client for spark ds writer in all cases
 (including exception)

---
 .../apache/hudi/HoodieSparkSqlWriter.scala    | 23 +++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7e234775faa28..304a1303a3bdf 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -376,12 +376,22 @@ object HoodieSparkSqlWriter {
       }
 
       // Check for errors and commit the write.
-      val (writeSuccessful, compactionInstant, clusteringInstant) =
-        commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
-          writeResult, parameters, writeClient, tableConfig, jsc,
-          TableInstantInfo(basePath, instantTime, commitActionType, operation), extraPreCommitFn)
+      try {
+        val (writeSuccessful, compactionInstant, clusteringInstant) =
+          commitAndPerformPostOperations(sqlContext.sparkSession, df.schema,
+            writeResult, parameters, writeClient, tableConfig, jsc,
+            TableInstantInfo(basePath, instantTime, commitActionType, operation), extraPreCommitFn)
 
-      (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
+        (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
+      } finally {
+        // close the write client in all cases
+        val asyncCompactionEnabled = isAsyncCompactionEnabled(writeClient, tableConfig, parameters, jsc.hadoopConfiguration())
+        val asyncClusteringEnabled = isAsyncClusteringEnabled(writeClient, parameters)
+        if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
+          log.info("Closing write client")
+          writeClient.close()
+        }
+      }
     }
   }
 
@@ -959,9 +969,6 @@ object HoodieSparkSqlWriter {
         tableInstantInfo.basePath, schema)
 
       log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
-      if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
-        client.close()
-      }
       (commitSuccess && metaSyncSuccess, compactionInstant, clusteringInstant)
     } else {
       log.error(s"${tableInstantInfo.operation} failed with errors")