ReplayListenerBus.scala

@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{InputStream, IOException}
+import java.io.{EOFException, InputStream, IOException}
 
 import scala.io.Source
 
@@ -107,6 +107,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
         }
       }
     } catch {
+      case _: EOFException if maybeTruncated =>
      case ioe: IOException =>
        throw ioe
      case e: Exception =>
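Note on the change above: the new catch case has an empty body, so when maybeTruncated is true an EOFException raised while reading a truncated, in-progress event log quietly ends the replay instead of propagating. A minimal sketch of the guarded-pattern behavior, using hypothetical names (readEvents, truncated) that are not part of the patch:

import java.io.EOFException

import scala.collection.mutable

// Collect elements until the source ends, possibly abruptly.
def readEvents(events: Iterator[String], maybeTruncated: Boolean): Seq[String] = {
  val seen = mutable.Buffer.empty[String]
  try {
    events.foreach(seen += _)
  } catch {
    // Guarded case with an empty body: the EOF is swallowed only when the
    // input is allowed to be truncated; otherwise the exception escapes.
    case _: EOFException if maybeTruncated =>
  }
  seen.toSeq
}

// A "truncated" source: yields two events, then fails like a cut-off stream.
// (Iterator.++ takes its argument by name, so the throw happens lazily.)
def truncated: Iterator[String] =
  Iterator("event1", "event2") ++ (throw new EOFException("stream ended prematurely"))

readEvents(truncated, maybeTruncated = true)   // Seq(event1, event2)
// readEvents(truncated, maybeTruncated = false) rethrows the EOFException.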
ReplayListenerSuite.scala

@@ -17,15 +17,16 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{File, PrintWriter}
+import java.io._
 import java.net.URI
+import java.util.concurrent.atomic.AtomicInteger
 
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.io.CompressionCodec
+import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec}
 import org.apache.spark.util.{JsonProtocol, JsonProtocolSuite, Utils}
 
 /**
@@ -72,6 +73,59 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
     assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
   }
 
+  /**
+   * Test replaying a compressed Spark history file that internally throws an EOFException. To
+   * avoid sensitivity to the specifics of the compression codec, the test forces an
+   * EOFException while bytes are being read from the underlying stream (as has been observed
+   * with some actual history files) and exercises the resulting failure handling. This
+   * validates correct behavior both when maybeTruncated is true and when it is false.
+   */
+  test("Replay compressed inprogress log file succeeding on partial read") {
+    val buffered = new ByteArrayOutputStream
+    val codec = new LZ4CompressionCodec(new SparkConf())
+    val compstream = codec.compressedOutputStream(buffered)
+    val writer = new PrintWriter(compstream)
+
+    val applicationStart = SparkListenerApplicationStart("AppStarts", None,
+      125L, "Mickey", None)
+    val applicationEnd = SparkListenerApplicationEnd(1000L)
+
+    // scalastyle:off println
+    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+    // scalastyle:on println
+    writer.close()
+
+    val logFilePath = Utils.getFilePath(testDir, "events.lz4.inprogress")
+    val fstream = fileSystem.create(logFilePath)
+    val bytes = buffered.toByteArray
+
+    fstream.write(bytes, 0, buffered.size)
+    fstream.close()
+
+    // Read the compressed .inprogress file and verify that only the first event was parsed.
+    val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
+    val replayer = new ReplayListenerBus()
+
+    val eventMonster = new EventMonster(conf)
+    replayer.addListener(eventMonster)
+
+    // Verify that replay succeeds on a partial read when the input may be truncated.
+    val logData = EventLoggingListener.openEventLog(logFilePath, fileSystem)
+    val failingStream = new EarlyEOFInputStream(logData, buffered.size - 10)
+    replayer.replay(failingStream, logFilePath.toString, maybeTruncated = true)
+
+    assert(eventMonster.loggedEvents.size === 1)
+    assert(failingStream.didFail)
+
+    // Verify that replay rethrows the EOFException when the input is not expected to be truncated.
+    val logData2 = EventLoggingListener.openEventLog(logFilePath, fileSystem)
+    val failingStream2 = new EarlyEOFInputStream(logData2, buffered.size - 10)
+    intercept[EOFException] {
+      replayer.replay(failingStream2, logFilePath.toString, maybeTruncated = false)
+    }
+  }
+
   // This assumes the correctness of EventLoggingListener
   test("End-to-end replay") {
     testApplicationReplay()
@@ -156,4 +210,23 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
     override def start() { }
 
   }
+
+  /*
+   * A dummy input stream that wraps another input stream but ends prematurely when reading
+   * at the specified position, throwing an EOFException.
+   */
+  private class EarlyEOFInputStream(in: InputStream, failAtPos: Int) extends InputStream {
+    private val countDown = new AtomicInteger(failAtPos)
+
+    def didFail: Boolean = countDown.get == 0
+
+    @throws[IOException]
+    def read(): Int = {
+      if (countDown.get == 0) {
+        throw new EOFException("Stream ended prematurely")
+      }
+      countDown.decrementAndGet()
+      in.read()
+    }
+  }
 }
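For reference, a minimal standalone sketch of exercising EarlyEOFInputStream directly (hypothetical usage from inside the suite, since the class is private; the byte values are arbitrary):

import java.io.{ByteArrayInputStream, EOFException}

// Allow five single-byte reads, then fail, mimicking a compressed stream
// that ends mid-block.
val raw = new ByteArrayInputStream(Array.fill[Byte](10)(1))
val early = new EarlyEOFInputStream(raw, failAtPos = 5)

(1 to 5).foreach(_ => early.read())  // five reads succeed and exhaust the countdown
assert(early.didFail)                // true once the failure position is reached

try {
  early.read()                       // the sixth read throws
} catch {
  case _: EOFException => println("stream ended prematurely, as expected")
}

The countdown is an AtomicInteger, which presumably keeps didFail well defined even if a decompressor drains the stream from another thread; a plain Int var would also suffice for the single-threaded replay path.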