diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 6f5b3b5a1347..9c494c043796 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -453,7 +453,7 @@ "Max offset with <rowsPerSecond> rowsPerSecond is <maxSeconds>, but it's <endSeconds> now." ] }, - "INCORRECT_RUMP_UP_RATE" : { + "INCORRECT_RAMP_UP_RATE" : { "message" : [ "Max offset with <rowsPerSecond> rowsPerSecond is <maxSeconds>, but 'rampUpTimeSeconds' is <rampUpTimeSeconds>." ] @@ -4312,5 +4312,40 @@ "message" : [ "Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting <autoBroadcastjoinThreshold> to -1 or increase the spark driver memory by setting <driverMemory> to a higher value" ] + }, + "_LEGACY_ERROR_TEMP_2276" : { + "message" : [ + "Hive table <tableName> with ANSI intervals is not supported" + ] + }, + "_LEGACY_ERROR_TEMP_2277" : { + "message" : [ + "Number of dynamic partitions created is <numWrittenParts>, which is more than <maxDynamicPartitions>. To solve this try to set <maxDynamicPartitionsKey> to at least <numWrittenParts>." + ] + }, + "_LEGACY_ERROR_TEMP_2278" : { + "message" : [ + "The input <valueType> '<input>' does not match the given number format: '<format>'" + ] + }, + "_LEGACY_ERROR_TEMP_2279" : { + "message" : [ + "Multiple bucket transforms are not supported." + ] + }, + "_LEGACY_ERROR_TEMP_2280" : { + "message" : [ + "Create namespace comment is not supported" + ] + }, + "_LEGACY_ERROR_TEMP_2281" : { + "message" : [ + "Remove namespace comment is not supported" + ] + }, + "_LEGACY_ERROR_TEMP_2282" : { + "message" : [ + "Drop namespace restrict is not supported" + ] } } \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala b/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala index c985c72f2adc..df7cd0b44c90 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala @@ -171,7 +171,11 @@ private class DedicatedMessageLoop( } (1 to endpoint.threadCount()).foreach { _ => - threadpool.submit(receiveLoopRunnable) + /** + * We need to be careful not to use [[ExecutorService#submit]]. + * The `submit` API swallows uncaught exceptions via [[FutureTask#setException]]. + */ + threadpool.execute(receiveLoopRunnable) } // Mark active to handle the OnStart message.
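// Illustrative sketch (not part of the patch): why the loop above switches from `threadpool.submit`
// to `threadpool.execute`. `submit` wraps the Runnable in a FutureTask, so an exception thrown by the
// receive loop is captured via FutureTask#setException and never reaches the thread's
// UncaughtExceptionHandler, while `execute` lets it propagate. All names below are hypothetical and
// exist only for this demonstration.
import java.util.concurrent.{Executors, ThreadFactory}

object SubmitVsExecuteSketch {
  def main(args: Array[String]): Unit = {
    val factory: ThreadFactory = r => {
      val t = new Thread(r)
      // Report instead of crashing so both cases can be observed in a single run.
      t.setUncaughtExceptionHandler((_, e) => println(s"uncaught: ${e.getMessage}"))
      t
    }
    val pool = Executors.newFixedThreadPool(1, factory)
    val failing: Runnable = () => throw new RuntimeException("boom")

    pool.submit(failing)   // Exception is stored in the returned Future; the handler prints nothing.
    Thread.sleep(500)
    pool.execute(failing)  // Exception escapes the task; the handler prints "uncaught: boom".
    Thread.sleep(500)
    pool.shutdown()
  }
}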
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala index a8b1304b76f8..64789ca94e08 100644 --- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala @@ -21,6 +21,7 @@ import java.io.File import java.nio.ByteBuffer import java.util.Properties import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger import scala.collection.concurrent.TrieMap import scala.collection.mutable @@ -36,11 +37,13 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark._ import org.apache.spark.TestUtils._ +import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin} +import org.apache.spark.internal.config.PLUGINS import org.apache.spark.resource._ import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.rpc.RpcEnv -import org.apache.spark.scheduler.TaskDescription +import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved, TaskDescription} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{KillTask, LaunchTask} import org.apache.spark.serializer.JavaSerializer import org.apache.spark.util.{SerializableBuffer, ThreadUtils, Utils} @@ -535,6 +538,39 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite } } + /** + * If a fatal error occurs while [[Executor]] is being initialized, it should be caught by + * [[SparkUncaughtExceptionHandler]] so that [[Executor]] can exit by itself. + */ + test("SPARK-40320 Executor should exit when initialization failed for fatal error") { + val conf = new SparkConf() + .setMaster("local-cluster[1, 1, 1024]") + .set(PLUGINS, Seq(classOf[TestFatalErrorPlugin].getName)) + .setAppName("test") + sc = new SparkContext(conf) + val executorAddCounter = new AtomicInteger(0) + val executorRemovedCounter = new AtomicInteger(0) + + val listener = new SparkListener() { + override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { + executorAddCounter.getAndIncrement() + } + + override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + executorRemovedCounter.getAndIncrement() + } + } + try { + sc.addSparkListener(listener) + eventually(timeout(15.seconds)) { + assert(executorAddCounter.get() >= 2) + assert(executorRemovedCounter.get() >= 2) + } + } finally { + sc.removeSparkListener(listener) + } + } + private def createMockEnv(conf: SparkConf, serializer: JavaSerializer, rpcEnv: Option[RpcEnv] = None): SparkEnv = { val mockEnv = mock[SparkEnv] @@ -547,3 +583,24 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite mockEnv } } + +private class TestFatalErrorPlugin extends SparkPlugin { + override def driverPlugin(): DriverPlugin = new TestDriverPlugin() + + override def executorPlugin(): ExecutorPlugin = new TestErrorExecutorPlugin() +} + +private class TestDriverPlugin extends DriverPlugin { +} + +private class TestErrorExecutorPlugin extends ExecutorPlugin { + + override def init(_ctx: PluginContext, extraConf: java.util.Map[String, String]): Unit = { + // scalastyle:off throwerror + /** + * A fatal error; see [[NonFatal]] for the definition of non-fatal errors.
+ */ + throw new UnsatisfiedLinkError("Mock throws fatal error.") + // scalastyle:on throwerror + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 6df03aa8e84e..6d8c2e83ef79 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -242,7 +242,13 @@ abstract class Expression extends TreeNode[Expression] { * This means that the lazy `cannonicalized` is called and computed only on the root of the * adjacent expressions. */ - lazy val canonicalized: Expression = { + lazy val canonicalized: Expression = withCanonicalizedChildren + + /** + * The default process of canonicalization. It is a one-pass, bottom-up expression tree + * computation based on canonicalizing children before canonicalizing the current node. + */ + final protected def withCanonicalizedChildren: Expression = { val canonicalizedChildren = children.map(_.canonicalized) withNewChildren(canonicalizedChildren) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala index 3e8ec94c33ce..4d99c3b02a07 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala @@ -479,7 +479,15 @@ case class Add( override lazy val canonicalized: Expression = { // TODO: do not reorder consecutive `Add`s with different `evalMode` - orderCommutative({ case Add(l, r, _) => Seq(l, r) }).reduce(Add(_, _, evalMode)) + val reorderResult = + orderCommutative({ case Add(l, r, _) => Seq(l, r) }).reduce(Add(_, _, evalMode)) + if (resolved && reorderResult.resolved && reorderResult.dataType == dataType) { + reorderResult + } else { + // SPARK-40903: Avoid reordering decimal Add for canonicalization if the result data type is + // changed, which may cause data check errors within ComplexTypeMergingExpression.
+ withCanonicalizedChildren + } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 4aedfb3b03da..7e870e23fba0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -2387,11 +2387,11 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { new SparkException("Foreach writer has been aborted due to a task failure") } - def incorrectRumpUpRate(rowsPerSecond: Long, + def incorrectRampUpRate(rowsPerSecond: Long, maxSeconds: Long, rampUpTimeSeconds: Long): Throwable = { new SparkRuntimeException( - errorClass = "INCORRECT_RUMP_UP_RATE", + errorClass = "INCORRECT_RAMP_UP_RATE", messageParameters = Map( "rowsPerSecond" -> rowsPerSecond.toString, "maxSeconds" -> maxSeconds.toString, @@ -2577,8 +2577,10 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { "expected" -> s"Detail message: $detailMessage")) } - def hiveTableWithAnsiIntervalsError(tableName: String): Throwable = { - new UnsupportedOperationException(s"Hive table $tableName with ANSI intervals is not supported") + def hiveTableWithAnsiIntervalsError(tableName: String): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2276", + messageParameters = Map("tableName" -> tableName)) } def cannotConvertOrcTimestampToTimestampNTZError(): Throwable = { @@ -2602,31 +2604,47 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { maxDynamicPartitions: Int, maxDynamicPartitionsKey: String): Throwable = { new SparkException( - s"Number of dynamic partitions created is $numWrittenParts" + - s", which is more than $maxDynamicPartitions" + - s". 
To solve this try to set $maxDynamicPartitionsKey" + - s" to at least $numWrittenParts.") + errorClass = "_LEGACY_ERROR_TEMP_2277", + messageParameters = Map( + "numWrittenParts" -> numWrittenParts.toString(), + "maxDynamicPartitionsKey" -> maxDynamicPartitionsKey, + "maxDynamicPartitions" -> maxDynamicPartitions.toString(), + "numWrittenParts" -> numWrittenParts.toString()), + cause = null) } - def invalidNumberFormatError(valueType: String, input: String, format: String): Throwable = { - new IllegalArgumentException( - s"The input $valueType '$input' does not match the given number format: '$format'") + def invalidNumberFormatError( + valueType: String, input: String, format: String): SparkIllegalArgumentException = { + new SparkIllegalArgumentException( + errorClass = "_LEGACY_ERROR_TEMP_2278", + messageParameters = Map( + "valueType" -> valueType, + "input" -> input, + "format" -> format)) } - def multipleBucketTransformsError(): Throwable = { - new UnsupportedOperationException("Multiple bucket transforms are not supported.") + def multipleBucketTransformsError(): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2279", + messageParameters = Map.empty) } - def unsupportedCreateNamespaceCommentError(): Throwable = { - new SQLFeatureNotSupportedException("Create namespace comment is not supported") + def unsupportedCreateNamespaceCommentError(): SparkSQLFeatureNotSupportedException = { + new SparkSQLFeatureNotSupportedException( + errorClass = "_LEGACY_ERROR_TEMP_2280", + messageParameters = Map.empty) } - def unsupportedRemoveNamespaceCommentError(): Throwable = { - new SQLFeatureNotSupportedException("Remove namespace comment is not supported") + def unsupportedRemoveNamespaceCommentError(): SparkSQLFeatureNotSupportedException = { + new SparkSQLFeatureNotSupportedException( + errorClass = "_LEGACY_ERROR_TEMP_2281", + messageParameters = Map.empty) } - def unsupportedDropNamespaceRestrictError(): Throwable = { - new SQLFeatureNotSupportedException("Drop namespace restrict is not supported") + def unsupportedDropNamespaceRestrictError(): SparkSQLFeatureNotSupportedException = { + new SparkSQLFeatureNotSupportedException( + errorClass = "_LEGACY_ERROR_TEMP_2282", + messageParameters = Map.empty) } def timestampAddOverflowError(micros: Long, amount: Int, unit: String): ArithmeticException = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala index 43b7f35f7bb2..057fb98c2398 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CanonicalizeSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.plans.logical.Range -import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{Decimal, DecimalType, IntegerType, LongType, StringType, StructField, StructType} class CanonicalizeSuite extends SparkFunSuite { @@ -187,7 +187,23 @@ class CanonicalizeSuite extends SparkFunSuite { test("SPARK-40362: Commutative operator under BinaryComparison") { Seq(EqualTo, EqualNullSafe, GreaterThan, LessThan, GreaterThanOrEqual, LessThanOrEqual) 
.foreach { bc => - assert(bc(Add($"a", $"b"), Literal(10)).semanticEquals(bc(Add($"b", $"a"), Literal(10)))) + assert(bc(Multiply($"a", $"b"), Literal(10)).semanticEquals( + bc(Multiply($"b", $"a"), Literal(10)))) } } + + test("SPARK-40903: Only reorder decimal Add when the result data type is not changed") { + val d = Decimal(1.2) + val literal1 = Literal.create(d, DecimalType(2, 1)) + val literal2 = Literal.create(d, DecimalType(2, 1)) + val literal3 = Literal.create(d, DecimalType(3, 2)) + assert(Add(literal1, literal2).semanticEquals(Add(literal2, literal1))) + assert(Add(Add(literal1, literal2), literal3).semanticEquals( + Add(Add(literal3, literal2), literal1))) + + val literal4 = Literal.create(d, DecimalType(12, 5)) + val literal5 = Literal.create(d, DecimalType(12, 6)) + assert(!Add(Add(literal4, literal5), literal1).semanticEquals( + Add(Add(literal1, literal5), literal4))) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala index 94ae774070c8..15513037fe1b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/StringExpressionsSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import java.math.{BigDecimal => JavaBigDecimal} -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException} import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -1124,7 +1124,8 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { ).foreach { case (str: String, format: String) => val toNumberExpr = ToNumber(Literal(str), Literal(format)) assert(toNumberExpr.checkInputDataTypes() == TypeCheckResult.TypeCheckSuccess) - checkExceptionInExpression[IllegalArgumentException]( + + checkExceptionInExpression[SparkIllegalArgumentException]( toNumberExpr, "does not match the given number format") val tryToNumberExpr = TryToNumber(Literal(str), Literal(format)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala index 45c37d8ae772..5640c7d3ca76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala @@ -52,7 +52,7 @@ class RateStreamMicroBatchStream( private val maxSeconds = Long.MaxValue / rowsPerSecond if (rampUpTimeSeconds > maxSeconds) { - throw QueryExecutionErrors.incorrectRumpUpRate( + throw QueryExecutionErrors.incorrectRampUpRate( rowsPerSecond, maxSeconds, rampUpTimeSeconds) } diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out index 53a57ee270ba..9ddd87f10de1 100644 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out @@ -4694,8 +4694,15 @@ SELECT '' AS to_number_1, to_number('-34,338,492', '99G999G999') -- 
!query schema struct<> -- !query output -java.lang.IllegalArgumentException -The input string '-34,338,492' does not match the given number format: '99G999G999' +org.apache.spark.SparkIllegalArgumentException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2278", + "messageParameters" : { + "format" : "99G999G999", + "input" : "-34,338,492", + "valueType" : "string" + } +} -- !query @@ -4761,8 +4768,15 @@ SELECT '' AS to_number_16, to_number('123456','999G999') -- !query schema struct<> -- !query output -java.lang.IllegalArgumentException -The input string '123456' does not match the given number format: '999G999' +org.apache.spark.SparkIllegalArgumentException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_2278", + "messageParameters" : { + "format" : "999G999", + "input" : "123456", + "valueType" : "string" + } +} -- !query diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 030e68d227aa..dd3ad0f4d6bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -4518,6 +4518,14 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } } + + test("SPARK-40903: Don't reorder Add for canonicalize if it is decimal type") { + val tableName = "decimalTable" + withTable(tableName) { + sql(s"create table $tableName(a decimal(12, 5), b decimal(12, 6)) using orc") + checkAnswer(sql(s"select sum(coalesce(a + b + 1.75, a)) from $tableName"), Row(null)) + } + } } case class Foo(bar: Option[String]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala index 175f26dc0bb8..4f37d2d35365 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala @@ -197,7 +197,7 @@ class RateStreamProviderSuite extends StreamTest { } } - testQuietly("microbatch - rump up error") { + testQuietly("microbatch - ramp up error") { val e = intercept[SparkRuntimeException]( new RateStreamMicroBatchStream( rowsPerSecond = Long.MaxValue, @@ -207,7 +207,7 @@ class RateStreamProviderSuite extends StreamTest { checkError( exception = e, - errorClass = "INCORRECT_RUMP_UP_RATE", + errorClass = "INCORRECT_RAMP_UP_RATE", parameters = Map( "rowsPerSecond" -> Long.MaxValue.toString, "maxSeconds" -> "1",