From 7fbd7b44edec74d07356148c349c1a6fbe163e1b Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 30 Aug 2018 22:43:22 +0200
Subject: [PATCH 1/3] Revert "Porting UnionRDD on parmap"

This reverts commit 72cdfeb765cda13ab03ed8515a83fa24657894ac.
---
 .../scala/org/apache/spark/rdd/UnionRDD.scala | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 4b6f73235a57..60e383afadf1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,13 +20,12 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}

 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext
+import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag

 import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.ThreadUtils.parmap
 import org.apache.spark.util.Utils

 /**
@@ -60,7 +59,8 @@ private[spark] class UnionPartition[T: ClassTag](
 }

 object UnionRDD {
-  private[spark] lazy val threadPool = new ForkJoinPool(8)
+  private[spark] lazy val partitionEvalTaskSupport =
+    new ForkJoinTaskSupport(new ForkJoinPool(8))
 }

 @DeveloperApi
@@ -74,13 +74,14 @@ class UnionRDD[T: ClassTag](
     rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10)

   override def getPartitions: Array[Partition] = {
-    val partitionLengths = if (isPartitionListingParallel) {
-      implicit val ec = ExecutionContext.fromExecutor(UnionRDD.threadPool)
-      parmap(rdds)(_.partitions.length)
+    val parRDDs = if (isPartitionListingParallel) {
+      val parArray = rdds.par
+      parArray.tasksupport = UnionRDD.partitionEvalTaskSupport
+      parArray
     } else {
-      rdds.map(_.partitions.length)
+      rdds
     }
-    val array = new Array[Partition](partitionLengths.sum)
+    val array = new Array[Partition](parRDDs.map(_.partitions.length).seq.sum)
     var pos = 0
     for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
       array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)

From 6fd4c0099c85df240ef7fb9d915f0b20c06b0040 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 30 Aug 2018 22:48:05 +0200
Subject: [PATCH 2/3] Removing potentially dangerous method

---
 .../org/apache/spark/util/ThreadUtils.scala   | 32 +++-----------
 .../util/FileBasedWriteAheadLog.scala         |  4 ++-
 2 files changed, 7 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index f0e5addbe5b5..cb0c20541d0d 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -284,36 +284,12 @@ private[spark] object ThreadUtils {

     try {
       implicit val ec = ExecutionContext.fromExecutor(pool)
-      parmap(in)(f)
+      val futures = in.map(x => Future(f(x)))
+      val futureSeq = Future.sequence(futures)
+
+      awaitResult(futureSeq, Duration.Inf)
     } finally {
       pool.shutdownNow()
     }
   }
-
-  /**
-   * Transforms input collection by applying the given function to each element in parallel fashion.
-   * Comparing to the map() method of Scala parallel collections, this method can be interrupted
-   * at any time. This is useful on canceling of task execution, for example.
-   *
-   * @param in - the input collection which should be transformed in parallel.
-   * @param f - the lambda function will be applied to each element of `in`.
-   * @param ec - an execution context for parallel applying of the given function `f`.
-   * @tparam I - the type of elements in the input collection.
-   * @tparam O - the type of elements in resulted collection.
-   * @return new collection in which each element was given from the input collection `in` by
-   *         applying the lambda function `f`.
-   */
-  def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]]
-      (in: Col[I])
-      (f: I => O)
-      (implicit
-        cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // For in.map
-        cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]], // for Future.sequence
-        ec: ExecutionContext
-      ): Col[O] = {
-    val futures = in.map(x => Future(f(x)))
-    val futureSeq = Future.sequence(futures)
-
-    awaitResult(futureSeq, Duration.Inf)
-  }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index bba071e80c0e..8ff605d045b6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -315,7 +315,9 @@ private[streaming] object FileBasedWriteAheadLog {
     implicit val ec = executionContext

     source.grouped(groupSize).flatMap { group =>
-      ThreadUtils.parmap(group)(handler)
+      val parallelCollection = group.par
+      parallelCollection.tasksupport = taskSupport
+      parallelCollection.map(handler)
     }.flatten
   }
 }

From e868edb45d6e0ea4dda68a44d1c6e6426f292bd0 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 31 Aug 2018 09:21:29 +0200
Subject: [PATCH 3/3] Removing unused val

---
 .../org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 8ff605d045b6..f0161e1465c2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -312,7 +312,6 @@ private[streaming] object FileBasedWriteAheadLog {
       handler: I => Iterator[O]): Iterator[O] = {
     val taskSupport = new ExecutionContextTaskSupport(executionContext)
     val groupSize = taskSupport.parallelismLevel.max(8)
-    implicit val ec = executionContext

     source.grouped(groupSize).flatMap { group =>
       val parallelCollection = group.par
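
All three patches land on the same replacement pattern: lift a collection to a Scala parallel collection, point its tasksupport at an explicitly bounded pool, and map over it. The sketch below distills that pattern into a standalone helper. It is illustrative only, not code from the patches: the names ParMapSketch and parMap are invented for the example, and it assumes the Scala 2.11/2.12-era standard library these patches target, where scala.concurrent.forkjoin.ForkJoinPool and built-in parallel collections are available.

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

// Hypothetical helper showing the parallel-collections pattern used above.
object ParMapSketch {
  // One shared pool capped at 8 threads, mirroring
  // UnionRDD.partitionEvalTaskSupport in PATCH 1/3.
  private lazy val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))

  // Applies `f` to every element in parallel on the shared pool and
  // returns an ordinary sequential Seq, like
  // `parRDDs.map(_.partitions.length).seq` in UnionRDD.getPartitions.
  def parMap[I, O](in: Seq[I])(f: I => O): Seq[O] = {
    val par = in.par                // lift to a parallel collection
    par.tasksupport = taskSupport   // run on the bounded pool, not the default
    par.map(f).seq                  // parallel map, sequential result
  }
}

The trade-off named in the removed scaladoc still applies: a Future-based parmap awaiting results on an ExecutionContext can be interrupted mid-run, whereas a .par map runs to completion once started; these patches accept that in exchange for dropping the CanBuildFrom-heavy helper.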