@@ -33,7 +33,47 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
3333 Batch (" Filter Pushdown" , Once ,
3434 CombineFilters ,
3535 PushPredicateThroughProject ,
36- PushPredicateThroughInnerJoin ) :: Nil
36+ PushPredicateThroughInnerJoin ,
37+ ColumnPruning ) :: Nil
38+ }
39+
/**
 * Attempts to eliminate the reading of unneeded columns from the query plan using the following
 * transformations:
 *
 *  - Inserting Projections beneath the following operators:
 *   - Aggregate
 *   - Project <- Join
 *  - Collapsing adjacent projections, performing alias substitution.
 */
object ColumnPruning extends Rule[LogicalPlan] {
  /**
   * Rewrites `plan` by applying the pruning cases below wherever they match.
   *
   * @param plan the logical plan to rewrite
   * @return the plan produced by `plan transform` with the three cases below
   */
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Aggregate: when the child outputs attributes the aggregate does not reference, wrap the
    // child in a projection that keeps only the referenced attributes.
    case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
      a.copy(child = Project(a.references.toSeq, child))

    // Project <- Join: prune each side of the join independently.
    case Project(projectList, Join(left, right, joinType, condition)) =>
      // Everything referenced by the projection itself or by the (optional) join condition.
      val allReferences: Set[Attribute] =
        projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)

      /** Applies a projection only when the child is producing unnecessary attributes. */
      def prunedChild(c: LogicalPlan) = {
        // The subset of this child's output that is actually referenced above it.
        val needed = allReferences.filter(c.outputSet.contains)
        // The difference must be (output -- needed). The reverse, (needed -- output), is always
        // empty because `needed` is by construction a subset of the output, which would make
        // this branch unreachable and the pruning a no-op.
        if ((c.outputSet -- needed).nonEmpty) {
          Project(needed.toSeq, c)
        } else {
          c
        }
      }

      Project(projectList, Join(prunedChild(left), prunedChild(right), joinType, condition))

    // Project <- Project: collapse into a single projection over the inner child by substituting
    // each reference to an inner alias with the alias definition itself.
    case Project(project1, Project(project2, child)) =>
      // Maps the attribute produced by each inner alias to that alias.
      val aliasMap = project2.collect {
        case a @ Alias(e, _) => (a.toAttribute: Expression, a)
      }.toMap

      // TODO: Fix TransformBase.
      // `transform` yields plain expressions, hence the cast back to named expressions.
      val substitutedProjection = project1.map(_.transform {
        case a if aliasMap.contains(a) => aliasMap(a)
      }).asInstanceOf[Seq[NamedExpression]]

      Project(substitutedProjection, child)
  }
}
3878
3979/**
0 commit comments