@@ -34,7 +34,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore
import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.MoreExecutors
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem}
import org.apache.hadoop.hdfs.protocol.HdfsConstants
import org.apache.hadoop.security.AccessControlException
import org.fusesource.leveldbjni.internal.NativeDB
@@ -129,6 +129,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_))
private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)
+private val inProgressAbsoluteLengthCheck = conf.get(IN_PROGRESS_ABSOLUTE_LENGTH_CHECK)

// Visible for testing.
private[history] val listing: KVStore = storePath.map { path =>
@@ -449,7 +450,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
listing.write(info.copy(lastProcessed = newLastScanTime, fileSize = entry.getLen()))
}

-if (info.fileSize < entry.getLen()) {
+if (info.fileSize < entry.getLen() || checkAbsoluteLength(info, entry)) {
gengliangwang (Member) commented on Oct 17, 2018:
I think we can create a function to get the length of a given file:

  1. If the new conf is enabled and the input is a DFSInputStream, use getFileLength (or max(getFileLength, entry.getLen())).
  2. Otherwise, use entry.getLen().

The logic could be simpler that way.
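A rough sketch of the helper being suggested (the name `effectiveLength` is hypothetical; `fs`, `inProgressAbsoluteLengthCheck`, and `Utils.tryWithResource` come from the surrounding patch — this is an illustration of the suggestion, not the patch's actual code):

```scala
// Hypothetical helper along the lines of this suggestion.
private def effectiveLength(entry: FileStatus): Long = {
  if (inProgressAbsoluteLengthCheck) {
    Utils.tryWithResource(fs.open(entry.getPath)) { in =>
      in.getWrappedStream match {
        case dfsIn: DFSInputStream => math.max(dfsIn.getFileLength, entry.getLen())
        case _ => entry.getLen() // non-HDFS stream: fall back to the listed length
      }
    }
  } else {
    entry.getLen()
  }
}
```

The call site would then reduce to `if (info.fileSize < effectiveLength(entry))`.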

steveloughran (Contributor) commented:

Have you looked at this getFileLength() call to see how well it updates?

FWIW, HADOOP-15606 proposes adding a method like this for all streams, though that proposal includes the need for specification and tests. Generally the HDFS team are a bit lax about that spec -> test workflow, which doesn't help downstream code or other implementations.

Author commented:

Thanks @gengliangwang for looking into this. We don't need to run the checkAbsoluteLength check when FileStatus.getLen() is already greater than the cached fileSize; if we updated to max(getFileLength, entry.getLen()), it would check the absolute length every time, which may not be necessary.

Author commented:

Thanks @steveloughran for looking into this.

> Have you looked at this getFileLength() call to see how well it updates?

I looked at the DFSInputStream.getFileLength() API: it returns locatedBlocks.getFileLength() + lastBlockBeingWrittenLength, where locatedBlocks.getFileLength() is the value obtained from the NameNode for all the completed blocks, and lastBlockBeingWrittenLength is the length of the last, not-yet-completed block, obtained from the DataNode.

> FWIW, HADOOP-15606 proposes adding a method like this for all streams

Thanks for the pointer; once this is available we can update the code to use it.
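For illustration, a minimal standalone probe of an in-progress HDFS file's current length, along the lines described above (hypothetical helper name; a sketch, not code from this patch):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hdfs.DFSInputStream

// Minimal sketch: compare the NameNode-listed length with DFSInputStream's view,
// which also counts the last block still being written on a DataNode.
def currentLength(fs: FileSystem, path: Path): Long = {
  val listedLen = fs.getFileStatus(path).getLen // completed blocks only, from the NameNode
  val in = fs.open(path)
  try {
    in.getWrappedStream match {
      case dfsIn: DFSInputStream => dfsIn.getFileLength // includes the in-flight last block
      case _ => listedLen // non-HDFS filesystems expose no better estimate here
    }
  } finally {
    in.close()
  }
}
```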

steveloughran (Contributor) commented:

...there's no timetable for that getLength thing, but if HDFS already supports the API, I'm more motivated to implement it. It has benefits in cloud stores in general:

  1. It saves apps doing an up-front HEAD/getFileStatus() to know how long their data is; the GET should return it.
  2. For S3 Select, you get back the filtered data, so you don't know how much you will see until the GET is issued.

if (info.appId.isDefined && fastInProgressParsing) {
// When fast in-progress parsing is on, we don't need to re-parse when the
// size changes, but we do need to invalidate any existing UIs.
@@ -541,6 +542,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

+  private[history] def checkAbsoluteLength(info: LogInfo, entry: FileStatus): Boolean = {
Contributor commented:
The name of the method and the return value are a little cryptic. What does it mean to "check"?

It might be better to call it something like shouldReloadLog. You could also move the existing check into this function and make the call site simpler.
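A sketch of the refactoring being suggested (hypothetical: shouldReloadLog is the reviewer's proposed name, and the body folds the existing size check into the patch's own logic):

```scala
// Hypothetical refactor per this review comment, not the patch as written.
// The call site becomes simply `if (shouldReloadLog(info, entry)) { ... }`.
private[history] def shouldReloadLog(info: LogInfo, entry: FileStatus): Boolean = {
  var reload = info.fileSize < entry.getLen()
  if (!reload && inProgressAbsoluteLengthCheck &&
      info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
    try {
      reload = Utils.tryWithResource(fs.open(entry.getPath)) { in =>
        in.getWrappedStream match {
          case dfsIn: DFSInputStream => dfsIn.getFileLength > info.fileSize
          case _ => false // handle non-DFS streams explicitly (avoids a MatchError)
        }
      }
    } catch {
      case e: Exception =>
        logDebug(s"Failed to check the length for the file : ${info.logPath}", e)
    }
  }
  reload
}
```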

+    var result = false
+    if (inProgressAbsoluteLengthCheck && info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
+      try {
+        result = Utils.tryWithResource(fs.open(entry.getPath)) { in =>
+          in.getWrappedStream match {
+            case dfsIn: DFSInputStream => dfsIn.getFileLength > info.fileSize
Contributor commented:
Ignoring the discussion about the getFileLength API: if this is not a DFSInputStream, you'll get a MatchError. Better to handle the default case explicitly.
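A sketch of the explicit default case being asked for here (illustrative, not the patch as written):

```scala
in.getWrappedStream match {
  case dfsIn: DFSInputStream => dfsIn.getFileLength > info.fileSize
  case _ => false // non-HDFS wrapped streams: skip the absolute-length check
}
```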

+          }
+        }
+      } catch {
+        case e: Exception =>
+          logDebug(s"Failed to check the length for the file : ${info.logPath}", e)
+      }
+    }
+    result
+  }

private def cleanAppData(appId: String, attemptId: Option[String], logPath: String): Unit = {
try {
val app = load(appId)
@@ -64,4 +64,11 @@ private[spark] object config {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1m")

+  val IN_PROGRESS_ABSOLUTE_LENGTH_CHECK =
+    ConfigBuilder("spark.history.fs.inProgressAbsoluteLengthCheck.enabled")
Contributor commented:
Is there any disadvantage in just leaving this always on? Otherwise this doesn't need to be configurable.

Author commented:
This new check adds overhead to the checkForLogs thread, so it is disabled by default: most users may not want to see the history UI for running applications, and those who do can enable it explicitly. I can remove this config if you think it's not that useful.

Contributor commented:
How much overhead are we talking about?

That thread is not really in any performance-critical path, and in general people won't have so many running apps that this should become a problem... but that kinda depends on how bad this new call is.

Author commented:
This call makes two invocations: one to get the block information from the NameNode and another to get the last block's info from the DataNode. I agree this is not a performance-critical path; I will remove the config in the update.

.doc("Enable to check the absolute length of the in-progress" +
Member commented:
Could you explain in a bit more detail, so that general users can know what the benefit is?

Author commented:
Do you have anything in mind to make it better? Thanks.

" logs while considering for re-parsing.")
.booleanConf
.createWithDefault(false)

}
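For illustration, the new flag could be enabled like any other history-server setting; a minimal sketch using the config key from the diff above (it can equally go in spark-defaults.conf):

```scala
import org.apache.spark.SparkConf

// Sketch: enable the in-progress absolute-length check for the history server.
val conf = new SparkConf()
  .set("spark.history.fs.inProgressAbsoluteLengthCheck.enabled", "true")
```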
@@ -20,16 +20,13 @@ package org.apache.spark.scheduler
import java.io._
import java.net.URI
import java.nio.charset.StandardCharsets
-import java.util.EnumSet
import java.util.Locale

import scala.collection.mutable.{ArrayBuffer, Map}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.hdfs.DFSOutputStream
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods._

@@ -149,10 +146,7 @@ private[spark] class EventLoggingListener(
// scalastyle:on println
if (flushLogger) {
writer.foreach(_.flush())
-      hadoopDataStream.foreach(ds => ds.getWrappedStream match {
-        case wrapped: DFSOutputStream => wrapped.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH))
-        case _ => ds.hflush()
-      })
+      hadoopDataStream.foreach(_.hflush())
}
if (testing) {
loggedEvents += eventJson
@@ -27,8 +27,8 @@ import scala.concurrent.duration._
import scala.language.postfixOps

import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path}
+import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem}
import org.apache.hadoop.security.AccessControlException
import org.json4s.jackson.JsonMethods._
import org.mockito.ArgumentMatcher
@@ -856,6 +856,35 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
assert(!mockedProvider.isBlacklisted(accessDeniedPath))
}

test("check in-progress event logs absolute length") {
val path = new Path("testapp.inprogress")
val provider = new FsHistoryProvider(createTestConf()
.set(IN_PROGRESS_ABSOLUTE_LENGTH_CHECK, true))
val mockedProvider = spy(provider)
val mockedFs = mock(classOf[FileSystem])
val in = mock(classOf[FSDataInputStream])
val dfsIn = mock(classOf[DFSInputStream])
when(mockedProvider.fs).thenReturn(mockedFs)
when(mockedFs.open(path)).thenReturn(in)
when(in.getWrappedStream).thenReturn(dfsIn)
when(dfsIn.getFileLength).thenReturn(200)
val fileStatus = new FileStatus()
fileStatus.setPath(path)
// file length is more than logInfo fileSize
var logInfo = new LogInfo(path.toString, 0, Some("appId"), Some("attemptId"), 100)
assert(mockedProvider.checkAbsoluteLength(logInfo, fileStatus))
// file length is equal to logInfo fileSize
logInfo = new LogInfo(path.toString, 0, Some("appId"), Some("attemptId"), 200)
assert(!mockedProvider.checkAbsoluteLength(logInfo, fileStatus))
// in.getWrappedStream returns other than DFSInputStream
val bin = mock(classOf[BufferedInputStream])
when(in.getWrappedStream).thenReturn(bin)
assert(!mockedProvider.checkAbsoluteLength(logInfo, fileStatus))
// fs.open throws exception
when(mockedFs.open(path)).thenThrow(new IOException("Throwing intentionally"))
assert(!mockedProvider.checkAbsoluteLength(logInfo, fileStatus))
}

/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example: