Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,16 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu

def testCreateTableWithProperty(tbl: String): Unit = {}

def checkErrorFailedLoadTable(e: AnalysisException, tbl: String): Unit = {
checkError(
private def checkErrorFailedJDBC(
e: AnalysisException,
errorClass: String,
tbl: String): Unit = {
checkErrorMatchPVals(
exception = e,
errorClass = "FAILED_JDBC.UNCLASSIFIED",
errorClass = errorClass,
parameters = Map(
"url" -> "jdbc:",
"message" -> s"Failed to load table: $tbl"
)
"url" -> "jdbc:.*",
"tableName" -> s"`$tbl`")
)
}

Expand Down Expand Up @@ -132,7 +134,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ADD COLUMNS (C4 STRING)")
}
checkErrorFailedLoadTable(e, "not_existing_table")
checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
}

test("SPARK-33034: ALTER TABLE ... drop column") {
Expand All @@ -154,7 +156,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table DROP COLUMN C1")
}
checkErrorFailedLoadTable(e, "not_existing_table")
checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
}

test("SPARK-33034: ALTER TABLE ... update column type") {
Expand All @@ -170,7 +172,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN id TYPE DOUBLE")
}
checkErrorFailedLoadTable(e, "not_existing_table")
checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
}

test("SPARK-33034: ALTER TABLE ... rename column") {
Expand Down Expand Up @@ -198,7 +200,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C")
}
checkErrorFailedLoadTable(e, "not_existing_table")
checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
}

test("SPARK-33034: ALTER TABLE ... update column nullability") {
Expand All @@ -209,7 +211,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN ID DROP NOT NULL")
}
checkErrorFailedLoadTable(e, "not_existing_table")
checkErrorFailedJDBC(e, "FAILED_JDBC.LOAD_TABLE", "not_existing_table")
}

test("CREATE TABLE with table comment") {
Expand All @@ -231,7 +233,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"CREATE TABLE $catalogName.new_table (i INT) TBLPROPERTIES('a'='1')")
}
assert(e.getErrorClass == "FAILED_JDBC.UNCLASSIFIED")
checkErrorFailedJDBC(e, "FAILED_JDBC.CREATE_TABLE", "new_table")
testCreateTableWithProperty(s"$catalogName.new_table")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1261,14 +1261,13 @@ object JdbcUtils extends Logging with SQLConfHelper {
def classifyException[T](
errorClass: String,
messageParameters: Map[String, String],
dialect: JdbcDialect,
description: String)(f: => T): T = {
dialect: JdbcDialect)(f: => T): T = {
try {
f
} catch {
case e: SparkThrowable with Throwable => throw e
case e: Throwable =>
throw dialect.classifyException(e, errorClass, messageParameters, description)
throw dialect.classifyException(e, errorClass, messageParameters)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
"url" -> jdbcOptions.getRedactUrl(),
"indexName" -> toSQLId(indexName),
"tableName" -> toSQLId(name)),
dialect = JdbcDialects.get(jdbcOptions.url),
description = s"Failed to create index $indexName in ${name()}") {
dialect = JdbcDialects.get(jdbcOptions.url)) {
JdbcUtils.createIndex(
conn, indexName, ident, columns, columnsProperties, properties, jdbcOptions)
}
Expand All @@ -91,8 +90,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
"url" -> jdbcOptions.getRedactUrl(),
"indexName" -> toSQLId(indexName),
"tableName" -> toSQLId(name)),
dialect = JdbcDialects.get(jdbcOptions.url),
description = s"Failed to drop index $indexName in ${name()}") {
dialect = JdbcDialects.get(jdbcOptions.url)) {
JdbcUtils.dropIndex(conn, indexName, ident, jdbcOptions)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(namespace.toSeq)),
dialect,
description = s"Failed get tables from: ${namespace.mkString(".")}") {
dialect) {
conn.getMetaData.getTables(null, schemaPattern, "%", Array("TABLE"))
}
new Iterator[Identifier] {
Expand All @@ -93,8 +92,7 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"tableName" -> toSQLId(ident)),
dialect,
description = s"Failed table existence check: $ident") {
dialect) {
JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions))
}
}
Expand All @@ -120,8 +118,7 @@ class JDBCTableCatalog extends TableCatalog
"url" -> options.getRedactUrl(),
"oldName" -> toSQLId(oldIdent),
"newName" -> toSQLId(newIdent)),
dialect,
description = s"Failed table renaming from $oldIdent to $newIdent") {
dialect) {
JdbcUtils.renameTable(conn, oldIdent, newIdent, options)
}
}
Expand All @@ -136,9 +133,7 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"tableName" -> toSQLId(ident)),
dialect,
description = s"Failed to load table: $ident"
) {
dialect) {
val schema = JDBCRDD.resolveTable(optionsWithTableName)
JDBCTable(ident, schema, optionsWithTableName)
}
Expand Down Expand Up @@ -200,8 +195,7 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"tableName" -> toSQLId(ident)),
dialect,
description = s"Failed table creation: $ident") {
dialect) {
JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions)
}
}
Expand All @@ -217,8 +211,7 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"tableName" -> toSQLId(ident)),
dialect,
description = s"Failed table altering: $ident") {
dialect) {
JdbcUtils.alterTable(conn, getTableName(ident), changes, options)
}
loadTable(ident)
Expand All @@ -233,8 +226,7 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(namespace.toSeq)),
dialect,
description = s"Failed namespace exists: ${namespace.mkString}") {
dialect) {
JdbcUtils.schemaExists(conn, options, db)
}
}
Expand All @@ -246,8 +238,7 @@ class JDBCTableCatalog extends TableCatalog
JdbcUtils.classifyException(
errorClass = "FAILED_JDBC.LIST_NAMESPACES",
messageParameters = Map("url" -> options.getRedactUrl()),
dialect,
description = s"Failed list namespaces") {
dialect) {
JdbcUtils.listSchemas(conn, options)
}
}
Expand Down Expand Up @@ -300,8 +291,7 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(db)),
dialect,
description = s"Failed create name space: $db") {
dialect) {
JdbcUtils.createSchema(conn, options, db, comment)
}
}
Expand All @@ -325,8 +315,7 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(db)),
dialect,
description = s"Failed create comment on name space: $db") {
dialect) {
JdbcUtils.alterSchemaComment(conn, options, db, set.value)
}
}
Expand All @@ -342,8 +331,7 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(db)),
dialect,
description = s"Failed remove comment on name space: $db") {
dialect) {
JdbcUtils.removeSchemaComment(conn, options, db)
}
}
Expand All @@ -370,8 +358,7 @@ class JDBCTableCatalog extends TableCatalog
messageParameters = Map(
"url" -> options.getRedactUrl(),
"namespace" -> toSQLId(db)),
dialect,
description = s"Failed drop name space: $db") {
dialect) {
JdbcUtils.dropSchema(conn, options, db, cascade)
true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper {
override def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
messageParameters: Map[String, String]): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getSQLState match {
Expand All @@ -167,9 +166,9 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper {
namespace = messageParameters.get("namespace").toArray,
details = sqlException.getMessage,
cause = Some(e))
case _ => super.classifyException(e, errorClass, messageParameters, description)
case _ => super.classifyException(e, errorClass, messageParameters)
}
case _ => super.classifyException(e, errorClass, messageParameters, description)
case _ => super.classifyException(e, errorClass, messageParameters)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ private[sql] case class H2Dialect() extends JdbcDialect {
override def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
messageParameters: Map[String, String]): AnalysisException = {
e match {
case exception: SQLException =>
// Error codes are from https://www.h2database.com/javadoc/org/h2/api/ErrorCode.html
Expand Down Expand Up @@ -244,7 +243,7 @@ private[sql] case class H2Dialect() extends JdbcDialect {
}
case _ => // do nothing
}
super.classifyException(e, errorClass, messageParameters, description)
super.classifyException(e, errorClass, messageParameters)
}

override def compileExpression(expr: Expression): Option[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,15 +740,16 @@ abstract class JdbcDialect extends Serializable with Logging {
* @param e The dialect specific exception.
* @param errorClass The error class assigned in the case of an unclassified `e`
* @param messageParameters The message parameters of `errorClass`
* @param description The error description
* @return `AnalysisException` or its sub-class.
*/
def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the code logic, it seems that there is no need for the field description to exist. Let's remove it.
Although JdbcDialect is marked as DeveloperApi, this method has been added from Spark version 4.0 and the version 4.0 has not been released yet.
Can we directly remove it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there was a reason to call the legacy classifyException method here. Can we dig into it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am investigating history.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This guarantees pre-implemented legacy classifyExceptions from third-party to be correctly called

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's right, see PR below:
1.#44358, the modification of this PR resulted in break changes, and with the suggestion of cloud-fan:
image
the following PR has been created.

2.#44449, this PR has restored compatibility behavior
image

classifyException(description, e)
messageParameters: Map[String, String]): AnalysisException = {
new AnalysisException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AnalysisException seems unreasonable here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this should already be in the execution phase, so it's not reasonable.
Let me go through the history first.

errorClass = errorClass,
messageParameters = messageParameters,
cause = Some(e))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,7 @@ private case class MsSqlServerDialect() extends JdbcDialect {
override def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
messageParameters: Map[String, String]): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getErrorCode match {
Expand All @@ -216,9 +215,9 @@ private case class MsSqlServerDialect() extends JdbcDialect {
namespace = messageParameters.get("namespace").toArray,
details = sqlException.getMessage,
cause = Some(e))
case _ => super.classifyException(e, errorClass, messageParameters, description)
case _ => super.classifyException(e, errorClass, messageParameters)
}
case _ => super.classifyException(e, errorClass, messageParameters, description)
case _ => super.classifyException(e, errorClass, messageParameters)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,7 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper {
override def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
messageParameters: Map[String, String]): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getErrorCode match {
Expand All @@ -345,10 +344,10 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper {
val indexName = messageParameters("indexName")
val tableName = messageParameters("tableName")
throw new NoSuchIndexException(indexName, tableName, cause = Some(e))
case _ => super.classifyException(e, errorClass, messageParameters, description)
case _ => super.classifyException(e, errorClass, messageParameters)
}
case unsupported: UnsupportedOperationException => throw unsupported
case _ => super.classifyException(e, errorClass, messageParameters, description)
case _ => super.classifyException(e, errorClass, messageParameters)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,7 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
override def classifyException(
e: Throwable,
errorClass: String,
messageParameters: Map[String, String],
description: String): AnalysisException = {
messageParameters: Map[String, String]): AnalysisException = {
e match {
case sqlException: SQLException =>
sqlException.getSQLState match {
Expand All @@ -278,7 +277,7 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
if (tblRegexp.nonEmpty) {
throw QueryCompilationErrors.tableAlreadyExistsError(tblRegexp.get.group(1))
} else {
super.classifyException(e, errorClass, messageParameters, description)
super.classifyException(e, errorClass, messageParameters)
}
}
case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" =>
Expand All @@ -290,10 +289,10 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
namespace = messageParameters.get("namespace").toArray,
details = sqlException.getMessage,
cause = Some(e))
case _ => super.classifyException(e, errorClass, messageParameters, description)
case _ => super.classifyException(e, errorClass, messageParameters)
}
case unsupported: UnsupportedOperationException => throw unsupported
case _ => super.classifyException(e, errorClass, messageParameters, description)
case _ => super.classifyException(e, errorClass, messageParameters)
}
}

Expand Down
Loading