@@ -80,6 +80,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
8080
8181 private val serializableConfOpt = conf.map(new SerializableWritable (_))
8282
83+ /**
84+ * Minimum duration of remembering the information of selected files. Defaults to 1 minute.
85+ *
86+ * Files with mod times older than this "window" of remembering will be ignored. So if new
87+ * files are visible within this window, then the file will get selected in the next batch.
88+ */
89+ private val minRememberDurationMin = Minutes (ssc.sparkContext.getConf
90+ .get(" spark.streaming.minRememberDurationMin" , " 1" )
91+ .toLong)
92+
8393 // This is a def so that it works during checkpoint recovery:
8494 private def clock = ssc.scheduler.clock
8595
@@ -95,7 +105,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
95105 * This would allow us to filter away not-too-old files which have already been recently
96106 * selected and processed.
97107 */
98- private val numBatchesToRemember = FileInputDStream .calculateNumBatchesToRemember(slideDuration)
108+ private val numBatchesToRemember = FileInputDStream
109+ .calculateNumBatchesToRemember(slideDuration, minRememberDurationMin)
99110 private val durationToRemember = slideDuration * numBatchesToRemember
100111 remember(durationToRemember)
101112
@@ -330,23 +341,14 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
330341private [streaming]
331342object FileInputDStream {
332343
333- /**
334- * Minimum duration of remembering the information of selected files. Defaults to 1 minute.
335- *
336- * Files with mod times older than this "window" of remembering will be ignored. So if new
337- * files are visible within this window, then the file will get selected in the next batch.
338- */
339- private val minRememberDurationMin = Minutes (new SparkConf ()
340- .get(" spark.streaming.minRememberDurationMin" , " 1" )
341- .toLong)
342-
343344 def defaultFilter (path : Path ): Boolean = ! path.getName().startsWith(" ." )
344345
345346 /**
346347 * Calculate the number of last batches to remember, such that all the files selected in
347348 * at least last minRememberDurationMin duration can be remembered.
348349 */
349- def calculateNumBatchesToRemember (batchDuration : Duration ): Int = {
350+ def calculateNumBatchesToRemember (batchDuration : Duration ,
351+ minRememberDurationMin : Duration ): Int = {
350352 math.ceil(minRememberDurationMin.milliseconds.toDouble / batchDuration.milliseconds).toInt
351353 }
352354}
0 commit comments