Commit 037fe4d

yinxusen authored and mateiz committed
[SPARK-1415] Hadoop min split for wholeTextFiles()
JIRA issue [here](https://issues.apache.org/jira/browse/SPARK-1415). The new Hadoop API of `InputFormat` does not provide the `minSplits` parameter, which makes the API incompatible between `HadoopRDD` and `NewHadoopRDD`. This PR constructs compatible APIs. Though `minSplits` is deprecated by the new Hadoop API, we think it is better to keep the APIs compatible here.

**Note** that `minSplits` in `wholeTextFiles` can only be treated as a *suggestion*; the real number of splits may not be greater than `minSplits` due to `isSplitable()=false`.

Author: Xusen Yin <[email protected]>

Closes #376 from yinxusen/hadoop-min-split and squashes the following commits:

76417f6 [Xusen Yin] refine comments
c10af60 [Xusen Yin] refine comments and rewrite new class for wholeTextFile
766d05b [Xusen Yin] refine Java API and comments
4875755 [Xusen Yin] add minSplits for WholeTextFiles
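As a quick illustration of the API shape this commit introduces (a sketch only: it assumes a spark-shell session where `sc` is already bound, and the HDFS path is hypothetical):

```scala
// New Scala signature: wholeTextFiles(path: String, minSplits: Int = defaultMinSplits).
// The existing one-argument call keeps working; the second argument is only a hint.
val byDefault = sc.wholeTextFiles("hdfs://namenode/data/small-files")
val withHint  = sc.wholeTextFiles("hdfs://namenode/data/small-files", 10)

// Each record is (filePath, fileContent), one per input file.
withHint.take(2).foreach { case (path, content) => println(s"$path: ${content.length} chars") }
```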
1 parent 4bc07ee commit 037fe4d

File tree: 6 files changed, +90 -19 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 12 additions & 5 deletions
@@ -454,14 +454,21 @@ class SparkContext(config: SparkConf) extends Logging {
    * (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @note Small files are preferred, as each file will be loaded fully in memory.
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minSplits A suggestion value of the minimal splitting number for input data.
    */
-  def wholeTextFiles(path: String): RDD[(String, String)] = {
-    newAPIHadoopFile(
-      path,
+  def wholeTextFiles(path: String, minSplits: Int = defaultMinSplits): RDD[(String, String)] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new WholeTextFileRDD(
+      this,
       classOf[WholeTextFileInputFormat],
       classOf[String],
-      classOf[String])
+      classOf[String],
+      updateConf,
+      minSplits)
   }
 
   /**

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 13 additions & 1 deletion
@@ -177,7 +177,19 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
    * (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @note Small files are preferred, as each file will be loaded fully in memory.
+   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+   *
+   * @param minSplits A suggestion value of the minimal splitting number for input data.
+   */
+  def wholeTextFiles(path: String, minSplits: Int): JavaPairRDD[String, String] =
+    new JavaPairRDD(sc.wholeTextFiles(path, minSplits))
+
+  /**
+   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
+   * key-value pair, where the key is the path of each file, the value is the content of each file.
+   *
+   * @see `wholeTextFiles(path: String, minSplits: Int)`.
    */
   def wholeTextFiles(path: String): JavaPairRDD[String, String] =
     new JavaPairRDD(sc.wholeTextFiles(path))

core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala

Lines changed: 14 additions & 0 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.input
 
+import scala.collection.JavaConversions._
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.JobContext
@@ -44,4 +46,16 @@ private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[Str
       context,
       classOf[WholeTextFileRecordReader])
   }
+
+  /**
+   * Allow minSplits set by end-user in order to keep compatibility with old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minSplits: Int) {
+    val files = listStatus(context)
+    val totalLen = files.map { file =>
+      if (file.isDir) 0L else file.getLen
+    }.sum
+    val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
+    super.setMaxSplitSize(maxSplitSize)
+  }
 }
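The `setMaxSplitSize` added above amounts to one line of arithmetic: the combined length of the input files divided by the requested minimum number of splits becomes the maximum size that `CombineFileInputFormat` may pack into one split. A small REPL-pasteable sketch of that calculation (the helper name and file sizes are made up for illustration):

```scala
// Mirrors the computation in WholeTextFileInputFormat.setMaxSplitSize:
// maxSplitSize = ceil(totalLen / minSplits), treating minSplits == 0 as 1.
def suggestedMaxSplitSize(fileLengths: Seq[Long], minSplits: Int): Long = {
  val totalLen = fileLengths.sum
  Math.ceil(totalLen * 1.0 / (if (minSplits == 0) 1 else minSplits)).toLong
}

// Three 10 MB files with minSplits = 3 => each combined split is capped at 10 MB,
// so roughly three splits result (a suggestion only, since whole files are never cut).
val tenMB = 10L * 1024 * 1024
println(suggestedMaxSplitSize(Seq(tenMB, tenMB, tenMB), 3)) // 10485760
```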

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 49 additions & 11 deletions
@@ -24,11 +24,18 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
-import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
-
-private[spark]
-class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
+import org.apache.spark.input.WholeTextFileInputFormat
+import org.apache.spark.InterruptibleIterator
+import org.apache.spark.Logging
+import org.apache.spark.Partition
+import org.apache.spark.SerializableWritable
+import org.apache.spark.{SparkContext, TaskContext}
+
+private[spark] class NewHadoopPartition(
+    rddId: Int,
+    val index: Int,
+    @transient rawSplit: InputSplit with Writable)
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
@@ -65,17 +72,19 @@ class NewHadoopRDD[K, V](
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
   // private val serializableConf = new SerializableWritable(conf)
 
-  private val jobtrackerId: String = {
+  private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     formatter.format(new Date())
   }
 
-  @transient private val jobId = new JobID(jobtrackerId, id)
+  @transient protected val jobId = new JobID(jobTrackerId, id)
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
-    if (inputFormat.isInstanceOf[Configurable]) {
-      inputFormat.asInstanceOf[Configurable].setConf(conf)
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
     }
     val jobContext = newJobContext(conf, jobId)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
@@ -91,11 +100,13 @@ class NewHadoopRDD[K, V](
     val split = theSplit.asInstanceOf[NewHadoopPartition]
     logInfo("Input split: " + split.serializableHadoopSplit)
     val conf = confBroadcast.value.value
-    val attemptId = newTaskAttemptID(jobtrackerId, id, isMap = true, split.index, 0)
+    val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
     val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
     val format = inputFormatClass.newInstance
-    if (format.isInstanceOf[Configurable]) {
-      format.asInstanceOf[Configurable].setConf(conf)
+    format match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
     }
     val reader = format.createRecordReader(
       split.serializableHadoopSplit.value, hadoopAttemptContext)
@@ -141,3 +152,30 @@ class NewHadoopRDD[K, V](
   def getConf: Configuration = confBroadcast.value.value
 }
 
+private[spark] class WholeTextFileRDD(
+    sc : SparkContext,
+    inputFormatClass: Class[_ <: WholeTextFileInputFormat],
+    keyClass: Class[String],
+    valueClass: Class[String],
+    @transient conf: Configuration,
+    minSplits: Int)
+  extends NewHadoopRDD[String, String](sc, inputFormatClass, keyClass, valueClass, conf) {
+
+  override def getPartitions: Array[Partition] = {
+    val inputFormat = inputFormatClass.newInstance
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    inputFormat.setMaxSplitSize(jobContext, minSplits)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val result = new Array[Partition](rawSplits.size)
+    for (i <- 0 until rawSplits.size) {
+      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    }
+    result
+  }
+}
+
core/src/test/java/org/apache/spark/JavaAPISuite.java

Lines changed: 1 addition & 1 deletion
@@ -626,7 +626,7 @@ public void wholeTextFiles() throws IOException {
     container.put(tempDirName+"/part-00000", new Text(content1).toString());
     container.put(tempDirName+"/part-00001", new Text(content2).toString());
 
-    JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName);
+    JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName, 3);
     List<Tuple2<String, String>> result = readRDD.collect();
 
     for (Tuple2<String, String> res : result) {

core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -73,7 +73,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
       createNativeFile(dir, filename, contents)
     }
 
-    val res = sc.wholeTextFiles(dir.toString).collect()
+    val res = sc.wholeTextFiles(dir.toString, 3).collect()
 
     assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
       "Number of files read out does not fit with the actual value.")
