From 4a6f903897d28a3038918997e692410259a90ae3 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Fri, 19 Jun 2020 10:36:52 +0800 Subject: [PATCH 1/2] Reuse completeNextStageWithFetchFailure --- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9d412f2dba3ce..762b14e170fcc 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1796,9 +1796,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // lets say there is a fetch failure in this task set, which makes us go back and // run stage 0, attempt 1 - complete(taskSets(1), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDep1.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(1, 0, shuffleDep1) scheduler.resubmitFailedStages() // stage 0, attempt 1 should have the properties of job2 @@ -1872,9 +1870,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // have the second stage complete normally completeShuffleMapStageSuccessfully(1, 0, 1, Seq("hostA", "hostC")) // fail the third stage because hostA went down - complete(taskSets(2), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(2, 0, shuffleDepTwo) // TODO assert this: // blockManagerMaster.removeExecutor("hostA-exec") // have DAGScheduler try again @@ -1900,9 +1896,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // complete stage 1 completeShuffleMapStageSuccessfully(1, 0, 1) // pretend stage 2 failed because hostA went down - complete(taskSets(2), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(2, 0, shuffleDepTwo) // TODO assert this: // blockManagerMaster.removeExecutor("hostA-exec") // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun. From bebe4c21237be19d4900429b2d454e9c5ff35477 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Tue, 5 Jan 2021 15:02:48 +0800 Subject: [PATCH 2/2] Optimize code --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 3 --- .../spark/sql/catalyst/expressions/windowExpressions.scala | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index fdd1cd0146c24..73e3b89e65117 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2150,9 +2150,6 @@ class Analyzer(override val catalogManager: CatalogManager) lead.copy(ignoreNulls = ignoreNulls) case lag: Lag => lag.copy(ignoreNulls = ignoreNulls) - case _ => - throw QueryCompilationErrors.functionWithUnsupportedSyntaxError( - owf.prettyName, "IGNORE NULLS") } } else { owf diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala index b167499620c0f..1934a9b190fc7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala @@ -369,7 +369,7 @@ trait OffsetWindowFunction extends WindowFunction { * within the partition. For instance: a FrameLessOffsetWindowFunction for value x with offset -2, * will get the value of x 2 rows back from the current row in the partition. */ -abstract class FrameLessOffsetWindowFunction +sealed abstract class FrameLessOffsetWindowFunction extends OffsetWindowFunction with Unevaluable with ImplicitCastInputTypes { override def children: Seq[Expression] = Seq(input, offset, default)