Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions core/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
"message" : [ "Field name %s is ambiguous and has %s matching fields in the struct." ],
"sqlState" : "42000"
},
"CONCURRENT_QUERY_ERROR" : {
"message" : [ "Another instance of this query was just started by a concurrent session." ]
},
"DIVIDE_BY_ZERO" : {
"message" : [ "divide by zero" ],
"sqlState" : "22012"
Expand All @@ -11,6 +14,13 @@
"message" : [ "Found duplicate keys '%s'" ],
"sqlState" : "23000"
},
"FAILED_RENAME_PATH" : {
"message" : [ "Failed to rename %s to %s as destination already exists" ],
"sqlState" : "22023"
},
"FAILED_SET_ORIGINAL_PERMISSION_BACK" : {
"message" : [ "Failed to set original permission %s back to the created path: %s. Exception: %s" ]
},
"GROUPING_COLUMN_MISMATCH" : {
"message" : [ "Column of grouping (%s) can't be found in grouping columns %s" ],
"sqlState" : "42000"
Expand All @@ -29,17 +39,32 @@
"message" : [ "Invalid pivot column '%s'. Pivot columns must be comparable." ],
"sqlState" : "42000"
},
"INCOMPATIBLE_DATASOURCE_REGISTER" : {
"message" : [ "Detected an incompatible DataSourceRegister. Please remove the incompatible library from classpath or upgrade it. Error: %s" ]
},
"INDEX_OUT_OF_BOUNDS" : {
"message" : [ "Index %s must be between 0 and the length of the ArrayData." ],
"sqlState" : "22023"
},
"INVALID_FIELD_NAME" : {
"message" : [ "Field name %s is invalid: %s is not a struct." ],
"sqlState" : "42000"
},
"INVALID_FRACTION_OF_SECOND" : {
"message" : [ "The fraction of sec must be zero. Valid range is [0, 60]." ],
"sqlState" : "22023"
},
"INVALID_JSON_SCHEMA_MAPTYPE" : {
"message" : [ "Input schema %s can only contain StringType as a key type for a MapType." ]
},
"MISSING_COLUMN" : {
"message" : [ "cannot resolve '%s' given input columns: [%s]" ],
"sqlState" : "42000"
},
"MISSING_METHOD" : {
"message" : [ "A method named \"%s\" is not declared in any enclosing class nor any supertype" ],
"sqlState" : "42000"
},
"MISSING_STATIC_PARTITION_COLUMN" : {
"message" : [ "Unknown static partition column: %s" ],
"sqlState" : "42000"
Expand All @@ -56,13 +81,29 @@
"message" : [ "Invalid pivot value '%s': value data type %s does not match pivot column data type %s" ],
"sqlState" : "42000"
},
"RENAME_SRC_PATH_NOT_FOUND" : {
"message" : [ "Failed to rename as %s was not found" ],
"sqlState" : "22023"
},
"SECOND_FUNCTION_ARGUMENT_NOT_INTEGER" : {
"message" : [ "The second argument of '%s' function needs to be an integer." ],
"sqlState" : "22023"
},
"UNABLE_TO_ACQUIRE_MEMORY" : {
"message" : [ "Unable to acquire %s bytes of memory, got %s" ]
},
"UNRECOGNIZED_SQL_TYPE" : {
"message" : [ "Unrecognized SQL type %s" ],
"sqlState" : "42000"
},
"UNSUPPORTED_LITERAL_TYPE" : {
"message" : [ "Unsupported literal type %s %s" ],
"sqlState" : "0A000"
},
"UNSUPPORTED_TRANSACTION_BY_JDBC_SERVER" : {
"message" : [ "The target JDBC server does not support transaction and can only support ALTER TABLE with a single action." ],
"sqlState" : "0A000"
},
"WRITING_JOB_ABORTED" : {
"message" : [ "Writing job aborted" ],
"sqlState" : "40000"
Expand Down
145 changes: 145 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkException.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@

package org.apache.spark

import java.io.{FileNotFoundException, IOException}
import java.sql.{SQLException, SQLFeatureNotSupportedException}
import java.time.DateTimeException
import java.util.ConcurrentModificationException

import org.apache.hadoop.fs.FileAlreadyExistsException

class SparkException(
message: String,
cause: Throwable,
Expand Down Expand Up @@ -79,3 +86,141 @@ class SparkArithmeticException(errorClass: String, messageParameters: Array[Stri
override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* Class not found exception thrown from Spark with an error class.
*/
class SparkClassNotFoundException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we mark these classes as private[spark]? cc @gengliangwang

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yeah

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan @HyukjinKwon @gengliangwang I post the follow PR. Could you take a look?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think so

errorClass: String,
messageParameters: Array[String],
cause: Throwable = null)
extends ClassNotFoundException(
SparkThrowableHelper.getMessage(errorClass, messageParameters), cause) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* Concurrent modification exception thrown from Spark with an error class.
*/
class SparkConcurrentModificationException(
errorClass: String,
messageParameters: Array[String],
cause: Throwable = null)
extends ConcurrentModificationException(
SparkThrowableHelper.getMessage(errorClass, messageParameters), cause) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* Datetime exception thrown from Spark with an error class.
*/
class SparkDateTimeException(errorClass: String, messageParameters: Array[String])
extends DateTimeException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* Hadoop file already exists exception thrown from Spark with an error class.
*/
class SparkFileAlreadyExistsException(errorClass: String, messageParameters: Array[String])
extends FileAlreadyExistsException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* File not found exception thrown from Spark with an error class.
*/
class SparkFileNotFoundException(errorClass: String, messageParameters: Array[String])
extends FileNotFoundException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* No such method exception thrown from Spark with an error class.
*/
class SparkNoSuchMethodException(errorClass: String, messageParameters: Array[String])
extends NoSuchMethodException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* Index out of bounds exception thrown from Spark with an error class.
*/
class SparkIndexOutOfBoundsException(errorClass: String, messageParameters: Array[String])
extends IndexOutOfBoundsException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* IO exception thrown from Spark with an error class.
*/
class SparkIOException(errorClass: String, messageParameters: Array[String])
extends IOException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

class SparkRuntimeException(
errorClass: String,
messageParameters: Array[String],
cause: Throwable = null)
extends RuntimeException(
SparkThrowableHelper.getMessage(errorClass, messageParameters), cause) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* Security exception thrown from Spark with an error class.
*/
class SparkSecurityException(errorClass: String, messageParameters: Array[String])
extends SecurityException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* SQL exception thrown from Spark with an error class.
*/
class SparkSQLException(errorClass: String, messageParameters: Array[String])
extends SQLException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}

/**
* SQL feature not supported exception thrown from Spark with an error class.
*/
class SparkSQLFeatureNotSupportedException(errorClass: String, messageParameters: Array[String])
extends SQLFeatureNotSupportedException(
SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable {

override def getErrorClass: String = errorClass
override def getSqlState: String = SparkThrowableHelper.getSqlState(errorClass)
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.hadoop.fs.permission.FsPermission
import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.InternalCompilerException

import org.apache.spark.{Partition, SparkArithmeticException, SparkException, SparkUpgradeException}
import org.apache.spark.{Partition, SparkArithmeticException, SparkClassNotFoundException, SparkConcurrentModificationException, SparkDateTimeException, SparkException, SparkFileAlreadyExistsException, SparkFileNotFoundException, SparkIndexOutOfBoundsException, SparkNoSuchMethodException, SparkRuntimeException, SparkSecurityException, SparkSQLException, SparkSQLFeatureNotSupportedException, SparkUpgradeException}
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.memory.SparkOutOfMemoryError
Expand Down Expand Up @@ -157,7 +157,7 @@ object QueryExecutionErrors {
}

def invalidFractionOfSecondError(): DateTimeException = {
new DateTimeException("The fraction of sec must be zero. Valid range is [0, 60].")
new SparkDateTimeException(errorClass = "INVALID_FRACTION_OF_SECOND", Array.empty)
}

def overflowInSumOfDecimalError(): ArithmeticException = {
Expand All @@ -179,7 +179,8 @@ object QueryExecutionErrors {
}

def literalTypeUnsupportedError(v: Any): RuntimeException = {
new RuntimeException(s"Unsupported literal type ${v.getClass} $v")
new SparkRuntimeException("UNSUPPORTED_LITERAL_TYPE",
Array(v.getClass.toString, v.toString))
}

def noDefaultForDataTypeError(dataType: DataType): RuntimeException = {
Expand Down Expand Up @@ -261,8 +262,7 @@ object QueryExecutionErrors {
}

def methodNotDeclaredError(name: String): Throwable = {
new NoSuchMethodException(s"""A method named "$name" is not declared """ +
"in any enclosing class nor any supertype")
new SparkNoSuchMethodException(errorClass = "MISSING_METHOD", Array(name))
}

def constructorNotFoundError(cls: String): Throwable = {
Expand Down Expand Up @@ -449,11 +449,7 @@ object QueryExecutionErrors {
}

def incompatibleDataSourceRegisterError(e: Throwable): Throwable = {
new ClassNotFoundException(
s"""
|Detected an incompatible DataSourceRegister. Please remove the incompatible
|library from classpath or upgrade it. Error: ${e.getMessage}
""".stripMargin, e)
new SparkClassNotFoundException("INCOMPATIBLE_DATASOURCE_REGISTER", Array(e.getMessage), e)
}

def unrecognizedFileFormatError(format: String): Throwable = {
Expand Down Expand Up @@ -675,7 +671,7 @@ object QueryExecutionErrors {
}

def unrecognizedSqlTypeError(sqlType: Int): Throwable = {
new SQLException(s"Unrecognized SQL type $sqlType")
new SparkSQLException(errorClass = "UNRECOGNIZED_SQL_TYPE", Array(sqlType.toString))
}

def unsupportedJdbcTypeError(content: String): Throwable = {
Expand All @@ -702,8 +698,8 @@ object QueryExecutionErrors {
}

def transactionUnsupportedByJdbcServerError(): Throwable = {
new SQLFeatureNotSupportedException("The target JDBC server does not support " +
"transaction and can only support ALTER TABLE with a single action.")
new SparkSQLFeatureNotSupportedException(errorClass = "UNSUPPORTED_TRANSACTION_BY_JDBC_SERVER",
Array.empty)
}

def dataTypeUnsupportedYetError(dataType: DataType): Throwable = {
Expand Down Expand Up @@ -952,8 +948,7 @@ object QueryExecutionErrors {
}

def concurrentQueryInstanceError(): Throwable = {
new ConcurrentModificationException(
"Another instance of this query was just started by a concurrent session.")
new SparkConcurrentModificationException("CONCURRENT_QUERY_ERROR", Array.empty)
}

def cannotParseJsonArraysAsStructsError(): Throwable = {
Expand Down Expand Up @@ -1233,8 +1228,7 @@ object QueryExecutionErrors {
}

def indexOutOfBoundsOfArrayDataError(idx: Int): Throwable = {
new IndexOutOfBoundsException(
s"Index $idx must be between 0 and the length of the ArrayData.")
new SparkIndexOutOfBoundsException(errorClass = "INDEX_OUT_OF_BOUNDS", Array(idx.toString))
}

def malformedRecordsDetectedInRecordParsingError(e: BadRecordException): Throwable = {
Expand Down Expand Up @@ -1354,16 +1348,17 @@ object QueryExecutionErrors {
}

def renamePathAsExistsPathError(srcPath: Path, dstPath: Path): Throwable = {
new FileAlreadyExistsException(
s"Failed to rename $srcPath to $dstPath as destination already exists")
new SparkFileAlreadyExistsException(errorClass = "FAILED_RENAME_PATH",
Array(srcPath.toString, dstPath.toString))
}

def renameAsExistsPathError(dstPath: Path): Throwable = {
new FileAlreadyExistsException(s"Failed to rename as $dstPath already exists")
}

def renameSrcPathNotFoundError(srcPath: Path): Throwable = {
new FileNotFoundException(s"Failed to rename as $srcPath was not found")
new SparkFileNotFoundException(errorClass = "RENAME_SRC_PATH_NOT_FOUND",
Array(srcPath.toString))
}

def failedRenameTempFileError(srcPath: Path, dstPath: Path): Throwable = {
Expand Down Expand Up @@ -1560,8 +1555,8 @@ object QueryExecutionErrors {
permission: FsPermission,
path: Path,
e: Throwable): Throwable = {
new SecurityException(s"Failed to set original permission $permission back to " +
s"the created path: $path. Exception: ${e.getMessage}")
new SparkSecurityException(errorClass = "FAILED_SET_ORIGINAL_PERMISSION_BACK",
Array(permission.toString, path.toString, e.getMessage))
}

def failToSetOriginalACLBackError(aclEntries: String, path: Path, e: Throwable): Throwable = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ SELECT make_timestamp(2021, 07, 11, 6, 30, 60.007)
-- !query schema
struct<>
-- !query output
java.time.DateTimeException
org.apache.spark.SparkDateTimeException
The fraction of sec must be zero. Valid range is [0, 60].


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ SELECT make_timestamp(2021, 07, 11, 6, 30, 60.007)
-- !query schema
struct<>
-- !query output
java.time.DateTimeException
org.apache.spark.SparkDateTimeException
The fraction of sec must be zero. Valid range is [0, 60].


Expand Down