Commit 0139855

[SPARK-14677][SQL] Make the max number of iterations configurable for Catalyst
1 parent 8028a28 commit 0139855

File tree: 9 files changed, +66 −51 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala

Lines changed: 9 additions & 23 deletions
@@ -19,46 +19,32 @@ package org.apache.spark.sql.catalyst

 import org.apache.spark.sql.catalyst.analysis._

-private[spark] trait CatalystConf {
+/**
+ * Interface for configuration options used in the catalyst module.
+ */
+trait CatalystConf {
   def caseSensitiveAnalysis: Boolean

   def orderByOrdinal: Boolean
   def groupByOrdinal: Boolean

+  def optimizerMaxIterations: Int
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
    * identifiers are equal.
    */
   def resolver: Resolver = {
-    if (caseSensitiveAnalysis) {
-      caseSensitiveResolution
-    } else {
-      caseInsensitiveResolution
-    }
+    if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution
   }
 }

-/**
- * A trivial conf that is empty. Used for testing when all
- * relations are already filled in and the analyser needs only to resolve attribute references.
- */
-object EmptyConf extends CatalystConf {
-  override def caseSensitiveAnalysis: Boolean = {
-    throw new UnsupportedOperationException
-  }
-  override def orderByOrdinal: Boolean = {
-    throw new UnsupportedOperationException
-  }
-  override def groupByOrdinal: Boolean = {
-    throw new UnsupportedOperationException
-  }
-}

 /** A CatalystConf that can be used for local testing. */
 case class SimpleCatalystConf(
     caseSensitiveAnalysis: Boolean,
     orderByOrdinal: Boolean = true,
-    groupByOrdinal: Boolean = true)
+    groupByOrdinal: Boolean = true,
+    optimizerMaxIterations: Int = 100)
   extends CatalystConf {
 }
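
With `optimizerMaxIterations` on the trait, a configuration can now set the bound per instance instead of inheriting the hard-coded 100. A minimal sketch of exercising the new field (the values are illustrative):

    import org.apache.spark.sql.catalyst.SimpleCatalystConf

    // Default keeps the previous behavior: at most 100 iterations.
    val defaultConf = SimpleCatalystConf(caseSensitiveAnalysis = true)

    // A tighter bound, e.g. for a test that should converge in a few passes.
    val tightConf = SimpleCatalystConf(caseSensitiveAnalysis = true, optimizerMaxIterations = 5)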

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 11 additions & 8 deletions
@@ -39,14 +39,13 @@ import org.apache.spark.sql.types._
  * Used for testing when all relations are already filled in and the analyzer needs only
  * to resolve attribute references.
  */
-object SimpleAnalyzer
-  extends SimpleAnalyzer(
-    EmptyFunctionRegistry,
+object SimpleAnalyzer extends Analyzer(
+  new SessionCatalog(
+    new InMemoryCatalog,
+    EmptyFunctionRegistry,
+    new SimpleCatalystConf(caseSensitiveAnalysis = true)),
   new SimpleCatalystConf(caseSensitiveAnalysis = true))

-class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
-  extends Analyzer(new SessionCatalog(new InMemoryCatalog, functionRegistry, conf), conf)
-
 /**
  * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
  * [[UnresolvedRelation]]s into fully typed objects using information in a
@@ -55,9 +54,13 @@ class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
 class Analyzer(
     catalog: SessionCatalog,
     conf: CatalystConf,
-    maxIterations: Int = 100)
+    maxIterations: Int)
   extends RuleExecutor[LogicalPlan] with CheckAnalysis {

+  def this(catalog: SessionCatalog, conf: CatalystConf) = {
+    this(catalog, conf, conf.optimizerMaxIterations)
+  }
+
   def resolver: Resolver = {
     if (conf.caseSensitiveAnalysis) {
       caseSensitiveResolution
@@ -66,7 +69,7 @@ class Analyzer(
     }
   }

-  val fixedPoint = FixedPoint(maxIterations)
+  private val fixedPoint = FixedPoint(maxIterations)

   /**
    * Override to provide additional rules for the "Resolution" batch.
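
The auxiliary constructor lets existing two-argument call sites keep compiling while the iteration bound now flows from the conf. For intuition, a self-contained sketch of the same delegation pattern (the names here are illustrative, not Spark's):

    class BoundedExecutor(val maxIterations: Int) {
      // Secondary constructor: derive the bound from a config object instead of a literal.
      def this(conf: Map[String, Int]) =
        this(conf.getOrElse("maxIterations", 100))
    }

    val fromConf = new BoundedExecutor(Map("maxIterations" -> 50))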

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 24 additions & 11 deletions
@@ -17,10 +17,12 @@

 package org.apache.spark.sql.catalyst.optimizer

+import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
+
 import scala.annotation.tailrec
 import scala.collection.immutable.HashSet
-
-import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases}
+import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases, EmptyFunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
@@ -34,7 +36,11 @@ import org.apache.spark.sql.types._
  * Abstract class all optimizers should inherit of, contains the standard batches (extending
  * Optimizers can override this.
  */
-abstract class Optimizer extends RuleExecutor[LogicalPlan] {
+abstract class Optimizer(catalog: SessionCatalog, conf: CatalystConf)
+  extends RuleExecutor[LogicalPlan] {
+
+  private val fixedPoint = FixedPoint(conf.optimizerMaxIterations)
+
   def batches: Seq[Batch] = {
     // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
     // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
@@ -54,12 +60,12 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
     // since the other rules might make two separate Unions operators adjacent.
     Batch("Union", Once,
       CombineUnions) ::
-    Batch("Replace Operators", FixedPoint(100),
+    Batch("Replace Operators", fixedPoint,
       ReplaceIntersectWithSemiJoin,
       ReplaceDistinctWithAggregate) ::
-    Batch("Aggregate", FixedPoint(100),
+    Batch("Aggregate", fixedPoint,
       RemoveLiteralFromGroupExpressions) ::
-    Batch("Operator Optimizations", FixedPoint(100),
+    Batch("Operator Optimizations", fixedPoint,
       // Operator push down
       SetOperationPushDown,
       SamplePushDown,
@@ -90,11 +96,11 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       SimplifyCasts,
       SimplifyCaseConversionExpressions,
       EliminateSerialization) ::
-    Batch("Decimal Optimizations", FixedPoint(100),
+    Batch("Decimal Optimizations", fixedPoint,
       DecimalAggregates) ::
-    Batch("Typed Filter Optimization", FixedPoint(100),
+    Batch("Typed Filter Optimization", fixedPoint,
       EmbedSerializerInFilter) ::
-    Batch("LocalRelation", FixedPoint(100),
+    Batch("LocalRelation", fixedPoint,
       ConvertToLocalRelation) ::
     Batch("Subquery", Once,
       OptimizeSubqueries) :: Nil
@@ -112,12 +118,19 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
 }

 /**
- * Non-abstract representation of the standard Spark optimizing strategies
+ * An optimizer used in test code.
  *
  * To ensure extendability, we leave the standard rules in the abstract optimizer rules, while
  * specific rules go to the subclasses
  */
-object DefaultOptimizer extends Optimizer
+object SimpleTestOptimizer extends SimpleTestOptimizer
+
+class SimpleTestOptimizer extends Optimizer(
+  new SessionCatalog(
+    new InMemoryCatalog,
+    EmptyFunctionRegistry,
+    new SimpleCatalystConf(caseSensitiveAnalysis = true)),
+  new SimpleCatalystConf(caseSensitiveAnalysis = true))

 /**
  * Pushes operations down into a Sample.
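
Every `FixedPoint(100)` literal in the standard batches collapses into one `fixedPoint` strategy derived from the conf, so a single setting now governs them all. For intuition, a simplified standalone sketch of the fixed-point loop a RuleExecutor batch performs (not Spark's actual implementation):

    // Apply `rule` until the value stops changing or the iteration bound is exhausted.
    def runToFixedPoint[A](start: A, maxIterations: Int)(rule: A => A): A = {
      var current = start
      var remaining = maxIterations
      var converged = false
      while (!converged && remaining > 0) {
        val next = rule(current)
        converged = next == current  // a fixed point: further passes are no-ops
        current = next
        remaining -= 1
      }
      current
    }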

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala

Lines changed: 2 additions & 2 deletions
@@ -24,7 +24,7 @@ import org.scalatest.prop.GeneratorDrivenPropertyChecks
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
+import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
 import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.util.Utils
@@ -153,7 +153,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
       expected: Any,
       inputRow: InternalRow = EmptyRow): Unit = {
     val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
-    val optimizedPlan = DefaultOptimizer.execute(plan)
+    val optimizedPlan = SimpleTestOptimizer.execute(plan)
     checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow)
   }

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -25,7 +25,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
+import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
 import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
 import org.apache.spark.sql.types._

@@ -151,7 +151,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       expression: Expression,
       inputRow: InternalRow = EmptyRow): Unit = {
     val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
-    val optimizedPlan = DefaultOptimizer.execute(plan)
+    val optimizedPlan = SimpleTestOptimizer.execute(plan)
     checkNaNWithoutCodegen(optimizedPlan.expressions.head, inputRow)
   }

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala

Lines changed: 2 additions & 3 deletions
@@ -15,10 +15,9 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.catalyst
+package org.apache.spark.sql.catalyst.optimizer

 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule

@@ -38,7 +37,7 @@ class OptimizerExtendableSuite extends SparkFunSuite {
    * This class represents a dummy extended optimizer that takes the batches of the
    * Optimizer and adds custom ones.
    */
-  class ExtendedOptimizer extends Optimizer {
+  class ExtendedOptimizer extends SimpleTestOptimizer {

     // rules set to DummyRule, would not be executed anyways
     val myBatches: Seq[Batch] = {
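
Since `Optimizer` now takes a catalog and a conf, the dummy optimizer in this suite extends `SimpleTestOptimizer`, which supplies both. A hedged sketch of the extension idiom the suite exercises (`DummyRule` stands in for the suite's no-op rule):

    class ExtendedOptimizer extends SimpleTestOptimizer {
      // Extra batches appended to the standard ones; DummyRule leaves plans unchanged.
      val myBatches: Seq[Batch] =
        Batch("once", Once, DummyRule) ::
        Batch("fixedPoint", FixedPoint(100), DummyRule) :: Nil

      override def batches: Seq[Batch] = super.batches ++ myBatches
    }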

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

Lines changed: 8 additions & 1 deletion
@@ -18,9 +18,16 @@
 package org.apache.spark.sql.execution

 import org.apache.spark.sql.ExperimentalMethods
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.internal.SQLConf
+
+class SparkOptimizer(
+    catalog: SessionCatalog,
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods)
+  extends Optimizer(catalog, conf) {

-class SparkOptimizer(experimentalMethods: ExperimentalMethods) extends Optimizer {
   override def batches: Seq[Batch] = super.batches :+ Batch(
     "User Provided Optimizers", FixedPoint(100), experimentalMethods.extraOptimizations: _*)
 }
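
Note that the "User Provided Optimizers" batch still runs at a hard-coded `FixedPoint(100)`; only the standard batches pick up the new conf. For context, a hedged sketch of how a rule reaches `extraOptimizations` (the no-op rule is illustrative; `experimental.extraOptimizations` is the public hook):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule

    object NoopRule extends Rule[LogicalPlan] {
      // Illustrative rule: returns every plan unchanged.
      def apply(plan: LogicalPlan): LogicalPlan = plan
    }

    sqlContext.experimental.extraOptimizations = Seq(NoopRule)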

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 7 additions & 0 deletions
@@ -51,6 +51,11 @@ object SQLConf {

   }

+  val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
+    .doc("The max number of iterations the optimizer and analyzer runs")
+    .intConf
+    .createWithDefault(100)
+
   val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts")
     .doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
       "When set to false, only one SQLContext/HiveContext is allowed to be created " +
@@ -473,6 +478,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   /** ************************ Spark SQL Params/Hints ******************* */

+  def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)
+
   def checkpointLocation: String = getConf(CHECKPOINT_LOCATION)

   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
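
With the key registered in SQLConf, the bound can be raised at runtime rather than by recompiling. A sketch, assuming a live SQLContext (the value 200 is illustrative):

    // Through the SQL interface:
    sqlContext.sql("SET spark.sql.optimizer.maxIterations=200")

    // Or programmatically:
    sqlContext.setConf("spark.sql.optimizer.maxIterations", "200")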

sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala

Lines changed: 1 addition & 1 deletion
@@ -80,7 +80,7 @@ private[sql] class SessionState(ctx: SQLContext) {
   /**
    * Logical query plan optimizer.
    */
-  lazy val optimizer: Optimizer = new SparkOptimizer(experimentalMethods)
+  lazy val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods)

   /**
    * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
