Merged
@@ -94,13 +94,6 @@ private[hudi] object HoodieSparkSqlWriter {
       operation = WriteOperationType.INSERT
     }

-    // If the mode is Overwrite, can set operation to INSERT_OVERWRITE_TABLE.
-    // Then in DataSourceUtils.doWriteOperation will use client.insertOverwriteTable to overwrite
-    // the table. This will replace the old fs.delete(tablepath) mode.
-    if (mode == SaveMode.Overwrite && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) {
-      operation = WriteOperationType.INSERT_OVERWRITE_TABLE
-    }
-
     val jsc = new JavaSparkContext(sparkContext)
     val basePath = new Path(path.get)
     val instantTime = HoodieActiveTimeline.createNewInstantTime()
@@ -340,6 +333,12 @@ private[hudi] object HoodieSparkSqlWriter {
     if (operation != WriteOperationType.DELETE) {
       if (mode == SaveMode.ErrorIfExists && tableExists) {
         throw new HoodieException(s"hoodie table at $tablePath already exists.")
+      } else if (mode == SaveMode.Overwrite && tableExists && operation != WriteOperationType.INSERT_OVERWRITE_TABLE) {
+        // When the user sets the operation to INSERT_OVERWRITE_TABLE,
+        // overwrite will use the INSERT_OVERWRITE_TABLE operator in doWriteOperation
+        log.warn(s"hoodie table at $tablePath already exists. Deleting existing data & overwriting with new data.")
+        fs.delete(tablePath, true)
+        tableExists = false
       }
     } else {
       // Delete Operation only supports Append mode

Member, commenting on the new else-if branch:
This is in line with

    if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
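The two hunks above move the overwrite decision: instead of silently rewriting every SaveMode.Overwrite into INSERT_OVERWRITE_TABLE, the writer now deletes the base path only when the user did not explicitly ask for the insert-overwrite-table operation. A minimal sketch of that predicate (all names here are invented for illustration, not Hudi's API):

```scala
// Toy model of the overwrite decision in this PR (hypothetical names, not Hudi's API).
object OverwriteDecisionSketch {
  sealed trait Op
  case object Insert extends Op
  case object InsertOverwriteTable extends Op
  case object Delete extends Op

  sealed trait Mode
  case object Append extends Mode
  case object Overwrite extends Mode
  case object ErrorIfExists extends Mode

  // True when the writer should fall back to fs.delete(tablePath):
  // Overwrite mode on an existing table, unless the user already chose
  // INSERT_OVERWRITE_TABLE (then a replace commit handles the overwrite).
  def shouldDeleteBasePath(mode: Mode, op: Op, tableExists: Boolean): Boolean =
    op != Delete && mode == Overwrite && tableExists && op != InsertOverwriteTable
}
```

Under this sketch, Overwrite with a plain insert on an existing table deletes the path, while Overwrite with insert_overwrite_table leaves the table in place and relies on the replace commit.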
@@ -205,7 +205,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     inputDF2.write.format("org.apache.hudi")
       .options(commonOpts)
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)
       .mode(SaveMode.Overwrite)
       .save(basePath)

@@ -232,6 +232,7 @@
     val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
     inputDF2.write.format("org.apache.hudi")
       .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL)
       .mode(SaveMode.Overwrite)
       .save(basePath)
@@ -278,7 +278,6 @@ class TestMORDataSource extends HoodieClientTestBase {
     val inputDF5: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records5, 2))
     inputDF5.write.format("org.apache.hudi")
       .options(commonOpts)
-      .option("hoodie.compact.inline", "true")
       .mode(SaveMode.Append)
       .save(basePath)
     val commit5Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)

Contributor (author), on the removed hoodie.compact.inline option:
@garyli1019 Please also help review. I deleted hoodie.compact.inline = true because when overwrite uses a replace commit, the "Fifth Operation:" compaction is postponed until the "Sixth Operation:", so assertEquals(152, hudiIncDF6.count()) passes. But when I changed the overwrite back to fs.delete, compaction ran in the "Fifth Operation:", so there was no compaction commit after commit5Time and hudiIncDF6.count() was 2. Deleting the inline compaction from the "Fifth Operation:" lets it run in the "Sixth Operation:" instead.
https://github.com/apache/hudi/pull/1938/files

Member:
Thanks @lw309637554, this compaction was not supposed to be here. This is probably the cause of the flaky test as well. cc: @vinothchandar

Member (@vinothchandar, Jan 11, 2021):
This is great. Thanks for digging in, both!
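The count difference the author describes (152 vs 2) comes purely from where the compaction instant lands relative to commit5Time, since an incremental read only sees instants after its start instant. A toy timeline model (invented names and record counts chosen to mirror the comment, not Hudi's timeline API) makes this concrete:

```scala
// Toy model (invented names, not Hudi's timeline API) of why the incremental
// count flips between 152 and 2 depending on when inline compaction runs.
object TimelineSketch {
  final case class Instant(time: Int, action: String, records: Int)

  // An incremental read sees only instants strictly after its start instant.
  def incrementalCount(timeline: Seq[Instant], afterTime: Int): Int =
    timeline.filter(_.time > afterTime).map(_.records).sum

  // Compaction postponed to the sixth operation: it lands after commit5Time.
  val compactionAfter: Seq[Instant] = Seq(
    Instant(5, "deltacommit", 0),   // commit5Time
    Instant(6, "compaction", 150),
    Instant(7, "deltacommit", 2))

  // Compaction runs inline in the fifth operation: it lands at commit5Time.
  val compactionBefore: Seq[Instant] = Seq(
    Instant(5, "compaction", 150),  // commit5Time
    Instant(6, "deltacommit", 2))
}
```

Reading incrementally after time 5 yields 152 in the first timeline but only 2 in the second, which matches the counts discussed in the review comments.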