From fbfd3c51491b4b2e8f4b7b1a4cf7295f1951e9bd Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Wed, 29 Apr 2015 01:46:59 -0700 Subject: [PATCH 1/4] make the DataFrame.queryExecution mutable for cache/persist/unpersist --- .../org/apache/spark/sql/DataFrame.scala | 7 ++++--- .../org/apache/spark/sql/SQLQuerySuite.scala | 21 +++++++++++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 034d88790197..eb3ae111a68f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -115,7 +115,7 @@ private[sql] object DataFrame { @Experimental class DataFrame private[sql]( @transient val sqlContext: SQLContext, - @DeveloperApi @transient val queryExecution: SQLContext#QueryExecution) + @DeveloperApi @transient var queryExecution: SQLContext#QueryExecution) extends RDDApi[Row] with Serializable { /** @@ -1302,8 +1302,7 @@ class DataFrame private[sql]( * @since 1.3.0 */ override def persist(): this.type = { - sqlContext.cacheManager.cacheQuery(this) - this + persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK) } /** @@ -1318,6 +1317,7 @@ class DataFrame private[sql]( */ override def persist(newLevel: StorageLevel): this.type = { sqlContext.cacheManager.cacheQuery(this, None, newLevel) + this.queryExecution = new sqlContext.QueryExecution(this.queryExecution.logical) this } @@ -1327,6 +1327,7 @@ class DataFrame private[sql]( */ override def unpersist(blocking: Boolean): this.type = { sqlContext.cacheManager.tryUncacheQuery(this, blocking) + this.queryExecution = new sqlContext.QueryExecution(this.queryExecution.logical) this } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 63f7d314fb69..e308bc241529 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -117,6 +117,27 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { ) } + test("SPARK-7158 collect and take return different results") { + import java.util.UUID + import org.apache.spark.sql.types._ + + val df = Seq(Tuple1(1), Tuple1(2), Tuple1(3)).toDF("index") + def id:() => String = () => { UUID.randomUUID().toString() } + + // Expect the ID to have materialized at this point + val dfWithId = df.withColumn("id", callUDF(id, StringType)) + val cached = dfWithId.cache() + val d0 = dfWithId.collect() + val d1 = cached.collect() + val d2 = cached.collect() + + assert(d0.map(_(0)) === d2.map(_(0))) + assert(d0.map(_(1)) === d2.map(_(1))) + + assert(d1.map(_(0)) === d2.map(_(0))) + assert(d1.map(_(1)) === d2.map(_(1))) + } + test("grouping on nested fields") { read.json(sparkContext.parallelize("""{"nested": {"attribute": 1}, "value": 2}""" :: Nil)) .registerTempTable("rows") From a5647d9d61b16d56a77d19113b71ecaafaabadec Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Sat, 16 May 2015 00:37:36 +0800 Subject: [PATCH 2/4] hide the queryExecution of DataFrame --- .../src/main/scala/org/apache/spark/sql/DataFrame.scala | 8 +++++--- .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 7 ++++++- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index eb3ae111a68f..fdadda825a3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -115,7 +115,7 @@ private[sql] object DataFrame { @Experimental class DataFrame private[sql]( @transient val sqlContext: SQLContext, - @DeveloperApi @transient var queryExecution: SQLContext#QueryExecution) + @DeveloperApi @transient private var _queryExecution: SQLContext#QueryExecution) extends RDDApi[Row] with Serializable { /** @@ -134,6 +134,8 @@ class DataFrame private[sql]( }) } + @DeveloperApi def queryExecution: SQLContext#QueryExecution = _queryExecution + @transient protected[sql] val logicalPlan: LogicalPlan = queryExecution.logical match { // For various commands (like DDL) and queries with side effects, we force query optimization to // happen right away to let these side effects take place eagerly. @@ -1317,7 +1319,7 @@ class DataFrame private[sql]( */ override def persist(newLevel: StorageLevel): this.type = { sqlContext.cacheManager.cacheQuery(this, None, newLevel) - this.queryExecution = new sqlContext.QueryExecution(this.queryExecution.logical) + this._queryExecution = new sqlContext.QueryExecution(this.queryExecution.logical) this } @@ -1327,7 +1329,7 @@ class DataFrame private[sql]( */ override def unpersist(blocking: Boolean): this.type = { sqlContext.cacheManager.tryUncacheQuery(this, blocking) - this.queryExecution = new sqlContext.QueryExecution(this.queryExecution.logical) + this._queryExecution = new sqlContext.QueryExecution(this.queryExecution.logical) this } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e308bc241529..b7d2f54707aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -122,15 +122,20 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { import org.apache.spark.sql.types._ val df = Seq(Tuple1(1), Tuple1(2), Tuple1(3)).toDF("index") + // we except the id is materialized once def id:() => String = () => { UUID.randomUUID().toString() } - // Expect the ID to have materialized at this point val dfWithId = df.withColumn("id", callUDF(id, StringType)) + // Make a new DataFrame (actually the same reference to the old one) val cached = dfWithId.cache() + // Trigger the cache val d0 = dfWithId.collect() val d1 = cached.collect() val d2 = cached.collect() + // Since the ID is only materialized once, then all of the records + // should come from the cache, not by re-computing. Otherwise, the ID + // will be different assert(d0.map(_(0)) === d2.map(_(0))) assert(d0.map(_(1)) === d2.map(_(1))) From 2bf740f6ada4aa61ecc84befc8c915d1e47e1416 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Mon, 1 Jun 2015 22:14:03 +0800 Subject: [PATCH 3/4] create new QueryExecution instance for CacheManager --- .../src/main/scala/org/apache/spark/sql/DataFrame.scala | 9 +++------ .../org/apache/spark/sql/execution/CacheManager.scala | 2 +- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index fdadda825a3c..034d88790197 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -115,7 +115,7 @@ private[sql] object DataFrame { @Experimental class DataFrame private[sql]( @transient val sqlContext: SQLContext, - @DeveloperApi @transient private var _queryExecution: SQLContext#QueryExecution) + @DeveloperApi @transient val queryExecution: SQLContext#QueryExecution) extends RDDApi[Row] with Serializable { /** @@ -134,8 +134,6 @@ class DataFrame private[sql]( }) } - @DeveloperApi def queryExecution: SQLContext#QueryExecution = _queryExecution - @transient protected[sql] val logicalPlan: LogicalPlan = queryExecution.logical match { // For various commands (like DDL) and queries with side effects, we force query optimization to // happen right away to let these side effects take place eagerly. @@ -1304,7 +1302,8 @@ class DataFrame private[sql]( * @since 1.3.0 */ override def persist(): this.type = { - persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK) + sqlContext.cacheManager.cacheQuery(this) + this } /** @@ -1319,7 +1318,6 @@ class DataFrame private[sql]( */ override def persist(newLevel: StorageLevel): this.type = { sqlContext.cacheManager.cacheQuery(this, None, newLevel) - this._queryExecution = new sqlContext.QueryExecution(this.queryExecution.logical) this } @@ -1329,7 +1327,6 @@ class DataFrame private[sql]( */ override def unpersist(blocking: Boolean): this.type = { sqlContext.cacheManager.tryUncacheQuery(this, blocking) - this._queryExecution = new sqlContext.QueryExecution(this.queryExecution.logical) this } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 5fcc48a67948..a4b38d364d54 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -103,7 +103,7 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging { sqlContext.conf.useCompression, sqlContext.conf.columnBatchSize, storageLevel, - query.queryExecution.executedPlan, + sqlContext.executePlan(query.logicalPlan).executedPlan, tableName)) } } From 58ea8aab465bf63d6fdae1de62857c12fc128b4c Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Mon, 1 Jun 2015 07:51:50 -0700 Subject: [PATCH 4/4] style issue --- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index b7d2f54707aa..76b1713b8a2f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -123,7 +123,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { val df = Seq(Tuple1(1), Tuple1(2), Tuple1(3)).toDF("index") // we except the id is materialized once - def id:() => String = () => { UUID.randomUUID().toString() } + def id: () => String = () => { UUID.randomUUID().toString() } val dfWithId = df.withColumn("id", callUDF(id, StringType)) // Make a new DataFrame (actually the same reference to the old one)