diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 1b67e9906457d..be74bf2c72e58 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -209,6 +209,15 @@ abstract class RDD[T: ClassTag](
    */
   def cache(): this.type = persist()
 
+  /**
+   * Action that runs a no-op job (counting each partition's iterator) to force
+   * evaluation of this RDD's lineage up to this point, then returns this RDD.
+   */
+  def noOpRun(): RDD[T] = withScope {
+    sc.runJob(this, Utils.getIteratorSize _)
+    this
+  }
+
   /**
    * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
    *
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index fa4609dc5ba12..5f9e40c5d80d7 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -326,6 +326,13 @@ def unpersist(self, blocking=False):
         self._jrdd.unpersist(blocking)
         return self
 
+    def noOpRun(self):
+        """
+        Action that runs the job up to this point and returns a new RDD.
+        """
+        # noOpRun() yields a Scala RDD; convert it to a JavaRDD before wrapping.
+        return RDD(self._jrdd.rdd().noOpRun().toJavaRDD(), self.ctx, self._jrdd_deserializer)
+
     def checkpoint(self):
         """
         Mark this RDD for checkpointing. It will be saved to a file inside the
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 8b0e06d9fcab4..8efb71ecaae3a 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -602,6 +602,12 @@ def persist(self, storageLevel=StorageLevel.MEMORY_AND_DISK):
         self._jdf.persist(javaStorageLevel)
         return self
 
+    def noOpRun(self):
+        """
+        Action that runs the job up to this point and returns a new DataFrame.
+        """
+        return DataFrame(self._jdf.noOpRun(), self.sql_ctx)
+
     @property
     @since(2.1)
     def storageLevel(self):
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 45ec7dcb07a68..73a80b83bf2e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2949,6 +2949,18 @@ class Dataset[T] private[sql](
    */
   def cache(): this.type = persist()
 
+  /**
+   * Action that runs a no-op job to materialize the query up to this point and
+   * returns the result as a new Dataset.
+   */
+  def noOpRun(): Dataset[T] = withAction("noOpRun", queryExecution) { _ =>
+    // withAction already runs the body under a new execution id, so nesting
+    // withNewExecutionId here would fail; run the job directly instead.
+    val resultRDD = queryExecution.toRdd.noOpRun().map(
+      exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer).fromRow)
+    sparkSession.createDataset(resultRDD)(exprEnc)
+  }
+
   /**
    * Persist this Dataset with the given storage level.
    * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`,
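
Usage sketch (not part of the patch): with the change above applied, noOpRun can be
called like any other action to force evaluation of a lineage. The values below are
made up for illustration, e.g. in a spark-shell session where `spark` and `sc` are
already defined:

    import spark.implicits._

    // Dataset: runs a job now and returns a new Dataset[Int] over the result.
    val ds = Seq(1, 2, 3).toDS().map(_ * 2)
    val evaluated = ds.noOpRun()

    // RDD: runs a job that counts each partition's iterator, then returns the RDD.
    val rdd = sc.parallelize(1 to 100).filter(_ % 2 == 0)
    rdd.noOpRun()

Unlike count() or collect(), nothing beyond per-partition sizes comes back to the
driver, so noOpRun is essentially a way to trigger computation (for example, to
populate a cache set up by an earlier persist()) while keeping the RDD, DataFrame,
or Dataset handle available for further chaining.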