Skip to content

Commit 0eb7722

Browse files
twinkle-g authored and Marcelo Vanzin committed
SPARK-4705: Doing cherry-pick of fix into master
1 parent c83e039 commit 0eb7722

File tree

7 files changed

+50
-9
lines changed

7 files changed

+50
-9
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
386386
taskScheduler.start()
387387

388388
val applicationId: String = taskScheduler.applicationId()
389+
val applicationAttemptId : String = taskScheduler.applicationAttemptId()
389390
conf.set("spark.app.id", applicationId)
390391

391392
env.blockManager.initialize(applicationId)
@@ -402,7 +403,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
402403
private[spark] val eventLogger: Option[EventLoggingListener] = {
403404
if (isEventLogEnabled) {
404405
val logger =
405-
new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
406+
new EventLoggingListener(applicationId, applicationAttemptId,
407+
eventLogDir.get, conf, hadoopConfiguration)
406408
logger.start()
407409
listenerBus.addListener(logger)
408410
Some(logger)

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
4747
*/
4848
private[spark] class EventLoggingListener(
4949
appId: String,
50+
appAttemptId : String,
5051
logBaseDir: URI,
5152
sparkConf: SparkConf,
5253
hadoopConf: Configuration)
@@ -55,7 +56,7 @@ private[spark] class EventLoggingListener(
5556
import EventLoggingListener._
5657

5758
def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) =
58-
this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
59+
this(appId, "", logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
5960

6061
private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
6162
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
@@ -89,7 +90,7 @@ private[spark] class EventLoggingListener(
8990
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
9091

9192
// Visible for tests only.
92-
private[scheduler] val logPath = getLogPath(logBaseDir, appId, compressionCodecName)
93+
private[scheduler] val logPath = getLogPath(logBaseDir, appId, compressionCodecName, appAttemptId)
9394

9495
/**
9596
* Creates the log file in the configured log directory.
@@ -254,18 +255,30 @@ private[spark] object EventLoggingListener extends Logging {
254255
*
255256
* @param logBaseDir Directory where the log file will be written.
256257
* @param appId A unique app ID.
258+
* @param appAttemptId A unique attempt id of appId.
257259
* @param compressionCodecName Name to identify the codec used to compress the contents
258260
* of the log, or None if compression is not enabled.
259261
* @return A path which consists of file-system-safe characters.
260262
*/
261263
def getLogPath(
262-
logBaseDir: URI,
264+
logBaseDir: String,
263265
appId: String,
266+
appAttemptId: String,
264267
compressionCodecName: Option[String] = None): String = {
265-
val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
266-
// e.g. app_123, app_123.lzf
267-
val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("")
268-
logBaseDir.toString.stripSuffix("/") + "/" + logName
268+
val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
269+
270+
if (appAttemptId.equals("")) {
271+
Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
272+
} else {
273+
Utils.resolveURI(logBaseDir) + "/" + appAttemptId + "/" + name.stripSuffix("/")
274+
}
275+
}
276+
277+
def getLogPath(
278+
logBaseDir: String,
279+
appId: String,
280+
compressionCodecName: Option[String] = None): String = {
281+
getLogPath(logBaseDir, appId, "", compressionCodecName)
269282
}
270283

271284
/**

core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,11 @@ private[spark] trait SchedulerBackend {
4141
*/
4242
def applicationId(): String = appId
4343

44+
/**
45+
* Get an application ID associated with the job.
46+
*
47+
* @return An application attempt id
48+
*/
49+
def applicationAttemptId(): String = ""
50+
4451
}

core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,12 @@ private[spark] trait TaskScheduler {
7878
* Process a lost executor
7979
*/
8080
def executorLost(executorId: String, reason: ExecutorLossReason): Unit
81+
82+
/**
83+
* Get an application's attempt Id associated with the job.
84+
*
85+
* @return An application's Attempt ID
86+
*/
87+
def applicationAttemptId(): String = ""
88+
8189
}

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,8 @@ private[spark] class TaskSchedulerImpl(
514514
}
515515

516516
override def applicationId(): String = backend.applicationId()
517+
518+
override def applicationAttemptId() : String = backend.applicationAttemptId()
517519

518520
}
519521

yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ private[spark] class ApplicationMaster(
9090

9191
// Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
9292
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
93+
94+
// Propagate the attempt id, so that in case of event logging, different attempts' logs get created in different directories
95+
System.setProperty("spark.yarn.app.attemptid", appAttemptId.getAttemptId().toString())
96+
9397
}
9498

9599
logInfo("ApplicationAttemptId: " + appAttemptId)

yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,10 @@ private[spark] class YarnClusterSchedulerBackend(
4646
logError("Application ID is not set.")
4747
super.applicationId
4848
}
49-
49+
50+
override def applicationAttemptId(): String =
51+
sc.getConf.getOption("spark.yarn.app.attemptid").getOrElse {
52+
logError("Application attempt ID is not set.")
53+
super.applicationAttemptId
54+
}
5055
}

0 commit comments

Comments
 (0)