diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 1fb7b76d43d0..f0de1e6cce1f 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -114,6 +114,17 @@ private[spark] class AppStatusListener( kvstore.write(new ApplicationInfoWrapper(appInfo)) kvstore.write(appSummary) + + // Update the driver block manager with logs from this event. The SparkContext initialization + // code registers the driver before this event is sent. + event.driverLogs.foreach { logs => + val driver = liveExecutors.get(SparkContext.DRIVER_IDENTIFIER) + .orElse(liveExecutors.get(SparkContext.LEGACY_DRIVER_IDENTIFIER)) + driver.foreach { d => + d.executorLogs = logs.toMap + update(d, System.nanoTime()) + } + } } override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 9cf4f7efb24a..d5345b933ed3 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -939,6 +939,24 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } } + test("driver logs") { + val listener = new AppStatusListener(store, conf, true) + + val driver = BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "localhost", 42) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(time, driver, 42L)) + listener.onApplicationStart(SparkListenerApplicationStart( + "name", + Some("id"), + time, + "user", + Some("attempt"), + Some(Map("stdout" -> "file.txt")))) + + check[ExecutorSummaryWrapper](SparkContext.DRIVER_IDENTIFIER) { d => + assert(d.info.executorLogs("stdout") === "file.txt") + } + } + private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId) private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {