diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 4bdcafce0d75..09d8934b4c13 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -111,7 +111,7 @@ trait FutureAction[T] extends Future[T] { */ @DeveloperApi class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T) - extends FutureAction[T] { + extends FutureAction[T] with SupportForceFinish { @volatile private var _cancelled: Boolean = false @@ -120,6 +120,10 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: jobWaiter.cancel() } + override def forceFinish(): Unit = { + jobWaiter.forceFinish() + } + override def ready(atMost: Duration)(implicit permit: CanAwait): SimpleFutureAction.this.type = { jobWaiter.completionFuture.ready(atMost) this @@ -172,6 +176,13 @@ trait JobSubmitter { resultFunc: => R): FutureAction[R] } +trait SupportForceFinish { + /** + * Force finish the execution of this action. + */ + def forceFinish(): Unit +} + /** * A [[FutureAction]] for actions that could trigger multiple Spark jobs. 
Examples include take, @@ -180,7 +191,7 @@ trait JobSubmitter { */ @DeveloperApi class ComplexFutureAction[T](run : JobSubmitter => Future[T]) - extends FutureAction[T] { self => + extends FutureAction[T] with SupportForceFinish { self => @volatile private var _cancelled = false @@ -195,6 +206,14 @@ class ComplexFutureAction[T](run : JobSubmitter => Future[T]) subActions.foreach(_.cancel()) } + override def forceFinish(): Unit = { + subActions.foreach { + case s: SupportForceFinish => + s.forceFinish() + case _ => + } + } + private def jobSubmitter = new JobSubmitter { def submitJob[T, U, R]( rdd: RDD[T], diff --git a/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala b/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala index 34c04f4025a9..7b1e27c58fd6 100644 --- a/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala +++ b/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala @@ -56,6 +56,14 @@ private[spark] class ApproximateActionListener[T, U, R]( } } + override def forceFinish(result: Option[String]): Unit = { + // Publish the update under the monitor and wake any thread blocked in + // awaitResult(), mirroring the locking discipline of jobFailed(). + synchronized { + finishedTasks = totalTasks + this.notifyAll() + } + } + override def jobFailed(exception: Exception): Unit = { synchronized { failure = Some(exception) @@ -73,7 +77,7 @@ private[spark] class ApproximateActionListener[T, U, R]( val time = System.currentTimeMillis() if (failure.isDefined) { throw failure.get - } else if (finishedTasks == totalTasks) { + } else if (finishedTasks >= totalTasks) { return new PartialResult(evaluator.currentResult(), true) } else if (time >= finishTime) { resultObject = Some(new PartialResult(evaluator.currentResult(), false)) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 00f505fa5a9b..bd77a1e5d6e4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1118,6 +1118,13 @@ private[spark] class DAGScheduler( eventProcessLoop.post(StageCancelled(stageId, reason)) } + /** + * Force finish a job that is running or waiting in the queue. + */ + def forceFinishJob(jobId: Int, reason: Option[String]): Unit = { + eventProcessLoop.post(ForceFinishJob(jobId, reason)) + } + /** * Receives notification about shuffle push for a given shuffle from one map * task has completed @@ -2721,6 +2728,20 @@ private[spark] class DAGScheduler( } } + private[scheduler] def handleForceFinishJob(jobId: Int, reason: Option[String]): Unit = { + if (!jobIdToStageIds.contains(jobId)) { + logDebug("Trying to force finish unregistered job " + jobId) + } else { + val job = jobIdToActiveJob(jobId) + if (cancelRunningIndependentStages(job, "Unnecessary Stage", false)) { + cleanupStateForJobAndIndependentStages(job) + listenerBus.post(SparkListenerJobForceFinish(job.jobId, clock.getTimeMillis())) + listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded)) + job.listener.forceFinish(reason) + } + } + } + /** * Marks a stage as finished and removes it from the list of running stages. */ @@ -2798,7 +2819,10 @@ private[spark] class DAGScheduler( } /** Cancel all independent, running stages that are only used by this job.
*/ - private def cancelRunningIndependentStages(job: ActiveJob, reason: String): Boolean = { + private def cancelRunningIndependentStages( + job: ActiveJob, + reason: String, + byError: Boolean = true): Boolean = { var ableToCancelStages = true val stages = jobIdToStageIds(job.jobId) if (stages.isEmpty) { @@ -2819,7 +2843,7 @@ if (runningStages.contains(stage)) { try { // cancelTasks will fail if a SchedulerBackend does not implement killTask taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job), reason) - markStageAsFinished(stage, Some(reason)) + markStageAsFinished(stage, if (byError) Some(reason) else None) } catch { case e: UnsupportedOperationException => logWarning(s"Could not cancel tasks for stage $stageId", e) @@ -3000,6 +3024,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler) case JobCancelled(jobId, reason) => dagScheduler.handleJobCancellation(jobId, reason) + case ForceFinishJob(jobId, reason) => + dagScheduler.handleForceFinishJob(jobId, reason) + case JobGroupCancelled(groupId) => dagScheduler.handleJobGroupCancelled(groupId) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index f8cd27429060..179957adf52b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -63,6 +63,11 @@ private[scheduler] case class JobCancelled( reason: Option[String]) extends DAGSchedulerEvent +private[scheduler] case class ForceFinishJob( + jobId: Int, + reason: Option[String]) + extends DAGSchedulerEvent + private[scheduler] case class JobGroupCancelled(groupId: String) extends DAGSchedulerEvent private[scheduler] case class JobTagCancelled(tagName: String) extends DAGSchedulerEvent diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobListener.scala
b/core/src/main/scala/org/apache/spark/scheduler/JobListener.scala index e0f7c8f02132..2d808832ed25 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobListener.scala @@ -24,5 +24,6 @@ package org.apache.spark.scheduler */ private[spark] trait JobListener { def taskSucceeded(index: Int, result: Any): Unit + def forceFinish(result: Option[String]): Unit def jobFailed(exception: Exception): Unit } diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala index feed83162084..ea24665ad6f6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala @@ -63,6 +63,15 @@ private[spark] class JobWaiter[T]( } } + override def forceFinish(result: Option[String]): Unit = { + jobPromise.trySuccess(()) + } + + def forceFinish(): Unit = { + dagScheduler.forceFinishJob(jobId, Some("Unnecessary stage")) + } + + override def jobFailed(exception: Exception): Unit = { if (!jobPromise.tryFailure(exception)) { logWarning("Ignore failure", exception) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index fd846545d689..92912a578342 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -103,6 +103,10 @@ case class SparkListenerJobEnd( jobResult: JobResult) extends SparkListenerEvent +@DeveloperApi +case class SparkListenerJobForceFinish(jobId: Int, time: Long) + extends SparkListenerEvent + @DeveloperApi case class SparkListenerEnvironmentUpdate( environmentDetails: Map[String, collection.Seq[(String, String)]]) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 3aeb52cd37d0..7d247739f7d6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -315,6 +315,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti var failure: Exception = _ val jobListener = new JobListener() { override def taskSucceeded(index: Int, result: Any) = results.put(index, result) + override def forceFinish(result: Option[String]): Unit = {} override def jobFailed(exception: Exception) = { failure = exception } } @@ -323,6 +324,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti val results = new HashMap[Int, Any] var failure: Exception = null override def taskSucceeded(index: Int, result: Any): Unit = results.put(index, result) + override def forceFinish(result: Option[String]): Unit = {} override def jobFailed(exception: Exception): Unit = { failure = exception } } @@ -519,6 +521,11 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti runEvent(JobCancelled(jobId, None)) } + /** Sends ForceFinishJob to the DAG scheduler. */ + private def forceFinishJob(jobId: Int): Unit = { + runEvent(ForceFinishJob(jobId, None)) + } + /** Make some tasks in task set success and check results. 
*/ private def completeAndCheckAnswer( taskSet: TaskSet, @@ -715,6 +722,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti var failureReason: Option[Exception] = None val fakeListener = new JobListener() { override def taskSucceeded(partition: Int, value: Any): Unit = numResults += 1 + override def forceFinish(result: Option[String]): Unit = {} override def jobFailed(exception: Exception): Unit = { failureReason = Some(exception) } @@ -845,6 +853,15 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti assertDataStructuresEmpty() } + test("trivial job force finish") { + val rdd = new MyRDD(sc, 1, Nil) + val jobId = submit(rdd, Array(0)) + forceFinishJob(jobId) + assert(sparkListener.failedStages === Seq()) + assert(sparkListener.successfulStages === Set(0)) + assertDataStructuresEmpty() + } + test("job cancellation no-kill backend") { // make sure that the DAGScheduler doesn't crash when the TaskScheduler // doesn't implement killTask() @@ -1929,6 +1946,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti class FailureRecordingJobListener() extends JobListener { var failureMessage: String = _ override def taskSucceeded(index: Int, result: Any): Unit = {} + override def forceFinish(result: Option[String]): Unit = {} override def jobFailed(exception: Exception): Unit = { failureMessage = exception.getMessage } } val listener1 = new FailureRecordingJobListener() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala index 7951a6f36b9b..783cdfba0e9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ -49,10 +49,10 @@ object AQEPropagateEmptyRelation extends 
PropagateEmptyRelationBase { // - positive value means an estimated row count which can be over-estimated // - none means the plan has not materialized or the plan can not be estimated private def getEstimatedRowCount(plan: LogicalPlan): Option[BigInt] = plan match { - case LogicalQueryStage(_, stage: QueryStageExec) if stage.isMaterialized => + case LogicalQueryStage(_, _, stage: QueryStageExec) if stage.isMaterialized => stage.getRuntimeStatistics.rowCount - case LogicalQueryStage(_, agg: BaseAggregateExec) if agg.groupingExpressions.nonEmpty && + case LogicalQueryStage(_, _, agg: BaseAggregateExec) if agg.groupingExpressions.nonEmpty && agg.child.isInstanceOf[QueryStageExec] => val stage = agg.child.asInstanceOf[QueryStageExec] if (stage.isMaterialized) { @@ -65,7 +65,7 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { } private def isRelationWithAllNullKeys(plan: LogicalPlan): Boolean = plan match { - case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.isMaterialized => + case LogicalQueryStage(_, _, stage: BroadcastQueryStageExec) if stage.isMaterialized => stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys case _ => false } @@ -76,7 +76,7 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { } override protected def userSpecifiedRepartition(p: LogicalPlan): Boolean = p match { - case LogicalQueryStage(_, ShuffleQueryStageExec(_, shuffle: ShuffleExchangeLike, _)) + case LogicalQueryStage(_, _, ShuffleQueryStageExec(_, shuffle: ShuffleExchangeLike, _)) if shuffle.shuffleOrigin == REPARTITION_BY_COL || shuffle.shuffleOrigin == REPARTITION_BY_NUM => true case _ => false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 26c95c0fedd1..bbdbbbce3e6f 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -247,6 +247,19 @@ case class AdaptiveSparkPlanExec( def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity) + private def cancelUselessUnfinishedStage( + newLogicalPlan: LogicalPlan, + stagesToReplace: Seq[QueryStageExec]): Set[Int] = { + var uselessStageMap = stagesToReplace.map(s => s.id -> s).toMap + newLogicalPlan.foreachUp { + case stage: LogicalQueryStage if uselessStageMap.contains(stage.stageId) => + uselessStageMap = uselessStageMap - stage.stageId + case _ => + } + uselessStageMap.values.foreach(_.forceFinish()) + uselessStageMap.keySet + } + private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized { if (isFinalPlan) return currentPhysicalPlan @@ -262,6 +275,7 @@ case class AdaptiveSparkPlanExec( val events = new LinkedBlockingQueue[StageMaterializationEvent]() val errors = new mutable.ArrayBuffer[Throwable]() var stagesToReplace = Seq.empty[QueryStageExec] + val uselessStagesId = mutable.Set.empty[Int] while (!result.allChildStagesMaterialized) { currentPhysicalPlan = result.newPlan if (result.newStages.nonEmpty) { @@ -309,7 +323,7 @@ case class AdaptiveSparkPlanExec( case StageSuccess(stage, res) => stage.resultOption.set(Some(res)) case StageFailure(stage, ex) => - errors.append(ex) + if (!uselessStagesId.contains(stage.id)) errors.append(ex) } // In case of errors, we cancel all running stages and throw exception. @@ -341,6 +355,7 @@ case class AdaptiveSparkPlanExec( cleanUpTempTags(newPhysicalPlan) currentPhysicalPlan = newPhysicalPlan currentLogicalPlan = newLogicalPlan + uselessStagesId ++= cancelUselessUnfinishedStage(newLogicalPlan, stagesToReplace) stagesToReplace = Seq.empty[QueryStageExec] } } @@ -678,7 +693,7 @@ case class AdaptiveSparkPlanExec( // can be overwritten through re-planning processes. 
setTempTagRecursive(physicalNode.get, logicalNode) // Replace the corresponding logical node with LogicalQueryStage - val newLogicalNode = LogicalQueryStage(logicalNode, physicalNode.get) + val newLogicalNode = LogicalQueryStage(stage.id, logicalNode, physicalNode.get) val newLogicalPlan = logicalPlan.transformDown { case p if p.eq(logicalNode) => newLogicalNode } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala index 217569ae645c..824c7fecd2a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala @@ -57,14 +57,14 @@ object DynamicJoinSelection extends Rule[LogicalPlan] with JoinSelectionHelper { isLeft: Boolean): Option[JoinStrategyHint] = { val plan = if (isLeft) join.left else join.right plan match { - case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.isMaterialized + case LogicalQueryStage(_, _, stage: ShuffleQueryStageExec) if stage.isMaterialized && stage.mapStats.isDefined => val manyEmptyInPlan = hasManyEmptyPartitions(stage.mapStats.get) val canBroadcastPlan = (isLeft && canBuildBroadcastLeft(join.joinType)) || (!isLeft && canBuildBroadcastRight(join.joinType)) val manyEmptyInOther = (if (isLeft) join.right else join.left) match { - case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.isMaterialized + case LogicalQueryStage(_, _, stage: ShuffleQueryStageExec) if stage.isMaterialized && stage.mapStats.isDefined => hasManyEmptyPartitions(stage.mapStats.get) case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala index 8ce2452cc141..d23ab90ac43b 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStage.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.aggregate.BaseAggregateExec // TODO we can potentially include only [[QueryStageExec]] in this class if we make the aggregation // planning aware of partitioning. case class LogicalQueryStage( + stageId: Int, logicalPlan: LogicalPlan, physicalPlan: SparkPlan) extends LeafNode { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStageStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStageStrategy.scala index e424af5343fc..f9cf06c4d8b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStageStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStageStrategy.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNes object LogicalQueryStageStrategy extends Strategy { private def isBroadcastStage(plan: LogicalPlan): Boolean = plan match { - case LogicalQueryStage(_, _: BroadcastQueryStageExec) => true + case LogicalQueryStage(_, _, _: BroadcastQueryStageExec) => true case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index d48b4fe17517..2a8dd4d6c02c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicReference import scala.concurrent.Future -import org.apache.spark.{FutureAction, MapOutputStatistics} +import org.apache.spark.{FutureAction, MapOutputStatistics, SupportForceFinish} import 
org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -63,6 +63,11 @@ abstract class QueryStageExec extends LeafExecNode { protected def doMaterialize(): Future[Any] + /** + * force the query stage to finish, and clean up any intermediate data if necessary. + */ + def forceFinish(): Unit + /** * Returns the runtime statistics after stage materialization. */ @@ -213,6 +218,14 @@ case class ShuffleQueryStageExec( } override def getRuntimeStatistics: Statistics = shuffle.runtimeStatistics + + override def forceFinish(): Unit = { + shuffleFuture match { + case action: SupportForceFinish if !action.isCompleted => + action.forceFinish() + case _ => + } + } } /** @@ -255,6 +268,10 @@ case class BroadcastQueryStageExec( } } + override def forceFinish(): Unit = { + // TODO support force finish job group + } + override def getRuntimeStatistics: Statistics = broadcast.runtimeStatistics } @@ -290,6 +307,14 @@ case class TableCacheQueryStageExec( } } + override def forceFinish(): Unit = { + future match { + case action: SupportForceFinish if !action.isCompleted => + action.forceFinish() + case _ => + } + } + override protected def doMaterialize(): Future[Any] = future override def getRuntimeStatistics: Statistics = inMemoryTableScan.relation.computeStats() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 68bae34790a0..a194a422a8ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -25,7 +25,7 @@ import org.scalatest.PrivateMethodTester import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkException -import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart} 
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobForceFinish, SparkListenerJobStart} import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} @@ -726,6 +726,25 @@ class AdaptiveQueryExecSuite } } + test("SPARK-43999: force finish useless stage") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + var forceEndCount = 0 + val listener = new SparkListener { + override def onOtherEvent(event: SparkListenerEvent): Unit = { + event match { + case SparkListenerJobForceFinish(_, _) => + forceEndCount += 1 + case _ => + } + } + } + spark.sparkContext.addSparkListener(listener) + runAdaptiveAndVerifyResult( + "SELECT * FROM emptyTestData t1 LEFT OUTER JOIN testData t2 ON t1.key = t2.key") + assert(forceEndCount == 1) + } + } + test("SPARK-37753: Inhibit broadcast in left outer join when there are many empty" + " partitions on outer/left side") { // if the right side is completed first and the left side is still being executed,