@@ -56,7 +56,7 @@ object EliminateResolvedHint extends Rule[LogicalPlan] {
    * in this method will be cleaned up later by this rule, and may emit warnings depending on the
    * configurations.
    */
-  private def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = {
+  private[sql] def extractHintsFromPlan(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = {
     plan match {
       case h: ResolvedHint =>
         val (plan, hints) = extractHintsFromPlan(h.child)
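
The only change in this hunk is visibility: private becomes private[sql], so that code elsewhere in the org.apache.spark.sql package (the CacheManager change below) can call the method. A hedged sketch of what such a call site could look like; the HintUtils object and the splitHints name are invented for illustration and are not part of the PR:

    package org.apache.spark.sql.execution

    import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
    import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, LogicalPlan}

    // Invented helper, not from the PR; it must live under
    // org.apache.spark.sql because the method is private[sql].
    object HintUtils {
      // Returns the plan with its ResolvedHint nodes stripped, plus the
      // stripped hints in top-down order (outermost hint first).
      def splitHints(plan: LogicalPlan): (LogicalPlan, Seq[HintInfo]) = {
        EliminateResolvedHint.extractHintsFromPlan(plan)
      }
    }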
@@ -23,7 +23,8 @@ import org.apache.hadoop.fs.{FileSystem, Path}

 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
+import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
@@ -212,17 +213,18 @@ class CacheManager extends Logging {
   def useCachedData(plan: LogicalPlan): LogicalPlan = {
     val newPlan = plan transformDown {
       case command: IgnoreCachedData => command
-      // Do not lookup the cache by hint node. Hint node is special, we should ignore it when
-      // canonicalizing plans, so that plans which are same except hint can hit the same cache.
-      // However, we also want to keep the hint info after cache lookup. Here we skip the hint
-      // node, so that the returned caching plan won't replace the hint node and drop the hint info
-      // from the original plan.
-      case hint: ResolvedHint => hint

       case currentFragment =>
-        lookupCachedData(currentFragment)
-          .map(_.cachedRepresentation.withOutput(currentFragment.output))
-          .getOrElse(currentFragment)
+        lookupCachedData(currentFragment).map { cached =>
+          // After cache lookup, we should still keep the hints from the input plan.

gatorsmile (Member) commented on May 13, 2019:
If the original cached plan has a hint, should we keep/respect it? We need to define a clear behavior in our cache manager.

Contributor Author replied:
It doesn't matter, because:
  1. as a cache key, the lookup relies on semanticEquals, so having the hint node in the plan has no effect;
  2. the cache lookup returns an InMemoryRelation, which has no hint.
I think the behavior is pretty clear: for any query, the hint behavior should be the same whether or not some of its sub-plans are cached.

Member replied:
Basically, we ignore the hints that are specified in the original cached plans. If users want to use hints, they should specify them in the queries.
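
To make that point concrete, here is a minimal sketch (not part of the diff), assuming a running SparkSession named spark: the hinted query below still hits the cache built for the un-hinted one, because the lookup compares canonicalized plans and canonicalization ignores ResolvedHint nodes.

    import org.apache.spark.sql.functions.broadcast

    val plain = spark.range(1000)
    plain.cache()
    plain.count()  // materialize the cached data

    // Same canonicalized plan as `plain`, so the cache lookup still matches
    // even though this copy carries a broadcast hint.
    val hinted = broadcast(spark.range(1000))
    hinted.join(spark.range(100), "id").collect()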

+          val hints = EliminateResolvedHint.extractHintsFromPlan(currentFragment)._2

Member commented:
extractHintsFromPlan was originally a private function. Asking the caller to call reverse is weird. We could add a new function to EliminateResolvedHint, or even add a new object for hint processing.

Contributor Author replied:
It's natural to return the hints in a top-down fashion, and the caller side is free to process the returned hints, including reversing them.
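
A toy model of this ordering argument (the Node classes are stand-ins, not Catalyst classes): foldRight rebuilds the nodes from right to left, so the first hint in the top-down list ends up outermost again.

    sealed trait Node
    case class Leaf(name: String) extends Node
    case class HintNode(child: Node, info: String) extends Node

    val hints = Seq("h1", "h2")            // top-down order, as extracted
    val cachedPlan: Node = Leaf("cached")

    // foldRight wraps from the innermost hint outwards.
    val rebuilt = hints.foldRight(cachedPlan) { case (h, p) => HintNode(p, h) }
    assert(rebuilt == HintNode(HintNode(Leaf("cached"), "h2"), "h1"))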

+          val cachedPlan = cached.cachedRepresentation.withOutput(currentFragment.output)
+          // The returned hint list is in top-down order, we should create the hint nodes from
+          // right to left.
+          hints.foldRight[LogicalPlan](cachedPlan) { case (hint, p) =>
+            ResolvedHint(p, hint)

Member commented:
Is this semantically the same as the original cached plan? Take an example from the added test: broadcast(spark.range(1000)).filter($"id" > 100). Originally, the plan being broadcast is spark.range(1000). After using cached data, it seems the hint actually broadcasts the cached spark.range(1000).filter($"id" > 100). The difference is slight, but could it have a significant effect?

Contributor Author replied:
The semantics of a hint node are special. By design only the join node has hints, so Hint(Filter(Relation)) is the same as Filter(Hint(Relation)): both indicate that the left/right sub-tree of a join node has a hint.

Member replied:
OK, I see. That makes sense, and it's fine.
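
A hedged sketch of that equivalence, assuming a SparkSession with its implicits in scope (illustrative only): both hint placements mark the same join side for broadcast.

    import org.apache.spark.sql.functions.broadcast

    val other = spark.range(2000)
    val a = broadcast(spark.range(1000)).filter($"id" > 100)  // Filter over Hint
    val b = broadcast(spark.range(1000).filter($"id" > 100))  // Hint over Filter
    // Both should plan a broadcast join with the range on the left side.
    a.join(other, "id").explain()
    b.join(other, "id").explain()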

+          }
+        }.getOrElse(currentFragment)
     }

     newPlan transformAllExpressions {
sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala (41 additions, 15 deletions)
@@ -26,7 +26,7 @@ import org.apache.spark.executor.DataReadMethod.DataReadMethod
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
-import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join}
+import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
 import org.apache.spark.sql.execution.{RDDScanExec, SparkPlan}
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -938,23 +938,49 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}
}

test("Cache should respect the broadcast hint") {
val df = broadcast(spark.range(1000)).cache()
val df2 = spark.range(1000).cache()
df.count()
df2.count()
test("Cache should respect the hint") {
def testHint(df: Dataset[_], expectedHint: JoinStrategyHint): Unit = {
val df2 = spark.range(2000).cache()
df2.count()

// Test the broadcast hint.
val joinPlan = df.join(df2, "id").queryExecution.optimizedPlan
val hint = joinPlan.collect {
case Join(_, _, _, _, hint) => hint
def checkHintExists(): Unit = {
// Test the broadcast hint.
val joinPlan = df.join(df2, "id").queryExecution.optimizedPlan
val joinHints = joinPlan.collect {
case Join(_, _, _, _, hint) => hint
}
assert(joinHints.size == 1)
assert(joinHints(0).leftHint.get.strategy.contains(expectedHint))
assert(joinHints(0).rightHint.isEmpty)
}

// Make sure the hint does exist when `df` is not cached.
checkHintExists()

df.cache()
try {
df.count()
// Make sure the hint still exists when `df` is cached.
checkHintExists()
} finally {
// Clean-up
df.unpersist()
}
}
assert(hint.size == 1)
assert(hint(0).leftHint.get.strategy.contains(BROADCAST))
assert(hint(0).rightHint.isEmpty)

// Clean-up
df.unpersist()
// The hint is the root node
testHint(broadcast(spark.range(1000)), BROADCAST)
// The hint is under subquery alias
testHint(broadcast(spark.range(1000)).as("df"), BROADCAST)
// The hint is under filter
testHint(broadcast(spark.range(1000)).filter($"id" > 100), BROADCAST)
// If there are 2 adjacent hints, the top one takes effect.
testHint(
spark.range(1000)
.hint("SHUFFLE_MERGE")
.hint("SHUFFLE_HASH")
.as("df"),
SHUFFLE_HASH)
}
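
For reference, a sketch of the user-visible behavior the new test locks in, assuming a SparkSession with its implicits in scope (illustrative only, not part of the suite):

    import org.apache.spark.sql.functions.broadcast

    val df = broadcast(spark.range(1000)).filter($"id" > 100)
    df.cache()
    df.count()
    // With this change the hint extracted from `df` is re-applied on top of
    // the cached InMemoryRelation, so this join should still be planned as a
    // broadcast join; before, the cache lookup could silently drop the hint.
    df.join(spark.range(2000), "id").explain()
    df.unpersist()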

test("analyzes column statistics in cached query") {