From 89546a84fed1b4634bf32e1e48d1fb6ea5134818 Mon Sep 17 00:00:00 2001 From: Vinitha Gankidi Date: Thu, 27 Jun 2019 16:45:22 -0700 Subject: [PATCH 1/2] SPARK-28188 Materialize Dataframe API --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 5 +++++ python/pyspark/rdd.py | 6 ++++++ python/pyspark/sql/dataframe.py | 8 ++++++++ .../src/main/scala/org/apache/spark/sql/Dataset.scala | 8 ++++++++ 4 files changed, 27 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 1b67e9906457d..8424c32f7b004 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -209,6 +209,11 @@ abstract class RDD[T: ClassTag]( */ def cache(): this.type = persist() + def materialize(): RDD[T] = withScope{ + sc.runJob(this, Utils.getIteratorSize _) + this + } + /** * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. * diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index fa4609dc5ba12..e6f6165614184 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -326,6 +326,12 @@ def unpersist(self, blocking=False): self._jrdd.unpersist(blocking) return self + def materialize(self): + """ + Action to force materialization of this RDD. Returns a new RDD. + """ + return RDD(self._jrdd.rdd().materialize(), self.ctx, self._jrdd_deserializer) + def checkpoint(self): """ Mark this RDD for checkpointing. It will be saved to a file inside the diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 8b0e06d9fcab4..2b1f03b216cf8 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -602,6 +602,14 @@ def persist(self, storageLevel=StorageLevel.MEMORY_AND_DISK): self._jdf.persist(javaStorageLevel) return self + def materialize(self): + """ + Action to force materialization of this DF. + + Returns a new DF based on the underlying materialized RDD. + """ + return DataFrame(self._jdf.materialize(), self.sql_ctx) + @property @since(2.1) def storageLevel(self): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 45ec7dcb07a68..f17ba058baf0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2949,6 +2949,14 @@ class Dataset[T] private[sql]( */ def cache(): this.type = persist() + def materialize(): Dataset[T] = withAction("materialize", queryExecution) { _ => + withNewExecutionId { + var materializedRDD = queryExecution.toRdd.materialize().map( + exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer).fromRow) + sparkSession.createDataset(materializedRDD) + } + } + /** * Persist this Dataset with the given storage level. * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`, From 191972b279fbf467ab8dc2ce42bf613a9e51fa8b Mon Sep 17 00:00:00 2001 From: Vinitha Gankidi Date: Fri, 12 Jul 2019 12:00:40 -0700 Subject: [PATCH 2/2] Rename to noOpRun --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- python/pyspark/rdd.py | 6 +++--- python/pyspark/sql/dataframe.py | 8 +++----- .../src/main/scala/org/apache/spark/sql/Dataset.scala | 6 +++--- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 8424c32f7b004..be74bf2c72e58 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -209,7 +209,7 @@ abstract class RDD[T: ClassTag]( */ def cache(): this.type = persist() - def materialize(): RDD[T] = withScope{ + def noOpRun(): RDD[T] = withScope{ sc.runJob(this, Utils.getIteratorSize _) this } diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index e6f6165614184..5f9e40c5d80d7 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -326,11 +326,11 @@ def unpersist(self, blocking=False): self._jrdd.unpersist(blocking) return self - def materialize(self): + def noOpRun(self): """ - Action to force materialization of this RDD. Returns a new RDD. + Action to run the job until this point. Returns a new RDD. """ - return RDD(self._jrdd.rdd().materialize(), self.ctx, self._jrdd_deserializer) + return RDD(self._jrdd.rdd().noOpRun(), self.ctx, self._jrdd_deserializer) def checkpoint(self): """ diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 2b1f03b216cf8..8efb71ecaae3a 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -602,13 +602,11 @@ def persist(self, storageLevel=StorageLevel.MEMORY_AND_DISK): self._jdf.persist(javaStorageLevel) return self - def materialize(self): + def noOpRun(self): """ - Action to force materialization of this DF. - - Returns a new DF based on the underlying materialized RDD. + Action to run the job until this point and return a new DF. """ - return DataFrame(self._jdf.materialize(), self.sql_ctx) + return DataFrame(self._jdf.noOpRun(), self.sql_ctx) @property @since(2.1) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index f17ba058baf0f..73a80b83bf2e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2949,11 +2949,11 @@ class Dataset[T] private[sql]( */ def cache(): this.type = persist() - def materialize(): Dataset[T] = withAction("materialize", queryExecution) { _ => + def noOpRun(): Dataset[T] = withAction("noOpRun", queryExecution) { _ => withNewExecutionId { - var materializedRDD = queryExecution.toRdd.materialize().map( + var resultRDD = queryExecution.toRdd.noOpRun().map( exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer).fromRow) - sparkSession.createDataset(materializedRDD) + sparkSession.createDataset(resultRDD) } }