Skip to content

Commit

Permalink
[CARBONDATA-4306] Fix Query Performance issue for Spark 3.1
Browse files Browse the repository at this point in the history
Why is this PR needed?
Currently, with Spark 3.1, some rules are applied many times, resulting in performance degradation.

What changes were proposed in this PR?
Changed the rule-application strategy from FixedPoint to Once, and made CarbonOptimizer extend SparkOptimizer directly, avoiding applying the same rules many times.

This closes #4229
  • Loading branch information
Indhumathi27 authored and kunal642 committed Oct 23, 2021
1 parent 305851e commit 8953cde
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
var partitions : (Seq[CatalogTablePartition], Seq[PartitionSpec], Seq[Expression]) =
(null, null, Seq.empty)
var filterPredicates = allPredicates
if(names.nonEmpty) {
if(names.nonEmpty && partitionsFilter.nonEmpty) {
partitions = CarbonFilters.getCatalogTablePartitions(
partitionsFilter.filterNot(e => e.find(_.isInstanceOf[PlanExpression[_]]).isDefined),
SparkSession.getActiveSession.get,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkOptimizer
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule}
Expand Down Expand Up @@ -238,33 +239,21 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
}

class CarbonOptimizer(session: SparkSession, optimizer: Optimizer) extends
Optimizer(session.sessionState.catalogManager) {
SparkOptimizer(session.sessionState.catalogManager,
session.sessionState.catalog,
session.sessionState.experimentalMethods) {

private lazy val mvRules = Seq(Batch("Materialized View Optimizers", Once,
Seq(new MVRewriteRule(session)): _*))

private lazy val iudRule = Batch("IUD Optimizers", fixedPoint,
private lazy val iudRule = Batch("IUD Optimizers", Once,
Seq(new CarbonIUDRule(), new CarbonUDFTransformRule(), new CarbonFileIndexReplaceRule()): _*)

private lazy val secondaryIndexRule = Batch("SI Optimizers", Once,
Seq(new CarbonSITransformationRule(session)): _*)

override def defaultBatches: Seq[Batch] = {
mvRules ++ convertedBatch() :+ iudRule :+ secondaryIndexRule
mvRules ++ super.defaultBatches :+ iudRule :+ secondaryIndexRule
}

def convertedBatch(): Seq[Batch] = {
optimizer.batches.map { batch =>
Batch(
batch.name,
batch.strategy match {
case optimizer.Once =>
Once
case _: optimizer.FixedPoint =>
fixedPoint
},
batch.rules: _*
)
}
}
}

0 comments on commit 8953cde

Please sign in to comment.