Commit a3598de

Do not close file system with ReplayBus + fix bind address
1 parent bc46fc8 commit a3598de

4 files changed (+33, -15 lines)

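For context, Hadoop's FileSystem.get caches and shares instances per scheme and authority by default, so a ReplayListenerBus that closes "its" FileSystem also closes the instance every other component resolved. A minimal standalone sketch of that behavior (illustration only, not part of this commit):

  import java.net.URI
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.FileSystem

  val conf = new Configuration()
  val fs1 = FileSystem.get(new URI("file:///"), conf)
  val fs2 = FileSystem.get(new URI("file:///"), conf)
  assert(fs1 eq fs2)  // same cached instance by default
  fs1.close()         // also closes the instance that fs2 refers to

This is why the commit moves ownership of the FileSystem out of ReplayListenerBus: the HistoryServer and the Master now close the file systems they own during their own shutdown.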

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 13 additions & 9 deletions
@@ -17,13 +17,11 @@
 
 package org.apache.spark.deploy.history
 
-import java.net.URI
 import javax.servlet.http.HttpServletRequest
 
 import scala.collection.mutable
 import scala.concurrent._
 import scala.concurrent.ExecutionContext.Implicits.global
-import scala.util.{Failure, Success}
 
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.eclipse.jetty.servlet.ServletContextHandler
@@ -52,8 +50,9 @@ import org.apache.spark.scheduler.{ApplicationEventListener, ReplayListenerBus}
 class HistoryServer(val baseLogDir: String, requestedPort: Int)
   extends SparkUIContainer("History Server") with Logging {
 
-  private val fileSystem = Utils.getHadoopFileSystem(new URI(baseLogDir))
-  private val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName())
+  private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
+  private val bindHost = Utils.localHostName()
+  private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
   private val port = requestedPort
   private val conf = new SparkConf
   private val securityManager = new SecurityManager(conf)
@@ -73,8 +72,8 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int)
   /** Bind to the HTTP server behind this web interface */
   override def bind() {
     try {
-      serverInfo = Some(startJettyServer(host, port, handlers, conf))
-      logInfo("Started HistoryServer at http://%s:%d".format(host, boundPort))
+      serverInfo = Some(startJettyServer(bindHost, port, handlers, conf))
+      logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort))
     } catch {
       case e: Exception =>
         logError("Failed to bind HistoryServer", e)
@@ -133,7 +132,7 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int)
    * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing.
    */
   private def maybeRenderUI(appId: String, logPath: String, lastUpdated: Long) {
-    val replayBus = new ReplayListenerBus(logPath)
+    val replayBus = new ReplayListenerBus(logPath, fileSystem)
     replayBus.start()
 
     // If the application completion file is found
@@ -157,14 +156,19 @@
     } else {
       logWarning("Skipping incomplete application: %s".format(logPath))
     }
-    replayBus.stop()
+  }
+
+  /** Stop the server and close the file system. */
+  override def stop() {
+    super.stop()
+    fileSystem.close()
   }
 
   /** Parse app ID from the given log path. */
   def getAppId(logPath: String): String = logPath.split("/").last
 
   /** Return the address of this server. */
-  def getAddress = "http://" + host + ":" + boundPort
+  def getAddress = "http://" + publicHost + ":" + boundPort
 
   /** Return when this directory was last modified. */
   private def getModificationTime(dir: FileStatus): Long = {
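A brief lifecycle sketch of the resulting behavior (hypothetical log directory and port, not part of this commit): the HistoryServer now owns the Hadoop FileSystem for its whole lifetime, lends it to each ReplayListenerBus it creates, binds on the local hostname, and advertises SPARK_PUBLIC_DNS when that variable is set.

  val server = new HistoryServer("/tmp/spark-events", 18080)  // hypothetical values
  server.bind()   // binds on bindHost, logs the publicHost address
  // ... render SparkUIs for completed applications ...
  server.stop()   // also closes the shared FileSystem, exactly once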

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 6 additions & 2 deletions
@@ -29,6 +29,7 @@ import akka.actor._
 import akka.pattern.ask
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import akka.serialization.SerializationExtension
+import org.apache.hadoop.fs.FileSystem
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
@@ -45,7 +46,8 @@ private[spark] class Master(
     host: String,
     port: Int,
     webUiPort: Int,
-    val securityMgr: SecurityManager) extends Actor with Logging {
+    val securityMgr: SecurityManager)
+  extends Actor with Logging {
 
   import context.dispatcher // to use Akka's scheduler.schedule()
 
@@ -71,6 +73,7 @@
   var nextAppNumber = 0
 
   val appIdToUI = new HashMap[String, SparkUI]
+  val fileSystems = new HashSet[FileSystem]
 
   val drivers = new HashSet[DriverInfo]
   val completedDrivers = new ArrayBuffer[DriverInfo]
@@ -149,6 +152,7 @@
 
   override def postStop() {
     webUi.stop()
+    fileSystems.foreach(_.close())
     masterMetricsSystem.stop()
     applicationMetricsSystem.stop()
     persistenceEngine.close()
@@ -662,11 +666,11 @@ private[spark] class Master(
     val eventLogDir = app.desc.eventLogDir.getOrElse { return None }
     val replayBus = new ReplayListenerBus(eventLogDir)
     val ui = new SparkUI(replayBus, "%s (finished)".format(appName), "/history/%s".format(app.id))
+    fileSystems += replayBus.fileSystem
 
     // Do not call ui.bind() to avoid creating a new server for each application
     ui.start()
     val success = replayBus.replay()
-    replayBus.stop()
     if (success) Some(ui) else None
   }
 
core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala

Lines changed: 7 additions & 3 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.scheduler
 
 import java.io.InputStream
-import java.net.URI
 
 import scala.io.Source
 
@@ -36,8 +35,13 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  * This class expects files to be appropriately prefixed as specified in EventLoggingListener.
  * There exists a one-to-one mapping between ReplayListenerBus and event logging applications.
  */
-private[spark] class ReplayListenerBus(logDir: String) extends SparkListenerBus with Logging {
-  private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
+private[spark] class ReplayListenerBus(
+    logDir: String,
+    val fileSystem: FileSystem)
+  extends SparkListenerBus with Logging {
+
+  def this(logDir: String) = this(logDir, Utils.getHadoopFileSystem(logDir))
+
   private var applicationComplete = false
   private var compressionCodec: Option[CompressionCodec] = None
   private var logPaths = Array[Path]()
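A small construction sketch (assumed log directory, not part of this commit): the primary constructor now borrows a caller-owned FileSystem, so whoever creates the bus also decides when to close it, while the auxiliary constructor keeps the old single-argument behavior.

  val sharedFs = Utils.getHadoopFileSystem("/tmp/spark-events")         // owned and later closed by the caller
  val sharedBus = new ReplayListenerBus("/tmp/spark-events", sharedFs)  // borrows sharedFs, never closes it
  val standaloneBus = new ReplayListenerBus("/tmp/spark-events")        // resolves a FileSystem itself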

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 7 additions & 1 deletion
@@ -26,7 +26,6 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
 import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.SortedSet
 import scala.io.Source
 import scala.reflect.ClassTag
 
@@ -1022,4 +1021,11 @@
   def getHadoopFileSystem(path: URI): FileSystem = {
     FileSystem.get(path, SparkHadoopUtil.get.newConfiguration())
   }
+
+  /**
+   * Return a Hadoop FileSystem with the scheme encoded in the given path.
+   */
+  def getHadoopFileSystem(path: String): FileSystem = {
+    getHadoopFileSystem(new URI(path))
+  }
 }
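A brief usage sketch of the two overloads (hypothetical HDFS address): both resolve the FileSystem from the scheme and authority encoded in the path, so callers can pass a plain string without constructing the URI themselves.

  val fsFromUri = Utils.getHadoopFileSystem(new java.net.URI("hdfs://namenode:8020/spark-events"))
  val fsFromString = Utils.getHadoopFileSystem("hdfs://namenode:8020/spark-events")  // same result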
