@@ -21,6 +21,7 @@ import java.util.Properties
2121import java .util .concurrent .{ConcurrentHashMap , TimeUnit }
2222
2323import scala .collection .JavaConverters ._
24+ import scala .collection .mutable .HashSet
2425import scala .util .Failure
2526
2627import org .apache .commons .lang .SerializationUtils
@@ -64,6 +65,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
6465
6566 private var eventLoop : EventLoop [JobSchedulerEvent ] = null
6667
68+ private val inputInfoMissedTimes = HashSet [Time ]()
69+
6770 def start (): Unit = synchronized {
6871 if (eventLoop != null ) return // scheduler has already been started
6972
@@ -139,6 +142,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
139142 def submitJobSet (jobSet : JobSet ) {
140143 if (jobSet.jobs.isEmpty) {
141144 logInfo(" No jobs added for time " + jobSet.time)
145+ inputInfoMissedTimes.add(jobSet.time)
142146 } else {
143147 listenerBus.post(StreamingListenerBatchSubmitted (jobSet.toBatchInfo))
144148 jobSets.put(jobSet.time, jobSet)
@@ -193,6 +197,14 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
193197 listenerBus.post(StreamingListenerOutputOperationCompleted (job.toOutputOperationInfo))
194198 logInfo(" Finished job " + job.id + " from job set of time " + jobSet.time)
195199 if (jobSet.hasCompleted) {
200+ // submit fake BatchCompleted event to show missing inputInfo on Streaming UI
201+ inputInfoMissedTimes.foreach (time => {
202+ val streamIdToInputInfos = inputInfoTracker.getInfo(time)
203+ val fakeJobSet = JobSet (time, Seq (), streamIdToInputInfos)
204+ listenerBus.post(StreamingListenerBatchCompleted (fakeJobSet.toBatchInfo))
205+ })
206+ inputInfoMissedTimes.clear()
207+
196208 jobSets.remove(jobSet.time)
197209 jobGenerator.onBatchCompletion(jobSet.time)
198210 logInfo(" Total delay: %.3f s for time %s (execution: %.3f s)" .format(
0 commit comments