1717
1818package org .apache .spark .sql
1919
20- import org .apache .spark .sql .catalyst .ScalaReflection
21-
2220import scala .language .implicitConversions
2321import scala .reflect .ClassTag
2422
2523import com .fasterxml .jackson .core .JsonFactory
2624
2725import org .apache .spark .rdd .RDD
2826import org .apache .spark .storage .StorageLevel
27+ import org .apache .spark .sql .catalyst .ScalaReflection
2928import org .apache .spark .sql .catalyst .expressions ._
3029import org .apache .spark .sql .catalyst .expressions .{Literal => LiteralExpr }
3130import org .apache .spark .sql .catalyst .plans .{JoinType , Inner }
3231import org .apache .spark .sql .catalyst .plans .logical ._
32+ import org .apache .spark .sql .execution .LogicalRDD
3333import org .apache .spark .sql .json .JsonRDD
3434import org .apache .spark .sql .types .{NumericType , StructType }
3535
3636
3737class DataFrame (
3838 val sqlContext : SQLContext ,
39- val logicalPlan : LogicalPlan ,
39+ val baseLogicalPlan : LogicalPlan ,
4040 operatorsEnabled : Boolean )
4141 extends DataFrameSpecificApi with RDDApi [Row ] {
4242
@@ -46,7 +46,16 @@ class DataFrame(
// Auxiliary constructor: takes only the SQLContext and a logical plan and delegates to the
// primary constructor, passing `true` as its third (operatorsEnabled) argument.
4646 def this (sqlContext : SQLContext , plan : LogicalPlan ) = this (sqlContext, plan, true )
4747
// Result of sqlContext.executePlan for this DataFrame's plan. It is a lazy val, so
// executePlan is not called until queryExecution is first accessed. Marked @transient and
// access-qualified to the `sql` package. The removed (-) line passed `logicalPlan`; the
// added (+) line passes the constructor's `baseLogicalPlan` instead.
4848 @ transient
49- protected [sql] lazy val queryExecution = sqlContext.executePlan(logicalPlan)
49+ protected [sql] lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
50+
// The logical plan exposed by this DataFrame, derived from the constructor's baseLogicalPlan.
// When baseLogicalPlan is a Command, InsertIntoTable, CreateTableAsSelect or WriteToFile, it is
// replaced by a LogicalRDD built from queryExecution.analyzed.output and queryExecution.toRdd;
// every other plan is used unchanged.
// This is a plain (non-lazy) val, so for the matched plan types the lazy queryExecution is
// forced while the DataFrame is being constructed.
// NOTE(review): the comment below says "query optimization", but the code calls
// queryExecution.toRdd, which presumably triggers execution of the plan as well -- confirm
// against QueryExecution and reword if so.
51+ @ transient protected [sql] val logicalPlan : LogicalPlan = baseLogicalPlan match {
52+ // For various commands (like DDL) and queries with side effects, we force query optimization to
53+ // happen right away to let these side effects take place eagerly.
54+ case _ : Command | _ : InsertIntoTable | _ : CreateTableAsSelect [_] | _ : WriteToFile =>
55+ LogicalRDD (queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
56+ case _ =>
57+ baseLogicalPlan
58+ }
5059
5160 private [this ] implicit def toDataFrame (logicalPlan : LogicalPlan ): DataFrame = {
5261 new DataFrame (sqlContext, logicalPlan, true )
@@ -123,8 +132,8 @@ class DataFrame(
// Wraps this DataFrame's logicalPlan in a Subquery with the given name. The body is a
// LogicalPlan; the declared DataFrame return type is satisfied through the private implicit
// toDataFrame conversion defined in this class.
123132 override def as (name : String ): DataFrame = Subquery (name, logicalPlan)
124133
125134 @ scala.annotation.varargs
126- override def select (cols : Column * ): DataFrame = {
127- val exprs = cols.zipWithIndex.map {
135+ override def select (col : Column , cols : Column * ): DataFrame = {
136+ val exprs = (col +: cols) .zipWithIndex.map {
128137 case (Column (expr : NamedExpression ), _) =>
129138 expr
130139 case (Column (expr : Expression ), _) =>
0 commit comments