
Commit 79e45c9

sarutak authored and andrewor14 committed
[SPARK-3377] [SPARK-3610] Metrics can be accidentally aggregated / History server log name should not be based on user input
This PR is another solution for #2250. I'm using Spark's Codahale-based MetricsSystem with JMX or Graphite, and I saw the following 2 problems.

(1) When applications that have the same spark.app.name run on a cluster at the same time, some metric names get mixed up. For instance, if 2+ applications are running on the cluster at the same time, each application emits the same-named metric, like "SparkPi.DAGScheduler.stage.failedStages", and Graphite cannot distinguish which application a metric belongs to.

(2) When 2+ executors run on the same machine, the JVM metrics of each executor are mixed. For instance, 2+ executors running on the same node can emit the same-named metric, "jvm.memory", and Graphite cannot distinguish which executor a metric comes from.

There is a similar issue with event logs: the directory for event logs is named using the application name. The application name is defined by the user and can include characters that are illegal in path names. Furthermore, the directory name consists of the application name and System.currentTimeMillis, even though each application has a unique application ID, so if we run jobs with the same name, it is difficult to identify which directory belongs to which application.

Closes #2250
Closes #1067

Author: Kousuke Saruta <[email protected]>

Closes #2432 from sarutak/metrics-structure-improvement2 and squashes the following commits:

3288b2b [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
39169e4 [Kousuke Saruta] Fixed style
6570494 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
817e4f0 [Kousuke Saruta] Simplified MetricsSystem#buildRegistryName
67fa5eb [Kousuke Saruta] Unified MetricsSystem#registerSources and registerSinks in start
10be654 [Kousuke Saruta] Fixed style.
990c078 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
f0c7fba [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
59cc2cd [Kousuke Saruta] Modified SparkContextSchedulerCreationSuite
f9b6fb3 [Kousuke Saruta] Modified style.
2cf8a0f [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
389090d [Kousuke Saruta] Replaced taskScheduler.applicationId() with getApplicationId in SparkContext#postApplicationStart
ff45c89 [Kousuke Saruta] Added some test cases to MetricsSystemSuite
69c46a6 [Kousuke Saruta] Added warning logging logic to MetricsSystem#buildRegistryName
5cca0d2 [Kousuke Saruta] Added Javadoc comment to SparkContext#getApplicationId
16a9f01 [Kousuke Saruta] Added data types to be returned to some methods
6434b06 [Kousuke Saruta] Reverted changes related to ApplicationId
0413b90 [Kousuke Saruta] Deleted ApplicationId.java and ApplicationIdSuite.java
a42300c [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
0fc1b09 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
42bea55 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
248935d [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
f6af132 [Kousuke Saruta] Modified SchedulerBackend and TaskScheduler to return System.currentTimeMillis as an unique Application Id
1b8b53e [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
97cb85c [Kousuke Saruta] Modified confliction of MimExcludes
2cdd009 [Kousuke Saruta] Modified defailt implementation of applicationId
9aadb0b [Kousuke Saruta] Modified NetworkReceiverSuite to ensure "executor.start()" is finished in test "network receiver life cycle"
3011efc [Kousuke Saruta] Added ApplicationIdSuite.scala
d009c55 [Kousuke Saruta] Modified ApplicationId#equals to compare appIds
dfc83fd [Kousuke Saruta] Modified ApplicationId to implement Serializable
9ff4851 [Kousuke Saruta] Modified MimaExcludes.scala to ignore createTaskScheduler method in SparkContext
4567ffc [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
6a91b14 [Kousuke Saruta] Modified SparkContextSchedulerCreationSuite, ExecutorRunnerTest and EventLoggingListenerSuite
0325caf [Kousuke Saruta] Added ApplicationId.scala
0a2fc14 [Kousuke Saruta] Modified style
eabda80 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
0f890e6 [Kousuke Saruta] Modified SparkDeploySchedulerBackend and Master to pass baseLogDir instead f eventLogDir
bcf25bf [Kousuke Saruta] Modified directory name for EventLogs
28d4d93 [Kousuke Saruta] Modified SparkContext and EventLoggingListener so that the directory for EventLogs is named same for Application ID
203634e [Kousuke Saruta] Modified comment in SchedulerBackend#applicationId and TaskScheduler#applicationId
424fea4 [Kousuke Saruta] Modified the subclasses of TaskScheduler and SchedulerBackend so that they can return non-optional Unique Application ID
b311806 [Kousuke Saruta] Swapped last 2 arguments passed to CoarseGrainedExecutorBackend
8a2b6ec [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
086ee25 [Kousuke Saruta] Merge branch 'metrics-structure-improvement2' of github.com:sarutak/spark into metrics-structure-improvement2
e705386 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
36d2f7a [Kousuke Saruta] Added warning message for the situation we cannot get application id for the prefix for the name of metrics
eea6e19 [Kousuke Saruta] Modified CoarseGrainedMesosSchedulerBackend and MesosSchedulerBackend so that we can get Application ID
c229fbe [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
e719c39 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
4a93c7f [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
4776f9e [Kousuke Saruta] Modified MetricsSystemSuite.scala
efcb6e1 [Kousuke Saruta] Modified to add application id to metrics name
2ec848a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
3ea7896 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
ead8966 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
08e627e [Kousuke Saruta] Revert "tmp"
7b67f5a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
45bd33d [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
93e263a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
848819c [Kousuke Saruta] Merge branch 'metrics-structure-improvement' of github.com:sarutak/spark into metrics-structure-improvement
912a637 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
e4a4593 [Kousuke Saruta] tmp
3e098d8 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
4603a39 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
fa7175b [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
15f88a3 [Kousuke Saruta] Modified MetricsSystem#buildRegistryName because conf.get does not return null when correspondin entry is absent
6f7dcd4 [Kousuke Saruta] Modified constructor of DAGSchedulerSource and BlockManagerSource because the instance of SparkContext is no longer used
6fc5560 [Kousuke Saruta] Modified sourceName of ExecutorSource, DAGSchedulerSource and BlockManagerSource
4e057c9 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
85ffc02 [Kousuke Saruta] Revert "Modified sourceName of ExecutorSource, DAGSchedulerSource and BlockManagerSource"
868e326 [Kousuke Saruta] Modified MetricsSystem to set registry name with unique application-id and driver/executor-id
71609f5 [Kousuke Saruta] Modified sourceName of ExecutorSource, DAGSchedulerSource and BlockManagerSource
55debab [Kousuke Saruta] Modified SparkContext and Executor to set spark.executor.id to identifiers
4180993 [Kousuke Saruta] Modified SparkContext to retain spark.unique.app.name property in SparkConf
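For illustration, a minimal sketch of the naming scheme this change gives the metrics registry (the IDs below are hypothetical; real values come from the cluster manager and the executor launch):

import com.codahale.metrics.MetricRegistry

// Hypothetical identifiers; spark.app.id is set from the task scheduler's
// application ID and spark.executor.id is "driver" or "executor.<id>".
val appId = "app-20140919120000-0000"
val executorId = "driver"
val sourceName = "DAGScheduler"

// Before this change, two concurrent "SparkPi" jobs both reported
// "SparkPi.DAGScheduler.stage.failedStages"; now the registry key is unique per app:
val registryName = MetricRegistry.name(appId, executorId, sourceName)
// => "app-20140919120000-0000.driver.DAGScheduler"

Graphite or JMX then sees, for example, app-20140919120000-0000.driver.DAGScheduler.stage.failedStages, so metrics from concurrent applications and from co-located executors no longer collide.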
1 parent 1eb8389 commit 79e45c9

29 files changed (+331 additions, −85 deletions)


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 36 additions & 16 deletions

@@ -187,6 +187,15 @@ class SparkContext(config: SparkConf) extends Logging {
   val master = conf.get("spark.master")
   val appName = conf.get("spark.app.name")

+  private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
+  private[spark] val eventLogDir: Option[String] = {
+    if (isEventLogEnabled) {
+      Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
+    } else {
+      None
+    }
+  }
+
   // Generate the random name for a temp folder in Tachyon
   // Add a timestamp as the suffix here to make it more safe
   val tachyonFolderName = "spark-" + randomUUID.toString()

@@ -200,6 +209,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val listenerBus = new LiveListenerBus

   // Create the Spark execution environment (cache, map output tracker, etc)
+  conf.set("spark.executor.id", "driver")
   private[spark] val env = SparkEnv.create(
     conf,
     "<driver>",

@@ -232,19 +242,6 @@ class SparkContext(config: SparkConf) extends Logging {
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

-  // Optionally log Spark events
-  private[spark] val eventLogger: Option[EventLoggingListener] = {
-    if (conf.getBoolean("spark.eventLog.enabled", false)) {
-      val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
-      logger.start()
-      listenerBus.addListener(logger)
-      Some(logger)
-    } else None
-  }
-
-  // At this point, all relevant SparkListeners have been registered, so begin releasing events
-  listenerBus.start()
-
   val startTime = System.currentTimeMillis()

   // Add each JAR given through the constructor

@@ -309,6 +306,29 @@ class SparkContext(config: SparkConf) extends Logging {
   // constructor
   taskScheduler.start()

+  val applicationId: String = taskScheduler.applicationId()
+  conf.set("spark.app.id", applicationId)
+
+  val metricsSystem = env.metricsSystem
+
+  // The metrics system for Driver need to be set spark.app.id to app ID.
+  // So it should start after we get app ID from the task scheduler and set spark.app.id.
+  metricsSystem.start()
+
+  // Optionally log Spark events
+  private[spark] val eventLogger: Option[EventLoggingListener] = {
+    if (isEventLogEnabled) {
+      val logger =
+        new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
+      logger.start()
+      listenerBus.addListener(logger)
+      Some(logger)
+    } else None
+  }
+
+  // At this point, all relevant SparkListeners have been registered, so begin releasing events
+  listenerBus.start()
+
   private[spark] val cleaner: Option[ContextCleaner] = {
     if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
       Some(new ContextCleaner(this))

@@ -411,8 +431,8 @@ class SparkContext(config: SparkConf) extends Logging {
   // Post init
   taskScheduler.postStartHook()

-  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
-  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
+  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
+  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)

   private def initDriverMetrics() {
     SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)

@@ -1278,7 +1298,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private def postApplicationStart() {
     // Note: this code assumes that the task scheduler has been initialized and has contacted
     // the cluster manager to get an application ID (in case the cluster manager provides one).
-    listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
+    listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
       startTime, sparkUser))
   }
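As a usage sketch of the driver-side change (the master URL, directory, and app ID below are assumptions, not values from this commit): event logging is still controlled by the same settings, but the log directory is now derived from the application ID rather than from the user-supplied app name plus a timestamp.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical configuration; spark.eventLog.dir falls back to
// EventLoggingListener.DEFAULT_LOG_DIR when it is not set.
val conf = new SparkConf()
  .setAppName("Spark Pi: nightly run")   // free-form, may contain ':' or '/'
  .setMaster("spark://master:7077")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs://namenode:8020/user/spark/applicationHistory")

val sc = new SparkContext(conf)
// The driver asks the task scheduler for the application ID, stores it as
// spark.app.id, starts the metrics system, and only then creates the event
// logger, so the logs land under a directory named after the app ID, e.g.
//   hdfs://namenode:8020/user/spark/applicationHistory/app-20140919120000-0000
// instead of a name-plus-timestamp directory like .../spark-pi--nightly-run-1411123456789.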

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 6 additions & 2 deletions

@@ -259,11 +259,15 @@ object SparkEnv extends Logging {
     }

     val metricsSystem = if (isDriver) {
+      // Don't start metrics system right now for Driver.
+      // We need to wait for the task scheduler to give us an app ID.
+      // Then we can start the metrics system.
       MetricsSystem.createMetricsSystem("driver", conf, securityManager)
     } else {
-      MetricsSystem.createMetricsSystem("executor", conf, securityManager)
+      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
+      ms.start()
+      ms
     }
-    metricsSystem.start()

     // Set the sparkFiles directory, used when downloading dependencies. In local mode,
     // this is a temporary directory; in distributed mode, this is the executor's current working

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 7 additions & 5 deletions

@@ -33,8 +33,8 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import akka.serialization.SerializationExtension

 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState,
-  SparkHadoopUtil}
+import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
+  ExecutorState, SparkHadoopUtil}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.master.DriverState.DriverState

@@ -693,16 +693,18 @@ private[spark] class Master(
         app.desc.appUiUrl = notFoundBasePath
         return false
       }
-      val fileSystem = Utils.getHadoopFileSystem(eventLogDir,
+
+      val appEventLogDir = EventLoggingListener.getLogDirPath(eventLogDir, app.id)
+      val fileSystem = Utils.getHadoopFileSystem(appEventLogDir,
         SparkHadoopUtil.get.newConfiguration(conf))
-      val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
+      val eventLogInfo = EventLoggingListener.parseLoggingInfo(appEventLogDir, fileSystem)
       val eventLogPaths = eventLogInfo.logPaths
       val compressionCodec = eventLogInfo.compressionCodec

       if (eventLogPaths.isEmpty) {
         // Event logging is enabled for this application, but no event logs are found
         val title = s"Application history not found (${app.id})"
-        var msg = s"No event logs found for application $appName in $eventLogDir."
+        var msg = s"No event logs found for application $appName in $appEventLogDir."
         logWarning(msg)
         msg += " Did you specify the correct logging directory?"
         msg = URLEncoder.encode(msg, "UTF-8")

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 9 additions & 7 deletions

@@ -106,6 +106,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       executorId: String,
       hostname: String,
       cores: Int,
+      appId: String,
       workerUrl: Option[String]) {

     SignalLogger.register(log)

@@ -122,7 +123,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       val driver = fetcher.actorSelection(driverUrl)
       val timeout = AkkaUtils.askTimeout(executorConf)
       val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
-      val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]]
+      val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]] ++
+        Seq[(String, String)](("spark.app.id", appId))
       fetcher.shutdown()

       // Create a new ActorSystem using driver's Spark properties to run the backend.

@@ -144,16 +146,16 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {

   def main(args: Array[String]) {
     args.length match {
-      case x if x < 4 =>
+      case x if x < 5 =>
         System.err.println(
           // Worker url is used in spark standalone mode to enforce fate-sharing with worker
           "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
-          "<cores> [<workerUrl>]")
+          "<cores> <appid> [<workerUrl>] ")
         System.exit(1)
-      case 4 =>
-        run(args(0), args(1), args(2), args(3).toInt, None)
-      case x if x > 4 =>
-        run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))
+      case 5 =>
+        run(args(0), args(1), args(2), args(3).toInt, args(4), None)
+      case x if x > 5 =>
+        run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
     }
   }
 }
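A sketch of the new launch arguments (all values below are placeholders; in practice the worker or cluster manager builds this command line): the application ID is now the fifth positional argument, before the optional worker URL.

// Hypothetical arguments matching the usage string above.
CoarseGrainedExecutorBackend.main(Array(
  "akka.tcp://sparkDriver@driver-host:7077/user/CoarseGrainedScheduler", // driverUrl
  "1",                                                                   // executorId
  "worker-host",                                                         // hostname
  "4",                                                                   // cores
  "app-20140919120000-0000",                                             // appId (new)
  "akka.tcp://sparkWorker@worker-host:34567/user/Worker"                 // workerUrl (optional)
))

The appId is appended to the properties fetched from the driver as spark.app.id, which is what MetricsSystem later uses as the registry prefix.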

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 0 deletions

@@ -74,6 +74,7 @@ private[spark] class Executor(
   val executorSource = new ExecutorSource(this, executorId)

   // Initialize Spark environment (using system properties read above)
+  conf.set("spark.executor.id", "executor." + executorId)
   private val env = {
     if (!isLocal) {
       val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,

core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala

Lines changed: 1 addition & 2 deletions

@@ -37,8 +37,7 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String)

   override val metricRegistry = new MetricRegistry()

-  // TODO: It would be nice to pass the application name here
-  override val sourceName = "executor.%s".format(executorId)
+  override val sourceName = "executor"

   // Gauge for executor thread pool's actively executing task counts
   metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
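The per-executor qualifier moves out of the source name and into the registry prefix built by MetricsSystem; a sketch of the resulting key for a hypothetical application and executor:

import com.codahale.metrics.MetricRegistry

// Hypothetical IDs: spark.app.id from the task scheduler, spark.executor.id
// as set in Executor.scala above.
val key = MetricRegistry.name(
  MetricRegistry.name("app-20140919120000-0000", "executor.1", "executor"), // registry prefix
  "threadpool", "activeTasks")
// => "app-20140919120000-0000.executor.1.executor.threadpool.activeTasks"
// The old form "executor.1.threadpool.activeTasks" could collide across
// applications sharing a node.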

core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala

Lines changed: 2 additions & 1 deletion

@@ -52,7 +52,8 @@ private[spark] class MesosExecutorBackend
       slaveInfo: SlaveInfo) {
     logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
     this.driver = driver
-    val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
+    val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
+      Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
     executor = new Executor(
       executorInfo.getExecutorId.getValue,
       slaveInfo.getHostname,

core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

Lines changed: 35 additions & 5 deletions

@@ -83,10 +83,10 @@ private[spark] class MetricsSystem private (
   def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array())

   metricsConfig.initialize()
-  registerSources()
-  registerSinks()

   def start() {
+    registerSources()
+    registerSinks()
     sinks.foreach(_.start)
   }

@@ -98,19 +98,49 @@ private[spark] class MetricsSystem private (
     sinks.foreach(_.report())
   }

+  /**
+   * Build a name that uniquely identifies each metric source.
+   * The name is structured as follows: <app ID>.<executor ID (or "driver")>.<source name>.
+   * If either ID is not available, this defaults to just using <source name>.
+   *
+   * @param source Metric source to be named by this method.
+   * @return An unique metric name for each combination of
+   *         application, executor/driver and metric source.
+   */
+  def buildRegistryName(source: Source): String = {
+    val appId = conf.getOption("spark.app.id")
+    val executorId = conf.getOption("spark.executor.id")
+    val defaultName = MetricRegistry.name(source.sourceName)
+
+    if (instance == "driver" || instance == "executor") {
+      if (appId.isDefined && executorId.isDefined) {
+        MetricRegistry.name(appId.get, executorId.get, source.sourceName)
+      } else {
+        // Only Driver and Executor are set spark.app.id and spark.executor.id.
+        // For instance, Master and Worker are not related to a specific application.
+        val warningMsg = s"Using default name $defaultName for source because %s is not set."
+        if (appId.isEmpty) { logWarning(warningMsg.format("spark.app.id")) }
+        if (executorId.isEmpty) { logWarning(warningMsg.format("spark.executor.id")) }
+        defaultName
+      }
+    } else { defaultName }
+  }
+
   def registerSource(source: Source) {
     sources += source
     try {
-      registry.register(source.sourceName, source.metricRegistry)
+      val regName = buildRegistryName(source)
+      registry.register(regName, source.metricRegistry)
     } catch {
       case e: IllegalArgumentException => logInfo("Metrics already registered", e)
     }
   }

   def removeSource(source: Source) {
     sources -= source
+    val regName = buildRegistryName(source)
     registry.removeMatching(new MetricFilter {
-      def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName)
+      def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
     })
   }

@@ -125,7 +155,7 @@ private[spark] class MetricsSystem private (
         val source = Class.forName(classPath).newInstance()
         registerSource(source.asInstanceOf[Source])
       } catch {
-        case e: Exception => logError("Source class " + classPath + " cannot be instantialized", e)
+        case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
       }
     }
   }
core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala

Lines changed: 2 additions & 2 deletions

@@ -22,10 +22,10 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
 import org.apache.spark.SparkContext
 import org.apache.spark.metrics.source.Source

-private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
+private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
     extends Source {
   override val metricRegistry = new MetricRegistry()
-  override val sourceName = "%s.DAGScheduler".format(sc.appName)
+  override val sourceName = "DAGScheduler"

   metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.failedStages.size

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 18 additions & 15 deletions

@@ -43,38 +43,29 @@ import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
  * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
  */
 private[spark] class EventLoggingListener(
-    appName: String,
+    appId: String,
+    logBaseDir: String,
     sparkConf: SparkConf,
     hadoopConf: Configuration)
   extends SparkListener with Logging {

   import EventLoggingListener._

-  def this(appName: String, sparkConf: SparkConf) =
-    this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+  def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
+    this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))

   private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
-  private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
-  private val name = appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_")
-    .toLowerCase + "-" + System.currentTimeMillis
-  val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
-
+  val logDir = EventLoggingListener.getLogDirPath(logBaseDir, appId)
+  val logDirName: String = logDir.split("/").last
   protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
     shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))

   // For testing. Keep track of all JSON serialized events that have been logged.
   private[scheduler] val loggedEvents = new ArrayBuffer[JValue]

-  /**
-   * Return only the unique application directory without the base directory.
-   */
-  def getApplicationLogDir(): String = {
-    name
-  }
-
   /**
    * Begin logging events.
    * If compression is used, log a file that indicates which compression library is used.

@@ -184,6 +175,18 @@ private[spark] object EventLoggingListener extends Logging {
     } else ""
   }

+  /**
+   * Return a file-system-safe path to the log directory for the given application.
+   *
+   * @param logBaseDir A base directory for the path to the log directory for given application.
+   * @param appId A unique app ID.
+   * @return A path which consists of file-system-safe characters.
+   */
+  def getLogDirPath(logBaseDir: String, appId: String): String = {
+    val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
+    Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
+  }
+
   /**
    * Parse the event logging information associated with the logs in the given directory.
    *
*
