Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,42 @@ object ResolveHints {
* This must be executed after all the other hint rules are executed.
*/
object RemoveAllHints extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
case h: UnresolvedHint => h.child

def apply(plan: LogicalPlan): LogicalPlan = {
val ignoredHintLogger = new IgnoredHintLogger()
plan resolveOperatorsUp {
case h: UnresolvedHint =>
// Logs the unused plan hint before we remove it
ignoredHintLogger.log(h)
h.child
}
}
}

private class IgnoredHintLogger {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a new class here? Is there any reason why we can't use HintErrorLogger?

Copy link
Member Author

@maropu maropu Oct 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, HintErrorLogger can covert all the case in this pr, so I'll close this. Thanks (I didn't notice that the pr of HintErrorLogger had been merged after this pr made.


private val logLevel = SQLConf.get.planHintIgnoreLogLevel

def log(h: UnresolvedHint): Unit = {
val message = {
val paramInfo = if (h.parameters.nonEmpty) {
s" params=${h.parameters.mkString(",")}"
} else {
""
}
s"""
|=== Ignored Plan Hint: name=${h.name}$paramInfo ===
|${h.child.treeString}
""".stripMargin
}
logLevel match {
case "TRACE" => logTrace(message)
case "DEBUG" => logDebug(message)
case "INFO" => logInfo(message)
case "WARN" => logWarning(message)
case "ERROR" => logError(message)
case _ => logTrace(message)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,19 @@ object SQLConf {
.stringConf
.createOptional

val PLAN_HINT_IGNORE_LOG_LEVEL =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this really worth a config?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. I don't think we need to make it configurable. We can just log warning.

buildConf("spark.sql.planHintIgnoreLog.level")
.internal()
.doc("Configures the log level for logging the plan hint ignored in the analyzer. " +
"The value can be 'trace', 'debug', 'info', 'warn', or 'error'. " +
"The default log level is 'trace'.")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
"Invalid value for 'spark.sql.optimizer.planHintIgnoreLog.level'. Valid values are " +
"'trace', 'debug', 'info', 'warn' and 'error'.")
.createWithDefault("trace")

val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
.doc("When set to true Spark SQL will automatically select a compression codec for each " +
"column based on statistics of the data.")
Expand Down Expand Up @@ -1788,6 +1801,8 @@ class SQLConf extends Serializable with Logging {

def maxRecordsPerFile: Long = getConf(MAX_RECORDS_PER_FILE)

def planHintIgnoreLogLevel: String = getConf(PLAN_HINT_IGNORE_LOG_LEVEL)

def useCompression: Boolean = getConf(COMPRESS_CACHED)

def orcCompressionCodec: String = getConf(ORC_COMPRESSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.log4j.{AppenderSkeleton, Level, Logger}
import org.apache.log4j.spi.LoggingEvent

import org.apache.spark.sql.catalyst.analysis.ResolveHints.RemoveAllHints
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.internal.SQLConf

class ResolveHintsSuite extends AnalysisTest {
import org.apache.spark.sql.catalyst.analysis.TestRelations._
Expand Down Expand Up @@ -155,4 +160,47 @@ class ResolveHintsSuite extends AnalysisTest {
UnresolvedHint("REPARTITION", Seq(Literal(true)), table("TaBlE")),
Seq(errMsgRepa))
}

private def withLogLevel(logLevel: String)(f: => Unit): String = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reuse SparkFunSuite.withLogAppender?

val builder = new StringBuilder()
val logAppender = new AppenderSkeleton() {
override def append(loggingEvent: LoggingEvent): Unit = {
builder.append(loggingEvent.getRenderedMessage)
}
override def close(): Unit = {}
override def requiresLayout(): Boolean = false
}
val logger = Logger.getLogger(RemoveAllHints.getClass.getName.dropRight(1))
val restoreLevel = logger.getLevel
logger.setLevel(Level.toLevel(logLevel))
logger.addAppender(logAppender)
try f finally {
logger.setLevel(restoreLevel)
logger.removeAppender(logAppender)
}
builder.toString
}

test("ignored hints should be logged") {
withSQLConf(SQLConf.PLAN_HINT_IGNORE_LOG_LEVEL.key -> "info") {
val expectedMsg = "=== Ignored Plan Hint: name=unknown_hint params=1,3 ==="
Seq("trace", "debug", "info").foreach { logLevel =>
val logMsg = withLogLevel(logLevel) {
checkAnalysis(
UnresolvedHint("unknown_hint", Seq(1, 3), testRelation),
testRelation)
}
assert(logMsg.contains(expectedMsg))
}

Seq("warn", "error").foreach { logLevel =>
val logMsg = withLogLevel(logLevel) {
checkAnalysis(
UnresolvedHint("unknown_hint", Seq(1, 2), testRelation),
testRelation)
}
assert(!logMsg.contains(expectedMsg))
}
}
}
}