@@ -32,12 +32,7 @@ import org.json4s.jackson.JsonMethods._
 import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
-<<<<<<< HEAD
-import org.apache.spark.SPARK_VERSION
-import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
-=======
 import org.apache.spark.util.{JsonProtocol, Utils}
->>>>>>> Make event logger use a single file.
 
 /**
  * A SparkListener that logs events to persistent storage.
@@ -65,17 +60,11 @@ private[spark] class EventLoggingListener(
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
   private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
-<<<<<<< HEAD
-  private val name = appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_")
-    .toLowerCase + "-" + System.currentTimeMillis
-  val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
-=======
   private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir))
   private lazy val compressionCodec = CompressionCodec.createCodec(sparkConf)
 
   // Only defined if the file system scheme is not local
   private var hadoopDataStream: Option[FSDataOutputStream] = None
->>>>>>> Make event logger use a single file.
 
   private var writer: Option[PrintWriter] = None
 
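The fields kept from the incoming branch describe the new write path: a Hadoop FileSystem handle, a lazily created compression codec, and an FSDataOutputStream that is only populated for non-local schemes. A minimal sketch of how such a stream is typically layered before handing it to the PrintWriter (the openWriter helper and the compress hook are illustrative, not part of this patch):

    import java.io.{BufferedOutputStream, OutputStream, PrintWriter}
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Illustrative layering: raw Hadoop stream -> buffer -> optional codec -> PrintWriter.
    // `compress` stands in for CompressionCodec.compressedOutputStream.
    def openWriter(
        fs: FileSystem,
        path: Path,
        bufferSize: Int,
        compress: Option[OutputStream => OutputStream]): PrintWriter = {
      val raw: OutputStream = fs.create(path)                 // FSDataOutputStream
      val buffered = new BufferedOutputStream(raw, bufferSize)
      new PrintWriter(compress.map(_(buffered)).getOrElse(buffered))
    }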
@@ -100,17 +89,6 @@ private[spark] class EventLoggingListener(
   }
 
   /**
-<<<<<<< HEAD
-   * Return only the unique application directory without the base directory.
-   */
-  def getApplicationLogDir(): String = {
-    name
-  }
-
-  /**
-   * Begin logging events.
-   * If compression is used, log a file that indicates which compression library is used.
-=======
    * Creates the log file in the configured log directory.
    *
    * The file name contains some metadata about its contents. It follows the following
@@ -128,16 +106,11 @@ private[spark] class EventLoggingListener(
    *   used to write the file
    * - ".inprogress" will be present while the log file is still being written to, and
    *   removed after the application is finished.
->>>>>>> Make event logger use a single file.
    */
   def start() {
     if (!fileSystem.isDirectory(new Path(logBaseDir))) {
       throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.");
     }
-<<<<<<< HEAD
-    logger.newFile(SPARK_VERSION_PREFIX + SPARK_VERSION)
-    logger.newFile(LOG_PREFIX + logger.fileIndex)
-=======
 
     val workingPath = logPath + IN_PROGRESS
     val uri = new URI(workingPath)
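Putting the pieces of the doc comment together with the LOG_FILE_NAME_REGEX defined further down, the single-file names appear to follow {app name}-{timestamp}-{spark version}[-{codec}][.inprogress]. Two hypothetical examples (the timestamp value is invented for illustration):

    // A finished, lz4-compressed log:
    val finished = "my-app-1405544146554-1.0.2-lz4"
    // The same log while the application is still running, uncompressed:
    val inProgress = "my-app-1405544146554-1.0.2" + IN_PROGRESS  // ".inprogress" suffix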
@@ -162,25 +135,17 @@ private[spark] class EventLoggingListener(
     writer = Some(new PrintWriter(cstream))
 
     logInfo("Logging events to %s".format(logPath))
->>>>>>> Make event logger use a single file.
   }
 
   /** Log the event as JSON. */
-  private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
+  private def logEvent(event: SparkListenerEvent) {
     val eventJson = JsonProtocol.sparkEventToJson(event)
-
     writer.foreach(_.println(compact(render(eventJson))))
-    if (flushLogger) {
-      writer.foreach(_.flush())
-      hadoopDataStream.foreach(_.sync())
-    }
-
     if (testing) {
       loggedEvents += eventJson
     }
   }
 
-  // Events that do not trigger a flush
   override def onStageSubmitted(event: SparkListenerStageSubmitted) =
     logEvent(event)
   override def onTaskStart(event: SparkListenerTaskStart) =
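With the flushLogger parameter gone, every event takes the same path: serialize to a JValue, render it compactly, and print one line. A self-contained sketch of that rendering step with json4s (the hand-built JObject stands in for JsonProtocol.sparkEventToJson):

    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._

    // Stand-in for JsonProtocol.sparkEventToJson(event).
    val eventJson = ("Event" -> "SparkListenerApplicationStart") ~ ("App Name" -> "my-app")

    // One compact line per event, exactly as logEvent writes it:
    val line = compact(render(eventJson))
    // => {"Event":"SparkListenerApplicationStart","App Name":"my-app"}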
@@ -191,24 +156,22 @@ private[spark] class EventLoggingListener(
     logEvent(event)
   override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate) =
     logEvent(event)
-
-  // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted) =
-    logEvent(event, flushLogger = true)
+    logEvent(event)
   override def onJobStart(event: SparkListenerJobStart) =
-    logEvent(event, flushLogger = true)
+    logEvent(event)
   override def onJobEnd(event: SparkListenerJobEnd) =
-    logEvent(event, flushLogger = true)
+    logEvent(event)
   override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded) =
-    logEvent(event, flushLogger = true)
+    logEvent(event)
   override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved) =
-    logEvent(event, flushLogger = true)
+    logEvent(event)
   override def onUnpersistRDD(event: SparkListenerUnpersistRDD) =
-    logEvent(event, flushLogger = true)
+    logEvent(event)
   override def onApplicationStart(event: SparkListenerApplicationStart) =
-    logEvent(event, flushLogger = true)
+    logEvent(event)
   override def onApplicationEnd(event: SparkListenerApplicationEnd) =
-    logEvent(event, flushLogger = true)
+    logEvent(event)
   // No-op because logging every update would be overkill
   override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate) { }
 
@@ -234,7 +197,7 @@ private[spark] object EventLoggingListener extends Logging {
   val LOG_FILE_PERMISSIONS = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
 
   // Regex for parsing log file names. See description of log file name format in start().
-  val LOG_FILE_NAME_REGEX = s".+-[0-9]+-([0-9](?:\\.[0-9])*)(?:-(.+?))?(\\$IN_PROGRESS)?".r
+  val LOG_FILE_NAME_REGEX = s"(.+)-([0-9]+)-([0-9](?:\\.[0-9])*)(?:-(.+?))?(\\$IN_PROGRESS)?".r
 
   // A cache for compression codecs to avoid creating the same codec many times
   private val codecMap = new mutable.HashMap[String, CompressionCodec]
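As a sanity check on the widened pattern, the extractor now binds five groups, with app name and timestamp up front; parseLoggingInfo below discards those two. A small illustration, assuming IN_PROGRESS mirrors the ".inprogress" suffix used in start():

    val IN_PROGRESS = ".inprogress"  // assumed to match the companion object's constant
    val LOG_FILE_NAME_REGEX = s"(.+)-([0-9]+)-([0-9](?:\\.[0-9])*)(?:-(.+?))?(\\$IN_PROGRESS)?".r

    "my-app-1405544146554-1.0.2-lz4.inprogress" match {
      case LOG_FILE_NAME_REGEX(appName, timestamp, version, codecName, inprogress) =>
        // appName = "my-app", timestamp = "1405544146554", version = "1.0.2",
        // codecName = "lz4", inprogress = ".inprogress" (null when absent)
        println(s"$appName / $timestamp / $version / $codecName / $inprogress")
    }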
@@ -248,7 +211,7 @@ private[spark] object EventLoggingListener extends Logging {
    */
   def parseLoggingInfo(log: Path): EventLoggingInfo = {
     try {
-      val LOG_FILE_NAME_REGEX(version, codecName, inprogress) = log.getName()
+      val LOG_FILE_NAME_REGEX(_, _, version, codecName, inprogress) = log.getName()
       val codec: Option[CompressionCodec] = if (codecName != null) {
         val conf = new SparkConf()
         conf.set("spark.io.compression.codec", codecName)
@@ -259,13 +222,8 @@ private[spark] object EventLoggingListener extends Logging {
       EventLoggingInfo(log, version, codec, inprogress == null)
     } catch {
       case e: Exception =>
-<<<<<<< HEAD
-        logError("Exception in parsing logging info from directory %s".format(logDir), e)
-        EventLoggingInfo.empty
-=======
         logError("Exception in parsing logging info from file %s".format(log), e)
         null
->>>>>>> Make event logger use a single file.
     }
   }
 
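One consequence of resolving the conflict in favor of the incoming branch: parseLoggingInfo now returns null on a malformed name instead of EventLoggingInfo.empty, so callers have to guard. A hedged sketch of one way to consume it (the version field name is read off the constructor call above; anything else is an assumption):

    import org.apache.hadoop.fs.Path

    // Wrap the nullable result in an Option so downstream code stays null-free.
    def tryParse(log: Path): Option[EventLoggingInfo] =
      Option(EventLoggingListener.parseLoggingInfo(log))

    tryParse(new Path("/logs/my-app-1405544146554-1.0.2")) match {
      case Some(info) => println(s"event log written by Spark ${info.version}")
      case None       => println("unparseable name; skipping")
    }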