diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 472f450337022..4e9caa56e0df8 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -94,13 +94,6 @@ private[hudi] object HoodieSparkSqlWriter { operation = WriteOperationType.INSERT } - // If the mode is Overwrite, can set operation to INSERT_OVERWRITE_TABLE. - // Then in DataSourceUtils.doWriteOperation will use client.insertOverwriteTable to overwrite - // the table. This will replace the old fs.delete(tablepath) mode. - if (mode == SaveMode.Overwrite && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) { - operation = WriteOperationType.INSERT_OVERWRITE_TABLE - } - val jsc = new JavaSparkContext(sparkContext) val basePath = new Path(path.get) val instantTime = HoodieActiveTimeline.createNewInstantTime() @@ -340,6 +333,12 @@ private[hudi] object HoodieSparkSqlWriter { if (operation != WriteOperationType.DELETE) { if (mode == SaveMode.ErrorIfExists && tableExists) { throw new HoodieException(s"hoodie table at $tablePath already exists.") + } else if (mode == SaveMode.Overwrite && tableExists && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) { + // When user set operation as INSERT_OVERWRITE_TABLE, + // overwrite will use INSERT_OVERWRITE_TABLE operator in doWriteOperation + log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.") + fs.delete(tablePath, true) + tableExists = false } } else { // Delete Operation only supports Append mode diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index f315a2672298e..f14feb704fd17 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -205,7 +205,7 @@ class TestCOWDataSource extends HoodieClientTestBase { val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) inputDF2.write.format("org.apache.hudi") .options(commonOpts) - .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL) .mode(SaveMode.Overwrite) .save(basePath) @@ -232,6 +232,7 @@ class TestCOWDataSource extends HoodieClientTestBase { val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2)) inputDF2.write.format("org.apache.hudi") .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL) .mode(SaveMode.Overwrite) .save(basePath) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 121957e8590b2..1ea6ceb879fb9 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -278,7 +278,6 @@ class TestMORDataSource extends HoodieClientTestBase { val inputDF5: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records5, 2)) inputDF5.write.format("org.apache.hudi") .options(commonOpts) - .option("hoodie.compact.inline", "true") .mode(SaveMode.Append) .save(basePath) val commit5Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)