diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 61eb601a18c3..8eeb3c389029 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -27,6 +27,7 @@ import scala.util.control.NonFatal
 import com.google.common.io.ByteStreams
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hdfs.protocol.HdfsConstants
 
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
@@ -274,8 +275,19 @@ private[state] class HDFSBackedStateStoreProvider(
   private def commitUpdates(newVersion: Long, map: MapType, tempDeltaFile: Path): Path = {
     synchronized {
       val finalDeltaFile = deltaFile(newVersion)
-      if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
-        throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
+      // When a job restarts, the delta file may already have been generated because the offsets WAL replays the batch.
+      if (fs.getScheme == HdfsConstants.HDFS_URI_SCHEME) {
+        if (!fs.exists(finalDeltaFile)) {
+          if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
+            throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
+          }
+        } else {
+          fs.delete(tempDeltaFile, true)
+        }
+      } else {
+        if (!fs.rename(tempDeltaFile, finalDeltaFile)) {
+          throw new IOException(s"Failed to rename $tempDeltaFile to $finalDeltaFile")
+        }
       }
       loadedMaps.put(newVersion, map)
       finalDeltaFile
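
For reviewers, below is a minimal standalone Scala sketch of the rename-or-discard commit pattern this patch introduces. The object name IdempotentCommitSketch, the helper commitIdempotently, and the paths in main are hypothetical, chosen for illustration only; they are not part of the Spark codebase. The sketch applies the existence check unconditionally, whereas the patch gates it on the HDFS URI scheme, since HDFS rename refuses to overwrite an existing destination while other FileSystem implementations may behave differently.

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object IdempotentCommitSketch {
  // Commit tempFile as finalFile, tolerating a re-executed batch: if an
  // earlier attempt already produced finalFile, discard the temp file
  // instead of failing the rename.
  def commitIdempotently(fs: FileSystem, tempFile: Path, finalFile: Path): Path = {
    if (fs.exists(finalFile)) {
      fs.delete(tempFile, true)  // a previous attempt already committed this version
    } else if (!fs.rename(tempFile, finalFile)) {
      throw new IOException(s"Failed to rename $tempFile to $finalFile")
    }
    finalFile
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical paths for illustration only.
    val fs = FileSystem.get(new Configuration())
    val committed = commitIdempotently(
      fs, new Path("/tmp/state/0/temp-delta"), new Path("/tmp/state/0/1.delta"))
    println(s"Committed $committed")
  }
}

Note that the exists/rename pair is not atomic across processes; in the patch it runs inside the provider's synchronized block in commitUpdates, which serializes commits within a single JVM.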