diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dd1b2595461f..9177c1b56a47 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -767,10 +767,17 @@ private[spark] class DAGScheduler(
       callSite: CallSite,
       timeout: Long,
       properties: Properties): PartialResult[R] = {
-    val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
-    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val partitions = (0 until rdd.partitions.length).toArray
     val jobId = nextJobId.getAndIncrement()
+    if (partitions.isEmpty) {
+      // Return immediately if the job is running 0 tasks
+      val time = clock.getTimeMillis()
+      listenerBus.post(SparkListenerJobStart(jobId, time, Seq[StageInfo](), properties))
+      listenerBus.post(SparkListenerJobEnd(jobId, time, JobSucceeded))
+      return new PartialResult(evaluator.currentResult(), true)
+    }
+    val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
+    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventProcessLoop.post(JobSubmitted(
       jobId, rdd, func2, partitions, callSite, listener, SerializationUtils.clone(properties)))
     listener.awaitResult()  // Will throw an exception if the job fails
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index e17d264cced9..e74f4627db9b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.util.Properties
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
 import scala.annotation.meta.param
@@ -2849,6 +2850,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }
 
+  test("SPARK-27164: RDD.countApprox on empty RDDs schedules jobs which never complete") {
+    val latch = new CountDownLatch(1)
+    val jobListener = new SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        latch.countDown()
+      }
+    }
+    sc.addSparkListener(jobListener)
+    sc.emptyRDD[Int].countApprox(10000).getFinalValue()
+    assert(latch.await(10, TimeUnit.SECONDS))
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.