2.3 and 2.4 changes
Indhumathi27 committed Oct 20, 2021
1 parent 0c86788 commit 8e3f955
Showing 5 changed files with 16 additions and 46 deletions.
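The substance of the change: in the Spark 2.3 and 2.4 modules, CarbonOptimizer now extends SparkOptimizer directly and reuses Spark's rule batches through super.batches / super.defaultBatches, instead of wrapping a separate Optimizer instance and copying its batches over in convertedBatch(). Accordingly, getCarbonOptimizer passes the SessionCatalog instead of sessionState.optimizer, the IUD batch runs Once instead of to fixedPoint, and the Spark 3.x module's CarbonOptimizer is adjusted to the same (session, catalog) signature.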
File 1 of 5

@@ -412,7 +412,7 @@ trait SparkVersionAdapter {
   }
 
   def getCarbonOptimizer(session : SparkSession, sessionState: SessionState) : CarbonOptimizer = {
-    new CarbonOptimizer(session, sessionState.catalog, sessionState.optimizer)
+    new CarbonOptimizer(session, sessionState.catalog)
   }
 
   def isCharType(dataType: DataType): Boolean = {
File 2 of 5

@@ -27,10 +27,10 @@ import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexRepl
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, ExprId, NamedExpression, ScalaUDF, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.execution.SparkOptimizer
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule}
 import org.apache.spark.sql.secondaryindex.optimizer.CarbonSITransformationRule

@@ -195,34 +195,20 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
 
 class CarbonOptimizer(
     session: SparkSession,
-    catalog: SessionCatalog,
-    optimizer: Optimizer) extends Optimizer(catalog) {
+    catalog: SessionCatalog)
+  extends SparkOptimizer(catalog, session.sessionState.experimentalMethods) {
 
   private lazy val mvRules = Seq(Batch("Materialized View Optimizers", Once,
     Seq(new MVRewriteRule(session)): _*))
 
-  private lazy val iudRule = Batch("IUD Optimizers", fixedPoint,
+  private lazy val iudRule = Batch("IUD Optimizers", Once,
     Seq(new CarbonIUDRule(), new CarbonUDFTransformRule(), new CarbonFileIndexReplaceRule()): _*)
 
   private lazy val secondaryIndexRule = Batch("SI Optimizers", Once,
     Seq(new CarbonSITransformationRule(session)): _*)
 
   override def batches: Seq[Batch] = {
-    mvRules ++ convertedBatch() :+ iudRule :+ secondaryIndexRule
-  }
-
-  def convertedBatch(): Seq[Batch] = {
-    optimizer.batches.map { batch =>
-      Batch(
-        batch.name,
-        batch.strategy match {
-          case optimizer.Once =>
-            Once
-          case _: optimizer.FixedPoint =>
-            fixedPoint
-        },
-        batch.rules: _*
-      )
-    }
+    mvRules ++ super.batches :+ iudRule :+ secondaryIndexRule
   }
 
 }
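The core pattern in both 2.x hunks is the same: instead of copying batches out of a wrapped Optimizer (the deleted convertedBatch(), whose main job was translating each batch's Once/FixedPoint strategy between two RuleExecutor instances), the subclass now inherits Spark's batches and splices its own around them. A minimal self-contained sketch of that pattern, using toy stand-ins for Catalyst's machinery (Batch, BaseOptimizer and CustomOptimizer here are illustrative, not Spark's API):

object ExtendAndAppendSketch {
  // Toy stand-in for a RuleExecutor batch; the real Catalyst Batch also
  // carries an execution strategy (Once / FixedPoint) and a list of rules.
  case class Batch(name: String)

  class BaseOptimizer {
    // Plays the role of SparkOptimizer's stock batches.
    def batches: Seq[Batch] =
      Seq(Batch("Constant Folding"), Batch("Predicate Pushdown"))
  }

  class CustomOptimizer extends BaseOptimizer {
    private val mvRules = Seq(Batch("Materialized View Optimizers"))
    private val iudRule = Batch("IUD Optimizers")
    private val secondaryIndexRule = Batch("SI Optimizers")

    // Same shape as the new CarbonOptimizer.batches: custom batches go
    // before and after everything inherited from the parent optimizer.
    override def batches: Seq[Batch] =
      mvRules ++ super.batches :+ iudRule :+ secondaryIndexRule
  }

  def main(args: Array[String]): Unit =
    new CustomOptimizer().batches.map(_.name).foreach(println)
}

Because the batches are inherited rather than rebuilt, each one keeps its original execution strategy, which is what made the strategy-matching in convertedBatch() unnecessary.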
File 3 of 5

@@ -29,12 +29,12 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCata
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
+import org.apache.spark.sql.execution.SparkOptimizer
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule}
 import org.apache.spark.sql.secondaryindex.optimizer.CarbonSITransformationRule

@@ -237,34 +237,20 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
 
 class CarbonOptimizer(
     session: SparkSession,
-    catalog: SessionCatalog,
-    optimizer: Optimizer) extends Optimizer(catalog) {
+    catalog: SessionCatalog)
+  extends SparkOptimizer(catalog, session.sessionState.experimentalMethods) {
 
   private lazy val mvRules = Seq(Batch("Materialized View Optimizers", Once,
     Seq(new MVRewriteRule(session)): _*))
 
-  private lazy val iudRule = Batch("IUD Optimizers", fixedPoint,
+  private lazy val iudRule = Batch("IUD Optimizers", Once,
     Seq(new CarbonIUDRule(), new CarbonUDFTransformRule(), new CarbonFileIndexReplaceRule()): _*)
 
   private lazy val secondaryIndexRule = Batch("SI Optimizers", Once,
     Seq(new CarbonSITransformationRule(session)): _*)
 
   override def defaultBatches: Seq[Batch] = {
-    mvRules ++ convertedBatch() :+ iudRule :+ secondaryIndexRule
+    mvRules ++ super.defaultBatches :+ iudRule :+ secondaryIndexRule
   }
 
-  def convertedBatch(): Seq[Batch] = {
-    optimizer.batches.map { batch =>
-      Batch(
-        batch.name,
-        batch.strategy match {
-          case optimizer.Once =>
-            Once
-          case _: optimizer.FixedPoint =>
-            fixedPoint
-        },
-        batch.rules: _*
-      )
-    }
-  }
 }
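Note that this variant overrides defaultBatches rather than batches: from Spark 2.4 on, Optimizer derives batches from defaultBatches while applying the spark.sql.optimizer.excludedRules exclusion list, so hooking in at defaultBatches presumably keeps rule exclusion working across the inherited batches.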
File 4 of 5

@@ -25,12 +25,11 @@ import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, DynamicPruningSubquery, Expression, ExprId, NamedExpression, Predicate, ScalaUDF, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReference
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkOptimizer

@@ -238,9 +237,8 @@ object CarbonToSparkAdapter extends SparkVersionAdapter {
   }
 }
 
-class CarbonOptimizer(session: SparkSession, optimizer: Optimizer) extends
-  SparkOptimizer(session.sessionState.catalogManager,
-    session.sessionState.catalog,
+class CarbonOptimizer(session: SparkSession, catalog: SessionCatalog) extends
+  SparkOptimizer(session.sessionState.catalogManager, catalog,
    session.sessionState.experimentalMethods) {
 
   private lazy val mvRules = Seq(Batch("Materialized View Optimizers", Once,
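This module targets Spark 3.x (note the DynamicPruningSubquery import, which does not exist in 2.x), where CarbonOptimizer already extended SparkOptimizer and its constructor additionally takes the CatalogManager. The change here is correspondingly smaller: the wrapped Optimizer parameter is replaced by an explicit SessionCatalog, which is forwarded to SparkOptimizer instead of being looked up from the session state.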
File 5 of 5

@@ -443,7 +443,7 @@ trait SparkVersionAdapter {
 
   def getCarbonOptimizer(session: SparkSession,
       sessionState: SessionState): CarbonOptimizer = {
-    new CarbonOptimizer(session, sessionState.optimizer)
+    new CarbonOptimizer(session, sessionState.catalog)
   }
 
   def isCharType(dataType: DataType): Boolean = {
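For completeness, a hypothetical spot-check of the resulting batch order (not part of the commit). It assumes compilation into the org.apache.spark.sql package, as the adapter classes above appear to be, since SessionState is private[sql] and RuleExecutor.batches is protected:

package org.apache.spark.sql

// Hypothetical smoke check: a subclass exposes the protected batches member
// so the final batch order can be printed. Expect the MV batch first,
// Spark's stock batches in the middle, and the IUD/SI batches last.
class InspectableCarbonOptimizer(session: SparkSession)
  extends CarbonOptimizer(session, session.sessionState.catalog) {
  def batchNames: Seq[String] = batches.map(_.name)
}

object CarbonOptimizerSmokeCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    new InspectableCarbonOptimizer(spark).batchNames.foreach(println)
    spark.stop()
  }
}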
