Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.spark.util.Utils
* spark.eventLog.compression.codec - The codec to compress logged events
* spark.eventLog.overwrite - Whether to overwrite any existing files
* spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
* spark.eventLog.excludedPatterns - Specifes a comma-separated event names to be excluded
*
* Note that descendant classes can maintain its own parameters: refer the javadoc of each class
* for more details.
Expand All @@ -58,6 +59,8 @@ abstract class EventLogFileWriter(

protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) &&
!sparkConf.get(EVENT_LOG_COMPRESSION_CODEC).equalsIgnoreCase("none")
protected val excludedPatterns = sparkConf.get(EVENT_LOG_EXCLUDED_PATTERNS)
.map(name => s"""{"Event":"$name"""")
protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt * 1024
protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
Expand Down Expand Up @@ -117,6 +120,7 @@ abstract class EventLogFileWriter(
}

protected def writeLine(line: String, flushLogger: Boolean = false): Unit = {
if (excludedPatterns.exists(line.startsWith(_))) return
// scalastyle:off println
writer.foreach(_.println(line))
// scalastyle:on println
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,14 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val EVENT_LOG_EXCLUDED_PATTERNS =
ConfigBuilder("spark.eventLog.excludedPatterns")
.doc("Specifies comma-separated event names to be excluded from the event logs.")
.version("4.1.0")
.stringConf
.toSequence
.createWithDefault(Nil)

private[spark] val EVENT_LOG_ALLOW_EC =
ConfigBuilder("spark.eventLog.erasureCoding.enabled")
.version("3.0.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,26 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon
assert(writer.compressionCodecName === EVENT_LOG_COMPRESSION_CODEC.defaultValue)
}

test("SPARK-52458: Support spark.eventLog.excludedPatterns") {
val appId = getUniqueApplicationId
val attemptId = None

val conf = getLoggingConf(testDirPath, None)
conf.set(EVENT_LOG_EXCLUDED_PATTERNS, Seq("B", "C"))

val writer = createWriter(appId, attemptId, testDirPath.toUri, conf,
SparkHadoopUtil.get.newConfiguration(conf))

writer.start()
Seq("A", "B", "C", "D").foreach { name =>
writer.writeEvent(s"""{"Event":"$name"}""", flushLogger = true)
}
writer.stop()

verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri,
None, Seq("""{"Event":"A"}""", """{"Event":"D"}"""))
}

protected def readLinesFromEventLogFile(log: Path, fs: FileSystem): List[String] = {
val logDataStream = EventLogFileReader.openEventLog(log, fs)
try {
Expand Down