From 17f34cce538e92379a0fde1efe7aaf24b5c7b22b Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Wed, 11 Jun 2025 19:26:05 -0700
Subject: [PATCH 1/2] [SPARK-52458][CORE] Support `spark.eventLog.excludedPatterns`

---
 .../deploy/history/EventLogFileWriters.scala  |  4 ++++
 .../spark/internal/config/package.scala       |  7 +++++++
 .../history/EventLogFileWritersSuite.scala    | 20 +++++++++++++++++++
 3 files changed, 31 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
index 990ab680f3aaf..4572bb45cb263 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
@@ -43,6 +43,7 @@ import org.apache.spark.util.Utils
  * spark.eventLog.compression.codec - The codec to compress logged events
  * spark.eventLog.overwrite - Whether to overwrite any existing files
  * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
+ * spark.eventLog.excludedPatterns - Specifies comma-separated event names to be excluded
  *
  * Note that descendant classes can maintain its own parameters: refer the javadoc of each class
  * for more details.
@@ -58,6 +59,8 @@ abstract class EventLogFileWriter(
 
   protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) &&
     !sparkConf.get(EVENT_LOG_COMPRESSION_CODEC).equalsIgnoreCase("none")
+  protected val excludedPatterns = sparkConf.get(EVENT_LOG_EXCLUDED_PATTERNS)
+    .toSeq.flatMap(Utils.stringToSeq).map(name => s"""{"Event":"$name"""")
   protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
   protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt * 1024
   protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
@@ -117,6 +120,7 @@ abstract class EventLogFileWriter(
   }
 
   protected def writeLine(line: String, flushLogger: Boolean = false): Unit = {
+    if (excludedPatterns.exists(line.startsWith(_))) return
     // scalastyle:off println
     writer.foreach(_.println(line))
     // scalastyle:on println
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 000de4d6c26d2..cb365c558fc57 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -224,6 +224,13 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val EVENT_LOG_EXCLUDED_PATTERNS =
+    ConfigBuilder("spark.eventLog.excludedPatterns")
+      .doc("Specifies a comma-separated event names to be excluded from the event logs.")
+      .version("4.1.0")
+      .stringConf
+      .createOptional
+
   private[spark] val EVENT_LOG_ALLOW_EC =
     ConfigBuilder("spark.eventLog.erasureCoding.enabled")
       .version("3.0.0")
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
index da3614bf81a55..2c83709f80866 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
@@ -112,6 +112,26 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon
     assert(writer.compressionCodecName === EVENT_LOG_COMPRESSION_CODEC.defaultValue)
   }
 
+  test("SPARK-52458: Support spark.eventLog.excludedPatterns") {
+    val appId = getUniqueApplicationId
+    val attemptId = None
+
+    val conf = getLoggingConf(testDirPath, None)
+    conf.set(EVENT_LOG_EXCLUDED_PATTERNS, "B,C")
+
+    val writer = createWriter(appId, attemptId, testDirPath.toUri, conf,
+      SparkHadoopUtil.get.newConfiguration(conf))
+
+    writer.start()
+    Seq("A", "B", "C", "D").foreach { name =>
+      writer.writeEvent(s"""{"Event":"$name"}""", flushLogger = true)
+    }
+    writer.stop()
+
+    verifyWriteEventLogFile(appId, attemptId, testDirPath.toUri,
+      None, Seq("""{"Event":"A"}""", """{"Event":"D"}"""))
+  }
+
   protected def readLinesFromEventLogFile(log: Path, fs: FileSystem): List[String] = {
     val logDataStream = EventLogFileReader.openEventLog(log, fs)
     try {
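Patch 1 turns each excluded event name into the serialized-JSON prefix {"Event":"<name>" and makes writeLine drop any line that starts with one of those prefixes. A minimal standalone Scala sketch of that behavior (not Spark code; the single-letter event names mirror the test above):

object ExcludedPatternsSketch {
  def main(args: Array[String]): Unit = {
    // Same prefix construction as the patch: each name becomes the opening
    // of a serialized event line, e.g. {"Event":"B"
    val excludedPatterns = Seq("B", "C").map(name => s"""{"Event":"$name"""")

    val lines = Seq("A", "B", "C", "D").map(name => s"""{"Event":"$name"}""")

    // Mirrors the writeLine guard: keep only lines matching no prefix.
    val written = lines.filterNot(line => excludedPatterns.exists(line.startsWith(_)))

    written.foreach(println)  // prints {"Event":"A"} and {"Event":"D"}
  }
}

Prefix matching on the serialized line avoids parsing the JSON at all, which keeps the cost of the check negligible on the hot write path.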
From 1ba3858a596c467cba71b4a6e754829ae586c36e Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Wed, 11 Jun 2025 20:04:39 -0700
Subject: [PATCH 2/2] Address comment

---
 .../apache/spark/deploy/history/EventLogFileWriters.scala | 2 +-
 .../scala/org/apache/spark/internal/config/package.scala  | 5 +++--
 .../spark/deploy/history/EventLogFileWritersSuite.scala   | 2 +-
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
index 4572bb45cb263..83bc1443112a0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
@@ -60,7 +60,7 @@ abstract class EventLogFileWriter(
   protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) &&
     !sparkConf.get(EVENT_LOG_COMPRESSION_CODEC).equalsIgnoreCase("none")
   protected val excludedPatterns = sparkConf.get(EVENT_LOG_EXCLUDED_PATTERNS)
-    .toSeq.flatMap(Utils.stringToSeq).map(name => s"""{"Event":"$name"""")
+    .map(name => s"""{"Event":"$name"""")
   protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
   protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt * 1024
   protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index cb365c558fc57..db9bb89be4ed0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -226,10 +226,11 @@ package object config {
   private[spark] val EVENT_LOG_EXCLUDED_PATTERNS =
     ConfigBuilder("spark.eventLog.excludedPatterns")
-      .doc("Specifies a comma-separated event names to be excluded from the event logs.")
+      .doc("Specifies comma-separated event names to be excluded from the event logs.")
       .version("4.1.0")
       .stringConf
-      .createOptional
+      .toSequence
+      .createWithDefault(Nil)
 
   private[spark] val EVENT_LOG_ALLOW_EC =
     ConfigBuilder("spark.eventLog.erasureCoding.enabled")
       .version("3.0.0")
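The config change above is why the writer no longer needs Utils.stringToSeq: a conf declared with .toSequence already hands back a Seq[String], with Nil as the default instead of an Option[String]. A small sketch of the before/after parsing (the stringToSeq body below is an assumption approximating Spark's helper, not its exact code):

object ToSequenceSketch {
  // Assumed approximation of Utils.stringToSeq, for illustration only.
  def stringToSeq(str: String): Seq[String] =
    str.split(",").map(_.trim).filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    // Patch 1 shape: createOptional, so the writer splits the raw string.
    val before: Option[String] = Some("B,C")
    val parsedBefore = before.toSeq.flatMap(stringToSeq)

    // Patch 2 shape: toSequence + createWithDefault(Nil) yields a Seq directly.
    val parsedAfter: Seq[String] = Seq("B", "C")

    assert(parsedBefore == parsedAfter)
    println(parsedAfter.map(name => s"""{"Event":"$name""""))
  }
}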
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
index 2c83709f80866..a2718b973b15d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
@@ -117,7 +117,7 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon
     val attemptId = None
 
     val conf = getLoggingConf(testDirPath, None)
-    conf.set(EVENT_LOG_EXCLUDED_PATTERNS, "B,C")
+    conf.set(EVENT_LOG_EXCLUDED_PATTERNS, Seq("B", "C"))
 
     val writer = createWriter(appId, attemptId, testDirPath.toUri, conf,
      SparkHadoopUtil.get.newConfiguration(conf))
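With both commits applied, excluding events becomes a one-line conf for users. An illustrative setup for an application that wants smaller event logs (the excluded event names are examples chosen here, not a recommendation from the patch):

import org.apache.spark.SparkConf

// Illustrative usage of the new conf alongside the existing event-log confs.
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "/tmp/spark-events")
  .set("spark.eventLog.excludedPatterns",
    "SparkListenerBlockUpdated,SparkListenerExecutorMetricsUpdate")

Since the filter compares against the "Event" field at the start of each serialized line, the names given here must match the event names as they appear in the log, e.g. the name inside {"Event":"..."}.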