Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions core/src/test/scala/org/apache/spark/SparkFunSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ package org.apache.spark
// scalastyle:off
import java.io.File

import scala.annotation.tailrec
import org.apache.log4j.spi.LoggingEvent

import org.apache.log4j.{Appender, Level, Logger}
import scala.annotation.tailrec
import org.apache.log4j.{Appender, AppenderSkeleton, Level, Logger}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, BeforeAndAfterEach, FunSuite, Outcome}

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Tests.IS_TESTING
import org.apache.spark.util.{AccumulatorContext, Utils}

import scala.collection.mutable.ArrayBuffer

/**
* Base abstract class for all unit tests in Spark for handling common functionality.
*
Expand Down Expand Up @@ -186,4 +188,17 @@ abstract class SparkFunSuite
}
}
}

/**
 * A log4j appender that captures every [[LoggingEvent]] delivered to it while attached,
 * so tests can assert on messages that were logged (typically via `withLogAppender`).
 *
 * @param maxEvents upper bound on the number of captured events; once reached, further
 *                  appends throw [[IllegalStateException]] to fail fast instead of letting
 *                  a runaway logger grow the buffer unboundedly during a test.
 */
class LogAppender(maxEvents: Int = 100) extends AppenderSkeleton {
  // Captured events in arrival order; tests inspect this directly.
  val loggingEvents = new ArrayBuffer[LoggingEvent]()

  override def append(loggingEvent: LoggingEvent): Unit = {
    if (loggingEvents.size >= maxEvents) {
      // Fixed grammar of the failure message ("event" -> "events").
      throw new IllegalStateException(s"Number of logging events reached the limit: $maxEvents")
    }
    loggingEvents.append(loggingEvent)
  }

  // Nothing to release; events live only in memory.
  override def close(): Unit = {}

  // Raw events are examined by tests, so no layout/formatting is required.
  override def requiresLayout(): Boolean = false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

import scala.collection.mutable.ArrayBuffer

import org.apache.log4j.{AppenderSkeleton, Level}
import org.apache.log4j.spi.LoggingEvent
import org.apache.log4j.Level

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
Expand All @@ -33,14 +30,6 @@ import org.apache.spark.sql.types.IntegerType
class ResolveHintsSuite extends AnalysisTest {
import org.apache.spark.sql.catalyst.analysis.TestRelations._

class MockAppender extends AppenderSkeleton {
val loggingEvents = new ArrayBuffer[LoggingEvent]()

override def append(loggingEvent: LoggingEvent): Unit = loggingEvents.append(loggingEvent)
override def close(): Unit = {}
override def requiresLayout(): Boolean = false
}

test("invalid hints should be ignored") {
checkAnalysis(
UnresolvedHint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")),
Expand Down Expand Up @@ -234,7 +223,7 @@ class ResolveHintsSuite extends AnalysisTest {
}

test("log warnings for invalid hints") {
val logAppender = new MockAppender()
val logAppender = new LogAppender
withLogAppender(logAppender) {
checkAnalysis(
UnresolvedHint("unknown_hint", Seq("TaBlE"), table("TaBlE")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ package org.apache.spark.sql.catalyst.expressions

import java.sql.Timestamp

import org.apache.log4j.AppenderSkeleton
import org.apache.log4j.spi.LoggingEvent

import org.apache.spark.SparkFunSuite
import org.apache.spark.metrics.source.CodegenMetrics
import org.apache.spark.sql.Row
Expand Down Expand Up @@ -522,28 +519,16 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
}

test("SPARK-25113: should log when there exists generated methods above HugeMethodLimit") {
class MockAppender extends AppenderSkeleton {
var seenMessage = false

override def append(loggingEvent: LoggingEvent): Unit = {
if (loggingEvent.getRenderedMessage().contains("Generated method too long")) {
seenMessage = true
}
}

override def close(): Unit = {}
override def requiresLayout(): Boolean = false
}

val appender = new MockAppender()
val appender = new LogAppender
withLogAppender(appender, loggerName = Some(classOf[CodeGenerator[_, _]].getName)) {
val x = 42
val expr = HugeCodeIntExpression(x)
val proj = GenerateUnsafeProjection.generate(Seq(expr))
val actual = proj(null)
assert(actual.getInt(0) == x)
}
assert(appender.seenMessage)
assert(appender.loggingEvents
.exists(_.getRenderedMessage().contains("Generated method too long")))
}

test("SPARK-28916: subexrepssion elimination can cause 64kb code limit on UnsafeProjection") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

package org.apache.spark.sql.catalyst.optimizer

import scala.collection.mutable.ArrayBuffer

import org.apache.log4j.{Appender, AppenderSkeleton, Level, Logger}
import org.apache.log4j.spi.LoggingEvent
import org.apache.log4j.Level

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
Expand All @@ -39,45 +36,25 @@ class OptimizerLoggingSuite extends PlanTest {
ColumnPruning) :: Nil
}

class MockAppender extends AppenderSkeleton {
val loggingEvents = new ArrayBuffer[LoggingEvent]()

override def append(loggingEvent: LoggingEvent): Unit = {
if (loggingEvent.getRenderedMessage().contains("Applying Rule") ||
loggingEvent.getRenderedMessage().contains("Result of Batch") ||
loggingEvent.getRenderedMessage().contains("has no effect")) {
loggingEvents.append(loggingEvent)
}
}

override def close(): Unit = {}
override def requiresLayout(): Boolean = false
}

private def withLogLevelAndAppender(level: Level, appender: Appender)(f: => Unit): Unit = {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is removed because the method is not used in the test suite.

val logger = Logger.getLogger(Optimize.getClass.getName.dropRight(1))
val restoreLevel = logger.getLevel
logger.setLevel(level)
logger.addAppender(appender)
try f finally {
logger.setLevel(restoreLevel)
logger.removeAppender(appender)
}
}

private def verifyLog(expectedLevel: Level, expectedRulesOrBatches: Seq[String]): Unit = {
val logAppender = new MockAppender()
val logAppender = new LogAppender
withLogAppender(logAppender,
loggerName = Some(Optimize.getClass.getName.dropRight(1)), level = Some(Level.TRACE)) {
val input = LocalRelation('a.int, 'b.string, 'c.double)
val query = input.select('a, 'b).select('a).where('a > 1).analyze
val expected = input.where('a > 1).select('a).analyze
comparePlans(Optimize.execute(query), expected)
}
val logMessages = logAppender.loggingEvents.map(_.getRenderedMessage)
val events = logAppender.loggingEvents.filter {
case event => Seq(
"Applying Rule",
"Result of Batch",
"has no effect").exists(event.getRenderedMessage().contains)
}
val logMessages = events.map(_.getRenderedMessage)
assert(expectedRulesOrBatches.forall
(ruleOrBatch => logMessages.exists(_.contains(ruleOrBatch))))
assert(logAppender.loggingEvents.forall(_.getLevel == expectedLevel))
assert(events.forall(_.getLevel == expectedLevel))
}

test("test log level") {
Expand Down
15 changes: 2 additions & 13 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

package org.apache.spark.sql

import scala.collection.mutable.ArrayBuffer

import org.apache.log4j.{AppenderSkeleton, Level}
import org.apache.log4j.spi.LoggingEvent
import org.apache.log4j.Level

import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
import org.apache.spark.sql.catalyst.plans.PlanTest
Expand All @@ -38,14 +35,6 @@ class JoinHintSuite extends PlanTest with SharedSparkSession {
lazy val df2 = df.selectExpr("id as b1", "id as b2")
lazy val df3 = df.selectExpr("id as c1", "id as c2")

class MockAppender extends AppenderSkeleton {
val loggingEvents = new ArrayBuffer[LoggingEvent]()

override def append(loggingEvent: LoggingEvent): Unit = loggingEvents.append(loggingEvent)
override def close(): Unit = {}
override def requiresLayout(): Boolean = false
}

def msgNoHintRelationFound(relation: String, hint: String): String =
s"Count not find relation '$relation' specified in hint '$hint'."

Expand All @@ -59,7 +48,7 @@ class JoinHintSuite extends PlanTest with SharedSparkSession {
df: => DataFrame,
expectedHints: Seq[JoinHint],
warnings: Seq[String]): Unit = {
val logAppender = new MockAppender()
val logAppender = new LogAppender
withLogAppender(logAppender) {
verifyJoinHint(df, expectedHints)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import com.univocity.parsers.common.TextParsingException
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.log4j.{AppenderSkeleton, LogManager}
import org.apache.log4j.spi.LoggingEvent

import org.apache.spark.{SparkException, TestUtils}
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
Expand Down Expand Up @@ -1763,24 +1761,17 @@ class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData {
}

test("SPARK-23786: warning should be printed if CSV header doesn't conform to schema") {
class TestAppender extends AppenderSkeleton {
var events = new java.util.ArrayList[LoggingEvent]
override def close(): Unit = {}
override def requiresLayout: Boolean = false
protected def append(event: LoggingEvent): Unit = events.add(event)
}

val testAppender1 = new TestAppender
val testAppender1 = new LogAppender
withLogAppender(testAppender1) {
val ds = Seq("columnA,columnB", "1.0,1000.0").toDS()
val ischema = new StructType().add("columnB", DoubleType).add("columnA", DoubleType)

spark.read.schema(ischema).option("header", true).option("enforceSchema", true).csv(ds)
}
assert(testAppender1.events.asScala
assert(testAppender1.loggingEvents
.exists(msg => msg.getRenderedMessage.contains("CSV header does not conform to the schema")))

val testAppender2 = new TestAppender
val testAppender2 = new LogAppender
withLogAppender(testAppender2) {
withTempPath { path =>
val oschema = new StructType().add("f1", DoubleType).add("f2", DoubleType)
Expand All @@ -1795,7 +1786,7 @@ class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData {
.collect()
}
}
assert(testAppender2.events.asScala
assert(testAppender2.loggingEvents
.exists(msg => msg.getRenderedMessage.contains("CSV header does not conform to the schema")))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@

package org.apache.spark.sql.internal

import scala.collection.mutable.ArrayBuffer
import scala.language.reflectiveCalls

import org.apache.hadoop.fs.Path
import org.apache.log4j.{AppenderSkeleton, Level}
import org.apache.log4j.spi.LoggingEvent
import org.apache.log4j.Level

import org.apache.spark.sql._
import org.apache.spark.sql.internal.StaticSQLConf._
Expand Down Expand Up @@ -337,13 +335,7 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
}

test("log deprecation warnings") {
val logAppender = new AppenderSkeleton {
val loggingEvents = new ArrayBuffer[LoggingEvent]()

override def append(loggingEvent: LoggingEvent): Unit = loggingEvents.append(loggingEvent)
override def close(): Unit = {}
override def requiresLayout(): Boolean = false
}
val logAppender = new LogAppender
def check(config: String): Unit = {
assert(logAppender.loggingEvents.exists(
e => e.getLevel == Level.WARN &&
Expand Down