diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 9dd113262653b..bb64ad8cb19f6 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -20,15 +20,17 @@ package org.apache.spark
 // scalastyle:off
 import java.io.File
 
 import scala.annotation.tailrec
+import scala.collection.mutable.ArrayBuffer
 
-import org.apache.log4j.{Appender, Level, Logger}
+import org.apache.log4j.{Appender, AppenderSkeleton, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, BeforeAndAfterEach, FunSuite, Outcome}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.util.{AccumulatorContext, Utils}
 
 /**
  * Base abstract class for all unit tests in Spark for handling common functionality.
  *
@@ -186,4 +188,18 @@ abstract class SparkFunSuite
       }
     }
   }
+
+  /** Test appender that records every appended event; throws if more than `maxEvents` arrive. */
+  class LogAppender(maxEvents: Int = 100) extends AppenderSkeleton {
+    val loggingEvents = new ArrayBuffer[LoggingEvent]()
+
+    override def append(loggingEvent: LoggingEvent): Unit = {
+      if (loggingEvents.size >= maxEvents) {
+        throw new IllegalStateException(s"Number of logging events reached the limit: $maxEvents")
+      }
+      loggingEvents.append(loggingEvent)
+    }
+    override def close(): Unit = {}
+    override def requiresLayout(): Boolean = false
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
index 49ab34d2ea3a0..4fda65e201e71 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala
@@ -17,10 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.log4j.{AppenderSkeleton, Level}
-import org.apache.log4j.spi.LoggingEvent
+import org.apache.log4j.Level
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -33,14 +30,6 @@ import org.apache.spark.sql.types.IntegerType
 class ResolveHintsSuite extends AnalysisTest {
   import org.apache.spark.sql.catalyst.analysis.TestRelations._
 
-  class MockAppender extends AppenderSkeleton {
-    val loggingEvents = new ArrayBuffer[LoggingEvent]()
-
-    override def append(loggingEvent: LoggingEvent): Unit = loggingEvents.append(loggingEvent)
-    override def close(): Unit = {}
-    override def requiresLayout(): Boolean = false
-  }
-
   test("invalid hints should be ignored") {
     checkAnalysis(
       UnresolvedHint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")),
@@ -234,7 +223,7 @@ class ResolveHintsSuite extends AnalysisTest {
   }
 
   test("log warnings for invalid hints") {
-    val logAppender = new MockAppender()
+    val logAppender = new LogAppender
     withLogAppender(logAppender) {
       checkAnalysis(
         UnresolvedHint("unknown_hint", Seq("TaBlE"), table("TaBlE")),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 28d2607e6e43e..32cbb858d86f0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -19,9 +19,6 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.sql.Timestamp
 
-import org.apache.log4j.AppenderSkeleton
-import org.apache.log4j.spi.LoggingEvent
-
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.metrics.source.CodegenMetrics
 import org.apache.spark.sql.Row
@@ -522,20 +519,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("SPARK-25113: should log when there exists generated methods above HugeMethodLimit") {
-    class MockAppender extends AppenderSkeleton {
-      var seenMessage = false
-
-      override def append(loggingEvent: LoggingEvent): Unit = {
-        if (loggingEvent.getRenderedMessage().contains("Generated method too long")) {
-          seenMessage = true
-        }
-      }
-
-      override def close(): Unit = {}
-      override def requiresLayout(): Boolean = false
-    }
-
-    val appender = new MockAppender()
+    val appender = new LogAppender
     withLogAppender(appender, loggerName = Some(classOf[CodeGenerator[_, _]].getName)) {
       val x = 42
       val expr = HugeCodeIntExpression(x)
@@ -543,7 +527,8 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
       val actual = proj(null)
       assert(actual.getInt(0) == x)
     }
-    assert(appender.seenMessage)
+    assert(appender.loggingEvents
+      .exists(_.getRenderedMessage().contains("Generated method too long")))
   }
 
   test("SPARK-28916: subexrepssion elimination can cause 64kb code limit on UnsafeProjection") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
index 7a432d269abe6..927adc1551a88 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
@@ -17,10 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.log4j.{Appender, AppenderSkeleton, Level, Logger}
-import org.apache.log4j.spi.LoggingEvent
+import org.apache.log4j.Level
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -39,34 +36,8 @@ class OptimizerLoggingSuite extends PlanTest {
       ColumnPruning) :: Nil
   }
 
-  class MockAppender extends AppenderSkeleton {
-    val loggingEvents = new ArrayBuffer[LoggingEvent]()
-
-    override def append(loggingEvent: LoggingEvent): Unit = {
-      if (loggingEvent.getRenderedMessage().contains("Applying Rule") ||
-        loggingEvent.getRenderedMessage().contains("Result of Batch") ||
-        loggingEvent.getRenderedMessage().contains("has no effect")) {
-        loggingEvents.append(loggingEvent)
-      }
-    }
-
-    override def close(): Unit = {}
-    override def requiresLayout(): Boolean = false
-  }
-
-  private def withLogLevelAndAppender(level: Level, appender: Appender)(f: => Unit): Unit = {
-    val logger = Logger.getLogger(Optimize.getClass.getName.dropRight(1))
-    val restoreLevel = logger.getLevel
-    logger.setLevel(level)
-    logger.addAppender(appender)
-    try f finally {
-      logger.setLevel(restoreLevel)
-      logger.removeAppender(appender)
-    }
-  }
-
   private def verifyLog(expectedLevel: Level, expectedRulesOrBatches: Seq[String]): Unit = {
-    val logAppender = new MockAppender()
+    val logAppender = new LogAppender
     withLogAppender(logAppender,
       loggerName = Some(Optimize.getClass.getName.dropRight(1)), level = Some(Level.TRACE)) {
       val input = LocalRelation('a.int, 'b.string, 'c.double)
@@ -74,10 +45,16 @@ class OptimizerLoggingSuite extends PlanTest {
       val expected = input.where('a > 1).select('a).analyze
       comparePlans(Optimize.execute(query), expected)
     }
-    val logMessages = logAppender.loggingEvents.map(_.getRenderedMessage)
+    val events = logAppender.loggingEvents.filter {
+      case event => Seq(
+        "Applying Rule",
+        "Result of Batch",
+        "has no effect").exists(event.getRenderedMessage().contains)
+    }
+    val logMessages = events.map(_.getRenderedMessage)
     assert(expectedRulesOrBatches.forall
       (ruleOrBatch => logMessages.exists(_.contains(ruleOrBatch))))
-    assert(logAppender.loggingEvents.forall(_.getLevel == expectedLevel))
+    assert(events.forall(_.getLevel == expectedLevel))
   }
 
   test("test log level") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
index e405864584d07..89af0b09f6393 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
@@ -17,10 +17,7 @@
 
 package org.apache.spark.sql
 
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.log4j.{AppenderSkeleton, Level}
-import org.apache.log4j.spi.LoggingEvent
+import org.apache.log4j.Level
 
 import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
 import org.apache.spark.sql.catalyst.plans.PlanTest
@@ -38,14 +35,6 @@ class JoinHintSuite extends PlanTest with SharedSparkSession {
   lazy val df2 = df.selectExpr("id as b1", "id as b2")
   lazy val df3 = df.selectExpr("id as c1", "id as c2")
 
-  class MockAppender extends AppenderSkeleton {
-    val loggingEvents = new ArrayBuffer[LoggingEvent]()
-
-    override def append(loggingEvent: LoggingEvent): Unit = loggingEvents.append(loggingEvent)
-    override def close(): Unit = {}
-    override def requiresLayout(): Boolean = false
-  }
-
   def msgNoHintRelationFound(relation: String, hint: String): String =
     s"Count not find relation '$relation' specified in hint '$hint'."
 
@@ -59,7 +48,7 @@ class JoinHintSuite extends PlanTest with SharedSparkSession {
       df: => DataFrame,
       expectedHints: Seq[JoinHint],
       warnings: Seq[String]): Unit = {
-    val logAppender = new MockAppender()
+    val logAppender = new LogAppender
     withLogAppender(logAppender) {
       verifyJoinHint(df, expectedHints)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index f6adc7acb2772..ae9aaf15aae9f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -32,8 +32,6 @@ import com.univocity.parsers.common.TextParsingException
 import org.apache.commons.lang3.time.FastDateFormat
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.log4j.{AppenderSkeleton, LogManager}
-import org.apache.log4j.spi.LoggingEvent
 
 import org.apache.spark.{SparkException, TestUtils}
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
@@ -1763,24 +1761,17 @@ class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData {
   }
 
   test("SPARK-23786: warning should be printed if CSV header doesn't conform to schema") {
-    class TestAppender extends AppenderSkeleton {
-      var events = new java.util.ArrayList[LoggingEvent]
-      override def close(): Unit = {}
-      override def requiresLayout: Boolean = false
-      protected def append(event: LoggingEvent): Unit = events.add(event)
-    }
-
-    val testAppender1 = new TestAppender
+    val testAppender1 = new LogAppender
     withLogAppender(testAppender1) {
       val ds = Seq("columnA,columnB", "1.0,1000.0").toDS()
       val ischema = new StructType().add("columnB", DoubleType).add("columnA", DoubleType)
 
       spark.read.schema(ischema).option("header", true).option("enforceSchema", true).csv(ds)
     }
-    assert(testAppender1.events.asScala
+    assert(testAppender1.loggingEvents
       .exists(msg => msg.getRenderedMessage.contains("CSV header does not conform to the schema")))
 
-    val testAppender2 = new TestAppender
+    val testAppender2 = new LogAppender
     withLogAppender(testAppender2) {
       withTempPath { path =>
         val oschema = new StructType().add("f1", DoubleType).add("f2", DoubleType)
@@ -1795,7 +1786,7 @@ class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData {
           .collect()
       }
     }
-    assert(testAppender2.events.asScala
+    assert(testAppender2.loggingEvents
       .exists(msg => msg.getRenderedMessage.contains("CSV header does not conform to the schema")))
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 23a70f912a5d5..4a58b65325673 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -17,12 +17,10 @@
 
 package org.apache.spark.sql.internal
 
-import scala.collection.mutable.ArrayBuffer
 import scala.language.reflectiveCalls
 
 import org.apache.hadoop.fs.Path
-import org.apache.log4j.{AppenderSkeleton, Level}
-import org.apache.log4j.spi.LoggingEvent
+import org.apache.log4j.Level
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -337,13 +335,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
   }
 
   test("log deprecation warnings") {
-    val logAppender = new AppenderSkeleton {
-      val loggingEvents = new ArrayBuffer[LoggingEvent]()
-
-      override def append(loggingEvent: LoggingEvent): Unit = loggingEvents.append(loggingEvent)
-      override def close(): Unit = {}
-      override def requiresLayout(): Boolean = false
-    }
+    val logAppender = new LogAppender
     def check(config: String): Unit = {
       assert(logAppender.loggingEvents.exists(
         e => e.getLevel == Level.WARN &&
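
Note on usage (not part of the patch): a minimal sketch of how a suite extending SparkFunSuite exercises the consolidated helper. Only `LogAppender` and the `withLogAppender(appender, loggerName, level)` call shape are taken from the diff above; the logger name, the logged message, and the test body are hypothetical.

  test("hypothetical: capture warnings with the shared LogAppender") {
    val logAppender = new LogAppender
    withLogAppender(logAppender, loggerName = Some("org.apache.spark.example"),
        level = Some(Level.WARN)) {
      // Anything logged through the targeted logger while this block runs is recorded.
      org.apache.log4j.Logger.getLogger("org.apache.spark.example")
        .warn("example deprecated feature used")
    }
    // Assert on the recorded events after the appender has been detached.
    assert(logAppender.loggingEvents.exists { e =>
      e.getLevel == Level.WARN && e.getRenderedMessage.contains("deprecated")
    })
  }

Because the shared appender records every event instead of filtering inside `append`, per-test filtering moves into the assertions (as OptimizerLoggingSuite does above), and the `maxEvents` guard fails fast if a test accidentally captures an unbounded stream of events.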