diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c23a659e76df..c4517d3dfd93 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -34,7 +34,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore
 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.MoreExecutors
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem}
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException
 import org.fusesource.leveldbjni.internal.NativeDB
@@ -449,7 +449,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
             listing.write(info.copy(lastProcessed = newLastScanTime, fileSize = entry.getLen()))
           }
 
-          if (info.fileSize < entry.getLen()) {
+          if (shouldReloadLog(info, entry)) {
             if (info.appId.isDefined && fastInProgressParsing) {
               // When fast in-progress parsing is on, we don't need to re-parse when the
               // size changes, but we do need to invalidate any existing UIs.
@@ -541,6 +541,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  private[history] def shouldReloadLog(info: LogInfo, entry: FileStatus): Boolean = {
+    var result = info.fileSize < entry.getLen
+    if (!result && info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
+      try {
+        result = Utils.tryWithResource(fs.open(entry.getPath)) { in =>
+          in.getWrappedStream match {
+            case dfsIn: DFSInputStream => info.fileSize < dfsIn.getFileLength
+            case _ => false
+          }
+        }
+      } catch {
+        case e: Exception =>
+          logDebug(s"Failed to check the length for the file : ${info.logPath}", e)
+      }
+    }
+    result
+  }
+
   private def cleanAppData(appId: String, attemptId: Option[String], logPath: String): Unit = {
     try {
       val app = load(appId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 1629e1797977..f89fcd18ef56 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler
 import java.io._
 import java.net.URI
 import java.nio.charset.StandardCharsets
-import java.util.EnumSet
 import java.util.Locale
 
 import scala.collection.mutable.{ArrayBuffer, Map}
@@ -28,8 +27,6 @@ import scala.collection.mutable.{ArrayBuffer, Map}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
 import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.hdfs.DFSOutputStream
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag
 import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
@@ -149,10 +146,7 @@ private[spark] class EventLoggingListener(
     // scalastyle:on println
     if (flushLogger) {
       writer.foreach(_.flush())
-      hadoopDataStream.foreach(ds => ds.getWrappedStream match {
-        case wrapped: DFSOutputStream => wrapped.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH))
-        case _ => ds.hflush()
-      })
+      hadoopDataStream.foreach(_.hflush())
     }
     if (testing) {
       loggedEvents += eventJson
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 444e8d6e11f8..6a761d43a5a6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -27,8 +27,8 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path}
+import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem}
 import org.apache.hadoop.security.AccessControlException
 import org.json4s.jackson.JsonMethods._
 import org.mockito.ArgumentMatcher
@@ -856,6 +856,39 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     assert(!mockedProvider.isBlacklisted(accessDeniedPath))
   }
 
+  test("check in-progress event logs absolute length") {
+    val path = new Path("testapp.inprogress")
+    val provider = new FsHistoryProvider(createTestConf())
+    val mockedProvider = spy(provider)
+    val mockedFs = mock(classOf[FileSystem])
+    val in = mock(classOf[FSDataInputStream])
+    val dfsIn = mock(classOf[DFSInputStream])
+    when(mockedProvider.fs).thenReturn(mockedFs)
+    when(mockedFs.open(path)).thenReturn(in)
+    when(in.getWrappedStream).thenReturn(dfsIn)
+    when(dfsIn.getFileLength).thenReturn(200)
+    // FileStatus.getLen is more than logInfo fileSize
+    var fileStatus = new FileStatus(200, false, 0, 0, 0, path)
+    var logInfo = new LogInfo(path.toString, 0, Some("appId"), Some("attemptId"), 100)
+    assert(mockedProvider.shouldReloadLog(logInfo, fileStatus))
+
+    fileStatus = new FileStatus()
+    fileStatus.setPath(path)
+    // DFSInputStream.getFileLength is more than logInfo fileSize
+    logInfo = new LogInfo(path.toString, 0, Some("appId"), Some("attemptId"), 100)
+    assert(mockedProvider.shouldReloadLog(logInfo, fileStatus))
+    // DFSInputStream.getFileLength is equal to logInfo fileSize
+    logInfo = new LogInfo(path.toString, 0, Some("appId"), Some("attemptId"), 200)
+    assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus))
+    // in.getWrappedStream returns other than DFSInputStream
+    val bin = mock(classOf[BufferedInputStream])
+    when(in.getWrappedStream).thenReturn(bin)
+    assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus))
+    // fs.open throws exception
+    when(mockedFs.open(path)).thenThrow(new IOException("Throwing intentionally"))
+    assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus))
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:
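
Note on the mechanics (not part of the patch): with the writer side now calling only hflush(), newly written event-log bytes become readable, but FileStatus.getLen, which the NameNode serves from completed-block metadata, can lag behind. The new shouldReloadLog therefore opens in-progress logs and asks the wrapped DFSInputStream for getFileLength, which also counts the last block still being written. Below is a minimal standalone sketch of that length probe; the VisibleLengthExample object, the default-Configuration setup, and the /spark-events/testapp.inprogress path are illustrative assumptions, not code from the patch.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hdfs.DFSInputStream

object VisibleLengthExample {
  // Number of bytes currently readable at `path`. On HDFS the wrapped stream
  // is a DFSInputStream, whose getFileLength includes the last block still
  // being written; on other file systems, fall back to the metadata length.
  def visibleLength(fs: FileSystem, path: Path): Long = {
    val in = fs.open(path)
    try {
      in.getWrappedStream match {
        case dfsIn: DFSInputStream => dfsIn.getFileLength
        case _ => fs.getFileStatus(path).getLen
      }
    } finally {
      in.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val path = new Path("/spark-events/testapp.inprogress") // hypothetical log
    println(s"FileStatus.getLen:            ${fs.getFileStatus(path).getLen}")
    println(s"DFSInputStream.getFileLength: ${visibleLength(fs, path)}")
  }
}

For a completed file the two lengths agree, which is why shouldReloadLog only performs this comparatively expensive open when the cheap FileStatus comparison fails and the log path still ends in .inprogress.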