37 changes: 36 additions & 1 deletion core/src/main/resources/error/error-classes.json
@@ -453,7 +453,7 @@
"Max offset with <rowsPerSecond> rowsPerSecond is <maxSeconds>, but it's <endSeconds> now."
]
},
"INCORRECT_RUMP_UP_RATE" : {
"INCORRECT_RAMP_UP_RATE" : {
"message" : [
"Max offset with <rowsPerSecond> rowsPerSecond is <maxSeconds>, but 'rampUpTimeSeconds' is <rampUpTimeSeconds>."
]
@@ -4312,5 +4312,40 @@
"message" : [
"Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting <autoBroadcastjoinThreshold> to -1 or increase the spark driver memory by setting <driverMemory> to a higher value<analyzeTblMsg>"
]
},
"_LEGACY_ERROR_TEMP_2276" : {
"message" : [
"Hive table <tableName> with ANSI intervals is not supported"
]
},
"_LEGACY_ERROR_TEMP_2277" : {
"message" : [
"Number of dynamic partitions created is <numWrittenParts>, which is more than <maxDynamicPartitions>. To solve this try to set <maxDynamicPartitionsKey> to at least <numWrittenParts>."
]
},
"_LEGACY_ERROR_TEMP_2278" : {
"message" : [
"The input <valueType> '<input>' does not match the given number format: '<format>'"
]
},
"_LEGACY_ERROR_TEMP_2279" : {
"message" : [
"Multiple bucket transforms are not supported."
]
},
"_LEGACY_ERROR_TEMP_2280" : {
"message" : [
"Create namespace comment is not supported"
]
},
"_LEGACY_ERROR_TEMP_2281" : {
"message" : [
"Remove namespace comment is not supported"
]
},
"_LEGACY_ERROR_TEMP_2282" : {
"message" : [
"Drop namespace restrict is not supported"
]
}
}
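
The <placeholder> tokens in these message templates are filled in from the messageParameters map that the error-raising methods further down in QueryExecutionErrors pass along with the error class name. A minimal sketch of that substitution, an illustration only rather than Spark's actual error-class reader:

object ErrorTemplateDemo {
  // Replace every <key> token in the template with its value from params.
  def format(template: String, params: Map[String, String]): String =
    params.foldLeft(template) { case (msg, (key, value)) =>
      msg.replace(s"<$key>", value)
    }

  def main(args: Array[String]): Unit = {
    val template =
      "The input <valueType> '<input>' does not match the given number format: '<format>'"
    // Prints: The input string '123456' does not match the given number format: '999G999'
    println(format(template, Map(
      "valueType" -> "string",
      "input" -> "123456",
      "format" -> "999G999")))
  }
}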
@@ -171,7 +171,11 @@ private class DedicatedMessageLoop(
}

(1 to endpoint.threadCount()).foreach { _ =>
threadpool.submit(receiveLoopRunnable)
/**
 * Be careful not to use [[ExecutorService#submit]] here: `submit` wraps the runnable in a
 * [[FutureTask]], which captures uncaught exceptions via [[FutureTask#setException]] instead
 * of letting them reach the thread's uncaught-exception handler.
 */
threadpool.execute(receiveLoopRunnable)
}

// Mark active to handle the OnStart message.
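
Why execute instead of submit: submit wraps the Runnable in a FutureTask, which stores any thrown exception for a later Future#get call instead of letting it reach the worker thread's uncaught-exception handler. A small, self-contained sketch of the difference (illustration only, not Spark code):

import java.util.concurrent.Executors

object SubmitVsExecuteDemo {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newSingleThreadExecutor()
    val failing: Runnable = () => throw new RuntimeException("boom")

    // submit() wraps the task in a FutureTask; the exception is captured via
    // FutureTask#setException and stays silent unless someone calls Future#get().
    pool.submit(failing)

    // execute() runs the task directly, so the exception propagates and the thread's
    // uncaught-exception handler (the default prints a stack trace to stderr) sees it.
    pool.execute(failing)

    pool.shutdown()
  }
}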
@@ -21,6 +21,7 @@ import java.io.File
import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.concurrent.TrieMap
import scala.collection.mutable
@@ -36,11 +37,13 @@ import org.scalatestplus.mockito.MockitoSugar

import org.apache.spark._
import org.apache.spark.TestUtils._
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
import org.apache.spark.internal.config.PLUGINS
import org.apache.spark.resource._
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.resource.TestResourceIDs._
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved, TaskDescription}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{KillTask, LaunchTask}
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.util.{SerializableBuffer, ThreadUtils, Utils}
@@ -535,6 +538,39 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
}
}

/**
 * If a fatal error occurs while the [[Executor]] is being initialized, it should be caught by
 * [[SparkUncaughtExceptionHandler]] so that the [[Executor]] can exit on its own.
 */
test("SPARK-40320 Executor should exit when initialization failed for fatal error") {
val conf = new SparkConf()
.setMaster("local-cluster[1, 1, 1024]")
.set(PLUGINS, Seq(classOf[TestFatalErrorPlugin].getName))
.setAppName("test")
sc = new SparkContext(conf)
val executorAddCounter = new AtomicInteger(0)
val executorRemovedCounter = new AtomicInteger(0)

val listener = new SparkListener() {
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
executorAddCounter.getAndIncrement()
}

override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
executorRemovedCounter.getAndIncrement()
}
}
try {
sc.addSparkListener(listener)
eventually(timeout(15.seconds)) {
assert(executorAddCounter.get() >= 2)
assert(executorRemovedCounter.get() >= 2)
}
} finally {
sc.removeSparkListener(listener)
}
}

private def createMockEnv(conf: SparkConf, serializer: JavaSerializer,
rpcEnv: Option[RpcEnv] = None): SparkEnv = {
val mockEnv = mock[SparkEnv]
@@ -547,3 +583,24 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
mockEnv
}
}

private class TestFatalErrorPlugin extends SparkPlugin {
override def driverPlugin(): DriverPlugin = new TestDriverPlugin()

override def executorPlugin(): ExecutorPlugin = new TestErrorExecutorPlugin()
}

private class TestDriverPlugin extends DriverPlugin {
}

private class TestErrorExecutorPlugin extends ExecutorPlugin {

override def init(_ctx: PluginContext, extraConf: java.util.Map[String, String]): Unit = {
// scalastyle:off throwerror
/**
 * A fatal error, i.e. one that is not matched by [[NonFatal]].
 */
throw new UnsatisfiedLinkError("Mock throws fatal error.")
// scalastyle:on throwerror
}
}
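
The test above relies on the executor JVM installing an uncaught-exception handler that turns a fatal error into a process exit, so the cluster manager keeps replacing the executor and both counters reach 2. A rough sketch of that pattern under that assumption, not SparkUncaughtExceptionHandler itself:

import scala.util.control.NonFatal

object ExitOnFatalSketch {
  def install(): Unit = {
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = e match {
        case NonFatal(_) =>
          // Non-fatal: log and keep the process alive.
          System.err.println(s"Uncaught exception in ${t.getName}: $e")
        case _ =>
          // Fatal (e.g. UnsatisfiedLinkError): terminate so the executor gets replaced.
          Runtime.getRuntime.halt(50)
      }
    })
  }
}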
@@ -242,7 +242,13 @@ abstract class Expression extends TreeNode[Expression] {
* This means that the lazy `canonicalized` is called and computed only on the root of the
* adjacent expressions.
*/
lazy val canonicalized: Expression = {
lazy val canonicalized: Expression = withCanonicalizedChildren

/**
* The default canonicalization process: a one-pass, bottom-up expression tree computation
* that canonicalizes the children before canonicalizing the current node.
*/
final protected def withCanonicalizedChildren: Expression = {
val canonicalizedChildren = children.map(_.canonicalized)
withNewChildren(canonicalizedChildren)
}
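
As a toy illustration of the same bottom-up idea (not Spark's Expression API, an assumption for illustration only): canonicalize the children first, then rebuild the node; a commutative node can additionally sort its canonicalized children so that a + b and b + a compare equal.

sealed trait Expr
final case class Ref(name: String) extends Expr
final case class Commutative(op: String, children: Seq[Expr]) extends Expr

object ToyCanonicalize {
  // One pass, bottom-up: children are canonicalized before the current node.
  def canonicalize(e: Expr): Expr = e match {
    case r: Ref => r
    case Commutative(op, children) =>
      val canonicalChildren = children.map(canonicalize)
      Commutative(op, canonicalChildren.sortBy(_.toString))
  }

  def main(args: Array[String]): Unit = {
    val ab = Commutative("+", Seq(Ref("a"), Ref("b")))
    val ba = Commutative("+", Seq(Ref("b"), Ref("a")))
    println(canonicalize(ab) == canonicalize(ba)) // true
  }
}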
@@ -479,7 +479,15 @@ case class Add(

override lazy val canonicalized: Expression = {
// TODO: do not reorder consecutive `Add`s with different `evalMode`
orderCommutative({ case Add(l, r, _) => Seq(l, r) }).reduce(Add(_, _, evalMode))
val reorderResult =
orderCommutative({ case Add(l, r, _) => Seq(l, r) }).reduce(Add(_, _, evalMode))
if (resolved && reorderResult.resolved && reorderResult.dataType == dataType) {
reorderResult
} else {
// SPARK-40903: Avoid reordering decimal Add for canonicalization if the result data type
// would change, which may cause a data type check failure within ComplexTypeMergingExpression.
withCanonicalizedChildren
}
}
}

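
Why reordering can change the result type for decimals: the precision and scale of a decimal add depend on its operands, so regrouping a chain of adds can produce a different final DecimalType. A small sketch using the usual SQL addition rule (scale = max(s1, s2), precision = max(p1 - s1, p2 - s2) + scale + 1, ignoring the 38-digit cap), an assumption for illustration rather than Spark's exact implementation:

object DecimalAddTypeSketch {
  final case class DecType(precision: Int, scale: Int)

  // Result type of adding two decimals under the rule stated above.
  def add(l: DecType, r: DecType): DecType = {
    val scale = math.max(l.scale, r.scale)
    val precision = math.max(l.precision - l.scale, r.precision - r.scale) + scale + 1
    DecType(precision, scale)
  }

  def main(args: Array[String]): Unit = {
    val a = DecType(12, 5)
    val b = DecType(12, 6)
    val c = DecType(2, 1)
    println(add(add(a, b), c)) // DecType(15,6), the original association
    println(add(add(c, b), a)) // DecType(14,6), a reordered association with a different type
  }
}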
@@ -2387,11 +2387,11 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
new SparkException("Foreach writer has been aborted due to a task failure")
}

def incorrectRumpUpRate(rowsPerSecond: Long,
def incorrectRampUpRate(rowsPerSecond: Long,
maxSeconds: Long,
rampUpTimeSeconds: Long): Throwable = {
new SparkRuntimeException(
errorClass = "INCORRECT_RUMP_UP_RATE",
errorClass = "INCORRECT_RAMP_UP_RATE",
messageParameters = Map(
"rowsPerSecond" -> rowsPerSecond.toString,
"maxSeconds" -> maxSeconds.toString,
@@ -2577,8 +2577,10 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
"expected" -> s"Detail message: $detailMessage"))
}

def hiveTableWithAnsiIntervalsError(tableName: String): Throwable = {
new UnsupportedOperationException(s"Hive table $tableName with ANSI intervals is not supported")
def hiveTableWithAnsiIntervalsError(tableName: String): SparkUnsupportedOperationException = {
new SparkUnsupportedOperationException(
errorClass = "_LEGACY_ERROR_TEMP_2276",
messageParameters = Map("tableName" -> tableName))
}

def cannotConvertOrcTimestampToTimestampNTZError(): Throwable = {
@@ -2602,31 +2604,47 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
maxDynamicPartitions: Int,
maxDynamicPartitionsKey: String): Throwable = {
new SparkException(
s"Number of dynamic partitions created is $numWrittenParts" +
s", which is more than $maxDynamicPartitions" +
s". To solve this try to set $maxDynamicPartitionsKey" +
s" to at least $numWrittenParts.")
errorClass = "_LEGACY_ERROR_TEMP_2277",
messageParameters = Map(
"numWrittenParts" -> numWrittenParts.toString(),
"maxDynamicPartitionsKey" -> maxDynamicPartitionsKey,
"maxDynamicPartitions" -> maxDynamicPartitions.toString(),
"numWrittenParts" -> numWrittenParts.toString()),
cause = null)
}

def invalidNumberFormatError(valueType: String, input: String, format: String): Throwable = {
new IllegalArgumentException(
s"The input $valueType '$input' does not match the given number format: '$format'")
def invalidNumberFormatError(
valueType: String, input: String, format: String): SparkIllegalArgumentException = {
new SparkIllegalArgumentException(
errorClass = "_LEGACY_ERROR_TEMP_2278",
messageParameters = Map(
"valueType" -> valueType,
"input" -> input,
"format" -> format))
}

def multipleBucketTransformsError(): Throwable = {
new UnsupportedOperationException("Multiple bucket transforms are not supported.")
def multipleBucketTransformsError(): SparkUnsupportedOperationException = {
new SparkUnsupportedOperationException(
errorClass = "_LEGACY_ERROR_TEMP_2279",
messageParameters = Map.empty)
}

def unsupportedCreateNamespaceCommentError(): Throwable = {
new SQLFeatureNotSupportedException("Create namespace comment is not supported")
def unsupportedCreateNamespaceCommentError(): SparkSQLFeatureNotSupportedException = {
new SparkSQLFeatureNotSupportedException(
errorClass = "_LEGACY_ERROR_TEMP_2280",
messageParameters = Map.empty)
}

def unsupportedRemoveNamespaceCommentError(): Throwable = {
new SQLFeatureNotSupportedException("Remove namespace comment is not supported")
def unsupportedRemoveNamespaceCommentError(): SparkSQLFeatureNotSupportedException = {
new SparkSQLFeatureNotSupportedException(
errorClass = "_LEGACY_ERROR_TEMP_2281",
messageParameters = Map.empty)
}

def unsupportedDropNamespaceRestrictError(): Throwable = {
new SQLFeatureNotSupportedException("Drop namespace restrict is not supported")
def unsupportedDropNamespaceRestrictError(): SparkSQLFeatureNotSupportedException = {
new SparkSQLFeatureNotSupportedException(
errorClass = "_LEGACY_ERROR_TEMP_2282",
messageParameters = Map.empty)
}

def timestampAddOverflowError(micros: Long, amount: Int, unit: String): ArithmeticException = {
@@ -23,7 +23,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.types.{Decimal, DecimalType, IntegerType, LongType, StringType, StructField, StructType}

class CanonicalizeSuite extends SparkFunSuite {

@@ -187,7 +187,23 @@ class CanonicalizeSuite extends SparkFunSuite {
test("SPARK-40362: Commutative operator under BinaryComparison") {
Seq(EqualTo, EqualNullSafe, GreaterThan, LessThan, GreaterThanOrEqual, LessThanOrEqual)
.foreach { bc =>
assert(bc(Add($"a", $"b"), Literal(10)).semanticEquals(bc(Add($"b", $"a"), Literal(10))))
assert(bc(Multiply($"a", $"b"), Literal(10)).semanticEquals(
bc(Multiply($"b", $"a"), Literal(10))))
}
}

test("SPARK-40903: Only reorder decimal Add when the result data type is not changed") {
val d = Decimal(1.2)
val literal1 = Literal.create(d, DecimalType(2, 1))
val literal2 = Literal.create(d, DecimalType(2, 1))
val literal3 = Literal.create(d, DecimalType(3, 2))
assert(Add(literal1, literal2).semanticEquals(Add(literal2, literal1)))
assert(Add(Add(literal1, literal2), literal3).semanticEquals(
Add(Add(literal3, literal2), literal1)))

val literal4 = Literal.create(d, DecimalType(12, 5))
val literal5 = Literal.create(d, DecimalType(12, 6))
assert(!Add(Add(literal4, literal5), literal1).semanticEquals(
Add(Add(literal1, literal5), literal4)))
}
}
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import java.math.{BigDecimal => JavaBigDecimal}

import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -1124,7 +1124,8 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
).foreach { case (str: String, format: String) =>
val toNumberExpr = ToNumber(Literal(str), Literal(format))
assert(toNumberExpr.checkInputDataTypes() == TypeCheckResult.TypeCheckSuccess)
checkExceptionInExpression[IllegalArgumentException](

checkExceptionInExpression[SparkIllegalArgumentException](
toNumberExpr, "does not match the given number format")

val tryToNumberExpr = TryToNumber(Literal(str), Literal(format))
@@ -52,7 +52,7 @@ class RateStreamMicroBatchStream(
private val maxSeconds = Long.MaxValue / rowsPerSecond

if (rampUpTimeSeconds > maxSeconds) {
throw QueryExecutionErrors.incorrectRumpUpRate(
throw QueryExecutionErrors.incorrectRampUpRate(
rowsPerSecond, maxSeconds, rampUpTimeSeconds)
}

@@ -4694,8 +4694,15 @@ SELECT '' AS to_number_1, to_number('-34,338,492', '99G999G999')
-- !query schema
struct<>
-- !query output
java.lang.IllegalArgumentException
The input string '-34,338,492' does not match the given number format: '99G999G999'
org.apache.spark.SparkIllegalArgumentException
{
"errorClass" : "_LEGACY_ERROR_TEMP_2278",
"messageParameters" : {
"format" : "99G999G999",
"input" : "-34,338,492",
"valueType" : "string"
}
}


-- !query
@@ -4761,8 +4768,15 @@ SELECT '' AS to_number_16, to_number('123456','999G999')
-- !query schema
struct<>
-- !query output
java.lang.IllegalArgumentException
The input string '123456' does not match the given number format: '999G999'
org.apache.spark.SparkIllegalArgumentException
{
"errorClass" : "_LEGACY_ERROR_TEMP_2278",
"messageParameters" : {
"format" : "999G999",
"input" : "123456",
"valueType" : "string"
}
}


-- !query
@@ -4518,6 +4518,14 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}
}
}

test("SPARK-40903: Don't reorder Add for canonicalize if it is decimal type") {
val tableName = "decimalTable"
withTable(tableName) {
sql(s"create table $tableName(a decimal(12, 5), b decimal(12, 6)) using orc")
checkAnswer(sql(s"select sum(coalesce(a + b + 1.75, a)) from $tableName"), Row(null))
}
}
}

case class Foo(bar: Option[String])