
Commit 44f67ac

Davies Liu authored and Josh Rosen committed
[SPARK-2672] support compressed file in wholeTextFile
wholeTextFiles() cannot read compressed files, but it should be able to, just like textFile().

Author: Davies Liu <[email protected]>

Closes #3005 from davies/whole and squashes the following commits:

a43fcfb [Davies Liu] remove semicolon
c83571a [Davies Liu] remove = if return type is Unit
83c844f [Davies Liu] Merge branch 'master' of github.com:apache/spark into whole
22e8b3e [Davies Liu] support compressed file in wholeTextFile

(cherry picked from commit d7d54a4)
Signed-off-by: Josh Rosen <[email protected]>
1 parent 16da988 commit 44f67ac
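For context, a minimal usage sketch of the behavior this commit enables (the input directory /tmp/whole-input and the local master setting are hypothetical, not part of the change): with this patch, sc.wholeTextFiles() picks a compression codec from each file's extension, so gzipped files come back decompressed without any extra options.

import org.apache.spark.{SparkConf, SparkContext}

object WholeTextFilesGzipExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("whole-gzip-example").setMaster("local"))

    // Each (path, content) pair holds one whole file; .gz files in the
    // directory are decompressed transparently after this commit.
    val files = sc.wholeTextFiles("/tmp/whole-input")
    files.collect().foreach { case (path, content) =>
      println(s"$path -> ${content.length} characters")
    }

    sc.stop()
  }
}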

File tree

3 files changed: +103 -13 lines

core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala

Lines changed: 13 additions & 7 deletions
@@ -19,32 +19,38 @@ package org.apache.spark.input
 
 import scala.collection.JavaConversions._
 
+import org.apache.hadoop.conf.{Configuration, Configurable}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
 
 /**
  * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for
  * reading whole text files. Each file is read as key-value pair, where the key is the file path and
  * the value is the entire content of file.
  */
-private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
+private[spark] class WholeTextFileInputFormat
+  extends CombineFileInputFormat[String, String] with Configurable {
+
   override protected def isSplitable(context: JobContext, file: Path): Boolean = false
 
+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
+
   override def createRecordReader(
       split: InputSplit,
       context: TaskAttemptContext): RecordReader[String, String] = {
 
-    new CombineFileRecordReader[String, String](
-      split.asInstanceOf[CombineFileSplit],
-      context,
-      classOf[WholeTextFileRecordReader])
+    val reader = new WholeCombineFileRecordReader(split, context)
+    reader.setConf(conf)
+    reader
   }
 
   /**
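A side note on the Configurable plumbing above (a sketch of the general Hadoop contract, not code from this commit): Hadoop's ReflectionUtils.newInstance calls setConf on anything implementing Configurable, and Spark's new-API RDD code likewise checks for Configurable and calls setConf. That is how WholeTextFileInputFormat can receive the job configuration, which it then hands to its record reader explicitly. The class name and property value below are illustrative only.

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.util.ReflectionUtils

// Minimal Configurable implementation, mirroring the getConf/setConf pair
// added to WholeTextFileInputFormat in this commit.
class ConfAwareComponent extends Configurable {
  private var conf: Configuration = _
  def setConf(c: Configuration): Unit = { conf = c }
  def getConf: Configuration = conf
}

object ConfigurableSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("io.compression.codecs", "org.apache.hadoop.io.compress.GzipCodec")

    // ReflectionUtils.newInstance calls setConf on Configurable objects,
    // which is how the job configuration is injected into input formats.
    val component = ReflectionUtils.newInstance(classOf[ConfAwareComponent], conf)
    println(component.getConf.get("io.compression.codecs"))
  }
}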

core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala

Lines changed: 49 additions & 3 deletions
@@ -17,11 +17,13 @@
 
 package org.apache.spark.input
 
+import org.apache.hadoop.conf.{Configuration, Configurable}
 import com.google.common.io.{ByteStreams, Closeables}
 
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.compress.CompressionCodecFactory
 import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 
@@ -34,7 +36,13 @@ private[spark] class WholeTextFileRecordReader(
     split: CombineFileSplit,
     context: TaskAttemptContext,
     index: Integer)
-  extends RecordReader[String, String] {
+  extends RecordReader[String, String] with Configurable {
+
+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
 
   private[this] val path = split.getPath(index)
   private[this] val fs = path.getFileSystem(context.getConfiguration)
@@ -57,8 +65,16 @@ private[spark] class WholeTextFileRecordReader(
 
   override def nextKeyValue(): Boolean = {
     if (!processed) {
+      val conf = new Configuration
+      val factory = new CompressionCodecFactory(conf)
+      val codec = factory.getCodec(path)  // infers from file ext.
      val fileIn = fs.open(path)
-      val innerBuffer = ByteStreams.toByteArray(fileIn)
+      val innerBuffer = if (codec != null) {
+        ByteStreams.toByteArray(codec.createInputStream(fileIn))
+      } else {
+        ByteStreams.toByteArray(fileIn)
+      }
+
       value = new Text(innerBuffer).toString
       Closeables.close(fileIn, false)
       processed = true
@@ -68,3 +84,33 @@ private[spark] class WholeTextFileRecordReader(
     }
   }
 }
+
+
+/**
+ * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
+ * out in a key-value pair, where the key is the file path and the value is the entire content of
+ * the file.
+ */
+private[spark] class WholeCombineFileRecordReader(
+    split: InputSplit,
+    context: TaskAttemptContext)
+  extends CombineFileRecordReader[String, String](
+    split.asInstanceOf[CombineFileSplit],
+    context,
+    classOf[WholeTextFileRecordReader]
+  ) with Configurable {
+
+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
+
+  override def initNextRecordReader(): Boolean = {
+    val r = super.initNextRecordReader()
+    if (r) {
+      this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf)
+    }
+    r
+  }
+}
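For readers unfamiliar with CompressionCodecFactory, here is a small standalone sketch of the extension-based lookup used in nextKeyValue() above (the file names are made up): getCodec returns the codec registered for a path's suffix, or null when none matches, which is exactly the branch that falls back to reading the stream uncompressed.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

object CodecLookupSketch {
  def main(args: Array[String]): Unit = {
    // With a default Configuration, the factory knows the stock Hadoop codecs
    // (gzip, deflate, bzip2); additional codecs come from io.compression.codecs.
    val factory = new CompressionCodecFactory(new Configuration())

    Seq("logs.txt.gz", "data.deflate", "notes.txt").foreach { name =>
      val codec = Option(factory.getCodec(new Path(name)))
      println(s"$name -> ${codec.map(_.getClass.getSimpleName).getOrElse("no codec (read as plain text)")}")
    }
  }
}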

core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala

Lines changed: 41 additions & 3 deletions
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text
 
 import org.apache.spark.SparkContext
 import org.apache.spark.util.Utils
+import org.apache.hadoop.io.compress.{DefaultCodec, CompressionCodecFactory, GzipCodec}
 
 /**
  * Tests the correctness of
@@ -38,20 +39,32 @@ import org.apache.spark.util.Utils
  */
 class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
   private var sc: SparkContext = _
+  private var factory: CompressionCodecFactory = _
 
   override def beforeAll() {
     sc = new SparkContext("local", "test")
 
     // Set the block size of local file system to test whether files are split right or not.
     sc.hadoopConfiguration.setLong("fs.local.block.size", 32)
+    sc.hadoopConfiguration.set("io.compression.codecs",
+      "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec")
+    factory = new CompressionCodecFactory(sc.hadoopConfiguration)
   }
 
   override def afterAll() {
     sc.stop()
   }
 
-  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte]) = {
-    val out = new DataOutputStream(new FileOutputStream(s"${inputDir.toString}/$fileName"))
+  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte],
+    compress: Boolean) = {
+    val out = if (compress) {
+      val codec = new GzipCodec
+      val path = s"${inputDir.toString}/$fileName${codec.getDefaultExtension}"
+      codec.createOutputStream(new DataOutputStream(new FileOutputStream(path)))
+    } else {
+      val path = s"${inputDir.toString}/$fileName"
+      new DataOutputStream(new FileOutputStream(path))
+    }
     out.write(contents, 0, contents.length)
     out.close()
   }
@@ -68,7 +81,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
     println(s"Local disk address is ${dir.toString}.")
 
     WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
-      createNativeFile(dir, filename, contents)
+      createNativeFile(dir, filename, contents, false)
     }
 
     val res = sc.wholeTextFiles(dir.toString, 3).collect()
@@ -86,6 +99,31 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
 
     Utils.deleteRecursively(dir)
   }
+
+  test("Correctness of WholeTextFileRecordReader with GzipCodec.") {
+    val dir = Utils.createTempDir()
+    println(s"Local disk address is ${dir.toString}.")
+
+    WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
+      createNativeFile(dir, filename, contents, true)
+    }
+
+    val res = sc.wholeTextFiles(dir.toString, 3).collect()
+
+    assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
+      "Number of files read out does not fit with the actual value.")
+
+    for ((filename, contents) <- res) {
+      val shortName = filename.split('/').last.split('.')(0)
+
+      assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
+        s"Missing file name $filename.")
+      assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
+        s"file $filename contents can not match.")
+    }
+
+    Utils.deleteRecursively(dir)
+  }
 }
 
 /**
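As a complement to the suite above, a minimal sketch of producing a gzipped input file with Hadoop's own GzipCodec outside the test harness (the /tmp path and the sample text are hypothetical; the codec is given a Configuration explicitly, which some Hadoop versions require before createOutputStream):

import java.io.{DataOutputStream, FileOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.util.ReflectionUtils

object GzipFixtureSketch {
  def main(args: Array[String]): Unit = {
    // ReflectionUtils.newInstance sets the Configuration on the codec,
    // avoiding a null conf inside createOutputStream on some Hadoop versions.
    val codec = ReflectionUtils.newInstance(classOf[GzipCodec], new Configuration())

    // getDefaultExtension yields ".gz", the suffix wholeTextFiles keys off.
    val path = s"/tmp/sample.txt${codec.getDefaultExtension}"
    val out = codec.createOutputStream(new DataOutputStream(new FileOutputStream(path)))
    out.write("hello wholeTextFiles".getBytes("UTF-8"))
    out.close()
  }
}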

0 commit comments
