@@ -239,12 +239,17 @@ private[spark] object ReliableCheckpointRDD extends Logging {
val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
val fileInputStream = fs.open(partitionerFilePath, bufferSize)
val serializer = SparkEnv.get.serializer.newInstance()
val deserializeStream = serializer.deserializeStream(fileInputStream)
val partitioner = Utils.tryWithSafeFinally[Partitioner] {
deserializeStream.readObject[Partitioner]
val partitioner = Utils.tryWithSafeFinally {
val deserializeStream = serializer.deserializeStream(fileInputStream)
Utils.tryWithSafeFinally {
deserializeStream.readObject[Partitioner]
} {
deserializeStream.close()
}
} {
deserializeStream.close()
fileInputStream.close()
}

logDebug(s"Read partitioner from $partitionerFilePath")
Some(partitioner)
} catch {
13 changes: 9 additions & 4 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -58,10 +58,15 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
nums.saveAsTextFile(outputDir)
// Read the plain text file and check it's OK
val outputFile = new File(outputDir, "part-00000")
val content = Source.fromFile(outputFile).mkString
assert(content === "1\n2\n3\n4\n")
// Also try reading it in as a text file RDD
assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
val bufferSrc = Source.fromFile(outputFile)
Utils.tryWithSafeFinally {
val content = bufferSrc.mkString
assert(content === "1\n2\n3\n4\n")
// Also try reading it in as a text file RDD
assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
} {
bufferSrc.close()
}
}

test("text files (compressed)") {
@@ -33,7 +33,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkFunSuite
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
import org.apache.spark.util.ResetSystemProperties
import org.apache.spark.util.{ResetSystemProperties, Utils}

class RPackageUtilsSuite
extends SparkFunSuite
@@ -74,9 +74,13 @@ class RPackageUtilsSuite
val deps = Seq(dep1, dep2).mkString(",")
IvyTestUtils.withRepository(main, Some(deps), None, withR = true) { repo =>
val jars = Seq(main, dep1, dep2).map(c => new JarFile(getJarPath(c, new File(new URI(repo)))))
assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
Utils.tryWithSafeFinally {
assert(RPackageUtils.checkManifestForR(jars(0)), "should have R code")
assert(!RPackageUtils.checkManifestForR(jars(1)), "should not have R code")
assert(!RPackageUtils.checkManifestForR(jars(2)), "should not have R code")
} {
jars.foreach(_.close())
}
}
}

@@ -131,7 +135,7 @@ class RPackageUtilsSuite

test("SparkR zipping works properly") {
val tempDir = Files.createTempDir()
try {
Utils.tryWithSafeFinally {
IvyTestUtils.writeFile(tempDir, "test.R", "abc")
val fakeSparkRDir = new File(tempDir, "SparkR")
assert(fakeSparkRDir.mkdirs())
@@ -144,14 +148,19 @@
IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc")
val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip")
assert(finalZip.exists())
val entries = new ZipFile(finalZip).entries().asScala.map(_.getName).toSeq
assert(entries.contains("/test.R"))
assert(entries.contains("/SparkR/abc.R"))
assert(entries.contains("/SparkR/DESCRIPTION"))
assert(!entries.contains("/package.zip"))
assert(entries.contains("/packageTest/def.R"))
assert(entries.contains("/packageTest/DESCRIPTION"))
} finally {
val zipFile = new ZipFile(finalZip)
Utils.tryWithSafeFinally {
val entries = zipFile.entries().asScala.map(_.getName).toSeq
assert(entries.contains("/test.R"))
assert(entries.contains("/SparkR/abc.R"))
assert(entries.contains("/SparkR/DESCRIPTION"))
assert(!entries.contains("/package.zip"))
assert(entries.contains("/packageTest/def.R"))
assert(entries.contains("/packageTest/DESCRIPTION"))
} {
zipFile.close()
}
} {
FileUtils.deleteDirectory(tempDir)
}
}
@@ -449,8 +449,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
val bstream = new BufferedOutputStream(cstream)
if (isNewFormat) {
EventLoggingListener.initEventLog(new FileOutputStream(file))
val newFormatStream = new FileOutputStream(file)
Utils.tryWithSafeFinally {
EventLoggingListener.initEventLog(newFormatStream)
} {
newFormatStream.close()
}
}

val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8)
Utils.tryWithSafeFinally {
events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n"))
@@ -202,8 +202,6 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit

// Make sure expected events exist in the log file.
val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
val logStart = SparkListenerLogStart(SPARK_VERSION)
val lines = readLines(logData)
val eventSet = mutable.Set(
SparkListenerApplicationStart,
SparkListenerBlockManagerAdded,
@@ -216,19 +214,25 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
SparkListenerTaskStart,
SparkListenerTaskEnd,
SparkListenerApplicationEnd).map(Utils.getFormattedClassName)
lines.foreach { line =>
eventSet.foreach { event =>
if (line.contains(event)) {
val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
val eventType = Utils.getFormattedClassName(parsedEvent)
if (eventType == event) {
eventSet.remove(event)
Utils.tryWithSafeFinally {
val logStart = SparkListenerLogStart(SPARK_VERSION)
val lines = readLines(logData)
lines.foreach { line =>
eventSet.foreach { event =>
if (line.contains(event)) {
val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
val eventType = Utils.getFormattedClassName(parsedEvent)
if (eventType == event) {
eventSet.remove(event)
}
}
}
}
assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
} {
logData.close()
}
assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
}

private def readLines(in: InputStream): Seq[String] = {
@@ -183,9 +183,9 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local

// ensure we reset the classloader after the test completes
val originalClassLoader = Thread.currentThread.getContextClassLoader
try {
val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)
Utils.tryWithSafeFinally {
// load the exception from the jar
val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)
loader.addURL(jarFile.toURI.toURL)
Thread.currentThread().setContextClassLoader(loader)
val excClass: Class[_] = Utils.classForName("repro.MyException")
@@ -209,7 +209,8 @@

assert(expectedFailure.findFirstMatchIn(exceptionMessage).isDefined)
assert(unknownFailure.findFirstMatchIn(exceptionMessage).isEmpty)
} finally {
} {
loader.close()
Thread.currentThread.setContextClassLoader(originalClassLoader)

Contributor: Might be a good idea to swap these two lines, so that the classloader is always reset even if loader.close() fails.

Member Author: Sure, makes sense.
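
A rough sketch of the suggested ordering (not the PR's own code): restoring the context classloader before closing the loader means the reset happens even if close() throws. It assumes a file under an org.apache.spark package, since MutableURLClassLoader and Utils are Spark-internal, and the work done with the loader is elided.

package org.apache.spark.scheduler

import java.net.URL

import org.apache.spark.util.{MutableURLClassLoader, Utils}

object ClassLoaderCleanupSketch {
  // Runs `body` with an isolated context classloader, then restores the original
  // classloader before closing the loader so the reset survives a failing close().
  def withIsolatedClassLoader(jarUrls: Array[URL])(body: => Unit): Unit = {
    val originalClassLoader = Thread.currentThread.getContextClassLoader
    val loader = new MutableURLClassLoader(jarUrls, originalClassLoader)
    Utils.tryWithSafeFinally {
      Thread.currentThread().setContextClassLoader(loader)
      body
    } {
      Thread.currentThread.setContextClassLoader(originalClassLoader)
      loader.close()
    }
  }
}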

}
}
@@ -155,13 +155,17 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
val tempDir = Utils.createTempDir()
val outputDir = new File(tempDir, "output")
MLUtils.saveAsLibSVMFile(examples, outputDir.toURI.toString)
val lines = outputDir.listFiles()
val sources = outputDir.listFiles()
.filter(_.getName.startsWith("part-"))
.flatMap(Source.fromFile(_).getLines())
.toSet
val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03")
assert(lines === expected)
Utils.deleteRecursively(tempDir)
.map(Source.fromFile)
Utils.tryWithSafeFinally {
val lines = sources.flatMap(_.getLines()).toSet
val expected = Set("1.1 1:1.23 3:4.56", "0.0 1:1.01 2:2.02 3:3.03")
assert(lines === expected)
} {
sources.foreach(_.close())
Utils.deleteRecursively(tempDir)
}
}

test("appendBias") {
@@ -194,10 +194,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
// Finally, stop the endpoint
ssc.env.rpcEnv.stop(endpoint)
endpoint = null
receivedBlockTracker.stop()
logInfo("ReceiverTracker stopped")
trackerState = Stopped
}

// Note that the output writer is created at construction time, so we have to
// close it even if the tracker hasn't been started.
receivedBlockTracker.stop()

Contributor: Is stop() expected to be idempotent in this context? Multiple invocations won't have an issue? (After this change, it can be invoked repeatedly even when trackerState == Stopped.) This is not part of the tests, hence my query.

Member Author: Thanks for pointing this out. I manually tested this by modifying the existing tests in ReceiverTrackerSuite and ReceivedBlockTrackerSuite, and it seems fine with multiple invocations. Should we add a test, maybe?

Contributor: I am actually not sure, since I am not very familiar with streaming; I wanted to get it clarified. @tdas, @zsxwing, any comments?

Member: That's a good point. Another way to handle this is to move the initialization of receivedBlockTracker into start, which actually makes more sense. If that's getting risky for this PR, I'd just punt on this change and not move stop().
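
A minimal sketch of the idempotent-stop idea being discussed, using a hypothetical class with a simple stopped flag rather than ReceivedBlockTracker's actual internals:

// Hypothetical illustration only; ReceivedBlockTracker's real fields differ.
class IdempotentStopSketch {
  @volatile private var stopped = false

  // Safe to call repeatedly, even once the owner's trackerState is already
  // Stopped: resources are released only on the first invocation.
  def stop(): Unit = synchronized {
    if (!stopped) {
      stopped = true
      // close the write-ahead log / output writer created at construction time
    }
  }
}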

}

/** Allocate all unallocated blocks to the given batch. */
@@ -1805,6 +1805,7 @@ public Integer call(String s) {
// will be re-processed after recovery
List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3);
assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
ssc.stop();
Utils.deleteRecursively(tempDir);
}

@@ -642,16 +642,18 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
val fileStream = ssc.textFileStream(testDir.toString)
// Make value 3 take a large time to process, to ensure that the driver
// shuts down in the middle of processing the 3rd batch
CheckpointSuite.batchThreeShouldBlockIndefinitely = true
val mappedStream = fileStream.map(s => {
CheckpointSuite.batchThreeShouldBlockALongTime = true
val mappedStream = fileStream.map { s =>
val i = s.toInt
if (i == 3) {
while (CheckpointSuite.batchThreeShouldBlockIndefinitely) {
Thread.sleep(Long.MaxValue)
if (CheckpointSuite.batchThreeShouldBlockALongTime) {
// It's not a good idea to let the thread run forever
// as resources won't be correctly released
Thread.sleep(6000)
}
}
i
})
}

// Reducing over a large window to ensure that recovery from driver failure
// requires reprocessing of all the files seen before the failure
@@ -691,7 +693,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
}

// The original StreamingContext has now been stopped.
CheckpointSuite.batchThreeShouldBlockIndefinitely = false
CheckpointSuite.batchThreeShouldBlockALongTime = false

// Create files while the streaming driver is down
for (i <- Seq(4, 5, 6)) {
@@ -928,5 +930,5 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
}

private object CheckpointSuite extends Serializable {
var batchThreeShouldBlockIndefinitely: Boolean = true
var batchThreeShouldBlockALongTime: Boolean = true
}
@@ -38,19 +38,16 @@ class MapWithStateSuite extends SparkFunSuite
protected val batchDuration = Seconds(1)

before {
StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) }
checkpointDir = Utils.createTempDir("checkpoint")
StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))
}

after {
StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) }
if (checkpointDir != null) {
Utils.deleteRecursively(checkpointDir)
}
StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))
}

override def beforeAll(): Unit = {
super.beforeAll()
checkpointDir = Utils.createTempDir("checkpoint")

Member: This might happen to be OK, but now the dir is not deleted between tests. Is that going to be OK?

Member Author (@HyukjinKwon, Nov 7, 2016): Actually, it seems the original code here is already fine. I took a look and ran several tests, and I think I now understand why @taoli91 tried to fix the code this way.

It seems the state in ReceiverTracker somehow went wrong on Windows and ended up not closing checkpointDir. The original test failed [1] due to that issue in ReceiverTracker. So I think he tried to create/delete the folder only once [2] and to ensure ReceivedBlockTracker is stopped in ReceiverTracker regardless of the state.

I tested this with ReceivedBlockTracker stopped manually regardless of the state (the original proposal), and it is fine without the changes here in MapWithStateSuite.scala [3]. Of course, it is also fine with this change [4].

[1] https://ci.appveyor.com/project/spark-test/spark/build/56-F88EDDAF-E576-4787-9530-A4185FC46B1E
[2] https://ci.appveyor.com/project/spark-test/spark/build/57-test-MapWithStateSuite
[3] https://ci.appveyor.com/project/spark-test/spark/build/58-test-MapWithStateSuite
[4] https://ci.appveyor.com/project/spark-test/spark/build/59-test-MapWithStateSuite

Member Author: I believe this change is not valid. I will get rid of it. Thank you for pointing this out.

Member Author (@HyukjinKwon, Nov 7, 2016): BTW, the reason [1] and [2] failed on Windows (without ensuring ReceivedBlockTracker is stopped) seems to be that checkpointDir is still open, so deleting the directory fails and throws an exception.
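
A standalone sketch of the failure mode described above, with hypothetical temp-file names: on Windows a file that is still open cannot be deleted, so the directory holding it cannot be removed until the handle is closed.

import java.io.{File, PrintWriter}

import scala.io.Source

object OpenHandleDeletionSketch {
  def main(args: Array[String]): Unit = {
    val dir = new File(System.getProperty("java.io.tmpdir"), "checkpoint-demo")
    dir.mkdirs()
    val file = new File(dir, "data.txt")
    val writer = new PrintWriter(file)
    writer.write("1\n")
    writer.close()

    val source = Source.fromFile(file) // handle intentionally left open
    // On Windows this delete fails while `source` holds the file open
    // (POSIX systems typically allow it), so the directory cannot be removed.
    println(s"delete while handle is open: ${file.delete()}")

    source.close()
    // On Windows, the file and then the directory can only be deleted after
    // every handle into the directory has been closed.
    println(s"delete after closing handle: ${file.delete()}")
    println(s"delete directory: ${dir.delete()}")
  }
}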

val conf = new SparkConf().setMaster("local").setAppName("MapWithStateSuite")
conf.set("spark.streaming.clock", classOf[ManualClock].getName())
sc = new SparkContext(conf)
@@ -63,6 +60,7 @@
}
} finally {
super.afterAll()
Utils.deleteRecursively(checkpointDir)
}
}

@@ -164,6 +164,7 @@ object MasterFailureTest extends Logging {
val mergedOutput = runStreams(ssc, lastExpectedOutput, maxTimeToRun)

fileGeneratingThread.join()
ssc.stop()
fs.delete(checkpointDir, true)
fs.delete(testDir, true)
logInfo("Finished test after " + killCount + " failures")
@@ -134,13 +134,15 @@ class ReceivedBlockTrackerSuite
val expectedWrittenData1 = blockInfos1.map(BlockAdditionEvent)
getWrittenLogData() shouldEqual expectedWrittenData1
getWriteAheadLogFiles() should have size 1
tracker1.stop()

incrementTime()

// Recovery without recovery from WAL and verify list of unallocated blocks is empty
val tracker1_ = createTracker(clock = manualClock, recoverFromWriteAheadLog = false)
tracker1_.getUnallocatedBlocks(streamId) shouldBe empty
tracker1_.hasUnallocatedReceivedBlocks should be (false)
tracker1_.stop()

// Restart tracker and verify recovered list of unallocated blocks
val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
@@ -163,6 +165,7 @@
val blockInfos2 = addBlockInfos(tracker2)
tracker2.allocateBlocksToBatch(batchTime2)
tracker2.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
tracker2.stop()

// Verify whether log has correct contents
val expectedWrittenData2 = expectedWrittenData1 ++
@@ -192,6 +195,7 @@
getWriteAheadLogFiles() should not contain oldestLogFile
}
printLogFiles("After clean")
tracker3.stop()

// Restart tracker and verify recovered state, specifically whether info about the first
// batch has been removed, but not the second batch
@@ -200,6 +204,7 @@
tracker4.getUnallocatedBlocks(streamId) shouldBe empty
tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned
tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
tracker4.stop()
}

test("disable write ahead log when checkpoint directory is not set") {
@@ -139,6 +139,7 @@ abstract class CommonWriteAheadLogTests(
assert(getLogFilesInDirectory(testDir).size < logFiles.size)
}
}
writeAheadLog.close()
}

test(testPrefix + "handling file errors while reading rotating logs") {
Expand Down