From 087daf0101ce9f101d2d83fd9b2f336ed70acdd2 Mon Sep 17 00:00:00 2001 From: Hao Zhu Date: Mon, 10 Aug 2015 14:53:54 -0700 Subject: [PATCH 1/3] SPARK-9801 Spark streaming deletes the temp file and backup files without checking if they exist or not --- .../scala/org/apache/spark/streaming/Checkpoint.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 2780d5b6adbc..3305d1828cae 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -192,7 +192,9 @@ class CheckpointWriter( + "'") // Write checkpoint to temp file - fs.delete(tempFile, true) // just in case it exists + if (fs.exists(tempFile)) { + fs.delete(tempFile, true) // just in case it exists + } val fos = fs.create(tempFile) Utils.tryWithSafeFinally { fos.write(bytes) @@ -219,7 +221,9 @@ class CheckpointWriter( if (allCheckpointFiles.size > 10) { allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => { logInfo("Deleting " + file) - fs.delete(file, true) + if (fs.exists(file)) { + fs.delete(file, true) + } }) } From fd143f28081bd5351b0a9aef14553591d2d4fb4d Mon Sep 17 00:00:00 2001 From: Hao Zhu Date: Mon, 10 Aug 2015 15:55:49 -0700 Subject: [PATCH 2/3] [SPARK-9801][Streaming] Check if backupFile exists before deleting it.
--- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 3305d1828cae..67815f7b1b8d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -205,7 +205,9 @@ class CheckpointWriter( // If the checkpoint file exists, back it up // If the backup exists as well, just delete it, otherwise rename will fail if (fs.exists(checkpointFile)) { - fs.delete(backupFile, true) // just in case it exists + if (fs.exists(backupFile)){ + fs.delete(backupFile, true) // just in case it exists + } if (!fs.rename(checkpointFile, backupFile)) { logWarning("Could not rename " + checkpointFile + " to " + backupFile) } From 242d05f34b4fd789d4fe864751e732ab53b40ded Mon Sep 17 00:00:00 2001 From: Hao Zhu Date: Mon, 10 Aug 2015 16:14:38 -0700 Subject: [PATCH 3/3] [SPARK-9801][Streaming] No need to check the existence of old checkpoint files before deleting them --- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 67815f7b1b8d..6f6b449accc3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -223,9 +223,7 @@ class CheckpointWriter( if (allCheckpointFiles.size > 10) { allCheckpointFiles.take(allCheckpointFiles.size - 10).foreach(file => { logInfo("Deleting " + file) - if (fs.exists(file)) { - fs.delete(file, true) - } + fs.delete(file, true) }) }