Skip to content

Conversation

@MaxGekk
Copy link
Member

@MaxGekk MaxGekk commented Jul 29, 2018

What changes were proposed in this pull request?

In the PR, I propose to replace Scala parallel collections by new methods parmap(). The methods use futures to transform a sequential collection by applying a lambda function to each element in parallel. The result of parmap is another regular (sequential) collection.

The proposed parmap method aims to solve the problem of impossibility to interrupt parallel Scala collection. This possibility is needed for reliable task preemption.

How was this patch tested?

A test was added to ThreadUtilsSuite

@holdensmagicalunicorn
Copy link

@MaxGekk, thanks! I am a bot who has found some folks who might be able to help with the review:@cloud-fan, @tdas and @rxin

@SparkQA
Copy link

SparkQA commented Jul 29, 2018

Test build #93754 has finished for PR 21913 at commit 506fa93.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 29, 2018

Test build #93755 has finished for PR 21913 at commit 515d393.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 30, 2018

Test build #93756 has finished for PR 21913 at commit e3df464.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jul 30, 2018

Test build #93780 has finished for PR 21913 at commit 3601550.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member

cc @zsxwing

@srowen
Copy link
Member

srowen commented Jul 31, 2018

What problem does this solve?

@zsxwing
Copy link
Member

zsxwing commented Jul 31, 2018

What problem does this solve?

@srowen readParquetFootersInParallel is called in executors. When a task is cancelled, it will still keeping run. If it reads lots of files, it will take pretty long and occupy the task slow. This change is basically to make it interruptible.

@MaxGekk Maybe add the following test to ThreadUtilsSuite. It shows what this PR is fixing.

  test("parmap should be interruptible") {
    val t = new Thread() {
      setDaemon(true)

      override def run() {
        try {
          // "par" is uninterruptible. The following will keep running even if the thread is
          // interrupted. We should prefer to use "ThreadUtils.parmap".
          //
          // (1 to 10).par.flatMap { i =>
          //   Thread.sleep(100000)
          //   1 to i
          // }
          //
          ThreadUtils.parmap(1 to 10, "test", 2) { i =>
            Thread.sleep(100000)
            1 to i
          }.flatten
        } catch {
          case _: InterruptedException => // excepted
        }
      }
    }
    t.start()
    eventually(timeout(10.seconds)) {
      assert(t.isAlive)
    }
    t.interrupt()
    eventually(timeout(10.seconds)) {
      assert(!t.isAlive)
    }
  }

Copy link
Member

@zsxwing zsxwing left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good. Left some comments.

val parallelCollection = group.par
parallelCollection.tasksupport = taskSupport
parallelCollection.map(handler)
ThreadUtils.parmap(group)(handler)(executionContext)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to not change DStream codes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any specific reasons, or you think parmap is not reliable enough?

* @return new collection in which each element was given from the input collection `in` by
* applying the lambda function `f`.
*/
def parmap[I, O](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can inline this method to the above one if you revert DStream changes.

* applying the lambda function `f`.
*/
def parmap[I, O](
in: TraversableOnce[I],
Copy link
Member

@zsxwing zsxwing Jul 31, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use Scala generic to make this method return the same Collection type as in. Such as

import scala.collection.TraversableLike
import scala.collection.generic.CanBuildFrom
import scala.language.higherKinds

  def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]]
      (in: Col[I], prefix: String, maxThreads: Int)
      (f: I => O)
      (implicit
        cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // For in.map
        cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]] // for Future.sequence
      ): Col[O] = {
    val pool = newForkJoinPool(prefix, maxThreads)
    try {
      implicit val ec = ExecutionContext.fromExecutor(pool)
      val futures = in.map(x => Future(f(x)))
      val futureSeq = Future.sequence(futures)
      awaitResult(futureSeq, Duration.Inf)
    } finally {
      pool.shutdownNow()
    }
  }

Then the caller side doesn't need to call toSeq.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zsxwing Thank you very much for the code.

implicit val ec = ExecutionContext.fromExecutor(pool)
parmap(in)(f)
} finally {
pool.shutdown()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use shutdownNow to interrupt the running tasks.

@srowen
Copy link
Member

srowen commented Jul 31, 2018

OK, got it. Yes that's worth mentioning in the doc too.
I'll have to go read up to get why this works differently, but obviously accept that it does.
There are other instances of .par, like in UnionRDD and in ddl.scala. Same treatment?

@zsxwing
Copy link
Member

zsxwing commented Jul 31, 2018

Yeah, let's also fix other instances.

@MaxGekk MaxGekk changed the title [WIP][SPARK-24005][CORE] Remove usage of Scala’s parallel collection [SPARK-24005][CORE] Remove usage of Scala’s parallel collection Aug 3, 2018
@SparkQA
Copy link

SparkQA commented Aug 3, 2018

Test build #94134 has finished for PR 21913 at commit 6a5f2ae.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

/**
* Transforms input collection by applying the given function to each element in parallel fashion.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd still include a note in these docs about what this does differently from .par. Just a sentence about it being interruptible.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment about this.

@SparkQA
Copy link

SparkQA commented Aug 5, 2018

Test build #94242 has finished for PR 21913 at commit cf48d8d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk
Copy link
Member Author

MaxGekk commented Aug 5, 2018

jenkins, retest this, please

@SparkQA
Copy link

SparkQA commented Aug 5, 2018

Test build #94246 has finished for PR 21913 at commit cf48d8d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 5, 2018

Test build #94248 has finished for PR 21913 at commit 610154c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

LGTM, merging to master!

@asfgit asfgit closed this in 131ca14 Aug 7, 2018
eventually(timeout(10.seconds)) {
assert(!t.isAlive)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @MaxGekk @zsxwing Could you tell me why this can be interrupted? while (1 to 10).par can't.


parmap(in)(f)
} finally {
pool.shutdownNow()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ConeyLiu this line interrupts the tasks in the thread pool. Scala par doesn't do this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zsxwing, thanks very much for your answer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants