From ddc4f85cf9fbac0c27a409fc7fd6e445ecbba752 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 11 Mar 2019 18:58:39 +0900 Subject: [PATCH 1/2] Fix --- .../sql/catalyst/analysis/ResolveHints.scala | 39 +++++++++++++-- .../apache/spark/sql/internal/SQLConf.scala | 15 ++++++ .../catalyst/analysis/ResolveHintsSuite.scala | 48 +++++++++++++++++++ 3 files changed, 99 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index dbd4ed845e329..bc1c60b8f4ce4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -134,9 +134,42 @@ object ResolveHints { * This must be executed after all the other hint rules are executed. */ object RemoveAllHints extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp { - case h: UnresolvedHint => h.child + + def apply(plan: LogicalPlan): LogicalPlan = { + val ignoredHintLogger = new IgnoredHintLogger() + plan resolveOperatorsUp { + case h: UnresolvedHint => + // Logs the unused plan hint before we remove it + ignoredHintLogger.log(h) + h.child + } } - } + private class IgnoredHintLogger { + + private val logLevel = SQLConf.get.planHintIgnoreLogLevel + + def log(h: UnresolvedHint): Unit = { + val message = { + val paramInfo = if (h.parameters.nonEmpty) { + s" params=${h.parameters.mkString(",")}" + } else { + "" + } + s""" + |=== Ignored Plan Hint: name=${h.name}$paramInfo === + |${h.child.treeString} + """.stripMargin + } + logLevel match { + case "TRACE" => logTrace(message) + case "DEBUG" => logDebug(message) + case "INFO" => logInfo(message) + case "WARN" => logWarning(message) + case "ERROR" => logError(message) + case _ => logTrace(message) + } + } + } + } } diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2271bacb35bd6..4d4f385813623 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -201,6 +201,19 @@ object SQLConf { .stringConf .createOptional + val PLAN_HINT_IGNORE_LOG_LEVEL = + buildConf("spark.sql.planHintIgnoreLog.level") + .internal() + .doc("Configures the log level for logging the plan hint ignored in the analyzer. " + + "The value can be 'trace', 'debug', 'info', 'warn', or 'error'. " + + "The default log level is 'trace'.") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel), + "Invalid value for 'spark.sql.optimizer.planChangeLog.level'. Valid values are " + + "'trace', 'debug', 'info', 'warn' and 'error'.") + .createWithDefault("trace") + val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed") .doc("When set to true Spark SQL will automatically select a compression codec for each " + "column based on statistics of the data.") @@ -1788,6 +1801,8 @@ class SQLConf extends Serializable with Logging { def maxRecordsPerFile: Long = getConf(MAX_RECORDS_PER_FILE) + def planHintIgnoreLogLevel: String = getConf(PLAN_HINT_IGNORE_LOG_LEVEL) + def useCompression: Boolean = getConf(COMPRESS_CACHED) def orcCompressionCodec: String = getConf(ORC_COMPRESSION) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala index 563e8adf87edc..8d2ef14d3a862 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala @@ 
-17,12 +17,17 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.log4j.{AppenderSkeleton, Level, Logger} +import org.apache.log4j.spi.LoggingEvent + +import org.apache.spark.sql.catalyst.analysis.ResolveHints.RemoveAllHints import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.internal.SQLConf class ResolveHintsSuite extends AnalysisTest { import org.apache.spark.sql.catalyst.analysis.TestRelations._ @@ -155,4 +160,47 @@ class ResolveHintsSuite extends AnalysisTest { UnresolvedHint("REPARTITION", Seq(Literal(true)), table("TaBlE")), Seq(errMsgRepa)) } + + private def withLogLevel(logLevel: String)(f: => Unit): String = { + val builder = new StringBuilder() + val logAppender = new AppenderSkeleton() { + override def append(loggingEvent: LoggingEvent): Unit = { + builder.append(loggingEvent.getRenderedMessage) + } + override def close(): Unit = {} + override def requiresLayout(): Boolean = false + } + val logger = Logger.getLogger(RemoveAllHints.getClass.getName.dropRight(1)) + val restoreLevel = logger.getLevel + logger.setLevel(Level.toLevel(logLevel)) + logger.addAppender(logAppender) + try f finally { + logger.setLevel(restoreLevel) + logger.removeAppender(logAppender) + } + builder.toString + } + + test("ignored hints should be logged") { + withSQLConf(SQLConf.PLAN_HINT_IGNORE_LOG_LEVEL.key -> "info") { + val expectedMsg = "=== Ignored Plan Hint: name=unknown_hint params=1,3 ===" + Seq("trace", "debug", "info").foreach { logLevel => + val logMsg = withLogLevel(logLevel) { + checkAnalysis( + UnresolvedHint("unknown_hint", Seq(1, 3), testRelation), + testRelation) + } + assert(logMsg.contains(expectedMsg)) + } + + Seq("warn", "error").foreach { 
logLevel => + val logMsg = withLogLevel(logLevel) { + checkAnalysis( + UnresolvedHint("unknown_hint", Seq(1, 2), testRelation), + testRelation) + } + assert(!logMsg.contains(expectedMsg)) + } + } + } } From 82979d8beb3f9a9bebc49d47123e9d35f8d25a2d Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Wed, 20 Mar 2019 09:00:59 +0900 Subject: [PATCH 2/2] Fix --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4d4f385813623..ba6493edf5cad 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -210,7 +210,7 @@ object SQLConf { .stringConf .transform(_.toUpperCase(Locale.ROOT)) .checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel), - "Invalid value for 'spark.sql.optimizer.planChangeLog.level'. Valid values are " + + "Invalid value for 'spark.sql.planHintIgnoreLog.level'. Valid values are " + "'trace', 'debug', 'info', 'warn' and 'error'.") .createWithDefault("trace")