Fix Query Performance issue for Spark 3.1
Indhumathi27 committed Oct 20, 2021
1 parent b8d9a97 commit f65ccdb
Showing 3 changed files with 9 additions and 22 deletions.
```diff
@@ -152,7 +152,7 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
     var partitions : (Seq[CatalogTablePartition], Seq[PartitionSpec], Seq[Expression]) =
       (null, null, Seq.empty)
     var filterPredicates = allPredicates
-    if(names.nonEmpty) {
+    if(names.nonEmpty && partitionsFilter.nonEmpty) {
       partitions = CarbonFilters.getCatalogTablePartitions(
         partitionsFilter.filterNot(e => e.find(_.isInstanceOf[PlanExpression[_]]).isDefined),
         SparkSession.getActiveSession.get,
```
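The guard above is the performance fix in the first file: with `names.nonEmpty` alone, a query carrying no partition filters on a partitioned table still called CarbonFilters.getCatalogTablePartitions, a metastore round-trip that lists catalog partitions. Requiring `partitionsFilter.nonEmpty` as well skips that call when there is nothing to prune with. A minimal, runnable sketch of the pattern (the types and helper below are illustrative stand-ins, not CarbonData's API):

```scala
// Toy model of the guarded partition lookup. listPartitionsByFilter stands
// in for the expensive metastore call (getCatalogTablePartitions in the
// real code); the println makes the cost visible in the demo.
object PartitionPruneSketch {

  type Partition = Map[String, String]          // partition column -> value
  type PartitionFilter = Partition => Boolean   // partition-level predicate

  val allPartitions: Seq[Partition] =
    Seq(Map("dt" -> "2021-10-19"), Map("dt" -> "2021-10-20"))

  // Stand-in for the metastore round-trip.
  def listPartitionsByFilter(filters: Seq[PartitionFilter]): Seq[Partition] = {
    println("metastore hit")
    allPartitions.filter(p => filters.forall(f => f(p)))
  }

  def prunedPartitions(
      partitionColumns: Seq[String],
      filters: Seq[PartitionFilter]): Seq[Partition] = {
    // The fix: require BOTH a partitioned table and at least one partition
    // filter before paying for the catalog lookup. With only the first
    // check, a filter-less query on a partitioned table still hit the
    // metastore.
    if (partitionColumns.nonEmpty && filters.nonEmpty) {
      listPartitionsByFilter(filters)
    } else {
      Seq.empty
    }
  }

  def main(args: Array[String]): Unit = {
    println(prunedPartitions(Seq("dt"), Seq.empty)) // no "metastore hit"
    println(prunedPartitions(Seq("dt"), Seq((p: Partition) => p("dt") == "2021-10-20")))
  }
}
```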
```diff
@@ -25,14 +25,14 @@ import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, DynamicPruningSubquery, Expression, ExprId, NamedExpression, Predicate, ScalaUDF, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkOptimizer
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule}
```
```diff
@@ -237,34 +237,21 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
   }
 }

-class CarbonOptimizer(session: SparkSession, optimizer: Optimizer) extends
-  Optimizer(session.sessionState.catalogManager) {
+class CarbonOptimizer(session: SparkSession, catalog: SessionCatalog) extends
+  SparkOptimizer(session.sessionState.catalogManager, catalog,
+    session.sessionState.experimentalMethods) {

   private lazy val mvRules = Seq(Batch("Materialized View Optimizers", Once,
     Seq(new MVRewriteRule(session)): _*))

-  private lazy val iudRule = Batch("IUD Optimizers", fixedPoint,
+  private lazy val iudRule = Batch("IUD Optimizers", Once,
     Seq(new CarbonIUDRule(), new CarbonUDFTransformRule(), new CarbonFileIndexReplaceRule()): _*)

   private lazy val secondaryIndexRule = Batch("SI Optimizers", Once,
     Seq(new CarbonSITransformationRule(session)): _*)

   override def defaultBatches: Seq[Batch] = {
-    mvRules ++ convertedBatch() :+ iudRule :+ secondaryIndexRule
+    mvRules ++ super.defaultBatches :+ iudRule :+ secondaryIndexRule
   }

-  def convertedBatch(): Seq[Batch] = {
-    optimizer.batches.map { batch =>
-      Batch(
-        batch.name,
-        batch.strategy match {
-          case optimizer.Once =>
-            Once
-          case _: optimizer.FixedPoint =>
-            fixedPoint
-        },
-        batch.rules: _*
-      )
-    }
-  }
 }
```
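The second file carries the structural change. Previously, CarbonOptimizer extended the bare catalyst Optimizer and rebuilt every batch of the session's already-constructed optimizer in convertedBatch(), translating each batch's execution strategy (Once vs. FixedPoint) by hand. It now extends SparkOptimizer directly, so super.defaultBatches supplies Spark 3.1's standard optimizer batches unmodified, and the Carbon-specific IUD batch runs Once rather than to fixedPoint, so those rules apply a single time per plan instead of repeatedly until a fixed point. A toy, self-contained sketch of the composition pattern (simplified stand-in types, not Spark's real Optimizer API):

```scala
// Subclass the base optimizer and splice custom batches around
// super.defaultBatches, instead of copying each inherited batch by hand.
object OptimizerBatchSketch {

  case class Batch(name: String, rules: Seq[String])

  class BaseOptimizer {
    // Stands in for SparkOptimizer.defaultBatches.
    def defaultBatches: Seq[Batch] = Seq(
      Batch("Operator Optimization", Seq("PushDownPredicates", "ColumnPruning")),
      Batch("PartitionPruning", Seq("PartitionPruning")))
  }

  class CustomOptimizer extends BaseOptimizer {
    private val mvRules = Seq(Batch("Materialized View Optimizers", Seq("MVRewriteRule")))
    private val iudRule = Batch("IUD Optimizers", Seq("CarbonIUDRule"))
    private val siRule = Batch("SI Optimizers", Seq("CarbonSITransformationRule"))

    // Custom batches first, then everything the parent defines, then the
    // remaining custom batches -- mirroring defaultBatches in the diff.
    override def defaultBatches: Seq[Batch] =
      mvRules ++ super.defaultBatches :+ iudRule :+ siRule
  }

  def main(args: Array[String]): Unit =
    new CustomOptimizer().defaultBatches.map(_.name).foreach(println)
}
```

Splicing around super.defaultBatches keeps Carbon's rules ordered against whatever batches the running Spark version ships, without maintaining a hand-converted parallel copy.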
```diff
@@ -443,7 +443,7 @@ trait SparkVersionAdapter {

   def getCarbonOptimizer(session: SparkSession,
       sessionState: SessionState): CarbonOptimizer = {
-    new CarbonOptimizer(session, sessionState.optimizer)
+    new CarbonOptimizer(session, sessionState.catalog)
   }

   def isCharType(dataType: DataType): Boolean = {
```
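The third file updates the call site: the factory now hands the new constructor the SessionCatalog that the SparkOptimizer superclass requires, rather than the session's already-built Optimizer that the removed convertedBatch() path copied batches from.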
