Skip to content

Commit

Permalink
Fix Query Performance issue for Spark 3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
Indhumathi27 committed Oct 20, 2021
1 parent b8d9a97 commit 97d686b
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
var partitions : (Seq[CatalogTablePartition], Seq[PartitionSpec], Seq[Expression]) =
(null, null, Seq.empty)
var filterPredicates = allPredicates
if(names.nonEmpty) {
if(names.nonEmpty && partitionsFilter.nonEmpty) {
partitions = CarbonFilters.getCatalogTablePartitions(
partitionsFilter.filterNot(e => e.find(_.isInstanceOf[PlanExpression[_]]).isDefined),
SparkSession.getActiveSession.get,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkOptimizer
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule}
Expand Down Expand Up @@ -238,33 +239,21 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
}

class CarbonOptimizer(session: SparkSession, optimizer: Optimizer) extends
Optimizer(session.sessionState.catalogManager) {
SparkOptimizer(session.sessionState.catalogManager,
session.sessionState.catalog,
session.sessionState.experimentalMethods) {

private lazy val mvRules = Seq(Batch("Materialized View Optimizers", Once,
Seq(new MVRewriteRule(session)): _*))

private lazy val iudRule = Batch("IUD Optimizers", fixedPoint,
private lazy val iudRule = Batch("IUD Optimizers", Once,
Seq(new CarbonIUDRule(), new CarbonUDFTransformRule(), new CarbonFileIndexReplaceRule()): _*)

private lazy val secondaryIndexRule = Batch("SI Optimizers", Once,
Seq(new CarbonSITransformationRule(session)): _*)

override def defaultBatches: Seq[Batch] = {
mvRules ++ convertedBatch() :+ iudRule :+ secondaryIndexRule
mvRules ++ super.defaultBatches :+ iudRule :+ secondaryIndexRule
}

def convertedBatch(): Seq[Batch] = {
optimizer.batches.map { batch =>
Batch(
batch.name,
batch.strategy match {
case optimizer.Once =>
Once
case _: optimizer.FixedPoint =>
fixedPoint
},
batch.rules: _*
)
}
}
}

0 comments on commit 97d686b

Please sign in to comment.