Skip to content

Commit 612c21d

Browse files
committed
Add Closeable / close() to Java context objects that expose a stop() lifecycle method
1 parent b734ed0 commit 612c21d

File tree

2 files changed

+11
-3
lines changed

2 files changed

+11
-3
lines changed

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.api.java
1919

20+
import java.io.Closeable
2021
import java.util
2122
import java.util.{Map => JMap}
2223

@@ -40,7 +41,9 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
4041
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
4142
* [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
4243
*/
43-
class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
44+
class JavaSparkContext(val sc: SparkContext)
45+
extends JavaSparkContextVarargsWorkaround with Closeable {
46+
4447
/**
4548
* Create a JavaSparkContext that loads settings from system properties (for instance, when
4649
* launching with ./bin/spark-submit).
@@ -534,6 +537,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
534537
sc.stop()
535538
}
536539

540+
override def close(): Unit = stop()
541+
537542
/**
538543
* Get Spark's home location from either a value set through the constructor,
539544
* or the spark.home Java property, or the SPARK_HOME environment variable

streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ package org.apache.spark.streaming.api.java
2121
import scala.collection.JavaConversions._
2222
import scala.reflect.ClassTag
2323

24-
import java.io.InputStream
24+
import java.io.{Closeable, InputStream}
2525
import java.util.{List => JList, Map => JMap}
2626

2727
import akka.actor.{Props, SupervisorStrategy}
@@ -49,7 +49,7 @@ import org.apache.spark.streaming.receiver.Receiver
4949
* respectively. `context.awaitTransformation()` allows the current thread to wait for the
5050
* termination of a context by `stop()` or by an exception.
5151
*/
52-
class JavaStreamingContext(val ssc: StreamingContext) {
52+
class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
5353

5454
/**
5555
* Create a StreamingContext.
@@ -540,6 +540,9 @@ class JavaStreamingContext(val ssc: StreamingContext) {
540540
def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = {
541541
ssc.stop(stopSparkContext, stopGracefully)
542542
}
543+
544+
override def close(): Unit = stop()
545+
543546
}
544547

545548
/**

0 commit comments

Comments
 (0)