--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -34,7 +34,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore
import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.MoreExecutors
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem}
import org.apache.hadoop.hdfs.protocol.HdfsConstants
import org.apache.hadoop.security.AccessControlException
import org.fusesource.leveldbjni.internal.NativeDB
@@ -449,7 +449,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
            listing.write(info.copy(lastProcessed = newLastScanTime, fileSize = entry.getLen()))
          }

-         if (info.fileSize < entry.getLen()) {
+         if (shouldReloadLog(info, entry)) {
            if (info.appId.isDefined && fastInProgressParsing) {
              // When fast in-progress parsing is on, we don't need to re-parse when the
              // size changes, but we do need to invalidate any existing UIs.
@@ -541,6 +541,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    }
  }

+  private[history] def shouldReloadLog(info: LogInfo, entry: FileStatus): Boolean = {
+    var result = info.fileSize < entry.getLen
+    if (!result && info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
+      try {
+        result = Utils.tryWithResource(fs.open(entry.getPath)) { in =>
+          in.getWrappedStream match {
+            case dfsIn: DFSInputStream => info.fileSize < dfsIn.getFileLength
+            case _ => false
+          }
+        }
+      } catch {
+        case e: Exception =>
+          logDebug(s"Failed to check the length of file ${info.logPath}", e)
+      }
+    }
+    result
+  }
+
  private def cleanAppData(appId: String, attemptId: Option[String], logPath: String): Unit = {
    try {
      val app = load(appId)
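Reviewer note on why `shouldReloadLog` has to unwrap the stream: for an HDFS file that is still open for writing, `FileStatus.getLen` reports only the fully committed blocks, while `DFSInputStream.getFileLength` also counts the last block that is still being written. A minimal standalone sketch of that probe (the path is hypothetical and the snippet assumes a reachable HDFS cluster; it is not part of the patch):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hdfs.DFSInputStream

object VisibleLengthProbe {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    // Hypothetical in-progress event log; any HDFS file open for writing works.
    val path = new Path("/spark-events/local-1234.inprogress")
    val status = fs.getFileStatus(path)
    val in = fs.open(path)
    try {
      in.getWrappedStream match {
        case dfsIn: DFSInputStream =>
          // getFileLength includes the last, under-construction block;
          // FileStatus.getLen covers only completed blocks.
          println(s"getLen=${status.getLen}, getFileLength=${dfsIn.getFileLength}")
        case _ =>
          // Non-HDFS filesystems have no under-construction block to account for.
          println(s"getLen=${status.getLen}")
      }
    } finally {
      in.close()
    }
  }
}
```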
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -20,16 +20,13 @@ package org.apache.spark.scheduler
import java.io._
import java.net.URI
import java.nio.charset.StandardCharsets
-import java.util.EnumSet
import java.util.Locale

import scala.collection.mutable.{ArrayBuffer, Map}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.hdfs.DFSOutputStream
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods._

@@ -149,10 +146,7 @@ private[spark] class EventLoggingListener(
      // scalastyle:on println
      if (flushLogger) {
        writer.foreach(_.flush())
-       hadoopDataStream.foreach(ds => ds.getWrappedStream match {
-         case wrapped: DFSOutputStream => wrapped.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH))
-         case _ => ds.hflush()
-       })
+       hadoopDataStream.foreach(_.hflush())
      }
      if (testing) {
        loggedEvents += eventJson
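This is the other half of the trade: previously every flush did an `hsync` with `SyncFlag.UPDATE_LENGTH`, which persists the data and updates the file length recorded at the NameNode on each event; that kept `FileStatus.getLen` accurate while the log was open, but is costly per call. With plain `hflush`, data only reaches the DataNode pipeline and the NameNode's length can lag, which is exactly why `shouldReloadLog` above now probes the wrapped input stream on the read side. A side-by-side sketch of the two strategies, using the same types as the removed code (the object and method names are made up for illustration):

```scala
import java.util.EnumSet
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.hdfs.DFSOutputStream
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag

object FlushStrategies {
  // Cheap: pushes buffered bytes to the DataNode pipeline; the NameNode's
  // recorded length may lag behind what readers can actually retrieve.
  def flushOnly(ds: FSDataOutputStream): Unit = ds.hflush()

  // Expensive: additionally updates the file length at the NameNode, so
  // FileStatus.getLen stays accurate while the file is still open.
  def flushAndUpdateLength(ds: FSDataOutputStream): Unit = ds.getWrappedStream match {
    case dfs: DFSOutputStream => dfs.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH))
    case _ => ds.hflush()
  }
}
```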
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -27,8 +27,8 @@ import scala.concurrent.duration._
import scala.language.postfixOps

import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path}
+import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem}
import org.apache.hadoop.security.AccessControlException
import org.json4s.jackson.JsonMethods._
import org.mockito.ArgumentMatcher
@@ -856,6 +856,39 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
    assert(!mockedProvider.isBlacklisted(accessDeniedPath))
  }

+  test("check in-progress event logs absolute length") {
+    val path = new Path("testapp.inprogress")
+    val provider = new FsHistoryProvider(createTestConf())
+    val mockedProvider = spy(provider)
+    val mockedFs = mock(classOf[FileSystem])
+    val in = mock(classOf[FSDataInputStream])
+    val dfsIn = mock(classOf[DFSInputStream])
+    when(mockedProvider.fs).thenReturn(mockedFs)
+    when(mockedFs.open(path)).thenReturn(in)
+    when(in.getWrappedStream).thenReturn(dfsIn)
+    when(dfsIn.getFileLength).thenReturn(200L)
+    // FileStatus.getLen is greater than the LogInfo fileSize
+    var fileStatus = new FileStatus(200, false, 0, 0, 0, path)
+    var logInfo = new LogInfo(path.toString, 0, Some("appId"), Some("attemptId"), 100)
+    assert(mockedProvider.shouldReloadLog(logInfo, fileStatus))
+
+    fileStatus = new FileStatus()
+    fileStatus.setPath(path)
+    // DFSInputStream.getFileLength is greater than the LogInfo fileSize
+    logInfo = new LogInfo(path.toString, 0, Some("appId"), Some("attemptId"), 100)
+    assert(mockedProvider.shouldReloadLog(logInfo, fileStatus))
+    // DFSInputStream.getFileLength is equal to the LogInfo fileSize
+    logInfo = new LogInfo(path.toString, 0, Some("appId"), Some("attemptId"), 200)
+    assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus))
+    // in.getWrappedStream returns something other than DFSInputStream
+    val bin = mock(classOf[BufferedInputStream])
+    when(in.getWrappedStream).thenReturn(bin)
+    assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus))
+    // fs.open throws an exception
+    when(mockedFs.open(path)).thenThrow(new IOException("Throwing intentionally"))
+    assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus))
+  }
+
  /**
   * Asks the provider to check for logs and calls a function to perform checks on the updated
   * app list. Example:
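Design note on the test: mocking `FileSystem`, `FSDataInputStream`, and `DFSInputStream` lets the suite cover all four branches of `shouldReloadLog` (growth visible in `FileStatus`, growth visible only to the `DFSInputStream`, a non-HDFS wrapped stream, and an `fs.open` failure) without standing up a MiniDFSCluster. Assuming the standard Spark build layout, something like `build/sbt "core/testOnly *FsHistoryProviderSuite"` should run just this suite.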