-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-14914][CORE] Fix Resource not closed after using, mostly for unit tests #15618
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 15 commits
ceb1681
98ee4ab
ed52ec7
448508f
34078a3
efb7227
a06bffc
45262dc
b3c0c96
a176adb
35aacd2
9f50128
55b360e
91f82b5
f3713d1
863ea7f
3949dbe
49cb4e7
1521572
a9a5f06
d680a2f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -183,9 +183,9 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local | |
|
|
||
| // ensure we reset the classloader after the test completes | ||
| val originalClassLoader = Thread.currentThread.getContextClassLoader | ||
| val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader) | ||
| try { | ||
| // load the exception from the jar | ||
| val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader) | ||
| loader.addURL(jarFile.toURI.toURL) | ||
| Thread.currentThread().setContextClassLoader(loader) | ||
| val excClass: Class[_] = Utils.classForName("repro.MyException") | ||
|
|
@@ -210,6 +210,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local | |
| assert(expectedFailure.findFirstMatchIn(exceptionMessage).isDefined) | ||
| assert(unknownFailure.findFirstMatchIn(exceptionMessage).isEmpty) | ||
| } finally { | ||
| loader.close() | ||
| Thread.currentThread.setContextClassLoader(originalClassLoader) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Might be a good idea to swap these two lines - so that the classloader is always reset even if loader.close() fails
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sure, makes sense. |
||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -194,10 +194,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false | |
| // Finally, stop the endpoint | ||
| ssc.env.rpcEnv.stop(endpoint) | ||
| endpoint = null | ||
| receivedBlockTracker.stop() | ||
| logInfo("ReceiverTracker stopped") | ||
| trackerState = Stopped | ||
| } | ||
|
|
||
| // note that the output writer is created at construction time, we have to close | ||
| // them even if it hasn't been started. | ||
| receivedBlockTracker.stop() | ||
|
||
| } | ||
|
|
||
| /** Allocate all unallocated blocks to the given batch. */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,19 +38,16 @@ class MapWithStateSuite extends SparkFunSuite | |
| protected val batchDuration = Seconds(1) | ||
|
|
||
| before { | ||
| StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } | ||
| checkpointDir = Utils.createTempDir("checkpoint") | ||
| StreamingContext.getActive().foreach(_.stop(stopSparkContext = false)) | ||
| } | ||
|
|
||
| after { | ||
| StreamingContext.getActive().foreach { _.stop(stopSparkContext = false) } | ||
| if (checkpointDir != null) { | ||
| Utils.deleteRecursively(checkpointDir) | ||
| } | ||
| StreamingContext.getActive().foreach(_.stop(stopSparkContext = false)) | ||
| } | ||
|
|
||
| override def beforeAll(): Unit = { | ||
| super.beforeAll() | ||
| checkpointDir = Utils.createTempDir("checkpoint") | ||
|
||
| val conf = new SparkConf().setMaster("local").setAppName("MapWithStateSuite") | ||
| conf.set("spark.streaming.clock", classOf[ManualClock].getName()) | ||
| sc = new SparkContext(conf) | ||
|
|
@@ -63,6 +60,7 @@ class MapWithStateSuite extends SparkFunSuite | |
| } | ||
| } finally { | ||
| super.afterAll() | ||
| Utils.deleteRecursively(checkpointDir) | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this be in a try {} finally {} instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not here because the stream needs to be open afterwards. I had a similar discussion on the original pr.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't mean having the finally here on this line...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am sorry to ask repeatedly but could I please ask how I should change this a little bit more?
I was thinking I should not use
`finally` upon `fileInputStream` as it should not always be closed at this point or around this point. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, did you mean something like this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I think that's a good idea. Do you need the generic types on the "try" methods, even?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah-ha, sure. Let me then try. Thanks for bearing with me.