Skip to content

Commit 3700586

Browse files
Author: Marcelo Vanzin (committed)
Restore log flushing.
1 parent f677930 commit 3700586

File tree

1 file changed

+21
-9
lines changed

1 file changed

+21
-9
lines changed

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ private[spark] class EventLoggingListener(
6666
// Only defined if the file system scheme is not local
6767
private var hadoopDataStream: Option[FSDataOutputStream] = None
6868

69+
// The Hadoop APIs have changed over time, so we use reflection to figure out
70+
// the correct method to use to flush a hadoop data stream. See SPARK-1518
71+
// for details.
72+
private val hadoopFlushMethod = {
73+
val cls = classOf[FSDataOutputStream]
74+
scala.util.Try(cls.getMethod("hflush")).getOrElse(cls.getMethod("sync"))
75+
}
76+
6977
private var writer: Option[PrintWriter] = None
7078

7179
// For testing. Keep track of all JSON serialized events that have been logged.
@@ -138,9 +146,13 @@ private[spark] class EventLoggingListener(
138146
}
139147

140148
/** Log the event as JSON. */
141-
private def logEvent(event: SparkListenerEvent) {
149+
private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
142150
val eventJson = JsonProtocol.sparkEventToJson(event)
143151
writer.foreach(_.println(compact(render(eventJson))))
152+
if (flushLogger) {
153+
writer.foreach(_.flush())
154+
hadoopDataStream.foreach(hadoopFlushMethod.invoke(_))
155+
}
144156
if (testing) {
145157
loggedEvents += eventJson
146158
}
@@ -157,21 +169,21 @@ private[spark] class EventLoggingListener(
157169
override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate) =
158170
logEvent(event)
159171
override def onStageCompleted(event: SparkListenerStageCompleted) =
160-
logEvent(event)
172+
logEvent(event, flushLogger = true)
161173
override def onJobStart(event: SparkListenerJobStart) =
162-
logEvent(event)
174+
logEvent(event, flushLogger = true)
163175
override def onJobEnd(event: SparkListenerJobEnd) =
164-
logEvent(event)
176+
logEvent(event, flushLogger = true)
165177
override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded) =
166-
logEvent(event)
178+
logEvent(event, flushLogger = true)
167179
override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved) =
168-
logEvent(event)
180+
logEvent(event, flushLogger = true)
169181
override def onUnpersistRDD(event: SparkListenerUnpersistRDD) =
170-
logEvent(event)
182+
logEvent(event, flushLogger = true)
171183
override def onApplicationStart(event: SparkListenerApplicationStart) =
172-
logEvent(event)
184+
logEvent(event, flushLogger = true)
173185
override def onApplicationEnd(event: SparkListenerApplicationEnd) =
174-
logEvent(event)
186+
logEvent(event, flushLogger = true)
175187
// No-op because logging every update would be overkill
176188
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate) { }
177189

0 commit comments

Comments (0)