@@ -19,46 +19,32 @@ package org.apache.spark.sql.catalyst
 
 import org.apache.spark.sql.catalyst.analysis._
 
-private[spark] trait CatalystConf {
+/**
+ * Interface for configuration options used in the catalyst module.
+ */
+trait CatalystConf {
   def caseSensitiveAnalysis: Boolean
 
   def orderByOrdinal: Boolean
   def groupByOrdinal: Boolean
 
+  def optimizerMaxIterations: Int
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
    * identifiers are equal.
    */
   def resolver: Resolver = {
-    if (caseSensitiveAnalysis) {
-      caseSensitiveResolution
-    } else {
-      caseInsensitiveResolution
-    }
+    if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution
   }
 }
 
-/**
- * A trivial conf that is empty. Used for testing when all
- * relations are already filled in and the analyser needs only to resolve attribute references.
- */
-object EmptyConf extends CatalystConf {
-  override def caseSensitiveAnalysis: Boolean = {
-    throw new UnsupportedOperationException
-  }
-  override def orderByOrdinal: Boolean = {
-    throw new UnsupportedOperationException
-  }
-  override def groupByOrdinal: Boolean = {
-    throw new UnsupportedOperationException
-  }
-}
-
 /** A CatalystConf that can be used for local testing. */
 case class SimpleCatalystConf(
     caseSensitiveAnalysis: Boolean,
     orderByOrdinal: Boolean = true,
-    groupByOrdinal: Boolean = true)
+    groupByOrdinal: Boolean = true,
+    optimizerMaxIterations: Int = 100)
   extends CatalystConf {
 }
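Note (editor's illustration, not part of the diff): with `optimizerMaxIterations` now declared on `CatalystConf` and defaulted to 100 on `SimpleCatalystConf`, a test conf can cap rule-executor iterations without touching any optimizer code. A minimal sketch using only the names from the hunk above; the value 50 is arbitrary:

```scala
import org.apache.spark.sql.catalyst.SimpleCatalystConf

// Override the default iteration cap of 100 for a test configuration.
val conf = SimpleCatalystConf(caseSensitiveAnalysis = true, optimizerMaxIterations = 50)

// resolver still derives from caseSensitiveAnalysis, exactly as before this change.
val resolve = conf.resolver
```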
@@ -39,14 +39,13 @@ import org.apache.spark.sql.types._
  * Used for testing when all relations are already filled in and the analyzer needs only
  * to resolve attribute references.
  */
-object SimpleAnalyzer
-  extends SimpleAnalyzer(
-    EmptyFunctionRegistry,
+object SimpleAnalyzer extends Analyzer(
+    new SessionCatalog(
+      new InMemoryCatalog,
+      EmptyFunctionRegistry,
+      new SimpleCatalystConf(caseSensitiveAnalysis = true)),
     new SimpleCatalystConf(caseSensitiveAnalysis = true))
 
-class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
-  extends Analyzer(new SessionCatalog(new InMemoryCatalog, functionRegistry, conf), conf)
-
 /**
  * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
  * [[UnresolvedRelation]]s into fully typed objects using information in a
@@ -55,9 +54,13 @@ class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
 class Analyzer(
     catalog: SessionCatalog,
     conf: CatalystConf,
-    maxIterations: Int = 100)
+    maxIterations: Int)
   extends RuleExecutor[LogicalPlan] with CheckAnalysis {
 
+  def this(catalog: SessionCatalog, conf: CatalystConf) = {
+    this(catalog, conf, conf.optimizerMaxIterations)
+  }
+
   def resolver: Resolver = {
     if (conf.caseSensitiveAnalysis) {
       caseSensitiveResolution
@@ -66,7 +69,7 @@ class Analyzer(
     }
   }
 
-  val fixedPoint = FixedPoint(maxIterations)
+  protected val fixedPoint = FixedPoint(maxIterations)
 
   /**
   * Override to provide additional rules for the "Resolution" batch.
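Note (editor's illustration, not part of the diff): `maxIterations` no longer has a default on the primary `Analyzer` constructor; callers either pass it explicitly or use the new auxiliary constructor, which reads `conf.optimizerMaxIterations`. A sketch assuming the same catalog and conf classes the hunk already imports; the values are placeholders:

```scala
import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}

val conf = SimpleCatalystConf(caseSensitiveAnalysis = true, optimizerMaxIterations = 25)
val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)

// Primary constructor: the iteration cap is passed explicitly.
val explicit = new Analyzer(catalog, conf, maxIterations = 25)

// Auxiliary constructor added by this change: the cap comes from the conf.
val fromConf = new Analyzer(catalog, conf)
```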
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.annotation.tailrec
 import scala.collection.immutable.HashSet
 
-import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf}
+import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases, EmptyFunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
@@ -36,9 +36,11 @@ import org.apache.spark.sql.types._
 * Abstract class all optimizers should inherit of, contains the standard batches (extending
 * Optimizers can override this.
 */
-abstract class Optimizer(
-    conf: CatalystConf,
-    sessionCatalog: SessionCatalog) extends RuleExecutor[LogicalPlan] {
+abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
+  extends RuleExecutor[LogicalPlan] {
 
+  protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations)
+
   def batches: Seq[Batch] = {
     // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
     // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
@@ -59,12 +61,12 @@ abstract class Optimizer(
     // since the other rules might make two separate Unions operators adjacent.
     Batch("Union", Once,
       CombineUnions) ::
-    Batch("Replace Operators", FixedPoint(100),
+    Batch("Replace Operators", fixedPoint,
       ReplaceIntersectWithSemiJoin,
       ReplaceDistinctWithAggregate) ::
-    Batch("Aggregate", FixedPoint(100),
+    Batch("Aggregate", fixedPoint,
       RemoveLiteralFromGroupExpressions) ::
-    Batch("Operator Optimizations", FixedPoint(100),
+    Batch("Operator Optimizations", fixedPoint,
       // Operator push down
       SetOperationPushDown,
       SamplePushDown,
@@ -95,11 +97,11 @@ abstract class Optimizer(
       SimplifyCasts,
       SimplifyCaseConversionExpressions,
       EliminateSerialization) ::
-    Batch("Decimal Optimizations", FixedPoint(100),
+    Batch("Decimal Optimizations", fixedPoint,
      DecimalAggregates) ::
-    Batch("Typed Filter Optimization", FixedPoint(100),
+    Batch("Typed Filter Optimization", fixedPoint,
       EmbedSerializerInFilter) ::
-    Batch("LocalRelation", FixedPoint(100),
+    Batch("LocalRelation", fixedPoint,
       ConvertToLocalRelation) ::
     Batch("Subquery", Once,
       OptimizeSubqueries) :: Nil
@@ -117,15 +119,19 @@ abstract class Optimizer(
 }
 
 /**
- * Non-abstract representation of the standard Spark optimizing strategies
+ * An optimizer used in test code.
  *
 * To ensure extendability, we leave the standard rules in the abstract optimizer rules, while
 * specific rules go to the subclasses
 */
-object DefaultOptimizer
-  extends Optimizer(
-    EmptyConf,
-    new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf))
+object SimpleTestOptimizer extends SimpleTestOptimizer
+
+class SimpleTestOptimizer extends Optimizer(
+  new SessionCatalog(
+    new InMemoryCatalog,
+    EmptyFunctionRegistry,
+    new SimpleCatalystConf(caseSensitiveAnalysis = true)),
+  new SimpleCatalystConf(caseSensitiveAnalysis = true))
 
 /**
 * Pushes operations down into a Sample.
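Note (editor's illustration, not part of the diff): with the constructor order now `(sessionCatalog, conf)` and `fixedPoint` exposed as a protected member derived from `conf.optimizerMaxIterations`, a subclass can add its own batches under the same configurable cap. A hypothetical extension modeled on `SimpleTestOptimizer` above; `NoopRule` and `MyOptimizer` are made up for this sketch:

```scala
import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.catalyst.analysis.EmptyFunctionRegistry
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object NoopRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan  // placeholder rule
}

class MyOptimizer extends Optimizer(
    new SessionCatalog(
      new InMemoryCatalog,
      EmptyFunctionRegistry,
      new SimpleCatalystConf(caseSensitiveAnalysis = true)),
    new SimpleCatalystConf(caseSensitiveAnalysis = true)) {

  // The extra batch reuses the inherited fixedPoint, so it is bounded by
  // spark.sql.optimizer.maxIterations rather than a hard-coded 100.
  override def batches: Seq[Batch] = super.batches :+
    Batch("My Extra Rules", fixedPoint, NoopRule)
}
```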
@@ -24,7 +24,7 @@ import org.scalatest.prop.GeneratorDrivenPropertyChecks
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
+import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
 import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.util.Utils
@@ -153,7 +153,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
       expected: Any,
       inputRow: InternalRow = EmptyRow): Unit = {
     val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
-    val optimizedPlan = DefaultOptimizer.execute(plan)
+    val optimizedPlan = SimpleTestOptimizer.execute(plan)
     checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow)
   }
 
@@ -25,7 +25,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
+import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
 import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
 import org.apache.spark.sql.types._
 
@@ -151,7 +151,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       expression: Expression,
       inputRow: InternalRow = EmptyRow): Unit = {
     val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
-    val optimizedPlan = DefaultOptimizer.execute(plan)
+    val optimizedPlan = SimpleTestOptimizer.execute(plan)
     checkNaNWithoutCodegen(optimizedPlan.expressions.head, inputRow)
   }
 
@@ -15,12 +15,9 @@
 * limitations under the License.
 */
 
-package org.apache.spark.sql.catalyst
+package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.analysis.EmptyFunctionRegistry
-import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 
@@ -40,10 +37,7 @@ class OptimizerExtendableSuite extends SparkFunSuite {
   * This class represents a dummy extended optimizer that takes the batches of the
   * Optimizer and adds custom ones.
   */
-  class ExtendedOptimizer
-    extends Optimizer(
-      EmptyConf,
-      new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf)) {
+  class ExtendedOptimizer extends SimpleTestOptimizer {
 
     // rules set to DummyRule, would not be executed anyways
     val myBatches: Seq[Batch] = {
@@ -18,14 +18,16 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.ExperimentalMethods
-import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.internal.SQLConf
 
 class SparkOptimizer(
-    conf: CatalystConf,
-    sessionCatalog: SessionCatalog,
-    experimentalMethods: ExperimentalMethods) extends Optimizer(conf, sessionCatalog) {
+    catalog: SessionCatalog,
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods)
+  extends Optimizer(catalog, conf) {
 
   override def batches: Seq[Batch] = super.batches :+ Batch(
-    "User Provided Optimizers", FixedPoint(100), experimentalMethods.extraOptimizations: _*)
+    "User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
 }
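Note (editor's illustration, not part of the diff): the "User Provided Optimizers" batch now runs under the shared `fixedPoint` strategy, so rules registered through the existing `ExperimentalMethods` hook are also bounded by the new setting. A sketch; `MyRule` is hypothetical:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object MyRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan  // no-op placeholder
}

def register(sqlContext: SQLContext): Unit = {
  // Runs in the "User Provided Optimizers" batch, now capped by
  // spark.sql.optimizer.maxIterations instead of FixedPoint(100).
  sqlContext.experimental.extraOptimizations = Seq(MyRule)
}
```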
@@ -51,6 +51,11 @@ object SQLConf {
 
   }
 
+  val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
+    .doc("The max number of iterations the optimizer and analyzer runs")
+    .intConf
+    .createWithDefault(100)
+
   val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts")
     .doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
       "When set to false, only one SQLContext/HiveContext is allowed to be created " +
@@ -473,6 +478,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   /** ************************ Spark SQL Params/Hints ******************* */
 
+  def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)
+
   def checkpointLocation: String = getConf(CHECKPOINT_LOCATION)
 
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
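Note (editor's illustration, not part of the diff): `spark.sql.optimizer.maxIterations` is a plain SQL conf entry, so it can be set like any other key; the `optimizerMaxIterations` getter above is what the analyzer and optimizer read. For example, with a placeholder context:

```scala
import org.apache.spark.sql.SQLContext

def tuneIterations(sqlContext: SQLContext): Unit = {
  // Programmatic form:
  sqlContext.setConf("spark.sql.optimizer.maxIterations", "200")
  // Equivalent SQL form:
  sqlContext.sql("SET spark.sql.optimizer.maxIterations=200")
}
```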
@@ -80,7 +80,7 @@ private[sql] class SessionState(ctx: SQLContext) {
   /**
   * Logical query plan optimizer.
   */
-  lazy val optimizer: Optimizer = new SparkOptimizer(conf, catalog, experimentalMethods)
+  lazy val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods)
 
   /**
   * Parser that extracts expressions, plans, table identifiers etc. from SQL texts.