
Commit 84035f1

adding binary and byte file support to Spark
1 parent 81c5f12 commit 84035f1

File tree

3 files changed: +171 -1 lines changed


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 21 additions & 1 deletion
@@ -40,7 +40,7 @@ import org.apache.mesos.MesosNativeLibrary
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.input.WholeTextFileInputFormat
+import org.apache.spark.input.{WholeTextFileInputFormat, ByteInputFormat}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -510,6 +510,26 @@ class SparkContext(config: SparkConf) extends Logging {
       minPartitions).setName(path)
   }
 
+  /**
+   * Get an RDD for a Hadoop-readable dataset as one byte stream per file (useful for binary data).
+   *
+   * @param minPartitions A suggested minimum number of partitions for the input data.
+   *
+   * @note Small files are preferred; large files are also allowed, but may cause poor performance.
+   */
+  def byteFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = {
+    val job = new NewHadoopJob(hadoopConfiguration)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updateConf = job.getConfiguration
+    new BinaryFileRDD(
+      this,
+      classOf[ByteInputFormat],
+      classOf[String],
+      classOf[Array[Byte]],
+      updateConf,
+      minPartitions).setName(path)
+  }
+
   /**
    * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
    * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
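
For reference, a minimal usage sketch of the new byteFiles API added above (the local master, app name, and /tmp/binaries input directory are illustrative assumptions, not part of this commit):

    import org.apache.spark.{SparkConf, SparkContext}

    object ByteFilesDemo {
      def main(args: Array[String]): Unit = {
        // Illustrative local configuration; any master / app name would do.
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("byteFilesDemo"))

        // Each record is (file path, full file contents as a byte array).
        val files = sc.byteFiles("/tmp/binaries")   // hypothetical input directory

        // Report the size of each file without any pair-RDD implicits.
        val sizes = files.map { case (path, bytes) => (path, bytes.length) }
        sizes.collect().foreach { case (path, len) => println(s"$path -> $len bytes") }

        sc.stop()
      }
    }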
Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
+package org.apache.spark.input
+
+import scala.collection.JavaConversions._
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
+
+
+/**
+ * The new (Hadoop 2.0) InputFormat for whole binary files (not to be confused with the
+ * RecordReader itself).
+ */
+@serializable abstract class BinaryFileInputFormat[T]
+  extends CombineFileInputFormat[String, T] {
+  override protected def isSplitable(context: JobContext, file: Path): Boolean = false
+  /**
+   * Allow minPartitions to be set by the end user, to keep compatibility with the old Hadoop API.
+   */
+  def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+    val files = listStatus(context)
+    val totalLen = files.map { file =>
+      if (file.isDir) 0L else file.getLen
+    }.sum
+
+    // val maxSplitSize = Math.ceil(totalLen * 1.0 /
+    //   (if (minPartitions == 0) 1 else minPartitions)).toLong
+    val maxSplitSize = Math.ceil(totalLen * 1.0 / files.length).toLong
+    super.setMaxSplitSize(maxSplitSize)
+  }
+
+  def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String, T]
+
+}
+
+/**
+ * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] that reads a single whole binary
+ * file as a key-value pair, where the key is the file path and the value is the entire content
+ * of the file parsed into the value type T (so the size information is preserved).
+ */
+@serializable abstract class BinaryRecordReader[T](
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends RecordReader[String, T] {
+
+  private val path = split.getPath(index)
+  private val fs = path.getFileSystem(context.getConfiguration)
+
+  // True means the current file has been processed and should be skipped.
+  private var processed = false
+
+  private val key = path.toString
+  private var value: T = null.asInstanceOf[T]
+  override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
+  override def close() = {}
+
+  override def getProgress = if (processed) 1.0f else 0.0f
+
+  override def getCurrentKey = key
+
+  override def getCurrentValue = value
+
+  override def nextKeyValue = {
+    if (!processed) {
+      val fileIn = fs.open(path)
+      val innerBuffer = ByteStreams.toByteArray(fileIn)
+      value = parseByteArray(innerBuffer)
+      Closeables.close(fileIn, false)
+
+      processed = true
+      true
+    } else {
+      false
+    }
+  }
+  def parseByteArray(inArray: Array[Byte]): T
+}
+
+/**
+ * A demo class that extracts just the byte array itself.
+ */
+@serializable class ByteInputFormat extends BinaryFileInputFormat[Array[Byte]] {
+  override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) = {
+    new CombineFileRecordReader[String, Array[Byte]](
+      split.asInstanceOf[CombineFileSplit], taContext, classOf[ByteRecordReader])
+  }
+}
+
+@serializable class ByteRecordReader(
+    split: CombineFileSplit,
+    context: TaskAttemptContext,
+    index: Integer)
+  extends BinaryRecordReader[Array[Byte]](split, context, index) {
+
+  def parseByteArray(inArray: Array[Byte]) = inArray
+}
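
To illustrate how BinaryFileInputFormat and BinaryRecordReader are meant to be extended beyond the ByteInputFormat demo, here is a hedged sketch of a format that interprets each whole file as a flat array of doubles (the DoubleInputFormat / DoubleRecordReader names and the little-endian layout are assumptions, not part of this commit):

    package org.apache.spark.input

    import java.nio.{ByteBuffer, ByteOrder}

    import org.apache.hadoop.mapreduce.{InputSplit, TaskAttemptContext}
    import org.apache.hadoop.mapreduce.lib.input.{CombineFileRecordReader, CombineFileSplit}

    // Hypothetical: parse every whole file as a flat array of little-endian doubles.
    @serializable class DoubleInputFormat extends BinaryFileInputFormat[Array[Double]] {
      override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) = {
        new CombineFileRecordReader[String, Array[Double]](
          split.asInstanceOf[CombineFileSplit], taContext, classOf[DoubleRecordReader])
      }
    }

    @serializable class DoubleRecordReader(
        split: CombineFileSplit,
        context: TaskAttemptContext,
        index: Integer)
      extends BinaryRecordReader[Array[Double]](split, context, index) {

      // Assumes each file's length is a multiple of 8 bytes, written in little-endian order.
      def parseByteArray(inArray: Array[Byte]): Array[Double] = {
        val out = new Array[Double](inArray.length / 8)
        ByteBuffer.wrap(inArray).order(ByteOrder.LITTLE_ENDIAN).asDoubleBuffer().get(out)
        out
      }
    }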
Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+package org.apache.spark.rdd
+
+/**
+ * Allows better control of the partitioning.
+ */
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.input.WholeTextFileInputFormat
+import org.apache.spark.InterruptibleIterator
+import org.apache.spark.Logging
+import org.apache.spark.Partition
+import org.apache.spark.SerializableWritable
+import org.apache.spark.{SparkContext, TaskContext}
+
+import org.apache.spark.input.BinaryFileInputFormat
+
+private[spark] class BinaryFileRDD[T](
+    sc: SparkContext,
+    inputFormatClass: Class[_ <: BinaryFileInputFormat[T]],
+    keyClass: Class[String],
+    valueClass: Class[T],
+    @transient conf: Configuration,
+    minPartitions: Int)
+  extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
+
+  override def getPartitions: Array[Partition] = {
+    val inputFormat = inputFormatClass.newInstance
+    inputFormat match {
+      case configurable: Configurable =>
+        configurable.setConf(conf)
+      case _ =>
+    }
+    val jobContext = newJobContext(conf, jobId)
+    inputFormat.setMaxSplitSize(jobContext, minPartitions)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val result = new Array[Partition](rawSplits.size)
+    for (i <- 0 until rawSplits.size) {
+      result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
+    }
+    result
+  }
+}
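
Finally, a sketch of how a custom format would be wired into BinaryFileRDD by hand, mirroring what byteFiles does in SparkContext. The helper object, the hypothetical DoubleInputFormat from the sketch above, and placing the code inside the org.apache.spark package (since BinaryFileRDD is private[spark]) are all assumptions, not part of this commit:

    package org.apache.spark

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
    import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}

    import org.apache.spark.rdd.{BinaryFileRDD, RDD}

    object BinaryFileRDDSketch {
      // Hypothetical helper: build an RDD of (file path, parsed value) for a custom binary format.
      def doubleFiles(sc: SparkContext, path: String, minPartitions: Int): RDD[(String, Array[Double])] = {
        val job = new NewHadoopJob(sc.hadoopConfiguration)
        NewFileInputFormat.addInputPath(job, new Path(path))
        new BinaryFileRDD(
          sc,
          classOf[org.apache.spark.input.DoubleInputFormat],  // hypothetical format from the sketch above
          classOf[String],
          classOf[Array[Double]],
          job.getConfiguration,
          minPartitions).setName(path)
      }
    }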
