5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -961,12 +961,11 @@ class SparkContext(config: SparkConf) extends Logging {
classOf[LongWritable],
classOf[BytesWritable],
conf = conf)
val data = br.map { case (k, v) =>
val bytes = v.getBytes
br.map { case (k, v) =>
val bytes = v.copyBytes()
assert(bytes.length == recordLength, "Byte array does not have correct length")
bytes
}
data
}

/**
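Background for the getBytes -> copyBytes() switch above: Hadoop's BytesWritable reuses and over-allocates its backing buffer, so getBytes can hand back an array that is longer than the current record (and shared across records), whereas copyBytes() returns a fresh array trimmed to getLength. A minimal standalone sketch of the difference (not part of the patch):

import java.util.Arrays
import org.apache.hadoop.io.BytesWritable

val v = new BytesWritable()
v.set(Array[Byte](1, 2, 3, 4, 5, 6, 7, 8), 0, 8) // buffer grows (with slack) to hold 8 bytes
v.set(Array[Byte](1, 2, 3), 0, 3)                // reused for a shorter record; buffer keeps its old size
assert(v.getBytes.length >= v.getLength)         // getBytes may include stale padding bytes
assert(v.copyBytes().length == v.getLength)      // copyBytes is trimmed to the record
assert(Arrays.equals(v.copyBytes(), Arrays.copyOf(v.getBytes, v.getLength)))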
178 changes: 38 additions & 140 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark

import java.io._
import java.nio.ByteBuffer
import java.util.zip.GZIPOutputStream

import scala.io.Source
@@ -30,7 +31,6 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

import org.apache.spark.input.PortableDataStream
import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
import org.apache.spark.storage.StorageLevel
@@ -237,184 +237,82 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
}

test("binary file input as byte array") {
sc = new SparkContext("local", "test")
private def writeBinaryData(testOutput: Array[Byte], testOutputCopies: Int): File = {
val outFile = new File(tempDir, "record-bytestream-00000.bin")
val outFileName = outFile.getAbsolutePath()

// create file
val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
val bbuf = java.nio.ByteBuffer.wrap(testOutput)
// write data to file
val file = new java.io.FileOutputStream(outFile)
val file = new FileOutputStream(outFile)
val channel = file.getChannel
channel.write(bbuf)
for (i <- 0 until testOutputCopies) {
// Shift values by i so that they're different in the output
val alteredOutput = testOutput.map(b => (b + i).toByte)
channel.write(ByteBuffer.wrap(alteredOutput))
}
channel.close()
file.close()
outFile
}
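A note on the new writeBinaryData helper: it writes testOutputCopies back-to-back copies of testOutput, shifting every byte of copy i by i so the copies are distinguishable when read back. For example (illustrative values only):

// Expected file contents for writeBinaryData(Array[Byte](1, 2, 3), 3):
// three 3-byte records, each shifted by its copy index.
val expected = Array[Byte](1, 2, 3, 2, 3, 4, 3, 4, 5)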

val inRdd = sc.binaryFiles(outFileName)
val (infile: String, indata: PortableDataStream) = inRdd.collect.head

test("binary file input as byte array") {
sc = new SparkContext("local", "test")
val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
val outFile = writeBinaryData(testOutput, 1)
val inRdd = sc.binaryFiles(outFile.getAbsolutePath)
val (infile, indata) = inRdd.collect().head
// Make sure the name and array match
assert(infile.contains(outFile.toURI.getPath)) // a prefix may get added
assert(indata.toArray === testOutput)
}
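For context, sc.binaryFiles returns an RDD of (path, PortableDataStream) pairs, where PortableDataStream is a small serializable handle that reads the file's bytes on demand via toArray(). A minimal usage sketch (path is illustrative):

val streams = sc.binaryFiles("/tmp/record-dir")           // RDD[(String, PortableDataStream)]
val firstBytes = streams.values.map(_.toArray()).first()  // materialize one file's contents as Array[Byte]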

test("portabledatastream caching tests") {
sc = new SparkContext("local", "test")
val outFile = new File(tempDir, "record-bytestream-00000.bin")
val outFileName = outFile.getAbsolutePath()

// create file
val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
val bbuf = java.nio.ByteBuffer.wrap(testOutput)
// write data to file
val file = new java.io.FileOutputStream(outFile)
val channel = file.getChannel
channel.write(bbuf)
channel.close()
file.close()

val inRdd = sc.binaryFiles(outFileName).cache()
inRdd.foreach{
curData: (String, PortableDataStream) =>
curData._2.toArray() // force the file to read
}
val mappedRdd = inRdd.map {
curData: (String, PortableDataStream) =>
(curData._2.getPath(), curData._2)
}
val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head

val outFile = writeBinaryData(testOutput, 1)
val inRdd = sc.binaryFiles(outFile.getAbsolutePath).cache()
inRdd.foreach(_._2.toArray()) // force the file to read
// Try reading the output back as an object file

assert(indata.toArray === testOutput)
assert(inRdd.values.collect().head.toArray === testOutput)
}

test("portabledatastream persist disk storage") {
sc = new SparkContext("local", "test")
val outFile = new File(tempDir, "record-bytestream-00000.bin")
val outFileName = outFile.getAbsolutePath()

// create file
val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
val bbuf = java.nio.ByteBuffer.wrap(testOutput)
// write data to file
val file = new java.io.FileOutputStream(outFile)
val channel = file.getChannel
channel.write(bbuf)
channel.close()
file.close()

val inRdd = sc.binaryFiles(outFileName).persist(StorageLevel.DISK_ONLY)
inRdd.foreach{
curData: (String, PortableDataStream) =>
curData._2.toArray() // force the file to read
}
val mappedRdd = inRdd.map {
curData: (String, PortableDataStream) =>
(curData._2.getPath(), curData._2)
}
val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head

// Try reading the output back as an object file

assert(indata.toArray === testOutput)
val outFile = writeBinaryData(testOutput, 1)
val inRdd = sc.binaryFiles(outFile.getAbsolutePath).persist(StorageLevel.DISK_ONLY)
inRdd.foreach(_._2.toArray()) // force the file to read
assert(inRdd.values.collect().head.toArray === testOutput)
}

test("portabledatastream flatmap tests") {
sc = new SparkContext("local", "test")
val outFile = new File(tempDir, "record-bytestream-00000.bin")
val outFileName = outFile.getAbsolutePath()

// create file
val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
val outFile = writeBinaryData(testOutput, 1)
val inRdd = sc.binaryFiles(outFile.getAbsolutePath)
val numOfCopies = 3
val bbuf = java.nio.ByteBuffer.wrap(testOutput)
// write data to file
val file = new java.io.FileOutputStream(outFile)
val channel = file.getChannel
channel.write(bbuf)
channel.close()
file.close()

val inRdd = sc.binaryFiles(outFileName)
val mappedRdd = inRdd.map {
curData: (String, PortableDataStream) =>
(curData._2.getPath(), curData._2)
}
val copyRdd = mappedRdd.flatMap {
curData: (String, PortableDataStream) =>
for (i <- 1 to numOfCopies) yield (i, curData._2)
}

val copyArr: Array[(Int, PortableDataStream)] = copyRdd.collect()

// Try reading the output back as an object file
val copyRdd = inRdd.flatMap(curData => (0 until numOfCopies).map(_ => curData._2))
val copyArr = copyRdd.collect()
assert(copyArr.length == numOfCopies)
copyArr.foreach{
cEntry: (Int, PortableDataStream) =>
assert(cEntry._2.toArray === testOutput)
for (i <- copyArr.indices) {
assert(copyArr(i).toArray === testOutput)
}

}

test("fixed record length binary file as byte array") {
// a fixed length of 6 bytes

sc = new SparkContext("local", "test")

val outFile = new File(tempDir, "record-bytestream-00000.bin")
val outFileName = outFile.getAbsolutePath()

// create file
val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
val testOutputCopies = 10

// write data to file
val file = new java.io.FileOutputStream(outFile)
val channel = file.getChannel
for(i <- 1 to testOutputCopies) {
val bbuf = java.nio.ByteBuffer.wrap(testOutput)
channel.write(bbuf)
}
channel.close()
file.close()

val inRdd = sc.binaryRecords(outFileName, testOutput.length)
// make sure there are enough elements
val outFile = writeBinaryData(testOutput, testOutputCopies)
val inRdd = sc.binaryRecords(outFile.getAbsolutePath, testOutput.length)
assert(inRdd.count == testOutputCopies)

// now just compare the first one
val indata: Array[Byte] = inRdd.collect.head
assert(indata === testOutput)
val inArr = inRdd.collect()
for (i <- inArr.indices) {
assert(inArr(i) === testOutput.map(b => (b + i).toByte))
}
}
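For readers unfamiliar with the API under test here, a minimal standalone use of sc.binaryRecords (path and record length are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("binaryRecords-example"))
// Splits the file into fixed 6-byte records; each RDD element is an Array[Byte] of length 6.
val records = sc.binaryRecords("/tmp/records.bin", recordLength = 6)
val first: Array[Byte] = records.first()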

test ("negative binary record length should raise an exception") {
// a fixed length of 6 bytes
sc = new SparkContext("local", "test")

val outFile = new File(tempDir, "record-bytestream-00000.bin")
val outFileName = outFile.getAbsolutePath()

// create file
val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
val testOutputCopies = 10

// write data to file
val file = new java.io.FileOutputStream(outFile)
val channel = file.getChannel
for(i <- 1 to testOutputCopies) {
val bbuf = java.nio.ByteBuffer.wrap(testOutput)
channel.write(bbuf)
}
channel.close()
file.close()

val inRdd = sc.binaryRecords(outFileName, -1)

val outFile = writeBinaryData(Array[Byte](1, 2, 3, 4, 5, 6), 1)
intercept[SparkException] {
inRdd.count
sc.binaryRecords(outFile.getAbsolutePath, -1).count()
}
}

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -435,13 +435,12 @@ class StreamingContext private[streaming] (
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf)
val data = br.map { case (k, v) =>
val bytes = v.getBytes
br.map { case (k, v) =>
val bytes = v.copyBytes()
require(bytes.length == recordLength, "Byte array does not have correct length. " +
s"${bytes.length} did not equal recordLength: $recordLength")
bytes
}
data
}

/**
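And the streaming counterpart touched above, StreamingContext.binaryRecordsStream, in a minimal sketch (directory, record length, and batch interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("binaryRecordsStream-example")
val ssc = new StreamingContext(conf, Seconds(1))
// Each new file dropped into the directory is split into fixed 6-byte records.
val records = ssc.binaryRecordsStream("/tmp/record-dir", recordLength = 6)
records.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))
ssc.start()
ssc.awaitTermination()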
streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -84,7 +84,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {

// Verify whether all the elements received are as expected
// (whether the elements were received one in each interval is not verified)
val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
val output = outputQueue.asScala.flatten.toArray
assert(output.length === expectedOutput.size)
for (i <- output.indices) {
assert(output(i) === expectedOutput(i))
@@ -155,25 +155,26 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// not enough to trigger a batch
clock.advance(batchDuration.milliseconds / 2)

val input = Seq(1, 2, 3, 4, 5)
input.foreach { i =>
val numCopies = 3
val input = Array[Byte](1, 2, 3, 4, 5)
for (i <- 0 until numCopies) {
Thread.sleep(batchDuration.milliseconds)
val file = new File(testDir, i.toString)
Files.write(Array[Byte](i.toByte), file)
Files.write(input.map(b => (b + i).toByte), file)
assert(file.setLastModified(clock.getTimeMillis()))
assert(file.lastModified === clock.getTimeMillis())
logInfo("Created file " + file)
logInfo(s"Created file $file")
// Advance the clock after creating the file to avoid a race when
// setting its modification time
clock.advance(batchDuration.milliseconds)
eventually(eventuallyTimeout) {
assert(batchCounter.getNumCompletedBatches === i)
}
}

val expectedOutput = input.map(i => i.toByte)
val obtainedOutput = outputQueue.asScala.flatten.toList.map(i => i(0).toByte)
assert(obtainedOutput.toSeq === expectedOutput)
val obtainedOutput = outputQueue.asScala.map(_.flatten).toSeq
for (i <- obtainedOutput.indices) {
assert(obtainedOutput(i) === input.map(b => (b + i).toByte))
}
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
@@ -258,7 +259,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val testReceiver = new MultiThreadTestReceiver(numThreads, numRecordsPerThread)
MultiThreadTestReceiver.haveAllThreadsFinished = false
val outputQueue = new ConcurrentLinkedQueue[Seq[Long]]
def output: Iterable[Long] = outputQueue.asScala.flatMap(x => x)
def output: Iterable[Long] = outputQueue.asScala.flatten

// set up the network stream using the test receiver
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>