
Commit ce6ed2a

surongquantdas authored and committed
[SPARK-3954][Streaming] Optimization to FileInputDStream
When converting files to RDDs, the Spark source traverses the file sequence in three separate loops:

1. files.map(...)
2. files.zip(fileRDDs)
3. files-size.foreach

This is very time-consuming when there are many files, so this patch collapses the three loops over the file sequence into a single loop.

Author: surq <[email protected]>

Closes apache#2811 from surq/SPARK-3954 and squashes the following commits:

321bbe8 [surq] updated the code style. The style from [for...yield] to [files.map(file => {})]
88a2c20 [surq] Merge branch 'master' of https://github.com/apache/spark into SPARK-3954
178066f [surq] modify code's style. [Exceeds 100 columns]
626ef97 [surq] remove redundant import (ArrayBuffer)
739341f [surq] promote the speed of converting files to RDDs
1 parent a1fc059 commit ce6ed2a

File tree

1 file changed (+4, -3 lines)


streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

Lines changed: 4 additions & 3 deletions
@@ -120,14 +120,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag]
   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
-    val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
-    files.zip(fileRDDs).foreach { case (file, rdd) => {
+    val fileRDDs = files.map(file => {
+      val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)
       if (rdd.partitions.size == 0) {
         logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
           "files that have been \"moved\" to the directory assigned to the file stream. " +
           "Refer to the streaming programming guide for more details.")
       }
-    }}
+      rdd
+    })
     new UnionRDD(context.sparkContext, fileRDDs)
   }
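The pattern behind the patch can be shown without Spark: the two-traversal version builds a sequence, then zips it back against the input to validate each element, while the single-traversal version does both in one pass. The sketch below is a hedged illustration using plain Scala collections; `FakeRdd` and `makeRdd` are hypothetical stand-ins for the RDDs returned by `newAPIHadoopFile`, not Spark APIs.

```scala
object SinglePassDemo {
  // Hypothetical stand-in for an RDD: just a file name and a partition count.
  final case class FakeRdd(file: String, partitions: Int)

  // Hypothetical stand-in for context.sparkContext.newAPIHadoopFile.
  def makeRdd(file: String): FakeRdd =
    FakeRdd(file, if (file.endsWith(".empty")) 0 else 2)

  // Before the patch: two traversals of `files` -- one map to build the
  // RDDs, then a zip + foreach to validate each one.
  def twoPass(files: Seq[String]): Seq[FakeRdd] = {
    val rdds = files.map(makeRdd)
    files.zip(rdds).foreach { case (file, rdd) =>
      if (rdd.partitions == 0) println(s"File $file has no data in it.")
    }
    rdds
  }

  // After the patch: build and validate in a single traversal, returning
  // the RDD from the same map closure that checks it.
  def onePass(files: Seq[String]): Seq[FakeRdd] =
    files.map { file =>
      val rdd = makeRdd(file)
      if (rdd.partitions == 0) println(s"File $file has no data in it.")
      rdd
    }

  def main(args: Array[String]): Unit = {
    val files = Seq("a.txt", "b.empty", "c.txt")
    // Both variants emit the same warnings and yield the same sequence.
    assert(twoPass(files) == onePass(files))
    println("both variants produce the same RDD sequence")
  }
}
```

Both versions are O(n) in the number of files, but the single-pass form avoids two extra traversals and the intermediate pairs allocated by `zip`, which is what the commit message means by "3 loops with files sequence => only one loop".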

0 commit comments
