From bded22e93072bae51978c4a1b08fdc873ecea80c Mon Sep 17 00:00:00 2001 From: Fei Shao <1427357147@qq.com> Date: Sat, 22 Jul 2017 20:21:27 +0800 Subject: [PATCH] [SPARK-21357][DStreams] FileInputDStream not remove out of date RDD ### What changes were proposed in this pull request? ```DStreams class FileInputDStream protected[streaming] override def clearMetadata(time: Time) { batchTimeToSelectedFiles.synchronized { val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration)) batchTimeToSelectedFiles --= oldFiles.keys ``` The above code does not remove the old generatedRDDs. "super.clearMetadata(time)" was added to the beginning of clearMetadata to remove the old generatedRDDs. ## How was this patch tested At the end of clearMetadata, the testing code (print the number of generatedRDDs) was added to check the old RDDS were removed. --- .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 905b1c52afa69..b8a5a96faf15c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -164,6 +164,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]]( /** Clear the old time-to-files mappings along with old RDDs */ protected[streaming] override def clearMetadata(time: Time) { + super.clearMetadata(time) batchTimeToSelectedFiles.synchronized { val oldFiles = batchTimeToSelectedFiles.filter(_._1 < (time - rememberDuration)) batchTimeToSelectedFiles --= oldFiles.keys