Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.util.{FSUtils, TypedProperties}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
Expand Down Expand Up @@ -74,11 +75,11 @@ private[hudi] object HoodieSparkSqlWriter {
}

var writeSuccessful: Boolean = false
var commitTime: String = null
var writeStatuses: JavaRDD[WriteStatus] = null

val jsc = new JavaSparkContext(sparkContext)
val basePath = new Path(parameters("path"))
val commitTime = HoodieActiveTimeline.createNewInstantTime();
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))

Expand Down Expand Up @@ -145,7 +146,7 @@ private[hudi] object HoodieSparkSqlWriter {
log.info("new batch has no new records, skipping...")
return (true, common.util.Option.empty())
}
commitTime = client.startCommit()
client.startCommitWithTime(commitTime)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vinothchandar : does "rollbackPendingCommits" matter here? only diff I see between client.startCommit() and callingclient.startcommitWithTime(time) is rolling back pending commits. if that is not required in this path, this patch is good.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reviewing!
In the latest master, both startCommit() and startCommitWithTime(time) are rolling back the pending commits. So I think those two functions have the same behavior except the String commitTime.
https://github.com/apache/incubator-hudi/blob/master/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java#L1009

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool. can you please rebase with upstream/master and update the commit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That’s strange... This commit was created from the latest master and I don’t see any diff from master. Could that be any conflict?

writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
// Check for errors and commit the write.
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
Expand Down Expand Up @@ -223,7 +224,7 @@ private[hudi] object HoodieSparkSqlWriter {
)

// Issue deletes
commitTime = client.startCommit()
client.startCommitWithTime(commitTime)
writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
writeSuccessful =
Expand Down