Skip to content

Commit 1b075f2

Browse files
Devaraj K authored and Marcelo Vanzin committed
[SPARK-24787][CORE] Revert hsync in EventLoggingListener and make FsHistoryProvider to read lastBlockBeingWritten data for logs
## What changes were proposed in this pull request?

`hsync` was added as part of SPARK-19531 to get the latest data in the history server UI, but it causes performance overhead and also leads to many history log events being dropped. `hsync` uses `FileChannel.force` to sync the data to disk, and this happens across the data pipeline; it is a costly operation that makes the application incur overhead and drop events.

I think getting the latest data in the history server can be done in a different way (with no impact on the application while it writes events): there is an API, `DFSInputStream.getFileLength()`, which gives the file length including the `lastBlockBeingWrittenLength` (unlike `FileStatus.getLen()`). This API can be used when the file status length and the previously cached length are equal, to verify whether any new data has been written; if the data length has changed, the history server can update the in-progress history log. I also made this change configurable, with a default value of false; it can be enabled for the history server if users want to see the updated data in the UI.

## How was this patch tested?

Added a new test and verified manually: with the added conf `spark.history.fs.inProgressAbsoluteLengthCheck.enabled=true`, the history server reads the logs, including the data of the last block that is still being written, and updates the Web UI with the latest data.

Closes #22752 from devaraj-kavali/SPARK-24787.

Authored-by: Devaraj K <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
(cherry picked from commit 46d2d2c)
Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent 45ed76d commit 1b075f2

File tree

3 files changed

+56
-11
lines changed

3 files changed

+56
-11
lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore
3434
import com.google.common.io.ByteStreams
3535
import com.google.common.util.concurrent.MoreExecutors
3636
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
37-
import org.apache.hadoop.hdfs.DistributedFileSystem
37+
import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem}
3838
import org.apache.hadoop.hdfs.protocol.HdfsConstants
3939
import org.apache.hadoop.security.AccessControlException
4040
import org.fusesource.leveldbjni.internal.NativeDB
@@ -449,7 +449,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
449449
listing.write(info.copy(lastProcessed = newLastScanTime, fileSize = entry.getLen()))
450450
}
451451

452-
if (info.fileSize < entry.getLen()) {
452+
if (shouldReloadLog(info, entry)) {
453453
if (info.appId.isDefined && fastInProgressParsing) {
454454
// When fast in-progress parsing is on, we don't need to re-parse when the
455455
// size changes, but we do need to invalidate any existing UIs.
@@ -541,6 +541,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
541541
}
542542
}
543543

544+
/**
 * Decides whether the event log described by `entry` has grown since it was last recorded in
 * `info` and therefore needs to be reloaded.
 *
 * The length reported by the file status is compared first. If that shows no growth and the
 * log path ends with the in-progress suffix, the file is opened and, when the wrapped stream
 * is a `DFSInputStream`, the cached size is compared against `DFSInputStream.getFileLength`
 * instead (per the change description, that length includes the last block still being
 * written). Any other stream type, or a failure while checking, counts as "no reload".
 *
 * @param info  previously recorded information about the log, including its cached size
 * @param entry current file status of the same log
 * @return true if the log appears to contain data beyond the cached size
 */
private[history] def shouldReloadLog(info: LogInfo, entry: FileStatus): Boolean = {
  val grewPerFileStatus = info.fileSize < entry.getLen
  if (grewPerFileStatus || !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
    // Either the status already shows growth, or the log is not in progress: no need to
    // open the file.
    grewPerFileStatus
  } else {
    try {
      Utils.tryWithResource(fs.open(entry.getPath)) { stream =>
        stream.getWrappedStream match {
          case hdfsStream: DFSInputStream => info.fileSize < hdfsStream.getFileLength
          case _ => false
        }
      }
    } catch {
      case e: Exception =>
        // Best-effort check: a failure here only means the log is not reloaded this time.
        logDebug(s"Failed to check the length for the file : ${info.logPath}", e)
        false
    }
  }
}
561+
544562
private def cleanAppData(appId: String, attemptId: Option[String], logPath: String): Unit = {
545563
try {
546564
val app = load(appId)

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler
2020
import java.io._
2121
import java.net.URI
2222
import java.nio.charset.StandardCharsets
23-
import java.util.EnumSet
2423
import java.util.Locale
2524

2625
import scala.collection.mutable
@@ -29,8 +28,6 @@ import scala.collection.mutable.ArrayBuffer
2928
import org.apache.hadoop.conf.Configuration
3029
import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
3130
import org.apache.hadoop.fs.permission.FsPermission
32-
import org.apache.hadoop.hdfs.DFSOutputStream
33-
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag
3431
import org.json4s.JsonAST.JValue
3532
import org.json4s.jackson.JsonMethods._
3633

@@ -144,10 +141,7 @@ private[spark] class EventLoggingListener(
144141
// scalastyle:on println
145142
if (flushLogger) {
146143
writer.foreach(_.flush())
147-
hadoopDataStream.foreach(ds => ds.getWrappedStream match {
148-
case wrapped: DFSOutputStream => wrapped.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH))
149-
case _ => ds.hflush()
150-
})
144+
hadoopDataStream.foreach(_.hflush())
151145
}
152146
if (testing) {
153147
loggedEvents += eventJson

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import scala.concurrent.duration._
2727
import scala.language.postfixOps
2828

2929
import com.google.common.io.{ByteStreams, Files}
30-
import org.apache.hadoop.fs.{FileStatus, Path}
31-
import org.apache.hadoop.hdfs.DistributedFileSystem
30+
import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path}
31+
import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem}
3232
import org.apache.hadoop.security.AccessControlException
3333
import org.json4s.jackson.JsonMethods._
3434
import org.mockito.ArgumentMatcher
@@ -856,6 +856,39 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
856856
assert(!mockedProvider.isBlacklisted(accessDeniedPath))
857857
}
858858

859+
test("check in-progress event logs absolute length") {
  // Wire a spied provider to a mocked file system whose stream wraps a DFSInputStream
  // reporting a length of 200.
  val logPath = new Path("testapp.inprogress")
  val spiedProvider = spy(new FsHistoryProvider(createTestConf()))
  val fsMock = mock(classOf[FileSystem])
  val wrapperStream = mock(classOf[FSDataInputStream])
  val hdfsStream = mock(classOf[DFSInputStream])
  when(spiedProvider.fs).thenReturn(fsMock)
  when(fsMock.open(logPath)).thenReturn(wrapperStream)
  when(wrapperStream.getWrappedStream).thenReturn(hdfsStream)
  when(hdfsStream.getFileLength).thenReturn(200)

  val staleInfo = new LogInfo(logPath.toString, 0, Some("appId"), Some("attemptId"), 100)
  val currentInfo = new LogInfo(logPath.toString, 0, Some("appId"), Some("attemptId"), 200)

  // FileStatus.getLen is more than logInfo fileSize
  val grownStatus = new FileStatus(200, false, 0, 0, 0, logPath)
  assert(spiedProvider.shouldReloadLog(staleInfo, grownStatus))

  // From here on the file status is built with the no-arg constructor and only its path set,
  // so the outcome depends on the stream-based check.
  val emptyStatus = new FileStatus()
  emptyStatus.setPath(logPath)

  // DFSInputStream.getFileLength is more than logInfo fileSize
  assert(spiedProvider.shouldReloadLog(staleInfo, emptyStatus))

  // DFSInputStream.getFileLength is equal to logInfo fileSize
  assert(!spiedProvider.shouldReloadLog(currentInfo, emptyStatus))

  // in.getWrappedStream returns other than DFSInputStream
  val plainStream = mock(classOf[BufferedInputStream])
  when(wrapperStream.getWrappedStream).thenReturn(plainStream)
  assert(!spiedProvider.shouldReloadLog(currentInfo, emptyStatus))

  // fs.open throws exception
  when(fsMock.open(logPath)).thenThrow(new IOException("Throwing intentionally"))
  assert(!spiedProvider.shouldReloadLog(currentInfo, emptyStatus))
}
891+
859892
/**
860893
* Asks the provider to check for logs and calls a function to perform checks on the updated
861894
* app list. Example:

0 commit comments

Comments
 (0)