From c3210f547af52f009ffd13232f4a15c39c653713 Mon Sep 17 00:00:00 2001
From: Xingbo Jiang
Date: Tue, 7 Aug 2018 02:02:29 +0800
Subject: [PATCH 1/7] init.

---
 .../org/apache/spark/api/java/JavaRDD.scala   | 15 +++++
 .../spark/api/java/JavaRDDBarrier.scala       | 57 +++++++++++++++++++
 python/pyspark/rdd.py                         | 32 +++++++++++
 3 files changed, 104 insertions(+)
 create mode 100644 core/src/main/scala/org/apache/spark/api/java/JavaRDDBarrier.scala

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 41b5cab601c3..1f3b1bf40c41 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -21,6 +21,7 @@ import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import org.apache.spark._
+import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.rdd.RDD
@@ -59,6 +60,14 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
    */
   def unpersist(blocking: Boolean): JavaRDD[T] = wrapRDD(rdd.unpersist(blocking))
 
+  /**
+   * :: Experimental ::
+   * Indicates that Spark must launch the tasks together for the current stage.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def barrier(): JavaRDDBarrier[T] = new JavaRDDBarrier[T](this)
+
   // Transformations (return a new RDD)
 
   /**
@@ -206,6 +215,12 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
     wrapRDD(rdd.sortBy(fn, ascending, numPartitions))
   }
 
+  /**
+   * Whether the JavaRDD is in a barrier stage. Spark must launch all the tasks at the same time
+   * for a barrier stage.
+   */
+  private[spark] def isBarrier(): Boolean = rdd.isBarrier()
+
 }
 
 object JavaRDD {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDBarrier.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDBarrier.scala
new file mode 100644
index 000000000000..e94ddbf174cf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDBarrier.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.BarrierTaskContext
+import org.apache.spark.TaskContext
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.rdd.MapPartitionsRDD
+
+/**
+ * A Java-friendly version of [[org.apache.spark.rdd.RDDBarrier]] that returns
+ * [[org.apache.spark.api.java.JavaRDD]]s.
+ *
+ * An RDD barrier turns an RDD into a barrier RDD, which forces Spark to launch tasks of the stage
+ * that contains this RDD together.
+ */
+class JavaRDDBarrier[T: ClassTag](javaRdd: JavaRDD[T]) {
+
+  /**
+   * :: Experimental ::
+   * Maps partitions together with a provided [[org.apache.spark.BarrierTaskContext]].
+   *
+   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
+   * should be `false` unless `rdd` is a pair RDD and the input function doesn't modify the keys.
+   */
+  @Experimental
+  @Since("2.4.0")
+  def mapPartitions[S: ClassTag](
+      f: (Iterator[T], BarrierTaskContext) => Iterator[S],
+      preservesPartitioning: Boolean = false): JavaRDD[S] = javaRdd.withScope {
+    val cleanedF = javaRdd.sparkContext.clean(f)
+    JavaRDD.fromRDD(new MapPartitionsRDD(
+      javaRdd.rdd,
+      (context: TaskContext, index: Int, iter: Iterator[T]) =>
+        cleanedF(iter, context.asInstanceOf[BarrierTaskContext]),
+      preservesPartitioning,
+      isFromBarrier = true
+    ))
+  }
+}
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 951851804b1d..1f6b240e4141 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2406,6 +2406,17 @@ def toLocalIterator(self):
         sock_info = self.ctx._jvm.PythonRDD.toLocalIteratorAndServe(self._jrdd.rdd())
         return _load_from_socket(sock_info, self._jrdd_deserializer)
 
+    def barrier(self):
+        """
+        Indicates that Spark must launch the tasks together for the current stage.
+        """
+        return RDDBarrier(self)
+
+    def isBarrier(self):
+        """
+        Whether this RDD is in a barrier stage.
+        """
+        return self._jrdd.isBarrier()
 
 def _prepare_for_python_RDD(sc, command):
     # the serialized command will be compressed by broadcast
@@ -2428,6 +2439,27 @@ def _wrap_function(sc, func, deserializer, serializer, profiler=None):
     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)
 
+class RDDBarrier(object):
+
+    """
+    .. note:: Experimental
+
+    An RDDBarrier turns an RDD into a barrier RDD, which forces Spark to launch tasks of the stage
+    that contains this RDD together.
+    """
+
+    def __init__(self, rdd):
+        self.rdd = rdd
+        self._jrdd = rdd._jrdd
+
+    def mapPartitions(self, f, preservesPartitioning=False):
+        """
+        Return a new RDD by applying a function to each partition of this RDD.
+ """ + def func(s, iterator): + return f(iterator) + jrdd = self._jrdd.barrier().mapPartitions(f, preservesPartitioning) + return RDD(jrdd, self.rdd.ctx, self.rdd._jrdd_deserializer) class PipelinedRDD(RDD): From 4140472a09761ac5281e3b99ec69c4e762367d8e Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Tue, 7 Aug 2018 02:26:12 +0800 Subject: [PATCH 2/7] update --- python/pyspark/rdd.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 1f6b240e4141..4c5e65014912 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2418,6 +2418,7 @@ def isBarrier(self): """ return self._jrdd.isBarrier() + def _prepare_for_python_RDD(sc, command): # the serialized command will be compressed by broadcast ser = CloudPickleSerializer() @@ -2439,6 +2440,7 @@ def _wrap_function(sc, func, deserializer, serializer, profiler=None): return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec, sc.pythonVer, broadcast_vars, sc._javaAccumulator) + class RDDBarrier(object): """ @@ -2461,6 +2463,7 @@ def func(s, iterator): jrdd = self._jrdd.barrier().mapPartitions(f, preservesPartitioning) return RDD(jrdd, self.rdd.ctx, self.rdd._jrdd_deserializer) + class PipelinedRDD(RDD): """ From 1ee80254c869b9fe42d05f401a4802d8b4e1662a Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Fri, 10 Aug 2018 01:25:31 +0800 Subject: [PATCH 3/7] update --- .../org/apache/spark/api/java/JavaRDD.scala | 15 ----- .../spark/api/java/JavaRDDBarrier.scala | 57 ------------------- .../org/apache/spark/rdd/RDDBarrier.scala | 15 ++++- python/pyspark/rdd.py | 17 +++++- 4 files changed, 28 insertions(+), 76 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/api/java/JavaRDDBarrier.scala diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 1f3b1bf40c41..41b5cab601c3 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -21,7 +21,6 @@ import scala.language.implicitConversions import scala.reflect.ClassTag import org.apache.spark._ -import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.rdd.RDD @@ -60,14 +59,6 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) */ def unpersist(blocking: Boolean): JavaRDD[T] = wrapRDD(rdd.unpersist(blocking)) - /** - * :: Experimental :: - * Indicates that Spark must launch the tasks together for the current stage. - */ - @Experimental - @Since("2.4.0") - def barrier(): JavaRDDBarrier[T] = new JavaRDDBarrier[T](this) - // Transformations (return a new RDD) /** @@ -215,12 +206,6 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) wrapRDD(rdd.sortBy(fn, ascending, numPartitions)) } - /** - * Whether the JavaRDD is in a barrier stage. Spark must launch all the tasks at the same time - * for a barrier stage. 
-   */
-  private[spark] def isBarrier(): Boolean = rdd.isBarrier()
-
 }
 
 object JavaRDD {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDBarrier.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDBarrier.scala
deleted file mode 100644
index e94ddbf174cf..000000000000
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDBarrier.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.api.java
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.BarrierTaskContext
-import org.apache.spark.TaskContext
-import org.apache.spark.annotation.{Experimental, Since}
-import org.apache.spark.rdd.MapPartitionsRDD
-
-/**
- * A Java-friendly version of [[org.apache.spark.rdd.RDDBarrier]] that returns
- * [[org.apache.spark.api.java.JavaRDD]]s.
- *
- * An RDD barrier turns an RDD into a barrier RDD, which forces Spark to launch tasks of the stage
- * that contains this RDD together.
- */
-class JavaRDDBarrier[T: ClassTag](javaRdd: JavaRDD[T]) {
-
-  /**
-   * :: Experimental ::
-   * Maps partitions together with a provided [[org.apache.spark.BarrierTaskContext]].
-   *
-   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
-   * should be `false` unless `rdd` is a pair RDD and the input function doesn't modify the keys.
-   */
-  @Experimental
-  @Since("2.4.0")
-  def mapPartitions[S: ClassTag](
-      f: (Iterator[T], BarrierTaskContext) => Iterator[S],
-      preservesPartitioning: Boolean = false): JavaRDD[S] = javaRdd.withScope {
-    val cleanedF = javaRdd.sparkContext.clean(f)
-    JavaRDD.fromRDD(new MapPartitionsRDD(
-      javaRdd.rdd,
-      (context: TaskContext, index: Int, iter: Iterator[T]) =>
-        cleanedF(iter, context.asInstanceOf[BarrierTaskContext]),
-      preservesPartitioning,
-      isFromBarrier = true
-    ))
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
index 978e7c004e5e..705105f028fb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
@@ -19,9 +19,9 @@ package org.apache.spark.rdd
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.BarrierTaskContext
 import org.apache.spark.TaskContext
 import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.java.JavaRDD
 
 /** Represents an RDD barrier, which forces Spark to launch tasks of this stage together. */
 class RDDBarrier[T: ClassTag](rdd: RDD[T]) {
@@ -47,5 +47,18 @@ class RDDBarrier[T: ClassTag](rdd: RDD[T]) {
     )
   }
 
+  /**
+   * Expose a JavaRDD that wraps a barrier RDD generated from the prev RDD, to support launching
+   * a barrier stage from the Python side.
+   */
+  private[spark] def toJavaRDD(): JavaRDD[T] = {
+    val barrierRDD = new MapPartitionsRDD[T, T](
+      rdd,
+      (context, pid, iter) => iter,
+      preservesPartitioning = false,
+      isFromBarrier = true)
+    JavaRDD.fromRDD(barrierRDD)
+  }
+
   /** TODO extra conf(e.g. timeout) */
 }
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 4c5e65014912..bc7f6043daed 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2408,7 +2408,11 @@ def toLocalIterator(self):
 
     def barrier(self):
         """
+        .. note:: Experimental
+
         Indicates that Spark must launch the tasks together for the current stage.
+
+        .. versionadded:: 2.4.0
         """
         return RDDBarrier(self)
 
@@ -2416,7 +2420,7 @@ def isBarrier(self):
         """
         Whether this RDD is in a barrier stage.
         """
-        return self._jrdd.isBarrier()
+        return self._jrdd.rdd().isBarrier()
 
 
 def _prepare_for_python_RDD(sc, command):
@@ -2448,6 +2452,8 @@ class RDDBarrier(object):
 
     An RDDBarrier turns an RDD into a barrier RDD, which forces Spark to launch tasks of the stage
     that contains this RDD together.
+
+    .. versionadded:: 2.4.0
     """
 
     def __init__(self, rdd):
@@ -2456,12 +2462,17 @@ def __init__(self, rdd):
 
     def mapPartitions(self, f, preservesPartitioning=False):
         """
+        .. note:: Experimental
+
         Return a new RDD by applying a function to each partition of this RDD.
+
+        .. versionadded:: 2.4.0
         """
         def func(s, iterator):
             return f(iterator)
-        jrdd = self._jrdd.barrier().mapPartitions(f, preservesPartitioning)
-        return RDD(jrdd, self.rdd.ctx, self.rdd._jrdd_deserializer)
+        jBarrierRdd = self._jrdd.rdd().barrier().toJavaRDD()
+        pyBarrierRdd = RDD(jBarrierRdd, self.rdd.ctx, self.rdd._jrdd_deserializer)
+        return pyBarrierRdd.mapPartitions(f, preservesPartitioning)
 
 
 class PipelinedRDD(RDD):

From b6f4847b617d940f59a8619786fa3933bdc07ad5 Mon Sep 17 00:00:00 2001
From: Xingbo Jiang
Date: Fri, 10 Aug 2018 01:27:28 +0800
Subject: [PATCH 4/7] update

---
 python/pyspark/rdd.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index bc7f6043daed..82e7af9142c3 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2418,7 +2418,11 @@ def barrier(self):
 
     def isBarrier(self):
         """
+        .. note:: Experimental
+
         Whether this RDD is in a barrier stage.
+
+        .. versionadded:: 2.4.0
         """
         return self._jrdd.rdd().isBarrier()
 

From d508fc5df6680a8f30ce4c17004a1677a96d91eb Mon Sep 17 00:00:00 2001
From: Xingbo Jiang
Date: Fri, 10 Aug 2018 01:28:11 +0800
Subject: [PATCH 5/7] update

---
 python/pyspark/rdd.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 82e7af9142c3..972f55b94ed0 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2421,7 +2421,7 @@ def isBarrier(self):
         .. note:: Experimental
 
         Whether this RDD is in a barrier stage.
-
+
         .. versionadded:: 2.4.0
         """
         return self._jrdd.rdd().isBarrier()

From ea2330baa61e427665ba824c3c42d1e4ec1a7934 Mon Sep 17 00:00:00 2001
From: Xingbo Jiang
Date: Fri, 10 Aug 2018 16:22:03 +0800
Subject: [PATCH 6/7] update

---
 .../org/apache/spark/api/python/PythonRDD.scala |  6 +++++-
 .../scala/org/apache/spark/rdd/RDDBarrier.scala | 14 --------------
 python/pyspark/rdd.py                           | 13 +++++++------
 3 files changed, 12 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 8bc0ff7936da..8c2ce883093c 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -45,7 +45,8 @@ import org.apache.spark.util._
 private[spark] class PythonRDD(
     parent: RDD[_],
     func: PythonFunction,
-    preservePartitoning: Boolean)
+    preservePartitoning: Boolean,
+    isFromBarrier: Boolean = false)
   extends RDD[Array[Byte]](parent) {
 
   val bufferSize = conf.getInt("spark.buffer.size", 65536)
@@ -63,6 +64,9 @@ private[spark] class PythonRDD(
     val runner = PythonRunner(func, bufferSize, reuseWorker)
     runner.compute(firstParent.iterator(split, context), split.index, context)
   }
+
+  @transient protected lazy override val isBarrier_ : Boolean =
+    isFromBarrier || dependencies.exists(_.rdd.isBarrier())
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
index 705105f028fb..b399bf9febae 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDBarrier.scala
@@ -21,7 +21,6 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.TaskContext
 import org.apache.spark.annotation.{Experimental, Since}
-import org.apache.spark.api.java.JavaRDD
 
 /** Represents an RDD barrier, which forces Spark to launch tasks of this stage together. */
 class RDDBarrier[T: ClassTag](rdd: RDD[T]) {
@@ -47,18 +46,5 @@ class RDDBarrier[T: ClassTag](rdd: RDD[T]) {
     )
   }
 
-  /**
-   * Expose a JavaRDD that wraps a barrier RDD generated from the prev RDD, to support launching
-   * a barrier stage from the Python side.
-   */
-  private[spark] def toJavaRDD(): JavaRDD[T] = {
-    val barrierRDD = new MapPartitionsRDD[T, T](
-      rdd,
-      (context, pid, iter) => iter,
-      preservesPartitioning = false,
-      isFromBarrier = true)
-    JavaRDD.fromRDD(barrierRDD)
-  }
-
   /** TODO extra conf(e.g. timeout) */
 }
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 972f55b94ed0..874bd0f3672f 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2462,7 +2462,6 @@ class RDDBarrier(object):
 
     def __init__(self, rdd):
         self.rdd = rdd
-        self._jrdd = rdd._jrdd
 
     def mapPartitions(self, f, preservesPartitioning=False):
         """
@@ -2474,9 +2473,7 @@ def mapPartitions(self, f, preservesPartitioning=False):
         """
         def func(s, iterator):
             return f(iterator)
-        jBarrierRdd = self._jrdd.rdd().barrier().toJavaRDD()
-        pyBarrierRdd = RDD(jBarrierRdd, self.rdd.ctx, self.rdd._jrdd_deserializer)
-        return pyBarrierRdd.mapPartitions(f, preservesPartitioning)
+        return PipelinedRDD(self.rdd, func, preservesPartitioning, isFromBarrier=True)
 
 
 class PipelinedRDD(RDD):
@@ -2498,7 +2495,7 @@ class PipelinedRDD(RDD):
         20
     """
 
-    def __init__(self, prev, func, preservesPartitioning=False):
+    def __init__(self, prev, func, preservesPartitioning=False, isFromBarrier=False):
         if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
             # This transformation is the first in its stage:
             self.func = func
@@ -2524,6 +2521,7 @@ def pipeline_func(split, iterator):
         self._jrdd_deserializer = self.ctx.serializer
         self._bypass_serializer = False
         self.partitioner = prev.partitioner if self.preservesPartitioning else None
+        self.is_barrier = prev.isBarrier() or isFromBarrier
 
     def getNumPartitions(self):
         return self._prev_jrdd.partitions().size()
@@ -2543,7 +2541,7 @@ def _jrdd(self):
         wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
                                       self._jrdd_deserializer, profiler)
         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
-                                             self.preservesPartitioning)
+                                             self.preservesPartitioning, self.is_barrier)
         self._jrdd_val = python_rdd.asJavaRDD()
 
         if profiler:
@@ -2559,6 +2557,9 @@ def id(self):
     def _is_pipelinable(self):
         return not (self.is_cached or self.is_checkpointed)
 
+    def isBarrier(self):
+        return self.is_barrier
+
 
 def _test():
     import doctest

From cf3853177d0ed76efbffee8ced1021003b085a26 Mon Sep 17 00:00:00 2001
From: Xingbo Jiang
Date: Sat, 11 Aug 2018 01:18:19 +0800
Subject: [PATCH 7/7] update

---
 python/pyspark/rdd.py | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 874bd0f3672f..d17a8eb76ad4 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2416,13 +2416,9 @@ def barrier(self):
         """
         return RDDBarrier(self)
 
-    def isBarrier(self):
+    def _is_barrier(self):
         """
-        .. note:: Experimental
-
         Whether this RDD is in a barrier stage.
-
-        .. versionadded:: 2.4.0
         """
         return self._jrdd.rdd().isBarrier()
 
@@ -2521,7 +2517,7 @@ def pipeline_func(split, iterator):
         self._jrdd_deserializer = self.ctx.serializer
         self._bypass_serializer = False
         self.partitioner = prev.partitioner if self.preservesPartitioning else None
-        self.is_barrier = prev.isBarrier() or isFromBarrier
+        self.is_barrier = prev._is_barrier() or isFromBarrier
 
     def getNumPartitions(self):
         return self._prev_jrdd.partitions().size()
@@ -2553,7 +2553,7 @@ def id(self):
     def _is_pipelinable(self):
         return not (self.is_cached or self.is_checkpointed)
 
-    def isBarrier(self):
+    def _is_barrier(self):
         return self.is_barrier
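
For reference, a minimal sketch of how the API this series converges on (RDD.barrier()
followed by RDDBarrier.mapPartitions()) is meant to be used from PySpark. It assumes a
running SparkContext bound to `sc` with enough free slots to gang-schedule both tasks, and
the BarrierTaskContext Python API from the companion barrier-execution work rather than
from these patches; the `train` function and its body are illustrative only:

    from pyspark import BarrierTaskContext

    def train(iterator):
        context = BarrierTaskContext.get()
        # Blocks until every task of the barrier stage reaches this call.
        context.barrier()
        # Stand-in for real coordinated work (e.g. a distributed training step).
        yield sum(iterator)

    rdd = sc.parallelize(range(8), 2)
    result = rdd.barrier().mapPartitions(train).collect()

Because mapPartitions() on an RDDBarrier produces a PipelinedRDD with is_barrier set (patch 6),
every downstream pipelined transformation stays in the same barrier stage, and the flag is
propagated to the JVM-side PythonRDD via the isFromBarrier constructor argument.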