165 changes: 63 additions & 102 deletions hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.HoodieRecordPayload
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.util.{FSUtils, TypedProperties}
@@ -74,19 +75,14 @@ private[hudi] object HoodieSparkSqlWriter {
parameters(OPERATION_OPT_KEY)
}

var writeSuccessful: Boolean = false
var writeStatuses: JavaRDD[WriteStatus] = null

val jsc = new JavaSparkContext(sparkContext)
val basePath = new Path(parameters("path"))
val commitTime = HoodieActiveTimeline.createNewInstantTime();
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))

// Running into issues wrt generic type conversion from Java to Scala. Couldn't make common code paths for
// write and deletes. Specifically, instantiating client of type HoodieWriteClient<T extends HoodieRecordPayload>
// is having issues. Hence some codes blocks are same in both if and else blocks.
if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
// register classes & schemas
val structName = s"${tblName.get}_record"
val nameSpace = s"hoodie.${tblName.get}"
@@ -147,54 +143,8 @@ private[hudi] object HoodieSparkSqlWriter {
return (true, common.util.Option.empty())
}
client.startCommitWithTime(commitTime)
writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
// Check for errors and commit the write.
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
writeSuccessful =
if (errorCount == 0) {
log.info("No errors. Proceeding to commit the write.")
val metaMap = parameters.filter(kv =>
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
val commitSuccess = if (metaMap.isEmpty) {
client.commit(commitTime, writeStatuses)
} else {
client.commit(commitTime, writeStatuses,
common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
}

if (commitSuccess) {
log.info("Commit " + commitTime + " successful!")
}
else {
log.info("Commit " + commitTime + " failed!")
}

val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val syncHiveSucess = if (hiveSyncEnabled) {
log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
syncHive(basePath, fs, parameters)
} else {
true
}
client.close()
commitSuccess && syncHiveSucess
} else {
log.error(s"$operation failed with ${errorCount} errors :");
if (log.isTraceEnabled) {
log.trace("Printing out the top 100 errors")
writeStatuses.rdd.filter(ws => ws.hasErrors)
.take(100)
.foreach(ws => {
log.trace("Global error :", ws.getGlobalError)
if (ws.getErrors.size() > 0) {
ws.getErrors.foreach(kt =>
log.trace(s"Error for key: ${kt._1}", kt._2))
}
})
}
false
}
val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
(writeStatuses, client)
} else {

// Handle save modes
@@ -225,55 +175,12 @@ private[hudi] object HoodieSparkSqlWriter {

// Issue deletes
client.startCommitWithTime(commitTime)
writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
writeSuccessful =
if (errorCount == 0) {
log.info("No errors. Proceeding to commit the write.")
val metaMap = parameters.filter(kv =>
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
val commitSuccess = if (metaMap.isEmpty) {
client.commit(commitTime, writeStatuses)
} else {
client.commit(commitTime, writeStatuses,
common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
}

if (commitSuccess) {
log.info("Commit " + commitTime + " successful!")
}
else {
log.info("Commit " + commitTime + " failed!")
}

val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val syncHiveSucess = if (hiveSyncEnabled) {
log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
syncHive(basePath, fs, parameters)
} else {
true
}
client.close()
commitSuccess && syncHiveSucess
} else {
log.error(s"$operation failed with ${errorCount} errors :");
if (log.isTraceEnabled) {
log.trace("Printing out the top 100 errors")
writeStatuses.rdd.filter(ws => ws.hasErrors)
.take(100)
.foreach(ws => {
log.trace("Global error :", ws.getGlobalError)
if (ws.getErrors.size() > 0) {
ws.getErrors.foreach(kt =>
log.trace(s"Error for key: ${kt._1}", kt._2))
}
})
}
false
}
val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
(writeStatuses, client)
}

// Check for errors and commit the write.
val writeSuccessful = checkWriteStatus(writeStatuses, parameters, writeClient, commitTime, basePath, operation, jsc)
(writeSuccessful, common.util.Option.ofNullable(commitTime))
}
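
The key change in write() above: both the upsert/insert branch and the delete branch now return the write statuses together with the client that produced them, and a single shared routine (checkWriteStatus, added in the next hunk) handles error checking, the commit, and Hive sync. A minimal, self-contained sketch of that pattern, using simplified placeholder types rather than the real Hudi classes:

object SharedCommitSketch {

  // Placeholder stand-ins for JavaRDD[WriteStatus] and HoodieWriteClient (not the Hudi types).
  final case class Status(hasErrors: Boolean)
  final class Client {
    def commit(time: String, statuses: Seq[Status]): Boolean = true
    def close(): Unit = ()
  }

  def write(isDelete: Boolean, commitTime: String): Boolean = {
    // Each branch only produces statuses plus the client that made them...
    val (statuses, client) =
      if (!isDelete) (Seq(Status(hasErrors = false)), new Client)  // write path
      else (Seq(Status(hasErrors = false)), new Client)            // delete path
    // ...and one shared routine decides whether the commit goes through.
    checkWriteStatus(statuses, client, commitTime)
  }

  private def checkWriteStatus(statuses: Seq[Status], client: Client, commitTime: String): Boolean = {
    val errorCount = statuses.count(_.hasErrors)
    val success = errorCount == 0 && client.commit(commitTime, statuses)
    client.close()
    success
  }
}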

@@ -340,4 +247,58 @@ private[hudi] object HoodieSparkSqlWriter {
hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
hiveSyncConfig
}

private def checkWriteStatus(writeStatuses: JavaRDD[WriteStatus],
parameters: Map[String, String],
client: HoodieWriteClient[_],
commitTime: String,
basePath: Path,
operation: String,
jsc: JavaSparkContext): Boolean = {
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
if (errorCount == 0) {
log.info("No errors. Proceeding to commit the write.")
val metaMap = parameters.filter(kv =>
kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
val commitSuccess = if (metaMap.isEmpty) {
client.commit(commitTime, writeStatuses)
} else {
client.commit(commitTime, writeStatuses,
common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
}

if (commitSuccess) {
log.info("Commit " + commitTime + " successful!")
}
else {
log.info("Commit " + commitTime + " failed!")
}

val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
val syncHiveSucess = if (hiveSyncEnabled) {
log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
syncHive(basePath, fs, parameters)
} else {
true
}
client.close()
commitSuccess && syncHiveSucess
} else {
log.error(s"$operation failed with ${errorCount} errors :");
if (log.isTraceEnabled) {
log.trace("Printing out the top 100 errors")
writeStatuses.rdd.filter(ws => ws.hasErrors)
.take(100)
.foreach(ws => {
log.trace("Global error :", ws.getGlobalError)
if (ws.getErrors.size() > 0) {
ws.getErrors.foreach(kt =>
log.trace(s"Error for key: ${kt._1}", kt._2))
}
})
}
false
}
}
}
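
For context, both code paths above are reached through the Spark datasource API rather than called directly. A rough usage sketch, assuming a local SparkSession with the Hudi bundle on the classpath; the table name, paths, and field names are placeholders, while the option keys are the standard Hudi datasource write options surfaced in this file as OPERATION_OPT_KEY and HIVE_SYNC_ENABLED_OPT_KEY:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("hudi-writer-example")
  .master("local[2]")
  .getOrCreate()

// Placeholder input; a real job would read records carrying at least a key and an ordering field.
val df = spark.read.json("/tmp/source_data")

// Upsert path: takes the non-delete branch of HoodieSparkSqlWriter.write.
df.write.format("org.apache.hudi")
  .option("hoodie.table.name", "example_table")
  .option("hoodie.datasource.write.recordkey.field", "uuid")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.hive_sync.enable", "false")
  .mode(SaveMode.Append)
  .save("/tmp/hudi/example_table")

// Delete path: same writer, but DELETE_OPERATION_OPT_VAL routes through doDeleteOperation,
// and checkWriteStatus still validates and commits the resulting instant.
df.write.format("org.apache.hudi")
  .option("hoodie.table.name", "example_table")
  .option("hoodie.datasource.write.recordkey.field", "uuid")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.datasource.write.operation", "delete")
  .mode(SaveMode.Append)
  .save("/tmp/hudi/example_table")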