From d904b25a7037e2b12693158f29e069f13aa0fa78 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 3 Dec 2015 01:30:27 -0800 Subject: [PATCH 1/2] Remove duplicate --- .../org/apache/spark/streaming/scheduler/JobGenerator.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 2de035d166e7..32efe9210cde 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -220,7 +220,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { logInfo("Batches pending processing (" + pendingTimes.size + " batches): " + pendingTimes.mkString(", ")) // Reschedule jobs for these times - val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering) + val timesToReschedule = (pendingTimes ++ downTimes).filter { _ != restartTime } + .distinct.sorted(Time.ordering) logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", ")) timesToReschedule.foreach { time => From fe69fbfe185e12e70d46003a40988fff57cd24b7 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 3 Dec 2015 12:04:01 -0800 Subject: [PATCH 2/2] Changed filter --- .../org/apache/spark/streaming/scheduler/JobGenerator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 32efe9210cde..8dfdc1f57b40 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -220,7 +220,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { logInfo("Batches pending processing (" + pendingTimes.size + " batches): " + pendingTimes.mkString(", ")) // Reschedule jobs for these times - val timesToReschedule = (pendingTimes ++ downTimes).filter { _ != restartTime } + val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime } .distinct.sorted(Time.ordering) logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", "))